Compare commits
70 Commits
Author | SHA1 | Date |
---|---|---|
|
2beac24161 | |
|
4f1b1f6a8d | |
|
da59707945 | |
|
a6d024ff49 | |
|
a06c93d6e7 | |
|
76d70c68aa | |
|
94f81c3226 | |
|
a749acf486 | |
|
16107bc8a8 | |
|
9be01ec6df | |
|
e90ae54738 | |
|
fea39ec83e | |
|
b0584854e1 | |
|
49079f9672 | |
|
adc4994a0d | |
|
8c5730666a | |
|
5895997554 | |
|
917a3dcd89 | |
|
b7f520244e | |
|
742ccd77fb | |
|
d66d86ca7e | |
|
0af2720754 | |
|
10df44b75f | |
|
af781626a9 | |
|
cf4d3efd8d | |
|
b7ff69092a | |
|
8e6c2a752c | |
|
79285c4c17 | |
|
6d1890389c | |
|
8579331562 | |
|
0c87da3d78 | |
|
38685ae001 | |
|
9d8d6cce5c | |
|
43164fa7db | |
|
a7e772f10a | |
|
4be6fb75e3 | |
|
b970917701 | |
|
782a81b46a | |
|
b3dd20a7e5 | |
|
af403fe144 | |
|
657be31ae1 | |
|
749ef6fa00 | |
|
2cf3dd1c86 | |
|
3127c21d55 | |
|
2c57c0f6d0 | |
|
88bfaaffc2 | |
|
eb205774fb | |
|
5d03c3c5eb | |
|
ae5cb33d63 | |
|
d3e6ae33b4 | |
|
d3b7dd0353 | |
|
d31d0cf131 | |
|
f3c193d8b4 | |
|
55b586fd18 | |
|
f858080148 | |
|
572429eed9 | |
|
ceb1c2fbbd | |
|
90c6613684 | |
|
939728694c | |
|
d66b23b896 | |
|
6c15f25ee5 | |
|
8c261eba30 | |
|
f8ccdc5028 | |
|
8a7699236c | |
|
5025df1661 | |
|
b1dab1570d | |
|
8f92335476 | |
|
613e84cd56 | |
|
a1c45a6f63 | |
|
77c6c3d0e5 |
|
@ -0,0 +1,33 @@
|
|||
[run]
|
||||
source =
|
||||
gnupg
|
||||
branch = True
|
||||
#parallel = True
|
||||
timid = True
|
||||
|
||||
[report]
|
||||
modules = gnupg
|
||||
omit =
|
||||
*/test*
|
||||
*/_version*
|
||||
*/__init__*
|
||||
*/copyleft*
|
||||
*/sitecustomize*
|
||||
# Regexes for lines to exclude from report generation:
|
||||
exclude_lines =
|
||||
pragma: no cover
|
||||
# don't complain if the code doesn't hit unimplemented sections:
|
||||
raise NotImplementedError
|
||||
pass
|
||||
# don't complain if non-runnable or debuging code isn't run:
|
||||
if 0:
|
||||
if False:
|
||||
def __repr__
|
||||
if __name__ == .__main__.:
|
||||
# Ignore source code which cannot be found:
|
||||
ignore_errors = True
|
||||
# Exit with status code 2 if under this percentage is covered:
|
||||
fail_under = 10
|
||||
|
||||
[html]
|
||||
directory = docs/coverage-html
|
71
Makefile
71
Makefile
|
@ -2,6 +2,8 @@ SHELL=/bin/sh
|
|||
TESTDIR=./gnupg/test
|
||||
TESTHANDLE=$(TESTDIR)/test_gnupg.py
|
||||
FILES=$(SHELL find ./gnupg/ -name "*.py" -printf "%p,")
|
||||
PYTHON=$(SHELL which python)
|
||||
PYTHON3=$(SHELL which python3)
|
||||
PKG_NAME=python-gnupg
|
||||
DOC_DIR=docs
|
||||
DOC_BUILD_DIR:=$(DOC_DIR)/_build
|
||||
|
@ -50,23 +52,70 @@ test-before: cleanup-src cleanup-tests
|
|||
which python && python --version
|
||||
-which pip && pip --version && pip list
|
||||
|
||||
test: test-before
|
||||
python $(TESTHANDLE) basic encodings parsers keyrings listkeys genkey \
|
||||
sign crypt
|
||||
test-run: test-before
|
||||
python $(TESTHANDLE) \
|
||||
basic \
|
||||
encodings \
|
||||
parsers \
|
||||
keyrings \
|
||||
listkeys \
|
||||
genkey \
|
||||
sign \
|
||||
crypt
|
||||
|
||||
py3k-test-run: test-before
|
||||
python3 $(TESTHANDLE) \
|
||||
basic \
|
||||
encodings \
|
||||
parsers \
|
||||
keyrings \
|
||||
listkeys \
|
||||
genkey \
|
||||
sign \
|
||||
crypt
|
||||
|
||||
coverage-run: test-before
|
||||
coverage run --rcfile=".coveragerc" $(PYTHON) $(TESTHANDLE) \
|
||||
basic \
|
||||
encodings \
|
||||
parsers \
|
||||
keyrings \
|
||||
listkeys \
|
||||
genkeys \
|
||||
sign \
|
||||
crypt
|
||||
|
||||
py3k-coverage-run: test-before
|
||||
coverage run --rcfile=".coveragerc" $(PYTHON3) $(TESTHANDLE) \
|
||||
basic \
|
||||
encodings \
|
||||
parsers \
|
||||
keyrings \
|
||||
listkeys \
|
||||
genkeys \
|
||||
sign \
|
||||
crypt
|
||||
|
||||
coverage-report:
|
||||
coverage report --rcfile=".coveragerc"
|
||||
|
||||
coverage-html:
|
||||
coverage html --rcfile=".coveragerc"
|
||||
|
||||
clean-test:
|
||||
touch gnupg/test/placeholder.log
|
||||
mv gnupg/test/*.log gnupg/test/logs/
|
||||
rm gnupg/test/logs/placeholder.log
|
||||
touch gnupg/test/random_seed_is_sekritly_pi
|
||||
rm gnupg/test/random_seed*
|
||||
|
||||
py3k-test: test-before
|
||||
python3 $(TESTHANDLE) basic encodings parsers keyrings listkeys genkey \
|
||||
sign crypt
|
||||
touch gnupg/test/placeholder.log
|
||||
mv gnupg/test/*.log gnupg/test/logs/
|
||||
rm gnupg/test/logs/placeholder.log
|
||||
touch gnupg/test/random_seed_is_sekritly_pi
|
||||
rm gnupg/test/random_seed*
|
||||
test: test-run clean-test
|
||||
|
||||
py3k-test: py3k-test-run clean-test
|
||||
|
||||
coverage: coverage-run coverage-report coverage-html clean-test
|
||||
|
||||
py3k-coverage: py3k-coverage-run coverage-report coverage-html clean-test
|
||||
|
||||
install:
|
||||
python setup.py install --record installed-files.txt
|
||||
|
|
|
@ -170,7 +170,7 @@ def displayNewKey(key):
|
|||
# `result` is a `gnupg._parsers.ListKeys`, which is list-like, so iterate
|
||||
# over all the keys and display their info:
|
||||
for gpgkey in keylist:
|
||||
for k, v in gpgkey:
|
||||
for k, v in gpgkey.items():
|
||||
log.info("%s: %s" % (k.capitalize(), v))
|
||||
|
||||
return keylist
|
||||
|
|
|
@ -32,14 +32,22 @@ import encodings
|
|||
import locale
|
||||
import os
|
||||
import platform
|
||||
import psutil
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
import threading
|
||||
|
||||
## Using psutil is recommended, but since the extension doesn't run with the
|
||||
## PyPy interpreter, we'll run even if it's not present.
|
||||
try:
|
||||
import psutil
|
||||
except ImportError:
|
||||
psutil = None
|
||||
|
||||
from . import _parsers
|
||||
from . import _util
|
||||
from ._util import b
|
||||
from ._util import s
|
||||
|
||||
from ._parsers import _check_preferences
|
||||
from ._parsers import _sanitise_list
|
||||
|
@ -81,10 +89,19 @@ class GPGMeta(type):
|
|||
the same. (Sorry Windows users; maybe you should switch to anything
|
||||
else.)
|
||||
|
||||
.. note: This function will only run if the psutil_ Python extension
|
||||
is installed. Because psutil won't run with the PyPy interpreter,
|
||||
use of it is optional (although highly recommended).
|
||||
|
||||
.. _psutil: https://pypi.python.org/pypi/psutil
|
||||
|
||||
:returns: True if there exists a gpg-agent process running under the
|
||||
same effective user ID as that of this program. Otherwise,
|
||||
returns False.
|
||||
"""
|
||||
if not psutil:
|
||||
return False
|
||||
|
||||
this_process = psutil.Process(os.getpid())
|
||||
ownership_match = False
|
||||
|
||||
|
@ -132,7 +149,7 @@ class GPGBase(object):
|
|||
|
||||
def __init__(self, binary=None, home=None, keyring=None, secring=None,
|
||||
use_agent=False, default_preference_list=None,
|
||||
verbose=False, options=None):
|
||||
ignore_homedir_permissions=False, verbose=False, options=None):
|
||||
"""Create a ``GPGBase``.
|
||||
|
||||
This class is used to set up properties for controlling the behaviour
|
||||
|
@ -155,13 +172,14 @@ class GPGBase(object):
|
|||
:ivar str secring: The filename in **homedir** to use as the keyring
|
||||
file for secret keys.
|
||||
"""
|
||||
self.ignore_homedir_permissions = ignore_homedir_permissions
|
||||
self.binary = _util._find_binary(binary)
|
||||
self.homedir = os.path.expanduser(home) if home else _util._conf
|
||||
pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
|
||||
sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
|
||||
self.keyring = os.path.join(self._homedir, pub)
|
||||
self.secring = os.path.join(self._homedir, sec)
|
||||
self.options = _parsers._sanitise(options) if options else None
|
||||
self.options = list(_parsers._sanitise_list(options)) if options else None
|
||||
|
||||
#: The version string of our GnuPG binary
|
||||
self.binary_version = '0.0.0'
|
||||
|
@ -197,7 +215,7 @@ class GPGBase(object):
|
|||
"'verbose' must be boolean, string, or 0 <= n <= 9"
|
||||
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
|
||||
if self.options is not None:
|
||||
assert isinstance(self.options, str), "options not string"
|
||||
assert isinstance(self.options, list), "options not list"
|
||||
except (AssertionError, AttributeError) as ae:
|
||||
log.error("GPGBase.__init__(): %s" % str(ae))
|
||||
raise RuntimeError(str(ae))
|
||||
|
@ -398,18 +416,21 @@ class GPGBase(object):
|
|||
log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
|
||||
_util._create_if_necessary(hd)
|
||||
|
||||
try:
|
||||
log.debug("GPGBase._homedir_setter(): checking permissions")
|
||||
assert _util._has_readwrite(hd), \
|
||||
"Homedir '%s' needs read/write permissions" % hd
|
||||
except AssertionError as ae:
|
||||
msg = ("Unable to set '%s' as GnuPG homedir" % directory)
|
||||
log.debug("GPGBase.homedir.setter(): %s" % msg)
|
||||
log.debug(str(ae))
|
||||
raise RuntimeError(str(ae))
|
||||
else:
|
||||
log.info("Setting homedir to '%s'" % hd)
|
||||
if self.ignore_homedir_permissions:
|
||||
self._homedir = hd
|
||||
else:
|
||||
try:
|
||||
log.debug("GPGBase._homedir_setter(): checking permissions")
|
||||
assert _util._has_readwrite(hd), \
|
||||
"Homedir '%s' needs read/write permissions" % hd
|
||||
except AssertionError as ae:
|
||||
msg = ("Unable to set '%s' as GnuPG homedir" % directory)
|
||||
log.debug("GPGBase.homedir.setter(): %s" % msg)
|
||||
log.debug(str(ae))
|
||||
raise RuntimeError(str(ae))
|
||||
else:
|
||||
log.info("Setting homedir to '%s'" % hd)
|
||||
self._homedir = hd
|
||||
|
||||
homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)
|
||||
|
||||
|
@ -518,8 +539,8 @@ class GPGBase(object):
|
|||
|
||||
if passphrase: cmd.append('--batch --passphrase-fd 0')
|
||||
|
||||
if self.use_agent: cmd.append('--use-agent')
|
||||
else: cmd.append('--no-use-agent')
|
||||
if self.use_agent is True: cmd.append('--use-agent')
|
||||
elif self.use_agent is False: cmd.append('--no-use-agent')
|
||||
|
||||
# The arguments for debugging and verbosity should be placed into the
|
||||
# cmd list before the options/args in order to resolve Issue #76:
|
||||
|
@ -785,6 +806,19 @@ class GPGBase(object):
|
|||
## We could use _handle_io here except for the fact that if the
|
||||
## passphrase is bad, gpg bails and you can't write the message.
|
||||
result = self._result_map['sign'](self)
|
||||
|
||||
## If the passphrase is an empty string, the message up to and
|
||||
## including its first newline will be cut off before making it to the
|
||||
## GnuPG process. Therefore, if the passphrase='' or passphrase=b'',
|
||||
## we set passphrase=None. See Issue #82:
|
||||
## https://github.com/isislovecruft/python-gnupg/issues/82
|
||||
if _util._is_string(passphrase):
|
||||
passphrase = passphrase if len(passphrase) > 0 else None
|
||||
elif _util._is_bytes(passphrase):
|
||||
passphrase = s(passphrase) if len(passphrase) > 0 else None
|
||||
else:
|
||||
passphrase = None
|
||||
|
||||
proc = self._open_subprocess(args, passphrase is not None)
|
||||
try:
|
||||
if passphrase:
|
||||
|
@ -975,21 +1009,21 @@ class GPGBase(object):
|
|||
for recp in recipients.split(' '):
|
||||
self._add_recipient_string(args, hidden_recipients, recp)
|
||||
## ...and now that we've proven py3k is better...
|
||||
|
||||
else:
|
||||
log.debug("Don't know what to do with recipients: '%s'"
|
||||
log.debug("Don't know what to do with recipients: %r"
|
||||
% recipients)
|
||||
|
||||
result = self._result_map['crypt'](self)
|
||||
log.debug("Got data '%s' with type '%s'."
|
||||
% (data, type(data)))
|
||||
self._handle_io(args, data, result,
|
||||
passphrase=passphrase, binary=True)
|
||||
log.debug("\n%s" % result.data)
|
||||
log.debug("Got data '%s' with type '%s'." % (data, type(data)))
|
||||
self._handle_io(args, data, result, passphrase=passphrase, binary=True)
|
||||
# Avoid writing raw encrypted bytes to terminal loggers and breaking
|
||||
# them in that adorable way where they spew hieroglyphics until reset:
|
||||
if armor:
|
||||
log.debug("\n%s" % result.data)
|
||||
|
||||
if output_filename:
|
||||
log.info("Writing encrypted output to file: %s" % output_filename)
|
||||
with open(output_filename, 'w+') as fh:
|
||||
with open(output_filename, 'wb') as fh:
|
||||
fh.write(result.data)
|
||||
fh.flush()
|
||||
log.info("Encrypted output written successfully.")
|
||||
|
|
|
@ -367,7 +367,7 @@ def _sanitise(*args):
|
|||
checked += (val + " ")
|
||||
log.debug("_check_option(): No checks for %s" % val)
|
||||
|
||||
return checked
|
||||
return checked.rstrip(' ')
|
||||
|
||||
is_flag = lambda x: x.startswith('--')
|
||||
|
||||
|
@ -516,6 +516,7 @@ def _get_options_group(group=None):
|
|||
'--import',
|
||||
'--verify',
|
||||
'--verify-files',
|
||||
'--output',
|
||||
])
|
||||
#: These options expect a string. see :func:`_check_preferences`.
|
||||
pref_options = frozenset(['--digest-algo',
|
||||
|
@ -557,6 +558,9 @@ def _get_options_group(group=None):
|
|||
'--list-public-keys',
|
||||
'--list-secret-keys',
|
||||
'--list-sigs',
|
||||
'--lock-multiple',
|
||||
'--lock-never',
|
||||
'--lock-once',
|
||||
'--no-default-keyring',
|
||||
'--no-default-recipient',
|
||||
'--no-emit-version',
|
||||
|
@ -908,6 +912,7 @@ class Sign(object):
|
|||
timestamp = None
|
||||
#: xxx fill me in
|
||||
what = None
|
||||
status = None
|
||||
|
||||
def __init__(self, gpg):
|
||||
self._gpg = gpg
|
||||
|
@ -930,9 +935,9 @@ class Sign(object):
|
|||
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
|
||||
"""
|
||||
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
|
||||
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL",
|
||||
"INV_SGNR", "SIGEXPIRED"):
|
||||
pass
|
||||
"GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED",
|
||||
"BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"):
|
||||
self.status = key.replace("_", " ").lower()
|
||||
elif key == "SIG_CREATED":
|
||||
(self.sig_type, self.sig_algo, self.sig_hash_algo,
|
||||
self.what, self.timestamp, self.fingerprint) = value.split()
|
||||
|
@ -949,6 +954,7 @@ class Sign(object):
|
|||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
|
||||
class ListKeys(list):
|
||||
"""Handle status messages for --list-keys.
|
||||
|
||||
|
@ -1271,7 +1277,8 @@ class Verify(object):
|
|||
self.trust_level = self.TRUST_LEVELS[key]
|
||||
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
|
||||
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
|
||||
"DECRYPTION_OKAY", "INV_SGNR", "PROGRESS"):
|
||||
"DECRYPTION_OKAY", "INV_SGNR", "PROGRESS",
|
||||
"PINENTRY_LAUNCHED"):
|
||||
pass
|
||||
elif key == "BADSIG":
|
||||
self.valid = False
|
||||
|
@ -1524,21 +1531,21 @@ class ListPackets(object):
|
|||
|
||||
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
|
||||
"""
|
||||
if key == 'NODATA':
|
||||
if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
|
||||
'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'):
|
||||
pass
|
||||
elif key == 'NODATA':
|
||||
self.status = nodata(value)
|
||||
elif key == 'ENC_TO':
|
||||
key, _, _ = value.split()
|
||||
if not self.key:
|
||||
self.key = key
|
||||
self.encrypted_to.append(key)
|
||||
elif key == 'NEED_PASSPHRASE':
|
||||
elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'):
|
||||
self.need_passphrase = True
|
||||
elif key == 'NEED_PASSPHRASE_SYM':
|
||||
self.need_passphrase_sym = True
|
||||
elif key == 'USERID_HINT':
|
||||
self.userid_hint = value.strip().split()
|
||||
elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
|
||||
'END_DECRYPTION'):
|
||||
pass
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
|
217
gnupg/_util.py
217
gnupg/_util.py
|
@ -28,18 +28,58 @@ from time import mktime
|
|||
import codecs
|
||||
import encodings
|
||||
import os
|
||||
import psutil
|
||||
import threading
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
|
||||
# These are all the classes which are stream-like; they are used in
|
||||
# :func:`_is_stream`.
|
||||
_STREAMLIKE_TYPES = []
|
||||
|
||||
# These StringIO classes are actually utilised.
|
||||
try:
|
||||
import io
|
||||
from io import StringIO
|
||||
from io import BytesIO
|
||||
except ImportError:
|
||||
from cStringIO import StringIO
|
||||
else:
|
||||
# The io.IOBase type covers the above example for an open file handle in
|
||||
# Python3, as well as both io.BytesIO and io.StringIO.
|
||||
_STREAMLIKE_TYPES.append(io.IOBase)
|
||||
|
||||
# The remaining StringIO classes which are imported are used to determine if a
|
||||
# object is a stream-like in :func:`_is_stream`.
|
||||
if 2 == sys.version_info[0]:
|
||||
# Import the StringIO class from the StringIO module since it is a
|
||||
# commonly used stream class. It is distinct from either of the
|
||||
# StringIO's that may be loaded in the above try/except clause, so the
|
||||
# name is prefixed with an underscore to distinguish it.
|
||||
from StringIO import StringIO as _StringIO_StringIO
|
||||
_STREAMLIKE_TYPES.append(_StringIO_StringIO)
|
||||
|
||||
# Import the cStringIO module to test for the cStringIO stream types,
|
||||
# InputType and OutputType. See
|
||||
# http://stackoverflow.com/questions/14735295/to-check-an-instance-is-stringio
|
||||
import cStringIO as _cStringIO
|
||||
_STREAMLIKE_TYPES.append(_cStringIO.InputType)
|
||||
_STREAMLIKE_TYPES.append(_cStringIO.OutputType)
|
||||
|
||||
# In Python2:
|
||||
#
|
||||
# >>> type(open('README.md', 'rb'))
|
||||
# <open file 'README.md', mode 'rb' at 0x7f9493951d20>
|
||||
#
|
||||
# whereas, in Python3, the `file` builtin doesn't exist and instead we get:
|
||||
#
|
||||
# >>> type(open('README.md', 'rb'))
|
||||
# <_io.BufferedReader name='README.md'>
|
||||
#
|
||||
# which is covered by the above addition of io.IOBase.
|
||||
_STREAMLIKE_TYPES.append(file)
|
||||
|
||||
|
||||
from . import _logger
|
||||
|
||||
|
@ -125,6 +165,51 @@ def find_encodings(enc=None, system=False):
|
|||
|
||||
return coder
|
||||
|
||||
|
||||
if _py3k:
|
||||
def b(x):
|
||||
"""See http://python3porting.com/problems.html#nicer-solutions"""
|
||||
coder = find_encodings()
|
||||
if isinstance(x, bytes):
|
||||
return coder.encode(x.decode(coder.name))[0]
|
||||
else:
|
||||
return coder.encode(x)[0]
|
||||
|
||||
def s(x):
|
||||
if isinstance(x, str):
|
||||
return x
|
||||
elif isinstance(x, (bytes, bytearray)):
|
||||
return x.decode(find_encodings().name)
|
||||
else:
|
||||
raise NotImplemented
|
||||
else:
|
||||
def b(x):
|
||||
"""See http://python3porting.com/problems.html#nicer-solutions"""
|
||||
return x
|
||||
|
||||
def s(x):
|
||||
if isinstance(x, basestring):
|
||||
return x
|
||||
elif isinstance(x, (bytes, bytearray)):
|
||||
return x.decode(find_encodings().name)
|
||||
else:
|
||||
raise NotImplemented
|
||||
|
||||
def binary(data):
|
||||
coder = find_encodings()
|
||||
|
||||
if _py3k and isinstance(data, bytes):
|
||||
encoded = coder.encode(data.decode(coder.name))[0]
|
||||
elif _py3k and isinstance(data, str):
|
||||
encoded = coder.encode(data)[0]
|
||||
elif not _py3k and type(data) is not str:
|
||||
encoded = coder.encode(data)[0]
|
||||
else:
|
||||
encoded = data
|
||||
|
||||
return encoded
|
||||
|
||||
|
||||
def author_info(name, contact=None, public_key=None):
|
||||
"""Easy object-oriented representation of contributor info.
|
||||
|
||||
|
@ -144,8 +229,6 @@ def _copy_data(instream, outstream):
|
|||
"""
|
||||
sent = 0
|
||||
|
||||
coder = find_encodings()
|
||||
|
||||
while True:
|
||||
if ((_py3k and isinstance(instream, str)) or
|
||||
(not _py3k and isinstance(instream, basestring))):
|
||||
|
@ -155,24 +238,64 @@ def _copy_data(instream, outstream):
|
|||
data = instream.read(1024)
|
||||
if len(data) == 0:
|
||||
break
|
||||
|
||||
sent += len(data)
|
||||
log.debug("Sending chunk %d bytes:\n%s"
|
||||
% (sent, data))
|
||||
try:
|
||||
outstream.write(data)
|
||||
except UnicodeError:
|
||||
encoded = binary(data)
|
||||
log.debug("Sending %d bytes of data..." % sent)
|
||||
log.debug("Encoded data (type %s):\n%s" % (type(encoded), encoded))
|
||||
|
||||
if not _py3k:
|
||||
try:
|
||||
outstream.write(coder.encode(data))
|
||||
except IOError:
|
||||
log.exception("Error sending data: Broken pipe")
|
||||
outstream.write(encoded)
|
||||
except IOError as ioe:
|
||||
# Can get 'broken pipe' errors even when all data was sent
|
||||
if 'Broken pipe' in str(ioe):
|
||||
log.error('Error sending data: Broken pipe')
|
||||
else:
|
||||
log.exception(ioe)
|
||||
break
|
||||
except IOError as ioe:
|
||||
# Can get 'broken pipe' errors even when all data was sent
|
||||
if 'Broken pipe' in str(ioe):
|
||||
log.error('Error sending data: Broken pipe')
|
||||
else:
|
||||
log.exception(ioe)
|
||||
break
|
||||
log.debug("Wrote data type <type 'str'> to outstream.")
|
||||
else:
|
||||
try:
|
||||
outstream.write(bytes(encoded))
|
||||
except TypeError as te:
|
||||
# XXX FIXME This appears to happen because
|
||||
# _threaded_copy_data() sometimes passes the `outstream` as an
|
||||
# object with type <_io.BufferredWriter> and at other times
|
||||
# with type <encodings.utf_8.StreamWriter>. We hit the
|
||||
# following error when the `outstream` has type
|
||||
# <encodings.utf_8.StreamWriter>.
|
||||
if not "convert 'bytes' object to str implicitly" in str(te):
|
||||
log.error(str(te))
|
||||
try:
|
||||
outstream.write(encoded.decode())
|
||||
except TypeError as yate:
|
||||
# We hit the "'str' does not support the buffer interface"
|
||||
# error in Python3 when the `outstream` is an io.BytesIO and
|
||||
# we try to write a str to it. We don't care about that
|
||||
# error, we'll just try again with bytes.
|
||||
if not "does not support the buffer interface" in str(yate):
|
||||
log.error(str(yate))
|
||||
except IOError as ioe:
|
||||
# Can get 'broken pipe' errors even when all data was sent
|
||||
if 'Broken pipe' in str(ioe):
|
||||
log.error('Error sending data: Broken pipe')
|
||||
else:
|
||||
log.exception(ioe)
|
||||
break
|
||||
else:
|
||||
log.debug("Wrote data type <class 'str'> outstream.")
|
||||
except IOError as ioe:
|
||||
# Can get 'broken pipe' errors even when all data was sent
|
||||
if 'Broken pipe' in str(ioe):
|
||||
log.error('Error sending data: Broken pipe')
|
||||
else:
|
||||
log.exception(ioe)
|
||||
break
|
||||
else:
|
||||
log.debug("Wrote data type <class 'bytes'> to outstream.")
|
||||
|
||||
try:
|
||||
outstream.close()
|
||||
except IOError as ioe:
|
||||
|
@ -350,7 +473,32 @@ def _is_stream(input):
|
|||
:rtype: bool
|
||||
:returns: True if :param:input is a stream, False if otherwise.
|
||||
"""
|
||||
return isinstance(input, BytesIO) or isinstance(input, StringIO)
|
||||
return isinstance(input, tuple(_STREAMLIKE_TYPES))
|
||||
|
||||
def _is_string(thing):
|
||||
"""Check that **thing** is a string. The definition of the latter depends
|
||||
upon the Python version.
|
||||
|
||||
:param thing: The thing to check if it's a string.
|
||||
:rtype: bool
|
||||
:returns: ``True`` if **thing** is string (or unicode in Python2).
|
||||
"""
|
||||
if (_py3k and isinstance(thing, str)):
|
||||
return True
|
||||
if (not _py3k and isinstance(thing, basestring)):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _is_bytes(thing):
|
||||
"""Check that **thing** is bytes.
|
||||
|
||||
:param thing: The thing to check if it's bytes.
|
||||
:rtype: bool
|
||||
:returns: ``True`` if **thing** is bytes or a bytearray.
|
||||
"""
|
||||
if isinstance(thing, (bytes, bytearray)):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _is_list_or_tuple(instance):
|
||||
"""Check that ``instance`` is a list or tuple.
|
||||
|
@ -383,21 +531,26 @@ def _is_gpg2(version):
|
|||
return True
|
||||
return False
|
||||
|
||||
def _make_binary_stream(s, encoding):
|
||||
"""
|
||||
xxx fill me in
|
||||
def _make_binary_stream(thing, encoding=None, armor=True):
|
||||
"""Encode **thing**, then make it stream/file-like.
|
||||
|
||||
:param thing: The thing to turn into a encoded stream.
|
||||
:rtype: ``io.BytesIO`` or ``io.StringIO``.
|
||||
:returns: The encoded **thing**, wrapped in an ``io.BytesIO`` (if
|
||||
available), otherwise wrapped in a ``io.StringIO``.
|
||||
"""
|
||||
if _py3k:
|
||||
if isinstance(thing, str):
|
||||
thing = thing.encode(encoding)
|
||||
else:
|
||||
if type(thing) is not str:
|
||||
thing = thing.encode(encoding)
|
||||
|
||||
try:
|
||||
if _py3k:
|
||||
if isinstance(s, str):
|
||||
s = s.encode(encoding)
|
||||
else:
|
||||
if type(s) is not str:
|
||||
s = s.encode(encoding)
|
||||
from io import BytesIO
|
||||
rv = BytesIO(s)
|
||||
except ImportError:
|
||||
rv = StringIO(s)
|
||||
rv = BytesIO(thing)
|
||||
except NameError:
|
||||
rv = StringIO(thing)
|
||||
|
||||
return rv
|
||||
|
||||
def _make_passphrase(length=None, save=False, file=None):
|
||||
|
@ -418,7 +571,7 @@ def _make_passphrase(length=None, save=False, file=None):
|
|||
passphrase = _make_random_string(length)
|
||||
|
||||
if save:
|
||||
ruid, euid, suid = psutil.Process(os.getpid()).uids
|
||||
ruid, euid, suid = os.getresuid()
|
||||
gid = os.getgid()
|
||||
now = mktime(localtime())
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ class GPG(GPGBase):
|
|||
|
||||
def __init__(self, binary=None, homedir=None, verbose=False,
|
||||
use_agent=False, keyring=None, secring=None,
|
||||
options=None):
|
||||
ignore_homedir_permissions=False, options=None):
|
||||
"""Initialize a GnuPG process wrapper.
|
||||
|
||||
:param str binary: Name for GnuPG binary executable. If the absolute
|
||||
|
@ -73,6 +73,10 @@ class GPG(GPGBase):
|
|||
and private keyrings. Default is whatever GnuPG
|
||||
defaults to.
|
||||
|
||||
:type ignore_homedir_permissions: :obj:`bool`
|
||||
:param ignore_homedir_permissions: If true, bypass check that homedir
|
||||
be writable.
|
||||
|
||||
:type verbose: :obj:`str` or :obj:`int` or :obj:`bool`
|
||||
:param verbose: String or numeric value to pass to GnuPG's
|
||||
``--debug-level`` option. See the GnuPG man page for
|
||||
|
@ -117,13 +121,16 @@ class GPG(GPGBase):
|
|||
secring=secring,
|
||||
options=options,
|
||||
verbose=verbose,
|
||||
use_agent=use_agent,)
|
||||
use_agent=use_agent,
|
||||
ignore_homedir_permissions=ignore_homedir_permissions,
|
||||
)
|
||||
|
||||
log.info(textwrap.dedent("""
|
||||
Initialised settings:
|
||||
binary: %s
|
||||
binary version: %s
|
||||
homedir: %s
|
||||
ignore_homedir_permissions: %s
|
||||
keyring: %s
|
||||
secring: %s
|
||||
default_preference_list: %s
|
||||
|
@ -134,6 +141,7 @@ class GPG(GPGBase):
|
|||
""" % (self.binary,
|
||||
self.binary_version,
|
||||
self.homedir,
|
||||
self.ignore_homedir_permissions,
|
||||
self.keyring,
|
||||
self.secring,
|
||||
self.default_preference_list,
|
||||
|
@ -153,6 +161,12 @@ class GPG(GPGBase):
|
|||
# fatal error (at least it does with GnuPG>=2.0.0):
|
||||
self.create_trustdb()
|
||||
|
||||
# The --no-use-agent and --use-agent options were deprecated in GnuPG
|
||||
# 2.x, so we should set use_agent to None here to avoid having
|
||||
# GPGBase._make_args() add either one.
|
||||
if self.is_gpg2():
|
||||
self.use_agent = None
|
||||
|
||||
@functools.wraps(_trust._create_trustdb)
|
||||
def create_trustdb(self):
|
||||
if self.is_gpg2():
|
||||
|
@ -787,7 +801,7 @@ class GPG(GPGBase):
|
|||
key = key.replace('_','-').title()
|
||||
## to set 'cert', 'Key-Usage' must be blank string
|
||||
if not key in ('Key-Usage', 'Subkey-Usage'):
|
||||
if str(val).strip():
|
||||
if type('')(val).strip():
|
||||
parms[key] = val
|
||||
|
||||
## if Key-Type is 'default', make Subkey-Type also be 'default'
|
||||
|
@ -952,7 +966,10 @@ generate keys. Please see
|
|||
|
||||
.. seealso:: :meth:`._encrypt`
|
||||
"""
|
||||
stream = _make_binary_stream(data, self._encoding)
|
||||
if _is_stream(data):
|
||||
stream = data
|
||||
else:
|
||||
stream = _make_binary_stream(data, self._encoding)
|
||||
result = self._encrypt(stream, recipients, **kwargs)
|
||||
stream.close()
|
||||
return result
|
||||
|
|
|
@ -26,6 +26,7 @@ A test harness and unittests for gnupg.py.
|
|||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
from __future__ import with_statement
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from codecs import open as open
|
||||
from functools import wraps
|
||||
|
@ -288,8 +289,8 @@ class GPGTestCase(unittest.TestCase):
|
|||
self.assertTrue(os.path.isabs(self.gpg.binary))
|
||||
|
||||
def test_make_args_drop_protected_options(self):
|
||||
"""Test that unsupported gpg options are dropped."""
|
||||
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus']
|
||||
"""Test that unsupported gpg options are dropped, and supported ones remain."""
|
||||
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus', '--lock-never']
|
||||
gpg_binary_path = _util._find_binary('gpg')
|
||||
cmd = self.gpg._make_args(None, False)
|
||||
expected = [gpg_binary_path,
|
||||
|
@ -297,7 +298,8 @@ class GPGTestCase(unittest.TestCase):
|
|||
'--homedir "%s"' % self.homedir,
|
||||
'--no-default-keyring --keyring %s' % self.keyring,
|
||||
'--secret-keyring %s' % self.secring,
|
||||
'--no-use-agent']
|
||||
'--no-use-agent',
|
||||
'--lock-never']
|
||||
self.assertListEqual(cmd, expected)
|
||||
|
||||
def test_make_args(self):
|
||||
|
@ -388,7 +390,10 @@ class GPGTestCase(unittest.TestCase):
|
|||
def test_gen_key_input(self):
|
||||
"""Test that GnuPG batch file creation is successful."""
|
||||
key_input = self.generate_key_input("Francisco Ferrer", "an.ok")
|
||||
self.assertIsInstance(key_input, str)
|
||||
if _util._py3k:
|
||||
self.assertIsInstance(key_input, str)
|
||||
else:
|
||||
self.assertIsInstance(key_input, basestring)
|
||||
self.assertGreater(key_input.find('Francisco Ferrer'), 0)
|
||||
|
||||
def test_rsa_key_generation(self):
|
||||
|
@ -621,6 +626,66 @@ class GPGTestCase(unittest.TestCase):
|
|||
passphrase='wrong horse battery staple')
|
||||
self.assertFalse(sig, "Bad passphrase should fail")
|
||||
|
||||
def test_signature_string_passphrase_empty_string(self):
|
||||
"""Test that a signing attempt with passphrase='' creates a valid
|
||||
signature.
|
||||
|
||||
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
|
||||
"""
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase='')
|
||||
self.assertTrue(sig)
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
def test_signature_string_passphrase_empty_bytes_literal(self):
|
||||
"""Test that a signing attempt with passphrase=b'' creates a valid
|
||||
signature.
|
||||
|
||||
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
|
||||
"""
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase=b'')
|
||||
self.assertTrue(sig)
|
||||
print("%r" % str(sig))
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
def test_signature_string_passphrase_bytes_literal(self):
|
||||
"""Test that a signing attempt with passphrase=b'overalls' creates a
|
||||
valid signature.
|
||||
"""
|
||||
with open(os.path.join(_files, 'kat.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase=b'overalls')
|
||||
self.assertTrue(sig)
|
||||
print("%r" % str(sig))
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
def test_signature_string_passphrase_None(self):
|
||||
"""Test that a signing attempt with passphrase=None fails creates a
|
||||
valid signature.
|
||||
|
||||
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
|
||||
"""
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase=None)
|
||||
self.assertTrue(sig)
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
def test_signature_file(self):
|
||||
"""Test that signing a message file works."""
|
||||
key = self.generate_key("Leonard Adleman", "rsa.com")
|
||||
|
@ -743,17 +808,84 @@ class GPGTestCase(unittest.TestCase):
|
|||
if os.path.isfile(sigfn):
|
||||
os.unlink(sigfn)
|
||||
|
||||
def test_deletion(self):
|
||||
"""Test that key deletion works."""
|
||||
self.gpg.import_keys(KEYS_TO_IMPORT)
|
||||
def test_deletion_public_key(self):
|
||||
"""Test that key deletion for public keys works, and that it leaves the
|
||||
corresponding secret key intact.
|
||||
"""
|
||||
key1 = None
|
||||
key2 = None
|
||||
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
|
||||
res2 = self.gpg.import_keys(fh2.read())
|
||||
key2 = res2.fingerprints[0]
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
self.assertTrue(is_list_with_len(public_keys, 2),
|
||||
"2-element list expected, got %d" % len(public_keys))
|
||||
self.gpg.delete_keys(public_keys[0]['fingerprint'])
|
||||
self.assertTrue(len(public_keys), 2)
|
||||
|
||||
self.gpg.delete_keys(key1)
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
self.assertTrue(is_list_with_len(public_keys, 1),
|
||||
"1-element list expected, got %d" % len(public_keys))
|
||||
log.debug("test_deletion ends")
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertTrue(len(public_keys), 1)
|
||||
self.assertTrue(len(secret_keys), 2)
|
||||
|
||||
def test_deletion_secret_key(self):
|
||||
"""Test that key deletion for secret keys works, and that it leaves the
|
||||
corresponding public key intact.
|
||||
"""
|
||||
key1 = None
|
||||
key2 = None
|
||||
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
|
||||
res2 = self.gpg.import_keys(fh2.read())
|
||||
key2 = res2.fingerprints[0]
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 2)
|
||||
self.assertEqual(len(secret_keys), 2)
|
||||
|
||||
self.gpg.delete_keys(key1, secret=True)
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 2)
|
||||
self.assertEqual(len(secret_keys), 1)
|
||||
|
||||
def test_deletion_subkeys(self):
|
||||
"""Test that key deletion for subkeys deletes both the public and
|
||||
secret portions of the key.
|
||||
"""
|
||||
key1 = None
|
||||
key2 = None
|
||||
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
|
||||
res2 = self.gpg.import_keys(fh2.read())
|
||||
key2 = res2.fingerprints[0]
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 2)
|
||||
self.assertEqual(len(secret_keys), 2)
|
||||
|
||||
self.gpg.delete_keys(key1, subkeys=True)
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 1)
|
||||
self.assertEqual(len(secret_keys), 1)
|
||||
|
||||
def test_encryption(self):
|
||||
"""Test encryption of a message string"""
|
||||
|
@ -776,6 +908,75 @@ authentication."""
|
|||
log.debug("Encrypted: %s" % encrypted)
|
||||
self.assertNotEquals(message, encrypted)
|
||||
|
||||
def _encryption_test_setup(self):
|
||||
passphrase = "craiggentry"
|
||||
key = self.generate_key("Craig Gentry", "xorr.ox", passphrase=passphrase)
|
||||
fpr = str(key.fingerprint)
|
||||
gentry = self.gpg.export_keys(key.fingerprint)
|
||||
self.gpg.import_keys(gentry)
|
||||
message = """
|
||||
In 2010 Riggio and Sicari presented a practical application of homomorphic
|
||||
encryption to a hybrid wireless sensor/mesh network. The system enables
|
||||
transparent multi-hop wireless backhauls that are able to perform statistical
|
||||
analysis of different kinds of data (temperature, humidity, etc.) coming from
|
||||
a WSN while ensuring both end-to-end encryption and hop-by-hop
|
||||
authentication."""
|
||||
return (message, fpr, passphrase)
|
||||
|
||||
def _encryption_test(self, stream_type, message, fingerprint, passphrase):
|
||||
stream = stream_type(message)
|
||||
encrypted = self.gpg.encrypt(stream, fingerprint).data
|
||||
decrypted = self.gpg.decrypt(encrypted, passphrase=passphrase).data
|
||||
|
||||
if isinstance(decrypted, bytes):
|
||||
decrypted = decrypted.decode()
|
||||
if isinstance(message, bytes):
|
||||
message = message.decode()
|
||||
|
||||
self.assertEqual(message, decrypted)
|
||||
|
||||
def test_encryption_of_file_like_objects_io_StringIO(self):
|
||||
"""Test encryption of file-like object io.StringIO."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
try:
|
||||
from io import StringIO
|
||||
if _util._py3k:
|
||||
self._encryption_test(StringIO, message, fpr, passphrase)
|
||||
else:
|
||||
self._encryption_test(StringIO, unicode(message), fpr, passphrase)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def test_encryption_of_file_like_objects_io_BytesIO(self):
|
||||
"""Test encryption of file-like object io.BytesIO."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
try:
|
||||
from io import BytesIO
|
||||
if _util._py3k:
|
||||
self._encryption_test(BytesIO, bytes(message, 'utf-8'), fpr, passphrase)
|
||||
else:
|
||||
self._encryption_test(BytesIO, message, fpr, passphrase)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def test_encryption_of_file_like_objects_StringIO_StringIO(self):
|
||||
"""Test encryption of file-like object StringIO.StringIO (Python2 only)."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
if not _util._py3k:
|
||||
from StringIO import StringIO
|
||||
self._encryption_test(StringIO, message, fpr, passphrase)
|
||||
|
||||
def test_encryption_of_file_like_objects_cStringIO_StringIO(self):
|
||||
"""Test encryption of file-like object cStringIO.StringIO (Python2 only)."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
if not _util._py3k:
|
||||
from cStringIO import StringIO
|
||||
self._encryption_test(StringIO, message, fpr, passphrase)
|
||||
|
||||
def test_encryption_alt_encoding(self):
|
||||
"""Test encryption with latin-1 encoding"""
|
||||
key = self.generate_key("Craig Gentry", "xorr.ox",
|
||||
|
@ -784,11 +985,7 @@ authentication."""
|
|||
key = self.generate_key("Marten van Dijk", "xorr.ox")
|
||||
dijk = str(key.fingerprint)
|
||||
self.gpg._encoding = 'latin-1'
|
||||
if _util._py3k:
|
||||
data = 'Hello, André!'
|
||||
else:
|
||||
data = unicode('Hello, André', self.gpg._encoding)
|
||||
data = data.encode(self.gpg._encoding)
|
||||
data = u'Hello, André!'.encode(self.gpg._encoding)
|
||||
encrypted = self.gpg.encrypt(data, gentry)
|
||||
edata = str(encrypted.data)
|
||||
self.assertNotEqual(data, edata)
|
||||
|
@ -885,6 +1082,29 @@ authentication."""
|
|||
|
||||
self.assertEqual(message, decrypted)
|
||||
|
||||
def test_decryption_with_bytes_literal(self):
|
||||
"""Test that ``decrypt(encrypt(b'foo'), ...)`` is successful."""
|
||||
with open(os.path.join(_files, 'kat.sec')) as katsec:
|
||||
self.gpg.import_keys(katsec.read())
|
||||
kat = self.gpg.list_keys('kat')[0]['fingerprint']
|
||||
|
||||
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
with open(message_filename, 'rb') as f:
|
||||
output = os.path.join(self.gpg.homedir, 'test-decryption-with-bytes-literal.gpg')
|
||||
kwargs = dict(compress_algo='Uncompressed')
|
||||
message = b'Dance like a psycho'
|
||||
encrypted = self.gpg.encrypt(message, kat, **kwargs)
|
||||
self.assertTrue(encrypted.ok)
|
||||
self.assertGreater(len(str(encrypted)), 0)
|
||||
|
||||
decrypted = self.gpg.decrypt(encrypted.data, passphrase='overalls')
|
||||
self.assertTrue(decrypted.ok)
|
||||
self.assertGreater(len(str(decrypted)), 0)
|
||||
# Decode the message so that we can easily compare it with the
|
||||
# decrypted version in both Python2 and Python3:
|
||||
decoded = message.decode(self.gpg._encoding, self.gpg._decode_errors)
|
||||
self.assertEqual(str(decrypted), decoded)
|
||||
|
||||
def test_encryption_one_hidden_recipient_one_not(self):
|
||||
"""Test to ensure hidden recipient isn't detailed in packet info"""
|
||||
|
||||
|
@ -957,7 +1177,8 @@ boolean circuit causes a considerable overhead."""
|
|||
## We expect Alice's key to be hidden (returned as zero's) and Bob's
|
||||
## key to be there.
|
||||
expected_values = ["0000000000000000", "0000000000000000"]
|
||||
self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to)
|
||||
packets = self.gpg.list_packets(encrypted)
|
||||
self.assertEquals(expected_values, packets.encrypted_to)
|
||||
|
||||
def test_encryption_decryption_multi_recipient(self):
|
||||
"""Test decryption of an encrypted string for multiple users"""
|
||||
|
@ -1081,9 +1302,9 @@ know, maybe you shouldn't be doing it in the first place.
|
|||
self.assertTrue(os.path.isfile(output))
|
||||
|
||||
# Check the contents:
|
||||
with open(output) as fh:
|
||||
with open(output, 'rb') as fh:
|
||||
encrypted_message = fh.read()
|
||||
log.debug("Encrypted file contains:\n\n%s\n" % encrypted_message)
|
||||
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
|
||||
|
||||
def test_encryption_to_filehandle(self):
|
||||
"""Test that ``encrypt(..., output=filelikething)`` is successful."""
|
||||
|
@ -1103,9 +1324,45 @@ know, maybe you shouldn't be doing it in the first place.
|
|||
self.assertTrue(os.path.isfile(output))
|
||||
|
||||
# Check the contents:
|
||||
with open(output) as fh:
|
||||
with open(output, 'rb') as fh:
|
||||
encrypted_message = fh.read()
|
||||
log.debug("Encrypted file contains:\n\n%s\n" % encrypted_message)
|
||||
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
|
||||
|
||||
def test_encryption_from_filehandle(self):
|
||||
"""Test that ``encrypt(open('foo'), ...)`` is successful."""
|
||||
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
with open(message_filename, 'rb') as f:
|
||||
output = os.path.join(self.gpg.homedir, 'test-encryption-from-filehandle.gpg')
|
||||
kwargs = dict(passphrase='speedtest',
|
||||
symmetric=True,
|
||||
cipher_algo='AES256',
|
||||
encrypt=False,
|
||||
output=output)
|
||||
encrypted = self.gpg.encrypt(f, None, **kwargs)
|
||||
self.assertTrue(encrypted.ok)
|
||||
self.assertGreater(len(encrypted.data), 0)
|
||||
|
||||
def test_encryption_with_output(self):
|
||||
"""Test that ``encrypt('foo', ..., output='/foo/bar/baz')`` is successful."""
|
||||
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
with open (message_filename, 'rb') as f:
|
||||
data = f.read()
|
||||
|
||||
output = os.path.join(self.gpg.homedir, 'test-encryption-with-output.gpg')
|
||||
kwargs = dict(passphrase='speedtest',
|
||||
symmetric=True,
|
||||
cipher_algo='AES256',
|
||||
encrypt=False,
|
||||
output=output)
|
||||
encrypted = self.gpg.encrypt(data, None, **kwargs)
|
||||
self.assertTrue(encrypted.ok)
|
||||
self.assertGreater(len(encrypted.data), 0)
|
||||
self.assertTrue(os.path.isfile(output))
|
||||
|
||||
# Check the contents:
|
||||
with open(output, 'rb') as fh:
|
||||
encrypted_message = fh.read()
|
||||
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
|
||||
|
||||
|
||||
suites = { 'parsers': set(['test_parsers_fix_unsafe',
|
||||
|
@ -1142,27 +1399,41 @@ suites = { 'parsers': set(['test_parsers_fix_unsafe',
|
|||
'test_signature_verification_detached',
|
||||
'test_signature_verification_detached_binary',
|
||||
'test_signature_file',
|
||||
'test_signature_string_passphrase_empty_string',
|
||||
'test_signature_string_passphrase_empty_bytes_literal',
|
||||
'test_signature_string_passphrase_bytes_literal',
|
||||
'test_signature_string_passphrase_None',
|
||||
'test_signature_string_bad_passphrase',
|
||||
'test_signature_string_verification',
|
||||
'test_signature_string_algorithm_encoding']),
|
||||
'crypt': set(['test_encryption',
|
||||
'test_encryption_of_file_like_objects_io_StringIO',
|
||||
'test_encryption_of_file_like_objects_io_BytesIO',
|
||||
'test_encryption_of_file_like_objects_StringIO_StringIO',
|
||||
'test_encryption_of_file_like_objects_cStringIO_StringIO',
|
||||
'test_encryption_alt_encoding',
|
||||
'test_encryption_multi_recipient',
|
||||
'test_encryption_decryption_multi_recipient',
|
||||
'test_encryption_one_hidden_recipient_one_not',
|
||||
'test_encryption_throw_keyids',
|
||||
'test_decryption',
|
||||
'test_decryption_with_bytes_literal',
|
||||
'test_symmetric_encryption_and_decryption',
|
||||
'test_file_encryption_and_decryption',
|
||||
'test_encryption_to_filename',
|
||||
'test_encryption_to_filehandle',]),
|
||||
'test_encryption_to_filehandle',
|
||||
'test_encryption_from_filehandle',
|
||||
'test_encryption_with_output',]),
|
||||
'listkeys': set(['test_list_keys_after_generation']),
|
||||
'keyrings': set(['test_public_keyring',
|
||||
'test_secret_keyring',
|
||||
'test_import_and_export',
|
||||
'test_deletion',
|
||||
'test_import_only',
|
||||
'test_recv_keys_default',]), }
|
||||
'test_deletion_public_key',
|
||||
'test_deletion_secret_key',
|
||||
'test_deletion_subkeys',
|
||||
'test_import_only']),
|
||||
'recvkeys': set(['test_recv_keys_default']),
|
||||
}
|
||||
|
||||
def main(args):
|
||||
if not args.quiet:
|
||||
|
|
27
setup.py
27
setup.py
|
@ -22,11 +22,19 @@
|
|||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
|
||||
import platform
|
||||
import setuptools
|
||||
import sys
|
||||
import os
|
||||
import versioneer
|
||||
|
||||
try:
|
||||
import __pypy__
|
||||
except ImportError:
|
||||
_isPyPy = False
|
||||
else:
|
||||
_isPyPy = True
|
||||
|
||||
|
||||
versioneer.versionfile_source = 'gnupg/_version.py'
|
||||
versioneer.versionfile_build = 'gnupg/_version.py'
|
||||
|
@ -75,6 +83,13 @@ def get_requirements():
|
|||
# Required to make `collections.OrderedDict` available on Python<=2.6
|
||||
requirements.append('ordereddict==1.1#a0ed854ee442051b249bfad0f638bbec')
|
||||
|
||||
# Don't try to install psutil on PyPy:
|
||||
if _isPyPy:
|
||||
for line in requirements[:]:
|
||||
if line.startswith('psutil'):
|
||||
print("Not installing %s on PyPy..." % line)
|
||||
requirements.remove(line)
|
||||
|
||||
return requirements, links
|
||||
|
||||
|
||||
|
@ -89,8 +104,8 @@ This module allows easy access to GnuPG's key management, encryption and \
|
|||
signature functionality from Python programs, by interacting with GnuPG \
|
||||
through file descriptors. Input arguments are strictly checked and sanitised, \
|
||||
and therefore this module should be safe to use in networked applications \
|
||||
requiring direct user input. It is intended for use with Python 2.6 or \
|
||||
greater.
|
||||
requiring direct user input. It is intended for use on Windows, MacOS X, BSD, \
|
||||
or Linux, with Python 2.6, Python 2.7, Python 3.3, Python 3.4, or PyPy.
|
||||
""",
|
||||
license="GPLv3+",
|
||||
|
||||
|
@ -119,7 +134,13 @@ greater.
|
|||
classifiers=[
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Developers",
|
||||
"Intended Audience :: System Administrators",
|
||||
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
|
||||
"Operating System :: Android",
|
||||
"Operating System :: MacOS :: MacOS X",
|
||||
"Operating System :: Microsoft :: Windows",
|
||||
"Operating System :: POSIX :: BSD",
|
||||
"Operating System :: POSIX :: Linux",
|
||||
"Programming Language :: Python",
|
||||
"Programming Language :: Python :: 2",
|
||||
"Programming Language :: Python :: 3",
|
||||
|
@ -127,6 +148,8 @@ greater.
|
|||
"Programming Language :: Python :: 2.7",
|
||||
"Programming Language :: Python :: 3.3",
|
||||
"Programming Language :: Python :: 3.4",
|
||||
"Programming Language :: Python :: Implementation :: CPython",
|
||||
"Programming Language :: Python :: Implementation :: PyPy",
|
||||
"Topic :: Security :: Cryptography",
|
||||
"Topic :: Software Development :: Libraries :: Python Modules",
|
||||
"Topic :: Utilities",]
|
||||
|
|
Loading…
Reference in New Issue