aboutsummaryrefslogtreecommitdiffstats
path: root/lang/python/pyme
diff options
context:
space:
mode:
authorJustus Winter <[email protected]>2016-06-08 16:58:57 +0000
committerJustus Winter <[email protected]>2016-07-15 16:28:09 +0000
commit1f318b7aaaa77672fab117d54fe75221780df83c (patch)
tree0f65a2c78fb5cf0f196c0134b9adaa72d33a1236 /lang/python/pyme
parentQt: Disable keylocate test for gnupg < 2.0.10 (diff)
downloadgpgme-1f318b7aaaa77672fab117d54fe75221780df83c.tar.gz
gpgme-1f318b7aaaa77672fab117d54fe75221780df83c.zip
python: Add an idiomatic interface.
* configure.ac: Bump required Python version. * lang/python/pyme/__init__.py: Update docstring. Import Context and Data. * lang/python/pyme/core.py (Context.encrypt): New function. (Context.decrypt): Likewise. (Context.sign): Likewise. (Context.verify): Likewise. * lang/python/pyme/errors.py: Add new errors. * lang/python/pyme/util.py (process_constants): Rework and return the inserted keys. * lang/python/tests/Makefile.am (EXTRA_DIST): Add new keys. * lang/python/tests/encrypt-only.asc: New file. * lang/python/tests/sign-only.asc: Likewise. * lang/python/tests/initial.py: Mark key 'Alpha' as trusted, import new keys. * lang/python/tests/support.py: Add fingerprints of known keys. (in_srcdir): New function. (print_data): Handle bytes too. (mark_key_trusted): New function. * lang/python/tests/t-decrypt-verify.py: Adjust test. Test idiomatic interface. * lang/python/tests/t-decrypt.py: Test idiomatic interface. * lang/python/tests/t-encrypt-sign.py: Likewise. * lang/python/tests/t-encrypt-sym.py: Likewise. * lang/python/tests/t-encrypt.py: Likewise. * lang/python/tests/t-idiomatic.py: Simplify. * lang/python/tests/t-keylist.py: Adjust to newly trusted key. * lang/python/tests/t-sign.py: Likewise. Test idiomatic interface. * lang/python/tests/t-signers.py: Likewise. * lang/python/tests/t-verify.py: Likewise. Signed-off-by: Justus Winter <[email protected]>
Diffstat (limited to 'lang/python/pyme')
-rw-r--r--lang/python/pyme/__init__.py69
-rw-r--r--lang/python/pyme/core.py324
-rw-r--r--lang/python/pyme/errors.py59
-rw-r--r--lang/python/pyme/util.py23
4 files changed, 388 insertions, 87 deletions
diff --git a/lang/python/pyme/__init__.py b/lang/python/pyme/__init__.py
index e377f595..c42f7945 100644
--- a/lang/python/pyme/__init__.py
+++ b/lang/python/pyme/__init__.py
@@ -40,6 +40,20 @@ FEATURES
* Fully object-oriented with convenient classes and modules.
+QUICK EXAMPLE
+-------------
+
+ >>> import pyme
+ >>> with pyme.Context() as c:
+ >>> with pyme.Context() as c:
+ ... cipher, _, _ = c.encrypt("Hello world :)".encode(),
+ ... passphrase="abc")
+ ... c.decrypt(cipher, passphrase="abc")
+ ...
+ (b'Hello world :)',
+ <pyme.results.DecryptResult object at 0x7f5ab8121080>,
+ <pyme.results.VerifyResult object at 0x7f5ab81219b0>)
+
GENERAL OVERVIEW
----------------
@@ -78,59 +92,14 @@ do not appear explicitly anywhere. You can use dir() python built-in command
on an object to see what methods and fields it has but their meaning can
be found only in GPGME documentation.
-QUICK START SAMPLE PROGRAM
---------------------------
-This program is not for serious encryption, but for example purposes only!
-
-import sys
-import os
-from pyme import core, constants
-
-# Set up our input and output buffers.
-
-plain = core.Data('This is my message.')
-cipher = core.Data()
-
-# Initialize our context.
-
-c = core.Context()
-c.set_armor(1)
-
-# Set up the recipients.
-
-sys.stdout.write("Enter name of your recipient: ")
-sys.stdout.flush()
-name = sys.stdin.readline().strip()
-c.op_keylist_start(name, 0)
-r = c.op_keylist_next()
-
-# Do the encryption.
-
-c.op_encrypt([r], 1, plain, cipher)
-cipher.seek(0, os.SEEK_SET)
-sys.stdout.buffer.write(cipher.read())
-
-Note that although there is no explicit error checking done here, the
-Python GPGME library is automatically doing error-checking, and will
-raise an exception if there is any problem.
-
-This program is in the Pyme distribution as examples/simple.py. The examples
-directory contains more advanced samples as well.
-
FOR MORE INFORMATION
--------------------
-PYME homepage: http://pyme.sourceforge.net
-GPGME documentation: http://pyme.sourceforge.net/doc/gpgme/index.html
-GPGME homepage: http://www.gnupg.org/gpgme.html
-
-Base classes: pyme.core (START HERE!)
-Error classes: pyme.errors
-Constants: pyme.constants
-Version information: pyme.version
-Utilities: pyme.util
-
-Base classes are documented at pyme.core.
+PYME3 homepage: https://www.gnupg.org/
+GPGME documentation: https://www.gnupg.org/documentation/manuals/gpgme/
"""
__all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version']
+
+from .core import Context
+from .core import Data
diff --git a/lang/python/pyme/core.py b/lang/python/pyme/core.py
index e5ccf7cd..6ca8cb82 100644
--- a/lang/python/pyme/core.py
+++ b/lang/python/pyme/core.py
@@ -25,6 +25,7 @@ and the 'Data' class describing buffers of data.
"""
import re
+import os
import weakref
from . import pygpgme
from .errors import errorcheck, GPGMEError
@@ -166,6 +167,303 @@ class Context(GpgmeWrapper):
"""
+ def __init__(self, armor=False, textmode=False, offline=False,
+ signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
+ wrapped=None):
+ """Construct a context object
+
+ Keyword arguments:
+ armor -- enable ASCII armoring (default False)
+ textmode -- enable canonical text mode (default False)
+ offline -- do not contact external key sources (default False)
+ signers -- list of keys used for signing (default [])
+ pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
+
+ """
+ if wrapped:
+ self.own = False
+ else:
+ tmp = pygpgme.new_gpgme_ctx_t_p()
+ errorcheck(pygpgme.gpgme_new(tmp))
+ wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
+ pygpgme.delete_gpgme_ctx_t_p(tmp)
+ self.own = True
+ super().__init__(wrapped)
+ self.armor = armor
+ self.textmode = textmode
+ self.offline = offline
+ self.signers = signers
+ self.pinentry_mode = pinentry_mode
+
+ def encrypt(self, plaintext, recipients=[], sign=True, sink=None,
+ passphrase=None, always_trust=False, add_encrypt_to=False,
+ prepare=False, expect_sign=False, compress=True):
+ """Encrypt data
+
+ Encrypt the given plaintext for the given recipients. If the
+ list of recipients is empty, the data is encrypted
+ symmetrically with a passphrase.
+
+ The passphrase can be given as parameter, using a callback
+ registered at the context, or out-of-band via pinentry.
+
+ Keyword arguments:
+ recipients -- list of keys to encrypt to
+ sign -- sign plaintext (default True)
+ sink -- write result to sink instead of returning it
+ passphrase -- for symmetric encryption
+ always_trust -- always trust the keys (default False)
+ add_encrypt_to -- encrypt to configured additional keys (default False)
+ prepare -- (ui) prepare for encryption (default False)
+ expect_sign -- (ui) prepare for signing (default False)
+ compress -- compress plaintext (default True)
+
+ Returns:
+ ciphertext -- the encrypted data (or None if sink is given)
+ result -- additional information about the encryption
+ sign_result -- additional information about the signature(s)
+
+ Raises:
+ InvalidRecipients -- if encryption using a particular key failed
+ InvalidSigners -- if signing using a particular key failed
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ ciphertext = sink if sink else Data()
+ flags = 0
+ flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST
+ flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO
+ flags |= prepare * constants.ENCRYPT_PREPARE
+ flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN
+ flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS
+
+ if passphrase != None:
+ old_pinentry_mode = self.pinentry_mode
+ old_passphrase_cb = getattr(self, '_passphrase_cb', None)
+ self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
+ def passphrase_cb(hint, desc, prev_bad, hook=None):
+ return passphrase
+ self.set_passphrase_cb(passphrase_cb)
+
+ try:
+ if sign:
+ self.op_encrypt_sign(recipients, flags, plaintext, ciphertext)
+ else:
+ self.op_encrypt(recipients, flags, plaintext, ciphertext)
+ except errors.GPGMEError as e:
+ if e.getcode() == errors.UNUSABLE_PUBKEY:
+ result = self.op_encrypt_result()
+ if result.invalid_recipients:
+ raise errors.InvalidRecipients(result.invalid_recipients)
+ if e.getcode() == errors.UNUSABLE_SECKEY:
+ sig_result = self.op_sign_result()
+ if sig_result.invalid_signers:
+ raise errors.InvalidSigners(sig_result.invalid_signers)
+ raise
+ finally:
+ if passphrase != None:
+ self.pinentry_mode = old_pinentry_mode
+ if old_passphrase_cb:
+ self.set_passphrase_cb(*old_passphrase_cb[1:])
+
+ result = self.op_encrypt_result()
+ assert not result.invalid_recipients
+ sig_result = self.op_sign_result() if sign else None
+ assert not sig_result or not sig_result.invalid_signers
+
+ cipherbytes = None
+ if not sink:
+ ciphertext.seek(0, os.SEEK_SET)
+ cipherbytes = ciphertext.read()
+ return cipherbytes, result, sig_result
+
+ def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True):
+ """Decrypt data
+
+ Decrypt the given ciphertext and verify any signatures. If
+ VERIFY is an iterable of keys, the ciphertext must be signed
+ by all those keys, otherwise an error is raised.
+
+ If the ciphertext is symmetrically encrypted using a
+ passphrase, that passphrase can be given as parameter, using a
+ callback registered at the context, or out-of-band via
+ pinentry.
+
+ Keyword arguments:
+ sink -- write result to sink instead of returning it
+ passphrase -- for symmetric decryption
+ verify -- check signatures (default True)
+
+ Returns:
+ plaintext -- the decrypted data (or None if sink is given)
+ result -- additional information about the decryption
+ verify_result -- additional information about the signature(s)
+
+ Raises:
+ UnsupportedAlgorithm -- if an unsupported algorithm was used
+ BadSignatures -- if a bad signature is encountered
+ MissingSignatures -- if expected signatures are missing or bad
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ plaintext = sink if sink else Data()
+
+ if passphrase != None:
+ old_pinentry_mode = self.pinentry_mode
+ old_passphrase_cb = getattr(self, '_passphrase_cb', None)
+ self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
+ def passphrase_cb(hint, desc, prev_bad, hook=None):
+ return passphrase
+ self.set_passphrase_cb(passphrase_cb)
+
+ try:
+ if verify:
+ self.op_decrypt_verify(ciphertext, plaintext)
+ else:
+ self.op_decrypt(ciphertext, plaintext)
+ finally:
+ if passphrase != None:
+ self.pinentry_mode = old_pinentry_mode
+ if old_passphrase_cb:
+ self.set_passphrase_cb(*old_passphrase_cb[1:])
+
+ result = self.op_decrypt_result()
+ verify_result = self.op_verify_result() if verify else None
+ if result.unsupported_algorithm:
+ raise errors.UnsupportedAlgorithm(result.unsupported_algorithm)
+
+ if verify:
+ if any(s.status != errors.NO_ERROR
+ for s in verify_result.signatures):
+ raise errors.BadSignatures(verify_result)
+
+ if verify and verify != True:
+ missing = list()
+ for key in verify:
+ ok = False
+ for subkey in key.subkeys:
+ for sig in verify_result.signatures:
+ if sig.summary & constants.SIGSUM_VALID == 0:
+ continue
+ if subkey.can_sign and subkey.fpr == sig.fpr:
+ ok = True
+ break
+ if ok:
+ break
+ if not ok:
+ missing.append(key)
+ if missing:
+ raise errors.MissingSignatures(verify_result, missing)
+
+ plainbytes = None
+ if not sink:
+ plaintext.seek(0, os.SEEK_SET)
+ plainbytes = plaintext.read()
+ return plainbytes, result, verify_result
+
+ def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL):
+ """Sign data
+
+ Sign the given data with either the configured default local
+ key, or the 'signers' keys of this context.
+
+ Keyword arguments:
+ mode -- signature mode (default: normal, see below)
+ sink -- write result to sink instead of returning it
+
+ Returns:
+ either
+ signed_data -- encoded data and signature (normal mode)
+ signature -- only the signature data (detached mode)
+ cleartext -- data and signature as text (cleartext mode)
+ (or None if sink is given)
+ result -- additional information about the signature(s)
+
+ Raises:
+ InvalidSigners -- if signing using a particular key failed
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ signeddata = sink if sink else Data()
+
+ try:
+ self.op_sign(data, signeddata, mode)
+ except errors.GPGMEError as e:
+ if e.getcode() == errors.UNUSABLE_SECKEY:
+ result = self.op_sign_result()
+ if result.invalid_signers:
+ raise errors.InvalidSigners(result.invalid_signers)
+ raise
+
+ result = self.op_sign_result()
+ assert not result.invalid_signers
+
+ signedbytes = None
+ if not sink:
+ signeddata.seek(0, os.SEEK_SET)
+ signedbytes = signeddata.read()
+ return signedbytes, result
+
+ def verify(self, signed_data, signature=None, sink=None, verify=[]):
+ """Verify signatures
+
+ Verify signatures over data. If VERIFY is an iterable of
+ keys, the ciphertext must be signed by all those keys,
+ otherwise an error is raised.
+
+ Keyword arguments:
+ signature -- detached signature data
+ sink -- write result to sink instead of returning it
+
+ Returns:
+ data -- the plain data
+ (or None if sink is given, or we verified a detached signature)
+ result -- additional information about the signature(s)
+
+ Raises:
+ BadSignatures -- if a bad signature is encountered
+ MissingSignatures -- if expected signatures are missing or bad
+ GPGMEError -- as signaled by the underlying library
+
+ """
+ if signature:
+ # Detached signature, we don't return the plain text.
+ data = None
+ else:
+ data = sink if sink else Data()
+
+ if signature:
+ self.op_verify(signature, signed_data, None)
+ else:
+ self.op_verify(signed_data, None, data)
+
+ result = self.op_verify_result()
+ if any(s.status != errors.NO_ERROR for s in result.signatures):
+ raise errors.BadSignatures(result)
+
+ missing = list()
+ for key in verify:
+ ok = False
+ for subkey in key.subkeys:
+ for sig in result.signatures:
+ if sig.summary & constants.SIGSUM_VALID == 0:
+ continue
+ if subkey.can_sign and subkey.fpr == sig.fpr:
+ ok = True
+ break
+ if ok:
+ break
+ if not ok:
+ missing.append(key)
+ if missing:
+ raise errors.MissingSignatures(result, missing)
+
+ plainbytes = None
+ if data and not sink:
+ data.seek(0, os.SEEK_SET)
+ plainbytes = data.read()
+ return plainbytes, result
+
@property
def signers(self):
"""Keys used for signing"""
@@ -204,32 +502,6 @@ class Context(GpgmeWrapper):
return 0
_boolean_properties = {'armor', 'textmode', 'offline'}
- def __init__(self, armor=False, textmode=False, offline=False,
- signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
- wrapped=None):
- """Construct a context object
-
- Keyword arguments:
- armor -- enable ASCII armoring (default False)
- textmode -- enable canonical text mode (default False)
- offline -- do not contact external key sources (default False)
- signers -- list of keys used for signing (default [])
- pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
- """
- if wrapped:
- self.own = False
- else:
- tmp = pygpgme.new_gpgme_ctx_t_p()
- errorcheck(pygpgme.gpgme_new(tmp))
- wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
- pygpgme.delete_gpgme_ctx_t_p(tmp)
- self.own = True
- super().__init__(wrapped)
- self.armor = armor
- self.textmode = textmode
- self.offline = offline
- self.signers = signers
- self.pinentry_mode = pinentry_mode
def __del__(self):
if not pygpgme:
diff --git a/lang/python/pyme/errors.py b/lang/python/pyme/errors.py
index f96877b6..0194931c 100644
--- a/lang/python/pyme/errors.py
+++ b/lang/python/pyme/errors.py
@@ -20,7 +20,10 @@ from . import util
util.process_constants('GPG_ERR_', globals())
-class GPGMEError(Exception):
+class PymeError(Exception):
+ pass
+
+class GPGMEError(PymeError):
def __init__(self, error = None, message = None):
self.error = error
self.message = message
@@ -43,8 +46,60 @@ class GPGMEError(Exception):
return pygpgme.gpgme_err_source(self.error)
def __str__(self):
- return "%s (%d,%d)"%(self.getstring(), self.getsource(), self.getcode())
+ return self.getstring()
def errorcheck(retval, extradata = None):
if retval:
raise GPGMEError(retval, extradata)
+
+# These errors are raised in the idiomatic interface code.
+
+class EncryptionError(PymeError):
+ pass
+
+class InvalidRecipients(EncryptionError):
+ def __init__(self, recipients):
+ self.recipients = recipients
+ def __str__(self):
+ return ", ".join("{}: {}".format(r.fpr,
+ pygpgme.gpgme_strerror(r.reason))
+ for r in self.recipients)
+
+class DeryptionError(PymeError):
+ pass
+
+class UnsupportedAlgorithm(DeryptionError):
+ def __init__(self, algorithm):
+ self.algorithm = algorithm
+ def __str__(self):
+ return self.algorithm
+
+class SigningError(PymeError):
+ pass
+
+class InvalidSigners(SigningError):
+ def __init__(self, signers):
+ self.signers = signers
+ def __str__(self):
+ return ", ".join("{}: {}".format(s.fpr,
+ pygpgme.gpgme_strerror(s.reason))
+ for s in self.signers)
+
+class VerificationError(PymeError):
+ pass
+
+class BadSignatures(VerificationError):
+ def __init__(self, result):
+ self.result = result
+ def __str__(self):
+ return ", ".join("{}: {}".format(s.fpr,
+ pygpgme.gpgme_strerror(s.status))
+ for s in self.result.signatures
+ if s.status != NO_ERROR)
+
+class MissingSignatures(VerificationError):
+ def __init__(self, result, missing):
+ self.result = result
+ self.missing = missing
+ def __str__(self):
+ return ", ".join(k.subkeys[0].fpr for k in self.missing)
diff --git a/lang/python/pyme/util.py b/lang/python/pyme/util.py
index 5527a1a2..bbd28fe7 100644
--- a/lang/python/pyme/util.py
+++ b/lang/python/pyme/util.py
@@ -1,3 +1,4 @@
+# Copyright (C) 2016 g10 Code GmbH
# Copyright (C) 2004,2008 Igor Belyi <[email protected]>
# Copyright (C) 2002 John Goerzen <[email protected]>
#
@@ -17,12 +18,16 @@
from . import pygpgme
-def process_constants(starttext, dict):
- """Called by the constant libraries to load up the appropriate constants
- from the C library."""
- index = len(starttext)
- for identifier in dir(pygpgme):
- if not identifier.startswith(starttext):
- continue
- name = identifier[index:]
- dict[name] = getattr(pygpgme, identifier)
+def process_constants(prefix, scope):
+ """Called by the constant modules to load up the constants from the C
+ library starting with PREFIX. Matching constants will be inserted
+ into SCOPE with PREFIX stripped from the names. Returns the names
+ of inserted constants.
+
+ """
+ index = len(prefix)
+ constants = {identifier[index:]: getattr(pygpgme, identifier)
+ for identifier in dir(pygpgme)
+ if identifier.startswith(prefix)}
+ scope.update(constants)
+ return list(constants.keys())