python: Support the Assuan engine.

* lang/python/gpgme.i: Add typemaps for the Assuan protocol callbacks.
* lang/python/helpers.c (_pyme_assuan_{data,inquire,status}_cb): New
functions.
* lang/python/private.h (_pyme_assuan_{data,inquire,status}_cb): New
prototypes.
* lang/python/pyme/core.py (Context.assuan_transact): New method.
* lang/python/pyme/util.py (percent_escape): New function.
* lang/python/tests/Makefile.am (py_tests): Add new test.
* lang/python/tests/t-protocol-assuan.py: New file.

Signed-off-by: Justus Winter <justus@g10code.com>
This commit is contained in:
Justus Winter 2016-07-28 12:40:54 +02:00
parent 355d707286
commit de69fa496c
7 changed files with 302 additions and 1 deletions

View File

@ -443,6 +443,60 @@
$2 = $input; $2 = $input;
} }
/* The assuan protocol callbacks. */
%typemap(in) (gpgme_assuan_data_cb_t data_cb, void *data_cb_value) {
if ($input == Py_None)
$1 = $2 = NULL;
else
{
if (! PyTuple_Check($input))
return PyErr_Format(PyExc_TypeError, "callback must be a tuple");
if (PyTuple_Size($input) != 2)
return PyErr_Format(PyExc_TypeError,
"callback must be a tuple of size 2");
if (! PyCallable_Check(PyTuple_GetItem($input, 1)))
return PyErr_Format(PyExc_TypeError, "second item must be callable");
$1 = _pyme_assuan_data_cb;
$2 = $input;
}
}
%typemap(in) (gpgme_assuan_inquire_cb_t inq_cb, void *inq_cb_value) {
if ($input == Py_None)
$1 = $2 = NULL;
else
{
if (! PyTuple_Check($input))
return PyErr_Format(PyExc_TypeError, "callback must be a tuple");
if (PyTuple_Size($input) != 2)
return PyErr_Format(PyExc_TypeError,
"callback must be a tuple of size 2");
if (! PyCallable_Check(PyTuple_GetItem($input, 1)))
return PyErr_Format(PyExc_TypeError, "second item must be callable");
$1 = _pyme_assuan_inquire_cb;
$2 = $input;
}
}
%typemap(in) (gpgme_assuan_status_cb_t stat_cb, void *stat_cb_value) {
if ($input == Py_None)
$1 = $2 = NULL;
else
{
if (! PyTuple_Check($input))
return PyErr_Format(PyExc_TypeError, "callback must be a tuple");
if (PyTuple_Size($input) != 2)
return PyErr_Format(PyExc_TypeError,
"callback must be a tuple of size 2");
if (! PyCallable_Check(PyTuple_GetItem($input, 1)))
return PyErr_Format(PyExc_TypeError, "second item must be callable");
$1 = _pyme_assuan_status_cb;
$2 = $input;
}
}
/* Include the unmodified <gpgme.h> for cc, and the cleaned-up local /* Include the unmodified <gpgme.h> for cc, and the cleaned-up local
version for SWIG. We do, however, want to hide certain fields on version for SWIG. We do, however, want to hide certain fields on
some structs, which we provide prior to including the version for some structs, which we provide prior to including the version for

View File

@ -937,3 +937,119 @@ pygpgme_data_new_from_cbs(PyObject *self,
Py_INCREF(Py_None); Py_INCREF(Py_None);
return Py_None; return Py_None;
} }
/* The assuan callbacks. */
gpgme_error_t
_pyme_assuan_data_cb (void *hook, const void *data, size_t datalen)
{
gpgme_error_t err = 0;
PyObject *pyhook = (PyObject *) hook;
PyObject *self = NULL;
PyObject *func = NULL;
PyObject *py_data = NULL;
PyObject *retval = NULL;
assert (PyTuple_Check(pyhook));
assert (PyTuple_Size(pyhook) == 2);
self = PyTuple_GetItem(pyhook, 0);
func = PyTuple_GetItem(pyhook, 1);
assert (PyCallable_Check(func));
py_data = PyBytes_FromStringAndSize(data, datalen);
if (py_data == NULL)
return NULL; /* raise */
retval = PyObject_CallFunctionObjArgs(func, py_data, NULL);
if (PyErr_Occurred())
err = pygpgme_exception2code();
Py_DECREF(py_data);
Py_XDECREF(retval);
leave:
if (err)
pygpgme_stash_callback_exception(self);
return err;
}
gpgme_error_t
_pyme_assuan_inquire_cb (void *hook, const char *name, const char *args,
gpgme_data_t *r_data)
{
gpgme_error_t err = 0;
PyObject *pyhook = (PyObject *) hook;
PyObject *self = NULL;
PyObject *func = NULL;
PyObject *py_name = NULL;
PyObject *py_args = NULL;
PyObject *retval = NULL;
assert (PyTuple_Check(pyhook));
assert (PyTuple_Size(pyhook) == 2);
self = PyTuple_GetItem(pyhook, 0);
func = PyTuple_GetItem(pyhook, 1);
assert (PyCallable_Check(func));
py_name = PyUnicode_FromString(name);
if (py_name == NULL)
return NULL; /* raise */
py_args = PyUnicode_FromString(args);
if (py_args == NULL)
return NULL; /* raise */
retval = PyObject_CallFunctionObjArgs(func, py_name, py_args, NULL);
if (PyErr_Occurred())
err = pygpgme_exception2code();
Py_DECREF(py_name);
Py_DECREF(py_args);
Py_XDECREF(retval);
/* FIXME: Returning data is not yet implemented. */
r_data = NULL;
leave:
if (err)
pygpgme_stash_callback_exception(self);
return err;
}
gpgme_error_t
_pyme_assuan_status_cb (void *hook, const char *status, const char *args)
{
gpgme_error_t err = 0;
PyObject *pyhook = (PyObject *) hook;
PyObject *self = NULL;
PyObject *func = NULL;
PyObject *py_status = NULL;
PyObject *py_args = NULL;
PyObject *retval = NULL;
assert (PyTuple_Check(pyhook));
assert (PyTuple_Size(pyhook) == 2);
self = PyTuple_GetItem(pyhook, 0);
func = PyTuple_GetItem(pyhook, 1);
assert (PyCallable_Check(func));
py_status = PyUnicode_FromString(status);
if (py_status == NULL)
return NULL; /* raise */
py_args = PyUnicode_FromString(args);
if (py_args == NULL)
return NULL; /* raise */
retval = PyObject_CallFunctionObjArgs(func, py_status, py_args, NULL);
if (PyErr_Occurred())
err = pygpgme_exception2code();
Py_DECREF(py_status);
Py_DECREF(py_args);
Py_XDECREF(retval);
leave:
if (err)
pygpgme_stash_callback_exception(self);
return err;
}

View File

@ -35,4 +35,12 @@ PyObject *pygpgme_wrap_fragile_result(PyObject *fragile, const char *classname);
gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status, gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,
const char *args, int fd); const char *args, int fd);
gpgme_error_t _pyme_assuan_data_cb (void *hook,
const void *data, size_t datalen);
gpgme_error_t _pyme_assuan_inquire_cb (void *hook,
const char *name, const char *args,
gpgme_data_t *r_data);
gpgme_error_t _pyme_assuan_status_cb (void *hook,
const char *status, const char *args);
#endif /* _PYME_PRIVATE_H_ */ #endif /* _PYME_PRIVATE_H_ */

View File

@ -31,6 +31,7 @@ from . import pygpgme
from .errors import errorcheck, GPGMEError from .errors import errorcheck, GPGMEError
from . import constants from . import constants
from . import errors from . import errors
from . import util
class GpgmeWrapper(object): class GpgmeWrapper(object):
"""Base wrapper class """Base wrapper class
@ -467,6 +468,55 @@ class Context(GpgmeWrapper):
plainbytes = data.read() plainbytes = data.read()
return plainbytes, result return plainbytes, result
def assuan_transact(self, command,
data_cb=None, inquire_cb=None, status_cb=None):
"""Issue a raw assuan command
This function can be used to issue a raw assuan command to the
engine.
If command is a string or bytes, it will be used as-is. If it
is an iterable of strings, it will be properly escaped and
joined into an well-formed assuan command.
Keyword arguments:
data_cb -- a callback receiving data lines
inquire_cb -- a callback providing more information
status_cb -- a callback receiving status lines
Returns:
result -- the result of command as GPGMEError
Raises:
GPGMEError -- as signaled by the underlying library
"""
if isinstance(command, (str, bytes)):
cmd = command
else:
cmd = " ".join(util.percent_escape(f) for f in command)
errptr = pygpgme.new_gpgme_error_t_p()
err = pygpgme.gpgme_op_assuan_transact_ext(
self.wrapped,
cmd,
(weakref.ref(self), data_cb) if data_cb else None,
(weakref.ref(self), inquire_cb) if inquire_cb else None,
(weakref.ref(self), status_cb) if status_cb else None,
errptr)
if self._callback_excinfo:
pygpgme.pygpgme_raise_callback_exception(self)
errorcheck(err)
status = pygpgme.gpgme_error_t_p_value(errptr)
pygpgme.delete_gpgme_error_t_p(errptr)
return GPGMEError(status) if status != 0 else None
@property @property
def signers(self): def signers(self):
"""Keys used for signing""" """Keys used for signing"""

View File

@ -31,3 +31,9 @@ def process_constants(prefix, scope):
if identifier.startswith(prefix)} if identifier.startswith(prefix)}
scope.update(constants) scope.update(constants)
return list(constants.keys()) return list(constants.keys())
def percent_escape(s):
return ''.join(
'%{0:2x}'.format(ord(c))
if c == '+' or c == '"' or c == '%' or ord(c) <= 0x20 else c
for c in s)

View File

@ -49,7 +49,8 @@ py_tests = t-wrapper.py \
t-wait.py \ t-wait.py \
t-encrypt-large.py \ t-encrypt-large.py \
t-file-name.py \ t-file-name.py \
t-idiomatic.py t-idiomatic.py \
t-protocol-assuan.py
TESTS = initial.py $(py_tests) final.py TESTS = initial.py $(py_tests) final.py
EXTRA_DIST = support.py $(TESTS) encrypt-only.asc sign-only.asc EXTRA_DIST = support.py $(TESTS) encrypt-only.asc sign-only.asc

View File

@ -0,0 +1,66 @@
#!/usr/bin/env python3
# Copyright (C) 2016 g10 Code GmbH
#
# This file is part of GPGME.
#
# GPGME is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# GPGME is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import pyme
with pyme.Context(protocol=pyme.constants.PROTOCOL_ASSUAN) as c:
# Do nothing.
c.assuan_transact('nop')
c.assuan_transact('NOP')
c.assuan_transact(['NOP'])
err = c.assuan_transact('idontexist')
assert err.getsource() == pyme.errors.SOURCE_GPGAGENT
assert err.getcode() == pyme.errors.ASS_UNKNOWN_CMD
# Invoke the pinentry to get a confirmation.
c.assuan_transact(['GET_CONFIRMATION', 'Hello there'])
data = []
def data_cb(line):
data.append(line)
err = c.assuan_transact(['GETINFO', 'version'], data_cb=data_cb)
assert not err
assert len(data) == 1
data = []
err = c.assuan_transact(['GETINFO', 's2k_count'], data_cb=data_cb)
if not err:
assert len(data) == 1
assert int(data[0]) > 0
# XXX HELP sends status lines if we could use ASSUAN_CONVEY_COMMENTS.
status = []
def status_cb(line, args):
status.append((line, args))
alphas_grip = '76F7E2B35832976B50A27A282D9B87E44577EB66'
err = c.assuan_transact(['KEYINFO', alphas_grip], status_cb=status_cb)
if not err:
assert len(status) == 1
line, args = status[0]
assert line.startswith('KEYINFO')
assert args.startswith(alphas_grip)
# XXX: test these callbacks, e.g. using PRESET_PASSPHRASE
# XXX: once issue2428 is resolved
def inq_cb(name, args):
print("inq_cb", name, args)