python: Fix reading data from existing files.
* lang/python/pyme/core.py (Data.__init__): Add 'copy' kwargument, and pass it to functions supporting it. PEP8 fix. (Data.new_from_fd): PEP8 fix. (Data.new_from_file): Give a more helpful error message if copy is False. PEP8 fix. (Data.new_from_fd): Hand the file descriptor to 'gpgme_data_new_from_fd', not a stream. Fix docstring. * lang/python/tests/t-data.py: Add tests for this. Signed-off-by: Justus Winter <justus@gnupg.org>
This commit is contained in:
parent
ce73ae9d0c
commit
bf188e280b
@ -370,8 +370,8 @@ class Data(GpgmeWrapper):
|
||||
return 0
|
||||
return 1
|
||||
|
||||
def __init__(self, string = None, file = None, offset = None,
|
||||
length = None, cbs = None):
|
||||
def __init__(self, string=None, file=None, offset=None,
|
||||
length=None, cbs=None, copy=True):
|
||||
"""Initialize a new gpgme_data_t object.
|
||||
|
||||
If no args are specified, make it an empty object.
|
||||
@ -402,12 +402,12 @@ class Data(GpgmeWrapper):
|
||||
if cbs != None:
|
||||
self.new_from_cbs(*cbs)
|
||||
elif string != None:
|
||||
self.new_from_mem(string)
|
||||
self.new_from_mem(string, copy)
|
||||
elif file != None and offset != None and length != None:
|
||||
self.new_from_filepart(file, offset, length)
|
||||
elif file != None:
|
||||
if type(file) == type("x"):
|
||||
self.new_from_file(file)
|
||||
self.new_from_file(file, copy)
|
||||
else:
|
||||
self.new_from_fd(file)
|
||||
else:
|
||||
@ -436,15 +436,21 @@ class Data(GpgmeWrapper):
|
||||
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||
|
||||
def new_from_mem(self, string, copy = 1):
|
||||
def new_from_mem(self, string, copy=True):
|
||||
tmp = pygpgme.new_gpgme_data_t_p()
|
||||
errorcheck(pygpgme.gpgme_data_new_from_mem(tmp,string,len(string),copy))
|
||||
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||
|
||||
def new_from_file(self, filename, copy = 1):
|
||||
def new_from_file(self, filename, copy=True):
|
||||
tmp = pygpgme.new_gpgme_data_t_p()
|
||||
try:
|
||||
errorcheck(pygpgme.gpgme_data_new_from_file(tmp, filename, copy))
|
||||
except errors.GPGMEError as e:
|
||||
if e.getcode() == errors.INV_VALUE and not copy:
|
||||
raise ValueError("delayed reads are not yet supported")
|
||||
else:
|
||||
raise e
|
||||
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||
|
||||
@ -485,16 +491,13 @@ class Data(GpgmeWrapper):
|
||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||
|
||||
def new_from_fd(self, file):
|
||||
"""This wraps the GPGME gpgme_data_new_from_fd() function.
|
||||
The argument "file" may be a file-like object, supporting the fileno()
|
||||
call and the mode attribute."""
|
||||
"""This wraps the GPGME gpgme_data_new_from_fd() function. The
|
||||
argument "file" must be a file-like object, supporting the
|
||||
fileno() method.
|
||||
|
||||
"""
|
||||
tmp = pygpgme.new_gpgme_data_t_p()
|
||||
fp = pygpgme.fdopen(file.fileno(), file.mode)
|
||||
if fp == None:
|
||||
raise ValueError("Failed to open file from %s arg %s" % \
|
||||
(str(type(file)), str(file)))
|
||||
errorcheck(pygpgme.gpgme_data_new_from_fd(tmp, fp))
|
||||
errorcheck(pygpgme.gpgme_data_new_from_fd(tmp, file.fileno()))
|
||||
self.wrapped = pygpgme.gpgme_data_t_p_value(tmp)
|
||||
pygpgme.delete_gpgme_data_t_p(tmp)
|
||||
|
||||
|
@ -18,7 +18,7 @@
|
||||
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
|
||||
import tempfile
|
||||
from pyme import core
|
||||
|
||||
data = core.Data('Hello world!')
|
||||
@ -32,6 +32,9 @@ assert data.read() == b''
|
||||
data = core.Data(b'Hello world!')
|
||||
assert data.read() == b'Hello world!'
|
||||
|
||||
data = core.Data(b'Hello world!', copy=False)
|
||||
assert data.read() == b'Hello world!'
|
||||
|
||||
data = core.Data()
|
||||
data.write('Hello world!')
|
||||
data.seek(0, os.SEEK_SET)
|
||||
@ -47,3 +50,32 @@ data = core.Data()
|
||||
data.write(binjunk)
|
||||
data.seek(0, os.SEEK_SET)
|
||||
assert data.read() == binjunk
|
||||
|
||||
# Test reading from an existing file.
|
||||
with tempfile.NamedTemporaryFile() as tmp:
|
||||
tmp.write(binjunk)
|
||||
tmp.flush()
|
||||
tmp.seek(0)
|
||||
|
||||
# Open using name.
|
||||
data = core.Data(file=tmp.name)
|
||||
assert data.read() == binjunk
|
||||
|
||||
# Open using name, without copying.
|
||||
if False:
|
||||
# delayed reads are not yet supported
|
||||
data = core.Data(file=tmp.name, copy=False)
|
||||
assert data.read() == binjunk
|
||||
|
||||
# Open using stream.
|
||||
tmp.seek(0)
|
||||
data = core.Data(file=tmp)
|
||||
assert data.read() == binjunk
|
||||
|
||||
# Open using stream, offset, and length.
|
||||
data = core.Data(file=tmp, offset=0, length=42)
|
||||
assert data.read() == binjunk[:42]
|
||||
|
||||
# Open using name, offset, and length.
|
||||
data = core.Data(file=tmp.name, offset=23, length=42)
|
||||
assert data.read() == binjunk[23:23+42]
|
||||
|
Loading…
Reference in New Issue
Block a user