diff --git a/NEWS b/NEWS index 82f403c4..7d30b94d 100644 --- a/NEWS +++ b/NEWS @@ -21,6 +21,7 @@ Noteworthy changes in version 1.8.1 (unreleased) py: Context.__init__ EXTENDED: New keyword arg home_dir. py: Context.home_dir NEW. py: Context.keylist EXTENDED: New keyword arg mode. + py: Context.create_key NEW. py: core.pubkey_algo_string NEW. py: core.addrspec_from_uid NEW. diff --git a/lang/python/gpg/constants/__init__.py b/lang/python/gpg/constants/__init__.py index 4fb3d6f9..2bf180e5 100644 --- a/lang/python/gpg/constants/__init__.py +++ b/lang/python/gpg/constants/__init__.py @@ -25,8 +25,8 @@ util.process_constants('GPGME_', globals()) del util # For convenience, we import the modules here. -from . import data, event, keylist, md, pk -from . import protocol, sig, sigsum, status, validity +from . import data, keylist, sig # The subdirs. +from . import create, event, md, pk, protocol, sigsum, status, validity # A complication arises because 'import' is a reserved keyword. # Import it as 'Import' instead. @@ -34,7 +34,7 @@ globals()['Import'] = getattr(__import__('', globals(), locals(), [str('import')], 1), "import") __all__ = ['data', 'event', 'import', 'keylist', 'md', 'pk', - 'protocol', 'sig', 'sigsum', 'status', 'validity'] + 'protocol', 'sig', 'sigsum', 'status', 'validity', 'create'] # GPGME 1.7 replaced gpgme_op_edit with gpgme_op_interact. We # implement gpg.Context.op_edit using gpgme_op_interact, so the diff --git a/lang/python/gpg/constants/create.py b/lang/python/gpg/constants/create.py new file mode 100644 index 00000000..132e96d4 --- /dev/null +++ b/lang/python/gpg/constants/create.py @@ -0,0 +1,25 @@ +# Flags for key creation +# +# Copyright (C) 2017 g10 Code GmbH +# +# This file is part of GPGME. +# +# GPGME is free software; you can redistribute it and/or modify it +# under the terms of the GNU Lesser General Public License as +# published by the Free Software Foundation; either version 2.1 of the +# License, or (at your option) any later version. +# +# GPGME is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, see . + +from __future__ import absolute_import, print_function, unicode_literals +del absolute_import, print_function, unicode_literals + +from gpg import util +util.process_constants('GPGME_CREATE_', globals()) +del util diff --git a/lang/python/gpg/core.py b/lang/python/gpg/core.py index 3a635160..c5af1b18 100644 --- a/lang/python/gpg/core.py +++ b/lang/python/gpg/core.py @@ -1,4 +1,4 @@ -# Copyright (C) 2016 g10 Code GmbH +# Copyright (C) 2016-2017 g10 Code GmbH # Copyright (C) 2004,2008 Igor Belyi # Copyright (C) 2002 John Goerzen # @@ -501,6 +501,84 @@ class Context(GpgmeWrapper): self.set_keylist_mode(mode) return self.op_keylist_all(pattern, secret) + def create_key(self, userid, algorithm=None, expires_in=0, expires=True, + sign=False, encrypt=False, certify=False, authenticate=False, + passphrase=None, force=False): + """Create a primary key + + Create a primary key for the user id USERID. + + ALGORITHM may be used to specify the public key encryption + algorithm for the new key. By default, a reasonable default + is chosen. You may use "future-default" to select an + algorithm that will be the default in a future implementation + of the engine. ALGORITHM may be a string like "rsa", or + "rsa2048" to explicitly request an algorithm and a key size. + + EXPIRES_IN specifies the expiration time of the key in number + of seconds since the keys creation. By default, a reasonable + expiration time is chosen. If you want to create a key that + does not expire, use the keyword argument EXPIRES. + + SIGN, ENCRYPT, CERTIFY, and AUTHENTICATE can be used to + request the capabilities of the new key. If you don't request + any, a reasonable set of capabilities is selected, and in case + of OpenPGP, a subkey with a reasonable set of capabilities is + created. + + If PASSPHRASE is None (the default), then the key will not be + protected with a passphrase. If PASSPHRASE is a string, it + will be used to protect the key. If PASSPHRASE is True, the + passphrase must be supplied using a passphrase callback or + out-of-band with a pinentry. + + Keyword arguments: + algorithm -- public key algorithm, see above (default: reasonable) + expires_in -- expiration time in seconds (default: reasonable) + expires -- whether or not the key should expire (default: True) + sign -- request the signing capability (see above) + encrypt -- request the encryption capability (see above) + certify -- request the certification capability (see above) + authenticate -- request the authentication capability (see above) + passphrase -- protect the key with a passphrase (default: no passphrase) + force -- force key creation even if a key with the same userid exists + (default: False) + + Returns: + -- an object describing the result of the key creation + + Raises: + GPGMEError -- as signaled by the underlying library + + """ + if util.is_a_string(passphrase): + old_pinentry_mode = self.pinentry_mode + old_passphrase_cb = getattr(self, '_passphrase_cb', None) + self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK + def passphrase_cb(hint, desc, prev_bad, hook=None): + return passphrase + self.set_passphrase_cb(passphrase_cb) + + try: + self.op_createkey(userid, algorithm, + 0, # reserved + expires_in, + None, # extrakey + ((constants.create.SIGN if sign else 0) + | (constants.create.ENCR if encrypt else 0) + | (constants.create.CERT if certify else 0) + | (constants.create.AUTH if authenticate else 0) + | (constants.create.NOPASSWD if passphrase == None else 0) + | (0 if expires else constants.create.NOEXPIRE) + | (constants.create.FORCE if force else 0))) + finally: + if util.is_a_string(passphrase): + self.pinentry_mode = old_pinentry_mode + if old_passphrase_cb: + self.set_passphrase_cb(*old_passphrase_cb[1:]) + + return self.op_genkey_result() + def assuan_transact(self, command, data_cb=None, inquire_cb=None, status_cb=None): """Issue a raw assuan command diff --git a/lang/python/tests/Makefile.am b/lang/python/tests/Makefile.am index d7f2e58b..5469e751 100644 --- a/lang/python/tests/Makefile.am +++ b/lang/python/tests/Makefile.am @@ -50,7 +50,8 @@ py_tests = t-wrapper.py \ t-encrypt-large.py \ t-file-name.py \ t-idiomatic.py \ - t-protocol-assuan.py + t-protocol-assuan.py \ + t-quick-key-creation.py XTESTS = initial.py $(py_tests) final.py EXTRA_DIST = support.py $(XTESTS) encrypt-only.asc sign-only.asc \ diff --git a/lang/python/tests/support.py b/lang/python/tests/support.py index 0b04bb6f..ed5bf615 100644 --- a/lang/python/tests/support.py +++ b/lang/python/tests/support.py @@ -20,6 +20,7 @@ del absolute_import, print_function, unicode_literals import sys import os +import tempfile import gpg # known keys @@ -72,3 +73,17 @@ def mark_key_trusted(ctx, key): return result with gpg.Data() as sink: ctx.op_edit(key, Editor().edit, sink, sink) + + +# Python2/3 compatibility +if hasattr(tempfile, "TemporaryDirectory"): + # Python3.2 and up + TemporaryDirectory = tempfile.TemporaryDirectory +else: + class TemporaryDirectory(object): + def __enter__(self): + self.path = tempfile.mkdtemp() + return self.path + def __exit__(self, *args): + import shutil + shutil.rmtree(self.path) diff --git a/lang/python/tests/t-quick-key-creation.py b/lang/python/tests/t-quick-key-creation.py new file mode 100755 index 00000000..ea63dc3f --- /dev/null +++ b/lang/python/tests/t-quick-key-creation.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python + +# Copyright (C) 2017 g10 Code GmbH +# +# This file is part of GPGME. +# +# GPGME is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# GPGME is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General +# Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this program; if not, see . + +from __future__ import absolute_import, print_function, unicode_literals +del absolute_import, print_function, unicode_literals + +import gpg +import itertools +import os +import shutil +import time + +import support + +alpha = "Alpha " + +def copy_configuration(destination): + home = os.environ['GNUPGHOME'] + shutil.copy(os.path.join(home, "gpg.conf"), destination) + shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) + +with support.TemporaryDirectory() as tmp: + copy_configuration(tmp) + with gpg.Context(home_dir=tmp) as ctx: + res = ctx.create_key(alpha) + + keys = list(ctx.keylist()) + assert len(keys) == 1, "Weird number of keys created" + + key = keys[0] + assert key.fpr == res.fpr + assert len(key.subkeys) == 2, "Expected one primary key and one subkey" + assert key.subkeys[0].expires > 0, "Expected primary key to expire" + + # Try to create a key with the same UID + try: + ctx.create_key(alpha) + assert False, "Expected an error but got none" + except gpg.errors.GpgError as e: + pass + + # Try to create a key with the same UID, now with force! + res2 = ctx.create_key(alpha, force=True) + assert res.fpr != res2.fpr + + +# From here on, we use one context, and create unique UIDs +uid_counter = 0 +def make_uid(): + global uid_counter + uid_counter += 1 + return "user{0}@invalid.example.org".format(uid_counter) + +with support.TemporaryDirectory() as tmp: + copy_configuration(tmp) + with gpg.Context(home_dir=tmp) as ctx: + + # Check gpg.constants.create.NOEXPIRE... + res = ctx.create_key(make_uid(), expires=False) + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + assert len(key.subkeys) == 2, "Expected one primary key and one subkey" + assert key.subkeys[0].expires == 0, "Expected primary key not to expire" + + t = 2 * 24 * 60 * 60 + slack = 5 * 60 + res = ctx.create_key(make_uid(), expires_in=t) + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + assert len(key.subkeys) == 2, "Expected one primary key and one subkey" + assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ + "Primary keys expiration time is off" + + # Check capabilities + for sign, encrypt, certify, authenticate in itertools.product([False, True], + [False, True], + [False, True], + [False, True]): + # Filter some out + if not (sign or encrypt or certify or authenticate): + # This triggers the default capabilities tested before. + continue + if (sign or encrypt or authenticate) and not certify: + # The primary key always certifies. + continue + + res = ctx.create_key(make_uid(), algorithm="rsa", + sign=sign, encrypt=encrypt, certify=certify, + authenticate=authenticate) + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + assert len(key.subkeys) == 1, \ + "Expected no subkey for non-default capabilities" + + p = key.subkeys[0] + assert sign == p.can_sign + assert encrypt == p.can_encrypt + assert certify == p.can_certify + assert authenticate == p.can_authenticate + + # Check algorithm + res = ctx.create_key(make_uid(), algorithm="rsa") + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + for k in key.subkeys: + assert k.pubkey_algo == 1 + + # Check algorithm with size + res = ctx.create_key(make_uid(), algorithm="rsa1024") + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + for k in key.subkeys: + assert k.pubkey_algo == 1 + assert k.length == 1024 + + # Check algorithm future-default + ctx.create_key(make_uid(), algorithm="future-default") + + # Check passphrase protection + recipient = make_uid() + passphrase = "streng geheim" + res = ctx.create_key(recipient, passphrase=passphrase) + ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) + + cb_called = False + def cb(*args): + global cb_called + cb_called = True + return passphrase + ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK + ctx.set_passphrase_cb(cb) + + plaintext, _, _ = ctx.decrypt(ciphertext) + assert plaintext == b"hello there" + assert cb_called