diff options
author | Daniel Kahn Gillmor <[email protected]> | 2016-10-28 20:45:49 +0000 |
---|---|---|
committer | Justus Winter <[email protected]> | 2016-10-31 14:42:27 +0000 |
commit | 2fac017618a76882605125b05ff1f7393fe99860 (patch) | |
tree | b7eddc17682b6aa91b8c3c723a2fc02f28ac0a6c /lang/python/gpg/core.py | |
parent | core: New API functions gpgme_set_sender, gpgme_get_sender. (diff) | |
download | gpgme-2fac017618a76882605125b05ff1f7393fe99860.tar.gz gpgme-2fac017618a76882605125b05ff1f7393fe99860.zip |
python: Rename Python module from PyME to gpg.
This follows weeks of discussion on the gnupg-devel mailing list.
Hopefully it will make it easier for people using Python to use GnuPG
in the future.
Signed-off-by: Daniel Kahn Gillmor <[email protected]>
Diffstat (limited to 'lang/python/gpg/core.py')
-rw-r--r-- | lang/python/gpg/core.py | 1164 |
1 files changed, 1164 insertions, 0 deletions
diff --git a/lang/python/gpg/core.py b/lang/python/gpg/core.py new file mode 100644 index 00000000..748bcbb9 --- /dev/null +++ b/lang/python/gpg/core.py @@ -0,0 +1,1164 @@ +# Copyright (C) 2016 g10 Code GmbH +# Copyright (C) 2004,2008 Igor Belyi <[email protected]> +# Copyright (C) 2002 John Goerzen <[email protected]> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +"""Core functionality + +Core functionality of GPGME wrapped in a object-oriented fashion. +Provides the 'Context' class for performing cryptographic operations, +and the 'Data' class describing buffers of data. + +""" + +from __future__ import absolute_import, print_function, unicode_literals +del absolute_import, print_function, unicode_literals + +import re +import os +import warnings +import weakref +from . import gpgme +from .errors import errorcheck, GPGMEError +from . import constants +from . import errors +from . import util + +class GpgmeWrapper(object): + """Base wrapper class + + Not to be instantiated directly. + + """ + + def __init__(self, wrapped): + self._callback_excinfo = None + self.wrapped = wrapped + + def __repr__(self): + return '<{}/{!r}>'.format(super(GpgmeWrapper, self).__repr__(), + self.wrapped) + + def __str__(self): + acc = ['{}.{}'.format(__name__, self.__class__.__name__)] + flags = [f for f in self._boolean_properties if getattr(self, f)] + if flags: + acc.append('({})'.format(' '.join(flags))) + + return '<{}>'.format(' '.join(acc)) + + def __hash__(self): + return hash(repr(self.wrapped)) + + def __eq__(self, other): + if other == None: + return False + else: + return repr(self.wrapped) == repr(other.wrapped) + + @property + def _ctype(self): + """The name of the c type wrapped by this class + + Must be set by child classes. + + """ + raise NotImplementedError() + + @property + def _cprefix(self): + """The common prefix of c functions wrapped by this class + + Must be set by child classes. + + """ + raise NotImplementedError() + + def _errorcheck(self, name): + """Must be implemented by child classes. + + This function must return a trueish value for all c functions + returning gpgme_error_t.""" + raise NotImplementedError() + + """The set of all boolean properties""" + _boolean_properties = set() + + def __wrap_boolean_property(self, key, do_set=False, value=None): + get_func = getattr(gpgme, + "{}get_{}".format(self._cprefix, key)) + set_func = getattr(gpgme, + "{}set_{}".format(self._cprefix, key)) + def get(slf): + return bool(get_func(slf.wrapped)) + def set_(slf, value): + set_func(slf.wrapped, bool(value)) + + p = property(get, set_, doc="{} flag".format(key)) + setattr(self.__class__, key, p) + + if do_set: + set_(self, bool(value)) + else: + return get(self) + + _munge_docstring = re.compile(r'gpgme_([^(]*)\(([^,]*), (.*\) -> .*)') + def __getattr__(self, key): + """On-the-fly generation of wrapper methods and properties""" + if key[0] == '_' or self._cprefix == None: + return None + + if key in self._boolean_properties: + return self.__wrap_boolean_property(key) + + name = self._cprefix + key + func = getattr(gpgme, name) + + if self._errorcheck(name): + def _funcwrap(slf, *args): + result = func(slf.wrapped, *args) + if slf._callback_excinfo: + gpgme.gpg_raise_callback_exception(slf) + return errorcheck(result, "Invocation of " + name) + else: + def _funcwrap(slf, *args): + result = func(slf.wrapped, *args) + if slf._callback_excinfo: + gpgme.gpg_raise_callback_exception(slf) + return result + + doc = self._munge_docstring.sub(r'\2.\1(\3', getattr(func, "__doc__")) + _funcwrap.__doc__ = doc + + # Monkey-patch the class. + setattr(self.__class__, key, _funcwrap) + + # Bind the method to 'self'. + def wrapper(*args): + return _funcwrap(self, *args) + wrapper.__doc__ = doc + + return wrapper + + def __setattr__(self, key, value): + """On-the-fly generation of properties""" + if key in self._boolean_properties: + self.__wrap_boolean_property(key, True, value) + else: + super(GpgmeWrapper, self).__setattr__(key, value) + +class Context(GpgmeWrapper): + """Context for cryptographic operations + + All cryptographic operations in GPGME are performed within a + context, which contains the internal state of the operation as + well as configuration parameters. By using several contexts you + can run several cryptographic operations in parallel, with + different configuration. + + Access to a context must be synchronized. + + """ + + def __init__(self, armor=False, textmode=False, offline=False, + signers=[], pinentry_mode=constants.PINENTRY_MODE_DEFAULT, + protocol=constants.PROTOCOL_OpenPGP, + wrapped=None): + """Construct a context object + + Keyword arguments: + armor -- enable ASCII armoring (default False) + textmode -- enable canonical text mode (default False) + offline -- do not contact external key sources (default False) + signers -- list of keys used for signing (default []) + pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT) + protocol -- protocol to use (default PROTOCOL_OpenPGP) + + """ + if wrapped: + self.own = False + else: + tmp = gpgme.new_gpgme_ctx_t_p() + errorcheck(gpgme.gpgme_new(tmp)) + wrapped = gpgme.gpgme_ctx_t_p_value(tmp) + gpgme.delete_gpgme_ctx_t_p(tmp) + self.own = True + super(Context, self).__init__(wrapped) + self.armor = armor + self.textmode = textmode + self.offline = offline + self.signers = signers + self.pinentry_mode = pinentry_mode + self.protocol = protocol + + def encrypt(self, plaintext, recipients=[], sign=True, sink=None, + passphrase=None, always_trust=False, add_encrypt_to=False, + prepare=False, expect_sign=False, compress=True): + """Encrypt data + + Encrypt the given plaintext for the given recipients. If the + list of recipients is empty, the data is encrypted + symmetrically with a passphrase. + + The passphrase can be given as parameter, using a callback + registered at the context, or out-of-band via pinentry. + + Keyword arguments: + recipients -- list of keys to encrypt to + sign -- sign plaintext (default True) + sink -- write result to sink instead of returning it + passphrase -- for symmetric encryption + always_trust -- always trust the keys (default False) + add_encrypt_to -- encrypt to configured additional keys (default False) + prepare -- (ui) prepare for encryption (default False) + expect_sign -- (ui) prepare for signing (default False) + compress -- compress plaintext (default True) + + Returns: + ciphertext -- the encrypted data (or None if sink is given) + result -- additional information about the encryption + sign_result -- additional information about the signature(s) + + Raises: + InvalidRecipients -- if encryption using a particular key failed + InvalidSigners -- if signing using a particular key failed + GPGMEError -- as signaled by the underlying library + + """ + ciphertext = sink if sink else Data() + flags = 0 + flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST + flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO + flags |= prepare * constants.ENCRYPT_PREPARE + flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN + flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS + + if passphrase != None: + old_pinentry_mode = self.pinentry_mode + old_passphrase_cb = getattr(self, '_passphrase_cb', None) + self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): + return passphrase + self.set_passphrase_cb(passphrase_cb) + + try: + if sign: + self.op_encrypt_sign(recipients, flags, plaintext, ciphertext) + else: + self.op_encrypt(recipients, flags, plaintext, ciphertext) + except errors.GPGMEError as e: + if e.getcode() == errors.UNUSABLE_PUBKEY: + result = self.op_encrypt_result() + if result.invalid_recipients: + raise errors.InvalidRecipients(result.invalid_recipients) + if e.getcode() == errors.UNUSABLE_SECKEY: + sig_result = self.op_sign_result() + if sig_result.invalid_signers: + raise errors.InvalidSigners(sig_result.invalid_signers) + raise + finally: + if passphrase != None: + self.pinentry_mode = old_pinentry_mode + if old_passphrase_cb: + self.set_passphrase_cb(*old_passphrase_cb[1:]) + + result = self.op_encrypt_result() + assert not result.invalid_recipients + sig_result = self.op_sign_result() if sign else None + assert not sig_result or not sig_result.invalid_signers + + cipherbytes = None + if not sink: + ciphertext.seek(0, os.SEEK_SET) + cipherbytes = ciphertext.read() + return cipherbytes, result, sig_result + + def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True): + """Decrypt data + + Decrypt the given ciphertext and verify any signatures. If + VERIFY is an iterable of keys, the ciphertext must be signed + by all those keys, otherwise an error is raised. + + If the ciphertext is symmetrically encrypted using a + passphrase, that passphrase can be given as parameter, using a + callback registered at the context, or out-of-band via + pinentry. + + Keyword arguments: + sink -- write result to sink instead of returning it + passphrase -- for symmetric decryption + verify -- check signatures (default True) + + Returns: + plaintext -- the decrypted data (or None if sink is given) + result -- additional information about the decryption + verify_result -- additional information about the signature(s) + + Raises: + UnsupportedAlgorithm -- if an unsupported algorithm was used + BadSignatures -- if a bad signature is encountered + MissingSignatures -- if expected signatures are missing or bad + GPGMEError -- as signaled by the underlying library + + """ + plaintext = sink if sink else Data() + + if passphrase != None: + old_pinentry_mode = self.pinentry_mode + old_passphrase_cb = getattr(self, '_passphrase_cb', None) + self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): + return passphrase + self.set_passphrase_cb(passphrase_cb) + + try: + if verify: + self.op_decrypt_verify(ciphertext, plaintext) + else: + self.op_decrypt(ciphertext, plaintext) + finally: + if passphrase != None: + self.pinentry_mode = old_pinentry_mode + if old_passphrase_cb: + self.set_passphrase_cb(*old_passphrase_cb[1:]) + + result = self.op_decrypt_result() + verify_result = self.op_verify_result() if verify else None + if result.unsupported_algorithm: + raise errors.UnsupportedAlgorithm(result.unsupported_algorithm) + + if verify: + if any(s.status != errors.NO_ERROR + for s in verify_result.signatures): + raise errors.BadSignatures(verify_result) + + if verify and verify != True: + missing = list() + for key in verify: + ok = False + for subkey in key.subkeys: + for sig in verify_result.signatures: + if sig.summary & constants.SIGSUM_VALID == 0: + continue + if subkey.can_sign and subkey.fpr == sig.fpr: + ok = True + break + if ok: + break + if not ok: + missing.append(key) + if missing: + raise errors.MissingSignatures(verify_result, missing) + + plainbytes = None + if not sink: + plaintext.seek(0, os.SEEK_SET) + plainbytes = plaintext.read() + return plainbytes, result, verify_result + + def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL): + """Sign data + + Sign the given data with either the configured default local + key, or the 'signers' keys of this context. + + Keyword arguments: + mode -- signature mode (default: normal, see below) + sink -- write result to sink instead of returning it + + Returns: + either + signed_data -- encoded data and signature (normal mode) + signature -- only the signature data (detached mode) + cleartext -- data and signature as text (cleartext mode) + (or None if sink is given) + result -- additional information about the signature(s) + + Raises: + InvalidSigners -- if signing using a particular key failed + GPGMEError -- as signaled by the underlying library + + """ + signeddata = sink if sink else Data() + + try: + self.op_sign(data, signeddata, mode) + except errors.GPGMEError as e: + if e.getcode() == errors.UNUSABLE_SECKEY: + result = self.op_sign_result() + if result.invalid_signers: + raise errors.InvalidSigners(result.invalid_signers) + raise + + result = self.op_sign_result() + assert not result.invalid_signers + + signedbytes = None + if not sink: + signeddata.seek(0, os.SEEK_SET) + signedbytes = signeddata.read() + return signedbytes, result + + def verify(self, signed_data, signature=None, sink=None, verify=[]): + """Verify signatures + + Verify signatures over data. If VERIFY is an iterable of + keys, the ciphertext must be signed by all those keys, + otherwise an error is raised. + + Keyword arguments: + signature -- detached signature data + sink -- write result to sink instead of returning it + + Returns: + data -- the plain data + (or None if sink is given, or we verified a detached signature) + result -- additional information about the signature(s) + + Raises: + BadSignatures -- if a bad signature is encountered + MissingSignatures -- if expected signatures are missing or bad + GPGMEError -- as signaled by the underlying library + + """ + if signature: + # Detached signature, we don't return the plain text. + data = None + else: + data = sink if sink else Data() + + if signature: + self.op_verify(signature, signed_data, None) + else: + self.op_verify(signed_data, None, data) + + result = self.op_verify_result() + if any(s.status != errors.NO_ERROR for s in result.signatures): + raise errors.BadSignatures(result) + + missing = list() + for key in verify: + ok = False + for subkey in key.subkeys: + for sig in result.signatures: + if sig.summary & constants.SIGSUM_VALID == 0: + continue + if subkey.can_sign and subkey.fpr == sig.fpr: + ok = True + break + if ok: + break + if not ok: + missing.append(key) + if missing: + raise errors.MissingSignatures(result, missing) + + plainbytes = None + if data and not sink: + data.seek(0, os.SEEK_SET) + plainbytes = data.read() + return plainbytes, result + + def keylist(self, pattern=None, secret=False): + """List keys + + Keyword arguments: + pattern -- return keys matching pattern (default: all keys) + secret -- return only secret keys + + Returns: + -- an iterator returning key objects + + Raises: + GPGMEError -- as signaled by the underlying library + """ + return self.op_keylist_all(pattern, secret) + + def assuan_transact(self, command, + data_cb=None, inquire_cb=None, status_cb=None): + """Issue a raw assuan command + + This function can be used to issue a raw assuan command to the + engine. + + If command is a string or bytes, it will be used as-is. If it + is an iterable of strings, it will be properly escaped and + joined into an well-formed assuan command. + + Keyword arguments: + data_cb -- a callback receiving data lines + inquire_cb -- a callback providing more information + status_cb -- a callback receiving status lines + + Returns: + result -- the result of command as GPGMEError + + Raises: + GPGMEError -- as signaled by the underlying library + + """ + + if isinstance(command, (str, bytes)): + cmd = command + else: + cmd = " ".join(util.percent_escape(f) for f in command) + + errptr = gpgme.new_gpgme_error_t_p() + + err = gpgme.gpgme_op_assuan_transact_ext( + self.wrapped, + cmd, + (weakref.ref(self), data_cb) if data_cb else None, + (weakref.ref(self), inquire_cb) if inquire_cb else None, + (weakref.ref(self), status_cb) if status_cb else None, + errptr) + + if self._callback_excinfo: + gpgme.gpg_raise_callback_exception(self) + + errorcheck(err) + + status = gpgme.gpgme_error_t_p_value(errptr) + gpgme.delete_gpgme_error_t_p(errptr) + + return GPGMEError(status) if status != 0 else None + + def interact(self, key, func, sink=None, flags=0, fnc_value=None): + """Interact with the engine + + This method can be used to edit keys and cards interactively. + KEY is the key to edit, FUNC is called repeatedly with two + unicode arguments, 'keyword' and 'args'. See the GPGME manual + for details. + + Keyword arguments: + sink -- if given, additional output is written here + flags -- use constants.INTERACT_CARD to edit a card + + Raises: + GPGMEError -- as signaled by the underlying library + + """ + if key == None: + raise ValueError("First argument cannot be None") + + if sink == None: + sink = Data() + + if fnc_value: + opaquedata = (weakref.ref(self), func, fnc_value) + else: + opaquedata = (weakref.ref(self), func) + + result = gpgme.gpgme_op_interact(self.wrapped, key, flags, + opaquedata, sink) + if self._callback_excinfo: + gpgme.gpg_raise_callback_exception(self) + errorcheck(result) + + @property + def signers(self): + """Keys used for signing""" + return [self.signers_enum(i) for i in range(self.signers_count())] + @signers.setter + def signers(self, signers): + old = self.signers + self.signers_clear() + try: + for key in signers: + self.signers_add(key) + except: + self.signers = old + raise + + @property + def pinentry_mode(self): + """Pinentry mode""" + return self.get_pinentry_mode() + @pinentry_mode.setter + def pinentry_mode(self, value): + self.set_pinentry_mode(value) + + @property + def protocol(self): + """Protocol to use""" + return self.get_protocol() + @protocol.setter + def protocol(self, value): + errorcheck(gpgme.gpgme_engine_check_version(value)) + self.set_protocol(value) + + _ctype = 'gpgme_ctx_t' + _cprefix = 'gpgme_' + + def _errorcheck(self, name): + """This function should list all functions returning gpgme_error_t""" + return ((name.startswith('gpgme_op_') + and not name.endswith('_result')) + or name in { + 'gpgme_set_ctx_flag', + 'gpgme_set_protocol', + 'gpgme_set_sub_protocol', + 'gpgme_set_keylist_mode', + 'gpgme_set_pinentry_mode', + 'gpgme_set_locale', + 'gpgme_set_engine_info', + 'gpgme_signers_add', + 'gpgme_get_sig_key', + 'gpgme_sig_notation_add', + 'gpgme_cancel', + 'gpgme_cancel_async', + 'gpgme_cancel_get_key', + }) + + _boolean_properties = {'armor', 'textmode', 'offline'} + + def __del__(self): + if not gpgme: + # At interpreter shutdown, gpgme is set to NONE. + return + + self._free_passcb() + self._free_progresscb() + self._free_statuscb() + if self.own and self.wrapped and gpgme.gpgme_release: + gpgme.gpgme_release(self.wrapped) + self.wrapped = None + + # Implement the context manager protocol. + def __enter__(self): + return self + def __exit__(self, type, value, tb): + self.__del__() + + def op_keylist_all(self, *args, **kwargs): + self.op_keylist_start(*args, **kwargs) + key = self.op_keylist_next() + while key: + yield key + key = self.op_keylist_next() + self.op_keylist_end() + + def op_keylist_next(self): + """Returns the next key in the list created + by a call to op_keylist_start(). The object returned + is of type Key.""" + ptr = gpgme.new_gpgme_key_t_p() + try: + errorcheck(gpgme.gpgme_op_keylist_next(self.wrapped, ptr)) + key = gpgme.gpgme_key_t_p_value(ptr) + except errors.GPGMEError as excp: + key = None + if excp.getcode() != errors.EOF: + raise excp + gpgme.delete_gpgme_key_t_p(ptr) + if key: + key.__del__ = lambda self: gpgme.gpgme_key_unref(self) + return key + + def get_key(self, fpr, secret=False): + """Get a key given a fingerprint + + Keyword arguments: + secret -- to request a secret key + + Returns: + -- the matching key + + Raises: + KeyError -- if the key was not found + GPGMEError -- as signaled by the underlying library + + """ + ptr = gpgme.new_gpgme_key_t_p() + + try: + errorcheck(gpgme.gpgme_get_key(self.wrapped, fpr, ptr, secret)) + except errors.GPGMEError as e: + if e.getcode() == errors.EOF: + raise errors.KeyNotFound(fpr) + raise e + + key = gpgme.gpgme_key_t_p_value(ptr) + gpgme.delete_gpgme_key_t_p(ptr) + assert key + key.__del__ = lambda self: gpgme.gpgme_key_unref(self) + return key + + def op_trustlist_all(self, *args, **kwargs): + self.op_trustlist_start(*args, **kwargs) + trust = self.op_trustlist_next() + while trust: + yield trust + trust = self.op_trustlist_next() + self.op_trustlist_end() + + def op_trustlist_next(self): + """Returns the next trust item in the list created + by a call to op_trustlist_start(). The object returned + is of type TrustItem.""" + ptr = gpgme.new_gpgme_trust_item_t_p() + try: + errorcheck(gpgme.gpgme_op_trustlist_next(self.wrapped, ptr)) + trust = gpgme.gpgme_trust_item_t_p_value(ptr) + except errors.GPGMEError as excp: + trust = None + if excp.getcode() != errors.EOF: + raise + gpgme.delete_gpgme_trust_item_t_p(ptr) + return trust + + def set_passphrase_cb(self, func, hook=None): + """Sets the passphrase callback to the function specified by func. + + When the system needs a passphrase, it will call func with three args: + hint, a string describing the key it needs the passphrase for; + desc, a string describing the passphrase it needs; + prev_bad, a boolean equal True if this is a call made after + unsuccessful previous attempt. + + If hook has a value other than None it will be passed into the func + as a forth argument. + + Please see the GPGME manual for more information. + """ + if func == None: + hookdata = None + else: + if hook == None: + hookdata = (weakref.ref(self), func) + else: + hookdata = (weakref.ref(self), func, hook) + gpgme.gpg_set_passphrase_cb(self, hookdata) + + def _free_passcb(self): + if gpgme.gpg_set_passphrase_cb: + self.set_passphrase_cb(None) + + def set_progress_cb(self, func, hook=None): + """Sets the progress meter callback to the function specified by FUNC. + If FUNC is None, the callback will be cleared. + + This function will be called to provide an interactive update + of the system's progress. The function will be called with + three arguments, type, total, and current. If HOOK is not + None, it will be supplied as fourth argument. + + Please see the GPGME manual for more information. + + """ + if func == None: + hookdata = None + else: + if hook == None: + hookdata = (weakref.ref(self), func) + else: + hookdata = (weakref.ref(self), func, hook) + gpgme.gpg_set_progress_cb(self, hookdata) + + def _free_progresscb(self): + if gpgme.gpg_set_progress_cb: + self.set_progress_cb(None) + + def set_status_cb(self, func, hook=None): + """Sets the status callback to the function specified by FUNC. If + FUNC is None, the callback will be cleared. + + The function will be called with two arguments, keyword and + args. If HOOK is not None, it will be supplied as third + argument. + + Please see the GPGME manual for more information. + + """ + if func == None: + hookdata = None + else: + if hook == None: + hookdata = (weakref.ref(self), func) + else: + hookdata = (weakref.ref(self), func, hook) + gpgme.gpg_set_status_cb(self, hookdata) + + def _free_statuscb(self): + if gpgme.gpg_set_status_cb: + self.set_status_cb(None) + + @property + def engine_info(self): + """Configuration of the engine currently in use""" + p = self.protocol + infos = [i for i in self.get_engine_info() if i.protocol == p] + assert len(infos) == 1 + return infos[0] + + def get_engine_info(self): + """Get engine configuration + + Returns information about all configured and installed + engines. + + Returns: + infos -- a list of engine infos + + """ + return gpgme.gpgme_ctx_get_engine_info(self.wrapped) + + def set_engine_info(self, proto, file_name=None, home_dir=None): + """Change engine configuration + + Changes the configuration of the crypto engine implementing + the protocol 'proto' for the context. + + Keyword arguments: + file_name -- engine program file name (unchanged if None) + home_dir -- configuration directory (unchanged if None) + + """ + errorcheck(gpgme.gpgme_ctx_set_engine_info( + self.wrapped, proto, file_name, home_dir)) + + def wait(self, hang): + """Wait for asynchronous call to finish. Wait forever if hang is True. + Raises an exception on errors. + + Please read the GPGME manual for more information. + + """ + ptr = gpgme.new_gpgme_error_t_p() + gpgme.gpgme_wait(self.wrapped, ptr, hang) + status = gpgme.gpgme_error_t_p_value(ptr) + gpgme.delete_gpgme_error_t_p(ptr) + errorcheck(status) + + def op_edit(self, key, func, fnc_value, out): + """Start key editing using supplied callback function + + Note: This interface is deprecated and will be removed with + GPGME 1.8. Please use .interact instead. Furthermore, we + implement this using gpgme_op_interact, so callbacks will get + called with string keywords instead of numeric status + messages. Code that is using constants.STATUS_X or + constants.status.X will continue to work, whereas code using + magic numbers will break as a result. + + """ + warnings.warn("Call to deprecated method op_edit.", + category=DeprecationWarning) + return self.interact(key, func, sink=out, fnc_value=fnc_value) + + +class Data(GpgmeWrapper): + """Data buffer + + A lot of data has to be exchanged between the user and the crypto + engine, like plaintext messages, ciphertext, signatures and + information about the keys. The technical details about + exchanging the data information are completely abstracted by + GPGME. The user provides and receives the data via `gpgme_data_t' + objects, regardless of the communication protocol between GPGME + and the crypto engine in use. + + This Data class is the implementation of the GpgmeData objects. + + Please see the information about __init__ for instantiation. + + """ + + _ctype = 'gpgme_data_t' + _cprefix = 'gpgme_data_' + + def _errorcheck(self, name): + """This function should list all functions returning gpgme_error_t""" + return name not in { + 'gpgme_data_release_and_get_mem', + 'gpgme_data_get_encoding', + 'gpgme_data_seek', + 'gpgme_data_get_file_name', + } + + def __init__(self, string=None, file=None, offset=None, + length=None, cbs=None, copy=True): + """Initialize a new gpgme_data_t object. + + If no args are specified, make it an empty object. + + If string alone is specified, initialize it with the data + contained there. + + If file, offset, and length are all specified, file must + be either a filename or a file-like object, and the object + will be initialized by reading the specified chunk from the file. + + If cbs is specified, it MUST be a tuple of the form: + + (read_cb, write_cb, seek_cb, release_cb[, hook]) + + where the first four items are functions implementing reading, + writing, seeking the data, and releasing any resources once + the data object is deallocated. The functions must match the + following prototypes: + + def read(amount, hook=None): + return <a b"bytes" object> + + def write(data, hook=None): + return <the number of bytes written> + + def seek(offset, whence, hook=None): + return <the new file position> + + def release(hook=None): + <return value and exceptions are ignored> + + The functions may be bound methods. In that case, you can + simply use the 'self' reference instead of using a hook. + + If file is specified without any other arguments, then + it must be a filename, and the object will be initialized from + that file. + + """ + super(Data, self).__init__(None) + self.data_cbs = None + + if cbs != None: + self.new_from_cbs(*cbs) + elif string != None: + self.new_from_mem(string, copy) + elif file != None and offset != None and length != None: + self.new_from_filepart(file, offset, length) + elif file != None: + if util.is_a_string(file): + self.new_from_file(file, copy) + else: + self.new_from_fd(file) + else: + self.new() + + def __del__(self): + if not gpgme: + # At interpreter shutdown, gpgme is set to NONE. + return + + if self.wrapped != None and gpgme.gpgme_data_release: + gpgme.gpgme_data_release(self.wrapped) + if self._callback_excinfo: + gpgme.gpg_raise_callback_exception(self) + self.wrapped = None + self._free_datacbs() + + # Implement the context manager protocol. + def __enter__(self): + return self + def __exit__(self, type, value, tb): + self.__del__() + + def _free_datacbs(self): + self._data_cbs = None + + def new(self): + tmp = gpgme.new_gpgme_data_t_p() + errorcheck(gpgme.gpgme_data_new(tmp)) + self.wrapped = gpgme.gpgme_data_t_p_value(tmp) + gpgme.delete_gpgme_data_t_p(tmp) + + def new_from_mem(self, string, copy=True): + tmp = gpgme.new_gpgme_data_t_p() + errorcheck(gpgme.gpgme_data_new_from_mem(tmp,string,len(string),copy)) + self.wrapped = gpgme.gpgme_data_t_p_value(tmp) + gpgme.delete_gpgme_data_t_p(tmp) + + def new_from_file(self, filename, copy=True): + tmp = gpgme.new_gpgme_data_t_p() + try: + errorcheck(gpgme.gpgme_data_new_from_file(tmp, filename, copy)) + except errors.GPGMEError as e: + if e.getcode() == errors.INV_VALUE and not copy: + raise ValueError("delayed reads are not yet supported") + else: + raise e + self.wrapped = gpgme.gpgme_data_t_p_value(tmp) + gpgme.delete_gpgme_data_t_p(tmp) + + def new_from_cbs(self, read_cb, write_cb, seek_cb, release_cb, hook=None): + tmp = gpgme.new_gpgme_data_t_p() + if hook != None: + hookdata = (weakref.ref(self), + read_cb, write_cb, seek_cb, release_cb, hook) + else: + hookdata = (weakref.ref(self), + read_cb, write_cb, seek_cb, release_cb) + gpgme.gpg_data_new_from_cbs(self, hookdata, tmp) + self.wrapped = gpgme.gpgme_data_t_p_value(tmp) + gpgme.delete_gpgme_data_t_p(tmp) + + def new_from_filepart(self, file, offset, length): + """This wraps the GPGME gpgme_data_new_from_filepart() function. + The argument "file" may be: + + * a string specifying a file name, or + * a file-like object supporting the fileno() and the mode attribute. + + """ + + tmp = gpgme.new_gpgme_data_t_p() + filename = None + fp = None + + if util.is_a_string(file): + filename = file + else: + fp = gpgme.fdopen(file.fileno(), file.mode) + if fp == None: + raise ValueError("Failed to open file from %s arg %s" % \ + (str(type(file)), str(file))) + + errorcheck(gpgme.gpgme_data_new_from_filepart(tmp, filename, fp, + offset, length)) + self.wrapped = gpgme.gpgme_data_t_p_value(tmp) + gpgme.delete_gpgme_data_t_p(tmp) + + def new_from_fd(self, file): + """This wraps the GPGME gpgme_data_new_from_fd() function. The + argument "file" must be a file-like object, supporting the + fileno() method. + + """ + tmp = gpgme.new_gpgme_data_t_p() + errorcheck(gpgme.gpgme_data_new_from_fd(tmp, file.fileno())) + self.wrapped = gpgme.gpgme_data_t_p_value(tmp) + gpgme.delete_gpgme_data_t_p(tmp) + + def new_from_stream(self, file): + """This wrap around gpgme_data_new_from_stream is an alias for + new_from_fd() method since in python there's not difference + between file stream and file descriptor""" + self.new_from_fd(file) + + def write(self, buffer): + """Write buffer given as string or bytes. + + If a string is given, it is implicitly encoded using UTF-8.""" + written = gpgme.gpgme_data_write(self.wrapped, buffer) + if written < 0: + if self._callback_excinfo: + gpgme.gpg_raise_callback_exception(self) + else: + raise GPGMEError.fromSyserror() + return written + + def read(self, size = -1): + """Read at most size bytes, returned as bytes. + + If the size argument is negative or omitted, read until EOF is reached. + + Returns the data read, or the empty string if there was no data + to read before EOF was reached.""" + + if size == 0: + return '' + + if size > 0: + try: + result = gpgme.gpgme_data_read(self.wrapped, size) + except: + if self._callback_excinfo: + gpgme.gpg_raise_callback_exception(self) + else: + raise + return result + else: + chunks = [] + while True: + try: + result = gpgme.gpgme_data_read(self.wrapped, 4096) + except: + if self._callback_excinfo: + gpgme.gpg_raise_callback_exception(self) + else: + raise + if len(result) == 0: + break + chunks.append(result) + return b''.join(chunks) + +def pubkey_algo_name(algo): + return gpgme.gpgme_pubkey_algo_name(algo) + +def hash_algo_name(algo): + return gpgme.gpgme_hash_algo_name(algo) + +def get_protocol_name(proto): + return gpgme.gpgme_get_protocol_name(proto) + +def check_version(version=None): + return gpgme.gpgme_check_version(version) + +# check_version also makes sure that several subsystems are properly +# initialized, and it must be run at least once before invoking any +# other function. We do it here so that the user does not have to do +# it unless she really wants to check for a certain version. +check_version() + +def engine_check_version (proto): + try: + errorcheck(gpgme.gpgme_engine_check_version(proto)) + return True + except errors.GPGMEError: + return False + +def get_engine_info(): + ptr = gpgme.new_gpgme_engine_info_t_p() + try: + errorcheck(gpgme.gpgme_get_engine_info(ptr)) + info = gpgme.gpgme_engine_info_t_p_value(ptr) + except errors.GPGMEError: + info = None + gpgme.delete_gpgme_engine_info_t_p(ptr) + return info + +def set_engine_info(proto, file_name, home_dir=None): + """Changes the default configuration of the crypto engine implementing + the protocol 'proto'. 'file_name' is the file name of + the executable program implementing this protocol. 'home_dir' is the + directory name of the configuration directory (engine's default is + used if omitted).""" + errorcheck(gpgme.gpgme_set_engine_info(proto, file_name, home_dir)) + +def set_locale(category, value): + """Sets the default locale used by contexts""" + errorcheck(gpgme.gpgme_set_locale(None, category, value)) + +def wait(hang): + """Wait for asynchronous call on any Context to finish. + Wait forever if hang is True. + + For finished anynch calls it returns a tuple (status, context): + status - status return by asnynchronous call. + context - context which caused this call to return. + + Please read the GPGME manual of more information.""" + ptr = gpgme.new_gpgme_error_t_p() + context = gpgme.gpgme_wait(None, ptr, hang) + status = gpgme.gpgme_error_t_p_value(ptr) + gpgme.delete_gpgme_error_t_p(ptr) + if context == None: + errorcheck(status) + else: + context = Context(context) + return (status, context) |