python: idiomatic interface prototype

Signed-off-by: Justus Winter <justus@g10code.com>
This commit is contained in:
Justus Winter 2016-06-08 18:58:57 +02:00
parent 004026a94f
commit d6408ce471
20 changed files with 757 additions and 82 deletions

View File

@ -363,7 +363,7 @@ if test "$found" = "1"; then
enabled_languages=$(echo $enabled_languages | sed 's/python//')
fi
else
AM_PATH_PYTHON([3.3])
AM_PATH_PYTHON([3.4])
AX_SWIG_PYTHON
if test -z "$PYTHON_VERSION"; then
if test "$explicit_languages" = "1"; then

View File

@ -134,3 +134,7 @@ Base classes are documented at pyme.core.
"""
__all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version']
from .core import Context
from .core import Data
from .errors import GPGMEError

View File

@ -1,6 +1,25 @@
#!/usr/bin/env python3
# Copyright (C) 2016 g10 Code GmbH
# Copyright (C) 2004 Igor Belyi <belyi@users.sourceforge.net>
# Copyright (C) 2002 John Goerzen <jgoerzen@complete.org>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
from pyme import util
util.process_constants('GPGME_', globals())
constants = util.process_constants('GPGME_', globals())
__all__ = ['data', 'event', 'import', 'keylist', 'md', 'pk',
__all__ = constants + \
['data', 'event', 'import', 'keylist', 'md', 'pk',
'protocol', 'sig', 'sigsum', 'status', 'validity']

View File

@ -25,6 +25,7 @@ and the 'Data' class describing buffers of data.
"""
import re
import os
import weakref
from . import pygpgme
from .errors import errorcheck, GPGMEError
@ -166,6 +167,291 @@ class Context(GpgmeWrapper):
"""
def __init__(self, armor=False, textmode=False, offline=False,
signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
wrapped=None):
"""Construct a context object.
Keyword arguments:
armor -- enable ASCII armoring (default False)
textmode -- enable canonical text mode (default False)
offline -- do not contact external key sources (default False)
signers -- list of keys used for signing (default [])
pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
"""
if wrapped:
self.own = False
else:
tmp = pygpgme.new_gpgme_ctx_t_p()
errorcheck(pygpgme.gpgme_new(tmp))
wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
pygpgme.delete_gpgme_ctx_t_p(tmp)
self.own = True
super().__init__(wrapped)
self.armor = armor
self.textmode = textmode
self.offline = offline
self.signers = signers
self.pinentry_mode = pinentry_mode
def encrypt(self, plaintext, recipients=[], sign=True, sink=None,
passphrase=None, always_trust=False, add_encrypt_to=False,
prepare=False, expect_sign=False, compress=True):
"""Encrypt data
Encrypt the given plaintext.
Keyword arguments:
recipients -- list of keys to encrypt to
sign -- sign plaintext (default True)
sink -- write result to sink instead of returning it
passphrase -- for symmetric encryption
always_trust -- always trust the keys (default False)
add_encrypt_to -- encrypt to configured additional keys (default False)
prepare -- (ui) prepare for encryption (default False)
expect_sign -- (ui) prepare for signing (default False)
compress -- compress plaintext (default True)
Returns:
ciphertext -- the encrypted data (or None if sink is given)
result -- additional information about the encryption
sign_result -- additional information about the signature(s)
Raises:
InvalidRecipients -- if encryption using a particular key failed
InvalidSigners -- if signing using a particular key failed
GPGMEError -- as signaled by the underlying library
"""
ciphertext = sink if sink else Data()
flags = 0
flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST
flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO
flags |= prepare * constants.ENCRYPT_PREPARE
flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN
flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS
if passphrase != None:
old_pinentry_mode = self.pinentry_mode
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook=None):
return passphrase
self.set_passphrase_cb(passphrase_cb)
try:
if sign:
self.op_encrypt_sign(recipients, flags, plaintext, ciphertext)
else:
self.op_encrypt(recipients, flags, plaintext, ciphertext)
except errors.GPGMEError as e:
if e.getcode() == errors.UNUSABLE_PUBKEY:
result = self.op_encrypt_result()
if result.invalid_recipients:
raise errors.InvalidRecipients(result.invalid_recipients)
if e.getcode() == errors.UNUSABLE_SECKEY:
sig_result = self.op_sign_result()
if sig_result.invalid_signers:
raise errors.InvalidSigners(sig_result.invalid_signers)
raise
finally:
if passphrase != None:
self.pinentry_mode = old_pinentry_mode
if old_passphrase_cb:
self.set_passphrase_cb(*old_passphrase_cb[1:])
result = self.op_encrypt_result()
assert not result.invalid_recipients
sig_result = self.op_sign_result() if sign else None
assert not sig_result or not sig_result.invalid_signers
cipherbytes = None
if not sink:
ciphertext.seek(0, os.SEEK_SET)
cipherbytes = ciphertext.read()
return cipherbytes, result, sig_result
def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True):
"""Decrypt data
Decrypt the given ciphertext and verify any signatures. If
VERIFY is an iterable of keys, the ciphertext must be signed
by all those keys, otherwise an error is raised.
Keyword arguments:
sink -- write result to sink instead of returning it
passphrase -- for symmetric decryption or accessing the key
verify -- check signatures (default True)
Returns:
plainttext -- the decrypted data (or None if sink is given)
result -- additional information about the decryption
Raises:
UnsupportedAlgorithm -- if an unsupported algorithm was used
BadSignatures -- if a bad signature is encountered
MissingSignatures -- if expected signatures are missing or bad
GPGMEError -- as signaled by the underlying library
"""
plaintext = sink if sink else Data()
if passphrase != None:
old_pinentry_mode = self.pinentry_mode
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook=None):
return passphrase
self.set_passphrase_cb(passphrase_cb)
try:
if verify:
self.op_decrypt_verify(ciphertext, plaintext)
else:
self.op_decrypt(ciphertext, plaintext)
finally:
if passphrase != None:
self.pinentry_mode = old_pinentry_mode
if old_passphrase_cb:
self.set_passphrase_cb(*old_passphrase_cb[1:])
result = self.op_decrypt_result()
verify_result = self.op_verify_result() if verify else None
if result.unsupported_algorithm:
raise errors.UnsupportedAlgorithm(result.unsupported_algorithm)
if verify:
if any(s.status != errors.NO_ERROR
for s in verify_result.signatures):
raise errors.BadSignatures(verify_result)
if verify and verify != True:
missing = list()
for key in verify:
ok = False
for subkey in key.subkeys:
for sig in verify_result.signatures:
if sig.summary & constants.SIGSUM_VALID == 0:
continue
if subkey.can_sign and subkey.fpr == sig.fpr:
ok = True
break
if ok:
break
if not ok:
missing.append(key)
if missing:
raise errors.MissingSignatures(verify_result, missing)
plainbytes = None
if not sink:
plaintext.seek(0, os.SEEK_SET)
plainbytes = plaintext.read()
return plainbytes, result, verify_result
def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL):
"""Sign data
Sign the given data.
Keyword arguments:
mode -- signature mode (default: normal, see below)
sink -- write result to sink instead of returning it
Returns:
either
signed_data -- encoded data and signature (normal mode)
signature -- only the signature data (detached mode)
cleartext -- data and signature as text (cleartext mode)
(or None if sink is given)
result -- additional information about the signature(s)
Raises:
InvalidSigners -- if signing using a particular key failed
GPGMEError -- as signaled by the underlying library
"""
signeddata = sink if sink else Data()
try:
self.op_sign(data, signeddata, mode)
except errors.GPGMEError as e:
if e.getcode() == errors.UNUSABLE_SECKEY:
result = self.op_sign_result()
if result.invalid_signers:
raise errors.InvalidSigners(result.invalid_signers)
raise
result = self.op_sign_result()
assert not result.invalid_signers
signedbytes = None
if not sink:
signeddata.seek(0, os.SEEK_SET)
signedbytes = signeddata.read()
return signedbytes, result
def verify(self, signed_data, signature=None, sink=None, verify=[]):
"""Verify signatures
Verify signatures over data. If VERIFY is an iterable of
keys, the ciphertext must be signed by all those keys,
otherwise an error is raised.
Keyword arguments:
signature -- detached signature data
sink -- write result to sink instead of returning it
Returns:
data -- the plain data
(or None if sink is given, or we verified a detached signature)
result -- additional information about the signature(s)
Raises:
BadSignatures -- if a bad signature is encountered
MissingSignatures -- if expected signatures are missing or bad
GPGMEError -- as signaled by the underlying library
"""
if signature:
# Detached signature, we don't return the plain text.
data = None
else:
data = sink if sink else Data()
if signature:
self.op_verify(signature, signed_data, None)
else:
self.op_verify(signed_data, None, data)
result = self.op_verify_result()
if any(s.status != errors.NO_ERROR for s in result.signatures):
raise errors.BadSignatures(result)
missing = list()
for key in verify:
ok = False
for subkey in key.subkeys:
for sig in result.signatures:
if sig.summary & constants.SIGSUM_VALID == 0:
continue
if subkey.can_sign and subkey.fpr == sig.fpr:
ok = True
break
if ok:
break
if not ok:
missing.append(key)
if missing:
raise errors.MissingSignatures(result, missing)
plainbytes = None
if data and not sink:
data.seek(0, os.SEEK_SET)
plainbytes = data.read()
return plainbytes, result
@property
def signers(self):
"""Keys used for signing"""
@ -204,32 +490,6 @@ class Context(GpgmeWrapper):
return 0
_boolean_properties = {'armor', 'textmode', 'offline'}
def __init__(self, armor=False, textmode=False, offline=False,
signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
wrapped=None):
"""Construct a context object
Keyword arguments:
armor -- enable ASCII armoring (default False)
textmode -- enable canonical text mode (default False)
offline -- do not contact external key sources (default False)
signers -- list of keys used for signing (default [])
pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
"""
if wrapped:
self.own = False
else:
tmp = pygpgme.new_gpgme_ctx_t_p()
errorcheck(pygpgme.gpgme_new(tmp))
wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
pygpgme.delete_gpgme_ctx_t_p(tmp)
self.own = True
super().__init__(wrapped)
self.armor = armor
self.textmode = textmode
self.offline = offline
self.signers = signers
self.pinentry_mode = pinentry_mode
def __del__(self):
if not pygpgme:
@ -420,6 +680,7 @@ class Context(GpgmeWrapper):
pygpgme.pygpgme_raise_callback_exception(self)
errorcheck(result)
class Data(GpgmeWrapper):
"""Data buffer

View File

@ -20,7 +20,10 @@ from . import util
util.process_constants('GPG_ERR_', globals())
class GPGMEError(Exception):
class PymeError(Exception):
pass
class GPGMEError(PymeError):
def __init__(self, error = None, message = None):
self.error = error
self.message = message
@ -43,8 +46,60 @@ class GPGMEError(Exception):
return pygpgme.gpgme_err_source(self.error)
def __str__(self):
return "%s (%d,%d)"%(self.getstring(), self.getsource(), self.getcode())
return self.getstring()
def errorcheck(retval, extradata = None):
if retval:
raise GPGMEError(retval, extradata)
# These errors are raised in the idiomatic interface code.
class EncryptionError(PymeError):
pass
class InvalidRecipients(EncryptionError):
def __init__(self, recipients):
self.recipients = recipients
def __str__(self):
return ", ".join("{}: {}".format(r.fpr,
pygpgme.gpgme_strerror(r.reason))
for r in self.recipients)
class DeryptionError(PymeError):
pass
class UnsupportedAlgorithm(DeryptionError):
def __init__(self, algorithm):
self.algorithm = algorithm
def __str__(self):
return self.algorithm
class SigningError(PymeError):
pass
class InvalidSigners(SigningError):
def __init__(self, signers):
self.signers = signers
def __str__(self):
return ", ".join("{}: {}".format(s.fpr,
pygpgme.gpgme_strerror(s.reason))
for s in self.signers)
class VerificationError(PymeError):
pass
class BadSignatures(VerificationError):
def __init__(self, result):
self.result = result
def __str__(self):
return ", ".join("{}: {}".format(s.fpr,
pygpgme.gpgme_strerror(s.status))
for s in self.result.signatures
if s.status != NO_ERROR)
class MissingSignatures(VerificationError):
def __init__(self, result, missing):
self.result = result
self.missing = missing
def __str__(self):
return ", ".join(k.subkeys[0].fpr for k in self.missing)

View File

@ -1,3 +1,4 @@
# Copyright (C) 2016 g10 Code GmbH
# Copyright (C) 2004,2008 Igor Belyi <belyi@users.sourceforge.net>
# Copyright (C) 2002 John Goerzen <jgoerzen@complete.org>
#
@ -17,12 +18,16 @@
from . import pygpgme
def process_constants(starttext, dict):
"""Called by the constant libraries to load up the appropriate constants
from the C library."""
index = len(starttext)
for identifier in dir(pygpgme):
if not identifier.startswith(starttext):
continue
name = identifier[index:]
dict[name] = getattr(pygpgme, identifier)
def process_constants(prefix, scope):
"""Called by the constant modules to load up the constants from the C
library starting with PREFIX. Matching constants will be inserted
into SCOPE with PREFIX stripped from the names. Returns the names
of inserted constants.
"""
index = len(prefix)
constants = {identifier[index:]: getattr(pygpgme, identifier)
for identifier in dir(pygpgme)
if identifier.startswith(prefix)}
scope.update(constants)
return list(constants.keys())

View File

@ -51,7 +51,7 @@ py_tests = t-wrapper.py \
t-idiomatic.py
TESTS = initial.py $(py_tests) final.py
EXTRA_DIST = support.py $(TESTS)
EXTRA_DIST = support.py $(TESTS) encrypt-only.asc sign-only.asc
CLEANFILES = secring.gpg pubring.gpg pubring.kbx trustdb.gpg dirmngr.conf \
gpg-agent.conf pubring.kbx~ gpg.conf pubring.gpg~ \

View File

@ -0,0 +1,33 @@
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v2
lQPGBFd/jL0BCAD8jfoblIrlHS0shDCbSiO7RFaT6sEa/6tSPkv6XzBba9oXOkuO
FLTkNpIwPb92U8SOS+27j7n9v6U5NW2tyZwIoeLb8lUyKnCBr22IUhTFVXf7fros
zmPugsJaDBi9f7RL0bqiCn4EV3DGKyAukZklk1k1JV4Ec3dEPMAmL9LmnvXreEjU
pQZZN9sJV32ew8CYkZ6AB8foFQwfxn4x0iUoKvj8kW9RsY1KMPucp4YiFhHeMZW1
5wGAZdEIZYKyWEp4bi/wC9yn/TUR5uNWc0uVJzQvuHwaYjolPW89DinjBkPEJCBr
RwumaOWfbu/hb51wBoUTmUr9diVw93L2ROLPABEBAAH+BwMC1bmUAoPJKI/WBiHm
P6tSNRLdd+7etfjAFvKL7Ob2pNTrc3hbtyOLIQ9tuEaqXEyfnCms/DCg8QdkaFUv
Nkoj0W5+G/MQuR2jIvrq/wyL/4jIw0AFbp9/V1JbSXZh2g1eJLnnykn7uPxCbDFY
FrVeFmkhoxZ3pid6ZQSWlxXsdW+YMvbUfNIIZpbygI/alIBvbDS1YJYEBDCwFZjU
7quE2Ufxo8dm34EHcmbpYpn4r3DUrU5AHQ2fIprLIVqHn4+NUrR8WZS9nCnIeu/z
OaJUZ2lJFRjUC6Gpsbsw6Xwh4Ntwzyt2SsXc+UVZngjozw3yw0VpDifxMBqcd+9x
baSc7dfbOZF2BCZOwnB7/QrFZDaqe5b3n6rTdj1va/CrJMuxbgaNAjvLpdT2EUPZ
fHDAdPAjASofxBREv+HIKwksuPJ9cvavZU6Q4KQA7buo25hd7yjuba4WbLQhp0jH
AT1P7SdakMhk/IFcUKFdB3ZyZZZ1JTTPa2xZn9yDa3Jb1t7IMLYLwY6EFbjvaxH5
WEGZvOAq2iEa941mxv4miwgf7MQPx6g9u0+dXc7iZApwWs9MNfJo3J25sKhWK5Be
Bu3w7c6nrlg40GtPuDRgaBvYWbVerJcepTA/EPfugEJtRsDJkt7wZq1H9lWHU7Ih
Up6/+XKtBzlCIqYjorzFLnC721pcKFcPhLgvtjjNJvUsLXbr9CwnBub/eTFcfRb2
ro60H9cOhf0fQSQyvkZWfzq0BN6rG27G1KhyprsJAmpW0fTHHkB4V19788C2sTQv
D93VU3Nd6MWocwAYtPWmtwXPpuOAU9IcwAvVTxBeBJCXxbH3uyx1frwDXA7lf4Pb
a8hMoMMVU+rAG1uepKI5h4seBIKP7qKEKAPloI6/Vtf7/Ump4DKprS1QpfOW+lsX
aR48lgNR6sQXtDdFbmNyeXB0aW9uIE9ubHkgKHRlc3Qga2V5LCBkbyBub3QgdXNl
KSA8ZW9AZXhhbXBsZS5vcmc+iQE3BBMBCAAhBQJXf4y9AhsNBQsJCAcCBhUICQoL
AgQWAgMBAh4BAheAAAoJEJIFcnabn+Gc/KgH/07wzrsBzTqdI5L6cIqQ81Vq8ASj
tsuYoVfFxymB8F/AxpnLMhYRuWQTcoUHQ/olG2yA0C6o4e1JPAmh6LQGwr0eRnc2
2tr4cbnQAhXpJ8xOR6kH9eE8nGeC7tlEeeV/Wnj3SLZOXOjYjnA9bA3JX9DP3qcz
w1sKQPEHsGkMJuT0ZadnlJ1qw8AnnNKLDlG4kIO9hz3qB8BjxFZf+j5f/nhFNv5I
pnNdMcDwQqHVrwD6WO+Xmmdykab0awL9To0S9DG9ohcXuJiTMa8vtXFSBM0koUDk
BWajEq+QAcDpmdFsQr4/gbzvHkAIVTQb0seJr4gpmXFZu3TMuGVD9j13GaI=
=38ri
-----END PGP PRIVATE KEY BLOCK-----

View File

@ -19,6 +19,20 @@
import os
import subprocess
import pyme
import support
support.init_gpgme(pyme.constants.PROTOCOL_OpenPGP)
subprocess.check_call([os.path.join(os.getenv('top_srcdir'),
"tests", "start-stop-agent"), "--start"])
with pyme.Context() as c:
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)
bob = c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False)
# Mark alpha as trusted. The signature verification tests expect
# this.
support.mark_key_trusted(c, alpha)
c.op_import(open(support.in_srcdir("encrypt-only.asc")))
c.op_import(open(support.in_srcdir("sign-only.asc")))

View File

@ -0,0 +1,33 @@
-----BEGIN PGP PRIVATE KEY BLOCK-----
Version: GnuPG v2
lQPFBFd/jO8BCADiull4EVJiKmJqclPyU6GhTlbJXw7Ch0zbFAauOWYT3ACmgr1U
KfJlZ2sPe2EezZkVSACxgIjTCzcgKQLh/swXdhO8uEgWEIN8f07WcSVDrcRGYwDS
KFSRsK0bfO/OQQDUsSkNQSHjcOdLnCHCinMrQi1mBZOs+Y/DXOkkEV1zbFFV7q6X
4vX9HSWwTRQTdOV9CFZykbwM+X1YIZlVtpOAKqSNJi3P17uQF7P9zko6HWKKKQ5S
96BfXUOIpBRl82R85/yQgeGrWlvZ2BT2ittscNQlBKqLHJ7LIeDr9ctbKlKZjHTn
Da7NYg+PoMHspbizjSONbEzpcR/9ZUq16oJJABEBAAH+BwMC7hQZNJSmlX/W6sfL
0wakX6kTsiCEMy2vMCRcZ769JKT234avHtkL/g7MBJEzqdG9HSEp7+LHGuOWJhfa
20f61WvPT5ujUIy//QXJ9a8z877jCm+fHKCTDXGYLLfCkJLfr3/GfTRy6gaIGTSw
BqZaRelPvHbMp+eiFqDkf8W/E1LO3/83k87+pXggjz4p0OasyMw8RcDmy+IKBMGG
bzet5WIKHIhpblIzuuucQHOjtwA8vCedub3F4lcRuULe2GW6sNuCB9kjSC9g6D1d
bJ+WYi5GiUUQARGSNXiWVoVPLpEo0i6/2bKJ7vBYGRewNp42ebVQU2bFW7uzhaIq
4itzNTjFNTpcxX3Lo0/mzJpe7pVRJwN+HGahNGT0EtPDsT/nNTFDUq8e8nt0U9/8
0eekg4MRBJEzE3A+wosIHPjzCkQgu98+nh79rPMbCpZVxNfLb136cTkubmHCWptN
T2MbqK2L4hMcOxHGGOmI9SjFltNeKtTsVtkxh3Vj67UESPdN550centfasJYA0bj
guRQfHWHJXYIfFwblIFkl8xtIVLTeWlQMEvc7oI8jcJOc2ri8Zdjj/55xxv/RvjC
ZKzfjPpdkLYcN1zP/hETLD68u7WmiMAYCr8Eq9YQ3oKklUpWxRMCAAtmgjGGpm5P
QQW+36s96Q3cuG8R0Z4Wo8y89FgWzCEzuAhemCdffoUA8kn0HJQaVndnExJb1Ebz
wp+zsX/JqiOFvcKHJAWCaXkk0oXVi1aIV4tQyCPfhyjnd846K7g8UabAz51IJHvF
CXRAmqJvu26NqjYOfWBJJxZQsPH4FjPfYx+e/MFPZa+UTKCfzaOHClrePHUDHw58
Ez5ItcORYn51IWW33r+c4tlhW5mrjMD7FcjFOuYT4EIivd5BSnwLP0fjBz8TBVAY
yyFO+YAXTQ+0MVNpZ24gT25seSAodGVzdCBrZXksIGRvIG5vdCB1c2UpIDxzb0Bl
eGFtcGxlLm9yZz6JATcEEwEIACEFAld/jO8CGwMFCwkIBwIGFQgJCgsCBBYCAwEC
HgECF4AACgkQ/tFT8S8Y9F3PAwgAvKav6+luvcAhrpBMO4z/Q8kDMtO5AW1KTEcz
neqpj5eTVJVbYUgDuBlEXbFYtcZmYyYtJC5KQkN3bxPmehVUzGk27UYWMWbPIWyU
riGcFL5BWWQaKSqiWUypzhNVnxYoiWVhHeJ36LICVMpLBaubgcpwCSW/j58yZo/7
XRwf40OblXr4cevIW4Oq5GSxKOQF+DCErF6BeikC2i+NoqSxwNiIO/1NUxs8QfAI
z8UT/bSUXr62BWLfeCIDGgXutMMPth3tKi4DlvLCzI6eYJrd8E3Rt7iUZm9IH8OQ
Djv2DKnL/E/AP8oITItrOmICqfEWcj+Tk2Xep4pCCMNU+Pa0yg==
=gG5b
-----END PGP PRIVATE KEY BLOCK-----

View File

@ -19,14 +19,48 @@ import sys
import os
from pyme import core
# known keys
alpha = "A0FF4590BB6122EDEF6E3C542D727CC768697734"
bob = "D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2"
encrypt_only = "F52770D5C4DB41408D918C9F920572769B9FE19C"
sign_only = "7CCA20CCDE5394CEE71C9F0BFED153F12F18F45D"
def make_filename(name):
return os.path.join(os.environ['top_srcdir'], 'tests', 'gpg', name)
def in_srcdir(name):
return os.path.join(os.environ['srcdir'], name)
def init_gpgme(proto):
core.engine_check_version(proto)
verbose = int(os.environ.get('verbose', 0)) > 1
def print_data(data):
if verbose:
data.seek(0, os.SEEK_SET)
sys.stdout.buffer.write(data.read())
try:
# See if it is a file-like object.
data.seek(0, os.SEEK_SET)
data = data.read()
except:
# Hope for the best.
pass
sys.stdout.buffer.write(data)
def mark_key_trusted(ctx, key):
class Editor(object):
def __init__(self):
self.steps = ["trust", "save"]
def edit(self, status, args, out):
if args == "keyedit.prompt":
result = self.steps.pop(0)
elif args == "edit_ownertrust.value":
result = "5"
elif args == "edit_ownertrust.set_ultimate.okay":
result = "Y"
elif args == "keyedit.save.okay":
result = "Y"
else:
result = None
return result
with core.Data() as sink:
ctx.op_edit(key, Editor().edit, sink, sink)

View File

@ -17,6 +17,7 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import pyme
from pyme import core, constants, errors
import support
@ -28,7 +29,7 @@ def check_verify_result(result, summary, fpr, status):
assert errors.GPGMEError(sig.status).getcode() == status
assert len(sig.notations) == 0
assert not sig.wrong_key_usage
assert sig.validity == constants.VALIDITY_UNKNOWN
assert sig.validity == constants.VALIDITY_FULL
assert errors.GPGMEError(sig.validity_reason).getcode() == errors.NO_ERROR
support.init_gpgme(constants.PROTOCOL_OpenPGP)
@ -45,6 +46,29 @@ assert not result.unsupported_algorithm, \
support.print_data(sink)
verify_result = c.op_verify_result()
check_verify_result(verify_result, 0,
check_verify_result(verify_result,
constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
errors.NO_ERROR)
# Idiomatic interface.
with pyme.Context() as c:
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)
bob = c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False)
plaintext, _, verify_result = \
c.decrypt(open(support.make_filename("cipher-2.asc")), verify=[alpha])
assert plaintext.find(b'Wenn Sie dies lesen k') >= 0, \
'Plaintext not found'
check_verify_result(verify_result,
constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
errors.NO_ERROR)
try:
c.decrypt(open(support.make_filename("cipher-2.asc")),
verify=[alpha, bob])
except errors.MissingSignatures as e:
assert len(e.missing) == 1
assert e.missing[0] == bob
else:
assert False, "Expected an error, got none"

View File

@ -17,6 +17,7 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import pyme
from pyme import core, constants
import support
@ -32,3 +33,10 @@ assert not result.unsupported_algorithm, \
"Unsupported algorithm: {}".format(result.unsupported_algorithm)
support.print_data(sink)
# Idiomatic interface.
with pyme.Context() as c:
plaintext, _, _ = c.decrypt(open(support.make_filename("cipher-1.asc")))
assert len(plaintext) > 0
assert plaintext.find(b'Wenn Sie dies lesen k') >= 0, \
'Plaintext not found'

View File

@ -18,6 +18,7 @@
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
import pyme
from pyme import core, constants
import support
@ -69,3 +70,26 @@ for recipients in (keys, []):
check_result(result, constants.SIG_MODE_NORMAL)
support.print_data(sink)
# Idiomatic interface.
with pyme.Context(armor=True) as c:
message = "Hallo Leute\n".encode()
ciphertext, _, sig_result = c.encrypt(message,
recipients=keys,
always_trust=True)
assert len(ciphertext) > 0
assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found'
check_result(sig_result, constants.SIG_MODE_NORMAL)
c.signers = [c.get_key(support.sign_only, True)]
c.encrypt(message, recipients=keys, always_trust=True)
c.signers = [c.get_key(support.encrypt_only, True)]
try:
c.encrypt(message, recipients=keys, always_trust=True)
except pyme.errors.InvalidSigners as e:
assert len(e.signers) == 1
assert support.encrypt_only.endswith(e.signers[0].fpr)
else:
assert False, "Expected an InvalidSigners error, got none"

View File

@ -18,6 +18,7 @@
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import pyme
from pyme import core, constants
import support
@ -61,3 +62,22 @@ for passphrase in ("abc", b"abc"):
plaintext = plain.read()
assert plaintext == b"Hallo Leute\n", \
"Wrong plaintext {!r}".format(plaintext)
# Idiomatic interface.
for passphrase in ("abc", b"abc"):
with pyme.Context(armor=True) as c:
# Check that the passphrase callback is not altered.
def f(*args):
assert False
c.set_passphrase_cb(f)
message = "Hallo Leute\n".encode()
ciphertext, _, _ = c.encrypt(message,
passphrase=passphrase,
sign=False)
assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found'
plaintext, _, _ = c.decrypt(ciphertext, passphrase=passphrase)
assert plaintext == message, 'Message body not recovered'
assert c._passphrase_cb[1] == f, "Passphrase callback not restored"

View File

@ -17,6 +17,7 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import pyme
from pyme import core, constants
import support
@ -34,6 +35,28 @@ keys.append(c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False))
c.op_encrypt(keys, constants.ENCRYPT_ALWAYS_TRUST, source, sink)
result = c.op_encrypt_result()
assert not result.invalid_recipients, \
"Invalid recipient encountered: {}".format(result.invalid_recipients.fpr)
"Invalid recipients: {}".format(", ".join(r.fpr for r in result.recipients))
support.print_data(sink)
# Idiomatic interface.
with pyme.Context(armor=True) as c:
ciphertext, _, _ = c.encrypt("Hallo Leute\n".encode(),
recipients=keys,
sign=False,
always_trust=True)
assert len(ciphertext) > 0
assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found'
c.encrypt("Hallo Leute\n".encode(),
recipients=[c.get_key(support.encrypt_only, False)],
sign=False, always_trust=True)
try:
c.encrypt("Hallo Leute\n".encode(),
recipients=[c.get_key(support.sign_only, False)],
sign=False, always_trust=True)
except pyme.errors.InvalidRecipients as e:
assert len(e.recipients) == 1
assert support.sign_only.endswith(e.recipients[0].fpr)
else:
assert False, "Expected an InvalidRecipients error, got none"

View File

@ -20,13 +20,13 @@
import io
import os
import tempfile
from pyme import core, constants, errors
import pyme
import support
support.init_gpgme(constants.PROTOCOL_OpenPGP)
support.init_gpgme(pyme.constants.PROTOCOL_OpenPGP)
# Both Context and Data can be used as context manager:
with core.Context() as c, core.Data() as d:
with pyme.Context() as c, pyme.Data() as d:
c.get_engine_info()
d.write(b"Halloechen")
leak_c = c
@ -35,16 +35,17 @@ assert leak_c.wrapped == None
assert leak_d.wrapped == None
def sign_and_verify(source, signed, sink):
with core.Context() as c:
c.op_sign(source, signed, constants.SIG_MODE_NORMAL)
with pyme.Context() as c:
c.op_sign(source, signed, pyme.constants.SIG_MODE_NORMAL)
signed.seek(0, os.SEEK_SET)
c.op_verify(signed, None, sink)
result = c.op_verify_result()
assert len(result.signatures) == 1, "Unexpected number of signatures"
sig = result.signatures[0]
assert sig.summary == 0
assert errors.GPGMEError(sig.status).getcode() == errors.NO_ERROR
assert sig.summary == (pyme.constants.SIGSUM_VALID |
pyme.constants.SIGSUM_GREEN)
assert pyme.GPGMEError(sig.status).getcode() == pyme.errors.NO_ERROR
sink.seek(0, os.SEEK_SET)
assert sink.read() == b"Hallo Leute\n"
@ -71,5 +72,5 @@ else:
# Demonstrate automatic wrapping of objects implementing the buffer
# interface, and the use of data objects with the 'with' statement.
with io.BytesIO(preallocate) as signed, core.Data() as sink:
with io.BytesIO(preallocate) as signed, pyme.Data() as sink:
sign_and_verify(b"Hallo Leute\n", signed, sink)

View File

@ -19,34 +19,38 @@
import sys
import os
import pyme
from pyme import core, constants
import support
def fail(msg):
raise RuntimeError(msg)
def check_result(r, typ):
if r.invalid_signers:
sys.exit("Invalid signer found: {}".format(r.invalid_signers.fpr))
fail("Invalid signer found: {}".format(r.invalid_signers.fpr))
if len(r.signatures) != 1:
sys.exit("Unexpected number of signatures created")
fail("Unexpected number of signatures created")
signature = r.signatures[0]
if signature.type != typ:
sys.exit("Wrong type of signature created")
fail("Wrong type of signature created")
if signature.pubkey_algo != constants.PK_DSA:
sys.exit("Wrong pubkey algorithm reported: {}".format(
fail("Wrong pubkey algorithm reported: {}".format(
signature.pubkey_algo))
if signature.hash_algo != constants.MD_SHA1:
sys.exit("Wrong hash algorithm reported: {}".format(
fail("Wrong hash algorithm reported: {}".format(
signature.hash_algo))
if signature.sig_class != 1:
sys.exit("Wrong signature class reported: {}".format(
fail("Wrong signature class reported: {}".format(
signature.sig_class))
if signature.fpr != "A0FF4590BB6122EDEF6E3C542D727CC768697734":
sys.exit("Wrong fingerprint reported: {}".format(signature.fpr))
fail("Wrong fingerprint reported: {}".format(signature.fpr))
support.init_gpgme(constants.PROTOCOL_OpenPGP)
@ -82,3 +86,35 @@ c.op_sign(source, sink, constants.SIG_MODE_CLEAR)
result = c.op_sign_result()
check_result(result, constants.SIG_MODE_CLEAR)
support.print_data(sink)
# Idiomatic interface.
with pyme.Context(armor=True, textmode=True) as c:
message = "Hallo Leute\n".encode()
signed, _ = c.sign(message)
assert len(signed) > 0
assert signed.find(b'BEGIN PGP MESSAGE') > 0, 'Message not found'
signed, _ = c.sign(message, mode=pyme.constants.SIG_MODE_DETACH)
assert len(signed) > 0
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
signed, _ = c.sign(message, mode=pyme.constants.SIG_MODE_CLEAR)
assert len(signed) > 0
assert signed.find(b'BEGIN PGP SIGNED MESSAGE') > 0, 'Message not found'
assert signed.find(message) > 0, 'Message content not found'
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
with pyme.Context() as c:
message = "Hallo Leute\n".encode()
c.signers = [c.get_key(support.sign_only, True)]
c.sign(message)
c.signers = [c.get_key(support.encrypt_only, True)]
try:
c.sign(message)
except pyme.errors.InvalidSigners as e:
assert len(e.signers) == 1
assert support.encrypt_only.endswith(e.signers[0].fpr)
else:
assert False, "Expected an InvalidSigners error, got none"

View File

@ -18,35 +18,39 @@
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
import pyme
from pyme import core, constants
import support
def fail(msg):
raise RuntimeError(msg)
def check_result(r, typ):
if r.invalid_signers:
sys.exit("Invalid signer found: {}".format(r.invalid_signers.fpr))
fail("Invalid signer found: {}".format(r.invalid_signers.fpr))
if len(r.signatures) != 2:
sys.exit("Unexpected number of signatures created")
fail("Unexpected number of signatures created")
for signature in r.signatures:
if signature.type != typ:
sys.exit("Wrong type of signature created")
fail("Wrong type of signature created")
if signature.pubkey_algo != constants.PK_DSA:
sys.exit("Wrong pubkey algorithm reported: {}".format(
fail("Wrong pubkey algorithm reported: {}".format(
signature.pubkey_algo))
if signature.hash_algo != constants.MD_SHA1:
sys.exit("Wrong hash algorithm reported: {}".format(
fail("Wrong hash algorithm reported: {}".format(
signature.hash_algo))
if signature.sig_class != 1:
sys.exit("Wrong signature class reported: {}".format(
signature.sig_class))
fail("Wrong signature class reported: got {}, want {}".format(
signature.sig_class, 1))
if signature.fpr not in ("A0FF4590BB6122EDEF6E3C542D727CC768697734",
"23FD347A419429BACCD5E72D6BC4778054ACD246"):
sys.exit("Wrong fingerprint reported: {}".format(signature.fpr))
fail("Wrong fingerprint reported: {}".format(signature.fpr))
support.init_gpgme(constants.PROTOCOL_OpenPGP)
@ -73,3 +77,20 @@ for mode in (constants.SIG_MODE_NORMAL, constants.SIG_MODE_DETACH,
result = c.op_sign_result()
check_result(result, mode)
support.print_data(sink)
# Idiomatic interface.
with pyme.Context(armor=True, textmode=True, signers=keys) as c:
message = "Hallo Leute\n".encode()
signed, result = c.sign(message)
check_result(result, constants.SIG_MODE_NORMAL)
assert signed.find(b'BEGIN PGP MESSAGE') > 0, 'Message not found'
signed, result = c.sign(message, mode=constants.SIG_MODE_DETACH)
check_result(result, constants.SIG_MODE_DETACH)
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
signed, result = c.sign(message, mode=constants.SIG_MODE_CLEAR)
check_result(result, constants.SIG_MODE_CLEAR)
assert signed.find(b'BEGIN PGP SIGNED MESSAGE') > 0, 'Message not found'
assert signed.find(message) > 0, 'Message content not found'
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'

View File

@ -18,12 +18,13 @@
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
import pyme
from pyme import core, constants, errors
import support
test_text1 = "Just GNU it!\n"
test_text1f= "Just GNU it?\n"
test_sig1 = """-----BEGIN PGP SIGNATURE-----
test_text1 = b"Just GNU it!\n"
test_text1f= b"Just GNU it?\n"
test_sig1 = b"""-----BEGIN PGP SIGNATURE-----
iN0EABECAJ0FAjoS+i9FFIAAAAAAAwA5YmFyw7bDpMO8w58gZGFzIHdhcmVuIFVt
bGF1dGUgdW5kIGpldHp0IGVpbiBwcm96ZW50JS1aZWljaGVuNRSAAAAAAAgAJGZv
@ -34,7 +35,7 @@ dADGKXF/Hcb+AKCJWPphZCphduxSvrzH0hgzHdeQaA==
-----END PGP SIGNATURE-----
"""
test_sig2 = """-----BEGIN PGP MESSAGE-----
test_sig2 = b"""-----BEGIN PGP MESSAGE-----
owGbwMvMwCSoW1RzPCOz3IRxjXQSR0lqcYleSUWJTZOvjVdpcYmCu1+oQmaJIleH
GwuDIBMDGysTSIqBi1MApi+nlGGuwDeHao53HBr+FoVGP3xX+kvuu9fCMJvl6IOf
@ -44,7 +45,7 @@ y1kvP4y+8D5a11ang0udywsA
"""
# A message with a prepended but unsigned plaintext packet.
double_plaintext_sig = """-----BEGIN PGP MESSAGE-----
double_plaintext_sig = b"""-----BEGIN PGP MESSAGE-----
rDRiCmZvb2Jhci50eHRF4pxNVGhpcyBpcyBteSBzbmVha3kgcGxhaW50ZXh0IG1l
c3NhZ2UKowGbwMvMwCSoW1RzPCOz3IRxTWISa6JebnG666MFD1wzSzJSixQ81XMV
@ -55,10 +56,12 @@ UqVooWlGXHwNw/xg/fVzt9VNbtjtJ/fhUqYo0/LyCGEA
-----END PGP MESSAGE-----
"""
def check_result(result, summary, fpr, status, notation):
def check_result(result, summary, validity, fpr, status, notation):
assert len(result.signatures) == 1, "Unexpected number of signatures"
sig = result.signatures[0]
assert sig.summary == summary, "Unexpected signature summary"
assert sig.summary == summary, \
"Unexpected signature summary: {}, want: {}".format(sig.summary,
summary)
assert sig.fpr == fpr
assert errors.GPGMEError(sig.status).getcode() == status
@ -83,7 +86,9 @@ def check_result(result, summary, fpr, status, notation):
assert len(expected_notations) == 0
assert not sig.wrong_key_usage
assert sig.validity == constants.VALIDITY_UNKNOWN
assert sig.validity == validity, \
"Unexpected signature validity: {}, want: {}".format(
sig.validity, validity)
assert errors.GPGMEError(sig.validity_reason).getcode() == errors.NO_ERROR
@ -96,7 +101,9 @@ text = core.Data(test_text1)
sig = core.Data(test_sig1)
c.op_verify(sig, text, None)
result = c.op_verify_result()
check_result(result, 0, "A0FF4590BB6122EDEF6E3C542D727CC768697734",
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
constants.VALIDITY_FULL,
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
errors.NO_ERROR, True)
@ -105,15 +112,17 @@ text = core.Data(test_text1f)
sig.seek(0, os.SEEK_SET)
c.op_verify(sig, text, None)
result = c.op_verify_result()
check_result(result, constants.SIGSUM_RED, "2D727CC768697734",
errors.BAD_SIGNATURE, False)
check_result(result, constants.SIGSUM_RED, constants.VALIDITY_UNKNOWN,
"2D727CC768697734", errors.BAD_SIGNATURE, False)
# Checking a normal signature.
text = core.Data()
sig = core.Data(test_sig2)
c.op_verify(sig, None, text)
result = c.op_verify_result()
check_result(result, 0, "A0FF4590BB6122EDEF6E3C542D727CC768697734",
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
constants.VALIDITY_FULL,
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
errors.NO_ERROR, False)
# Checking an invalid message.
@ -126,3 +135,54 @@ except Exception as e:
assert e.getcode() == errors.BAD_DATA
else:
assert False, "Expected an error but got none."
# Idiomatic interface.
with pyme.Context(armor=True) as c:
# Checking a valid message.
_, result = c.verify(test_text1, test_sig1)
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
constants.VALIDITY_FULL,
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
errors.NO_ERROR, True)
# Checking a manipulated message.
try:
c.verify(test_text1f, test_sig1)
except errors.BadSignatures as e:
check_result(e.result, constants.SIGSUM_RED,
constants.VALIDITY_UNKNOWN,
"2D727CC768697734", errors.BAD_SIGNATURE, False)
else:
assert False, "Expected an error but got none."
# Checking a normal signature.
sig = core.Data(test_sig2)
data, result = c.verify(test_sig2)
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
constants.VALIDITY_FULL,
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
errors.NO_ERROR, False)
assert data == test_text1
# Checking an invalid message.
try:
c.verify(double_plaintext_sig)
except errors.GPGMEError as e:
assert e.getcode() == errors.BAD_DATA
else:
assert False, "Expected an error but got none."
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)
bob = c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False)
# Checking a valid message.
c.verify(test_text1, test_sig1, verify=[alpha])
try:
c.verify(test_text1, test_sig1, verify=[alpha, bob])
except errors.MissingSignatures as e:
assert len(e.missing) == 1
assert e.missing[0] == bob
else:
assert False, "Expected an error, got none"