From 88318b17a12f138ba39aeeb03e5fb8ccb2026c15 Mon Sep 17 00:00:00 2001 From: Alligitor <8754766+alligitor@users.noreply.github.com> Date: Sun, 20 Apr 2025 13:33:42 +0000 Subject: [PATCH] Added a UI. Broke out some functions from loramon.py to other files since loramon was getting complicated. The UI can be turned on with -U option. Old console mode function is preserved. --- loramon/LoRaMonHelperClasses.py | 141 +++++++++++++++++++++++++ loramon/LoRaMonUIApp.py | 177 ++++++++++++++++++++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 loramon/LoRaMonHelperClasses.py create mode 100644 loramon/LoRaMonUIApp.py diff --git a/loramon/LoRaMonHelperClasses.py b/loramon/LoRaMonHelperClasses.py new file mode 100644 index 0000000..e0a3688 --- /dev/null +++ b/loramon/LoRaMonHelperClasses.py @@ -0,0 +1,141 @@ +import datetime +import time +import threading + +class RNSSerial(): + def __init__(self, serial_port): + self.serial_device = serial_port + + # Create a lock object + self.lock = threading.Lock() + + def Write(self, data): + count = None + with self.lock: + count = self.serial_device.write(data) + return count + + def Read(self, count): + byte = None + with self.lock: + byte = self.serial_device.read(count) + return byte + + def IsOpen(self): + state = None + with self.lock: + state = self.serial_device.is_open + return state + + def InWaiting(self): + state = None + with self.lock: + state = self.serial_device.in_waiting + return state + +class RNS(): + log_enabled = True + ui_msg_queue = None + + def __init__ (self): + RNS.log_enabled = True + RNS.ui_msg_queue = None + + @staticmethod + def log(msg): + if RNS.log_enabled: + logtimefmt = "%Y-%m-%d %H:%M:%S" + timestamp = time.time() + logstring = "["+time.strftime(logtimefmt)+"] "+msg + if RNS.ui_msg_queue == None: + print(logstring) + else: + RNS.ui_msg_queue.put(logstring) + + @staticmethod + def hexrep(data, delimit=True): + delimiter = ":" + if not delimit: + delimiter = "" + hexrep = delimiter.join("{:02x}".format(c) for c in data) + return hexrep + + @staticmethod + def prettyhexrep(data): + delimiter = "" + hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">" + return hexrep + +class KISS(): + FEND = 0xC0 + FESC = 0xDB + TFEND = 0xDC + TFESC = 0xDD + + CMD_UNKNOWN = 0xFE + CMD_DATA = 0x00 + CMD_FREQUENCY = 0x01 + CMD_BANDWIDTH = 0x02 + CMD_TXPOWER = 0x03 + CMD_SF = 0x04 + CMD_CR = 0x05 + CMD_RADIO_STATE = 0x06 + CMD_RADIO_LOCK = 0x07 + CMD_DETECT = 0x08 + CMD_IMPLICIT = 0x09 + CMD_PROMISC = 0x0E + CMD_READY = 0x0F + CMD_STAT_RX = 0x21 + CMD_STAT_TX = 0x22 + CMD_STAT_RSSI = 0x23 + CMD_STAT_SNR = 0x24 + CMD_STAT_CHTM = 0x25 + CMD_STAT_PHYPRM = 0x26 + CMD_STAT_BAT = 0x27 + CMD_STAT_CSMA = 0x28 + CMD_BLINK = 0x30 + CMD_RANDOM = 0x40 + CMD_FW_VERSION = 0x50 + CMD_ROM_READ = 0x51 + CMD_ROM_WRITE = 0x52 + CMD_CONF_SAVE = 0x53 + CMD_CONF_DELETE = 0x54 + DETECT_REQ = 0x73 + DETECT_RESP = 0x46 + RADIO_STATE_OFF = 0x00 + RADIO_STATE_ON = 0x01 + RADIO_STATE_ASK = 0xFF + + CMD_ERROR = 0x90 + ERROR_INITRADIO = 0x01 + ERROR_TXFAILED = 0x02 + ERROR_EEPROM_LOCKED = 0x03 + + @staticmethod + def escape(data): + data = data.replace(bytes([0xdb]), bytes([0xdb, 0xdd])) + data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc])) + return data + +class ROM(): + PRODUCT_RNODE = chr(0x03) + MODEL_A4 = chr(0xA4) + MODEL_A9 = chr(0xA9) + + ADDR_PRODUCT = chr(0x00) + ADDR_MODEL = chr(0x01) + ADDR_HW_REV = chr(0x02) + ADDR_SERIAL = chr(0x03) + ADDR_MADE = chr(0x07) + ADDR_CHKSUM = chr(0x0B) + ADDR_SIGNATURE = chr(0x1B) + ADDR_INFO_LOCK = chr(0x9B) + ADDR_CONF_SF = chr(0x9C) + ADDR_CONF_CR = chr(0x9D) + ADDR_CONF_TXP = chr(0x9E) + ADDR_CONF_BW = chr(0x9F) + ADDR_CONF_FREQ = chr(0xA3) + ADDR_CONF_OK = chr(0xA7) + + INFO_LOCK_BYTE = chr(0x73) + CONF_OK_BYTE = chr(0x73) diff --git a/loramon/LoRaMonUIApp.py b/loramon/LoRaMonUIApp.py new file mode 100644 index 0000000..8867130 --- /dev/null +++ b/loramon/LoRaMonUIApp.py @@ -0,0 +1,177 @@ +import urwid +import queue + +class SubmitEdit(urwid.Edit): + """Custom Edit widget that submits on Enter key.""" + def __init__(self, on_submit_callback, *args, **kwargs): + super().__init__(*args, **kwargs) + self.on_submit_callback = on_submit_callback + + def keypress(self, size, key): + if key == 'enter': + self.on_submit_callback() + return None + return super().keypress(size, key) + + +class LoRaMonUIApp: + def __init__(self, queue_from_radio): + + #setup the Radio specific variables + self.frequency = None + self.bandwidth = None + self.spread_factor = None + self.coding_rate = None + self.battery = None + self.packets_received = 0 + + # flag to select original or scrollable list + self.ORIGINAL_WIDGET = False + + # Right pane output list + self.output_lines = [] + if self.ORIGINAL_WIDGET: #original code + self.output_widget = urwid.Text("", align="left") + self.output_box = urwid.LineBox( + urwid.Filler(self.output_widget, valign='top'), + title="Output" + ) + else: #scrollable + self.output_widget = urwid.SimpleListWalker([]) + self.listbox = urwid.ListBox(self.output_widget) + self.output_box = urwid.LineBox(self.listbox, title="Log Output") + + # Shared queue for sending messages + self.message_queue = queue_from_radio + + # Left-top: Menu + menu_items = [("Menu 1", self.start_thread), ("Menu 2", self.menu_action), ("Menu 3", self.menu_action)] + menu_widgets = [] + + self.caption_text_widgets = [] + self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio Freq: "), None)) + self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio BW: "), None)) + self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio SF: "), None)) + self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio CR: "), None)) + + + self.input_edit_widgets = [] + self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input,"Freq: ", ""), None)) + self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input," BW: ", ""), None)) + self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input," SF: ", ""), None)) + self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input," CR: ", ""), None)) + + for i in range(len(self.input_edit_widgets)): + menu_widgets.append(self.caption_text_widgets[i]) + menu_widgets.append(self.input_edit_widgets[i]) + + self.battery_text_widget = urwid.AttrMap(urwid.Text("Battery: "), None) + menu_widgets.append(self.battery_text_widget) + + self.packets_received_widget = urwid.AttrMap(urwid.Text("Packets: "), None) + menu_widgets.append(self.packets_received_widget) + + + for label, handler in menu_items: + button = urwid.Button(label) + urwid.connect_signal(button, 'click', handler, user_args={label}) + menu_widgets.append(urwid.AttrMap(button, None, focus_map='reversed')) + + menu_listbox = urwid.ListBox(urwid.SimpleFocusListWalker(menu_widgets)) + menu_box = urwid.LineBox(menu_listbox, title="Menu") + + # Left-bottom: Input + self.input_edit = SubmitEdit(self.submit_input, caption="> ") + submit_button = urwid.Button("Submit") + urwid.connect_signal(submit_button, 'click', lambda button: self.submit_input()) + + input_widgets = urwid.Pile([ + urwid.AttrMap(self.input_edit, None), + urwid.AttrMap(submit_button, None, focus_map='reversed') + ]) + input_box = urwid.LineBox(input_widgets, title="User Input") + + # Left Pane: Stack menu + input + left_pane = urwid.Pile([ + ('weight', 2, menu_box), + ('weight', 1, input_box) + ]) + + # Layout: Columns + columns = urwid.Columns([ + ('weight', 1, left_pane), + ('weight', 2, self.output_box) + ]) + + if False: + self.view = urwid.Frame( + header=urwid.Text("Menu Thread Example — Press 'q' to quit"), + body=columns + ) + else: + self.view = columns + + self.loop = urwid.MainLoop(self.view, unhandled_input=self.handle_input) + + self.loop.set_alarm_in(.1, self.set_in_alarm_handler) + + def append_output(self, line): + if self.ORIGINAL_WIDGET: + self.output_lines.append(line) + self.output_widget.set_text("\n".join(self.output_lines)) + else: + self.output_widget.append(urwid.Text(line)) + # move to the bottom + #self.output_widget.set_focus(len(self.output_widget) - 1) + + def menu_action(self, button, label): + self.append_output(f"You clicked: {label}") + + def start_thread(self, button, label): + self.append_output("Menu 1: Starting background task...") + thread = threading.Thread(target=self.background_task) + thread.daemon = True + thread.start() + + def background_task(self): + time.sleep(.5) # Simulate some background work + #print("Background_task calling set_alarm_in") + #self.loop.set_alarm_in(0, self.set_in_alarm_handler) + msg = "<--->" + #print(f"[Sender] Sending: {msg}\n") + self.message_queue.put(msg) + + def set_in_alarm_handler(self, loop, data): + #print("\nset_in_alarm_handler running") + if not self.message_queue.empty(): + msg = self.message_queue.get() + #print(f"[Receiver] Got message: {msg}") + self.append_output(msg) + + #debug message, printing radio frequency + #self.append_output(str(self.caption_text_widgets[0].original_widget.text)) + + + #update radio parameters + self.caption_text_widgets[0].original_widget.set_text("Radio Freq: " + str(self.frequency)) + self.caption_text_widgets[1].original_widget.set_text("Radio BW: " + str(self.bandwidth)) + self.caption_text_widgets[2].original_widget.set_text("Radio SF: " + str(self.spread_factor)) + self.caption_text_widgets[3].original_widget.set_text("Radio CR: " + str(self.coding_rate)) + self.battery_text_widget.original_widget.set_text ("Battery: " + str(self.battery)) + self.packets_received_widget.original_widget.set_text ("Packets: " + str(self.packet_received)) + + self.loop.set_alarm_in(.1, self.set_in_alarm_handler) + + def submit_input(self): + user_text = self.input_edit.edit_text.strip() + if user_text: + self.append_output(f"You entered: {user_text}") + self.input_edit.set_edit_text("") # Clear input + + def handle_input(self, key): + #print("\n\nhandle_input") + if key in ('q', 'Q'): + raise urwid.ExitMainLoop() + + def run(self): + self.loop.run()