From e34a0ece5304f7a94fef7bf5a96073e589d9f7ed Mon Sep 17 00:00:00 2001 From: Andrew Hutchings Date: Thu, 24 Feb 2022 14:16:01 +0000 Subject: [PATCH] Completely refactor AES GCM Some bad assumptions were made during the creation of our Python AES GCM code. This is now modified to be more in-line with other libraries. This is an API breaking change on unreleased code. This now allows for aad data to be used, varying length of authentication tags and fixes a bug for multipart. 1. Now unified to a single class AesGcmStream() 2. Used `encrypt()` and `decrypt()` instead of `update()` to avoid confusion over encryption and aad semantics 3. final tag_bytes is configurable in the constructor 4. `set_aad()` added to add the aad data 5. aad data is cleared after first `encrypt()` or `decrypt()` call due to quirk in the C API. 6. More tests added --- docs/streaming.rst | 32 ++++-------- tests/test_aesgcmstream.py | 93 ++++++++++++++++++++++++++++++----- wolfcrypt/ciphers.py | 99 ++++++++++++++++++++++++-------------- 3 files changed, 151 insertions(+), 73 deletions(-) diff --git a/docs/streaming.rst b/docs/streaming.rst index 485b79e..a0417f9 100644 --- a/docs/streaming.rst +++ b/docs/streaming.rst @@ -9,10 +9,10 @@ Steaming Encryption Classes Interface ~~~~~~~~~ -AesGcmStreamEncrypt -~~~~~~~~~~~~~~~~~~~ +AesGcmStream +~~~~~~~~~~~~ -.. autoclass:: AesGcmStreamEncrypt +.. autoclass:: AesGcmStream :members: :inherited-members: @@ -22,29 +22,15 @@ AesGcmStreamEncrypt >>> from wolfcrypt.ciphers import AesGcmStreamEncrypt >>> from binascii import hexlify as b2h - >>> gcm = AesGcmStreamEncrypt(b'fedcba9876543210', b'0123456789abcdef') - >>> buf = gcm.update("hello world") + >>> gcm = AesGcmStream(b'fedcba9876543210', b'0123456789abcdef') + >>> buf = gcm.encrypt("hello world") >>> authTag = gcm.final() >>> b2h(buf) b'5ba7d42e1bf01d7998e932' >>> b2h(authTag) - b'cef91ba0c8c6431c7e19f64c9d9e371b' - -AesGcmStreamDecrypt -~~~~~~~~~~~~~~~~~~~ - -.. autoclass:: AesGcmStreamDecrypt - :members: - :inherited-members: - -**Example:** - -.. doctest:: - - >>> from wolfcrypt.ciphers import AesGcmStreamDecrypt, t2b - >>> from binascii import unhexlify as h2b - >>> gcm = AesGcmStreamDecrypt(b'fedcba9876543210', b'0123456789abcdef') - >>> buf = gcm.update(h2b(b'5ba7d42e1bf01d7998e932')) - >>> gcm.final(h2b(b'cef91ba0c8c6431c7e19f64c9d9e371b')) + b'8f85338aa0b13f48f8b17482dbb8acca' + >>> gcm = AesGcmStream(b'fedcba9876543210', b'0123456789abcdef') + >>> buf = gcm.decrypt(h2b(b'5ba7d42e1bf01d7998e932')) + >>> gcm.final(h2b(b'8f85338aa0b13f48f8b17482dbb8acca')) >>> t2b(buf) b'hello world' diff --git a/tests/test_aesgcmstream.py b/tests/test_aesgcmstream.py index 30a14ab..1b949a1 100644 --- a/tests/test_aesgcmstream.py +++ b/tests/test_aesgcmstream.py @@ -25,34 +25,101 @@ import pytest from wolfcrypt._ffi import ffi as _ffi from wolfcrypt._ffi import lib as _lib from wolfcrypt.utils import t2b +from wolfcrypt.exceptions import WolfCryptError from binascii import hexlify as b2h, unhexlify as h2b -from wolfcrypt.ciphers import AesGcmStreamEncrypt, AesGcmStreamDecrypt +from wolfcrypt.ciphers import AesGcmStream def test_encrypt(): key = "fedcba9876543210" iv = "0123456789abcdef" - gcm = AesGcmStreamEncrypt(key, iv) - buf = gcm.update("hello world") + gcm = AesGcmStream(key, iv) + buf = gcm.encrypt("hello world") authTag = gcm.final() - assert b2h(authTag) == bytes('cef91ba0c8c6431c7e19f64c9d9e371b', 'utf-8') + assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19b6197d2e', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") - gcmdec = AesGcmStreamDecrypt(key, iv) - bufdec = gcmdec.update(buf) + gcmdec = AesGcmStream(key, iv) + bufdec = gcmdec.decrypt(buf) + gcmdec.final(authTag) + assert bufdec == t2b("hello world") + +def test_encrypt_short_tag(): + key = "fedcba9876543210" + iv = "0123456789abcdef" + gcm = AesGcmStream(key, iv, 12) + buf = gcm.encrypt("hello world") + authTag = gcm.final() + assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19', 'utf-8') + assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") + gcmdec = AesGcmStream(key, iv) + bufdec = gcmdec.decrypt(buf) gcmdec.final(authTag) assert bufdec == t2b("hello world") def test_multipart(): key = "fedcba9876543210" iv = "0123456789abcdef" - gcm = AesGcmStreamEncrypt(key, iv) - buf = gcm.update("hello") - buf += gcm.update(" world") + gcm = AesGcmStream(key, iv) + buf = gcm.encrypt("hello") + buf += gcm.encrypt(" world") authTag = gcm.final() - assert b2h(authTag) == bytes('6862647a27c7b6aa0a6882b3e117e944', 'utf-8') + assert b2h(authTag) == bytes('ac8fcee96dc6ef8e5236da19b6197d2e', 'utf-8') assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") - gcmdec = AesGcmStreamDecrypt(key, iv) - bufdec = gcmdec.update(buf[:5]) - bufdec += gcmdec.update(buf[5:]) + gcmdec = AesGcmStream(key, iv) + bufdec = gcmdec.decrypt(buf[:5]) + bufdec += gcmdec.decrypt(buf[5:]) gcmdec.final(authTag) assert bufdec == t2b("hello world") + +def test_encrypt_aad(): + key = "fedcba9876543210" + iv = "0123456789abcdef" + aad = "aad data" + gcm = AesGcmStream(key, iv) + gcm.set_aad(aad) + buf = gcm.encrypt("hello world") + authTag = gcm.final() + print(b2h(authTag)) + assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8') + assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") + gcmdec = AesGcmStream(key, iv) + gcmdec.set_aad(aad) + bufdec = gcmdec.decrypt(buf) + gcmdec.final(authTag) + assert bufdec == t2b("hello world") + +def test_multipart_aad(): + key = "fedcba9876543210" + iv = "0123456789abcdef" + aad = "aad data" + gcm = AesGcmStream(key, iv) + gcm.set_aad(aad) + buf = gcm.encrypt("hello") + buf += gcm.encrypt(" world") + authTag = gcm.final() + assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8') + assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") + gcmdec = AesGcmStream(key, iv) + gcmdec.set_aad(aad) + bufdec = gcmdec.decrypt(buf[:5]) + bufdec += gcmdec.decrypt(buf[5:]) + gcmdec.final(authTag) + assert bufdec == t2b("hello world") + +def test_encrypt_aad_bad(): + key = "fedcba9876543210" + iv = "0123456789abcdef" + aad = "aad data" + aad_bad = "bad data" + gcm = AesGcmStream(key, iv) + gcm.set_aad(aad) + buf = gcm.encrypt("hello world") + authTag = gcm.final() + print(b2h(authTag)) + assert b2h(authTag) == bytes('8f85338aa0b13f48f8b17482dbb8acca', 'utf-8') + assert b2h(buf) == bytes('5ba7d42e1bf01d7998e932', "utf-8") + gcmdec = AesGcmStream(key, iv) + gcmdec.set_aad(aad_bad) + gcmdec.decrypt(buf) + with pytest.raises(WolfCryptError): + gcmdec.final(authTag) diff --git a/wolfcrypt/ciphers.py b/wolfcrypt/ciphers.py index a490915..3e22bd9 100644 --- a/wolfcrypt/ciphers.py +++ b/wolfcrypt/ciphers.py @@ -273,73 +273,98 @@ if _lib.AES_ENABLED: raise ValueError("Invalid mode associated to cipher") if _lib.AESGCM_STREAM: - class _AesGcmStream(object): + class AesGcmStream(object): """ AES GCM Stream """ block_size = 16 _key_sizes = [16, 24, 32] _native_type = "Aes *" + _aad = bytes() + _tag_bytes = 16 + _mode = None - def __init__(self, key, IV): + def __init__(self, key, IV, tag_bytes=16): + """ + tag_bytes is the number of bytes to use for the authentication tag during encryption + """ key = t2b(key) IV = t2b(IV) + self._tag_bytes = tag_bytes if len(key) not in self._key_sizes: raise ValueError("key must be %s in length, not %d" % (self._key_sizes, len(key))) self._native_object = _ffi.new(self._native_type) _lib.wc_AesInit(self._native_object, _ffi.NULL, -2) - self._authIn = _ffi.new("byte[%d]" % self.block_size) ret = _lib.wc_AesGcmInit(self._native_object, key, len(key), IV, len(IV)) if ret < 0: raise WolfCryptError("Init error (%d)" % ret) - def update(self, data): + def set_aad(self, data): """ - Updates the stream with another segment of data. + Set the additional authentication data for the stream + """ + if self._mode is not None: + raise WolfCryptError("AAD can only be set before encrypt() or decrypt() is called") + self._aad = t2b(data) + + def encrypt(self, data): + """ + Add more data to the encryption stream """ - ret = 0 data = t2b(data) + if self._mode is None: + self._mode = _ENCRYPTION + elif self._mode == _DECRYPTION: + raise WolfCryptError("Class instance already in use for decryption") self._buf = _ffi.new("byte[%d]" % (len(data))) - ret = self._update(data) + ret = _lib.wc_AesGcmEncryptUpdate(self._native_object, self._buf, data, len(data), self._aad, len(self._aad)) + if ret < 0: + raise WolfCryptError("Decryption error (%d)" % ret) + # Reset aad after first packet + self._aad = bytes() + return bytes(self._buf) + + def decrypt(self, data): + """ + Add more data to the decryption stream + """ + data = t2b(data) + if self._mode is None: + self._mode = _DECRYPTION + elif self._mode == _ENCRYPTION: + raise WolfCryptError("Class instance already in use for decryption") + self._buf = _ffi.new("byte[%d]" % (len(data))) + ret = _lib.wc_AesGcmDecryptUpdate(self._native_object, self._buf, data, len(data), self._aad, len(self._aad)) + # Reset after first packet + self._aad = bytes() if ret < 0: raise WolfCryptError("Decryption error (%d)" % ret) return bytes(self._buf) - class AesGcmStreamEncrypt(_AesGcmStream): - """ - AES GCM Streaming Encryption - """ - def _update(self, data): - return _lib.wc_AesGcmEncryptUpdate(self._native_object, self._buf, data, len(data), self._authIn, self.block_size) - - def final(self): + def final(self, authTag=None): """ - Finalize the stream and return an authentication tag for the stream. + When encrypting, finalize the stream and return an authentication tag for the stream. + When decrypting, verify the authentication tag for the stream. + The authTag parameter is only used for decrypting. """ - authTag = _ffi.new("byte[%d]" % self.block_size) - ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag, self.block_size) - if ret < 0: - raise WolfCryptError("Encryption error (%d)" % ret) - return _ffi.buffer(authTag)[:] + if self._mode is None: + raise WolfCryptError("Final called with no encryption or decryption") + elif self._mode == _ENCRYPTION: + authTag = _ffi.new("byte[%d]" % self._tag_bytes) + ret = _lib.wc_AesGcmEncryptFinal(self._native_object, authTag, self._tag_bytes) + if ret < 0: + raise WolfCryptError("Encryption error (%d)" % ret) + return _ffi.buffer(authTag)[:] + else: + if authTag is None: + raise WolfCryptError("authTag parameter required") + authTag = t2b(authTag) + ret = _lib.wc_AesGcmDecryptFinal(self._native_object, authTag, len(authTag)) + if ret < 0: + raise WolfCryptError("Decryption error (%d)" % ret) - class AesGcmStreamDecrypt(_AesGcmStream): - """ - AES GCM Streaming Decryption - """ - def _update(self, data): - return _lib.wc_AesGcmDecryptUpdate(self._native_object, self._buf, data, len(data), self._authIn, self.block_size) - - def final(self, authTag): - """ - Finalize the stream and verify using the provided authentication tag. - """ - authTag = t2b(authTag) - ret = _lib.wc_AesGcmDecryptFinal(self._native_object, authTag, self.block_size) - if ret < 0: - raise WolfCryptError("Decryption error (%d)" % ret) - if _lib.CHACHA_ENABLED: class ChaCha(_Cipher): """