Merge pull request #976 from DJ2LS/ls-winlink

more work on socket interface
pull/972/head^2
DJ2LS 2025-05-06 12:42:00 +02:00 committed by GitHub
commit d9645e6544
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 354 additions and 119 deletions

View File

@ -393,7 +393,6 @@ class ARQDataTypeHandler:
Returns:
The ZLIB-compressed data as a bytearray.
"""
compressed_data = lzma.compress(data)
compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED)
compressed_data = compressor.compress(data) + compressor.flush()
@ -468,65 +467,49 @@ class ARQDataTypeHandler:
def prepare_p2p_connection(self, data):
"""Prepares P2P connection data for transmission using GZIP compression.
"""Prepares P2P connection data for transmission using ZLIB compression.
This method compresses the given data using GZIP and logs the size of the
data before and after compression.
This method compresses the provided data using ZLIB with specific settings for
peer-to-peer connection sessions. It logs the size of the data before and after
compression and prints the current P2P connection sessions.
Args:
data: The bytearray containing the P2P connection data to be compressed.
Returns:
The GZIP-compressed data as a bytearray.
The ZLIB-compressed data as a bytearray.
"""
compressed_data = gzip.compress(data)
self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED)
compressed_data = compressor.compress(data) + compressor.flush()
self.log(f"Preparing zlib compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
print(self.ctx.state_manager.p2p_connection_sessions)
return compressed_data
def handle_p2p_connection(self, data, statistics):
"""Handles GZIP compressed P2P connection data.
This method decompresses the provided GZIP data, logs size information,
and then distributes the decompressed data to active P2P connection sessions.
decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
decompressed_data = decompressor.decompress(data)
decompressed_data += decompressor.flush()
Args:
data: The bytearray containing the GZIP compressed P2P connection data.
statistics: A dictionary containing statistics related to the ARQ session.
"""
decompressed_data = gzip.decompress(data)
self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
for session_id in self.ctx.state_manager.p2p_connection_sessions:
self.ctx.state_manager.p2p_connection_sessions[session_id].received_arq(decompressed_data)
def failed_p2p_connection(self, data, statistics):
"""Handles failed GZIP compressed P2P connection data.
This method decompresses the provided GZIP data, logs size information
and an error message, and then returns the decompressed data.
Args:
data: The bytearray containing the GZIP compressed P2P connection data.
statistics: A dictionary containing statistics related to the ARQ session.
Returns:
The decompressed data as a bytearray.
"""
decompressed_data = gzip.decompress(data)
decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
decompressed_data = decompressor.decompress(data)
decompressed_data += decompressor.flush()
self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True)
for session_id in self.ctx.state_manager.p2p_connection_sessions:
self.ctx.state_manager.p2p_connection_sessions[session_id].failed_arq()
def transmitted_p2p_connection(self, data, statistics):
"""Handles the successful transmission of GZIP compressed P2P connection data.
This method decompresses the provided GZIP data and then calls the
`transmitted_arq` method for each active P2P connection session.
Args:
data: The bytearray containing the GZIP compressed P2P connection data.
statistics: A dictionary containing statistics related to the ARQ session.
"""
decompressed_data = gzip.decompress(data)
decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
decompressed_data = decompressor.decompress(data)
decompressed_data += decompressor.flush()
for session_id in self.ctx.state_manager.p2p_connection_sessions:
self.ctx.state_manager.p2p_connection_sessions[session_id].transmitted_arq(decompressed_data)

View File

@ -5,7 +5,7 @@ from modem_frametypes import FRAME_TYPE
from codec2 import FREEDV_MODE
from enum import Enum
import time
from p2p_connection import States as P2PStates
class IRS_State(Enum):
"""Enumeration representing the states of an IRS (Information Receiving Station) ARQ session.
@ -82,6 +82,8 @@ class ARQSessionIRS(arq_session.ARQSession):
},
IRS_State.RESUME: {
FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack',
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
}
@ -113,6 +115,13 @@ class ARQSessionIRS(arq_session.ARQSession):
# instance of p2p connection
self.running_p2p_connection = None
try:
self.running_p2p_connection = self.ctx.state_manager.get_p2p_connection_session(self.id)
# register arq session in p2p connection
self.running_p2p_connection.running_arq_session = self
self.running_p2p_connection.set_state(P2PStates.ARQ_SESSION)
except Exception as e:
self.log("Error getting p2p connection session")
self.type_byte = None
self.total_length = 0
@ -282,7 +291,12 @@ class ARQSessionIRS(arq_session.ARQSession):
# update p2p connection timeout
if self.running_p2p_connection:
self.running_p2p_connection.last_data_timestamp = time.time()
try:
self.running_p2p_connection.last_data_timestamp = time.time()
self.running_p2p_connection.running_arq_session = self
self.running_p2p_connection.set_state(P2PStates.ARQ_SESSION)
except Exception as e:
self.log(f"Error handling P2P states: {e}")
remaining_data_length = self.total_length - self.received_bytes
self.log(f"Remaining data: {remaining_data_length}", isWarning=True)
@ -297,6 +311,8 @@ class ARQSessionIRS(arq_session.ARQSession):
self.received_data[frame['offset']:] = data_part
#self.received_bytes += len(data_part)
self.received_bytes = len(self.received_data)
print(self.received_bytes)
print(self.received_data)
self.log(f"Received {self.received_bytes}/{self.total_length} bytes")
self.ctx.event_manager.send_arq_session_progress(
False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name, self.speed_level, self.calculate_session_statistics(self.received_bytes, self.total_length))
@ -355,7 +371,7 @@ class ARQSessionIRS(arq_session.ARQSession):
flag_final=True,
flag_checksum=False)
self.transmit_frame(ack, mode=FREEDV_MODE.signalling_ack)
self.log("CRC fail at the end of transmission!")
self.log("CRC fail at the end of transmission!", isWarning=True)
return self.transmission_failed()
@ -474,7 +490,8 @@ class ARQSessionIRS(arq_session.ARQSession):
self.log("Transmission failed!")
#self.ctx.rf_modem.demodulator.set_decode_mode()
session_stats = self.calculate_session_statistics(self.received_bytes, self.total_length)
self.received_data = None
self.received_bytes = 0
self.ctx.event_manager.send_arq_session_finished(True, self.id, self.dxcall,False, self.state.name, statistics=session_stats)
if self.ctx.config_manager.config['STATION']['enable_stats']:
self.statistics.push(self.state.name, session_stats, self.dxcall)

View File

@ -24,7 +24,8 @@ class DataFrameFactory:
}
P2P_FLAGS = {
'BUFFER_EMPTY': 0, # Bit-position for indicating the BUFFER EMPTY state
'HAS_DATA': 0, # Bit-position for indicating the BUFFER EMPTY state
'ANNOUNCE_ARQ': 1, # Bit-position for announcing an ARQ session
}
def __init__(self, ctx):
@ -174,7 +175,7 @@ class DataFrameFactory:
# heartbeat for "is alive"
self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = {
"frame_length": self.LENGTH_ACK_FRAME,
"frame_length": self.LENGTH_SIG1_FRAME,
"session_id": 1,
"flag": 1
}
@ -183,6 +184,7 @@ class DataFrameFactory:
self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value] = {
"frame_length": self.LENGTH_SIG1_FRAME,
"session_id": 1,
"flag": 1,
}
# p2p payload frames
@ -241,13 +243,12 @@ class DataFrameFactory:
for key, item_length in frame_template.items():
if key == "frame_length":
continue
if not isinstance(item_length, int):
item_length = len(content[key])
print(frame_length)
print(item_length)
print(content)
#print(frame_length)
#print(item_length)
#print(content)
if buffer_position + item_length > frame_length:
raise OverflowError("Frame data overflow!")
frame[buffer_position: buffer_position + item_length] = content[key]
@ -270,14 +271,13 @@ class DataFrameFactory:
frame_template = self.template_list.get(frametype)
extracted_data = {"frame_type": FR_TYPE(frametype).name, "frame_type_int": frametype}
for key, item_length in frame_template.items():
if key == "frame_length":
continue
# data is always on the last payload slots
if item_length in ["dynamic"] and key in["data"]:
print(len(frame))
#print(len(frame))
data = frame[buffer_position:-2]
item_length = len(data)
else:
@ -320,7 +320,7 @@ class DataFrameFactory:
# get_flag returns True or False based on the bit value at the flag's position
extracted_data[key][flag] = helpers.get_flag(data, flag, flag_dict)
if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT]:
if frametype in [FR_TYPE.P2P_CONNECTION_PAYLOAD.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT.value, FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value]:
flag_dict = self.P2P_FLAGS
for flag in flag_dict:
# Update extracted_data with the status of each flag
@ -540,10 +540,12 @@ class DataFrameFactory:
}
return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload)
def build_p2p_connection_heartbeat(self, session_id, flag_buffer_empty=False):
def build_p2p_connection_heartbeat(self, session_id, flag_has_data=False, flag_announce_arq=False):
flag = 0b00000000
if flag_buffer_empty:
flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS)
if flag_has_data:
flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS)
if flag_announce_arq:
flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS)
payload = {
"session_id": session_id.to_bytes(1, 'big'),
@ -551,17 +553,25 @@ class DataFrameFactory:
}
return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload)
def build_p2p_connection_heartbeat_ack(self, session_id):
def build_p2p_connection_heartbeat_ack(self, session_id, flag_has_data=False, flag_announce_arq=False):
flag = 0b00000000
if flag_has_data:
flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS)
if flag_announce_arq:
flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS)
payload = {
"session_id": session_id.to_bytes(1, 'big'),
"flag": flag.to_bytes(1, 'big'),
}
return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload)
def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_buffer_empty=False):
def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes, flag_has_data=False, flag_announce_arq=False):
flag = 0b00000000
if flag_buffer_empty:
flag = helpers.set_flag(flag, 'BUFFER_EMPTY', True, self.P2P_FLAGS)
if flag_has_data:
flag = helpers.set_flag(flag, 'HAS_DATA', True, self.P2P_FLAGS)
if flag_announce_arq:
flag = helpers.set_flag(flag, 'ANNOUNCE_ARQ', True, self.P2P_FLAGS)
payload = {
"session_id": session_id.to_bytes(1, 'big'),
@ -569,7 +579,7 @@ class DataFrameFactory:
"flag": flag.to_bytes(1, 'big'),
"data": data,
}
print(self.get_bytes_per_frame(freedv_mode))
#print(self.get_bytes_per_frame(freedv_mode))
return self.construct(
FR_TYPE.P2P_CONNECTION_PAYLOAD,
payload,

View File

@ -156,6 +156,9 @@ class Demodulator():
bytes_per_frame= self.MODE_DICT[mode]["bytes_per_frame"]
state_buffer = self.MODE_DICT[mode]["state_buffer"]
mode_name = self.MODE_DICT[mode]["name"]
last_rx_status = 0
try:
while self.stream and self.stream.active and not self.shutdown_flag.is_set():
threading.Event().wait(0.01)
@ -176,10 +179,10 @@ class Demodulator():
if rx_status not in [0]:
self.is_codec2_traffic_counter = self.is_codec2_traffic_cooldown
self.log.debug(
f"[MDM] [demod_audio] [mode={mode_name}]", rx_status=rx_status,
sync_flag=codec2.api.rx_sync_flags_to_text[rx_status]
)
if last_rx_status != rx_status:
self.log.debug(f"[MDM] [DEMOD] [mode={mode_name}] [State: {last_rx_status} >>> {rx_status}]", sync_flag=codec2.api.rx_sync_flags_to_text[rx_status])
last_rx_status = rx_status
# decrement codec traffic counter for making state smoother
if self.is_codec2_traffic_counter > 0:
@ -191,6 +194,9 @@ class Demodulator():
state_buffer.append(rx_status)
else:
nbytes = 0
self.reset_data_sync()
audiobuffer.nbuffer = 0
audiobuffer.pop(nin)

View File

@ -41,9 +41,10 @@ class DISPATCHER:
"name": "P2P Connection PAYLOAD"},
FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler,
"name": "P2P Connection PAYLOAD ACK"},
#FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"},
#FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"},
FR_TYPE.P2P_CONNECTION_HEARTBEAT.value: {"class": P2PConnectionFrameHandler,
"name": "P2P Connection HEARTBEAT"},
FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: {"class": P2PConnectionFrameHandler,
"name": "P2P Connection HEARTBEAT ACK"},
FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"},
FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"},
FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"},

View File

@ -108,7 +108,7 @@ class FrameHandler():
#check for p2p connection
elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']:
elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_HEARTBEAT','P2P_CONNECTION_HEARTBEAT_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']:
session_id = self.details['frame']['session_id']
if session_id in self.ctx.state_manager.p2p_connection_sessions:
valid = True

View File

@ -54,6 +54,8 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler):
FR.P2P_CONNECTION_DISCONNECT_ACK.value,
FR.P2P_CONNECTION_PAYLOAD.value,
FR.P2P_CONNECTION_PAYLOAD_ACK.value,
FR.P2P_CONNECTION_HEARTBEAT.value,
FR.P2P_CONNECTION_HEARTBEAT_ACK.value,
]:
session = self.ctx.state_manager.get_p2p_connection_session(session_id)

View File

@ -7,12 +7,10 @@ import structlog
import random
from queue import Queue
import time
from command_arq_raw import ARQRawCommand
import numpy as np
import base64
from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES
from arq_session_iss import ARQSessionISS
import helpers
import zlib
class States(Enum):
NEW = 0
@ -20,8 +18,8 @@ class States(Enum):
CONNECT_SENT = 2
CONNECT_ACK_SENT = 3
CONNECTED = 4
#HEARTBEAT_SENT = 5
#HEARTBEAT_ACK_SENT = 6
AWAITING_DATA = 5
PAYLOAD_TRANSMISSION = 6
PAYLOAD_SENT = 7
ARQ_SESSION = 8
DISCONNECTING = 9
@ -34,6 +32,8 @@ class P2PConnection:
STATE_TRANSITION = {
States.NEW: {
FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat',
},
States.CONNECTING: {
FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss',
@ -43,16 +43,34 @@ class P2PConnection:
FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss',
FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack',
},
States.PAYLOAD_TRANSMISSION: {
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack',
},
States.PAYLOAD_SENT: {
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack',
},
States.AWAITING_DATA: {
FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack',
},
States.ARQ_SESSION: {
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value: 'received_heartbeat_ack',
},
States.DISCONNECTING: {
@ -65,8 +83,13 @@ class P2PConnection:
},
States.ABORTED:{
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
},
States.FAILED: {
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
FRAME_TYPE.P2P_CONNECTION_HEARTBEAT.value: 'received_heartbeat',
},
}
@ -96,38 +119,63 @@ class P2PConnection:
self.session_id = self.generate_id()
self.event_frame_received = threading.Event()
# set if iss buffer is empty
self.iss_buffer_empty = threading.Event()
self.RETRIES_CONNECT = 3
self.TIMEOUT_CONNECT = 5
self.TIMEOUT_DATA = 5
self.RETRIES_DATA = 5
self.TIMEOUT_HEARTBEAT = 10
self.ENTIRE_CONNECTION_TIMEOUT = 180
self.is_ISS = False # Indicator, if we are ISS or IRS
self.is_Master = False # Indicator, if we are Maste or Not
self.last_data_timestamp= time.time()
self.start_data_processing_worker()
self.flag_has_data = False
self.flag_announce_arq = False
self.transmission_in_progress = False # indicatews, if we are waiting for an ongoing transmission
self.running_arq_session = None
def start_data_processing_worker(self):
"""Starts a worker thread to monitor the transmit data queue and process data."""
def data_processing_worker():
last_isalive = 0
while True:
if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION:
now = time.time()
if now > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION, States.FAILED, States.PAYLOAD_TRANSMISSION]:
self.disconnect()
return
if time.time() > self.last_data_timestamp + 6 and self.state is States.CONNECTED:
# start sending data
self.iss_buffer_empty.set()
# this is our heartbeat logic, only ISS will run it
if now > self.last_data_timestamp + 5 and self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_ISS and not self.transmission_in_progress:
self.log("Sending heartbeat")
if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED:
if self.p2p_data_tx_queue.empty():
self.flag_has_data = False
else:
self.flag_has_data = True
self.transmit_heartbeat(has_data=self.flag_has_data)
if self.state in [States.CONNECTED, States.PAYLOAD_SENT] and self.is_Master:
threading.Event().wait(0.5)
self.process_data_queue()
threading.Event().wait(0.500)
# call every 60s
if now - last_isalive >= 60:
try:
self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_iamalive()
except Exception as e:
self.log("Failed to send IAMALIVE command", e)
last_isalive = now
threading.Event().wait(0.100)
@ -150,7 +198,7 @@ class P2PConnection:
self.frequency_offset = frequency_offset
def log(self, message, isWarning = False):
msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}"
msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}][Master={bool(self.is_Master)}]: {message}"
logger = self.logger.warn if isWarning else self.logger.info
logger(msg)
@ -170,13 +218,11 @@ class P2PConnection:
if frame_type in self.STATE_TRANSITION[self.state]:
action_name = self.STATE_TRANSITION[self.state][frame_type]
response = getattr(self, action_name)(frame)
return
self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")
def transmit_frame(self, frame: bytearray, mode='auto'):
self.log("Transmitting frame")
if mode in ['auto']:
mode = self.get_mode_by_speed_level(self.speed_level)
@ -185,6 +231,7 @@ class P2PConnection:
def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode):
while retries > 0:
self.event_frame_received = threading.Event()
self.transmission_in_progress = True
if isinstance(frame_or_burst, list): burst = frame_or_burst
else: burst = [frame_or_burst]
for f in burst:
@ -192,12 +239,13 @@ class P2PConnection:
self.event_frame_received.clear()
self.log(f"Waiting {timeout} seconds...")
if self.event_frame_received.wait(timeout):
self.transmission_in_progress = False
return
self.log("Timeout!")
retries = retries - 1
#self.connected_iss() # override connection state for simulation purposes
self.session_failed()
#self.session_failed()
def launch_twr(self, frame_or_burst, timeout, retries, mode):
twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True)
@ -206,7 +254,7 @@ class P2PConnection:
def transmit_and_wait_irs(self, frame, timeout, mode):
self.event_frame_received.clear()
self.transmit_frame(frame, mode)
self.log(f"Waiting {timeout} seconds...")
#self.log(f"Waiting {timeout} seconds...")
#if not self.event_frame_received.wait(timeout):
# self.log("Timeout waiting for ISS. Session failed.")
# self.transmission_failed()
@ -228,11 +276,7 @@ class P2PConnection:
self.log("CONNECTED ISS...........................")
self.set_state(States.CONNECTED)
self.is_ISS = True
# start sending data
self.iss_buffer_empty.set()
self.log(frame)
self.is_Master = True
if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.command_server, "command_handler"):
self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
@ -242,6 +286,7 @@ class P2PConnection:
self.ctx.socket_interface_manager.command_server.command_handler.session = self
self.set_state(States.CONNECTED)
self.is_ISS = False
self.is_Master = False
self.origin = frame["origin"]
self.destination = frame["destination"]
self.destination_crc = frame["destination_crc"]
@ -264,47 +309,71 @@ class P2PConnection:
def process_data_queue(self, frame=None):
if self.p2p_data_tx_queue.empty():
print("buffer empty....")
payload = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_buffer_empty=True)
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=FREEDV_MODE.signalling_ack)
self.set_state(States.PAYLOAD_SENT)
time.sleep(5)
self.is_Master = False
return
if not self.iss_buffer_empty.is_set():
return
self.is_Master = True
print("processing data....")
buffer_size = self.get_tx_queue_buffer_size()
self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_buffer_size(buffer_size)
data = self.p2p_data_tx_queue.get()
raw_data = self.p2p_data_tx_queue.get()
sequence_id = random.randint(0,255)
compressor = zlib.compressobj(level=6, wbits=-zlib.MAX_WBITS, strategy=zlib.Z_FILTERED)
data = compressor.compress(raw_data) + compressor.flush()
self.log(f"Processing data....{len(data)} Bytes - {raw_data}")
if len(data) <= 11:
mode = FREEDV_MODE.signalling
elif 11 < len(data) <= 32:
mode = FREEDV_MODE.datac4
else:
self.transmit_arq(data)
self.log("Using ARQ for sending data...")
self.set_state(States.ARQ_SESSION)
self.transmit_arq(raw_data)
return
payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data)
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode)
self.set_state(States.PAYLOAD_SENT)
# Additional return statement for avoiding wrong data.
if self.state is States.ARQ_SESSION:
return
self.log("Using burst for sending data...")
self.set_state(States.PAYLOAD_TRANSMISSION)
if self.p2p_data_tx_queue.empty():
self.flag_has_data = False
else:
self.flag_has_data = True
payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data, flag_has_data=self.flag_has_data)
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=mode)
return
def received_data(self, frame):
self.log(f"received data...: {frame}")
self.iss_buffer_empty = threading.Event()
self.log(f"received data...")
ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0)
self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling_ack)
if not frame["flag"]["HAS_DATA"]:
self.set_state(States.CONNECTED)
else:
self.set_state(States.AWAITING_DATA)
try:
received_data = frame['data'].rstrip(b'\x00')
decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
received_data = decompressor.decompress(received_data)
received_data += decompressor.flush()
if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"):
self.log(f"sending {len(received_data)} bytes to data socket client")
self.log(f"sending {len(received_data)} bytes to data socket client: {received_data}")
self.ctx.socket_interface_manager.data_server.data_handler.send_data_to_client(received_data)
except Exception as e:
@ -314,18 +383,105 @@ class P2PConnection:
def transmitted_data(self, frame):
print("transmitted data...")
self.set_state(States.CONNECTED)
self.log("Transmitted data...")
if self.p2p_data_tx_queue.empty():
self.log("Transmitted all data...")
self.set_state(States.CONNECTED)
else:
self.log("Moving to next payload...")
self.set_state(States.PAYLOAD_SENT)
buffer_size = self.get_tx_queue_buffer_size()
self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_buffer_size(buffer_size)
def transmit_heartbeat(self, has_data=False, announce_arq=False):
# heartbeats will be transmit by ISS only, therefore only IRS can reveice heartbeat ack
self.last_data_timestamp = time.time()
if self.ctx.state_manager.is_receiving_codec2_signal():
return
heartbeat = self.frame_factory.build_p2p_connection_heartbeat(self.session_id, flag_has_data=has_data, flag_announce_arq=announce_arq)
self.launch_twr(heartbeat, self.TIMEOUT_HEARTBEAT, 10, mode=FREEDV_MODE.signalling)
def transmit_heartbeat_ack(self):
self.log("Transmitting heartbeat ACK")
if self.p2p_data_tx_queue.empty():
self.flag_has_data = False
else:
self.flag_has_data = True
self.last_data_timestamp = time.time()
heartbeat_ack = self.frame_factory.build_p2p_connection_heartbeat_ack(self.session_id, flag_has_data=self.flag_has_data,flag_announce_arq=self.flag_announce_arq)
self.launch_twr_irs(heartbeat_ack, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
def received_heartbeat(self, frame):
print(frame)
print("received heartbeat...")
self.iss_buffer_empty.set()
# we don't accept heartbeats as ISS
if self.is_ISS:
return
# ensure we have correct state
if self.state is States.NEW:
self.connected_irs()
self.log("Received heartbeat...")
self.last_data_timestamp = time.time()
if frame["flag"]["HAS_DATA"]:
self.log("Opposite station has data")
self.is_Master = False
else:
if self.p2p_data_tx_queue.empty():
self.log("Opposite station's data buffer is empty as well -- We won't become data master now")
self.is_Master = False
self.flag_has_data = False
else:
self.log("Opposite station's data buffer is empty. We can become data master now")
self.is_Master = True
self.flag_has_data = True
if frame["flag"]["ANNOUNCE_ARQ"]:
self.log("Opposite station announced ARQ, changing state")
self.is_Master = False
self.set_state(States.ARQ_SESSION)
self.transmit_heartbeat_ack()
def received_heartbeat_ack(self, frame):
self.last_data_timestamp = time.time()
self.log("Received heartbeat ack from opposite...")
if frame["flag"]["HAS_DATA"] and not self.flag_has_data:
self.log("other station has data, we are not")
self.is_Master = False
self.set_state(States.AWAITING_DATA)
elif frame["flag"]["HAS_DATA"] and self.flag_has_data:
self.log("other station has data and we as well, we become master")
self.is_Master = True
self.set_state(States.CONNECTED)
else:
self.log("other station has no data, we become master now")
self.is_Master = True
self.set_state(States.CONNECTED)
if frame["flag"]["ANNOUNCE_ARQ"]:
self.log("other station announced arq, changing state")
self.is_Master = False
self.set_state(States.ARQ_SESSION)
self.event_frame_received.set()
def disconnect(self):
if self.state not in [States.DISCONNECTING, States.DISCONNECTED]:
if self.state not in [States.DISCONNECTING, States.DISCONNECTED, States.ARQ_SESSION]:
self.set_state(States.DISCONNECTING)
self.abort_arq()
disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id)
self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
return
@ -333,12 +489,27 @@ class P2PConnection:
def abort_connection(self):
# abort is a dirty disconnect
self.log("ABORTING...............")
self.abort_arq()
self.event_frame_received.set()
self.set_state(States.DISCONNECTED)
def abort_arq(self):
try:
if self.running_arq_session:
self.running_arq_session.abort_transmission()
except Exception as e:
self.log(f"Error stopping ARQ session {e}")
finally:
self.delete_arq_session()
def received_disconnect(self, frame):
self.log("DISCONNECTED...............")
self.set_state(States.DISCONNECTED)
self.abort_arq()
if self.ctx.socket_interface_manager:
self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_disconnected()
self.is_ISS = False
@ -351,6 +522,8 @@ class P2PConnection:
if self.ctx.socket_interface_manager:
self.ctx.socket_interface_manager.command_server.command_handler.socket_respond_disconnected()
def transmit_arq(self, data):
"""
This function needs to be fixed - we want to send ARQ data within a p2p connection
@ -363,13 +536,14 @@ class P2PConnection:
else:
arq_destination = self.origin
self.log(f"ARQ Destination: {self.destination}")
threading.Event().wait(3)
prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection)
iss = ARQSessionISS(self.ctx, arq_destination, prepared_data, type_byte)
iss.id = self.session_id
# register p2p connection to arq session
iss.running_p2p_connection = self
self.running_arq_session = iss
if iss.id:
self.ctx.state_manager.register_arq_iss_session(iss)
iss.start()
@ -378,10 +552,16 @@ class P2PConnection:
def transmitted_arq(self, transmitted_data):
self.last_data_timestamp = time.time()
self.set_state(States.CONNECTED)
self.is_Master = True
self.delete_arq_session()
self.running_arq_session = None
def received_arq(self, received_data):
self.last_data_timestamp = time.time()
self.set_state(States.CONNECTED)
self.running_arq_session = None
self.delete_arq_session()
try:
if self.ctx.socket_interface_manager and hasattr(self.ctx.socket_interface_manager.data_server, "data_handler"):
@ -392,4 +572,25 @@ class P2PConnection:
self.log(f"Error sending data to socket: {e}")
def failed_arq(self):
self.set_state(States.CONNECTED)
self.set_state(States.CONNECTED)
self.delete_arq_session()
def delete_arq_session(self):
"""
Delete both ARQ IRS and ISS sessions.
For each session type, retrieve the session by ID and then remove it.
Log any errors encountered during retrieval or removal.
"""
for kind in ('irs', 'iss'):
get = getattr(self.ctx.state_manager, f"get_arq_{kind}_session")
remove = getattr(self.ctx.state_manager, f"remove_arq_{kind}_session")
try:
# Retrieve the session (may raise if not found)
session = get(self.session_id)
# Remove the session
remove(session.id)
except Exception as e:
self.log(f"Error handling ARQ {kind.upper()} session: {e}", isWarning=True)
def get_tx_queue_buffer_size(self):
return sum(len(item) for item in list(self.p2p_data_tx_queue.queue))

View File

@ -46,7 +46,10 @@ class SocketCommandHandler:
def handle_disconnect(self, data):
self.send_response(f"OK")
self.session.disconnect()
try:
self.session.disconnect()
except Exception as e:
self.log(f"Error disconnecting socket: {e}", isWarning = True)
def handle_mycall(self, data):
#Storing all of the callsigns assigned by client, to make sure they are checked later in new frames.
@ -63,7 +66,10 @@ class SocketCommandHandler:
def handle_abort(self, data):
# Logic for handling ABORT command
self.send_response(f"OK")
self.session.abort_connection()
try:
self.session.abort_connection()
except Exception as e:
self.send_response(f"ERR: {e}")
self.send_response(f"DISCONNECTED")
def handle_public(self, data):
@ -99,13 +105,22 @@ class SocketCommandHandler:
def socket_respond_connected(self, origin, destination, bandwidth):
print("[socket interface_commands] socket_respond_connected")
if self.ctx.socket_interface_manager.connecting_callsign:
message = f"CONNECTED {self.ctx.socket_interface_manager.connecting_callsign} {destination} {bandwidth}"
#message = f"CONNECTED {self.ctx.socket_interface_manager.connecting_callsign} {destination} {bandwidth}"
message = f"CONNECTED {origin} {destination} {bandwidth}"
else:
message = f"CONNECTED {origin} {destination} {bandwidth}"
self.send_response(f"UNENCRYPTED LINK")
self.send_response(f"LINK REGISTERED")
self.send_response(message)
def socket_respond_iamalive(self):
try:
self.send_response(f"IAMALIVE")
except Exception as e:
self.log(f"sending iamalive failed {e}")
def socket_respond_buffer_size(self, buffer_size):
self.send_response(f"BUFFER {buffer_size}")
def socket_respond_ptt(self, state):
""" send the PTT state via command socket"""
if state:

View File

@ -554,7 +554,7 @@ class StateManager:
bool: True if the session was registered, False otherwise.
"""
if session.session_id in self.p2p_connection_sessions:
print("session already registered...")
print("session already registered...", session.session_id)
return False
self.p2p_connection_sessions[session.session_id] = session
return True