python: Implement data callbacks.
* lang/python/gpgme.i (object_to_gpgme_t): Set exception on error. * lang/python/helpers.c (pyDataReadCb): New function. (pyDataWriteCb): Likewise. (pyDataSeekCb): Likewise. (pyDataReleaseCb): Likewise. (pygpgme_data_new_from_cbs): Likewise. * lang/python/helpers.h (pygpgme_data_new_from_cbs): New prototype. * lang/python/pyme/core.py (Data.__init__): Fix docstring, fix read callbacks. (Data.__del__): Fix read callbacks. (Data._free_readcb): Drop function. (Data._free_datacbs): New function. (Data.new_from_cbs): Fix setting the callbacks. (Data.write): Raise stashed exceptions. (Data.read): Likewise. * lang/python/tests/t-callbacks.py: Test new functionality. * lang/python/tests/t-data.py: Likewise. Signed-off-by: Justus Winter <justus@gnupg.org>
This commit is contained in:
parent
ebfe2300c3
commit
2ae847c027
@ -180,10 +180,13 @@ PyObject* object_to_gpgme_t(PyObject* input, const char* objtype, int argnum) {
|
|||||||
gpgme_data_t pubkey, gpgme_data_t seckey,
|
gpgme_data_t pubkey, gpgme_data_t seckey,
|
||||||
gpgme_data_t out};
|
gpgme_data_t out};
|
||||||
|
|
||||||
// SWIG has problem interpreting ssize_t, off_t or gpgme_error_t in gpgme.h
|
/* SWIG has problems interpreting ssize_t, off_t or gpgme_error_t in
|
||||||
|
gpgme.h. */
|
||||||
|
/* XXX: This is wrong at least for off_t if compiled with LFS. */
|
||||||
%typemap(out) ssize_t, off_t, gpgme_error_t, gpgme_err_code_t, gpgme_err_source_t, gpg_error_t {
|
%typemap(out) ssize_t, off_t, gpgme_error_t, gpgme_err_code_t, gpgme_err_source_t, gpg_error_t {
|
||||||
$result = PyLong_FromLong($1);
|
$result = PyLong_FromLong($1);
|
||||||
}
|
}
|
||||||
|
/* XXX: This is wrong at least for off_t if compiled with LFS. */
|
||||||
%typemap(in) ssize_t, off_t, gpgme_error_t, gpgme_err_code_t, gpgme_err_source_t, gpg_error_t {
|
%typemap(in) ssize_t, off_t, gpgme_error_t, gpgme_err_code_t, gpgme_err_source_t, gpg_error_t {
|
||||||
$1 = PyLong_AsLong($input);
|
$1 = PyLong_AsLong($input);
|
||||||
}
|
}
|
||||||
@ -201,7 +204,7 @@ PyObject* object_to_gpgme_t(PyObject* input, const char* objtype, int argnum) {
|
|||||||
Py_XDECREF($result); /* Blow away any previous result */
|
Py_XDECREF($result); /* Blow away any previous result */
|
||||||
if (result < 0) { /* Check for I/O error */
|
if (result < 0) { /* Check for I/O error */
|
||||||
free($1);
|
free($1);
|
||||||
return NULL;
|
return PyErr_SetFromErrno(PyExc_RuntimeError);
|
||||||
}
|
}
|
||||||
$result = PyBytes_FromStringAndSize($1,result);
|
$result = PyBytes_FromStringAndSize($1,result);
|
||||||
free($1);
|
free($1);
|
||||||
|
@ -402,3 +402,245 @@ gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,
|
|||||||
Py_XDECREF(retval);
|
Py_XDECREF(retval);
|
||||||
return err_status;
|
return err_status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Data callbacks. */
|
||||||
|
|
||||||
|
/* Read up to SIZE bytes into buffer BUFFER from the data object with
|
||||||
|
the handle HOOK. Return the number of characters read, 0 on EOF
|
||||||
|
and -1 on error. If an error occurs, errno is set. */
|
||||||
|
static ssize_t pyDataReadCb(void *hook, void *buffer, size_t size)
|
||||||
|
{
|
||||||
|
ssize_t result;
|
||||||
|
PyObject *pyhook = (PyObject *) hook;
|
||||||
|
PyObject *self = NULL;
|
||||||
|
PyObject *func = NULL;
|
||||||
|
PyObject *dataarg = NULL;
|
||||||
|
PyObject *pyargs = NULL;
|
||||||
|
PyObject *retval = NULL;
|
||||||
|
|
||||||
|
assert (PyTuple_Check(pyhook));
|
||||||
|
assert (PyTuple_Size(pyhook) == 5 || PyTuple_Size(pyhook) == 6);
|
||||||
|
|
||||||
|
self = PyTuple_GetItem(pyhook, 0);
|
||||||
|
func = PyTuple_GetItem(pyhook, 1);
|
||||||
|
if (PyTuple_Size(pyhook) == 6) {
|
||||||
|
dataarg = PyTuple_GetItem(pyhook, 5);
|
||||||
|
pyargs = PyTuple_New(2);
|
||||||
|
} else {
|
||||||
|
pyargs = PyTuple_New(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
PyTuple_SetItem(pyargs, 0, PyLong_FromSize_t(size));
|
||||||
|
if (dataarg) {
|
||||||
|
Py_INCREF(dataarg);
|
||||||
|
PyTuple_SetItem(pyargs, 1, dataarg);
|
||||||
|
}
|
||||||
|
|
||||||
|
retval = PyObject_CallObject(func, pyargs);
|
||||||
|
Py_DECREF(pyargs);
|
||||||
|
if (PyErr_Occurred()) {
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! PyBytes_Check(retval)) {
|
||||||
|
PyErr_Format(PyExc_TypeError,
|
||||||
|
"expected bytes from read callback, got %s",
|
||||||
|
retval->ob_type->tp_name);
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (PyBytes_Size(retval) > size) {
|
||||||
|
PyErr_Format(PyExc_TypeError,
|
||||||
|
"expected %zu bytes from read callback, got %zu",
|
||||||
|
size, PyBytes_Size(retval));
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(buffer, PyBytes_AsString(retval), PyBytes_Size(retval));
|
||||||
|
result = PyBytes_Size(retval);
|
||||||
|
|
||||||
|
leave:
|
||||||
|
Py_XDECREF(retval);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Write up to SIZE bytes from buffer BUFFER to the data object with
|
||||||
|
the handle HOOK. Return the number of characters written, or -1
|
||||||
|
on error. If an error occurs, errno is set. */
|
||||||
|
static ssize_t pyDataWriteCb(void *hook, const void *buffer, size_t size)
|
||||||
|
{
|
||||||
|
ssize_t result;
|
||||||
|
PyObject *pyhook = (PyObject *) hook;
|
||||||
|
PyObject *self = NULL;
|
||||||
|
PyObject *func = NULL;
|
||||||
|
PyObject *dataarg = NULL;
|
||||||
|
PyObject *pyargs = NULL;
|
||||||
|
PyObject *retval = NULL;
|
||||||
|
|
||||||
|
assert (PyTuple_Check(pyhook));
|
||||||
|
assert (PyTuple_Size(pyhook) == 5 || PyTuple_Size(pyhook) == 6);
|
||||||
|
|
||||||
|
self = PyTuple_GetItem(pyhook, 0);
|
||||||
|
func = PyTuple_GetItem(pyhook, 2);
|
||||||
|
if (PyTuple_Size(pyhook) == 6) {
|
||||||
|
dataarg = PyTuple_GetItem(pyhook, 5);
|
||||||
|
pyargs = PyTuple_New(2);
|
||||||
|
} else {
|
||||||
|
pyargs = PyTuple_New(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
PyTuple_SetItem(pyargs, 0, PyBytes_FromStringAndSize(buffer, size));
|
||||||
|
if (dataarg) {
|
||||||
|
Py_INCREF(dataarg);
|
||||||
|
PyTuple_SetItem(pyargs, 1, dataarg);
|
||||||
|
}
|
||||||
|
|
||||||
|
retval = PyObject_CallObject(func, pyargs);
|
||||||
|
Py_DECREF(pyargs);
|
||||||
|
if (PyErr_Occurred()) {
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! PyLong_Check(retval)) {
|
||||||
|
PyErr_Format(PyExc_TypeError,
|
||||||
|
"expected int from read callback, got %s",
|
||||||
|
retval->ob_type->tp_name);
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
result = PyLong_AsSsize_t(retval);
|
||||||
|
|
||||||
|
leave:
|
||||||
|
Py_XDECREF(retval);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set the current position from where the next read or write starts
|
||||||
|
in the data object with the handle HOOK to OFFSET, relativ to
|
||||||
|
WHENCE. Returns the new offset in bytes from the beginning of the
|
||||||
|
data object. */
|
||||||
|
static off_t pyDataSeekCb(void *hook, off_t offset, int whence)
|
||||||
|
{
|
||||||
|
off_t result;
|
||||||
|
PyObject *pyhook = (PyObject *) hook;
|
||||||
|
PyObject *self = NULL;
|
||||||
|
PyObject *func = NULL;
|
||||||
|
PyObject *dataarg = NULL;
|
||||||
|
PyObject *pyargs = NULL;
|
||||||
|
PyObject *retval = NULL;
|
||||||
|
|
||||||
|
assert (PyTuple_Check(pyhook));
|
||||||
|
assert (PyTuple_Size(pyhook) == 5 || PyTuple_Size(pyhook) == 6);
|
||||||
|
|
||||||
|
self = PyTuple_GetItem(pyhook, 0);
|
||||||
|
func = PyTuple_GetItem(pyhook, 3);
|
||||||
|
if (PyTuple_Size(pyhook) == 6) {
|
||||||
|
dataarg = PyTuple_GetItem(pyhook, 5);
|
||||||
|
pyargs = PyTuple_New(3);
|
||||||
|
} else {
|
||||||
|
pyargs = PyTuple_New(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(_FILE_OFFSET_BITS) && _FILE_OFFSET_BITS == 64
|
||||||
|
PyTuple_SetItem(pyargs, 0, PyLong_FromLongLong((long long) offset));
|
||||||
|
#else
|
||||||
|
PyTuple_SetItem(pyargs, 0, PyLong_FromLong((long) offset));
|
||||||
|
#endif
|
||||||
|
PyTuple_SetItem(pyargs, 1, PyLong_FromLong((long) whence));
|
||||||
|
if (dataarg) {
|
||||||
|
Py_INCREF(dataarg);
|
||||||
|
PyTuple_SetItem(pyargs, 2, dataarg);
|
||||||
|
}
|
||||||
|
|
||||||
|
retval = PyObject_CallObject(func, pyargs);
|
||||||
|
Py_DECREF(pyargs);
|
||||||
|
if (PyErr_Occurred()) {
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (! PyLong_Check(retval)) {
|
||||||
|
PyErr_Format(PyExc_TypeError,
|
||||||
|
"expected int from read callback, got %s",
|
||||||
|
retval->ob_type->tp_name);
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
result = -1;
|
||||||
|
goto leave;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined(_FILE_OFFSET_BITS) && _FILE_OFFSET_BITS == 64
|
||||||
|
result = PyLong_AsLongLong(retval);
|
||||||
|
#else
|
||||||
|
result = PyLong_AsLong(retval);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
leave:
|
||||||
|
Py_XDECREF(retval);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Close the data object with the handle HOOK. */
|
||||||
|
static void pyDataReleaseCb(void *hook)
|
||||||
|
{
|
||||||
|
PyObject *pyhook = (PyObject *) hook;
|
||||||
|
PyObject *self = NULL;
|
||||||
|
PyObject *func = NULL;
|
||||||
|
PyObject *dataarg = NULL;
|
||||||
|
PyObject *pyargs = NULL;
|
||||||
|
PyObject *retval = NULL;
|
||||||
|
|
||||||
|
assert (PyTuple_Check(pyhook));
|
||||||
|
assert (PyTuple_Size(pyhook) == 5 || PyTuple_Size(pyhook) == 6);
|
||||||
|
|
||||||
|
self = PyTuple_GetItem(pyhook, 0);
|
||||||
|
func = PyTuple_GetItem(pyhook, 4);
|
||||||
|
if (PyTuple_Size(pyhook) == 6) {
|
||||||
|
dataarg = PyTuple_GetItem(pyhook, 5);
|
||||||
|
pyargs = PyTuple_New(1);
|
||||||
|
} else {
|
||||||
|
pyargs = PyTuple_New(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataarg) {
|
||||||
|
Py_INCREF(dataarg);
|
||||||
|
PyTuple_SetItem(pyargs, 0, dataarg);
|
||||||
|
}
|
||||||
|
|
||||||
|
retval = PyObject_CallObject(func, pyargs);
|
||||||
|
Py_XDECREF(retval);
|
||||||
|
Py_DECREF(pyargs);
|
||||||
|
if (PyErr_Occurred())
|
||||||
|
pygpgme_stash_callback_exception(self);
|
||||||
|
}
|
||||||
|
|
||||||
|
gpgme_error_t pygpgme_data_new_from_cbs(gpgme_data_t *r_data,
|
||||||
|
PyObject *pycbs,
|
||||||
|
PyObject **freelater)
|
||||||
|
{
|
||||||
|
static struct gpgme_data_cbs cbs = {
|
||||||
|
pyDataReadCb,
|
||||||
|
pyDataWriteCb,
|
||||||
|
pyDataSeekCb,
|
||||||
|
pyDataReleaseCb,
|
||||||
|
};
|
||||||
|
PyObject *dataarg = NULL;
|
||||||
|
|
||||||
|
assert (PyTuple_Check(pycbs));
|
||||||
|
assert (PyTuple_Size(pycbs) == 5 || PyTuple_Size(pycbs) == 6);
|
||||||
|
|
||||||
|
Py_INCREF(pycbs);
|
||||||
|
*freelater = pycbs;
|
||||||
|
|
||||||
|
return gpgme_data_new_from_cbs(r_data, &cbs, (void *) pycbs);
|
||||||
|
}
|
||||||
|
@ -40,3 +40,7 @@ void pygpgme_set_status_cb(gpgme_ctx_t ctx, PyObject *cb,
|
|||||||
|
|
||||||
gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,
|
gpgme_error_t pyEditCb(void *opaque, gpgme_status_code_t status,
|
||||||
const char *args, int fd);
|
const char *args, int fd);
|
||||||
|
|
||||||
|
gpgme_error_t pygpgme_data_new_from_cbs(gpgme_data_t *r_data,
|
||||||
|
PyObject *pycbs,
|
||||||
|
PyObject **freelater);
|
||||||
|
@ -384,7 +384,7 @@ class Data(GpgmeWrapper):
|
|||||||
|
|
||||||
If cbs is specified, it MUST be a tuple of the form:
|
If cbs is specified, it MUST be a tuple of the form:
|
||||||
|
|
||||||
((read_cb, write_cb, seek_cb, release_cb), hook)
|
(read_cb, write_cb, seek_cb, release_cb[, hook])
|
||||||
|
|
||||||
where func is a callback function taking two arguments (count,
|
where func is a callback function taking two arguments (count,
|
||||||
hook) and returning a string of read data, or None on EOF.
|
hook) and returning a string of read data, or None on EOF.
|
||||||
@ -396,7 +396,7 @@ class Data(GpgmeWrapper):
|
|||||||
|
|
||||||
Any other use will result in undefined or erroneous behavior."""
|
Any other use will result in undefined or erroneous behavior."""
|
||||||
super().__init__(None)
|
super().__init__(None)
|
||||||
self.last_readcb = None
|
self.data_cbs = None
|
||||||
|
|
||||||
if cbs != None:
|
if cbs != None:
|
||||||
self.new_from_cbs(*cbs)
|
self.new_from_cbs(*cbs)
|
||||||
@ -419,15 +419,18 @@ class Data(GpgmeWrapper):
|
|||||||
|
|
||||||
if self.wrapped != None and pygpgme.gpgme_data_release:
|
if self.wrapped != None and pygpgme.gpgme_data_release:
|
||||||
pygpgme.gpgme_data_release(self.wrapped)
|
pygpgme.gpgme_data_release(self.wrapped)
|
||||||
self._free_readcb()
|
if self._callback_excinfo:
|
||||||
|
print(self._callback_excinfo)
|
||||||
|
pygpgme.pygpgme_raise_callback_exception(self)
|
||||||
|
self._free_datacbs()
|
||||||
|
|
||||||
def _free_readcb(self):
|
def _free_datacbs(self):
|
||||||
if self.last_readcb != None:
|
if self.data_cbs != None:
|
||||||
if pygpgme.pygpgme_clear_generic_cb:
|
if pygpgme.pygpgme_clear_generic_cb:
|
||||||
pygpgme.pygpgme_clear_generic_cb(self.last_readcb)
|
pygpgme.pygpgme_clear_generic_cb(self.data_cbs)
|
||||||
if pygpgme.delete_PyObject_p_p:
|
if pygpgme.delete_PyObject_p_p:
|
||||||
pygpgme.delete_PyObject_p_p(self.last_readcb)
|
pygpgme.delete_PyObject_p_p(self.data_cbs)
|
||||||
self.last_readcb = None
|
self.data_cbs = None
|
||||||
|
|
||||||
def new(self):
|
def new(self):
|
||||||
tmp = pygpgme.new_gpgme_data_t_p()
|
tmp = pygpgme.new_gpgme_data_t_p()
|
||||||
@ -453,14 +456,18 @@ class Data(GpgmeWrapper):
|
|||||||
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
||||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||||
|
|
||||||
def new_from_cbs(self, funcs, hook):
|
def new_from_cbs(self, read_cb, write_cb, seek_cb, release_cb, hook=None):
|
||||||
"""Argument funcs must be a 4 element tuple with callbacks:
|
assert self.data_cbs == None
|
||||||
(read_cb, write_cb, seek_cb, release_cb)"""
|
self.data_cbs = pygpgme.new_PyObject_p_p()
|
||||||
tmp = pygpgme.new_gpgme_data_t_p()
|
tmp = pygpgme.new_gpgme_data_t_p()
|
||||||
self._free_readcb()
|
if hook != None:
|
||||||
self.last_readcb = pygpgme.new_PyObject_p_p()
|
hookdata = (weakref.ref(self),
|
||||||
hookdata = (funcs, hook)
|
read_cb, write_cb, seek_cb, release_cb, hook)
|
||||||
pygpgme.pygpgme_data_new_from_cbs(tmp, hookdata, self.last_readcb)
|
else:
|
||||||
|
hookdata = (weakref.ref(self),
|
||||||
|
read_cb, write_cb, seek_cb, release_cb)
|
||||||
|
errorcheck(
|
||||||
|
pygpgme.pygpgme_data_new_from_cbs(tmp, hookdata, self.data_cbs))
|
||||||
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
||||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||||
|
|
||||||
@ -512,7 +519,10 @@ class Data(GpgmeWrapper):
|
|||||||
If a string is given, it is implicitly encoded using UTF-8."""
|
If a string is given, it is implicitly encoded using UTF-8."""
|
||||||
written = pygpgme.gpgme_data_write(self.wrapped, buffer)
|
written = pygpgme.gpgme_data_write(self.wrapped, buffer)
|
||||||
if written < 0:
|
if written < 0:
|
||||||
raise GPGMEError.fromSyserror()
|
if self._callback_excinfo:
|
||||||
|
pygpgme.pygpgme_raise_callback_exception(self)
|
||||||
|
else:
|
||||||
|
raise GPGMEError.fromSyserror()
|
||||||
return written
|
return written
|
||||||
|
|
||||||
def read(self, size = -1):
|
def read(self, size = -1):
|
||||||
@ -527,11 +537,24 @@ class Data(GpgmeWrapper):
|
|||||||
return ''
|
return ''
|
||||||
|
|
||||||
if size > 0:
|
if size > 0:
|
||||||
return pygpgme.gpgme_data_read(self.wrapped, size)
|
try:
|
||||||
|
result = pygpgme.gpgme_data_read(self.wrapped, size)
|
||||||
|
except:
|
||||||
|
if self._callback_excinfo:
|
||||||
|
pygpgme.pygpgme_raise_callback_exception(self)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
return result
|
||||||
else:
|
else:
|
||||||
chunks = []
|
chunks = []
|
||||||
while 1:
|
while True:
|
||||||
result = pygpgme.gpgme_data_read(self.wrapped, 4096)
|
try:
|
||||||
|
result = pygpgme.gpgme_data_read(self.wrapped, 4096)
|
||||||
|
except:
|
||||||
|
if self._callback_excinfo:
|
||||||
|
pygpgme.pygpgme_raise_callback_exception(self)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
if len(result) == 0:
|
if len(result) == 0:
|
||||||
break
|
break
|
||||||
chunks.append(result)
|
chunks.append(result)
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import os
|
||||||
from pyme import core, constants
|
from pyme import core, constants
|
||||||
import support
|
import support
|
||||||
|
|
||||||
@ -181,3 +182,73 @@ except Exception as e:
|
|||||||
assert e == myException
|
assert e == myException
|
||||||
else:
|
else:
|
||||||
assert False, "Expected an error, got none"
|
assert False, "Expected an error, got none"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Test the data callbacks.
|
||||||
|
def read_cb(amount, hook=None):
|
||||||
|
assert hook == cookie
|
||||||
|
return 0
|
||||||
|
def release_cb(hook=None):
|
||||||
|
assert hook == cookie
|
||||||
|
data = core.Data(cbs=(read_cb, None, None, release_cb, cookie))
|
||||||
|
try:
|
||||||
|
data.read()
|
||||||
|
except Exception as e:
|
||||||
|
assert type(e) == TypeError
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
|
||||||
|
def read_cb(amount):
|
||||||
|
raise myException
|
||||||
|
data = core.Data(cbs=(read_cb, None, None, lambda: None))
|
||||||
|
try:
|
||||||
|
data.read()
|
||||||
|
except Exception as e:
|
||||||
|
assert e == myException
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
|
||||||
|
|
||||||
|
def write_cb(what, hook=None):
|
||||||
|
assert hook == cookie
|
||||||
|
return "wrong type"
|
||||||
|
data = core.Data(cbs=(None, write_cb, None, release_cb, cookie))
|
||||||
|
try:
|
||||||
|
data.write(b'stuff')
|
||||||
|
except Exception as e:
|
||||||
|
assert type(e) == TypeError
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
|
||||||
|
def write_cb(what):
|
||||||
|
raise myException
|
||||||
|
data = core.Data(cbs=(None, write_cb, None, lambda: None))
|
||||||
|
try:
|
||||||
|
data.write(b'stuff')
|
||||||
|
except Exception as e:
|
||||||
|
assert e == myException
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
|
||||||
|
|
||||||
|
def seek_cb(offset, whence, hook=None):
|
||||||
|
assert hook == cookie
|
||||||
|
return "wrong type"
|
||||||
|
data = core.Data(cbs=(None, None, seek_cb, release_cb, cookie))
|
||||||
|
try:
|
||||||
|
data.seek(0, os.SEEK_SET)
|
||||||
|
except Exception as e:
|
||||||
|
assert type(e) == TypeError
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
|
||||||
|
def seek_cb(offset, whence):
|
||||||
|
raise myException
|
||||||
|
data = core.Data(cbs=(None, None, seek_cb, lambda: None))
|
||||||
|
try:
|
||||||
|
data.seek(0, os.SEEK_SET)
|
||||||
|
except Exception as e:
|
||||||
|
assert e == myException
|
||||||
|
else:
|
||||||
|
assert False, "Expected an error, got none"
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
# You should have received a copy of the GNU Lesser General Public
|
# You should have received a copy of the GNU Lesser General Public
|
||||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import io
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
from pyme import core
|
from pyme import core
|
||||||
@ -79,3 +80,43 @@ with tempfile.NamedTemporaryFile() as tmp:
|
|||||||
# Open using name, offset, and length.
|
# Open using name, offset, and length.
|
||||||
data = core.Data(file=tmp.name, offset=23, length=42)
|
data = core.Data(file=tmp.name, offset=23, length=42)
|
||||||
assert data.read() == binjunk[23:23+42]
|
assert data.read() == binjunk[23:23+42]
|
||||||
|
|
||||||
|
# Test callbacks.
|
||||||
|
class DataObject(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.buffer = io.BytesIO()
|
||||||
|
self.released = False
|
||||||
|
|
||||||
|
def read(self, amount, hook=None):
|
||||||
|
assert not self.released
|
||||||
|
return self.buffer.read(amount)
|
||||||
|
|
||||||
|
def write(self, data, hook=None):
|
||||||
|
assert not self.released
|
||||||
|
return self.buffer.write(data)
|
||||||
|
|
||||||
|
def seek(self, offset, whence, hook=None):
|
||||||
|
assert not self.released
|
||||||
|
return self.buffer.seek(offset, whence)
|
||||||
|
|
||||||
|
def release(self, hook=None):
|
||||||
|
assert not self.released
|
||||||
|
self.released = True
|
||||||
|
|
||||||
|
do = DataObject()
|
||||||
|
cookie = object()
|
||||||
|
data = core.Data(cbs=(do.read, do.write, do.seek, do.release, cookie))
|
||||||
|
data.write('Hello world!')
|
||||||
|
data.seek(0, os.SEEK_SET)
|
||||||
|
assert data.read() == b'Hello world!'
|
||||||
|
del data
|
||||||
|
assert do.released
|
||||||
|
|
||||||
|
# Again, without the cookie.
|
||||||
|
do = DataObject()
|
||||||
|
data = core.Data(cbs=(do.read, do.write, do.seek, do.release))
|
||||||
|
data.write('Hello world!')
|
||||||
|
data.seek(0, os.SEEK_SET)
|
||||||
|
assert data.read() == b'Hello world!'
|
||||||
|
del data
|
||||||
|
assert do.released
|
||||||
|
Loading…
Reference in New Issue
Block a user