aboutsummaryrefslogtreecommitdiffstats
path: root/lang/python/src/core.py
diff options
context:
space:
mode:
Diffstat (limited to 'lang/python/src/core.py')
-rw-r--r--lang/python/src/core.py1795
1 files changed, 0 insertions, 1795 deletions
diff --git a/lang/python/src/core.py b/lang/python/src/core.py
deleted file mode 100644
index 152ea295..00000000
--- a/lang/python/src/core.py
+++ /dev/null
@@ -1,1795 +0,0 @@
-# -*- coding: utf-8 -*-
-
-from __future__ import absolute_import, print_function, unicode_literals
-
-import re
-import os
-import warnings
-import weakref
-
-from . import gpgme
-from .errors import errorcheck, GPGMEError
-from . import constants
-from . import errors
-from . import util
-
-del absolute_import, print_function, unicode_literals
-
-# Copyright (C) 2016-2018 g10 Code GmbH
-# Copyright (C) 2004, 2008 Igor Belyi <[email protected]>
-# Copyright (C) 2002 John Goerzen <[email protected]>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-"""Core functionality
-
-Core functionality of GPGME wrapped in a object-oriented fashion.
-Provides the 'Context' class for performing cryptographic operations,
-and the 'Data' class describing buffers of data.
-
-"""
-
-
-class GpgmeWrapper(object):
- """Base wrapper class
-
- Not to be instantiated directly.
-
- """
-
- def __init__(self, wrapped):
- self._callback_excinfo = None
- self.wrapped = wrapped
-
- def __repr__(self):
- return '<{}/{!r}>'.format(
- super(GpgmeWrapper, self).__repr__(), self.wrapped)
-
- def __str__(self):
- acc = ['{}.{}'.format(__name__, self.__class__.__name__)]
- flags = [f for f in self._boolean_properties if getattr(self, f)]
- if flags:
- acc.append('({})'.format(' '.join(flags)))
-
- return '<{}>'.format(' '.join(acc))
-
- def __hash__(self):
- return hash(repr(self.wrapped))
-
- def __eq__(self, other):
- if other is None:
- return False
- else:
- return repr(self.wrapped) == repr(other.wrapped)
-
- @property
- def _ctype(self):
- """The name of the c type wrapped by this class
-
- Must be set by child classes.
-
- """
- raise NotImplementedError()
-
- @property
- def _cprefix(self):
- """The common prefix of c functions wrapped by this class
-
- Must be set by child classes.
-
- """
- raise NotImplementedError()
-
- def _errorcheck(self, name):
- """Must be implemented by child classes.
-
- This function must return a trueish value for all c functions
- returning gpgme_error_t."""
- raise NotImplementedError()
-
- """The set of all boolean properties"""
- _boolean_properties = set()
-
- def __wrap_boolean_property(self, key, do_set=False, value=None):
- get_func = getattr(gpgme, "{}get_{}".format(self._cprefix, key))
- set_func = getattr(gpgme, "{}set_{}".format(self._cprefix, key))
-
- def get(slf):
- if not slf.wrapped:
- return False
- return bool(get_func(slf.wrapped))
-
- def set_(slf, value):
- if not slf.wrapped:
- return
- set_func(slf.wrapped, bool(value))
-
- p = property(get, set_, doc="{} flag".format(key))
- setattr(self.__class__, key, p)
-
- if do_set:
- set_(self, bool(value))
- else:
- return get(self)
-
- _munge_docstring = re.compile(r'gpgme_([^(]*)\(([^,]*), (.*\) -> .*)')
-
- def __getattr__(self, key):
- """On-the-fly generation of wrapper methods and properties"""
- if key[0] == '_' or self._cprefix is None:
- return None
-
- if key in self._boolean_properties:
- return self.__wrap_boolean_property(key)
-
- name = self._cprefix + key
- func = getattr(gpgme, name)
-
- if self._errorcheck(name):
-
- def _funcwrap(slf, *args):
- if not slf.wrapped:
- return None
- result = func(slf.wrapped, *args)
- if slf._callback_excinfo:
- gpgme.gpg_raise_callback_exception(slf)
- return errorcheck(result, name)
- else:
-
- def _funcwrap(slf, *args):
- if not slf.wrapped:
- return None
- result = func(slf.wrapped, *args)
- if slf._callback_excinfo:
- gpgme.gpg_raise_callback_exception(slf)
- return result
-
- doc_orig = getattr(func, "__doc__")
- if doc_orig:
- doc = self._munge_docstring.sub(r'\2.\1(\3', doc_orig)
- else:
- doc = None
-
- _funcwrap.__doc__ = doc
-
- # Monkey-patch the class.
- setattr(self.__class__, key, _funcwrap)
-
- # Bind the method to 'self'.
- def wrapper(*args):
- return _funcwrap(self, *args)
-
- wrapper.__doc__ = doc
-
- return wrapper
-
- def __setattr__(self, key, value):
- """On-the-fly generation of properties"""
- if key in self._boolean_properties:
- self.__wrap_boolean_property(key, True, value)
- else:
- super(GpgmeWrapper, self).__setattr__(key, value)
-
-
-class Context(GpgmeWrapper):
- """Context for cryptographic operations
-
- All cryptographic operations in GPGME are performed within a
- context, which contains the internal state of the operation as
- well as configuration parameters. By using several contexts you
- can run several cryptographic operations in parallel, with
- different configuration.
-
- Access to a context must be synchronized.
-
- """
-
- def __init__(self,
- armor=False,
- textmode=False,
- offline=False,
- signers=[],
- pinentry_mode=constants.PINENTRY_MODE_DEFAULT,
- protocol=constants.PROTOCOL_OpenPGP,
- wrapped=None,
- home_dir=None):
- """Construct a context object
-
- Keyword arguments:
- armor -- enable ASCII armoring (default False)
- textmode -- enable canonical text mode (default False)
- offline -- do not contact external key sources (default False)
- signers -- list of keys used for signing (default [])
- pinentry_mode -- pinentry mode (default PINENTRY_MODE_DEFAULT)
- protocol -- protocol to use (default PROTOCOL_OpenPGP)
- home_dir -- state directory (default is the engine default)
-
- """
- if wrapped:
- self.own = False
- else:
- tmp = gpgme.new_gpgme_ctx_t_p()
- errorcheck(gpgme.gpgme_new(tmp))
- wrapped = gpgme.gpgme_ctx_t_p_value(tmp)
- gpgme.delete_gpgme_ctx_t_p(tmp)
- self.own = True
- super(Context, self).__init__(wrapped)
- self.armor = armor
- self.textmode = textmode
- self.offline = offline
- self.signers = signers
- self.pinentry_mode = pinentry_mode
- self.protocol = protocol
- self.home_dir = home_dir
-
- def __read__(self, sink, data):
- """Read helper
-
- Helper function to retrieve the results of an operation, or
- None if SINK is given.
- """
- if sink or data is None:
- return None
- data.seek(0, os.SEEK_SET)
- return data.read()
-
- def __repr__(self):
- return ("Context(armor={0.armor}, "
- "textmode={0.textmode}, offline={0.offline}, "
- "signers={0.signers}, pinentry_mode={0.pinentry_mode}, "
- "protocol={0.protocol}, home_dir={0.home_dir}"
- ")").format(self)
-
- def encrypt(self,
- plaintext,
- recipients=[],
- sign=True,
- sink=None,
- passphrase=None,
- always_trust=False,
- add_encrypt_to=False,
- prepare=False,
- expect_sign=False,
- compress=True):
- """Encrypt data
-
- Encrypt the given plaintext for the given recipients. If the
- list of recipients is empty, the data is encrypted
- symmetrically with a passphrase.
-
- The passphrase can be given as parameter, using a callback
- registered at the context, or out-of-band via pinentry.
-
- Keyword arguments:
- recipients -- list of keys to encrypt to
- sign -- sign plaintext (default True)
- sink -- write result to sink instead of returning it
- passphrase -- for symmetric encryption
- always_trust -- always trust the keys (default False)
- add_encrypt_to -- encrypt to configured additional keys (default False)
- prepare -- (ui) prepare for encryption (default False)
- expect_sign -- (ui) prepare for signing (default False)
- compress -- compress plaintext (default True)
-
- Returns:
- ciphertext -- the encrypted data (or None if sink is given)
- result -- additional information about the encryption
- sign_result -- additional information about the signature(s)
-
- Raises:
- InvalidRecipients -- if encryption using a particular key failed
- InvalidSigners -- if signing using a particular key failed
- GPGMEError -- as signaled by the underlying library
-
- """
- ciphertext = sink if sink else Data()
- flags = 0
- flags |= always_trust * constants.ENCRYPT_ALWAYS_TRUST
- flags |= (not add_encrypt_to) * constants.ENCRYPT_NO_ENCRYPT_TO
- flags |= prepare * constants.ENCRYPT_PREPARE
- flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN
- flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS
-
- if passphrase is not None:
- old_pinentry_mode = self.pinentry_mode
- old_passphrase_cb = getattr(self, '_passphrase_cb', None)
- self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
-
- def passphrase_cb(hint, desc, prev_bad, hook=None):
- return passphrase
-
- self.set_passphrase_cb(passphrase_cb)
-
- try:
- if sign:
- self.op_encrypt_sign(recipients, flags, plaintext, ciphertext)
- else:
- self.op_encrypt(recipients, flags, plaintext, ciphertext)
- except errors.GPGMEError as e:
- result = self.op_encrypt_result()
- sig_result = self.op_sign_result() if sign else None
- results = (self.__read__(sink, ciphertext), result, sig_result)
- if e.getcode() == errors.UNUSABLE_PUBKEY:
- if result.invalid_recipients:
- raise errors.InvalidRecipients(
- result.invalid_recipients,
- error=e.error,
- results=results)
- if e.getcode() == errors.UNUSABLE_SECKEY:
- sig_result = self.op_sign_result()
- if sig_result.invalid_signers:
- raise errors.InvalidSigners(
- sig_result.invalid_signers,
- error=e.error,
- results=results)
- # Otherwise, just raise the error, but attach the results
- # first.
- e.results = results
- raise e
- finally:
- if passphrase is not None:
- self.pinentry_mode = old_pinentry_mode
- gpgme.gpg_set_passphrase_cb(self, old_passphrase_cb)
-
- result = self.op_encrypt_result()
- assert not result.invalid_recipients
- sig_result = self.op_sign_result() if sign else None
- assert not sig_result or not sig_result.invalid_signers
-
- return self.__read__(sink, ciphertext), result, sig_result
-
- def decrypt(self, ciphertext, sink=None, passphrase=None, verify=True, filter_signatures=True):
- """Decrypt data
-
- Decrypt the given ciphertext and verify any signatures. If
- VERIFY is an iterable of keys, the ciphertext must be signed
- by all those keys, otherwise a MissingSignatures error is
- raised. Note: if VERIFY is an empty iterable, that is treated
- the same as passing verify=True (that is, verify signatures
- and return data about any valid signatures found, but no
- signatures are required and no MissingSignatures error will be
- raised).
-
- The filter_signatures argument can be used to force this
- function to return signatures that are not fully trusted - for
- example because they were made by unknown keys.
-
- If the ciphertext is symmetrically encrypted using a
- passphrase, that passphrase can be given as parameter, using a
- callback registered at the context, or out-of-band via
- pinentry.
-
- Keyword arguments:
- sink -- write result to sink instead of returning it
- passphrase -- for symmetric decryption
- verify -- check signatures (boolean or iterable of keys,
- see above) (default True)
- filter_signatures -- if this function should filter out signatures
- that are not completely OK (default True)
-
- Returns:
- plaintext -- the decrypted data (or None if sink is given)
- result -- additional information about the decryption
- verify_result -- additional information about the valid
- signature(s) found
-
- Raises:
- UnsupportedAlgorithm -- if an unsupported algorithm was used
- MissingSignatures -- if expected signatures are missing or bad
- GPGMEError -- as signaled by the underlying library
-
- """
- do_sig_verification = False
- required_keys = None
- plaintext = sink if sink else Data()
-
- if passphrase is not None:
- old_pinentry_mode = self.pinentry_mode
- old_passphrase_cb = getattr(self, '_passphrase_cb', None)
- self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
-
- def passphrase_cb(hint, desc, prev_bad, hook=None):
- return passphrase
-
- self.set_passphrase_cb(passphrase_cb)
-
- try:
- if isinstance(verify, bool):
- do_sig_verification = verify
- elif verify is None:
- warnings.warn(
- "ctx.decrypt called with verify=None, should be bool or iterable (treating as False).",
- category=DeprecationWarning)
- do_sig_verification = False
- else:
- # we hope this is an iterable:
- required_keys = verify
- do_sig_verification = True
-
- if do_sig_verification:
- self.op_decrypt_verify(ciphertext, plaintext)
- else:
- self.op_decrypt(ciphertext, plaintext)
- except errors.GPGMEError as e:
- result = self.op_decrypt_result()
- if do_sig_verification:
- verify_result = self.op_verify_result()
- else:
- verify_result = None
- # Just raise the error, but attach the results first.
- e.results = (self.__read__(sink, plaintext), result, verify_result)
- raise e
- finally:
- if passphrase is not None:
- self.pinentry_mode = old_pinentry_mode
- gpgme.gpg_set_passphrase_cb(self, old_passphrase_cb)
-
- result = self.op_decrypt_result()
-
- if do_sig_verification:
- verify_result = self.op_verify_result()
- else:
- verify_result = None
-
- results = (self.__read__(sink, plaintext), result, verify_result)
-
- if result.unsupported_algorithm:
- raise errors.UnsupportedAlgorithm(result.unsupported_algorithm,
- results=results)
-
- if do_sig_verification:
- if filter_signatures:
- verify_result.signatures = list(filter(lambda s: s.status == errors.NO_ERROR, verify_result.signatures))
- if required_keys is not None:
- missing = []
- for key in required_keys:
- ok = False
- for subkey in key.subkeys:
- for sig in verify_result.signatures:
- if sig.summary & constants.SIGSUM_VALID == 0:
- continue
- if subkey.can_sign and subkey.fpr == sig.fpr:
- ok = True
- break
- if ok:
- break
- if not ok:
- missing.append(key)
- if missing:
- raise errors.MissingSignatures(verify_result, missing,
- results=results)
-
- return results
-
- def sign(self, data, sink=None, mode=constants.SIG_MODE_NORMAL):
- """Sign data
-
- Sign the given data with either the configured default local
- key, or the 'signers' keys of this context.
-
- Keyword arguments:
- mode -- signature mode (default: normal, see below)
- sink -- write result to sink instead of returning it
-
- Returns:
- either
- signed_data -- encoded data and signature (normal mode)
- signature -- only the signature data (detached mode)
- cleartext -- data and signature as text (cleartext mode)
- (or None if sink is given)
- result -- additional information about the signature(s)
-
- Raises:
- InvalidSigners -- if signing using a particular key failed
- GPGMEError -- as signaled by the underlying library
-
- """
- signeddata = sink if sink else Data()
-
- try:
- self.op_sign(data, signeddata, mode)
- except errors.GPGMEError as e:
- results = (self.__read__(sink, signeddata), self.op_sign_result())
- if e.getcode() == errors.UNUSABLE_SECKEY:
- if results[1].invalid_signers:
- raise errors.InvalidSigners(
- results[1].invalid_signers,
- error=e.error,
- results=results)
- e.results = results
- raise e
-
- result = self.op_sign_result()
- assert not result.invalid_signers
-
- return self.__read__(sink, signeddata), result
-
- def verify(self, signed_data, signature=None, sink=None, verify=[]):
- """Verify signatures
-
- Verify signatures over data. If VERIFY is an iterable of
- keys, the ciphertext must be signed by all those keys,
- otherwise an error is raised.
-
- Keyword arguments:
- signature -- detached signature data
- sink -- write result to sink instead of returning it
-
- Returns:
- data -- the plain data
- (or None if sink is given, or we verified a detached signature)
- result -- additional information about the signature(s)
-
- Raises:
- BadSignatures -- if a bad signature is encountered
- MissingSignatures -- if expected signatures are missing or bad
- GPGMEError -- as signaled by the underlying library
-
- """
- if signature:
- # Detached signature, we don't return the plain text.
- data = None
- else:
- data = sink if sink else Data()
-
- try:
- if signature:
- self.op_verify(signature, signed_data, None)
- else:
- self.op_verify(signed_data, None, data)
- except errors.GPGMEError as e:
- # Just raise the error, but attach the results first.
- e.results = (self.__read__(sink, data), self.op_verify_result())
- raise e
-
- results = (self.__read__(sink, data), self.op_verify_result())
- if any(s.status != errors.NO_ERROR for s in results[1].signatures):
- raise errors.BadSignatures(results[1], results=results)
-
- missing = list()
- for key in verify:
- ok = False
- for subkey in key.subkeys:
- for sig in results[1].signatures:
- if sig.summary & constants.SIGSUM_VALID == 0:
- continue
- if subkey.can_sign and subkey.fpr == sig.fpr:
- ok = True
- break
- if ok:
- break
- if not ok:
- missing.append(key)
- if missing:
- raise errors.MissingSignatures(
- results[1], missing, results=results)
-
- return results
-
- def key_import(self, data):
- """Import data
-
- Imports the given data into the Context.
-
- Returns:
- -- an object describing the results of imported or updated
- keys
-
- Raises:
- TypeError -- Very rarely.
- GPGMEError -- as signaled by the underlying library:
-
- Import status errors, when they occur, will usually
- be of NODATA. NO_PUBKEY indicates something
- managed to run the function without any
- arguments, while an argument of None triggers
- the first NODATA of errors.GPGME in the
- exception.
- """
- try:
- self.op_import(data)
- result = self.op_import_result()
- if result.considered == 0:
- status = constants.STATUS_IMPORT_PROBLEM
- else:
- status = constants.STATUS_KEY_CONSIDERED
- except Exception as e:
- if e == errors.GPGMEError:
- if e.code_str == "No data":
- status = constants.STATUS_NODATA
- else:
- status = constants.STATUS_FILE_ERROR
- elif e == TypeError and hasattr(data, "decode") is True:
- status = constants.STATUS_NO_PUBKEY
- elif e == TypeError and hasattr(data, "encode") is True:
- status = constants.STATUS_FILE_ERROR
- else:
- status = constants.STATUS_ERROR
-
- if status == constants.STATUS_KEY_CONSIDERED:
- import_result = result
- else:
- import_result = status
-
- return import_result
-
- def key_export(self, pattern=None):
- """Export keys.
-
- Exports public keys matching the pattern specified. If no
- pattern is specified then exports all available keys.
-
- Keyword arguments:
- pattern -- return keys matching pattern (default: all keys)
-
- Returns:
- -- A key block containing one or more OpenPGP keys in
- either ASCII armoured or binary format as determined
- by the Context(). If there are no matching keys it
- returns None.
-
- Raises:
- GPGMEError -- as signaled by the underlying library.
- """
- data = Data()
- mode = 0
- try:
- self.op_export(pattern, mode, data)
- data.seek(0, os.SEEK_SET)
- pk_result = data.read()
- except GPGMEError as e:
- raise e
-
- if len(pk_result) > 0:
- result = pk_result
- else:
- result = None
-
- return result
-
- def key_export_minimal(self, pattern=None):
- """Export keys.
-
- Exports public keys matching the pattern specified in a
- minimised format. If no pattern is specified then exports all
- available keys.
-
- Keyword arguments:
- pattern -- return keys matching pattern (default: all keys)
-
- Returns:
- -- A key block containing one or more minimised OpenPGP
- keys in either ASCII armoured or binary format as
- determined by the Context(). If there are no matching
- keys it returns None.
-
- Raises:
- GPGMEError -- as signaled by the underlying library.
- """
- data = Data()
- mode = gpgme.GPGME_EXPORT_MODE_MINIMAL
- try:
- self.op_export(pattern, mode, data)
- data.seek(0, os.SEEK_SET)
- pk_result = data.read()
- except GPGMEError as e:
- raise e
-
- if len(pk_result) > 0:
- result = pk_result
- else:
- result = None
-
- return result
-
- def key_export_secret(self, pattern=None):
- """Export secret keys.
-
- Exports secret keys matching the pattern specified. If no
- pattern is specified then exports or attempts to export all
- available secret keys.
-
- IMPORTANT: Each secret key to be exported will prompt for its
- passphrase via an invocation of pinentry and gpg-agent. If the
- passphrase is not entered or does not match then no data will be
- exported. This is the same result as when specifying a pattern
- that is not matched by the available keys.
-
- Keyword arguments:
- pattern -- return keys matching pattern (default: all keys)
-
- Returns:
- -- On success a key block containing one or more OpenPGP
- secret keys in either ASCII armoured or binary format
- as determined by the Context().
- -- On failure while not raising an exception, returns None.
-
- Raises:
- GPGMEError -- as signaled by the underlying library.
- """
- data = Data()
- mode = gpgme.GPGME_EXPORT_MODE_SECRET
- try:
- self.op_export(pattern, mode, data)
- data.seek(0, os.SEEK_SET)
- sk_result = data.read()
- except GPGMEError as e:
- raise e
-
- if len(sk_result) > 0:
- result = sk_result
- else:
- result = None
-
- return result
-
- def keylist(self,
- pattern=None,
- secret=False,
- mode=constants.keylist.mode.LOCAL,
- source=None):
- """List keys
-
- Keyword arguments:
- pattern -- return keys matching pattern (default: all keys)
- secret -- return only secret keys (default: False)
- mode -- keylist mode (default: list local keys)
- source -- read keys from source instead from the keyring
- (all other options are ignored in this case)
-
- Returns:
- -- an iterator returning key objects
-
- Raises:
- GPGMEError -- as signaled by the underlying library
- """
- if not source:
- self.set_keylist_mode(mode)
- self.op_keylist_start(pattern, secret)
- else:
- # Automatic wrapping of SOURCE is not possible here,
- # because the object must not be deallocated until the
- # iteration over the results ends.
- if not isinstance(source, Data):
- source = Data(file=source)
- self.op_keylist_from_data_start(source, 0)
-
- key = self.op_keylist_next()
- while key:
- yield key
- key = self.op_keylist_next()
- self.op_keylist_end()
-
- def create_key(self,
- userid,
- algorithm=None,
- expires_in=0,
- expires=True,
- sign=False,
- encrypt=False,
- certify=False,
- authenticate=False,
- passphrase=None,
- force=False):
- """Create a primary key
-
- Create a primary key for the user id USERID.
-
- ALGORITHM may be used to specify the public key encryption
- algorithm for the new key. By default, a reasonable default
- is chosen. You may use "future-default" to select an
- algorithm that will be the default in a future implementation
- of the engine. ALGORITHM may be a string like "rsa", or
- "rsa2048" to explicitly request an algorithm and a key size.
-
- EXPIRES_IN specifies the expiration time of the key in number
- of seconds since the keys creation. By default, a reasonable
- expiration time is chosen. If you want to create a key that
- does not expire, use the keyword argument EXPIRES.
-
- SIGN, ENCRYPT, CERTIFY, and AUTHENTICATE can be used to
- request the capabilities of the new key. If you don't request
- any, a reasonable set of capabilities is selected, and in case
- of OpenPGP, a subkey with a reasonable set of capabilities is
- created.
-
- If PASSPHRASE is None (the default), then the key will not be
- protected with a passphrase. If PASSPHRASE is a string, it
- will be used to protect the key. If PASSPHRASE is True, the
- passphrase must be supplied using a passphrase callback or
- out-of-band with a pinentry.
-
- Keyword arguments:
- algorithm -- public key algorithm, see above (default: reasonable)
- expires_in -- expiration time in seconds (default: reasonable)
- expires -- whether the key should expire (default: True)
- sign -- request the signing capability (see above)
- encrypt -- request the encryption capability (see above)
- certify -- request the certification capability (see above)
- authenticate -- request the authentication capability (see above)
- passphrase -- protect the key with a passphrase (default: no
- passphrase)
- force -- force key creation even if a key with the same userid
- exists (default: False)
-
- Returns:
- -- an object describing the result of the key creation
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- if util.is_a_string(passphrase):
- old_pinentry_mode = self.pinentry_mode
- old_passphrase_cb = getattr(self, '_passphrase_cb', None)
- self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
-
- def passphrase_cb(hint, desc, prev_bad, hook=None):
- return passphrase
-
- self.set_passphrase_cb(passphrase_cb)
-
- try:
- self.op_createkey(
- userid,
- algorithm,
- 0, # reserved
- expires_in,
- None, # extrakey
- ((constants.create.SIGN if sign else 0) |
- (constants.create.ENCR if encrypt else 0) |
- (constants.create.CERT if certify else 0) |
- (constants.create.AUTH if authenticate else 0) |
- (constants.create.NOPASSWD if passphrase is None else 0) |
- (0 if expires else constants.create.NOEXPIRE) |
- (constants.create.FORCE if force else 0)))
- finally:
- if util.is_a_string(passphrase):
- self.pinentry_mode = old_pinentry_mode
- gpgme.gpg_set_passphrase_cb(self, old_passphrase_cb)
-
- return self.op_genkey_result()
-
- def create_subkey(self,
- key,
- algorithm=None,
- expires_in=0,
- expires=True,
- sign=False,
- encrypt=False,
- authenticate=False,
- passphrase=None):
- """Create a subkey
-
- Create a subkey for the given KEY. As subkeys are a concept
- of OpenPGP, calling this is only valid for the OpenPGP
- protocol.
-
- ALGORITHM may be used to specify the public key encryption
- algorithm for the new subkey. By default, a reasonable
- default is chosen. You may use "future-default" to select an
- algorithm that will be the default in a future implementation
- of the engine. ALGORITHM may be a string like "rsa", or
- "rsa2048" to explicitly request an algorithm and a key size.
-
- EXPIRES_IN specifies the expiration time of the subkey in
- number of seconds since the subkeys creation. By default, a
- reasonable expiration time is chosen. If you want to create a
- subkey that does not expire, use the keyword argument EXPIRES.
-
- SIGN, ENCRYPT, and AUTHENTICATE can be used to request the
- capabilities of the new subkey. If you don't request any, an
- encryption subkey is generated.
-
- If PASSPHRASE is None (the default), then the subkey will not
- be protected with a passphrase. If PASSPHRASE is a string, it
- will be used to protect the subkey. If PASSPHRASE is True,
- the passphrase must be supplied using a passphrase callback or
- out-of-band with a pinentry.
-
- Keyword arguments:
- algorithm -- public key algorithm, see above (default: reasonable)
- expires_in -- expiration time in seconds (default: reasonable)
- expires -- whether the subkey should expire (default: True)
- sign -- request the signing capability (see above)
- encrypt -- request the encryption capability (see above)
- authenticate -- request the authentication capability (see above)
- passphrase -- protect the subkey with a passphrase (default: no
- passphrase)
-
- Returns:
- -- an object describing the result of the subkey creation
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- if util.is_a_string(passphrase):
- old_pinentry_mode = self.pinentry_mode
- old_passphrase_cb = getattr(self, '_passphrase_cb', None)
- self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
-
- def passphrase_cb(hint, desc, prev_bad, hook=None):
- return passphrase
-
- self.set_passphrase_cb(passphrase_cb)
-
- try:
- self.op_createsubkey(
- key,
- algorithm,
- 0, # reserved
- expires_in,
- ((constants.create.SIGN if sign else 0) |
- (constants.create.ENCR if encrypt else 0) |
- (constants.create.AUTH if authenticate else 0) |
- (constants.create.NOPASSWD if passphrase is None else 0) |
- (0 if expires else constants.create.NOEXPIRE)))
- finally:
- if util.is_a_string(passphrase):
- self.pinentry_mode = old_pinentry_mode
- gpgme.gpg_set_passphrase_cb(self, old_passphrase_cb)
-
- return self.op_genkey_result()
-
- def key_add_uid(self, key, uid):
- """Add a UID
-
- Add the uid UID to the given KEY. Calling this function is
- only valid for the OpenPGP protocol.
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- self.op_adduid(key, uid, 0)
-
- def key_revoke_uid(self, key, uid):
- """Revoke a UID
-
- Revoke the uid UID from the given KEY. Calling this function
- is only valid for the OpenPGP protocol.
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- self.op_revuid(key, uid, 0)
-
- def key_sign(self, key, uids=None, expires_in=False, local=False):
- """Sign a key
-
- Sign a key with the current set of signing keys. Calling this
- function is only valid for the OpenPGP protocol.
-
- If UIDS is None (the default), then all UIDs are signed. If
- it is a string, then only the matching UID is signed. If it
- is a list of strings, then all matching UIDs are signed. Note
- that a case-sensitive exact string comparison is done.
-
- EXPIRES_IN specifies the expiration time of the signature in
- seconds. If EXPIRES_IN is False, the signature does not
- expire.
-
- Keyword arguments:
- uids -- user ids to sign, see above (default: sign all)
- expires_in -- validity period of the signature in seconds
- (default: do not expire)
- local -- create a local, non-exportable signature
- (default: False)
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- flags = 0
- if uids is None or util.is_a_string(uids):
- pass # through unchanged
- else:
- flags |= constants.keysign.LFSEP
- uids = "\n".join(uids)
-
- if not expires_in:
- flags |= constants.keysign.NOEXPIRE
-
- if local:
- flags |= constants.keysign.LOCAL
-
- self.op_keysign(key, uids, expires_in, flags)
-
- def key_tofu_policy(self, key, policy):
- """Set a keys' TOFU policy
-
- Set the TOFU policy associated with KEY to POLICY. Calling
- this function is only valid for the OpenPGP protocol.
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- self.op_tofu_policy(key, policy)
-
- def assuan_transact(self,
- command,
- data_cb=None,
- inquire_cb=None,
- status_cb=None):
- """Issue a raw assuan command
-
- This function can be used to issue a raw assuan command to the
- engine.
-
- If command is a string or bytes, it will be used as-is. If it
- is an iterable of strings, it will be properly escaped and
- joined into a well-formed assuan command.
-
- Keyword arguments:
- data_cb -- a callback receiving data lines
- inquire_cb -- a callback providing more information
- status_cb -- a callback receiving status lines
-
- Returns:
- result -- the result of command as GPGMEError
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
-
- if util.is_a_string(command) or isinstance(command, bytes):
- cmd = command
- else:
- cmd = " ".join(util.percent_escape(f) for f in command)
-
- errptr = gpgme.new_gpgme_error_t_p()
-
- err = gpgme.gpgme_op_assuan_transact_ext(
- self.wrapped, cmd, (weakref.ref(self), data_cb)
- if data_cb else None, (weakref.ref(self), inquire_cb)
- if inquire_cb else None, (weakref.ref(self), status_cb)
- if status_cb else None, errptr)
-
- if self._callback_excinfo:
- gpgme.gpg_raise_callback_exception(self)
-
- errorcheck(err)
-
- status = gpgme.gpgme_error_t_p_value(errptr)
- gpgme.delete_gpgme_error_t_p(errptr)
-
- return GPGMEError(status) if status != 0 else None
-
- def interact(self, key, func, sink=None, flags=0, fnc_value=None):
- """Interact with the engine
-
- This method can be used to edit keys and cards interactively.
- KEY is the key to edit, FUNC is called repeatedly with two
- unicode arguments, 'keyword' and 'args'. See the GPGME manual
- for details.
-
- Keyword arguments:
- sink -- if given, additional output is written here
- flags -- use constants.INTERACT_CARD to edit a card
-
- Raises:
- GPGMEError -- as signaled by the underlying library
-
- """
- if key is None:
- raise ValueError("First argument cannot be None")
-
- if sink is None:
- sink = Data()
-
- if fnc_value:
- opaquedata = (weakref.ref(self), func, fnc_value)
- else:
- opaquedata = (weakref.ref(self), func)
-
- result = gpgme.gpgme_op_interact(self.wrapped, key, flags, opaquedata,
- sink)
- if self._callback_excinfo:
- gpgme.gpg_raise_callback_exception(self)
- errorcheck(result)
-
- @property
- def signers(self):
- """Keys used for signing"""
- if not self.wrapped:
- return None
- return [self.signers_enum(i) for i in range(self.signers_count())]
-
- @signers.setter
- def signers(self, signers):
- old = self.signers
- self.signers_clear()
- try:
- for key in signers:
- self.signers_add(key)
- except:
- self.signers = old
- raise
-
- @property
- def pinentry_mode(self):
- """Pinentry mode"""
- return self.get_pinentry_mode()
-
- @pinentry_mode.setter
- def pinentry_mode(self, value):
- self.set_pinentry_mode(value)
-
- @property
- def protocol(self):
- """Protocol to use"""
- return self.get_protocol()
-
- @protocol.setter
- def protocol(self, value):
- errorcheck(gpgme.gpgme_engine_check_version(value))
- self.set_protocol(value)
-
- @property
- def home_dir(self):
- """Engine's home directory"""
- if not self.wrapped:
- return None
- return self.engine_info.home_dir
-
- @home_dir.setter
- def home_dir(self, value):
- self.set_engine_info(self.protocol, home_dir=value)
-
- _ctype = 'gpgme_ctx_t'
- _cprefix = 'gpgme_'
-
- def _errorcheck(self, name):
- """This function should list all functions returning gpgme_error_t"""
- # The list of functions is created using:
- #
- # $ grep '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \
- # | grep -v _op_ | awk "/\(gpgme_ctx/ { printf (\"'%s',\\n\", \$2) } "
- return ((name.startswith('gpgme_op_') and not
- name.endswith('_result')) or name in {
- 'gpgme_new', 'gpgme_set_ctx_flag', 'gpgme_set_protocol',
- 'gpgme_set_sub_protocol', 'gpgme_set_keylist_mode',
- 'gpgme_set_pinentry_mode', 'gpgme_set_locale',
- 'gpgme_ctx_set_engine_info', 'gpgme_signers_add',
- 'gpgme_sig_notation_add', 'gpgme_set_sender',
- 'gpgme_cancel', 'gpgme_cancel_async', 'gpgme_get_key',
- 'gpgme_get_sig_key',
- })
-
- _boolean_properties = {'armor', 'textmode', 'offline'}
-
- def __del__(self):
- if not gpgme:
- # At interpreter shutdown, gpgme is set to NONE.
- return
-
- self._free_passcb()
- self._free_progresscb()
- self._free_statuscb()
- if self.own and self.wrapped and gpgme.gpgme_release:
- gpgme.gpgme_release(self.wrapped)
- self.wrapped = None
-
- # Implement the context manager protocol.
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, tb):
- return False
-
- def op_keylist_all(self, *args, **kwargs):
- self.op_keylist_start(*args, **kwargs)
- key = self.op_keylist_next()
- while key:
- yield key
- key = self.op_keylist_next()
- self.op_keylist_end()
-
- def op_keylist_next(self):
- """Returns the next key in the list created
- by a call to op_keylist_start(). The object returned
- is of type Key."""
- ptr = gpgme.new_gpgme_key_t_p()
- try:
- errorcheck(gpgme.gpgme_op_keylist_next(self.wrapped, ptr))
- key = gpgme.gpgme_key_t_p_value(ptr)
- except errors.GPGMEError as excp:
- key = None
- if excp.getcode() != errors.EOF:
- raise excp
- gpgme.delete_gpgme_key_t_p(ptr)
- if key:
- key.__del__ = lambda self: gpgme.gpgme_key_unref(self)
- return key
-
- def get_key(self, fpr, secret=False):
- """Get a key given a fingerprint
-
- Keyword arguments:
- secret -- to request a secret key
-
- Returns:
- -- the matching key
-
- Raises:
- KeyError -- if the key was not found
- GPGMEError -- as signaled by the underlying library
-
- """
- ptr = gpgme.new_gpgme_key_t_p()
-
- try:
- errorcheck(gpgme.gpgme_get_key(self.wrapped, fpr, ptr, secret))
- except errors.GPGMEError as e:
- if e.getcode() == errors.EOF:
- raise errors.KeyNotFound(fpr)
- raise e
-
- key = gpgme.gpgme_key_t_p_value(ptr)
- gpgme.delete_gpgme_key_t_p(ptr)
- assert key
- key.__del__ = lambda self: gpgme.gpgme_key_unref(self)
- return key
-
- def op_trustlist_all(self, *args, **kwargs):
- self.op_trustlist_start(*args, **kwargs)
- trust = self.op_trustlist_next()
- while trust:
- yield trust
- trust = self.op_trustlist_next()
- self.op_trustlist_end()
-
- def op_trustlist_next(self):
- """Returns the next trust item in the list created
- by a call to op_trustlist_start(). The object returned
- is of type TrustItem."""
- ptr = gpgme.new_gpgme_trust_item_t_p()
- try:
- errorcheck(gpgme.gpgme_op_trustlist_next(self.wrapped, ptr))
- trust = gpgme.gpgme_trust_item_t_p_value(ptr)
- except errors.GPGMEError as excp:
- trust = None
- if excp.getcode() != errors.EOF:
- raise
- gpgme.delete_gpgme_trust_item_t_p(ptr)
- return trust
-
- def set_passphrase_cb(self, func, hook=None):
- """Sets the passphrase callback to the function specified by func.
-
- When the system needs a passphrase, it will call func with three args:
- hint, a string describing the key it needs the passphrase for;
- desc, a string describing the passphrase it needs;
- prev_bad, a boolean equal True if this is a call made after
- unsuccessful previous attempt.
-
- If hook has a value other than None it will be passed into the func
- as a forth argument.
-
- Please see the GPGME manual for more information.
- """
- if func is None:
- hookdata = None
- else:
- if hook is None:
- hookdata = (weakref.ref(self), func)
- else:
- hookdata = (weakref.ref(self), func, hook)
- gpgme.gpg_set_passphrase_cb(self, hookdata)
-
- def _free_passcb(self):
- if gpgme.gpg_set_passphrase_cb:
- self.set_passphrase_cb(None)
-
- def set_progress_cb(self, func, hook=None):
- """Sets the progress meter callback to the function specified by FUNC.
- If FUNC is None, the callback will be cleared.
-
- This function will be called to provide an interactive update
- of the system's progress. The function will be called with
- three arguments, type, total, and current. If HOOK is not
- None, it will be supplied as fourth argument.
-
- Please see the GPGME manual for more information.
-
- """
- if func is None:
- hookdata = None
- else:
- if hook is None:
- hookdata = (weakref.ref(self), func)
- else:
- hookdata = (weakref.ref(self), func, hook)
- gpgme.gpg_set_progress_cb(self, hookdata)
-
- def _free_progresscb(self):
- if gpgme.gpg_set_progress_cb:
- self.set_progress_cb(None)
-
- def set_status_cb(self, func, hook=None):
- """Sets the status callback to the function specified by FUNC. If
- FUNC is None, the callback will be cleared.
-
- The function will be called with two arguments, keyword and
- args. If HOOK is not None, it will be supplied as third
- argument.
-
- Please see the GPGME manual for more information.
-
- """
- if func is None:
- hookdata = None
- else:
- if hook is None:
- hookdata = (weakref.ref(self), func)
- else:
- hookdata = (weakref.ref(self), func, hook)
- gpgme.gpg_set_status_cb(self, hookdata)
-
- def _free_statuscb(self):
- if gpgme.gpg_set_status_cb:
- self.set_status_cb(None)
-
- @property
- def engine_info(self):
- """Configuration of the engine currently in use"""
- p = self.protocol
- infos = [i for i in self.get_engine_info() if i.protocol == p]
- assert len(infos) == 1
- return infos[0]
-
- def get_engine_info(self):
- """Get engine configuration
-
- Returns information about all configured and installed
- engines.
-
- Returns:
- infos -- a list of engine infos
-
- """
- return gpgme.gpgme_ctx_get_engine_info(self.wrapped)
-
- def set_engine_info(self, proto, file_name=None, home_dir=None):
- """Change engine configuration
-
- Changes the configuration of the crypto engine implementing
- the protocol 'proto' for the context.
-
- Keyword arguments:
- file_name -- engine program file name (unchanged if None)
- home_dir -- configuration directory (unchanged if None)
-
- """
- self.ctx_set_engine_info(proto, file_name, home_dir)
-
- def wait(self, hang):
- """Wait for asynchronous call to finish. Wait forever if hang is True.
- Raises an exception on errors.
-
- Please read the GPGME manual for more information.
-
- """
- ptr = gpgme.new_gpgme_error_t_p()
- gpgme.gpgme_wait(self.wrapped, ptr, hang)
- status = gpgme.gpgme_error_t_p_value(ptr)
- gpgme.delete_gpgme_error_t_p(ptr)
- errorcheck(status)
-
- def op_edit(self, key, func, fnc_value, out):
- """Start key editing using supplied callback function
-
- Note: This interface is deprecated and will be removed with
- GPGME 1.8. Please use .interact instead. Furthermore, we
- implement this using gpgme_op_interact, so callbacks will get
- called with string keywords instead of numeric status
- messages. Code that is using constants.STATUS_X or
- constants.status.X will continue to work, whereas code using
- magic numbers will break as a result.
-
- """
- warnings.warn(
- "Call to deprecated method op_edit.", category=DeprecationWarning)
- return self.interact(key, func, sink=out, fnc_value=fnc_value)
-
-
-class Data(GpgmeWrapper):
- """Data buffer
-
- A lot of data has to be exchanged between the user and the crypto
- engine, like plaintext messages, ciphertext, signatures and
- information about the keys. The technical details about
- exchanging the data information are completely abstracted by
- GPGME. The user provides and receives the data via `gpgme_data_t'
- objects, regardless of the communication protocol between GPGME
- and the crypto engine in use.
-
- This Data class is the implementation of the GpgmeData objects.
-
- Please see the information about __init__ for instantiation.
-
- """
-
- _ctype = 'gpgme_data_t'
- _cprefix = 'gpgme_data_'
-
- def _errorcheck(self, name):
- """This function should list all functions returning gpgme_error_t"""
- # This list is compiled using
- #
- # $ grep -v '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \
- # | awk "/\(gpgme_data_t/ { printf (\"'%s',\\n\", \$2) } " \
- # | sed "s/'\\*/'/"
- return name not in {
- 'gpgme_data_read',
- 'gpgme_data_write',
- 'gpgme_data_seek',
- 'gpgme_data_release',
- 'gpgme_data_release_and_get_mem',
- 'gpgme_data_get_encoding',
- 'gpgme_data_get_file_name',
- 'gpgme_data_set_flag',
- 'gpgme_data_identify',
- }
-
- def __init__(self,
- string=None,
- file=None,
- offset=None,
- length=None,
- cbs=None,
- copy=True):
- """Initialize a new gpgme_data_t object.
-
- If no args are specified, make it an empty object.
-
- If string alone is specified, initialize it with the data
- contained there.
-
- If file, offset, and length are all specified, file must
- be either a filename or a file-like object, and the object
- will be initialized by reading the specified chunk from the file.
-
- If cbs is specified, it MUST be a tuple of the form:
-
- (read_cb, write_cb, seek_cb, release_cb[, hook])
-
- where the first four items are functions implementing reading,
- writing, seeking the data, and releasing any resources once
- the data object is deallocated. The functions must match the
- following prototypes:
-
- def read(amount, hook=None):
- return <a b"bytes" object>
-
- def write(data, hook=None):
- return <the number of bytes written>
-
- def seek(offset, whence, hook=None):
- return <the new file position>
-
- def release(hook=None):
- <return value and exceptions are ignored>
-
- The functions may be bound methods. In that case, you can
- simply use the 'self' reference instead of using a hook.
-
- If file is specified without any other arguments, then
- it must be a filename, and the object will be initialized from
- that file.
-
- """
- super(Data, self).__init__(None)
- self.data_cbs = None
-
- if cbs is not None:
- self.new_from_cbs(*cbs)
- elif string is not None:
- self.new_from_mem(string, copy)
- elif file is not None and offset is not None and length is not None:
- self.new_from_filepart(file, offset, length)
- elif file is not None:
- if util.is_a_string(file):
- self.new_from_file(file, copy)
- else:
- self.new_from_fd(file)
- else:
- self.new()
-
- def __del__(self):
- if not gpgme:
- # At interpreter shutdown, gpgme is set to NONE.
- return
-
- if self.wrapped is not None and gpgme.gpgme_data_release:
- gpgme.gpgme_data_release(self.wrapped)
- if self._callback_excinfo:
- gpgme.gpg_raise_callback_exception(self)
- self.wrapped = None
- self._free_datacbs()
-
- # Implement the context manager protocol.
- def __enter__(self):
- return self
-
- def __exit__(self, type, value, tb):
- return False
-
- def _free_datacbs(self):
- self._data_cbs = None
-
- def new(self):
- tmp = gpgme.new_gpgme_data_t_p()
- errorcheck(gpgme.gpgme_data_new(tmp))
- self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
- gpgme.delete_gpgme_data_t_p(tmp)
-
- def new_from_mem(self, string, copy=True):
- tmp = gpgme.new_gpgme_data_t_p()
- errorcheck(
- gpgme.gpgme_data_new_from_mem(tmp, string, len(string), copy))
- self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
- gpgme.delete_gpgme_data_t_p(tmp)
-
- def new_from_file(self, filename, copy=True):
- tmp = gpgme.new_gpgme_data_t_p()
- try:
- errorcheck(gpgme.gpgme_data_new_from_file(tmp, filename, copy))
- except errors.GPGMEError as e:
- if e.getcode() == errors.INV_VALUE and not copy:
- raise ValueError("delayed reads are not yet supported")
- else:
- raise e
- self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
- gpgme.delete_gpgme_data_t_p(tmp)
-
- def new_from_cbs(self, read_cb, write_cb, seek_cb, release_cb, hook=None):
- tmp = gpgme.new_gpgme_data_t_p()
- if hook is not None:
- hookdata = (weakref.ref(self), read_cb, write_cb, seek_cb,
- release_cb, hook)
- else:
- hookdata = (weakref.ref(self), read_cb, write_cb, seek_cb,
- release_cb)
- gpgme.gpg_data_new_from_cbs(self, hookdata, tmp)
- self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
- gpgme.delete_gpgme_data_t_p(tmp)
-
- def new_from_filepart(self, file, offset, length):
- """This wraps the GPGME gpgme_data_new_from_filepart() function.
- The argument "file" may be:
-
- * a string specifying a file name, or
- * a file-like object supporting the fileno() and the mode attribute.
-
- """
-
- tmp = gpgme.new_gpgme_data_t_p()
- filename = None
- fp = None
-
- if util.is_a_string(file):
- filename = file
- else:
- fp = gpgme.fdopen(file.fileno(), file.mode)
- if fp is None:
- raise ValueError("Failed to open file from %s arg %s" % (str(
- type(file)), str(file)))
-
- errorcheck(
- gpgme.gpgme_data_new_from_filepart(tmp, filename, fp, offset,
- length))
- self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
- gpgme.delete_gpgme_data_t_p(tmp)
-
- def new_from_fd(self, file):
- """This wraps the GPGME gpgme_data_new_from_fd() function. The
- argument "file" must be a file-like object, supporting the
- fileno() method.
-
- """
- tmp = gpgme.new_gpgme_data_t_p()
- errorcheck(gpgme.gpgme_data_new_from_fd(tmp, file.fileno()))
- self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
- gpgme.delete_gpgme_data_t_p(tmp)
-
- def new_from_stream(self, file):
- """This wrap around gpgme_data_new_from_stream is an alias for
- new_from_fd() method since in python there's no difference
- between file stream and file descriptor."""
- self.new_from_fd(file)
-
- def new_from_estream(self, file):
- """This wrap around gpgme_data_new_from_estream is an alias for
- new_from_fd() method since in python there's no difference
- between file stream and file descriptor, but using fd broke."""
- self.new_from_stream(file)
-
- def write(self, buffer):
- """Write buffer given as string or bytes.
-
- If a string is given, it is implicitly encoded using UTF-8."""
- written = gpgme.gpgme_data_write(self.wrapped, buffer)
- if written < 0:
- if self._callback_excinfo:
- gpgme.gpg_raise_callback_exception(self)
- else:
- raise GPGMEError.fromSyserror()
- return written
-
- def read(self, size=-1):
- """Read at most size bytes, returned as bytes.
-
- If the size argument is negative or omitted, read until EOF is reached.
-
- Returns the data read, or the empty string if there was no data
- to read before EOF was reached."""
-
- if size == 0:
- return ''
-
- if size > 0:
- try:
- result = gpgme.gpgme_data_read(self.wrapped, size)
- except:
- if self._callback_excinfo:
- gpgme.gpg_raise_callback_exception(self)
- else:
- raise
- return result
- else:
- chunks = []
- while True:
- try:
- result = gpgme.gpgme_data_read(self.wrapped, 4096)
- except:
- if self._callback_excinfo:
- gpgme.gpg_raise_callback_exception(self)
- else:
- raise
- if len(result) == 0:
- break
- chunks.append(result)
- return b''.join(chunks)
-
-
-def pubkey_algo_string(subkey):
- """Return short algorithm string
-
- Return a public key algorithm string (e.g. "rsa2048") for a given
- SUBKEY.
-
- Returns:
- algo - a string
-
- """
- return gpgme.gpgme_pubkey_algo_string(subkey)
-
-
-def pubkey_algo_name(algo):
- """Return name of public key algorithm
-
- Return the name of the public key algorithm for a given numeric
- algorithm id ALGO (cf. RFC4880).
-
- Returns:
- algo - a string
-
- """
- return gpgme.gpgme_pubkey_algo_name(algo)
-
-
-def hash_algo_name(algo):
- """Return name of hash algorithm
-
- Return the name of the hash algorithm for a given numeric
- algorithm id ALGO (cf. RFC4880).
-
- Returns:
- algo - a string
-
- """
- return gpgme.gpgme_hash_algo_name(algo)
-
-
-def get_protocol_name(proto):
- """Get protocol description
-
- Get the string describing protocol PROTO.
-
- Returns:
- proto - a string
-
- """
- return gpgme.gpgme_get_protocol_name(proto)
-
-
-def addrspec_from_uid(uid):
- """Return the address spec
-
- Return the addr-spec (cf. RFC2822 section 4.3) from a user id UID.
-
- Returns:
- addr_spec - a string
-
- """
- return gpgme.gpgme_addrspec_from_uid(uid)
-
-
-def check_version(version=None):
- return gpgme.gpgme_check_version(version)
-
-
-# check_version also makes sure that several subsystems are properly
-# initialized, and it must be run at least once before invoking any
-# other function. We do it here so that the user does not have to do
-# it unless she really wants to check for a certain version.
-check_version()
-
-
-def engine_check_version(proto):
- try:
- errorcheck(gpgme.gpgme_engine_check_version(proto))
- return True
- except errors.GPGMEError:
- return False
-
-
-def get_engine_info():
- ptr = gpgme.new_gpgme_engine_info_t_p()
- try:
- errorcheck(gpgme.gpgme_get_engine_info(ptr))
- info = gpgme.gpgme_engine_info_t_p_value(ptr)
- except errors.GPGMEError:
- info = None
- gpgme.delete_gpgme_engine_info_t_p(ptr)
- return info
-
-
-def set_engine_info(proto, file_name, home_dir=None):
- """Changes the default configuration of the crypto engine implementing
- the protocol 'proto'. 'file_name' is the file name of
- the executable program implementing this protocol. 'home_dir' is the
- directory name of the configuration directory (engine's default is
- used if omitted)."""
- errorcheck(gpgme.gpgme_set_engine_info(proto, file_name, home_dir))
-
-
-def set_locale(category, value):
- """Sets the default locale used by contexts"""
- errorcheck(gpgme.gpgme_set_locale(None, category, value))
-
-
-def wait(hang):
- """Wait for asynchronous call on any Context to finish.
- Wait forever if hang is True.
-
- For finished anynch calls it returns a tuple (status, context):
- status - status return by asnynchronous call.
- context - context which caused this call to return.
-
- Please read the GPGME manual of more information."""
- ptr = gpgme.new_gpgme_error_t_p()
- context = gpgme.gpgme_wait(None, ptr, hang)
- status = gpgme.gpgme_error_t_p_value(ptr)
- gpgme.delete_gpgme_error_t_p(ptr)
- if context is None:
- errorcheck(status)
- else:
- context = Context(context)
- return (status, context)