Add MetaGPG and GPGBase for handling attribute/property setup.
* These classes also play a few other tricks, like the steps taken to disable pinentry if GPG.use_agent=False: 1) detect if gpg-agent is running for the same EUID as python, and if it is then 2) find out if the user wants us to use it, and if they don't then 3) find if pinentry is installed, and if it is then 4) modify the system PATH to exclude the directory where pinentry is, symlinking out gpg if gpg was in the same directory, and then 5) register an _exithandler with the interpreter to replace the original environment when the interpreter exits.testing/mmn/mktime_takes_localtime_not_gmtime
parent
a4630e1cfb
commit
848e410f19
|
@ -1 +1,2 @@
|
|||
Sphinx>=1.1
|
||||
psutil>=0.5.1
|
||||
|
|
254
src/gnupg.py
254
src/gnupg.py
|
@ -78,9 +78,12 @@ try:
|
|||
except ImportError:
|
||||
from cStringIO import StringIO
|
||||
|
||||
from pprint import pprint
|
||||
from psutil import process_iter
|
||||
from subprocess import Popen
|
||||
from subprocess import PIPE
|
||||
|
||||
import atexit
|
||||
import codecs
|
||||
## For AOS, the locale module will need to point to a wrapper around the
|
||||
## java.util.Locale class.
|
||||
|
@ -104,13 +107,258 @@ from _util import log
|
|||
import _util
|
||||
import _parsers
|
||||
|
||||
import parsers
|
||||
|
||||
class GPGMeta(type):
|
||||
"""Metaclass for changing the :meth:GPG.__init__ initialiser.
|
||||
|
||||
Detects running gpg-agent processes and the presence of a pinentry
|
||||
program, and disables pinentry so that python-gnupg can write the
|
||||
passphrase to the controlled GnuPG process without killing the agent.
|
||||
"""
|
||||
|
||||
def __new__(cls, name, bases, attrs):
|
||||
"""Construct the initialiser for GPG"""
|
||||
log.debug("Metaclass __new__ constructor called for %r" % cls)
|
||||
if cls._find_agent():
|
||||
## call the normal GPG.__init__() initialisor:
|
||||
attrs['init'] = cls.__init__ ## nothing changed for now
|
||||
attrs['_remove_agent'] = True
|
||||
return super(GPGMeta, cls).__new__(cls, name, bases, attrs)
|
||||
|
||||
@classmethod
|
||||
def _find_agent(cls):
|
||||
"""Discover if a gpg-agent process for the current euid is running.
|
||||
|
||||
If there is a matching gpg-agent process, set a :class:psutil.Process
|
||||
instance containing the gpg-agent process' information to
|
||||
:attr:cls._agent_proc.
|
||||
|
||||
:returns: True if there exists a gpg-agent process running under the
|
||||
same effective user ID as that of this program. Otherwise,
|
||||
returns None.
|
||||
"""
|
||||
identity = os.getresuid()
|
||||
for proc in process_iter():
|
||||
if (proc.name == "gpg-agent") and proc.is_running:
|
||||
log.debug("Found gpg-agent process with pid %d" % proc.pid)
|
||||
if proc.uids == identity:
|
||||
log.debug(
|
||||
"Effective UIDs of this process and gpg-agent match")
|
||||
setattr(cls, '_agent_proc', proc)
|
||||
return True
|
||||
|
||||
## xxx we might not need this, try setting:
|
||||
## attrs['remove_path'] = __remove_path__
|
||||
|
||||
# @classmethod
|
||||
# def _init_decorator(cls):
|
||||
# """Wraps the :meth:__init__ function in a partial of itself."""
|
||||
# log.debug("_init_decorator called for %s" % cls.__init__.__repr__())
|
||||
# def _init_wrapper(*args, **kwargs):
|
||||
# wraps(cls.__init__, *args, **kwargs)
|
||||
# if getattr(cls, '_agent_proc', None) is not None:
|
||||
# cls.__remove_path__(prog='pinentry')
|
||||
# return _init_wrapper
|
||||
|
||||
|
||||
import util as _util
|
||||
class GPGBase(object):
|
||||
"""Base class to control process initialisation and for property storage."""
|
||||
|
||||
__metaclass__ = GPGMeta
|
||||
|
||||
def __init__(self, binary=None, home=None, keyring=None, secring=None,
|
||||
use_agent=False, default_preference_list=None,
|
||||
verbose=False, options=None):
|
||||
|
||||
self.binary = _util._find_binary(binary)
|
||||
self.homedir = home if home else _conf
|
||||
pub = _fix_unsafe(keyring) if keyring else 'pubring.gpg'
|
||||
sec = _fix_unsafe(secring) if secring else 'secring.gpg'
|
||||
self.keyring = os.path.join(self._homedir, pub)
|
||||
self.secring = os.path.join(self._homedir, sec)
|
||||
self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP'
|
||||
self.options = _sanitise(options) if options else None
|
||||
|
||||
self.encoding = locale.getpreferredencoding()
|
||||
if self.encoding is None: # This happens on Jython!
|
||||
self.encoding = sys.stdin.encoding
|
||||
|
||||
try:
|
||||
assert self.binary, "Could not find binary %s" % binary
|
||||
assert isinstance(verbose, (bool, str, int)), \
|
||||
"'verbose' must be boolean, string, or 0 <= n <= 9"
|
||||
assert isinstance(use_agent, bool), "'use_agent' must be boolean"
|
||||
|
||||
if self.options is not None:
|
||||
assert isinstance(self.options, str), "options not string"
|
||||
except (AssertionError, AttributeError) as ae:
|
||||
log.error("GPGBase.__init__(): %s" % ae.message)
|
||||
raise RuntimeError(ae.message)
|
||||
else:
|
||||
self.verbose = verbose
|
||||
self.use_agent = use_agent
|
||||
|
||||
if hasattr(self, '_agent_proc') \
|
||||
and getattr(self, '_remove_agent', None) is True:
|
||||
if hasattr(self, '__remove_path__'):
|
||||
self.__remove_path__('pinentry')
|
||||
|
||||
def __remove_path__(self, prog=None):
|
||||
log.debug("Attempting to remove %s from system PATH" % str(prog))
|
||||
if (prog is None) or (not isinstance(prog, str)): return
|
||||
|
||||
try:
|
||||
program = _util._which(prog)[0]
|
||||
except (OSError, IOerror, IndexError) as err:
|
||||
log.err(err.message)
|
||||
log.err("Cannot find program '%s', not changing PATH." % prog)
|
||||
return
|
||||
|
||||
## __remove_path__ cannot be an @classmethod in GPGMeta, because
|
||||
## the use_agent attribute must be set by the instance.
|
||||
if not self.use_agent:
|
||||
program_path = os.path.dirname(program)
|
||||
|
||||
## symlink our gpg binary into the current working directory
|
||||
if os.path.dirname(self.gpg.binary) == program_path:
|
||||
os.symlink(self.gpg.binary, os.path.join(os.getcwd(), 'gpg'))
|
||||
|
||||
## copy the original environment so we can put it back later:
|
||||
env_copy = os.environ
|
||||
path_copy = os.environ.pop('PATH')
|
||||
assert not os.environ.has_key('PATH')
|
||||
log.debug("Created a copy of system PATH: %r" % path_copy)
|
||||
|
||||
path_string = '{}:'.format(program_path)
|
||||
rm_program = path_copy.replace(path_string, None)
|
||||
|
||||
@staticmethod
|
||||
def update_path(env_copy, path_value):
|
||||
log.debug("Updating system path...")
|
||||
os.environ = env_copy
|
||||
os.environ.update({'PATH': path_value})
|
||||
log.debug("System $PATH: %s" % os.environ['PATH'])
|
||||
|
||||
update_path(env_copy, rm_program)
|
||||
|
||||
## register an _exithandler with the python interpreter:
|
||||
atexit.register(update_path, env_copy, path_copy)
|
||||
|
||||
# @property
|
||||
# def keyring(self):
|
||||
# """Get the public keyring."""
|
||||
# return self._keyring
|
||||
#
|
||||
# @keyring.setter
|
||||
# def keyring(self, pub):
|
||||
# """Set the file to use as GnuPG's current (public) keyring.
|
||||
#
|
||||
# :param str pub: The filename, relative to :attr:``GPG.homedir``, to use
|
||||
# for storing public key data.
|
||||
# """
|
||||
# ring = _fix_unsafe(pub) if pub else 'pubring.gpg'
|
||||
# self._keyring = os.path.join(self._homedir, ring)
|
||||
#
|
||||
# @property
|
||||
# def secring(self):
|
||||
# """Get the secret keyring."""
|
||||
# return self._secring
|
||||
#
|
||||
# @secring.setter
|
||||
# def secring(self, sec):
|
||||
# """Set the file to use as GnuPG's current secret keyring.
|
||||
#
|
||||
# :param str pub: The filename, relative to :attr:``GPG.homedir``, to use
|
||||
# for storing secret key data.
|
||||
# """
|
||||
# ring = _fix_unsafe(sec) if sec else 'secring.gpg'
|
||||
# self._secring = os.path.join(self._homedir, ring)
|
||||
|
||||
@property
|
||||
def default_preference_list(self):
|
||||
"""Get the default preference list."""
|
||||
return self._prefs
|
||||
|
||||
@default_preference_list.setter
|
||||
def default_preference_list(self, prefs):
|
||||
"""Set the default preference list.
|
||||
|
||||
:param str prefs: A string containing the default preferences for
|
||||
ciphers, digests, and compression algorithms.
|
||||
"""
|
||||
prefs = _check_preferences(prefs)
|
||||
if prefs is not None:
|
||||
self._prefs = prefs
|
||||
|
||||
@default_preference_list.deleter
|
||||
def default_preference_list(self, prefs):
|
||||
"""Reset the default preference list to its original state.
|
||||
|
||||
Note that "original state" does not mean the default preference
|
||||
list for whichever version of GnuPG is being used. It means the
|
||||
default preference list defined by :attr:`GPGBase._preferences`.
|
||||
|
||||
Using BZIP2 is avoided due to not interacting well with some versions
|
||||
of GnuPG>=2.0.0.
|
||||
"""
|
||||
self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP'
|
||||
|
||||
|
||||
class GPG(object):
|
||||
def _homedir_getter(self):
|
||||
"""Get the directory currently being used as GnuPG's homedir.
|
||||
|
||||
If unspecified, use $HOME/.config/python-gnupg/
|
||||
|
||||
:rtype: str
|
||||
:returns: The absolute path to the current GnuPG homedir.
|
||||
"""
|
||||
return self._homedir
|
||||
|
||||
def _homedir_setter(self, directory):
|
||||
"""Set the directory to use as GnuPG's homedir.
|
||||
|
||||
If unspecified, use $HOME/.config/python-gnupg. If specified, ensure
|
||||
that the ``directory`` does not contain various shell escape
|
||||
characters. If ``directory`` is not found, it will be automatically
|
||||
created. Lastly, the ``direcory`` will be checked that the EUID has
|
||||
read and write permissions for it.
|
||||
|
||||
:param str homedir: A relative or absolute path to the directory to use
|
||||
for storing/accessing GnuPG's files, including
|
||||
keyrings and the trustdb.
|
||||
:raises: :exc:`RuntimeError` if unable to find a suitable directory to
|
||||
use.
|
||||
"""
|
||||
if not directory:
|
||||
log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'"
|
||||
% _conf)
|
||||
directory = _conf
|
||||
|
||||
hd = _fix_unsafe(directory)
|
||||
log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd)
|
||||
|
||||
if hd:
|
||||
log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd)
|
||||
_util._create_if_necessary(hd)
|
||||
|
||||
try:
|
||||
log.debug("GPGBase._homedir_setter(): checking permissions")
|
||||
assert _util._has_readwrite(hd), \
|
||||
"Homedir '%s' needs read/write permissions" % hd
|
||||
except AssertionError as ae:
|
||||
msg = ("Unable to set '%s' as GnuPG homedir" % directory)
|
||||
log.debug("GPGBase.homedir.setter(): %s" % msg)
|
||||
log.debug(ae.message)
|
||||
raise RuntimeError(ae.message)
|
||||
else:
|
||||
log.debug("GPGBase:")
|
||||
log.info("Setting homedir to '%s'" % hd)
|
||||
self._homedir = hd
|
||||
|
||||
homedir = InheritableProperty(_homedir_getter, _homedir_setter)
|
||||
|
||||
|
||||
class GPG(GPGBase):
|
||||
"""Encapsulate access to the gpg executable"""
|
||||
_decode_errors = 'strict'
|
||||
|
||||
|
|
Loading…
Reference in New Issue