aboutsummaryrefslogtreecommitdiffstats
path: root/lang/python/pyme
diff options
context:
space:
mode:
Diffstat (limited to 'lang/python/pyme')
-rw-r--r--lang/python/pyme/__init__.py4
-rw-r--r--lang/python/pyme/constants/__init__.py23
-rw-r--r--lang/python/pyme/core.py313
-rw-r--r--lang/python/pyme/errors.py59
-rw-r--r--lang/python/pyme/util.py23
5 files changed, 383 insertions, 39 deletions
diff --git a/lang/python/pyme/__init__.py b/lang/python/pyme/__init__.py
index e377f595..880f6a19 100644
--- a/lang/python/pyme/__init__.py
+++ b/lang/python/pyme/__init__.py
@@ -134,3 +134,7 @@ Base classes are documented at pyme.core.
"""
__all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version']
+
+from .core import Context
+from .core import Data
+from .errors import GPGMEError
diff --git a/lang/python/pyme/constants/__init__.py b/lang/python/pyme/constants/__init__.py
index 2e91d769..875b6dc1 100644
--- a/lang/python/pyme/constants/__init__.py
+++ b/lang/python/pyme/constants/__init__.py
@@ -1,6 +1,25 @@
+#!/usr/bin/env python3
+
+# Copyright (C) 2016 g10 Code GmbH
+# Copyright (C) 2004 Igor Belyi <[email protected]>
+# Copyright (C) 2002 John Goerzen <[email protected]>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, see <http://www.gnu.org/licenses/>.
from pyme import util
-util.process_constants('GPGME_', globals())
+constants = util.process_constants('GPGME_', globals())
-__all__ = ['data', 'event', 'import', 'keylist', 'md', 'pk',
+__all__ = constants + \
+ ['data', 'event', 'import', 'keylist', 'md', 'pk',
'protocol', 'sig', 'sigsum', 'status', 'validity']
diff --git a/lang/python/pyme/core.py b/lang/python/pyme/core.py
index 09f71a1a..365d342b 100644
--- a/lang/python/pyme/core.py
+++ b/lang/python/pyme/core.py
@@ -25,6 +25,7 @@ and the 'Data' class describing buffers of data.
"""
import re
+import os
import weakref
from . import pygpgme
from .errors import errorcheck, GPGMEError
@@ -166,6 +167,291 @@ class Context(GpgmeWrapper):
"""
+ def __init__(self, armor=False, textmode=False, offline=False,
+ signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
+ wrapped=None):
+ """Construct a context object.
+
+ Keyword arguments:
+ armor -- enable ASCII armoring (default False)
+ textmode -- enable canonical text mode (default False)
+ offline -- do not contact external key sources (default False)
+ signers -- list of keys used for signing (default [])
+ pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
+
+ """
+ if wrapped:
+ self.own = False
+ else:
+ tmp = pygpgme.new_gpgme_ctx_t_p()
+ errorcheck(pygpgme.gpgme_new(tmp))
+ wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
+ pygpgme.delete_gpgme_ctx_t_p(tmp)
+ self.own = True
+ super().__init__(wrapped)
+ self.armor = armor
+ self.textmode = textmode
+ self.offline = offline
+ self.signers = signers
+ self.pinentry_mode = pinentry_mode
+
+ def encrypt(self, plaintext, recipients=[], sign=True, sink=None,
+ passphrase=None, always_trust=False, add_encrypt_to=False,
+ prepare=False, expect_sign=False, compress=True):
+ """Encrypt data
+
+ Encrypt the given plaintext.
+
+ Keyword arguments:
+ recipients -- list of keys to encrypt to
+ sign -- sign plaintext (default True)
+ sink -- write result to sink instead of returning it
+ passphrase -- for symmetric encryption
+ always_trust -- always trust the keys (default False)
+ add_encrypt_to -- encrypt to configured additional keys (default False)
+ prepare -- (ui) prepare for encryption (default False)
+ expect_sign -- (ui) prepare for signing (default False)
+ compress -- compress plaintext (default True)
+
+ Returns:
+ ciphertext -- the encrypted data (or None if sink is given)
+ result -- additional information about the encryption
+ sign_result -- additional information about the signature(s)
+
+ Raises:
+ InvalidRecipients -- if encryption using a particular key failed
+ InvalidSigners -- if signing using a particular key failed
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ ciphertext = sink if sink else Data()
+ flags = 0
+ flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST
+ flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO
+ flags |= prepare * constants.ENCRYPT_PREPARE
+ flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN
+ flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS
+
+ if passphrase != None:
+ old_pinentry_mode = self.pinentry_mode
+ old_passphrase_cb = getattr(self, '_passphrase_cb', None)
+ self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
+ def passphrase_cb(hint, desc, prev_bad, hook=None):
+ return passphrase
+ self.set_passphrase_cb(passphrase_cb)
+
+ try:
+ if sign:
+ self.op_encrypt_sign(recipients, flags, plaintext, ciphertext)
+ else:
+ self.op_encrypt(recipients, flags, plaintext, ciphertext)
+ except errors.GPGMEError as e:
+ if e.getcode() == errors.UNUSABLE_PUBKEY:
+ result = self.op_encrypt_result()
+ if result.invalid_recipients:
+ raise errors.InvalidRecipients(result.invalid_recipients)
+ if e.getcode() == errors.UNUSABLE_SECKEY:
+ sig_result = self.op_sign_result()
+ if sig_result.invalid_signers:
+ raise errors.InvalidSigners(sig_result.invalid_signers)
+ raise
+ finally:
+ if passphrase != None:
+ self.pinentry_mode = old_pinentry_mode
+ if old_passphrase_cb:
+ self.set_passphrase_cb(*old_passphrase_cb[1:])
+
+ result = self.op_encrypt_result()
+ assert not result.invalid_recipients
+ sig_result = self.op_sign_result() if sign else None
+ assert not sig_result or not sig_result.invalid_signers
+
+ cipherbytes = None
+ if not sink:
+ ciphertext.seek(0, os.SEEK_SET)
+ cipherbytes = ciphertext.read()
+ return cipherbytes, result, sig_result
+
+ def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True):
+ """Decrypt data
+
+ Decrypt the given ciphertext and verify any signatures. If
+ VERIFY is an iterable of keys, the ciphertext must be signed
+ by all those keys, otherwise an error is raised.
+
+ Keyword arguments:
+ sink -- write result to sink instead of returning it
+ passphrase -- for symmetric decryption or accessing the key
+ verify -- check signatures (default True)
+
+ Returns:
+ plainttext -- the decrypted data (or None if sink is given)
+ result -- additional information about the decryption
+
+ Raises:
+ UnsupportedAlgorithm -- if an unsupported algorithm was used
+ BadSignatures -- if a bad signature is encountered
+ MissingSignatures -- if expected signatures are missing or bad
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ plaintext = sink if sink else Data()
+
+ if passphrase != None:
+ old_pinentry_mode = self.pinentry_mode
+ old_passphrase_cb = getattr(self, '_passphrase_cb', None)
+ self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
+ def passphrase_cb(hint, desc, prev_bad, hook=None):
+ return passphrase
+ self.set_passphrase_cb(passphrase_cb)
+
+ try:
+ if verify:
+ self.op_decrypt_verify(ciphertext, plaintext)
+ else:
+ self.op_decrypt(ciphertext, plaintext)
+ finally:
+ if passphrase != None:
+ self.pinentry_mode = old_pinentry_mode
+ if old_passphrase_cb:
+ self.set_passphrase_cb(*old_passphrase_cb[1:])
+
+ result = self.op_decrypt_result()
+ verify_result = self.op_verify_result() if verify else None
+ if result.unsupported_algorithm:
+ raise errors.UnsupportedAlgorithm(result.unsupported_algorithm)
+
+ if verify:
+ if any(s.status != errors.NO_ERROR
+ for s in verify_result.signatures):
+ raise errors.BadSignatures(verify_result)
+
+ if verify and verify != True:
+ missing = list()
+ for key in verify:
+ ok = False
+ for subkey in key.subkeys:
+ for sig in verify_result.signatures:
+ if sig.summary & constants.SIGSUM_VALID == 0:
+ continue
+ if subkey.can_sign and subkey.fpr == sig.fpr:
+ ok = True
+ break
+ if ok:
+ break
+ if not ok:
+ missing.append(key)
+ if missing:
+ raise errors.MissingSignatures(verify_result, missing)
+
+ plainbytes = None
+ if not sink:
+ plaintext.seek(0, os.SEEK_SET)
+ plainbytes = plaintext.read()
+ return plainbytes, result, verify_result
+
+ def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL):
+ """Sign data
+
+ Sign the given data.
+
+ Keyword arguments:
+ mode -- signature mode (default: normal, see below)
+ sink -- write result to sink instead of returning it
+
+ Returns:
+ either
+ signed_data -- encoded data and signature (normal mode)
+ signature -- only the signature data (detached mode)
+ cleartext -- data and signature as text (cleartext mode)
+ (or None if sink is given)
+ result -- additional information about the signature(s)
+
+ Raises:
+ InvalidSigners -- if signing using a particular key failed
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ signeddata = sink if sink else Data()
+
+ try:
+ self.op_sign(data, signeddata, mode)
+ except errors.GPGMEError as e:
+ if e.getcode() == errors.UNUSABLE_SECKEY:
+ result = self.op_sign_result()
+ if result.invalid_signers:
+ raise errors.InvalidSigners(result.invalid_signers)
+ raise
+
+ result = self.op_sign_result()
+ assert not result.invalid_signers
+
+ signedbytes = None
+ if not sink:
+ signeddata.seek(0, os.SEEK_SET)
+ signedbytes = signeddata.read()
+ return signedbytes, result
+
+ def verify(self, signed_data, signature=None, sink=None, verify=[]):
+ """Verify signatures
+
+ Verify signatures over data. If VERIFY is an iterable of
+ keys, the ciphertext must be signed by all those keys,
+ otherwise an error is raised.
+
+ Keyword arguments:
+ signature -- detached signature data
+ sink -- write result to sink instead of returning it
+
+ Returns:
+ data -- the plain data
+ (or None if sink is given, or we verified a detached signature)
+ result -- additional information about the signature(s)
+
+ Raises:
+ BadSignatures -- if a bad signature is encountered
+ MissingSignatures -- if expected signatures are missing or bad
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ if signature:
+ # Detached signature, we don't return the plain text.
+ data = None
+ else:
+ data = sink if sink else Data()
+
+ if signature:
+ self.op_verify(signature, signed_data, None)
+ else:
+ self.op_verify(signed_data, None, data)
+
+ result = self.op_verify_result()
+ if any(s.status != errors.NO_ERROR for s in result.signatures):
+ raise errors.BadSignatures(result)
+
+ missing = list()
+ for key in verify:
+ ok = False
+ for subkey in key.subkeys:
+ for sig in result.signatures:
+ if sig.summary & constants.SIGSUM_VALID == 0:
+ continue
+ if subkey.can_sign and subkey.fpr == sig.fpr:
+ ok = True
+ break
+ if ok:
+ break
+ if not ok:
+ missing.append(key)
+ if missing:
+ raise errors.MissingSignatures(result, missing)
+
+ plainbytes = None
+ if data and not sink:
+ data.seek(0, os.SEEK_SET)
+ plainbytes = data.read()
+ return plainbytes, result
+
@property
def signers(self):
"""Keys used for signing"""
@@ -204,32 +490,6 @@ class Context(GpgmeWrapper):
return 0
_boolean_properties = {'armor', 'textmode', 'offline'}
- def __init__(self, armor=False, textmode=False, offline=False,
- signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
- wrapped=None):
- """Construct a context object
-
- Keyword arguments:
- armor -- enable ASCII armoring (default False)
- textmode -- enable canonical text mode (default False)
- offline -- do not contact external key sources (default False)
- signers -- list of keys used for signing (default [])
- pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
- """
- if wrapped:
- self.own = False
- else:
- tmp = pygpgme.new_gpgme_ctx_t_p()
- errorcheck(pygpgme.gpgme_new(tmp))
- wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
- pygpgme.delete_gpgme_ctx_t_p(tmp)
- self.own = True
- super().__init__(wrapped)
- self.armor = armor
- self.textmode = textmode
- self.offline = offline
- self.signers = signers
- self.pinentry_mode = pinentry_mode
def __del__(self):
if not pygpgme:
@@ -420,6 +680,7 @@ class Context(GpgmeWrapper):
pygpgme.pygpgme_raise_callback_exception(self)
errorcheck(result)
+
class Data(GpgmeWrapper):
"""Data buffer
diff --git a/lang/python/pyme/errors.py b/lang/python/pyme/errors.py
index f96877b6..0194931c 100644
--- a/lang/python/pyme/errors.py
+++ b/lang/python/pyme/errors.py
@@ -20,7 +20,10 @@ from . import util
util.process_constants('GPG_ERR_', globals())
-class GPGMEError(Exception):
+class PymeError(Exception):
+ pass
+
+class GPGMEError(PymeError):
def __init__(self, error = None, message = None):
self.error = error
self.message = message
@@ -43,8 +46,60 @@ class GPGMEError(Exception):
return pygpgme.gpgme_err_source(self.error)
def __str__(self):
- return "%s (%d,%d)"%(self.getstring(), self.getsource(), self.getcode())
+ return self.getstring()
def errorcheck(retval, extradata = None):
if retval:
raise GPGMEError(retval, extradata)
+
+# These errors are raised in the idiomatic interface code.
+
+class EncryptionError(PymeError):
+ pass
+
+class InvalidRecipients(EncryptionError):
+ def __init__(self, recipients):
+ self.recipients = recipients
+ def __str__(self):
+ return ", ".join("{}: {}".format(r.fpr,
+ pygpgme.gpgme_strerror(r.reason))
+ for r in self.recipients)
+
+class DeryptionError(PymeError):
+ pass
+
+class UnsupportedAlgorithm(DeryptionError):
+ def __init__(self, algorithm):
+ self.algorithm = algorithm
+ def __str__(self):
+ return self.algorithm
+
+class SigningError(PymeError):
+ pass
+
+class InvalidSigners(SigningError):
+ def __init__(self, signers):
+ self.signers = signers
+ def __str__(self):
+ return ", ".join("{}: {}".format(s.fpr,
+ pygpgme.gpgme_strerror(s.reason))
+ for s in self.signers)
+
+class VerificationError(PymeError):
+ pass
+
+class BadSignatures(VerificationError):
+ def __init__(self, result):
+ self.result = result
+ def __str__(self):
+ return ", ".join("{}: {}".format(s.fpr,
+ pygpgme.gpgme_strerror(s.status))
+ for s in self.result.signatures
+ if s.status != NO_ERROR)
+
+class MissingSignatures(VerificationError):
+ def __init__(self, result, missing):
+ self.result = result
+ self.missing = missing
+ def __str__(self):
+ return ", ".join(k.subkeys[0].fpr for k in self.missing)
diff --git a/lang/python/pyme/util.py b/lang/python/pyme/util.py
index 5527a1a2..bbd28fe7 100644
--- a/lang/python/pyme/util.py
+++ b/lang/python/pyme/util.py
@@ -1,3 +1,4 @@
+# Copyright (C) 2016 g10 Code GmbH
# Copyright (C) 2004,2008 Igor Belyi <[email protected]>
# Copyright (C) 2002 John Goerzen <[email protected]>
#
@@ -17,12 +18,16 @@
from . import pygpgme
-def process_constants(starttext, dict):
- """Called by the constant libraries to load up the appropriate constants
- from the C library."""
- index = len(starttext)
- for identifier in dir(pygpgme):
- if not identifier.startswith(starttext):
- continue
- name = identifier[index:]
- dict[name] = getattr(pygpgme, identifier)
+def process_constants(prefix, scope):
+ """Called by the constant modules to load up the constants from the C
+ library starting with PREFIX. Matching constants will be inserted
+ into SCOPE with PREFIX stripped from the names. Returns the names
+ of inserted constants.
+
+ """
+ index = len(prefix)
+ constants = {identifier[index:]: getattr(pygpgme, identifier)
+ for identifier in dir(pygpgme)
+ if identifier.startswith(prefix)}
+ scope.update(constants)
+ return list(constants.keys())