diff options
| -rw-r--r-- | lang/python/helpers.c | 65 | ||||
| -rw-r--r-- | lang/python/helpers.h | 2 | ||||
| -rw-r--r-- | lang/python/pyme/core.py | 32 | ||||
| -rwxr-xr-x | lang/python/tests/t-callbacks.py | 35 | 
4 files changed, 134 insertions, 0 deletions
| diff --git a/lang/python/helpers.c b/lang/python/helpers.c index d0c1f3b6..ec7264aa 100644 --- a/lang/python/helpers.c +++ b/lang/python/helpers.c @@ -272,7 +272,72 @@ void pygpgme_set_progress_cb(gpgme_ctx_t ctx, PyObject *cb, PyObject **freelater    *freelater = cb;    gpgme_set_progress_cb(ctx, (gpgme_progress_cb_t) pyProgressCb, (void *) cb);  } + +/* Status callbacks.  */ +static gpgme_error_t pyStatusCb(void *hook, const char *keyword, +                                const char *args) { +  gpgme_error_t err = 0; +  PyObject *pyhook = (PyObject *) hook; +  PyObject *self = NULL; +  PyObject *func = NULL; +  PyObject *dataarg = NULL; +  PyObject *pyargs = NULL; +  PyObject *retval = NULL; + +  assert (PyTuple_Check(pyhook)); +  assert (PyTuple_Size(pyhook) == 2 || PyTuple_Size(pyhook) == 3); +  self = PyTuple_GetItem(pyhook, 0); +  func = PyTuple_GetItem(pyhook, 1); +  if (PyTuple_Size(pyhook) == 3) { +    dataarg = PyTuple_GetItem(pyhook, 2); +    pyargs = PyTuple_New(3); +  } else { +    pyargs = PyTuple_New(2); +  } + +  if (keyword) +    PyTuple_SetItem(pyargs, 0, PyUnicode_DecodeUTF8(keyword, strlen (keyword), +                                                    "strict")); +  else +    { +      Py_INCREF(Py_None); +      PyTuple_SetItem(pyargs, 0, Py_None); +    } +  PyTuple_SetItem(pyargs, 1, PyUnicode_DecodeUTF8(args, strlen (args), +                                                "strict")); +  if (PyErr_Occurred()) { +    err = gpg_error(GPG_ERR_GENERAL); +    Py_DECREF(pyargs); +    goto leave; +  } + +  if (dataarg) { +    Py_INCREF(dataarg); +    PyTuple_SetItem(pyargs, 2, dataarg); +  } + +  retval = PyObject_CallObject(func, pyargs); +  if (PyErr_Occurred()) +    err = pygpgme_exception2code(); +  Py_DECREF(pyargs); +  Py_XDECREF(retval); + + leave: +  if (err) +    pygpgme_stash_callback_exception(self); +  return err; +} +void pygpgme_set_status_cb(gpgme_ctx_t ctx, PyObject *cb, +                           PyObject **freelater) { +  if (cb == Py_None) { +    gpgme_set_status_cb(ctx, NULL, NULL); +    return; +  } +  Py_INCREF(cb); +  *freelater = cb; +  gpgme_set_status_cb(ctx, (gpgme_status_cb_t) pyStatusCb, (void *) cb); +}  /* Edit callbacks.  */  gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status, diff --git a/lang/python/helpers.h b/lang/python/helpers.h index 4bd8ef81..5dd88a06 100644 --- a/lang/python/helpers.h +++ b/lang/python/helpers.h @@ -35,6 +35,8 @@ PyObject *pygpgme_raise_callback_exception(PyObject *self);  void pygpgme_set_passphrase_cb(gpgme_ctx_t ctx, PyObject *cb,  			       PyObject **freelater);  void pygpgme_set_progress_cb(gpgme_ctx_t ctx, PyObject *cb, PyObject **freelater); +void pygpgme_set_status_cb(gpgme_ctx_t ctx, PyObject *cb, +                           PyObject **freelater);  gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,  		       const char *args, int fd); diff --git a/lang/python/pyme/core.py b/lang/python/pyme/core.py index 43a24131..f15444be 100644 --- a/lang/python/pyme/core.py +++ b/lang/python/pyme/core.py @@ -64,6 +64,7 @@ class Context(GpgmeWrapper):          super().__init__(wrapped)          self.last_passcb = None          self.last_progresscb = None +        self.last_statuscb = None      def __del__(self):          if not pygpgme: @@ -91,6 +92,14 @@ class Context(GpgmeWrapper):                  pygpgme.delete_PyObject_p_p(self.last_progresscb)              self.last_progresscb = None +    def _free_statuscb(self): +        if self.last_statuscb != None: +            if pygpgme.pygpgme_clear_generic_cb: +                pygpgme.pygpgme_clear_generic_cb(self.last_statuscb) +            if pygpgme.delete_PyObject_p_p: +                pygpgme.delete_PyObject_p_p(self.last_statuscb) +            self.last_statuscb = None +      def op_keylist_all(self, *args, **kwargs):          self.op_keylist_start(*args, **kwargs)          key = self.op_keylist_next() @@ -195,6 +204,29 @@ class Context(GpgmeWrapper):                  hookdata = (self, func, hook)          pygpgme.pygpgme_set_progress_cb(self.wrapped, hookdata, self.last_progresscb) +    def set_status_cb(self, func, hook=None): +        """Sets the status callback to the function specified by FUNC.  If +        FUNC is None, the callback will be cleared. + +        The function will be called with two arguments, keyword and +        args.  If HOOK is not None, it will be supplied as third +        argument. + +        Please see the GPGME manual for more information. + +        """ +        self._free_statuscb() +        if func == None: +            hookdata = None +        else: +            self.last_statuscb = pygpgme.new_PyObject_p_p() +            if hook == None: +                hookdata = (self, func) +            else: +                hookdata = (self, func, hook) +        pygpgme.pygpgme_set_status_cb(self.wrapped, hookdata, +                                      self.last_statuscb) +      def get_engine_info(self):          """Returns this context specific engine info"""          return pygpgme.gpgme_ctx_get_engine_info(self.wrapped) diff --git a/lang/python/tests/t-callbacks.py b/lang/python/tests/t-callbacks.py index d962dc41..57975264 100755 --- a/lang/python/tests/t-callbacks.py +++ b/lang/python/tests/t-callbacks.py @@ -146,3 +146,38 @@ except Exception as e:      assert e == myException  else:      assert False, "Expected an error, got none" + + + +# Test the status callback. +source = core.Data("Hallo Leute\n") +sink = core.Data() + +status_cb_called = False +def status_cb(keyword, args, hook=None): +    global status_cb_called +    status_cb_called = True +    assert hook == cookie + +c = core.Context() +c.set_status_cb(status_cb, cookie) +c.set_ctx_flag("full-status", "1") +c.op_encrypt([alpha], constants.ENCRYPT_ALWAYS_TRUST, source, sink) +assert status_cb_called + +# Test exceptions. +source = core.Data("Hallo Leute\n") +sink = core.Data() + +def status_cb(keyword, args): +    raise myException + +c = core.Context() +c.set_status_cb(status_cb, None) +c.set_ctx_flag("full-status", "1") +try: +    c.op_encrypt([alpha], constants.ENCRYPT_ALWAYS_TRUST, source, sink) +except Exception as e: +    assert e == myException +else: +    assert False, "Expected an error, got none" | 
