From 7840360bb4b08b813b34cfe2de0ee017b6159776 Mon Sep 17 00:00:00 2001 From: Isis Lovecruft Date: Tue, 2 Apr 2013 10:27:26 +0000 Subject: [PATCH] General code cleanup. * Move exceptions and global parameters to top of file. * Move global function to top of file before classes and alphabetise them. --- gnupg.py | 850 ++++++++++++++++++++++++++++--------------------------- 1 file changed, 427 insertions(+), 423 deletions(-) diff --git a/gnupg.py b/gnupg.py index 2ce6698..4643c89 100644 --- a/gnupg.py +++ b/gnupg.py @@ -89,10 +89,21 @@ try: except NameError: _py3k = True + +ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I) + logger = logging.getLogger(__name__) if not logger.handlers: logger.addHandler(NullHandler()) + +class ProtectedOption(Exception): + """Raised when the option passed to GPG is disallowed.""" + +class UsageError(Exception): + """Raised when you're Doing It Wrong.""" + + def _copy_data(instream, outstream): """ Copy data from one stream to another. @@ -140,396 +151,6 @@ def _copy_data(instream, outstream): else: logger.debug("closed output, %d bytes sent", sent) -def _threaded_copy_data(instream, outstream): - wr = threading.Thread(target=_copy_data, args=(instream, outstream)) - wr.setDaemon(True) - logger.debug('data copier: %r, %r, %r', wr, instream, outstream) - wr.start() - return wr - -def _write_passphrase(stream, passphrase, encoding): - passphrase = '%s\n' % passphrase - passphrase = passphrase.encode(encoding) - stream.write(passphrase) - logger.debug("Wrote passphrase.") - -def _is_sequence(instance): - return isinstance(instance,list) or isinstance(instance,tuple) - -def _make_binary_stream(s, encoding): - try: - if _py3k: - if isinstance(s, str): - s = s.encode(encoding) - else: - if type(s) is not str: - s = s.encode(encoding) - from io import BytesIO - rv = BytesIO(s) - except ImportError: - rv = StringIO(s) - return rv - -class Verify(object): - "Handle status messages for --verify" - - TRUST_UNDEFINED = 0 - TRUST_NEVER = 1 - TRUST_MARGINAL = 2 - TRUST_FULLY = 3 - TRUST_ULTIMATE = 4 - - TRUST_LEVELS = { - "TRUST_UNDEFINED" : TRUST_UNDEFINED, - "TRUST_NEVER" : TRUST_NEVER, - "TRUST_MARGINAL" : TRUST_MARGINAL, - "TRUST_FULLY" : TRUST_FULLY, - "TRUST_ULTIMATE" : TRUST_ULTIMATE, - } - - def __init__(self, gpg): - self.gpg = gpg - self.valid = False - self.fingerprint = self.creation_date = self.timestamp = None - self.signature_id = self.key_id = None - self.username = None - self.status = None - self.pubkey_fingerprint = None - self.expire_timestamp = None - self.sig_timestamp = None - self.trust_text = None - self.trust_level = None - - def __nonzero__(self): - return self.valid - - __bool__ = __nonzero__ - - def handle_status(self, key, value): - if key in self.TRUST_LEVELS: - self.trust_text = key - self.trust_level = self.TRUST_LEVELS[key] - elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", - "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", - "DECRYPTION_OKAY", "INV_SGNR"): - pass - elif key == "BADSIG": - self.valid = False - self.status = 'signature bad' - self.key_id, self.username = value.split(None, 1) - elif key == "GOODSIG": - self.valid = True - self.status = 'signature good' - self.key_id, self.username = value.split(None, 1) - elif key == "VALIDSIG": - (self.fingerprint, - self.creation_date, - self.sig_timestamp, - self.expire_timestamp) = value.split()[:4] - # may be different if signature is made with a subkey - self.pubkey_fingerprint = value.split()[-1] - self.status = 'signature valid' - elif key == "SIG_ID": - (self.signature_id, - self.creation_date, self.timestamp) = value.split() - elif key == "ERRSIG": - self.valid = False - (self.key_id, - algo, hash_algo, - cls, - self.timestamp) = value.split()[:5] - self.status = 'signature error' - elif key == "DECRYPTION_FAILED": - self.valid = False - self.key_id = value - self.status = 'decryption failed' - elif key == "NO_PUBKEY": - self.valid = False - self.key_id = value - self.status = 'no public key' - elif key in ("KEYEXPIRED", "SIGEXPIRED"): - # these are useless in verify, since they are spit out for any - # pub/subkeys on the key, not just the one doing the signing. - # if we want to check for signatures with expired key, - # the relevant flag is EXPKEYSIG. - pass - elif key in ("EXPKEYSIG", "REVKEYSIG"): - # signed with expired or revoked key - self.valid = False - self.key_id = value.split()[0] - self.status = (('%s %s') % (key[:3], key[3:])).lower() - else: - raise ValueError("Unknown status message: %r" % key) - -class ImportResult(object): - "Handle status messages for --import" - - counts = '''count no_user_id imported imported_rsa unchanged - n_uids n_subk n_sigs n_revoc sec_read sec_imported - sec_dups not_imported'''.split() - def __init__(self, gpg): - self.gpg = gpg - self.imported = [] - self.results = [] - self.fingerprints = [] - for result in self.counts: - setattr(self, result, None) - - def __nonzero__(self): - if self.not_imported: return False - if not self.fingerprints: return False - return True - - __bool__ = __nonzero__ - - ok_reason = { - '0': 'Not actually changed', - '1': 'Entirely new key', - '2': 'New user IDs', - '4': 'New signatures', - '8': 'New subkeys', - '16': 'Contains private key', - } - - problem_reason = { - '0': 'No specific reason given', - '1': 'Invalid Certificate', - '2': 'Issuer Certificate missing', - '3': 'Certificate Chain too long', - '4': 'Error storing certificate', - } - - def handle_status(self, key, value): - if key == "IMPORTED": - # this duplicates info we already see in import_ok & import_problem - pass - elif key == "NODATA": - self.results.append({'fingerprint': None, - 'problem': '0', 'text': 'No valid data found'}) - elif key == "IMPORT_OK": - reason, fingerprint = value.split() - reasons = [] - for code, text in list(self.ok_reason.items()): - if int(reason) | int(code) == int(reason): - reasons.append(text) - reasontext = '\n'.join(reasons) + "\n" - self.results.append({'fingerprint': fingerprint, - 'ok': reason, 'text': reasontext}) - self.fingerprints.append(fingerprint) - elif key == "IMPORT_PROBLEM": - try: - reason, fingerprint = value.split() - except: - reason = value - fingerprint = '' - self.results.append({'fingerprint': fingerprint, - 'problem': reason, 'text': self.problem_reason[reason]}) - elif key == "IMPORT_RES": - import_res = value.split() - for i in range(len(self.counts)): - setattr(self, self.counts[i], int(import_res[i])) - elif key == "KEYEXPIRED": - self.results.append({'fingerprint': None, - 'problem': '0', 'text': 'Key expired'}) - elif key == "SIGEXPIRED": - self.results.append({'fingerprint': None, - 'problem': '0', 'text': 'Signature expired'}) - else: - raise ValueError("Unknown status message: %r" % key) - - def summary(self): - l = [] - l.append('%d imported' % self.imported) - if self.not_imported: - l.append('%d not imported' % self.not_imported) - return ', '.join(l) - -ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I) - -class ListKeys(list): - ''' Handle status messages for --list-keys. - - Handle pub and uid (relating the latter to the former). - - Don't care about (info from src/DETAILS): - - crt = X.509 certificate - crs = X.509 certificate and private key available - ssb = secret subkey (secondary key) - uat = user attribute (same as user id except for field 10). - sig = signature - rev = revocation signature - pkd = public key data (special field format, see below) - grp = reserved for gpgsm - rvk = revocation key - ''' - def __init__(self, gpg): - self.gpg = gpg - self.curkey = None - self.fingerprints = [] - self.uids = [] - - def key(self, args): - vars = (""" - type trust length algo keyid date expires dummy ownertrust uid - """).split() - self.curkey = {} - for i in range(len(vars)): - self.curkey[vars[i]] = args[i] - self.curkey['uids'] = [] - if self.curkey['uid']: - self.curkey['uids'].append(self.curkey['uid']) - del self.curkey['uid'] - self.curkey['subkeys'] = [] - self.append(self.curkey) - - pub = sec = key - - def fpr(self, args): - self.curkey['fingerprint'] = args[9] - self.fingerprints.append(args[9]) - - def uid(self, args): - uid = args[9] - uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) - self.curkey['uids'].append(uid) - self.uids.append(uid) - - def sub(self, args): - subkey = [args[4], args[11]] - self.curkey['subkeys'].append(subkey) - - def handle_status(self, key, value): - pass - -class Crypt(Verify): - "Handle status messages for --encrypt and --decrypt" - def __init__(self, gpg): - Verify.__init__(self, gpg) - self.data = '' - self.ok = False - self.status = '' - - def __nonzero__(self): - if self.ok: return True - return False - - __bool__ = __nonzero__ - - def __str__(self): - return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) - - def handle_status(self, key, value): - if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", - "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", - "CARDCTRL"): - # in the case of ERROR, this is because a more specific error - # message will have come first - pass - elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", - "MISSING_PASSPHRASE", "DECRYPTION_FAILED", - "KEY_NOT_CREATED"): - self.status = key.replace("_", " ").lower() - elif key == "NEED_PASSPHRASE_SYM": - self.status = 'need symmetric passphrase' - elif key == "BEGIN_DECRYPTION": - self.status = 'decryption incomplete' - elif key == "BEGIN_ENCRYPTION": - self.status = 'encryption incomplete' - elif key == "DECRYPTION_OKAY": - self.status = 'decryption ok' - self.ok = True - elif key == "END_ENCRYPTION": - self.status = 'encryption ok' - self.ok = True - elif key == "INV_RECP": - self.status = 'invalid recipient' - elif key == "KEYEXPIRED": - self.status = 'key expired' - elif key == "SIG_CREATED": - self.status = 'sig created' - elif key == "SIGEXPIRED": - self.status = 'sig expired' - else: - Verify.handle_status(self, key, value) - -class GenKey(object): - "Handle status messages for --gen-key" - def __init__(self, gpg): - self.gpg = gpg - self.type = None - self.fingerprint = None - - def __nonzero__(self): - if self.fingerprint: return True - return False - - __bool__ = __nonzero__ - - def __str__(self): - return self.fingerprint or '' - - def handle_status(self, key, value): - if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"): - pass - elif key == "KEY_CREATED": - (self.type, self.fingerprint) = value.split() - else: - raise ValueError("Unknown status message: %r" % key) - -class DeleteResult(object): - "Handle status messages for --delete-key and --delete-secret-key" - def __init__(self, gpg): - self.gpg = gpg - self.status = 'ok' - - def __str__(self): - return self.status - - problem_reason = { - '1': 'No such key', - '2': 'Must delete secret key first', - '3': 'Ambigious specification', - } - - def handle_status(self, key, value): - if key == "DELETE_PROBLEM": - self.status = self.problem_reason.get(value, "Unknown error: %r" - % value) - else: - raise ValueError("Unknown status message: %r" % key) - -class Sign(object): - "Handle status messages for --sign" - def __init__(self, gpg): - self.gpg = gpg - self.type = None - self.fingerprint = None - - def __nonzero__(self): - return self.fingerprint is not None - - __bool__ = __nonzero__ - - def __str__(self): - return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) - - def handle_status(self, key, value): - if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", - "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"): - pass - elif key == "SIG_CREATED": - (self.type, algo, hashalgo, cls, self.timestamp, - self.fingerprint) = value.split() - else: - raise ValueError("Unknown status message: %r" % key) - -class ProtectedOption(Exception): - """Raised when the option passed to GPG is disallowed.""" - -class UsageError(Exception): - """Raised when you're Doing It Wrong.""" - def _fix_unsafe(input): """ Find characters used to escape from a string into a shell, and wrap them @@ -548,20 +169,6 @@ def _fix_unsafe(input): except TypeError: return None -def _is_file(input): - """ - Check that the size of the thing which is supposed to be a filename has - size greater than zero, without following symbolic links or using - :func:`os.path.isfile`. - """ - try: - assert os.lstat(input).st_size > 0, "not a file: %s" % input - except (AssertionError, TypeError) as error: - logger.debug(error.message) - return False - else: - return True - def _has_readwrite(path): """ Determine if the real uid/gid of the executing user has read and write @@ -575,25 +182,6 @@ def _has_readwrite(path): """ return os.access(path, os.R_OK and os.W_OK) -def _underscore(input, remove_prefix=False): - """ - Change hyphens to underscores so that GPG option names can be easily - tranlated to object attributes. - - @type input: C{str} - @param input: The input intended for the gnupg process. - - @type remove_prefix: C{bool} - @param remove_prefix: If True, strip leading hyphens from the input. - - @rtype: C{str} - @return: The :param:input with hyphens changed to underscores. - """ - if not remove_prefix: - return input.replace('-', '_') - else: - return input.lstrip('-').replace('-', '_') - def _hyphenate(input, add_prefix=False): """ Change underscores to hyphens so that object attributes can be easily @@ -856,6 +444,37 @@ def _is_allowed(input): return input return None +def _is_file(input): + """ + Check that the size of the thing which is supposed to be a filename has + size greater than zero, without following symbolic links or using + :func:`os.path.isfile`. + """ + try: + assert os.lstat(input).st_size > 0, "not a file: %s" % input + except (AssertionError, TypeError) as error: + logger.debug(error.message) + return False + else: + return True + +def _is_sequence(instance): + return isinstance(instance,list) or isinstance(instance,tuple) + +def _make_binary_stream(s, encoding): + try: + if _py3k: + if isinstance(s, str): + s = s.encode(encoding) + else: + if type(s) is not str: + s = s.encode(encoding) + from io import BytesIO + rv = BytesIO(s) + except ImportError: + rv = StringIO(s) + return rv + def _sanitise(*args): """ Take an arg or the key portion of a kwarg and check that it is in the set @@ -997,6 +616,32 @@ def _sanitise_list(arg_list): if safe_arg != "": yield safe_arg +def _threaded_copy_data(instream, outstream): + wr = threading.Thread(target=_copy_data, args=(instream, outstream)) + wr.setDaemon(True) + logger.debug('data copier: %r, %r, %r', wr, instream, outstream) + wr.start() + return wr + +def _underscore(input, remove_prefix=False): + """ + Change hyphens to underscores so that GPG option names can be easily + tranlated to object attributes. + + @type input: C{str} + @param input: The input intended for the gnupg process. + + @type remove_prefix: C{bool} + @param remove_prefix: If True, strip leading hyphens from the input. + + @rtype: C{str} + @return: The :param:input with hyphens changed to underscores. + """ + if not remove_prefix: + return input.replace('-', '_') + else: + return input.lstrip('-').replace('-', '_') + def _which(executable, flags=os.X_OK): """Borrowed from Twisted's :mod:twisted.python.proutils . @@ -1037,6 +682,365 @@ def _which(executable, flags=os.X_OK): result.append(pext) return result +def _write_passphrase(stream, passphrase, encoding): + passphrase = '%s\n' % passphrase + passphrase = passphrase.encode(encoding) + stream.write(passphrase) + logger.debug("Wrote passphrase.") + + +class Verify(object): + "Handle status messages for --verify" + + TRUST_UNDEFINED = 0 + TRUST_NEVER = 1 + TRUST_MARGINAL = 2 + TRUST_FULLY = 3 + TRUST_ULTIMATE = 4 + + TRUST_LEVELS = { + "TRUST_UNDEFINED" : TRUST_UNDEFINED, + "TRUST_NEVER" : TRUST_NEVER, + "TRUST_MARGINAL" : TRUST_MARGINAL, + "TRUST_FULLY" : TRUST_FULLY, + "TRUST_ULTIMATE" : TRUST_ULTIMATE, + } + + def __init__(self, gpg): + self.gpg = gpg + self.valid = False + self.fingerprint = self.creation_date = self.timestamp = None + self.signature_id = self.key_id = None + self.username = None + self.status = None + self.pubkey_fingerprint = None + self.expire_timestamp = None + self.sig_timestamp = None + self.trust_text = None + self.trust_level = None + + def __nonzero__(self): + return self.valid + + __bool__ = __nonzero__ + + def handle_status(self, key, value): + if key in self.TRUST_LEVELS: + self.trust_text = key + self.trust_level = self.TRUST_LEVELS[key] + elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT", + "PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO", + "DECRYPTION_OKAY", "INV_SGNR"): + pass + elif key == "BADSIG": + self.valid = False + self.status = 'signature bad' + self.key_id, self.username = value.split(None, 1) + elif key == "GOODSIG": + self.valid = True + self.status = 'signature good' + self.key_id, self.username = value.split(None, 1) + elif key == "VALIDSIG": + (self.fingerprint, + self.creation_date, + self.sig_timestamp, + self.expire_timestamp) = value.split()[:4] + # may be different if signature is made with a subkey + self.pubkey_fingerprint = value.split()[-1] + self.status = 'signature valid' + elif key == "SIG_ID": + (self.signature_id, + self.creation_date, self.timestamp) = value.split() + elif key == "ERRSIG": + self.valid = False + (self.key_id, + algo, hash_algo, + cls, + self.timestamp) = value.split()[:5] + self.status = 'signature error' + elif key == "DECRYPTION_FAILED": + self.valid = False + self.key_id = value + self.status = 'decryption failed' + elif key == "NO_PUBKEY": + self.valid = False + self.key_id = value + self.status = 'no public key' + elif key in ("KEYEXPIRED", "SIGEXPIRED"): + # these are useless in verify, since they are spit out for any + # pub/subkeys on the key, not just the one doing the signing. + # if we want to check for signatures with expired key, + # the relevant flag is EXPKEYSIG. + pass + elif key in ("EXPKEYSIG", "REVKEYSIG"): + # signed with expired or revoked key + self.valid = False + self.key_id = value.split()[0] + self.status = (('%s %s') % (key[:3], key[3:])).lower() + else: + raise ValueError("Unknown status message: %r" % key) + +class ImportResult(object): + "Handle status messages for --import" + + counts = '''count no_user_id imported imported_rsa unchanged + n_uids n_subk n_sigs n_revoc sec_read sec_imported + sec_dups not_imported'''.split() + def __init__(self, gpg): + self.gpg = gpg + self.imported = [] + self.results = [] + self.fingerprints = [] + for result in self.counts: + setattr(self, result, None) + + def __nonzero__(self): + if self.not_imported: return False + if not self.fingerprints: return False + return True + + __bool__ = __nonzero__ + + ok_reason = { + '0': 'Not actually changed', + '1': 'Entirely new key', + '2': 'New user IDs', + '4': 'New signatures', + '8': 'New subkeys', + '16': 'Contains private key', + } + + problem_reason = { + '0': 'No specific reason given', + '1': 'Invalid Certificate', + '2': 'Issuer Certificate missing', + '3': 'Certificate Chain too long', + '4': 'Error storing certificate', + } + + def handle_status(self, key, value): + if key == "IMPORTED": + # this duplicates info we already see in import_ok & import_problem + pass + elif key == "NODATA": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'No valid data found'}) + elif key == "IMPORT_OK": + reason, fingerprint = value.split() + reasons = [] + for code, text in list(self.ok_reason.items()): + if int(reason) | int(code) == int(reason): + reasons.append(text) + reasontext = '\n'.join(reasons) + "\n" + self.results.append({'fingerprint': fingerprint, + 'ok': reason, 'text': reasontext}) + self.fingerprints.append(fingerprint) + elif key == "IMPORT_PROBLEM": + try: + reason, fingerprint = value.split() + except: + reason = value + fingerprint = '' + self.results.append({'fingerprint': fingerprint, + 'problem': reason, 'text': self.problem_reason[reason]}) + elif key == "IMPORT_RES": + import_res = value.split() + for i in range(len(self.counts)): + setattr(self, self.counts[i], int(import_res[i])) + elif key == "KEYEXPIRED": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'Key expired'}) + elif key == "SIGEXPIRED": + self.results.append({'fingerprint': None, + 'problem': '0', 'text': 'Signature expired'}) + else: + raise ValueError("Unknown status message: %r" % key) + + def summary(self): + l = [] + l.append('%d imported' % self.imported) + if self.not_imported: + l.append('%d not imported' % self.not_imported) + return ', '.join(l) + +class ListKeys(list): + ''' Handle status messages for --list-keys. + + Handle pub and uid (relating the latter to the former). + + Don't care about (info from src/DETAILS): + + crt = X.509 certificate + crs = X.509 certificate and private key available + ssb = secret subkey (secondary key) + uat = user attribute (same as user id except for field 10). + sig = signature + rev = revocation signature + pkd = public key data (special field format, see below) + grp = reserved for gpgsm + rvk = revocation key + ''' + def __init__(self, gpg): + self.gpg = gpg + self.curkey = None + self.fingerprints = [] + self.uids = [] + + def key(self, args): + vars = (""" + type trust length algo keyid date expires dummy ownertrust uid + """).split() + self.curkey = {} + for i in range(len(vars)): + self.curkey[vars[i]] = args[i] + self.curkey['uids'] = [] + if self.curkey['uid']: + self.curkey['uids'].append(self.curkey['uid']) + del self.curkey['uid'] + self.curkey['subkeys'] = [] + self.append(self.curkey) + + pub = sec = key + + def fpr(self, args): + self.curkey['fingerprint'] = args[9] + self.fingerprints.append(args[9]) + + def uid(self, args): + uid = args[9] + uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid) + self.curkey['uids'].append(uid) + self.uids.append(uid) + + def sub(self, args): + subkey = [args[4], args[11]] + self.curkey['subkeys'].append(subkey) + + def handle_status(self, key, value): + pass + +class Crypt(Verify): + "Handle status messages for --encrypt and --decrypt" + def __init__(self, gpg): + Verify.__init__(self, gpg) + self.data = '' + self.ok = False + self.status = '' + + def __nonzero__(self): + if self.ok: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + def handle_status(self, key, value): + if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION", + "BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA", + "CARDCTRL"): + # in the case of ERROR, this is because a more specific error + # message will have come first + pass + elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE", + "MISSING_PASSPHRASE", "DECRYPTION_FAILED", + "KEY_NOT_CREATED"): + self.status = key.replace("_", " ").lower() + elif key == "NEED_PASSPHRASE_SYM": + self.status = 'need symmetric passphrase' + elif key == "BEGIN_DECRYPTION": + self.status = 'decryption incomplete' + elif key == "BEGIN_ENCRYPTION": + self.status = 'encryption incomplete' + elif key == "DECRYPTION_OKAY": + self.status = 'decryption ok' + self.ok = True + elif key == "END_ENCRYPTION": + self.status = 'encryption ok' + self.ok = True + elif key == "INV_RECP": + self.status = 'invalid recipient' + elif key == "KEYEXPIRED": + self.status = 'key expired' + elif key == "SIG_CREATED": + self.status = 'sig created' + elif key == "SIGEXPIRED": + self.status = 'sig expired' + else: + Verify.handle_status(self, key, value) + +class GenKey(object): + "Handle status messages for --gen-key" + def __init__(self, gpg): + self.gpg = gpg + self.type = None + self.fingerprint = None + + def __nonzero__(self): + if self.fingerprint: return True + return False + + __bool__ = __nonzero__ + + def __str__(self): + return self.fingerprint or '' + + def handle_status(self, key, value): + if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"): + pass + elif key == "KEY_CREATED": + (self.type, self.fingerprint) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + +class DeleteResult(object): + "Handle status messages for --delete-key and --delete-secret-key" + def __init__(self, gpg): + self.gpg = gpg + self.status = 'ok' + + def __str__(self): + return self.status + + problem_reason = { + '1': 'No such key', + '2': 'Must delete secret key first', + '3': 'Ambigious specification', + } + + def handle_status(self, key, value): + if key == "DELETE_PROBLEM": + self.status = self.problem_reason.get(value, "Unknown error: %r" + % value) + else: + raise ValueError("Unknown status message: %r" % key) + +class Sign(object): + "Handle status messages for --sign" + def __init__(self, gpg): + self.gpg = gpg + self.type = None + self.fingerprint = None + + def __nonzero__(self): + return self.fingerprint is not None + + __bool__ = __nonzero__ + + def __str__(self): + return self.data.decode(self.gpg.encoding, self.gpg.decode_errors) + + def handle_status(self, key, value): + if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE", + "GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"): + pass + elif key == "SIG_CREATED": + (self.type, algo, hashalgo, cls, self.timestamp, + self.fingerprint) = value.split() + else: + raise ValueError("Unknown status message: %r" % key) + class GPG(object): """Encapsulate access to the gpg executable""" decode_errors = 'strict'