python: Improve error handling.

* NEWS: Update.
* lang/python/src/core.py (Context.__read__): New helper function.
(Context.encrypt): Attach partial results to exceptions.
(Context.decrypt): Likewise.
(Context.sign): Likewise.
(Context.verify): Likewise.
* lang/python/src/errors.py (GpgError): Move the core of GPGMEError to
this class, add a nicer interface for it.  This makes the errors
thrown by this library more uniform, and allows us to track the
underlying error in synthesized high-level errors.
(GPGMEError): Simplify.
(...): Make sure to call the parent classes' constructor in all other
classes.
--

Attach partial results to errors.  Some operations return results even
though they signal an error.  Of course this information must be taken
with a grain of salt.  But often, this information is useful for
diagnostic uses or to give the user feedback.  Since the normal
control flow is disrupted by the exception, the callee can no longer
return results, hence we attach them to the exception objects.

GnuPG-bug-id: 3271
Signed-off-by: Justus Winter <justus@g10code.com>
This commit is contained in:
Justus Winter 2017-07-11 12:18:42 +02:00
parent fe79eb8de3
commit 1e68f93dc5
No known key found for this signature in database
GPG Key ID: DD1A52F9DA8C9020
3 changed files with 153 additions and 65 deletions

1
NEWS
View File

@ -10,6 +10,7 @@ Noteworthy changes in version 1.9.1 (unreleased)
cpp: Signature::isDeVs NEW. cpp: Signature::isDeVs NEW.
py: DecryptResult EXTENDED: New boolean field 'is_de_vs'. py: DecryptResult EXTENDED: New boolean field 'is_de_vs'.
py: Signature EXTENDED: New boolean field 'is_de_vs'. py: Signature EXTENDED: New boolean field 'is_de_vs'.
py: GpgError EXTENDED: Partial results in 'results'.
Noteworthy changes in version 1.9.0 (2017-03-28) Noteworthy changes in version 1.9.0 (2017-03-28)
------------------------------------------------ ------------------------------------------------

View File

@ -132,7 +132,7 @@ class GpgmeWrapper(object):
result = func(slf.wrapped, *args) result = func(slf.wrapped, *args)
if slf._callback_excinfo: if slf._callback_excinfo:
gpgme.gpg_raise_callback_exception(slf) gpgme.gpg_raise_callback_exception(slf)
return errorcheck(result, "Invocation of " + name) return errorcheck(result, name)
else: else:
def _funcwrap(slf, *args): def _funcwrap(slf, *args):
result = func(slf.wrapped, *args) result = func(slf.wrapped, *args)
@ -206,6 +206,17 @@ class Context(GpgmeWrapper):
self.protocol = protocol self.protocol = protocol
self.home_dir = home_dir self.home_dir = home_dir
def __read__(self, sink, data):
"""Read helper
Helper function to retrieve the results of an operation, or
None if SINK is given.
"""
if sink or data == None:
return None
data.seek(0, os.SEEK_SET)
return data.read()
def __repr__(self): def __repr__(self):
return ( return (
"Context(armor={0.armor}, " "Context(armor={0.armor}, "
@ -270,15 +281,25 @@ class Context(GpgmeWrapper):
else: else:
self.op_encrypt(recipients, flags, plaintext, ciphertext) self.op_encrypt(recipients, flags, plaintext, ciphertext)
except errors.GPGMEError as e: except errors.GPGMEError as e:
if e.getcode() == errors.UNUSABLE_PUBKEY:
result = self.op_encrypt_result() result = self.op_encrypt_result()
sig_result = self.op_sign_result() if sign else None
results = (self.__read__(sink, ciphertext),
result, sig_result)
if e.getcode() == errors.UNUSABLE_PUBKEY:
if result.invalid_recipients: if result.invalid_recipients:
raise errors.InvalidRecipients(result.invalid_recipients) raise errors.InvalidRecipients(result.invalid_recipients,
error=e.error,
results=results)
if e.getcode() == errors.UNUSABLE_SECKEY: if e.getcode() == errors.UNUSABLE_SECKEY:
sig_result = self.op_sign_result() sig_result = self.op_sign_result()
if sig_result.invalid_signers: if sig_result.invalid_signers:
raise errors.InvalidSigners(sig_result.invalid_signers) raise errors.InvalidSigners(sig_result.invalid_signers,
raise error=e.error,
results=results)
# Otherwise, just raise the error, but attach the results
# first.
e.results = results
raise e
finally: finally:
if passphrase != None: if passphrase != None:
self.pinentry_mode = old_pinentry_mode self.pinentry_mode = old_pinentry_mode
@ -290,11 +311,7 @@ class Context(GpgmeWrapper):
sig_result = self.op_sign_result() if sign else None sig_result = self.op_sign_result() if sign else None
assert not sig_result or not sig_result.invalid_signers assert not sig_result or not sig_result.invalid_signers
cipherbytes = None return self.__read__(sink, ciphertext), result, sig_result
if not sink:
ciphertext.seek(0, os.SEEK_SET)
cipherbytes = ciphertext.read()
return cipherbytes, result, sig_result
def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True): def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True):
"""Decrypt data """Decrypt data
@ -340,6 +357,13 @@ class Context(GpgmeWrapper):
self.op_decrypt_verify(ciphertext, plaintext) self.op_decrypt_verify(ciphertext, plaintext)
else: else:
self.op_decrypt(ciphertext, plaintext) self.op_decrypt(ciphertext, plaintext)
except errors.GPGMEError as e:
result = self.op_decrypt_result()
verify_result = self.op_verify_result() if verify else None
# Just raise the error, but attach the results first.
e.results = (self.__read__(sink, plaintext),
result, verify_result)
raise e
finally: finally:
if passphrase != None: if passphrase != None:
self.pinentry_mode = old_pinentry_mode self.pinentry_mode = old_pinentry_mode
@ -348,13 +372,15 @@ class Context(GpgmeWrapper):
result = self.op_decrypt_result() result = self.op_decrypt_result()
verify_result = self.op_verify_result() if verify else None verify_result = self.op_verify_result() if verify else None
results = (self.__read__(sink, plaintext), result, verify_result)
if result.unsupported_algorithm: if result.unsupported_algorithm:
raise errors.UnsupportedAlgorithm(result.unsupported_algorithm) raise errors.UnsupportedAlgorithm(result.unsupported_algorithm,
results=results)
if verify: if verify:
if any(s.status != errors.NO_ERROR if any(s.status != errors.NO_ERROR
for s in verify_result.signatures): for s in verify_result.signatures):
raise errors.BadSignatures(verify_result) raise errors.BadSignatures(verify_result, results=results)
if verify and verify != True: if verify and verify != True:
missing = list() missing = list()
@ -372,13 +398,10 @@ class Context(GpgmeWrapper):
if not ok: if not ok:
missing.append(key) missing.append(key)
if missing: if missing:
raise errors.MissingSignatures(verify_result, missing) raise errors.MissingSignatures(verify_result, missing,
results=results)
plainbytes = None return results
if not sink:
plaintext.seek(0, os.SEEK_SET)
plainbytes = plaintext.read()
return plainbytes, result, verify_result
def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL): def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL):
"""Sign data """Sign data
@ -408,20 +431,20 @@ class Context(GpgmeWrapper):
try: try:
self.op_sign(data, signeddata, mode) self.op_sign(data, signeddata, mode)
except errors.GPGMEError as e: except errors.GPGMEError as e:
results = (self.__read__(sink, signeddata),
self.op_sign_result())
if e.getcode() == errors.UNUSABLE_SECKEY: if e.getcode() == errors.UNUSABLE_SECKEY:
result = self.op_sign_result() if results[1].invalid_signers:
if result.invalid_signers: raise errors.InvalidSigners(results[1].invalid_signers,
raise errors.InvalidSigners(result.invalid_signers) error=e.error,
raise results=results)
e.results = results
raise e
result = self.op_sign_result() result = self.op_sign_result()
assert not result.invalid_signers assert not result.invalid_signers
signedbytes = None return self.__read__(sink, signeddata), result
if not sink:
signeddata.seek(0, os.SEEK_SET)
signedbytes = signeddata.read()
return signedbytes, result
def verify(self, signed_data, signature=None, sink=None, verify=[]): def verify(self, signed_data, signature=None, sink=None, verify=[]):
"""Verify signatures """Verify signatures
@ -451,20 +474,26 @@ class Context(GpgmeWrapper):
else: else:
data = sink if sink else Data() data = sink if sink else Data()
try:
if signature: if signature:
self.op_verify(signature, signed_data, None) self.op_verify(signature, signed_data, None)
else: else:
self.op_verify(signed_data, None, data) self.op_verify(signed_data, None, data)
except errors.GPGMEError as e:
# Just raise the error, but attach the results first.
e.results = (self.__read__(sink, data),
self.op_verify_result())
raise e
result = self.op_verify_result() results = (self.__read__(sink, data), self.op_verify_result())
if any(s.status != errors.NO_ERROR for s in result.signatures): if any(s.status != errors.NO_ERROR for s in results[1].signatures):
raise errors.BadSignatures(result) raise errors.BadSignatures(results[1], results=results)
missing = list() missing = list()
for key in verify: for key in verify:
ok = False ok = False
for subkey in key.subkeys: for subkey in key.subkeys:
for sig in result.signatures: for sig in results[1].signatures:
if sig.summary & constants.SIGSUM_VALID == 0: if sig.summary & constants.SIGSUM_VALID == 0:
continue continue
if subkey.can_sign and subkey.fpr == sig.fpr: if subkey.can_sign and subkey.fpr == sig.fpr:
@ -475,13 +504,10 @@ class Context(GpgmeWrapper):
if not ok: if not ok:
missing.append(key) missing.append(key)
if missing: if missing:
raise errors.MissingSignatures(result, missing) raise errors.MissingSignatures(results[1], missing,
results=results)
plainbytes = None return results
if data and not sink:
data.seek(0, os.SEEK_SET)
plainbytes = data.read()
return plainbytes, result
def keylist(self, pattern=None, secret=False, def keylist(self, pattern=None, secret=False,
mode=constants.keylist.mode.LOCAL, mode=constants.keylist.mode.LOCAL,

View File

@ -1,3 +1,4 @@
# Copyright (C) 2016-2017 g10 Code GmbH
# Copyright (C) 2004 Igor Belyi <belyi@users.sourceforge.net> # Copyright (C) 2004 Igor Belyi <belyi@users.sourceforge.net>
# Copyright (C) 2002 John Goerzen <jgoerzen@complete.org> # Copyright (C) 2002 John Goerzen <jgoerzen@complete.org>
# #
@ -30,32 +31,89 @@ util.process_constants('GPG_ERR_', globals())
del util del util
class GpgError(Exception): class GpgError(Exception):
pass """A GPG Error
This is the base of all errors thrown by this library.
If the error originated from GPGME, then additional information
can be found by looking at 'code' for the error code, and 'source'
for the errors origin. Suitable constants for comparison are
defined in this module. 'code_str' and 'source_str' are
human-readable versions of the former two properties.
If 'context' is not None, then it contains a human-readable hint
as to where the error originated from.
If 'results' is not None, it is a tuple containing results of the
operation that failed. The tuples elements are the results of the
function that raised the error. Some operations return results
even though they signal an error. Of course this information must
be taken with a grain of salt. But often, this information is
useful for diagnostic uses or to give the user feedback. Since
the normal control flow is disrupted by the exception, the callee
can no longer return results, hence we attach them to the
exception objects.
"""
def __init__(self, error=None, context=None, results=None):
self.error = error
self.context = context
self.results = results
@property
def code(self):
if self.error == None:
return None
return gpgme.gpgme_err_code(self.error)
@property
def code_str(self):
if self.error == None:
return None
return gpgme.gpgme_strerror(self.error)
@property
def source(self):
if self.error == None:
return None
return gpgme.gpgme_err_source(self.error)
@property
def source_str(self):
if self.error == None:
return None
return gpgme.gpgme_strsource(self.error)
def __str__(self):
msgs = []
if self.context != None:
msgs.append(self.context)
if self.error != None:
msgs.append(self.source_str)
msgs.append(self.code_str)
return ': '.join(msgs)
class GPGMEError(GpgError): class GPGMEError(GpgError):
def __init__(self, error = None, message = None): '''Generic error
self.error = error
self.message = message
This is a generic error that wraps the underlying libraries native
error type. It is thrown when the low-level API is invoked and
returns an error. This is the error that was used in PyME.
'''
@classmethod @classmethod
def fromSyserror(cls): def fromSyserror(cls):
return cls(gpgme.gpgme_err_code_from_syserror()) return cls(gpgme.gpgme_err_code_from_syserror())
@property
def message(self):
return self.context
def getstring(self): def getstring(self):
message = "%s: %s" % (gpgme.gpgme_strsource(self.error), return str(self)
gpgme.gpgme_strerror(self.error))
if self.message != None:
message = "%s: %s" % (self.message, message)
return message
def getcode(self): def getcode(self):
return gpgme.gpgme_err_code(self.error) return self.code
def getsource(self): def getsource(self):
return gpgme.gpgme_err_source(self.error) return self.source
def __str__(self):
return self.getstring()
def errorcheck(retval, extradata = None): def errorcheck(retval, extradata = None):
if retval: if retval:
@ -81,7 +139,8 @@ class EncryptionError(GpgError):
pass pass
class InvalidRecipients(EncryptionError): class InvalidRecipients(EncryptionError):
def __init__(self, recipients): def __init__(self, recipients, **kwargs):
EncryptionError.__init__(self, **kwargs)
self.recipients = recipients self.recipients = recipients
def __str__(self): def __str__(self):
return ", ".join("{}: {}".format(r.fpr, return ", ".join("{}: {}".format(r.fpr,
@ -92,7 +151,8 @@ class DeryptionError(GpgError):
pass pass
class UnsupportedAlgorithm(DeryptionError): class UnsupportedAlgorithm(DeryptionError):
def __init__(self, algorithm): def __init__(self, algorithm, **kwargs):
DeryptionError.__init__(self, **kwargs)
self.algorithm = algorithm self.algorithm = algorithm
def __str__(self): def __str__(self):
return self.algorithm return self.algorithm
@ -101,7 +161,8 @@ class SigningError(GpgError):
pass pass
class InvalidSigners(SigningError): class InvalidSigners(SigningError):
def __init__(self, signers): def __init__(self, signers, **kwargs):
SigningError.__init__(self, **kwargs)
self.signers = signers self.signers = signers
def __str__(self): def __str__(self):
return ", ".join("{}: {}".format(s.fpr, return ", ".join("{}: {}".format(s.fpr,
@ -109,11 +170,11 @@ class InvalidSigners(SigningError):
for s in self.signers) for s in self.signers)
class VerificationError(GpgError): class VerificationError(GpgError):
pass def __init__(self, result, **kwargs):
GpgError.__init__(self, **kwargs)
self.result = result
class BadSignatures(VerificationError): class BadSignatures(VerificationError):
def __init__(self, result):
self.result = result
def __str__(self): def __str__(self):
return ", ".join("{}: {}".format(s.fpr, return ", ".join("{}: {}".format(s.fpr,
gpgme.gpgme_strerror(s.status)) gpgme.gpgme_strerror(s.status))
@ -121,8 +182,8 @@ class BadSignatures(VerificationError):
if s.status != NO_ERROR) if s.status != NO_ERROR)
class MissingSignatures(VerificationError): class MissingSignatures(VerificationError):
def __init__(self, result, missing): def __init__(self, result, missing, **kwargs):
self.result = result VerificationError.__init__(self, result, **kwargs)
self.missing = missing self.missing = missing
def __str__(self): def __str__(self):
return ", ".join(k.subkeys[0].fpr for k in self.missing) return ", ".join(k.subkeys[0].fpr for k in self.missing)