diff options
Diffstat (limited to 'lang/python/pyme')
-rw-r--r-- | lang/python/pyme/__init__.py | 4 | ||||
-rw-r--r-- | lang/python/pyme/constants/__init__.py | 23 | ||||
-rw-r--r-- | lang/python/pyme/core.py | 313 | ||||
-rw-r--r-- | lang/python/pyme/errors.py | 59 | ||||
-rw-r--r-- | lang/python/pyme/util.py | 23 |
5 files changed, 383 insertions, 39 deletions
diff --git a/lang/python/pyme/__init__.py b/lang/python/pyme/__init__.py index e377f595..880f6a19 100644 --- a/lang/python/pyme/__init__.py +++ b/lang/python/pyme/__init__.py @@ -134,3 +134,7 @@ Base classes are documented at pyme.core. """ __all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version'] + +from .core import Context +from .core import Data +from .errors import GPGMEError diff --git a/lang/python/pyme/constants/__init__.py b/lang/python/pyme/constants/__init__.py index 2e91d769..875b6dc1 100644 --- a/lang/python/pyme/constants/__init__.py +++ b/lang/python/pyme/constants/__init__.py @@ -1,6 +1,25 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2016 g10 Code GmbH +# Copyright (C) 2004 Igor Belyi <[email protected]> +# Copyright (C) 2002 John Goerzen <[email protected]> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, see <http://www.gnu.org/licenses/>. from pyme import util -util.process_constants('GPGME_', globals()) +constants = util.process_constants('GPGME_', globals()) -__all__ = ['data', 'event', 'import', 'keylist', 'md', 'pk', +__all__ = constants + \ + ['data', 'event', 'import', 'keylist', 'md', 'pk', 'protocol', 'sig', 'sigsum', 'status', 'validity'] diff --git a/lang/python/pyme/core.py b/lang/python/pyme/core.py index 09f71a1a..365d342b 100644 --- a/lang/python/pyme/core.py +++ b/lang/python/pyme/core.py @@ -25,6 +25,7 @@ and the 'Data' class describing buffers of data. """ import re +import os import weakref from . import pygpgme from .errors import errorcheck, GPGMEError @@ -166,6 +167,291 @@ class Context(GpgmeWrapper): """ + def __init__(self, armor=False, textmode=False, offline=False, + signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, + wrapped=None): + """Construct a context object. + + Keyword arguments: + armor -- enable ASCII armoring (default False) + textmode -- enable canonical text mode (default False) + offline -- do not contact external key sources (default False) + signers -- list of keys used for signing (default []) + pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT) + + """ + if wrapped: + self.own = False + else: + tmp = pygpgme.new_gpgme_ctx_t_p() + errorcheck(pygpgme.gpgme_new(tmp)) + wrapped = pygpgme.gpgme_ctx_t_p_value(tmp) + pygpgme.delete_gpgme_ctx_t_p(tmp) + self.own = True + super().__init__(wrapped) + self.armor = armor + self.textmode = textmode + self.offline = offline + self.signers = signers + self.pinentry_mode = pinentry_mode + + def encrypt(self, plaintext, recipients=[], sign=True, sink=None, + passphrase=None, always_trust=False, add_encrypt_to=False, + prepare=False, expect_sign=False, compress=True): + """Encrypt data + + Encrypt the given plaintext. + + Keyword arguments: + recipients -- list of keys to encrypt to + sign -- sign plaintext (default True) + sink -- write result to sink instead of returning it + passphrase -- for symmetric encryption + always_trust -- always trust the keys (default False) + add_encrypt_to -- encrypt to configured additional keys (default False) + prepare -- (ui) prepare for encryption (default False) + expect_sign -- (ui) prepare for signing (default False) + compress -- compress plaintext (default True) + + Returns: + ciphertext -- the encrypted data (or None if sink is given) + result -- additional information about the encryption + sign_result -- additional information about the signature(s) + + Raises: + InvalidRecipients -- if encryption using a particular key failed + InvalidSigners -- if signing using a particular key failed + GPGMEError -- as signaled by the underlying library + + """ + ciphertext = sink if sink else Data() + flags = 0 + flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST + flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO + flags |= prepare * constants.ENCRYPT_PREPARE + flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN + flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS + + if passphrase != None: + old_pinentry_mode = self.pinentry_mode + old_passphrase_cb = getattr(self, '_passphrase_cb', None) + self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): + return passphrase + self.set_passphrase_cb(passphrase_cb) + + try: + if sign: + self.op_encrypt_sign(recipients, flags, plaintext, ciphertext) + else: + self.op_encrypt(recipients, flags, plaintext, ciphertext) + except errors.GPGMEError as e: + if e.getcode() == errors.UNUSABLE_PUBKEY: + result = self.op_encrypt_result() + if result.invalid_recipients: + raise errors.InvalidRecipients(result.invalid_recipients) + if e.getcode() == errors.UNUSABLE_SECKEY: + sig_result = self.op_sign_result() + if sig_result.invalid_signers: + raise errors.InvalidSigners(sig_result.invalid_signers) + raise + finally: + if passphrase != None: + self.pinentry_mode = old_pinentry_mode + if old_passphrase_cb: + self.set_passphrase_cb(*old_passphrase_cb[1:]) + + result = self.op_encrypt_result() + assert not result.invalid_recipients + sig_result = self.op_sign_result() if sign else None + assert not sig_result or not sig_result.invalid_signers + + cipherbytes = None + if not sink: + ciphertext.seek(0, os.SEEK_SET) + cipherbytes = ciphertext.read() + return cipherbytes, result, sig_result + + def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True): + """Decrypt data + + Decrypt the given ciphertext and verify any signatures. If + VERIFY is an iterable of keys, the ciphertext must be signed + by all those keys, otherwise an error is raised. + + Keyword arguments: + sink -- write result to sink instead of returning it + passphrase -- for symmetric decryption or accessing the key + verify -- check signatures (default True) + + Returns: + plainttext -- the decrypted data (or None if sink is given) + result -- additional information about the decryption + + Raises: + UnsupportedAlgorithm -- if an unsupported algorithm was used + BadSignatures -- if a bad signature is encountered + MissingSignatures -- if expected signatures are missing or bad + GPGMEError -- as signaled by the underlying library + + """ + plaintext = sink if sink else Data() + + if passphrase != None: + old_pinentry_mode = self.pinentry_mode + old_passphrase_cb = getattr(self, '_passphrase_cb', None) + self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): + return passphrase + self.set_passphrase_cb(passphrase_cb) + + try: + if verify: + self.op_decrypt_verify(ciphertext, plaintext) + else: + self.op_decrypt(ciphertext, plaintext) + finally: + if passphrase != None: + self.pinentry_mode = old_pinentry_mode + if old_passphrase_cb: + self.set_passphrase_cb(*old_passphrase_cb[1:]) + + result = self.op_decrypt_result() + verify_result = self.op_verify_result() if verify else None + if result.unsupported_algorithm: + raise errors.UnsupportedAlgorithm(result.unsupported_algorithm) + + if verify: + if any(s.status != errors.NO_ERROR + for s in verify_result.signatures): + raise errors.BadSignatures(verify_result) + + if verify and verify != True: + missing = list() + for key in verify: + ok = False + for subkey in key.subkeys: + for sig in verify_result.signatures: + if sig.summary & constants.SIGSUM_VALID == 0: + continue + if subkey.can_sign and subkey.fpr == sig.fpr: + ok = True + break + if ok: + break + if not ok: + missing.append(key) + if missing: + raise errors.MissingSignatures(verify_result, missing) + + plainbytes = None + if not sink: + plaintext.seek(0, os.SEEK_SET) + plainbytes = plaintext.read() + return plainbytes, result, verify_result + + def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL): + """Sign data + + Sign the given data. + + Keyword arguments: + mode -- signature mode (default: normal, see below) + sink -- write result to sink instead of returning it + + Returns: + either + signed_data -- encoded data and signature (normal mode) + signature -- only the signature data (detached mode) + cleartext -- data and signature as text (cleartext mode) + (or None if sink is given) + result -- additional information about the signature(s) + + Raises: + InvalidSigners -- if signing using a particular key failed + GPGMEError -- as signaled by the underlying library + + """ + signeddata = sink if sink else Data() + + try: + self.op_sign(data, signeddata, mode) + except errors.GPGMEError as e: + if e.getcode() == errors.UNUSABLE_SECKEY: + result = self.op_sign_result() + if result.invalid_signers: + raise errors.InvalidSigners(result.invalid_signers) + raise + + result = self.op_sign_result() + assert not result.invalid_signers + + signedbytes = None + if not sink: + signeddata.seek(0, os.SEEK_SET) + signedbytes = signeddata.read() + return signedbytes, result + + def verify(self, signed_data, signature=None, sink=None, verify=[]): + """Verify signatures + + Verify signatures over data. If VERIFY is an iterable of + keys, the ciphertext must be signed by all those keys, + otherwise an error is raised. + + Keyword arguments: + signature -- detached signature data + sink -- write result to sink instead of returning it + + Returns: + data -- the plain data + (or None if sink is given, or we verified a detached signature) + result -- additional information about the signature(s) + + Raises: + BadSignatures -- if a bad signature is encountered + MissingSignatures -- if expected signatures are missing or bad + GPGMEError -- as signaled by the underlying library + + """ + if signature: + # Detached signature, we don't return the plain text. + data = None + else: + data = sink if sink else Data() + + if signature: + self.op_verify(signature, signed_data, None) + else: + self.op_verify(signed_data, None, data) + + result = self.op_verify_result() + if any(s.status != errors.NO_ERROR for s in result.signatures): + raise errors.BadSignatures(result) + + missing = list() + for key in verify: + ok = False + for subkey in key.subkeys: + for sig in result.signatures: + if sig.summary & constants.SIGSUM_VALID == 0: + continue + if subkey.can_sign and subkey.fpr == sig.fpr: + ok = True + break + if ok: + break + if not ok: + missing.append(key) + if missing: + raise errors.MissingSignatures(result, missing) + + plainbytes = None + if data and not sink: + data.seek(0, os.SEEK_SET) + plainbytes = data.read() + return plainbytes, result + @property def signers(self): """Keys used for signing""" @@ -204,32 +490,6 @@ class Context(GpgmeWrapper): return 0 _boolean_properties = {'armor', 'textmode', 'offline'} - def __init__(self, armor=False, textmode=False, offline=False, - signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, - wrapped=None): - """Construct a context object - - Keyword arguments: - armor -- enable ASCII armoring (default False) - textmode -- enable canonical text mode (default False) - offline -- do not contact external key sources (default False) - signers -- list of keys used for signing (default []) - pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT) - """ - if wrapped: - self.own = False - else: - tmp = pygpgme.new_gpgme_ctx_t_p() - errorcheck(pygpgme.gpgme_new(tmp)) - wrapped = pygpgme.gpgme_ctx_t_p_value(tmp) - pygpgme.delete_gpgme_ctx_t_p(tmp) - self.own = True - super().__init__(wrapped) - self.armor = armor - self.textmode = textmode - self.offline = offline - self.signers = signers - self.pinentry_mode = pinentry_mode def __del__(self): if not pygpgme: @@ -420,6 +680,7 @@ class Context(GpgmeWrapper): pygpgme.pygpgme_raise_callback_exception(self) errorcheck(result) + class Data(GpgmeWrapper): """Data buffer diff --git a/lang/python/pyme/errors.py b/lang/python/pyme/errors.py index f96877b6..0194931c 100644 --- a/lang/python/pyme/errors.py +++ b/lang/python/pyme/errors.py @@ -20,7 +20,10 @@ from . import util util.process_constants('GPG_ERR_', globals()) -class GPGMEError(Exception): +class PymeError(Exception): + pass + +class GPGMEError(PymeError): def __init__(self, error = None, message = None): self.error = error self.message = message @@ -43,8 +46,60 @@ class GPGMEError(Exception): return pygpgme.gpgme_err_source(self.error) def __str__(self): - return "%s (%d,%d)"%(self.getstring(), self.getsource(), self.getcode()) + return self.getstring() def errorcheck(retval, extradata = None): if retval: raise GPGMEError(retval, extradata) + +# These errors are raised in the idiomatic interface code. + +class EncryptionError(PymeError): + pass + +class InvalidRecipients(EncryptionError): + def __init__(self, recipients): + self.recipients = recipients + def __str__(self): + return ", ".join("{}: {}".format(r.fpr, + pygpgme.gpgme_strerror(r.reason)) + for r in self.recipients) + +class DeryptionError(PymeError): + pass + +class UnsupportedAlgorithm(DeryptionError): + def __init__(self, algorithm): + self.algorithm = algorithm + def __str__(self): + return self.algorithm + +class SigningError(PymeError): + pass + +class InvalidSigners(SigningError): + def __init__(self, signers): + self.signers = signers + def __str__(self): + return ", ".join("{}: {}".format(s.fpr, + pygpgme.gpgme_strerror(s.reason)) + for s in self.signers) + +class VerificationError(PymeError): + pass + +class BadSignatures(VerificationError): + def __init__(self, result): + self.result = result + def __str__(self): + return ", ".join("{}: {}".format(s.fpr, + pygpgme.gpgme_strerror(s.status)) + for s in self.result.signatures + if s.status != NO_ERROR) + +class MissingSignatures(VerificationError): + def __init__(self, result, missing): + self.result = result + self.missing = missing + def __str__(self): + return ", ".join(k.subkeys[0].fpr for k in self.missing) diff --git a/lang/python/pyme/util.py b/lang/python/pyme/util.py index 5527a1a2..bbd28fe7 100644 --- a/lang/python/pyme/util.py +++ b/lang/python/pyme/util.py @@ -1,3 +1,4 @@ +# Copyright (C) 2016 g10 Code GmbH # Copyright (C) 2004,2008 Igor Belyi <[email protected]> # Copyright (C) 2002 John Goerzen <[email protected]> # @@ -17,12 +18,16 @@ from . import pygpgme -def process_constants(starttext, dict): - """Called by the constant libraries to load up the appropriate constants - from the C library.""" - index = len(starttext) - for identifier in dir(pygpgme): - if not identifier.startswith(starttext): - continue - name = identifier[index:] - dict[name] = getattr(pygpgme, identifier) +def process_constants(prefix, scope): + """Called by the constant modules to load up the constants from the C + library starting with PREFIX. Matching constants will be inserted + into SCOPE with PREFIX stripped from the names. Returns the names + of inserted constants. + + """ + index = len(prefix) + constants = {identifier[index:]: getattr(pygpgme, identifier) + for identifier in dir(pygpgme) + if identifier.startswith(prefix)} + scope.update(constants) + return list(constants.keys()) |