diff --git a/gnupg/_meta.py b/gnupg/_meta.py index 53d7620..3f7ac5d 100644 --- a/gnupg/_meta.py +++ b/gnupg/_meta.py @@ -75,19 +75,40 @@ class GPGMeta(type): instance containing the gpg-agent process' information to ``cls._agent_proc``. + For Unix systems, we check that the effective UID of this + ``python-gnupg`` process is also the owner of the gpg-agent + process. For Windows, we check that the usernames of the owners are + the same. (Sorry Windows users; maybe you should switch to anything + else.) + :returns: True if there exists a gpg-agent process running under the same effective user ID as that of this program. Otherwise, - returns None. + returns False. """ - identity = psutil.Process(os.getpid()).uids + this_process = psutil.Process(os.getpid()) + ownership_match = False + + if _util._running_windows: + identity = this_process.username() + else: + identity = this_process.uids + for proc in psutil.process_iter(): if (proc.name == "gpg-agent") and proc.is_running: log.debug("Found gpg-agent process with pid %d" % proc.pid) - if proc.uids == identity: - log.debug( - "Effective UIDs of this process and gpg-agent match") - setattr(cls, '_agent_proc', proc) - return True + if _util._running_windows: + if proc.username() == identity: + ownership_match = True + else: + if proc.uids == identity: + ownership_match = True + + if ownership_match: + log.debug("Effective UIDs of this process and gpg-agent match") + setattr(cls, '_agent_proc', proc) + return True + + return False class GPGBase(object): @@ -159,6 +180,14 @@ class GPGBase(object): self._filesystemencoding = encodings.normalize_encoding( sys.getfilesystemencoding().lower()) + # Issue #49: https://github.com/isislovecruft/python-gnupg/issues/49 + # + # During `line = stream.readline()` in `_read_response()`, the Python + # codecs module will choke on Unicode data, so we globally monkeypatch + # the "strict" error handler to use the builtin `replace_errors` + # handler: + codecs.register_error('strict', codecs.replace_errors) + self._keyserver = 'hkp://wwwkeys.pgp.net' self.__generated_keys = os.path.join(self.homedir, 'generated-keys') @@ -492,11 +521,9 @@ class GPGBase(object): if self.use_agent: cmd.append('--use-agent') else: cmd.append('--no-use-agent') - if self.options: - [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] - if args: - [cmd.append(arg) for arg in iter(_sanitise_list(args))] - + # The arguments for debugging and verbosity should be placed into the + # cmd list before the options/args in order to resolve Issue #76: + # https://github.com/isislovecruft/python-gnupg/issues/76 if self.verbose: cmd.append('--debug-all') @@ -509,6 +536,11 @@ class GPGBase(object): else: cmd.append('--debug-level %s' % self.verbose) + if self.options: + [cmd.append(opt) for opt in iter(_sanitise_list(self.options))] + if args: + [cmd.append(arg) for arg in iter(_sanitise_list(args))] + return cmd def _open_subprocess(self, args=None, passphrase=False): @@ -772,6 +804,8 @@ class GPGBase(object): symmetric=False, always_trust=True, output=None, + throw_keyids=False, + hidden_recipients=None, cipher_algo='AES256', digest_algo='SHA512', compress_algo='ZLIB'): @@ -844,6 +878,14 @@ class GPGBase(object): >>> decrypted 'The crow flies at midnight.' + + :param bool throw_keyids: If True, make all **recipients** keyids be + zero'd out in packet information. This is the same as using + **hidden_recipients** for all **recipients**. (Default: False). + + :param list hidden_recipients: A list of recipients that should have + their keyids zero'd out in packet information. + :param str cipher_algo: The cipher algorithm to use. To see available algorithms with your version of GnuPG, do: :command:`$ gpg --with-colons --list-config @@ -895,6 +937,7 @@ class GPGBase(object): ## is decryptable with a passphrase or secretkey. if symmetric: args.append('--symmetric') if encrypt: args.append('--encrypt') + if throw_keyids: args.append('--throw-keyids') if len(recipients) >= 1: log.debug("GPG.encrypt() called for recipients '%s' with type '%s'" @@ -910,21 +953,27 @@ class GPGBase(object): log.info("Can't accept recipient string: %s" % recp) else: - args.append('--recipient %s' % str(recp)) + self._add_recipient_string(args, hidden_recipients, str(recp)) continue ## will give unicode in 2.x as '\uXXXX\uXXXX' - args.append('--recipient %r' % recp) + if isinstance(hidden_recipients, (list, tuple)): + if [s for s in hidden_recipients if recp in str(s)]: + args.append('--hidden-recipient %r' % recp) + else: + args.append('--recipient %r' % recp) + else: + args.append('--recipient %r' % recp) continue if isinstance(recp, str): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) elif (not _util._py3k) and isinstance(recp, basestring): for recp in recipients.split('\x20'): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) elif _util._py3k and isinstance(recp, str): for recp in recipients.split(' '): - args.append('--recipient %s' % recp) + self._add_recipient_string(args, hidden_recipients, recp) ## ...and now that we've proven py3k is better... else: @@ -946,3 +995,12 @@ class GPGBase(object): log.info("Encrypted output written successfully.") return result + + def _add_recipient_string(self, args, hidden_recipients, recipient): + if isinstance(hidden_recipients, (list, tuple)): + if [s for s in hidden_recipients if recipient in str(s)]: + args.append('--hidden-recipient %s' % recipient) + else: + args.append('--recipient %s' % recipient) + else: + args.append('--recipient %s' % recipient) diff --git a/gnupg/_parsers.py b/gnupg/_parsers.py index 3dd4aad..93da10b 100644 --- a/gnupg/_parsers.py +++ b/gnupg/_parsers.py @@ -959,7 +959,6 @@ class ListKeys(list): | crs = X.509 certificate and private key available | ssb = secret subkey (secondary key) | uat = user attribute (same as user id except for field 10). - | sig = signature | rev = revocation signature | pkd = public key data (special field format, see below) | grp = reserved for gpgsm @@ -970,8 +969,10 @@ class ListKeys(list): super(ListKeys, self).__init__() self._gpg = gpg self.curkey = None + self.curuid = None self.fingerprints = [] self.uids = [] + self.sigs = {} def key(self, args): vars = (""" @@ -981,8 +982,12 @@ class ListKeys(list): for i in range(len(vars)): self.curkey[vars[i]] = args[i] self.curkey['uids'] = [] + self.curkey['sigs'] = {} if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) + self.curuid = self.curkey['uid'] + self.curkey['uids'].append(self.curuid) + self.sigs[self.curuid] = set() + self.curkey['sigs'][self.curuid] = [] del self.curkey['uid'] self.curkey['subkeys'] = [] self.append(self.curkey) @@ -997,8 +1002,21 @@ class ListKeys(list): uid = args[9] uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) self.curkey['uids'].append(uid) + self.curuid = uid + self.curkey['sigs'][uid] = [] + self.sigs[uid] = set() self.uids.append(uid) + def sig(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + sig = {} + for i in range(len(vars)): + sig[vars[i]] = args[i] + self.curkey['sigs'][self.curuid].append(sig) + self.sigs[self.curuid].add(sig['keyid']) + def sub(self, args): subkey = [args[4], args[11]] self.curkey['subkeys'].append(subkey) @@ -1008,42 +1026,52 @@ class ListKeys(list): class ImportResult(object): - """Parse GnuPG status messages for key import operations. - - :type gpg: :class:`gnupg.GPG` - :param gpg: An instance of :class:`gnupg.GPG`. - """ - _ok_reason = {'0': 'Not actually changed', - '1': 'Entirely new key', - '2': 'New user IDs', - '4': 'New signatures', - '8': 'New subkeys', - '16': 'Contains private key', - '17': 'Contains private key',} - - _problem_reason = { '0': 'No specific reason given', - '1': 'Invalid Certificate', - '2': 'Issuer Certificate missing', - '3': 'Certificate Chain too long', - '4': 'Error storing certificate', } - - _fields = '''count no_user_id imported imported_rsa unchanged - n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups - not_imported'''.split() - _counts = OrderedDict( - zip(_fields, [int(0) for x in range(len(_fields))]) ) - - #: A list of strings containing the fingerprints of the GnuPG keyIDs - #: imported. - fingerprints = list() - - #: A list containing dictionaries with information gathered on keys - #: imported. - results = list() + """Parse GnuPG status messages for key import operations.""" def __init__(self, gpg): + """Start parsing the results of a key import operation. + + :type gpg: :class:`gnupg.GPG` + :param gpg: An instance of :class:`gnupg.GPG`. + """ self._gpg = gpg - self.counts = self._counts + + #: A map from GnuPG codes shown with the ``IMPORT_OK`` status message + #: to their human-meaningful English equivalents. + self._ok_reason = {'0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + '17': 'Contains private key',} + + #: A map from GnuPG codes shown with the ``IMPORT_PROBLEM`` status + #: message to their human-meaningful English equivalents. + self._problem_reason = { '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', } + + #: All the possible status messages pertaining to actions taken while + #: importing a key. + self._fields = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported sec_dups + not_imported'''.split() + + #: Counts of all the status message results, :data:`_fields` which + #: have appeared. + self.counts = OrderedDict( + zip(self._fields, [int(0) for x in range(len(self._fields))])) + + #: A list of strings containing the fingerprints of the GnuPG keyIDs + #: imported. + self.fingerprints = list() + + #: A list containing dictionaries with information gathered on keys + #: imported. + self.results = list() def __nonzero__(self): """Override the determination for truthfulness evaluation. @@ -1059,7 +1087,7 @@ class ImportResult(object): def _handle_status(self, key, value): """Parse a status code from the attached GnuPG process. - :raises: :exc:`~exceptions.ValueError` if the status message is unknown. + :raises ValueError: if the status message is unknown. """ if key == "IMPORTED": # this duplicates info we already see in import_ok & import_problem @@ -1192,6 +1220,37 @@ class Verify(object): self.trust_level = None #: The string corresponding to the ``trust_level`` number. self.trust_text = None + #: The subpackets. These are stored as a dictionary, in the following + #: form: + #: Verify.subpackets = {'SUBPACKET_NUMBER': {'flags': FLAGS, + #: 'length': LENGTH, + #: 'data': DATA}, + #: 'ANOTHER_SUBPACKET_NUMBER': {...}} + self.subpackets = {} + #: The signature or key notations. These are also stored as a + #: dictionary, in the following form: + #: + #: Verify.notations = {NOTATION_NAME: NOTATION_DATA} + #: + #: For example, the Bitcoin core developer, Peter Todd, encodes in + #: every signature the header of the latest block on the Bitcoin + #: blockchain (to prove that a GnuPG signature that Peter made was made + #: *after* a specific point in time). These look like: + #: + #: gpg: Signature notation: blockhash@bitcoin.org=000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a + #: + #: Which python-gnupg would store as: + #: + #: Verify.notations['blockhash@bitcoin.org'] = '000000000000000006f793d4461ee3e756ff04cc62581c96a42ed67dc233da3a' + self.notations = {} + + #: This will be a str or None. If not None, it is the last + #: ``NOTATION_NAME`` we stored in the ``notations`` dict. Because we're + #: not assured that a ``NOTATION_DATA`` status will arrive *immediately* + #: after its corresponding ``NOTATION_NAME``, we store the latest + #: ``NOTATION_NAME`` here until we get its corresponding + #: ``NOTATION_DATA``. + self._last_notation_name = None def __nonzero__(self): """Override the determination for truthfulness evaluation. @@ -1212,7 +1271,7 @@ class Verify(object): self.trust_level = self.TRUST_LEVELS[key] elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", - "DECRYPTION_OKAY", "INV_SGNR"): + "DECRYPTION_OKAY", "INV_SGNR", "PROGRESS"): pass elif key == "BADSIG": self.valid = False @@ -1290,6 +1349,65 @@ class Verify(object): # status message): elif key in ("KEYREVOKED"): self.status = '\n'.join([self.status, "key revoked"]) + # SIG_SUBPACKET + # This indicates that a signature subpacket was seen. The format is + # the same as the "spk" record above. + # + # [...] + # + # SPK - Signature subpacket records + # + # - Field 2 :: Subpacket number as per RFC-4880 and later. + # - Field 3 :: Flags in hex. Currently the only two bits assigned + # are 1, to indicate that the subpacket came from the + # hashed part of the signature, and 2, to indicate the + # subpacket was marked critical. + # - Field 4 :: Length of the subpacket. Note that this is the + # length of the subpacket, and not the length of field + # 5 below. Due to the need for %-encoding, the length + # of field 5 may be up to 3x this value. + # - Field 5 :: The subpacket data. Printable ASCII is shown as + # ASCII, but other values are rendered as %XX where XX + # is the hex value for the byte. + elif key in ("SIG_SUBPACKET"): + fields = value.split() + try: + subpacket_number = fields[0] + self.subpackets[subpacket_number] = {'flags': None, + 'length': None, + 'data': None} + except IndexError: + # We couldn't parse the subpacket type (an RFC4880 + # identifier), so we shouldn't continue parsing. + pass + else: + # Pull as much data as we can parse out of the subpacket: + try: + self.subpackets[subpacket_number]['flags'] = fields[1] + self.subpackets[subpacket_number]['length'] = fields[2] + self.subpackets[subpacket_number]['data'] = fields[3] + except IndexError: + pass + # NOTATION_ + # There are actually two related status codes to convey notation + # data: + # + # - NOTATION_NAME + # - NOTATION_DATA + # + # and are %XX escaped; the data may be split among + # several NOTATION_DATA lines. + elif key.startswith("NOTATION_"): + if key.endswith("NAME"): + self.notations[value] = str() + self._last_notation_name = value + elif key.endswith("DATA"): + if self._last_notation_name is not None: + # Append the NOTATION_DATA to any previous data we + # received for that NOTATION_NAME: + self.notations[self._last_notation_name] += value + else: + pass else: raise ValueError("Unknown status message: %r" % key) @@ -1394,6 +1512,12 @@ class ListPackets(object): self.need_passphrase_sym = None #: The keyid and uid which this data is encrypted to. self.userid_hint = None + #: The first key that we detected that a message was encrypted + #: to. This is provided for backwards compatibility. As of Issue #77_, + #: the ``encrypted_to`` attribute should be used instead. + self.key = None + #: A list of keyid's that the message has been encrypted to. + self.encrypted_to = [] def _handle_status(self, key, value): """Parse a status code from the attached GnuPG process. @@ -1403,9 +1527,10 @@ class ListPackets(object): if key == 'NODATA': self.status = nodata(value) elif key == 'ENC_TO': - # This will only capture keys in our keyring. In the future we - # may want to include multiple unknown keys in this list. - self.key, _, _ = value.split() + key, _, _ = value.split() + if not self.key: + self.key = key + self.encrypted_to.append(key) elif key == 'NEED_PASSPHRASE': self.need_passphrase = True elif key == 'NEED_PASSPHRASE_SYM': diff --git a/gnupg/_trust.py b/gnupg/_trust.py index 514ae8c..224e7b6 100644 --- a/gnupg/_trust.py +++ b/gnupg/_trust.py @@ -57,7 +57,7 @@ def export_ownertrust(cls, trustdb=None): except (OSError, IOError) as err: log.debug(str(err)) - export_proc = cls._open_subprocess('--export-ownertrust') + export_proc = cls._open_subprocess(['--export-ownertrust']) tdb = open(trustdb, 'wb') _util._threaded_copy_data(export_proc.stdout, tdb) @@ -71,7 +71,7 @@ def import_ownertrust(self, trustdb=None): if trustdb is None: trustdb = os.path.join(cls.homedir, 'trustdb.gpg') - import_proc = cls._open_subprocess('--import-ownertrust') + import_proc = cls._open_subprocess(['--import-ownertrust']) tdb = open(trustdb, 'rb') _util._threaded_copy_data(tdb, import_proc.stdin) @@ -98,6 +98,6 @@ def fix_trustdb(cls, trustdb=None): """ if trustdb is None: trustdb = os.path.join(cls.homedir, 'trustdb.gpg') - export_proc = cls._open_subprocess('--export-ownertrust') - import_proc = cls._open_subprocess('--import-ownertrust') + export_proc = cls._open_subprocess(['--export-ownertrust']) + import_proc = cls._open_subprocess(['--import-ownertrust']) _util._threaded_copy_data(export_proc.stdout, import_proc.stdin) diff --git a/gnupg/_util.py b/gnupg/_util.py index 9f0a5c1..fff8c0c 100644 --- a/gnupg/_util.py +++ b/gnupg/_util.py @@ -56,6 +56,9 @@ try: except NameError: _py3k = True +_running_windows = False +if "win" in sys.platform: + _running_windows = True ## Directory shortcuts: ## we don't want to use this one because it writes to the install dir: @@ -63,6 +66,20 @@ except NameError: _here = os.path.join(os.getcwd(), 'gnupg') ## current dir _test = os.path.join(os.path.join(_here, 'test'), 'tmp') ## ./tests/tmp _user = os.environ.get('HOME') ## $HOME + +# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all +# environs. https://github.com/isislovecruft/python-gnupg/issues/74 +if not _user: + _user = '/tmp/python-gnupg' + try: + os.makedirs(_user) + except (OSError, IOError): + _user = os.getcwd() + # If we can't use $HOME, but we have (or can create) a + # /tmp/python-gnupg/gnupghome directory, then we'll default to using + # that. Otherwise, we'll use the current directory + /gnupghome. + _user = os.path.sep.join([_user, 'gnupghome']) + _ugpg = os.path.join(_user, '.gnupg') ## $HOME/.gnupg _conf = os.path.join(os.path.join(_user, '.config'), 'python-gnupg') ## $HOME/.config/python-gnupg @@ -277,7 +294,7 @@ def _find_binary(binary=None): elif os.access(binary, os.X_OK): found = binary if found is None: - try: found = _which('gpg')[0] + try: found = _which('gpg', abspath_only=True, disallow_symlinks=True)[0] except IndexError as ie: log.error("Could not find binary for 'gpg'.") try: found = _which('gpg2')[0] @@ -286,14 +303,7 @@ def _find_binary(binary=None): if found is None: raise RuntimeError("GnuPG is not installed!") - try: - assert os.path.isabs(found), "Path to gpg binary not absolute" - assert not os.path.islink(found), "Path to gpg binary is symlink" - assert os.access(found, os.X_OK), "Lacking +x perms for gpg binary" - except (AssertionError, AttributeError) as ae: - log.error(str(ae)) - else: - return found + return found def _has_readwrite(path): """ @@ -489,7 +499,7 @@ def _utc_epoch(): """Get the seconds since epoch.""" return int(mktime(localtime())) -def _which(executable, flags=os.X_OK): +def _which(executable, flags=os.X_OK, abspath_only=False, disallow_symlinks=False): """Borrowed from Twisted's :mod:twisted.python.proutils . Search PATH for executable files with the given name. @@ -512,6 +522,17 @@ def _which(executable, flags=os.X_OK): :returns: A list of the full paths to files found, in the order in which they were found. """ + def _can_allow(p): + if not os.access(p, flags): + return False + if abspath_only and not os.path.abspath(p): + log.warn('Ignoring %r (path is not absolute)', p) + return False + if disallow_symlinks and os.path.islink(p): + log.warn('Ignoring %r (path is a symlink)', p) + return False + return True + result = [] exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep)) path = os.environ.get('PATH', None) @@ -519,11 +540,11 @@ def _which(executable, flags=os.X_OK): return [] for p in os.environ.get('PATH', '').split(os.pathsep): p = os.path.join(p, executable) - if os.access(p, flags): + if _can_allow(p): result.append(p) for e in exts: pext = p + e - if os.access(pext, flags): + if _can_allow(pext): result.append(pext) return result diff --git a/gnupg/_version.py b/gnupg/_version.py index 42d41a2..ff8ef94 100644 --- a/gnupg/_version.py +++ b/gnupg/_version.py @@ -179,9 +179,9 @@ def versions_from_parentdir(parentdir_prefix, versionfile_source, verbose=False) return None return {"version": dirname[len(parentdir_prefix):], "full": ""} -tag_prefix = "python-gnupg-" -parentdir_prefix = "python-gnupg-" -versionfile_source = "src/_version.py" +tag_prefix = "" +parentdir_prefix = "gnupg-" +versionfile_source = "gnupg/_version.py" def get_versions(default={"version": "unknown", "full": ""}, verbose=False): variables = { "refnames": git_refnames, "full": git_full } diff --git a/gnupg/gnupg.py b/gnupg/gnupg.py index 42498db..7168017 100644 --- a/gnupg/gnupg.py +++ b/gnupg/gnupg.py @@ -471,19 +471,7 @@ class GPG(GPGBase): self._collect_output(p, result, stdin=p.stdin) lines = result.data.decode(self._encoding, self._decode_errors).splitlines() - valid_keywords = 'pub uid sec fpr sub'.split() - for line in lines: - if self.verbose: - print(line) - log.debug("%r", line.rstrip()) - if not line: - break - L = line.strip().split(':') - if not L: - continue - keyword = L[0] - if keyword in valid_keywords: - getattr(result, keyword)(L) + self._parse_keys(result) return result def list_packets(self, raw_data): @@ -504,8 +492,8 @@ class GPG(GPGBase): >>> assert key.fingerprint :rtype: dict - :returns: A dictionary whose keys are the original keyid parameters, - and whose values are lists of signatures. + :returns: res.sigs is a dictionary whose keys are the uids and whose + values are a set of signature keyids. """ if len(keyids) > self._batch_limit: raise ValueError( @@ -520,8 +508,26 @@ class GPG(GPGBase): proc = self._open_subprocess(args) result = self._result_map['list'](self) self._collect_output(proc, result, stdin=proc.stdin) + self._parse_keys(result) return result + def _parse_keys(self, result): + lines = result.data.decode(self._encoding, + self._decode_errors).splitlines() + valid_keywords = 'pub uid sec fpr sub sig'.split() + for line in lines: + if self.verbose: + print(line) + log.debug("%r", line.rstrip()) + if not line: + break + L = line.strip().split(':') + if not L: + continue + keyword = L[0] + if keyword in valid_keywords: + getattr(result, keyword)(L) + def gen_key(self, input): """Generate a GnuPG key through batch file key generation. See :meth:`GPG.gen_key_input()` for creating the control input. @@ -924,6 +930,13 @@ generate keys. Please see 'The crow flies at midnight.' + :param bool throw_keyids: If True, make all **recipients** keyids be + zero'd out in packet information. This is the same as using + **hidden_recipients** for all **recipients**. (Default: False). + + :param list hidden_recipients: A list of recipients that should have + their keyids zero'd out in packet information. + :param str cipher_algo: The cipher algorithm to use. To see available algorithms with your version of GnuPG, do: :command:`$ gpg --with-colons --list-config ciphername`. diff --git a/gnupg/test/test_gnupg.py b/gnupg/test/test_gnupg.py index 6a352a6..49f5ba5 100755 --- a/gnupg/test/test_gnupg.py +++ b/gnupg/test/test_gnupg.py @@ -731,8 +731,6 @@ class GPGTestCase(unittest.TestCase): sigfd.close() self.assertTrue(sigfd.closed, "Sigfile '%s' should be closed" % sigfn) - with self.assertRaises(UnicodeDecodeError): - print("SIG=%s" % sig) datafd.seek(0) verification = self.gpg.verify_file(datafd, sig_file=sigfn) @@ -887,6 +885,80 @@ authentication.""" self.assertEqual(message, decrypted) + def test_encryption_one_hidden_recipient_one_not(self): + """Test to ensure hidden recipient isn't detailed in packet info""" + + alice = open(os.path.join(_files, 'test_key_1.pub')) + alice_pub = alice.read() + alice_public = self.gpg.import_keys(alice_pub) + res = alice_public.results[-1:][0] + alice_pfpr = str(res['fingerprint']) + alice.close() + + bob = open(os.path.join(_files, 'test_key_2.pub')) + bob_pub = bob.read() + bob_public = self.gpg.import_keys(bob_pub) + res = bob_public.results[-1:][0] + bob_pfpr = str(res['fingerprint']) + bob.close() + + message = """ +In 2010 Riggio and Sicari presented a practical application of homomorphic +encryption to a hybrid wireless sensor/mesh network. The system enables +transparent multi-hop wireless backhauls that are able to perform statistical +analysis of different kinds of data (temperature, humidity, etc.) coming from +a WSN while ensuring both end-to-end encryption and hop-by-hop +authentication.""" + enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, hidden_recipients=[alice_pfpr]) + encrypted = str(enc) + log.debug("keyid = %s" + % alice_pfpr) + + self.assertNotEquals(message, encrypted) + ## We expect Alice's key to be hidden (returned as zero's) and Bob's + ## key to be there. + expected_values = ["0000000000000000", "E0ED97345F2973D6"] + self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to) + + def test_encryption_throw_keyids(self): + """Test to ensure throw-keyids=True causes all recipients to be hidden. + """ + alice = open(os.path.join(_files, 'test_key_1.pub')) + alice_pub = alice.read() + alice_public = self.gpg.import_keys(alice_pub) + res = alice_public.results[-1:][0] + alice_pfpr = str(res['fingerprint']) + alice.close() + + bob = open(os.path.join(_files, 'test_key_2.pub')) + bob_pub = bob.read() + bob_public = self.gpg.import_keys(bob_pub) + res = bob_public.results[-1:][0] + bob_pfpr = str(res['fingerprint']) + bob.close() + + message = """ +Pairing-based cryptography has led to several cryptographic advancements. One +of these advancements is more powerful and more efficient non-interactive +zero-knowledge proofs. The seminal idea was to hide the values for the +evaluation of the pairing in a commitment. Using different commitment schemes, +this idea was used to build zero-knowledge proof systems under the sub-group +hiding and under the decisional linear assumption. These proof systems prove +circuit satisfiability, and thus by the Cook–Levin theorem allow to prove +membership for every language in NP. The size of the common reference string +and the proofs is relatively small, however transforming a statement into a +boolean circuit causes a considerable overhead.""" + enc = self.gpg.encrypt(message, alice_pfpr, bob_pfpr, throw_keyids=True) + encrypted = str(enc) + log.debug("keyid = %s" + % alice_pfpr) + + self.assertNotEquals(message, encrypted) + ## We expect Alice's key to be hidden (returned as zero's) and Bob's + ## key to be there. + expected_values = ["0000000000000000", "0000000000000000"] + self.assertEquals(expected_values, self.gpg.list_packets(encrypted).encrypted_to) + def test_encryption_decryption_multi_recipient(self): """Test decryption of an encrypted string for multiple users""" @@ -1077,6 +1149,8 @@ suites = { 'parsers': set(['test_parsers_fix_unsafe', 'test_encryption_alt_encoding', 'test_encryption_multi_recipient', 'test_encryption_decryption_multi_recipient', + 'test_encryption_one_hidden_recipient_one_not', + 'test_encryption_throw_keyids', 'test_decryption', 'test_symmetric_encryption_and_decryption', 'test_file_encryption_and_decryption',