From f3c6054a986ffc2d3de612a38ca0ca24eaad40f0 Mon Sep 17 00:00:00 2001 From: Stefan DK5SM <58949681+dk5sm@users.noreply.github.com> Date: Mon, 28 Apr 2025 17:01:10 +0200 Subject: [PATCH 01/36] revert commit 9ae348f / possible fix for #954 ...as it breaks CAT and PTT control --- freedata_server/rigctld.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/freedata_server/rigctld.py b/freedata_server/rigctld.py index 3967b5d3..03605033 100644 --- a/freedata_server/rigctld.py +++ b/freedata_server/rigctld.py @@ -382,7 +382,7 @@ class radio: purposes. It handles potential errors during command execution. """ try: - vfo_response = self.send_command('\\dump_caps') + vfo_response = self.send_command(r'\dump_caps') print(vfo_response) except Exception as e: @@ -397,7 +397,7 @@ class radio: check. """ try: - vfo_response = self.send_command(r'\\chk_vfo') + vfo_response = self.send_command(r'\chk_vfo') if vfo_response in [1, "1"]: self.parameters['chk_vfo'] = True else: From 0be46148b55963d1facf1b47f673fdb9ac758aab Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 13:59:54 +0200 Subject: [PATCH 02/36] more p2p adjustments --- freedata_server/data_frame_factory.py | 6 +++--- freedata_server/p2p_connection.py | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 0012e3d9..74234342 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -245,9 +245,9 @@ class DataFrameFactory: if not isinstance(item_length, int): item_length = len(content[key]) - print(frame_length) - print(item_length) - print(content) + #print(frame_length) + #print(item_length) + #print(content) if buffer_position + item_length > frame_length: raise OverflowError("Frame data overflow!") frame[buffer_position: buffer_position + item_length] = content[key] diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index fda91989..62c5270a 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -43,6 +43,8 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', + }, States.PAYLOAD_SENT: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', @@ -53,6 +55,7 @@ class P2PConnection: States.ARQ_SESSION: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', }, States.DISCONNECTING: { From 2d8fedb4d006b5ebe6ec3214a5dd82820fff90ab Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 14:10:03 +0200 Subject: [PATCH 03/36] more p2p adjustments --- freedata_server/frame_dispatcher.py | 3 ++- freedata_server/frame_handler_p2p_connection.py | 2 ++ freedata_server/p2p_connection.py | 15 +++++++-------- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/freedata_server/frame_dispatcher.py b/freedata_server/frame_dispatcher.py index b88efdf8..b750b208 100644 --- a/freedata_server/frame_dispatcher.py +++ b/freedata_server/frame_dispatcher.py @@ -41,7 +41,8 @@ class DISPATCHER: "name": "P2P Connection PAYLOAD"}, FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection PAYLOAD ACK"}, - + FR_TYPE.P2P_CONNECTION_HEARTBEAT.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection HEARTBEAT"}, #FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, #FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"}, diff --git a/freedata_server/frame_handler_p2p_connection.py b/freedata_server/frame_handler_p2p_connection.py index 020f50d0..52c2386e 100644 --- a/freedata_server/frame_handler_p2p_connection.py +++ b/freedata_server/frame_handler_p2p_connection.py @@ -54,6 +54,8 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler): FR.P2P_CONNECTION_DISCONNECT_ACK.value, FR.P2P_CONNECTION_PAYLOAD.value, FR.P2P_CONNECTION_PAYLOAD_ACK.value, + FR.P2P_CONNECTION_HEARTBEAT.value, + ]: session = self.ctx.state_manager.get_p2p_connection_session(session_id) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 62c5270a..8bf31864 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -110,7 +110,7 @@ class P2PConnection: self.ENTIRE_CONNECTION_TIMEOUT = 180 self.is_ISS = False # Indicator, if we are ISS or IRS - + self.is_Master = False # Indicator, if we are Maste or Not self.last_data_timestamp= time.time() self.start_data_processing_worker() @@ -128,7 +128,7 @@ class P2PConnection: # start sending data self.iss_buffer_empty.set() - if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED: + if self.state == States.CONNECTED and self.is_Master: self.process_data_queue() threading.Event().wait(0.500) @@ -231,6 +231,7 @@ class P2PConnection: self.log("CONNECTED ISS...........................") self.set_state(States.CONNECTED) self.is_ISS = True + self.is_Master = True # start sending data self.iss_buffer_empty.set() @@ -245,6 +246,7 @@ class P2PConnection: self.ctx.socket_interface_manager.command_server.command_handler.session = self self.set_state(States.CONNECTED) self.is_ISS = False + self.is_Master = False self.origin = frame["origin"] self.destination = frame["destination"] self.destination_crc = frame["destination_crc"] @@ -267,14 +269,11 @@ class P2PConnection: def process_data_queue(self, frame=None): if self.p2p_data_tx_queue.empty(): - print("buffer empty....") + print("buffer empty....", "setting isMaster = False", "sending heartbeat") + self.is_Master = False payload = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=True) - self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling_ack) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling) self.set_state(States.PAYLOAD_SENT) - time.sleep(5) - return - - if not self.iss_buffer_empty.is_set(): return print("processing data....") From 38edb46d32ace0bd9e33a078555e8700c24c0e87 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 14:13:01 +0200 Subject: [PATCH 04/36] more p2p adjustments --- freedata_server/frame_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freedata_server/frame_handler.py b/freedata_server/frame_handler.py index f124ec04..aa3ee04f 100644 --- a/freedata_server/frame_handler.py +++ b/freedata_server/frame_handler.py @@ -108,7 +108,7 @@ class FrameHandler(): #check for p2p connection - elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']: + elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_HEARTBEAT', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']: session_id = self.details['frame']['session_id'] if session_id in self.ctx.state_manager.p2p_connection_sessions: valid = True From 9516019e16761739a0645b50acc627abb712d683 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 14:34:01 +0200 Subject: [PATCH 05/36] more p2p adjustments --- freedata_server/p2p_connection.py | 7 ++++++- freedata_server/socket_interface_commands.py | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 8bf31864..ced69691 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -273,7 +273,11 @@ class P2PConnection: self.is_Master = False payload = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=True) self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling) - self.set_state(States.PAYLOAD_SENT) + # just send burst without expecting an answer + self.set_state(States.CONNECTED) + self.last_data_timestamp = time.time() + self.event_frame_received.set() + return print("processing data....") @@ -321,6 +325,7 @@ class P2PConnection: def received_heartbeat(self, frame): print(frame) + print(frame['flag']) print("received heartbeat...") self.iss_buffer_empty.set() diff --git a/freedata_server/socket_interface_commands.py b/freedata_server/socket_interface_commands.py index 3f0de5ae..45d132cf 100644 --- a/freedata_server/socket_interface_commands.py +++ b/freedata_server/socket_interface_commands.py @@ -99,7 +99,8 @@ class SocketCommandHandler: def socket_respond_connected(self, origin, destination, bandwidth): print("[socket interface_commands] socket_respond_connected") if self.ctx.socket_interface_manager.connecting_callsign: - message = f"CONNECTED {self.ctx.socket_interface_manager.connecting_callsign} {destination} {bandwidth}" + #message = f"CONNECTED {self.ctx.socket_interface_manager.connecting_callsign} {destination} {bandwidth}" + message = f"CONNECTED {origin} {destination} {bandwidth}" else: message = f"CONNECTED {origin} {destination} {bandwidth}" self.send_response(f"UNENCRYPTED LINK") From 9a573118a58149adde3aed62a5a729e69b3700e3 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 14:52:38 +0200 Subject: [PATCH 06/36] more p2p adjustments --- freedata_server/p2p_connection.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index ced69691..10c33ccf 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -124,12 +124,13 @@ class P2PConnection: self.disconnect() return - if time.time() > self.last_data_timestamp + 6 and self.state is States.CONNECTED: - # start sending data - self.iss_buffer_empty.set() + if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED: + print("no data within last 15s. Taking master status") + self.is_Master = True if self.state == States.CONNECTED and self.is_Master: self.process_data_queue() + threading.Event().wait(0.500) @@ -153,7 +154,7 @@ class P2PConnection: self.frequency_offset = frequency_offset def log(self, message, isWarning = False): - msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}" + msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}][Master={bool(self.is_Master)}]: {message}" logger = self.logger.warn if isWarning else self.logger.info logger(msg) @@ -272,7 +273,9 @@ class P2PConnection: print("buffer empty....", "setting isMaster = False", "sending heartbeat") self.is_Master = False payload = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=True) - self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling) + #self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling) + self.transmit_frame([payload], FREEDV_MODE.signalling) + # just send burst without expecting an answer self.set_state(States.CONNECTED) self.last_data_timestamp = time.time() From f98dffb8b176cdb0b8841a5f5be9519af1893597 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 14:58:41 +0200 Subject: [PATCH 07/36] more p2p adjustments --- freedata_server/data_frame_factory.py | 2 +- freedata_server/p2p_connection.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 74234342..29904a23 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -174,7 +174,7 @@ class DataFrameFactory: # heartbeat for "is alive" self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = { - "frame_length": self.LENGTH_ACK_FRAME, + "frame_length": self.LENGTH_SIG1_FRAME, "session_id": 1, "flag": 1 } diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 10c33ccf..16393d64 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -99,8 +99,7 @@ class P2PConnection: self.session_id = self.generate_id() self.event_frame_received = threading.Event() - # set if iss buffer is empty - self.iss_buffer_empty = threading.Event() + self.RETRIES_CONNECT = 3 @@ -234,8 +233,6 @@ class P2PConnection: self.is_ISS = True self.is_Master = True - # start sending data - self.iss_buffer_empty.set() self.log(frame) if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.command_server, "command_handler"): @@ -305,8 +302,6 @@ class P2PConnection: def received_data(self, frame): self.log(f"received data...: {frame}") - self.iss_buffer_empty = threading.Event() - ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling_ack) @@ -329,8 +324,11 @@ class P2PConnection: def received_heartbeat(self, frame): print(frame) print(frame['flag']) + if frame['flag']['BUFFER_EMPTY']: + print("other stations buffer is empty. We can become master now") + self.is_Master = True + print("received heartbeat...") - self.iss_buffer_empty.set() def disconnect(self): From be9999efb38c2e841787b5c03ca3f79022d30b8e Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 15:47:06 +0200 Subject: [PATCH 08/36] announce arq session --- freedata_server/data_frame_factory.py | 5 +++- freedata_server/frame_dispatcher.py | 4 +-- freedata_server/frame_handler.py | 2 +- .../frame_handler_p2p_connection.py | 2 +- freedata_server/p2p_connection.py | 28 +++++++++++++++---- 5 files changed, 31 insertions(+), 10 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 29904a23..5aea3c24 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -25,6 +25,7 @@ class DataFrameFactory: P2P_FLAGS = { 'BUFFER_EMPTY': 0, # Bit-position for indicating the BUFFER EMPTY state + 'ANNOUNCE_ARQ': 1, # Bit-position for announcing an ARQ session } def __init__(self, ctx): @@ -540,10 +541,12 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload) - def build_p2p_connection_heartbeat(self, session_id, flag_buffer_empty=False): + def build_p2p_connection_heartbeat(self, session_id, flag_buffer_empty=False, flag_announce_arq=False): flag = 0b00000000 if flag_buffer_empty: flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + if flag_announce_arq: + flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) payload = { "session_id": session_id.to_bytes(1, 'big'), diff --git a/freedata_server/frame_dispatcher.py b/freedata_server/frame_dispatcher.py index b750b208..37439ca9 100644 --- a/freedata_server/frame_dispatcher.py +++ b/freedata_server/frame_dispatcher.py @@ -43,8 +43,8 @@ class DISPATCHER: "name": "P2P Connection PAYLOAD ACK"}, FR_TYPE.P2P_CONNECTION_HEARTBEAT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection HEARTBEAT"}, - #FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, - #FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, + FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection HEARTBEAT ACK"}, FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"}, FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"}, FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"}, diff --git a/freedata_server/frame_handler.py b/freedata_server/frame_handler.py index aa3ee04f..f4727abd 100644 --- a/freedata_server/frame_handler.py +++ b/freedata_server/frame_handler.py @@ -108,7 +108,7 @@ class FrameHandler(): #check for p2p connection - elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_HEARTBEAT', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']: + elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_HEARTBEAT','P2P_CONNECTION_HEARTBEAT_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']: session_id = self.details['frame']['session_id'] if session_id in self.ctx.state_manager.p2p_connection_sessions: valid = True diff --git a/freedata_server/frame_handler_p2p_connection.py b/freedata_server/frame_handler_p2p_connection.py index 52c2386e..b53cd5a3 100644 --- a/freedata_server/frame_handler_p2p_connection.py +++ b/freedata_server/frame_handler_p2p_connection.py @@ -55,7 +55,7 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler): FR.P2P_CONNECTION_PAYLOAD.value, FR.P2P_CONNECTION_PAYLOAD_ACK.value, FR.P2P_CONNECTION_HEARTBEAT.value, - + FR.P2P_CONNECTION_HEARTBEAT_ACK.value, ]: session = self.ctx.state_manager.get_p2p_connection_session(session_id) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 16393d64..0df46aa7 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -44,18 +44,20 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', }, States.PAYLOAD_SENT: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', }, States.ARQ_SESSION: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', - FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', }, States.DISCONNECTING: { @@ -322,13 +324,22 @@ class P2PConnection: self.set_state(States.CONNECTED) def received_heartbeat(self, frame): - print(frame) - print(frame['flag']) + print("received heartbeat...") + if frame['flag']['BUFFER_EMPTY']: print("other stations buffer is empty. We can become master now") self.is_Master = True + if frame['flag']['ANNOUNCE_ARQ']: + print("other station announced arq, changing state") + self.is_Master = False + self.set_state(States.ARQ_SESSION) + + def received_heartbeat_ack(self, frame): + print("received heartbeat ack...") + + self.event_frame_received.set() + self.set_state(States.ARQ_SESSION) - print("received heartbeat...") def disconnect(self): @@ -371,7 +382,14 @@ class P2PConnection: else: arq_destination = self.origin - self.log(f"ARQ Destination: {self.destination}") + self.log(f"ANNOUNCING ARQ to destination: {self.destination}") + heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=False, flag_announce_arq=True) + self.launch_twr(heartbeat, 15, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + self.event_frame_received.wait() + + self.log(f"ARQ destination: {self.destination}") + + prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection) iss = ARQSessionISS(self.ctx, arq_destination, prepared_data, type_byte) iss.id = self.session_id From cf433c6ed3b834fbdcf964a5c904c873db175ca4 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Fri, 2 May 2025 17:38:00 +0200 Subject: [PATCH 09/36] announce arq session --- freedata_server/data_frame_factory.py | 2 +- freedata_server/p2p_connection.py | 70 ++++++++++++++++++--------- 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 5aea3c24..58128508 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -25,7 +25,7 @@ class DataFrameFactory: P2P_FLAGS = { 'BUFFER_EMPTY': 0, # Bit-position for indicating the BUFFER EMPTY state - 'ANNOUNCE_ARQ': 1, # Bit-position for announcing an ARQ session + 'ANNOUNCE_ARQ': 1, # Bit-position for announcing an ARQ session } def __init__(self, ctx): diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 0df46aa7..6a76c7fc 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -115,6 +115,9 @@ class P2PConnection: self.last_data_timestamp= time.time() self.start_data_processing_worker() + self.flag_buffer_empty = False + self.flag_announce_arq = False + def start_data_processing_worker(self): """Starts a worker thread to monitor the transmit data queue and process data.""" @@ -125,9 +128,10 @@ class P2PConnection: self.disconnect() return - if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED: - print("no data within last 15s. Taking master status") - self.is_Master = True + # thats our heartbeat logic, only ISS will run it + if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED and self.is_ISS and self.state is not States.ARQ_SESSION: + print("no data within last 15s. Sending heartbeat") + self.transmit_heartbeat() if self.state == States.CONNECTED and self.is_Master: self.process_data_queue() @@ -269,19 +273,12 @@ class P2PConnection: def process_data_queue(self, frame=None): if self.p2p_data_tx_queue.empty(): - print("buffer empty....", "setting isMaster = False", "sending heartbeat") + self.buffer_empty = True self.is_Master = False - payload = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=True) - #self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling) - self.transmit_frame([payload], FREEDV_MODE.signalling) - - # just send burst without expecting an answer - self.set_state(States.CONNECTED) - self.last_data_timestamp = time.time() - self.event_frame_received.set() - return + self.buffer_empty = False + self.is_Master = True print("processing data....") data = self.p2p_data_tx_queue.get() @@ -323,22 +320,46 @@ class P2PConnection: print("transmitted data...") self.set_state(States.CONNECTED) + def transmit_heartbeat(self, buffer_empty=False, announce_arq=False): + # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack + heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=buffer_empty, flag_announce_arq=announce_arq) + self.launch_twr(heartbeat, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + + def transmit_heartbeat_ack(self): + heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=self.buffer_empty, + flag_announce_arq=self.announce_arq) + self.transmit_frame([heartbeat_ack], FREEDV_MODE.signalling) + def received_heartbeat(self, frame): print("received heartbeat...") - if frame['flag']['BUFFER_EMPTY']: - print("other stations buffer is empty. We can become master now") + if frame['flag']['BUFFER_EMPTY'] and self.buffer_empty: + print("other stations buffer is empty as well. We wont become data master now") + self.is_Master = False + + if frame['flag']['BUFFER_EMPTY'] and not self.buffer_empty: + print("other stations buffer is empty. We can become data master now") self.is_Master = True + if frame['flag']['ANNOUNCE_ARQ']: print("other station announced arq, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) + self.transmit_heartbeat_ack() + def received_heartbeat_ack(self, frame): - print("received heartbeat ack...") + print("received heartbeat ack from IRS...") + if frame['flag']['BUFFER_EMPTY']: + print("other stations buffer is empty. We can become data master now") + self.is_Master = True + + if frame['flag']['ANNOUNCE_ARQ']: + print("other station announced arq, changing state") + self.is_Master = False + self.set_state(States.ARQ_SESSION) self.event_frame_received.set() - self.set_state(States.ARQ_SESSION) @@ -382,13 +403,14 @@ class P2PConnection: else: arq_destination = self.origin - self.log(f"ANNOUNCING ARQ to destination: {self.destination}") - heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=False, flag_announce_arq=True) - self.launch_twr(heartbeat, 15, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) - self.event_frame_received.wait() - - self.log(f"ARQ destination: {self.destination}") - + #self.log(f"ANNOUNCING ARQ to destination: {self.destination}") + #heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=False, flag_announce_arq=True) + #self.launch_twr(heartbeat, 5, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + #self.event_frame_received.wait() + #self.log(f"ARQ destination: {self.destination}") + self.transmit_heartbeat(announce_arq=True) + print("wait some time until ARQ starts....") + threading.Event().wait(5) prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection) iss = ARQSessionISS(self.ctx, arq_destination, prepared_data, type_byte) From 80fdf8da3bc0c31e8a5c85649dd9a83cfcd77f6e Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Sun, 4 May 2025 13:45:48 +0200 Subject: [PATCH 10/36] work on heartbeat --- freedata_server/p2p_connection.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 6a76c7fc..238523d4 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -238,9 +238,6 @@ class P2PConnection: self.set_state(States.CONNECTED) self.is_ISS = True self.is_Master = True - - - self.log(frame) if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.command_server, "command_handler"): self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) @@ -322,26 +319,29 @@ class P2PConnection: def transmit_heartbeat(self, buffer_empty=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack + self.last_data_timestamp = time.time() heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=buffer_empty, flag_announce_arq=announce_arq) - self.launch_twr(heartbeat, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + self.launch_twr(heartbeat, 6, 10, mode=FREEDV_MODE.signalling) def transmit_heartbeat_ack(self): + self.last_data_timestamp = time.time() heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=self.buffer_empty, flag_announce_arq=self.announce_arq) self.transmit_frame([heartbeat_ack], FREEDV_MODE.signalling) def received_heartbeat(self, frame): print("received heartbeat...") - - if frame['flag']['BUFFER_EMPTY'] and self.buffer_empty: + self.last_data_timestamp = time.time() + print(frame) + if bool(frame.get('flag', {}).get('BUFFER_EMPTY', False)) and self.buffer_empty: print("other stations buffer is empty as well. We wont become data master now") self.is_Master = False - if frame['flag']['BUFFER_EMPTY'] and not self.buffer_empty: + if bool(frame.get('flag', {}).get('BUFFER_EMPTY', False)) and not self.buffer_empty: print("other stations buffer is empty. We can become data master now") self.is_Master = True - if frame['flag']['ANNOUNCE_ARQ']: + if bool(frame.get('flag', {}).get('ANNOUNCE_ARQ', False)) : print("other station announced arq, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) @@ -349,6 +349,7 @@ class P2PConnection: self.transmit_heartbeat_ack() def received_heartbeat_ack(self, frame): + self.last_data_timestamp = time.time() print("received heartbeat ack from IRS...") if frame['flag']['BUFFER_EMPTY']: print("other stations buffer is empty. We can become data master now") From cd6d9dddca6758c29a1c35d9c8363749daab2b9a Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Sun, 4 May 2025 14:10:45 +0200 Subject: [PATCH 11/36] work on heartbeat --- freedata_server/data_frame_factory.py | 10 +++++++++- freedata_server/p2p_connection.py | 17 ++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 58128508..4bae10f6 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -184,6 +184,7 @@ class DataFrameFactory: self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value] = { "frame_length": self.LENGTH_SIG1_FRAME, "session_id": 1, + "flag": 1, } # p2p payload frames @@ -554,9 +555,16 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload) - def build_p2p_connection_heartbeat_ack(self, session_id): + def build_p2p_connection_heartbeat_ack(self, session_id, flag_buffer_empty=False, flag_announce_arq=False): + flag = 0b00000000 + if flag_buffer_empty: + flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + if flag_announce_arq: + flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) + payload = { "session_id": session_id.to_bytes(1, 'big'), + "flag": flag.to_bytes(1, 'big'), } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 238523d4..3d764b0f 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -112,12 +112,16 @@ class P2PConnection: self.is_ISS = False # Indicator, if we are ISS or IRS self.is_Master = False # Indicator, if we are Maste or Not + self.announce_arq = False + self.buffer_empty = False + self.last_data_timestamp= time.time() self.start_data_processing_worker() self.flag_buffer_empty = False self.flag_announce_arq = False + self.transmission_in_progress = False # indicatews, if we are waiting for an ongoing transmission def start_data_processing_worker(self): """Starts a worker thread to monitor the transmit data queue and process data.""" @@ -129,7 +133,7 @@ class P2PConnection: return # thats our heartbeat logic, only ISS will run it - if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED and self.is_ISS and self.state is not States.ARQ_SESSION: + if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED and self.is_ISS and not self.transmission_in_progress: print("no data within last 15s. Sending heartbeat") self.transmit_heartbeat() @@ -194,6 +198,7 @@ class P2PConnection: def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode): while retries > 0: self.event_frame_received = threading.Event() + self.transmission_in_progress = True if isinstance(frame_or_burst, list): burst = frame_or_burst else: burst = [frame_or_burst] for f in burst: @@ -201,6 +206,7 @@ class P2PConnection: self.event_frame_received.clear() self.log(f"Waiting {timeout} seconds...") if self.event_frame_received.wait(timeout): + self.transmission_in_progress = False return self.log("Timeout!") retries = retries - 1 @@ -324,15 +330,16 @@ class P2PConnection: self.launch_twr(heartbeat, 6, 10, mode=FREEDV_MODE.signalling) def transmit_heartbeat_ack(self): + print("transmit heartbeat ack") self.last_data_timestamp = time.time() - heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=self.buffer_empty, - flag_announce_arq=self.announce_arq) - self.transmit_frame([heartbeat_ack], FREEDV_MODE.signalling) + heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_buffer_empty=self.buffer_empty,flag_announce_arq=self.announce_arq) + print(heartbeat_ack) + self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + def received_heartbeat(self, frame): print("received heartbeat...") self.last_data_timestamp = time.time() - print(frame) if bool(frame.get('flag', {}).get('BUFFER_EMPTY', False)) and self.buffer_empty: print("other stations buffer is empty as well. We wont become data master now") self.is_Master = False From e7cdb0ee4b43be6d174928dc3df054cc161cdb2a Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sun, 4 May 2025 14:29:10 +0200 Subject: [PATCH 12/36] work on heatbeat --- freedata_server/p2p_connection.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 3d764b0f..39ebefb8 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -340,19 +340,23 @@ class P2PConnection: def received_heartbeat(self, frame): print("received heartbeat...") self.last_data_timestamp = time.time() - if bool(frame.get('flag', {}).get('BUFFER_EMPTY', False)) and self.buffer_empty: - print("other stations buffer is empty as well. We wont become data master now") - self.is_Master = False - if bool(frame.get('flag', {}).get('BUFFER_EMPTY', False)) and not self.buffer_empty: - print("other stations buffer is empty. We can become data master now") - self.is_Master = True + buffer_empty_flag = frame.get('flag', {}).get('BUFFER_EMPTY', False) + announce_arq_flag = frame.get('flag', {}).get('ANNOUNCE_ARQ', False) - if bool(frame.get('flag', {}).get('ANNOUNCE_ARQ', False)) : - print("other station announced arq, changing state") + if buffer_empty_flag: + if self.buffer_empty: + print("other station's buffer is empty as well. We won't become data master now") + self.is_Master = False + else: + print("other station's buffer is empty. We can become data master now") + self.is_Master = True + + if announce_arq_flag: + print("other station announced ARQ, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) - + print("transmit heartbeat ack") self.transmit_heartbeat_ack() def received_heartbeat_ack(self, frame): From f8c49bedf91d6de840103cc2b7bf47fc1497ec8e Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Sun, 4 May 2025 14:38:21 +0200 Subject: [PATCH 13/36] work on heartbeat --- freedata_server/data_frame_factory.py | 2 +- freedata_server/p2p_connection.py | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 4bae10f6..d758f1a9 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -322,7 +322,7 @@ class DataFrameFactory: # get_flag returns True or False based on the bit value at the flag's position extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict) - if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT]: + if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT, FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK]: flag_dict = self.P2P_FLAGS for flag in flag_dict: # Update extracted_data with the status of each flag diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 39ebefb8..4179c3a6 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -7,9 +7,6 @@ import structlog import random from queue import Queue import time -from command_arq_raw import ARQRawCommand -import numpy as np -import base64 from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES from arq_session_iss import ARQSessionISS import helpers @@ -112,8 +109,7 @@ class P2PConnection: self.is_ISS = False # Indicator, if we are ISS or IRS self.is_Master = False # Indicator, if we are Maste or Not - self.announce_arq = False - self.buffer_empty = False + self.last_data_timestamp= time.time() self.start_data_processing_worker() @@ -276,11 +272,9 @@ class P2PConnection: def process_data_queue(self, frame=None): if self.p2p_data_tx_queue.empty(): - self.buffer_empty = True self.is_Master = False return - self.buffer_empty = False self.is_Master = True print("processing data....") @@ -331,8 +325,13 @@ class P2PConnection: def transmit_heartbeat_ack(self): print("transmit heartbeat ack") + + if self.p2p_data_tx_queue.empty(): + self.flag_buffer_empty = True + + self.last_data_timestamp = time.time() - heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_buffer_empty=self.buffer_empty,flag_announce_arq=self.announce_arq) + heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_buffer_empty=self.flag_buffer_empty,flag_announce_arq=self.flag_announce_arq) print(heartbeat_ack) self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) @@ -345,7 +344,7 @@ class P2PConnection: announce_arq_flag = frame.get('flag', {}).get('ANNOUNCE_ARQ', False) if buffer_empty_flag: - if self.buffer_empty: + if self.p2p_data_tx_queue.empty(): print("other station's buffer is empty as well. We won't become data master now") self.is_Master = False else: @@ -362,11 +361,13 @@ class P2PConnection: def received_heartbeat_ack(self, frame): self.last_data_timestamp = time.time() print("received heartbeat ack from IRS...") - if frame['flag']['BUFFER_EMPTY']: + buffer_empty_flag = frame.get('flag', {}).get('BUFFER_EMPTY', False) + announce_arq_flag = frame.get('flag', {}).get('ANNOUNCE_ARQ', False) + if buffer_empty_flag: print("other stations buffer is empty. We can become data master now") self.is_Master = True - if frame['flag']['ANNOUNCE_ARQ']: + if announce_arq_flag: print("other station announced arq, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) From 81f500802dcb841e04c87e4ac4bdd38c33fc6e10 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 09:33:52 +0200 Subject: [PATCH 14/36] work on heartbeat --- freedata_server/data_frame_factory.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index d758f1a9..9fdd00a9 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -243,7 +243,6 @@ class DataFrameFactory: for key, item_length in frame_template.items(): if key == "frame_length": continue - if not isinstance(item_length, int): item_length = len(content[key]) @@ -272,7 +271,6 @@ class DataFrameFactory: frame_template = self.template_list.get(frametype) extracted_data = {"frame_type": FR_TYPE(frametype).name, "frame_type_int": frametype} - for key, item_length in frame_template.items(): if key == "frame_length": continue @@ -322,7 +320,7 @@ class DataFrameFactory: # get_flag returns True or False based on the bit value at the flag's position extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict) - if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT, FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK]: + if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value]: flag_dict = self.P2P_FLAGS for flag in flag_dict: # Update extracted_data with the status of each flag From ec4a8c90e2b86c030baaf99c21704b1bf9d1b352 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 09:55:13 +0200 Subject: [PATCH 15/36] work on heartbeat --- freedata_server/p2p_connection.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 4179c3a6..b01f8709 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -326,16 +326,12 @@ class P2PConnection: def transmit_heartbeat_ack(self): print("transmit heartbeat ack") - if self.p2p_data_tx_queue.empty(): - self.flag_buffer_empty = True - - + self.flag_buffer_empty = self.p2p_data_tx_queue.empty() self.last_data_timestamp = time.time() heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_buffer_empty=self.flag_buffer_empty,flag_announce_arq=self.flag_announce_arq) print(heartbeat_ack) self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) - def received_heartbeat(self, frame): print("received heartbeat...") self.last_data_timestamp = time.time() @@ -363,9 +359,13 @@ class P2PConnection: print("received heartbeat ack from IRS...") buffer_empty_flag = frame.get('flag', {}).get('BUFFER_EMPTY', False) announce_arq_flag = frame.get('flag', {}).get('ANNOUNCE_ARQ', False) + if buffer_empty_flag: print("other stations buffer is empty. We can become data master now") self.is_Master = True + else: + print("other station has data to be sent...") + self.is_Master = False if announce_arq_flag: print("other station announced arq, changing state") From 9c2f8fc1f8b036c1973dd5ae48cff55a8d482803 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 10:53:03 +0200 Subject: [PATCH 16/36] work on heartbeat --- freedata_server/data_frame_factory.py | 20 ++++---- freedata_server/p2p_connection.py | 68 +++++++++++++++++++-------- 2 files changed, 58 insertions(+), 30 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 9fdd00a9..1b6e2844 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -24,7 +24,7 @@ class DataFrameFactory: } P2P_FLAGS = { - 'BUFFER_EMPTY': 0, # Bit-position for indicating the BUFFER EMPTY state + 'HAS_DATA': 0, # Bit-position for indicating the BUFFER EMPTY state 'ANNOUNCE_ARQ': 1, # Bit-position for announcing an ARQ session } @@ -540,10 +540,10 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload) - def build_p2p_connection_heartbeat(self, session_id, flag_buffer_empty=False, flag_announce_arq=False): + def build_p2p_connection_heartbeat(self, session_id, flag_has_data=False, flag_announce_arq=False): flag = 0b00000000 - if flag_buffer_empty: - flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + if flag_has_data: + flag = helpers.set_flag(flag, 'has_data', True, self.P2P_FLAGS) if flag_announce_arq: flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) @@ -553,10 +553,10 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload) - def build_p2p_connection_heartbeat_ack(self, session_id, flag_buffer_empty=False, flag_announce_arq=False): + def build_p2p_connection_heartbeat_ack(self, session_id, flag_has_data=False, flag_announce_arq=False): flag = 0b00000000 - if flag_buffer_empty: - flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + if flag_has_data: + flag = helpers.set_flag(flag, 'has_data', True, self.P2P_FLAGS) if flag_announce_arq: flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) @@ -566,10 +566,10 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) - def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_buffer_empty=False): + def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_has_data=False): flag = 0b00000000 - if flag_buffer_empty: - flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + if flag_has_data: + flag = helpers.set_flag(flag, 'has_data', True, self.P2P_FLAGS) payload = { diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index b01f8709..f6d7ae16 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -17,6 +17,7 @@ class States(Enum): CONNECT_SENT = 2 CONNECT_ACK_SENT = 3 CONNECTED = 4 + AWAITING_DATA = 5 #HEARTBEAT_SENT = 5 #HEARTBEAT_ACK_SENT = 6 PAYLOAD_SENT = 7 @@ -114,7 +115,7 @@ class P2PConnection: self.last_data_timestamp= time.time() self.start_data_processing_worker() - self.flag_buffer_empty = False + self.flag_has_data = False self.flag_announce_arq = False self.transmission_in_progress = False # indicatews, if we are waiting for an ongoing transmission @@ -131,13 +132,21 @@ class P2PConnection: # thats our heartbeat logic, only ISS will run it if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED and self.is_ISS and not self.transmission_in_progress: print("no data within last 15s. Sending heartbeat") - self.transmit_heartbeat() + + if self.p2p_data_tx_queue.empty(): + self.flag_has_data = False + else: + self.flag_has_data = True + self.transmit_heartbeat(has_data=self.flag_has_data) if self.state == States.CONNECTED and self.is_Master: self.process_data_queue() threading.Event().wait(0.500) + if self.state is not States.ARQ_SESSION and self.is_Master: + threading.Event().wait(5) + self.process_data_queue() @@ -289,7 +298,12 @@ class P2PConnection: self.transmit_arq(data) return - payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data) + if self.p2p_data_tx_queue.empty(): + self.flag_has_data = False + else: + self.flag_has_data = True + + payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data, flag_has_data=self.flag_has_data) self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode) self.set_state(States.PAYLOAD_SENT) @@ -301,6 +315,9 @@ class P2PConnection: ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling_ack) + if not frame["flag"]["HAS_DATA"] and self.is_ISS: + self.set_state(States.CONNECTED) + try: received_data = frame['data'].rstrip(b'\x00') if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"): @@ -317,18 +334,23 @@ class P2PConnection: print("transmitted data...") self.set_state(States.CONNECTED) - def transmit_heartbeat(self, buffer_empty=False, announce_arq=False): + def transmit_heartbeat(self, has_data=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack self.last_data_timestamp = time.time() - heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=buffer_empty, flag_announce_arq=announce_arq) + + heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_has_data=has_data, flag_announce_arq=announce_arq) self.launch_twr(heartbeat, 6, 10, mode=FREEDV_MODE.signalling) def transmit_heartbeat_ack(self): print("transmit heartbeat ack") - self.flag_buffer_empty = self.p2p_data_tx_queue.empty() + if self.p2p_data_tx_queue.empty(): + self.flag_has_data = False + else: + self.flag_has_data = True + self.last_data_timestamp = time.time() - heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_buffer_empty=self.flag_buffer_empty,flag_announce_arq=self.flag_announce_arq) + heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_has_data=self.flag_has_data,flag_announce_arq=self.flag_announce_arq) print(heartbeat_ack) self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) @@ -336,18 +358,24 @@ class P2PConnection: print("received heartbeat...") self.last_data_timestamp = time.time() - buffer_empty_flag = frame.get('flag', {}).get('BUFFER_EMPTY', False) - announce_arq_flag = frame.get('flag', {}).get('ANNOUNCE_ARQ', False) + if frame["flag"]["HAS_DATA"]: + print("other station has data") + self.is_Master = False + + else: - if buffer_empty_flag: if self.p2p_data_tx_queue.empty(): print("other station's buffer is empty as well. We won't become data master now") self.is_Master = False + self.flag_has_data = False else: print("other station's buffer is empty. We can become data master now") self.is_Master = True + self.flag_has_data = True - if announce_arq_flag: + + + if frame["flag"]["ANNOUNCE_ARQ"]: print("other station announced ARQ, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) @@ -357,17 +385,17 @@ class P2PConnection: def received_heartbeat_ack(self, frame): self.last_data_timestamp = time.time() print("received heartbeat ack from IRS...") - buffer_empty_flag = frame.get('flag', {}).get('BUFFER_EMPTY', False) - announce_arq_flag = frame.get('flag', {}).get('ANNOUNCE_ARQ', False) - if buffer_empty_flag: - print("other stations buffer is empty. We can become data master now") - self.is_Master = True - else: - print("other station has data to be sent...") + if frame["flag"]["HAS_DATA"]: + print("other station has data") self.is_Master = False + self.set_state(States.AWAITING_DATA) + else: + print("other station has no data, we become master now") + self.is_Master = True + self.set_state(States.CONNECTED) - if announce_arq_flag: + if frame["flag"]["ANNOUNCE_ARQ"]: print("other station announced arq, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) @@ -417,7 +445,7 @@ class P2PConnection: arq_destination = self.origin #self.log(f"ANNOUNCING ARQ to destination: {self.destination}") - #heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=False, flag_announce_arq=True) + #heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_has_data=False, flag_announce_arq=True) #self.launch_twr(heartbeat, 5, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) #self.event_frame_received.wait() #self.log(f"ARQ destination: {self.destination}") From 5e4b7b5591f8e6e5fd8d8050ca61061345216ea2 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 10:56:11 +0200 Subject: [PATCH 17/36] work on heartbeat --- freedata_server/data_frame_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 1b6e2844..4a94a8eb 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -543,7 +543,7 @@ class DataFrameFactory: def build_p2p_connection_heartbeat(self, session_id, flag_has_data=False, flag_announce_arq=False): flag = 0b00000000 if flag_has_data: - flag = helpers.set_flag(flag, 'has_data', True, self.P2P_FLAGS) + flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS) if flag_announce_arq: flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) From af4ab6c91d4c45b7550d3cbca107ec3ba0899fee Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 10:56:54 +0200 Subject: [PATCH 18/36] work on heartbeat --- freedata_server/data_frame_factory.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 4a94a8eb..d71b8b01 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -556,7 +556,7 @@ class DataFrameFactory: def build_p2p_connection_heartbeat_ack(self, session_id, flag_has_data=False, flag_announce_arq=False): flag = 0b00000000 if flag_has_data: - flag = helpers.set_flag(flag, 'has_data', True, self.P2P_FLAGS) + flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS) if flag_announce_arq: flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) @@ -569,7 +569,7 @@ class DataFrameFactory: def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_has_data=False): flag = 0b00000000 if flag_has_data: - flag = helpers.set_flag(flag, 'has_data', True, self.P2P_FLAGS) + flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS) payload = { From 50a80eec983be684b8763cdf23f5675d0430a524 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 11:08:12 +0200 Subject: [PATCH 19/36] work on heartbeat --- freedata_server/p2p_connection.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index f6d7ae16..5d88855b 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -386,10 +386,14 @@ class P2PConnection: self.last_data_timestamp = time.time() print("received heartbeat ack from IRS...") - if frame["flag"]["HAS_DATA"]: - print("other station has data") + if frame["flag"]["HAS_DATA"] and not self.flag_has_data: + print("other station has data, we are not") self.is_Master = False self.set_state(States.AWAITING_DATA) + elif frame["flag"]["HAS_DATA"] and self.flag_has_data: + print("other station has data and we as well, we become master") + self.is_Master = True + self.set_state(States.CONNECTED) else: print("other station has no data, we become master now") self.is_Master = True From 272e8947af202d126d5c1222127f19a877429252 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 11:16:28 +0200 Subject: [PATCH 20/36] adjusted disconnecting --- freedata_server/p2p_connection.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 5d88855b..2ba6b127 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -52,6 +52,11 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', }, + States.AWAITING_DATA: { + FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'transmitted_data', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', + }, States.ARQ_SESSION: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', @@ -125,7 +130,7 @@ class P2PConnection: def data_processing_worker(): while True: - if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION, States.FAILED]: self.disconnect() return @@ -409,7 +414,7 @@ class P2PConnection: def disconnect(self): - if self.state not in [States.DISCONNECTING, States.DISCONNECTED]: + if self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION]: self.set_state(States.DISCONNECTING) disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) From 59c0c4a2ef3cb006f9c87571fa46d90d6d9c7468 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 12:27:07 +0200 Subject: [PATCH 21/36] adding compression --- freedata_server/data_frame_factory.py | 5 +++-- freedata_server/p2p_connection.py | 25 +++++++++++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index d71b8b01..a60963f4 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -566,11 +566,12 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) - def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_has_data=False): + def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_has_data=False, flag_announce_arq=False): flag = 0b00000000 if flag_has_data: flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS) - + if flag_announce_arq: + flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS) payload = { "session_id": session_id.to_bytes(1, 'big'), diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 2ba6b127..0ad0fb60 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -10,6 +10,7 @@ import time from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES from arq_session_iss import ARQSessionISS import helpers +import zlib class States(Enum): NEW = 0 @@ -292,15 +293,18 @@ class P2PConnection: self.is_Master = True print("processing data....") - data = self.p2p_data_tx_queue.get() + raw_data = self.p2p_data_tx_queue.get() sequence_id = random.randint(0,255) + compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED) + data = compressor.compress(raw_data) + compressor.flush() + if len(data) <= 11: mode = FREEDV_MODE.signalling elif 11 < len(data) <= 32: mode = FREEDV_MODE.datac4 else: - self.transmit_arq(data) + self.transmit_arq(raw_data) return if self.p2p_data_tx_queue.empty(): @@ -322,9 +326,16 @@ class P2PConnection: if not frame["flag"]["HAS_DATA"] and self.is_ISS: self.set_state(States.CONNECTED) + else: + self.set_state(States.AWAITING_DATA) try: received_data = frame['data'].rstrip(b'\x00') + + decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS) + received_data = decompressor.decompress(received_data) + received_data += decompressor.flush() + if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"): self.log(f"sending {len(received_data)} bytes to data socket client") self.ctx.socket_interface_manager.data_server.data_handler.send_data_to_client(received_data) @@ -360,6 +371,9 @@ class P2PConnection: self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def received_heartbeat(self, frame): + # we don't accept heartbeats as ISS + if self.is_ISS: + return print("received heartbeat...") self.last_data_timestamp = time.time() @@ -441,6 +455,8 @@ class P2PConnection: if self.ctx.socket_interface_manager: self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_disconnected() + + def transmit_arq(self, data): """ This function needs to be fixed - we want to send ARQ data within a p2p connection @@ -458,7 +474,7 @@ class P2PConnection: #self.launch_twr(heartbeat, 5, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) #self.event_frame_received.wait() #self.log(f"ARQ destination: {self.destination}") - self.transmit_heartbeat(announce_arq=True) + #self.transmit_heartbeat(announce_arq=True) print("wait some time until ARQ starts....") threading.Event().wait(5) @@ -476,6 +492,7 @@ class P2PConnection: def transmitted_arq(self, transmitted_data): self.last_data_timestamp = time.time() self.set_state(States.CONNECTED) + self.is_Master = True def received_arq(self, received_data): self.last_data_timestamp = time.time() @@ -490,4 +507,4 @@ class P2PConnection: self.log(f"Error sending data to socket: {e}") def failed_arq(self): - self.set_state(States.CONNECTED) \ No newline at end of file + self.set_state(States.CONNECTED) From 4e15e6004a3b3ad121e072d8b661109118d8db2b Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 13:54:52 +0200 Subject: [PATCH 22/36] ensure correct state on NEW --- freedata_server/p2p_connection.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 0ad0fb60..d9814873 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -33,6 +33,8 @@ class P2PConnection: STATE_TRANSITION = { States.NEW: { FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', + }, States.CONNECTING: { FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', @@ -56,6 +58,7 @@ class P2PConnection: States.AWAITING_DATA: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'transmitted_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', }, States.ARQ_SESSION: { @@ -374,6 +377,11 @@ class P2PConnection: # we don't accept heartbeats as ISS if self.is_ISS: return + + # ensure we have correct state + if self.state is States.NEW: + self.connected_irs() + print("received heartbeat...") self.last_data_timestamp = time.time() From 32bdfd335659fb4d79d5cdc87e78000ae39605f3 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 14:03:26 +0200 Subject: [PATCH 23/36] fixing wrong burst handling --- freedata_server/p2p_connection.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index d9814873..54a93941 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -56,7 +56,7 @@ class P2PConnection: }, States.AWAITING_DATA: { - FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'transmitted_data', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', @@ -197,7 +197,6 @@ class P2PConnection: if frame_type in self.STATE_TRANSITION[self.state]: action_name = self.STATE_TRANSITION[self.state][frame_type] response = getattr(self, action_name)(frame) - return self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") From b6a801fab76bde96b35526ab9fa14fda62131add Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 16:16:50 +0200 Subject: [PATCH 24/36] fixing wrong burst handling --- freedata_server/p2p_connection.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 54a93941..ac360888 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -139,7 +139,7 @@ class P2PConnection: return # thats our heartbeat logic, only ISS will run it - if time.time() > self.last_data_timestamp + 15 and self.state is States.CONNECTED and self.is_ISS and not self.transmission_in_progress: + if time.time() > self.last_data_timestamp + 10 and self.state is States.CONNECTED and self.is_ISS and not self.transmission_in_progress: print("no data within last 15s. Sending heartbeat") if self.p2p_data_tx_queue.empty(): @@ -154,7 +154,7 @@ class P2PConnection: threading.Event().wait(0.500) if self.state is not States.ARQ_SESSION and self.is_Master: - threading.Event().wait(5) + threading.Event().wait(2) self.process_data_queue() @@ -293,29 +293,40 @@ class P2PConnection: return self.is_Master = True - print("processing data....") raw_data = self.p2p_data_tx_queue.get() sequence_id = random.randint(0,255) + compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED) data = compressor.compress(raw_data) + compressor.flush() + self.log(f"Processing data....{len(data)} Bytes - {raw_data}") + + if len(data) <= 11: mode = FREEDV_MODE.signalling elif 11 < len(data) <= 32: mode = FREEDV_MODE.datac4 else: + self.log("Using ARQ for sending data...") + self.set_state(States.ARQ_SESSION) self.transmit_arq(raw_data) return + # Additional return statement for avoiding wrong data. + if self.state is States.ARQ_SESSION: + return + + self.log("Using burst for sending data...") + if self.p2p_data_tx_queue.empty(): self.flag_has_data = False else: self.flag_has_data = True payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data, flag_has_data=self.flag_has_data) - self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=mode) self.set_state(States.PAYLOAD_SENT) return From 039c9b4a015dd23b7a20be4308cea9620c96e802 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 16:25:41 +0200 Subject: [PATCH 25/36] fixing wrong burst handling --- freedata_server/p2p_connection.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index ac360888..35dd0be8 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -148,14 +148,12 @@ class P2PConnection: self.flag_has_data = True self.transmit_heartbeat(has_data=self.flag_has_data) - if self.state == States.CONNECTED and self.is_Master: + if self.state in [States.CONNECTED] and self.is_Master: + threading.Event().wait(3) self.process_data_queue() threading.Event().wait(0.500) - if self.state is not States.ARQ_SESSION and self.is_Master: - threading.Event().wait(2) - self.process_data_queue() @@ -361,7 +359,7 @@ class P2PConnection: def transmitted_data(self, frame): print("transmitted data...") - self.set_state(States.CONNECTED) + #self.set_state(States.CONNECTED) def transmit_heartbeat(self, has_data=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack From 00c46c90f86f79636d5421a589fccf98cf37d811 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 20:23:59 +0200 Subject: [PATCH 26/36] adding additional payload transmission state --- freedata_server/p2p_connection.py | 28 +++++++++++++++++++--------- freedata_server/state_manager.py | 2 +- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 35dd0be8..51152563 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -19,8 +19,7 @@ class States(Enum): CONNECT_ACK_SENT = 3 CONNECTED = 4 AWAITING_DATA = 5 - #HEARTBEAT_SENT = 5 - #HEARTBEAT_ACK_SENT = 6 + PAYLOAD_TRANSMISSION = 6 PAYLOAD_SENT = 7 ARQ_SESSION = 8 DISCONNECTING = 9 @@ -47,6 +46,13 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', + }, + States.PAYLOAD_TRANSMISSION: { + FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack', + }, States.PAYLOAD_SENT: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', @@ -77,8 +83,12 @@ class P2PConnection: }, States.ABORTED:{ + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', }, States.FAILED: { + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', }, } @@ -134,12 +144,12 @@ class P2PConnection: def data_processing_worker(): while True: - if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION, States.FAILED]: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION, States.FAILED, States.PAYLOAD_TRANSMISSION]: self.disconnect() return # thats our heartbeat logic, only ISS will run it - if time.time() > self.last_data_timestamp + 10 and self.state is States.CONNECTED and self.is_ISS and not self.transmission_in_progress: + if time.time() > self.last_data_timestamp + 10 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress: print("no data within last 15s. Sending heartbeat") if self.p2p_data_tx_queue.empty(): @@ -148,7 +158,7 @@ class P2PConnection: self.flag_has_data = True self.transmit_heartbeat(has_data=self.flag_has_data) - if self.state in [States.CONNECTED] and self.is_Master: + if self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_Master: threading.Event().wait(3) self.process_data_queue() @@ -317,7 +327,7 @@ class P2PConnection: return self.log("Using burst for sending data...") - + self.set_state(States.PAYLOAD_TRANSMISSION) if self.p2p_data_tx_queue.empty(): self.flag_has_data = False else: @@ -325,7 +335,7 @@ class P2PConnection: payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data, flag_has_data=self.flag_has_data) self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=mode) - self.set_state(States.PAYLOAD_SENT) + return @@ -358,8 +368,8 @@ class P2PConnection: def transmitted_data(self, frame): - print("transmitted data...") - #self.set_state(States.CONNECTED) + self.log("Transmitted data...") + self.set_state(States.PAYLOAD_SENT) def transmit_heartbeat(self, has_data=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack diff --git a/freedata_server/state_manager.py b/freedata_server/state_manager.py index 44a7058e..24c3f098 100644 --- a/freedata_server/state_manager.py +++ b/freedata_server/state_manager.py @@ -554,7 +554,7 @@ class StateManager: bool: True if the session was registered, False otherwise. """ if session.session_id in self.p2p_connection_sessions: - print("session already registered...") + print("session already registered...", session.session_id) return False self.p2p_connection_sessions[session.session_id] = session return True From 128723fe19673e78c48c165df3f92211a78d6d8a Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 20:29:32 +0200 Subject: [PATCH 27/36] state fix after sending all data --- freedata_server/p2p_connection.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 51152563..b8c8a964 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -369,7 +369,12 @@ class P2PConnection: def transmitted_data(self, frame): self.log("Transmitted data...") - self.set_state(States.PAYLOAD_SENT) + if self.p2p_data_tx_queue.empty(): + self.log("Transmitted all data...") + self.set_state(States.CONNECTED) + else: + self.log("Moving to next payload...") + self.set_state(States.PAYLOAD_SENT) def transmit_heartbeat(self, has_data=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack @@ -379,7 +384,7 @@ class P2PConnection: self.launch_twr(heartbeat, 6, 10, mode=FREEDV_MODE.signalling) def transmit_heartbeat_ack(self): - print("transmit heartbeat ack") + self.log("Transmitting heartbeat ACK") if self.p2p_data_tx_queue.empty(): self.flag_has_data = False @@ -400,7 +405,7 @@ class P2PConnection: if self.state is States.NEW: self.connected_irs() - print("received heartbeat...") + self.log("Received heartbeat...") self.last_data_timestamp = time.time() if frame["flag"]["HAS_DATA"]: From cd47ed51345c2bfda7aa1915f38fa345dfaea53c Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 20:37:21 +0200 Subject: [PATCH 28/36] state fix after sending all data --- freedata_server/p2p_connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index b8c8a964..3e7fbffb 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -340,12 +340,12 @@ class P2PConnection: return def received_data(self, frame): - self.log(f"received data...: {frame}") + self.log(f"received data...") ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling_ack) - if not frame["flag"]["HAS_DATA"] and self.is_ISS: + if not frame["flag"]["HAS_DATA"]: self.set_state(States.CONNECTED) else: self.set_state(States.AWAITING_DATA) @@ -358,7 +358,7 @@ class P2PConnection: received_data += decompressor.flush() if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"): - self.log(f"sending {len(received_data)} bytes to data socket client") + self.log(f"sending {len(received_data)} bytes to data socket client: {received_data}") self.ctx.socket_interface_manager.data_server.data_handler.send_data_to_client(received_data) except Exception as e: From 258cc05f32b48fb231202d4976ef3d41a52b0b6d Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 20:53:36 +0200 Subject: [PATCH 29/36] adjustment to arq session handling --- freedata_server/arq_session_irs.py | 1 + freedata_server/p2p_connection.py | 23 +++++++++++++++++--- freedata_server/socket_interface_commands.py | 5 ++++- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/freedata_server/arq_session_irs.py b/freedata_server/arq_session_irs.py index dec881f1..f16bcc41 100644 --- a/freedata_server/arq_session_irs.py +++ b/freedata_server/arq_session_irs.py @@ -283,6 +283,7 @@ class ARQSessionIRS(arq_session.ARQSession): # update p2p connection timeout if self.running_p2p_connection: self.running_p2p_connection.last_data_timestamp = time.time() + self.running_p2p_connection.running_arq_session = self remaining_data_length = self.total_length - self.received_bytes self.log(f"Remaining data: {remaining_data_length}", isWarning=True) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 3e7fbffb..1384d978 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -138,7 +138,7 @@ class P2PConnection: self.flag_announce_arq = False self.transmission_in_progress = False # indicatews, if we are waiting for an ongoing transmission - + self.running_arq_session = None def start_data_processing_worker(self): """Starts a worker thread to monitor the transmit data queue and process data.""" @@ -461,6 +461,9 @@ class P2PConnection: def disconnect(self): if self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION]: self.set_state(States.DISCONNECTING) + + self.abort_arq() + disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) return @@ -468,12 +471,25 @@ class P2PConnection: def abort_connection(self): # abort is a dirty disconnect self.log("ABORTING...............") + + self.abort_arq() + self.event_frame_received.set() self.set_state(States.DISCONNECTED) + def abort_arq(self): + try: + if self.running_arq_session: + self.running_arq_session.abort_transmission() + except Exception as e: + self.log(f"Error stopping ARQ session {e}") + def received_disconnect(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) + + self.abort_arq() + if self.ctx.socket_interface_manager: self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_disconnected() self.is_ISS = False @@ -514,7 +530,7 @@ class P2PConnection: iss.id = self.session_id # register p2p connection to arq session iss.running_p2p_connection = self - + self.running_arq_session = iss if iss.id: self.ctx.state_manager.register_arq_iss_session(iss) iss.start() @@ -524,11 +540,12 @@ class P2PConnection: self.last_data_timestamp = time.time() self.set_state(States.CONNECTED) self.is_Master = True + self.running_arq_session = None def received_arq(self, received_data): self.last_data_timestamp = time.time() self.set_state(States.CONNECTED) - + self.running_arq_session = None try: if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"): self.log(f"sending {len(received_data)} bytes to data socket client") diff --git a/freedata_server/socket_interface_commands.py b/freedata_server/socket_interface_commands.py index 45d132cf..3777dee6 100644 --- a/freedata_server/socket_interface_commands.py +++ b/freedata_server/socket_interface_commands.py @@ -63,7 +63,10 @@ class SocketCommandHandler: def handle_abort(self, data): # Logic for handling ABORT command self.send_response(f"OK") - self.session.abort_connection() + try: + self.session.abort_connection() + except Exception as e: + self.send_response(f"ERR: {e}") self.send_response(f"DISCONNECTED") def handle_public(self, data): From 2d7aba787d010f6f79fc4c86d987776c3200c91b Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 21:12:08 +0200 Subject: [PATCH 30/36] improved timing --- freedata_server/p2p_connection.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 1384d978..c8df500d 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -149,7 +149,7 @@ class P2PConnection: return # thats our heartbeat logic, only ISS will run it - if time.time() > self.last_data_timestamp + 10 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress: + if time.time() > self.last_data_timestamp + 5 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress: print("no data within last 15s. Sending heartbeat") if self.p2p_data_tx_queue.empty(): @@ -159,10 +159,10 @@ class P2PConnection: self.transmit_heartbeat(has_data=self.flag_has_data) if self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_Master: - threading.Event().wait(3) + threading.Event().wait(0.5) self.process_data_queue() - threading.Event().wait(0.500) + threading.Event().wait(0.100) From 0527637ae57fe1a89536e3d7fb0b377a64068dad Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 21:27:55 +0200 Subject: [PATCH 31/36] adjusted logging --- freedata_server/data_frame_factory.py | 4 +-- freedata_server/p2p_connection.py | 34 ++++++++------------ freedata_server/socket_interface_commands.py | 5 ++- 3 files changed, 19 insertions(+), 24 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index a60963f4..0a0f07ee 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -277,7 +277,7 @@ class DataFrameFactory: # data is always on the last payload slots if item_length in ["dynamic"] and key in["data"]: - print(len(frame)) + #print(len(frame)) data = frame[buffer_position:-2] item_length = len(data) else: @@ -579,7 +579,7 @@ class DataFrameFactory: "flag": flag.to_bytes(1, 'big'), "data": data, } - print(self.get_bytes_per_frame(freedv_mode)) + #print(self.get_bytes_per_frame(freedv_mode)) return self.construct( FR_TYPE.P2P_CONNECTION_PAYLOAD, payload, diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index c8df500d..b3306f34 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -148,9 +148,9 @@ class P2PConnection: self.disconnect() return - # thats our heartbeat logic, only ISS will run it + # this is our heartbeat logic, only ISS will run it if time.time() > self.last_data_timestamp + 5 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress: - print("no data within last 15s. Sending heartbeat") + self.log("Sending heartbeat") if self.p2p_data_tx_queue.empty(): self.flag_has_data = False @@ -393,7 +393,6 @@ class P2PConnection: self.last_data_timestamp = time.time() heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_has_data=self.flag_has_data,flag_announce_arq=self.flag_announce_arq) - print(heartbeat_ack) self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def received_heartbeat(self, frame): @@ -409,48 +408,48 @@ class P2PConnection: self.last_data_timestamp = time.time() if frame["flag"]["HAS_DATA"]: - print("other station has data") + self.log("Opposite station has data") self.is_Master = False else: if self.p2p_data_tx_queue.empty(): - print("other station's buffer is empty as well. We won't become data master now") + self.log("Opposite station's data buffer is empty as well -- We won't become data master now") self.is_Master = False self.flag_has_data = False else: - print("other station's buffer is empty. We can become data master now") + self.log("Opposite station's data buffer is empty. We can become data master now") self.is_Master = True self.flag_has_data = True if frame["flag"]["ANNOUNCE_ARQ"]: - print("other station announced ARQ, changing state") + self.log("Opposite station announced ARQ, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) - print("transmit heartbeat ack") + self.log("transmit heartbeat ack") self.transmit_heartbeat_ack() def received_heartbeat_ack(self, frame): self.last_data_timestamp = time.time() - print("received heartbeat ack from IRS...") + self.log("Received heartbeat ack from opposite...") if frame["flag"]["HAS_DATA"] and not self.flag_has_data: - print("other station has data, we are not") + self.log("other station has data, we are not") self.is_Master = False self.set_state(States.AWAITING_DATA) elif frame["flag"]["HAS_DATA"] and self.flag_has_data: - print("other station has data and we as well, we become master") + self.log("other station has data and we as well, we become master") self.is_Master = True self.set_state(States.CONNECTED) else: - print("other station has no data, we become master now") + self.log("other station has no data, we become master now") self.is_Master = True self.set_state(States.CONNECTED) if frame["flag"]["ANNOUNCE_ARQ"]: - print("other station announced arq, changing state") + self.log("other station announced arq, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) @@ -516,14 +515,7 @@ class P2PConnection: else: arq_destination = self.origin - #self.log(f"ANNOUNCING ARQ to destination: {self.destination}") - #heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_has_data=False, flag_announce_arq=True) - #self.launch_twr(heartbeat, 5, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) - #self.event_frame_received.wait() - #self.log(f"ARQ destination: {self.destination}") - #self.transmit_heartbeat(announce_arq=True) - print("wait some time until ARQ starts....") - threading.Event().wait(5) + threading.Event().wait(3) prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection) iss = ARQSessionISS(self.ctx, arq_destination, prepared_data, type_byte) diff --git a/freedata_server/socket_interface_commands.py b/freedata_server/socket_interface_commands.py index 3777dee6..6ea2ead9 100644 --- a/freedata_server/socket_interface_commands.py +++ b/freedata_server/socket_interface_commands.py @@ -46,7 +46,10 @@ class SocketCommandHandler: def handle_disconnect(self, data): self.send_response(f"OK") - self.session.disconnect() + try: + self.session.disconnect() + except Exception as e: + self.log(f"Error disconnecting socket: {e}", isWarning = True) def handle_mycall(self, data): #Storing all of the callsigns assigned by client, to make sure they are checked later in new frames. From 6bd4618fc8a946e82889a6095c3a8da986e751ec Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Mon, 5 May 2025 22:14:05 +0200 Subject: [PATCH 32/36] adjusted compression --- freedata_server/arq_data_type_handler.py | 55 ++++++++---------------- freedata_server/arq_session_irs.py | 7 ++- 2 files changed, 24 insertions(+), 38 deletions(-) diff --git a/freedata_server/arq_data_type_handler.py b/freedata_server/arq_data_type_handler.py index 84f41034..4dc337e1 100644 --- a/freedata_server/arq_data_type_handler.py +++ b/freedata_server/arq_data_type_handler.py @@ -393,7 +393,6 @@ class ARQDataTypeHandler: Returns: The ZLIB-compressed data as a bytearray. """ - compressed_data = lzma.compress(data) compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED) compressed_data = compressor.compress(data) + compressor.flush() @@ -468,65 +467,49 @@ class ARQDataTypeHandler: def prepare_p2p_connection(self, data): - """Prepares P2P connection data for transmission using GZIP compression. + """Prepares P2P connection data for transmission using ZLIB compression. - This method compresses the given data using GZIP and logs the size of the - data before and after compression. + This method compresses the provided data using ZLIB with specific settings for + peer-to-peer connection sessions. It logs the size of the data before and after + compression and prints the current P2P connection sessions. Args: data: The bytearray containing the P2P connection data to be compressed. Returns: - The GZIP-compressed data as a bytearray. + The ZLIB-compressed data as a bytearray. """ - compressed_data = gzip.compress(data) - self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") + + compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED) + compressed_data = compressor.compress(data) + compressor.flush() + + self.log(f"Preparing zlib compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") print(self.ctx.state_manager.p2p_connection_sessions) return compressed_data def handle_p2p_connection(self, data, statistics): - """Handles GZIP compressed P2P connection data. - This method decompresses the provided GZIP data, logs size information, - and then distributes the decompressed data to active P2P connection sessions. + decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS) + decompressed_data = decompressor.decompress(data) + decompressed_data += decompressor.flush() - Args: - data: The bytearray containing the GZIP compressed P2P connection data. - statistics: A dictionary containing statistics related to the ARQ session. - """ - decompressed_data = gzip.decompress(data) self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes") for session_id in self.ctx.state_manager.p2p_connection_sessions: self.ctx.state_manager.p2p_connection_sessions[session_id].received_arq(decompressed_data) def failed_p2p_connection(self, data, statistics): - """Handles failed GZIP compressed P2P connection data. - This method decompresses the provided GZIP data, logs size information - and an error message, and then returns the decompressed data. - - Args: - data: The bytearray containing the GZIP compressed P2P connection data. - statistics: A dictionary containing statistics related to the ARQ session. - - Returns: - The decompressed data as a bytearray. - """ - decompressed_data = gzip.decompress(data) + decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS) + decompressed_data = decompressor.decompress(data) + decompressed_data += decompressor.flush() self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True) for session_id in self.ctx.state_manager.p2p_connection_sessions: self.ctx.state_manager.p2p_connection_sessions[session_id].failed_arq() def transmitted_p2p_connection(self, data, statistics): - """Handles the successful transmission of GZIP compressed P2P connection data. - This method decompresses the provided GZIP data and then calls the - `transmitted_arq` method for each active P2P connection session. - - Args: - data: The bytearray containing the GZIP compressed P2P connection data. - statistics: A dictionary containing statistics related to the ARQ session. - """ - decompressed_data = gzip.decompress(data) + decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS) + decompressed_data = decompressor.decompress(data) + decompressed_data += decompressor.flush() for session_id in self.ctx.state_manager.p2p_connection_sessions: self.ctx.state_manager.p2p_connection_sessions[session_id].transmitted_arq(decompressed_data) \ No newline at end of file diff --git a/freedata_server/arq_session_irs.py b/freedata_server/arq_session_irs.py index f16bcc41..65e217b4 100644 --- a/freedata_server/arq_session_irs.py +++ b/freedata_server/arq_session_irs.py @@ -298,6 +298,8 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_data[frame['offset']:] = data_part #self.received_bytes += len(data_part) self.received_bytes = len(self.received_data) + print(self.received_bytes) + print(self.received_data) self.log(f"Received {self.received_bytes}/{self.total_length} bytes") self.ctx.event_manager.send_arq_session_progress( False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name, self.speed_level, self.calculate_session_statistics(self.received_bytes, self.total_length)) @@ -356,7 +358,7 @@ class ARQSessionIRS(arq_session.ARQSession): flag_final=True, flag_checksum=False) self.transmit_frame(ack, mode=FREEDV_MODE.signalling_ack) - self.log("CRC fail at the end of transmission!") + self.log("CRC fail at the end of transmission!", isWarning=True) return self.transmission_failed() @@ -475,7 +477,8 @@ class ARQSessionIRS(arq_session.ARQSession): self.log("Transmission failed!") #self.ctx.rf_modem.demodulator.set_decode_mode() session_stats = self.calculate_session_statistics(self.received_bytes, self.total_length) - + self.received_data = None + self.received_bytes = 0 self.ctx.event_manager.send_arq_session_finished(True, self.id, self.dxcall,False, self.state.name, statistics=session_stats) if self.ctx.config_manager.config['STATION']['enable_stats']: self.statistics.push(self.state.name, session_stats, self.dxcall) From 6a04e145bf9afeb3cb878915aff7087c18ce9ba6 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 6 May 2025 09:57:14 +0200 Subject: [PATCH 33/36] delete arq session after not needed anymore --- freedata_server/arq_session_irs.py | 6 ++++++ freedata_server/p2p_connection.py | 24 ++++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/freedata_server/arq_session_irs.py b/freedata_server/arq_session_irs.py index 65e217b4..3d33cf0f 100644 --- a/freedata_server/arq_session_irs.py +++ b/freedata_server/arq_session_irs.py @@ -113,6 +113,12 @@ class ARQSessionIRS(arq_session.ARQSession): # instance of p2p connection self.running_p2p_connection = None + try: + self.running_p2p_connection = self.ctx.state_manager.get_p2p_connection_session(self.id) + # register arq session in p2p connection + self.running_p2p_connection.running_arq_session = self + except Exception as e: + self.log("Error getting p2p connection session") self.type_byte = None self.total_length = 0 diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index b3306f34..e8d48b4f 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -482,6 +482,8 @@ class P2PConnection: self.running_arq_session.abort_transmission() except Exception as e: self.log(f"Error stopping ARQ session {e}") + finally: + self.delete_arq_session() def received_disconnect(self, frame): self.log("DISCONNECTED...............") @@ -532,12 +534,16 @@ class P2PConnection: self.last_data_timestamp = time.time() self.set_state(States.CONNECTED) self.is_Master = True + self.delete_arq_session() self.running_arq_session = None def received_arq(self, received_data): self.last_data_timestamp = time.time() self.set_state(States.CONNECTED) self.running_arq_session = None + + self.delete_arq_session() + try: if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"): self.log(f"sending {len(received_data)} bytes to data socket client") @@ -548,3 +554,21 @@ class P2PConnection: def failed_arq(self): self.set_state(States.CONNECTED) + self.delete_arq_session() + + def delete_arq_session(self): + """ + Delete both ARQ IRS and ISS sessions. + For each session type, retrieve the session by ID and then remove it. + Log any errors encountered during retrieval or removal. + """ + for kind in ('irs', 'iss'): + get = getattr(self.ctx.state_manager, f"get_arq_{kind}_session") + remove = getattr(self.ctx.state_manager, f"remove_arq_{kind}_session") + try: + # Retrieve the session (may raise if not found) + session = get(self.session_id) + # Remove the session + remove(session.id) + except Exception as e: + self.log(f"Error handling ARQ {kind.upper()} session: {e}", isWarning=True) From 41370087ec81b80bae38880b39106d2baf73e3b7 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 6 May 2025 11:33:46 +0200 Subject: [PATCH 34/36] add iamalive and buffer command --- freedata_server/arq_session_irs.py | 12 +++++++--- freedata_server/p2p_connection.py | 24 ++++++++++++++++++-- freedata_server/socket_interface_commands.py | 6 +++++ 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/freedata_server/arq_session_irs.py b/freedata_server/arq_session_irs.py index 3d33cf0f..cb1d8cb6 100644 --- a/freedata_server/arq_session_irs.py +++ b/freedata_server/arq_session_irs.py @@ -5,7 +5,7 @@ from modem_frametypes import FRAME_TYPE from codec2 import FREEDV_MODE from enum import Enum import time - +from p2p_connection import States as P2PStates class IRS_State(Enum): """Enumeration representing the states of an IRS (Information Receiving Station) ARQ session. @@ -82,6 +82,7 @@ class ARQSessionIRS(arq_session.ARQSession): }, IRS_State.RESUME: { FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack', + } @@ -117,6 +118,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.running_p2p_connection = self.ctx.state_manager.get_p2p_connection_session(self.id) # register arq session in p2p connection self.running_p2p_connection.running_arq_session = self + self.running_p2p_connection.set_state(P2PStates.ARQ_SESSION) except Exception as e: self.log("Error getting p2p connection session") @@ -288,8 +290,12 @@ class ARQSessionIRS(arq_session.ARQSession): # update p2p connection timeout if self.running_p2p_connection: - self.running_p2p_connection.last_data_timestamp = time.time() - self.running_p2p_connection.running_arq_session = self + try: + self.running_p2p_connection.last_data_timestamp = time.time() + self.running_p2p_connection.running_arq_session = self + self.running_p2p_connection.set_state(P2PStates.ARQ_SESSION) + except Exception as e: + self.log(f"Error handling P2P states: {e}") remaining_data_length = self.total_length - self.received_bytes self.log(f"Remaining data: {remaining_data_length}", isWarning=True) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index e8d48b4f..d093aa7e 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -89,6 +89,7 @@ class P2PConnection: States.FAILED: { FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', }, } @@ -143,13 +144,15 @@ class P2PConnection: """Starts a worker thread to monitor the transmit data queue and process data.""" def data_processing_worker(): + last_isalive = 0 while True: - if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION, States.FAILED, States.PAYLOAD_TRANSMISSION]: + now = time.time() + if now > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION, States.FAILED, States.PAYLOAD_TRANSMISSION]: self.disconnect() return # this is our heartbeat logic, only ISS will run it - if time.time() > self.last_data_timestamp + 5 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress: + if now > self.last_data_timestamp + 5 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress: self.log("Sending heartbeat") if self.p2p_data_tx_queue.empty(): @@ -162,6 +165,13 @@ class P2PConnection: threading.Event().wait(0.5) self.process_data_queue() + + # call every 60s + if now - last_isalive >= 60: + self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_iamalive() + + last_isalive = now + threading.Event().wait(0.100) @@ -302,6 +312,9 @@ class P2PConnection: self.is_Master = True + buffer_size = self.get_tx_queue_buffer_size() + self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_buffer_size(buffer_size) + raw_data = self.p2p_data_tx_queue.get() sequence_id = random.randint(0,255) @@ -376,6 +389,10 @@ class P2PConnection: self.log("Moving to next payload...") self.set_state(States.PAYLOAD_SENT) + buffer_size = self.get_tx_queue_buffer_size() + self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_buffer_size(buffer_size) + + def transmit_heartbeat(self, has_data=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack self.last_data_timestamp = time.time() @@ -572,3 +589,6 @@ class P2PConnection: remove(session.id) except Exception as e: self.log(f"Error handling ARQ {kind.upper()} session: {e}", isWarning=True) + + def get_tx_queue_buffer_size(self): + return sum(len(item) for item in list(self.p2p_data_tx_queue.queue)) diff --git a/freedata_server/socket_interface_commands.py b/freedata_server/socket_interface_commands.py index 6ea2ead9..ca02b29f 100644 --- a/freedata_server/socket_interface_commands.py +++ b/freedata_server/socket_interface_commands.py @@ -113,6 +113,12 @@ class SocketCommandHandler: self.send_response(f"LINK REGISTERED") self.send_response(message) + def socket_respond_iamalive(self): + self.send_response(f"IAMALIVE") + + def socket_respond_buffer_size(self, buffer_size): + self.send_response(f"BUFFER {buffer_size}") + def socket_respond_ptt(self, state): """ send the PTT state via command socket""" if state: From e2450d2266bb7b07c974db2528c04843cf1603d8 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 6 May 2025 12:13:26 +0200 Subject: [PATCH 35/36] add iamalive and buffer command --- freedata_server/arq_session_irs.py | 1 + freedata_server/demodulator.py | 14 ++++++++++---- freedata_server/p2p_connection.py | 10 ++++++---- freedata_server/socket_interface_commands.py | 6 ++++-- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/freedata_server/arq_session_irs.py b/freedata_server/arq_session_irs.py index cb1d8cb6..1b7f5d4a 100644 --- a/freedata_server/arq_session_irs.py +++ b/freedata_server/arq_session_irs.py @@ -82,6 +82,7 @@ class ARQSessionIRS(arq_session.ARQSession): }, IRS_State.RESUME: { FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack', + FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data', } diff --git a/freedata_server/demodulator.py b/freedata_server/demodulator.py index 7d68dd9f..333852dd 100644 --- a/freedata_server/demodulator.py +++ b/freedata_server/demodulator.py @@ -156,6 +156,9 @@ class Demodulator(): bytes_per_frame= self.MODE_DICT[mode]["bytes_per_frame"] state_buffer = self.MODE_DICT[mode]["state_buffer"] mode_name = self.MODE_DICT[mode]["name"] + + last_rx_status = 0 + try: while self.stream and self.stream.active and not self.shutdown_flag.is_set(): threading.Event().wait(0.01) @@ -176,10 +179,10 @@ class Demodulator(): if rx_status not in [0]: self.is_codec2_traffic_counter = self.is_codec2_traffic_cooldown - self.log.debug( - f"[MDM] [demod_audio] [mode={mode_name}]", rx_status=rx_status, - sync_flag=codec2.api.rx_sync_flags_to_text[rx_status] - ) + + if last_rx_status != rx_status: + self.log.debug(f"[MDM] [DEMOD] [mode={mode_name}] [State: {last_rx_status} >>> {rx_status}]", sync_flag=codec2.api.rx_sync_flags_to_text[rx_status]) + last_rx_status = rx_status # decrement codec traffic counter for making state smoother if self.is_codec2_traffic_counter > 0: @@ -191,6 +194,9 @@ class Demodulator(): state_buffer.append(rx_status) else: nbytes = 0 + self.reset_data_sync() + audiobuffer.nbuffer = 0 + audiobuffer.pop(nin) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index d093aa7e..695b9b7a 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -168,8 +168,10 @@ class P2PConnection: # call every 60s if now - last_isalive >= 60: - self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_iamalive() - + try: + self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_iamalive() + except Exception as e: + self.log("Failed to send IAMALIVE command", e) last_isalive = now threading.Event().wait(0.100) @@ -243,7 +245,7 @@ class P2PConnection: retries = retries - 1 #self.connected_iss() # override connection state for simulation purposes - self.session_failed() + #self.session_failed() def launch_twr(self, frame_or_burst, timeout, retries, mode): twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True) @@ -398,7 +400,7 @@ class P2PConnection: self.last_data_timestamp = time.time() heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_has_data=has_data, flag_announce_arq=announce_arq) - self.launch_twr(heartbeat, 6, 10, mode=FREEDV_MODE.signalling) + self.launch_twr(heartbeat, 5, 10, mode=FREEDV_MODE.signalling) def transmit_heartbeat_ack(self): self.log("Transmitting heartbeat ACK") diff --git a/freedata_server/socket_interface_commands.py b/freedata_server/socket_interface_commands.py index ca02b29f..ee9d2fdd 100644 --- a/freedata_server/socket_interface_commands.py +++ b/freedata_server/socket_interface_commands.py @@ -114,8 +114,10 @@ class SocketCommandHandler: self.send_response(message) def socket_respond_iamalive(self): - self.send_response(f"IAMALIVE") - + try: + self.send_response(f"IAMALIVE") + except Exception as e: + self.log(f"sending iamalive failed {e}") def socket_respond_buffer_size(self, buffer_size): self.send_response(f"BUFFER {buffer_size}") From eecdf3bae06ffc896fd6424ad6879ccc699bbee9 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Tue, 6 May 2025 12:35:12 +0200 Subject: [PATCH 36/36] first full run of wl session --- freedata_server/p2p_connection.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index 695b9b7a..e6410598 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -126,6 +126,7 @@ class P2PConnection: self.TIMEOUT_CONNECT = 5 self.TIMEOUT_DATA = 5 self.RETRIES_DATA = 5 + self.TIMEOUT_HEARTBEAT = 10 self.ENTIRE_CONNECTION_TIMEOUT = 180 self.is_ISS = False # Indicator, if we are ISS or IRS @@ -222,7 +223,6 @@ class P2PConnection: self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") def transmit_frame(self, frame: bytearray, mode='auto'): - self.log("Transmitting frame") if mode in ['auto']: mode = self.get_mode_by_speed_level(self.speed_level) @@ -254,7 +254,7 @@ class P2PConnection: def transmit_and_wait_irs(self, frame, timeout, mode): self.event_frame_received.clear() self.transmit_frame(frame, mode) - self.log(f"Waiting {timeout} seconds...") + #self.log(f"Waiting {timeout} seconds...") #if not self.event_frame_received.wait(timeout): # self.log("Timeout waiting for ISS. Session failed.") # self.transmission_failed() @@ -398,9 +398,10 @@ class P2PConnection: def transmit_heartbeat(self, has_data=False, announce_arq=False): # heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack self.last_data_timestamp = time.time() - + if self.ctx.state_manager.is_receiving_codec2_signal(): + return heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_has_data=has_data, flag_announce_arq=announce_arq) - self.launch_twr(heartbeat, 5, 10, mode=FREEDV_MODE.signalling) + self.launch_twr(heartbeat, self.TIMEOUT_HEARTBEAT, 10, mode=FREEDV_MODE.signalling) def transmit_heartbeat_ack(self): self.log("Transmitting heartbeat ACK") @@ -447,7 +448,6 @@ class P2PConnection: self.log("Opposite station announced ARQ, changing state") self.is_Master = False self.set_state(States.ARQ_SESSION) - self.log("transmit heartbeat ack") self.transmit_heartbeat_ack() def received_heartbeat_ack(self, frame):