some work in progress regarding automated testing

dev-connected-mode
DJ2LS 2025-02-02 17:39:50 +01:00
parent ae364ac57d
commit fdd3be4b56
1 changed files with 16 additions and 4 deletions

View File

@ -73,7 +73,7 @@ class TestP2PConnectionSession(unittest.TestCase):
cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue)
cls.iss_socket_interface_manager = SocketInterfaceHandler(cls.iss_modem, cls.config, cls.iss_state_manager, cls.iss_event_manager).start_servers()
cls.iss_socket_interface_manager = SocketInterfaceHandler(cls.iss_modem, cls.iss_config_manager, cls.iss_state_manager, cls.iss_event_manager).start_servers()
cls.iss_frame_dispatcher = DISPATCHER(cls.config,
cls.iss_event_manager,
@ -84,13 +84,21 @@ class TestP2PConnectionSession(unittest.TestCase):
#cls.iss_socket_command_handler = CommandSocket(TestSocket(), '127.0.0.1', 51234)
# IRS
cls.irs_config_manager = config_manager
cls.irs_state_manager = StateManager(queue.Queue())
cls.irs_event_manager = EventManager([queue.Queue()])
cls.irs_event_queue = queue.Queue()
cls.irs_state_queue = queue.Queue()
cls.irs_p2p_data_queue = queue.Queue()
cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue)
cls.irs_socket_interface_manager = SocketInterfaceHandler(cls.irs_modem, cls.config, cls.irs_state_manager, cls.irs_event_manager).start_servers()
conf = cls.irs_config_manager.read()
conf['SOCKET_INTERFACE']['cmd_port'] = 9000
conf['SOCKET_INTERFACE']['data_port'] = 9001
cls.irs_config_manager.write(conf)
cls.irs_socket_interface_manager = SocketInterfaceHandler(cls.irs_modem, cls.irs_config_manager, cls.irs_state_manager, cls.irs_event_manager).start_servers()
cls.irs_frame_dispatcher = DISPATCHER(cls.config,
cls.irs_event_manager,
cls.irs_state_manager,
@ -151,13 +159,14 @@ class TestP2PConnectionSession(unittest.TestCase):
length = random.randint(min_length, max_length)
return ''.join(random.choices(string.ascii_letters, k=length))#
def DisabledtestARQSessionSmallPayload(self):
def testARQSessionSmallPayload(self):
# set Packet Error Rate (PER) / frame loss probability
self.loss_probability = 0
self.establishChannels()
handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager)
handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager, self.iss_socket_interface_manager)
handler.handle_connect(["AA1AAA-1", "BB2BBB-2"])
self.connected_event = threading.Event()
@ -194,5 +203,8 @@ class TestSocket:
self.disconnect_received = True
self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n')
if __name__ == '__main__':
unittest.main()