diff options
Diffstat (limited to 'lang/python/src/core.py')
-rw-r--r-- | lang/python/src/core.py | 539 |
1 files changed, 382 insertions, 157 deletions
diff --git a/lang/python/src/core.py b/lang/python/src/core.py index bd95d231..d4711317 100644 --- a/lang/python/src/core.py +++ b/lang/python/src/core.py @@ -1,5 +1,22 @@ -# Copyright (C) 2016-2017 g10 Code GmbH -# Copyright (C) 2004,2008 Igor Belyi <[email protected]> +# -*- coding: utf-8 -*- + +from __future__ import absolute_import, print_function, unicode_literals + +import re +import os +import warnings +import weakref + +from . import gpgme +from .errors import errorcheck, GPGMEError +from . import constants +from . import errors +from . import util + +del absolute_import, print_function, unicode_literals + +# Copyright (C) 2016-2018 g10 Code GmbH +# Copyright (C) 2004, 2008 Igor Belyi <[email protected]> # Copyright (C) 2002 John Goerzen <[email protected]> # # This library is free software; you can redistribute it and/or @@ -15,7 +32,6 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - """Core functionality Core functionality of GPGME wrapped in a object-oriented fashion. @@ -24,18 +40,6 @@ and the 'Data' class describing buffers of data. """ -from __future__ import absolute_import, print_function, unicode_literals -del absolute_import, print_function, unicode_literals - -import re -import os -import warnings -import weakref -from . import gpgme -from .errors import errorcheck, GPGMEError -from . import constants -from . import errors -from . import util class GpgmeWrapper(object): """Base wrapper class @@ -49,8 +53,8 @@ class GpgmeWrapper(object): self.wrapped = wrapped def __repr__(self): - return '<{}/{!r}>'.format(super(GpgmeWrapper, self).__repr__(), - self.wrapped) + return '<{}/{!r}>'.format( + super(GpgmeWrapper, self).__repr__(), self.wrapped) def __str__(self): acc = ['{}.{}'.format(__name__, self.__class__.__name__)] @@ -64,7 +68,7 @@ class GpgmeWrapper(object): return hash(repr(self.wrapped)) def __eq__(self, other): - if other == None: + if other is None: return False else: return repr(self.wrapped) == repr(other.wrapped) @@ -98,12 +102,12 @@ class GpgmeWrapper(object): _boolean_properties = set() def __wrap_boolean_property(self, key, do_set=False, value=None): - get_func = getattr(gpgme, - "{}get_{}".format(self._cprefix, key)) - set_func = getattr(gpgme, - "{}set_{}".format(self._cprefix, key)) + get_func = getattr(gpgme, "{}get_{}".format(self._cprefix, key)) + set_func = getattr(gpgme, "{}set_{}".format(self._cprefix, key)) + def get(slf): return bool(get_func(slf.wrapped)) + def set_(slf, value): set_func(slf.wrapped, bool(value)) @@ -116,9 +120,10 @@ class GpgmeWrapper(object): return get(self) _munge_docstring = re.compile(r'gpgme_([^(]*)\(([^,]*), (.*\) -> .*)') + def __getattr__(self, key): """On-the-fly generation of wrapper methods and properties""" - if key[0] == '_' or self._cprefix == None: + if key[0] == '_' or self._cprefix is None: return None if key in self._boolean_properties: @@ -128,12 +133,14 @@ class GpgmeWrapper(object): func = getattr(gpgme, name) if self._errorcheck(name): + def _funcwrap(slf, *args): result = func(slf.wrapped, *args) if slf._callback_excinfo: gpgme.gpg_raise_callback_exception(slf) return errorcheck(result, name) else: + def _funcwrap(slf, *args): result = func(slf.wrapped, *args) if slf._callback_excinfo: @@ -149,6 +156,7 @@ class GpgmeWrapper(object): # Bind the method to 'self'. def wrapper(*args): return _funcwrap(self, *args) + wrapper.__doc__ = doc return wrapper @@ -160,6 +168,7 @@ class GpgmeWrapper(object): else: super(GpgmeWrapper, self).__setattr__(key, value) + class Context(GpgmeWrapper): """Context for cryptographic operations @@ -173,10 +182,15 @@ class Context(GpgmeWrapper): """ - def __init__(self, armor=False, textmode=False, offline=False, - signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, + def __init__(self, + armor=False, + textmode=False, + offline=False, + signers=[], + pinentry_mode=constants.PINENTRY_MODE_DEFAULT, protocol=constants.PROTOCOL_OpenPGP, - wrapped=None, home_dir=None): + wrapped=None, + home_dir=None): """Construct a context object Keyword arguments: @@ -212,22 +226,29 @@ class Context(GpgmeWrapper): Helper function to retrieve the results of an operation, or None if SINK is given. """ - if sink or data == None: + if sink or data is None: return None data.seek(0, os.SEEK_SET) return data.read() def __repr__(self): - return ( - "Context(armor={0.armor}, " - "textmode={0.textmode}, offline={0.offline}, " - "signers={0.signers}, pinentry_mode={0.pinentry_mode}, " - "protocol={0.protocol}, home_dir={0.home_dir}" - ")").format(self) - - def encrypt(self, plaintext, recipients=[], sign=True, sink=None, - passphrase=None, always_trust=False, add_encrypt_to=False, - prepare=False, expect_sign=False, compress=True): + return ("Context(armor={0.armor}, " + "textmode={0.textmode}, offline={0.offline}, " + "signers={0.signers}, pinentry_mode={0.pinentry_mode}, " + "protocol={0.protocol}, home_dir={0.home_dir}" + ")").format(self) + + def encrypt(self, + plaintext, + recipients=[], + sign=True, + sink=None, + passphrase=None, + always_trust=False, + add_encrypt_to=False, + prepare=False, + expect_sign=False, + compress=True): """Encrypt data Encrypt the given plaintext for the given recipients. If the @@ -267,12 +288,14 @@ class Context(GpgmeWrapper): flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS - if passphrase != None: + if passphrase is not None: old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase + self.set_passphrase_cb(passphrase_cb) try: @@ -283,25 +306,26 @@ class Context(GpgmeWrapper): except errors.GPGMEError as e: result = self.op_encrypt_result() sig_result = self.op_sign_result() if sign else None - results = (self.__read__(sink, ciphertext), - result, sig_result) + results = (self.__read__(sink, ciphertext), result, sig_result) if e.getcode() == errors.UNUSABLE_PUBKEY: if result.invalid_recipients: - raise errors.InvalidRecipients(result.invalid_recipients, - error=e.error, - results=results) + raise errors.InvalidRecipients( + result.invalid_recipients, + error=e.error, + results=results) if e.getcode() == errors.UNUSABLE_SECKEY: sig_result = self.op_sign_result() if sig_result.invalid_signers: - raise errors.InvalidSigners(sig_result.invalid_signers, - error=e.error, - results=results) + raise errors.InvalidSigners( + sig_result.invalid_signers, + error=e.error, + results=results) # Otherwise, just raise the error, but attach the results # first. e.results = results raise e finally: - if passphrase != None: + if passphrase is not None: self.pinentry_mode = old_pinentry_mode if old_passphrase_cb: self.set_passphrase_cb(*old_passphrase_cb[1:]) @@ -344,12 +368,14 @@ class Context(GpgmeWrapper): """ plaintext = sink if sink else Data() - if passphrase != None: + if passphrase is not None: old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase + self.set_passphrase_cb(passphrase_cb) try: @@ -361,11 +387,10 @@ class Context(GpgmeWrapper): result = self.op_decrypt_result() verify_result = self.op_verify_result() if verify else None # Just raise the error, but attach the results first. - e.results = (self.__read__(sink, plaintext), - result, verify_result) + e.results = (self.__read__(sink, plaintext), result, verify_result) raise e finally: - if passphrase != None: + if passphrase is not None: self.pinentry_mode = old_pinentry_mode if old_passphrase_cb: self.set_passphrase_cb(*old_passphrase_cb[1:]) @@ -374,15 +399,15 @@ class Context(GpgmeWrapper): verify_result = self.op_verify_result() if verify else None results = (self.__read__(sink, plaintext), result, verify_result) if result.unsupported_algorithm: - raise errors.UnsupportedAlgorithm(result.unsupported_algorithm, - results=results) + raise errors.UnsupportedAlgorithm( + result.unsupported_algorithm, results=results) if verify: if any(s.status != errors.NO_ERROR for s in verify_result.signatures): raise errors.BadSignatures(verify_result, results=results) - if verify and verify != True: + if not verify: # was: if verify and verify != True: missing = list() for key in verify: ok = False @@ -398,8 +423,8 @@ class Context(GpgmeWrapper): if not ok: missing.append(key) if missing: - raise errors.MissingSignatures(verify_result, missing, - results=results) + raise errors.MissingSignatures( + verify_result, missing, results=results) return results @@ -431,13 +456,13 @@ class Context(GpgmeWrapper): try: self.op_sign(data, signeddata, mode) except errors.GPGMEError as e: - results = (self.__read__(sink, signeddata), - self.op_sign_result()) + results = (self.__read__(sink, signeddata), self.op_sign_result()) if e.getcode() == errors.UNUSABLE_SECKEY: if results[1].invalid_signers: - raise errors.InvalidSigners(results[1].invalid_signers, - error=e.error, - results=results) + raise errors.InvalidSigners( + results[1].invalid_signers, + error=e.error, + results=results) e.results = results raise e @@ -481,8 +506,7 @@ class Context(GpgmeWrapper): self.op_verify(signed_data, None, data) except errors.GPGMEError as e: # Just raise the error, but attach the results first. - e.results = (self.__read__(sink, data), - self.op_verify_result()) + e.results = (self.__read__(sink, data), self.op_verify_result()) raise e results = (self.__read__(sink, data), self.op_verify_result()) @@ -504,12 +528,171 @@ class Context(GpgmeWrapper): if not ok: missing.append(key) if missing: - raise errors.MissingSignatures(results[1], missing, - results=results) + raise errors.MissingSignatures( + results[1], missing, results=results) return results - def keylist(self, pattern=None, secret=False, + def key_import(self, data): + """Import data + + Imports the given data into the Context. + + Returns: + -- an object describing the results of imported or updated + keys + + Raises: + TypeError -- Very rarely. + GPGMEError -- as signaled by the underlying library: + + Import status errors, when they occur, will usually + be of NODATA. NO_PUBKEY indicates something + managed to run the function without any + arguments, while an argument of None triggers + the first NODATA of errors.GPGME in the + exception. + """ + try: + self.op_import(data) + result = self.op_import_result() + if result.considered == 0: + status = constants.STATUS_IMPORT_PROBLEM + else: + status = constants.STATUS_KEY_CONSIDERED + except Exception as e: + if e == errors.GPGMEError: + if e.code_str == "No data": + status = constants.STATUS_NODATA + else: + status = constants.STATUS_FILE_ERROR + elif e == TypeError and hasattr(data, "decode") is True: + status = constants.STATUS_NO_PUBKEY + elif e == TypeError and hasattr(data, "encode") is True: + status = constants.STATUS_FILE_ERROR + else: + status = constants.STATUS_ERROR + + if status == constants.STATUS_KEY_CONSIDERED: + import_result = result + else: + import_result = status + + return import_result + + def key_export(self, pattern=None): + """Export keys. + + Exports public keys matching the pattern specified. If no + pattern is specified then exports all available keys. + + Keyword arguments: + pattern -- return keys matching pattern (default: all keys) + + Returns: + -- A key block containing one or more OpenPGP keys in + either ASCII armoured or binary format as determined + by the Context(). If there are no matching keys it + returns None. + + Raises: + GPGMEError -- as signaled by the underlying library. + """ + data = Data() + mode = 0 + try: + self.op_export(pattern, mode, data) + data.seek(0, os.SEEK_SET) + pk_result = data.read() + except GPGMEError as e: + pk_result = e + + if len(pk_result) > 0: + result = pk_result + else: + result = None + + return result + + def key_export_minimal(self, pattern=None): + """Export keys. + + Exports public keys matching the pattern specified in a + minimised format. If no pattern is specified then exports all + available keys. + + Keyword arguments: + pattern -- return keys matching pattern (default: all keys) + + Returns: + -- A key block containing one or more minimised OpenPGP + keys in either ASCII armoured or binary format as + determined by the Context(). If there are no matching + keys it returns None. + + Raises: + GPGMEError -- as signaled by the underlying library. + """ + data = Data() + mode = gpgme.GPGME_EXPORT_MODE_MINIMAL + try: + self.op_export(pattern, mode, data) + data.seek(0, os.SEEK_SET) + pk_result = data.read() + except GPGMEError as e: + pk_result = e + + if len(pk_result) > 0: + result = pk_result + else: + result = None + + return result + + def key_export_secret(self, pattern=None): + """Export secret keys. + + Exports secret keys matching the pattern specified. If no + pattern is specified then exports or attempts to export all + available secret keys. + + IMPORTANT: Each secret key to be exported will prompt for its + passphrase via an invocation of pinentry and gpg-agent. If the + passphrase is not entered or does not match then no data will be + exported. This is the same result as when specifying a pattern + that is not matched by the available keys. + + Keyword arguments: + pattern -- return keys matching pattern (default: all keys) + + Returns: + -- On success a key block containing one or more OpenPGP + secret keys in either ASCII armoured or binary format + as determined by the Context(). + -- On failure while not raising an exception, returns None. + + Raises: + GPGMEError -- as signaled by the underlying library. + """ + data = Data() + mode = gpgme.GPGME_EXPORT_MODE_SECRET + try: + self.op_export(pattern, mode, data) + data.seek(0, os.SEEK_SET) + sk_result = data.read() + except GPGMEError as e: + sk_result = e + + if len(sk_result) > 0: + result = sk_result + else: + result = None + + return result + + def keylist(self, + pattern=None, + secret=False, mode=constants.keylist.mode.LOCAL, source=None): """List keys @@ -544,9 +727,17 @@ class Context(GpgmeWrapper): key = self.op_keylist_next() self.op_keylist_end() - def create_key(self, userid, algorithm=None, expires_in=0, expires=True, - sign=False, encrypt=False, certify=False, authenticate=False, - passphrase=None, force=False): + def create_key(self, + userid, + algorithm=None, + expires_in=0, + expires=True, + sign=False, + encrypt=False, + certify=False, + authenticate=False, + passphrase=None, + force=False): """Create a primary key Create a primary key for the user id USERID. @@ -583,9 +774,10 @@ class Context(GpgmeWrapper): encrypt -- request the encryption capability (see above) certify -- request the certification capability (see above) authenticate -- request the authentication capability (see above) - passphrase -- protect the key with a passphrase (default: no passphrase) - force -- force key creation even if a key with the same userid exists - (default: False) + passphrase -- protect the key with a passphrase (default: no + passphrase) + force -- force key creation even if a key with the same userid + exists (default: False) Returns: -- an object describing the result of the key creation @@ -598,22 +790,26 @@ class Context(GpgmeWrapper): old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase + self.set_passphrase_cb(passphrase_cb) try: - self.op_createkey(userid, algorithm, - 0, # reserved - expires_in, - None, # extrakey - ((constants.create.SIGN if sign else 0) - | (constants.create.ENCR if encrypt else 0) - | (constants.create.CERT if certify else 0) - | (constants.create.AUTH if authenticate else 0) - | (constants.create.NOPASSWD if passphrase == None else 0) - | (0 if expires else constants.create.NOEXPIRE) - | (constants.create.FORCE if force else 0))) + self.op_createkey( + userid, + algorithm, + 0, # reserved + expires_in, + None, # extrakey + ((constants.create.SIGN if sign else 0) | + (constants.create.ENCR if encrypt else 0) | + (constants.create.CERT if certify else 0) | + (constants.create.AUTH if authenticate else 0) | + (constants.create.NOPASSWD if passphrase is None else 0) | + (0 if expires else constants.create.NOEXPIRE) | + (constants.create.FORCE if force else 0))) finally: if util.is_a_string(passphrase): self.pinentry_mode = old_pinentry_mode @@ -622,8 +818,15 @@ class Context(GpgmeWrapper): return self.op_genkey_result() - def create_subkey(self, key, algorithm=None, expires_in=0, expires=True, - sign=False, encrypt=False, authenticate=False, passphrase=None): + def create_subkey(self, + key, + algorithm=None, + expires_in=0, + expires=True, + sign=False, + encrypt=False, + authenticate=False, + passphrase=None): """Create a subkey Create a subkey for the given KEY. As subkeys are a concept @@ -659,7 +862,8 @@ class Context(GpgmeWrapper): sign -- request the signing capability (see above) encrypt -- request the encryption capability (see above) authenticate -- request the authentication capability (see above) - passphrase -- protect the subkey with a passphrase (default: no passphrase) + passphrase -- protect the subkey with a passphrase (default: no + passphrase) Returns: -- an object describing the result of the subkey creation @@ -672,20 +876,23 @@ class Context(GpgmeWrapper): old_pinentry_mode = self.pinentry_mode old_passphrase_cb = getattr(self, '_passphrase_cb', None) self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): return passphrase + self.set_passphrase_cb(passphrase_cb) try: - self.op_createsubkey(key, algorithm, - 0, # reserved - expires_in, - ((constants.create.SIGN if sign else 0) - | (constants.create.ENCR if encrypt else 0) - | (constants.create.AUTH if authenticate else 0) - | (constants.create.NOPASSWD - if passphrase == None else 0) - | (0 if expires else constants.create.NOEXPIRE))) + self.op_createsubkey( + key, + algorithm, + 0, # reserved + expires_in, + ((constants.create.SIGN if sign else 0) | + (constants.create.ENCR if encrypt else 0) | + (constants.create.AUTH if authenticate else 0) | + (constants.create.NOPASSWD if passphrase is None else 0) | + (0 if expires else constants.create.NOEXPIRE))) finally: if util.is_a_string(passphrase): self.pinentry_mode = old_pinentry_mode @@ -745,8 +952,8 @@ class Context(GpgmeWrapper): """ flags = 0 - if uids == None or util.is_a_string(uids): - pass#through unchanged + if uids is None or util.is_a_string(uids): + pass # through unchanged else: flags |= constants.keysign.LFSEP uids = "\n".join(uids) @@ -771,8 +978,11 @@ class Context(GpgmeWrapper): """ self.op_tofu_policy(key, policy) - def assuan_transact(self, command, - data_cb=None, inquire_cb=None, status_cb=None): + def assuan_transact(self, + command, + data_cb=None, + inquire_cb=None, + status_cb=None): """Issue a raw assuan command This function can be used to issue a raw assuan command to the @@ -803,12 +1013,10 @@ class Context(GpgmeWrapper): errptr = gpgme.new_gpgme_error_t_p() err = gpgme.gpgme_op_assuan_transact_ext( - self.wrapped, - cmd, - (weakref.ref(self), data_cb) if data_cb else None, - (weakref.ref(self), inquire_cb) if inquire_cb else None, - (weakref.ref(self), status_cb) if status_cb else None, - errptr) + self.wrapped, cmd, (weakref.ref(self), data_cb) + if data_cb else None, (weakref.ref(self), inquire_cb) + if inquire_cb else None, (weakref.ref(self), status_cb) + if status_cb else None, errptr) if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) @@ -836,10 +1044,10 @@ class Context(GpgmeWrapper): GPGMEError -- as signaled by the underlying library """ - if key == None: + if key is None: raise ValueError("First argument cannot be None") - if sink == None: + if sink is None: sink = Data() if fnc_value: @@ -847,8 +1055,8 @@ class Context(GpgmeWrapper): else: opaquedata = (weakref.ref(self), func) - result = gpgme.gpgme_op_interact(self.wrapped, key, flags, - opaquedata, sink) + result = gpgme.gpgme_op_interact(self.wrapped, key, flags, opaquedata, + sink) if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) errorcheck(result) @@ -857,6 +1065,7 @@ class Context(GpgmeWrapper): def signers(self): """Keys used for signing""" return [self.signers_enum(i) for i in range(self.signers_count())] + @signers.setter def signers(self, signers): old = self.signers @@ -872,6 +1081,7 @@ class Context(GpgmeWrapper): def pinentry_mode(self): """Pinentry mode""" return self.get_pinentry_mode() + @pinentry_mode.setter def pinentry_mode(self, value): self.set_pinentry_mode(value) @@ -880,6 +1090,7 @@ class Context(GpgmeWrapper): def protocol(self): """Protocol to use""" return self.get_protocol() + @protocol.setter def protocol(self, value): errorcheck(gpgme.gpgme_engine_check_version(value)) @@ -889,6 +1100,7 @@ class Context(GpgmeWrapper): def home_dir(self): """Engine's home directory""" return self.engine_info.home_dir + @home_dir.setter def home_dir(self, value): self.set_engine_info(self.protocol, home_dir=value) @@ -901,24 +1113,15 @@ class Context(GpgmeWrapper): # The list of functions is created using: # # $ grep '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \ - # | grep -v _op_ | awk "/\(gpgme_ctx/ { printf (\"'%s',\\n\", \$2) } " - return ((name.startswith('gpgme_op_') - and not name.endswith('_result')) - or name in { - 'gpgme_new', - 'gpgme_set_ctx_flag', - 'gpgme_set_protocol', - 'gpgme_set_sub_protocol', - 'gpgme_set_keylist_mode', - 'gpgme_set_pinentry_mode', - 'gpgme_set_locale', - 'gpgme_ctx_set_engine_info', - 'gpgme_signers_add', - 'gpgme_sig_notation_add', - 'gpgme_set_sender', - 'gpgme_cancel', - 'gpgme_cancel_async', - 'gpgme_get_key', + # | grep -v _op_ | awk "/\(gpgme_ctx/ { printf (\"'%s',\\n\", \$2) } " + return ((name.startswith('gpgme_op_') and not + name.endswith('_result')) or name in { + 'gpgme_new', 'gpgme_set_ctx_flag', 'gpgme_set_protocol', + 'gpgme_set_sub_protocol', 'gpgme_set_keylist_mode', + 'gpgme_set_pinentry_mode', 'gpgme_set_locale', + 'gpgme_ctx_set_engine_info', 'gpgme_signers_add', + 'gpgme_sig_notation_add', 'gpgme_set_sender', + 'gpgme_cancel', 'gpgme_cancel_async', 'gpgme_get_key' }) _boolean_properties = {'armor', 'textmode', 'offline'} @@ -938,6 +1141,7 @@ class Context(GpgmeWrapper): # Implement the context manager protocol. def __enter__(self): return self + def __exit__(self, type, value, tb): self.__del__() @@ -1032,10 +1236,10 @@ class Context(GpgmeWrapper): Please see the GPGME manual for more information. """ - if func == None: + if func is None: hookdata = None else: - if hook == None: + if hook is None: hookdata = (weakref.ref(self), func) else: hookdata = (weakref.ref(self), func, hook) @@ -1057,10 +1261,10 @@ class Context(GpgmeWrapper): Please see the GPGME manual for more information. """ - if func == None: + if func is None: hookdata = None else: - if hook == None: + if hook is None: hookdata = (weakref.ref(self), func) else: hookdata = (weakref.ref(self), func, hook) @@ -1081,10 +1285,10 @@ class Context(GpgmeWrapper): Please see the GPGME manual for more information. """ - if func == None: + if func is None: hookdata = None else: - if hook == None: + if hook is None: hookdata = (weakref.ref(self), func) else: hookdata = (weakref.ref(self), func, hook) @@ -1152,8 +1356,8 @@ class Context(GpgmeWrapper): magic numbers will break as a result. """ - warnings.warn("Call to deprecated method op_edit.", - category=DeprecationWarning) + warnings.warn( + "Call to deprecated method op_edit.", category=DeprecationWarning) return self.interact(key, func, sink=out, fnc_value=fnc_value) @@ -1182,7 +1386,8 @@ class Data(GpgmeWrapper): # This list is compiled using # # $ grep -v '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \ - # | awk "/\(gpgme_data_t/ { printf (\"'%s',\\n\", \$2) } " | sed "s/'\\*/'/" + # | awk "/\(gpgme_data_t/ { printf (\"'%s',\\n\", \$2) } " \ + # | sed "s/'\\*/'/" return name not in { 'gpgme_data_read', 'gpgme_data_write', @@ -1194,8 +1399,13 @@ class Data(GpgmeWrapper): 'gpgme_data_identify', } - def __init__(self, string=None, file=None, offset=None, - length=None, cbs=None, copy=True): + def __init__(self, + string=None, + file=None, + offset=None, + length=None, + cbs=None, + copy=True): """Initialize a new gpgme_data_t object. If no args are specified, make it an empty object. @@ -1239,13 +1449,13 @@ class Data(GpgmeWrapper): super(Data, self).__init__(None) self.data_cbs = None - if cbs != None: + if cbs is not None: self.new_from_cbs(*cbs) - elif string != None: + elif string is not None: self.new_from_mem(string, copy) - elif file != None and offset != None and length != None: + elif file is not None and offset is not None and length is not None: self.new_from_filepart(file, offset, length) - elif file != None: + elif file is not None: if util.is_a_string(file): self.new_from_file(file, copy) else: @@ -1258,7 +1468,7 @@ class Data(GpgmeWrapper): # At interpreter shutdown, gpgme is set to NONE. return - if self.wrapped != None and gpgme.gpgme_data_release: + if self.wrapped is not None and gpgme.gpgme_data_release: gpgme.gpgme_data_release(self.wrapped) if self._callback_excinfo: gpgme.gpg_raise_callback_exception(self) @@ -1268,6 +1478,7 @@ class Data(GpgmeWrapper): # Implement the context manager protocol. def __enter__(self): return self + def __exit__(self, type, value, tb): self.__del__() @@ -1282,7 +1493,8 @@ class Data(GpgmeWrapper): def new_from_mem(self, string, copy=True): tmp = gpgme.new_gpgme_data_t_p() - errorcheck(gpgme.gpgme_data_new_from_mem(tmp,string,len(string),copy)) + errorcheck( + gpgme.gpgme_data_new_from_mem(tmp, string, len(string), copy)) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) @@ -1300,12 +1512,12 @@ class Data(GpgmeWrapper): def new_from_cbs(self, read_cb, write_cb, seek_cb, release_cb, hook=None): tmp = gpgme.new_gpgme_data_t_p() - if hook != None: - hookdata = (weakref.ref(self), - read_cb, write_cb, seek_cb, release_cb, hook) + if hook is not None: + hookdata = (weakref.ref(self), read_cb, write_cb, seek_cb, + release_cb, hook) else: - hookdata = (weakref.ref(self), - read_cb, write_cb, seek_cb, release_cb) + hookdata = (weakref.ref(self), read_cb, write_cb, seek_cb, + release_cb) gpgme.gpg_data_new_from_cbs(self, hookdata, tmp) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) @@ -1327,12 +1539,13 @@ class Data(GpgmeWrapper): filename = file else: fp = gpgme.fdopen(file.fileno(), file.mode) - if fp == None: - raise ValueError("Failed to open file from %s arg %s" % \ - (str(type(file)), str(file))) + if fp is None: + raise ValueError("Failed to open file from %s arg %s" % (str( + type(file)), str(file))) - errorcheck(gpgme.gpgme_data_new_from_filepart(tmp, filename, fp, - offset, length)) + errorcheck( + gpgme.gpgme_data_new_from_filepart(tmp, filename, fp, offset, + length)) self.wrapped = gpgme.gpgme_data_t_p_value(tmp) gpgme.delete_gpgme_data_t_p(tmp) @@ -1365,7 +1578,7 @@ class Data(GpgmeWrapper): raise GPGMEError.fromSyserror() return written - def read(self, size = -1): + def read(self, size=-1): """Read at most size bytes, returned as bytes. If the size argument is negative or omitted, read until EOF is reached. @@ -1400,6 +1613,7 @@ class Data(GpgmeWrapper): chunks.append(result) return b''.join(chunks) + def pubkey_algo_string(subkey): """Return short algorithm string @@ -1412,6 +1626,7 @@ def pubkey_algo_string(subkey): """ return gpgme.gpgme_pubkey_algo_string(subkey) + def pubkey_algo_name(algo): """Return name of public key algorithm @@ -1424,6 +1639,7 @@ def pubkey_algo_name(algo): """ return gpgme.gpgme_pubkey_algo_name(algo) + def hash_algo_name(algo): """Return name of hash algorithm @@ -1436,6 +1652,7 @@ def hash_algo_name(algo): """ return gpgme.gpgme_hash_algo_name(algo) + def get_protocol_name(proto): """Get protocol description @@ -1447,6 +1664,7 @@ def get_protocol_name(proto): """ return gpgme.gpgme_get_protocol_name(proto) + def addrspec_from_uid(uid): """Return the address spec @@ -1458,22 +1676,26 @@ def addrspec_from_uid(uid): """ return gpgme.gpgme_addrspec_from_uid(uid) + def check_version(version=None): return gpgme.gpgme_check_version(version) + # check_version also makes sure that several subsystems are properly # initialized, and it must be run at least once before invoking any # other function. We do it here so that the user does not have to do # it unless she really wants to check for a certain version. check_version() -def engine_check_version (proto): + +def engine_check_version(proto): try: errorcheck(gpgme.gpgme_engine_check_version(proto)) return True except errors.GPGMEError: return False + def get_engine_info(): ptr = gpgme.new_gpgme_engine_info_t_p() try: @@ -1484,6 +1706,7 @@ def get_engine_info(): gpgme.delete_gpgme_engine_info_t_p(ptr) return info + def set_engine_info(proto, file_name, home_dir=None): """Changes the default configuration of the crypto engine implementing the protocol 'proto'. 'file_name' is the file name of @@ -1492,10 +1715,12 @@ def set_engine_info(proto, file_name, home_dir=None): used if omitted).""" errorcheck(gpgme.gpgme_set_engine_info(proto, file_name, home_dir)) + def set_locale(category, value): """Sets the default locale used by contexts""" errorcheck(gpgme.gpgme_set_locale(None, category, value)) + def wait(hang): """Wait for asynchronous call on any Context to finish. Wait forever if hang is True. @@ -1509,7 +1734,7 @@ def wait(hang): context = gpgme.gpgme_wait(None, ptr, hang) status = gpgme.gpgme_error_t_p_value(ptr) gpgme.delete_gpgme_error_t_p(ptr) - if context == None: + if context is None: errorcheck(status) else: context = Context(context) |