Merge branch 'release/1.3.3'

master
Isis Lovecruft 2014-11-27 02:58:47 +00:00
commit 613e84cd56
No known key found for this signature in database
GPG Key ID: 18C16EC5F9F1D673
7 changed files with 384 additions and 93 deletions

View File

@ -75,19 +75,40 @@ class GPGMeta(type):
instance containing the gpg-agent process' information to instance containing the gpg-agent process' information to
``cls._agent_proc``. ``cls._agent_proc``.
For Unix systems, we check that the effective UID of this
``python-gnupg`` process is also the owner of the gpg-agent
process. For Windows, we check that the usernames of the owners are
the same. (Sorry Windows users; maybe you should switch to anything
else.)
:returns: True if there exists a gpg-agent process running under the :returns: True if there exists a gpg-agent process running under the
same effective user ID as that of this program. Otherwise, same effective user ID as that of this program. Otherwise,
returns None. returns False.
""" """
identity = psutil.Process(os.getpid()).uids this_process = psutil.Process(os.getpid())
ownership_match = False
if _util._running_windows:
identity = this_process.username()
else:
identity = this_process.uids
for proc in psutil.process_iter(): for proc in psutil.process_iter():
if (proc.name == "gpg-agent") and proc.is_running: if (proc.name == "gpg-agent") and proc.is_running:
log.debug("Found gpg-agent process with pid %d" % proc.pid) log.debug("Found gpg-agent process with pid %d" % proc.pid)
if proc.uids == identity: if _util._running_windows:
log.debug( if proc.username() == identity:
"Effective UIDs of this process and gpg-agent match") ownership_match = True
setattr(cls, '_agent_proc', proc) else:
return True if proc.uids == identity:
ownership_match = True
if ownership_match:
log.debug("Effective UIDs of this process and gpg-agent match")
setattr(cls, '_agent_proc', proc)
return True
return False
class GPGBase(object): class GPGBase(object):
@ -159,6 +180,14 @@ class GPGBase(object):
self._filesystemencoding = encodings.normalize_encoding( self._filesystemencoding = encodings.normalize_encoding(
sys.getfilesystemencoding().lower()) sys.getfilesystemencoding().lower())
# Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49
#
# During `line = stream.readline()` in `_read_response()`, the Python
# codecs module will choke on Unicode data, so we globally monkeypatch
# the "strict" error handler to use the builtin `replace_errors`
# handler:
codecs.register_error('strict', codecs.replace_errors)
self._keyserver = 'hkp://wwwkeys.pgp.net' self._keyserver = 'hkp://wwwkeys.pgp.net'
self.__generated_keys = os.path.join(self.homedir, 'generated-keys') self.__generated_keys = os.path.join(self.homedir, 'generated-keys')
@ -492,11 +521,9 @@ class GPGBase(object):
if self.use_agent: cmd.append('--use-agent') if self.use_agent: cmd.append('--use-agent')
else: cmd.append('--no-use-agent') else: cmd.append('--no-use-agent')
if self.options: # The arguments for debugging and verbosity should be placed into the
[cmd.append(opt) for opt in iter(_sanitise_list(self.options))] # cmd list before the options/args in order to resolve Issue #76:
if args: # https://github.com/isislovecruft/python-gnupg/issues/76
[cmd.append(arg) for arg in iter(_sanitise_list(args))]
if self.verbose: if self.verbose:
cmd.append('--debug-all') cmd.append('--debug-all')
@ -509,6 +536,11 @@ class GPGBase(object):
else: else:
cmd.append('--debug-level %s' % self.verbose) cmd.append('--debug-level %s' % self.verbose)
if self.options:
[cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
if args:
[cmd.append(arg) for arg in iter(_sanitise_list(args))]
return cmd return cmd
def _open_subprocess(self, args=None, passphrase=False): def _open_subprocess(self, args=None, passphrase=False):
@ -772,6 +804,8 @@ class GPGBase(object):
symmetric=False, symmetric=False,
always_trust=True, always_trust=True,
output=None, output=None,
throw_keyids=False,
hidden_recipients=None,
cipher_algo='AES256', cipher_algo='AES256',
digest_algo='SHA512', digest_algo='SHA512',
compress_algo='ZLIB'): compress_algo='ZLIB'):
@ -844,6 +878,14 @@ class GPGBase(object):
>>> decrypted >>> decrypted
'The crow flies at midnight.' 'The crow flies at midnight.'
:param bool throw_keyids: If True, make all **recipients** keyids be
zero'd out in packet information. This is the same as using
**hidden_recipients** for all **recipients**. (Default: False).
:param list hidden_recipients: A list of recipients that should have
their keyids zero'd out in packet information.
:param str cipher_algo: The cipher algorithm to use. To see available :param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do: algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config :command:`$ gpg --with-colons --list-config
@ -895,6 +937,7 @@ class GPGBase(object):
## is decryptable with a passphrase or secretkey. ## is decryptable with a passphrase or secretkey.
if symmetric: args.append('--symmetric') if symmetric: args.append('--symmetric')
if encrypt: args.append('--encrypt') if encrypt: args.append('--encrypt')
if throw_keyids: args.append('--throw-keyids')
if len(recipients) >= 1: if len(recipients) >= 1:
log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" log.debug("GPG.encrypt() called for recipients '%s' with type '%s'"
@ -910,21 +953,27 @@ class GPGBase(object):
log.info("Can't accept recipient string: %s" log.info("Can't accept recipient string: %s"
% recp) % recp)
else: else:
args.append('--recipient %s' % str(recp)) self._add_recipient_string(args, hidden_recipients, str(recp))
continue continue
## will give unicode in 2.x as '\uXXXX\uXXXX' ## will give unicode in 2.x as '\uXXXX\uXXXX'
args.append('--recipient %r' % recp) if isinstance(hidden_recipients, (list, tuple)):
if [s for s in hidden_recipients if recp in str(s)]:
args.append('--hidden-recipient %r' % recp)
else:
args.append('--recipient %r' % recp)
else:
args.append('--recipient %r' % recp)
continue continue
if isinstance(recp, str): if isinstance(recp, str):
args.append('--recipient %s' % recp) self._add_recipient_string(args, hidden_recipients, recp)
elif (not _util._py3k) and isinstance(recp, basestring): elif (not _util._py3k) and isinstance(recp, basestring):
for recp in recipients.split('\x20'): for recp in recipients.split('\x20'):
args.append('--recipient %s' % recp) self._add_recipient_string(args, hidden_recipients, recp)
elif _util._py3k and isinstance(recp, str): elif _util._py3k and isinstance(recp, str):
for recp in recipients.split(' '): for recp in recipients.split(' '):
args.append('--recipient %s' % recp) self._add_recipient_string(args, hidden_recipients, recp)
## ...and now that we've proven py3k is better... ## ...and now that we've proven py3k is better...
else: else:
@ -946,3 +995,12 @@ class GPGBase(object):
log.info("Encrypted output written successfully.") log.info("Encrypted output written successfully.")
return result return result
def _add_recipient_string(self, args, hidden_recipients, recipient):
if isinstance(hidden_recipients, (list, tuple)):
if [s for s in hidden_recipients if recipient in str(s)]:
args.append('--hidden-recipient %s' % recipient)
else:
args.append('--recipient %s' % recipient)
else:
args.append('--recipient %s' % recipient)

View File

@ -959,7 +959,6 @@ class ListKeys(list):
| crs = X.509 certificate and private key available | crs = X.509 certificate and private key available
| ssb = secret subkey (secondary key) | ssb = secret subkey (secondary key)
| uat = user attribute (same as user id except for field 10). | uat = user attribute (same as user id except for field 10).
| sig = signature
| rev = revocation signature | rev = revocation signature
| pkd = public key data (special field format, see below) | pkd = public key data (special field format, see below)
| grp = reserved for gpgsm | grp = reserved for gpgsm
@ -970,8 +969,10 @@ class ListKeys(list):
super(ListKeys, self).__init__() super(ListKeys, self).__init__()
self._gpg = gpg self._gpg = gpg
self.curkey = None self.curkey = None
self.curuid = None
self.fingerprints = [] self.fingerprints = []
self.uids = [] self.uids = []
self.sigs = {}
def key(self, args): def key(self, args):
vars = (""" vars = ("""
@ -981,8 +982,12 @@ class ListKeys(list):
for i in range(len(vars)): for i in range(len(vars)):
self.curkey[vars[i]] = args[i] self.curkey[vars[i]] = args[i]
self.curkey['uids'] = [] self.curkey['uids'] = []
self.curkey['sigs'] = {}
if self.curkey['uid']: if self.curkey['uid']:
self.curkey['uids'].append(self.curkey['uid']) self.curuid = self.curkey['uid']
self.curkey['uids'].append(self.curuid)
self.sigs[self.curuid] = set()
self.curkey['sigs'][self.curuid] = []
del self.curkey['uid'] del self.curkey['uid']
self.curkey['subkeys'] = [] self.curkey['subkeys'] = []
self.append(self.curkey) self.append(self.curkey)
@ -997,8 +1002,21 @@ class ListKeys(list):
uid = args[9] uid = args[9]
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
self.curkey['uids'].append(uid) self.curkey['uids'].append(uid)
self.curuid = uid
self.curkey['sigs'][uid] = []
self.sigs[uid] = set()
self.uids.append(uid) self.uids.append(uid)
def sig(self, args):
vars = ("""
type trust length algo keyid date expires dummy ownertrust uid
""").split()
sig = {}
for i in range(len(vars)):
sig[vars[i]] = args[i]
self.curkey['sigs'][self.curuid].append(sig)
self.sigs[self.curuid].add(sig['keyid'])
def sub(self, args): def sub(self, args):
subkey = [args[4], args[11]] subkey = [args[4], args[11]]
self.curkey['subkeys'].append(subkey) self.curkey['subkeys'].append(subkey)
@ -1008,42 +1026,52 @@ class ListKeys(list):
class ImportResult(object): class ImportResult(object):
"""Parse GnuPG status messages for key import operations. """Parse GnuPG status messages for key import operations."""
:type gpg: :class:`gnupg.GPG`
:param gpg: An instance of :class:`gnupg.GPG`.
"""
_ok_reason = {'0': 'Not actually changed',
'1': 'Entirely new key',
'2': 'New user IDs',
'4': 'New signatures',
'8': 'New subkeys',
'16': 'Contains private key',
'17': 'Contains private key',}
_problem_reason = { '0': 'No specific reason given',
'1': 'Invalid Certificate',
'2': 'Issuer Certificate missing',
'3': 'Certificate Chain too long',
'4': 'Error storing certificate', }
_fields = '''count no_user_id imported imported_rsa unchanged
n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
not_imported'''.split()
_counts = OrderedDict(
zip(_fields, [int(0) for x in range(len(_fields))]) )
#: A list of strings containing the fingerprints of the GnuPG keyIDs
#: imported.
fingerprints = list()
#: A list containing dictionaries with information gathered on keys
#: imported.
results = list()
def __init__(self, gpg): def __init__(self, gpg):
"""Start parsing the results of a key import operation.
:type gpg: :class:`gnupg.GPG`
:param gpg: An instance of :class:`gnupg.GPG`.
"""
self._gpg = gpg self._gpg = gpg
self.counts = self._counts
#: A map from GnuPG codes shown with the ``IMPORT_OK`` status message
#: to their human-meaningful English equivalents.
self._ok_reason = {'0': 'Not actually changed',
'1': 'Entirely new key',
'2': 'New user IDs',
'4': 'New signatures',
'8': 'New subkeys',
'16': 'Contains private key',
'17': 'Contains private key',}
#: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status
#: message to their human-meaningful English equivalents.
self._problem_reason = { '0': 'No specific reason given',
'1': 'Invalid Certificate',
'2': 'Issuer Certificate missing',
'3': 'Certificate Chain too long',
'4': 'Error storing certificate', }
#: All the possible status messages pertaining to actions taken while
#: importing a key.
self._fields = '''count no_user_id imported imported_rsa unchanged
n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups
not_imported'''.split()
#: Counts of all the status message results, :data:`_fields` which
#: have appeared.
self.counts = OrderedDict(
zip(self._fields, [int(0) for x in range(len(self._fields))]))
#: A list of strings containing the fingerprints of the GnuPG keyIDs
#: imported.
self.fingerprints = list()
#: A list containing dictionaries with information gathered on keys
#: imported.
self.results = list()
def __nonzero__(self): def __nonzero__(self):
"""Override the determination for truthfulness evaluation. """Override the determination for truthfulness evaluation.
@ -1059,7 +1087,7 @@ class ImportResult(object):
def _handle_status(self, key, value): def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process. """Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown. :raises ValueError: if the status message is unknown.
""" """
if key == "IMPORTED": if key == "IMPORTED":
# this duplicates info we already see in import_ok & import_problem # this duplicates info we already see in import_ok & import_problem
@ -1192,6 +1220,37 @@ class Verify(object):
self.trust_level = None self.trust_level = None
#: The string corresponding to the ``trust_level`` number. #: The string corresponding to the ``trust_level`` number.
self.trust_text = None self.trust_text = None
#: The subpackets. These are stored as a dictionary, in the following
#: form:
#: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS,
#: 'length': LENGTH,
#: 'data': DATA},
#: 'ANOTHER_SUBPACKET_NUMBER': {...}}
self.subpackets = {}
#: The signature or key notations. These are also stored as a
#: dictionary, in the following form:
#:
#: Verify.notations = {NOTATION_NAME: NOTATION_DATA}
#:
#: For example, the Bitcoin core developer, Peter Todd, encodes in
#: every signature the header of the latest block on the Bitcoin
#: blockchain (to prove that a GnuPG signature that Peter made was made
#: *after* a specific point in time). These look like:
#:
#: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a
#:
#: Which python-gnupg would store as:
#:
#: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a'
self.notations = {}
#: This will be a str or None. If not None, it is the last
#: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're
#: not assured that a ``NOTATION_DATA`` status will arrive *immediately*
#: after its corresponding ``NOTATION_NAME``, we store the latest
#: ``NOTATION_NAME`` here until we get its corresponding
#: ``NOTATION_DATA``.
self._last_notation_name = None
def __nonzero__(self): def __nonzero__(self):
"""Override the determination for truthfulness evaluation. """Override the determination for truthfulness evaluation.
@ -1212,7 +1271,7 @@ class Verify(object):
self.trust_level = self.TRUST_LEVELS[key] self.trust_level = self.TRUST_LEVELS[key]
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
"DECRYPTION_OKAY", "INV_SGNR"): "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS"):
pass pass
elif key == "BADSIG": elif key == "BADSIG":
self.valid = False self.valid = False
@ -1290,6 +1349,65 @@ class Verify(object):
# status message): # status message):
elif key in ("KEYREVOKED"): elif key in ("KEYREVOKED"):
self.status = '\n'.join([self.status, "key revoked"]) self.status = '\n'.join([self.status, "key revoked"])
# SIG_SUBPACKET <type> <flags> <len> <data>
# This indicates that a signature subpacket was seen. The format is
# the same as the "spk" record above.
#
# [...]
#
# SPK - Signature subpacket records
#
# - Field 2 :: Subpacket number as per RFC-4880 and later.
# - Field 3 :: Flags in hex. Currently the only two bits assigned
# are 1, to indicate that the subpacket came from the
# hashed part of the signature, and 2, to indicate the
# subpacket was marked critical.
# - Field 4 :: Length of the subpacket. Note that this is the
# length of the subpacket, and not the length of field
# 5 below. Due to the need for %-encoding, the length
# of field 5 may be up to 3x this value.
# - Field 5 :: The subpacket data. Printable ASCII is shown as
# ASCII, but other values are rendered as %XX where XX
# is the hex value for the byte.
elif key in ("SIG_SUBPACKET"):
fields = value.split()
try:
subpacket_number = fields[0]
self.subpackets[subpacket_number] = {'flags': None,
'length': None,
'data': None}
except IndexError:
# We couldn't parse the subpacket type (an RFC4880
# identifier), so we shouldn't continue parsing.
pass
else:
# Pull as much data as we can parse out of the subpacket:
try:
self.subpackets[subpacket_number]['flags'] = fields[1]
self.subpackets[subpacket_number]['length'] = fields[2]
self.subpackets[subpacket_number]['data'] = fields[3]
except IndexError:
pass
# NOTATION_
# There are actually two related status codes to convey notation
# data:
#
# - NOTATION_NAME <name>
# - NOTATION_DATA <string>
#
# <name> and <string> are %XX escaped; the data may be split among
# several NOTATION_DATA lines.
elif key.startswith("NOTATION_"):
if key.endswith("NAME"):
self.notations[value] = str()
self._last_notation_name = value
elif key.endswith("DATA"):
if self._last_notation_name is not None:
# Append the NOTATION_DATA to any previous data we
# received for that NOTATION_NAME:
self.notations[self._last_notation_name] += value
else:
pass
else: else:
raise ValueError("Unknown status message: %r" % key) raise ValueError("Unknown status message: %r" % key)
@ -1394,6 +1512,12 @@ class ListPackets(object):
self.need_passphrase_sym = None self.need_passphrase_sym = None
#: The keyid and uid which this data is encrypted to. #: The keyid and uid which this data is encrypted to.
self.userid_hint = None self.userid_hint = None
#: The first key that we detected that a message was encrypted
#: to. This is provided for backwards compatibility. As of Issue #77_,
#: the ``encrypted_to`` attribute should be used instead.
self.key = None
#: A list of keyid's that the message has been encrypted to.
self.encrypted_to = []
def _handle_status(self, key, value): def _handle_status(self, key, value):
"""Parse a status code from the attached GnuPG process. """Parse a status code from the attached GnuPG process.
@ -1403,9 +1527,10 @@ class ListPackets(object):
if key == 'NODATA': if key == 'NODATA':
self.status = nodata(value) self.status = nodata(value)
elif key == 'ENC_TO': elif key == 'ENC_TO':
# This will only capture keys in our keyring. In the future we key, _, _ = value.split()
# may want to include multiple unknown keys in this list. if not self.key:
self.key, _, _ = value.split() self.key = key
self.encrypted_to.append(key)
elif key == 'NEED_PASSPHRASE': elif key == 'NEED_PASSPHRASE':
self.need_passphrase = True self.need_passphrase = True
elif key == 'NEED_PASSPHRASE_SYM': elif key == 'NEED_PASSPHRASE_SYM':

View File

@ -57,7 +57,7 @@ def export_ownertrust(cls, trustdb=None):
except (OSError, IOError) as err: except (OSError, IOError) as err:
log.debug(str(err)) log.debug(str(err))
export_proc = cls._open_subprocess('--export-ownertrust') export_proc = cls._open_subprocess(['--export-ownertrust'])
tdb = open(trustdb, 'wb') tdb = open(trustdb, 'wb')
_util._threaded_copy_data(export_proc.stdout, tdb) _util._threaded_copy_data(export_proc.stdout, tdb)
@ -71,7 +71,7 @@ def import_ownertrust(self, trustdb=None):
if trustdb is None: if trustdb is None:
trustdb = os.path.join(cls.homedir, 'trustdb.gpg') trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
import_proc = cls._open_subprocess('--import-ownertrust') import_proc = cls._open_subprocess(['--import-ownertrust'])
tdb = open(trustdb, 'rb') tdb = open(trustdb, 'rb')
_util._threaded_copy_data(tdb, import_proc.stdin) _util._threaded_copy_data(tdb, import_proc.stdin)
@ -98,6 +98,6 @@ def fix_trustdb(cls, trustdb=None):
""" """
if trustdb is None: if trustdb is None:
trustdb = os.path.join(cls.homedir, 'trustdb.gpg') trustdb = os.path.join(cls.homedir, 'trustdb.gpg')
export_proc = cls._open_subprocess('--export-ownertrust') export_proc = cls._open_subprocess(['--export-ownertrust'])
import_proc = cls._open_subprocess('--import-ownertrust') import_proc = cls._open_subprocess(['--import-ownertrust'])
_util._threaded_copy_data(export_proc.stdout, import_proc.stdin) _util._threaded_copy_data(export_proc.stdout, import_proc.stdin)

View File

@ -56,6 +56,9 @@ try:
except NameError: except NameError:
_py3k = True _py3k = True
_running_windows = False
if "win" in sys.platform:
_running_windows = True
## Directory shortcuts: ## Directory shortcuts:
## we don't want to use this one because it writes to the install dir: ## we don't want to use this one because it writes to the install dir:
@ -63,6 +66,20 @@ except NameError:
_here = os.path.join(os.getcwd(), 'gnupg') ## current dir _here = os.path.join(os.getcwd(), 'gnupg') ## current dir
_test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp _test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp
_user = os.environ.get('HOME') ## $HOME _user = os.environ.get('HOME') ## $HOME
# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
# environs. https://github.com/isislovecruft/python-gnupg/issues/74
if not _user:
_user = '/tmp/python-gnupg'
try:
os.makedirs(_user)
except (OSError, IOError):
_user = os.getcwd()
# If we can't use $HOME, but we have (or can create) a
# /tmp/python-gnupg/gnupghome directory, then we'll default to using
# that. Otherwise, we'll use the current directory + /gnupghome.
_user = os.path.sep.join([_user, 'gnupghome'])
_ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg _ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg') _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg')
## $HOME/.config/python-gnupg ## $HOME/.config/python-gnupg
@ -277,7 +294,7 @@ def _find_binary(binary=None):
elif os.access(binary, os.X_OK): elif os.access(binary, os.X_OK):
found = binary found = binary
if found is None: if found is None:
try: found = _which('gpg')[0] try: found = _which('gpg', abspath_only=True, disallow_symlinks=True)[0]
except IndexError as ie: except IndexError as ie:
log.error("Could not find binary for 'gpg'.") log.error("Could not find binary for 'gpg'.")
try: found = _which('gpg2')[0] try: found = _which('gpg2')[0]
@ -286,14 +303,7 @@ def _find_binary(binary=None):
if found is None: if found is None:
raise RuntimeError("GnuPG is not installed!") raise RuntimeError("GnuPG is not installed!")
try: return found
assert os.path.isabs(found), "Path to gpg binary not absolute"
assert not os.path.islink(found), "Path to gpg binary is symlink"
assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary"
except (AssertionError, AttributeError) as ae:
log.error(str(ae))
else:
return found
def _has_readwrite(path): def _has_readwrite(path):
""" """
@ -489,7 +499,7 @@ def _utc_epoch():
"""Get the seconds since epoch.""" """Get the seconds since epoch."""
return int(mktime(localtime())) return int(mktime(localtime()))
def _which(executable, flags=os.X_OK): def _which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False):
"""Borrowed from Twisted's :mod:twisted.python.proutils . """Borrowed from Twisted's :mod:twisted.python.proutils .
Search PATH for executable files with the given name. Search PATH for executable files with the given name.
@ -512,6 +522,17 @@ def _which(executable, flags=os.X_OK):
:returns: A list of the full paths to files found, in the order in which :returns: A list of the full paths to files found, in the order in which
they were found. they were found.
""" """
def _can_allow(p):
if not os.access(p, flags):
return False
if abspath_only and not os.path.abspath(p):
log.warn('Ignoring %r (path is not absolute)', p)
return False
if disallow_symlinks and os.path.islink(p):
log.warn('Ignoring %r (path is a symlink)', p)
return False
return True
result = [] result = []
exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep)) exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep))
path = os.environ.get('PATH', None) path = os.environ.get('PATH', None)
@ -519,11 +540,11 @@ def _which(executable, flags=os.X_OK):
return [] return []
for p in os.environ.get('PATH', '').split(os.pathsep): for p in os.environ.get('PATH', '').split(os.pathsep):
p = os.path.join(p, executable) p = os.path.join(p, executable)
if os.access(p, flags): if _can_allow(p):
result.append(p) result.append(p)
for e in exts: for e in exts:
pext = p + e pext = p + e
if os.access(pext, flags): if _can_allow(pext):
result.append(pext) result.append(pext)
return result return result

View File

@ -179,9 +179,9 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False)
return None return None
return {"version": dirname[len(parentdir_prefix):], "full": ""} return {"version": dirname[len(parentdir_prefix):], "full": ""}
tag_prefix = "python-gnupg-" tag_prefix = ""
parentdir_prefix = "python-gnupg-" parentdir_prefix = "gnupg-"
versionfile_source = "src/_version.py" versionfile_source = "gnupg/_version.py"
def get_versions(default={"version": "unknown", "full": ""}, verbose=False): def get_versions(default={"version": "unknown", "full": ""}, verbose=False):
variables = { "refnames": git_refnames, "full": git_full } variables = { "refnames": git_refnames, "full": git_full }

View File

@ -471,19 +471,7 @@ class GPG(GPGBase):
self._collect_output(p, result, stdin=p.stdin) self._collect_output(p, result, stdin=p.stdin)
lines = result.data.decode(self._encoding, lines = result.data.decode(self._encoding,
self._decode_errors).splitlines() self._decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub'.split() self._parse_keys(result)
for line in lines:
if self.verbose:
print(line)
log.debug("%r", line.rstrip())
if not line:
break
L = line.strip().split(':')
if not L:
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
return result return result
def list_packets(self, raw_data): def list_packets(self, raw_data):
@ -504,8 +492,8 @@ class GPG(GPGBase):
>>> assert key.fingerprint >>> assert key.fingerprint
:rtype: dict :rtype: dict
:returns: A dictionary whose keys are the original keyid parameters, :returns: res.sigs is a dictionary whose keys are the uids and whose
and whose values are lists of signatures. values are a set of signature keyids.
""" """
if len(keyids) > self._batch_limit: if len(keyids) > self._batch_limit:
raise ValueError( raise ValueError(
@ -520,8 +508,26 @@ class GPG(GPGBase):
proc = self._open_subprocess(args) proc = self._open_subprocess(args)
result = self._result_map['list'](self) result = self._result_map['list'](self)
self._collect_output(proc, result, stdin=proc.stdin) self._collect_output(proc, result, stdin=proc.stdin)
self._parse_keys(result)
return result return result
def _parse_keys(self, result):
lines = result.data.decode(self._encoding,
self._decode_errors).splitlines()
valid_keywords = 'pub uid sec fpr sub sig'.split()
for line in lines:
if self.verbose:
print(line)
log.debug("%r", line.rstrip())
if not line:
break
L = line.strip().split(':')
if not L:
continue
keyword = L[0]
if keyword in valid_keywords:
getattr(result, keyword)(L)
def gen_key(self, input): def gen_key(self, input):
"""Generate a GnuPG key through batch file key generation. See """Generate a GnuPG key through batch file key generation. See
:meth:`GPG.gen_key_input()` for creating the control input. :meth:`GPG.gen_key_input()` for creating the control input.
@ -924,6 +930,13 @@ generate keys. Please see
'The crow flies at midnight.' 'The crow flies at midnight.'
:param bool throw_keyids: If True, make all **recipients** keyids be
zero'd out in packet information. This is the same as using
**hidden_recipients** for all **recipients**. (Default: False).
:param list hidden_recipients: A list of recipients that should have
their keyids zero'd out in packet information.
:param str cipher_algo: The cipher algorithm to use. To see available :param str cipher_algo: The cipher algorithm to use. To see available
algorithms with your version of GnuPG, do: algorithms with your version of GnuPG, do:
:command:`$ gpg --with-colons --list-config ciphername`. :command:`$ gpg --with-colons --list-config ciphername`.

View File

@ -731,8 +731,6 @@ class GPGTestCase(unittest.TestCase):
sigfd.close() sigfd.close()
self.assertTrue(sigfd.closed, "Sigfile '%s' should be closed" % sigfn) self.assertTrue(sigfd.closed, "Sigfile '%s' should be closed" % sigfn)
with self.assertRaises(UnicodeDecodeError):
print("SIG=%s" % sig)
datafd.seek(0) datafd.seek(0)
verification = self.gpg.verify_file(datafd, sig_file=sigfn) verification = self.gpg.verify_file(datafd, sig_file=sigfn)
@ -887,6 +885,80 @@ authentication."""
self.assertEqual(message, decrypted) self.assertEqual(message, decrypted)
def test_encryption_one_hidden_recipient_one_not(self):
"""Test to ensure hidden recipient isn't detailed in packet info"""
alice = open(os.path.join(_files, 'test_key_1.pub'))
alice_pub = alice.read()
alice_public = self.gpg.import_keys(alice_pub)
res = alice_public.results[-1:][0]
alice_pfpr = str(res['fingerprint'])
alice.close()
bob = open(os.path.join(_files, 'test_key_2.pub'))
bob_pub = bob.read()
bob_public = self.gpg.import_keys(bob_pub)
res = bob_public.results[-1:][0]
bob_pfpr = str(res['fingerprint'])
bob.close()
message = """
In 2010 Riggio and Sicari presented a practical application of homomorphic
encryption to a hybrid wireless sensor/mesh network. The system enables
transparent multi-hop wireless backhauls that are able to perform statistical
analysis of different kinds of data (temperature, humidity, etc.) coming from
a WSN while ensuring both end-to-end encryption and hop-by-hop
authentication."""
enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, hidden_recipients=[alice_pfpr])
encrypted = str(enc)
log.debug("keyid = %s"
% alice_pfpr)
self.assertNotEquals(message, encrypted)
## We expect Alice's key to be hidden (returned as zero's) and Bob's
## key to be there.
expected_values = ["0000000000000000", "E0ED97345F2973D6"]
self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to)
def test_encryption_throw_keyids(self):
"""Test to ensure throw-keyids=True causes all recipients to be hidden.
"""
alice = open(os.path.join(_files, 'test_key_1.pub'))
alice_pub = alice.read()
alice_public = self.gpg.import_keys(alice_pub)
res = alice_public.results[-1:][0]
alice_pfpr = str(res['fingerprint'])
alice.close()
bob = open(os.path.join(_files, 'test_key_2.pub'))
bob_pub = bob.read()
bob_public = self.gpg.import_keys(bob_pub)
res = bob_public.results[-1:][0]
bob_pfpr = str(res['fingerprint'])
bob.close()
message = """
Pairing-based cryptography has led to several cryptographic advancements. One
of these advancements is more powerful and more efficient non-interactive
zero-knowledge proofs. The seminal idea was to hide the values for the
evaluation of the pairing in a commitment. Using different commitment schemes,
this idea was used to build zero-knowledge proof systems under the sub-group
hiding and under the decisional linear assumption. These proof systems prove
circuit satisfiability, and thus by the CookLevin theorem allow to prove
membership for every language in NP. The size of the common reference string
and the proofs is relatively small, however transforming a statement into a
boolean circuit causes a considerable overhead."""
enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, throw_keyids=True)
encrypted = str(enc)
log.debug("keyid = %s"
% alice_pfpr)
self.assertNotEquals(message, encrypted)
## We expect Alice's key to be hidden (returned as zero's) and Bob's
## key to be there.
expected_values = ["0000000000000000", "0000000000000000"]
self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to)
def test_encryption_decryption_multi_recipient(self): def test_encryption_decryption_multi_recipient(self):
"""Test decryption of an encrypted string for multiple users""" """Test decryption of an encrypted string for multiple users"""
@ -1077,6 +1149,8 @@ suites = { 'parsers': set(['test_parsers_fix_unsafe',
'test_encryption_alt_encoding', 'test_encryption_alt_encoding',
'test_encryption_multi_recipient', 'test_encryption_multi_recipient',
'test_encryption_decryption_multi_recipient', 'test_encryption_decryption_multi_recipient',
'test_encryption_one_hidden_recipient_one_not',
'test_encryption_throw_keyids',
'test_decryption', 'test_decryption',
'test_symmetric_encryption_and_decryption', 'test_symmetric_encryption_and_decryption',
'test_file_encryption_and_decryption', 'test_file_encryption_and_decryption',