diff options
Diffstat (limited to 'lang/python/pyme')
| -rw-r--r-- | lang/python/pyme/__init__.py | 69 | ||||
| -rw-r--r-- | lang/python/pyme/core.py | 324 | ||||
| -rw-r--r-- | lang/python/pyme/errors.py | 59 | ||||
| -rw-r--r-- | lang/python/pyme/util.py | 23 | 
4 files changed, 388 insertions, 87 deletions
diff --git a/lang/python/pyme/__init__.py b/lang/python/pyme/__init__.py index e377f595..c42f7945 100644 --- a/lang/python/pyme/__init__.py +++ b/lang/python/pyme/__init__.py @@ -40,6 +40,20 @@ FEATURES   * Fully object-oriented with convenient classes and modules. +QUICK EXAMPLE +------------- + +    >>> import pyme +    >>> with pyme.Context() as c: +    >>> with pyme.Context() as c: +    ...     cipher, _, _ = c.encrypt("Hello world :)".encode(), +    ...                              passphrase="abc") +    ...     c.decrypt(cipher, passphrase="abc") +    ... +    (b'Hello world :)', +     <pyme.results.DecryptResult object at 0x7f5ab8121080>, +     <pyme.results.VerifyResult object at 0x7f5ab81219b0>) +  GENERAL OVERVIEW  ---------------- @@ -78,59 +92,14 @@ do not appear explicitly anywhere. You can use dir() python built-in command  on an object to see what methods and fields it has but their meaning can  be found only in GPGME documentation. -QUICK START SAMPLE PROGRAM --------------------------- -This program is not for serious encryption, but for example purposes only! - -import sys -import os -from pyme import core, constants - -# Set up our input and output buffers. - -plain = core.Data('This is my message.') -cipher = core.Data() - -# Initialize our context. - -c = core.Context() -c.set_armor(1) - -# Set up the recipients. - -sys.stdout.write("Enter name of your recipient: ") -sys.stdout.flush() -name = sys.stdin.readline().strip() -c.op_keylist_start(name, 0) -r = c.op_keylist_next() - -# Do the encryption. - -c.op_encrypt([r], 1, plain, cipher) -cipher.seek(0, os.SEEK_SET) -sys.stdout.buffer.write(cipher.read()) - -Note that although there is no explicit error checking done here, the -Python GPGME library is automatically doing error-checking, and will -raise an exception if there is any problem. - -This program is in the Pyme distribution as examples/simple.py.  The examples -directory contains more advanced samples as well. -  FOR MORE INFORMATION  -------------------- -PYME homepage: http://pyme.sourceforge.net -GPGME documentation: http://pyme.sourceforge.net/doc/gpgme/index.html -GPGME homepage: http://www.gnupg.org/gpgme.html - -Base classes: pyme.core (START HERE!) -Error classes: pyme.errors -Constants: pyme.constants -Version information: pyme.version -Utilities: pyme.util - -Base classes are documented at pyme.core. +PYME3 homepage: https://www.gnupg.org/ +GPGME documentation: https://www.gnupg.org/documentation/manuals/gpgme/  """  __all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version'] + +from .core import Context +from .core import Data diff --git a/lang/python/pyme/core.py b/lang/python/pyme/core.py index e5ccf7cd..6ca8cb82 100644 --- a/lang/python/pyme/core.py +++ b/lang/python/pyme/core.py @@ -25,6 +25,7 @@ and the 'Data' class describing buffers of data.  """  import re +import os  import weakref  from . import pygpgme  from .errors import errorcheck, GPGMEError @@ -166,6 +167,303 @@ class Context(GpgmeWrapper):      """ +    def __init__(self, armor=False, textmode=False, offline=False, +                 signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, +                 wrapped=None): +        """Construct a context object + +        Keyword arguments: +        armor		-- enable ASCII armoring (default False) +        textmode	-- enable canonical text mode (default False) +        offline		-- do not contact external key sources (default False) +        signers		-- list of keys used for signing (default []) +        pinentry_mode	-- pinentry mode (default PINENTRY_MODE_DEFAULT) + +        """ +        if wrapped: +            self.own = False +        else: +            tmp = pygpgme.new_gpgme_ctx_t_p() +            errorcheck(pygpgme.gpgme_new(tmp)) +            wrapped = pygpgme.gpgme_ctx_t_p_value(tmp) +            pygpgme.delete_gpgme_ctx_t_p(tmp) +            self.own = True +        super().__init__(wrapped) +        self.armor = armor +        self.textmode = textmode +        self.offline = offline +        self.signers = signers +        self.pinentry_mode = pinentry_mode + +    def encrypt(self, plaintext, recipients=[], sign=True, sink=None, +                passphrase=None, always_trust=False, add_encrypt_to=False, +                prepare=False, expect_sign=False, compress=True): +        """Encrypt data + +        Encrypt the given plaintext for the given recipients.  If the +        list of recipients is empty, the data is encrypted +        symmetrically with a passphrase. + +        The passphrase can be given as parameter, using a callback +        registered at the context, or out-of-band via pinentry. + +        Keyword arguments: +        recipients	-- list of keys to encrypt to +        sign		-- sign plaintext (default True) +        sink		-- write result to sink instead of returning it +        passphrase	-- for symmetric encryption +        always_trust	-- always trust the keys (default False) +        add_encrypt_to	-- encrypt to configured additional keys (default False) +        prepare		-- (ui) prepare for encryption (default False) +        expect_sign	-- (ui) prepare for signing (default False) +        compress	-- compress plaintext (default True) + +        Returns: +        ciphertext	-- the encrypted data (or None if sink is given) +        result		-- additional information about the encryption +        sign_result	-- additional information about the signature(s) + +        Raises: +        InvalidRecipients -- if encryption using a particular key failed +        InvalidSigners	-- if signing using a particular key failed +        GPGMEError	-- as signaled by the underlying library + +        """ +        ciphertext = sink if sink else Data() +        flags = 0 +        flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST +        flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO +        flags |= prepare * constants.ENCRYPT_PREPARE +        flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN +        flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS + +        if passphrase != None: +            old_pinentry_mode = self.pinentry_mode +            old_passphrase_cb = getattr(self, '_passphrase_cb', None) +            self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK +            def passphrase_cb(hint, desc, prev_bad, hook=None): +                return passphrase +            self.set_passphrase_cb(passphrase_cb) + +        try: +            if sign: +                self.op_encrypt_sign(recipients, flags, plaintext, ciphertext) +            else: +                self.op_encrypt(recipients, flags, plaintext, ciphertext) +        except errors.GPGMEError as e: +            if e.getcode() == errors.UNUSABLE_PUBKEY: +                result = self.op_encrypt_result() +                if result.invalid_recipients: +                    raise errors.InvalidRecipients(result.invalid_recipients) +            if e.getcode() == errors.UNUSABLE_SECKEY: +                sig_result = self.op_sign_result() +                if sig_result.invalid_signers: +                    raise errors.InvalidSigners(sig_result.invalid_signers) +            raise +        finally: +            if passphrase != None: +                self.pinentry_mode = old_pinentry_mode +                if old_passphrase_cb: +                    self.set_passphrase_cb(*old_passphrase_cb[1:]) + +        result = self.op_encrypt_result() +        assert not result.invalid_recipients +        sig_result = self.op_sign_result() if sign else None +        assert not sig_result or not sig_result.invalid_signers + +        cipherbytes = None +        if not sink: +            ciphertext.seek(0, os.SEEK_SET) +            cipherbytes = ciphertext.read() +        return cipherbytes, result, sig_result + +    def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True): +        """Decrypt data + +        Decrypt the given ciphertext and verify any signatures.  If +        VERIFY is an iterable of keys, the ciphertext must be signed +        by all those keys, otherwise an error is raised. + +        If the ciphertext is symmetrically encrypted using a +        passphrase, that passphrase can be given as parameter, using a +        callback registered at the context, or out-of-band via +        pinentry. + +        Keyword arguments: +        sink		-- write result to sink instead of returning it +        passphrase	-- for symmetric decryption +        verify		-- check signatures (default True) + +        Returns: +        plaintext	-- the decrypted data (or None if sink is given) +        result		-- additional information about the decryption +        verify_result	-- additional information about the signature(s) + +        Raises: +        UnsupportedAlgorithm -- if an unsupported algorithm was used +        BadSignatures	-- if a bad signature is encountered +        MissingSignatures -- if expected signatures are missing or bad +        GPGMEError	-- as signaled by the underlying library + +        """ +        plaintext = sink if sink else Data() + +        if passphrase != None: +            old_pinentry_mode = self.pinentry_mode +            old_passphrase_cb = getattr(self, '_passphrase_cb', None) +            self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK +            def passphrase_cb(hint, desc, prev_bad, hook=None): +                return passphrase +            self.set_passphrase_cb(passphrase_cb) + +        try: +            if verify: +                self.op_decrypt_verify(ciphertext, plaintext) +            else: +                self.op_decrypt(ciphertext, plaintext) +        finally: +            if passphrase != None: +                self.pinentry_mode = old_pinentry_mode +                if old_passphrase_cb: +                    self.set_passphrase_cb(*old_passphrase_cb[1:]) + +        result = self.op_decrypt_result() +        verify_result = self.op_verify_result() if verify else None +        if result.unsupported_algorithm: +            raise errors.UnsupportedAlgorithm(result.unsupported_algorithm) + +        if verify: +            if any(s.status != errors.NO_ERROR +                   for s in verify_result.signatures): +                raise errors.BadSignatures(verify_result) + +        if verify and verify != True: +            missing = list() +            for key in verify: +                ok = False +                for subkey in key.subkeys: +                    for sig in verify_result.signatures: +                        if sig.summary & constants.SIGSUM_VALID == 0: +                            continue +                        if subkey.can_sign and subkey.fpr == sig.fpr: +                            ok = True +                            break +                    if ok: +                        break +                if not ok: +                    missing.append(key) +            if missing: +                raise errors.MissingSignatures(verify_result, missing) + +        plainbytes = None +        if not sink: +            plaintext.seek(0, os.SEEK_SET) +            plainbytes = plaintext.read() +        return plainbytes, result, verify_result + +    def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL): +        """Sign data + +        Sign the given data with either the configured default local +        key, or the 'signers' keys of this context. + +        Keyword arguments: +        mode		-- signature mode (default: normal, see below) +        sink		-- write result to sink instead of returning it + +        Returns: +        either +          signed_data	-- encoded data and signature (normal mode) +          signature	-- only the signature data (detached mode) +          cleartext	-- data and signature as text (cleartext mode) +            (or None if sink is given) +        result		-- additional information about the signature(s) + +        Raises: +        InvalidSigners	-- if signing using a particular key failed +        GPGMEError	-- as signaled by the underlying library + +        """ +        signeddata = sink if sink else Data() + +        try: +            self.op_sign(data, signeddata, mode) +        except errors.GPGMEError as e: +            if e.getcode() == errors.UNUSABLE_SECKEY: +                result = self.op_sign_result() +                if result.invalid_signers: +                    raise errors.InvalidSigners(result.invalid_signers) +            raise + +        result = self.op_sign_result() +        assert not result.invalid_signers + +        signedbytes = None +        if not sink: +            signeddata.seek(0, os.SEEK_SET) +            signedbytes = signeddata.read() +        return signedbytes, result + +    def verify(self, signed_data, signature=None, sink=None, verify=[]): +        """Verify signatures + +        Verify signatures over data.  If VERIFY is an iterable of +        keys, the ciphertext must be signed by all those keys, +        otherwise an error is raised. + +        Keyword arguments: +        signature	-- detached signature data +        sink		-- write result to sink instead of returning it + +        Returns: +        data		-- the plain data +            (or None if sink is given, or we verified a detached signature) +        result		-- additional information about the signature(s) + +        Raises: +        BadSignatures	-- if a bad signature is encountered +        MissingSignatures -- if expected signatures are missing or bad +        GPGMEError	-- as signaled by the underlying library + +        """ +        if signature: +            # Detached signature, we don't return the plain text. +            data = None +        else: +            data = sink if sink else Data() + +        if signature: +            self.op_verify(signature, signed_data, None) +        else: +            self.op_verify(signed_data, None, data) + +        result = self.op_verify_result() +        if any(s.status != errors.NO_ERROR for s in result.signatures): +            raise errors.BadSignatures(result) + +        missing = list() +        for key in verify: +            ok = False +            for subkey in key.subkeys: +                for sig in result.signatures: +                    if sig.summary & constants.SIGSUM_VALID == 0: +                        continue +                    if subkey.can_sign and subkey.fpr == sig.fpr: +                        ok = True +                        break +                if ok: +                    break +            if not ok: +                missing.append(key) +        if missing: +            raise errors.MissingSignatures(result, missing) + +        plainbytes = None +        if data and not sink: +            data.seek(0, os.SEEK_SET) +            plainbytes = data.read() +        return plainbytes, result +      @property      def signers(self):          """Keys used for signing""" @@ -204,32 +502,6 @@ class Context(GpgmeWrapper):          return 0      _boolean_properties = {'armor', 'textmode', 'offline'} -    def __init__(self, armor=False, textmode=False, offline=False, -                 signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, -                 wrapped=None): -        """Construct a context object - -        Keyword arguments: -        armor		-- enable ASCII armoring (default False) -        textmode	-- enable canonical text mode (default False) -        offline		-- do not contact external key sources (default False) -        signers		-- list of keys used for signing (default []) -        pinentry_mode	-- pinentry mode (default PINENTRY_MODE_DEFAULT) -        """ -        if wrapped: -            self.own = False -        else: -            tmp = pygpgme.new_gpgme_ctx_t_p() -            errorcheck(pygpgme.gpgme_new(tmp)) -            wrapped = pygpgme.gpgme_ctx_t_p_value(tmp) -            pygpgme.delete_gpgme_ctx_t_p(tmp) -            self.own = True -        super().__init__(wrapped) -        self.armor = armor -        self.textmode = textmode -        self.offline = offline -        self.signers = signers -        self.pinentry_mode = pinentry_mode      def __del__(self):          if not pygpgme: diff --git a/lang/python/pyme/errors.py b/lang/python/pyme/errors.py index f96877b6..0194931c 100644 --- a/lang/python/pyme/errors.py +++ b/lang/python/pyme/errors.py @@ -20,7 +20,10 @@ from . import util  util.process_constants('GPG_ERR_', globals()) -class GPGMEError(Exception): +class PymeError(Exception): +    pass + +class GPGMEError(PymeError):      def __init__(self, error = None, message = None):          self.error = error          self.message = message @@ -43,8 +46,60 @@ class GPGMEError(Exception):          return pygpgme.gpgme_err_source(self.error)      def __str__(self): -        return "%s (%d,%d)"%(self.getstring(), self.getsource(), self.getcode()) +        return self.getstring()  def errorcheck(retval, extradata = None):      if retval:          raise GPGMEError(retval, extradata) + +# These errors are raised in the idiomatic interface code. + +class EncryptionError(PymeError): +    pass + +class InvalidRecipients(EncryptionError): +    def __init__(self, recipients): +        self.recipients = recipients +    def __str__(self): +        return ", ".join("{}: {}".format(r.fpr, +                                         pygpgme.gpgme_strerror(r.reason)) +                         for r in self.recipients) + +class DeryptionError(PymeError): +    pass + +class UnsupportedAlgorithm(DeryptionError): +    def __init__(self, algorithm): +        self.algorithm = algorithm +    def __str__(self): +        return self.algorithm + +class SigningError(PymeError): +    pass + +class InvalidSigners(SigningError): +    def __init__(self, signers): +        self.signers = signers +    def __str__(self): +        return ", ".join("{}: {}".format(s.fpr, +                                         pygpgme.gpgme_strerror(s.reason)) +                         for s in self.signers) + +class VerificationError(PymeError): +    pass + +class BadSignatures(VerificationError): +    def __init__(self, result): +        self.result = result +    def __str__(self): +        return ", ".join("{}: {}".format(s.fpr, +                                         pygpgme.gpgme_strerror(s.status)) +                         for s in self.result.signatures +                         if s.status != NO_ERROR) + +class MissingSignatures(VerificationError): +    def __init__(self, result, missing): +        self.result = result +        self.missing = missing +    def __str__(self): +        return ", ".join(k.subkeys[0].fpr for k in self.missing) diff --git a/lang/python/pyme/util.py b/lang/python/pyme/util.py index 5527a1a2..bbd28fe7 100644 --- a/lang/python/pyme/util.py +++ b/lang/python/pyme/util.py @@ -1,3 +1,4 @@ +# Copyright (C) 2016 g10 Code GmbH  # Copyright (C) 2004,2008 Igor Belyi <[email protected]>  # Copyright (C) 2002 John Goerzen <[email protected]>  # @@ -17,12 +18,16 @@  from . import pygpgme -def process_constants(starttext, dict): -    """Called by the constant libraries to load up the appropriate constants -    from the C library.""" -    index = len(starttext) -    for identifier in dir(pygpgme): -        if not identifier.startswith(starttext): -            continue -        name = identifier[index:] -        dict[name] = getattr(pygpgme, identifier) +def process_constants(prefix, scope): +    """Called by the constant modules to load up the constants from the C +    library starting with PREFIX.  Matching constants will be inserted +    into SCOPE with PREFIX stripped from the names.  Returns the names +    of inserted constants. + +    """ +    index = len(prefix) +    constants = {identifier[index:]: getattr(pygpgme, identifier) +                 for identifier in dir(pygpgme) +                 if identifier.startswith(prefix)} +    scope.update(constants) +    return list(constants.keys())  | 
