python-gnupg/gnupg/util.py

188 lines
6.7 KiB
Python

"""
Utilities for Soledad.
"""
import os
import gnupg
import re
from gnupg import (
logger,
_is_sequence,
_make_binary_stream,
)
class ListPackets():
"""
Handle status messages for --list-packets.
"""
def __init__(self, gpg):
self.gpg = gpg
self.nodata = None
self.key = None
self.need_passphrase = None
self.need_passphrase_sym = None
self.userid_hint = None
def handle_status(self, key, value):
# TODO: write tests for handle_status
if key == 'NODATA':
self.nodata = True
if key == 'ENC_TO':
# This will only capture keys in our keyring. In the future we
# may want to include multiple unknown keys in this list.
self.key, _, _ = value.split()
if key == 'NEED_PASSPHRASE':
self.need_passphrase = True
if key == 'NEED_PASSPHRASE_SYM':
self.need_passphrase_sym = True
if key == 'USERID_HINT':
self.userid_hint = value.strip().split()
class GPGWrapper(gnupg.GPG):
"""
This is a temporary class for handling GPG requests, and should be
replaced by a more general class used throughout the project.
"""
GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg"
GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS
def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME,
verbose=False, use_agent=False, keyring=None, options=None):
super(GPGWrapper, self).__init__(gnupghome=gnupghome,
gpgbinary=gpgbinary,
verbose=verbose,
use_agent=use_agent,
keyring=keyring,
options=options)
self.result_map['list-packets'] = ListPackets
def find_key_by_email(self, email, secret=False):
"""
Find user's key based on their email.
"""
for key in self.list_keys(secret=secret):
for uid in key['uids']:
if re.search(email, uid):
return key
raise LookupError("GnuPG public key for email %s not found!" % email)
def find_key_by_subkey(self, subkey):
for key in self.list_keys():
for sub in key['subkeys']:
if sub[0] == subkey:
return key
raise LookupError(
"GnuPG public key for subkey %s not found!" % subkey)
def find_key_by_keyid(self, keyid):
for key in self.list_keys():
if keyid == key['keyid']:
return key
raise LookupError(
"GnuPG public key for subkey %s not found!" % subkey)
def encrypt(self, data, recipient, sign=None, always_trust=True,
passphrase=None, symmetric=False):
"""
Encrypt data using GPG.
"""
# TODO: devise a way so we don't need to "always trust".
return super(GPGWrapper, self).encrypt(data, recipient, sign=sign,
always_trust=always_trust,
passphrase=passphrase,
symmetric=symmetric,
cipher_algo='AES256')
def decrypt(self, data, always_trust=True, passphrase=None):
"""
Decrypt data using GPG.
"""
# TODO: devise a way so we don't need to "always trust".
return super(GPGWrapper, self).decrypt(data,
always_trust=always_trust,
passphrase=passphrase)
def send_keys(self, keyserver, *keyids):
"""
Send keys to a keyserver
"""
result = self.result_map['list'](self)
gnupg.logger.debug('send_keys: %r', keyids)
data = gnupg._make_binary_stream("", self.encoding)
args = ['--keyserver', keyserver, '--send-keys']
args.extend(keyids)
self._handle_io(args, data, result, binary=True)
gnupg.logger.debug('send_keys result: %r', result.__dict__)
data.close()
return result
def encrypt_file(self, file, recipients, sign=None,
always_trust=False, passphrase=None,
armor=True, output=None, symmetric=False,
cipher_algo=None):
"Encrypt the message read from the file-like object 'file'"
args = ['--encrypt']
if symmetric:
args = ['--symmetric']
if cipher_algo:
args.append('--cipher-algo %s' % cipher_algo)
else:
args = ['--encrypt']
if not _is_sequence(recipients):
recipients = (recipients,)
for recipient in recipients:
args.append('--recipient "%s"' % recipient)
if armor: # create ascii-armored output - set to False for binary
args.append('--armor')
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
args.append('--output "%s"' % output)
if sign:
args.append('--sign --default-key "%s"' % sign)
if always_trust:
args.append("--always-trust")
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
return result
def list_packets(self, raw_data):
args = ["--list-packets"]
result = self.result_map['list-packets'](self)
self._handle_io(
args,
_make_binary_stream(raw_data, self.encoding),
result,
)
return result
def encrypted_to(self, raw_data):
"""
Return the key to which raw_data is encrypted to.
"""
# TODO: make this support multiple keys.
result = self.list_packets(raw_data)
if not result.key:
raise LookupError(
"Content is not encrypted to a GnuPG key!")
try:
return self.find_key_by_keyid(result.key)
except:
return self.find_key_by_subkey(result.key)
def is_encrypted_sym(self, raw_data):
result = self.list_packets(raw_data)
return bool(result.need_passphrase_sym)
def is_encrypted_asym(self, raw_data):
result = self.list_packets(raw_data)
return bool(result.key)
def is_encrypted(self, raw_data):
self.is_encrypted_asym() or self.is_encrypted_sym()