Compare commits

..

2 Commits

Author SHA1 Message Date
Charles Duffy 3909474a4d Continue search for a usable binary if entries are symlinks
Existing code gives up searching for a GnuPG executable (returning None --
contrary to the docstring indicating that a RuntimeError will be thrown in all
cases where no binary can be found) if the first item found is a symlink or is
not accessed via an absolute path.

This refactored version moves the filtering logic down into the _which helper
-- and thus continues to search past unacceptable results -- even if the first
item found is not acceptable.
2014-09-30 20:50:46 -05:00
Charles Duffy fa47edce68 Ignore PROGRESS messages during verify operation
Parsing such messages would change documented behavior (which specifies a
limited set of messages, not conforming with the given status codes). Ignoring
them is thus the safer change.
2014-09-30 20:50:40 -05:00
21 changed files with 4042 additions and 1236 deletions

View File

@ -1,33 +0,0 @@
[run]
source =
gnupg
branch = True
#parallel = True
timid = True
[report]
modules = gnupg
omit =
*/test*
*/_version*
*/__init__*
*/copyleft*
*/sitecustomize*
# Regexes for lines to exclude from report generation:
exclude_lines =
pragma: no cover
# don't complain if the code doesn't hit unimplemented sections:
raise NotImplementedError
pass
# don't complain if non-runnable or debuging code isn't run:
if 0:
if False:
def __repr__
if __name__ == .__main__.:
# Ignore source code which cannot be found:
ignore_errors = True
# Exit with status code 2 if under this percentage is covered:
fail_under = 10
[html]
directory = docs/coverage-html

12
.travis.yml 100644
View File

@ -0,0 +1,12 @@
language: python
before_install:
- sudo apt-get install gpg
python:
- "2.6"
- "2.7"
- "3.3"
install:
- make install
# command to run tests, e.g. python setup.py test
script:
- make test

View File

@ -1,15 +1,4 @@
include LICENSE
include requirements.txt
recursive-include examples *.py
recursive-include docs *.rst
include docs/_static
include docs/conf.py
include docs/make.bat
include docs/Makefile
prune docs/_build
prune private
global-exclude *.log *~
graft docs

View File

@ -2,8 +2,6 @@ SHELL=/bin/sh
TESTDIR=./gnupg/test
TESTHANDLE=$(TESTDIR)/test_gnupg.py
FILES=$(SHELL find ./gnupg/ -name "*.py" -printf "%p,")
PYTHON=$(SHELL which python)
PYTHON3=$(SHELL which python3)
PKG_NAME=python-gnupg
DOC_DIR=docs
DOC_BUILD_DIR:=$(DOC_DIR)/_build
@ -36,11 +34,8 @@ cleanup-tests-all: cleanup-tests
rm -rf tests/tmp
cleanup-build:
-rm MANIFEST
-rm -rf build
cleanup-dist:
-rm -rf dist
mkdir buildnot
rm -rf build*
# it's not strictly necessary that gnupg2, gpg-agent, pinentry, or pip be
# installed, so ignore error exit statuses for those commands
@ -52,70 +47,23 @@ test-before: cleanup-src cleanup-tests
which python && python --version
-which pip && pip --version && pip list
test-run: test-before
python $(TESTHANDLE) \
basic \
encodings \
parsers \
keyrings \
listkeys \
genkey \
sign \
crypt
py3k-test-run: test-before
python3 $(TESTHANDLE) \
basic \
encodings \
parsers \
keyrings \
listkeys \
genkey \
sign \
crypt
coverage-run: test-before
coverage run --rcfile=".coveragerc" $(PYTHON) $(TESTHANDLE) \
basic \
encodings \
parsers \
keyrings \
listkeys \
genkeys \
sign \
crypt
py3k-coverage-run: test-before
coverage run --rcfile=".coveragerc" $(PYTHON3) $(TESTHANDLE) \
basic \
encodings \
parsers \
keyrings \
listkeys \
genkeys \
sign \
crypt
coverage-report:
coverage report --rcfile=".coveragerc"
coverage-html:
coverage html --rcfile=".coveragerc"
clean-test:
test: test-before
python $(TESTHANDLE) basic encodings parsers keyrings listkeys genkey \
sign crypt
touch gnupg/test/placeholder.log
mv gnupg/test/*.log gnupg/test/logs/
rm gnupg/test/logs/placeholder.log
touch gnupg/test/random_seed_is_sekritly_pi
rm gnupg/test/random_seed*
test: test-run clean-test
py3k-test: py3k-test-run clean-test
coverage: coverage-run coverage-report coverage-html clean-test
py3k-coverage: py3k-coverage-run coverage-report coverage-html clean-test
py3k-test: test-before
python3 $(TESTHANDLE) basic encodings parsers keyrings listkeys genkey \
sign crypt
touch gnupg/test/placeholder.log
mv gnupg/test/*.log gnupg/test/logs/
rm gnupg/test/logs/placeholder.log
touch gnupg/test/random_seed_is_sekritly_pi
rm gnupg/test/random_seed*
install:
python setup.py install --record installed-files.txt
@ -142,8 +90,3 @@ docs-html:
docs-zipfile: docs-html
cd $(DOC_HTML_DIR) && { find . -name '*' | zip -@ -v ../$(DOC_BUILD_ZIP) ;};
@echo "Built documentation in $(DOC_BUILD_DIR)/$(DOC_BUILD_ZIP)"
upload: cleanup-build
python setup.py bdist_egg upload --sign
#python setup.py bdist_wheel upload --sign
python setup.py sdist --formats=gztar,zip upload --sign

7
TODO
View File

@ -5,6 +5,13 @@
It would be nice to make the file descriptors for communication with the GnuPG
process configurable, and not the default, hard-coded 0=stdin 1=stdout
2=stderr.
** TODO look into RDBMS/ORM for public key storage :io:db:
see http://stackoverflow.com/q/1235594 and http://elixir.ematia.de/trac/wiki
memcached and pymemcached were the first ones I looked at, then I discovered
redis, which seemed better. At some point we should look into using elixer,
mentioned in the above SO post, so that the backend DB can be chosen freely
and we´re not restricting users to only memcached/cassandra/redis/sqlite/etc.
* Key editing :editkey:
** TODO add '--edit-key' feature :editkey:

View File

@ -170,7 +170,7 @@ def displayNewKey(key):
# `result` is a `gnupg._parsers.ListKeys`, which is list-like, so iterate
# over all the keys and display their info:
for gpgkey in keylist:
for k, v in gpgkey.items():
for k, v in gpgkey:
log.info("%s: %s" % (k.capitalize(), v))
return keylist

View File

@ -32,22 +32,14 @@ import encodings
import locale
import os
import platform
import psutil
import shlex
import subprocess
import sys
import threading
## Using psutil is recommended, but since the extension doesn't run with the
## PyPy interpreter, we'll run even if it's not present.
try:
import psutil
except ImportError:
psutil = None
from . import _parsers
from . import _util
from ._util import b
from ._util import s
from ._parsers import _check_preferences
from ._parsers import _sanitise_list
@ -83,49 +75,19 @@ class GPGMeta(type):
instance containing the gpg-agent process' information to
``cls._agent_proc``.
For Unix systems, we check that the effective UID of this
``python-gnupg`` process is also the owner of the gpg-agent
process. For Windows, we check that the usernames of the owners are
the same. (Sorry Windows users; maybe you should switch to anything
else.)
.. note: This function will only run if the psutil_ Python extension
is installed. Because psutil won't run with the PyPy interpreter,
use of it is optional (although highly recommended).
.. _psutil: https://pypi.python.org/pypi/psutil
:returns: True if there exists a gpg-agent process running under the
same effective user ID as that of this program. Otherwise,
returns False.
returns None.
"""
if not psutil:
return False
this_process = psutil.Process(os.getpid())
ownership_match = False
if _util._running_windows:
identity = this_process.username()
else:
identity = this_process.uids
identity = psutil.Process(os.getpid()).uids
for proc in psutil.process_iter():
if (proc.name == "gpg-agent") and proc.is_running:
log.debug("Found gpg-agent process with pid %d" % proc.pid)
if _util._running_windows:
if proc.username() == identity:
ownership_match = True
else:
if proc.uids == identity:
ownership_match = True
if ownership_match:
log.debug("Effective UIDs of this process and gpg-agent match")
setattr(cls, '_agent_proc', proc)
return True
return False
if proc.uids == identity:
log.debug(
"Effective UIDs of this process and gpg-agent match")
setattr(cls, '_agent_proc', proc)
return True
class GPGBase(object):
@ -149,7 +111,7 @@ class GPGBase(object):
def __init__(self, binary=None, home=None, keyring=None, secring=None,
use_agent=False, default_preference_list=None,
ignore_homedir_permissions=False, verbose=False, options=None):
verbose=False, options=None):
"""Create a ``GPGBase``.
This class is used to set up properties for controlling the behaviour
@ -172,18 +134,13 @@ class GPGBase(object):
:ivar str secring: The filename in **homedir** to use as the keyring
file for secret keys.
"""
self.ignore_homedir_permissions = ignore_homedir_permissions
self.binary = _util._find_binary(binary)
self.homedir = os.path.expanduser(home) if home else _util._conf
self.homedir = home if home else _util._conf
pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
self.keyring = os.path.join(self._homedir, pub)
self.secring = os.path.join(self._homedir, sec)
self.options = list(_parsers._sanitise_list(options)) if options else None
#: The version string of our GnuPG binary
self.binary_version = '0.0.0'
self.verbose = False
self.options = _parsers._sanitise(options) if options else None
if default_preference_list:
self._prefs = _check_preferences(default_preference_list, 'all')
@ -198,14 +155,6 @@ class GPGBase(object):
self._filesystemencoding = encodings.normalize_encoding(
sys.getfilesystemencoding().lower())
# Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49
#
# During `line = stream.readline()` in `_read_response()`, the Python
# codecs module will choke on Unicode data, so we globally monkeypatch
# the "strict" error handler to use the builtin `replace_errors`
# handler:
codecs.register_error('strict', codecs.replace_errors)
self._keyserver = 'hkp://wwwkeys.pgp.net'
self.__generated_keys = os.path.join(self.homedir, 'generated-keys')
@ -215,12 +164,18 @@ class GPGBase(object):
"'verbose' must be boolean, string, or 0 <= n <= 9"
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
if self.options is not None:
assert isinstance(self.options, list), "options not list"
assert isinstance(self.options, str), "options not string"
except (AssertionError, AttributeError) as ae:
log.error("GPGBase.__init__(): %s" % str(ae))
raise RuntimeError(str(ae))
else:
self._set_verbose(verbose)
if verbose is True:
# The caller wants logging, but we need a valid --debug-level
# for gpg. Default to "basic", and warn about the ambiguity.
# (garrettr)
verbose = "basic"
log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging')
self.verbose = verbose
self.use_agent = use_agent
if hasattr(self, '_agent_proc') \
@ -228,9 +183,6 @@ class GPGBase(object):
if hasattr(self, '__remove_path__'):
self.__remove_path__('pinentry')
# Assign our self.binary_version attribute:
self._check_sane_and_get_gpg_version()
def __remove_path__(self, prog=None, at_exit=True):
"""Remove the directories containing a program from the system's
``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it
@ -416,21 +368,18 @@ class GPGBase(object):
log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
_util._create_if_necessary(hd)
if self.ignore_homedir_permissions:
self._homedir = hd
try:
log.debug("GPGBase._homedir_setter(): checking permissions")
assert _util._has_readwrite(hd), \
"Homedir '%s' needs read/write permissions" % hd
except AssertionError as ae:
msg = ("Unable to set '%s' as GnuPG homedir" % directory)
log.debug("GPGBase.homedir.setter(): %s" % msg)
log.debug(str(ae))
raise RuntimeError(str(ae))
else:
try:
log.debug("GPGBase._homedir_setter(): checking permissions")
assert _util._has_readwrite(hd), \
"Homedir '%s' needs read/write permissions" % hd
except AssertionError as ae:
msg = ("Unable to set '%s' as GnuPG homedir" % directory)
log.debug("GPGBase.homedir.setter(): %s" % msg)
log.debug(str(ae))
raise RuntimeError(str(ae))
else:
log.info("Setting homedir to '%s'" % hd)
self._homedir = hd
log.info("Setting homedir to '%s'" % hd)
self._homedir = hd
homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)
@ -487,24 +436,6 @@ class GPGBase(object):
_generated_keys = _util.InheritableProperty(_generated_keys_getter,
_generated_keys_setter)
def _check_sane_and_get_gpg_version(self):
"""Check that everything runs alright, and grab the gpg binary's
version number while we're at it, storing it as :data:`binary_version`.
:raises RuntimeError: if we cannot invoke the gpg binary.
"""
proc = self._open_subprocess(["--list-config", "--with-colons"])
result = self._result_map['list'](self)
self._read_data(proc.stdout, result)
if proc.returncode:
raise RuntimeError("Error invoking gpg: %s" % result.data)
else:
proc.terminate()
version_line = str(result.data).partition(':version:')[2]
self.binary_version = version_line.split('\n')[0]
log.debug("Using GnuPG version %s" % self.binary_version)
def _make_args(self, args, passphrase=False):
"""Make a list of command line elements for GPG.
@ -539,29 +470,21 @@ class GPGBase(object):
if passphrase: cmd.append('--batch --passphrase-fd 0')
if self.use_agent is True: cmd.append('--use-agent')
elif self.use_agent is False: cmd.append('--no-use-agent')
# The arguments for debugging and verbosity should be placed into the
# cmd list before the options/args in order to resolve Issue #76:
# https://github.com/isislovecruft/python-gnupg/issues/76
if self.verbose:
cmd.append('--debug-all')
if (isinstance(self.verbose, str) or
(isinstance(self.verbose, int) and (self.verbose >= 1))):
# GnuPG<=1.4.18 parses the `--debug-level` command in a way
# that is incompatible with all other GnuPG versions. :'(
if self.binary_version and (self.binary_version <= '1.4.18'):
cmd.append('--debug-level=%s' % self.verbose)
else:
cmd.append('--debug-level %s' % self.verbose)
if self.use_agent: cmd.append('--use-agent')
else: cmd.append('--no-use-agent')
if self.options:
[cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
if args:
[cmd.append(arg) for arg in iter(_sanitise_list(args))]
if self.verbose:
cmd.append('--debug-all')
if ((isinstance(self.verbose, str) and
self.verbose in ['basic', 'advanced', 'expert', 'guru'])
or (isinstance(self.verbose, int) and (1<=self.verbose<=9))):
cmd.append('--debug-level %s' % self.verbose)
return cmd
def _open_subprocess(self, args=None, passphrase=False):
@ -669,36 +592,6 @@ class GPGBase(object):
log.debug("Finishing reading from stream %r..." % stream.__repr__())
log.debug("Read %4d bytes total" % len(result.data))
def _set_verbose(self, verbose):
"""Check and set our :data:`verbose` attribute.
The debug-level must be a string or an integer. If it is one of
the allowed strings, GnuPG will translate it internally to it's
corresponding integer level:
basic = 1-2
advanced = 3-5
expert = 6-8
guru = 9+
If it's not one of the recognised string levels, then then
entire argument is ignored by GnuPG. :(
To fix that stupid behaviour, if they wanted debugging but typo'd
the string level (or specified ``verbose=True``), we'll default to
'basic' logging.
"""
string_levels = ('basic', 'advanced', 'expert', 'guru')
if verbose is True:
# The caller wants logging, but we need a valid --debug-level
# for gpg. Default to "basic", and warn about the ambiguity.
verbose = 'basic'
if (isinstance(verbose, str) and not (verbose in string_levels)):
verbose = 'basic'
self.verbose = verbose
def _collect_output(self, process, result, writer=None, stdin=None):
"""Drain the subprocesses output streams, writing the collected output
to the result. If a writer thread (writing to the subprocess) is given,
@ -806,19 +699,6 @@ class GPGBase(object):
## We could use _handle_io here except for the fact that if the
## passphrase is bad, gpg bails and you can't write the message.
result = self._result_map['sign'](self)
## If the passphrase is an empty string, the message up to and
## including its first newline will be cut off before making it to the
## GnuPG process. Therefore, if the passphrase='' or passphrase=b'',
## we set passphrase=None. See Issue #82:
## https://github.com/isislovecruft/python-gnupg/issues/82
if _util._is_string(passphrase):
passphrase = passphrase if len(passphrase) > 0 else None
elif _util._is_bytes(passphrase):
passphrase = s(passphrase) if len(passphrase) > 0 else None
else:
passphrase = None
proc = self._open_subprocess(args, passphrase is not None)
try:
if passphrase:
@ -838,8 +718,6 @@ class GPGBase(object):
symmetric=False,
always_trust=True,
output=None,
throw_keyids=False,
hidden_recipients=None,
cipher_algo='AES256',
digest_algo='SHA512',
compress_algo='ZLIB'):
@ -912,14 +790,6 @@ class GPGBase(object):
>>> decrypted
'The crow flies at midnight.'
:param bool throw_keyids: If True, make all **recipients** keyids be
zero'd out in packet information. This is the same as using
**hidden_recipients** for all **recipients**. (Default: False).
:param list hidden_recipients: A list of recipients that should have
their keyids zero'd out in packet information.
:param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config
@ -971,7 +841,6 @@ class GPGBase(object):
## is decryptable with a passphrase or secretkey.
if symmetric: args.append('--symmetric')
if encrypt: args.append('--encrypt')
if throw_keyids: args.append('--throw-keyids')
if len(recipients) >= 1:
log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
@ -987,54 +856,39 @@ class GPGBase(object):
log.info("Can't accept recipient string: %s"
% recp)
else:
self._add_recipient_string(args, hidden_recipients, str(recp))
args.append('--recipient %s' % str(recp))
continue
## will give unicode in 2.x as '\uXXXX\uXXXX'
if isinstance(hidden_recipients, (list, tuple)):
if [s for s in hidden_recipients if recp in str(s)]:
args.append('--hidden-recipient %r' % recp)
else:
args.append('--recipient %r' % recp)
else:
args.append('--recipient %r' % recp)
args.append('--recipient %r' % recp)
continue
if isinstance(recp, str):
self._add_recipient_string(args, hidden_recipients, recp)
args.append('--recipient %s' % recp)
elif (not _util._py3k) and isinstance(recp, basestring):
for recp in recipients.split('\x20'):
self._add_recipient_string(args, hidden_recipients, recp)
args.append('--recipient %s' % recp)
elif _util._py3k and isinstance(recp, str):
for recp in recipients.split(' '):
self._add_recipient_string(args, hidden_recipients, recp)
args.append('--recipient %s' % recp)
## ...and now that we've proven py3k is better...
else:
log.debug("Don't know what to do with recipients: %r"
log.debug("Don't know what to do with recipients: '%s'"
% recipients)
result = self._result_map['crypt'](self)
log.debug("Got data '%s' with type '%s'." % (data, type(data)))
self._handle_io(args, data, result, passphrase=passphrase, binary=True)
# Avoid writing raw encrypted bytes to terminal loggers and breaking
# them in that adorable way where they spew hieroglyphics until reset:
if armor:
log.debug("\n%s" % result.data)
log.debug("Got data '%s' with type '%s'."
% (data, type(data)))
self._handle_io(args, data, result,
passphrase=passphrase, binary=True)
log.debug("\n%s" % result.data)
if output_filename:
log.info("Writing encrypted output to file: %s" % output_filename)
with open(output_filename, 'wb') as fh:
with open(output_filename, 'w+') as fh:
fh.write(result.data)
fh.flush()
log.info("Encrypted output written successfully.")
return result
def _add_recipient_string(self, args, hidden_recipients, recipient):
if isinstance(hidden_recipients, (list, tuple)):
if [s for s in hidden_recipients if recipient in str(s)]:
args.append('--hidden-recipient %s' % recipient)
else:
args.append('--recipient %s' % recipient)
else:
args.append('--recipient %s' % recipient)

View File

@ -367,7 +367,7 @@ def _sanitise(*args):
checked += (val + " ")
log.debug("_check_option(): No checks for %s" % val)
return checked.rstrip(' ')
return checked
is_flag = lambda x: x.startswith('--')
@ -475,8 +475,6 @@ def _get_options_group(group=None):
'--export-secret-subkeys',
'--fingerprint',
'--gen-revoke',
'--hidden-encrypt-to',
'--hidden-recipient',
'--list-key',
'--list-keys',
'--list-public-keys',
@ -516,7 +514,6 @@ def _get_options_group(group=None):
'--import',
'--verify',
'--verify-files',
'--output',
])
#: These options expect a string. see :func:`_check_preferences`.
pref_options = frozenset(['--digest-algo',
@ -558,9 +555,6 @@ def _get_options_group(group=None):
'--list-public-keys',
'--list-secret-keys',
'--list-sigs',
'--lock-multiple',
'--lock-never',
'--lock-once',
'--no-default-keyring',
'--no-default-recipient',
'--no-emit-version',
@ -572,7 +566,6 @@ def _get_options_group(group=None):
'--quiet',
'--sign',
'--symmetric',
'--throw-keyids',
'--use-agent',
'--verbose',
'--version',
@ -912,7 +905,6 @@ class Sign(object):
timestamp = None
#: xxx fill me in
what = None
status = None
def __init__(self, gpg):
self._gpg = gpg
@ -935,9 +927,9 @@ class Sign(object):
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
"GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED",
"BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"):
self.status = key.replace("_", " ").lower()
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL",
"INV_SGNR", "SIGEXPIRED"):
pass
elif key == "SIG_CREATED":
(self.sig_type, self.sig_algo, self.sig_hash_algo,
self.what, self.timestamp, self.fingerprint) = value.split()
@ -954,7 +946,6 @@ class Sign(object):
else:
raise ValueError("Unknown status message: %r" % key)
class ListKeys(list):
"""Handle status messages for --list-keys.
@ -965,6 +956,7 @@ class ListKeys(list):
| crs = X.509 certificate and private key available
| ssb = secret subkey (secondary key)
| uat = user attribute (same as user id except for field 10).
| sig = signature
| rev = revocation signature
| pkd = public key data (special field format, see below)
| grp = reserved for gpgsm
@ -975,10 +967,8 @@ class ListKeys(list):
super(ListKeys, self).__init__()
self._gpg = gpg
self.curkey = None
self.curuid = None
self.fingerprints = []
self.uids = []
self.sigs = {}
def key(self, args):
vars = ("""
@ -988,12 +978,8 @@ class ListKeys(list):
for i in range(len(vars)):
self.curkey[vars[i]] = args[i]
self.curkey['uids'] = []
self.curkey['sigs'] = {}
if self.curkey['uid']:
self.curuid = self.curkey['uid']
self.curkey['uids'].append(self.curuid)
self.sigs[self.curuid] = set()
self.curkey['sigs'][self.curuid] = []
self.curkey['uids'].append(self.curkey['uid'])
del self.curkey['uid']
self.curkey['subkeys'] = []
self.append(self.curkey)
@ -1008,21 +994,8 @@ class ListKeys(list):
uid = args[9]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
self.curkey['uids'].append(uid)
self.curuid = uid
self.curkey['sigs'][uid] = []
self.sigs[uid] = set()
self.uids.append(uid)
def sig(self, args):
vars = ("""
type trust length algo keyid date expires dummy ownertrust uid
""").split()
sig = {}
for i in range(len(vars)):
sig[vars[i]] = args[i]
self.curkey['sigs'][self.curuid].append(sig)
self.sigs[self.curuid].add(sig['keyid'])
def sub(self, args):
subkey = [args[4], args[11]]
self.curkey['subkeys'].append(subkey)
@ -1032,52 +1005,42 @@ class ListKeys(list):
class ImportResult(object):
"""Parse GnuPG status messages for key import operations."""
"""Parse GnuPG status messages for key import operations.
:type gpg: :class:`gnupg.GPG`
:param gpg: An instance of :class:`gnupg.GPG`.
"""
_ok_reason = {'0': 'Not actually changed',
'1': 'Entirely new key',
'2': 'New user IDs',
'4': 'New signatures',
'8': 'New subkeys',
'16': 'Contains private key',
'17': 'Contains private key',}
_problem_reason = { '0': 'No specific reason given',
'1': 'Invalid Certificate',
'2': 'Issuer Certificate missing',
'3': 'Certificate Chain too long',
'4': 'Error storing certificate', }
_fields = '''count no_user_id imported imported_rsa unchanged
n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
not_imported'''.split()
_counts = OrderedDict(
zip(_fields, [int(0) for x in range(len(_fields))]) )
#: A list of strings containing the fingerprints of the GnuPG keyIDs
#: imported.
fingerprints = list()
#: A list containing dictionaries with information gathered on keys
#: imported.
results = list()
def __init__(self, gpg):
"""Start parsing the results of a key import operation.
:type gpg: :class:`gnupg.GPG`
:param gpg: An instance of :class:`gnupg.GPG`.
"""
self._gpg = gpg
#: A map from GnuPG codes shown with the ``IMPORT_OK`` status message
#: to their human-meaningful English equivalents.
self._ok_reason = {'0': 'Not actually changed',
'1': 'Entirely new key',
'2': 'New user IDs',
'4': 'New signatures',
'8': 'New subkeys',
'16': 'Contains private key',
'17': 'Contains private key',}
#: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status
#: message to their human-meaningful English equivalents.
self._problem_reason = { '0': 'No specific reason given',
'1': 'Invalid Certificate',
'2': 'Issuer Certificate missing',
'3': 'Certificate Chain too long',
'4': 'Error storing certificate', }
#: All the possible status messages pertaining to actions taken while
#: importing a key.
self._fields = '''count no_user_id imported imported_rsa unchanged
n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
not_imported'''.split()
#: Counts of all the status message results, :data:`_fields` which
#: have appeared.
self.counts = OrderedDict(
zip(self._fields, [int(0) for x in range(len(self._fields))]))
#: A list of strings containing the fingerprints of the GnuPG keyIDs
#: imported.
self.fingerprints = list()
#: A list containing dictionaries with information gathered on keys
#: imported.
self.results = list()
self.counts = self._counts
def __nonzero__(self):
"""Override the determination for truthfulness evaluation.
@ -1085,7 +1048,7 @@ class ImportResult(object):
:rtype: bool
:returns: True if we have immport some keys, False otherwise.
"""
if self.counts['not_imported'] > 0: return False
if self.counts.not_imported > 0: return False
if len(self.fingerprints) == 0: return False
return True
__bool__ = __nonzero__
@ -1093,7 +1056,7 @@ class ImportResult(object):
def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process.
:raises ValueError: if the status message is unknown.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key == "IMPORTED":
# this duplicates info we already see in import_ok & import_problem
@ -1226,37 +1189,6 @@ class Verify(object):
self.trust_level = None
#: The string corresponding to the ``trust_level`` number.
self.trust_text = None
#: The subpackets. These are stored as a dictionary, in the following
#: form:
#: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS,
#: 'length': LENGTH,
#: 'data': DATA},
#: 'ANOTHER_SUBPACKET_NUMBER': {...}}
self.subpackets = {}
#: The signature or key notations. These are also stored as a
#: dictionary, in the following form:
#:
#: Verify.notations = {NOTATION_NAME: NOTATION_DATA}
#:
#: For example, the Bitcoin core developer, Peter Todd, encodes in
#: every signature the header of the latest block on the Bitcoin
#: blockchain (to prove that a GnuPG signature that Peter made was made
#: *after* a specific point in time). These look like:
#:
#: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a
#:
#: Which python-gnupg would store as:
#:
#: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a'
self.notations = {}
#: This will be a str or None. If not None, it is the last
#: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're
#: not assured that a ``NOTATION_DATA`` status will arrive *immediately*
#: after its corresponding ``NOTATION_NAME``, we store the latest
#: ``NOTATION_NAME`` here until we get its corresponding
#: ``NOTATION_DATA``.
self._last_notation_name = None
def __nonzero__(self):
"""Override the determination for truthfulness evaluation.
@ -1277,8 +1209,7 @@ class Verify(object):
self.trust_level = self.TRUST_LEVELS[key]
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
"DECRYPTION_OKAY", "INV_SGNR", "PROGRESS",
"PINENTRY_LAUNCHED"):
"DECRYPTION_OKAY", "INV_SGNR", "PROGRESS"):
pass
elif key == "BADSIG":
self.valid = False
@ -1289,7 +1220,6 @@ class Verify(object):
self.status = 'signature good'
self.key_id, self.username = value.split(None, 1)
elif key == "VALIDSIG":
self.valid = True
(self.fingerprint,
self.creation_date,
self.sig_timestamp,
@ -1315,106 +1245,17 @@ class Verify(object):
self.valid = False
self.key_id = value
self.status = 'no public key'
# These are useless in Verify, since they are spit out for any
# pub/subkeys on the key, not just the one doing the signing.
# if we want to check for signatures make with expired key,
# the relevant flags are REVKEYSIG and KEYREVOKED.
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
# these are useless in verify, since they are spit out for any
# pub/subkeys on the key, not just the one doing the signing.
# if we want to check for signatures with expired key,
# the relevant flag is EXPKEYSIG.
pass
# The signature has an expiration date which has already passed
# (EXPKEYSIG), or the signature has been revoked (REVKEYSIG):
elif key in ("EXPKEYSIG", "REVKEYSIG"):
# signed with expired or revoked key
self.valid = False
self.key_id = value.split()[0]
self.status = (('%s %s') % (key[:3], key[3:])).lower()
# This is super annoying, and bad design on the part of GnuPG, in my
# opinion.
#
# This flag can get triggered if a valid signature is made, and then
# later the key (or subkey) which created the signature is
# revoked. When this happens, GnuPG will output:
#
# REVKEYSIG 075BFD18B365D34C Test Expired Key <test@python-gnupg.git>
# VALIDSIG DAB69B05F591640B7F4DCBEA075BFD18B365D34C 2014-09-26 1411700539 0 4 0 1 2 00 4BA800F77452A6C29447FF20F4AF76ACBBE22CE2
# KEYREVOKED
#
# Meaning that we have a timestamp for when the signature was created,
# and we know that the signature is valid, but since GnuPG gives us no
# timestamp for when the key was revoked... we have no ability to
# determine if the valid signature was made *before* the signing key
# was revoked or *after*. Meaning that if you are like me and you sign
# all your software releases and git commits, and you also practice
# good opsec by doing regular key rotations, your old signatures made
# by your expired/revoked keys (even though they were created when the
# key was still good) are considered bad because GnuPG is a
# braindamaged piece of shit.
#
# Software engineering, motherfuckers, DO YOU SPEAK IT?
#
# The signing key which created the signature has since been revoked
# (KEYREVOKED), and we're going to ignore it (but add something to the
# status message):
elif key in ("KEYREVOKED"):
self.status = '\n'.join([self.status, "key revoked"])
# SIG_SUBPACKET <type> <flags> <len> <data>
# This indicates that a signature subpacket was seen. The format is
# the same as the "spk" record above.
#
# [...]
#
# SPK - Signature subpacket records
#
# - Field 2 :: Subpacket number as per RFC-4880 and later.
# - Field 3 :: Flags in hex. Currently the only two bits assigned
# are 1, to indicate that the subpacket came from the
# hashed part of the signature, and 2, to indicate the
# subpacket was marked critical.
# - Field 4 :: Length of the subpacket. Note that this is the
# length of the subpacket, and not the length of field
# 5 below. Due to the need for %-encoding, the length
# of field 5 may be up to 3x this value.
# - Field 5 :: The subpacket data. Printable ASCII is shown as
# ASCII, but other values are rendered as %XX where XX
# is the hex value for the byte.
elif key in ("SIG_SUBPACKET"):
fields = value.split()
try:
subpacket_number = fields[0]
self.subpackets[subpacket_number] = {'flags': None,
'length': None,
'data': None}
except IndexError:
# We couldn't parse the subpacket type (an RFC4880
# identifier), so we shouldn't continue parsing.
pass
else:
# Pull as much data as we can parse out of the subpacket:
try:
self.subpackets[subpacket_number]['flags'] = fields[1]
self.subpackets[subpacket_number]['length'] = fields[2]
self.subpackets[subpacket_number]['data'] = fields[3]
except IndexError:
pass
# NOTATION_
# There are actually two related status codes to convey notation
# data:
#
# - NOTATION_NAME <name>
# - NOTATION_DATA <string>
#
# <name> and <string> are %XX escaped; the data may be split among
# several NOTATION_DATA lines.
elif key.startswith("NOTATION_"):
if key.endswith("NAME"):
self.notations[value] = str()
self._last_notation_name = value
elif key.endswith("DATA"):
if self._last_notation_name is not None:
# Append the NOTATION_DATA to any previous data we
# received for that NOTATION_NAME:
self.notations[self._last_notation_name] += value
else:
pass
else:
raise ValueError("Unknown status message: %r" % key)
@ -1519,33 +1360,26 @@ class ListPackets(object):
self.need_passphrase_sym = None
#: The keyid and uid which this data is encrypted to.
self.userid_hint = None
#: The first key that we detected that a message was encrypted
#: to. This is provided for backwards compatibility. As of Issue #77_,
#: the ``encrypted_to`` attribute should be used instead.
self.key = None
#: A list of keyid's that the message has been encrypted to.
self.encrypted_to = []
def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'):
pass
elif key == 'NODATA':
if key == 'NODATA':
self.status = nodata(value)
elif key == 'ENC_TO':
key, _, _ = value.split()
if not self.key:
self.key = key
self.encrypted_to.append(key)
elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'):
# This will only capture keys in our keyring. In the future we
# may want to include multiple unknown keys in this list.
self.key, _, _ = value.split()
elif key == 'NEED_PASSPHRASE':
self.need_passphrase = True
elif key == 'NEED_PASSPHRASE_SYM':
self.need_passphrase_sym = True
elif key == 'USERID_HINT':
self.userid_hint = value.strip().split()
elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
'END_DECRYPTION'):
pass
else:
raise ValueError("Unknown status message: %r" % key)

View File

@ -57,7 +57,7 @@ def export_ownertrust(cls, trustdb=None):
except (OSError, IOError) as err:
log.debug(str(err))
export_proc = cls._open_subprocess(['--export-ownertrust'])
export_proc = cls._open_subprocess('--export-ownertrust')
tdb = open(trustdb, 'wb')
_util._threaded_copy_data(export_proc.stdout, tdb)
@ -71,7 +71,7 @@ def import_ownertrust(self, trustdb=None):
if trustdb is None:
trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
import_proc = cls._open_subprocess(['--import-ownertrust'])
import_proc = cls._open_subprocess('--import-ownertrust')
tdb = open(trustdb, 'rb')
_util._threaded_copy_data(tdb, import_proc.stdin)
@ -98,6 +98,6 @@ def fix_trustdb(cls, trustdb=None):
"""
if trustdb is None:
trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
export_proc = cls._open_subprocess(['--export-ownertrust'])
import_proc = cls._open_subprocess(['--import-ownertrust'])
export_proc = cls._open_subprocess('--export-ownertrust')
import_proc = cls._open_subprocess('--import-ownertrust')
_util._threaded_copy_data(export_proc.stdout, import_proc.stdin)

View File

@ -28,58 +28,18 @@ from time import mktime
import codecs
import encodings
import os
import psutil
import threading
import random
import re
import string
import sys
# These are all the classes which are stream-like; they are used in
# :func:`_is_stream`.
_STREAMLIKE_TYPES = []
# These StringIO classes are actually utilised.
try:
import io
from io import StringIO
from io import BytesIO
except ImportError:
from cStringIO import StringIO
else:
# The io.IOBase type covers the above example for an open file handle in
# Python3, as well as both io.BytesIO and io.StringIO.
_STREAMLIKE_TYPES.append(io.IOBase)
# The remaining StringIO classes which are imported are used to determine if a
# object is a stream-like in :func:`_is_stream`.
if 2 == sys.version_info[0]:
# Import the StringIO class from the StringIO module since it is a
# commonly used stream class. It is distinct from either of the
# StringIO's that may be loaded in the above try/except clause, so the
# name is prefixed with an underscore to distinguish it.
from StringIO import StringIO as _StringIO_StringIO
_STREAMLIKE_TYPES.append(_StringIO_StringIO)
# Import the cStringIO module to test for the cStringIO stream types,
# InputType and OutputType. See
# http://stackoverflow.com/questions/14735295/to-check-an-instance-is-stringio
import cStringIO as _cStringIO
_STREAMLIKE_TYPES.append(_cStringIO.InputType)
_STREAMLIKE_TYPES.append(_cStringIO.OutputType)
# In Python2:
#
# >>> type(open('README.md', 'rb'))
# <open file 'README.md', mode 'rb' at 0x7f9493951d20>
#
# whereas, in Python3, the `file` builtin doesn't exist and instead we get:
#
# >>> type(open('README.md', 'rb'))
# <_io.BufferedReader name='README.md'>
#
# which is covered by the above addition of io.IOBase.
_STREAMLIKE_TYPES.append(file)
from . import _logger
@ -96,9 +56,6 @@ try:
except NameError:
_py3k = True
_running_windows = False
if "win" in sys.platform:
_running_windows = True
## Directory shortcuts:
## we don't want to use this one because it writes to the install dir:
@ -106,20 +63,6 @@ if "win" in sys.platform:
_here = os.path.join(os.getcwd(), 'gnupg') ## current dir
_test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp
_user = os.environ.get('HOME') ## $HOME
# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
# environs. https://github.com/isislovecruft/python-gnupg/issues/74
if not _user:
_user = '/tmp/python-gnupg'
try:
os.makedirs(_user)
except (OSError, IOError):
_user = os.getcwd()
# If we can't use $HOME, but we have (or can create) a
# /tmp/python-gnupg/gnupghome directory, then we'll default to using
# that. Otherwise, we'll use the current directory + /gnupghome.
_user = os.path.sep.join([_user, 'gnupghome'])
_ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## $HOME/.config/python-gnupg
@ -127,9 +70,6 @@ _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## Logger is disabled by default
log = _logger.create_logger(0)
#: Compiled regex for determining a GnuPG binary's version:
_VERSION_STRING_REGEX = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
def find_encodings(enc=None, system=False):
"""Find functions for encoding translations for a specific codec.
@ -165,51 +105,6 @@ def find_encodings(enc=None, system=False):
return coder
if _py3k:
def b(x):
"""See http://python3porting.com/problems.html#nicer-solutions"""
coder = find_encodings()
if isinstance(x, bytes):
return coder.encode(x.decode(coder.name))[0]
else:
return coder.encode(x)[0]
def s(x):
if isinstance(x, str):
return x
elif isinstance(x, (bytes, bytearray)):
return x.decode(find_encodings().name)
else:
raise NotImplemented
else:
def b(x):
"""See http://python3porting.com/problems.html#nicer-solutions"""
return x
def s(x):
if isinstance(x, basestring):
return x
elif isinstance(x, (bytes, bytearray)):
return x.decode(find_encodings().name)
else:
raise NotImplemented
def binary(data):
coder = find_encodings()
if _py3k and isinstance(data, bytes):
encoded = coder.encode(data.decode(coder.name))[0]
elif _py3k and isinstance(data, str):
encoded = coder.encode(data)[0]
elif not _py3k and type(data) is not str:
encoded = coder.encode(data)[0]
else:
encoded = data
return encoded
def author_info(name, contact=None, public_key=None):
"""Easy object-oriented representation of contributor info.
@ -229,6 +124,8 @@ def _copy_data(instream, outstream):
"""
sent = 0
coder = find_encodings()
while True:
if ((_py3k and isinstance(instream, str)) or
(not _py3k and isinstance(instream, basestring))):
@ -238,64 +135,24 @@ def _copy_data(instream, outstream):
data = instream.read(1024)
if len(data) == 0:
break
sent += len(data)
encoded = binary(data)
log.debug("Sending %d bytes of data..." % sent)
log.debug("Encoded data (type %s):\n%s" % (type(encoded), encoded))
if not _py3k:
log.debug("Sending chunk %d bytes:\n%s"
% (sent, data))
try:
outstream.write(data)
except UnicodeError:
try:
outstream.write(encoded)
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
outstream.write(coder.encode(data))
except IOError:
log.exception("Error sending data: Broken pipe")
break
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.debug("Wrote data type <type 'str'> to outstream.")
else:
try:
outstream.write(bytes(encoded))
except TypeError as te:
# XXX FIXME This appears to happen because
# _threaded_copy_data() sometimes passes the `outstream` as an
# object with type <_io.BufferredWriter> and at other times
# with type <encodings.utf_8.StreamWriter>. We hit the
# following error when the `outstream` has type
# <encodings.utf_8.StreamWriter>.
if not "convert 'bytes' object to str implicitly" in str(te):
log.error(str(te))
try:
outstream.write(encoded.decode())
except TypeError as yate:
# We hit the "'str' does not support the buffer interface"
# error in Python3 when the `outstream` is an io.BytesIO and
# we try to write a str to it. We don't care about that
# error, we'll just try again with bytes.
if not "does not support the buffer interface" in str(yate):
log.error(str(yate))
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
break
else:
log.debug("Wrote data type <class 'str'> outstream.")
except IOError as ioe:
# Can get 'broken pipe' errors even when all data was sent
if 'Broken pipe' in str(ioe):
log.error('Error sending data: Broken pipe')
else:
log.exception(ioe)
break
else:
log.debug("Wrote data type <class 'bytes'> to outstream.")
log.exception(ioe)
break
try:
outstream.close()
except IOError as ioe:
@ -403,8 +260,6 @@ def _find_binary(binary=None):
"""
found = None
if binary is not None:
if os.path.isabs(binary) and os.path.isfile(binary):
return binary
if not os.path.isabs(binary):
try:
found = _which(binary)
@ -473,32 +328,7 @@ def _is_stream(input):
:rtype: bool
:returns: True if :param:input is a stream, False if otherwise.
"""
return isinstance(input, tuple(_STREAMLIKE_TYPES))
def _is_string(thing):
"""Check that **thing** is a string. The definition of the latter depends
upon the Python version.
:param thing: The thing to check if it's a string.
:rtype: bool
:returns: ``True`` if **thing** is string (or unicode in Python2).
"""
if (_py3k and isinstance(thing, str)):
return True
if (not _py3k and isinstance(thing, basestring)):
return True
return False
def _is_bytes(thing):
"""Check that **thing** is bytes.
:param thing: The thing to check if it's bytes.
:rtype: bool
:returns: ``True`` if **thing** is bytes or a bytearray.
"""
if isinstance(thing, (bytes, bytearray)):
return True
return False
return isinstance(input, BytesIO) or isinstance(input, StringIO)
def _is_list_or_tuple(instance):
"""Check that ``instance`` is a list or tuple.
@ -531,26 +361,21 @@ def _is_gpg2(version):
return True
return False
def _make_binary_stream(thing, encoding=None, armor=True):
"""Encode **thing**, then make it stream/file-like.
:param thing: The thing to turn into a encoded stream.
:rtype: ``io.BytesIO`` or ``io.StringIO``.
:returns: The encoded **thing**, wrapped in an ``io.BytesIO`` (if
available), otherwise wrapped in a ``io.StringIO``.
def _make_binary_stream(s, encoding):
"""
xxx fill me in
"""
if _py3k:
if isinstance(thing, str):
thing = thing.encode(encoding)
else:
if type(thing) is not str:
thing = thing.encode(encoding)
try:
rv = BytesIO(thing)
except NameError:
rv = StringIO(thing)
if _py3k:
if isinstance(s, str):
s = s.encode(encoding)
else:
if type(s) is not str:
s = s.encode(encoding)
from io import BytesIO
rv = BytesIO(s)
except ImportError:
rv = StringIO(s)
return rv
def _make_passphrase(length=None, save=False, file=None):
@ -571,7 +396,7 @@ def _make_passphrase(length=None, save=False, file=None):
passphrase = _make_random_string(length)
if save:
ruid, euid, suid = os.getresuid()
ruid, euid, suid = psutil.Process(os.getpid()).uids
gid = os.getgid()
now = mktime(localtime())
@ -602,7 +427,8 @@ def _match_version_string(version):
:param str version: A version string in the form x.x.x
"""
matched = _VERSION_STRING_REGEX.match(version)
regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
matched = regex.match(version)
g = matched.groups()
major, minor, micro = int(g[0]), int(g[2]), int(g[4])
return (major, minor, micro)

View File

@ -179,9 +179,9 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
return None
return {"version": dirname[len(parentdir_prefix):], "full": ""}
tag_prefix = ""
parentdir_prefix = "gnupg-"
versionfile_source = "gnupg/_version.py"
tag_prefix = "python-gnupg-"
parentdir_prefix = "python-gnupg-"
versionfile_source = "src/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full }

View File

@ -36,7 +36,13 @@ import os
import re
import textwrap
try:
from io import StringIO
except ImportError:
from cStringIO import StringIO
#: see :pep:`328` http://docs.python.org/2.5/whatsnew/pep-328.html
from . import _parsers
from . import _util
from . import _trust
from ._meta import GPGBase
@ -60,7 +66,7 @@ class GPG(GPGBase):
def __init__(self, binary=None, homedir=None, verbose=False,
use_agent=False, keyring=None, secring=None,
ignore_homedir_permissions=False, options=None):
options=None):
"""Initialize a GnuPG process wrapper.
:param str binary: Name for GnuPG binary executable. If the absolute
@ -73,10 +79,6 @@ class GPG(GPGBase):
and private keyrings. Default is whatever GnuPG
defaults to.
:type ignore_homedir_permissions: :obj:`bool`
:param ignore_homedir_permissions: If true, bypass check that homedir
be writable.
:type verbose: :obj:`str` or :obj:`int` or :obj:`bool`
:param verbose: String or numeric value to pass to GnuPG's
``--debug-level`` option. See the GnuPG man page for
@ -121,16 +123,12 @@ class GPG(GPGBase):
secring=secring,
options=options,
verbose=verbose,
use_agent=use_agent,
ignore_homedir_permissions=ignore_homedir_permissions,
)
use_agent=use_agent,)
log.info(textwrap.dedent("""
Initialised settings:
binary: %s
binary version: %s
homedir: %s
ignore_homedir_permissions: %s
keyring: %s
secring: %s
default_preference_list: %s
@ -138,16 +136,9 @@ class GPG(GPGBase):
options: %s
verbose: %s
use_agent: %s
""" % (self.binary,
self.binary_version,
self.homedir,
self.ignore_homedir_permissions,
self.keyring,
self.secring,
self.default_preference_list,
self.keyserver, self.options,
str(self.verbose),
str(self.use_agent))))
""" % (self.binary, self.homedir, self.keyring, self.secring,
self.default_preference_list, self.keyserver, self.options,
str(self.verbose), str(self.use_agent))))
self._batch_dir = os.path.join(self.homedir, 'batch-files')
self._key_dir = os.path.join(self.homedir, 'generated-keys')
@ -156,52 +147,58 @@ class GPG(GPGBase):
self.temp_keyring = None
#: The secring used in the most recently created batch file
self.temp_secring = None
#: The version string of our GnuPG binary
self.binary_version = str()
# Make sure that the trustdb exists, or else GnuPG will exit with a
# fatal error (at least it does with GnuPG>=2.0.0):
self.create_trustdb()
## check that everything runs alright, and grab the gpg binary's
## version number while we're at it:
proc = self._open_subprocess(["--list-config", "--with-colons"])
result = self._result_map['list'](self)
self._read_data(proc.stdout, result)
if proc.returncode:
raise RuntimeError("Error invoking gpg: %s" % result.data)
# The --no-use-agent and --use-agent options were deprecated in GnuPG
# 2.x, so we should set use_agent to None here to avoid having
# GPGBase._make_args() add either one.
if self.is_gpg2():
self.use_agent = None
version_line = str(result.data).partition(':version:')[2]
self.binary_version = version_line.split('\n')[0]
log.debug("Using GnuPG version %s" % self.binary_version)
if _util._is_gpg2:
# Make GnuPG>=2.0.0-only methods public:
self.fix_trustdb = self._fix_trustdb
self.import_ownertrust = self._import_ownertrust
self.export_ownertrust = self._export_ownertrust
# Make sure that the trustdb exists, or else GnuPG will exit with
# a fatal error (at least it does with GnuPG>=2.0.0):
self._create_trustdb()
@functools.wraps(_trust._create_trustdb)
def create_trustdb(self):
def _create_trustdb(self):
if self.is_gpg2():
_trust._create_trustdb(self)
else:
log.info("Creating the trustdb is only available with GnuPG>=2.x")
# For backward compatibility with python-gnupg<=1.3.1:
_create_trustdb = create_trustdb
@functools.wraps(_trust.fix_trustdb)
def fix_trustdb(self, trustdb=None):
def _fix_trustdb(self, trustdb=None):
if self.is_gpg2():
_trust.fix_trustdb(self)
else:
log.info("Fixing the trustdb is only available with GnuPG>=2.x")
# For backward compatibility with python-gnupg<=1.3.1:
_fix_trustdb = fix_trustdb
@functools.wraps(_trust.import_ownertrust)
def import_ownertrust(self, trustdb=None):
def _import_ownertrust(self, trustdb=None):
if self.is_gpg2():
_trust.import_ownertrust(self)
else:
log.info("Importing ownertrust is only available with GnuPG>=2.x")
# For backward compatibility with python-gnupg<=1.3.1:
_import_ownertrust = import_ownertrust
@functools.wraps(_trust.export_ownertrust)
def export_ownertrust(self, trustdb=None):
def _export_ownertrust(self, trustdb=None):
if self.is_gpg2():
_trust.export_ownertrust(self)
else:
log.info("Exporting ownertrust is only available with GnuPG>=2.x")
# For backward compatibility with python-gnupg<=1.3.1:
_export_ownertrust = export_ownertrust
def is_gpg1(self):
"""Returns true if using GnuPG <= 1.x."""
@ -287,13 +284,15 @@ class GPG(GPGBase):
signatures. If using detached signatures, the file containing the
detached signature should be specified as the ``sig_file``.
:param file file: A file descriptor object.
:param file file: A file descriptor object. Its type will be checked
with :func:`_util._is_file`.
:param str sig_file: A file containing the GPG signature data for
``file``. If given, ``file`` is verified via this detached
signature. Its type will be checked with :func:`_util._is_file`.
signature.
"""
fn = None
result = self._result_map['verify'](self)
if sig_file is None:
@ -308,15 +307,19 @@ class GPG(GPGBase):
return result
log.debug('verify_file(): Handling detached verification')
sig_fh = None
data_fh = None
try:
sig_fh = open(sig_file, 'rb')
data_fh = open(file, 'rb')
args = ["--verify %s -" % sig_fh.name]
proc = self._open_subprocess(args)
writer = _util._threaded_copy_data(file, proc.stdin)
writer = _util._threaded_copy_data(data_fh, proc.stdin)
self._collect_output(proc, result, writer, stdin=proc.stdin)
finally:
if sig_fh and not sig_fh.closed:
sig_fh.close()
if data_fh and not data_fh.closed:
data_fh.close()
return result
def import_keys(self, key_data):
@ -485,7 +488,19 @@ class GPG(GPGBase):
self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self._encoding,
self._decode_errors).splitlines()
self._parse_keys(result)
valid_keywords = 'pub uid sec fpr sub'.split()
for line in lines:
if self.verbose:
print(line)
log.debug("%r", line.rstrip())
if not line:
break
L = line.strip().split(':')
if not L:
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
return result
def list_packets(self, raw_data):
@ -506,8 +521,8 @@ class GPG(GPGBase):
>>> assert key.fingerprint
:rtype: dict
:returns: res.sigs is a dictionary whose keys are the uids and whose
values are a set of signature keyids.
:returns: A dictionary whose keys are the original keyid parameters,
and whose values are lists of signatures.
"""
if len(keyids) > self._batch_limit:
raise ValueError(
@ -522,26 +537,8 @@ class GPG(GPGBase):
proc = self._open_subprocess(args)
result = self._result_map['list'](self)
self._collect_output(proc, result, stdin=proc.stdin)
self._parse_keys(result)
return result
def _parse_keys(self, result):
lines = result.data.decode(self._encoding,
self._decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub sig'.split()
for line in lines:
if self.verbose:
print(line)
log.debug("%r", line.rstrip())
if not line:
break
L = line.strip().split(':')
if not L:
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
def gen_key(self, input):
"""Generate a GnuPG key through batch file key generation. See
:meth:`GPG.gen_key_input()` for creating the control input.
@ -801,7 +798,7 @@ class GPG(GPGBase):
key = key.replace('_','-').title()
## to set 'cert', 'Key-Usage' must be blank string
if not key in ('Key-Usage', 'Subkey-Usage'):
if type('')(val).strip():
if str(val).strip():
parms[key] = val
## if Key-Type is 'default', make Subkey-Type also be 'default'
@ -944,13 +941,6 @@ generate keys. Please see
'The crow flies at midnight.'
:param bool throw_keyids: If True, make all **recipients** keyids be
zero'd out in packet information. This is the same as using
**hidden_recipients** for all **recipients**. (Default: False).
:param list hidden_recipients: A list of recipients that should have
their keyids zero'd out in packet information.
:param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config ciphername`.
@ -966,10 +956,7 @@ generate keys. Please see
.. seealso:: :meth:`._encrypt`
"""
if _is_stream(data):
stream = data
else:
stream = _make_binary_stream(data, self._encoding)
stream = _make_binary_stream(data, self._encoding)
result = self._encrypt(stream, recipients, **kwargs)
stream.close()
return result

View File

@ -26,7 +26,6 @@ A test harness and unittests for gnupg.py.
from __future__ import absolute_import
from __future__ import print_function
from __future__ import with_statement
from argparse import ArgumentParser
from codecs import open as open
from functools import wraps
@ -34,8 +33,10 @@ from glob import glob
from time import localtime
from time import mktime
import encodings
import doctest
import io
import logging
import os
import shutil
import sys
@ -191,9 +192,7 @@ class GPGTestCase(unittest.TestCase):
print(fixed)
test_file = os.path.join(_files, 'cypherpunk_manifesto')
self.assertTrue(os.path.isfile(test_file))
datafd = open(test_file, 'rb')
has_shell = self.gpg.verify_file(datafd, sig_file=fixed)
datafd.close()
has_shell = self.gpg.verify_file(test_file, fixed)
self.assertFalse(has_shell.valid)
def test_parsers_fix_unsafe_semicolon(self):
@ -289,8 +288,8 @@ class GPGTestCase(unittest.TestCase):
self.assertTrue(os.path.isabs(self.gpg.binary))
def test_make_args_drop_protected_options(self):
"""Test that unsupported gpg options are dropped, and supported ones remain."""
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus', '--lock-never']
"""Test that unsupported gpg options are dropped."""
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus']
gpg_binary_path = _util._find_binary('gpg')
cmd = self.gpg._make_args(None, False)
expected = [gpg_binary_path,
@ -298,8 +297,7 @@ class GPGTestCase(unittest.TestCase):
'--homedir "%s"' % self.homedir,
'--no-default-keyring --keyring %s' % self.keyring,
'--secret-keyring %s' % self.secring,
'--no-use-agent',
'--lock-never']
'--no-use-agent']
self.assertListEqual(cmd, expected)
def test_make_args(self):
@ -390,10 +388,7 @@ class GPGTestCase(unittest.TestCase):
def test_gen_key_input(self):
"""Test that GnuPG batch file creation is successful."""
key_input = self.generate_key_input("Francisco Ferrer", "an.ok")
if _util._py3k:
self.assertIsInstance(key_input, str)
else:
self.assertIsInstance(key_input, basestring)
self.assertIsInstance(key_input, str)
self.assertGreater(key_input.find('Francisco Ferrer'), 0)
def test_rsa_key_generation(self):
@ -569,7 +564,7 @@ class GPGTestCase(unittest.TestCase):
def test_import_only(self):
"""Test that key import works."""
self.test_list_keys_initial_public()
self.assertTrue(self.gpg.import_keys(KEYS_TO_IMPORT))
self.gpg.import_keys(KEYS_TO_IMPORT)
public_keys = self.gpg.list_keys()
self.assertTrue(is_list_with_len(public_keys, 2),
"2-element list expected")
@ -626,66 +621,6 @@ class GPGTestCase(unittest.TestCase):
passphrase='wrong horse battery staple')
self.assertFalse(sig, "Bad passphrase should fail")
def test_signature_string_passphrase_empty_string(self):
"""Test that a signing attempt with passphrase='' creates a valid
signature.
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
"""
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
message = 'abc\ndef\n'
sig = self.gpg.sign(message, default_key=key1, passphrase='')
self.assertTrue(sig)
self.assertTrue(message in str(sig))
def test_signature_string_passphrase_empty_bytes_literal(self):
"""Test that a signing attempt with passphrase=b'' creates a valid
signature.
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
"""
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
message = 'abc\ndef\n'
sig = self.gpg.sign(message, default_key=key1, passphrase=b'')
self.assertTrue(sig)
print("%r" % str(sig))
self.assertTrue(message in str(sig))
def test_signature_string_passphrase_bytes_literal(self):
"""Test that a signing attempt with passphrase=b'overalls' creates a
valid signature.
"""
with open(os.path.join(_files, 'kat.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
message = 'abc\ndef\n'
sig = self.gpg.sign(message, default_key=key1, passphrase=b'overalls')
self.assertTrue(sig)
print("%r" % str(sig))
self.assertTrue(message in str(sig))
def test_signature_string_passphrase_None(self):
"""Test that a signing attempt with passphrase=None fails creates a
valid signature.
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
"""
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
message = 'abc\ndef\n'
sig = self.gpg.sign(message, default_key=key1, passphrase=None)
self.assertTrue(sig)
self.assertTrue(message in str(sig))
def test_signature_file(self):
"""Test that signing a message file works."""
key = self.generate_key("Leonard Adleman", "rsa.com")
@ -729,7 +664,6 @@ class GPGTestCase(unittest.TestCase):
sig = self.gpg.sign(message, default_key=key.fingerprint,
passphrase='johanborst')
self.assertTrue(sig, "Good passphrase should succeed")
try:
file = _util._make_binary_stream(sig.data, self.gpg._encoding)
verified = self.gpg.verify_file(file)
@ -762,7 +696,7 @@ class GPGTestCase(unittest.TestCase):
datafd.seek(0)
sigfd.seek(0)
verified = self.gpg.verify_file(datafd, sig_file=sigfn)
verified = self.gpg.verify_file(datafn, sigfn)
if key.fingerprint != verified.fingerprint:
log.warn("key fingerprint: %r", key.fingerprint)
@ -773,7 +707,7 @@ class GPGTestCase(unittest.TestCase):
os.unlink(sigfn)
def test_signature_verification_detached_binary(self):
"""Test that detached signature verification in binary mode works."""
"""Test that detached signature verification in binary mode fails."""
key = self.generate_key("Adi Shamir", "rsa.com")
datafn = os.path.join(_files, 'cypherpunk_manifesto')
@ -781,6 +715,7 @@ class GPGTestCase(unittest.TestCase):
datafd = open(datafn, 'rb')
data = datafd.read()
datafd.close()
sig = self.gpg.sign(data, default_key=key.fingerprint,
passphrase='adishamir',
@ -796,96 +731,29 @@ class GPGTestCase(unittest.TestCase):
sigfd.close()
self.assertTrue(sigfd.closed, "Sigfile '%s' should be closed" % sigfn)
with self.assertRaises(UnicodeDecodeError):
print("SIG=%s" % sig)
datafd.seek(0)
verification = self.gpg.verify_file(datafd, sig_file=sigfn)
verifysig = open(sigfn, 'rb')
verification = self.gpg.verify_file(data, verifysig)
self.assertTrue(isinstance(verification, gnupg._parsers.Verify))
self.assertTrue(verification.valid)
datafd.close()
self.assertFalse(verification.valid)
if os.path.isfile(sigfn):
os.unlink(sigfn)
def test_deletion_public_key(self):
"""Test that key deletion for public keys works, and that it leaves the
corresponding secret key intact.
"""
key1 = None
key2 = None
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
res2 = self.gpg.import_keys(fh2.read())
key2 = res2.fingerprints[0]
def test_deletion(self):
"""Test that key deletion works."""
self.gpg.import_keys(KEYS_TO_IMPORT)
public_keys = self.gpg.list_keys()
self.assertTrue(len(public_keys), 2)
self.gpg.delete_keys(key1)
self.assertTrue(is_list_with_len(public_keys, 2),
"2-element list expected, got %d" % len(public_keys))
self.gpg.delete_keys(public_keys[0]['fingerprint'])
public_keys = self.gpg.list_keys()
secret_keys = self.gpg.list_keys(secret=True)
self.assertTrue(len(public_keys), 1)
self.assertTrue(len(secret_keys), 2)
def test_deletion_secret_key(self):
"""Test that key deletion for secret keys works, and that it leaves the
corresponding public key intact.
"""
key1 = None
key2 = None
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
res2 = self.gpg.import_keys(fh2.read())
key2 = res2.fingerprints[0]
public_keys = self.gpg.list_keys()
secret_keys = self.gpg.list_keys(secret=True)
self.assertEqual(len(public_keys), 2)
self.assertEqual(len(secret_keys), 2)
self.gpg.delete_keys(key1, secret=True)
public_keys = self.gpg.list_keys()
secret_keys = self.gpg.list_keys(secret=True)
self.assertEqual(len(public_keys), 2)
self.assertEqual(len(secret_keys), 1)
def test_deletion_subkeys(self):
"""Test that key deletion for subkeys deletes both the public and
secret portions of the key.
"""
key1 = None
key2 = None
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
res1 = self.gpg.import_keys(fh1.read())
key1 = res1.fingerprints[0]
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
res2 = self.gpg.import_keys(fh2.read())
key2 = res2.fingerprints[0]
public_keys = self.gpg.list_keys()
secret_keys = self.gpg.list_keys(secret=True)
self.assertEqual(len(public_keys), 2)
self.assertEqual(len(secret_keys), 2)
self.gpg.delete_keys(key1, subkeys=True)
public_keys = self.gpg.list_keys()
secret_keys = self.gpg.list_keys(secret=True)
self.assertEqual(len(public_keys), 1)
self.assertEqual(len(secret_keys), 1)
self.assertTrue(is_list_with_len(public_keys, 1),
"1-element list expected, got %d" % len(public_keys))
log.debug("test_deletion ends")
def test_encryption(self):
"""Test encryption of a message string"""
@ -908,75 +776,6 @@ authentication."""
log.debug("Encrypted: %s" % encrypted)
self.assertNotEquals(message, encrypted)
def _encryption_test_setup(self):
passphrase = "craiggentry"
key = self.generate_key("Craig Gentry", "xorr.ox", passphrase=passphrase)
fpr = str(key.fingerprint)
gentry = self.gpg.export_keys(key.fingerprint)
self.gpg.import_keys(gentry)
message = """
In 2010 Riggio and Sicari presented a practical application of homomorphic
encryption to a hybrid wireless sensor/mesh network. The system enables
transparent multi-hop wireless backhauls that are able to perform statistical
analysis of different kinds of data (temperature, humidity, etc.) coming from
a WSN while ensuring both end-to-end encryption and hop-by-hop
authentication."""
return (message, fpr, passphrase)
def _encryption_test(self, stream_type, message, fingerprint, passphrase):
stream = stream_type(message)
encrypted = self.gpg.encrypt(stream, fingerprint).data
decrypted = self.gpg.decrypt(encrypted, passphrase=passphrase).data
if isinstance(decrypted, bytes):
decrypted = decrypted.decode()
if isinstance(message, bytes):
message = message.decode()
self.assertEqual(message, decrypted)
def test_encryption_of_file_like_objects_io_StringIO(self):
"""Test encryption of file-like object io.StringIO."""
message, fpr, passphrase = self._encryption_test_setup()
try:
from io import StringIO
if _util._py3k:
self._encryption_test(StringIO, message, fpr, passphrase)
else:
self._encryption_test(StringIO, unicode(message), fpr, passphrase)
except ImportError:
pass
def test_encryption_of_file_like_objects_io_BytesIO(self):
"""Test encryption of file-like object io.BytesIO."""
message, fpr, passphrase = self._encryption_test_setup()
try:
from io import BytesIO
if _util._py3k:
self._encryption_test(BytesIO, bytes(message, 'utf-8'), fpr, passphrase)
else:
self._encryption_test(BytesIO, message, fpr, passphrase)
except ImportError:
pass
def test_encryption_of_file_like_objects_StringIO_StringIO(self):
"""Test encryption of file-like object StringIO.StringIO (Python2 only)."""
message, fpr, passphrase = self._encryption_test_setup()
if not _util._py3k:
from StringIO import StringIO
self._encryption_test(StringIO, message, fpr, passphrase)
def test_encryption_of_file_like_objects_cStringIO_StringIO(self):
"""Test encryption of file-like object cStringIO.StringIO (Python2 only)."""
message, fpr, passphrase = self._encryption_test_setup()
if not _util._py3k:
from cStringIO import StringIO
self._encryption_test(StringIO, message, fpr, passphrase)
def test_encryption_alt_encoding(self):
"""Test encryption with latin-1 encoding"""
key = self.generate_key("Craig Gentry", "xorr.ox",
@ -985,7 +784,11 @@ authentication."""
key = self.generate_key("Marten van Dijk", "xorr.ox")
dijk = str(key.fingerprint)
self.gpg._encoding = 'latin-1'
data = u'Hello, André!'.encode(self.gpg._encoding)
if _util._py3k:
data = 'Hello, André!'
else:
data = unicode('Hello, André', self.gpg._encoding)
data = data.encode(self.gpg._encoding)
encrypted = self.gpg.encrypt(data, gentry)
edata = str(encrypted.data)
self.assertNotEqual(data, edata)
@ -1082,104 +885,6 @@ authentication."""
self.assertEqual(message, decrypted)
def test_decryption_with_bytes_literal(self):
"""Test that ``decrypt(encrypt(b'foo'), ...)`` is successful."""
with open(os.path.join(_files, 'kat.sec')) as katsec:
self.gpg.import_keys(katsec.read())
kat = self.gpg.list_keys('kat')[0]['fingerprint']
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
with open(message_filename, 'rb') as f:
output = os.path.join(self.gpg.homedir, 'test-decryption-with-bytes-literal.gpg')
kwargs = dict(compress_algo='Uncompressed')
message = b'Dance like a psycho'
encrypted = self.gpg.encrypt(message, kat, **kwargs)
self.assertTrue(encrypted.ok)
self.assertGreater(len(str(encrypted)), 0)
decrypted = self.gpg.decrypt(encrypted.data, passphrase='overalls')
self.assertTrue(decrypted.ok)
self.assertGreater(len(str(decrypted)), 0)
# Decode the message so that we can easily compare it with the
# decrypted version in both Python2 and Python3:
decoded = message.decode(self.gpg._encoding, self.gpg._decode_errors)
self.assertEqual(str(decrypted), decoded)
def test_encryption_one_hidden_recipient_one_not(self):
"""Test to ensure hidden recipient isn't detailed in packet info"""
alice = open(os.path.join(_files, 'test_key_1.pub'))
alice_pub = alice.read()
alice_public = self.gpg.import_keys(alice_pub)
res = alice_public.results[-1:][0]
alice_pfpr = str(res['fingerprint'])
alice.close()
bob = open(os.path.join(_files, 'test_key_2.pub'))
bob_pub = bob.read()
bob_public = self.gpg.import_keys(bob_pub)
res = bob_public.results[-1:][0]
bob_pfpr = str(res['fingerprint'])
bob.close()
message = """
In 2010 Riggio and Sicari presented a practical application of homomorphic
encryption to a hybrid wireless sensor/mesh network. The system enables
transparent multi-hop wireless backhauls that are able to perform statistical
analysis of different kinds of data (temperature, humidity, etc.) coming from
a WSN while ensuring both end-to-end encryption and hop-by-hop
authentication."""
enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, hidden_recipients=[alice_pfpr])
encrypted = str(enc)
log.debug("keyid = %s"
% alice_pfpr)
self.assertNotEquals(message, encrypted)
## We expect Alice's key to be hidden (returned as zero's) and Bob's
## key to be there.
expected_values = ["0000000000000000", "E0ED97345F2973D6"]
self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to)
def test_encryption_throw_keyids(self):
"""Test to ensure throw-keyids=True causes all recipients to be hidden.
"""
alice = open(os.path.join(_files, 'test_key_1.pub'))
alice_pub = alice.read()
alice_public = self.gpg.import_keys(alice_pub)
res = alice_public.results[-1:][0]
alice_pfpr = str(res['fingerprint'])
alice.close()
bob = open(os.path.join(_files, 'test_key_2.pub'))
bob_pub = bob.read()
bob_public = self.gpg.import_keys(bob_pub)
res = bob_public.results[-1:][0]
bob_pfpr = str(res['fingerprint'])
bob.close()
message = """
Pairing-based cryptography has led to several cryptographic advancements. One
of these advancements is more powerful and more efficient non-interactive
zero-knowledge proofs. The seminal idea was to hide the values for the
evaluation of the pairing in a commitment. Using different commitment schemes,
this idea was used to build zero-knowledge proof systems under the sub-group
hiding and under the decisional linear assumption. These proof systems prove
circuit satisfiability, and thus by the CookLevin theorem allow to prove
membership for every language in NP. The size of the common reference string
and the proofs is relatively small, however transforming a statement into a
boolean circuit causes a considerable overhead."""
enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, throw_keyids=True)
encrypted = str(enc)
log.debug("keyid = %s"
% alice_pfpr)
self.assertNotEquals(message, encrypted)
## We expect Alice's key to be hidden (returned as zero's) and Bob's
## key to be there.
expected_values = ["0000000000000000", "0000000000000000"]
packets = self.gpg.list_packets(encrypted)
self.assertEquals(expected_values, packets.encrypted_to)
def test_encryption_decryption_multi_recipient(self):
"""Test decryption of an encrypted string for multiple users"""
@ -1302,9 +1007,9 @@ know, maybe you shouldn't be doing it in the first place.
self.assertTrue(os.path.isfile(output))
# Check the contents:
with open(output, 'rb') as fh:
with open(output) as fh:
encrypted_message = fh.read()
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
log.debug("Encrypted file contains:\n\n%s\n" % encrypted_message)
def test_encryption_to_filehandle(self):
"""Test that ``encrypt(..., output=filelikething)`` is successful."""
@ -1324,45 +1029,9 @@ know, maybe you shouldn't be doing it in the first place.
self.assertTrue(os.path.isfile(output))
# Check the contents:
with open(output, 'rb') as fh:
with open(output) as fh:
encrypted_message = fh.read()
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
def test_encryption_from_filehandle(self):
"""Test that ``encrypt(open('foo'), ...)`` is successful."""
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
with open(message_filename, 'rb') as f:
output = os.path.join(self.gpg.homedir, 'test-encryption-from-filehandle.gpg')
kwargs = dict(passphrase='speedtest',
symmetric=True,
cipher_algo='AES256',
encrypt=False,
output=output)
encrypted = self.gpg.encrypt(f, None, **kwargs)
self.assertTrue(encrypted.ok)
self.assertGreater(len(encrypted.data), 0)
def test_encryption_with_output(self):
"""Test that ``encrypt('foo', ..., output='/foo/bar/baz')`` is successful."""
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
with open (message_filename, 'rb') as f:
data = f.read()
output = os.path.join(self.gpg.homedir, 'test-encryption-with-output.gpg')
kwargs = dict(passphrase='speedtest',
symmetric=True,
cipher_algo='AES256',
encrypt=False,
output=output)
encrypted = self.gpg.encrypt(data, None, **kwargs)
self.assertTrue(encrypted.ok)
self.assertGreater(len(encrypted.data), 0)
self.assertTrue(os.path.isfile(output))
# Check the contents:
with open(output, 'rb') as fh:
encrypted_message = fh.read()
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
log.debug("Encrypted file contains:\n\n%s\n" % encrypted_message)
suites = { 'parsers': set(['test_parsers_fix_unsafe',
@ -1399,41 +1068,25 @@ suites = { 'parsers': set(['test_parsers_fix_unsafe',
'test_signature_verification_detached',
'test_signature_verification_detached_binary',
'test_signature_file',
'test_signature_string_passphrase_empty_string',
'test_signature_string_passphrase_empty_bytes_literal',
'test_signature_string_passphrase_bytes_literal',
'test_signature_string_passphrase_None',
'test_signature_string_bad_passphrase',
'test_signature_string_verification',
'test_signature_string_algorithm_encoding']),
'crypt': set(['test_encryption',
'test_encryption_of_file_like_objects_io_StringIO',
'test_encryption_of_file_like_objects_io_BytesIO',
'test_encryption_of_file_like_objects_StringIO_StringIO',
'test_encryption_of_file_like_objects_cStringIO_StringIO',
'test_encryption_alt_encoding',
'test_encryption_multi_recipient',
'test_encryption_decryption_multi_recipient',
'test_encryption_one_hidden_recipient_one_not',
'test_encryption_throw_keyids',
'test_decryption',
'test_decryption_with_bytes_literal',
'test_symmetric_encryption_and_decryption',
'test_file_encryption_and_decryption',
'test_encryption_to_filename',
'test_encryption_to_filehandle',
'test_encryption_from_filehandle',
'test_encryption_with_output',]),
'test_encryption_to_filehandle',]),
'listkeys': set(['test_list_keys_after_generation']),
'keyrings': set(['test_public_keyring',
'test_secret_keyring',
'test_import_and_export',
'test_deletion_public_key',
'test_deletion_secret_key',
'test_deletion_subkeys',
'test_import_only']),
'recvkeys': set(['test_recv_keys_default']),
}
'test_deletion',
'test_import_only',
'test_recv_keys_default',]), }
def main(args):
if not args.quiet:

View File

@ -0,0 +1,30 @@
From cfcb84db5452b1fbc801ca85f2f70015660f3132 Mon Sep 17 00:00:00 2001
From: Lunar <lunar@anargeek.net>
Date: Wed, 6 Mar 2013 18:39:34 +0100
Subject: [PATCH] Make _open_subprocess argument more explicit in _handle_io
The previous way worked out, but was really harder to understand.
---
gnupg.py | 6 +++++-
1 files changed, 5 insertions(+), 1 deletions(-)
diff --git a/gnupg.py b/gnupg.py
index 4a73164..479e6dd 100644
--- a/gnupg.py
+++ b/gnupg.py
@@ -984,7 +984,11 @@ class GPG(object):
"""
Handle a call to GPG - pass input data, collect output data.
"""
- p = self._open_subprocess(args, passphrase is not None)
+ if passphrase is not None:
+ ask_passphrase = True
+ else:
+ ask_passphrase = False
+ p = self._open_subprocess(args, ask_passphrase)
if not binary:
stdin = codecs.getwriter(self.encoding)(p.stdin)
else:
--
1.7.2.5

23
patches/README.md 100644
View File

@ -0,0 +1,23 @@
This patches folder is managed by quilt, which is a tool for automatic patch
application and removal. To use quilt with the patches in this directory,
navigate to the top level directory of this repository, and do:
$ quilt setup patches/series
To add an externally created patch (in other words, one created with ```diff
--git``` or ```git diff```), place that .patch or .diff file in this directory,
and do:
$ quilt import patches/<patchfile>
Then, to apply the new patch, do:
$ quilt push
Removing patches from the stack can be done with:
$ quilt pop
Please see the man quilt(1) for more information on adding and importing new
patches. The debian package maintainer guides also have chapters on quilt
usage.

1
patches/series 100644
View File

@ -0,0 +1 @@
0001-Make-_open_subprocess-argument-more-explicit-in-_han.patch

View File

@ -0,0 +1,558 @@
#!/usr/bin/env python
"""Bootstrap distribute installation
If you want to use setuptools in your package's setup.py, just include this
file in the same directory with it, and add this to the top of your setup.py::
from distribute_setup import use_setuptools
use_setuptools()
If you want to require a specific version of setuptools, set a download
mirror, or use an alternate download directory, you can do so by supplying
the appropriate options to ``use_setuptools()``.
This file can also be run as a script to install or upgrade setuptools.
This file was taken from http://nightly.ziade.org/distribute_setup.py
on 2013-05-27.
"""
import os
import shutil
import sys
import time
import fnmatch
import tempfile
import tarfile
import optparse
from distutils import log
try:
from site import USER_SITE
except ImportError:
USER_SITE = None
try:
import subprocess
def _python_cmd(*args):
args = (sys.executable,) + args
return subprocess.call(args) == 0
except ImportError:
# will be used for python 2.3
def _python_cmd(*args):
args = (sys.executable,) + args
# quoting arguments if windows
if sys.platform == 'win32':
def quote(arg):
if ' ' in arg:
return '"%s"' % arg
return arg
args = [quote(arg) for arg in args]
return os.spawnl(os.P_WAIT, sys.executable, *args) == 0
DEFAULT_VERSION = "0.6.44"
DEFAULT_URL = "http://pypi.python.org/packages/source/d/distribute/"
SETUPTOOLS_FAKED_VERSION = "0.6c11"
SETUPTOOLS_PKG_INFO = """\
Metadata-Version: 1.0
Name: setuptools
Version: %s
Summary: xxxx
Home-page: xxx
Author: xxx
Author-email: xxx
License: xxx
Description: xxx
""" % SETUPTOOLS_FAKED_VERSION
def _install(tarball, install_args=()):
# extracting the tarball
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
tar = tarfile.open(tarball)
_extractall(tar)
tar.close()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
# installing
log.warn('Installing Distribute')
if not _python_cmd('setup.py', 'install', *install_args):
log.warn('Something went wrong during the installation.')
log.warn('See the error message above.')
# exitcode will be 2
return 2
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
def _build_egg(egg, tarball, to_dir):
# extracting the tarball
tmpdir = tempfile.mkdtemp()
log.warn('Extracting in %s', tmpdir)
old_wd = os.getcwd()
try:
os.chdir(tmpdir)
tar = tarfile.open(tarball)
_extractall(tar)
tar.close()
# going in the directory
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
os.chdir(subdir)
log.warn('Now working in %s', subdir)
# building an egg
log.warn('Building a Distribute egg in %s', to_dir)
_python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir)
finally:
os.chdir(old_wd)
shutil.rmtree(tmpdir)
# returning the result
log.warn(egg)
if not os.path.exists(egg):
raise IOError('Could not build the egg.')
def _do_download(version, download_base, to_dir, download_delay):
egg = os.path.join(to_dir, 'distribute-%s-py%d.%d.egg'
% (version, sys.version_info[0], sys.version_info[1]))
if not os.path.exists(egg):
tarball = download_setuptools(version, download_base,
to_dir, download_delay)
_build_egg(egg, tarball, to_dir)
sys.path.insert(0, egg)
import setuptools
setuptools.bootstrap_install_from = egg
def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
to_dir=os.curdir, download_delay=15, no_fake=True):
# making sure we use the absolute path
to_dir = os.path.abspath(to_dir)
was_imported = 'pkg_resources' in sys.modules or \
'setuptools' in sys.modules
try:
try:
import pkg_resources
# Setuptools 0.7b and later is a suitable (and preferable)
# substitute for any Distribute version.
try:
pkg_resources.require("setuptools>=0.7b")
return
except pkg_resources.DistributionNotFound:
pass
if not hasattr(pkg_resources, '_distribute'):
if not no_fake:
_fake_setuptools()
raise ImportError
except ImportError:
return _do_download(version, download_base, to_dir, download_delay)
try:
pkg_resources.require("distribute>=" + version)
return
except pkg_resources.VersionConflict:
e = sys.exc_info()[1]
if was_imported:
sys.stderr.write(
"The required version of distribute (>=%s) is not available,\n"
"and can't be installed while this script is running. Please\n"
"install a more recent version first, using\n"
"'easy_install -U distribute'."
"\n\n(Currently using %r)\n" % (version, e.args[0]))
sys.exit(2)
else:
del pkg_resources, sys.modules['pkg_resources'] # reload ok
return _do_download(version, download_base, to_dir,
download_delay)
except pkg_resources.DistributionNotFound:
return _do_download(version, download_base, to_dir,
download_delay)
finally:
if not no_fake:
_create_fake_setuptools_pkg_info(to_dir)
def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
to_dir=os.curdir, delay=15):
"""Download distribute from a specified location and return its filename
`version` should be a valid distribute version number that is available
as an egg for download under the `download_base` URL (which should end
with a '/'). `to_dir` is the directory where the egg will be downloaded.
`delay` is the number of seconds to pause before an actual download
attempt.
"""
# making sure we use the absolute path
to_dir = os.path.abspath(to_dir)
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
tgz_name = "distribute-%s.tar.gz" % version
url = download_base + tgz_name
saveto = os.path.join(to_dir, tgz_name)
src = dst = None
if not os.path.exists(saveto): # Avoid repeated downloads
try:
log.warn("Downloading %s", url)
src = urlopen(url)
# Read/write all in one block, so we don't create a corrupt file
# if the download is interrupted.
data = src.read()
dst = open(saveto, "wb")
dst.write(data)
finally:
if src:
src.close()
if dst:
dst.close()
return os.path.realpath(saveto)
def _no_sandbox(function):
def __no_sandbox(*args, **kw):
try:
from setuptools.sandbox import DirectorySandbox
if not hasattr(DirectorySandbox, '_old'):
def violation(*args):
pass
DirectorySandbox._old = DirectorySandbox._violation
DirectorySandbox._violation = violation
patched = True
else:
patched = False
except ImportError:
patched = False
try:
return function(*args, **kw)
finally:
if patched:
DirectorySandbox._violation = DirectorySandbox._old
del DirectorySandbox._old
return __no_sandbox
def _patch_file(path, content):
"""Will backup the file then patch it"""
f = open(path)
existing_content = f.read()
f.close()
if existing_content == content:
# already patched
log.warn('Already patched.')
return False
log.warn('Patching...')
_rename_path(path)
f = open(path, 'w')
try:
f.write(content)
finally:
f.close()
return True
_patch_file = _no_sandbox(_patch_file)
def _same_content(path, content):
f = open(path)
existing_content = f.read()
f.close()
return existing_content == content
def _rename_path(path):
new_name = path + '.OLD.%s' % time.time()
log.warn('Renaming %s to %s', path, new_name)
os.rename(path, new_name)
return new_name
def _remove_flat_installation(placeholder):
if not os.path.isdir(placeholder):
log.warn('Unkown installation at %s', placeholder)
return False
found = False
for file in os.listdir(placeholder):
if fnmatch.fnmatch(file, 'setuptools*.egg-info'):
found = True
break
if not found:
log.warn('Could not locate setuptools*.egg-info')
return
log.warn('Moving elements out of the way...')
pkg_info = os.path.join(placeholder, file)
if os.path.isdir(pkg_info):
patched = _patch_egg_dir(pkg_info)
else:
patched = _patch_file(pkg_info, SETUPTOOLS_PKG_INFO)
if not patched:
log.warn('%s already patched.', pkg_info)
return False
# now let's move the files out of the way
for element in ('setuptools', 'pkg_resources.py', 'site.py'):
element = os.path.join(placeholder, element)
if os.path.exists(element):
_rename_path(element)
else:
log.warn('Could not find the %s element of the '
'Setuptools distribution', element)
return True
_remove_flat_installation = _no_sandbox(_remove_flat_installation)
def _after_install(dist):
log.warn('After install bootstrap.')
placeholder = dist.get_command_obj('install').install_purelib
_create_fake_setuptools_pkg_info(placeholder)
def _create_fake_setuptools_pkg_info(placeholder):
if not placeholder or not os.path.exists(placeholder):
log.warn('Could not find the install location')
return
pyver = '%s.%s' % (sys.version_info[0], sys.version_info[1])
setuptools_file = 'setuptools-%s-py%s.egg-info' % \
(SETUPTOOLS_FAKED_VERSION, pyver)
pkg_info = os.path.join(placeholder, setuptools_file)
if os.path.exists(pkg_info):
log.warn('%s already exists', pkg_info)
return
log.warn('Creating %s', pkg_info)
try:
f = open(pkg_info, 'w')
except EnvironmentError:
log.warn("Don't have permissions to write %s, skipping", pkg_info)
return
try:
f.write(SETUPTOOLS_PKG_INFO)
finally:
f.close()
pth_file = os.path.join(placeholder, 'setuptools.pth')
log.warn('Creating %s', pth_file)
f = open(pth_file, 'w')
try:
f.write(os.path.join(os.curdir, setuptools_file))
finally:
f.close()
_create_fake_setuptools_pkg_info = _no_sandbox(
_create_fake_setuptools_pkg_info
)
def _patch_egg_dir(path):
# let's check if it's already patched
pkg_info = os.path.join(path, 'EGG-INFO', 'PKG-INFO')
if os.path.exists(pkg_info):
if _same_content(pkg_info, SETUPTOOLS_PKG_INFO):
log.warn('%s already patched.', pkg_info)
return False
_rename_path(path)
os.mkdir(path)
os.mkdir(os.path.join(path, 'EGG-INFO'))
pkg_info = os.path.join(path, 'EGG-INFO', 'PKG-INFO')
f = open(pkg_info, 'w')
try:
f.write(SETUPTOOLS_PKG_INFO)
finally:
f.close()
return True
_patch_egg_dir = _no_sandbox(_patch_egg_dir)
def _before_install():
log.warn('Before install bootstrap.')
_fake_setuptools()
def _under_prefix(location):
if 'install' not in sys.argv:
return True
args = sys.argv[sys.argv.index('install') + 1:]
for index, arg in enumerate(args):
for option in ('--root', '--prefix'):
if arg.startswith('%s=' % option):
top_dir = arg.split('root=')[-1]
return location.startswith(top_dir)
elif arg == option:
if len(args) > index:
top_dir = args[index + 1]
return location.startswith(top_dir)
if arg == '--user' and USER_SITE is not None:
return location.startswith(USER_SITE)
return True
def _fake_setuptools():
log.warn('Scanning installed packages')
try:
import pkg_resources
except ImportError:
# we're cool
log.warn('Setuptools or Distribute does not seem to be installed.')
return
ws = pkg_resources.working_set
try:
setuptools_dist = ws.find(
pkg_resources.Requirement.parse('setuptools', replacement=False)
)
except TypeError:
# old distribute API
setuptools_dist = ws.find(
pkg_resources.Requirement.parse('setuptools')
)
if setuptools_dist is None:
log.warn('No setuptools distribution found')
return
# detecting if it was already faked
setuptools_location = setuptools_dist.location
log.warn('Setuptools installation detected at %s', setuptools_location)
# if --root or --preix was provided, and if
# setuptools is not located in them, we don't patch it
if not _under_prefix(setuptools_location):
log.warn('Not patching, --root or --prefix is installing Distribute'
' in another location')
return
# let's see if its an egg
if not setuptools_location.endswith('.egg'):
log.warn('Non-egg installation')
res = _remove_flat_installation(setuptools_location)
if not res:
return
else:
log.warn('Egg installation')
pkg_info = os.path.join(setuptools_location, 'EGG-INFO', 'PKG-INFO')
if (os.path.exists(pkg_info) and
_same_content(pkg_info, SETUPTOOLS_PKG_INFO)):
log.warn('Already patched.')
return
log.warn('Patching...')
# let's create a fake egg replacing setuptools one
res = _patch_egg_dir(setuptools_location)
if not res:
return
log.warn('Patching complete.')
_relaunch()
def _relaunch():
log.warn('Relaunching...')
# we have to relaunch the process
# pip marker to avoid a relaunch bug
_cmd1 = ['-c', 'install', '--single-version-externally-managed']
_cmd2 = ['-c', 'install', '--record']
if sys.argv[:3] == _cmd1 or sys.argv[:3] == _cmd2:
sys.argv[0] = 'setup.py'
args = [sys.executable] + sys.argv
sys.exit(subprocess.call(args))
def _extractall(self, path=".", members=None):
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterwards. `path' specifies a different directory
to extract to. `members' is optional and must be a subset of the
list returned by getmembers().
"""
import copy
import operator
from tarfile import ExtractError
directories = []
if members is None:
members = self
for tarinfo in members:
if tarinfo.isdir():
# Extract directories with a safe mode.
directories.append(tarinfo)
tarinfo = copy.copy(tarinfo)
tarinfo.mode = 448 # decimal for oct 0700
self.extract(tarinfo, path)
# Reverse sort directories.
if sys.version_info < (2, 4):
def sorter(dir1, dir2):
return cmp(dir1.name, dir2.name)
directories.sort(sorter)
directories.reverse()
else:
directories.sort(key=operator.attrgetter('name'), reverse=True)
# Set correct owner, mtime and filemode on directories.
for tarinfo in directories:
dirpath = os.path.join(path, tarinfo.name)
try:
self.chown(tarinfo, dirpath)
self.utime(tarinfo, dirpath)
self.chmod(tarinfo, dirpath)
except ExtractError:
e = sys.exc_info()[1]
if self.errorlevel > 1:
raise
else:
self._dbg(1, "tarfile: %s" % e)
def _build_install_args(options):
"""
Build the arguments to 'python setup.py install' on the distribute package
"""
install_args = []
if options.user_install:
if sys.version_info < (2, 6):
log.warn("--user requires Python 2.6 or later")
raise SystemExit(1)
install_args.append('--user')
return install_args
def _parse_args():
"""
Parse the command line for options
"""
parser = optparse.OptionParser()
parser.add_option(
'--user', dest='user_install', action='store_true', default=False,
help='install in user site package (requires Python 2.6 or later)')
parser.add_option(
'--download-base', dest='download_base', metavar="URL",
default=DEFAULT_URL,
help='alternative URL from where to download the distribute package')
options, args = parser.parse_args()
# positional arguments are ignored
return options
def main(version=DEFAULT_VERSION):
"""Install or upgrade setuptools and EasyInstall"""
options = _parse_args()
tarball = download_setuptools(download_base=options.download_base)
return _install(tarball, _build_install_args(options))
if __name__ == '__main__':
sys.exit(main())

1153
scripts/get-pip.py 100755

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,55 @@
#!/bin/bash
# -*- coding: utf-8 -*-
#
# This file is part of python-gnupg, a Python interface to GnuPG.
# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
# © 2013 Andrej B.
# © 2013 LEAP Encryption Access Project
# © 2008-2012 Vinay Sajip
# © 2005 Steve Traugott
# © 2004 A.M. Kuchling
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.
project=python-gnupg
VENV=$(which virtualenv)
WRPR=$(which virtualenvwrapper.sh)
if ! test -n "$VENV" ; then
printf "Couldn't find virtualenv. Are you sure it's installed?"
exit 1
fi
if ! test -n "$WRPR"; then
printf "Couldn't find virtualenvwrapper. Are you sure it's installed?"
exit 1
fi
test -r "$WRPR" && . $WRPR
okay=$?
if test "$okay" -eq 0 ; then
printf "Using %s as WORKON_HOME for the new virtualenv...\n" $PWD
printf "What should the name of the new virtualenv be? (default: '%s')\n" $project
read -p"Name for this virtualenv?: " name
if test -z "$name"; then
name="$project"
fi
printf "Using '$name' as our project's name..."
printf "Creating virtualenv..."
mkvirtualenv -a "$PWD" --no-site-packages --unzip-setuptools \
--distribute --prompt="(gnupg)" "$name"
exit $?
else
printf "Something went wrong..."
printf "Exit code %d from mkvirtualenv." "$okay"
exit $okay
fi

1937
scripts/pep8.py 100644

File diff suppressed because it is too large Load Diff

View File

@ -22,19 +22,11 @@
from __future__ import absolute_import
from __future__ import print_function
import platform
import setuptools
import sys
import os
import versioneer
try:
import __pypy__
except ImportError:
_isPyPy = False
else:
_isPyPy = True
versioneer.versionfile_source = 'gnupg/_version.py'
versioneer.versionfile_build = 'gnupg/_version.py'
@ -83,13 +75,6 @@ def get_requirements():
# Required to make `collections.OrderedDict` available on Python<=2.6
requirements.append('ordereddict==1.1#a0ed854ee442051b249bfad0f638bbec')
# Don't try to install psutil on PyPy:
if _isPyPy:
for line in requirements[:]:
if line.startswith('psutil'):
print("Not installing %s on PyPy..." % line)
requirements.remove(line)
return requirements, links
@ -104,8 +89,8 @@ This module allows easy access to GnuPG's key management, encryption and \
signature functionality from Python programs, by interacting with GnuPG \
through file descriptors. Input arguments are strictly checked and sanitised, \
and therefore this module should be safe to use in networked applications \
requiring direct user input. It is intended for use on Windows, MacOS X, BSD, \
or Linux, with Python 2.6, Python 2.7, Python 3.3, Python 3.4, or PyPy.
requiring direct user input. It is intended for use with Python 2.6 or \
greater.
""",
license="GPLv3+",
@ -134,13 +119,7 @@ or Linux, with Python 2.6, Python 2.7, Python 3.3, Python 3.4, or PyPy.
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"Operating System :: Android",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX :: BSD",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 3",
@ -148,8 +127,6 @@ or Linux, with Python 2.6, Python 2.7, Python 3.3, Python 3.4, or PyPy.
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Security :: Cryptography",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Utilities",]