diff options
| author | Justus Winter <[email protected]> | 2017-02-16 13:53:11 +0000 | 
|---|---|---|
| committer | Justus Winter <[email protected]> | 2017-02-16 15:43:10 +0000 | 
| commit | 476b97822b169c30cc246c1de2ff94cf89084706 (patch) | |
| tree | a11b21e34c3f414a5524e353083faaa7c6ba25ad /lang/python | |
| parent | python: Fix passphrase callback wrapping. (diff) | |
| download | gpgme-476b97822b169c30cc246c1de2ff94cf89084706.tar.gz gpgme-476b97822b169c30cc246c1de2ff94cf89084706.zip | |
python: Support quick key creation.
* NEWS: Update.
* lang/python/gpg/constants/__init__.py: Import new file.
* lang/python/gpg/constants/create.py: New file.
* lang/python/gpg/core.py (Context.create_key): New function.
* lang/python/tests/Makefile.am (XTESTS): Add new test.
* lang/python/tests/support.py (TemporaryDirectory): New class.
* lang/python/tests/t-quick-key-creation.py: New file.
Signed-off-by: Justus Winter <[email protected]>
Diffstat (limited to '')
| -rw-r--r-- | lang/python/gpg/constants/__init__.py | 6 | ||||
| -rw-r--r-- | lang/python/gpg/constants/create.py | 25 | ||||
| -rw-r--r-- | lang/python/gpg/core.py | 80 | ||||
| -rw-r--r-- | lang/python/tests/Makefile.am | 3 | ||||
| -rw-r--r-- | lang/python/tests/support.py | 15 | ||||
| -rwxr-xr-x | lang/python/tests/t-quick-key-creation.py | 151 | 
6 files changed, 275 insertions, 5 deletions
| diff --git a/lang/python/gpg/constants/__init__.py b/lang/python/gpg/constants/__init__.py index 4fb3d6f9..2bf180e5 100644 --- a/lang/python/gpg/constants/__init__.py +++ b/lang/python/gpg/constants/__init__.py @@ -25,8 +25,8 @@ util.process_constants('GPGME_', globals())  del util  # For convenience, we import the modules here. -from . import data, event, keylist, md, pk -from . import protocol, sig, sigsum, status, validity +from . import data, keylist, sig # The subdirs. +from . import create, event, md, pk, protocol, sigsum, status, validity  # A complication arises because 'import' is a reserved keyword.  # Import it as 'Import' instead. @@ -34,7 +34,7 @@ globals()['Import'] = getattr(__import__('', globals(), locals(),                                           [str('import')], 1), "import")  __all__ = ['data', 'event', 'import', 'keylist', 'md', 'pk', -           'protocol', 'sig', 'sigsum', 'status', 'validity'] +           'protocol', 'sig', 'sigsum', 'status', 'validity', 'create']  # GPGME 1.7 replaced gpgme_op_edit with gpgme_op_interact.  We  # implement gpg.Context.op_edit using gpgme_op_interact, so the diff --git a/lang/python/gpg/constants/create.py b/lang/python/gpg/constants/create.py new file mode 100644 index 00000000..132e96d4 --- /dev/null +++ b/lang/python/gpg/constants/create.py @@ -0,0 +1,25 @@ +# Flags for key creation +# +# Copyright (C) 2017 g10 Code GmbH +# +# This file is part of GPGME. +# +# GPGME is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2.1 of the +# License, or (at your option) any later version. +# +# GPGME is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, see <http://www.gnu.org/licenses/>. + +from __future__ import absolute_import, print_function, unicode_literals +del absolute_import, print_function, unicode_literals + +from gpg import util +util.process_constants('GPGME_CREATE_', globals()) +del util diff --git a/lang/python/gpg/core.py b/lang/python/gpg/core.py index 3a635160..c5af1b18 100644 --- a/lang/python/gpg/core.py +++ b/lang/python/gpg/core.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016 g10 Code GmbH +# Copyright (C) 2016-2017 g10 Code GmbH  # Copyright (C) 2004,2008 Igor Belyi <[email protected]>  # Copyright (C) 2002 John Goerzen <[email protected]>  # @@ -501,6 +501,84 @@ class Context(GpgmeWrapper):          self.set_keylist_mode(mode)          return self.op_keylist_all(pattern, secret) +    def create_key(self, userid, algorithm=None, expires_in=0, expires=True, +                   sign=False, encrypt=False, certify=False, authenticate=False, +                   passphrase=None, force=False): +        """Create a primary key + +        Create a primary key for the user id USERID. + +        ALGORITHM may be used to specify the public key encryption +        algorithm for the new key.  By default, a reasonable default +        is chosen.  You may use "future-default" to select an +        algorithm that will be the default in a future implementation +        of the engine.  ALGORITHM may be a string like "rsa", or +        "rsa2048" to explicitly request an algorithm and a key size. + +        EXPIRES_IN specifies the expiration time of the key in number +        of seconds since the keys creation.  By default, a reasonable +        expiration time is chosen.  If you want to create a key that +        does not expire, use the keyword argument EXPIRES. + +        SIGN, ENCRYPT, CERTIFY, and AUTHENTICATE can be used to +        request the capabilities of the new key.  If you don't request +        any, a reasonable set of capabilities is selected, and in case +        of OpenPGP, a subkey with a reasonable set of capabilities is +        created. + +        If PASSPHRASE is None (the default), then the key will not be +        protected with a passphrase.  If PASSPHRASE is a string, it +        will be used to protect the key.  If PASSPHRASE is True, the +        passphrase must be supplied using a passphrase callback or +        out-of-band with a pinentry. + +        Keyword arguments: +        algorithm    -- public key algorithm, see above (default: reasonable) +        expires_in   -- expiration time in seconds (default: reasonable) +        expires      -- whether or not the key should expire (default: True) +        sign         -- request the signing capability (see above) +        encrypt      -- request the encryption capability (see above) +        certify      -- request the certification capability (see above) +        authenticate -- request the authentication capability (see above) +        passphrase   -- protect the key with a passphrase (default: no passphrase) +        force        -- force key creation even if a key with the same userid exists +                                                          (default: False) + +        Returns: +                     -- an object describing the result of the key creation + +        Raises: +        GPGMEError   -- as signaled by the underlying library + +        """ +        if util.is_a_string(passphrase): +            old_pinentry_mode = self.pinentry_mode +            old_passphrase_cb = getattr(self, '_passphrase_cb', None) +            self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK +            def passphrase_cb(hint, desc, prev_bad, hook=None): +                return passphrase +            self.set_passphrase_cb(passphrase_cb) + +        try: +            self.op_createkey(userid, algorithm, +                              0, # reserved +                              expires_in, +                              None, # extrakey +                              ((constants.create.SIGN if sign else 0) +                               | (constants.create.ENCR if encrypt else 0) +                               | (constants.create.CERT if certify else 0) +                               | (constants.create.AUTH if authenticate else 0) +                               | (constants.create.NOPASSWD if passphrase == None else 0) +                               | (0 if expires else constants.create.NOEXPIRE) +                               | (constants.create.FORCE if force else 0))) +        finally: +            if util.is_a_string(passphrase): +                self.pinentry_mode = old_pinentry_mode +                if old_passphrase_cb: +                    self.set_passphrase_cb(*old_passphrase_cb[1:]) + +        return self.op_genkey_result() +      def assuan_transact(self, command,                          data_cb=None, inquire_cb=None, status_cb=None):          """Issue a raw assuan command diff --git a/lang/python/tests/Makefile.am b/lang/python/tests/Makefile.am index d7f2e58b..5469e751 100644 --- a/lang/python/tests/Makefile.am +++ b/lang/python/tests/Makefile.am @@ -50,7 +50,8 @@ py_tests = t-wrapper.py \  	t-encrypt-large.py \  	t-file-name.py \  	t-idiomatic.py \ -	t-protocol-assuan.py +	t-protocol-assuan.py \ +	t-quick-key-creation.py  XTESTS = initial.py $(py_tests) final.py  EXTRA_DIST = support.py $(XTESTS) encrypt-only.asc sign-only.asc \ diff --git a/lang/python/tests/support.py b/lang/python/tests/support.py index 0b04bb6f..ed5bf615 100644 --- a/lang/python/tests/support.py +++ b/lang/python/tests/support.py @@ -20,6 +20,7 @@ del absolute_import, print_function, unicode_literals  import sys  import os +import tempfile  import gpg  # known keys @@ -72,3 +73,17 @@ def mark_key_trusted(ctx, key):              return result      with gpg.Data() as sink:          ctx.op_edit(key, Editor().edit, sink, sink) + + +# Python2/3 compatibility +if hasattr(tempfile, "TemporaryDirectory"): +    # Python3.2 and up +    TemporaryDirectory = tempfile.TemporaryDirectory +else: +    class TemporaryDirectory(object): +        def __enter__(self): +            self.path = tempfile.mkdtemp() +            return self.path +        def __exit__(self, *args): +            import shutil +            shutil.rmtree(self.path) diff --git a/lang/python/tests/t-quick-key-creation.py b/lang/python/tests/t-quick-key-creation.py new file mode 100755 index 00000000..ea63dc3f --- /dev/null +++ b/lang/python/tests/t-quick-key-creation.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python + +# Copyright (C) 2017 g10 Code GmbH +# +# This file is part of GPGME. +# +# GPGME is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# GPGME is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, see <http://www.gnu.org/licenses/>. + +from __future__ import absolute_import, print_function, unicode_literals +del absolute_import, print_function, unicode_literals + +import gpg +import itertools +import os +import shutil +import time + +import support + +alpha = "Alpha <[email protected]>" + +def copy_configuration(destination): +    home = os.environ['GNUPGHOME'] +    shutil.copy(os.path.join(home, "gpg.conf"), destination) +    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) + +with support.TemporaryDirectory() as tmp: +    copy_configuration(tmp) +    with gpg.Context(home_dir=tmp) as ctx: +        res = ctx.create_key(alpha) + +        keys = list(ctx.keylist()) +        assert len(keys) == 1, "Weird number of keys created" + +        key = keys[0] +        assert key.fpr == res.fpr +        assert len(key.subkeys) == 2, "Expected one primary key and one subkey" +        assert key.subkeys[0].expires > 0, "Expected primary key to expire" + +        # Try to create a key with the same UID +        try: +            ctx.create_key(alpha) +            assert False, "Expected an error but got none" +        except gpg.errors.GpgError as e: +            pass + +        # Try to create a key with the same UID, now with force! +        res2 = ctx.create_key(alpha, force=True) +        assert res.fpr != res2.fpr + + +# From here on, we use one context, and create unique UIDs +uid_counter = 0 +def make_uid(): +    global uid_counter +    uid_counter += 1 +    return "user{0}@invalid.example.org".format(uid_counter) + +with support.TemporaryDirectory() as tmp: +    copy_configuration(tmp) +    with gpg.Context(home_dir=tmp) as ctx: + +        # Check gpg.constants.create.NOEXPIRE... +        res = ctx.create_key(make_uid(), expires=False) +        key = ctx.get_key(res.fpr, secret=True) +        assert key.fpr == res.fpr +        assert len(key.subkeys) == 2, "Expected one primary key and one subkey" +        assert key.subkeys[0].expires == 0, "Expected primary key not to expire" + +        t = 2 * 24 * 60 * 60 +        slack = 5 * 60 +        res = ctx.create_key(make_uid(), expires_in=t) +        key = ctx.get_key(res.fpr, secret=True) +        assert key.fpr == res.fpr +        assert len(key.subkeys) == 2, "Expected one primary key and one subkey" +        assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ +            "Primary keys expiration time is off" + +        # Check capabilities +        for sign, encrypt, certify, authenticate in itertools.product([False, True], +                                                                      [False, True], +                                                                      [False, True], +                                                                      [False, True]): +            # Filter some out +            if not (sign or encrypt or certify or authenticate): +                # This triggers the default capabilities tested before. +                continue +            if (sign or encrypt or authenticate) and not certify: +                # The primary key always certifies. +                continue + +            res = ctx.create_key(make_uid(), algorithm="rsa", +                                 sign=sign, encrypt=encrypt, certify=certify, +                                 authenticate=authenticate) +            key = ctx.get_key(res.fpr, secret=True) +            assert key.fpr == res.fpr +            assert len(key.subkeys) == 1, \ +                "Expected no subkey for non-default capabilities" + +            p = key.subkeys[0] +            assert sign == p.can_sign +            assert encrypt == p.can_encrypt +            assert certify == p.can_certify +            assert authenticate == p.can_authenticate + +        # Check algorithm +        res = ctx.create_key(make_uid(), algorithm="rsa") +        key = ctx.get_key(res.fpr, secret=True) +        assert key.fpr == res.fpr +        for k in key.subkeys: +            assert k.pubkey_algo == 1 + +        # Check algorithm with size +        res = ctx.create_key(make_uid(), algorithm="rsa1024") +        key = ctx.get_key(res.fpr, secret=True) +        assert key.fpr == res.fpr +        for k in key.subkeys: +            assert k.pubkey_algo == 1 +            assert k.length == 1024 + +        # Check algorithm future-default +        ctx.create_key(make_uid(), algorithm="future-default") + +        # Check passphrase protection +        recipient = make_uid() +        passphrase = "streng geheim" +        res = ctx.create_key(recipient, passphrase=passphrase) +        ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) + +        cb_called = False +        def cb(*args): +            global cb_called +            cb_called = True +            return passphrase +        ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK +        ctx.set_passphrase_cb(cb) + +        plaintext, _, _ = ctx.decrypt(ciphertext) +        assert plaintext == b"hello there" +        assert cb_called | 
