# Source listing: FreeDATA/freedata_server/serial_ptt.py (217 lines, 5.1 KiB, Python)

import serial
import structlog
class radio:
    """Control PTT (push-to-talk) through the RTS/DTR lines of a serial port.

    On construction the configured port is opened and both control lines are
    driven to the "PTT released" level. ``set_ptt`` then toggles both lines
    together. The polarity of each line comes from the ``serial_rts`` and
    ``serial_dtr`` config entries: the value ``"ON"`` drives the line with the
    PTT state itself, any other value drives it with the inverted state.

    The remaining getters/setters exist so the class offers the same method
    set as a full radio backend; most of them are placeholders that return
    ``None``.
    """

    # NOTE(review): the logger name mentions rigctld although this class only
    # talks to a serial port - presumably inherited from another backend.
    # Left unchanged so existing log filters keep matching.
    log = structlog.get_logger("radio (rigctld)")

    def __init__(self, config, state_manager):
        """Open the PTT serial port and put the control lines in the idle state.

        Args:
            config (dict): Configuration dictionary. Reads
                ``config["RADIO"]["serial_rts"]``, ``["serial_dtr"]`` and
                ``["ptt_port"]``. A ``ptt_port`` of ``"ignore"`` means no port
                is opened.
            state_manager (StateManager): State manager instance; its
                ``set_radio("radio_status", <bool>)`` is called with the
                outcome of the port setup.
        """
        self.parameters = {
            'frequency': '---',
            'mode': '---',
            'alc': '---',
            'strength': '---',
            'bandwidth': '---',
            'rf': '---',
            'ptt': False,  # Initial PTT state is set to False
            'tuner': False,
            'swr': '---',
        }
        self.config = config
        self.state_manager = state_manager

        radio_config = self.config["RADIO"]
        self.serial_rts = radio_config["serial_rts"]
        self.serial_dtr = radio_config["serial_dtr"]
        self.serial_comport = radio_config["ptt_port"]

        # Defined before the try so the error handler can tell whether a port
        # was actually opened and needs to be released again.
        self.serial_connection = None
        try:
            if self.serial_comport == "ignore":
                # Deliberately routed through the error path below, which
                # logs and leaves the controller without a connection.
                self.state_manager.set_radio("radio_status", False)
                raise serial.SerialException
            self.serial_connection = serial.Serial(self.serial_comport)
            # Drive both lines to the "PTT released" level. The state passed
            # here must be the boolean PTT state, not the polarity string
            # from the config: passing "ON"/"OFF" would put the lines in the
            # keyed state until set_ptt(False) runs.
            self.set_rts_state(False)
            self.set_dtr_state(False)
            self.set_ptt(False)
            self.state_manager.set_radio("radio_status", True)
        except serial.SerialException as e:
            self.log.warning(f"Error: could not open PTT port {self.serial_comport}: {e}")
            self.state_manager.set_radio("radio_status", False)
            if self.serial_connection is not None:
                # The port opened but line setup failed: close it instead of
                # just dropping the reference, so the device is not left held.
                self.serial_connection.close()
            self.serial_connection = None

    def connect(self, **kwargs):
        """Report the backend as connected.

        The serial port is opened in ``__init__``; this method does nothing.

        Args:
            **kwargs: Accepted and ignored.

        Returns:
            bool: Always ``True``.
        """
        return True

    def disconnect(self, **kwargs):
        """Close the serial port if one is open.

        Args:
            **kwargs: Accepted and ignored.

        Returns:
            bool: Always ``True``.
        """
        if self.serial_connection and self.serial_connection.is_open:
            self.serial_connection.close()
            # Use the class logger rather than print, like the rest of the class.
            self.log.info("Serial connection closed.")
        return True

    def get_frequency(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_mode(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_level(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_alc(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_meter(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_bandwidth(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_strength(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_tuner(self):
        """Placeholder; always returns ``None``."""
        return None

    def get_swr(self):
        """Placeholder; always returns ``None``."""
        return None

    def set_bandwidth(self):
        """Placeholder; does nothing and returns ``None``."""
        return None

    def set_mode(self, mode):
        """Placeholder; does nothing.

        Args:
            mode: Ignored.

        Returns:
            None
        """
        return None

    def set_tuner(self, state):
        """Placeholder; does nothing.

        Args:
            state: Ignored.

        Returns:
            None
        """
        return None

    def set_frequency(self, frequency):
        """Placeholder; does nothing.

        Args:
            frequency: Ignored.

        Returns:
            None
        """
        return None

    def get_status(self):
        """Return the backend status.

        Returns:
            bool: Always ``True``, regardless of whether a port is open.
        """
        return True

    def get_ptt(self):
        """Return the PTT state most recently stored by ``set_ptt``."""
        return self.parameters['ptt']

    def set_ptt(self, state):
        """Switch PTT by driving both the RTS and the DTR line.

        Args:
            state: Boolean - True to activate PTT, False to deactivate

        Returns:
            state: Boolean - The new PTT state. Without a serial connection
            the stored state is returned unchanged; if driving the lines
            raises ``serial.SerialException`` the stored state becomes
            ``False``.
        """
        if self.serial_connection is None:
            self.log.warning("Error: Serial connection not established.")
            return self.parameters['ptt']
        try:
            self.set_rts_state(state)
            self.set_dtr_state(state)
            self.parameters['ptt'] = state
        except serial.SerialException as e:
            self.log.warning(f"Error: {e}")
            self.parameters['ptt'] = False
        return self.parameters['ptt']

    @staticmethod
    def _line_level(polarity, state):
        """Map a PTT state to a line level: as-is for "ON", inverted otherwise."""
        return state if polarity == "ON" else not state

    def set_rts_state(self, state):
        """Set the RTS line based on the configuration.

        Args:
            state: Boolean - True to activate, False to deactivate. Written
                to the line unchanged when ``serial_rts`` is ``"ON"``,
                inverted otherwise.
        """
        if self.serial_connection is None:
            # Same guard as set_ptt, so a direct call after a failed open
            # logs instead of raising AttributeError.
            self.log.warning("Error: Serial connection not established.")
            return
        self.serial_connection.rts = self._line_level(self.serial_rts, state)

    def set_dtr_state(self, state):
        """Set the DTR line based on the configuration.

        Args:
            state: Boolean - True to activate, False to deactivate. Written
                to the line unchanged when ``serial_dtr`` is ``"ON"``,
                inverted otherwise.
        """
        if self.serial_connection is None:
            # Same guard as set_ptt, so a direct call after a failed open
            # logs instead of raising AttributeError.
            self.log.warning("Error: Serial connection not established.")
            return
        self.serial_connection.dtr = self._line_level(self.serial_dtr, state)

    def close_rig(self):
        """Placeholder; does nothing."""
        return

    def get_parameters(self):
        """Return the parameter dictionary (the live object, not a copy)."""
        return self.parameters

    def stop_service(self):
        """Placeholder; does nothing."""
        pass