diff options
| author | Justus Winter <[email protected]> | 2017-02-17 11:18:56 +0000 | 
|---|---|---|
| committer | Justus Winter <[email protected]> | 2017-02-17 11:18:56 +0000 | 
| commit | de8494b16bc50c60a8438f2cae1f8c88e8949f7a (patch) | |
| tree | 3fae3b3fd43eb9e860fc446415dccd0d6358044c /lang/python/tests | |
| parent | python: Fix using strings as commands in the assuan protocol. (diff) | |
| download | gpgme-de8494b16bc50c60a8438f2cae1f8c88e8949f7a.tar.gz gpgme-de8494b16bc50c60a8438f2cae1f8c88e8949f7a.zip | |
python: Fix teardown of ephemeral contexts.
* lang/python/tests/support.py (EphemeralContext): New function.
* lang/python/tests/t-quick-key-creation.py: Use the new function to
manage ephemeral contexts.
* lang/python/tests/t-quick-key-manipulation.py: Likewise.
* lang/python/tests/t-quick-subkey-creation.py: Likewise.
--
Previously, there was a problem with cleaning up ephemeral home
directories.  shutil.rmtree deleted the agents main socket, gpg-agent
detected that, and deleted the other sockets as well, racing
shutil.rmtree which did not cope will with that.
Fix this by asking the agent nicely to shut down.
Signed-off-by: Justus Winter <[email protected]>
Diffstat (limited to '')
| -rw-r--r-- | lang/python/tests/support.py | 24 | ||||
| -rwxr-xr-x | lang/python/tests/t-quick-key-creation.py | 200 | ||||
| -rwxr-xr-x | lang/python/tests/t-quick-key-manipulation.py | 117 | ||||
| -rwxr-xr-x | lang/python/tests/t-quick-subkey-creation.py | 169 | 
4 files changed, 250 insertions, 260 deletions
| diff --git a/lang/python/tests/support.py b/lang/python/tests/support.py index ed5bf615..a381270d 100644 --- a/lang/python/tests/support.py +++ b/lang/python/tests/support.py @@ -18,9 +18,12 @@  from __future__ import absolute_import, print_function, unicode_literals  del absolute_import, print_function, unicode_literals +import contextlib +import shutil  import sys  import os  import tempfile +import time  import gpg  # known keys @@ -85,5 +88,24 @@ else:              self.path = tempfile.mkdtemp()              return self.path          def __exit__(self, *args): -            import shutil              shutil.rmtree(self.path) + +def EphemeralContext(): +    with TemporaryDirectory() as tmp: +        home = os.environ['GNUPGHOME'] +        shutil.copy(os.path.join(home, "gpg.conf"), tmp) +        shutil.copy(os.path.join(home, "gpg-agent.conf"), tmp) + +        with gpg.Context(home_dir=tmp) as ctx: +            yield ctx + +            # Ask the agent to quit. +            agent_socket = os.path.join(tmp, "S.gpg-agent") +            ctx.protocol = gpg.constants.protocol.ASSUAN +            ctx.set_engine_info(ctx.protocol, file_name=agent_socket) +            ctx.assuan_transact(["KILLAGENT"]) + +            # Block until it is really gone. +            while os.path.exists(agent_socket): +                time.sleep(.01) diff --git a/lang/python/tests/t-quick-key-creation.py b/lang/python/tests/t-quick-key-creation.py index ea63dc3f..c642c5b4 100755 --- a/lang/python/tests/t-quick-key-creation.py +++ b/lang/python/tests/t-quick-key-creation.py @@ -22,42 +22,33 @@ del absolute_import, print_function, unicode_literals  import gpg  import itertools -import os -import shutil  import time  import support  alpha = "Alpha <[email protected]>" -def copy_configuration(destination): -    home = os.environ['GNUPGHOME'] -    shutil.copy(os.path.join(home, "gpg.conf"), destination) -    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) +with support.EphemeralContext() as ctx: +    res = ctx.create_key(alpha) -with support.TemporaryDirectory() as tmp: -    copy_configuration(tmp) -    with gpg.Context(home_dir=tmp) as ctx: -        res = ctx.create_key(alpha) +    keys = list(ctx.keylist()) +    assert len(keys) == 1, "Weird number of keys created" -        keys = list(ctx.keylist()) -        assert len(keys) == 1, "Weird number of keys created" +    key = keys[0] +    assert key.fpr == res.fpr +    assert len(key.subkeys) == 2, "Expected one primary key and one subkey" +    assert key.subkeys[0].expires > 0, "Expected primary key to expire" -        key = keys[0] -        assert key.fpr == res.fpr -        assert len(key.subkeys) == 2, "Expected one primary key and one subkey" -        assert key.subkeys[0].expires > 0, "Expected primary key to expire" - -        # Try to create a key with the same UID -        try: -            ctx.create_key(alpha) -            assert False, "Expected an error but got none" -        except gpg.errors.GpgError as e: -            pass +    # Try to create a key with the same UID +    try: +        ctx.create_key(alpha) +        assert False, "Expected an error but got none" +    except gpg.errors.GpgError as e: +        pass -        # Try to create a key with the same UID, now with force! -        res2 = ctx.create_key(alpha, force=True) -        assert res.fpr != res2.fpr +    # Try to create a key with the same UID, now with force! +    res2 = ctx.create_key(alpha, force=True) +    assert res.fpr != res2.fpr  # From here on, we use one context, and create unique UIDs @@ -67,85 +58,82 @@ def make_uid():      uid_counter += 1      return "user{0}@invalid.example.org".format(uid_counter) -with support.TemporaryDirectory() as tmp: -    copy_configuration(tmp) -    with gpg.Context(home_dir=tmp) as ctx: - -        # Check gpg.constants.create.NOEXPIRE... -        res = ctx.create_key(make_uid(), expires=False) -        key = ctx.get_key(res.fpr, secret=True) -        assert key.fpr == res.fpr -        assert len(key.subkeys) == 2, "Expected one primary key and one subkey" -        assert key.subkeys[0].expires == 0, "Expected primary key not to expire" - -        t = 2 * 24 * 60 * 60 -        slack = 5 * 60 -        res = ctx.create_key(make_uid(), expires_in=t) -        key = ctx.get_key(res.fpr, secret=True) -        assert key.fpr == res.fpr -        assert len(key.subkeys) == 2, "Expected one primary key and one subkey" -        assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ -            "Primary keys expiration time is off" - -        # Check capabilities -        for sign, encrypt, certify, authenticate in itertools.product([False, True], -                                                                      [False, True], -                                                                      [False, True], -                                                                      [False, True]): -            # Filter some out -            if not (sign or encrypt or certify or authenticate): -                # This triggers the default capabilities tested before. -                continue -            if (sign or encrypt or authenticate) and not certify: -                # The primary key always certifies. -                continue - -            res = ctx.create_key(make_uid(), algorithm="rsa", -                                 sign=sign, encrypt=encrypt, certify=certify, -                                 authenticate=authenticate) -            key = ctx.get_key(res.fpr, secret=True) -            assert key.fpr == res.fpr -            assert len(key.subkeys) == 1, \ -                "Expected no subkey for non-default capabilities" - -            p = key.subkeys[0] -            assert sign == p.can_sign -            assert encrypt == p.can_encrypt -            assert certify == p.can_certify -            assert authenticate == p.can_authenticate - -        # Check algorithm -        res = ctx.create_key(make_uid(), algorithm="rsa") -        key = ctx.get_key(res.fpr, secret=True) -        assert key.fpr == res.fpr -        for k in key.subkeys: -            assert k.pubkey_algo == 1 - -        # Check algorithm with size -        res = ctx.create_key(make_uid(), algorithm="rsa1024") +with support.EphemeralContext() as ctx: +    # Check gpg.constants.create.NOEXPIRE... +    res = ctx.create_key(make_uid(), expires=False) +    key = ctx.get_key(res.fpr, secret=True) +    assert key.fpr == res.fpr +    assert len(key.subkeys) == 2, "Expected one primary key and one subkey" +    assert key.subkeys[0].expires == 0, "Expected primary key not to expire" + +    t = 2 * 24 * 60 * 60 +    slack = 5 * 60 +    res = ctx.create_key(make_uid(), expires_in=t) +    key = ctx.get_key(res.fpr, secret=True) +    assert key.fpr == res.fpr +    assert len(key.subkeys) == 2, "Expected one primary key and one subkey" +    assert abs(time.time() + t - key.subkeys[0].expires) < slack, \ +        "Primary keys expiration time is off" + +    # Check capabilities +    for sign, encrypt, certify, authenticate in itertools.product([False, True], +                                                                  [False, True], +                                                                  [False, True], +                                                                  [False, True]): +        # Filter some out +        if not (sign or encrypt or certify or authenticate): +            # This triggers the default capabilities tested before. +            continue +        if (sign or encrypt or authenticate) and not certify: +            # The primary key always certifies. +            continue + +        res = ctx.create_key(make_uid(), algorithm="rsa", +                             sign=sign, encrypt=encrypt, certify=certify, +                             authenticate=authenticate)          key = ctx.get_key(res.fpr, secret=True)          assert key.fpr == res.fpr -        for k in key.subkeys: -            assert k.pubkey_algo == 1 -            assert k.length == 1024 - -        # Check algorithm future-default -        ctx.create_key(make_uid(), algorithm="future-default") - -        # Check passphrase protection -        recipient = make_uid() -        passphrase = "streng geheim" -        res = ctx.create_key(recipient, passphrase=passphrase) -        ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) - -        cb_called = False -        def cb(*args): -            global cb_called -            cb_called = True -            return passphrase -        ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK -        ctx.set_passphrase_cb(cb) - -        plaintext, _, _ = ctx.decrypt(ciphertext) -        assert plaintext == b"hello there" -        assert cb_called +        assert len(key.subkeys) == 1, \ +            "Expected no subkey for non-default capabilities" + +        p = key.subkeys[0] +        assert sign == p.can_sign +        assert encrypt == p.can_encrypt +        assert certify == p.can_certify +        assert authenticate == p.can_authenticate + +    # Check algorithm +    res = ctx.create_key(make_uid(), algorithm="rsa") +    key = ctx.get_key(res.fpr, secret=True) +    assert key.fpr == res.fpr +    for k in key.subkeys: +        assert k.pubkey_algo == 1 + +    # Check algorithm with size +    res = ctx.create_key(make_uid(), algorithm="rsa1024") +    key = ctx.get_key(res.fpr, secret=True) +    assert key.fpr == res.fpr +    for k in key.subkeys: +        assert k.pubkey_algo == 1 +        assert k.length == 1024 + +    # Check algorithm future-default +    ctx.create_key(make_uid(), algorithm="future-default") + +    # Check passphrase protection +    recipient = make_uid() +    passphrase = "streng geheim" +    res = ctx.create_key(recipient, passphrase=passphrase) +    ciphertext, _, _ = ctx.encrypt(b"hello there", recipients=[ctx.get_key(res.fpr)]) + +    cb_called = False +    def cb(*args): +        global cb_called +        cb_called = True +        return passphrase +    ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK +    ctx.set_passphrase_cb(cb) + +    plaintext, _, _ = ctx.decrypt(ciphertext) +    assert plaintext == b"hello there" +    assert cb_called diff --git a/lang/python/tests/t-quick-key-manipulation.py b/lang/python/tests/t-quick-key-manipulation.py index 62c395ab..12c18ce5 100755 --- a/lang/python/tests/t-quick-key-manipulation.py +++ b/lang/python/tests/t-quick-key-manipulation.py @@ -21,83 +21,72 @@ from __future__ import absolute_import, print_function, unicode_literals  del absolute_import, print_function, unicode_literals  import gpg -import itertools -import os -import shutil -import time  import support  alpha = "Alpha <[email protected]>"  bravo = "Bravo <[email protected]>" -def copy_configuration(destination): -    home = os.environ['GNUPGHOME'] -    shutil.copy(os.path.join(home, "gpg.conf"), destination) -    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) +with support.EphemeralContext() as ctx: +    res = ctx.create_key(alpha, certify=True) +    key = ctx.get_key(res.fpr) +    assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" +    assert len(key.uids) == 1, "Expected exactly one UID" -with support.TemporaryDirectory() as tmp: -    copy_configuration(tmp) -    with gpg.Context(home_dir=tmp) as ctx: -        res = ctx.create_key(alpha, certify=True) +    def get_uid(uid):          key = ctx.get_key(res.fpr) -        assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" -        assert len(key.uids) == 1, "Expected exactly one UID" +        for u in key.uids: +            if u.uid == uid: +                return u +        return None -        def get_uid(uid): -            key = ctx.get_key(res.fpr) -            for u in key.uids: -                if u.uid == uid: -                    return u -            return None +    # sanity check +    uid = get_uid(alpha) +    assert uid, "UID alpha not found" +    assert uid.revoked == 0 -        # sanity check -        uid = get_uid(alpha) -        assert uid, "UID alpha not found" -        assert uid.revoked == 0 +    # add bravo +    ctx.key_add_uid(key, bravo) +    uid = get_uid(bravo) +    assert uid, "UID bravo not found" +    assert uid.revoked == 0 -        # add bravo -        ctx.key_add_uid(key, bravo) -        uid = get_uid(bravo) -        assert uid, "UID bravo not found" -        assert uid.revoked == 0 +    # revoke alpha +    ctx.key_revoke_uid(key, alpha) +    uid = get_uid(alpha) +    assert uid, "UID alpha not found" +    assert uid.revoked == 1 +    uid = get_uid(bravo) +    assert uid, "UID bravo not found" +    assert uid.revoked == 0 -        # revoke alpha +    # try to revoke the last UID +    try:          ctx.key_revoke_uid(key, alpha) -        uid = get_uid(alpha) -        assert uid, "UID alpha not found" -        assert uid.revoked == 1 -        uid = get_uid(bravo) -        assert uid, "UID bravo not found" -        assert uid.revoked == 0 - -        # try to revoke the last UID -        try: -            ctx.key_revoke_uid(key, alpha) -            # IMHO this should fail.  issue2961. -            # assert False, "Expected an error but got none" -        except gpg.errors.GpgError: -            pass +        # IMHO this should fail.  issue2961. +        # assert False, "Expected an error but got none" +    except gpg.errors.GpgError: +        pass -        # Everything should be the same -        uid = get_uid(alpha) -        assert uid, "UID alpha not found" -        assert uid.revoked == 1 -        uid = get_uid(bravo) -        assert uid, "UID bravo not found" -        assert uid.revoked == 0 +    # Everything should be the same +    uid = get_uid(alpha) +    assert uid, "UID alpha not found" +    assert uid.revoked == 1 +    uid = get_uid(bravo) +    assert uid, "UID bravo not found" +    assert uid.revoked == 0 -        # try to revoke a non-existent UID -        try: -            ctx.key_revoke_uid(key, "i dont exist") -            # IMHO this should fail.  issue2963. -            # assert False, "Expected an error but got none" -        except gpg.errors.GpgError: -            pass +    # try to revoke a non-existent UID +    try: +        ctx.key_revoke_uid(key, "i dont exist") +        # IMHO this should fail.  issue2963. +        # assert False, "Expected an error but got none" +    except gpg.errors.GpgError: +        pass -        # try to add an pre-existent UID -        try: -            ctx.key_add_uid(key, bravo) -            assert False, "Expected an error but got none" -        except gpg.errors.GpgError: -            pass +    # try to add an pre-existent UID +    try: +        ctx.key_add_uid(key, bravo) +        assert False, "Expected an error but got none" +    except gpg.errors.GpgError: +        pass diff --git a/lang/python/tests/t-quick-subkey-creation.py b/lang/python/tests/t-quick-subkey-creation.py index 0d9f71fb..ad4f35c6 100755 --- a/lang/python/tests/t-quick-subkey-creation.py +++ b/lang/python/tests/t-quick-subkey-creation.py @@ -22,8 +22,6 @@ del absolute_import, print_function, unicode_literals  import gpg  import itertools -import os -import shutil  import time  import support @@ -31,91 +29,84 @@ import support  alpha = "Alpha <[email protected]>"  bravo = "Bravo <[email protected]>" -def copy_configuration(destination): -    home = os.environ['GNUPGHOME'] -    shutil.copy(os.path.join(home, "gpg.conf"), destination) -    shutil.copy(os.path.join(home, "gpg-agent.conf"), destination) - -with support.TemporaryDirectory() as tmp: -    copy_configuration(tmp) -    with gpg.Context(home_dir=tmp) as ctx: -        res = ctx.create_key(alpha, certify=True) -        keys = list(ctx.keylist()) -        assert len(keys) == 1, "Weird number of keys created" -        key = keys[0] -        assert key.fpr == res.fpr -        assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" - -        def get_subkey(fpr): -            k = ctx.get_key(fpr) -            for sk in k.subkeys: -                if sk.fpr == fpr: -                    return sk -            return None - -        # Check gpg.constants.create.NOEXPIRE... -        res = ctx.create_subkey(key, expires=False) +with support.EphemeralContext() as ctx: +    res = ctx.create_key(alpha, certify=True) +    keys = list(ctx.keylist()) +    assert len(keys) == 1, "Weird number of keys created" +    key = keys[0] +    assert key.fpr == res.fpr +    assert len(key.subkeys) == 1, "Expected one primary key and no subkeys" + +    def get_subkey(fpr): +        k = ctx.get_key(fpr) +        for sk in k.subkeys: +            if sk.fpr == fpr: +                return sk +        return None + +    # Check gpg.constants.create.NOEXPIRE... +    res = ctx.create_subkey(key, expires=False) +    subkey = get_subkey(res.fpr) +    assert subkey.expires == 0, "Expected subkey not to expire" +    assert subkey.can_encrypt, \ +        "Default subkey capabilities do not include encryption" + +    t = 2 * 24 * 60 * 60 +    slack = 5 * 60 +    res = ctx.create_subkey(key, expires_in=t) +    subkey = get_subkey(res.fpr) +    assert abs(time.time() + t - subkey.expires) < slack, \ +        "subkeys expiration time is off" + +    # Check capabilities +    for sign, encrypt, authenticate in itertools.product([False, True], +                                                         [False, True], +                                                         [False, True]): +        # Filter some out +        if not (sign or encrypt or authenticate): +            # This triggers the default capabilities tested before. +            continue + +        res = ctx.create_subkey(key, sign=sign, encrypt=encrypt, +                                authenticate=authenticate)          subkey = get_subkey(res.fpr) -        assert subkey.expires == 0, "Expected subkey not to expire" -        assert subkey.can_encrypt, \ -            "Default subkey capabilities do not include encryption" - -        t = 2 * 24 * 60 * 60 -        slack = 5 * 60 -        res = ctx.create_subkey(key, expires_in=t) -        subkey = get_subkey(res.fpr) -        assert abs(time.time() + t - subkey.expires) < slack, \ -            "subkeys expiration time is off" - -        # Check capabilities -        for sign, encrypt, authenticate in itertools.product([False, True], -                                                             [False, True], -                                                             [False, True]): -            # Filter some out -            if not (sign or encrypt or authenticate): -                # This triggers the default capabilities tested before. -                continue - -            res = ctx.create_subkey(key, sign=sign, encrypt=encrypt, -                                    authenticate=authenticate) -            subkey = get_subkey(res.fpr) -            assert sign == subkey.can_sign -            assert encrypt == subkey.can_encrypt -            assert authenticate == subkey.can_authenticate - -        # Check algorithm -        res = ctx.create_subkey(key, algorithm="rsa") -        subkey = get_subkey(res.fpr) -        assert subkey.pubkey_algo == 1 - -        # Check algorithm with size -        res = ctx.create_subkey(key, algorithm="rsa1024") -        subkey = get_subkey(res.fpr) -        assert subkey.pubkey_algo == 1 -        assert subkey.length == 1024 - -        # Check algorithm future-default -        ctx.create_subkey(key, algorithm="future-default") - -        # Check passphrase protection.  For this we create a new key -        # so that we have a key with just one encryption subkey. -        bravo_res = ctx.create_key(bravo, certify=True) -        bravo_key = ctx.get_key(bravo_res.fpr) -        assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys" - -        passphrase = "streng geheim" -        res = ctx.create_subkey(bravo_key, passphrase=passphrase) -        ciphertext, _, _ = ctx.encrypt(b"hello there", -                                       recipients=[ctx.get_key(bravo_res.fpr)]) - -        cb_called = False -        def cb(*args): -            global cb_called -            cb_called = True -            return passphrase -        ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK -        ctx.set_passphrase_cb(cb) - -        plaintext, _, _ = ctx.decrypt(ciphertext) -        assert plaintext == b"hello there" -        assert cb_called +        assert sign == subkey.can_sign +        assert encrypt == subkey.can_encrypt +        assert authenticate == subkey.can_authenticate + +    # Check algorithm +    res = ctx.create_subkey(key, algorithm="rsa") +    subkey = get_subkey(res.fpr) +    assert subkey.pubkey_algo == 1 + +    # Check algorithm with size +    res = ctx.create_subkey(key, algorithm="rsa1024") +    subkey = get_subkey(res.fpr) +    assert subkey.pubkey_algo == 1 +    assert subkey.length == 1024 + +    # Check algorithm future-default +    ctx.create_subkey(key, algorithm="future-default") + +    # Check passphrase protection.  For this we create a new key +    # so that we have a key with just one encryption subkey. +    bravo_res = ctx.create_key(bravo, certify=True) +    bravo_key = ctx.get_key(bravo_res.fpr) +    assert len(bravo_key.subkeys) == 1, "Expected one primary key and no subkeys" + +    passphrase = "streng geheim" +    res = ctx.create_subkey(bravo_key, passphrase=passphrase) +    ciphertext, _, _ = ctx.encrypt(b"hello there", +                                   recipients=[ctx.get_key(bravo_res.fpr)]) + +    cb_called = False +    def cb(*args): +        global cb_called +        cb_called = True +        return passphrase +    ctx.pinentry_mode = gpg.constants.PINENTRY_MODE_LOOPBACK +    ctx.set_passphrase_cb(cb) + +    plaintext, _, _ = ctx.decrypt(ciphertext) +    assert plaintext == b"hello there" +    assert cb_called | 
