first success with test

pull/978/head
DJ2LS 2025-05-25 12:39:12 +02:00
parent 708ed22c0a
commit b32de53f7c
2 changed files with 58 additions and 76 deletions

View File

@ -24,39 +24,6 @@ import codec2
import arq_session_irs import arq_session_irs
import os import os
class TestModem:
def __init__(self, event_q, state_q):
self.data_queue_received = queue.Queue()
self.demodulator = unittest.mock.Mock()
self.event_manager = EventManager([event_q])
self.logger = structlog.get_logger('Modem')
self.states = StateManager(state_q)
def getFrameTransmissionTime(self, mode):
samples = 0
c2instance = codec2.open_instance(mode.value)
samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance)
samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance)
samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance)
time = samples / 8000
#print(mode)
#if mode == codec2.FREEDV_MODE.signalling:
# time = 0.69
#print(time)
return time
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
# Simulate transmission time
tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT
self.logger.info(f"TX {tx_time} seconds...")
threading.Event().wait(tx_time)
transmission = {
'mode': mode,
'bytes': frames,
}
self.data_queue_received.put(transmission)
class DummyCtx: class DummyCtx:
def __init__(self): def __init__(self):
@ -69,8 +36,6 @@ class TestARQSession(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
# TODO:
# ESTABLISH MODEM CHANNELS CORRECTLY SO WE ARE GETTING THE CORRESPINDING BURSTS
cls.logger = structlog.get_logger("TESTS") cls.logger = structlog.get_logger("TESTS")

View File

@ -54,51 +54,66 @@ class TestMessageProtocol(unittest.TestCase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.ctxISS = AppContext('freedata_server/config.ini.example')
cls.ctxISS.config_manager.read()
cls.ctxIRS = AppContext('freedata_server/config.ini.example')
cls.ctxIRS.config_manager.read()
cls.config = cls.ctx.config_manager.config
cls.logger = structlog.get_logger("TESTS") cls.logger = structlog.get_logger("TESTS")
cls.frame_factory = DataFrameFactory(cls.ctx)
# ISS # ISS
cls.iss_state_manager = StateManager(queue.Queue())
cls.iss_state_manager.set_channel_busy_condition_codec2(False) cls.ctx_ISS = AppContext('freedata_server/config.ini.example')
cls.iss_event_manager = EventManager([queue.Queue()]) cls.ctx_ISS.TESTMODE = True
cls.iss_event_queue = queue.Queue() cls.ctx_ISS.startup()
cls.iss_state_queue = queue.Queue()
cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue)
cls.iss_frame_dispatcher = DISPATCHER(cls.ctx)
# IRS # IRS
cls.irs_state_manager = StateManager(queue.Queue()) cls.ctx_IRS = AppContext('freedata_server/config.ini.example')
cls.irs_event_manager = EventManager([queue.Queue()]) cls.ctx_IRS.TESTMODE = True
cls.irs_event_queue = queue.Queue() cls.ctx_IRS.startup()
cls.irs_state_queue = queue.Queue()
cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue)
cls.irs_frame_dispatcher = DISPATCHER(cls.ctx)
cls.loss_probability = 30 # simulate a busy condition
cls.channels_running = True cls.ctx_IRS.state_manager.channel_busy_slot = [True, False, False, False, False]
# Frame loss probability in %
cls.loss_probability = 0
cls.channels_running = False
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.ctx.shutdown() cls.ctx_IRS.shutdown()
cls.ctx_ISS.shutdown()
def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER):
def channelWorker(self, ctx_a, ctx_b):
while self.channels_running: while self.channels_running:
try: try:
transmission = modem_transmit_queue.get(timeout=1) # Station A gets the data from its transmit queue
transmission["bytes"] += bytes(2) # simulate CRC transmission = ctx_a.TESTMODE_TRANSMIT_QUEUE.get(timeout=1)
print(f"Station A sending: {transmission[1]}", len(transmission[1]), transmission[0])
transmission[1] += bytes(2) # 2bytes crc simulation
if random.randint(0, 100) < self.loss_probability: if random.randint(0, 100) < self.loss_probability:
self.logger.info(f"[{threading.current_thread().name}] Frame lost...") self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
continue continue
frame_dispatcher.process_data(transmission['bytes'], None, len(transmission['bytes']), 0, 0)
# Forward data from Station A to Station B's receive queue
if ctx_b:
ctx_b.TESTMODE_RECEIVE_QUEUE.put(transmission)
self.logger.info(f"Data forwarded to Station B")
frame_bytes = transmission[1]
if len(frame_bytes) == 5:
mode_name = "SIGNALLING_ACK"
else:
mode_name = transmission[0]
snr = 15
ctx_b.service_manager.frame_dispatcher.process_data(
frame_bytes, None, len(frame_bytes), snr, 0, mode_name=mode_name
)
except queue.Empty: except queue.Empty:
continue continue
self.logger.info(f"[{threading.current_thread().name}] Channel closed.") self.logger.info(f"[{threading.current_thread().name}] Channel closed.")
def waitForSession(self, event_queue, outbound=False): def waitForSession(self, event_queue, outbound=False):
@ -114,21 +129,17 @@ class TestMessageProtocol(unittest.TestCase):
def establishChannels(self): def establishChannels(self):
self.channels_running = True self.channels_running = True
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, self.channelA = threading.Thread(target=self.channelWorker,args=[self.ctx_ISS, self.ctx_IRS],name = "channelA")
args=[self.iss_modem.data_queue_received, self.irs_frame_dispatcher], self.channelA.start()
name="ISS->IRS")
self.irs_to_iss_channel = threading.Thread(target=self.channelWorker, self.channelB = threading.Thread(target=self.channelWorker,args=[self.ctx_IRS, self.ctx_ISS],name = "channelB")
args=[self.irs_modem.data_queue_received, self.iss_frame_dispatcher], self.channelB.start()
name="IRS->ISS")
self.iss_to_irs_channel.start()
self.irs_to_iss_channel.start()
def waitAndCloseChannels(self): def waitAndCloseChannels(self):
self.waitForSession(self.iss_event_queue, outbound=True) self.waitForSession(self.ctx_ISS.modem_events, True)
self.channels_running = False
self.waitForSession(self.ctx_IRS.modem_events, False)
self.channels_running = False self.channels_running = False
self.waitForSession(self.irs_event_queue, outbound=False)
self.iss_to_irs_channel.join()
self.irs_to_iss_channel.join()
def testMessageViaSession(self): def testMessageViaSession(self):
self.loss_probability = 0 # no loss self.loss_probability = 0 # no loss
@ -142,8 +153,14 @@ class TestMessageProtocol(unittest.TestCase):
command = command_message_send.SendMessageCommand(self.ctx, params) command = command_message_send.SendMessageCommand(self.ctx, params)
command.run() command.run()
self.waitAndCloseChannels() #del cmd
print(self.ctx_ISS.TESTMODE_EVENTS.empty())
while not self.ctx_ISS.TESTMODE_EVENTS.empty():
event = self.ctx_ISS.TESTMODE_EVENTS.get()
success = event.get('arq-transfer-outbound', {}).get('success', None)
if success is not None:
self.assertTrue(success, f"Test failed because of wrong success: {success}")
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()