General code cleanup.
* Move exceptions and global parameters to top of file. * Move global function to top of file before classes and alphabetise them.feature/documentation-builds-dirhtml
parent
b8fd74dde3
commit
7840360bb4
850
gnupg.py
850
gnupg.py
|
@ -89,10 +89,21 @@ try:
|
|||
except NameError:
|
||||
_py3k = True
|
||||
|
||||
|
||||
ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
if not logger.handlers:
|
||||
logger.addHandler(NullHandler())
|
||||
|
||||
|
||||
class ProtectedOption(Exception):
|
||||
"""Raised when the option passed to GPG is disallowed."""
|
||||
|
||||
class UsageError(Exception):
|
||||
"""Raised when you're Doing It Wrong."""
|
||||
|
||||
|
||||
def _copy_data(instream, outstream):
|
||||
"""
|
||||
Copy data from one stream to another.
|
||||
|
@ -140,396 +151,6 @@ def _copy_data(instream, outstream):
|
|||
else:
|
||||
logger.debug("closed output, %d bytes sent", sent)
|
||||
|
||||
def _threaded_copy_data(instream, outstream):
|
||||
wr = threading.Thread(target=_copy_data, args=(instream, outstream))
|
||||
wr.setDaemon(True)
|
||||
logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
|
||||
wr.start()
|
||||
return wr
|
||||
|
||||
def _write_passphrase(stream, passphrase, encoding):
|
||||
passphrase = '%s\n' % passphrase
|
||||
passphrase = passphrase.encode(encoding)
|
||||
stream.write(passphrase)
|
||||
logger.debug("Wrote passphrase.")
|
||||
|
||||
def _is_sequence(instance):
|
||||
return isinstance(instance,list) or isinstance(instance,tuple)
|
||||
|
||||
def _make_binary_stream(s, encoding):
|
||||
try:
|
||||
if _py3k:
|
||||
if isinstance(s, str):
|
||||
s = s.encode(encoding)
|
||||
else:
|
||||
if type(s) is not str:
|
||||
s = s.encode(encoding)
|
||||
from io import BytesIO
|
||||
rv = BytesIO(s)
|
||||
except ImportError:
|
||||
rv = StringIO(s)
|
||||
return rv
|
||||
|
||||
class Verify(object):
|
||||
"Handle status messages for --verify"
|
||||
|
||||
TRUST_UNDEFINED = 0
|
||||
TRUST_NEVER = 1
|
||||
TRUST_MARGINAL = 2
|
||||
TRUST_FULLY = 3
|
||||
TRUST_ULTIMATE = 4
|
||||
|
||||
TRUST_LEVELS = {
|
||||
"TRUST_UNDEFINED" : TRUST_UNDEFINED,
|
||||
"TRUST_NEVER" : TRUST_NEVER,
|
||||
"TRUST_MARGINAL" : TRUST_MARGINAL,
|
||||
"TRUST_FULLY" : TRUST_FULLY,
|
||||
"TRUST_ULTIMATE" : TRUST_ULTIMATE,
|
||||
}
|
||||
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.valid = False
|
||||
self.fingerprint = self.creation_date = self.timestamp = None
|
||||
self.signature_id = self.key_id = None
|
||||
self.username = None
|
||||
self.status = None
|
||||
self.pubkey_fingerprint = None
|
||||
self.expire_timestamp = None
|
||||
self.sig_timestamp = None
|
||||
self.trust_text = None
|
||||
self.trust_level = None
|
||||
|
||||
def __nonzero__(self):
|
||||
return self.valid
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in self.TRUST_LEVELS:
|
||||
self.trust_text = key
|
||||
self.trust_level = self.TRUST_LEVELS[key]
|
||||
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
|
||||
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
|
||||
"DECRYPTION_OKAY", "INV_SGNR"):
|
||||
pass
|
||||
elif key == "BADSIG":
|
||||
self.valid = False
|
||||
self.status = 'signature bad'
|
||||
self.key_id, self.username = value.split(None, 1)
|
||||
elif key == "GOODSIG":
|
||||
self.valid = True
|
||||
self.status = 'signature good'
|
||||
self.key_id, self.username = value.split(None, 1)
|
||||
elif key == "VALIDSIG":
|
||||
(self.fingerprint,
|
||||
self.creation_date,
|
||||
self.sig_timestamp,
|
||||
self.expire_timestamp) = value.split()[:4]
|
||||
# may be different if signature is made with a subkey
|
||||
self.pubkey_fingerprint = value.split()[-1]
|
||||
self.status = 'signature valid'
|
||||
elif key == "SIG_ID":
|
||||
(self.signature_id,
|
||||
self.creation_date, self.timestamp) = value.split()
|
||||
elif key == "ERRSIG":
|
||||
self.valid = False
|
||||
(self.key_id,
|
||||
algo, hash_algo,
|
||||
cls,
|
||||
self.timestamp) = value.split()[:5]
|
||||
self.status = 'signature error'
|
||||
elif key == "DECRYPTION_FAILED":
|
||||
self.valid = False
|
||||
self.key_id = value
|
||||
self.status = 'decryption failed'
|
||||
elif key == "NO_PUBKEY":
|
||||
self.valid = False
|
||||
self.key_id = value
|
||||
self.status = 'no public key'
|
||||
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
|
||||
# these are useless in verify, since they are spit out for any
|
||||
# pub/subkeys on the key, not just the one doing the signing.
|
||||
# if we want to check for signatures with expired key,
|
||||
# the relevant flag is EXPKEYSIG.
|
||||
pass
|
||||
elif key in ("EXPKEYSIG", "REVKEYSIG"):
|
||||
# signed with expired or revoked key
|
||||
self.valid = False
|
||||
self.key_id = value.split()[0]
|
||||
self.status = (('%s %s') % (key[:3], key[3:])).lower()
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class ImportResult(object):
|
||||
"Handle status messages for --import"
|
||||
|
||||
counts = '''count no_user_id imported imported_rsa unchanged
|
||||
n_uids n_subk n_sigs n_revoc sec_read sec_imported
|
||||
sec_dups not_imported'''.split()
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.imported = []
|
||||
self.results = []
|
||||
self.fingerprints = []
|
||||
for result in self.counts:
|
||||
setattr(self, result, None)
|
||||
|
||||
def __nonzero__(self):
|
||||
if self.not_imported: return False
|
||||
if not self.fingerprints: return False
|
||||
return True
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
ok_reason = {
|
||||
'0': 'Not actually changed',
|
||||
'1': 'Entirely new key',
|
||||
'2': 'New user IDs',
|
||||
'4': 'New signatures',
|
||||
'8': 'New subkeys',
|
||||
'16': 'Contains private key',
|
||||
}
|
||||
|
||||
problem_reason = {
|
||||
'0': 'No specific reason given',
|
||||
'1': 'Invalid Certificate',
|
||||
'2': 'Issuer Certificate missing',
|
||||
'3': 'Certificate Chain too long',
|
||||
'4': 'Error storing certificate',
|
||||
}
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key == "IMPORTED":
|
||||
# this duplicates info we already see in import_ok & import_problem
|
||||
pass
|
||||
elif key == "NODATA":
|
||||
self.results.append({'fingerprint': None,
|
||||
'problem': '0', 'text': 'No valid data found'})
|
||||
elif key == "IMPORT_OK":
|
||||
reason, fingerprint = value.split()
|
||||
reasons = []
|
||||
for code, text in list(self.ok_reason.items()):
|
||||
if int(reason) | int(code) == int(reason):
|
||||
reasons.append(text)
|
||||
reasontext = '\n'.join(reasons) + "\n"
|
||||
self.results.append({'fingerprint': fingerprint,
|
||||
'ok': reason, 'text': reasontext})
|
||||
self.fingerprints.append(fingerprint)
|
||||
elif key == "IMPORT_PROBLEM":
|
||||
try:
|
||||
reason, fingerprint = value.split()
|
||||
except:
|
||||
reason = value
|
||||
fingerprint = '<unknown>'
|
||||
self.results.append({'fingerprint': fingerprint,
|
||||
'problem': reason, 'text': self.problem_reason[reason]})
|
||||
elif key == "IMPORT_RES":
|
||||
import_res = value.split()
|
||||
for i in range(len(self.counts)):
|
||||
setattr(self, self.counts[i], int(import_res[i]))
|
||||
elif key == "KEYEXPIRED":
|
||||
self.results.append({'fingerprint': None,
|
||||
'problem': '0', 'text': 'Key expired'})
|
||||
elif key == "SIGEXPIRED":
|
||||
self.results.append({'fingerprint': None,
|
||||
'problem': '0', 'text': 'Signature expired'})
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
def summary(self):
|
||||
l = []
|
||||
l.append('%d imported' % self.imported)
|
||||
if self.not_imported:
|
||||
l.append('%d not imported' % self.not_imported)
|
||||
return ', '.join(l)
|
||||
|
||||
ESCAPE_PATTERN = re.compile(r'\\x([0-9a-f][0-9a-f])', re.I)
|
||||
|
||||
class ListKeys(list):
|
||||
''' Handle status messages for --list-keys.
|
||||
|
||||
Handle pub and uid (relating the latter to the former).
|
||||
|
||||
Don't care about (info from src/DETAILS):
|
||||
|
||||
crt = X.509 certificate
|
||||
crs = X.509 certificate and private key available
|
||||
ssb = secret subkey (secondary key)
|
||||
uat = user attribute (same as user id except for field 10).
|
||||
sig = signature
|
||||
rev = revocation signature
|
||||
pkd = public key data (special field format, see below)
|
||||
grp = reserved for gpgsm
|
||||
rvk = revocation key
|
||||
'''
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.curkey = None
|
||||
self.fingerprints = []
|
||||
self.uids = []
|
||||
|
||||
def key(self, args):
|
||||
vars = ("""
|
||||
type trust length algo keyid date expires dummy ownertrust uid
|
||||
""").split()
|
||||
self.curkey = {}
|
||||
for i in range(len(vars)):
|
||||
self.curkey[vars[i]] = args[i]
|
||||
self.curkey['uids'] = []
|
||||
if self.curkey['uid']:
|
||||
self.curkey['uids'].append(self.curkey['uid'])
|
||||
del self.curkey['uid']
|
||||
self.curkey['subkeys'] = []
|
||||
self.append(self.curkey)
|
||||
|
||||
pub = sec = key
|
||||
|
||||
def fpr(self, args):
|
||||
self.curkey['fingerprint'] = args[9]
|
||||
self.fingerprints.append(args[9])
|
||||
|
||||
def uid(self, args):
|
||||
uid = args[9]
|
||||
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
|
||||
self.curkey['uids'].append(uid)
|
||||
self.uids.append(uid)
|
||||
|
||||
def sub(self, args):
|
||||
subkey = [args[4], args[11]]
|
||||
self.curkey['subkeys'].append(subkey)
|
||||
|
||||
def handle_status(self, key, value):
|
||||
pass
|
||||
|
||||
class Crypt(Verify):
|
||||
"Handle status messages for --encrypt and --decrypt"
|
||||
def __init__(self, gpg):
|
||||
Verify.__init__(self, gpg)
|
||||
self.data = ''
|
||||
self.ok = False
|
||||
self.status = ''
|
||||
|
||||
def __nonzero__(self):
|
||||
if self.ok: return True
|
||||
return False
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def __str__(self):
|
||||
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
|
||||
"BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
|
||||
"CARDCTRL"):
|
||||
# in the case of ERROR, this is because a more specific error
|
||||
# message will have come first
|
||||
pass
|
||||
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
|
||||
"MISSING_PASSPHRASE", "DECRYPTION_FAILED",
|
||||
"KEY_NOT_CREATED"):
|
||||
self.status = key.replace("_", " ").lower()
|
||||
elif key == "NEED_PASSPHRASE_SYM":
|
||||
self.status = 'need symmetric passphrase'
|
||||
elif key == "BEGIN_DECRYPTION":
|
||||
self.status = 'decryption incomplete'
|
||||
elif key == "BEGIN_ENCRYPTION":
|
||||
self.status = 'encryption incomplete'
|
||||
elif key == "DECRYPTION_OKAY":
|
||||
self.status = 'decryption ok'
|
||||
self.ok = True
|
||||
elif key == "END_ENCRYPTION":
|
||||
self.status = 'encryption ok'
|
||||
self.ok = True
|
||||
elif key == "INV_RECP":
|
||||
self.status = 'invalid recipient'
|
||||
elif key == "KEYEXPIRED":
|
||||
self.status = 'key expired'
|
||||
elif key == "SIG_CREATED":
|
||||
self.status = 'sig created'
|
||||
elif key == "SIGEXPIRED":
|
||||
self.status = 'sig expired'
|
||||
else:
|
||||
Verify.handle_status(self, key, value)
|
||||
|
||||
class GenKey(object):
|
||||
"Handle status messages for --gen-key"
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.type = None
|
||||
self.fingerprint = None
|
||||
|
||||
def __nonzero__(self):
|
||||
if self.fingerprint: return True
|
||||
return False
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def __str__(self):
|
||||
return self.fingerprint or ''
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"):
|
||||
pass
|
||||
elif key == "KEY_CREATED":
|
||||
(self.type, self.fingerprint) = value.split()
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class DeleteResult(object):
|
||||
"Handle status messages for --delete-key and --delete-secret-key"
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.status = 'ok'
|
||||
|
||||
def __str__(self):
|
||||
return self.status
|
||||
|
||||
problem_reason = {
|
||||
'1': 'No such key',
|
||||
'2': 'Must delete secret key first',
|
||||
'3': 'Ambigious specification',
|
||||
}
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key == "DELETE_PROBLEM":
|
||||
self.status = self.problem_reason.get(value, "Unknown error: %r"
|
||||
% value)
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class Sign(object):
|
||||
"Handle status messages for --sign"
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.type = None
|
||||
self.fingerprint = None
|
||||
|
||||
def __nonzero__(self):
|
||||
return self.fingerprint is not None
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def __str__(self):
|
||||
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
|
||||
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"):
|
||||
pass
|
||||
elif key == "SIG_CREATED":
|
||||
(self.type, algo, hashalgo, cls, self.timestamp,
|
||||
self.fingerprint) = value.split()
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class ProtectedOption(Exception):
|
||||
"""Raised when the option passed to GPG is disallowed."""
|
||||
|
||||
class UsageError(Exception):
|
||||
"""Raised when you're Doing It Wrong."""
|
||||
|
||||
def _fix_unsafe(input):
|
||||
"""
|
||||
Find characters used to escape from a string into a shell, and wrap them
|
||||
|
@ -548,20 +169,6 @@ def _fix_unsafe(input):
|
|||
except TypeError:
|
||||
return None
|
||||
|
||||
def _is_file(input):
|
||||
"""
|
||||
Check that the size of the thing which is supposed to be a filename has
|
||||
size greater than zero, without following symbolic links or using
|
||||
:func:`os.path.isfile`.
|
||||
"""
|
||||
try:
|
||||
assert os.lstat(input).st_size > 0, "not a file: %s" % input
|
||||
except (AssertionError, TypeError) as error:
|
||||
logger.debug(error.message)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def _has_readwrite(path):
|
||||
"""
|
||||
Determine if the real uid/gid of the executing user has read and write
|
||||
|
@ -575,25 +182,6 @@ def _has_readwrite(path):
|
|||
"""
|
||||
return os.access(path, os.R_OK and os.W_OK)
|
||||
|
||||
def _underscore(input, remove_prefix=False):
|
||||
"""
|
||||
Change hyphens to underscores so that GPG option names can be easily
|
||||
tranlated to object attributes.
|
||||
|
||||
@type input: C{str}
|
||||
@param input: The input intended for the gnupg process.
|
||||
|
||||
@type remove_prefix: C{bool}
|
||||
@param remove_prefix: If True, strip leading hyphens from the input.
|
||||
|
||||
@rtype: C{str}
|
||||
@return: The :param:input with hyphens changed to underscores.
|
||||
"""
|
||||
if not remove_prefix:
|
||||
return input.replace('-', '_')
|
||||
else:
|
||||
return input.lstrip('-').replace('-', '_')
|
||||
|
||||
def _hyphenate(input, add_prefix=False):
|
||||
"""
|
||||
Change underscores to hyphens so that object attributes can be easily
|
||||
|
@ -856,6 +444,37 @@ def _is_allowed(input):
|
|||
return input
|
||||
return None
|
||||
|
||||
def _is_file(input):
|
||||
"""
|
||||
Check that the size of the thing which is supposed to be a filename has
|
||||
size greater than zero, without following symbolic links or using
|
||||
:func:`os.path.isfile`.
|
||||
"""
|
||||
try:
|
||||
assert os.lstat(input).st_size > 0, "not a file: %s" % input
|
||||
except (AssertionError, TypeError) as error:
|
||||
logger.debug(error.message)
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def _is_sequence(instance):
|
||||
return isinstance(instance,list) or isinstance(instance,tuple)
|
||||
|
||||
def _make_binary_stream(s, encoding):
|
||||
try:
|
||||
if _py3k:
|
||||
if isinstance(s, str):
|
||||
s = s.encode(encoding)
|
||||
else:
|
||||
if type(s) is not str:
|
||||
s = s.encode(encoding)
|
||||
from io import BytesIO
|
||||
rv = BytesIO(s)
|
||||
except ImportError:
|
||||
rv = StringIO(s)
|
||||
return rv
|
||||
|
||||
def _sanitise(*args):
|
||||
"""
|
||||
Take an arg or the key portion of a kwarg and check that it is in the set
|
||||
|
@ -997,6 +616,32 @@ def _sanitise_list(arg_list):
|
|||
if safe_arg != "":
|
||||
yield safe_arg
|
||||
|
||||
def _threaded_copy_data(instream, outstream):
|
||||
wr = threading.Thread(target=_copy_data, args=(instream, outstream))
|
||||
wr.setDaemon(True)
|
||||
logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
|
||||
wr.start()
|
||||
return wr
|
||||
|
||||
def _underscore(input, remove_prefix=False):
|
||||
"""
|
||||
Change hyphens to underscores so that GPG option names can be easily
|
||||
tranlated to object attributes.
|
||||
|
||||
@type input: C{str}
|
||||
@param input: The input intended for the gnupg process.
|
||||
|
||||
@type remove_prefix: C{bool}
|
||||
@param remove_prefix: If True, strip leading hyphens from the input.
|
||||
|
||||
@rtype: C{str}
|
||||
@return: The :param:input with hyphens changed to underscores.
|
||||
"""
|
||||
if not remove_prefix:
|
||||
return input.replace('-', '_')
|
||||
else:
|
||||
return input.lstrip('-').replace('-', '_')
|
||||
|
||||
def _which(executable, flags=os.X_OK):
|
||||
"""Borrowed from Twisted's :mod:twisted.python.proutils .
|
||||
|
||||
|
@ -1037,6 +682,365 @@ def _which(executable, flags=os.X_OK):
|
|||
result.append(pext)
|
||||
return result
|
||||
|
||||
def _write_passphrase(stream, passphrase, encoding):
|
||||
passphrase = '%s\n' % passphrase
|
||||
passphrase = passphrase.encode(encoding)
|
||||
stream.write(passphrase)
|
||||
logger.debug("Wrote passphrase.")
|
||||
|
||||
|
||||
class Verify(object):
|
||||
"Handle status messages for --verify"
|
||||
|
||||
TRUST_UNDEFINED = 0
|
||||
TRUST_NEVER = 1
|
||||
TRUST_MARGINAL = 2
|
||||
TRUST_FULLY = 3
|
||||
TRUST_ULTIMATE = 4
|
||||
|
||||
TRUST_LEVELS = {
|
||||
"TRUST_UNDEFINED" : TRUST_UNDEFINED,
|
||||
"TRUST_NEVER" : TRUST_NEVER,
|
||||
"TRUST_MARGINAL" : TRUST_MARGINAL,
|
||||
"TRUST_FULLY" : TRUST_FULLY,
|
||||
"TRUST_ULTIMATE" : TRUST_ULTIMATE,
|
||||
}
|
||||
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.valid = False
|
||||
self.fingerprint = self.creation_date = self.timestamp = None
|
||||
self.signature_id = self.key_id = None
|
||||
self.username = None
|
||||
self.status = None
|
||||
self.pubkey_fingerprint = None
|
||||
self.expire_timestamp = None
|
||||
self.sig_timestamp = None
|
||||
self.trust_text = None
|
||||
self.trust_level = None
|
||||
|
||||
def __nonzero__(self):
|
||||
return self.valid
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in self.TRUST_LEVELS:
|
||||
self.trust_text = key
|
||||
self.trust_level = self.TRUST_LEVELS[key]
|
||||
elif key in ("RSA_OR_IDEA", "NODATA", "IMPORT_RES", "PLAINTEXT",
|
||||
"PLAINTEXT_LENGTH", "POLICY_URL", "DECRYPTION_INFO",
|
||||
"DECRYPTION_OKAY", "INV_SGNR"):
|
||||
pass
|
||||
elif key == "BADSIG":
|
||||
self.valid = False
|
||||
self.status = 'signature bad'
|
||||
self.key_id, self.username = value.split(None, 1)
|
||||
elif key == "GOODSIG":
|
||||
self.valid = True
|
||||
self.status = 'signature good'
|
||||
self.key_id, self.username = value.split(None, 1)
|
||||
elif key == "VALIDSIG":
|
||||
(self.fingerprint,
|
||||
self.creation_date,
|
||||
self.sig_timestamp,
|
||||
self.expire_timestamp) = value.split()[:4]
|
||||
# may be different if signature is made with a subkey
|
||||
self.pubkey_fingerprint = value.split()[-1]
|
||||
self.status = 'signature valid'
|
||||
elif key == "SIG_ID":
|
||||
(self.signature_id,
|
||||
self.creation_date, self.timestamp) = value.split()
|
||||
elif key == "ERRSIG":
|
||||
self.valid = False
|
||||
(self.key_id,
|
||||
algo, hash_algo,
|
||||
cls,
|
||||
self.timestamp) = value.split()[:5]
|
||||
self.status = 'signature error'
|
||||
elif key == "DECRYPTION_FAILED":
|
||||
self.valid = False
|
||||
self.key_id = value
|
||||
self.status = 'decryption failed'
|
||||
elif key == "NO_PUBKEY":
|
||||
self.valid = False
|
||||
self.key_id = value
|
||||
self.status = 'no public key'
|
||||
elif key in ("KEYEXPIRED", "SIGEXPIRED"):
|
||||
# these are useless in verify, since they are spit out for any
|
||||
# pub/subkeys on the key, not just the one doing the signing.
|
||||
# if we want to check for signatures with expired key,
|
||||
# the relevant flag is EXPKEYSIG.
|
||||
pass
|
||||
elif key in ("EXPKEYSIG", "REVKEYSIG"):
|
||||
# signed with expired or revoked key
|
||||
self.valid = False
|
||||
self.key_id = value.split()[0]
|
||||
self.status = (('%s %s') % (key[:3], key[3:])).lower()
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class ImportResult(object):
|
||||
"Handle status messages for --import"
|
||||
|
||||
counts = '''count no_user_id imported imported_rsa unchanged
|
||||
n_uids n_subk n_sigs n_revoc sec_read sec_imported
|
||||
sec_dups not_imported'''.split()
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.imported = []
|
||||
self.results = []
|
||||
self.fingerprints = []
|
||||
for result in self.counts:
|
||||
setattr(self, result, None)
|
||||
|
||||
def __nonzero__(self):
|
||||
if self.not_imported: return False
|
||||
if not self.fingerprints: return False
|
||||
return True
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
ok_reason = {
|
||||
'0': 'Not actually changed',
|
||||
'1': 'Entirely new key',
|
||||
'2': 'New user IDs',
|
||||
'4': 'New signatures',
|
||||
'8': 'New subkeys',
|
||||
'16': 'Contains private key',
|
||||
}
|
||||
|
||||
problem_reason = {
|
||||
'0': 'No specific reason given',
|
||||
'1': 'Invalid Certificate',
|
||||
'2': 'Issuer Certificate missing',
|
||||
'3': 'Certificate Chain too long',
|
||||
'4': 'Error storing certificate',
|
||||
}
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key == "IMPORTED":
|
||||
# this duplicates info we already see in import_ok & import_problem
|
||||
pass
|
||||
elif key == "NODATA":
|
||||
self.results.append({'fingerprint': None,
|
||||
'problem': '0', 'text': 'No valid data found'})
|
||||
elif key == "IMPORT_OK":
|
||||
reason, fingerprint = value.split()
|
||||
reasons = []
|
||||
for code, text in list(self.ok_reason.items()):
|
||||
if int(reason) | int(code) == int(reason):
|
||||
reasons.append(text)
|
||||
reasontext = '\n'.join(reasons) + "\n"
|
||||
self.results.append({'fingerprint': fingerprint,
|
||||
'ok': reason, 'text': reasontext})
|
||||
self.fingerprints.append(fingerprint)
|
||||
elif key == "IMPORT_PROBLEM":
|
||||
try:
|
||||
reason, fingerprint = value.split()
|
||||
except:
|
||||
reason = value
|
||||
fingerprint = '<unknown>'
|
||||
self.results.append({'fingerprint': fingerprint,
|
||||
'problem': reason, 'text': self.problem_reason[reason]})
|
||||
elif key == "IMPORT_RES":
|
||||
import_res = value.split()
|
||||
for i in range(len(self.counts)):
|
||||
setattr(self, self.counts[i], int(import_res[i]))
|
||||
elif key == "KEYEXPIRED":
|
||||
self.results.append({'fingerprint': None,
|
||||
'problem': '0', 'text': 'Key expired'})
|
||||
elif key == "SIGEXPIRED":
|
||||
self.results.append({'fingerprint': None,
|
||||
'problem': '0', 'text': 'Signature expired'})
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
def summary(self):
|
||||
l = []
|
||||
l.append('%d imported' % self.imported)
|
||||
if self.not_imported:
|
||||
l.append('%d not imported' % self.not_imported)
|
||||
return ', '.join(l)
|
||||
|
||||
class ListKeys(list):
|
||||
''' Handle status messages for --list-keys.
|
||||
|
||||
Handle pub and uid (relating the latter to the former).
|
||||
|
||||
Don't care about (info from src/DETAILS):
|
||||
|
||||
crt = X.509 certificate
|
||||
crs = X.509 certificate and private key available
|
||||
ssb = secret subkey (secondary key)
|
||||
uat = user attribute (same as user id except for field 10).
|
||||
sig = signature
|
||||
rev = revocation signature
|
||||
pkd = public key data (special field format, see below)
|
||||
grp = reserved for gpgsm
|
||||
rvk = revocation key
|
||||
'''
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.curkey = None
|
||||
self.fingerprints = []
|
||||
self.uids = []
|
||||
|
||||
def key(self, args):
|
||||
vars = ("""
|
||||
type trust length algo keyid date expires dummy ownertrust uid
|
||||
""").split()
|
||||
self.curkey = {}
|
||||
for i in range(len(vars)):
|
||||
self.curkey[vars[i]] = args[i]
|
||||
self.curkey['uids'] = []
|
||||
if self.curkey['uid']:
|
||||
self.curkey['uids'].append(self.curkey['uid'])
|
||||
del self.curkey['uid']
|
||||
self.curkey['subkeys'] = []
|
||||
self.append(self.curkey)
|
||||
|
||||
pub = sec = key
|
||||
|
||||
def fpr(self, args):
|
||||
self.curkey['fingerprint'] = args[9]
|
||||
self.fingerprints.append(args[9])
|
||||
|
||||
def uid(self, args):
|
||||
uid = args[9]
|
||||
uid = ESCAPE_PATTERN.sub(lambda m: chr(int(m.group(1), 16)), uid)
|
||||
self.curkey['uids'].append(uid)
|
||||
self.uids.append(uid)
|
||||
|
||||
def sub(self, args):
|
||||
subkey = [args[4], args[11]]
|
||||
self.curkey['subkeys'].append(subkey)
|
||||
|
||||
def handle_status(self, key, value):
|
||||
pass
|
||||
|
||||
class Crypt(Verify):
|
||||
"Handle status messages for --encrypt and --decrypt"
|
||||
def __init__(self, gpg):
|
||||
Verify.__init__(self, gpg)
|
||||
self.data = ''
|
||||
self.ok = False
|
||||
self.status = ''
|
||||
|
||||
def __nonzero__(self):
|
||||
if self.ok: return True
|
||||
return False
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def __str__(self):
|
||||
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in ("ENC_TO", "USERID_HINT", "GOODMDC", "END_DECRYPTION",
|
||||
"BEGIN_SIGNING", "NO_SECKEY", "ERROR", "NODATA",
|
||||
"CARDCTRL"):
|
||||
# in the case of ERROR, this is because a more specific error
|
||||
# message will have come first
|
||||
pass
|
||||
elif key in ("NEED_PASSPHRASE", "BAD_PASSPHRASE", "GOOD_PASSPHRASE",
|
||||
"MISSING_PASSPHRASE", "DECRYPTION_FAILED",
|
||||
"KEY_NOT_CREATED"):
|
||||
self.status = key.replace("_", " ").lower()
|
||||
elif key == "NEED_PASSPHRASE_SYM":
|
||||
self.status = 'need symmetric passphrase'
|
||||
elif key == "BEGIN_DECRYPTION":
|
||||
self.status = 'decryption incomplete'
|
||||
elif key == "BEGIN_ENCRYPTION":
|
||||
self.status = 'encryption incomplete'
|
||||
elif key == "DECRYPTION_OKAY":
|
||||
self.status = 'decryption ok'
|
||||
self.ok = True
|
||||
elif key == "END_ENCRYPTION":
|
||||
self.status = 'encryption ok'
|
||||
self.ok = True
|
||||
elif key == "INV_RECP":
|
||||
self.status = 'invalid recipient'
|
||||
elif key == "KEYEXPIRED":
|
||||
self.status = 'key expired'
|
||||
elif key == "SIG_CREATED":
|
||||
self.status = 'sig created'
|
||||
elif key == "SIGEXPIRED":
|
||||
self.status = 'sig expired'
|
||||
else:
|
||||
Verify.handle_status(self, key, value)
|
||||
|
||||
class GenKey(object):
|
||||
"Handle status messages for --gen-key"
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.type = None
|
||||
self.fingerprint = None
|
||||
|
||||
def __nonzero__(self):
|
||||
if self.fingerprint: return True
|
||||
return False
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def __str__(self):
|
||||
return self.fingerprint or ''
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in ("PROGRESS", "GOOD_PASSPHRASE", "NODATA", "KEY_NOT_CREATED"):
|
||||
pass
|
||||
elif key == "KEY_CREATED":
|
||||
(self.type, self.fingerprint) = value.split()
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class DeleteResult(object):
|
||||
"Handle status messages for --delete-key and --delete-secret-key"
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.status = 'ok'
|
||||
|
||||
def __str__(self):
|
||||
return self.status
|
||||
|
||||
problem_reason = {
|
||||
'1': 'No such key',
|
||||
'2': 'Must delete secret key first',
|
||||
'3': 'Ambigious specification',
|
||||
}
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key == "DELETE_PROBLEM":
|
||||
self.status = self.problem_reason.get(value, "Unknown error: %r"
|
||||
% value)
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class Sign(object):
|
||||
"Handle status messages for --sign"
|
||||
def __init__(self, gpg):
|
||||
self.gpg = gpg
|
||||
self.type = None
|
||||
self.fingerprint = None
|
||||
|
||||
def __nonzero__(self):
|
||||
return self.fingerprint is not None
|
||||
|
||||
__bool__ = __nonzero__
|
||||
|
||||
def __str__(self):
|
||||
return self.data.decode(self.gpg.encoding, self.gpg.decode_errors)
|
||||
|
||||
def handle_status(self, key, value):
|
||||
if key in ("USERID_HINT", "NEED_PASSPHRASE", "BAD_PASSPHRASE",
|
||||
"GOOD_PASSPHRASE", "BEGIN_SIGNING", "CARDCTRL", "INV_SGNR"):
|
||||
pass
|
||||
elif key == "SIG_CREATED":
|
||||
(self.type, algo, hashalgo, cls, self.timestamp,
|
||||
self.fingerprint) = value.split()
|
||||
else:
|
||||
raise ValueError("Unknown status message: %r" % key)
|
||||
|
||||
class GPG(object):
|
||||
"""Encapsulate access to the gpg executable"""
|
||||
decode_errors = 'strict'
|
||||
|
|
Loading…
Reference in New Issue