Add checks to utils.py, update docstrings.

* Add _create_gpghome()
 * Add _is_file()
 * Add _is_stream()
 * Add _is_list_or_tuple()
 * Add _today()
 * Add _which()
 * Update docstrings for methods in ListPackets.
feature/documentation-builds-html
Isis Lovecruft 2013-04-15 00:33:17 +00:00
parent 425ca28a48
commit 2e6d34385b
No known key found for this signature in database
GPG Key ID: A3ADB67A2CDB8B35
1 changed files with 95 additions and 155 deletions

View File

@ -100,174 +100,114 @@ def _find_gpgbinary(gpgbinary=None):
:returns: The absolute path to the GnuPG binary to use, if no exceptions :returns: The absolute path to the GnuPG binary to use, if no exceptions
occur. occur.
""" """
Handle status messages for --list-packets. binary = None
if gpgbinary is not None:
if not os.path.isabs(gpgbinary):
try: binary = _which(gpgbinary)[0]
except IndexError as ie: logger.debug(ie.message)
if binary is None:
try: binary = _which('gpg')[0]
except IndexError: raise RuntimeError("gpg is not installed")
try:
assert os.path.isabs(binary), "Path to gpg binary not absolute"
assert not os.path.islink(binary), "Path to gpg binary is symlink"
assert os.access(binary, os.X_OK), "Lacking +x perms for gpg binary"
except (AssertionError, AttributeError) as ae:
logger.debug("util._find_gpgbinary(): %s" % ae.message)
else:
return binary
def _has_readwrite(path):
""" """
Determine if the real uid/gid of the executing user has read and write
permissions for a directory or a file.
def __init__(self, gpg): :param str path: The path to the directory or file to check permissions
self.gpg = gpg for.
self.nodata = None :rtype: bool
self.key = None :returns: True if real uid/gid has read+write permissions, False otherwise.
self.need_passphrase = None
self.need_passphrase_sym = None
self.userid_hint = None
def handle_status(self, key, value):
# TODO: write tests for handle_status
if key == 'NODATA':
self.nodata = True
if key == 'ENC_TO':
# This will only capture keys in our keyring. In the future we
# may want to include multiple unknown keys in this list.
self.key, _, _ = value.split()
if key == 'NEED_PASSPHRASE':
self.need_passphrase = True
if key == 'NEED_PASSPHRASE_SYM':
self.need_passphrase_sym = True
if key == 'USERID_HINT':
self.userid_hint = value.strip().split()
class GPGWrapper(gnupg.GPG):
""" """
This is a temporary class for handling GPG requests, and should be return os.access(path, os.R_OK and os.W_OK)
replaced by a more general class used throughout the project.
def _is_file(input):
"""Check that the size of the thing which is supposed to be a filename has
size greater than zero, without following symbolic links or using
:func:os.path.isfile.
:param input: An object to check.
:rtype: bool
:returns: True if :param:input is file-like, False otherwise.
""" """
try:
assert os.lstat(input).st_size > 0, "not a file: %s" % input
except (AssertionError, TypeError) as error:
logger.debug(error.message)
return False
else:
return True
GNUPG_HOME = os.environ['HOME'] + "/.config/leap/gnupg" def _is_stream(input):
GNUPG_BINARY = "/usr/bin/gpg" # this has to be changed based on OS """Check that the input is a byte stream.
def __init__(self, gpgbinary=GNUPG_BINARY, gnupghome=GNUPG_HOME, :param input: An object provided for reading from or writing to.
verbose=False, use_agent=False, keyring=None, options=None): :rtype: bool
super(GPGWrapper, self).__init__(gnupghome=gnupghome, :returns: True if :param:input is a stream, False if otherwise.
gpgbinary=gpgbinary, """
verbose=verbose, return isinstance(input, BytesIO) or isinstance(input, StringIO)
use_agent=use_agent,
keyring=keyring,
options=options)
self.result_map['list-packets'] = ListPackets
def find_key_by_email(self, email, secret=False): def _is_list_or_tuple(instance):
""" """Check that ``instance`` is a list or tuple.
Find user's key based on their email.
"""
for key in self.list_keys(secret=secret):
for uid in key['uids']:
if re.search(email, uid):
return key
raise LookupError("GnuPG public key for email %s not found!" % email)
def find_key_by_subkey(self, subkey): :param instance: The object to type check.
for key in self.list_keys(): :rtype: bool
for sub in key['subkeys']: :returns: True if ``instance`` is a list or tuple, False otherwise.
if sub[0] == subkey: """
return key return isinstance(instance,list) or isinstance(instance,tuple)
raise LookupError(
"GnuPG public key for subkey %s not found!" % subkey)
def find_key_by_keyid(self, keyid): ## xxx unused function?
for key in self.list_keys(): def _today():
if keyid == key['keyid']: """Get the current date.
return key
raise LookupError(
"GnuPG public key for subkey %s not found!" % subkey)
def encrypt(self, data, recipient, sign=None, always_trust=True, :rtype: str
passphrase=None, symmetric=False): :returns: The date, in the format '%Y-%m-%d'.
""" """
Encrypt data using GPG. now_string = datetime.now().__str__()
""" return now_string.split(' ', 1)[0]
# TODO: devise a way so we don't need to "always trust".
return super(GPGWrapper, self).encrypt(data, recipient, sign=sign,
always_trust=always_trust,
passphrase=passphrase,
symmetric=symmetric,
cipher_algo='AES256')
def decrypt(self, data, always_trust=True, passphrase=None): def _which(executable, flags=os.X_OK):
""" """Borrowed from Twisted's :mod:twisted.python.proutils .
Decrypt data using GPG.
"""
# TODO: devise a way so we don't need to "always trust".
return super(GPGWrapper, self).decrypt(data,
always_trust=always_trust,
passphrase=passphrase)
def send_keys(self, keyserver, *keyids): Search PATH for executable files with the given name.
"""
Send keys to a keyserver
"""
result = self.result_map['list'](self)
gnupg.logger.debug('send_keys: %r', keyids)
data = gnupg._make_binary_stream("", self.encoding)
args = ['--keyserver', keyserver, '--send-keys']
args.extend(keyids)
self._handle_io(args, data, result, binary=True)
gnupg.logger.debug('send_keys result: %r', result.__dict__)
data.close()
return result
def encrypt_file(self, file, recipients, sign=None, On newer versions of MS-Windows, the PATHEXT environment variable will be
always_trust=False, passphrase=None, set to the list of file extensions for files considered executable. This
armor=True, output=None, symmetric=False, will normally include things like ".EXE". This fuction will also find files
cipher_algo=None): with the given name ending with any of these extensions.
"Encrypt the message read from the file-like object 'file'"
args = ['--encrypt']
if symmetric:
args = ['--symmetric']
if cipher_algo:
args.append('--cipher-algo %s' % cipher_algo)
else:
args = ['--encrypt']
if not _is_sequence(recipients):
recipients = (recipients,)
for recipient in recipients:
args.append('--recipient "%s"' % recipient)
if armor: # create ascii-armored output - set to False for binary
args.append('--armor')
if output: # write the output to a file with the specified name
if os.path.exists(output):
os.remove(output) # to avoid overwrite confirmation message
args.append('--output "%s"' % output)
if sign:
args.append('--sign --default-key "%s"' % sign)
if always_trust:
args.append("--always-trust")
result = self.result_map['crypt'](self)
self._handle_io(args, file, result, passphrase=passphrase, binary=True)
logger.debug('encrypt result: %r', result.data)
return result
def list_packets(self, raw_data): On MS-Windows the only flag that has any meaning is os.F_OK. Any other
args = ["--list-packets"] flags will be ignored.
result = self.result_map['list-packets'](self)
self._handle_io(
args,
_make_binary_stream(raw_data, self.encoding),
result,
)
return result
def encrypted_to(self, raw_data): Note: This function does not help us prevent an attacker who can already
""" manipulate the environment's PATH settings from placing malicious code
Return the key to which raw_data is encrypted to. higher in the PATH. It also does happily follows links.
"""
# TODO: make this support multiple keys.
result = self.list_packets(raw_data)
if not result.key:
raise LookupError(
"Content is not encrypted to a GnuPG key!")
try:
return self.find_key_by_keyid(result.key)
except:
return self.find_key_by_subkey(result.key)
def is_encrypted_sym(self, raw_data): :param str name: The name for which to search.
result = self.list_packets(raw_data) :param int flags: Arguments to L{os.access}.
return bool(result.need_passphrase_sym) :rtype: list
:returns: A list of the full paths to files found, in the order in which
def is_encrypted_asym(self, raw_data): they were found.
result = self.list_packets(raw_data) """
return bool(result.key) result = []
exts = filter(None, os.environ.get('PATHEXT', '').split(os.pathsep))
def is_encrypted(self, raw_data): path = os.environ.get('PATH', None)
self.is_encrypted_asym() or self.is_encrypted_sym() if path is None:
return []
for p in os.environ.get('PATH', '').split(os.pathsep):
p = os.path.join(p, executable)
if os.access(p, flags):
result.append(p)
for e in exts:
pext = p + e
if os.access(pext, flags):
result.append(pext)
return result