Completely refactor AES GCM

Some bad assumptions were made during the creation of our Python AES GCM
code. This is now modified to be more in-line with other libraries. This
is an API breaking change on unreleased code.

This now allows for aad data to be used, varying length of
authentication tags and fixes a bug for multipart.

1. Now unified to a single class AesGcmStream()
2. Used `encrypt()` and `decrypt()` instead of `update()` to avoid
   confusion over encryption and aad semantics
3. final tag_bytes is configurable in the constructor
4. `set_aad()` added to add the aad data
5. aad data is cleared after first `encrypt()` or `decrypt()` call due
   to quirk in the C API.
6. More tests added
pull/41/head
Andrew Hutchings 2022-02-24 14:16:01 +00:00 committed by Daniele Lacamera
parent 969681a731
commit e34a0ece53
3 changed files with 151 additions and 73 deletions

View File

@ -9,10 +9,10 @@ Steaming Encryption Classes
Interface Interface
~~~~~~~~~ ~~~~~~~~~
AesGcmStreamEncrypt AesGcmStream
~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~
.. autoclass:: AesGcmStreamEncrypt .. autoclass:: AesGcmStream
:members: :members:
:inherited-members: :inherited-members:
@ -22,29 +22,15 @@ AesGcmStreamEncrypt
>>> from wolfcrypt.ciphers import AesGcmStreamEncrypt >>> from wolfcrypt.ciphers import AesGcmStreamEncrypt
>>> from binascii import hexlify as b2h >>> from binascii import hexlify as b2h
>>> gcm = AesGcmStreamEncrypt(b'fedcba9876543210', b'0123456789abcdef') >>> gcm = AesGcmStream(b'fedcba9876543210', b'0123456789abcdef')
>>> buf = gcm.update("hello world") >>> buf = gcm.encrypt("hello world")
>>> authTag = gcm.final() >>> authTag = gcm.final()
>>> b2h(buf) >>> b2h(buf)
b'5ba7d42e1bf01d7998e932' b'5ba7d42e1bf01d7998e932'
>>> b2h(authTag) >>> b2h(authTag)
b'cef91ba0c8c6431c7e19f64c9d9e371b' b'8f85338aa0b13f48f8b17482dbb8acca'
>>> gcm = AesGcmStream(b'fedcba9876543210', b'0123456789abcdef')
AesGcmStreamDecrypt >>> buf = gcm.decrypt(h2b(b'5ba7d42e1bf01d7998e932'))
~~~~~~~~~~~~~~~~~~~ >>> gcm.final(h2b(b'8f85338aa0b13f48f8b17482dbb8acca'))
.. autoclass:: AesGcmStreamDecrypt
:members:
:inherited-members:
**Example:**
.. doctest::
>>> from wolfcrypt.ciphers import AesGcmStreamDecrypt, t2b
>>> from binascii import unhexlify as h2b
>>> gcm = AesGcmStreamDecrypt(b'fedcba9876543210', b'0123456789abcdef')
>>> buf = gcm.update(h2b(b'5ba7d42e1bf01d7998e932'))
>>> gcm.final(h2b(b'cef91ba0c8c6431c7e19f64c9d9e371b'))
>>> t2b(buf) >>> t2b(buf)
b'hello world' b'hello world'

View File

@ -25,34 +25,101 @@ import pytest
from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import ffi as _ffi
from wolfcrypt._ffi import lib as _lib from wolfcrypt._ffi import lib as _lib
from wolfcrypt.utils import t2b from wolfcrypt.utils import t2b
from wolfcrypt.exceptions import WolfCryptError
from binascii import hexlify as b2h, unhexlify as h2b from binascii import hexlify as b2h, unhexlify as h2b
from wolfcrypt.ciphers import AesGcmStreamEncrypt, AesGcmStreamDecrypt from wolfcrypt.ciphers import AesGcmStream
def test_encrypt(): def test_encrypt():
key = "fedcba9876543210" key = "fedcba9876543210"
iv = "0123456789abcdef" iv = "0123456789abcdef"
gcm = AesGcmStreamEncrypt(key, iv) gcm = AesGcmStream(key, iv)
buf = gcm.update("hello world") buf = gcm.encrypt("hello world")
authTag = gcm.final() authTag = gcm.final()
assert b2h(authTag) == bytes('cef91ba0c8c6431c7e19f64c9d9e371b', 'utf-8') assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19b6197d2e', 'utf-8')
assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8")
gcmdec = AesGcmStreamDecrypt(key, iv) gcmdec = AesGcmStream(key, iv)
bufdec = gcmdec.update(buf) bufdec = gcmdec.decrypt(buf)
gcmdec.final(authTag)
assert bufdec == t2b("hello world")
def test_encrypt_short_tag():
key = "fedcba9876543210"
iv = "0123456789abcdef"
gcm = AesGcmStream(key, iv, 12)
buf = gcm.encrypt("hello world")
authTag = gcm.final()
assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19', 'utf-8')
assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8")
gcmdec = AesGcmStream(key, iv)
bufdec = gcmdec.decrypt(buf)
gcmdec.final(authTag) gcmdec.final(authTag)
assert bufdec == t2b("hello world") assert bufdec == t2b("hello world")
def test_multipart(): def test_multipart():
key = "fedcba9876543210" key = "fedcba9876543210"
iv = "0123456789abcdef" iv = "0123456789abcdef"
gcm = AesGcmStreamEncrypt(key, iv) gcm = AesGcmStream(key, iv)
buf = gcm.update("hello") buf = gcm.encrypt("hello")
buf += gcm.update(" world") buf += gcm.encrypt(" world")
authTag = gcm.final() authTag = gcm.final()
assert b2h(authTag) == bytes('6862647a27c7b6aa0a6882b3e117e944', 'utf-8') assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19b6197d2e', 'utf-8')
assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8")
gcmdec = AesGcmStreamDecrypt(key, iv) gcmdec = AesGcmStream(key, iv)
bufdec = gcmdec.update(buf[:5]) bufdec = gcmdec.decrypt(buf[:5])
bufdec += gcmdec.update(buf[5:]) bufdec += gcmdec.decrypt(buf[5:])
gcmdec.final(authTag) gcmdec.final(authTag)
assert bufdec == t2b("hello world") assert bufdec == t2b("hello world")
def test_encrypt_aad():
key = "fedcba9876543210"
iv = "0123456789abcdef"
aad = "aad data"
gcm = AesGcmStream(key, iv)
gcm.set_aad(aad)
buf = gcm.encrypt("hello world")
authTag = gcm.final()
print(b2h(authTag))
assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8')
assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8")
gcmdec = AesGcmStream(key, iv)
gcmdec.set_aad(aad)
bufdec = gcmdec.decrypt(buf)
gcmdec.final(authTag)
assert bufdec == t2b("hello world")
def test_multipart_aad():
key = "fedcba9876543210"
iv = "0123456789abcdef"
aad = "aad data"
gcm = AesGcmStream(key, iv)
gcm.set_aad(aad)
buf = gcm.encrypt("hello")
buf += gcm.encrypt(" world")
authTag = gcm.final()
assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8')
assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8")
gcmdec = AesGcmStream(key, iv)
gcmdec.set_aad(aad)
bufdec = gcmdec.decrypt(buf[:5])
bufdec += gcmdec.decrypt(buf[5:])
gcmdec.final(authTag)
assert bufdec == t2b("hello world")
def test_encrypt_aad_bad():
key = "fedcba9876543210"
iv = "0123456789abcdef"
aad = "aad data"
aad_bad = "bad data"
gcm = AesGcmStream(key, iv)
gcm.set_aad(aad)
buf = gcm.encrypt("hello world")
authTag = gcm.final()
print(b2h(authTag))
assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8')
assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8")
gcmdec = AesGcmStream(key, iv)
gcmdec.set_aad(aad_bad)
gcmdec.decrypt(buf)
with pytest.raises(WolfCryptError):
gcmdec.final(authTag)

View File

@ -273,73 +273,98 @@ if _lib.AES_ENABLED:
raise ValueError("Invalid mode associated to cipher") raise ValueError("Invalid mode associated to cipher")
if _lib.AESGCM_STREAM: if _lib.AESGCM_STREAM:
class _AesGcmStream(object): class AesGcmStream(object):
""" """
AES GCM Stream AES GCM Stream
""" """
block_size = 16 block_size = 16
_key_sizes = [16, 24, 32] _key_sizes = [16, 24, 32]
_native_type = "Aes *" _native_type = "Aes *"
_aad = bytes()
_tag_bytes = 16
_mode = None
def __init__(self, key, IV): def __init__(self, key, IV, tag_bytes=16):
"""
tag_bytes is the number of bytes to use for the authentication tag during encryption
"""
key = t2b(key) key = t2b(key)
IV = t2b(IV) IV = t2b(IV)
self._tag_bytes = tag_bytes
if len(key) not in self._key_sizes: if len(key) not in self._key_sizes:
raise ValueError("key must be %s in length, not %d" % raise ValueError("key must be %s in length, not %d" %
(self._key_sizes, len(key))) (self._key_sizes, len(key)))
self._native_object = _ffi.new(self._native_type) self._native_object = _ffi.new(self._native_type)
_lib.wc_AesInit(self._native_object, _ffi.NULL, -2) _lib.wc_AesInit(self._native_object, _ffi.NULL, -2)
self._authIn = _ffi.new("byte[%d]" % self.block_size)
ret = _lib.wc_AesGcmInit(self._native_object, key, len(key), IV, len(IV)) ret = _lib.wc_AesGcmInit(self._native_object, key, len(key), IV, len(IV))
if ret < 0: if ret < 0:
raise WolfCryptError("Init error (%d)" % ret) raise WolfCryptError("Init error (%d)" % ret)
def update(self, data): def set_aad(self, data):
""" """
Updates the stream with another segment of data. Set the additional authentication data for the stream
"""
if self._mode is not None:
raise WolfCryptError("AAD can only be set before encrypt() or decrypt() is called")
self._aad = t2b(data)
def encrypt(self, data):
"""
Add more data to the encryption stream
""" """
ret = 0
data = t2b(data) data = t2b(data)
if self._mode is None:
self._mode = _ENCRYPTION
elif self._mode == _DECRYPTION:
raise WolfCryptError("Class instance already in use for decryption")
self._buf = _ffi.new("byte[%d]" % (len(data))) self._buf = _ffi.new("byte[%d]" % (len(data)))
ret = self._update(data) ret = _lib.wc_AesGcmEncryptUpdate(self._native_object, self._buf, data, len(data), self._aad, len(self._aad))
if ret < 0:
raise WolfCryptError("Decryption error (%d)" % ret)
# Reset aad after first packet
self._aad = bytes()
return bytes(self._buf)
def decrypt(self, data):
"""
Add more data to the decryption stream
"""
data = t2b(data)
if self._mode is None:
self._mode = _DECRYPTION
elif self._mode == _ENCRYPTION:
raise WolfCryptError("Class instance already in use for decryption")
self._buf = _ffi.new("byte[%d]" % (len(data)))
ret = _lib.wc_AesGcmDecryptUpdate(self._native_object, self._buf, data, len(data), self._aad, len(self._aad))
# Reset after first packet
self._aad = bytes()
if ret < 0: if ret < 0:
raise WolfCryptError("Decryption error (%d)" % ret) raise WolfCryptError("Decryption error (%d)" % ret)
return bytes(self._buf) return bytes(self._buf)
class AesGcmStreamEncrypt(_AesGcmStream): def final(self, authTag=None):
""" """
AES GCM Streaming Encryption When encrypting, finalize the stream and return an authentication tag for the stream.
When decrypting, verify the authentication tag for the stream.
The authTag parameter is only used for decrypting.
""" """
def _update(self, data): if self._mode is None:
return _lib.wc_AesGcmEncryptUpdate(self._native_object, self._buf, data, len(data), self._authIn, self.block_size) raise WolfCryptError("Final called with no encryption or decryption")
elif self._mode == _ENCRYPTION:
def final(self): authTag = _ffi.new("byte[%d]" % self._tag_bytes)
""" ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag, self._tag_bytes)
Finalize the stream and return an authentication tag for the stream.
"""
authTag = _ffi.new("byte[%d]" % self.block_size)
ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag, self.block_size)
if ret < 0: if ret < 0:
raise WolfCryptError("Encryption error (%d)" % ret) raise WolfCryptError("Encryption error (%d)" % ret)
return _ffi.buffer(authTag)[:] return _ffi.buffer(authTag)[:]
else:
if authTag is None:
class AesGcmStreamDecrypt(_AesGcmStream): raise WolfCryptError("authTag parameter required")
"""
AES GCM Streaming Decryption
"""
def _update(self, data):
return _lib.wc_AesGcmDecryptUpdate(self._native_object, self._buf, data, len(data), self._authIn, self.block_size)
def final(self, authTag):
"""
Finalize the stream and verify using the provided authentication tag.
"""
authTag = t2b(authTag) authTag = t2b(authTag)
ret = _lib.wc_AesGcmDecryptFinal(self._native_object, authTag, self.block_size) ret = _lib.wc_AesGcmDecryptFinal(self._native_object, authTag, len(authTag))
if ret < 0: if ret < 0:
raise WolfCryptError("Decryption error (%d)" % ret) raise WolfCryptError("Decryption error (%d)" % ret)
if _lib.CHACHA_ENABLED: if _lib.CHACHA_ENABLED:
class ChaCha(_Cipher): class ChaCha(_Cipher):
""" """