python bindings: core — PEP8 compliance

* Fixed most of the PEP8 errors in core.py
* Those remaining may need more than little edits and are a bit
  strange (too clearly the result of a programmer who has spent far
  too much time dealing with Lisp so that for Python it looks
  ... strange).
This commit is contained in:
Ben McGinnes 2018-06-17 15:58:44 +10:00
parent 5a80e75500
commit a5b24ae46c

View File

@ -1,3 +1,19 @@
# -*- coding: utf-8 -*-
import re
import os
import warnings
import weakref
from . import gpgme
from .errors import errorcheck, GPGMEError
from . import constants
from . import errors
from . import util
from __future__ import absolute_import, print_function, unicode_literals
del absolute_import, print_function, unicode_literals
# Copyright (C) 2016-2017 g10 Code GmbH
# Copyright (C) 2004,2008 Igor Belyi <belyi@users.sourceforge.net>
# Copyright (C) 2002 John Goerzen <jgoerzen@complete.org>
@ -24,18 +40,6 @@ and the 'Data' class describing buffers of data.
"""
from __future__ import absolute_import, print_function, unicode_literals
del absolute_import, print_function, unicode_literals
import re
import os
import warnings
import weakref
from . import gpgme
from .errors import errorcheck, GPGMEError
from . import constants
from . import errors
from . import util
class GpgmeWrapper(object):
"""Base wrapper class
@ -64,7 +68,7 @@ class GpgmeWrapper(object):
return hash(repr(self.wrapped))
def __eq__(self, other):
if other == None:
if other is None:
return False
else:
return repr(self.wrapped) == repr(other.wrapped)
@ -102,8 +106,10 @@ class GpgmeWrapper(object):
"{}get_{}".format(self._cprefix, key))
set_func = getattr(gpgme,
"{}set_{}".format(self._cprefix, key))
def get(slf):
return bool(get_func(slf.wrapped))
def set_(slf, value):
set_func(slf.wrapped, bool(value))
@ -116,9 +122,10 @@ class GpgmeWrapper(object):
return get(self)
_munge_docstring = re.compile(r'gpgme_([^(]*)\(([^,]*), (.*\) -> .*)')
def __getattr__(self, key):
"""On-the-fly generation of wrapper methods and properties"""
if key[0] == '_' or self._cprefix == None:
if key[0] == '_' or self._cprefix is None:
return None
if key in self._boolean_properties:
@ -160,6 +167,7 @@ class GpgmeWrapper(object):
else:
super(GpgmeWrapper, self).__setattr__(key, value)
class Context(GpgmeWrapper):
"""Context for cryptographic operations
@ -212,7 +220,7 @@ class Context(GpgmeWrapper):
Helper function to retrieve the results of an operation, or
None if SINK is given.
"""
if sink or data == None:
if sink or data is None:
return None
data.seek(0, os.SEEK_SET)
return data.read()
@ -267,10 +275,11 @@ class Context(GpgmeWrapper):
flags |= expect_sign * constants.ENCRYPT_EXPECT_SIGN
flags |= (not compress) * constants.ENCRYPT_NO_COMPRESS
if passphrase != None:
if passphrase is not None:
old_pinentry_mode = self.pinentry_mode
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook=None):
return passphrase
self.set_passphrase_cb(passphrase_cb)
@ -301,7 +310,7 @@ class Context(GpgmeWrapper):
e.results = results
raise e
finally:
if passphrase != None:
if passphrase is not None:
self.pinentry_mode = old_pinentry_mode
if old_passphrase_cb:
self.set_passphrase_cb(*old_passphrase_cb[1:])
@ -344,10 +353,11 @@ class Context(GpgmeWrapper):
"""
plaintext = sink if sink else Data()
if passphrase != None:
if passphrase is not None:
old_pinentry_mode = self.pinentry_mode
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook=None):
return passphrase
self.set_passphrase_cb(passphrase_cb)
@ -365,7 +375,7 @@ class Context(GpgmeWrapper):
result, verify_result)
raise e
finally:
if passphrase != None:
if passphrase is not None:
self.pinentry_mode = old_pinentry_mode
if old_passphrase_cb:
self.set_passphrase_cb(*old_passphrase_cb[1:])
@ -382,7 +392,7 @@ class Context(GpgmeWrapper):
for s in verify_result.signatures):
raise errors.BadSignatures(verify_result, results=results)
if verify and verify != True:
if not verify: # was: if verify and verify != True:
missing = list()
for key in verify:
ok = False
@ -515,14 +525,14 @@ class Context(GpgmeWrapper):
Imports the given data into the Context.
Returns:
-- an object describing the results of imported or updated
-- an object describing the results of imported or updated
keys
Raises:
TypeError -- Very rarely.
GPGMEError -- as signaled by the underlying library:
Import status errors, when they occur, will usually
Import status errors, when they occur, will usually
be of NODATA. NO_PUBKEY indicates something
managed to run the function without any
arguments, while an argument of None triggers
@ -592,8 +602,8 @@ class Context(GpgmeWrapper):
self.op_keylist_end()
def create_key(self, userid, algorithm=None, expires_in=0, expires=True,
sign=False, encrypt=False, certify=False, authenticate=False,
passphrase=None, force=False):
sign=False, encrypt=False, certify=False,
authenticate=False, passphrase=None, force=False):
"""Create a primary key
Create a primary key for the user id USERID.
@ -630,9 +640,10 @@ class Context(GpgmeWrapper):
encrypt -- request the encryption capability (see above)
certify -- request the certification capability (see above)
authenticate -- request the authentication capability (see above)
passphrase -- protect the key with a passphrase (default: no passphrase)
force -- force key creation even if a key with the same userid exists
(default: False)
passphrase -- protect the key with a passphrase (default: no
passphrase)
force -- force key creation even if a key with the same userid
exists (default: False)
Returns:
-- an object describing the result of the key creation
@ -645,20 +656,21 @@ class Context(GpgmeWrapper):
old_pinentry_mode = self.pinentry_mode
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook=None):
return passphrase
self.set_passphrase_cb(passphrase_cb)
try:
self.op_createkey(userid, algorithm,
0, # reserved
0, # reserved
expires_in,
None, # extrakey
None, # extrakey
((constants.create.SIGN if sign else 0)
| (constants.create.ENCR if encrypt else 0)
| (constants.create.CERT if certify else 0)
| (constants.create.AUTH if authenticate else 0)
| (constants.create.NOPASSWD if passphrase == None else 0)
| (constants.create.NOPASSWD if passphrase is None else 0)
| (0 if expires else constants.create.NOEXPIRE)
| (constants.create.FORCE if force else 0)))
finally:
@ -670,7 +682,8 @@ class Context(GpgmeWrapper):
return self.op_genkey_result()
def create_subkey(self, key, algorithm=None, expires_in=0, expires=True,
sign=False, encrypt=False, authenticate=False, passphrase=None):
sign=False, encrypt=False, authenticate=False,
passphrase=None):
"""Create a subkey
Create a subkey for the given KEY. As subkeys are a concept
@ -706,7 +719,8 @@ class Context(GpgmeWrapper):
sign -- request the signing capability (see above)
encrypt -- request the encryption capability (see above)
authenticate -- request the authentication capability (see above)
passphrase -- protect the subkey with a passphrase (default: no passphrase)
passphrase -- protect the subkey with a passphrase (default: no
passphrase)
Returns:
-- an object describing the result of the subkey creation
@ -719,19 +733,20 @@ class Context(GpgmeWrapper):
old_pinentry_mode = self.pinentry_mode
old_passphrase_cb = getattr(self, '_passphrase_cb', None)
self.pinentry_mode = constants.PINENTRY_MODE_LOOPBACK
def passphrase_cb(hint, desc, prev_bad, hook=None):
return passphrase
self.set_passphrase_cb(passphrase_cb)
try:
self.op_createsubkey(key, algorithm,
0, # reserved
0, # reserved
expires_in,
((constants.create.SIGN if sign else 0)
| (constants.create.ENCR if encrypt else 0)
| (constants.create.AUTH if authenticate else 0)
| (constants.create.NOPASSWD
if passphrase == None else 0)
if passphrase is None else 0)
| (0 if expires else constants.create.NOEXPIRE)))
finally:
if util.is_a_string(passphrase):
@ -792,8 +807,8 @@ class Context(GpgmeWrapper):
"""
flags = 0
if uids == None or util.is_a_string(uids):
pass#through unchanged
if uids is None or util.is_a_string(uids):
pass # through unchanged
else:
flags |= constants.keysign.LFSEP
uids = "\n".join(uids)
@ -883,10 +898,10 @@ class Context(GpgmeWrapper):
GPGMEError -- as signaled by the underlying library
"""
if key == None:
if key is None:
raise ValueError("First argument cannot be None")
if sink == None:
if sink is None:
sink = Data()
if fnc_value:
@ -904,6 +919,7 @@ class Context(GpgmeWrapper):
def signers(self):
"""Keys used for signing"""
return [self.signers_enum(i) for i in range(self.signers_count())]
@signers.setter
def signers(self, signers):
old = self.signers
@ -919,6 +935,7 @@ class Context(GpgmeWrapper):
def pinentry_mode(self):
"""Pinentry mode"""
return self.get_pinentry_mode()
@pinentry_mode.setter
def pinentry_mode(self, value):
self.set_pinentry_mode(value)
@ -927,6 +944,7 @@ class Context(GpgmeWrapper):
def protocol(self):
"""Protocol to use"""
return self.get_protocol()
@protocol.setter
def protocol(self, value):
errorcheck(gpgme.gpgme_engine_check_version(value))
@ -936,6 +954,7 @@ class Context(GpgmeWrapper):
def home_dir(self):
"""Engine's home directory"""
return self.engine_info.home_dir
@home_dir.setter
def home_dir(self, value):
self.set_engine_info(self.protocol, home_dir=value)
@ -948,25 +967,15 @@ class Context(GpgmeWrapper):
# The list of functions is created using:
#
# $ grep '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \
# | grep -v _op_ | awk "/\(gpgme_ctx/ { printf (\"'%s',\\n\", \$2) } "
return ((name.startswith('gpgme_op_')
and not name.endswith('_result'))
or name in {
'gpgme_new',
'gpgme_set_ctx_flag',
'gpgme_set_protocol',
'gpgme_set_sub_protocol',
'gpgme_set_keylist_mode',
'gpgme_set_pinentry_mode',
'gpgme_set_locale',
'gpgme_ctx_set_engine_info',
'gpgme_signers_add',
'gpgme_sig_notation_add',
'gpgme_set_sender',
'gpgme_cancel',
'gpgme_cancel_async',
'gpgme_get_key',
})
# | grep -v _op_ | awk "/\(gpgme_ctx/ { printf (\"'%s',\\n\", \$2) } "
return ((name.startswith('gpgme_op_') and not
name.endswith('_result')) or name in {'gpgme_new',
'gpgme_set_ctx_flag', 'gpgme_set_protocol',
'gpgme_set_sub_protocol', 'gpgme_set_keylist_mode',
'gpgme_set_pinentry_mode', 'gpgme_set_locale',
'gpgme_ctx_set_engine_info', 'gpgme_signers_add',
'gpgme_sig_notation_add', 'gpgme_set_sender', 'gpgme_cancel',
'gpgme_cancel_async', 'gpgme_get_key'})
_boolean_properties = {'armor', 'textmode', 'offline'}
@ -985,6 +994,7 @@ class Context(GpgmeWrapper):
# Implement the context manager protocol.
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.__del__()
@ -1079,10 +1089,10 @@ class Context(GpgmeWrapper):
Please see the GPGME manual for more information.
"""
if func == None:
if func is None:
hookdata = None
else:
if hook == None:
if hook is None:
hookdata = (weakref.ref(self), func)
else:
hookdata = (weakref.ref(self), func, hook)
@ -1104,10 +1114,10 @@ class Context(GpgmeWrapper):
Please see the GPGME manual for more information.
"""
if func == None:
if func is None:
hookdata = None
else:
if hook == None:
if hook is None:
hookdata = (weakref.ref(self), func)
else:
hookdata = (weakref.ref(self), func, hook)
@ -1128,10 +1138,10 @@ class Context(GpgmeWrapper):
Please see the GPGME manual for more information.
"""
if func == None:
if func is None:
hookdata = None
else:
if hook == None:
if hook is None:
hookdata = (weakref.ref(self), func)
else:
hookdata = (weakref.ref(self), func, hook)
@ -1229,7 +1239,8 @@ class Data(GpgmeWrapper):
# This list is compiled using
#
# $ grep -v '^gpgme_error_t ' obj/lang/python/python3.5-gpg/gpgme.h \
# | awk "/\(gpgme_data_t/ { printf (\"'%s',\\n\", \$2) } " | sed "s/'\\*/'/"
# | awk "/\(gpgme_data_t/ { printf (\"'%s',\\n\", \$2) } " \
# | sed "s/'\\*/'/"
return name not in {
'gpgme_data_read',
'gpgme_data_write',
@ -1286,13 +1297,13 @@ class Data(GpgmeWrapper):
super(Data, self).__init__(None)
self.data_cbs = None
if cbs != None:
if cbs is not None:
self.new_from_cbs(*cbs)
elif string != None:
elif string is not None:
self.new_from_mem(string, copy)
elif file != None and offset != None and length != None:
elif file is not None and offset is not None and length is not None:
self.new_from_filepart(file, offset, length)
elif file != None:
elif file is not None:
if util.is_a_string(file):
self.new_from_file(file, copy)
else:
@ -1305,7 +1316,7 @@ class Data(GpgmeWrapper):
# At interpreter shutdown, gpgme is set to NONE.
return
if self.wrapped != None and gpgme.gpgme_data_release:
if self.wrapped is not None and gpgme.gpgme_data_release:
gpgme.gpgme_data_release(self.wrapped)
if self._callback_excinfo:
gpgme.gpg_raise_callback_exception(self)
@ -1315,6 +1326,7 @@ class Data(GpgmeWrapper):
# Implement the context manager protocol.
def __enter__(self):
return self
def __exit__(self, type, value, tb):
self.__del__()
@ -1329,7 +1341,8 @@ class Data(GpgmeWrapper):
def new_from_mem(self, string, copy=True):
tmp = gpgme.new_gpgme_data_t_p()
errorcheck(gpgme.gpgme_data_new_from_mem(tmp,string,len(string),copy))
errorcheck(gpgme.gpgme_data_new_from_mem(tmp, string, len(string),
copy))
self.wrapped = gpgme.gpgme_data_t_p_value(tmp)
gpgme.delete_gpgme_data_t_p(tmp)
@ -1347,7 +1360,7 @@ class Data(GpgmeWrapper):
def new_from_cbs(self, read_cb, write_cb, seek_cb, release_cb, hook=None):
tmp = gpgme.new_gpgme_data_t_p()
if hook != None:
if hook is not None:
hookdata = (weakref.ref(self),
read_cb, write_cb, seek_cb, release_cb, hook)
else:
@ -1374,9 +1387,9 @@ class Data(GpgmeWrapper):
filename = file
else:
fp = gpgme.fdopen(file.fileno(), file.mode)
if fp == None:
raise ValueError("Failed to open file from %s arg %s" % \
(str(type(file)), str(file)))
if fp is None:
raise ValueError("Failed to open file from %s arg %s" %
(str(type(file)), str(file)))
errorcheck(gpgme.gpgme_data_new_from_filepart(tmp, filename, fp,
offset, length))
@ -1412,7 +1425,7 @@ class Data(GpgmeWrapper):
raise GPGMEError.fromSyserror()
return written
def read(self, size = -1):
def read(self, size=-1):
"""Read at most size bytes, returned as bytes.
If the size argument is negative or omitted, read until EOF is reached.
@ -1447,6 +1460,7 @@ class Data(GpgmeWrapper):
chunks.append(result)
return b''.join(chunks)
def pubkey_algo_string(subkey):
"""Return short algorithm string
@ -1459,6 +1473,7 @@ def pubkey_algo_string(subkey):
"""
return gpgme.gpgme_pubkey_algo_string(subkey)
def pubkey_algo_name(algo):
"""Return name of public key algorithm
@ -1471,6 +1486,7 @@ def pubkey_algo_name(algo):
"""
return gpgme.gpgme_pubkey_algo_name(algo)
def hash_algo_name(algo):
"""Return name of hash algorithm
@ -1483,6 +1499,7 @@ def hash_algo_name(algo):
"""
return gpgme.gpgme_hash_algo_name(algo)
def get_protocol_name(proto):
"""Get protocol description
@ -1494,6 +1511,7 @@ def get_protocol_name(proto):
"""
return gpgme.gpgme_get_protocol_name(proto)
def addrspec_from_uid(uid):
"""Return the address spec
@ -1505,6 +1523,7 @@ def addrspec_from_uid(uid):
"""
return gpgme.gpgme_addrspec_from_uid(uid)
def check_version(version=None):
return gpgme.gpgme_check_version(version)
@ -1514,13 +1533,15 @@ def check_version(version=None):
# it unless she really wants to check for a certain version.
check_version()
def engine_check_version (proto):
def engine_check_version(proto):
try:
errorcheck(gpgme.gpgme_engine_check_version(proto))
return True
except errors.GPGMEError:
return False
def get_engine_info():
ptr = gpgme.new_gpgme_engine_info_t_p()
try:
@ -1531,6 +1552,7 @@ def get_engine_info():
gpgme.delete_gpgme_engine_info_t_p(ptr)
return info
def set_engine_info(proto, file_name, home_dir=None):
"""Changes the default configuration of the crypto engine implementing
the protocol 'proto'. 'file_name' is the file name of
@ -1539,10 +1561,12 @@ def set_engine_info(proto, file_name, home_dir=None):
used if omitted)."""
errorcheck(gpgme.gpgme_set_engine_info(proto, file_name, home_dir))
def set_locale(category, value):
"""Sets the default locale used by contexts"""
errorcheck(gpgme.gpgme_set_locale(None, category, value))
def wait(hang):
"""Wait for asynchronous call on any Context to finish.
Wait forever if hang is True.
@ -1556,7 +1580,7 @@ def wait(hang):
context = gpgme.gpgme_wait(None, ptr, hang)
status = gpgme.gpgme_error_t_p_value(ptr)
gpgme.delete_gpgme_error_t_p(ptr)
if context == None:
if context is None:
errorcheck(status)
else:
context = Context(context)