python: Fix teardown of ephemeral contexts.

* lang/python/tests/support.py (EphemeralContext): New function.
* lang/python/tests/t-quick-key-creation.py: Use the new function to
manage ephemeral contexts.
* lang/python/tests/t-quick-key-manipulation.py: Likewise.
* lang/python/tests/t-quick-subkey-creation.py: Likewise.
--

Previously, there was a problem with cleaning up ephemeral home
directories.  shutil.rmtree deleted the agents main socket, gpg-agent
detected that, and deleted the other sockets as well, racing
shutil.rmtree which did not cope will with that.

Fix this by asking the agent nicely to shut down.

Signed-off-by: Justus Winter <justus@g10code.com>
This commit is contained in:
Justus Winter 2017-02-17 12:18:56 +01:00
parent 9350168a1e
commit de8494b16b
No known key found for this signature in database
GPG Key ID: DD1A52F9DA8C9020
4 changed files with 232 additions and 242 deletions

View File

@ -18,9 +18,12 @@
from __future__ import absolute_import, print_function, unicode_literals
del absolute_import, print_function, unicode_literals
import contextlib
import shutil
import sys
import os
import tempfile
import time
import gpg
# known keys
@ -85,5 +88,24 @@ else:
self.path = tempfile.mkdtemp()
return self.path
def __exit__(self, *args):
import shutil
shutil.rmtree(self.path)
@contextlib.contextmanager
def EphemeralContext():
with TemporaryDirectory() as tmp:
home = os.environ['GNUPGHOME']
shutil.copy(os.path.join(home, "gpg.conf"), tmp)
shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp)
with gpg.Context(home_dir=tmp) as ctx:
yield ctx
# Ask the agent to quit.
agent_socket = os.path.join(tmp, "S.gpg-agent")
ctx.protocol = gpg.constants.protocol.ASSUAN
ctx.set_engine_info(ctx.protocol, file_name=agent_socket)
ctx.assuan_transact(["KILLAGENT"])
# Block until it is really gone.
while os.path.exists(agent_socket):
time.sleep(.01)

View File

@ -22,42 +22,33 @@ del absolute_import, print_function, unicode_literals
import gpg
import itertools
import os
import shutil
import time
import support
alpha = "Alpha <alpha@invalid.example.net>"
def copy_configuration(destination):
home = os.environ['GNUPGHOME']
shutil.copy(os.path.join(home, "gpg.conf"), destination)
shutil.copy(os.path.join(home, "gpg-agent.conf"), destination)
with support.EphemeralContext() as ctx:
res = ctx.create_key(alpha)
with support.TemporaryDirectory() as tmp:
copy_configuration(tmp)
with gpg.Context(home_dir=tmp) as ctx:
res = ctx.create_key(alpha)
keys = list(ctx.keylist())
assert len(keys) == 1, "Weird number of keys created"
keys = list(ctx.keylist())
assert len(keys) == 1, "Weird number of keys created"
key = keys[0]
assert key.fpr == res.fpr
assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
assert key.subkeys[0].expires > 0, "Expected primary key to expire"
key = keys[0]
assert key.fpr == res.fpr
assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
assert key.subkeys[0].expires > 0, "Expected primary key to expire"
# Try to create a key with the same UID
try:
ctx.create_key(alpha)
assert False, "Expected an error but got none"
except gpg.errors.GpgError as e:
pass
# Try to create a key with the same UID
try:
ctx.create_key(alpha)
assert False, "Expected an error but got none"
except gpg.errors.GpgError as e:
pass
# Try to create a key with the same UID, now with force!
res2 = ctx.create_key(alpha, force=True)
assert res.fpr != res2.fpr
# Try to create a key with the same UID, now with force!
res2 = ctx.create_key(alpha, force=True)
assert res.fpr != res2.fpr
# From here on, we use one context, and create unique UIDs
@ -67,85 +58,82 @@ def make_uid():
uid_counter += 1
return "user{0}@invalid.example.org".format(uid_counter)
with support.TemporaryDirectory() as tmp:
copy_configuration(tmp)
with gpg.Context(home_dir=tmp) as ctx:
with support.EphemeralContext() as ctx:
# Check gpg.constants.create.NOEXPIRE...
res = ctx.create_key(make_uid(), expires=False)
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
assert key.subkeys[0].expires == 0, "Expected primary key not to expire"
# Check gpg.constants.create.NOEXPIRE...
res = ctx.create_key(make_uid(), expires=False)
t = 2 * 24 * 60 * 60
slack = 5 * 60
res = ctx.create_key(make_uid(), expires_in=t)
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
assert abs(time.time() + t - key.subkeys[0].expires) < slack, \
"Primary keys expiration time is off"
# Check capabilities
for sign, encrypt, certify, authenticate in itertools.product([False, True],
[False, True],
[False, True],
[False, True]):
# Filter some out
if not (sign or encrypt or certify or authenticate):
# This triggers the default capabilities tested before.
continue
if (sign or encrypt or authenticate) and not certify:
# The primary key always certifies.
continue
res = ctx.create_key(make_uid(), algorithm="rsa",
sign=sign, encrypt=encrypt, certify=certify,
authenticate=authenticate)
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
assert key.subkeys[0].expires == 0, "Expected primary key not to expire"
assert len(key.subkeys) == 1, \
"Expected no subkey for non-default capabilities"
t = 2 * 24 * 60 * 60
slack = 5 * 60
res = ctx.create_key(make_uid(), expires_in=t)
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
assert len(key.subkeys) == 2, "Expected one primary key and one subkey"
assert abs(time.time() + t - key.subkeys[0].expires) < slack, \
"Primary keys expiration time is off"
p = key.subkeys[0]
assert sign == p.can_sign
assert encrypt == p.can_encrypt
assert certify == p.can_certify
assert authenticate == p.can_authenticate
# Check capabilities
for sign, encrypt, certify, authenticate in itertools.product([False, True],
[False, True],
[False, True],
[False, True]):
# Filter some out
if not (sign or encrypt or certify or authenticate):
# This triggers the default capabilities tested before.
continue
if (sign or encrypt or authenticate) and not certify:
# The primary key always certifies.
continue
# Check algorithm
res = ctx.create_key(make_uid(), algorithm="rsa")
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
for k in key.subkeys:
assert k.pubkey_algo == 1
res = ctx.create_key(make_uid(), algorithm="rsa",
sign=sign, encrypt=encrypt, certify=certify,
authenticate=authenticate)
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
assert len(key.subkeys) == 1, \
"Expected no subkey for non-default capabilities"
# Check algorithm with size
res = ctx.create_key(make_uid(), algorithm="rsa1024")
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
for k in key.subkeys:
assert k.pubkey_algo == 1
assert k.length == 1024
p = key.subkeys[0]
assert sign == p.can_sign
assert encrypt == p.can_encrypt
assert certify == p.can_certify
assert authenticate == p.can_authenticate
# Check algorithm future-default
ctx.create_key(make_uid(), algorithm="future-default")
# Check algorithm
res = ctx.create_key(make_uid(), algorithm="rsa")
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
for k in key.subkeys:
assert k.pubkey_algo == 1
# Check passphrase protection
recipient = make_uid()
passphrase = "streng geheim"
res = ctx.create_key(recipient, passphrase=passphrase)
ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)])
# Check algorithm with size
res = ctx.create_key(make_uid(), algorithm="rsa1024")
key = ctx.get_key(res.fpr, secret=True)
assert key.fpr == res.fpr
for k in key.subkeys:
assert k.pubkey_algo == 1
assert k.length == 1024
cb_called = False
def cb(*args):
global cb_called
cb_called = True
return passphrase
ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
ctx.set_passphrase_cb(cb)
# Check algorithm future-default
ctx.create_key(make_uid(), algorithm="future-default")
# Check passphrase protection
recipient = make_uid()
passphrase = "streng geheim"
res = ctx.create_key(recipient, passphrase=passphrase)
ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)])
cb_called = False
def cb(*args):
global cb_called
cb_called = True
return passphrase
ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
ctx.set_passphrase_cb(cb)
plaintext, _, _ = ctx.decrypt(ciphertext)
assert plaintext == b"hello there"
assert cb_called
plaintext, _, _ = ctx.decrypt(ciphertext)
assert plaintext == b"hello there"
assert cb_called

View File

@ -21,83 +21,72 @@ from __future__ import absolute_import, print_function, unicode_literals
del absolute_import, print_function, unicode_literals
import gpg
import itertools
import os
import shutil
import time
import support
alpha = "Alpha <alpha@invalid.example.net>"
bravo = "Bravo <bravo@invalid.example.net>"
def copy_configuration(destination):
home = os.environ['GNUPGHOME']
shutil.copy(os.path.join(home, "gpg.conf"), destination)
shutil.copy(os.path.join(home, "gpg-agent.conf"), destination)
with support.EphemeralContext() as ctx:
res = ctx.create_key(alpha, certify=True)
key = ctx.get_key(res.fpr)
assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
assert len(key.uids) == 1, "Expected exactly one UID"
with support.TemporaryDirectory() as tmp:
copy_configuration(tmp)
with gpg.Context(home_dir=tmp) as ctx:
res = ctx.create_key(alpha, certify=True)
def get_uid(uid):
key = ctx.get_key(res.fpr)
assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
assert len(key.uids) == 1, "Expected exactly one UID"
for u in key.uids:
if u.uid == uid:
return u
return None
def get_uid(uid):
key = ctx.get_key(res.fpr)
for u in key.uids:
if u.uid == uid:
return u
return None
# sanity check
uid = get_uid(alpha)
assert uid, "UID alpha not found"
assert uid.revoked == 0
# sanity check
uid = get_uid(alpha)
assert uid, "UID alpha not found"
assert uid.revoked == 0
# add bravo
ctx.key_add_uid(key, bravo)
uid = get_uid(bravo)
assert uid, "UID bravo not found"
assert uid.revoked == 0
# add bravo
ctx.key_add_uid(key, bravo)
uid = get_uid(bravo)
assert uid, "UID bravo not found"
assert uid.revoked == 0
# revoke alpha
ctx.key_revoke_uid(key, alpha)
uid = get_uid(alpha)
assert uid, "UID alpha not found"
assert uid.revoked == 1
uid = get_uid(bravo)
assert uid, "UID bravo not found"
assert uid.revoked == 0
# revoke alpha
# try to revoke the last UID
try:
ctx.key_revoke_uid(key, alpha)
uid = get_uid(alpha)
assert uid, "UID alpha not found"
assert uid.revoked == 1
uid = get_uid(bravo)
assert uid, "UID bravo not found"
assert uid.revoked == 0
# IMHO this should fail. issue2961.
# assert False, "Expected an error but got none"
except gpg.errors.GpgError:
pass
# try to revoke the last UID
try:
ctx.key_revoke_uid(key, alpha)
# IMHO this should fail. issue2961.
# assert False, "Expected an error but got none"
except gpg.errors.GpgError:
pass
# Everything should be the same
uid = get_uid(alpha)
assert uid, "UID alpha not found"
assert uid.revoked == 1
uid = get_uid(bravo)
assert uid, "UID bravo not found"
assert uid.revoked == 0
# Everything should be the same
uid = get_uid(alpha)
assert uid, "UID alpha not found"
assert uid.revoked == 1
uid = get_uid(bravo)
assert uid, "UID bravo not found"
assert uid.revoked == 0
# try to revoke a non-existent UID
try:
ctx.key_revoke_uid(key, "i dont exist")
# IMHO this should fail. issue2963.
# assert False, "Expected an error but got none"
except gpg.errors.GpgError:
pass
# try to revoke a non-existent UID
try:
ctx.key_revoke_uid(key, "i dont exist")
# IMHO this should fail. issue2963.
# assert False, "Expected an error but got none"
except gpg.errors.GpgError:
pass
# try to add an pre-existent UID
try:
ctx.key_add_uid(key, bravo)
assert False, "Expected an error but got none"
except gpg.errors.GpgError:
pass
# try to add an pre-existent UID
try:
ctx.key_add_uid(key, bravo)
assert False, "Expected an error but got none"
except gpg.errors.GpgError:
pass

View File

@ -22,8 +22,6 @@ del absolute_import, print_function, unicode_literals
import gpg
import itertools
import os
import shutil
import time
import support
@ -31,91 +29,84 @@ import support
alpha = "Alpha <alpha@invalid.example.net>"
bravo = "Bravo <bravo@invalid.example.net>"
def copy_configuration(destination):
home = os.environ['GNUPGHOME']
shutil.copy(os.path.join(home, "gpg.conf"), destination)
shutil.copy(os.path.join(home, "gpg-agent.conf"), destination)
with support.EphemeralContext() as ctx:
res = ctx.create_key(alpha, certify=True)
keys = list(ctx.keylist())
assert len(keys) == 1, "Weird number of keys created"
key = keys[0]
assert key.fpr == res.fpr
assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
with support.TemporaryDirectory() as tmp:
copy_configuration(tmp)
with gpg.Context(home_dir=tmp) as ctx:
res = ctx.create_key(alpha, certify=True)
keys = list(ctx.keylist())
assert len(keys) == 1, "Weird number of keys created"
key = keys[0]
assert key.fpr == res.fpr
assert len(key.subkeys) == 1, "Expected one primary key and no subkeys"
def get_subkey(fpr):
k = ctx.get_key(fpr)
for sk in k.subkeys:
if sk.fpr == fpr:
return sk
return None
def get_subkey(fpr):
k = ctx.get_key(fpr)
for sk in k.subkeys:
if sk.fpr == fpr:
return sk
return None
# Check gpg.constants.create.NOEXPIRE...
res = ctx.create_subkey(key, expires=False)
subkey = get_subkey(res.fpr)
assert subkey.expires == 0, "Expected subkey not to expire"
assert subkey.can_encrypt, \
"Default subkey capabilities do not include encryption"
# Check gpg.constants.create.NOEXPIRE...
res = ctx.create_subkey(key, expires=False)
t = 2 * 24 * 60 * 60
slack = 5 * 60
res = ctx.create_subkey(key, expires_in=t)
subkey = get_subkey(res.fpr)
assert abs(time.time() + t - subkey.expires) < slack, \
"subkeys expiration time is off"
# Check capabilities
for sign, encrypt, authenticate in itertools.product([False, True],
[False, True],
[False, True]):
# Filter some out
if not (sign or encrypt or authenticate):
# This triggers the default capabilities tested before.
continue
res = ctx.create_subkey(key, sign=sign, encrypt=encrypt,
authenticate=authenticate)
subkey = get_subkey(res.fpr)
assert subkey.expires == 0, "Expected subkey not to expire"
assert subkey.can_encrypt, \
"Default subkey capabilities do not include encryption"
assert sign == subkey.can_sign
assert encrypt == subkey.can_encrypt
assert authenticate == subkey.can_authenticate
t = 2 * 24 * 60 * 60
slack = 5 * 60
res = ctx.create_subkey(key, expires_in=t)
subkey = get_subkey(res.fpr)
assert abs(time.time() + t - subkey.expires) < slack, \
"subkeys expiration time is off"
# Check algorithm
res = ctx.create_subkey(key, algorithm="rsa")
subkey = get_subkey(res.fpr)
assert subkey.pubkey_algo == 1
# Check capabilities
for sign, encrypt, authenticate in itertools.product([False, True],
[False, True],
[False, True]):
# Filter some out
if not (sign or encrypt or authenticate):
# This triggers the default capabilities tested before.
continue
# Check algorithm with size
res = ctx.create_subkey(key, algorithm="rsa1024")
subkey = get_subkey(res.fpr)
assert subkey.pubkey_algo == 1
assert subkey.length == 1024
res = ctx.create_subkey(key, sign=sign, encrypt=encrypt,
authenticate=authenticate)
subkey = get_subkey(res.fpr)
assert sign == subkey.can_sign
assert encrypt == subkey.can_encrypt
assert authenticate == subkey.can_authenticate
# Check algorithm future-default
ctx.create_subkey(key, algorithm="future-default")
# Check algorithm
res = ctx.create_subkey(key, algorithm="rsa")
subkey = get_subkey(res.fpr)
assert subkey.pubkey_algo == 1
# Check passphrase protection. For this we create a new key
# so that we have a key with just one encryption subkey.
bravo_res = ctx.create_key(bravo, certify=True)
bravo_key = ctx.get_key(bravo_res.fpr)
assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys"
# Check algorithm with size
res = ctx.create_subkey(key, algorithm="rsa1024")
subkey = get_subkey(res.fpr)
assert subkey.pubkey_algo == 1
assert subkey.length == 1024
passphrase = "streng geheim"
res = ctx.create_subkey(bravo_key, passphrase=passphrase)
ciphertext, _, _ = ctx.encrypt(b"hello there",
recipients=[ctx.get_key(bravo_res.fpr)])
# Check algorithm future-default
ctx.create_subkey(key, algorithm="future-default")
cb_called = False
def cb(*args):
global cb_called
cb_called = True
return passphrase
ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
ctx.set_passphrase_cb(cb)
# Check passphrase protection. For this we create a new key
# so that we have a key with just one encryption subkey.
bravo_res = ctx.create_key(bravo, certify=True)
bravo_key = ctx.get_key(bravo_res.fpr)
assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys"
passphrase = "streng geheim"
res = ctx.create_subkey(bravo_key, passphrase=passphrase)
ciphertext, _, _ = ctx.encrypt(b"hello there",
recipients=[ctx.get_key(bravo_res.fpr)])
cb_called = False
def cb(*args):
global cb_called
cb_called = True
return passphrase
ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK
ctx.set_passphrase_cb(cb)
plaintext, _, _ = ctx.decrypt(ciphertext)
assert plaintext == b"hello there"
assert cb_called
plaintext, _, _ = ctx.decrypt(ciphertext)
assert plaintext == b"hello there"
assert cb_called