From de8494b16bc50c60a8438f2cae1f8c88e8949f7a Mon Sep 17 00:00:00 2001 From: Justus Winter Date: Fri, 17 Feb 2017 12:18:56 +0100 Subject: [PATCH] python: Fix teardown of ephemeral contexts. * lang/python/tests/support.py (EphemeralContext): New function. * lang/python/tests/t-quick-key-creation.py: Use the new function to manage ephemeral contexts. * lang/python/tests/t-quick-key-manipulation.py: Likewise. * lang/python/tests/t-quick-subkey-creation.py: Likewise. -- Previously, there was a problem with cleaning up ephemeral home directories. shutil.rmtree deleted the agents main socket, gpg-agent detected that, and deleted the other sockets as well, racing shutil.rmtree which did not cope will with that. Fix this by asking the agent nicely to shut down. Signed-off-by: Justus Winter --- lang/python/tests/support.py | 24 ++- lang/python/tests/t-quick-key-creation.py | 184 ++++++++---------- lang/python/tests/t-quick-key-manipulation.py | 117 +++++------ lang/python/tests/t-quick-subkey-creation.py | 149 +++++++------- 4 files changed, 232 insertions(+), 242 deletions(-) diff --git a/lang/python/tests/support.py b/lang/python/tests/support.py index ed5bf615..a381270d 100644 --- a/lang/python/tests/support.py +++ b/lang/python/tests/support.py @@ -18,9 +18,12 @@ from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals +import contextlib +import shutil import sys import os import tempfile +import time import gpg # known keys @@ -85,5 +88,24 @@ else: self.path = tempfile.mkdtemp() return self.path def __exit__(self, *args): - import shutil shutil.rmtree(self.path) + +@contextlib.contextmanager +def EphemeralContext(): + with TemporaryDirectory() as tmp: + home = os.environ['GNUPGHOME'] + shutil.copy(os.path.join(home, "gpg.conf"), tmp) + shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp) + + with gpg.Context(home_dir=tmp) as ctx: + yield ctx + + # Ask the agent to quit. + agent_socket = os.path.join(tmp, "S.gpg-agent") + ctx.protocol = gpg.constants.protocol.ASSUAN + ctx.set_engine_info(ctx.protocol, file_name=agent_socket) + ctx.assuan_transact(["KILLAGENT"]) + + # Block until it is really gone. + while os.path.exists(agent_socket): + time.sleep(.01) diff --git a/lang/python/tests/t-quick-key-creation.py b/lang/python/tests/t-quick-key-creation.py index ea63dc3f..c642c5b4 100755 --- a/lang/python/tests/t-quick-key-creation.py +++ b/lang/python/tests/t-quick-key-creation.py @@ -22,42 +22,33 @@ del absolute_import, print_function, unicode_literals import gpg import itertools -import os -import shutil import time import support alpha = "Alpha " -def copy_configuration(destination): - home = os.environ['GNUPGHOME'] - shutil.copy(os.path.join(home, "gpg.conf"), destination) - shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) +with support.EphemeralContext() as ctx: + res = ctx.create_key(alpha) -with support.TemporaryDirectory() as tmp: - copy_configuration(tmp) - with gpg.Context(home_dir=tmp) as ctx: - res = ctx.create_key(alpha) + keys = list(ctx.keylist()) + assert len(keys) == 1, "Weird number of keys created" - keys = list(ctx.keylist()) - assert len(keys) == 1, "Weird number of keys created" + key = keys[0] + assert key.fpr == res.fpr + assert len(key.subkeys) == 2, "Expected one primary key and one subkey" + assert key.subkeys[0].expires > 0, "Expected primary key to expire" - key = keys[0] - assert key.fpr == res.fpr - assert len(key.subkeys) == 2, "Expected one primary key and one subkey" - assert key.subkeys[0].expires > 0, "Expected primary key to expire" + # Try to create a key with the same UID + try: + ctx.create_key(alpha) + assert False, "Expected an error but got none" + except gpg.errors.GpgError as e: + pass - # Try to create a key with the same UID - try: - ctx.create_key(alpha) - assert False, "Expected an error but got none" - except gpg.errors.GpgError as e: - pass - - # Try to create a key with the same UID, now with force! - res2 = ctx.create_key(alpha, force=True) - assert res.fpr != res2.fpr + # Try to create a key with the same UID, now with force! + res2 = ctx.create_key(alpha, force=True) + assert res.fpr != res2.fpr # From here on, we use one context, and create unique UIDs @@ -67,85 +58,82 @@ def make_uid(): uid_counter += 1 return "user{0}@invalid.example.org".format(uid_counter) -with support.TemporaryDirectory() as tmp: - copy_configuration(tmp) - with gpg.Context(home_dir=tmp) as ctx: +with support.EphemeralContext() as ctx: + # Check gpg.constants.create.NOEXPIRE... + res = ctx.create_key(make_uid(), expires=False) + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + assert len(key.subkeys) == 2, "Expected one primary key and one subkey" + assert key.subkeys[0].expires == 0, "Expected primary key not to expire" - # Check gpg.constants.create.NOEXPIRE... - res = ctx.create_key(make_uid(), expires=False) + t = 2 * 24 * 60 * 60 + slack = 5 * 60 + res = ctx.create_key(make_uid(), expires_in=t) + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + assert len(key.subkeys) == 2, "Expected one primary key and one subkey" + assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ + "Primary keys expiration time is off" + + # Check capabilities + for sign, encrypt, certify, authenticate in itertools.product([False, True], + [False, True], + [False, True], + [False, True]): + # Filter some out + if not (sign or encrypt or certify or authenticate): + # This triggers the default capabilities tested before. + continue + if (sign or encrypt or authenticate) and not certify: + # The primary key always certifies. + continue + + res = ctx.create_key(make_uid(), algorithm="rsa", + sign=sign, encrypt=encrypt, certify=certify, + authenticate=authenticate) key = ctx.get_key(res.fpr, secret=True) assert key.fpr == res.fpr - assert len(key.subkeys) == 2, "Expected one primary key and one subkey" - assert key.subkeys[0].expires == 0, "Expected primary key not to expire" + assert len(key.subkeys) == 1, \ + "Expected no subkey for non-default capabilities" - t = 2 * 24 * 60 * 60 - slack = 5 * 60 - res = ctx.create_key(make_uid(), expires_in=t) - key = ctx.get_key(res.fpr, secret=True) - assert key.fpr == res.fpr - assert len(key.subkeys) == 2, "Expected one primary key and one subkey" - assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ - "Primary keys expiration time is off" + p = key.subkeys[0] + assert sign == p.can_sign + assert encrypt == p.can_encrypt + assert certify == p.can_certify + assert authenticate == p.can_authenticate - # Check capabilities - for sign, encrypt, certify, authenticate in itertools.product([False, True], - [False, True], - [False, True], - [False, True]): - # Filter some out - if not (sign or encrypt or certify or authenticate): - # This triggers the default capabilities tested before. - continue - if (sign or encrypt or authenticate) and not certify: - # The primary key always certifies. - continue + # Check algorithm + res = ctx.create_key(make_uid(), algorithm="rsa") + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + for k in key.subkeys: + assert k.pubkey_algo == 1 - res = ctx.create_key(make_uid(), algorithm="rsa", - sign=sign, encrypt=encrypt, certify=certify, - authenticate=authenticate) - key = ctx.get_key(res.fpr, secret=True) - assert key.fpr == res.fpr - assert len(key.subkeys) == 1, \ - "Expected no subkey for non-default capabilities" + # Check algorithm with size + res = ctx.create_key(make_uid(), algorithm="rsa1024") + key = ctx.get_key(res.fpr, secret=True) + assert key.fpr == res.fpr + for k in key.subkeys: + assert k.pubkey_algo == 1 + assert k.length == 1024 - p = key.subkeys[0] - assert sign == p.can_sign - assert encrypt == p.can_encrypt - assert certify == p.can_certify - assert authenticate == p.can_authenticate + # Check algorithm future-default + ctx.create_key(make_uid(), algorithm="future-default") - # Check algorithm - res = ctx.create_key(make_uid(), algorithm="rsa") - key = ctx.get_key(res.fpr, secret=True) - assert key.fpr == res.fpr - for k in key.subkeys: - assert k.pubkey_algo == 1 + # Check passphrase protection + recipient = make_uid() + passphrase = "streng geheim" + res = ctx.create_key(recipient, passphrase=passphrase) + ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) - # Check algorithm with size - res = ctx.create_key(make_uid(), algorithm="rsa1024") - key = ctx.get_key(res.fpr, secret=True) - assert key.fpr == res.fpr - for k in key.subkeys: - assert k.pubkey_algo == 1 - assert k.length == 1024 + cb_called = False + def cb(*args): + global cb_called + cb_called = True + return passphrase + ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK + ctx.set_passphrase_cb(cb) - # Check algorithm future-default - ctx.create_key(make_uid(), algorithm="future-default") - - # Check passphrase protection - recipient = make_uid() - passphrase = "streng geheim" - res = ctx.create_key(recipient, passphrase=passphrase) - ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) - - cb_called = False - def cb(*args): - global cb_called - cb_called = True - return passphrase - ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK - ctx.set_passphrase_cb(cb) - - plaintext, _, _ = ctx.decrypt(ciphertext) - assert plaintext == b"hello there" - assert cb_called + plaintext, _, _ = ctx.decrypt(ciphertext) + assert plaintext == b"hello there" + assert cb_called diff --git a/lang/python/tests/t-quick-key-manipulation.py b/lang/python/tests/t-quick-key-manipulation.py index 62c395ab..12c18ce5 100755 --- a/lang/python/tests/t-quick-key-manipulation.py +++ b/lang/python/tests/t-quick-key-manipulation.py @@ -21,83 +21,72 @@ from __future__ import absolute_import, print_function, unicode_literals del absolute_import, print_function, unicode_literals import gpg -import itertools -import os -import shutil -import time import support alpha = "Alpha " bravo = "Bravo " -def copy_configuration(destination): - home = os.environ['GNUPGHOME'] - shutil.copy(os.path.join(home, "gpg.conf"), destination) - shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) +with support.EphemeralContext() as ctx: + res = ctx.create_key(alpha, certify=True) + key = ctx.get_key(res.fpr) + assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" + assert len(key.uids) == 1, "Expected exactly one UID" -with support.TemporaryDirectory() as tmp: - copy_configuration(tmp) - with gpg.Context(home_dir=tmp) as ctx: - res = ctx.create_key(alpha, certify=True) + def get_uid(uid): key = ctx.get_key(res.fpr) - assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" - assert len(key.uids) == 1, "Expected exactly one UID" + for u in key.uids: + if u.uid == uid: + return u + return None - def get_uid(uid): - key = ctx.get_key(res.fpr) - for u in key.uids: - if u.uid == uid: - return u - return None + # sanity check + uid = get_uid(alpha) + assert uid, "UID alpha not found" + assert uid.revoked == 0 - # sanity check - uid = get_uid(alpha) - assert uid, "UID alpha not found" - assert uid.revoked == 0 + # add bravo + ctx.key_add_uid(key, bravo) + uid = get_uid(bravo) + assert uid, "UID bravo not found" + assert uid.revoked == 0 - # add bravo - ctx.key_add_uid(key, bravo) - uid = get_uid(bravo) - assert uid, "UID bravo not found" - assert uid.revoked == 0 + # revoke alpha + ctx.key_revoke_uid(key, alpha) + uid = get_uid(alpha) + assert uid, "UID alpha not found" + assert uid.revoked == 1 + uid = get_uid(bravo) + assert uid, "UID bravo not found" + assert uid.revoked == 0 - # revoke alpha + # try to revoke the last UID + try: ctx.key_revoke_uid(key, alpha) - uid = get_uid(alpha) - assert uid, "UID alpha not found" - assert uid.revoked == 1 - uid = get_uid(bravo) - assert uid, "UID bravo not found" - assert uid.revoked == 0 + # IMHO this should fail. issue2961. + # assert False, "Expected an error but got none" + except gpg.errors.GpgError: + pass - # try to revoke the last UID - try: - ctx.key_revoke_uid(key, alpha) - # IMHO this should fail. issue2961. - # assert False, "Expected an error but got none" - except gpg.errors.GpgError: - pass + # Everything should be the same + uid = get_uid(alpha) + assert uid, "UID alpha not found" + assert uid.revoked == 1 + uid = get_uid(bravo) + assert uid, "UID bravo not found" + assert uid.revoked == 0 - # Everything should be the same - uid = get_uid(alpha) - assert uid, "UID alpha not found" - assert uid.revoked == 1 - uid = get_uid(bravo) - assert uid, "UID bravo not found" - assert uid.revoked == 0 + # try to revoke a non-existent UID + try: + ctx.key_revoke_uid(key, "i dont exist") + # IMHO this should fail. issue2963. + # assert False, "Expected an error but got none" + except gpg.errors.GpgError: + pass - # try to revoke a non-existent UID - try: - ctx.key_revoke_uid(key, "i dont exist") - # IMHO this should fail. issue2963. - # assert False, "Expected an error but got none" - except gpg.errors.GpgError: - pass - - # try to add an pre-existent UID - try: - ctx.key_add_uid(key, bravo) - assert False, "Expected an error but got none" - except gpg.errors.GpgError: - pass + # try to add an pre-existent UID + try: + ctx.key_add_uid(key, bravo) + assert False, "Expected an error but got none" + except gpg.errors.GpgError: + pass diff --git a/lang/python/tests/t-quick-subkey-creation.py b/lang/python/tests/t-quick-subkey-creation.py index 0d9f71fb..ad4f35c6 100755 --- a/lang/python/tests/t-quick-subkey-creation.py +++ b/lang/python/tests/t-quick-subkey-creation.py @@ -22,8 +22,6 @@ del absolute_import, print_function, unicode_literals import gpg import itertools -import os -import shutil import time import support @@ -31,91 +29,84 @@ import support alpha = "Alpha " bravo = "Bravo " -def copy_configuration(destination): - home = os.environ['GNUPGHOME'] - shutil.copy(os.path.join(home, "gpg.conf"), destination) - shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) +with support.EphemeralContext() as ctx: + res = ctx.create_key(alpha, certify=True) + keys = list(ctx.keylist()) + assert len(keys) == 1, "Weird number of keys created" + key = keys[0] + assert key.fpr == res.fpr + assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" -with support.TemporaryDirectory() as tmp: - copy_configuration(tmp) - with gpg.Context(home_dir=tmp) as ctx: - res = ctx.create_key(alpha, certify=True) - keys = list(ctx.keylist()) - assert len(keys) == 1, "Weird number of keys created" - key = keys[0] - assert key.fpr == res.fpr - assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" + def get_subkey(fpr): + k = ctx.get_key(fpr) + for sk in k.subkeys: + if sk.fpr == fpr: + return sk + return None - def get_subkey(fpr): - k = ctx.get_key(fpr) - for sk in k.subkeys: - if sk.fpr == fpr: - return sk - return None + # Check gpg.constants.create.NOEXPIRE... + res = ctx.create_subkey(key, expires=False) + subkey = get_subkey(res.fpr) + assert subkey.expires == 0, "Expected subkey not to expire" + assert subkey.can_encrypt, \ + "Default subkey capabilities do not include encryption" - # Check gpg.constants.create.NOEXPIRE... - res = ctx.create_subkey(key, expires=False) + t = 2 * 24 * 60 * 60 + slack = 5 * 60 + res = ctx.create_subkey(key, expires_in=t) + subkey = get_subkey(res.fpr) + assert abs(time.time() + t - subkey.expires) < slack, \ + "subkeys expiration time is off" + + # Check capabilities + for sign, encrypt, authenticate in itertools.product([False, True], + [False, True], + [False, True]): + # Filter some out + if not (sign or encrypt or authenticate): + # This triggers the default capabilities tested before. + continue + + res = ctx.create_subkey(key, sign=sign, encrypt=encrypt, + authenticate=authenticate) subkey = get_subkey(res.fpr) - assert subkey.expires == 0, "Expected subkey not to expire" - assert subkey.can_encrypt, \ - "Default subkey capabilities do not include encryption" + assert sign == subkey.can_sign + assert encrypt == subkey.can_encrypt + assert authenticate == subkey.can_authenticate - t = 2 * 24 * 60 * 60 - slack = 5 * 60 - res = ctx.create_subkey(key, expires_in=t) - subkey = get_subkey(res.fpr) - assert abs(time.time() + t - subkey.expires) < slack, \ - "subkeys expiration time is off" + # Check algorithm + res = ctx.create_subkey(key, algorithm="rsa") + subkey = get_subkey(res.fpr) + assert subkey.pubkey_algo == 1 - # Check capabilities - for sign, encrypt, authenticate in itertools.product([False, True], - [False, True], - [False, True]): - # Filter some out - if not (sign or encrypt or authenticate): - # This triggers the default capabilities tested before. - continue + # Check algorithm with size + res = ctx.create_subkey(key, algorithm="rsa1024") + subkey = get_subkey(res.fpr) + assert subkey.pubkey_algo == 1 + assert subkey.length == 1024 - res = ctx.create_subkey(key, sign=sign, encrypt=encrypt, - authenticate=authenticate) - subkey = get_subkey(res.fpr) - assert sign == subkey.can_sign - assert encrypt == subkey.can_encrypt - assert authenticate == subkey.can_authenticate + # Check algorithm future-default + ctx.create_subkey(key, algorithm="future-default") - # Check algorithm - res = ctx.create_subkey(key, algorithm="rsa") - subkey = get_subkey(res.fpr) - assert subkey.pubkey_algo == 1 + # Check passphrase protection. For this we create a new key + # so that we have a key with just one encryption subkey. + bravo_res = ctx.create_key(bravo, certify=True) + bravo_key = ctx.get_key(bravo_res.fpr) + assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys" - # Check algorithm with size - res = ctx.create_subkey(key, algorithm="rsa1024") - subkey = get_subkey(res.fpr) - assert subkey.pubkey_algo == 1 - assert subkey.length == 1024 + passphrase = "streng geheim" + res = ctx.create_subkey(bravo_key, passphrase=passphrase) + ciphertext, _, _ = ctx.encrypt(b"hello there", + recipients=[ctx.get_key(bravo_res.fpr)]) - # Check algorithm future-default - ctx.create_subkey(key, algorithm="future-default") + cb_called = False + def cb(*args): + global cb_called + cb_called = True + return passphrase + ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK + ctx.set_passphrase_cb(cb) - # Check passphrase protection. For this we create a new key - # so that we have a key with just one encryption subkey. - bravo_res = ctx.create_key(bravo, certify=True) - bravo_key = ctx.get_key(bravo_res.fpr) - assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys" - - passphrase = "streng geheim" - res = ctx.create_subkey(bravo_key, passphrase=passphrase) - ciphertext, _, _ = ctx.encrypt(b"hello there", - recipients=[ctx.get_key(bravo_res.fpr)]) - - cb_called = False - def cb(*args): - global cb_called - cb_called = True - return passphrase - ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK - ctx.set_passphrase_cb(cb) - - plaintext, _, _ = ctx.decrypt(ciphertext) - assert plaintext == b"hello there" - assert cb_called + plaintext, _, _ = ctx.decrypt(ciphertext) + assert plaintext == b"hello there" + assert cb_called