Merge branch 'fix/89-python3' into develop

fix/93-encrypt-to-stream
Isis Lovecruft 2015-03-13 02:47:57 +00:00
commit 9d8d6cce5c
No known key found for this signature in database
GPG Key ID: 18C16EC5F9F1D673
4 changed files with 129 additions and 48 deletions

View File

@ -1005,7 +1005,7 @@ class GPGBase(object):
if output_filename:
log.info("Writing encrypted output to file: %s" % output_filename)
with open(output_filename, 'w+') as fh:
with open(output_filename, 'wb') as fh:
fh.write(result.data)
fh.flush()
log.info("Encrypted output written successfully.")

View File

@ -167,7 +167,6 @@ def _copy_data(instream, outstream):
:param file outstream: The file descriptor of a tmpfile to write to.
"""
sent = 0
coder = find_encodings()
while True:
@ -179,24 +178,73 @@ def _copy_data(instream, outstream):
data = instream.read(1024)
if len(data) == 0:
break
sent += len(data)
log.debug("Sending chunk %d bytes:\n%s"
% (sent, data))
try:
outstream.write(data)
except UnicodeError:
log.debug("Sending chunk %d bytes:\n%s" % (sent, data))
if _py3k and isinstance(data, bytes):
encoded = coder.encode(data.decode(coder.name))[0]
elif _py3k and isinstance(data, str):
encoded = coder.encode(data)[0]
elif not _py3k and type(data) is not str:
encoded = coder.encode(data)[0]
else:
encoded = data
log.debug("Writing encoded data with type %s to outstream... "
% type(encoded))
if not _py3k:
try:
outstream.write(coder.encode(data))
except IOError:
log.exception("Error sending data: Broken pipe")
outstream.write(encoded)
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
break
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
break
log.debug("Wrote data type <type 'str'> to outstream.")
else:
try:
outstream.write(bytes(encoded))
except TypeError as te:
# XXX FIXME This appears to happen because
# _threaded_copy_data() sometimes passes the `outstream` as an
# object with type <_io.BufferredWriter> and at other times
# with type <encodings.utf_8.StreamWriter>. We hit the
# following error when the `outstream` has type
# <encodings.utf_8.StreamWriter>.
if not "convert 'bytes' object to str implicitly" in str(te):
log.error(str(te))
try:
outstream.write(encoded.decode())
except TypeError as yate:
# We hit the "'str' does not support the buffer interface"
# error in Python3 when the `outstream` is an io.BytesIO and
# we try to write a str to it. We don't care about that
# error, we'll just try again with bytes.
if not "does not support the buffer interface" in str(yate):
log.error(str(yate))
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
break
else:
log.debug("Wrote data type <class 'str'> outstream.")
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
break
else:
log.debug("Wrote data type <class 'bytes'> to outstream.")
try:
outstream.close()
except IOError as ioe:

View File

@ -801,7 +801,7 @@ class GPG(GPGBase):
key = key.replace('_','-').title()
## to set 'cert', 'Key-Usage' must be blank string
if not key in ('Key-Usage', 'Subkey-Usage'):
if str(val).strip():
if type(u'')(val).strip():
parms[key] = val
## if Key-Type is 'default', make Subkey-Type also be 'default'

View File

@ -26,6 +26,7 @@ A test harness and unittests for gnupg.py.
from __future__ import absolute_import
from __future__ import print_function
from __future__ import with_statement
from argparse import ArgumentParser
from codecs import open as open
from functools import wraps
@ -389,7 +390,10 @@ class GPGTestCase(unittest.TestCase):
def test_gen_key_input(self):
"""Test that GnuPG batch file creation is successful."""
key_input = self.generate_key_input("Francisco Ferrer", "an.ok")
self.assertIsInstance(key_input, str)
if _util._py3k:
self.assertIsInstance(key_input, str)
else:
self.assertIsInstance(key_input, basestring)
self.assertGreater(key_input.find('Francisco Ferrer'), 0)
def test_rsa_key_generation(self):
@ -777,14 +781,12 @@ authentication."""
log.debug("Encrypted: %s" % encrypted)
self.assertNotEquals(message, encrypted)
def test_encryption_of_file_like_objects(self):
"""Test encryption of file-like objects"""
key = self.generate_key("Craig Gentry", "xorr.ox",
passphrase="craiggentry")
gentry_fpr = str(key.fingerprint)
def _encryption_test_setup(self):
passphrase = "craiggentry"
key = self.generate_key("Craig Gentry", "xorr.ox", passphrase=passphrase)
fpr = str(key.fingerprint)
gentry = self.gpg.export_keys(key.fingerprint)
self.gpg.import_keys(gentry)
message = """
In 2010 Riggio and Sicari presented a practical application of homomorphic
encryption to a hybrid wireless sensor/mesh network. The system enables
@ -792,32 +794,63 @@ transparent multi-hop wireless backhauls that are able to perform statistical
analysis of different kinds of data (temperature, humidity, etc.) coming from
a WSN while ensuring both end-to-end encryption and hop-by-hop
authentication."""
return (message, fpr, passphrase)
def _encryption_test_wrapper(stream_type, message):
stream = stream_type(message)
encrypted = str(self.gpg.encrypt(stream, gentry_fpr))
decrypted = str(self.gpg.decrypt(encrypted,
passphrase="craiggentry"))
self.assertEqual(message, decrypted)
def _encryption_test(self, stream_type, message, fingerprint, passphrase):
stream = stream_type(message)
encrypted = self.gpg.encrypt(stream, fingerprint).data
decrypted = self.gpg.decrypt(encrypted, passphrase=passphrase).data
if isinstance(decrypted, bytes):
decrypted = decrypted.decode()
if isinstance(message, bytes):
message = message.decode()
self.assertEqual(message, decrypted)
def test_encryption_of_file_like_objects_io_StringIO(self):
"""Test encryption of file-like object io.StringIO."""
message, fpr, passphrase = self._encryption_test_setup()
# Test io.StringIO and io.BytesIO (Python 2.6+)
try:
from io import StringIO, BytesIO
_encryption_test_wrapper(StringIO, unicode(message))
_encryption_test_wrapper(BytesIO, message)
from io import StringIO
if _util._py3k:
self._encryption_test(StringIO, message, fpr, passphrase)
else:
self._encryption_test(StringIO, unicode(message), fpr, passphrase)
except ImportError:
pass
# Test StringIO.StringIO
from StringIO import StringIO
_encryption_test_wrapper(StringIO, message)
def test_encryption_of_file_like_objects_io_BytesIO(self):
"""Test encryption of file-like object io.BytesIO."""
message, fpr, passphrase = self._encryption_test_setup()
# Test cStringIO.StringIO
from cStringIO import StringIO
_encryption_test_wrapper(StringIO, message)
try:
from io import BytesIO
if _util._py3k:
self._encryption_test(BytesIO, bytes(message, 'utf-8'), fpr, passphrase)
else:
self._encryption_test(BytesIO, message, fpr, passphrase)
except ImportError:
pass
def test_encryption_of_file_like_objects_StringIO_StringIO(self):
"""Test encryption of file-like object StringIO.StringIO (Python2 only)."""
message, fpr, passphrase = self._encryption_test_setup()
if not _util._py3k:
from StringIO import StringIO
self._encryption_test(StringIO, message, fpr, passphrase)
def test_encryption_of_file_like_objects_cStringIO_StringIO(self):
"""Test encryption of file-like object cStringIO.StringIO (Python2 only)."""
message, fpr, passphrase = self._encryption_test_setup()
if not _util._py3k:
from cStringIO import StringIO
self._encryption_test(StringIO, message, fpr, passphrase)
def test_encryption_alt_encoding(self):
"""Test encryption with latin-1 encoding"""
key = self.generate_key("Craig Gentry", "xorr.ox",
passphrase="craiggentry")
@ -825,11 +858,7 @@ authentication."""
key = self.generate_key("Marten van Dijk", "xorr.ox")
dijk = str(key.fingerprint)
self.gpg._encoding = 'latin-1'
if _util._py3k:
data = 'Hello, André!'
else:
data = unicode('Hello, André', self.gpg._encoding)
data = data.encode(self.gpg._encoding)
data = u'Hello, André!'.encode(self.gpg._encoding)
encrypted = self.gpg.encrypt(data, gentry)
edata = str(encrypted.data)
self.assertNotEqual(data, edata)
@ -998,7 +1027,8 @@ boolean circuit causes a considerable overhead."""
## We expect Alice's key to be hidden (returned as zero's) and Bob's
## key to be there.
expected_values = ["0000000000000000", "0000000000000000"]
self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to)
packets = self.gpg.list_packets(encrypted)
self.assertEquals(expected_values, packets.encrypted_to)
def test_encryption_decryption_multi_recipient(self):
"""Test decryption of an encrypted string for multiple users"""
@ -1187,7 +1217,10 @@ suites = { 'parsers': set(['test_parsers_fix_unsafe',
'test_signature_string_verification',
'test_signature_string_algorithm_encoding']),
'crypt': set(['test_encryption',
'test_encryption_of_file_like_objects',
'test_encryption_of_file_like_objects_io_StringIO',
'test_encryption_of_file_like_objects_io_BytesIO',
'test_encryption_of_file_like_objects_StringIO_StringIO',
'test_encryption_of_file_like_objects_cStringIO_StringIO',
'test_encryption_alt_encoding',
'test_encryption_multi_recipient',
'test_encryption_decryption_multi_recipient',