python: Robust exception handling in callbacks.

* lang/python/helpers.c (pygpgme_stash_callback_exception): New
function.
(pygpgme_raise_callback_exception): Likewise.
(pyPassphraseCb): Stash python errors.
* lang/python/helpers.h (pygpgme_raise_callback_exception): New
prototype.
* lang/python/pyme/core.py ({Context,Data}.__init__): Move common
initialization to superclass.
(Context.set_progress_cb): Hand in 'self'.
* lang/python/pyme/util.py (GpgmeWrapper.__init__): New function.
(GpgmeWrapper.__getattr__): Raise stashed exceptions.
* lang/python/tests/Makefile.am (py_tests): Add new test.
* lang/python/tests/t-callbacks.py: New file.

Signed-off-by: Justus Winter <justus@gnupg.org>
This commit is contained in:
Justus Winter 2016-05-19 11:03:27 +02:00
parent 2f748b5a2d
commit d90857a08c
6 changed files with 187 additions and 13 deletions

View File

@ -17,6 +17,8 @@
# License along with this library; if not, write to the Free Software # License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/ */
#include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <gpgme.h> #include <gpgme.h>
#include <stdlib.h> #include <stdlib.h>
@ -59,12 +61,84 @@ void pygpgme_clear_generic_cb(PyObject **cb) {
Py_DECREF(*cb); Py_DECREF(*cb);
} }
/* Exception support for callbacks. */
#define EXCINFO "_callback_excinfo"
static void pygpgme_stash_callback_exception(PyObject *self)
{
PyObject *ptype, *pvalue, *ptraceback, *excinfo;
PyErr_Fetch(&ptype, &pvalue, &ptraceback);
excinfo = PyTuple_New(3);
PyTuple_SetItem(excinfo, 0, ptype);
if (pvalue)
PyTuple_SetItem(excinfo, 1, pvalue);
else {
Py_INCREF(Py_None);
PyTuple_SetItem(excinfo, 1, Py_None);
}
if (ptraceback)
PyTuple_SetItem(excinfo, 2, ptraceback);
else {
Py_INCREF(Py_None);
PyTuple_SetItem(excinfo, 2, Py_None);
}
PyObject_SetAttrString(self, EXCINFO, excinfo);
}
PyObject *pygpgme_raise_callback_exception(PyObject *self)
{
PyObject *ptype, *pvalue, *ptraceback, *excinfo;
if (! PyObject_HasAttrString(self, EXCINFO))
goto leave;
excinfo = PyObject_GetAttrString(self, EXCINFO);
if (! PyTuple_Check(excinfo))
{
Py_DECREF(excinfo);
goto leave;
}
ptype = PyTuple_GetItem(excinfo, 0);
Py_INCREF(excinfo);
pvalue = PyTuple_GetItem(excinfo, 1);
if (pvalue == Py_None)
pvalue = NULL;
else
Py_INCREF(pvalue);
ptraceback = PyTuple_GetItem(excinfo, 2);
if (ptraceback == Py_None)
ptraceback = NULL;
else
Py_INCREF(ptraceback);
Py_DECREF(excinfo);
PyErr_Restore(ptype, pvalue, ptraceback);
Py_INCREF(Py_None);
PyObject_SetAttrString(self, EXCINFO, Py_None);
return NULL; /* Raise exception. */
leave:
Py_INCREF(Py_None);
return Py_None;
}
#undef EXCINFO
static gpgme_error_t pyPassphraseCb(void *hook, static gpgme_error_t pyPassphraseCb(void *hook,
const char *uid_hint, const char *uid_hint,
const char *passphrase_info, const char *passphrase_info,
int prev_was_bad, int prev_was_bad,
int fd) { int fd) {
PyObject *pyhook = (PyObject *) hook; PyObject *pyhook = (PyObject *) hook;
PyObject *self = NULL;
PyObject *func = NULL; PyObject *func = NULL;
PyObject *args = NULL; PyObject *args = NULL;
PyObject *retval = NULL; PyObject *retval = NULL;
@ -73,12 +147,13 @@ static gpgme_error_t pyPassphraseCb(void *hook,
pygpgme_exception_init(); pygpgme_exception_init();
if (PyTuple_Check(pyhook)) { assert (PyTuple_Check(pyhook));
func = PyTuple_GetItem(pyhook, 0); self = PyTuple_GetItem(pyhook, 0);
dataarg = PyTuple_GetItem(pyhook, 1); func = PyTuple_GetItem(pyhook, 1);
if (PyTuple_Size(pyhook) == 3) {
dataarg = PyTuple_GetItem(pyhook, 2);
args = PyTuple_New(4); args = PyTuple_New(4);
} else { } else {
func = pyhook;
args = PyTuple_New(3); args = PyTuple_New(3);
} }
@ -90,6 +165,11 @@ static gpgme_error_t pyPassphraseCb(void *hook,
else else
PyTuple_SetItem(args, 0, PyUnicode_DecodeUTF8(uid_hint, strlen (uid_hint), PyTuple_SetItem(args, 0, PyUnicode_DecodeUTF8(uid_hint, strlen (uid_hint),
"strict")); "strict"));
if (PyErr_Occurred()) {
Py_DECREF(args);
err_status = gpg_error(GPG_ERR_GENERAL);
goto leave;
}
PyTuple_SetItem(args, 1, PyBytes_FromString(passphrase_info)); PyTuple_SetItem(args, 1, PyBytes_FromString(passphrase_info));
PyTuple_SetItem(args, 2, PyBool_FromLong((long)prev_was_bad)); PyTuple_SetItem(args, 2, PyBool_FromLong((long)prev_was_bad));
@ -117,7 +197,8 @@ static gpgme_error_t pyPassphraseCb(void *hook,
PyErr_Format(PyExc_TypeError, PyErr_Format(PyExc_TypeError,
"expected str or bytes from passphrase callback, got %s", "expected str or bytes from passphrase callback, got %s",
retval->ob_type->tp_name); retval->ob_type->tp_name);
return gpg_error(GPG_ERR_GENERAL); err_status = gpg_error(GPG_ERR_GENERAL);
goto leave;
} }
write(fd, buf, len); write(fd, buf, len);
@ -126,6 +207,10 @@ static gpgme_error_t pyPassphraseCb(void *hook,
} }
} }
leave:
if (err_status)
pygpgme_stash_callback_exception(self);
return err_status; return err_status;
} }

View File

@ -30,6 +30,7 @@ void pygpgme_exception_init(void);
gpgme_error_t pygpgme_exception2code(void); gpgme_error_t pygpgme_exception2code(void);
void pygpgme_clear_generic_cb(PyObject **cb); void pygpgme_clear_generic_cb(PyObject **cb);
PyObject *pygpgme_raise_callback_exception(PyObject *self);
void pygpgme_set_passphrase_cb(gpgme_ctx_t ctx, PyObject *cb, void pygpgme_set_passphrase_cb(gpgme_ctx_t ctx, PyObject *cb,
PyObject **freelater); PyObject **freelater);

View File

@ -54,14 +54,14 @@ class Context(GpgmeWrapper):
def __init__(self, wrapped=None): def __init__(self, wrapped=None):
if wrapped: if wrapped:
self.wrapped = wrapped
self.own = False self.own = False
else: else:
tmp = pygpgme.new_gpgme_ctx_t_p() tmp = pygpgme.new_gpgme_ctx_t_p()
errorcheck(pygpgme.gpgme_new(tmp)) errorcheck(pygpgme.gpgme_new(tmp))
self.wrapped = pygpgme.gpgme_ctx_t_p_value(tmp) wrapped = pygpgme.gpgme_ctx_t_p_value(tmp)
pygpgme.delete_gpgme_ctx_t_p(tmp) pygpgme.delete_gpgme_ctx_t_p(tmp)
self.own = True self.own = True
super().__init__(wrapped)
self.last_passcb = None self.last_passcb = None
self.last_progresscb = None self.last_progresscb = None
@ -167,9 +167,9 @@ class Context(GpgmeWrapper):
else: else:
self.last_passcb = pygpgme.new_PyObject_p_p() self.last_passcb = pygpgme.new_PyObject_p_p()
if hook == None: if hook == None:
hookdata = func hookdata = (self, func)
else: else:
hookdata = (func, hook) hookdata = (self, func, hook)
pygpgme.pygpgme_set_passphrase_cb(self.wrapped, hookdata, self.last_passcb) pygpgme.pygpgme_set_passphrase_cb(self.wrapped, hookdata, self.last_passcb)
def set_progress_cb(self, func, hook=None): def set_progress_cb(self, func, hook=None):
@ -282,7 +282,7 @@ class Data(GpgmeWrapper):
that file. that file.
Any other use will result in undefined or erroneous behavior.""" Any other use will result in undefined or erroneous behavior."""
self.wrapped = None super().__init__(None)
self.last_readcb = None self.last_readcb = None
if cbs != None: if cbs != None:

View File

@ -32,6 +32,11 @@ def process_constants(starttext, dict):
class GpgmeWrapper(object): class GpgmeWrapper(object):
"""Base class all Pyme wrappers for GPGME functionality. Not to be """Base class all Pyme wrappers for GPGME functionality. Not to be
instantiated directly.""" instantiated directly."""
def __init__(self, wrapped):
self._callback_excinfo = None
self.wrapped = wrapped
def __repr__(self): def __repr__(self):
return '<instance of %s.%s with GPG object at %s>' % \ return '<instance of %s.%s with GPG object at %s>' % \
(__name__, self.__class__.__name__, (__name__, self.__class__.__name__,
@ -78,11 +83,16 @@ class GpgmeWrapper(object):
if self._errorcheck(name): if self._errorcheck(name):
def _funcwrap(slf, *args, **kwargs): def _funcwrap(slf, *args, **kwargs):
return errorcheck(func(slf.wrapped, *args, **kwargs), result = func(slf.wrapped, *args, **kwargs)
"Invocation of " + name) if slf._callback_excinfo:
pygpgme.pygpgme_raise_callback_exception(slf)
return errorcheck(result, "Invocation of " + name)
else: else:
def _funcwrap(slf, *args, **kwargs): def _funcwrap(slf, *args, **kwargs):
return func(slf.wrapped, *args, **kwargs) result = func(slf.wrapped, *args, **kwargs)
if slf._callback_excinfo:
pygpgme.pygpgme_raise_callback_exception(slf)
return result
_funcwrap.__doc__ = getattr(func, "__doc__") _funcwrap.__doc__ = getattr(func, "__doc__")

View File

@ -29,6 +29,7 @@ TESTS_ENVIRONMENT = GNUPGHOME=$(abs_builddir) \
PYTHONPATH=`echo $(abs_builddir)/../build/lib.*` PYTHONPATH=`echo $(abs_builddir)/../build/lib.*`
py_tests = t-wrapper.py \ py_tests = t-wrapper.py \
t-callbacks.py \
t-data.py \ t-data.py \
t-encrypt.py \ t-encrypt.py \
t-encrypt-sym.py \ t-encrypt-sym.py \

View File

@ -0,0 +1,77 @@
#!/usr/bin/env python3
# Copyright (C) 2016 g10 Code GmbH
#
# This file is part of GPGME.
#
# GPGME is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# GPGME is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
# Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
import os
from pyme import core, constants
import support
support.init_gpgme(constants.PROTOCOL_OpenPGP)
c = core.Context()
c.set_pinentry_mode(constants.PINENTRY_MODE_LOOPBACK)
source = core.Data("Hallo Leute\n")
sink = core.Data()
# Valid passphrases, both as string and bytes.
for passphrase in ('foo', b'foo'):
def passphrase_cb(hint, desc, prev_bad, hook=None):
assert hook == passphrase
return hook
c.set_passphrase_cb(passphrase_cb, passphrase)
c.op_encrypt([], 0, source, sink)
# Returning an invalid type.
def passphrase_cb(hint, desc, prev_bad, hook=None):
return 0
c.set_passphrase_cb(passphrase_cb, None)
try:
c.op_encrypt([], 0, source, sink)
except Exception as e:
assert type(e) == TypeError
assert str(e) == "expected str or bytes from passphrase callback, got int"
else:
assert False, "Expected an error, got none"
# Raising an exception inside callback.
myException = Exception()
def passphrase_cb(hint, desc, prev_bad, hook=None):
raise myException
c.set_passphrase_cb(passphrase_cb, None)
try:
c.op_encrypt([], 0, source, sink)
except Exception as e:
assert e == myException
else:
assert False, "Expected an error, got none"
# Wrong kind of callback function.
def bad_passphrase_cb():
pass
c.set_passphrase_cb(bad_passphrase_cb, None)
try:
c.op_encrypt([], 0, source, sink)
except Exception as e:
assert type(e) == TypeError
else:
assert False, "Expected an error, got none"