python: Add an idiomatic interface.
* configure.ac: Bump required Python version. * lang/python/pyme/__init__.py: Update docstring. Import Context and Data. * lang/python/pyme/core.py (Context.encrypt): New function. (Context.decrypt): Likewise. (Context.sign): Likewise. (Context.verify): Likewise. * lang/python/pyme/errors.py: Add new errors. * lang/python/pyme/util.py (process_constants): Rework and return the inserted keys. * lang/python/tests/Makefile.am (EXTRA_DIST): Add new keys. * lang/python/tests/encrypt-only.asc: New file. * lang/python/tests/sign-only.asc: Likewise. * lang/python/tests/initial.py: Mark key 'Alpha' as trusted, import new keys. * lang/python/tests/support.py: Add fingerprints of known keys. (in_srcdir): New function. (print_data): Handle bytes too. (mark_key_trusted): New function. * lang/python/tests/t-decrypt-verify.py: Adjust test. Test idiomatic interface. * lang/python/tests/t-decrypt.py: Test idiomatic interface. * lang/python/tests/t-encrypt-sign.py: Likewise. * lang/python/tests/t-encrypt-sym.py: Likewise. * lang/python/tests/t-encrypt.py: Likewise. * lang/python/tests/t-idiomatic.py: Simplify. * lang/python/tests/t-keylist.py: Adjust to newly trusted key. * lang/python/tests/t-sign.py: Likewise. Test idiomatic interface. * lang/python/tests/t-signers.py: Likewise. * lang/python/tests/t-verify.py: Likewise. Signed-off-by: Justus Winter <justus@g10code.com>
This commit is contained in:
parent
d2f2cbd297
commit
1f318b7aaa
@ -363,7 +363,7 @@ if test "$found" = "1"; then
|
|||||||
enabled_languages=$(echo $enabled_languages | sed 's/python//')
|
enabled_languages=$(echo $enabled_languages | sed 's/python//')
|
||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
AM_PATH_PYTHON([3.3])
|
AM_PATH_PYTHON([3.4])
|
||||||
AX_SWIG_PYTHON
|
AX_SWIG_PYTHON
|
||||||
if test -z "$PYTHON_VERSION"; then
|
if test -z "$PYTHON_VERSION"; then
|
||||||
if test "$explicit_languages" = "1"; then
|
if test "$explicit_languages" = "1"; then
|
||||||
|
@ -40,6 +40,20 @@ FEATURES
|
|||||||
|
|
||||||
* Fully object-oriented with convenient classes and modules.
|
* Fully object-oriented with convenient classes and modules.
|
||||||
|
|
||||||
|
QUICK EXAMPLE
|
||||||
|
-------------
|
||||||
|
|
||||||
|
>>> import pyme
|
||||||
|
>>> with pyme.Context() as c:
|
||||||
|
>>> with pyme.Context() as c:
|
||||||
|
... cipher, _, _ = c.encrypt("Hello world :)".encode(),
|
||||||
|
... passphrase="abc")
|
||||||
|
... c.decrypt(cipher, passphrase="abc")
|
||||||
|
...
|
||||||
|
(b'Hello world :)',
|
||||||
|
<pyme.results.DecryptResult object at 0x7f5ab8121080>,
|
||||||
|
<pyme.results.VerifyResult object at 0x7f5ab81219b0>)
|
||||||
|
|
||||||
GENERAL OVERVIEW
|
GENERAL OVERVIEW
|
||||||
----------------
|
----------------
|
||||||
|
|
||||||
@ -78,59 +92,14 @@ do not appear explicitly anywhere. You can use dir() python built-in command
|
|||||||
on an object to see what methods and fields it has but their meaning can
|
on an object to see what methods and fields it has but their meaning can
|
||||||
be found only in GPGME documentation.
|
be found only in GPGME documentation.
|
||||||
|
|
||||||
QUICK START SAMPLE PROGRAM
|
|
||||||
--------------------------
|
|
||||||
This program is not for serious encryption, but for example purposes only!
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import os
|
|
||||||
from pyme import core, constants
|
|
||||||
|
|
||||||
# Set up our input and output buffers.
|
|
||||||
|
|
||||||
plain = core.Data('This is my message.')
|
|
||||||
cipher = core.Data()
|
|
||||||
|
|
||||||
# Initialize our context.
|
|
||||||
|
|
||||||
c = core.Context()
|
|
||||||
c.set_armor(1)
|
|
||||||
|
|
||||||
# Set up the recipients.
|
|
||||||
|
|
||||||
sys.stdout.write("Enter name of your recipient: ")
|
|
||||||
sys.stdout.flush()
|
|
||||||
name = sys.stdin.readline().strip()
|
|
||||||
c.op_keylist_start(name, 0)
|
|
||||||
r = c.op_keylist_next()
|
|
||||||
|
|
||||||
# Do the encryption.
|
|
||||||
|
|
||||||
c.op_encrypt([r], 1, plain, cipher)
|
|
||||||
cipher.seek(0, os.SEEK_SET)
|
|
||||||
sys.stdout.buffer.write(cipher.read())
|
|
||||||
|
|
||||||
Note that although there is no explicit error checking done here, the
|
|
||||||
Python GPGME library is automatically doing error-checking, and will
|
|
||||||
raise an exception if there is any problem.
|
|
||||||
|
|
||||||
This program is in the Pyme distribution as examples/simple.py. The examples
|
|
||||||
directory contains more advanced samples as well.
|
|
||||||
|
|
||||||
FOR MORE INFORMATION
|
FOR MORE INFORMATION
|
||||||
--------------------
|
--------------------
|
||||||
PYME homepage: http://pyme.sourceforge.net
|
PYME3 homepage: https://www.gnupg.org/
|
||||||
GPGME documentation: http://pyme.sourceforge.net/doc/gpgme/index.html
|
GPGME documentation: https://www.gnupg.org/documentation/manuals/gpgme/
|
||||||
GPGME homepage: http://www.gnupg.org/gpgme.html
|
|
||||||
|
|
||||||
Base classes: pyme.core (START HERE!)
|
|
||||||
Error classes: pyme.errors
|
|
||||||
Constants: pyme.constants
|
|
||||||
Version information: pyme.version
|
|
||||||
Utilities: pyme.util
|
|
||||||
|
|
||||||
Base classes are documented at pyme.core.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version']
|
__all__ = ['core', 'errors', 'constants', 'util', 'callbacks', 'version']
|
||||||
|
|
||||||
|
from .core import Context
|
||||||
|
from .core import Data
|
||||||
|
@ -25,6 +25,7 @@ and the 'Data' class describing buffers of data.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
import os
|
||||||
import weakref
|
import weakref
|
||||||
from . import pygpgme
|
from . import pygpgme
|
||||||
from .errors import errorcheck, GPGMEError
|
from .errors import errorcheck, GPGMEError
|
||||||
@ -166,6 +167,303 @@ class Context(GpgmeWrapper):
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def __init__(self, armor=False, textmode=False, offline=False,
|
||||||
|
signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
|
||||||
|
wrapped=None):
|
||||||
|
"""Construct a context object
|
||||||
|
|
||||||
|
Keyword arguments:
|
||||||
|
armor -- enable ASCII armoring (default False)
|
||||||
|
textmode -- enable canonical text mode (default False)
|
||||||
|
offline -- do not contact external key sources (default False)
|
||||||
|
signers -- list of keys used for signing (default [])
|
||||||
|
pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
|
||||||
|
|
||||||
|
"""
|
||||||
|
if wrapped:
|
||||||
|
self.own = False
|
||||||
|
else:
|
||||||
|
tmp = pygpgme.new_gpgme_ctx_t_p()
|
||||||
|
errorcheck(pygpgme.gpgme_new(tmp))
|
||||||
|
wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
|
||||||
|
pygpgme.delete_gpgme_ctx_t_p(tmp)
|
||||||
|
self.own = True
|
||||||
|
super().__init__(wrapped)
|
||||||
|
self.armor = armor
|
||||||
|
self.textmode = textmode
|
||||||
|
self.offline = offline
|
||||||
|
self.signers = signers
|
||||||
|
self.pinentry_mode = pinentry_mode
|
||||||
|
|
||||||
|
def encrypt(self, plaintext, recipients=[], sign=True, sink=None,
|
||||||
|
passphrase=None, always_trust=False, add_encrypt_to=False,
|
||||||
|
prepare=False, expect_sign=False, compress=True):
|
||||||
|
"""Encrypt data
|
||||||
|
|
||||||
|
Encrypt the given plaintext for the given recipients. If the
|
||||||
|
list of recipients is empty, the data is encrypted
|
||||||
|
symmetrically with a passphrase.
|
||||||
|
|
||||||
|
The passphrase can be given as parameter, using a callback
|
||||||
|
registered at the context, or out-of-band via pinentry.
|
||||||
|
|
||||||
|
Keyword arguments:
|
||||||
|
recipients -- list of keys to encrypt to
|
||||||
|
sign -- sign plaintext (default True)
|
||||||
|
sink -- write result to sink instead of returning it
|
||||||
|
passphrase -- for symmetric encryption
|
||||||
|
always_trust -- always trust the keys (default False)
|
||||||
|
add_encrypt_to -- encrypt to configured additional keys (default False)
|
||||||
|
prepare -- (ui) prepare for encryption (default False)
|
||||||
|
expect_sign -- (ui) prepare for signing (default False)
|
||||||
|
compress -- compress plaintext (default True)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ciphertext -- the encrypted data (or None if sink is given)
|
||||||
|
result -- additional information about the encryption
|
||||||
|
sign_result -- additional information about the signature(s)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
InvalidRecipients -- if encryption using a particular key failed
|
||||||
|
InvalidSigners -- if signing using a particular key failed
|
||||||
|
GPGMEError -- as signaled by the underlying library
|
||||||
|
|
||||||
|
"""
|
||||||
|
ciphertext = sink if sink else Data()
|
||||||
|
flags = 0
|
||||||
|
flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST
|
||||||
|
flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO
|
||||||
|
flags |= prepare * constants.ENCRYPT_PREPARE
|
||||||
|
flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN
|
||||||
|
flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS
|
||||||
|
|
||||||
|
if passphrase != None:
|
||||||
|
old_pinentry_mode = self.pinentry_mode
|
||||||
|
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
|
||||||
|
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
|
||||||
|
def passphrase_cb(hint, desc, prev_bad, hook=None):
|
||||||
|
return passphrase
|
||||||
|
self.set_passphrase_cb(passphrase_cb)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if sign:
|
||||||
|
self.op_encrypt_sign(recipients, flags, plaintext, ciphertext)
|
||||||
|
else:
|
||||||
|
self.op_encrypt(recipients, flags, plaintext, ciphertext)
|
||||||
|
except errors.GPGMEError as e:
|
||||||
|
if e.getcode() == errors.UNUSABLE_PUBKEY:
|
||||||
|
result = self.op_encrypt_result()
|
||||||
|
if result.invalid_recipients:
|
||||||
|
raise errors.InvalidRecipients(result.invalid_recipients)
|
||||||
|
if e.getcode() == errors.UNUSABLE_SECKEY:
|
||||||
|
sig_result = self.op_sign_result()
|
||||||
|
if sig_result.invalid_signers:
|
||||||
|
raise errors.InvalidSigners(sig_result.invalid_signers)
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if passphrase != None:
|
||||||
|
self.pinentry_mode = old_pinentry_mode
|
||||||
|
if old_passphrase_cb:
|
||||||
|
self.set_passphrase_cb(*old_passphrase_cb[1:])
|
||||||
|
|
||||||
|
result = self.op_encrypt_result()
|
||||||
|
assert not result.invalid_recipients
|
||||||
|
sig_result = self.op_sign_result() if sign else None
|
||||||
|
assert not sig_result or not sig_result.invalid_signers
|
||||||
|
|
||||||
|
cipherbytes = None
|
||||||
|
if not sink:
|
||||||
|
ciphertext.seek(0, os.SEEK_SET)
|
||||||
|
cipherbytes = ciphertext.read()
|
||||||
|
return cipherbytes, result, sig_result
|
||||||
|
|
||||||
|
def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True):
|
||||||
|
"""Decrypt data
|
||||||
|
|
||||||
|
Decrypt the given ciphertext and verify any signatures. If
|
||||||
|
VERIFY is an iterable of keys, the ciphertext must be signed
|
||||||
|
by all those keys, otherwise an error is raised.
|
||||||
|
|
||||||
|
If the ciphertext is symmetrically encrypted using a
|
||||||
|
passphrase, that passphrase can be given as parameter, using a
|
||||||
|
callback registered at the context, or out-of-band via
|
||||||
|
pinentry.
|
||||||
|
|
||||||
|
Keyword arguments:
|
||||||
|
sink -- write result to sink instead of returning it
|
||||||
|
passphrase -- for symmetric decryption
|
||||||
|
verify -- check signatures (default True)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
plaintext -- the decrypted data (or None if sink is given)
|
||||||
|
result -- additional information about the decryption
|
||||||
|
verify_result -- additional information about the signature(s)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
UnsupportedAlgorithm -- if an unsupported algorithm was used
|
||||||
|
BadSignatures -- if a bad signature is encountered
|
||||||
|
MissingSignatures -- if expected signatures are missing or bad
|
||||||
|
GPGMEError -- as signaled by the underlying library
|
||||||
|
|
||||||
|
"""
|
||||||
|
plaintext = sink if sink else Data()
|
||||||
|
|
||||||
|
if passphrase != None:
|
||||||
|
old_pinentry_mode = self.pinentry_mode
|
||||||
|
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
|
||||||
|
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
|
||||||
|
def passphrase_cb(hint, desc, prev_bad, hook=None):
|
||||||
|
return passphrase
|
||||||
|
self.set_passphrase_cb(passphrase_cb)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if verify:
|
||||||
|
self.op_decrypt_verify(ciphertext, plaintext)
|
||||||
|
else:
|
||||||
|
self.op_decrypt(ciphertext, plaintext)
|
||||||
|
finally:
|
||||||
|
if passphrase != None:
|
||||||
|
self.pinentry_mode = old_pinentry_mode
|
||||||
|
if old_passphrase_cb:
|
||||||
|
self.set_passphrase_cb(*old_passphrase_cb[1:])
|
||||||
|
|
||||||
|
result = self.op_decrypt_result()
|
||||||
|
verify_result = self.op_verify_result() if verify else None
|
||||||
|
if result.unsupported_algorithm:
|
||||||
|
raise errors.UnsupportedAlgorithm(result.unsupported_algorithm)
|
||||||
|
|
||||||
|
if verify:
|
||||||
|
if any(s.status != errors.NO_ERROR
|
||||||
|
for s in verify_result.signatures):
|
||||||
|
raise errors.BadSignatures(verify_result)
|
||||||
|
|
||||||
|
if verify and verify != True:
|
||||||
|
missing = list()
|
||||||
|
for key in verify:
|
||||||
|
ok = False
|
||||||
|
for subkey in key.subkeys:
|
||||||
|
for sig in verify_result.signatures:
|
||||||
|
if sig.summary & constants.SIGSUM_VALID == 0:
|
||||||
|
continue
|
||||||
|
if subkey.can_sign and subkey.fpr == sig.fpr:
|
||||||
|
ok = True
|
||||||
|
break
|
||||||
|
if ok:
|
||||||
|
break
|
||||||
|
if not ok:
|
||||||
|
missing.append(key)
|
||||||
|
if missing:
|
||||||
|
raise errors.MissingSignatures(verify_result, missing)
|
||||||
|
|
||||||
|
plainbytes = None
|
||||||
|
if not sink:
|
||||||
|
plaintext.seek(0, os.SEEK_SET)
|
||||||
|
plainbytes = plaintext.read()
|
||||||
|
return plainbytes, result, verify_result
|
||||||
|
|
||||||
|
def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL):
|
||||||
|
"""Sign data
|
||||||
|
|
||||||
|
Sign the given data with either the configured default local
|
||||||
|
key, or the 'signers' keys of this context.
|
||||||
|
|
||||||
|
Keyword arguments:
|
||||||
|
mode -- signature mode (default: normal, see below)
|
||||||
|
sink -- write result to sink instead of returning it
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
either
|
||||||
|
signed_data -- encoded data and signature (normal mode)
|
||||||
|
signature -- only the signature data (detached mode)
|
||||||
|
cleartext -- data and signature as text (cleartext mode)
|
||||||
|
(or None if sink is given)
|
||||||
|
result -- additional information about the signature(s)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
InvalidSigners -- if signing using a particular key failed
|
||||||
|
GPGMEError -- as signaled by the underlying library
|
||||||
|
|
||||||
|
"""
|
||||||
|
signeddata = sink if sink else Data()
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.op_sign(data, signeddata, mode)
|
||||||
|
except errors.GPGMEError as e:
|
||||||
|
if e.getcode() == errors.UNUSABLE_SECKEY:
|
||||||
|
result = self.op_sign_result()
|
||||||
|
if result.invalid_signers:
|
||||||
|
raise errors.InvalidSigners(result.invalid_signers)
|
||||||
|
raise
|
||||||
|
|
||||||
|
result = self.op_sign_result()
|
||||||
|
assert not result.invalid_signers
|
||||||
|
|
||||||
|
signedbytes = None
|
||||||
|
if not sink:
|
||||||
|
signeddata.seek(0, os.SEEK_SET)
|
||||||
|
signedbytes = signeddata.read()
|
||||||
|
return signedbytes, result
|
||||||
|
|
||||||
|
def verify(self, signed_data, signature=None, sink=None, verify=[]):
|
||||||
|
"""Verify signatures
|
||||||
|
|
||||||
|
Verify signatures over data. If VERIFY is an iterable of
|
||||||
|
keys, the ciphertext must be signed by all those keys,
|
||||||
|
otherwise an error is raised.
|
||||||
|
|
||||||
|
Keyword arguments:
|
||||||
|
signature -- detached signature data
|
||||||
|
sink -- write result to sink instead of returning it
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
data -- the plain data
|
||||||
|
(or None if sink is given, or we verified a detached signature)
|
||||||
|
result -- additional information about the signature(s)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
BadSignatures -- if a bad signature is encountered
|
||||||
|
MissingSignatures -- if expected signatures are missing or bad
|
||||||
|
GPGMEError -- as signaled by the underlying library
|
||||||
|
|
||||||
|
"""
|
||||||
|
if signature:
|
||||||
|
# Detached signature, we don't return the plain text.
|
||||||
|
data = None
|
||||||
|
else:
|
||||||
|
data = sink if sink else Data()
|
||||||
|
|
||||||
|
if signature:
|
||||||
|
self.op_verify(signature, signed_data, None)
|
||||||
|
else:
|
||||||
|
self.op_verify(signed_data, None, data)
|
||||||
|
|
||||||
|
result = self.op_verify_result()
|
||||||
|
if any(s.status != errors.NO_ERROR for s in result.signatures):
|
||||||
|
raise errors.BadSignatures(result)
|
||||||
|
|
||||||
|
missing = list()
|
||||||
|
for key in verify:
|
||||||
|
ok = False
|
||||||
|
for subkey in key.subkeys:
|
||||||
|
for sig in result.signatures:
|
||||||
|
if sig.summary & constants.SIGSUM_VALID == 0:
|
||||||
|
continue
|
||||||
|
if subkey.can_sign and subkey.fpr == sig.fpr:
|
||||||
|
ok = True
|
||||||
|
break
|
||||||
|
if ok:
|
||||||
|
break
|
||||||
|
if not ok:
|
||||||
|
missing.append(key)
|
||||||
|
if missing:
|
||||||
|
raise errors.MissingSignatures(result, missing)
|
||||||
|
|
||||||
|
plainbytes = None
|
||||||
|
if data and not sink:
|
||||||
|
data.seek(0, os.SEEK_SET)
|
||||||
|
plainbytes = data.read()
|
||||||
|
return plainbytes, result
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def signers(self):
|
def signers(self):
|
||||||
"""Keys used for signing"""
|
"""Keys used for signing"""
|
||||||
@ -204,32 +502,6 @@ class Context(GpgmeWrapper):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
_boolean_properties = {'armor', 'textmode', 'offline'}
|
_boolean_properties = {'armor', 'textmode', 'offline'}
|
||||||
def __init__(self, armor=False, textmode=False, offline=False,
|
|
||||||
signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
|
|
||||||
wrapped=None):
|
|
||||||
"""Construct a context object
|
|
||||||
|
|
||||||
Keyword arguments:
|
|
||||||
armor -- enable ASCII armoring (default False)
|
|
||||||
textmode -- enable canonical text mode (default False)
|
|
||||||
offline -- do not contact external key sources (default False)
|
|
||||||
signers -- list of keys used for signing (default [])
|
|
||||||
pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
|
|
||||||
"""
|
|
||||||
if wrapped:
|
|
||||||
self.own = False
|
|
||||||
else:
|
|
||||||
tmp = pygpgme.new_gpgme_ctx_t_p()
|
|
||||||
errorcheck(pygpgme.gpgme_new(tmp))
|
|
||||||
wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
|
|
||||||
pygpgme.delete_gpgme_ctx_t_p(tmp)
|
|
||||||
self.own = True
|
|
||||||
super().__init__(wrapped)
|
|
||||||
self.armor = armor
|
|
||||||
self.textmode = textmode
|
|
||||||
self.offline = offline
|
|
||||||
self.signers = signers
|
|
||||||
self.pinentry_mode = pinentry_mode
|
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self):
|
||||||
if not pygpgme:
|
if not pygpgme:
|
||||||
|
@ -20,7 +20,10 @@ from . import util
|
|||||||
|
|
||||||
util.process_constants('GPG_ERR_', globals())
|
util.process_constants('GPG_ERR_', globals())
|
||||||
|
|
||||||
class GPGMEError(Exception):
|
class PymeError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class GPGMEError(PymeError):
|
||||||
def __init__(self, error = None, message = None):
|
def __init__(self, error = None, message = None):
|
||||||
self.error = error
|
self.error = error
|
||||||
self.message = message
|
self.message = message
|
||||||
@ -43,8 +46,60 @@ class GPGMEError(Exception):
|
|||||||
return pygpgme.gpgme_err_source(self.error)
|
return pygpgme.gpgme_err_source(self.error)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "%s (%d,%d)"%(self.getstring(), self.getsource(), self.getcode())
|
return self.getstring()
|
||||||
|
|
||||||
def errorcheck(retval, extradata = None):
|
def errorcheck(retval, extradata = None):
|
||||||
if retval:
|
if retval:
|
||||||
raise GPGMEError(retval, extradata)
|
raise GPGMEError(retval, extradata)
|
||||||
|
|
||||||
|
# These errors are raised in the idiomatic interface code.
|
||||||
|
|
||||||
|
class EncryptionError(PymeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class InvalidRecipients(EncryptionError):
|
||||||
|
def __init__(self, recipients):
|
||||||
|
self.recipients = recipients
|
||||||
|
def __str__(self):
|
||||||
|
return ", ".join("{}: {}".format(r.fpr,
|
||||||
|
pygpgme.gpgme_strerror(r.reason))
|
||||||
|
for r in self.recipients)
|
||||||
|
|
||||||
|
class DeryptionError(PymeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class UnsupportedAlgorithm(DeryptionError):
|
||||||
|
def __init__(self, algorithm):
|
||||||
|
self.algorithm = algorithm
|
||||||
|
def __str__(self):
|
||||||
|
return self.algorithm
|
||||||
|
|
||||||
|
class SigningError(PymeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class InvalidSigners(SigningError):
|
||||||
|
def __init__(self, signers):
|
||||||
|
self.signers = signers
|
||||||
|
def __str__(self):
|
||||||
|
return ", ".join("{}: {}".format(s.fpr,
|
||||||
|
pygpgme.gpgme_strerror(s.reason))
|
||||||
|
for s in self.signers)
|
||||||
|
|
||||||
|
class VerificationError(PymeError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class BadSignatures(VerificationError):
|
||||||
|
def __init__(self, result):
|
||||||
|
self.result = result
|
||||||
|
def __str__(self):
|
||||||
|
return ", ".join("{}: {}".format(s.fpr,
|
||||||
|
pygpgme.gpgme_strerror(s.status))
|
||||||
|
for s in self.result.signatures
|
||||||
|
if s.status != NO_ERROR)
|
||||||
|
|
||||||
|
class MissingSignatures(VerificationError):
|
||||||
|
def __init__(self, result, missing):
|
||||||
|
self.result = result
|
||||||
|
self.missing = missing
|
||||||
|
def __str__(self):
|
||||||
|
return ", ".join(k.subkeys[0].fpr for k in self.missing)
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
# Copyright (C) 2016 g10 Code GmbH
|
||||||
# Copyright (C) 2004,2008 Igor Belyi <belyi@users.sourceforge.net>
|
# Copyright (C) 2004,2008 Igor Belyi <belyi@users.sourceforge.net>
|
||||||
# Copyright (C) 2002 John Goerzen <jgoerzen@complete.org>
|
# Copyright (C) 2002 John Goerzen <jgoerzen@complete.org>
|
||||||
#
|
#
|
||||||
@ -17,12 +18,16 @@
|
|||||||
|
|
||||||
from . import pygpgme
|
from . import pygpgme
|
||||||
|
|
||||||
def process_constants(starttext, dict):
|
def process_constants(prefix, scope):
|
||||||
"""Called by the constant libraries to load up the appropriate constants
|
"""Called by the constant modules to load up the constants from the C
|
||||||
from the C library."""
|
library starting with PREFIX. Matching constants will be inserted
|
||||||
index = len(starttext)
|
into SCOPE with PREFIX stripped from the names. Returns the names
|
||||||
for identifier in dir(pygpgme):
|
of inserted constants.
|
||||||
if not identifier.startswith(starttext):
|
|
||||||
continue
|
"""
|
||||||
name = identifier[index:]
|
index = len(prefix)
|
||||||
dict[name] = getattr(pygpgme, identifier)
|
constants = {identifier[index:]: getattr(pygpgme, identifier)
|
||||||
|
for identifier in dir(pygpgme)
|
||||||
|
if identifier.startswith(prefix)}
|
||||||
|
scope.update(constants)
|
||||||
|
return list(constants.keys())
|
||||||
|
@ -52,7 +52,7 @@ py_tests = t-wrapper.py \
|
|||||||
t-idiomatic.py
|
t-idiomatic.py
|
||||||
|
|
||||||
TESTS = initial.py $(py_tests) final.py
|
TESTS = initial.py $(py_tests) final.py
|
||||||
EXTRA_DIST = support.py $(TESTS)
|
EXTRA_DIST = support.py $(TESTS) encrypt-only.asc sign-only.asc
|
||||||
|
|
||||||
CLEANFILES = secring.gpg pubring.gpg pubring.kbx trustdb.gpg dirmngr.conf \
|
CLEANFILES = secring.gpg pubring.gpg pubring.kbx trustdb.gpg dirmngr.conf \
|
||||||
gpg-agent.conf pubring.kbx~ gpg.conf pubring.gpg~ \
|
gpg-agent.conf pubring.kbx~ gpg.conf pubring.gpg~ \
|
||||||
|
33
lang/python/tests/encrypt-only.asc
Normal file
33
lang/python/tests/encrypt-only.asc
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
-----BEGIN PGP PRIVATE KEY BLOCK-----
|
||||||
|
Version: GnuPG v2
|
||||||
|
|
||||||
|
lQPGBFd/jL0BCAD8jfoblIrlHS0shDCbSiO7RFaT6sEa/6tSPkv6XzBba9oXOkuO
|
||||||
|
FLTkNpIwPb92U8SOS+27j7n9v6U5NW2tyZwIoeLb8lUyKnCBr22IUhTFVXf7fros
|
||||||
|
zmPugsJaDBi9f7RL0bqiCn4EV3DGKyAukZklk1k1JV4Ec3dEPMAmL9LmnvXreEjU
|
||||||
|
pQZZN9sJV32ew8CYkZ6AB8foFQwfxn4x0iUoKvj8kW9RsY1KMPucp4YiFhHeMZW1
|
||||||
|
5wGAZdEIZYKyWEp4bi/wC9yn/TUR5uNWc0uVJzQvuHwaYjolPW89DinjBkPEJCBr
|
||||||
|
RwumaOWfbu/hb51wBoUTmUr9diVw93L2ROLPABEBAAH+BwMC1bmUAoPJKI/WBiHm
|
||||||
|
P6tSNRLdd+7etfjAFvKL7Ob2pNTrc3hbtyOLIQ9tuEaqXEyfnCms/DCg8QdkaFUv
|
||||||
|
Nkoj0W5+G/MQuR2jIvrq/wyL/4jIw0AFbp9/V1JbSXZh2g1eJLnnykn7uPxCbDFY
|
||||||
|
FrVeFmkhoxZ3pid6ZQSWlxXsdW+YMvbUfNIIZpbygI/alIBvbDS1YJYEBDCwFZjU
|
||||||
|
7quE2Ufxo8dm34EHcmbpYpn4r3DUrU5AHQ2fIprLIVqHn4+NUrR8WZS9nCnIeu/z
|
||||||
|
OaJUZ2lJFRjUC6Gpsbsw6Xwh4Ntwzyt2SsXc+UVZngjozw3yw0VpDifxMBqcd+9x
|
||||||
|
baSc7dfbOZF2BCZOwnB7/QrFZDaqe5b3n6rTdj1va/CrJMuxbgaNAjvLpdT2EUPZ
|
||||||
|
fHDAdPAjASofxBREv+HIKwksuPJ9cvavZU6Q4KQA7buo25hd7yjuba4WbLQhp0jH
|
||||||
|
AT1P7SdakMhk/IFcUKFdB3ZyZZZ1JTTPa2xZn9yDa3Jb1t7IMLYLwY6EFbjvaxH5
|
||||||
|
WEGZvOAq2iEa941mxv4miwgf7MQPx6g9u0+dXc7iZApwWs9MNfJo3J25sKhWK5Be
|
||||||
|
Bu3w7c6nrlg40GtPuDRgaBvYWbVerJcepTA/EPfugEJtRsDJkt7wZq1H9lWHU7Ih
|
||||||
|
Up6/+XKtBzlCIqYjorzFLnC721pcKFcPhLgvtjjNJvUsLXbr9CwnBub/eTFcfRb2
|
||||||
|
ro60H9cOhf0fQSQyvkZWfzq0BN6rG27G1KhyprsJAmpW0fTHHkB4V19788C2sTQv
|
||||||
|
D93VU3Nd6MWocwAYtPWmtwXPpuOAU9IcwAvVTxBeBJCXxbH3uyx1frwDXA7lf4Pb
|
||||||
|
a8hMoMMVU+rAG1uepKI5h4seBIKP7qKEKAPloI6/Vtf7/Ump4DKprS1QpfOW+lsX
|
||||||
|
aR48lgNR6sQXtDdFbmNyeXB0aW9uIE9ubHkgKHRlc3Qga2V5LCBkbyBub3QgdXNl
|
||||||
|
KSA8ZW9AZXhhbXBsZS5vcmc+iQE3BBMBCAAhBQJXf4y9AhsNBQsJCAcCBhUICQoL
|
||||||
|
AgQWAgMBAh4BAheAAAoJEJIFcnabn+Gc/KgH/07wzrsBzTqdI5L6cIqQ81Vq8ASj
|
||||||
|
tsuYoVfFxymB8F/AxpnLMhYRuWQTcoUHQ/olG2yA0C6o4e1JPAmh6LQGwr0eRnc2
|
||||||
|
2tr4cbnQAhXpJ8xOR6kH9eE8nGeC7tlEeeV/Wnj3SLZOXOjYjnA9bA3JX9DP3qcz
|
||||||
|
w1sKQPEHsGkMJuT0ZadnlJ1qw8AnnNKLDlG4kIO9hz3qB8BjxFZf+j5f/nhFNv5I
|
||||||
|
pnNdMcDwQqHVrwD6WO+Xmmdykab0awL9To0S9DG9ohcXuJiTMa8vtXFSBM0koUDk
|
||||||
|
BWajEq+QAcDpmdFsQr4/gbzvHkAIVTQb0seJr4gpmXFZu3TMuGVD9j13GaI=
|
||||||
|
=38ri
|
||||||
|
-----END PGP PRIVATE KEY BLOCK-----
|
@ -19,6 +19,20 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import pyme
|
||||||
|
import support
|
||||||
|
support.init_gpgme(pyme.constants.PROTOCOL_OpenPGP)
|
||||||
|
|
||||||
subprocess.check_call([os.path.join(os.getenv('top_srcdir'),
|
subprocess.check_call([os.path.join(os.getenv('top_srcdir'),
|
||||||
"tests", "start-stop-agent"), "--start"])
|
"tests", "start-stop-agent"), "--start"])
|
||||||
|
|
||||||
|
with pyme.Context() as c:
|
||||||
|
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)
|
||||||
|
bob = c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False)
|
||||||
|
|
||||||
|
# Mark alpha as trusted. The signature verification tests expect
|
||||||
|
# this.
|
||||||
|
support.mark_key_trusted(c, alpha)
|
||||||
|
|
||||||
|
c.op_import(open(support.in_srcdir("encrypt-only.asc")))
|
||||||
|
c.op_import(open(support.in_srcdir("sign-only.asc")))
|
||||||
|
33
lang/python/tests/sign-only.asc
Normal file
33
lang/python/tests/sign-only.asc
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
-----BEGIN PGP PRIVATE KEY BLOCK-----
|
||||||
|
Version: GnuPG v2
|
||||||
|
|
||||||
|
lQPFBFd/jO8BCADiull4EVJiKmJqclPyU6GhTlbJXw7Ch0zbFAauOWYT3ACmgr1U
|
||||||
|
KfJlZ2sPe2EezZkVSACxgIjTCzcgKQLh/swXdhO8uEgWEIN8f07WcSVDrcRGYwDS
|
||||||
|
KFSRsK0bfO/OQQDUsSkNQSHjcOdLnCHCinMrQi1mBZOs+Y/DXOkkEV1zbFFV7q6X
|
||||||
|
4vX9HSWwTRQTdOV9CFZykbwM+X1YIZlVtpOAKqSNJi3P17uQF7P9zko6HWKKKQ5S
|
||||||
|
96BfXUOIpBRl82R85/yQgeGrWlvZ2BT2ittscNQlBKqLHJ7LIeDr9ctbKlKZjHTn
|
||||||
|
Da7NYg+PoMHspbizjSONbEzpcR/9ZUq16oJJABEBAAH+BwMC7hQZNJSmlX/W6sfL
|
||||||
|
0wakX6kTsiCEMy2vMCRcZ769JKT234avHtkL/g7MBJEzqdG9HSEp7+LHGuOWJhfa
|
||||||
|
20f61WvPT5ujUIy//QXJ9a8z877jCm+fHKCTDXGYLLfCkJLfr3/GfTRy6gaIGTSw
|
||||||
|
BqZaRelPvHbMp+eiFqDkf8W/E1LO3/83k87+pXggjz4p0OasyMw8RcDmy+IKBMGG
|
||||||
|
bzet5WIKHIhpblIzuuucQHOjtwA8vCedub3F4lcRuULe2GW6sNuCB9kjSC9g6D1d
|
||||||
|
bJ+WYi5GiUUQARGSNXiWVoVPLpEo0i6/2bKJ7vBYGRewNp42ebVQU2bFW7uzhaIq
|
||||||
|
4itzNTjFNTpcxX3Lo0/mzJpe7pVRJwN+HGahNGT0EtPDsT/nNTFDUq8e8nt0U9/8
|
||||||
|
0eekg4MRBJEzE3A+wosIHPjzCkQgu98+nh79rPMbCpZVxNfLb136cTkubmHCWptN
|
||||||
|
T2MbqK2L4hMcOxHGGOmI9SjFltNeKtTsVtkxh3Vj67UESPdN550centfasJYA0bj
|
||||||
|
guRQfHWHJXYIfFwblIFkl8xtIVLTeWlQMEvc7oI8jcJOc2ri8Zdjj/55xxv/RvjC
|
||||||
|
ZKzfjPpdkLYcN1zP/hETLD68u7WmiMAYCr8Eq9YQ3oKklUpWxRMCAAtmgjGGpm5P
|
||||||
|
QQW+36s96Q3cuG8R0Z4Wo8y89FgWzCEzuAhemCdffoUA8kn0HJQaVndnExJb1Ebz
|
||||||
|
wp+zsX/JqiOFvcKHJAWCaXkk0oXVi1aIV4tQyCPfhyjnd846K7g8UabAz51IJHvF
|
||||||
|
CXRAmqJvu26NqjYOfWBJJxZQsPH4FjPfYx+e/MFPZa+UTKCfzaOHClrePHUDHw58
|
||||||
|
Ez5ItcORYn51IWW33r+c4tlhW5mrjMD7FcjFOuYT4EIivd5BSnwLP0fjBz8TBVAY
|
||||||
|
yyFO+YAXTQ+0MVNpZ24gT25seSAodGVzdCBrZXksIGRvIG5vdCB1c2UpIDxzb0Bl
|
||||||
|
eGFtcGxlLm9yZz6JATcEEwEIACEFAld/jO8CGwMFCwkIBwIGFQgJCgsCBBYCAwEC
|
||||||
|
HgECF4AACgkQ/tFT8S8Y9F3PAwgAvKav6+luvcAhrpBMO4z/Q8kDMtO5AW1KTEcz
|
||||||
|
neqpj5eTVJVbYUgDuBlEXbFYtcZmYyYtJC5KQkN3bxPmehVUzGk27UYWMWbPIWyU
|
||||||
|
riGcFL5BWWQaKSqiWUypzhNVnxYoiWVhHeJ36LICVMpLBaubgcpwCSW/j58yZo/7
|
||||||
|
XRwf40OblXr4cevIW4Oq5GSxKOQF+DCErF6BeikC2i+NoqSxwNiIO/1NUxs8QfAI
|
||||||
|
z8UT/bSUXr62BWLfeCIDGgXutMMPth3tKi4DlvLCzI6eYJrd8E3Rt7iUZm9IH8OQ
|
||||||
|
Djv2DKnL/E/AP8oITItrOmICqfEWcj+Tk2Xep4pCCMNU+Pa0yg==
|
||||||
|
=gG5b
|
||||||
|
-----END PGP PRIVATE KEY BLOCK-----
|
@ -19,14 +19,48 @@ import sys
|
|||||||
import os
|
import os
|
||||||
from pyme import core
|
from pyme import core
|
||||||
|
|
||||||
|
# known keys
|
||||||
|
alpha = "A0FF4590BB6122EDEF6E3C542D727CC768697734"
|
||||||
|
bob = "D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2"
|
||||||
|
encrypt_only = "F52770D5C4DB41408D918C9F920572769B9FE19C"
|
||||||
|
sign_only = "7CCA20CCDE5394CEE71C9F0BFED153F12F18F45D"
|
||||||
|
|
||||||
def make_filename(name):
|
def make_filename(name):
|
||||||
return os.path.join(os.environ['top_srcdir'], 'tests', 'gpg', name)
|
return os.path.join(os.environ['top_srcdir'], 'tests', 'gpg', name)
|
||||||
|
|
||||||
|
def in_srcdir(name):
|
||||||
|
return os.path.join(os.environ['srcdir'], name)
|
||||||
|
|
||||||
def init_gpgme(proto):
|
def init_gpgme(proto):
|
||||||
core.engine_check_version(proto)
|
core.engine_check_version(proto)
|
||||||
|
|
||||||
verbose = int(os.environ.get('verbose', 0)) > 1
|
verbose = int(os.environ.get('verbose', 0)) > 1
|
||||||
def print_data(data):
|
def print_data(data):
|
||||||
if verbose:
|
if verbose:
|
||||||
data.seek(0, os.SEEK_SET)
|
try:
|
||||||
sys.stdout.buffer.write(data.read())
|
# See if it is a file-like object.
|
||||||
|
data.seek(0, os.SEEK_SET)
|
||||||
|
data = data.read()
|
||||||
|
except:
|
||||||
|
# Hope for the best.
|
||||||
|
pass
|
||||||
|
sys.stdout.buffer.write(data)
|
||||||
|
|
||||||
|
def mark_key_trusted(ctx, key):
|
||||||
|
class Editor(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.steps = ["trust", "save"]
|
||||||
|
def edit(self, status, args, out):
|
||||||
|
if args == "keyedit.prompt":
|
||||||
|
result = self.steps.pop(0)
|
||||||
|
elif args == "edit_ownertrust.value":
|
||||||
|
result = "5"
|
||||||
|
elif args == "edit_ownertrust.set_ultimate.okay":
|
||||||
|
result = "Y"
|
||||||
|
elif args == "keyedit.save.okay":
|
||||||
|
result = "Y"
|
||||||
|
else:
|
||||||
|
result = None
|
||||||
|
return result
|
||||||
|
with core.Data() as sink:
|
||||||
|
ctx.op_edit(key, Editor().edit, sink, sink)
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import pyme
|
||||||
from pyme import core, constants, errors
|
from pyme import core, constants, errors
|
||||||
import support
|
import support
|
||||||
|
|
||||||
@ -28,7 +29,7 @@ def check_verify_result(result, summary, fpr, status):
|
|||||||
assert errors.GPGMEError(sig.status).getcode() == status
|
assert errors.GPGMEError(sig.status).getcode() == status
|
||||||
assert len(sig.notations) == 0
|
assert len(sig.notations) == 0
|
||||||
assert not sig.wrong_key_usage
|
assert not sig.wrong_key_usage
|
||||||
assert sig.validity == constants.VALIDITY_UNKNOWN
|
assert sig.validity == constants.VALIDITY_FULL
|
||||||
assert errors.GPGMEError(sig.validity_reason).getcode() == errors.NO_ERROR
|
assert errors.GPGMEError(sig.validity_reason).getcode() == errors.NO_ERROR
|
||||||
|
|
||||||
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
||||||
@ -45,6 +46,29 @@ assert not result.unsupported_algorithm, \
|
|||||||
support.print_data(sink)
|
support.print_data(sink)
|
||||||
|
|
||||||
verify_result = c.op_verify_result()
|
verify_result = c.op_verify_result()
|
||||||
check_verify_result(verify_result, 0,
|
check_verify_result(verify_result,
|
||||||
|
constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
|
||||||
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
errors.NO_ERROR)
|
errors.NO_ERROR)
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context() as c:
|
||||||
|
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)
|
||||||
|
bob = c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False)
|
||||||
|
plaintext, _, verify_result = \
|
||||||
|
c.decrypt(open(support.make_filename("cipher-2.asc")), verify=[alpha])
|
||||||
|
assert plaintext.find(b'Wenn Sie dies lesen k') >= 0, \
|
||||||
|
'Plaintext not found'
|
||||||
|
check_verify_result(verify_result,
|
||||||
|
constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
|
||||||
|
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
|
errors.NO_ERROR)
|
||||||
|
|
||||||
|
try:
|
||||||
|
c.decrypt(open(support.make_filename("cipher-2.asc")),
|
||||||
|
verify=[alpha, bob])
|
||||||
|
except errors.MissingSignatures as e:
|
||||||
|
assert len(e.missing) == 1
|
||||||
|
assert e.missing[0] == bob
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import pyme
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
@ -32,3 +33,10 @@ assert not result.unsupported_algorithm, \
|
|||||||
"Unsupported algorithm: {}".format(result.unsupported_algorithm)
|
"Unsupported algorithm: {}".format(result.unsupported_algorithm)
|
||||||
|
|
||||||
support.print_data(sink)
|
support.print_data(sink)
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context() as c:
|
||||||
|
plaintext, _, _ = c.decrypt(open(support.make_filename("cipher-1.asc")))
|
||||||
|
assert len(plaintext) > 0
|
||||||
|
assert plaintext.find(b'Wenn Sie dies lesen k') >= 0, \
|
||||||
|
'Plaintext not found'
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
import pyme
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
@ -69,3 +70,26 @@ for recipients in (keys, []):
|
|||||||
check_result(result, constants.SIG_MODE_NORMAL)
|
check_result(result, constants.SIG_MODE_NORMAL)
|
||||||
|
|
||||||
support.print_data(sink)
|
support.print_data(sink)
|
||||||
|
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context(armor=True) as c:
|
||||||
|
message = "Hallo Leute\n".encode()
|
||||||
|
ciphertext, _, sig_result = c.encrypt(message,
|
||||||
|
recipients=keys,
|
||||||
|
always_trust=True)
|
||||||
|
assert len(ciphertext) > 0
|
||||||
|
assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found'
|
||||||
|
check_result(sig_result, constants.SIG_MODE_NORMAL)
|
||||||
|
|
||||||
|
c.signers = [c.get_key(support.sign_only, True)]
|
||||||
|
c.encrypt(message, recipients=keys, always_trust=True)
|
||||||
|
|
||||||
|
c.signers = [c.get_key(support.encrypt_only, True)]
|
||||||
|
try:
|
||||||
|
c.encrypt(message, recipients=keys, always_trust=True)
|
||||||
|
except pyme.errors.InvalidSigners as e:
|
||||||
|
assert len(e.signers) == 1
|
||||||
|
assert support.encrypt_only.endswith(e.signers[0].fpr)
|
||||||
|
else:
|
||||||
|
assert False, "Expected an InvalidSigners error, got none"
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import pyme
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
@ -61,3 +62,22 @@ for passphrase in ("abc", b"abc"):
|
|||||||
plaintext = plain.read()
|
plaintext = plain.read()
|
||||||
assert plaintext == b"Hallo Leute\n", \
|
assert plaintext == b"Hallo Leute\n", \
|
||||||
"Wrong plaintext {!r}".format(plaintext)
|
"Wrong plaintext {!r}".format(plaintext)
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
for passphrase in ("abc", b"abc"):
|
||||||
|
with pyme.Context(armor=True) as c:
|
||||||
|
# Check that the passphrase callback is not altered.
|
||||||
|
def f(*args):
|
||||||
|
assert False
|
||||||
|
c.set_passphrase_cb(f)
|
||||||
|
|
||||||
|
message = "Hallo Leute\n".encode()
|
||||||
|
ciphertext, _, _ = c.encrypt(message,
|
||||||
|
passphrase=passphrase,
|
||||||
|
sign=False)
|
||||||
|
assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found'
|
||||||
|
|
||||||
|
plaintext, _, _ = c.decrypt(ciphertext, passphrase=passphrase)
|
||||||
|
assert plaintext == message, 'Message body not recovered'
|
||||||
|
|
||||||
|
assert c._passphrase_cb[1] == f, "Passphrase callback not restored"
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import pyme
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
@ -34,6 +35,28 @@ keys.append(c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False))
|
|||||||
c.op_encrypt(keys, constants.ENCRYPT_ALWAYS_TRUST, source, sink)
|
c.op_encrypt(keys, constants.ENCRYPT_ALWAYS_TRUST, source, sink)
|
||||||
result = c.op_encrypt_result()
|
result = c.op_encrypt_result()
|
||||||
assert not result.invalid_recipients, \
|
assert not result.invalid_recipients, \
|
||||||
"Invalid recipient encountered: {}".format(result.invalid_recipients.fpr)
|
"Invalid recipients: {}".format(", ".join(r.fpr for r in result.recipients))
|
||||||
|
|
||||||
support.print_data(sink)
|
support.print_data(sink)
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context(armor=True) as c:
|
||||||
|
ciphertext, _, _ = c.encrypt("Hallo Leute\n".encode(),
|
||||||
|
recipients=keys,
|
||||||
|
sign=False,
|
||||||
|
always_trust=True)
|
||||||
|
assert len(ciphertext) > 0
|
||||||
|
assert ciphertext.find(b'BEGIN PGP MESSAGE') > 0, 'Marker not found'
|
||||||
|
|
||||||
|
c.encrypt("Hallo Leute\n".encode(),
|
||||||
|
recipients=[c.get_key(support.encrypt_only, False)],
|
||||||
|
sign=False, always_trust=True)
|
||||||
|
|
||||||
|
try:
|
||||||
|
c.encrypt("Hallo Leute\n".encode(),
|
||||||
|
recipients=[c.get_key(support.sign_only, False)],
|
||||||
|
sign=False, always_trust=True)
|
||||||
|
except pyme.errors.InvalidRecipients as e:
|
||||||
|
assert len(e.recipients) == 1
|
||||||
|
assert support.sign_only.endswith(e.recipients[0].fpr)
|
||||||
|
else:
|
||||||
|
assert False, "Expected an InvalidRecipients error, got none"
|
||||||
|
@ -20,13 +20,13 @@
|
|||||||
import io
|
import io
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
from pyme import core, constants, errors
|
import pyme
|
||||||
import support
|
import support
|
||||||
|
|
||||||
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
support.init_gpgme(pyme.constants.PROTOCOL_OpenPGP)
|
||||||
|
|
||||||
# Both Context and Data can be used as context manager:
|
# Both Context and Data can be used as context manager:
|
||||||
with core.Context() as c, core.Data() as d:
|
with pyme.Context() as c, pyme.Data() as d:
|
||||||
c.get_engine_info()
|
c.get_engine_info()
|
||||||
d.write(b"Halloechen")
|
d.write(b"Halloechen")
|
||||||
leak_c = c
|
leak_c = c
|
||||||
@ -35,16 +35,17 @@ assert leak_c.wrapped == None
|
|||||||
assert leak_d.wrapped == None
|
assert leak_d.wrapped == None
|
||||||
|
|
||||||
def sign_and_verify(source, signed, sink):
|
def sign_and_verify(source, signed, sink):
|
||||||
with core.Context() as c:
|
with pyme.Context() as c:
|
||||||
c.op_sign(source, signed, constants.SIG_MODE_NORMAL)
|
c.op_sign(source, signed, pyme.constants.SIG_MODE_NORMAL)
|
||||||
signed.seek(0, os.SEEK_SET)
|
signed.seek(0, os.SEEK_SET)
|
||||||
c.op_verify(signed, None, sink)
|
c.op_verify(signed, None, sink)
|
||||||
result = c.op_verify_result()
|
result = c.op_verify_result()
|
||||||
|
|
||||||
assert len(result.signatures) == 1, "Unexpected number of signatures"
|
assert len(result.signatures) == 1, "Unexpected number of signatures"
|
||||||
sig = result.signatures[0]
|
sig = result.signatures[0]
|
||||||
assert sig.summary == 0
|
assert sig.summary == (pyme.constants.SIGSUM_VALID |
|
||||||
assert errors.GPGMEError(sig.status).getcode() == errors.NO_ERROR
|
pyme.constants.SIGSUM_GREEN)
|
||||||
|
assert pyme.errors.GPGMEError(sig.status).getcode() == pyme.errors.NO_ERROR
|
||||||
|
|
||||||
sink.seek(0, os.SEEK_SET)
|
sink.seek(0, os.SEEK_SET)
|
||||||
assert sink.read() == b"Hallo Leute\n"
|
assert sink.read() == b"Hallo Leute\n"
|
||||||
@ -71,5 +72,5 @@ else:
|
|||||||
|
|
||||||
# Demonstrate automatic wrapping of objects implementing the buffer
|
# Demonstrate automatic wrapping of objects implementing the buffer
|
||||||
# interface, and the use of data objects with the 'with' statement.
|
# interface, and the use of data objects with the 'with' statement.
|
||||||
with io.BytesIO(preallocate) as signed, core.Data() as sink:
|
with io.BytesIO(preallocate) as signed, pyme.Data() as sink:
|
||||||
sign_and_verify(b"Hallo Leute\n", signed, sink)
|
sign_and_verify(b"Hallo Leute\n", signed, sink)
|
||||||
|
@ -115,8 +115,15 @@ def check_global(key, uids, n_subkeys):
|
|||||||
"Key unexpectedly carries issuer name: {}".format(key.issuer_name)
|
"Key unexpectedly carries issuer name: {}".format(key.issuer_name)
|
||||||
assert not key.chain_id, \
|
assert not key.chain_id, \
|
||||||
"Key unexpectedly carries chain ID: {}".format(key.chain_id)
|
"Key unexpectedly carries chain ID: {}".format(key.chain_id)
|
||||||
assert key.owner_trust == constants.VALIDITY_UNKNOWN, \
|
|
||||||
|
# Only key Alfa is trusted
|
||||||
|
assert key.uids[0].name == 'Alfa Test' \
|
||||||
|
or key.owner_trust == constants.VALIDITY_UNKNOWN, \
|
||||||
"Key has unexpected owner trust: {}".format(key.owner_trust)
|
"Key has unexpected owner trust: {}".format(key.owner_trust)
|
||||||
|
assert key.uids[0].name != 'Alfa Test' \
|
||||||
|
or key.owner_trust == constants.VALIDITY_ULTIMATE, \
|
||||||
|
"Key has unexpected owner trust: {}".format(key.owner_trust)
|
||||||
|
|
||||||
assert len(key.subkeys) - 1 == n_subkeys, \
|
assert len(key.subkeys) - 1 == n_subkeys, \
|
||||||
"Key `{}' has unexpected number of subkeys".format(uids[0][0])
|
"Key `{}' has unexpected number of subkeys".format(uids[0][0])
|
||||||
|
|
||||||
@ -161,7 +168,10 @@ def check_subkey(fpr, which, subkey):
|
|||||||
def check_uid(which, ref, uid):
|
def check_uid(which, ref, uid):
|
||||||
assert not uid.revoked, which + " user ID unexpectedly revoked"
|
assert not uid.revoked, which + " user ID unexpectedly revoked"
|
||||||
assert not uid.invalid, which + " user ID unexpectedly invalid"
|
assert not uid.invalid, which + " user ID unexpectedly invalid"
|
||||||
assert uid.validity == constants.VALIDITY_UNKNOWN, \
|
assert uid.validity == (constants.VALIDITY_UNKNOWN
|
||||||
|
if uid.name.split()[0]
|
||||||
|
not in {'Alfa', 'Alpha', 'Alice'} else
|
||||||
|
constants.VALIDITY_ULTIMATE), \
|
||||||
which + " user ID has unexpectedly validity: {}".format(uid.validity)
|
which + " user ID has unexpectedly validity: {}".format(uid.validity)
|
||||||
assert not uid.signatures, which + " user ID unexpectedly signed"
|
assert not uid.signatures, which + " user ID unexpectedly signed"
|
||||||
assert uid.name == ref[0], \
|
assert uid.name == ref[0], \
|
||||||
|
@ -19,34 +19,38 @@
|
|||||||
|
|
||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
import pyme
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
|
def fail(msg):
|
||||||
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
def check_result(r, typ):
|
def check_result(r, typ):
|
||||||
if r.invalid_signers:
|
if r.invalid_signers:
|
||||||
sys.exit("Invalid signer found: {}".format(r.invalid_signers.fpr))
|
fail("Invalid signer found: {}".format(r.invalid_signers.fpr))
|
||||||
|
|
||||||
if len(r.signatures) != 1:
|
if len(r.signatures) != 1:
|
||||||
sys.exit("Unexpected number of signatures created")
|
fail("Unexpected number of signatures created")
|
||||||
|
|
||||||
signature = r.signatures[0]
|
signature = r.signatures[0]
|
||||||
if signature.type != typ:
|
if signature.type != typ:
|
||||||
sys.exit("Wrong type of signature created")
|
fail("Wrong type of signature created")
|
||||||
|
|
||||||
if signature.pubkey_algo != constants.PK_DSA:
|
if signature.pubkey_algo != constants.PK_DSA:
|
||||||
sys.exit("Wrong pubkey algorithm reported: {}".format(
|
fail("Wrong pubkey algorithm reported: {}".format(
|
||||||
signature.pubkey_algo))
|
signature.pubkey_algo))
|
||||||
|
|
||||||
if signature.hash_algo != constants.MD_SHA1:
|
if signature.hash_algo != constants.MD_SHA1:
|
||||||
sys.exit("Wrong hash algorithm reported: {}".format(
|
fail("Wrong hash algorithm reported: {}".format(
|
||||||
signature.hash_algo))
|
signature.hash_algo))
|
||||||
|
|
||||||
if signature.sig_class != 1:
|
if signature.sig_class != 1:
|
||||||
sys.exit("Wrong signature class reported: {}".format(
|
fail("Wrong signature class reported: {}".format(
|
||||||
signature.sig_class))
|
signature.sig_class))
|
||||||
|
|
||||||
if signature.fpr != "A0FF4590BB6122EDEF6E3C542D727CC768697734":
|
if signature.fpr != "A0FF4590BB6122EDEF6E3C542D727CC768697734":
|
||||||
sys.exit("Wrong fingerprint reported: {}".format(signature.fpr))
|
fail("Wrong fingerprint reported: {}".format(signature.fpr))
|
||||||
|
|
||||||
|
|
||||||
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
||||||
@ -82,3 +86,35 @@ c.op_sign(source, sink, constants.SIG_MODE_CLEAR)
|
|||||||
result = c.op_sign_result()
|
result = c.op_sign_result()
|
||||||
check_result(result, constants.SIG_MODE_CLEAR)
|
check_result(result, constants.SIG_MODE_CLEAR)
|
||||||
support.print_data(sink)
|
support.print_data(sink)
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context(armor=True, textmode=True) as c:
|
||||||
|
message = "Hallo Leute\n".encode()
|
||||||
|
signed, _ = c.sign(message)
|
||||||
|
assert len(signed) > 0
|
||||||
|
assert signed.find(b'BEGIN PGP MESSAGE') > 0, 'Message not found'
|
||||||
|
|
||||||
|
signed, _ = c.sign(message, mode=pyme.constants.SIG_MODE_DETACH)
|
||||||
|
assert len(signed) > 0
|
||||||
|
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
|
||||||
|
|
||||||
|
signed, _ = c.sign(message, mode=pyme.constants.SIG_MODE_CLEAR)
|
||||||
|
assert len(signed) > 0
|
||||||
|
assert signed.find(b'BEGIN PGP SIGNED MESSAGE') > 0, 'Message not found'
|
||||||
|
assert signed.find(message) > 0, 'Message content not found'
|
||||||
|
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
|
||||||
|
|
||||||
|
with pyme.Context() as c:
|
||||||
|
message = "Hallo Leute\n".encode()
|
||||||
|
|
||||||
|
c.signers = [c.get_key(support.sign_only, True)]
|
||||||
|
c.sign(message)
|
||||||
|
|
||||||
|
c.signers = [c.get_key(support.encrypt_only, True)]
|
||||||
|
try:
|
||||||
|
c.sign(message)
|
||||||
|
except pyme.errors.InvalidSigners as e:
|
||||||
|
assert len(e.signers) == 1
|
||||||
|
assert support.encrypt_only.endswith(e.signers[0].fpr)
|
||||||
|
else:
|
||||||
|
assert False, "Expected an InvalidSigners error, got none"
|
||||||
|
@ -18,35 +18,39 @@
|
|||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
import pyme
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
|
def fail(msg):
|
||||||
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
def check_result(r, typ):
|
def check_result(r, typ):
|
||||||
if r.invalid_signers:
|
if r.invalid_signers:
|
||||||
sys.exit("Invalid signer found: {}".format(r.invalid_signers.fpr))
|
fail("Invalid signer found: {}".format(r.invalid_signers.fpr))
|
||||||
|
|
||||||
if len(r.signatures) != 2:
|
if len(r.signatures) != 2:
|
||||||
sys.exit("Unexpected number of signatures created")
|
fail("Unexpected number of signatures created")
|
||||||
|
|
||||||
for signature in r.signatures:
|
for signature in r.signatures:
|
||||||
if signature.type != typ:
|
if signature.type != typ:
|
||||||
sys.exit("Wrong type of signature created")
|
fail("Wrong type of signature created")
|
||||||
|
|
||||||
if signature.pubkey_algo != constants.PK_DSA:
|
if signature.pubkey_algo != constants.PK_DSA:
|
||||||
sys.exit("Wrong pubkey algorithm reported: {}".format(
|
fail("Wrong pubkey algorithm reported: {}".format(
|
||||||
signature.pubkey_algo))
|
signature.pubkey_algo))
|
||||||
|
|
||||||
if signature.hash_algo != constants.MD_SHA1:
|
if signature.hash_algo != constants.MD_SHA1:
|
||||||
sys.exit("Wrong hash algorithm reported: {}".format(
|
fail("Wrong hash algorithm reported: {}".format(
|
||||||
signature.hash_algo))
|
signature.hash_algo))
|
||||||
|
|
||||||
if signature.sig_class != 1:
|
if signature.sig_class != 1:
|
||||||
sys.exit("Wrong signature class reported: {}".format(
|
fail("Wrong signature class reported: got {}, want {}".format(
|
||||||
signature.sig_class))
|
signature.sig_class, 1))
|
||||||
|
|
||||||
if signature.fpr not in ("A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
if signature.fpr not in ("A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
"23FD347A419429BACCD5E72D6BC4778054ACD246"):
|
"23FD347A419429BACCD5E72D6BC4778054ACD246"):
|
||||||
sys.exit("Wrong fingerprint reported: {}".format(signature.fpr))
|
fail("Wrong fingerprint reported: {}".format(signature.fpr))
|
||||||
|
|
||||||
|
|
||||||
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
support.init_gpgme(constants.PROTOCOL_OpenPGP)
|
||||||
@ -73,3 +77,20 @@ for mode in (constants.SIG_MODE_NORMAL, constants.SIG_MODE_DETACH,
|
|||||||
result = c.op_sign_result()
|
result = c.op_sign_result()
|
||||||
check_result(result, mode)
|
check_result(result, mode)
|
||||||
support.print_data(sink)
|
support.print_data(sink)
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context(armor=True, textmode=True, signers=keys) as c:
|
||||||
|
message = "Hallo Leute\n".encode()
|
||||||
|
signed, result = c.sign(message)
|
||||||
|
check_result(result, constants.SIG_MODE_NORMAL)
|
||||||
|
assert signed.find(b'BEGIN PGP MESSAGE') > 0, 'Message not found'
|
||||||
|
|
||||||
|
signed, result = c.sign(message, mode=constants.SIG_MODE_DETACH)
|
||||||
|
check_result(result, constants.SIG_MODE_DETACH)
|
||||||
|
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
|
||||||
|
|
||||||
|
signed, result = c.sign(message, mode=constants.SIG_MODE_CLEAR)
|
||||||
|
check_result(result, constants.SIG_MODE_CLEAR)
|
||||||
|
assert signed.find(b'BEGIN PGP SIGNED MESSAGE') > 0, 'Message not found'
|
||||||
|
assert signed.find(message) > 0, 'Message content not found'
|
||||||
|
assert signed.find(b'BEGIN PGP SIGNATURE') > 0, 'Signature not found'
|
||||||
|
@ -18,12 +18,13 @@
|
|||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import pyme
|
||||||
from pyme import core, constants, errors
|
from pyme import core, constants, errors
|
||||||
import support
|
import support
|
||||||
|
|
||||||
test_text1 = "Just GNU it!\n"
|
test_text1 = b"Just GNU it!\n"
|
||||||
test_text1f= "Just GNU it?\n"
|
test_text1f= b"Just GNU it?\n"
|
||||||
test_sig1 = """-----BEGIN PGP SIGNATURE-----
|
test_sig1 = b"""-----BEGIN PGP SIGNATURE-----
|
||||||
|
|
||||||
iN0EABECAJ0FAjoS+i9FFIAAAAAAAwA5YmFyw7bDpMO8w58gZGFzIHdhcmVuIFVt
|
iN0EABECAJ0FAjoS+i9FFIAAAAAAAwA5YmFyw7bDpMO8w58gZGFzIHdhcmVuIFVt
|
||||||
bGF1dGUgdW5kIGpldHp0IGVpbiBwcm96ZW50JS1aZWljaGVuNRSAAAAAAAgAJGZv
|
bGF1dGUgdW5kIGpldHp0IGVpbiBwcm96ZW50JS1aZWljaGVuNRSAAAAAAAgAJGZv
|
||||||
@ -34,7 +35,7 @@ dADGKXF/Hcb+AKCJWPphZCphduxSvrzH0hgzHdeQaA==
|
|||||||
-----END PGP SIGNATURE-----
|
-----END PGP SIGNATURE-----
|
||||||
"""
|
"""
|
||||||
|
|
||||||
test_sig2 = """-----BEGIN PGP MESSAGE-----
|
test_sig2 = b"""-----BEGIN PGP MESSAGE-----
|
||||||
|
|
||||||
owGbwMvMwCSoW1RzPCOz3IRxjXQSR0lqcYleSUWJTZOvjVdpcYmCu1+oQmaJIleH
|
owGbwMvMwCSoW1RzPCOz3IRxjXQSR0lqcYleSUWJTZOvjVdpcYmCu1+oQmaJIleH
|
||||||
GwuDIBMDGysTSIqBi1MApi+nlGGuwDeHao53HBr+FoVGP3xX+kvuu9fCMJvl6IOf
|
GwuDIBMDGysTSIqBi1MApi+nlGGuwDeHao53HBr+FoVGP3xX+kvuu9fCMJvl6IOf
|
||||||
@ -44,7 +45,7 @@ y1kvP4y+8D5a11ang0udywsA
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
# A message with a prepended but unsigned plaintext packet.
|
# A message with a prepended but unsigned plaintext packet.
|
||||||
double_plaintext_sig = """-----BEGIN PGP MESSAGE-----
|
double_plaintext_sig = b"""-----BEGIN PGP MESSAGE-----
|
||||||
|
|
||||||
rDRiCmZvb2Jhci50eHRF4pxNVGhpcyBpcyBteSBzbmVha3kgcGxhaW50ZXh0IG1l
|
rDRiCmZvb2Jhci50eHRF4pxNVGhpcyBpcyBteSBzbmVha3kgcGxhaW50ZXh0IG1l
|
||||||
c3NhZ2UKowGbwMvMwCSoW1RzPCOz3IRxTWISa6JebnG666MFD1wzSzJSixQ81XMV
|
c3NhZ2UKowGbwMvMwCSoW1RzPCOz3IRxTWISa6JebnG666MFD1wzSzJSixQ81XMV
|
||||||
@ -55,10 +56,12 @@ UqVooWlGXHwNw/xg/fVzt9VNbtjtJ/fhUqYo0/LyCGEA
|
|||||||
-----END PGP MESSAGE-----
|
-----END PGP MESSAGE-----
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def check_result(result, summary, fpr, status, notation):
|
def check_result(result, summary, validity, fpr, status, notation):
|
||||||
assert len(result.signatures) == 1, "Unexpected number of signatures"
|
assert len(result.signatures) == 1, "Unexpected number of signatures"
|
||||||
sig = result.signatures[0]
|
sig = result.signatures[0]
|
||||||
assert sig.summary == summary, "Unexpected signature summary"
|
assert sig.summary == summary, \
|
||||||
|
"Unexpected signature summary: {}, want: {}".format(sig.summary,
|
||||||
|
summary)
|
||||||
assert sig.fpr == fpr
|
assert sig.fpr == fpr
|
||||||
assert errors.GPGMEError(sig.status).getcode() == status
|
assert errors.GPGMEError(sig.status).getcode() == status
|
||||||
|
|
||||||
@ -83,7 +86,9 @@ def check_result(result, summary, fpr, status, notation):
|
|||||||
assert len(expected_notations) == 0
|
assert len(expected_notations) == 0
|
||||||
|
|
||||||
assert not sig.wrong_key_usage
|
assert not sig.wrong_key_usage
|
||||||
assert sig.validity == constants.VALIDITY_UNKNOWN
|
assert sig.validity == validity, \
|
||||||
|
"Unexpected signature validity: {}, want: {}".format(
|
||||||
|
sig.validity, validity)
|
||||||
assert errors.GPGMEError(sig.validity_reason).getcode() == errors.NO_ERROR
|
assert errors.GPGMEError(sig.validity_reason).getcode() == errors.NO_ERROR
|
||||||
|
|
||||||
|
|
||||||
@ -96,7 +101,9 @@ text = core.Data(test_text1)
|
|||||||
sig = core.Data(test_sig1)
|
sig = core.Data(test_sig1)
|
||||||
c.op_verify(sig, text, None)
|
c.op_verify(sig, text, None)
|
||||||
result = c.op_verify_result()
|
result = c.op_verify_result()
|
||||||
check_result(result, 0, "A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
|
||||||
|
constants.VALIDITY_FULL,
|
||||||
|
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
errors.NO_ERROR, True)
|
errors.NO_ERROR, True)
|
||||||
|
|
||||||
|
|
||||||
@ -105,15 +112,17 @@ text = core.Data(test_text1f)
|
|||||||
sig.seek(0, os.SEEK_SET)
|
sig.seek(0, os.SEEK_SET)
|
||||||
c.op_verify(sig, text, None)
|
c.op_verify(sig, text, None)
|
||||||
result = c.op_verify_result()
|
result = c.op_verify_result()
|
||||||
check_result(result, constants.SIGSUM_RED, "2D727CC768697734",
|
check_result(result, constants.SIGSUM_RED, constants.VALIDITY_UNKNOWN,
|
||||||
errors.BAD_SIGNATURE, False)
|
"2D727CC768697734", errors.BAD_SIGNATURE, False)
|
||||||
|
|
||||||
# Checking a normal signature.
|
# Checking a normal signature.
|
||||||
text = core.Data()
|
text = core.Data()
|
||||||
sig = core.Data(test_sig2)
|
sig = core.Data(test_sig2)
|
||||||
c.op_verify(sig, None, text)
|
c.op_verify(sig, None, text)
|
||||||
result = c.op_verify_result()
|
result = c.op_verify_result()
|
||||||
check_result(result, 0, "A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
|
||||||
|
constants.VALIDITY_FULL,
|
||||||
|
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
errors.NO_ERROR, False)
|
errors.NO_ERROR, False)
|
||||||
|
|
||||||
# Checking an invalid message.
|
# Checking an invalid message.
|
||||||
@ -126,3 +135,54 @@ except Exception as e:
|
|||||||
assert e.getcode() == errors.BAD_DATA
|
assert e.getcode() == errors.BAD_DATA
|
||||||
else:
|
else:
|
||||||
assert False, "Expected an error but got none."
|
assert False, "Expected an error but got none."
|
||||||
|
|
||||||
|
|
||||||
|
# Idiomatic interface.
|
||||||
|
with pyme.Context(armor=True) as c:
|
||||||
|
# Checking a valid message.
|
||||||
|
_, result = c.verify(test_text1, test_sig1)
|
||||||
|
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
|
||||||
|
constants.VALIDITY_FULL,
|
||||||
|
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
|
errors.NO_ERROR, True)
|
||||||
|
|
||||||
|
# Checking a manipulated message.
|
||||||
|
try:
|
||||||
|
c.verify(test_text1f, test_sig1)
|
||||||
|
except errors.BadSignatures as e:
|
||||||
|
check_result(e.result, constants.SIGSUM_RED,
|
||||||
|
constants.VALIDITY_UNKNOWN,
|
||||||
|
"2D727CC768697734", errors.BAD_SIGNATURE, False)
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error but got none."
|
||||||
|
|
||||||
|
# Checking a normal signature.
|
||||||
|
sig = core.Data(test_sig2)
|
||||||
|
data, result = c.verify(test_sig2)
|
||||||
|
check_result(result, constants.SIGSUM_VALID | constants.SIGSUM_GREEN,
|
||||||
|
constants.VALIDITY_FULL,
|
||||||
|
"A0FF4590BB6122EDEF6E3C542D727CC768697734",
|
||||||
|
errors.NO_ERROR, False)
|
||||||
|
assert data == test_text1
|
||||||
|
|
||||||
|
# Checking an invalid message.
|
||||||
|
try:
|
||||||
|
c.verify(double_plaintext_sig)
|
||||||
|
except errors.GPGMEError as e:
|
||||||
|
assert e.getcode() == errors.BAD_DATA
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error but got none."
|
||||||
|
|
||||||
|
alpha = c.get_key("A0FF4590BB6122EDEF6E3C542D727CC768697734", False)
|
||||||
|
bob = c.get_key("D695676BDCEDCC2CDD6152BCFE180B1DA9E3B0B2", False)
|
||||||
|
|
||||||
|
# Checking a valid message.
|
||||||
|
c.verify(test_text1, test_sig1, verify=[alpha])
|
||||||
|
|
||||||
|
try:
|
||||||
|
c.verify(test_text1, test_sig1, verify=[alpha, bob])
|
||||||
|
except errors.MissingSignatures as e:
|
||||||
|
assert len(e.missing) == 1
|
||||||
|
assert e.missing[0] == bob
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
Loading…
Reference in New Issue
Block a user