From 6da8be56f9cb7282ee52c7595f223829805c4503 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Thu, 1 May 2025 23:00:18 +0200 Subject: [PATCH] attempt using an buffer empty indicator for iss --- freedata_server/data_frame_factory.py | 38 ++++++++++++++++++++++++--- freedata_server/p2p_connection.py | 24 +++++++++++++++++ 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/freedata_server/data_frame_factory.py b/freedata_server/data_frame_factory.py index 1867037e..0012e3d9 100644 --- a/freedata_server/data_frame_factory.py +++ b/freedata_server/data_frame_factory.py @@ -23,6 +23,10 @@ class DataFrameFactory: 'AWAY_FROM_KEY': 0, # Bit-position for indicating the AWAY FROM KEY state } + P2P_FLAGS = { + 'BUFFER_EMPTY': 0, # Bit-position for indicating the BUFFER EMPTY state + } + def __init__(self, ctx): self.ctx = ctx @@ -170,8 +174,9 @@ class DataFrameFactory: # heartbeat for "is alive" self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = { - "frame_length": self.LENGTH_SIG1_FRAME, + "frame_length": self.LENGTH_ACK_FRAME, "session_id": 1, + "flag": 1 } # ack heartbeat @@ -185,6 +190,16 @@ class DataFrameFactory: "frame_length": None, "session_id": 1, "sequence_id": 1, + "flag": 1, + "data": "dynamic", + } + + # p2p payload frames + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD.value] = { + "frame_length": None, + "session_id": 1, + "sequence_id": 1, + "flag": 1, "data": "dynamic", } @@ -305,6 +320,12 @@ class DataFrameFactory: # get_flag returns True or False based on the bit value at the flag's position extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict) + if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT]: + flag_dict = self.P2P_FLAGS + for flag in flag_dict: + # Update extracted_data with the status of each flag + # get_flag returns True or False based on the bit value at the flag's position + extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict) else: extracted_data[key] = data @@ -519,9 +540,14 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload) - def build_p2p_connection_heartbeat(self, session_id): + def build_p2p_connection_heartbeat(self, session_id, flag_buffer_empty=False): + flag = 0b00000000 + if flag_buffer_empty: + flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + payload = { "session_id": session_id.to_bytes(1, 'big'), + "flag": flag.to_bytes(1, 'big'), } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload) @@ -531,10 +557,16 @@ class DataFrameFactory: } return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) - def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes): + def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_buffer_empty=False): + flag = 0b00000000 + if flag_buffer_empty: + flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS) + + payload = { "session_id": session_id.to_bytes(1, 'big'), "sequence_id": sequence_id.to_bytes(1, 'big'), + "flag": flag.to_bytes(1, 'big'), "data": data, } print(self.get_bytes_per_frame(freedv_mode)) diff --git a/freedata_server/p2p_connection.py b/freedata_server/p2p_connection.py index ccd217d1..31cbfece 100644 --- a/freedata_server/p2p_connection.py +++ b/freedata_server/p2p_connection.py @@ -47,6 +47,7 @@ class P2PConnection: States.PAYLOAD_SENT: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat', }, States.ARQ_SESSION: { @@ -95,6 +96,9 @@ class P2PConnection: self.session_id = self.generate_id() self.event_frame_received = threading.Event() + # set if iss buffer is empty + self.iss_buffer_empty = threading.Event() + self.RETRIES_CONNECT = 3 self.TIMEOUT_CONNECT = 5 @@ -221,6 +225,9 @@ class P2PConnection: self.set_state(States.CONNECTED) self.is_ISS = True + # iss starts sending data + self.iss_buffer_empty.set() + self.log(frame) if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.command_server, "command_handler"): self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) @@ -254,7 +261,16 @@ class P2PConnection: def process_data_queue(self, frame=None): if self.p2p_data_tx_queue.empty(): + print("buffer empty....") + payload = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=True) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling_ack) + self.set_state(States.PAYLOAD_SENT) + time.sleep(5) return + + if not self.iss_buffer_empty.is_set(): + return + print("processing data....") data = self.p2p_data_tx_queue.get() @@ -277,6 +293,8 @@ class P2PConnection: def received_data(self, frame): self.log(f"received data...: {frame}") + self.iss_buffer_empty = threading.Event() + ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling_ack) @@ -296,6 +314,12 @@ class P2PConnection: print("transmitted data...") self.set_state(States.CONNECTED) + def received_heartbeat(self, frame): + print(frame) + print("received heartbeat...") + self.iss_buffer_empty.set() + + def disconnect(self): if self.state not in [States.DISCONNECTING, States.DISCONNECTED]: self.set_state(States.DISCONNECTING)