Compare commits: 1.3.1.tg.0...master
145 commits
Commit SHA1s:
2beac24161
4f1b1f6a8d
da59707945
a6d024ff49
a06c93d6e7
76d70c68aa
94f81c3226
a749acf486
16107bc8a8
9be01ec6df
e90ae54738
fea39ec83e
b0584854e1
49079f9672
adc4994a0d
8c5730666a
5895997554
917a3dcd89
b7f520244e
742ccd77fb
d66d86ca7e
0af2720754
10df44b75f
af781626a9
cf4d3efd8d
b7ff69092a
8e6c2a752c
79285c4c17
6d1890389c
8579331562
0c87da3d78
38685ae001
9d8d6cce5c
43164fa7db
a7e772f10a
4be6fb75e3
b970917701
782a81b46a
b3dd20a7e5
af403fe144
657be31ae1
749ef6fa00
2cf3dd1c86
3127c21d55
2c57c0f6d0
88bfaaffc2
eb205774fb
5d03c3c5eb
ae5cb33d63
d3e6ae33b4
d3b7dd0353
d31d0cf131
f3c193d8b4
55b586fd18
f858080148
572429eed9
ceb1c2fbbd
90c6613684
939728694c
d66b23b896
6c15f25ee5
8c261eba30
f8ccdc5028
8a7699236c
5025df1661
b1dab1570d
8f92335476
613e84cd56
1a22565e24
81b2d9d9c2
513a48d876
09c6a08637
09a0af7b41
906b1a7142
cbebe4f509
a1e4a8a756
a1c45a6f63
77c6c3d0e5
cc959c755d
63fe93f8b5
f3515c44d7
a3e2f70cd8
5aaf1df5d1
245cf73baf
8e5b3256fa
30c8c05f84
f9d00794ee
99e597f196
de82a1c32a
1104908ef9
899d429f7a
59992eb8c3
fecf92c0ac
350027b18c
f70a7dc4f0
327d2e5f66
df5fdb5aa8
751f25cb35
d9116bace6
d1025ae524
79b1cbc260
d1a8fb101f
98e0fc7cf5
1b8c305829
41c388e8f7
7c2d060fca
4a8a8c8592
432833bdcb
f6453b96a6
3a287d6e11
17cf24c30d
572c911b82
2266a146c3
3f2fc00ef0
177d762cef
eeeb8580b3
29c971585d
f97c78d963
ba632f3dad
345ac83742
1472a5ccb7
4da7ec92bd
08370fbc02
b51b0eb5ac
bb5e2444ad
6e228c3ef0
eb25ef2b91
7dc3b66de4
ed031210ca
69cc991938
5ad99acd7c
d5fe2d7cc4
7c56dd5864
f521545903
728b5c8ec9
042550d151
bde27abcf2
293fa6b2e0
a26cc9c0f4
adf27fbdef
961bf8da1b
ff96904233
2bac4a67ed
4dccc94d24
580a5429f9
@@ -0,0 +1,33 @@
[run]
source =
    gnupg
branch = True
#parallel = True
timid = True

[report]
modules = gnupg
omit =
    */test*
    */_version*
    */__init__*
    */copyleft*
    */sitecustomize*
# Regexes for lines to exclude from report generation:
exclude_lines =
    pragma: no cover
    # don't complain if the code doesn't hit unimplemented sections:
    raise NotImplementedError
    pass
    # don't complain if non-runnable or debuging code isn't run:
    if 0:
    if False:
    def __repr__
    if __name__ == .__main__.:
# Ignore source code which cannot be found:
ignore_errors = True
# Exit with status code 2 if under this percentage is covered:
fail_under = 10

[html]
directory = docs/coverage-html
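The same measurement can also be driven from Python through coverage.py's API rather than the `coverage run` Makefile targets further down; a minimal sketch, assuming coverage.py is installed and the configuration above is saved as `.coveragerc`:

```python
import coverage

# Load the [run]/[report] settings shown above and measure an import of the package.
cov = coverage.Coverage(config_file=".coveragerc")
cov.start()
import gnupg  # anything executed here is measured against source = gnupg
cov.stop()
cov.save()
cov.report()  # honours the omit/exclude_lines rules from the [report] section
```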
.travis.yml (12 changed lines)
@@ -1,12 +0,0 @@
language: python
before_install:
  - sudo apt-get install gpg
python:
  - "2.6"
  - "2.7"
  - "3.3"
install:
  - make install
# command to run tests, e.g. python setup.py test
script:
  - make test
MANIFEST.in (13 changed lines)
@@ -1,4 +1,15 @@
include LICENSE
include requirements.txt

graft docs
recursive-include examples *.py

recursive-include docs *.rst
include docs/_static
include docs/conf.py
include docs/make.bat
include docs/Makefile
prune docs/_build

prune private

global-exclude *.log *~
Makefile (83 changed lines)
@@ -2,6 +2,8 @@ SHELL=/bin/sh
TESTDIR=./gnupg/test
TESTHANDLE=$(TESTDIR)/test_gnupg.py
FILES=$(SHELL find ./gnupg/ -name "*.py" -printf "%p,")
PYTHON=$(SHELL which python)
PYTHON3=$(SHELL which python3)
PKG_NAME=python-gnupg
DOC_DIR=docs
DOC_BUILD_DIR:=$(DOC_DIR)/_build

@@ -34,8 +36,11 @@ cleanup-tests-all: cleanup-tests
    rm -rf tests/tmp

cleanup-build:
    mkdir buildnot
    rm -rf build*
    -rm MANIFEST
    -rm -rf build

cleanup-dist:
    -rm -rf dist

# it's not strictly necessary that gnupg2, gpg-agent, pinentry, or pip be
# installed, so ignore error exit statuses for those commands

@@ -47,23 +52,70 @@ test-before: cleanup-src cleanup-tests
    which python && python --version
    -which pip && pip --version && pip list

test: test-before
    python $(TESTHANDLE) basic encodings parsers keyrings listkeys genkey \
        sign crypt
test-run: test-before
    python $(TESTHANDLE) \
        basic \
        encodings \
        parsers \
        keyrings \
        listkeys \
        genkey \
        sign \
        crypt

py3k-test-run: test-before
    python3 $(TESTHANDLE) \
        basic \
        encodings \
        parsers \
        keyrings \
        listkeys \
        genkey \
        sign \
        crypt

coverage-run: test-before
    coverage run --rcfile=".coveragerc" $(PYTHON) $(TESTHANDLE) \
        basic \
        encodings \
        parsers \
        keyrings \
        listkeys \
        genkeys \
        sign \
        crypt

py3k-coverage-run: test-before
    coverage run --rcfile=".coveragerc" $(PYTHON3) $(TESTHANDLE) \
        basic \
        encodings \
        parsers \
        keyrings \
        listkeys \
        genkeys \
        sign \
        crypt

coverage-report:
    coverage report --rcfile=".coveragerc"

coverage-html:
    coverage html --rcfile=".coveragerc"

clean-test:
    touch gnupg/test/placeholder.log
    mv gnupg/test/*.log gnupg/test/logs/
    rm gnupg/test/logs/placeholder.log
    touch gnupg/test/random_seed_is_sekritly_pi
    rm gnupg/test/random_seed*

py3k-test: test-before
    python3 $(TESTHANDLE) basic encodings parsers keyrings listkeys genkey \
        sign crypt
    touch gnupg/test/placeholder.log
    mv gnupg/test/*.log gnupg/test/logs/
    rm gnupg/test/logs/placeholder.log
    touch gnupg/test/random_seed_is_sekritly_pi
    rm gnupg/test/random_seed*
test: test-run clean-test

py3k-test: py3k-test-run clean-test

coverage: coverage-run coverage-report coverage-html clean-test

py3k-coverage: py3k-coverage-run coverage-report coverage-html clean-test

install:
    python setup.py install --record installed-files.txt

@@ -90,3 +142,8 @@ docs-html:
docs-zipfile: docs-html
    cd $(DOC_HTML_DIR) && { find . -name '*' | zip -@ -v ../$(DOC_BUILD_ZIP) ;};
    @echo "Built documentation in $(DOC_BUILD_DIR)/$(DOC_BUILD_ZIP)"

upload: cleanup-build
    python setup.py bdist_egg upload --sign
    #python setup.py bdist_wheel upload --sign
    python setup.py sdist --formats=gztar,zip upload --sign
TODO (7 changed lines)
@@ -5,13 +5,6 @@
It would be nice to make the file descriptors for communication with the GnuPG
process configurable, and not the default, hard-coded 0=stdin 1=stdout
2=stderr.
** TODO look into RDBMS/ORM for public key storage :io:db:
   see http://stackoverflow.com/q/1235594 and http://elixir.ematia.de/trac/wiki

   memcached and pymemcached were the first ones I looked at, then I discovered
   redis, which seemed better. At some point we should look into using elixer,
   mentioned in the above SO post, so that the backend DB can be chosen freely
   and we're not restricting users to only memcached/cassandra/redis/sqlite/etc.

* Key editing :editkey:
** TODO add '--edit-key' feature :editkey:
@@ -170,7 +170,7 @@ def displayNewKey(key):
    # `result` is a `gnupg._parsers.ListKeys`, which is list-like, so iterate
    # over all the keys and display their info:
    for gpgkey in keylist:
        for k, v in gpgkey:
        for k, v in gpgkey.items():
            log.info("%s: %s" % (k.capitalize(), v))

    return keylist
gnupg/_meta.py (250 changed lines)
@@ -32,14 +32,22 @@ import encodings
import locale
import os
import platform
import psutil
import shlex
import subprocess
import sys
import threading

## Using psutil is recommended, but since the extension doesn't run with the
## PyPy interpreter, we'll run even if it's not present.
try:
    import psutil
except ImportError:
    psutil = None

from . import _parsers
from . import _util
from ._util import b
from ._util import s

from ._parsers import _check_preferences
from ._parsers import _sanitise_list

@@ -75,19 +83,49 @@ class GPGMeta(type):
        instance containing the gpg-agent process' information to
        ``cls._agent_proc``.

        For Unix systems, we check that the effective UID of this
        ``python-gnupg`` process is also the owner of the gpg-agent
        process. For Windows, we check that the usernames of the owners are
        the same. (Sorry Windows users; maybe you should switch to anything
        else.)

        .. note: This function will only run if the psutil_ Python extension
           is installed. Because psutil won't run with the PyPy interpreter,
           use of it is optional (although highly recommended).

        .. _psutil: https://pypi.python.org/pypi/psutil

        :returns: True if there exists a gpg-agent process running under the
                  same effective user ID as that of this program. Otherwise,
                  returns None.
                  returns False.
        """
        identity = psutil.Process(os.getpid()).uids
        if not psutil:
            return False

        this_process = psutil.Process(os.getpid())
        ownership_match = False

        if _util._running_windows:
            identity = this_process.username()
        else:
            identity = this_process.uids

        for proc in psutil.process_iter():
            if (proc.name == "gpg-agent") and proc.is_running:
                log.debug("Found gpg-agent process with pid %d" % proc.pid)
                if proc.uids == identity:
                    log.debug(
                        "Effective UIDs of this process and gpg-agent match")
                    setattr(cls, '_agent_proc', proc)
                    return True
                if _util._running_windows:
                    if proc.username() == identity:
                        ownership_match = True
                else:
                    if proc.uids == identity:
                        ownership_match = True

                if ownership_match:
                    log.debug("Effective UIDs of this process and gpg-agent match")
                    setattr(cls, '_agent_proc', proc)
                    return True

        return False


class GPGBase(object):

@@ -111,7 +149,7 @@ class GPGBase(object):

    def __init__(self, binary=None, home=None, keyring=None, secring=None,
                 use_agent=False, default_preference_list=None,
                 verbose=False, options=None):
                 ignore_homedir_permissions=False, verbose=False, options=None):
        """Create a ``GPGBase``.

        This class is used to set up properties for controlling the behaviour

@@ -134,13 +172,18 @@ class GPGBase(object):
        :ivar str secring: The filename in **homedir** to use as the keyring
            file for secret keys.
        """
        self.ignore_homedir_permissions = ignore_homedir_permissions
        self.binary = _util._find_binary(binary)
        self.homedir = home if home else _util._conf
        self.homedir = os.path.expanduser(home) if home else _util._conf
        pub = _parsers._fix_unsafe(keyring) if keyring else 'pubring.gpg'
        sec = _parsers._fix_unsafe(secring) if secring else 'secring.gpg'
        self.keyring = os.path.join(self._homedir, pub)
        self.secring = os.path.join(self._homedir, sec)
        self.options = _parsers._sanitise(options) if options else None
        self.options = list(_parsers._sanitise_list(options)) if options else None

        #: The version string of our GnuPG binary
        self.binary_version = '0.0.0'
        self.verbose = False

        if default_preference_list:
            self._prefs = _check_preferences(default_preference_list, 'all')

@@ -155,6 +198,14 @@ class GPGBase(object):
        self._filesystemencoding = encodings.normalize_encoding(
            sys.getfilesystemencoding().lower())

        # Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49
        #
        # During `line = stream.readline()` in `_read_response()`, the Python
        # codecs module will choke on Unicode data, so we globally monkeypatch
        # the "strict" error handler to use the builtin `replace_errors`
        # handler:
        codecs.register_error('strict', codecs.replace_errors)

        self._keyserver = 'hkp://wwwkeys.pgp.net'
        self.__generated_keys = os.path.join(self.homedir, 'generated-keys')
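A small illustration of what that registration does (not part of the diff): any codec machinery that resolves its error handler by name will now receive `replace_errors` when it asks for `'strict'`, which is why the Unicode data described in Issue #49 stops raising.

```python
import codecs

codecs.register_error('strict', codecs.replace_errors)
# The registry now hands back the replacing handler wherever 'strict' is looked up:
print(codecs.lookup_error('strict') is codecs.replace_errors)  # True
```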
@@ -164,18 +215,12 @@ class GPGBase(object):
                "'verbose' must be boolean, string, or 0 <= n <= 9"
            assert isinstance(use_agent, bool), "'use_agent' must be boolean"
            if self.options is not None:
                assert isinstance(self.options, str), "options not string"
                assert isinstance(self.options, list), "options not list"
        except (AssertionError, AttributeError) as ae:
            log.error("GPGBase.__init__(): %s" % str(ae))
            raise RuntimeError(str(ae))
        else:
            if verbose is True:
                # The caller wants logging, but we need a valid --debug-level
                # for gpg. Default to "basic", and warn about the ambiguity.
                # (garrettr)
                verbose = "basic"
                log.warning('GPG(verbose=True) is ambiguous, defaulting to "basic" logging')
            self.verbose = verbose
            self._set_verbose(verbose)
            self.use_agent = use_agent

        if hasattr(self, '_agent_proc') \

@@ -183,6 +228,9 @@ class GPGBase(object):
            if hasattr(self, '__remove_path__'):
                self.__remove_path__('pinentry')

        # Assign our self.binary_version attribute:
        self._check_sane_and_get_gpg_version()

    def __remove_path__(self, prog=None, at_exit=True):
        """Remove the directories containing a program from the system's
        ``$PATH``. If ``GPGBase.binary`` is in a directory being removed, it

@@ -368,18 +416,21 @@ class GPGBase(object):
        log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
        _util._create_if_necessary(hd)

        try:
            log.debug("GPGBase._homedir_setter(): checking permissions")
            assert _util._has_readwrite(hd), \
                "Homedir '%s' needs read/write permissions" % hd
        except AssertionError as ae:
            msg = ("Unable to set '%s' as GnuPG homedir" % directory)
            log.debug("GPGBase.homedir.setter(): %s" % msg)
            log.debug(str(ae))
            raise RuntimeError(str(ae))
        else:
            log.info("Setting homedir to '%s'" % hd)
        if self.ignore_homedir_permissions:
            self._homedir = hd
        else:
            try:
                log.debug("GPGBase._homedir_setter(): checking permissions")
                assert _util._has_readwrite(hd), \
                    "Homedir '%s' needs read/write permissions" % hd
            except AssertionError as ae:
                msg = ("Unable to set '%s' as GnuPG homedir" % directory)
                log.debug("GPGBase.homedir.setter(): %s" % msg)
                log.debug(str(ae))
                raise RuntimeError(str(ae))
            else:
                log.info("Setting homedir to '%s'" % hd)
                self._homedir = hd

    homedir = _util.InheritableProperty(_homedir_getter, _homedir_setter)

@@ -436,6 +487,24 @@ class GPGBase(object):
    _generated_keys = _util.InheritableProperty(_generated_keys_getter,
                                                _generated_keys_setter)

    def _check_sane_and_get_gpg_version(self):
        """Check that everything runs alright, and grab the gpg binary's
        version number while we're at it, storing it as :data:`binary_version`.

        :raises RuntimeError: if we cannot invoke the gpg binary.
        """
        proc = self._open_subprocess(["--list-config", "--with-colons"])
        result = self._result_map['list'](self)
        self._read_data(proc.stdout, result)
        if proc.returncode:
            raise RuntimeError("Error invoking gpg: %s" % result.data)
        else:
            proc.terminate()

        version_line = str(result.data).partition(':version:')[2]
        self.binary_version = version_line.split('\n')[0]
        log.debug("Using GnuPG version %s" % self.binary_version)

    def _make_args(self, args, passphrase=False):
        """Make a list of command line elements for GPG.

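For reference, the version extraction above boils down to slicing the `cfg:version:` record out of `gpg --list-config --with-colons` output. A standalone sketch, where the sample line is assumed for illustration rather than taken from the diff:

```python
# Sample --list-config --with-colons output (assumed for illustration):
sample = "cfg:version:1.4.18\ncfg:pubkey:1;16;17\n"

version_line = sample.partition(':version:')[2]
binary_version = version_line.split('\n')[0]
print(binary_version)  # -> 1.4.18
```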
@@ -470,21 +539,29 @@ class GPGBase(object):

        if passphrase: cmd.append('--batch --passphrase-fd 0')

        if self.use_agent: cmd.append('--use-agent')
        else: cmd.append('--no-use-agent')
        if self.use_agent is True: cmd.append('--use-agent')
        elif self.use_agent is False: cmd.append('--no-use-agent')

        # The arguments for debugging and verbosity should be placed into the
        # cmd list before the options/args in order to resolve Issue #76:
        # https://github.com/isislovecruft/python-gnupg/issues/76
        if self.verbose:
            cmd.append('--debug-all')

            if (isinstance(self.verbose, str) or
                (isinstance(self.verbose, int) and (self.verbose >= 1))):
                # GnuPG<=1.4.18 parses the `--debug-level` command in a way
                # that is incompatible with all other GnuPG versions. :'(
                if self.binary_version and (self.binary_version <= '1.4.18'):
                    cmd.append('--debug-level=%s' % self.verbose)
                else:
                    cmd.append('--debug-level %s' % self.verbose)

        if self.options:
            [cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
        if args:
            [cmd.append(arg) for arg in iter(_sanitise_list(args))]

        if self.verbose:
            cmd.append('--debug-all')
            if ((isinstance(self.verbose, str) and
                self.verbose in ['basic', 'advanced', 'expert', 'guru'])
                or (isinstance(self.verbose, int) and (1<=self.verbose<=9))):
                cmd.append('--debug-level %s' % self.verbose)

        return cmd

    def _open_subprocess(self, args=None, passphrase=False):
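The `--debug-level` workaround above can be summarised in isolation. The values below are assumptions for the sketch, not output from the diff:

```python
binary_version = '1.4.18'   # hypothetical detected version
verbose = 'expert'          # hypothetical debug level

# Per the comment above: GnuPG <= 1.4.18 wants '--debug-level=<level>',
# while other versions expect '--debug-level <level>'.
if binary_version and binary_version <= '1.4.18':
    flag = '--debug-level=%s' % verbose
else:
    flag = '--debug-level %s' % verbose
print(flag)
```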
@@ -592,6 +669,36 @@ class GPGBase(object):
        log.debug("Finishing reading from stream %r..." % stream.__repr__())
        log.debug("Read %4d bytes total" % len(result.data))

    def _set_verbose(self, verbose):
        """Check and set our :data:`verbose` attribute.
        The debug-level must be a string or an integer. If it is one of
        the allowed strings, GnuPG will translate it internally to it's
        corresponding integer level:

        basic     = 1-2
        advanced  = 3-5
        expert    = 6-8
        guru      = 9+

        If it's not one of the recognised string levels, then then
        entire argument is ignored by GnuPG. :(

        To fix that stupid behaviour, if they wanted debugging but typo'd
        the string level (or specified ``verbose=True``), we'll default to
        'basic' logging.
        """
        string_levels = ('basic', 'advanced', 'expert', 'guru')

        if verbose is True:
            # The caller wants logging, but we need a valid --debug-level
            # for gpg. Default to "basic", and warn about the ambiguity.
            verbose = 'basic'

        if (isinstance(verbose, str) and not (verbose in string_levels)):
            verbose = 'basic'

        self.verbose = verbose

    def _collect_output(self, process, result, writer=None, stdin=None):
        """Drain the subprocesses output streams, writing the collected output
        to the result. If a writer thread (writing to the subprocess) is given,
@@ -699,6 +806,19 @@ class GPGBase(object):
        ## We could use _handle_io here except for the fact that if the
        ## passphrase is bad, gpg bails and you can't write the message.
        result = self._result_map['sign'](self)

        ## If the passphrase is an empty string, the message up to and
        ## including its first newline will be cut off before making it to the
        ## GnuPG process. Therefore, if the passphrase='' or passphrase=b'',
        ## we set passphrase=None. See Issue #82:
        ## https://github.com/isislovecruft/python-gnupg/issues/82
        if _util._is_string(passphrase):
            passphrase = passphrase if len(passphrase) > 0 else None
        elif _util._is_bytes(passphrase):
            passphrase = s(passphrase) if len(passphrase) > 0 else None
        else:
            passphrase = None

        proc = self._open_subprocess(args, passphrase is not None)
        try:
            if passphrase:
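Stripped of the surrounding plumbing, the passphrase normalisation above behaves like this sketch; it is a simplified restatement for illustration, not the library code itself:

```python
def normalise_passphrase(passphrase):
    # Empty str/bytes and non-string values become None, so gpg is never fed
    # an empty passphrase that would swallow the first line of the message.
    if isinstance(passphrase, str):
        return passphrase if len(passphrase) > 0 else None
    elif isinstance(passphrase, (bytes, bytearray)):
        return passphrase.decode() if len(passphrase) > 0 else None
    return None

for value in ('secret', '', b'', None):
    print(repr(value), '->', repr(normalise_passphrase(value)))
```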
@@ -718,6 +838,8 @@ class GPGBase(object):
                 symmetric=False,
                 always_trust=True,
                 output=None,
                 throw_keyids=False,
                 hidden_recipients=None,
                 cipher_algo='AES256',
                 digest_algo='SHA512',
                 compress_algo='ZLIB'):

@@ -790,6 +912,14 @@ class GPGBase(object):
        >>> decrypted
        'The crow flies at midnight.'

        :param bool throw_keyids: If True, make all **recipients** keyids be
            zero'd out in packet information. This is the same as using
            **hidden_recipients** for all **recipients**. (Default: False).

        :param list hidden_recipients: A list of recipients that should have
            their keyids zero'd out in packet information.

        :param str cipher_algo: The cipher algorithm to use. To see available
            algorithms with your version of GnuPG, do:
            :command:`$ gpg --with-colons --list-config

@@ -841,6 +971,7 @@ class GPGBase(object):
        ## is decryptable with a passphrase or secretkey.
        if symmetric: args.append('--symmetric')
        if encrypt: args.append('--encrypt')
        if throw_keyids: args.append('--throw-keyids')

        if len(recipients) >= 1:
            log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
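A hypothetical call showing the new keyword arguments end to end, assuming the public `gnupg.GPG.encrypt()` forwards them to the method shown here; the homedir and fingerprint are placeholders:

```python
import gnupg

gpg = gnupg.GPG(homedir='./keys')                        # placeholder homedir
recipient = '4BA800F77452A6C29447FF20F4AF76ACBBE22CE2'   # placeholder fingerprint
ciphertext = gpg.encrypt("The crow flies at midnight.",
                         recipient,
                         throw_keyids=True,              # zero out every recipient keyid
                         hidden_recipients=None)
print(bool(ciphertext))
```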
@@ -856,39 +987,54 @@ class GPGBase(object):
                        log.info("Can't accept recipient string: %s"
                                 % recp)
                    else:
                        args.append('--recipient %s' % str(recp))
                        self._add_recipient_string(args, hidden_recipients, str(recp))
                        continue
                ## will give unicode in 2.x as '\uXXXX\uXXXX'
                args.append('--recipient %r' % recp)
                if isinstance(hidden_recipients, (list, tuple)):
                    if [s for s in hidden_recipients if recp in str(s)]:
                        args.append('--hidden-recipient %r' % recp)
                    else:
                        args.append('--recipient %r' % recp)
                else:
                    args.append('--recipient %r' % recp)
                continue
            if isinstance(recp, str):
                args.append('--recipient %s' % recp)
                self._add_recipient_string(args, hidden_recipients, recp)

        elif (not _util._py3k) and isinstance(recp, basestring):
            for recp in recipients.split('\x20'):
                args.append('--recipient %s' % recp)
                self._add_recipient_string(args, hidden_recipients, recp)

        elif _util._py3k and isinstance(recp, str):
            for recp in recipients.split(' '):
                args.append('--recipient %s' % recp)
                self._add_recipient_string(args, hidden_recipients, recp)
            ## ...and now that we've proven py3k is better...

        else:
            log.debug("Don't know what to do with recipients: '%s'"
            log.debug("Don't know what to do with recipients: %r"
                      % recipients)

        result = self._result_map['crypt'](self)
        log.debug("Got data '%s' with type '%s'."
                  % (data, type(data)))
        self._handle_io(args, data, result,
                        passphrase=passphrase, binary=True)
        log.debug("\n%s" % result.data)
        log.debug("Got data '%s' with type '%s'." % (data, type(data)))
        self._handle_io(args, data, result, passphrase=passphrase, binary=True)
        # Avoid writing raw encrypted bytes to terminal loggers and breaking
        # them in that adorable way where they spew hieroglyphics until reset:
        if armor:
            log.debug("\n%s" % result.data)

        if output_filename:
            log.info("Writing encrypted output to file: %s" % output_filename)
            with open(output_filename, 'w+') as fh:
            with open(output_filename, 'wb') as fh:
                fh.write(result.data)
                fh.flush()
                log.info("Encrypted output written successfully.")

        return result

    def _add_recipient_string(self, args, hidden_recipients, recipient):
        if isinstance(hidden_recipients, (list, tuple)):
            if [s for s in hidden_recipients if recipient in str(s)]:
                args.append('--hidden-recipient %s' % recipient)
            else:
                args.append('--recipient %s' % recipient)
        else:
            args.append('--recipient %s' % recipient)
@@ -367,7 +367,7 @@ def _sanitise(*args):
                checked += (val + " ")
                log.debug("_check_option(): No checks for %s" % val)

        return checked
        return checked.rstrip(' ')

    is_flag = lambda x: x.startswith('--')

@@ -475,6 +475,8 @@ def _get_options_group(group=None):
              '--export-secret-subkeys',
              '--fingerprint',
              '--gen-revoke',
              '--hidden-encrypt-to',
              '--hidden-recipient',
              '--list-key',
              '--list-keys',
              '--list-public-keys',

@@ -514,6 +516,7 @@ def _get_options_group(group=None):
              '--import',
              '--verify',
              '--verify-files',
              '--output',
              ])
    #: These options expect a string. see :func:`_check_preferences`.
    pref_options = frozenset(['--digest-algo',

@@ -555,6 +558,9 @@ def _get_options_group(group=None):
              '--list-public-keys',
              '--list-secret-keys',
              '--list-sigs',
              '--lock-multiple',
              '--lock-never',
              '--lock-once',
              '--no-default-keyring',
              '--no-default-recipient',
              '--no-emit-version',

@@ -566,6 +572,7 @@ def _get_options_group(group=None):
              '--quiet',
              '--sign',
              '--symmetric',
              '--throw-keyids',
              '--use-agent',
              '--verbose',
              '--version',

@@ -905,6 +912,7 @@ class Sign(object):
    timestamp = None
    #: xxx fill me in
    what = None
    status = None

    def __init__(self, gpg):
        self._gpg = gpg

@@ -927,9 +935,9 @@ class Sign(object):
        :raises: :exc:`~exceptions.ValueError` if the status message is unknown.
        """
        if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
                   "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL",
                   "INV_SGNR", "SIGEXPIRED"):
            pass
                   "GOOD_PASSPHRASE", "MISSING_PASSPHRASE", "PINENTRY_LAUNCHED",
                   "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR", "SIGEXPIRED"):
            self.status = key.replace("_", " ").lower()
        elif key == "SIG_CREATED":
            (self.sig_type, self.sig_algo, self.sig_hash_algo,
             self.what, self.timestamp, self.fingerprint) = value.split()

@@ -946,6 +954,7 @@ class Sign(object):
        else:
            raise ValueError("Unknown status message: %r" % key)


class ListKeys(list):
    """Handle status messages for --list-keys.

@@ -956,7 +965,6 @@ class ListKeys(list):
    |    crs = X.509 certificate and private key available
    |    ssb = secret subkey (secondary key)
    |    uat = user attribute (same as user id except for field 10).
    |    sig = signature
    |    rev = revocation signature
    |    pkd = public key data (special field format, see below)
    |    grp = reserved for gpgsm

@@ -967,8 +975,10 @@ class ListKeys(list):
        super(ListKeys, self).__init__()
        self._gpg = gpg
        self.curkey = None
        self.curuid = None
        self.fingerprints = []
        self.uids = []
        self.sigs = {}

    def key(self, args):
        vars = ("""

@@ -978,8 +988,12 @@ class ListKeys(list):
        for i in range(len(vars)):
            self.curkey[vars[i]] = args[i]
        self.curkey['uids'] = []
        self.curkey['sigs'] = {}
        if self.curkey['uid']:
            self.curkey['uids'].append(self.curkey['uid'])
            self.curuid = self.curkey['uid']
            self.curkey['uids'].append(self.curuid)
            self.sigs[self.curuid] = set()
            self.curkey['sigs'][self.curuid] = []
        del self.curkey['uid']
        self.curkey['subkeys'] = []
        self.append(self.curkey)

@@ -994,8 +1008,21 @@ class ListKeys(list):
        uid = args[9]
        uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
        self.curkey['uids'].append(uid)
        self.curuid = uid
        self.curkey['sigs'][uid] = []
        self.sigs[uid] = set()
        self.uids.append(uid)

    def sig(self, args):
        vars = ("""
            type trust length algo keyid date expires dummy ownertrust uid
        """).split()
        sig = {}
        for i in range(len(vars)):
            sig[vars[i]] = args[i]
        self.curkey['sigs'][self.curuid].append(sig)
        self.sigs[self.curuid].add(sig['keyid'])

    def sub(self, args):
        subkey = [args[4], args[11]]
        self.curkey['subkeys'].append(subkey)
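A sketch of consuming the new per-uid signature bookkeeping added above (`ListKeys.sigs` maps each uid to a set of signing keyids). The homedir is a placeholder, and the sketch assumes the key listing was produced with signature records present so that `sig()` is actually called:

```python
import gnupg

gpg = gnupg.GPG(homedir='./keys')      # placeholder homedir
keylist = gpg.list_keys()              # returns the ListKeys object shown above
for uid, signer_keyids in keylist.sigs.items():
    print(uid, sorted(signer_keyids))
```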
@@ -1005,42 +1032,52 @@


class ImportResult(object):
    """Parse GnuPG status messages for key import operations.

    :type gpg: :class:`gnupg.GPG`
    :param gpg: An instance of :class:`gnupg.GPG`.
    """
    _ok_reason = {'0': 'Not actually changed',
                  '1': 'Entirely new key',
                  '2': 'New user IDs',
                  '4': 'New signatures',
                  '8': 'New subkeys',
                  '16': 'Contains private key',
                  '17': 'Contains private key',}

    _problem_reason = { '0': 'No specific reason given',
                        '1': 'Invalid Certificate',
                        '2': 'Issuer Certificate missing',
                        '3': 'Certificate Chain too long',
                        '4': 'Error storing certificate', }

    _fields = '''count no_user_id imported imported_rsa unchanged
                 n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
                 not_imported'''.split()
    _counts = OrderedDict(
        zip(_fields, [int(0) for x in range(len(_fields))]) )

    #: A list of strings containing the fingerprints of the GnuPG keyIDs
    #: imported.
    fingerprints = list()

    #: A list containing dictionaries with information gathered on keys
    #: imported.
    results = list()
    """Parse GnuPG status messages for key import operations."""

    def __init__(self, gpg):
        """Start parsing the results of a key import operation.

        :type gpg: :class:`gnupg.GPG`
        :param gpg: An instance of :class:`gnupg.GPG`.
        """
        self._gpg = gpg
        self.counts = self._counts

        #: A map from GnuPG codes shown with the ``IMPORT_OK`` status message
        #: to their human-meaningful English equivalents.
        self._ok_reason = {'0': 'Not actually changed',
                           '1': 'Entirely new key',
                           '2': 'New user IDs',
                           '4': 'New signatures',
                           '8': 'New subkeys',
                           '16': 'Contains private key',
                           '17': 'Contains private key',}

        #: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status
        #: message to their human-meaningful English equivalents.
        self._problem_reason = { '0': 'No specific reason given',
                                 '1': 'Invalid Certificate',
                                 '2': 'Issuer Certificate missing',
                                 '3': 'Certificate Chain too long',
                                 '4': 'Error storing certificate', }

        #: All the possible status messages pertaining to actions taken while
        #: importing a key.
        self._fields = '''count no_user_id imported imported_rsa unchanged
            n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
            not_imported'''.split()

        #: Counts of all the status message results, :data:`_fields` which
        #: have appeared.
        self.counts = OrderedDict(
            zip(self._fields, [int(0) for x in range(len(self._fields))]))

        #: A list of strings containing the fingerprints of the GnuPG keyIDs
        #: imported.
        self.fingerprints = list()

        #: A list containing dictionaries with information gathered on keys
        #: imported.
        self.results = list()

    def __nonzero__(self):
        """Override the determination for truthfulness evaluation.

@@ -1048,7 +1085,7 @@ class ImportResult(object):
        :rtype: bool
        :returns: True if we have immport some keys, False otherwise.
        """
        if self.counts.not_imported > 0: return False
        if self.counts['not_imported'] > 0: return False
        if len(self.fingerprints) == 0: return False
        return True
    __bool__ = __nonzero__

@@ -1056,7 +1093,7 @@ class ImportResult(object):
    def _handle_status(self, key, value):
        """Parse a status code from the attached GnuPG process.

        :raises: :exc:`~exceptions.ValueError` if the status message is unknown.
        :raises ValueError: if the status message is unknown.
        """
        if key == "IMPORTED":
            # this duplicates info we already see in import_ok & import_problem

@@ -1189,6 +1226,37 @@ class Verify(object):
        self.trust_level = None
        #: The string corresponding to the ``trust_level`` number.
        self.trust_text = None
        #: The subpackets. These are stored as a dictionary, in the following
        #: form:
        #:     Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS,
        #:                                               'length': LENGTH,
        #:                                               'data': DATA},
        #:                          'ANOTHER_SUBPACKET_NUMBER': {...}}
        self.subpackets = {}
        #: The signature or key notations. These are also stored as a
        #: dictionary, in the following form:
        #:
        #:     Verify.notations = {NOTATION_NAME: NOTATION_DATA}
        #:
        #: For example, the Bitcoin core developer, Peter Todd, encodes in
        #: every signature the header of the latest block on the Bitcoin
        #: blockchain (to prove that a GnuPG signature that Peter made was made
        #: *after* a specific point in time). These look like:
        #:
        #:     gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a
        #:
        #: Which python-gnupg would store as:
        #:
        #:     Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a'
        self.notations = {}

        #: This will be a str or None. If not None, it is the last
        #: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're
        #: not assured that a ``NOTATION_DATA`` status will arrive *immediately*
        #: after its corresponding ``NOTATION_NAME``, we store the latest
        #: ``NOTATION_NAME`` here until we get its corresponding
        #: ``NOTATION_DATA``.
        self._last_notation_name = None

    def __nonzero__(self):
        """Override the determination for truthfulness evaluation.
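A sketch of reading the new `Verify.notations` mapping after a verification; `signed_message` stands in for ASCII-armoured signed text and the homedir is a placeholder:

```python
import gnupg

gpg = gnupg.GPG(homedir='./keys')          # placeholder homedir
verified = gpg.verify(signed_message)      # signed_message: placeholder input
if verified.valid:
    for name, data in verified.notations.items():
        print("notation %s = %s" % (name, data))
```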
@@ -1209,7 +1277,8 @@ class Verify(object):
            self.trust_level = self.TRUST_LEVELS[key]
        elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
                     "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
                     "DECRYPTION_OKAY", "INV_SGNR"):
                     "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS",
                     "PINENTRY_LAUNCHED"):
            pass
        elif key == "BADSIG":
            self.valid = False

@@ -1220,6 +1289,7 @@ class Verify(object):
            self.status = 'signature good'
            self.key_id, self.username = value.split(None, 1)
        elif key == "VALIDSIG":
            self.valid = True
            (self.fingerprint,
             self.creation_date,
             self.sig_timestamp,

@@ -1245,17 +1315,106 @@ class Verify(object):
            self.valid = False
            self.key_id = value
            self.status = 'no public key'
        # These are useless in Verify, since they are spit out for any
        # pub/subkeys on the key, not just the one doing the signing.
        # if we want to check for signatures make with expired key,
        # the relevant flags are REVKEYSIG and KEYREVOKED.
        elif key in ("KEYEXPIRED", "SIGEXPIRED"):
            # these are useless in verify, since they are spit out for any
            # pub/subkeys on the key, not just the one doing the signing.
            # if we want to check for signatures with expired key,
            # the relevant flag is EXPKEYSIG.
            pass
        # The signature has an expiration date which has already passed
        # (EXPKEYSIG), or the signature has been revoked (REVKEYSIG):
        elif key in ("EXPKEYSIG", "REVKEYSIG"):
            # signed with expired or revoked key
            self.valid = False
            self.key_id = value.split()[0]
            self.status = (('%s %s') % (key[:3], key[3:])).lower()
        # This is super annoying, and bad design on the part of GnuPG, in my
        # opinion.
        #
        # This flag can get triggered if a valid signature is made, and then
        # later the key (or subkey) which created the signature is
        # revoked. When this happens, GnuPG will output:
        #
        # REVKEYSIG 075BFD18B365D34C Test Expired Key <test@python-gnupg.git>
        # VALIDSIG DAB69B05F591640B7F4DCBEA075BFD18B365D34C 2014-09-26 1411700539 0 4 0 1 2 00 4BA800F77452A6C29447FF20F4AF76ACBBE22CE2
        # KEYREVOKED
        #
        # Meaning that we have a timestamp for when the signature was created,
        # and we know that the signature is valid, but since GnuPG gives us no
        # timestamp for when the key was revoked... we have no ability to
        # determine if the valid signature was made *before* the signing key
        # was revoked or *after*. Meaning that if you are like me and you sign
        # all your software releases and git commits, and you also practice
        # good opsec by doing regular key rotations, your old signatures made
        # by your expired/revoked keys (even though they were created when the
        # key was still good) are considered bad because GnuPG is a
        # braindamaged piece of shit.
        #
        # Software engineering, motherfuckers, DO YOU SPEAK IT?
        #
        # The signing key which created the signature has since been revoked
        # (KEYREVOKED), and we're going to ignore it (but add something to the
        # status message):
        elif key in ("KEYREVOKED"):
            self.status = '\n'.join([self.status, "key revoked"])
        # SIG_SUBPACKET <type> <flags> <len> <data>
        # This indicates that a signature subpacket was seen. The format is
        # the same as the "spk" record above.
        #
        # [...]
        #
        # SPK - Signature subpacket records
        #
        # - Field 2 :: Subpacket number as per RFC-4880 and later.
        # - Field 3 :: Flags in hex. Currently the only two bits assigned
        #              are 1, to indicate that the subpacket came from the
        #              hashed part of the signature, and 2, to indicate the
        #              subpacket was marked critical.
        # - Field 4 :: Length of the subpacket. Note that this is the
        #              length of the subpacket, and not the length of field
        #              5 below. Due to the need for %-encoding, the length
        #              of field 5 may be up to 3x this value.
        # - Field 5 :: The subpacket data. Printable ASCII is shown as
        #              ASCII, but other values are rendered as %XX where XX
        #              is the hex value for the byte.
        elif key in ("SIG_SUBPACKET"):
            fields = value.split()
            try:
                subpacket_number = fields[0]
                self.subpackets[subpacket_number] = {'flags': None,
                                                     'length': None,
                                                     'data': None}
            except IndexError:
                # We couldn't parse the subpacket type (an RFC4880
                # identifier), so we shouldn't continue parsing.
                pass
            else:
                # Pull as much data as we can parse out of the subpacket:
                try:
                    self.subpackets[subpacket_number]['flags'] = fields[1]
                    self.subpackets[subpacket_number]['length'] = fields[2]
                    self.subpackets[subpacket_number]['data'] = fields[3]
                except IndexError:
                    pass
        # NOTATION_
        # There are actually two related status codes to convey notation
        # data:
        #
        # - NOTATION_NAME <name>
        # - NOTATION_DATA <string>
        #
        # <name> and <string> are %XX escaped; the data may be split among
        # several NOTATION_DATA lines.
        elif key.startswith("NOTATION_"):
            if key.endswith("NAME"):
                self.notations[value] = str()
                self._last_notation_name = value
            elif key.endswith("DATA"):
                if self._last_notation_name is not None:
                    # Append the NOTATION_DATA to any previous data we
                    # received for that NOTATION_NAME:
                    self.notations[self._last_notation_name] += value
                else:
                    pass
        else:
            raise ValueError("Unknown status message: %r" % key)

@@ -1360,26 +1519,33 @@ class ListPackets(object):
        self.need_passphrase_sym = None
        #: The keyid and uid which this data is encrypted to.
        self.userid_hint = None
        #: The first key that we detected that a message was encrypted
        #: to. This is provided for backwards compatibility. As of Issue #77_,
        #: the ``encrypted_to`` attribute should be used instead.
        self.key = None
        #: A list of keyid's that the message has been encrypted to.
        self.encrypted_to = []

    def _handle_status(self, key, value):
        """Parse a status code from the attached GnuPG process.

        :raises: :exc:`~exceptions.ValueError` if the status message is unknown.
        """
        if key == 'NODATA':
        if key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
                   'END_DECRYPTION', 'GOOD_PASSPHRASE', 'BAD_PASSPHRASE'):
            pass
        elif key == 'NODATA':
            self.status = nodata(value)
        elif key == 'ENC_TO':
            # This will only capture keys in our keyring. In the future we
            # may want to include multiple unknown keys in this list.
            self.key, _, _ = value.split()
        elif key == 'NEED_PASSPHRASE':
            key, _, _ = value.split()
            if not self.key:
                self.key = key
            self.encrypted_to.append(key)
        elif key == ('NEED_PASSPHRASE', 'MISSING_PASSPHRASE'):
            self.need_passphrase = True
        elif key == 'NEED_PASSPHRASE_SYM':
            self.need_passphrase_sym = True
        elif key == 'USERID_HINT':
            self.userid_hint = value.strip().split()
        elif key in ('NO_SECKEY', 'BEGIN_DECRYPTION', 'DECRYPTION_FAILED',
                     'END_DECRYPTION'):
            pass
        else:
            raise ValueError("Unknown status message: %r" % key)
@@ -57,7 +57,7 @@ def export_ownertrust(cls, trustdb=None):
    except (OSError, IOError) as err:
        log.debug(str(err))

    export_proc = cls._open_subprocess('--export-ownertrust')
    export_proc = cls._open_subprocess(['--export-ownertrust'])
    tdb = open(trustdb, 'wb')
    _util._threaded_copy_data(export_proc.stdout, tdb)

@@ -71,7 +71,7 @@ def import_ownertrust(self, trustdb=None):
    if trustdb is None:
        trustdb = os.path.join(cls.homedir, 'trustdb.gpg')

    import_proc = cls._open_subprocess('--import-ownertrust')
    import_proc = cls._open_subprocess(['--import-ownertrust'])
    tdb = open(trustdb, 'rb')
    _util._threaded_copy_data(tdb, import_proc.stdin)

@@ -98,6 +98,6 @@ def fix_trustdb(cls, trustdb=None):
    """
    if trustdb is None:
        trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
    export_proc = cls._open_subprocess('--export-ownertrust')
    import_proc = cls._open_subprocess('--import-ownertrust')
    export_proc = cls._open_subprocess(['--export-ownertrust'])
    import_proc = cls._open_subprocess(['--import-ownertrust'])
    _util._threaded_copy_data(export_proc.stdout, import_proc.stdin)
gnupg/_util.py (270 changed lines)
@@ -28,18 +28,58 @@ from time import mktime
import codecs
import encodings
import os
import psutil
import threading
import random
import re
import string
import sys

# These are all the classes which are stream-like; they are used in
# :func:`_is_stream`.
_STREAMLIKE_TYPES = []

# These StringIO classes are actually utilised.
try:
    import io
    from io import StringIO
    from io import BytesIO
except ImportError:
    from cStringIO import StringIO
else:
    # The io.IOBase type covers the above example for an open file handle in
    # Python3, as well as both io.BytesIO and io.StringIO.
    _STREAMLIKE_TYPES.append(io.IOBase)

# The remaining StringIO classes which are imported are used to determine if a
# object is a stream-like in :func:`_is_stream`.
if 2 == sys.version_info[0]:
    # Import the StringIO class from the StringIO module since it is a
    # commonly used stream class. It is distinct from either of the
    # StringIO's that may be loaded in the above try/except clause, so the
    # name is prefixed with an underscore to distinguish it.
    from StringIO import StringIO as _StringIO_StringIO
    _STREAMLIKE_TYPES.append(_StringIO_StringIO)

    # Import the cStringIO module to test for the cStringIO stream types,
    # InputType and OutputType. See
    # http://stackoverflow.com/questions/14735295/to-check-an-instance-is-stringio
    import cStringIO as _cStringIO
    _STREAMLIKE_TYPES.append(_cStringIO.InputType)
    _STREAMLIKE_TYPES.append(_cStringIO.OutputType)

    # In Python2:
    #
    #     >>> type(open('README.md', 'rb'))
    #     <open file 'README.md', mode 'rb' at 0x7f9493951d20>
    #
    # whereas, in Python3, the `file` builtin doesn't exist and instead we get:
    #
    #     >>> type(open('README.md', 'rb'))
    #     <_io.BufferedReader name='README.md'>
    #
    # which is covered by the above addition of io.IOBase.
    _STREAMLIKE_TYPES.append(file)


from . import _logger

@@ -56,6 +96,9 @@ try:
except NameError:
    _py3k = True

_running_windows = False
if "win" in sys.platform:
    _running_windows = True

## Directory shortcuts:
## we don't want to use this one because it writes to the install dir:

@@ -63,6 +106,20 @@ except NameError:
_here = os.path.join(os.getcwd(), 'gnupg')                ## current dir
_test = os.path.join(os.path.join(_here, 'test'), 'tmp')  ## ./tests/tmp
_user = os.environ.get('HOME')                            ## $HOME

# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
# environs. https://github.com/isislovecruft/python-gnupg/issues/74
if not _user:
    _user = '/tmp/python-gnupg'
    try:
        os.makedirs(_user)
    except (OSError, IOError):
        _user = os.getcwd()
    # If we can't use $HOME, but we have (or can create) a
    # /tmp/python-gnupg/gnupghome directory, then we'll default to using
    # that. Otherwise, we'll use the current directory + /gnupghome.
    _user = os.path.sep.join([_user, 'gnupghome'])

_ugpg = os.path.join(_user, '.gnupg')                     ## $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
                                                          ## $HOME/.config/python-gnupg

@@ -70,6 +127,9 @@ _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## Logger is disabled by default
log = _logger.create_logger(0)

#: Compiled regex for determining a GnuPG binary's version:
_VERSION_STRING_REGEX = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')


def find_encodings(enc=None, system=False):
    """Find functions for encoding translations for a specific codec.
@@ -105,6 +165,51 @@ def find_encodings(enc=None, system=False):

    return coder


if _py3k:
    def b(x):
        """See http://python3porting.com/problems.html#nicer-solutions"""
        coder = find_encodings()
        if isinstance(x, bytes):
            return coder.encode(x.decode(coder.name))[0]
        else:
            return coder.encode(x)[0]

    def s(x):
        if isinstance(x, str):
            return x
        elif isinstance(x, (bytes, bytearray)):
            return x.decode(find_encodings().name)
        else:
            raise NotImplemented
else:
    def b(x):
        """See http://python3porting.com/problems.html#nicer-solutions"""
        return x

    def s(x):
        if isinstance(x, basestring):
            return x
        elif isinstance(x, (bytes, bytearray)):
            return x.decode(find_encodings().name)
        else:
            raise NotImplemented

def binary(data):
    coder = find_encodings()

    if _py3k and isinstance(data, bytes):
        encoded = coder.encode(data.decode(coder.name))[0]
    elif _py3k and isinstance(data, str):
        encoded = coder.encode(data)[0]
    elif not _py3k and type(data) is not str:
        encoded = coder.encode(data)[0]
    else:
        encoded = data

    return encoded


def author_info(name, contact=None, public_key=None):
    """Easy object-oriented representation of contributor info.

@@ -124,8 +229,6 @@ def _copy_data(instream, outstream):
    """
    sent = 0

    coder = find_encodings()

    while True:
        if ((_py3k and isinstance(instream, str)) or
            (not _py3k and isinstance(instream, basestring))):

@@ -135,24 +238,64 @@ def _copy_data(instream, outstream):
            data = instream.read(1024)
            if len(data) == 0:
                break

        sent += len(data)
        log.debug("Sending chunk %d bytes:\n%s"
                  % (sent, data))
        try:
            outstream.write(data)
        except UnicodeError:
        encoded = binary(data)
        log.debug("Sending %d bytes of data..." % sent)
        log.debug("Encoded data (type %s):\n%s" % (type(encoded), encoded))

        if not _py3k:
            try:
                outstream.write(coder.encode(data))
            except IOError:
                log.exception("Error sending data: Broken pipe")
                outstream.write(encoded)
            except IOError as ioe:
                # Can get 'broken pipe' errors even when all data was sent
                if 'Broken pipe' in str(ioe):
                    log.error('Error sending data: Broken pipe')
                else:
                    log.exception(ioe)
                break
        except IOError as ioe:
            # Can get 'broken pipe' errors even when all data was sent
            if 'Broken pipe' in str(ioe):
                log.error('Error sending data: Broken pipe')
            else:
                log.exception(ioe)
            break
            else:
                log.debug("Wrote data type <type 'str'> to outstream.")
        else:
            try:
                outstream.write(bytes(encoded))
            except TypeError as te:
                # XXX FIXME This appears to happen because
                # _threaded_copy_data() sometimes passes the `outstream` as an
                # object with type <_io.BufferredWriter> and at other times
                # with type <encodings.utf_8.StreamWriter>. We hit the
                # following error when the `outstream` has type
                # <encodings.utf_8.StreamWriter>.
                if not "convert 'bytes' object to str implicitly" in str(te):
                    log.error(str(te))
                try:
                    outstream.write(encoded.decode())
                except TypeError as yate:
                    # We hit the "'str' does not support the buffer interface"
                    # error in Python3 when the `outstream` is an io.BytesIO and
                    # we try to write a str to it. We don't care about that
                    # error, we'll just try again with bytes.
                    if not "does not support the buffer interface" in str(yate):
                        log.error(str(yate))
                except IOError as ioe:
                    # Can get 'broken pipe' errors even when all data was sent
                    if 'Broken pipe' in str(ioe):
                        log.error('Error sending data: Broken pipe')
                    else:
                        log.exception(ioe)
                    break
                else:
                    log.debug("Wrote data type <class 'str'> outstream.")
            except IOError as ioe:
                # Can get 'broken pipe' errors even when all data was sent
                if 'Broken pipe' in str(ioe):
                    log.error('Error sending data: Broken pipe')
                else:
                    log.exception(ioe)
                break
            else:
                log.debug("Wrote data type <class 'bytes'> to outstream.")

    try:
        outstream.close()
    except IOError as ioe:

@@ -260,6 +403,8 @@ def _find_binary(binary=None):
    """
    found = None
    if binary is not None:
        if os.path.isabs(binary) and os.path.isfile(binary):
            return binary
        if not os.path.isabs(binary):
            try:
                found = _which(binary)

@@ -272,7 +417,7 @@ def _find_binary(binary=None):
        elif os.access(binary, os.X_OK):
            found = binary
    if found is None:
        try: found = _which('gpg')[0]
        try: found = _which('gpg', abspath_only=True, disallow_symlinks=True)[0]
        except IndexError as ie:
            log.error("Could not find binary for 'gpg'.")
            try: found = _which('gpg2')[0]

@@ -281,14 +426,7 @@ def _find_binary(binary=None):
    if found is None:
        raise RuntimeError("GnuPG is not installed!")

    try:
        assert os.path.isabs(found), "Path to gpg binary not absolute"
        assert not os.path.islink(found), "Path to gpg binary is symlink"
        assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary"
    except (AssertionError, AttributeError) as ae:
        log.error(str(ae))
    else:
        return found
    return found

def _has_readwrite(path):
    """

@@ -335,7 +473,32 @@ def _is_stream(input):
    :rtype: bool
    :returns: True if :param:input is a stream, False if otherwise.
    """
    return isinstance(input, BytesIO) or isinstance(input, StringIO)
    return isinstance(input, tuple(_STREAMLIKE_TYPES))

def _is_string(thing):
    """Check that **thing** is a string. The definition of the latter depends
    upon the Python version.

    :param thing: The thing to check if it's a string.
    :rtype: bool
    :returns: ``True`` if **thing** is string (or unicode in Python2).
    """
    if (_py3k and isinstance(thing, str)):
        return True
    if (not _py3k and isinstance(thing, basestring)):
        return True
    return False

def _is_bytes(thing):
    """Check that **thing** is bytes.

    :param thing: The thing to check if it's bytes.
    :rtype: bool
    :returns: ``True`` if **thing** is bytes or a bytearray.
    """
    if isinstance(thing, (bytes, bytearray)):
        return True
    return False

def _is_list_or_tuple(instance):
    """Check that ``instance`` is a list or tuple.
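To see why the broadened stream check matters, here is a small standalone comparison; the tuple below mirrors the Python 3 branch of the `_STREAMLIKE_TYPES` registration above:

```python
import io

streamlike = (io.IOBase,)   # what _STREAMLIKE_TYPES holds on Python 3
candidates = [io.BytesIO(b"data"), io.StringIO("data"), open(__file__, 'rb')]
for candidate in candidates:
    # BytesIO, StringIO and plain file handles all pass now; the old
    # BytesIO/StringIO-only check rejected open file objects.
    print(type(candidate).__name__, isinstance(candidate, streamlike))
```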
||||
|
@ -368,21 +531,26 @@ def _is_gpg2(version):
|
|||
return True
|
||||
return False
|
||||
|
||||
def _make_binary_stream(s, encoding):
|
||||
"""
|
||||
xxx fill me in
|
||||
def _make_binary_stream(thing, encoding=None, armor=True):
|
||||
"""Encode **thing**, then make it stream/file-like.
|
||||
|
||||
:param thing: The thing to turn into a encoded stream.
|
||||
:rtype: ``io.BytesIO`` or ``io.StringIO``.
|
||||
:returns: The encoded **thing**, wrapped in an ``io.BytesIO`` (if
|
||||
available), otherwise wrapped in a ``io.StringIO``.
|
||||
"""
|
||||
if _py3k:
|
||||
if isinstance(thing, str):
|
||||
thing = thing.encode(encoding)
|
||||
else:
|
||||
if type(thing) is not str:
|
||||
thing = thing.encode(encoding)
|
||||
|
||||
try:
|
||||
if _py3k:
|
||||
if isinstance(s, str):
|
||||
s = s.encode(encoding)
|
||||
else:
|
||||
if type(s) is not str:
|
||||
s = s.encode(encoding)
|
||||
from io import BytesIO
|
||||
rv = BytesIO(s)
|
||||
except ImportError:
|
||||
rv = StringIO(s)
|
||||
rv = BytesIO(thing)
|
||||
except NameError:
|
||||
rv = StringIO(thing)
|
||||
|
||||
return rv
|
||||
|
||||
def _make_passphrase(length=None, save=False, file=None):

@@ -403,7 +571,7 @@ def _make_passphrase(length=None, save=False, file=None):
    passphrase = _make_random_string(length)

    if save:
        ruid, euid, suid = psutil.Process(os.getpid()).uids
        ruid, euid, suid = os.getresuid()
        gid = os.getgid()
        now = mktime(localtime())


@@ -434,8 +602,7 @@ def _match_version_string(version):

    :param str version: A version string in the form x.x.x
    """
    regex = re.compile('(\d)*(\.)*(\d)*(\.)*(\d)*')
    matched = regex.match(version)
    matched = _VERSION_STRING_REGEX.match(version)
    g = matched.groups()
    major, minor, micro = int(g[0]), int(g[2]), int(g[4])
    return (major, minor, micro)

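A self-contained sketch of the version parsing above, assuming the module-level `_VERSION_STRING_REGEX` matches the inline pattern it replaces (that definition is outside this hunk); the function name here is illustrative.

```python
import re

# Assumed to mirror the module-level _VERSION_STRING_REGEX; the pattern is
# the inline one being replaced in the hunk above.
_VERSION_STRING_REGEX = re.compile(r'(\d)*(\.)*(\d)*(\.)*(\d)*')

def match_version_string(version):
    g = _VERSION_STRING_REGEX.match(version).groups()
    return (int(g[0]), int(g[2]), int(g[4]))

print(match_version_string("1.4.9"))   # (1, 4, 9)
```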
@@ -485,7 +652,7 @@ def _utc_epoch():
    """Get the seconds since epoch."""
    return int(mktime(localtime()))

def _which(executable, flags=os.X_OK):
def _which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False):
    """Borrowed from Twisted's :mod:twisted.python.proutils .

    Search PATH for executable files with the given name.

@@ -508,6 +675,17 @@ def _which(executable, flags=os.X_OK):
    :returns: A list of the full paths to files found, in the order in which
              they were found.
    """
    def _can_allow(p):
        if not os.access(p, flags):
            return False
        if abspath_only and not os.path.abspath(p):
            log.warn('Ignoring %r (path is not absolute)', p)
            return False
        if disallow_symlinks and os.path.islink(p):
            log.warn('Ignoring %r (path is a symlink)', p)
            return False
        return True

    result = []
    exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep))
    path = os.environ.get('PATH', None)

@@ -515,11 +693,11 @@ def _which(executable, flags=os.X_OK):
        return []
    for p in os.environ.get('PATH', '').split(os.pathsep):
        p = os.path.join(p, executable)
        if os.access(p, flags):
        if _can_allow(p):
            result.append(p)
        for e in exts:
            pext = p + e
            if os.access(pext, flags):
            if _can_allow(pext):
                result.append(pext)
    return result

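A standalone sketch of the stricter lookup: walk `PATH` and keep only candidates that pass the same checks as `_can_allow()`. The sketch tests absoluteness with `os.path.isabs()`, whereas the hunk above calls `os.path.abspath()`, which is truthy for any non-empty path; the helper name is illustrative.

```python
import os

def which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False):
    # Walk PATH and keep only candidates that are executable, optionally
    # absolute, and optionally not symlinks.
    found = []
    for directory in os.environ.get("PATH", "").split(os.pathsep):
        candidate = os.path.join(directory, executable)
        if not os.access(candidate, flags):
            continue
        if abspath_only and not os.path.isabs(candidate):
            continue
        if disallow_symlinks and os.path.islink(candidate):
            continue
        found.append(candidate)
    return found

print(which("gpg", abspath_only=True, disallow_symlinks=True))
```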
@ -179,9 +179,9 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
|
|||
return None
|
||||
return {"version": dirname[len(parentdir_prefix):], "full": ""}
|
||||
|
||||
tag_prefix = "python-gnupg-"
|
||||
parentdir_prefix = "python-gnupg-"
|
||||
versionfile_source = "src/_version.py"
|
||||
tag_prefix = ""
|
||||
parentdir_prefix = "gnupg-"
|
||||
versionfile_source = "gnupg/_version.py"
|
||||
|
||||
def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
|
||||
variables = { "refnames": git_refnames, "full": git_full }
|
||||
|
|
gnupg/gnupg.py (139 changed lines)
|
@ -36,13 +36,7 @@ import os
|
|||
import re
|
||||
import textwrap
|
||||
|
||||
try:
|
||||
from io import StringIO
|
||||
except ImportError:
|
||||
from cStringIO import StringIO
|
||||
|
||||
#: see :pep:`328` http://docs.python.org/2.5/whatsnew/pep-328.html
|
||||
from . import _parsers
|
||||
from . import _util
|
||||
from . import _trust
|
||||
from ._meta import GPGBase
|
||||
|
@ -66,7 +60,7 @@ class GPG(GPGBase):
|
|||
|
||||
def __init__(self, binary=None, homedir=None, verbose=False,
|
||||
use_agent=False, keyring=None, secring=None,
|
||||
options=None):
|
||||
ignore_homedir_permissions=False, options=None):
|
||||
"""Initialize a GnuPG process wrapper.
|
||||
|
||||
:param str binary: Name for GnuPG binary executable. If the absolute
|
||||
|
@ -79,6 +73,10 @@ class GPG(GPGBase):
|
|||
and private keyrings. Default is whatever GnuPG
|
||||
defaults to.
|
||||
|
||||
:type ignore_homedir_permissions: :obj:`bool`
|
||||
:param ignore_homedir_permissions: If true, bypass check that homedir
|
||||
be writable.
|
||||
|
||||
:type verbose: :obj:`str` or :obj:`int` or :obj:`bool`
|
||||
:param verbose: String or numeric value to pass to GnuPG's
|
||||
``--debug-level`` option. See the GnuPG man page for
|
||||
|
@ -123,12 +121,16 @@ class GPG(GPGBase):
|
|||
secring=secring,
|
||||
options=options,
|
||||
verbose=verbose,
|
||||
use_agent=use_agent,)
|
||||
use_agent=use_agent,
|
||||
ignore_homedir_permissions=ignore_homedir_permissions,
|
||||
)
|
||||
|
||||
log.info(textwrap.dedent("""
|
||||
Initialised settings:
|
||||
binary: %s
|
||||
binary version: %s
|
||||
homedir: %s
|
||||
ignore_homedir_permissions: %s
|
||||
keyring: %s
|
||||
secring: %s
|
||||
default_preference_list: %s
|
||||
|
@ -136,9 +138,16 @@ class GPG(GPGBase):
|
|||
options: %s
|
||||
verbose: %s
|
||||
use_agent: %s
|
||||
""" % (self.binary, self.homedir, self.keyring, self.secring,
|
||||
self.default_preference_list, self.keyserver, self.options,
|
||||
str(self.verbose), str(self.use_agent))))
|
||||
""" % (self.binary,
|
||||
self.binary_version,
|
||||
self.homedir,
|
||||
self.ignore_homedir_permissions,
|
||||
self.keyring,
|
||||
self.secring,
|
||||
self.default_preference_list,
|
||||
self.keyserver, self.options,
|
||||
str(self.verbose),
|
||||
str(self.use_agent))))
|
||||
|
||||
self._batch_dir = os.path.join(self.homedir, 'batch-files')
|
||||
self._key_dir = os.path.join(self.homedir, 'generated-keys')
|
||||
|
@@ -147,58 +156,52 @@ class GPG(GPGBase):
        self.temp_keyring = None
        #: The secring used in the most recently created batch file
        self.temp_secring = None
        #: The version string of our GnuPG binary
        self.binary_version = str()

        ## check that everything runs alright, and grab the gpg binary's
        ## version number while we're at it:
        proc = self._open_subprocess(["--list-config", "--with-colons"])
        result = self._result_map['list'](self)
        self._read_data(proc.stdout, result)
        if proc.returncode:
            raise RuntimeError("Error invoking gpg: %s" % result.data)
        # Make sure that the trustdb exists, or else GnuPG will exit with a
        # fatal error (at least it does with GnuPG>=2.0.0):
        self.create_trustdb()

        version_line = str(result.data).partition(':version:')[2]
        self.binary_version = version_line.split('\n')[0]
        log.debug("Using GnuPG version %s" % self.binary_version)

        if _util._is_gpg2:
            # Make GnuPG>=2.0.0-only methods public:
            self.fix_trustdb = self._fix_trustdb
            self.import_ownertrust = self._import_ownertrust
            self.export_ownertrust = self._export_ownertrust

            # Make sure that the trustdb exists, or else GnuPG will exit with
            # a fatal error (at least it does with GnuPG>=2.0.0):
            self._create_trustdb()
        # The --no-use-agent and --use-agent options were deprecated in GnuPG
        # 2.x, so we should set use_agent to None here to avoid having
        # GPGBase._make_args() add either one.
        if self.is_gpg2():
            self.use_agent = None

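Illustrative, non-canonical usage of the version detection and the GnuPG>=2.x-only helpers made public above; the homedir path and the printed version are placeholders.

```python
import gnupg

# The wrapper records the binary's version while checking "--list-config",
# and only exposes the trustdb helpers publicly on GnuPG >= 2.x.
# The homedir below is a placeholder.
gpg = gnupg.GPG(homedir="./keys")
print(gpg.binary_version)        # e.g. "2.0.22", whatever gpg reports

if gpg.is_gpg2():
    gpg.fix_trustdb()            # public alias for _fix_trustdb() on 2.x
```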
@functools.wraps(_trust._create_trustdb)
|
||||
def _create_trustdb(self):
|
||||
def create_trustdb(self):
|
||||
if self.is_gpg2():
|
||||
_trust._create_trustdb(self)
|
||||
else:
|
||||
log.info("Creating the trustdb is only available with GnuPG>=2.x")
|
||||
# For backward compatibility with python-gnupg<=1.3.1:
|
||||
_create_trustdb = create_trustdb
|
||||
|
||||
@functools.wraps(_trust.fix_trustdb)
|
||||
def _fix_trustdb(self, trustdb=None):
|
||||
def fix_trustdb(self, trustdb=None):
|
||||
if self.is_gpg2():
|
||||
_trust.fix_trustdb(self)
|
||||
else:
|
||||
log.info("Fixing the trustdb is only available with GnuPG>=2.x")
|
||||
# For backward compatibility with python-gnupg<=1.3.1:
|
||||
_fix_trustdb = fix_trustdb
|
||||
|
||||
@functools.wraps(_trust.import_ownertrust)
|
||||
def _import_ownertrust(self, trustdb=None):
|
||||
def import_ownertrust(self, trustdb=None):
|
||||
if self.is_gpg2():
|
||||
_trust.import_ownertrust(self)
|
||||
else:
|
||||
log.info("Importing ownertrust is only available with GnuPG>=2.x")
|
||||
# For backward compatibility with python-gnupg<=1.3.1:
|
||||
_import_ownertrust = import_ownertrust
|
||||
|
||||
@functools.wraps(_trust.export_ownertrust)
|
||||
def _export_ownertrust(self, trustdb=None):
|
||||
def export_ownertrust(self, trustdb=None):
|
||||
if self.is_gpg2():
|
||||
_trust.export_ownertrust(self)
|
||||
else:
|
||||
log.info("Exporting ownertrust is only available with GnuPG>=2.x")
|
||||
# For backward compatibility with python-gnupg<=1.3.1:
|
||||
_export_ownertrust = export_ownertrust
|
||||
|
||||
def is_gpg1(self):
|
||||
"""Returns true if using GnuPG <= 1.x."""
|
||||
|
@@ -284,15 +287,13 @@ class GPG(GPGBase):
        signatures. If using detached signatures, the file containing the
        detached signature should be specified as the ``sig_file``.

        :param file file: A file descriptor object. Its type will be checked
            with :func:`_util._is_file`.
        :param file file: A file descriptor object.

        :param str sig_file: A file containing the GPG signature data for
            ``file``. If given, ``file`` is verified via this detached
            signature.
            signature. Its type will be checked with :func:`_util._is_file`.
        """

        fn = None
        result = self._result_map['verify'](self)

        if sig_file is None:

@@ -307,19 +308,15 @@ class GPG(GPGBase):
            return result
        log.debug('verify_file(): Handling detached verification')
        sig_fh = None
        data_fh = None
        try:
            sig_fh = open(sig_file, 'rb')
            data_fh = open(file, 'rb')
            args = ["--verify %s -" % sig_fh.name]
            proc = self._open_subprocess(args)
            writer = _util._threaded_copy_data(data_fh, proc.stdin)
            writer = _util._threaded_copy_data(file, proc.stdin)
            self._collect_output(proc, result, writer, stdin=proc.stdin)
        finally:
            if sig_fh and not sig_fh.closed:
                sig_fh.close()
            if data_fh and not data_fh.closed:
                data_fh.close()
        return result

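With this change, detached verification takes an already-open file object for the data and a filename for the signature (mirroring the updated tests later in this diff). A hedged usage sketch with placeholder paths:

```python
import gnupg

gpg = gnupg.GPG(homedir="./keys")      # placeholder homedir

# ``file`` is an already-opened file object; the detached signature is
# named via ``sig_file`` (both filenames here are placeholders):
with open("document.txt", "rb") as datafd:
    verified = gpg.verify_file(datafd, sig_file="document.txt.sig")

print(verified.valid, verified.fingerprint)
```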
def import_keys(self, key_data):
|
||||
|
@ -488,19 +485,7 @@ class GPG(GPGBase):
|
|||
self._collect_output(p, result, stdin=p.stdin)
|
||||
lines = result.data.decode(self._encoding,
|
||||
self._decode_errors).splitlines()
|
||||
valid_keywords = 'pub uid sec fpr sub'.split()
|
||||
for line in lines:
|
||||
if self.verbose:
|
||||
print(line)
|
||||
log.debug("%r", line.rstrip())
|
||||
if not line:
|
||||
break
|
||||
L = line.strip().split(':')
|
||||
if not L:
|
||||
continue
|
||||
keyword = L[0]
|
||||
if keyword in valid_keywords:
|
||||
getattr(result, keyword)(L)
|
||||
self._parse_keys(result)
|
||||
return result
|
||||
|
||||
def list_packets(self, raw_data):
|
||||
|
@ -521,8 +506,8 @@ class GPG(GPGBase):
|
|||
>>> assert key.fingerprint
|
||||
|
||||
:rtype: dict
|
||||
:returns: A dictionary whose keys are the original keyid parameters,
|
||||
and whose values are lists of signatures.
|
||||
:returns: res.sigs is a dictionary whose keys are the uids and whose
|
||||
values are a set of signature keyids.
|
||||
"""
|
||||
if len(keyids) > self._batch_limit:
|
||||
raise ValueError(
|
||||
|
@ -537,8 +522,26 @@ class GPG(GPGBase):
|
|||
proc = self._open_subprocess(args)
|
||||
result = self._result_map['list'](self)
|
||||
self._collect_output(proc, result, stdin=proc.stdin)
|
||||
self._parse_keys(result)
|
||||
return result
|
||||
|
||||
def _parse_keys(self, result):
|
||||
lines = result.data.decode(self._encoding,
|
||||
self._decode_errors).splitlines()
|
||||
valid_keywords = 'pub uid sec fpr sub sig'.split()
|
||||
for line in lines:
|
||||
if self.verbose:
|
||||
print(line)
|
||||
log.debug("%r", line.rstrip())
|
||||
if not line:
|
||||
break
|
||||
L = line.strip().split(':')
|
||||
if not L:
|
||||
continue
|
||||
keyword = L[0]
|
||||
if keyword in valid_keywords:
|
||||
getattr(result, keyword)(L)
|
||||
|
||||
def gen_key(self, input):
|
||||
"""Generate a GnuPG key through batch file key generation. See
|
||||
:meth:`GPG.gen_key_input()` for creating the control input.
|
||||
|
@ -798,7 +801,7 @@ class GPG(GPGBase):
|
|||
key = key.replace('_','-').title()
|
||||
## to set 'cert', 'Key-Usage' must be blank string
|
||||
if not key in ('Key-Usage', 'Subkey-Usage'):
|
||||
if str(val).strip():
|
||||
if type('')(val).strip():
|
||||
parms[key] = val
|
||||
|
||||
## if Key-Type is 'default', make Subkey-Type also be 'default'
|
||||
|
@ -941,6 +944,13 @@ generate keys. Please see
|
|||
'The crow flies at midnight.'
|
||||
|
||||
|
||||
:param bool throw_keyids: If True, make all **recipients** keyids be
|
||||
zero'd out in packet information. This is the same as using
|
||||
**hidden_recipients** for all **recipients**. (Default: False).
|
||||
|
||||
:param list hidden_recipients: A list of recipients that should have
|
||||
their keyids zero'd out in packet information.
|
||||
|
||||
:param str cipher_algo: The cipher algorithm to use. To see available
|
||||
algorithms with your version of GnuPG, do:
|
||||
:command:`$ gpg --with-colons --list-config ciphername`.
|
||||
|
@ -956,7 +966,10 @@ generate keys. Please see
|
|||
|
||||
.. seealso:: :meth:`._encrypt`
|
||||
"""
|
||||
stream = _make_binary_stream(data, self._encoding)
|
||||
if _is_stream(data):
|
||||
stream = data
|
||||
else:
|
||||
stream = _make_binary_stream(data, self._encoding)
|
||||
result = self._encrypt(stream, recipients, **kwargs)
|
||||
stream.close()
|
||||
return result
|
||||
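A usage sketch for the options documented above. The fingerprints and homedir are placeholders; per this change, `encrypt()` accepts either a string (wrapped via `_make_binary_stream()`) or an object that is already stream-like.

```python
import io
import gnupg

gpg = gnupg.GPG(homedir="./keys")                    # placeholder homedir
alice = "ALICE_FINGERPRINT_PLACEHOLDER"
bob = "BOB_FINGERPRINT_PLACEHOLDER"

# Strings are wrapped for us; stream-like objects are passed through:
enc_from_str = gpg.encrypt("The crow flies at midnight.", alice)
enc_from_stream = gpg.encrypt(io.BytesIO(b"The crow flies at midnight."), alice)

# Hide one recipient's keyid, or zero out all of them:
enc_hidden = gpg.encrypt("hello", alice, bob, hidden_recipients=[alice])
enc_thrown = gpg.encrypt("hello", alice, bob, throw_keyids=True)
print(gpg.list_packets(str(enc_thrown)).encrypted_to)   # all-zero keyids
```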
|
|
|
@ -26,6 +26,7 @@ A test harness and unittests for gnupg.py.
|
|||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
from __future__ import with_statement
|
||||
|
||||
from argparse import ArgumentParser
|
||||
from codecs import open as open
|
||||
from functools import wraps
|
||||
|
@ -33,10 +34,8 @@ from glob import glob
|
|||
from time import localtime
|
||||
from time import mktime
|
||||
|
||||
import encodings
|
||||
import doctest
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
@ -192,7 +191,9 @@ class GPGTestCase(unittest.TestCase):
|
|||
print(fixed)
|
||||
test_file = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
self.assertTrue(os.path.isfile(test_file))
|
||||
has_shell = self.gpg.verify_file(test_file, fixed)
|
||||
datafd = open(test_file, 'rb')
|
||||
has_shell = self.gpg.verify_file(datafd, sig_file=fixed)
|
||||
datafd.close()
|
||||
self.assertFalse(has_shell.valid)
|
||||
|
||||
def test_parsers_fix_unsafe_semicolon(self):
|
||||
|
@ -288,8 +289,8 @@ class GPGTestCase(unittest.TestCase):
|
|||
self.assertTrue(os.path.isabs(self.gpg.binary))
|
||||
|
||||
def test_make_args_drop_protected_options(self):
|
||||
"""Test that unsupported gpg options are dropped."""
|
||||
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus']
|
||||
"""Test that unsupported gpg options are dropped, and supported ones remain."""
|
||||
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus', '--lock-never']
|
||||
gpg_binary_path = _util._find_binary('gpg')
|
||||
cmd = self.gpg._make_args(None, False)
|
||||
expected = [gpg_binary_path,
|
||||
|
@ -297,7 +298,8 @@ class GPGTestCase(unittest.TestCase):
|
|||
'--homedir "%s"' % self.homedir,
|
||||
'--no-default-keyring --keyring %s' % self.keyring,
|
||||
'--secret-keyring %s' % self.secring,
|
||||
'--no-use-agent']
|
||||
'--no-use-agent',
|
||||
'--lock-never']
|
||||
self.assertListEqual(cmd, expected)
|
||||
|
||||
def test_make_args(self):
|
||||
|
@ -388,7 +390,10 @@ class GPGTestCase(unittest.TestCase):
|
|||
def test_gen_key_input(self):
|
||||
"""Test that GnuPG batch file creation is successful."""
|
||||
key_input = self.generate_key_input("Francisco Ferrer", "an.ok")
|
||||
self.assertIsInstance(key_input, str)
|
||||
if _util._py3k:
|
||||
self.assertIsInstance(key_input, str)
|
||||
else:
|
||||
self.assertIsInstance(key_input, basestring)
|
||||
self.assertGreater(key_input.find('Francisco Ferrer'), 0)
|
||||
|
||||
def test_rsa_key_generation(self):
|
||||
|
@ -564,7 +569,7 @@ class GPGTestCase(unittest.TestCase):
|
|||
def test_import_only(self):
|
||||
"""Test that key import works."""
|
||||
self.test_list_keys_initial_public()
|
||||
self.gpg.import_keys(KEYS_TO_IMPORT)
|
||||
self.assertTrue(self.gpg.import_keys(KEYS_TO_IMPORT))
|
||||
public_keys = self.gpg.list_keys()
|
||||
self.assertTrue(is_list_with_len(public_keys, 2),
|
||||
"2-element list expected")
|
||||
|
@ -621,6 +626,66 @@ class GPGTestCase(unittest.TestCase):
|
|||
passphrase='wrong horse battery staple')
|
||||
self.assertFalse(sig, "Bad passphrase should fail")
|
||||
|
||||
def test_signature_string_passphrase_empty_string(self):
|
||||
"""Test that a signing attempt with passphrase='' creates a valid
|
||||
signature.
|
||||
|
||||
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
|
||||
"""
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase='')
|
||||
self.assertTrue(sig)
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
def test_signature_string_passphrase_empty_bytes_literal(self):
|
||||
"""Test that a signing attempt with passphrase=b'' creates a valid
|
||||
signature.
|
||||
|
||||
See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
|
||||
"""
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase=b'')
|
||||
self.assertTrue(sig)
|
||||
print("%r" % str(sig))
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
def test_signature_string_passphrase_bytes_literal(self):
|
||||
"""Test that a signing attempt with passphrase=b'overalls' creates a
|
||||
valid signature.
|
||||
"""
|
||||
with open(os.path.join(_files, 'kat.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
message = 'abc\ndef\n'
|
||||
sig = self.gpg.sign(message, default_key=key1, passphrase=b'overalls')
|
||||
self.assertTrue(sig)
|
||||
print("%r" % str(sig))
|
||||
self.assertTrue(message in str(sig))
|
||||
|
||||
    def test_signature_string_passphrase_None(self):
        """Test that a signing attempt with passphrase=None creates a
        valid signature.

        See Issue #82: https://github.com/isislovecruft/python-gnupg/issues/82
        """
        with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
            res1 = self.gpg.import_keys(fh1.read())
            key1 = res1.fingerprints[0]

        message = 'abc\ndef\n'
        sig = self.gpg.sign(message, default_key=key1, passphrase=None)
        self.assertTrue(sig)
        self.assertTrue(message in str(sig))
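A minimal sketch of the behaviour these tests cover (Issue #82): signing with an empty or `None` passphrase for a key that has no passphrase set. The fingerprint and homedir are placeholders.

```python
import gnupg

gpg = gnupg.GPG(homedir="./keys")            # placeholder homedir
key = "SIGNING_KEY_FINGERPRINT_PLACEHOLDER"  # a key with no passphrase set

sig = gpg.sign("abc\ndef\n", default_key=key, passphrase=None)
print(bool(sig))            # True: '', b'' and None are all accepted
```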
|
||||
def test_signature_file(self):
|
||||
"""Test that signing a message file works."""
|
||||
key = self.generate_key("Leonard Adleman", "rsa.com")
|
||||
|
@ -664,6 +729,7 @@ class GPGTestCase(unittest.TestCase):
|
|||
sig = self.gpg.sign(message, default_key=key.fingerprint,
|
||||
passphrase='johanborst')
|
||||
self.assertTrue(sig, "Good passphrase should succeed")
|
||||
|
||||
try:
|
||||
file = _util._make_binary_stream(sig.data, self.gpg._encoding)
|
||||
verified = self.gpg.verify_file(file)
|
||||
|
@ -696,7 +762,7 @@ class GPGTestCase(unittest.TestCase):
|
|||
datafd.seek(0)
|
||||
sigfd.seek(0)
|
||||
|
||||
verified = self.gpg.verify_file(datafn, sigfn)
|
||||
verified = self.gpg.verify_file(datafd, sig_file=sigfn)
|
||||
|
||||
if key.fingerprint != verified.fingerprint:
|
||||
log.warn("key fingerprint: %r", key.fingerprint)
|
||||
|
@ -707,7 +773,7 @@ class GPGTestCase(unittest.TestCase):
|
|||
os.unlink(sigfn)
|
||||
|
||||
def test_signature_verification_detached_binary(self):
|
||||
"""Test that detached signature verification in binary mode fails."""
|
||||
"""Test that detached signature verification in binary mode works."""
|
||||
|
||||
key = self.generate_key("Adi Shamir", "rsa.com")
|
||||
datafn = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
|
@ -715,7 +781,6 @@ class GPGTestCase(unittest.TestCase):
|
|||
|
||||
datafd = open(datafn, 'rb')
|
||||
data = datafd.read()
|
||||
datafd.close()
|
||||
|
||||
sig = self.gpg.sign(data, default_key=key.fingerprint,
|
||||
passphrase='adishamir',
|
||||
|
@ -731,29 +796,96 @@ class GPGTestCase(unittest.TestCase):
|
|||
sigfd.close()
|
||||
|
||||
self.assertTrue(sigfd.closed, "Sigfile '%s' should be closed" % sigfn)
|
||||
with self.assertRaises(UnicodeDecodeError):
|
||||
print("SIG=%s" % sig)
|
||||
|
||||
verifysig = open(sigfn, 'rb')
|
||||
verification = self.gpg.verify_file(data, verifysig)
|
||||
datafd.seek(0)
|
||||
verification = self.gpg.verify_file(datafd, sig_file=sigfn)
|
||||
|
||||
self.assertTrue(isinstance(verification, gnupg._parsers.Verify))
|
||||
self.assertFalse(verification.valid)
|
||||
self.assertTrue(verification.valid)
|
||||
|
||||
datafd.close()
|
||||
|
||||
if os.path.isfile(sigfn):
|
||||
os.unlink(sigfn)
|
||||
|
||||
def test_deletion(self):
|
||||
"""Test that key deletion works."""
|
||||
self.gpg.import_keys(KEYS_TO_IMPORT)
|
||||
def test_deletion_public_key(self):
|
||||
"""Test that key deletion for public keys works, and that it leaves the
|
||||
corresponding secret key intact.
|
||||
"""
|
||||
key1 = None
|
||||
key2 = None
|
||||
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
|
||||
res2 = self.gpg.import_keys(fh2.read())
|
||||
key2 = res2.fingerprints[0]
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
self.assertTrue(is_list_with_len(public_keys, 2),
|
||||
"2-element list expected, got %d" % len(public_keys))
|
||||
self.gpg.delete_keys(public_keys[0]['fingerprint'])
|
||||
self.assertTrue(len(public_keys), 2)
|
||||
|
||||
self.gpg.delete_keys(key1)
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
self.assertTrue(is_list_with_len(public_keys, 1),
|
||||
"1-element list expected, got %d" % len(public_keys))
|
||||
log.debug("test_deletion ends")
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertTrue(len(public_keys), 1)
|
||||
self.assertTrue(len(secret_keys), 2)
|
||||
|
||||
def test_deletion_secret_key(self):
|
||||
"""Test that key deletion for secret keys works, and that it leaves the
|
||||
corresponding public key intact.
|
||||
"""
|
||||
key1 = None
|
||||
key2 = None
|
||||
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
|
||||
res2 = self.gpg.import_keys(fh2.read())
|
||||
key2 = res2.fingerprints[0]
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 2)
|
||||
self.assertEqual(len(secret_keys), 2)
|
||||
|
||||
self.gpg.delete_keys(key1, secret=True)
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 2)
|
||||
self.assertEqual(len(secret_keys), 1)
|
||||
|
||||
def test_deletion_subkeys(self):
|
||||
"""Test that key deletion for subkeys deletes both the public and
|
||||
secret portions of the key.
|
||||
"""
|
||||
key1 = None
|
||||
key2 = None
|
||||
|
||||
with open(os.path.join(_files, 'test_key_1.sec')) as fh1:
|
||||
res1 = self.gpg.import_keys(fh1.read())
|
||||
key1 = res1.fingerprints[0]
|
||||
|
||||
with open(os.path.join(_files, 'test_key_2.sec')) as fh2:
|
||||
res2 = self.gpg.import_keys(fh2.read())
|
||||
key2 = res2.fingerprints[0]
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 2)
|
||||
self.assertEqual(len(secret_keys), 2)
|
||||
|
||||
self.gpg.delete_keys(key1, subkeys=True)
|
||||
|
||||
public_keys = self.gpg.list_keys()
|
||||
secret_keys = self.gpg.list_keys(secret=True)
|
||||
self.assertEqual(len(public_keys), 1)
|
||||
self.assertEqual(len(secret_keys), 1)
|
||||
|
||||
def test_encryption(self):
|
||||
"""Test encryption of a message string"""
|
||||
|
@ -776,6 +908,75 @@ authentication."""
|
|||
log.debug("Encrypted: %s" % encrypted)
|
||||
self.assertNotEquals(message, encrypted)
|
||||
|
||||
def _encryption_test_setup(self):
|
||||
passphrase = "craiggentry"
|
||||
key = self.generate_key("Craig Gentry", "xorr.ox", passphrase=passphrase)
|
||||
fpr = str(key.fingerprint)
|
||||
gentry = self.gpg.export_keys(key.fingerprint)
|
||||
self.gpg.import_keys(gentry)
|
||||
message = """
|
||||
In 2010 Riggio and Sicari presented a practical application of homomorphic
|
||||
encryption to a hybrid wireless sensor/mesh network. The system enables
|
||||
transparent multi-hop wireless backhauls that are able to perform statistical
|
||||
analysis of different kinds of data (temperature, humidity, etc.) coming from
|
||||
a WSN while ensuring both end-to-end encryption and hop-by-hop
|
||||
authentication."""
|
||||
return (message, fpr, passphrase)
|
||||
|
||||
def _encryption_test(self, stream_type, message, fingerprint, passphrase):
|
||||
stream = stream_type(message)
|
||||
encrypted = self.gpg.encrypt(stream, fingerprint).data
|
||||
decrypted = self.gpg.decrypt(encrypted, passphrase=passphrase).data
|
||||
|
||||
if isinstance(decrypted, bytes):
|
||||
decrypted = decrypted.decode()
|
||||
if isinstance(message, bytes):
|
||||
message = message.decode()
|
||||
|
||||
self.assertEqual(message, decrypted)
|
||||
|
||||
def test_encryption_of_file_like_objects_io_StringIO(self):
|
||||
"""Test encryption of file-like object io.StringIO."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
try:
|
||||
from io import StringIO
|
||||
if _util._py3k:
|
||||
self._encryption_test(StringIO, message, fpr, passphrase)
|
||||
else:
|
||||
self._encryption_test(StringIO, unicode(message), fpr, passphrase)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def test_encryption_of_file_like_objects_io_BytesIO(self):
|
||||
"""Test encryption of file-like object io.BytesIO."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
try:
|
||||
from io import BytesIO
|
||||
if _util._py3k:
|
||||
self._encryption_test(BytesIO, bytes(message, 'utf-8'), fpr, passphrase)
|
||||
else:
|
||||
self._encryption_test(BytesIO, message, fpr, passphrase)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def test_encryption_of_file_like_objects_StringIO_StringIO(self):
|
||||
"""Test encryption of file-like object StringIO.StringIO (Python2 only)."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
if not _util._py3k:
|
||||
from StringIO import StringIO
|
||||
self._encryption_test(StringIO, message, fpr, passphrase)
|
||||
|
||||
def test_encryption_of_file_like_objects_cStringIO_StringIO(self):
|
||||
"""Test encryption of file-like object cStringIO.StringIO (Python2 only)."""
|
||||
message, fpr, passphrase = self._encryption_test_setup()
|
||||
|
||||
if not _util._py3k:
|
||||
from cStringIO import StringIO
|
||||
self._encryption_test(StringIO, message, fpr, passphrase)
|
||||
|
||||
def test_encryption_alt_encoding(self):
|
||||
"""Test encryption with latin-1 encoding"""
|
||||
key = self.generate_key("Craig Gentry", "xorr.ox",
|
||||
|
@ -784,11 +985,7 @@ authentication."""
|
|||
key = self.generate_key("Marten van Dijk", "xorr.ox")
|
||||
dijk = str(key.fingerprint)
|
||||
self.gpg._encoding = 'latin-1'
|
||||
if _util._py3k:
|
||||
data = 'Hello, André!'
|
||||
else:
|
||||
data = unicode('Hello, André', self.gpg._encoding)
|
||||
data = data.encode(self.gpg._encoding)
|
||||
data = u'Hello, André!'.encode(self.gpg._encoding)
|
||||
encrypted = self.gpg.encrypt(data, gentry)
|
||||
edata = str(encrypted.data)
|
||||
self.assertNotEqual(data, edata)
|
||||
|
@ -885,6 +1082,104 @@ authentication."""
|
|||
|
||||
self.assertEqual(message, decrypted)
|
||||
|
||||
def test_decryption_with_bytes_literal(self):
|
||||
"""Test that ``decrypt(encrypt(b'foo'), ...)`` is successful."""
|
||||
with open(os.path.join(_files, 'kat.sec')) as katsec:
|
||||
self.gpg.import_keys(katsec.read())
|
||||
kat = self.gpg.list_keys('kat')[0]['fingerprint']
|
||||
|
||||
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
with open(message_filename, 'rb') as f:
|
||||
output = os.path.join(self.gpg.homedir, 'test-decryption-with-bytes-literal.gpg')
|
||||
kwargs = dict(compress_algo='Uncompressed')
|
||||
message = b'Dance like a psycho'
|
||||
encrypted = self.gpg.encrypt(message, kat, **kwargs)
|
||||
self.assertTrue(encrypted.ok)
|
||||
self.assertGreater(len(str(encrypted)), 0)
|
||||
|
||||
decrypted = self.gpg.decrypt(encrypted.data, passphrase='overalls')
|
||||
self.assertTrue(decrypted.ok)
|
||||
self.assertGreater(len(str(decrypted)), 0)
|
||||
# Decode the message so that we can easily compare it with the
|
||||
# decrypted version in both Python2 and Python3:
|
||||
decoded = message.decode(self.gpg._encoding, self.gpg._decode_errors)
|
||||
self.assertEqual(str(decrypted), decoded)
|
||||
|
||||
def test_encryption_one_hidden_recipient_one_not(self):
|
||||
"""Test to ensure hidden recipient isn't detailed in packet info"""
|
||||
|
||||
alice = open(os.path.join(_files, 'test_key_1.pub'))
|
||||
alice_pub = alice.read()
|
||||
alice_public = self.gpg.import_keys(alice_pub)
|
||||
res = alice_public.results[-1:][0]
|
||||
alice_pfpr = str(res['fingerprint'])
|
||||
alice.close()
|
||||
|
||||
bob = open(os.path.join(_files, 'test_key_2.pub'))
|
||||
bob_pub = bob.read()
|
||||
bob_public = self.gpg.import_keys(bob_pub)
|
||||
res = bob_public.results[-1:][0]
|
||||
bob_pfpr = str(res['fingerprint'])
|
||||
bob.close()
|
||||
|
||||
message = """
|
||||
In 2010 Riggio and Sicari presented a practical application of homomorphic
|
||||
encryption to a hybrid wireless sensor/mesh network. The system enables
|
||||
transparent multi-hop wireless backhauls that are able to perform statistical
|
||||
analysis of different kinds of data (temperature, humidity, etc.) coming from
|
||||
a WSN while ensuring both end-to-end encryption and hop-by-hop
|
||||
authentication."""
|
||||
enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, hidden_recipients=[alice_pfpr])
|
||||
encrypted = str(enc)
|
||||
log.debug("keyid = %s"
|
||||
% alice_pfpr)
|
||||
|
||||
self.assertNotEquals(message, encrypted)
|
||||
## We expect Alice's key to be hidden (returned as zero's) and Bob's
|
||||
## key to be there.
|
||||
expected_values = ["0000000000000000", "E0ED97345F2973D6"]
|
||||
self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to)
|
||||
|
||||
def test_encryption_throw_keyids(self):
|
||||
"""Test to ensure throw-keyids=True causes all recipients to be hidden.
|
||||
"""
|
||||
alice = open(os.path.join(_files, 'test_key_1.pub'))
|
||||
alice_pub = alice.read()
|
||||
alice_public = self.gpg.import_keys(alice_pub)
|
||||
res = alice_public.results[-1:][0]
|
||||
alice_pfpr = str(res['fingerprint'])
|
||||
alice.close()
|
||||
|
||||
bob = open(os.path.join(_files, 'test_key_2.pub'))
|
||||
bob_pub = bob.read()
|
||||
bob_public = self.gpg.import_keys(bob_pub)
|
||||
res = bob_public.results[-1:][0]
|
||||
bob_pfpr = str(res['fingerprint'])
|
||||
bob.close()
|
||||
|
||||
message = """
|
||||
Pairing-based cryptography has led to several cryptographic advancements. One
|
||||
of these advancements is more powerful and more efficient non-interactive
|
||||
zero-knowledge proofs. The seminal idea was to hide the values for the
|
||||
evaluation of the pairing in a commitment. Using different commitment schemes,
|
||||
this idea was used to build zero-knowledge proof systems under the sub-group
|
||||
hiding and under the decisional linear assumption. These proof systems prove
|
||||
circuit satisfiability, and thus by the Cook–Levin theorem allow to prove
|
||||
membership for every language in NP. The size of the common reference string
|
||||
and the proofs is relatively small, however transforming a statement into a
|
||||
boolean circuit causes a considerable overhead."""
|
||||
enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, throw_keyids=True)
|
||||
encrypted = str(enc)
|
||||
log.debug("keyid = %s"
|
||||
% alice_pfpr)
|
||||
|
||||
self.assertNotEquals(message, encrypted)
|
||||
## We expect Alice's key to be hidden (returned as zero's) and Bob's
|
||||
## key to be there.
|
||||
expected_values = ["0000000000000000", "0000000000000000"]
|
||||
packets = self.gpg.list_packets(encrypted)
|
||||
self.assertEquals(expected_values, packets.encrypted_to)
|
||||
|
||||
def test_encryption_decryption_multi_recipient(self):
|
||||
"""Test decryption of an encrypted string for multiple users"""
|
||||
|
||||
|
@ -1007,9 +1302,9 @@ know, maybe you shouldn't be doing it in the first place.
|
|||
self.assertTrue(os.path.isfile(output))
|
||||
|
||||
# Check the contents:
|
||||
with open(output) as fh:
|
||||
with open(output, 'rb') as fh:
|
||||
encrypted_message = fh.read()
|
||||
log.debug("Encrypted file contains:\n\n%s\n" % encrypted_message)
|
||||
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
|
||||
|
||||
def test_encryption_to_filehandle(self):
|
||||
"""Test that ``encrypt(..., output=filelikething)`` is successful."""
|
||||
|
@ -1029,9 +1324,45 @@ know, maybe you shouldn't be doing it in the first place.
|
|||
self.assertTrue(os.path.isfile(output))
|
||||
|
||||
# Check the contents:
|
||||
with open(output) as fh:
|
||||
with open(output, 'rb') as fh:
|
||||
encrypted_message = fh.read()
|
||||
log.debug("Encrypted file contains:\n\n%s\n" % encrypted_message)
|
||||
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
|
||||
|
||||
def test_encryption_from_filehandle(self):
|
||||
"""Test that ``encrypt(open('foo'), ...)`` is successful."""
|
||||
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
with open(message_filename, 'rb') as f:
|
||||
output = os.path.join(self.gpg.homedir, 'test-encryption-from-filehandle.gpg')
|
||||
kwargs = dict(passphrase='speedtest',
|
||||
symmetric=True,
|
||||
cipher_algo='AES256',
|
||||
encrypt=False,
|
||||
output=output)
|
||||
encrypted = self.gpg.encrypt(f, None, **kwargs)
|
||||
self.assertTrue(encrypted.ok)
|
||||
self.assertGreater(len(encrypted.data), 0)
|
||||
|
||||
def test_encryption_with_output(self):
|
||||
"""Test that ``encrypt('foo', ..., output='/foo/bar/baz')`` is successful."""
|
||||
message_filename = os.path.join(_files, 'cypherpunk_manifesto')
|
||||
with open (message_filename, 'rb') as f:
|
||||
data = f.read()
|
||||
|
||||
output = os.path.join(self.gpg.homedir, 'test-encryption-with-output.gpg')
|
||||
kwargs = dict(passphrase='speedtest',
|
||||
symmetric=True,
|
||||
cipher_algo='AES256',
|
||||
encrypt=False,
|
||||
output=output)
|
||||
encrypted = self.gpg.encrypt(data, None, **kwargs)
|
||||
self.assertTrue(encrypted.ok)
|
||||
self.assertGreater(len(encrypted.data), 0)
|
||||
self.assertTrue(os.path.isfile(output))
|
||||
|
||||
# Check the contents:
|
||||
with open(output, 'rb') as fh:
|
||||
encrypted_message = fh.read()
|
||||
self.assertTrue(b"-----BEGIN PGP MESSAGE-----" in encrypted_message)
|
||||
|
||||
|
||||
suites = { 'parsers': set(['test_parsers_fix_unsafe',
|
||||
|
@ -1068,25 +1399,41 @@ suites = { 'parsers': set(['test_parsers_fix_unsafe',
|
|||
'test_signature_verification_detached',
|
||||
'test_signature_verification_detached_binary',
|
||||
'test_signature_file',
|
||||
'test_signature_string_passphrase_empty_string',
|
||||
'test_signature_string_passphrase_empty_bytes_literal',
|
||||
'test_signature_string_passphrase_bytes_literal',
|
||||
'test_signature_string_passphrase_None',
|
||||
'test_signature_string_bad_passphrase',
|
||||
'test_signature_string_verification',
|
||||
'test_signature_string_algorithm_encoding']),
|
||||
'crypt': set(['test_encryption',
|
||||
'test_encryption_of_file_like_objects_io_StringIO',
|
||||
'test_encryption_of_file_like_objects_io_BytesIO',
|
||||
'test_encryption_of_file_like_objects_StringIO_StringIO',
|
||||
'test_encryption_of_file_like_objects_cStringIO_StringIO',
|
||||
'test_encryption_alt_encoding',
|
||||
'test_encryption_multi_recipient',
|
||||
'test_encryption_decryption_multi_recipient',
|
||||
'test_encryption_one_hidden_recipient_one_not',
|
||||
'test_encryption_throw_keyids',
|
||||
'test_decryption',
|
||||
'test_decryption_with_bytes_literal',
|
||||
'test_symmetric_encryption_and_decryption',
|
||||
'test_file_encryption_and_decryption',
|
||||
'test_encryption_to_filename',
|
||||
'test_encryption_to_filehandle',]),
|
||||
'test_encryption_to_filehandle',
|
||||
'test_encryption_from_filehandle',
|
||||
'test_encryption_with_output',]),
|
||||
'listkeys': set(['test_list_keys_after_generation']),
|
||||
'keyrings': set(['test_public_keyring',
|
||||
'test_secret_keyring',
|
||||
'test_import_and_export',
|
||||
'test_deletion',
|
||||
'test_import_only',
|
||||
'test_recv_keys_default',]), }
|
||||
'test_deletion_public_key',
|
||||
'test_deletion_secret_key',
|
||||
'test_deletion_subkeys',
|
||||
'test_import_only']),
|
||||
'recvkeys': set(['test_recv_keys_default']),
|
||||
}
|
||||
|
||||
def main(args):
|
||||
if not args.quiet:
|
||||
|
|
|
@ -1,30 +0,0 @@
|
|||
From cfcb84db5452b1fbc801ca85f2f70015660f3132 Mon Sep 17 00:00:00 2001
|
||||
From: Lunar <lunar@anargeek.net>
|
||||
Date: Wed, 6 Mar 2013 18:39:34 +0100
|
||||
Subject: [PATCH] Make _open_subprocess argument more explicit in _handle_io
|
||||
|
||||
The previous way worked out, but was really harder to understand.
|
||||
---
|
||||
gnupg.py | 6 +++++-
|
||||
1 files changed, 5 insertions(+), 1 deletions(-)
|
||||
|
||||
diff --git a/gnupg.py b/gnupg.py
|
||||
index 4a73164..479e6dd 100644
|
||||
--- a/gnupg.py
|
||||
+++ b/gnupg.py
|
||||
@@ -984,7 +984,11 @@ class GPG(object):
|
||||
"""
|
||||
Handle a call to GPG - pass input data, collect output data.
|
||||
"""
|
||||
- p = self._open_subprocess(args, passphrase is not None)
|
||||
+ if passphrase is not None:
|
||||
+ ask_passphrase = True
|
||||
+ else:
|
||||
+ ask_passphrase = False
|
||||
+ p = self._open_subprocess(args, ask_passphrase)
|
||||
if not binary:
|
||||
stdin = codecs.getwriter(self.encoding)(p.stdin)
|
||||
else:
|
||||
--
|
||||
1.7.2.5
|
||||
|
|
@@ -1,23 +0,0 @@
This patches folder is managed by quilt, which is a tool for automatic patch
application and removal. To use quilt with the patches in this directory,
navigate to the top level directory of this repository, and do:

$ quilt setup patches/series

To add an externally created patch (in other words, one created with ```diff
--git``` or ```git diff```), place that .patch or .diff file in this directory,
and do:

$ quilt import patches/<patchfile>

Then, to apply the new patch, do:

$ quilt push

Removing patches from the stack can be done with:

$ quilt pop

Please see the quilt(1) man page for more information on adding and importing new
patches. The debian package maintainer guides also have chapters on quilt
usage.
|
@ -1 +0,0 @@
|
|||
0001-Make-_open_subprocess-argument-more-explicit-in-_han.patch
|
|
@ -1,558 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
"""Bootstrap distribute installation
|
||||
|
||||
If you want to use setuptools in your package's setup.py, just include this
|
||||
file in the same directory with it, and add this to the top of your setup.py::
|
||||
|
||||
from distribute_setup import use_setuptools
|
||||
use_setuptools()
|
||||
|
||||
If you want to require a specific version of setuptools, set a download
|
||||
mirror, or use an alternate download directory, you can do so by supplying
|
||||
the appropriate options to ``use_setuptools()``.
|
||||
|
||||
This file can also be run as a script to install or upgrade setuptools.
|
||||
|
||||
This file was taken from http://nightly.ziade.org/distribute_setup.py
|
||||
on 2013-05-27.
|
||||
"""
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import time
|
||||
import fnmatch
|
||||
import tempfile
|
||||
import tarfile
|
||||
import optparse
|
||||
|
||||
from distutils import log
|
||||
|
||||
try:
|
||||
from site import USER_SITE
|
||||
except ImportError:
|
||||
USER_SITE = None
|
||||
|
||||
try:
|
||||
import subprocess
|
||||
|
||||
def _python_cmd(*args):
|
||||
args = (sys.executable,) + args
|
||||
return subprocess.call(args) == 0
|
||||
|
||||
except ImportError:
|
||||
# will be used for python 2.3
|
||||
def _python_cmd(*args):
|
||||
args = (sys.executable,) + args
|
||||
# quoting arguments if windows
|
||||
if sys.platform == 'win32':
|
||||
def quote(arg):
|
||||
if ' ' in arg:
|
||||
return '"%s"' % arg
|
||||
return arg
|
||||
args = [quote(arg) for arg in args]
|
||||
return os.spawnl(os.P_WAIT, sys.executable, *args) == 0
|
||||
|
||||
DEFAULT_VERSION = "0.6.44"
|
||||
DEFAULT_URL = "http://pypi.python.org/packages/source/d/distribute/"
|
||||
SETUPTOOLS_FAKED_VERSION = "0.6c11"
|
||||
|
||||
SETUPTOOLS_PKG_INFO = """\
|
||||
Metadata-Version: 1.0
|
||||
Name: setuptools
|
||||
Version: %s
|
||||
Summary: xxxx
|
||||
Home-page: xxx
|
||||
Author: xxx
|
||||
Author-email: xxx
|
||||
License: xxx
|
||||
Description: xxx
|
||||
""" % SETUPTOOLS_FAKED_VERSION
|
||||
|
||||
|
||||
def _install(tarball, install_args=()):
|
||||
# extracting the tarball
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
log.warn('Extracting in %s', tmpdir)
|
||||
old_wd = os.getcwd()
|
||||
try:
|
||||
os.chdir(tmpdir)
|
||||
tar = tarfile.open(tarball)
|
||||
_extractall(tar)
|
||||
tar.close()
|
||||
|
||||
# going in the directory
|
||||
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
|
||||
os.chdir(subdir)
|
||||
log.warn('Now working in %s', subdir)
|
||||
|
||||
# installing
|
||||
log.warn('Installing Distribute')
|
||||
if not _python_cmd('setup.py', 'install', *install_args):
|
||||
log.warn('Something went wrong during the installation.')
|
||||
log.warn('See the error message above.')
|
||||
# exitcode will be 2
|
||||
return 2
|
||||
finally:
|
||||
os.chdir(old_wd)
|
||||
shutil.rmtree(tmpdir)
|
||||
|
||||
|
||||
def _build_egg(egg, tarball, to_dir):
|
||||
# extracting the tarball
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
log.warn('Extracting in %s', tmpdir)
|
||||
old_wd = os.getcwd()
|
||||
try:
|
||||
os.chdir(tmpdir)
|
||||
tar = tarfile.open(tarball)
|
||||
_extractall(tar)
|
||||
tar.close()
|
||||
|
||||
# going in the directory
|
||||
subdir = os.path.join(tmpdir, os.listdir(tmpdir)[0])
|
||||
os.chdir(subdir)
|
||||
log.warn('Now working in %s', subdir)
|
||||
|
||||
# building an egg
|
||||
log.warn('Building a Distribute egg in %s', to_dir)
|
||||
_python_cmd('setup.py', '-q', 'bdist_egg', '--dist-dir', to_dir)
|
||||
|
||||
finally:
|
||||
os.chdir(old_wd)
|
||||
shutil.rmtree(tmpdir)
|
||||
# returning the result
|
||||
log.warn(egg)
|
||||
if not os.path.exists(egg):
|
||||
raise IOError('Could not build the egg.')
|
||||
|
||||
|
||||
def _do_download(version, download_base, to_dir, download_delay):
|
||||
egg = os.path.join(to_dir, 'distribute-%s-py%d.%d.egg'
|
||||
% (version, sys.version_info[0], sys.version_info[1]))
|
||||
if not os.path.exists(egg):
|
||||
tarball = download_setuptools(version, download_base,
|
||||
to_dir, download_delay)
|
||||
_build_egg(egg, tarball, to_dir)
|
||||
sys.path.insert(0, egg)
|
||||
import setuptools
|
||||
setuptools.bootstrap_install_from = egg
|
||||
|
||||
|
||||
def use_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
|
||||
to_dir=os.curdir, download_delay=15, no_fake=True):
|
||||
# making sure we use the absolute path
|
||||
to_dir = os.path.abspath(to_dir)
|
||||
was_imported = 'pkg_resources' in sys.modules or \
|
||||
'setuptools' in sys.modules
|
||||
try:
|
||||
try:
|
||||
import pkg_resources
|
||||
|
||||
# Setuptools 0.7b and later is a suitable (and preferable)
|
||||
# substitute for any Distribute version.
|
||||
try:
|
||||
pkg_resources.require("setuptools>=0.7b")
|
||||
return
|
||||
except pkg_resources.DistributionNotFound:
|
||||
pass
|
||||
|
||||
if not hasattr(pkg_resources, '_distribute'):
|
||||
if not no_fake:
|
||||
_fake_setuptools()
|
||||
raise ImportError
|
||||
except ImportError:
|
||||
return _do_download(version, download_base, to_dir, download_delay)
|
||||
try:
|
||||
pkg_resources.require("distribute>=" + version)
|
||||
return
|
||||
except pkg_resources.VersionConflict:
|
||||
e = sys.exc_info()[1]
|
||||
if was_imported:
|
||||
sys.stderr.write(
|
||||
"The required version of distribute (>=%s) is not available,\n"
|
||||
"and can't be installed while this script is running. Please\n"
|
||||
"install a more recent version first, using\n"
|
||||
"'easy_install -U distribute'."
|
||||
"\n\n(Currently using %r)\n" % (version, e.args[0]))
|
||||
sys.exit(2)
|
||||
else:
|
||||
del pkg_resources, sys.modules['pkg_resources'] # reload ok
|
||||
return _do_download(version, download_base, to_dir,
|
||||
download_delay)
|
||||
except pkg_resources.DistributionNotFound:
|
||||
return _do_download(version, download_base, to_dir,
|
||||
download_delay)
|
||||
finally:
|
||||
if not no_fake:
|
||||
_create_fake_setuptools_pkg_info(to_dir)
|
||||
|
||||
|
||||
def download_setuptools(version=DEFAULT_VERSION, download_base=DEFAULT_URL,
|
||||
to_dir=os.curdir, delay=15):
|
||||
"""Download distribute from a specified location and return its filename
|
||||
|
||||
`version` should be a valid distribute version number that is available
|
||||
as an egg for download under the `download_base` URL (which should end
|
||||
with a '/'). `to_dir` is the directory where the egg will be downloaded.
|
||||
`delay` is the number of seconds to pause before an actual download
|
||||
attempt.
|
||||
"""
|
||||
# making sure we use the absolute path
|
||||
to_dir = os.path.abspath(to_dir)
|
||||
try:
|
||||
from urllib.request import urlopen
|
||||
except ImportError:
|
||||
from urllib2 import urlopen
|
||||
tgz_name = "distribute-%s.tar.gz" % version
|
||||
url = download_base + tgz_name
|
||||
saveto = os.path.join(to_dir, tgz_name)
|
||||
src = dst = None
|
||||
if not os.path.exists(saveto): # Avoid repeated downloads
|
||||
try:
|
||||
log.warn("Downloading %s", url)
|
||||
src = urlopen(url)
|
||||
# Read/write all in one block, so we don't create a corrupt file
|
||||
# if the download is interrupted.
|
||||
data = src.read()
|
||||
dst = open(saveto, "wb")
|
||||
dst.write(data)
|
||||
finally:
|
||||
if src:
|
||||
src.close()
|
||||
if dst:
|
||||
dst.close()
|
||||
return os.path.realpath(saveto)
|
||||
|
||||
|
||||
def _no_sandbox(function):
|
||||
def __no_sandbox(*args, **kw):
|
||||
try:
|
||||
from setuptools.sandbox import DirectorySandbox
|
||||
if not hasattr(DirectorySandbox, '_old'):
|
||||
def violation(*args):
|
||||
pass
|
||||
DirectorySandbox._old = DirectorySandbox._violation
|
||||
DirectorySandbox._violation = violation
|
||||
patched = True
|
||||
else:
|
||||
patched = False
|
||||
except ImportError:
|
||||
patched = False
|
||||
|
||||
try:
|
||||
return function(*args, **kw)
|
||||
finally:
|
||||
if patched:
|
||||
DirectorySandbox._violation = DirectorySandbox._old
|
||||
del DirectorySandbox._old
|
||||
|
||||
return __no_sandbox
|
||||
|
||||
|
||||
def _patch_file(path, content):
|
||||
"""Will backup the file then patch it"""
|
||||
f = open(path)
|
||||
existing_content = f.read()
|
||||
f.close()
|
||||
if existing_content == content:
|
||||
# already patched
|
||||
log.warn('Already patched.')
|
||||
return False
|
||||
log.warn('Patching...')
|
||||
_rename_path(path)
|
||||
f = open(path, 'w')
|
||||
try:
|
||||
f.write(content)
|
||||
finally:
|
||||
f.close()
|
||||
return True
|
||||
|
||||
_patch_file = _no_sandbox(_patch_file)
|
||||
|
||||
|
||||
def _same_content(path, content):
|
||||
f = open(path)
|
||||
existing_content = f.read()
|
||||
f.close()
|
||||
return existing_content == content
|
||||
|
||||
|
||||
def _rename_path(path):
|
||||
new_name = path + '.OLD.%s' % time.time()
|
||||
log.warn('Renaming %s to %s', path, new_name)
|
||||
os.rename(path, new_name)
|
||||
return new_name
|
||||
|
||||
|
||||
def _remove_flat_installation(placeholder):
|
||||
if not os.path.isdir(placeholder):
|
||||
log.warn('Unkown installation at %s', placeholder)
|
||||
return False
|
||||
found = False
|
||||
for file in os.listdir(placeholder):
|
||||
if fnmatch.fnmatch(file, 'setuptools*.egg-info'):
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
log.warn('Could not locate setuptools*.egg-info')
|
||||
return
|
||||
|
||||
log.warn('Moving elements out of the way...')
|
||||
pkg_info = os.path.join(placeholder, file)
|
||||
if os.path.isdir(pkg_info):
|
||||
patched = _patch_egg_dir(pkg_info)
|
||||
else:
|
||||
patched = _patch_file(pkg_info, SETUPTOOLS_PKG_INFO)
|
||||
|
||||
if not patched:
|
||||
log.warn('%s already patched.', pkg_info)
|
||||
return False
|
||||
# now let's move the files out of the way
|
||||
for element in ('setuptools', 'pkg_resources.py', 'site.py'):
|
||||
element = os.path.join(placeholder, element)
|
||||
if os.path.exists(element):
|
||||
_rename_path(element)
|
||||
else:
|
||||
log.warn('Could not find the %s element of the '
|
||||
'Setuptools distribution', element)
|
||||
return True
|
||||
|
||||
_remove_flat_installation = _no_sandbox(_remove_flat_installation)
|
||||
|
||||
|
||||
def _after_install(dist):
|
||||
log.warn('After install bootstrap.')
|
||||
placeholder = dist.get_command_obj('install').install_purelib
|
||||
_create_fake_setuptools_pkg_info(placeholder)
|
||||
|
||||
|
||||
def _create_fake_setuptools_pkg_info(placeholder):
|
||||
if not placeholder or not os.path.exists(placeholder):
|
||||
log.warn('Could not find the install location')
|
||||
return
|
||||
pyver = '%s.%s' % (sys.version_info[0], sys.version_info[1])
|
||||
setuptools_file = 'setuptools-%s-py%s.egg-info' % \
|
||||
(SETUPTOOLS_FAKED_VERSION, pyver)
|
||||
pkg_info = os.path.join(placeholder, setuptools_file)
|
||||
if os.path.exists(pkg_info):
|
||||
log.warn('%s already exists', pkg_info)
|
||||
return
|
||||
|
||||
log.warn('Creating %s', pkg_info)
|
||||
try:
|
||||
f = open(pkg_info, 'w')
|
||||
except EnvironmentError:
|
||||
log.warn("Don't have permissions to write %s, skipping", pkg_info)
|
||||
return
|
||||
try:
|
||||
f.write(SETUPTOOLS_PKG_INFO)
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
pth_file = os.path.join(placeholder, 'setuptools.pth')
|
||||
log.warn('Creating %s', pth_file)
|
||||
f = open(pth_file, 'w')
|
||||
try:
|
||||
f.write(os.path.join(os.curdir, setuptools_file))
|
||||
finally:
|
||||
f.close()
|
||||
|
||||
_create_fake_setuptools_pkg_info = _no_sandbox(
|
||||
_create_fake_setuptools_pkg_info
|
||||
)
|
||||
|
||||
|
||||
def _patch_egg_dir(path):
|
||||
# let's check if it's already patched
|
||||
pkg_info = os.path.join(path, 'EGG-INFO', 'PKG-INFO')
|
||||
if os.path.exists(pkg_info):
|
||||
if _same_content(pkg_info, SETUPTOOLS_PKG_INFO):
|
||||
log.warn('%s already patched.', pkg_info)
|
||||
return False
|
||||
_rename_path(path)
|
||||
os.mkdir(path)
|
||||
os.mkdir(os.path.join(path, 'EGG-INFO'))
|
||||
pkg_info = os.path.join(path, 'EGG-INFO', 'PKG-INFO')
|
||||
f = open(pkg_info, 'w')
|
||||
try:
|
||||
f.write(SETUPTOOLS_PKG_INFO)
|
||||
finally:
|
||||
f.close()
|
||||
return True
|
||||
|
||||
_patch_egg_dir = _no_sandbox(_patch_egg_dir)
|
||||
|
||||
|
||||
def _before_install():
|
||||
log.warn('Before install bootstrap.')
|
||||
_fake_setuptools()
|
||||
|
||||
|
||||
def _under_prefix(location):
|
||||
if 'install' not in sys.argv:
|
||||
return True
|
||||
args = sys.argv[sys.argv.index('install') + 1:]
|
||||
for index, arg in enumerate(args):
|
||||
for option in ('--root', '--prefix'):
|
||||
if arg.startswith('%s=' % option):
|
||||
top_dir = arg.split('root=')[-1]
|
||||
return location.startswith(top_dir)
|
||||
elif arg == option:
|
||||
if len(args) > index:
|
||||
top_dir = args[index + 1]
|
||||
return location.startswith(top_dir)
|
||||
if arg == '--user' and USER_SITE is not None:
|
||||
return location.startswith(USER_SITE)
|
||||
return True
|
||||
|
||||
|
||||
def _fake_setuptools():
|
||||
log.warn('Scanning installed packages')
|
||||
try:
|
||||
import pkg_resources
|
||||
except ImportError:
|
||||
# we're cool
|
||||
log.warn('Setuptools or Distribute does not seem to be installed.')
|
||||
return
|
||||
ws = pkg_resources.working_set
|
||||
try:
|
||||
setuptools_dist = ws.find(
|
||||
pkg_resources.Requirement.parse('setuptools', replacement=False)
|
||||
)
|
||||
except TypeError:
|
||||
# old distribute API
|
||||
setuptools_dist = ws.find(
|
||||
pkg_resources.Requirement.parse('setuptools')
|
||||
)
|
||||
|
||||
if setuptools_dist is None:
|
||||
log.warn('No setuptools distribution found')
|
||||
return
|
||||
# detecting if it was already faked
|
||||
setuptools_location = setuptools_dist.location
|
||||
log.warn('Setuptools installation detected at %s', setuptools_location)
|
||||
|
||||
# if --root or --preix was provided, and if
|
||||
# setuptools is not located in them, we don't patch it
|
||||
if not _under_prefix(setuptools_location):
|
||||
log.warn('Not patching, --root or --prefix is installing Distribute'
|
||||
' in another location')
|
||||
return
|
||||
|
||||
# let's see if its an egg
|
||||
if not setuptools_location.endswith('.egg'):
|
||||
log.warn('Non-egg installation')
|
||||
res = _remove_flat_installation(setuptools_location)
|
||||
if not res:
|
||||
return
|
||||
else:
|
||||
log.warn('Egg installation')
|
||||
pkg_info = os.path.join(setuptools_location, 'EGG-INFO', 'PKG-INFO')
|
||||
if (os.path.exists(pkg_info) and
|
||||
_same_content(pkg_info, SETUPTOOLS_PKG_INFO)):
|
||||
log.warn('Already patched.')
|
||||
return
|
||||
log.warn('Patching...')
|
||||
# let's create a fake egg replacing setuptools one
|
||||
res = _patch_egg_dir(setuptools_location)
|
||||
if not res:
|
||||
return
|
||||
log.warn('Patching complete.')
|
||||
_relaunch()
|
||||
|
||||
|
||||
def _relaunch():
|
||||
log.warn('Relaunching...')
|
||||
# we have to relaunch the process
|
||||
# pip marker to avoid a relaunch bug
|
||||
_cmd1 = ['-c', 'install', '--single-version-externally-managed']
|
||||
_cmd2 = ['-c', 'install', '--record']
|
||||
if sys.argv[:3] == _cmd1 or sys.argv[:3] == _cmd2:
|
||||
sys.argv[0] = 'setup.py'
|
||||
args = [sys.executable] + sys.argv
|
||||
sys.exit(subprocess.call(args))
|
||||
|
||||
|
||||
def _extractall(self, path=".", members=None):
|
||||
"""Extract all members from the archive to the current working
|
||||
directory and set owner, modification time and permissions on
|
||||
directories afterwards. `path' specifies a different directory
|
||||
to extract to. `members' is optional and must be a subset of the
|
||||
list returned by getmembers().
|
||||
"""
|
||||
import copy
|
||||
import operator
|
||||
from tarfile import ExtractError
|
||||
directories = []
|
||||
|
||||
if members is None:
|
||||
members = self
|
||||
|
||||
for tarinfo in members:
|
||||
if tarinfo.isdir():
|
||||
# Extract directories with a safe mode.
|
||||
directories.append(tarinfo)
|
||||
tarinfo = copy.copy(tarinfo)
|
||||
tarinfo.mode = 448 # decimal for oct 0700
|
||||
self.extract(tarinfo, path)
|
||||
|
||||
# Reverse sort directories.
|
||||
if sys.version_info < (2, 4):
|
||||
def sorter(dir1, dir2):
|
||||
return cmp(dir1.name, dir2.name)
|
||||
directories.sort(sorter)
|
||||
directories.reverse()
|
||||
else:
|
||||
directories.sort(key=operator.attrgetter('name'), reverse=True)
|
||||
|
||||
# Set correct owner, mtime and filemode on directories.
|
||||
for tarinfo in directories:
|
||||
dirpath = os.path.join(path, tarinfo.name)
|
||||
try:
|
||||
self.chown(tarinfo, dirpath)
|
||||
self.utime(tarinfo, dirpath)
|
||||
self.chmod(tarinfo, dirpath)
|
||||
except ExtractError:
|
||||
e = sys.exc_info()[1]
|
||||
if self.errorlevel > 1:
|
||||
raise
|
||||
else:
|
||||
self._dbg(1, "tarfile: %s" % e)
|
||||
|
||||
|
||||
def _build_install_args(options):
    """
    Build the arguments to 'python setup.py install' on the distribute package
    """
    install_args = []
    if options.user_install:
        if sys.version_info < (2, 6):
            log.warn("--user requires Python 2.6 or later")
            raise SystemExit(1)
        install_args.append('--user')
    return install_args

def _parse_args():
    """
    Parse the command line for options
    """
    parser = optparse.OptionParser()
    parser.add_option(
        '--user', dest='user_install', action='store_true', default=False,
        help='install in user site package (requires Python 2.6 or later)')
    parser.add_option(
        '--download-base', dest='download_base', metavar="URL",
        default=DEFAULT_URL,
        help='alternative URL from where to download the distribute package')
    options, args = parser.parse_args()
    # positional arguments are ignored
    return options

def main(version=DEFAULT_VERSION):
    """Install or upgrade setuptools and EasyInstall"""
    options = _parse_args()
    tarball = download_setuptools(download_base=options.download_base)
    return _install(tarball, _build_install_args(options))

if __name__ == '__main__':
    sys.exit(main())
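For orientation, a bootstrap script of this kind is conventionally imported at the very top of a project's setup.py, before setuptools itself, so that Distribute is fetched on demand. A minimal sketch, assuming the file ships next to setup.py and exposes the usual use_setuptools() entry point defined earlier in the script; the project metadata is a placeholder:

# Minimal sketch of the conventional wiring; the project name and version are
# placeholders, and distribute_setup.py is assumed to sit next to setup.py.
from distribute_setup import use_setuptools
use_setuptools()                      # fetch and install Distribute if it is missing

from setuptools import setup

setup(name='example-project', version='0.0.0')
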
1153  scripts/get-pip.py  (file diff suppressed because it is too large)
@@ -1,55 +0,0 @@
#!/bin/bash
# -*- coding: utf-8 -*-
#
# This file is part of python-gnupg, a Python interface to GnuPG.
# Copyright © 2013 Isis Lovecruft, <isis@leap.se> 0xA3ADB67A2CDB8B35
#           © 2013 Andrej B.
#           © 2013 LEAP Encryption Access Project
#           © 2008-2012 Vinay Sajip
#           © 2005 Steve Traugott
#           © 2004 A.M. Kuchling
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option)
# any later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the included LICENSE file for details.

project=python-gnupg
VENV=$(which virtualenv)
WRPR=$(which virtualenvwrapper.sh)


if ! test -n "$VENV" ; then
    printf "Couldn't find virtualenv. Are you sure it's installed?"
    exit 1
fi

if ! test -n "$WRPR"; then
    printf "Couldn't find virtualenvwrapper. Are you sure it's installed?"
    exit 1
fi

test -r "$WRPR" && . $WRPR
okay=$?

if test "$okay" -eq 0 ; then
    printf "Using %s as WORKON_HOME for the new virtualenv...\n" $PWD
    printf "What should the name of the new virtualenv be? (default: '%s')\n" $project
    read -p"Name for this virtualenv?: " name
    if test -z "$name"; then
        name="$project"
    fi
    printf "Using '$name' as our project's name..."
    printf "Creating virtualenv..."
    mkvirtualenv -a "$PWD" --no-site-packages --unzip-setuptools \
        --distribute --prompt="(gnupg)" "$name"
    exit $?
else
    printf "Something went wrong..."
    printf "Exit code %d from mkvirtualenv." "$okay"
    exit $okay
fi
1937  scripts/pep8.py  (file diff suppressed because it is too large)

27  setup.py

@@ -22,11 +22,19 @@
from __future__ import absolute_import
from __future__ import print_function

import platform
import setuptools
import sys
import os
import versioneer

try:
    import __pypy__
except ImportError:
    _isPyPy = False
else:
    _isPyPy = True


versioneer.versionfile_source = 'gnupg/_version.py'
versioneer.versionfile_build = 'gnupg/_version.py'
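The two versioneer attributes above belong to versioneer's old configure-via-module-attributes style. A hedged sketch of how that style is typically completed and handed to setup(); the tag_prefix and parentdir_prefix values here are assumptions, not taken from this diff:

# Hedged sketch of old-style versioneer wiring; tag_prefix and
# parentdir_prefix values are assumed, not taken from this repository.
import versioneer
from setuptools import setup

versioneer.versionfile_source = 'gnupg/_version.py'
versioneer.versionfile_build = 'gnupg/_version.py'
versioneer.tag_prefix = ''                     # assumed: tags look like "1.3.1"
versioneer.parentdir_prefix = 'python-gnupg-'  # assumed sdist directory prefix

setup(name='python-gnupg',
      version=versioneer.get_version(),        # version string derived from git tags
      cmdclass=versioneer.get_cmdclass())      # adds versioneer's build/version commands
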

@@ -75,6 +83,13 @@ def get_requirements():
    # Required to make `collections.OrderedDict` available on Python<=2.6
    requirements.append('ordereddict==1.1#a0ed854ee442051b249bfad0f638bbec')

    # Don't try to install psutil on PyPy:
    if _isPyPy:
        for line in requirements[:]:
            if line.startswith('psutil'):
                print("Not installing %s on PyPy..." % line)
                requirements.remove(line)

    return requirements, links

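The guard above works because the __pypy__ builtin module can only be imported when running under PyPy. A self-contained sketch of the same detect-and-filter pattern, with a hypothetical requirements list rather than this repository's requirements.txt:

# Self-contained sketch of the PyPy detect-and-filter pattern used above;
# the requirements list is hypothetical.
try:
    import __pypy__              # importable only under the PyPy interpreter
    _isPyPy = True
except ImportError:
    _isPyPy = False

requirements = ['psutil>=1.2.1', 'ordereddict==1.1']
if _isPyPy:
    # drop psutil when building for PyPy, mirroring the hunk above
    requirements = [r for r in requirements if not r.startswith('psutil')]

print(requirements)
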

@@ -89,8 +104,8 @@ This module allows easy access to GnuPG's key management, encryption and \
 signature functionality from Python programs, by interacting with GnuPG \
 through file descriptors. Input arguments are strictly checked and sanitised, \
 and therefore this module should be safe to use in networked applications \
-requiring direct user input. It is intended for use with Python 2.6 or \
-greater.
+requiring direct user input. It is intended for use on Windows, MacOS X, BSD, \
+or Linux, with Python 2.6, Python 2.7, Python 3.3, Python 3.4, or PyPy.
 """,
 license="GPLv3+",

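Since the description text summarises how the module is meant to be used, a hedged sketch of the most basic call pattern follows; the homedir keyword and keyring path are assumptions about this fork's GPG() constructor, not something confirmed by this diff:

# Hedged sketch of basic use of the gnupg module described above; the
# homedir keyword is assumed, and the keyring path is a placeholder.
import gnupg

gpg = gnupg.GPG(homedir='./gnupg-keyring')   # assumed constructor keyword
keys = gpg.list_keys()                       # public keys in that keyring
print("found %d public keys" % len(keys))
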
@@ -119,7 +134,13 @@ greater.
    classifiers=[
        "Development Status :: 5 - Production/Stable",
        "Intended Audience :: Developers",
        "Intended Audience :: System Administrators",
        "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
        "Operating System :: Android",
        "Operating System :: MacOS :: MacOS X",
        "Operating System :: Microsoft :: Windows",
        "Operating System :: POSIX :: BSD",
        "Operating System :: POSIX :: Linux",
        "Programming Language :: Python",
        "Programming Language :: Python :: 2",
        "Programming Language :: Python :: 3",

@@ -127,6 +148,8 @@ greater.
        "Programming Language :: Python :: 2.7",
        "Programming Language :: Python :: 3.3",
        "Programming Language :: Python :: 3.4",
        "Programming Language :: Python :: Implementation :: CPython",
        "Programming Language :: Python :: Implementation :: PyPy",
        "Topic :: Security :: Cryptography",
        "Topic :: Software Development :: Libraries :: Python Modules",
        "Topic :: Utilities",]