diff --git a/requirements.txt b/requirements.txt index 50feaef..c82f9d5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ Sphinx>=1.1 +psutil>=0.5.1 diff --git a/src/gnupg.py b/src/gnupg.py index b09fcef..98da167 100644 --- a/src/gnupg.py +++ b/src/gnupg.py @@ -78,9 +78,12 @@ try: except ImportError: from cStringIO import StringIO +from pprint import pprint +from psutil import process_iter from subprocess import Popen from subprocess import PIPE +import atexit import codecs ## For AOS, the locale module will need to point to a wrapper around the ## java.util.Locale class. @@ -104,13 +107,258 @@ from _util import log import _util import _parsers -import parsers + +class GPGMeta(type): + """Metaclass for changing the :meth:GPG.__init__ initialiser. + + Detects running gpg-agent processes and the presence of a pinentry + program, and disables pinentry so that python-gnupg can write the + passphrase to the controlled GnuPG process without killing the agent. + """ + + def __new__(cls, name, bases, attrs): + """Construct the initialiser for GPG""" + log.debug("Metaclass __new__ constructor called for %r" % cls) + if cls._find_agent(): + ## call the normal GPG.__init__() initialisor: + attrs['init'] = cls.__init__ ## nothing changed for now + attrs['_remove_agent'] = True + return super(GPGMeta, cls).__new__(cls, name, bases, attrs) + + @classmethod + def _find_agent(cls): + """Discover if a gpg-agent process for the current euid is running. + + If there is a matching gpg-agent process, set a :class:psutil.Process + instance containing the gpg-agent process' information to + :attr:cls._agent_proc. + + :returns: True if there exists a gpg-agent process running under the + same effective user ID as that of this program. Otherwise, + returns None. + """ + identity = os.getresuid() + for proc in process_iter(): + if (proc.name == "gpg-agent") and proc.is_running: + log.debug("Found gpg-agent process with pid %d" % proc.pid) + if proc.uids == identity: + log.debug( + "Effective UIDs of this process and gpg-agent match") + setattr(cls, '_agent_proc', proc) + return True + + ## xxx we might not need this, try setting: + ## attrs['remove_path'] = __remove_path__ + + # @classmethod + # def _init_decorator(cls): + # """Wraps the :meth:__init__ function in a partial of itself.""" + # log.debug("_init_decorator called for %s" % cls.__init__.__repr__()) + # def _init_wrapper(*args, **kwargs): + # wraps(cls.__init__, *args, **kwargs) + # if getattr(cls, '_agent_proc', None) is not None: + # cls.__remove_path__(prog='pinentry') + # return _init_wrapper -import util as _util +class GPGBase(object): + """Base class to control process initialisation and for property storage.""" + + __metaclass__ = GPGMeta + + def __init__(self, binary=None, home=None, keyring=None, secring=None, + use_agent=False, default_preference_list=None, + verbose=False, options=None): + + self.binary = _util._find_binary(binary) + self.homedir = home if home else _conf + pub = _fix_unsafe(keyring) if keyring else 'pubring.gpg' + sec = _fix_unsafe(secring) if secring else 'secring.gpg' + self.keyring = os.path.join(self._homedir, pub) + self.secring = os.path.join(self._homedir, sec) + self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP' + self.options = _sanitise(options) if options else None + + self.encoding = locale.getpreferredencoding() + if self.encoding is None: # This happens on Jython! + self.encoding = sys.stdin.encoding + + try: + assert self.binary, "Could not find binary %s" % binary + assert isinstance(verbose, (bool, str, int)), \ + "'verbose' must be boolean, string, or 0 <= n <= 9" + assert isinstance(use_agent, bool), "'use_agent' must be boolean" + + if self.options is not None: + assert isinstance(self.options, str), "options not string" + except (AssertionError, AttributeError) as ae: + log.error("GPGBase.__init__(): %s" % ae.message) + raise RuntimeError(ae.message) + else: + self.verbose = verbose + self.use_agent = use_agent + + if hasattr(self, '_agent_proc') \ + and getattr(self, '_remove_agent', None) is True: + if hasattr(self, '__remove_path__'): + self.__remove_path__('pinentry') + + def __remove_path__(self, prog=None): + log.debug("Attempting to remove %s from system PATH" % str(prog)) + if (prog is None) or (not isinstance(prog, str)): return + + try: + program = _util._which(prog)[0] + except (OSError, IOerror, IndexError) as err: + log.err(err.message) + log.err("Cannot find program '%s', not changing PATH." % prog) + return + + ## __remove_path__ cannot be an @classmethod in GPGMeta, because + ## the use_agent attribute must be set by the instance. + if not self.use_agent: + program_path = os.path.dirname(program) + + ## symlink our gpg binary into the current working directory + if os.path.dirname(self.gpg.binary) == program_path: + os.symlink(self.gpg.binary, os.path.join(os.getcwd(), 'gpg')) + + ## copy the original environment so we can put it back later: + env_copy = os.environ + path_copy = os.environ.pop('PATH') + assert not os.environ.has_key('PATH') + log.debug("Created a copy of system PATH: %r" % path_copy) + + path_string = '{}:'.format(program_path) + rm_program = path_copy.replace(path_string, None) + + @staticmethod + def update_path(env_copy, path_value): + log.debug("Updating system path...") + os.environ = env_copy + os.environ.update({'PATH': path_value}) + log.debug("System $PATH: %s" % os.environ['PATH']) + + update_path(env_copy, rm_program) + + ## register an _exithandler with the python interpreter: + atexit.register(update_path, env_copy, path_copy) + + # @property + # def keyring(self): + # """Get the public keyring.""" + # return self._keyring + # + # @keyring.setter + # def keyring(self, pub): + # """Set the file to use as GnuPG's current (public) keyring. + # + # :param str pub: The filename, relative to :attr:``GPG.homedir``, to use + # for storing public key data. + # """ + # ring = _fix_unsafe(pub) if pub else 'pubring.gpg' + # self._keyring = os.path.join(self._homedir, ring) + # + # @property + # def secring(self): + # """Get the secret keyring.""" + # return self._secring + # + # @secring.setter + # def secring(self, sec): + # """Set the file to use as GnuPG's current secret keyring. + # + # :param str pub: The filename, relative to :attr:``GPG.homedir``, to use + # for storing secret key data. + # """ + # ring = _fix_unsafe(sec) if sec else 'secring.gpg' + # self._secring = os.path.join(self._homedir, ring) + + @property + def default_preference_list(self): + """Get the default preference list.""" + return self._prefs + + @default_preference_list.setter + def default_preference_list(self, prefs): + """Set the default preference list. + + :param str prefs: A string containing the default preferences for + ciphers, digests, and compression algorithms. + """ + prefs = _check_preferences(prefs) + if prefs is not None: + self._prefs = prefs + + @default_preference_list.deleter + def default_preference_list(self, prefs): + """Reset the default preference list to its original state. + + Note that "original state" does not mean the default preference + list for whichever version of GnuPG is being used. It means the + default preference list defined by :attr:`GPGBase._preferences`. + + Using BZIP2 is avoided due to not interacting well with some versions + of GnuPG>=2.0.0. + """ + self._prefs = 'SHA512 SHA384 SHA256 AES256 CAMELLIA256 TWOFISH ZLIB ZIP' -class GPG(object): + def _homedir_getter(self): + """Get the directory currently being used as GnuPG's homedir. + + If unspecified, use $HOME/.config/python-gnupg/ + + :rtype: str + :returns: The absolute path to the current GnuPG homedir. + """ + return self._homedir + + def _homedir_setter(self, directory): + """Set the directory to use as GnuPG's homedir. + + If unspecified, use $HOME/.config/python-gnupg. If specified, ensure + that the ``directory`` does not contain various shell escape + characters. If ``directory`` is not found, it will be automatically + created. Lastly, the ``direcory`` will be checked that the EUID has + read and write permissions for it. + + :param str homedir: A relative or absolute path to the directory to use + for storing/accessing GnuPG's files, including + keyrings and the trustdb. + :raises: :exc:`RuntimeError` if unable to find a suitable directory to + use. + """ + if not directory: + log.debug("GPGBase._homedir_setter(): Using default homedir: '%s'" + % _conf) + directory = _conf + + hd = _fix_unsafe(directory) + log.debug("GPGBase._homedir_setter(): got directory '%s'" % hd) + + if hd: + log.debug("GPGBase._homedir_setter(): Check existence of '%s'" % hd) + _util._create_if_necessary(hd) + + try: + log.debug("GPGBase._homedir_setter(): checking permissions") + assert _util._has_readwrite(hd), \ + "Homedir '%s' needs read/write permissions" % hd + except AssertionError as ae: + msg = ("Unable to set '%s' as GnuPG homedir" % directory) + log.debug("GPGBase.homedir.setter(): %s" % msg) + log.debug(ae.message) + raise RuntimeError(ae.message) + else: + log.debug("GPGBase:") + log.info("Setting homedir to '%s'" % hd) + self._homedir = hd + + homedir = InheritableProperty(_homedir_getter, _homedir_setter) + + +class GPG(GPGBase): """Encapsulate access to the gpg executable""" _decode_errors = 'strict'