Added a UI.
Broke out some functions from loramon.py to other files since loramon was getting complicated. The UI can be turned on with -U option. Old console mode function is preserved.pull/6/head
parent
bc624b1a67
commit
88318b17a1
|
@ -0,0 +1,141 @@
|
|||
import datetime
|
||||
import time
|
||||
import threading
|
||||
|
||||
class RNSSerial():
|
||||
def __init__(self, serial_port):
|
||||
self.serial_device = serial_port
|
||||
|
||||
# Create a lock object
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def Write(self, data):
|
||||
count = None
|
||||
with self.lock:
|
||||
count = self.serial_device.write(data)
|
||||
return count
|
||||
|
||||
def Read(self, count):
|
||||
byte = None
|
||||
with self.lock:
|
||||
byte = self.serial_device.read(count)
|
||||
return byte
|
||||
|
||||
def IsOpen(self):
|
||||
state = None
|
||||
with self.lock:
|
||||
state = self.serial_device.is_open
|
||||
return state
|
||||
|
||||
def InWaiting(self):
|
||||
state = None
|
||||
with self.lock:
|
||||
state = self.serial_device.in_waiting
|
||||
return state
|
||||
|
||||
class RNS():
|
||||
log_enabled = True
|
||||
ui_msg_queue = None
|
||||
|
||||
def __init__ (self):
|
||||
RNS.log_enabled = True
|
||||
RNS.ui_msg_queue = None
|
||||
|
||||
@staticmethod
|
||||
def log(msg):
|
||||
if RNS.log_enabled:
|
||||
logtimefmt = "%Y-%m-%d %H:%M:%S"
|
||||
timestamp = time.time()
|
||||
logstring = "["+time.strftime(logtimefmt)+"] "+msg
|
||||
if RNS.ui_msg_queue == None:
|
||||
print(logstring)
|
||||
else:
|
||||
RNS.ui_msg_queue.put(logstring)
|
||||
|
||||
@staticmethod
|
||||
def hexrep(data, delimit=True):
|
||||
delimiter = ":"
|
||||
if not delimit:
|
||||
delimiter = ""
|
||||
hexrep = delimiter.join("{:02x}".format(c) for c in data)
|
||||
return hexrep
|
||||
|
||||
@staticmethod
|
||||
def prettyhexrep(data):
|
||||
delimiter = ""
|
||||
hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">"
|
||||
return hexrep
|
||||
|
||||
class KISS():
|
||||
FEND = 0xC0
|
||||
FESC = 0xDB
|
||||
TFEND = 0xDC
|
||||
TFESC = 0xDD
|
||||
|
||||
CMD_UNKNOWN = 0xFE
|
||||
CMD_DATA = 0x00
|
||||
CMD_FREQUENCY = 0x01
|
||||
CMD_BANDWIDTH = 0x02
|
||||
CMD_TXPOWER = 0x03
|
||||
CMD_SF = 0x04
|
||||
CMD_CR = 0x05
|
||||
CMD_RADIO_STATE = 0x06
|
||||
CMD_RADIO_LOCK = 0x07
|
||||
CMD_DETECT = 0x08
|
||||
CMD_IMPLICIT = 0x09
|
||||
CMD_PROMISC = 0x0E
|
||||
CMD_READY = 0x0F
|
||||
CMD_STAT_RX = 0x21
|
||||
CMD_STAT_TX = 0x22
|
||||
CMD_STAT_RSSI = 0x23
|
||||
CMD_STAT_SNR = 0x24
|
||||
CMD_STAT_CHTM = 0x25
|
||||
CMD_STAT_PHYPRM = 0x26
|
||||
CMD_STAT_BAT = 0x27
|
||||
CMD_STAT_CSMA = 0x28
|
||||
CMD_BLINK = 0x30
|
||||
CMD_RANDOM = 0x40
|
||||
CMD_FW_VERSION = 0x50
|
||||
CMD_ROM_READ = 0x51
|
||||
CMD_ROM_WRITE = 0x52
|
||||
CMD_CONF_SAVE = 0x53
|
||||
CMD_CONF_DELETE = 0x54
|
||||
DETECT_REQ = 0x73
|
||||
DETECT_RESP = 0x46
|
||||
RADIO_STATE_OFF = 0x00
|
||||
RADIO_STATE_ON = 0x01
|
||||
RADIO_STATE_ASK = 0xFF
|
||||
|
||||
CMD_ERROR = 0x90
|
||||
ERROR_INITRADIO = 0x01
|
||||
ERROR_TXFAILED = 0x02
|
||||
ERROR_EEPROM_LOCKED = 0x03
|
||||
|
||||
@staticmethod
|
||||
def escape(data):
|
||||
data = data.replace(bytes([0xdb]), bytes([0xdb, 0xdd]))
|
||||
data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc]))
|
||||
return data
|
||||
|
||||
class ROM():
|
||||
PRODUCT_RNODE = chr(0x03)
|
||||
MODEL_A4 = chr(0xA4)
|
||||
MODEL_A9 = chr(0xA9)
|
||||
|
||||
ADDR_PRODUCT = chr(0x00)
|
||||
ADDR_MODEL = chr(0x01)
|
||||
ADDR_HW_REV = chr(0x02)
|
||||
ADDR_SERIAL = chr(0x03)
|
||||
ADDR_MADE = chr(0x07)
|
||||
ADDR_CHKSUM = chr(0x0B)
|
||||
ADDR_SIGNATURE = chr(0x1B)
|
||||
ADDR_INFO_LOCK = chr(0x9B)
|
||||
ADDR_CONF_SF = chr(0x9C)
|
||||
ADDR_CONF_CR = chr(0x9D)
|
||||
ADDR_CONF_TXP = chr(0x9E)
|
||||
ADDR_CONF_BW = chr(0x9F)
|
||||
ADDR_CONF_FREQ = chr(0xA3)
|
||||
ADDR_CONF_OK = chr(0xA7)
|
||||
|
||||
INFO_LOCK_BYTE = chr(0x73)
|
||||
CONF_OK_BYTE = chr(0x73)
|
|
@ -0,0 +1,177 @@
|
|||
import urwid
|
||||
import queue
|
||||
|
||||
class SubmitEdit(urwid.Edit):
|
||||
"""Custom Edit widget that submits on Enter key."""
|
||||
def __init__(self, on_submit_callback, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.on_submit_callback = on_submit_callback
|
||||
|
||||
def keypress(self, size, key):
|
||||
if key == 'enter':
|
||||
self.on_submit_callback()
|
||||
return None
|
||||
return super().keypress(size, key)
|
||||
|
||||
|
||||
class LoRaMonUIApp:
|
||||
def __init__(self, queue_from_radio):
|
||||
|
||||
#setup the Radio specific variables
|
||||
self.frequency = None
|
||||
self.bandwidth = None
|
||||
self.spread_factor = None
|
||||
self.coding_rate = None
|
||||
self.battery = None
|
||||
self.packets_received = 0
|
||||
|
||||
# flag to select original or scrollable list
|
||||
self.ORIGINAL_WIDGET = False
|
||||
|
||||
# Right pane output list
|
||||
self.output_lines = []
|
||||
if self.ORIGINAL_WIDGET: #original code
|
||||
self.output_widget = urwid.Text("", align="left")
|
||||
self.output_box = urwid.LineBox(
|
||||
urwid.Filler(self.output_widget, valign='top'),
|
||||
title="Output"
|
||||
)
|
||||
else: #scrollable
|
||||
self.output_widget = urwid.SimpleListWalker([])
|
||||
self.listbox = urwid.ListBox(self.output_widget)
|
||||
self.output_box = urwid.LineBox(self.listbox, title="Log Output")
|
||||
|
||||
# Shared queue for sending messages
|
||||
self.message_queue = queue_from_radio
|
||||
|
||||
# Left-top: Menu
|
||||
menu_items = [("Menu 1", self.start_thread), ("Menu 2", self.menu_action), ("Menu 3", self.menu_action)]
|
||||
menu_widgets = []
|
||||
|
||||
self.caption_text_widgets = []
|
||||
self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio Freq: "), None))
|
||||
self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio BW: "), None))
|
||||
self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio SF: "), None))
|
||||
self.caption_text_widgets.append(urwid.AttrMap(urwid.Text("Radio CR: "), None))
|
||||
|
||||
|
||||
self.input_edit_widgets = []
|
||||
self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input,"Freq: ", ""), None))
|
||||
self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input," BW: ", ""), None))
|
||||
self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input," SF: ", ""), None))
|
||||
self.input_edit_widgets.append(urwid.AttrMap(SubmitEdit(self.submit_input," CR: ", ""), None))
|
||||
|
||||
for i in range(len(self.input_edit_widgets)):
|
||||
menu_widgets.append(self.caption_text_widgets[i])
|
||||
menu_widgets.append(self.input_edit_widgets[i])
|
||||
|
||||
self.battery_text_widget = urwid.AttrMap(urwid.Text("Battery: "), None)
|
||||
menu_widgets.append(self.battery_text_widget)
|
||||
|
||||
self.packets_received_widget = urwid.AttrMap(urwid.Text("Packets: "), None)
|
||||
menu_widgets.append(self.packets_received_widget)
|
||||
|
||||
|
||||
for label, handler in menu_items:
|
||||
button = urwid.Button(label)
|
||||
urwid.connect_signal(button, 'click', handler, user_args={label})
|
||||
menu_widgets.append(urwid.AttrMap(button, None, focus_map='reversed'))
|
||||
|
||||
menu_listbox = urwid.ListBox(urwid.SimpleFocusListWalker(menu_widgets))
|
||||
menu_box = urwid.LineBox(menu_listbox, title="Menu")
|
||||
|
||||
# Left-bottom: Input
|
||||
self.input_edit = SubmitEdit(self.submit_input, caption="> ")
|
||||
submit_button = urwid.Button("Submit")
|
||||
urwid.connect_signal(submit_button, 'click', lambda button: self.submit_input())
|
||||
|
||||
input_widgets = urwid.Pile([
|
||||
urwid.AttrMap(self.input_edit, None),
|
||||
urwid.AttrMap(submit_button, None, focus_map='reversed')
|
||||
])
|
||||
input_box = urwid.LineBox(input_widgets, title="User Input")
|
||||
|
||||
# Left Pane: Stack menu + input
|
||||
left_pane = urwid.Pile([
|
||||
('weight', 2, menu_box),
|
||||
('weight', 1, input_box)
|
||||
])
|
||||
|
||||
# Layout: Columns
|
||||
columns = urwid.Columns([
|
||||
('weight', 1, left_pane),
|
||||
('weight', 2, self.output_box)
|
||||
])
|
||||
|
||||
if False:
|
||||
self.view = urwid.Frame(
|
||||
header=urwid.Text("Menu Thread Example — Press 'q' to quit"),
|
||||
body=columns
|
||||
)
|
||||
else:
|
||||
self.view = columns
|
||||
|
||||
self.loop = urwid.MainLoop(self.view, unhandled_input=self.handle_input)
|
||||
|
||||
self.loop.set_alarm_in(.1, self.set_in_alarm_handler)
|
||||
|
||||
def append_output(self, line):
|
||||
if self.ORIGINAL_WIDGET:
|
||||
self.output_lines.append(line)
|
||||
self.output_widget.set_text("\n".join(self.output_lines))
|
||||
else:
|
||||
self.output_widget.append(urwid.Text(line))
|
||||
# move to the bottom
|
||||
#self.output_widget.set_focus(len(self.output_widget) - 1)
|
||||
|
||||
def menu_action(self, button, label):
|
||||
self.append_output(f"You clicked: {label}")
|
||||
|
||||
def start_thread(self, button, label):
|
||||
self.append_output("Menu 1: Starting background task...")
|
||||
thread = threading.Thread(target=self.background_task)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
|
||||
def background_task(self):
|
||||
time.sleep(.5) # Simulate some background work
|
||||
#print("Background_task calling set_alarm_in")
|
||||
#self.loop.set_alarm_in(0, self.set_in_alarm_handler)
|
||||
msg = "<--->"
|
||||
#print(f"[Sender] Sending: {msg}\n")
|
||||
self.message_queue.put(msg)
|
||||
|
||||
def set_in_alarm_handler(self, loop, data):
|
||||
#print("\nset_in_alarm_handler running")
|
||||
if not self.message_queue.empty():
|
||||
msg = self.message_queue.get()
|
||||
#print(f"[Receiver] Got message: {msg}")
|
||||
self.append_output(msg)
|
||||
|
||||
#debug message, printing radio frequency
|
||||
#self.append_output(str(self.caption_text_widgets[0].original_widget.text))
|
||||
|
||||
|
||||
#update radio parameters
|
||||
self.caption_text_widgets[0].original_widget.set_text("Radio Freq: " + str(self.frequency))
|
||||
self.caption_text_widgets[1].original_widget.set_text("Radio BW: " + str(self.bandwidth))
|
||||
self.caption_text_widgets[2].original_widget.set_text("Radio SF: " + str(self.spread_factor))
|
||||
self.caption_text_widgets[3].original_widget.set_text("Radio CR: " + str(self.coding_rate))
|
||||
self.battery_text_widget.original_widget.set_text ("Battery: " + str(self.battery))
|
||||
self.packets_received_widget.original_widget.set_text ("Packets: " + str(self.packet_received))
|
||||
|
||||
self.loop.set_alarm_in(.1, self.set_in_alarm_handler)
|
||||
|
||||
def submit_input(self):
|
||||
user_text = self.input_edit.edit_text.strip()
|
||||
if user_text:
|
||||
self.append_output(f"You entered: {user_text}")
|
||||
self.input_edit.set_edit_text("") # Clear input
|
||||
|
||||
def handle_input(self, key):
|
||||
#print("\n\nhandle_input")
|
||||
if key in ('q', 'Q'):
|
||||
raise urwid.ExitMainLoop()
|
||||
|
||||
def run(self):
|
||||
self.loop.run()
|
Loading…
Reference in New Issue