Merge branch 'fix/2162-stream-check' into develop

feature/documentation-builds-html
Isis Lovecruft 2013-04-10 22:00:09 +00:00
commit 3152784ada
No known key found for this signature in database
GPG Key ID: A3ADB67A2CDB8B35
2 changed files with 77 additions and 65 deletions

View File

@ -70,6 +70,8 @@ try:
except ImportError:
from cStringIO import StringIO
from datetime import datetime
import codecs
import locale
import logging
@ -114,12 +116,12 @@ def _copy_data(instream, outstream):
Copy data from one stream to another.
:param instream: A file descriptor to read from.
:param outstream: A file descriptor to write to.
:param outstream: The file descriptor of a tmpfile to write to.
"""
sent = 0
try:
assert isinstance(instream, BytesIO), "instream is not a file"
assert _is_stream(instream), "instream is not a stream"
assert isinstance(outstream, file), "outstream is not a file"
except AssertionError as ae:
logger.exception(ae)
@ -408,7 +410,8 @@ def _is_allowed(input):
## eg, --no-show-photos would mitigate things like
## https://www-01.ibm.com/support/docview.wss?uid=swg21620982
_allowed = frozenset(
['--list-keys', '--list-packets', '--with-colons',
['--list-keys', '--list-secret-keys', '--fixed-list-mode',
'--list-packets', '--with-colons',
'--delete-keys', '--delete-secret-keys',
'--encrypt', '--encrypt-files',
'--print-mds', '--print-md', '--sign',
@ -426,7 +429,7 @@ def _is_allowed(input):
assert _allowed.issubset(_possible), \
'_allowed is not subset of known options, difference: %s' \
% _allowed.difference(_possible)
except AssertionError as ae: ## 'as' syntax requires python>=2.6
except AssertionError as ae:
logger.debug("gnupg._is_allowed(): %s" % ae.message)
raise UsageError(ae.message)
@ -445,7 +448,7 @@ def _is_allowed(input):
try:
assert hyphenated in _allowed
except AssertionError as ae:
logger.warn("Dropping option '%s'..."
logger.warn("_is_allowed(): Dropping option '%s'..."
% _fix_unsafe(hyphenated))
raise ProtectedOption("Option '%s' not supported."
% _fix_unsafe(hyphenated))
@ -467,6 +470,15 @@ def _is_file(input):
else:
return True
def _is_stream(input):
"""Check that the input is a byte stream.
:param input: An object provided for reading from or writing to
:rtype: C{bool}
:returns: True if :param:`input` is a stream, False if otherwise.
"""
return isinstance(input, BytesIO)
def _is_sequence(instance):
return isinstance(instance,list) or isinstance(instance,tuple)
@ -484,6 +496,11 @@ def _make_binary_stream(s, encoding):
rv = StringIO(s)
return rv
def _today():
"""Get the current date as a string in the form %Y-%m-%d."""
now_string = datetime.now().__str__()
return now_string.split(' ', 1)[0]
def _sanitise(*args):
"""
Take an arg or the key portion of a kwarg and check that it is in the set
@ -528,8 +545,10 @@ def _sanitise(*args):
--recipient isis@leap.se" to gpg, then "--encrypt" would be
an arg without a value, and "--recipient" would also be an
arg, with a value of "isis@leap.se".
:type sanitised: C{str}
:ivar sanitised: The sanitised, allowed options.
:type checked: C{list}
:ivar checked: The sanitised, allowed options and values.
:rtype: C{str}
:returns: A string of the items in :ivar:`checked` delimited by spaces.
"""
safe_values = str()
@ -538,26 +557,29 @@ def _sanitise(*args):
assert allowed_flag is not None, \
"_check_arg_and_value(): got None for allowed_flag"
except (AssertionError, ProtectedOption) as error:
logger.warn(error.message)
logger.debug("Dropping option '%s'..." % _fix_unsafe(arg))
logger.warn("_sanitise(): %s" % error.message)
else:
safe_values += (allowed_flag + " ")
if isinstance(value, str):
value_list = value.split(' ')
for value in value_list:
safe_value = _fix_unsafe(value)
if allowed_flag == '--encrypt' or '--encrypt-files' \
or '--decrypt' or '--decrypt-file' \
or '--import' or '--verify':
if safe_value is not None and not safe_value.strip() == "":
if allowed_flag in ['--encrypt', '--encrypt-files',
'--decrypt', '--decrypt-file',
'--import', '--verify']:
## Place checks here:
if not safe_value == "" and _is_file(safe_value):
safe_values += (safe_value + " ")
if _is_file(safe_value):
safe_values += (safe_value + " ")
else:
logger.debug(
"_sanitize(): Got non-file for %s option: %s"
% (allowed_flag, safe_value))
else:
logger.debug("Got non-filename for %s option: %s"
% (allowed_flag, safe_value))
else:
safe_values += (safe_value + " ")
logger.debug("Got non-checked value: %s" % safe_value)
safe_values += (safe_value + " ")
logger.debug(
"_sanitize(): No configured checks for value: %s"
% safe_value)
return safe_values
checked = []
@ -583,7 +605,7 @@ def _sanitise(*args):
logger.debug("Got non-flag argument: %s" % filo[0])
filo.pop()
safe = _check_arg_and_value(new_arg, new_value)
if safe is not None and safe.strip() != '':
if safe is not None and not safe.strip() == '':
logger.debug("_sanitise(): appending args: %s" % safe)
checked.append(safe)
else:
@ -623,7 +645,7 @@ def _sanitise_list(arg_list):
def _threaded_copy_data(instream, outstream):
wr = threading.Thread(target=_copy_data, args=(instream, outstream))
wr.setDaemon(True)
logger.debug('data copier: %r, %r, %r', wr, instream, outstream)
logger.debug('_threaded_copy_data(): %r, %r, %r', wr, instream, outstream)
wr.start()
return wr
@ -1100,17 +1122,19 @@ class GPG(object):
:returns:
"""
logger.warn("")
if not gpghome:
gpghome = os.path.join(os.getcwd(), 'gnupg')
self.gpghome = _fix_unsafe(gpghome)
if self.gpghome:
if not os.path.isdir(self.gpghome):
message = ("Creating gpg home dir: %s" % gpghome)
logger.debug("GPG.__init__(): %s" % message)
logger.warn("GPG.__init__(): %s" % message)
os.makedirs(self.gpghome, 0x1C0)
if not os.path.isabs(self.gpghome):
message = ("Got non-abs gpg home dir path: %s" % self.gpghome)
logger.debug("GPG.__init__(): %s" % message)
logger.warn("GPG.__init__(): %s" % message)
self.gpghome = os.path.abspath(self.gpghome)
else:
message = ("Unsuitable gpg home dir: %s" % gpghome)
@ -1215,9 +1239,6 @@ class GPG(object):
[cmd.append(opt) for opt in iter(_sanitise_list(self.options))]
if args:
[cmd.append(arg) for arg in iter(_sanitise_list(args))]
## so that we don't print it twice, here and in _open_subprocess():
if not self.verbose:
logger.debug("make_args(): Using command: %s" % cmd)
return cmd
def _open_subprocess(self, args=None, passphrase=False):
@ -1304,15 +1325,11 @@ class GPG(object):
stderr.close()
stdout.close()
def _handle_io(self, args, file, result, passphrase=None, binary=False):
def _handle_io(self, args, file, result, passphrase=False, binary=False):
"""
Handle a call to GPG - pass input data, collect output data.
"""
if passphrase is not None:
ask_passphrase = True
else:
ask_passphrase = False
p = self._open_subprocess(args, ask_passphrase)
p = self._open_subprocess(args, passphrase)
if not binary:
stdin = codecs.getwriter(self.encoding)(p.stdin)
else:
@ -1620,8 +1637,15 @@ class GPG(object):
return result
def gen_key_input(self, **kwargs):
"""
Generate --gen-key input per gpg doc/DETAILS
"""Generate GnuPG key(s) through batch file key generation.
The GnuPG batch file key generation feature allows unattended key
generation by creating a file with special syntax and then providing it
to:
$ gpg --gen-key --batch <batch file>
see http://www.gnupg.org/documentation/manuals/gnupg-devel/Unattended-GPG-key-generation.html#Unattended-GPG-key-generation
for more details.
"""
parms = {}
for key, val in list(kwargs.items()):
@ -1629,7 +1653,7 @@ class GPG(object):
if str(val).strip(): # skip empty strings
parms[key] = val
parms.setdefault('Key-Type', 'RSA')
parms.setdefault('Key-Length', 2048)
parms.setdefault('Key-Length', 4096)
parms.setdefault('Name-Real', "Autogenerated Key")
parms.setdefault('Name-Comment', "Generated by gnupg.py")
try:
@ -1642,6 +1666,8 @@ class GPG(object):
out = "Key-Type: %s\n" % parms.pop('Key-Type')
for key, val in list(parms.items()):
out += "%s: %s\n" % (key, val)
out += "%%pubring %s.pub\n" % self.pubring
out += "%%secring %s.pub\n" % self.secring
out += "%commit\n"
return out

View File

@ -184,28 +184,23 @@ class GPGTestCase(unittest.TestCase):
self.assertGreater(result.find(expected4), 0)
def test_gpg_binary_not_abs(self):
"""
Test that a non-absolute path to gpg results in a full path.
"""
"""Test that a non-absolute path to gpg results in a full path."""
self.assertTrue(os.path.isabs(self.gpg.gpgbinary))
def test_make_args_drop_protected_options(self):
"""
Test that unsupported gpg options are dropped.
"""
"""Test that unsupported gpg options are dropped."""
self.gpg.options = ['--tyrannosaurus-rex', '--stegosaurus']
self.gpg.keyring = self.secring
cmd = self.gpg.make_args(None, False)
expected = ['/usr/bin/gpg',
'--status-fd 2 --no-tty',
'--homedir "%s"' % os.path.join(os.getcwd(), 'keys'),
'--no-default-keyring --keyring "%s"' % self.secring]
'--no-default-keyring --keyring %s --secret-keyring %s'
% (self.pubring, self.secring)]
self.assertListEqual(cmd, expected)
def test_make_args(self):
"""
Test argument line construction.
"""
"""Test argument line construction."""
not_allowed = ['--bicycle', '--zeppelin', 'train', 'flying-carpet']
self.gpg.options = not_allowed[:-2]
args = self.gpg.make_args(not_allowed[2:], False)
@ -214,18 +209,14 @@ class GPGTestCase(unittest.TestCase):
self.assertNotIn(na, args)
def test_list_keys_initial_public(self):
"""
Test that initially there are no public keys.
"""
"""Test that initially there are no public keys."""
public_keys = self.gpg.list_keys()
self.assertTrue(is_list_with_len(public_keys, 0),
"Empty list expected...got instead: %s"
% str(public_keys))
def test_list_keys_initial_secret(self):
"""
Test that initially there are no secret keys.
"""
"""Test that initially there are no secret keys."""
private_keys = self.gpg.list_keys(secret=True)
self.assertTrue(is_list_with_len(private_keys, 0),
"Empty list expected...got instead: %s"
@ -244,22 +235,20 @@ class GPGTestCase(unittest.TestCase):
def generate_key_input(self, real_name, email_domain, key_length=None,
key_type=None, subkey_type=None, passphrase=None):
"""
Generate a GnuPG batch file for key unattended key creation.
"""
"""Generate a GnuPG batch file for key unattended key creation."""
name = real_name.lower().replace(' ', '')
## XXX will GPG just use it's defaults? does it have defaults if
## we've just given it a homedir without a gpg.conf?
key_type = 'RSA'if key_type is None else key_type
key_length = 2048 if key_length is None else key_length
key_length = 4096 if key_length is None else key_length
batch = {'Key-Type': key_type,
'Key-Length': key_length,
'Name-Comment': 'python-gnupg tester',
'Expire-Date': 1,
'Name-Real': '%s' % real_name,
'Name-Email': ("%s@%s" % (name, email_domain)) }
'Name-Email': ("%s@%s" % (name, email_domain))}
batch['Passphrase'] = name if passphrase is None else passphrase
@ -271,18 +260,15 @@ class GPGTestCase(unittest.TestCase):
return key_input
def generate_key(self, real_name, email_domain, **kwargs):
"""
Generate a basic key.
"""
"""Generate a basic key."""
key_input = self.generate_key_input(real_name, email_domain, **kwargs)
key = self.gpg.gen_key(key_input)
def test_gen_key_input(self):
"""
Test that GnuPG batch file creation is successful.
"""
"""Test that GnuPG batch file creation is successful."""
key_input = self.generate_key_input("Francisco Ferrer", "an.ok")
self.assertIsNotNone(key_input)
self.assertGreater(key_input.find('Francisco Ferrer'), 0)
def test_rsa_key_generation(self):
"""
@ -342,9 +328,9 @@ class GPGTestCase(unittest.TestCase):
'Key-Type': 'INVALID',
'Key-Length': 1024,
'Subkey-Type': 'ELG-E',
'Subkey-Length': 2048,
'Subkey-Length': 1024,
'Name-Comment': 'A test user',
'Expire-Date': self.expire_today(),
'Expire-Date': 1,
'Name-Real': 'Test Name',
'Name-Email': 'test.name@example.com',
}