Rewrite checks in gnupg.GPG.__init__().

feature/documentation-builds-dirhtml
Isis Lovecruft 2013-03-16 02:07:22 +00:00 committed by Isis Lovecruft
parent 852a0fae97
commit a7940e3c84
1 changed files with 72 additions and 36 deletions

108
gnupg.py
View File

@ -1049,53 +1049,89 @@ class GPG(object):
@param keyring: Name of alternative keyring file to use. If specified,
the default keyring is not used.
@options: A list of additional options to pass to the GPG binary.
@rtype:
@raises: RuntimeError with explanation message if there is a problem
invoking gpg.
@returns:
"""
if gpgbinary:
full = _which(gpgbinary)[0]
if full is not None:
assert os.path.isabs(full), "Couldn't get full path to gpg"
assert not os.path.islink(full), "Path to gpg binary is link"
safe_gpgbinary = _fix_unsafe(gpgbinary)
## if using the default, or if the given gpgbinary is not absolute,
## then find the absolute path and check that we have +x permissions
if not os.path.isabs(safe_gpgbinary):
that = _which(safe_gpgbinary)
self.gpgbinary = that[0] if (len(that) > 0) else None
else:
self.gpgbinary = safe_gpgbinary
assert self.gpgbinary, "Could not find gpgbinary %s" % safe_gpgbinary
if not gpgbinary:
try:
standard_binary = _which('gpg')[0]
except IndexError:
raise RuntimeError("gpg is not installed")
self.gpghome = _fix_unsafe(gpghome)
if self.gpghome is not None:
if not os.path.isdir(self.gpghome):
os.makedirs(self.gpghome, 0x1C0)
assert _has_readwrite(self.gpghome), \
"Need read+write for GnuPG home directory: %s" % self.gpghome
self.keyring = _fix_unsafe(keyring)
if self.keyring is not None:
assert _is_file(self.keyring), "Could not find %s" % self.keyring
assert isinstance(verbose, bool), "'verbose' must be boolean"
self.verbose = verbose
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
self.use_agent = use_agent
## find the absolute path, check that it is not a link, and check that
## we have exec permissions
that = _which(gpgbinary or 'gpg')
full = that[0] if (len(that) > 0) else None
if not full:
gpgbinary = 'gpg'
that = _which(gpgbinary)
full = that[0] if (len(that) > 0) else None
self.gpgbinary = full
self.options = _sanitise(options) if options else None
self.gpghome = _fix_unsafe(gpghome)
if self.gpghome:
if not os.path.isdir(self.gpghome):
message = ("Creating gpg home dir: %s" % gpghome)
logger.debug("GPG.__init__(): %s" % message)
os.makedirs(self.gpghome, 0x1C0)
if not os.path.isabs(self.gpghome):
message = ("Got non-abs gpg home dir path: %s" % self.gpghome)
logger.debug("GPG.__init__(): %s" % message)
self.gpghome = os.path.abspath(self.gpghome)
else:
message = ("Unsuitable gpg home dir: %s" % gpghome)
logger.debug("GPG.__init__(): %s" % message)
safe_keyring = _fix_unsafe(keyring)
if not safe_keyring:
safe_keyring = 'pubring.asc'
self.keyring = os.path.join(self.gpghome, safe_keyring)
## xxx TODO: hack the locale module away so we can use this on android
self.encoding = locale.getpreferredencoding()
if self.encoding is None: # This happens on Jython!
self.encoding = sys.stdin.encoding
p = self._open_subprocess(["--version"])
result = self.result_map['verify'](self) # any result will do for this
self._collect_output(p, result, stdin=p.stdin)
if p.returncode != 0:
raise ValueError("Error invoking gpg: %s: %s"
% (p.returncode, result.stderr))
try:
assert os.path.isabs(full), "Couldn't get full path to gpg"
assert not os.path.islink(full), "Path to gpg binary is link"
assert os.access(full, os.X_OK), "gpg binary must be executable"
assert self.gpghome is not None, "Got None for self.gpghome"
assert _has_readwrite(self.gpghome), ("Home dir %s needs r+w"
% self.gpghome)
assert self.gpgbinary, "Could not find gpgbinary %s" % full
assert isinstance(verbose, bool), "'verbose' must be boolean"
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
if self.options:
assert isinstance(options, str), ("options not formatted: %s"
% options)
except (AssertionError, AttributeError) as ae:
logger.debug("GPG.__init__(): %s" % ae.message)
raise RuntimeError(ae.message)
else:
self.gpgbinary = full
self.verbose = verbose
self.use_agent = use_agent
proc = self._open_subprocess(["--version"])
result = self.result_map['verify'](self)
self._collect_output(proc, result, stdin=proc.stdin)
if proc.returncode != 0:
raise RuntimeError("Error invoking gpg: %s: %s"
% (proc.returncode, result.stderr))
if self.keyring:
try:
assert _has_readwrite(self.keyring), ("Need r+w for %s"
% self.keyring)
except AssertionError as ae:
logger.debug(ae.message)
def make_args(self, args, passphrase=False):
"""