From bf188e280b8b4fc775f33c47e2e1e275ed044004 Mon Sep 17 00:00:00 2001 From: Justus Winter Date: Wed, 25 May 2016 12:47:28 +0200 Subject: [PATCH] python: Fix reading data from existing files. * lang/python/pyme/core.py (Data.__init__): Add 'copy' kwargument, and pass it to functions supporting it. PEP8 fix. (Data.new_from_fd): PEP8 fix. (Data.new_from_file): Give a more helpful error message if copy is False. PEP8 fix. (Data.new_from_fd): Hand the file descriptor to 'gpgme_data_new_from_fd', not a stream. Fix docstring. * lang/python/tests/t-data.py: Add tests for this. Signed-off-by: Justus Winter --- lang/python/pyme/core.py | 33 ++++++++++++++++++--------------- lang/python/tests/t-data.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 16 deletions(-) diff --git a/lang/python/pyme/core.py b/lang/python/pyme/core.py index 5f8378de..0c2dd602 100644 --- a/lang/python/pyme/core.py +++ b/lang/python/pyme/core.py @@ -370,8 +370,8 @@ class Data(GpgmeWrapper): return 0 return 1 - def __init__(self, string = None, file = None, offset = None, - length = None, cbs = None): + def __init__(self, string=None, file=None, offset=None, + length=None, cbs=None, copy=True): """Initialize a new gpgme_data_t object. If no args are specified, make it an empty object. @@ -402,12 +402,12 @@ class Data(GpgmeWrapper): if cbs != None: self.new_from_cbs(*cbs) elif string != None: - self.new_from_mem(string) + self.new_from_mem(string, copy) elif file != None and offset != None and length != None: self.new_from_filepart(file, offset, length) elif file != None: if type(file) == type("x"): - self.new_from_file(file) + self.new_from_file(file, copy) else: self.new_from_fd(file) else: @@ -436,15 +436,21 @@ class Data(GpgmeWrapper): self.wrapped = pygpgme.gpgme_data_t_p_value(tmp) pygpgme.delete_gpgme_data_t_p(tmp) - def new_from_mem(self, string, copy = 1): + def new_from_mem(self, string, copy=True): tmp = pygpgme.new_gpgme_data_t_p() errorcheck(pygpgme.gpgme_data_new_from_mem(tmp,string,len(string),copy)) self.wrapped = pygpgme.gpgme_data_t_p_value(tmp) pygpgme.delete_gpgme_data_t_p(tmp) - def new_from_file(self, filename, copy = 1): + def new_from_file(self, filename, copy=True): tmp = pygpgme.new_gpgme_data_t_p() - errorcheck(pygpgme.gpgme_data_new_from_file(tmp, filename, copy)) + try: + errorcheck(pygpgme.gpgme_data_new_from_file(tmp, filename, copy)) + except errors.GPGMEError as e: + if e.getcode() == errors.INV_VALUE and not copy: + raise ValueError("delayed reads are not yet supported") + else: + raise e self.wrapped = pygpgme.gpgme_data_t_p_value(tmp) pygpgme.delete_gpgme_data_t_p(tmp) @@ -485,16 +491,13 @@ class Data(GpgmeWrapper): pygpgme.delete_gpgme_data_t_p(tmp) def new_from_fd(self, file): - """This wraps the GPGME gpgme_data_new_from_fd() function. - The argument "file" may be a file-like object, supporting the fileno() - call and the mode attribute.""" + """This wraps the GPGME gpgme_data_new_from_fd() function. The + argument "file" must be a file-like object, supporting the + fileno() method. + """ tmp = pygpgme.new_gpgme_data_t_p() - fp = pygpgme.fdopen(file.fileno(), file.mode) - if fp == None: - raise ValueError("Failed to open file from %s arg %s" % \ - (str(type(file)), str(file))) - errorcheck(pygpgme.gpgme_data_new_from_fd(tmp, fp)) + errorcheck(pygpgme.gpgme_data_new_from_fd(tmp, file.fileno())) self.wrapped = pygpgme.gpgme_data_t_p_value(tmp) pygpgme.delete_gpgme_data_t_p(tmp) diff --git a/lang/python/tests/t-data.py b/lang/python/tests/t-data.py index af2eb986..6cf10fab 100755 --- a/lang/python/tests/t-data.py +++ b/lang/python/tests/t-data.py @@ -18,7 +18,7 @@ # License along with this program; if not, see . import os - +import tempfile from pyme import core data = core.Data('Hello world!') @@ -32,6 +32,9 @@ assert data.read() == b'' data = core.Data(b'Hello world!') assert data.read() == b'Hello world!' +data = core.Data(b'Hello world!', copy=False) +assert data.read() == b'Hello world!' + data = core.Data() data.write('Hello world!') data.seek(0, os.SEEK_SET) @@ -47,3 +50,32 @@ data = core.Data() data.write(binjunk) data.seek(0, os.SEEK_SET) assert data.read() == binjunk + +# Test reading from an existing file. +with tempfile.NamedTemporaryFile() as tmp: + tmp.write(binjunk) + tmp.flush() + tmp.seek(0) + + # Open using name. + data = core.Data(file=tmp.name) + assert data.read() == binjunk + + # Open using name, without copying. + if False: + # delayed reads are not yet supported + data = core.Data(file=tmp.name, copy=False) + assert data.read() == binjunk + + # Open using stream. + tmp.seek(0) + data = core.Data(file=tmp) + assert data.read() == binjunk + + # Open using stream, offset, and length. + data = core.Data(file=tmp, offset=0, length=42) + assert data.read() == binjunk[:42] + + # Open using name, offset, and length. + data = core.Data(file=tmp.name, offset=23, length=42) + assert data.read() == binjunk[23:23+42]