# FreeDATA/freedata_server/command.py
#
# 194 lines
# 6.5 KiB
# Python

from data_frame_factory import DataFrameFactory
import queue
from codec2 import FREEDV_MODE
import structlog
from state_manager import StateManager
from arq_data_type_handler import ARQDataTypeHandler
class TxCommand:
    """Base class for transmit commands.

    Provides the common interface shared by all transmit commands:
    logging, parameter handling, frame building and handing the built
    frame to the modem. Subclasses are expected to override the hook
    methods (`set_params_from_api`, `emit_event`, `build_frame`, and
    optionally `get_tx_mode`).
    """

    def __init__(
        self,
        config: dict,
        state_manager: "StateManager",
        event_manager,
        apiParams: "dict | None" = None,
        socket_interface_manager=None,
    ):
        """Initialize a new TxCommand instance.

        Stores the collaborators, forwards the API parameters to
        `set_params_from_api`, and creates a DataFrameFactory and an
        ARQDataTypeHandler for this command.

        Args:
            config (dict): The configuration dictionary.
            state_manager (StateManager): The state manager object.
            event_manager: The event manager object.
            apiParams (dict, optional): API parameters for the command.
                Defaults to None, which is treated as an empty dict.
            socket_interface_manager (optional): The socket interface
                manager object. Defaults to None.
        """
        self.config = config
        self.logger = structlog.get_logger(type(self).__name__)
        self.state_manager = state_manager
        self.event_manager = event_manager
        # A `{}` default in the signature would be one dict shared by every
        # instance; build a fresh one per call so a subclass hook that
        # mutates its argument cannot leak state between commands.
        self.set_params_from_api({} if apiParams is None else apiParams)
        self.frame_factory = DataFrameFactory(config)
        self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager)
        self.socket_interface_manager = socket_interface_manager

    def log(self, message, isWarning=False):
        """Log a message prefixed with the command's class name.

        Args:
            message (str): The message to log.
            isWarning (bool, optional): Log at warning level instead of
                info level. Defaults to False.
        """
        msg = f"[{type(self).__name__}]: {message}"
        # `warning` is the canonical method name; `warn` is a deprecated alias.
        logger = self.logger.warning if isWarning else self.logger.info
        logger(msg)

    def set_params_from_api(self, apiParams):
        """Set command parameters from the API request.

        Hook for subclasses; the base implementation does nothing.

        Args:
            apiParams (dict): A dictionary of API parameters.
        """
        pass

    def get_name(self):
        """Return the name of the command.

        Returns:
            str: The name of the concrete command class.
        """
        return type(self).__name__

    def emit_event(self, event_queue):
        """Emit an event to the event queue.

        Hook for subclasses; the base implementation does nothing.

        Args:
            event_queue (queue.Queue): The event queue.
        """
        pass

    def log_message(self):
        """Build the log message announcing that the command is running.

        Returns:
            str: "Running <command name>".
        """
        return f"Running {self.get_name()}"

    def build_frame(self):
        """Build the frame to be transmitted.

        Hook for subclasses; the base implementation does nothing and
        therefore returns None.
        """
        pass

    def get_tx_mode(self):
        """Return the transmission mode for the command.

        Returns FREEDV_MODE.data_vhf_1 when the 'enable_vhf' entry of the
        'EXP' configuration section is truthy, otherwise
        FREEDV_MODE.signalling. Subclasses can override this method to
        specify a different mode.

        Returns:
            FREEDV_MODE: The transmission mode.

        Raises:
            KeyError: If the configuration has no 'EXP' section.
        """
        if self.config['EXP'].get('enable_vhf'):
            mode = FREEDV_MODE.data_vhf_1
        else:
            mode = FREEDV_MODE.signalling
        return mode

    def make_modem_queue_item(self, mode, repeat, repeat_delay, frame):
        """Create a dictionary representing a modem queue item.

        Args:
            mode: The transmission mode.
            repeat (int): The number of times to repeat the transmission.
            repeat_delay (float): The delay between repetitions.
            frame (bytearray): The frame data to transmit.

        Returns:
            dict: The item, with keys 'mode', 'repeat', 'repeat_delay'
            and 'frame'.
        """
        return {
            'mode': mode,
            'repeat': repeat,
            'repeat_delay': repeat_delay,
            'frame': frame,
        }

    def transmit(self, modem):
        """Build the command frame and hand it to the modem.

        Args:
            modem: The modem object; must provide a `transmit` method.
        """
        frame = self.build_frame()
        # Positional 1 and 0 presumably mean repeat=1 and repeat_delay=0,
        # mirroring make_modem_queue_item's argument order -- verify
        # against the modem's transmit signature.
        modem.transmit(self.get_tx_mode(), 1, 0, frame)

    def run(self, event_queue: queue.Queue, modem):
        """Run the command.

        Emits the command's event, logs the "running" message, and
        transmits the command frame via the modem.

        Args:
            event_queue (queue.Queue): The event queue.
            modem: The modem object.
        """
        self.emit_event(event_queue)
        self.logger.info(self.log_message())
        self.transmit(modem)

    def test(self, event_queue: queue.Queue):
        """Build and return the frame without transmitting it.

        Emits the command's event and logs the "running" message exactly
        like `run`, but returns the built frame instead of passing it to
        a modem. Useful for exercising the frame-building logic alone.

        Args:
            event_queue (queue.Queue): The event queue.

        Returns:
            bytearray: The built frame (None for the base class).
        """
        self.emit_event(event_queue)
        self.logger.info(self.log_message())
        return self.build_frame()