GPG.encoding → GPG._encoding
parent
5b731736fe
commit
87ced52f85
|
@ -103,7 +103,7 @@ class GPGBase(object):
|
||||||
encoding = locale.getpreferredencoding()
|
encoding = locale.getpreferredencoding()
|
||||||
if encoding is None: # This happens on Jython!
|
if encoding is None: # This happens on Jython!
|
||||||
encoding = sys.stdin.encoding
|
encoding = sys.stdin.encoding
|
||||||
self.encoding = encoding.lower().replace('-', '_')
|
self._encoding = encoding.lower().replace('-', '_')
|
||||||
self.filesystemencoding = encodings.normalize_encoding(
|
self.filesystemencoding = encodings.normalize_encoding(
|
||||||
sys.getfilesystemencoding().lower())
|
sys.getfilesystemencoding().lower())
|
||||||
|
|
||||||
|
|
|
@ -766,7 +766,7 @@ class Sign(object):
|
||||||
__bool__ = __nonzero__
|
__bool__ = __nonzero__
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.data.decode(self.gpg.encoding, self.gpg._decode_errors)
|
return self.data.decode(self.gpg._encoding, self.gpg._decode_errors)
|
||||||
|
|
||||||
def handle_status(self, key, value):
|
def handle_status(self, key, value):
|
||||||
"""Parse a status code from the attached GnuPG process.
|
"""Parse a status code from the attached GnuPG process.
|
||||||
|
@ -1101,7 +1101,7 @@ class Crypt(Verify):
|
||||||
__bool__ = __nonzero__
|
__bool__ = __nonzero__
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return self.data.decode(self.gpg.encoding, self.gpg._decode_errors)
|
return self.data.decode(self.gpg._encoding, self.gpg._decode_errors)
|
||||||
|
|
||||||
def handle_status(self, key, value):
|
def handle_status(self, key, value):
|
||||||
"""Parse a status code from the attached GnuPG process.
|
"""Parse a status code from the attached GnuPG process.
|
||||||
|
|
30
src/gnupg.py
30
src/gnupg.py
|
@ -339,7 +339,7 @@ use_agent: %s
|
||||||
make sure it's joined before returning. If a stdin stream is given,
|
make sure it's joined before returning. If a stdin stream is given,
|
||||||
close it before returning.
|
close it before returning.
|
||||||
"""
|
"""
|
||||||
stderr = codecs.getreader(self.encoding)(process.stderr)
|
stderr = codecs.getreader(self._encoding)(process.stderr)
|
||||||
rr = threading.Thread(target=self._read_response, args=(stderr, result))
|
rr = threading.Thread(target=self._read_response, args=(stderr, result))
|
||||||
rr.setDaemon(True)
|
rr.setDaemon(True)
|
||||||
log.debug('stderr reader: %r', rr)
|
log.debug('stderr reader: %r', rr)
|
||||||
|
@ -368,11 +368,11 @@ use_agent: %s
|
||||||
"""Handle a call to GPG - pass input data, collect output data."""
|
"""Handle a call to GPG - pass input data, collect output data."""
|
||||||
p = self._open_subprocess(args, passphrase)
|
p = self._open_subprocess(args, passphrase)
|
||||||
if not binary:
|
if not binary:
|
||||||
stdin = codecs.getwriter(self.encoding)(p.stdin)
|
stdin = codecs.getwriter(self._encoding)(p.stdin)
|
||||||
else:
|
else:
|
||||||
stdin = p.stdin
|
stdin = p.stdin
|
||||||
if passphrase:
|
if passphrase:
|
||||||
_util._write_passphrase(stdin, passphrase, self.encoding)
|
_util._write_passphrase(stdin, passphrase, self._encoding)
|
||||||
writer = _util._threaded_copy_data(file, stdin)
|
writer = _util._threaded_copy_data(file, stdin)
|
||||||
self._collect_output(p, result, writer, stdin)
|
self._collect_output(p, result, writer, stdin)
|
||||||
return result
|
return result
|
||||||
|
@ -421,7 +421,7 @@ use_agent: %s
|
||||||
% (data, kwargs[keyid]))
|
% (data, kwargs[keyid]))
|
||||||
else:
|
else:
|
||||||
log.warn("No 'sign_with' keyid given! Using default key.")
|
log.warn("No 'sign_with' keyid given! Using default key.")
|
||||||
stream = _make_binary_stream(data, self.encoding)
|
stream = _make_binary_stream(data, self._encoding)
|
||||||
result = self._sign_file(stream, **kwargs)
|
result = self._sign_file(stream, **kwargs)
|
||||||
stream.close()
|
stream.close()
|
||||||
|
|
||||||
|
@ -467,7 +467,7 @@ use_agent: %s
|
||||||
proc = self._open_subprocess(args, passphrase is not None)
|
proc = self._open_subprocess(args, passphrase is not None)
|
||||||
try:
|
try:
|
||||||
if passphrase:
|
if passphrase:
|
||||||
_util._write_passphrase(proc.stdin, passphrase, self.encoding)
|
_util._write_passphrase(proc.stdin, passphrase, self._encoding)
|
||||||
writer = _util._threaded_copy_data(file, proc.stdin)
|
writer = _util._threaded_copy_data(file, proc.stdin)
|
||||||
except IOError as ioe:
|
except IOError as ioe:
|
||||||
log.exception("Error writing message: %s" % ioe.message)
|
log.exception("Error writing message: %s" % ioe.message)
|
||||||
|
@ -490,7 +490,7 @@ use_agent: %s
|
||||||
>>> assert verify
|
>>> assert verify
|
||||||
|
|
||||||
"""
|
"""
|
||||||
f = _make_binary_stream(data, self.encoding)
|
f = _make_binary_stream(data, self._encoding)
|
||||||
result = self.verify_file(f)
|
result = self.verify_file(f)
|
||||||
f.close()
|
f.close()
|
||||||
return result
|
return result
|
||||||
|
@ -575,7 +575,7 @@ use_agent: %s
|
||||||
|
|
||||||
result = self._result_map['import'](self)
|
result = self._result_map['import'](self)
|
||||||
log.info('Importing: %r', key_data[:256])
|
log.info('Importing: %r', key_data[:256])
|
||||||
data = _make_binary_stream(key_data, self.encoding)
|
data = _make_binary_stream(key_data, self._encoding)
|
||||||
self._handle_io(['--import'], data, result, binary=True)
|
self._handle_io(['--import'], data, result, binary=True)
|
||||||
data.close()
|
data.close()
|
||||||
return result
|
return result
|
||||||
|
@ -593,7 +593,7 @@ use_agent: %s
|
||||||
safe_keyserver = _fix_unsafe(keyserver)
|
safe_keyserver = _fix_unsafe(keyserver)
|
||||||
|
|
||||||
result = self._result_map['import'](self)
|
result = self._result_map['import'](self)
|
||||||
data = _make_binary_stream("", self.encoding)
|
data = _make_binary_stream("", self._encoding)
|
||||||
args = ['--keyserver', keyserver, '--recv-keys']
|
args = ['--keyserver', keyserver, '--recv-keys']
|
||||||
|
|
||||||
if keyids:
|
if keyids:
|
||||||
|
@ -672,7 +672,7 @@ use_agent: %s
|
||||||
result = self._result_map['delete'](self) # any result will do
|
result = self._result_map['delete'](self) # any result will do
|
||||||
self._collect_output(p, result, stdin=p.stdin)
|
self._collect_output(p, result, stdin=p.stdin)
|
||||||
log.debug('Exported:%s%r' % (os.linesep, result.data))
|
log.debug('Exported:%s%r' % (os.linesep, result.data))
|
||||||
return result.data.decode(self.encoding, self._decode_errors)
|
return result.data.decode(self._encoding, self._decode_errors)
|
||||||
|
|
||||||
def list_keys(self, secret=False):
|
def list_keys(self, secret=False):
|
||||||
"""List the keys currently in the keyring.
|
"""List the keys currently in the keyring.
|
||||||
|
@ -709,7 +709,7 @@ use_agent: %s
|
||||||
# Get the response information
|
# Get the response information
|
||||||
result = self._result_map['list'](self)
|
result = self._result_map['list'](self)
|
||||||
self._collect_output(p, result, stdin=p.stdin)
|
self._collect_output(p, result, stdin=p.stdin)
|
||||||
lines = result.data.decode(self.encoding,
|
lines = result.data.decode(self._encoding,
|
||||||
self._decode_errors).splitlines()
|
self._decode_errors).splitlines()
|
||||||
valid_keywords = 'pub uid sec fpr sub'.split()
|
valid_keywords = 'pub uid sec fpr sub'.split()
|
||||||
for line in lines:
|
for line in lines:
|
||||||
|
@ -730,7 +730,7 @@ use_agent: %s
|
||||||
"""List the packet contents of a file."""
|
"""List the packet contents of a file."""
|
||||||
args = ["--list-packets"]
|
args = ["--list-packets"]
|
||||||
result = self._result_map['packets'](self)
|
result = self._result_map['packets'](self)
|
||||||
self._handle_io(args, _make_binary_stream(raw_data, self.encoding),
|
self._handle_io(args, _make_binary_stream(raw_data, self._encoding),
|
||||||
result)
|
result)
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
@ -781,7 +781,7 @@ use_agent: %s
|
||||||
## see TODO file, tag :gen_key: for todo items
|
## see TODO file, tag :gen_key: for todo items
|
||||||
args = ["--gen-key --batch"]
|
args = ["--gen-key --batch"]
|
||||||
key = self._result_map['generate'](self)
|
key = self._result_map['generate'](self)
|
||||||
f = _make_binary_stream(input, self.encoding)
|
f = _make_binary_stream(input, self._encoding)
|
||||||
self._handle_io(args, f, key, binary=True)
|
self._handle_io(args, f, key, binary=True)
|
||||||
f.close()
|
f.close()
|
||||||
|
|
||||||
|
@ -1269,7 +1269,7 @@ generate keys. Please see
|
||||||
>>> assert result.fingerprint == print1
|
>>> assert result.fingerprint == print1
|
||||||
|
|
||||||
"""
|
"""
|
||||||
stream = _make_binary_stream(data, self.encoding)
|
stream = _make_binary_stream(data, self._encoding)
|
||||||
result = self.encrypt_file(stream, recipients, **kwargs)
|
result = self.encrypt_file(stream, recipients, **kwargs)
|
||||||
stream.close()
|
stream.close()
|
||||||
return result
|
return result
|
||||||
|
@ -1279,7 +1279,7 @@ generate keys. Please see
|
||||||
|
|
||||||
:param message: A string or file-like object to decrypt.
|
:param message: A string or file-like object to decrypt.
|
||||||
"""
|
"""
|
||||||
stream = _make_binary_stream(message, self.encoding)
|
stream = _make_binary_stream(message, self._encoding)
|
||||||
result = self.decrypt_file(stream, **kwargs)
|
result = self.decrypt_file(stream, **kwargs)
|
||||||
stream.close()
|
stream.close()
|
||||||
return result
|
return result
|
||||||
|
@ -1349,7 +1349,7 @@ class GPGWrapper(GPG):
|
||||||
"""Send keys to a keyserver."""
|
"""Send keys to a keyserver."""
|
||||||
result = self._result_map['list'](self)
|
result = self._result_map['list'](self)
|
||||||
log.debug('send_keys: %r', keyids)
|
log.debug('send_keys: %r', keyids)
|
||||||
data = _util._make_binary_stream("", self.encoding)
|
data = _util._make_binary_stream("", self._encoding)
|
||||||
args = ['--keyserver', keyserver, '--send-keys']
|
args = ['--keyserver', keyserver, '--send-keys']
|
||||||
args.extend(keyids)
|
args.extend(keyids)
|
||||||
self._handle_io(args, data, result, binary=True)
|
self._handle_io(args, data, result, binary=True)
|
||||||
|
|
|
@ -554,7 +554,7 @@ class GPGTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def test_signature_string_alternate_encoding(self):
|
def test_signature_string_alternate_encoding(self):
|
||||||
key = self.generate_key("Nos Oignons", "nos-oignons.net")
|
key = self.generate_key("Nos Oignons", "nos-oignons.net")
|
||||||
self.gpg.encoding = 'latin-1'
|
self.gpg._encoding = 'latin-1'
|
||||||
message = "Mêle-toi de tes oignons"
|
message = "Mêle-toi de tes oignons"
|
||||||
sig = self.gpg.sign(message, default_key=key.fingerprint,
|
sig = self.gpg.sign(message, default_key=key.fingerprint,
|
||||||
passphrase='nosoignons')
|
passphrase='nosoignons')
|
||||||
|
@ -603,7 +603,7 @@ class GPGTestCase(unittest.TestCase):
|
||||||
passphrase='johanborst')
|
passphrase='johanborst')
|
||||||
self.assertTrue(sig, "Good passphrase should succeed")
|
self.assertTrue(sig, "Good passphrase should succeed")
|
||||||
try:
|
try:
|
||||||
file = _util._make_binary_stream(sig.data, self.gpg.encoding)
|
file = _util._make_binary_stream(sig.data, self.gpg._encoding)
|
||||||
verified = self.gpg.verify_file(file)
|
verified = self.gpg.verify_file(file)
|
||||||
except UnicodeDecodeError: #happens in Python 2.6
|
except UnicodeDecodeError: #happens in Python 2.6
|
||||||
verified = self.gpg.verify_file(io.BytesIO(sig.data))
|
verified = self.gpg.verify_file(io.BytesIO(sig.data))
|
||||||
|
@ -686,12 +686,12 @@ authentication."""
|
||||||
gentry = str(key.fingerprint)
|
gentry = str(key.fingerprint)
|
||||||
key = self.generate_key("Marten van Dijk", "xorr.ox")
|
key = self.generate_key("Marten van Dijk", "xorr.ox")
|
||||||
dijk = str(key.fingerprint)
|
dijk = str(key.fingerprint)
|
||||||
self.gpg.encoding = 'latin-1'
|
self.gpg._encoding = 'latin-1'
|
||||||
if _util._py3k:
|
if _util._py3k:
|
||||||
data = 'Hello, André!'
|
data = 'Hello, André!'
|
||||||
else:
|
else:
|
||||||
data = unicode('Hello, André', self.gpg.encoding)
|
data = unicode('Hello, André', self.gpg._encoding)
|
||||||
data = data.encode(self.gpg.encoding)
|
data = data.encode(self.gpg._encoding)
|
||||||
encrypted = self.gpg.encrypt(data, gentry)
|
encrypted = self.gpg.encrypt(data, gentry)
|
||||||
edata = str(encrypted.data)
|
edata = str(encrypted.data)
|
||||||
self.assertNotEqual(data, edata)
|
self.assertNotEqual(data, edata)
|
||||||
|
@ -868,7 +868,7 @@ authentication."""
|
||||||
fdata = enc2.read()
|
fdata = enc2.read()
|
||||||
ddata = str(self.gpg.decrypt(fdata, passphrase="overalls"))
|
ddata = str(self.gpg.decrypt(fdata, passphrase="overalls"))
|
||||||
|
|
||||||
data = data.encode(self.gpg.encoding)
|
data = data.encode(self.gpg._encoding)
|
||||||
if ddata != data:
|
if ddata != data:
|
||||||
log.debug("data was: %r" % data)
|
log.debug("data was: %r" % data)
|
||||||
log.debug("new (from filehandle): %r" % fdata)
|
log.debug("new (from filehandle): %r" % fdata)
|
||||||
|
|
Loading…
Reference in New Issue