From 1872e512b5092a63acf85a09207e35bc3d562108 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 27 May 2020 19:14:17 +0200 Subject: [PATCH] Migrated to Python 3 --- .gitignore | 1 + rnodeconf.py | 162 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 96 insertions(+), 67 deletions(-) diff --git a/.gitignore b/.gitignore index f9dd2c1..4521584 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ romwipe otatest boardinfo RNode.py +.DS_Store # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/rnodeconf.py b/rnodeconf.py index 09d9f44..cd80884 100755 --- a/rnodeconf.py +++ b/rnodeconf.py @@ -1,14 +1,14 @@ -#!/usr/bin/python +#!python3 from time import sleep import argparse import threading import os.path import struct import datetime -import urllib import time import math -import imp +from urllib.request import urlretrieve +from importlib import util rnode = None rnode_serial = None @@ -30,13 +30,13 @@ class RNS(): delimiter = ":" if not delimit: delimiter = "" - hexrep = delimiter.join("{:02x}".format(ord(c)) for c in data) + hexrep = delimiter.join("{:02x}".format(c) for c in data) return hexrep @staticmethod def prettyhexrep(data): delimiter = "" - hexrep = "<"+delimiter.join("{:02x}".format(ord(c)) for c in data)+">" + hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">" return hexrep class KISS(): @@ -59,6 +59,7 @@ class KISS(): CMD_STAT_RX = 0x21 CMD_STAT_TX = 0x22 CMD_STAT_RSSI = 0x23 + CMD_STAT_SNR = 0x24 CMD_BLINK = 0x30 CMD_RANDOM = 0x40 CMD_FW_VERSION = 0x50 @@ -170,9 +171,9 @@ class RNode(): byte = ord(self.serial.read(1)) last_read_ms = int(time.time()*1000) - if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA): + if (in_frame and byte == KISS.FEND and command == KISS.CMD_ROM_READ): + self.eeprom = data_buffer in_frame = False - self.processIncoming(data_buffer) data_buffer = b"" command_buffer = b"" elif (byte == KISS.FEND): @@ -183,6 +184,17 @@ class RNode(): elif (in_frame and len(data_buffer) < 512): if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN): command = byte + elif (command == KISS.CMD_ROM_READ): + if (byte == KISS.FESC): + escape = True + else: + if (escape): + if (byte == KISS.TFEND): + byte = KISS.FEND + if (byte == KISS.TFESC): + byte = KISS.FESC + escape = False + data_buffer = data_buffer+bytes([byte]) elif (command == KISS.CMD_DATA): if (byte == KISS.FESC): escape = True @@ -236,7 +248,7 @@ class RNode(): if (byte == KISS.TFESC): byte = KISS.FESC escape = False - command_buffer = command_buffer+byte + command_buffer = command_buffer+bytes([byte]) if (len(command_buffer) == 2): self.major_version = command_buffer[0] self.minor_version = command_buffer[1] @@ -297,6 +309,11 @@ class RNode(): RNS.log(str(self)+" hardware TX error (code "+RNS.hexrep(byte)+")") else: RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+")") + elif (command == KISS.CMD_DETECT): + if byte == KISS.DETECT_RESP: + self.detected = True + else: + self.detected = False else: time_since_last = int(time.time()*1000) - last_read_ms @@ -309,13 +326,14 @@ class RNode(): sleep(0.08) except Exception as e: + raise e exit() def updateBitrate(self): try: - self.bitrate = self.r_sf * ( (4.0/self.cr) / (math.pow(2,self.r_sf)/(self.r_bandwidth/1000)) ) * 1000 + self.bitrate = self.r_sf * ( (4.0/self.r_cr) / (math.pow(2,self.r_sf)/(self.r_bandwidth/1000)) ) * 1000 self.bitrate_kbps = round(self.bitrate/1000.0, 2) - except: + except Exception as e: self.bitrate = 0 def updateVersion(self): @@ -409,7 +427,9 @@ class RNode(): sleep(13); def write_eeprom(self, addr, byte): - kiss_command = bytes([KISS.FEND, KISS.CMD_ROM_WRITE, KISS.escape(addr), KISS.escape(byte), KISS.FEND]) + write_payload = b"" + bytes([addr, byte]) + write_payload = KISS.escape(write_payload) + kiss_command = bytes([KISS.FEND, KISS.CMD_ROM_WRITE]) + write_payload + bytes([KISS.FEND]) written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while writing EEPROM") @@ -429,28 +449,30 @@ class RNode(): self.parse_eeprom() def parse_eeprom(self): - if self.eeprom[ord(ROM.ADDR_INFO_LOCK)] == ROM.INFO_LOCK_BYTE: + if self.eeprom[ROM.ADDR_INFO_LOCK] == ROM.INFO_LOCK_BYTE: + # TODO: Remove + RNS.log("Provisioned device") from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend self.provisioned = True - self.product = self.eeprom[ord(ROM.ADDR_PRODUCT)] - self.model = self.eeprom[ord(ROM.ADDR_MODEL)] - self.hw_rev = self.eeprom[ord(ROM.ADDR_HW_REV)] - self.serialno = "" + self.eeprom[ord(ROM.ADDR_SERIAL)] + self.eeprom[ord(ROM.ADDR_SERIAL)+1] + self.eeprom[ord(ROM.ADDR_SERIAL)+2] + self.eeprom[ord(ROM.ADDR_SERIAL)+3] - self.made = "" + self.eeprom[ord(ROM.ADDR_MADE)] + self.eeprom[ord(ROM.ADDR_MADE)+1] + self.eeprom[ord(ROM.ADDR_MADE)+2] + self.eeprom[ord(ROM.ADDR_MADE)+3] - self.checksum = "" + self.product = self.eeprom[ROM.ADDR_PRODUCT] + self.model = self.eeprom[ROM.ADDR_MODEL] + self.hw_rev = self.eeprom[ROM.ADDR_HW_REV] + self.serialno = bytes([self.eeprom[ROM.ADDR_SERIAL], self.eeprom[ROM.ADDR_SERIAL+1], self.eeprom[ROM.ADDR_SERIAL+2], self.eeprom[ROM.ADDR_SERIAL+3]]) + self.made = bytes([self.eeprom[ROM.ADDR_MADE], self.eeprom[ROM.ADDR_MADE+1], self.eeprom[ROM.ADDR_MADE+2], self.eeprom[ROM.ADDR_MADE+3]]) + self.checksum = b"" - self.min_freq = ranges[ord(self.model)][0] - self.max_freq = ranges[ord(self.model)][1] - self.max_output = ranges[ord(self.model)][2] + self.min_freq = ranges[self.model][0] + self.max_freq = ranges[self.model][1] + self.max_output = ranges[self.model][2] try: - self.min_freq = ranges[ord(self.model)][0] - self.max_freq = ranges[ord(self.model)][1] - self.max_output = ranges[ord(self.model)][2] + self.min_freq = ranges[self.model][0] + self.max_freq = ranges[self.model][1] + self.max_output = ranges[self.model][2] except Exception as e: RNS.log("Exception") RNS.log(str(e)) @@ -459,20 +481,17 @@ class RNode(): self.max_output = 0 for i in range(0,16): - self.checksum = self.checksum+self.eeprom[ord(ROM.ADDR_CHKSUM)+i] + self.checksum = self.checksum+bytes([self.eeprom[ROM.ADDR_CHKSUM+i]]) - self.signature = "" + self.signature = b"" for i in range(0,128): - self.signature = self.signature+self.eeprom[ord(ROM.ADDR_SIGNATURE)+i] + self.signature = self.signature+bytes([self.eeprom[ROM.ADDR_SIGNATURE+i]]) - checksummed_info = self.product+self.model+self.hw_rev+self.serialno+self.made + checksummed_info = b"" + bytes([self.product]) + bytes([self.model]) + bytes([self.hw_rev]) + self.serialno + self.made digest = hashes.Hash(hashes.MD5(), backend=default_backend()) digest.update(checksummed_info) checksum = digest.finalize() - #RNS.log("EEPROM checksum: "+RNS.hexrep(self.checksum)) - #RNS.log("Calculated checksum: "+RNS.hexrep(checksum)) - if self.checksum != checksum: self.provisioned = False RNS.log("EEPROM checksum mismatch") @@ -485,7 +504,7 @@ class RNode(): for known in known_keys: vendor = known[0] public_hexrep = known[1] - public_bytes = public_hexrep.decode("hex") + public_bytes = bytes.fromhex(public_hexrep) public_key = load_der_public_key(public_bytes, backend=default_backend()) try: public_key.verify( @@ -502,13 +521,13 @@ class RNode(): except Exception as e: RNS.log("Board signature validation failed") - if self.eeprom[ord(ROM.ADDR_CONF_OK)] == ROM.CONF_OK_BYTE: + if self.eeprom[ROM.ADDR_CONF_OK] == ROM.CONF_OK_BYTE: self.configured = True - self.conf_sf = ord(self.eeprom[ord(ROM.ADDR_CONF_SF)]) - self.conf_cr = ord(self.eeprom[ord(ROM.ADDR_CONF_CR)]) - self.conf_txpower = ord(self.eeprom[ord(ROM.ADDR_CONF_TXP)]) - self.conf_frequency = ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)]) << 24 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+1]) << 16 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+2]) << 8 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+3]) - self.conf_bandwidth = ord(self.eeprom[ord(ROM.ADDR_CONF_BW)]) << 24 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+1]) << 16 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+2]) << 8 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+3]) + self.conf_sf = self.eeprom[ROM.ADDR_CONF_SF] + self.conf_cr = self.eeprom[ROM.ADDR_CONF_CR] + self.conf_txpower = self.eeprom[ROM.ADDR_CONF_TXP] + self.conf_frequency = self.eeprom[ROM.ADDR_CONF_FREQ] << 24 | self.eeprom[ROM.ADDR_CONF_FREQ+1] << 16 | self.eeprom[ROM.ADDR_CONF_FREQ+2] << 8 | self.eeprom[ROM.ADDR_CONF_FREQ+3] + self.conf_bandwidth = self.eeprom[ROM.ADDR_CONF_BW] << 24 | self.eeprom[ROM.ADDR_CONF_BW+1] << 16 | self.eeprom[ROM.ADDR_CONF_BW+2] << 8 | self.eeprom[ROM.ADDR_CONF_BW+3] else: self.configured = False else: @@ -531,20 +550,22 @@ def config_interface(): if __name__ == "__main__": try: - imp.find_module("serial") + if not util.find_spec("serial"): + raise ImportError("Serial module could not be found") except ImportError: print("") print("RNode Config Utility needs pyserial to work.") - print("You can install it with: pip install pyserial") + print("You can install it with: pip3 install pyserial") print("") exit() try: - imp.find_module("cryptography") + if not util.find_spec("cryptography"): + raise ImportError("Cryptography module could not be found") except ImportError: print("") print("RNode Config Utility needs the cryptography module to work.") - print("You can install it with: pip install cryptography") + print("You can install it with: pip3 install cryptography") print("") exit() @@ -565,8 +586,8 @@ if __name__ == "__main__": parser.add_argument("--freq", action="store", metavar="Hz", type=int, default=None, help="Frequency in Hz for TNC mode") parser.add_argument("--bw", action="store", metavar="Hz", type=int, default=None, help="Bandwidth in Hz for TNC mode") parser.add_argument("--txp", action="store", metavar="dBm", type=int, default=None, help="TX power in dBm for TNC mode") - parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor for TNC mode") - parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate for TNC mode") + parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor for TNC mode (7 - 12)") + parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate for TNC mode (5 - 8)") parser.add_argument("--model", action="store", metavar="model", type=str, default=None, help="Model code for EEPROM bootstrap") parser.add_argument("--hwrev", action="store", metavar="revision", type=int, default=None, help="Hardware revision EEPROM bootstrap") parser.add_argument("--nocheck", action="store_true", help="Don't check for firmware updates online") @@ -587,7 +608,7 @@ if __name__ == "__main__": if args.public: private_bytes = None try: - file = open("./firmware/signing.key", "r") + file = open("./firmware/signing.key", "rb") private_bytes = file.read() file.close() except Exception as e: @@ -634,7 +655,7 @@ if __name__ == "__main__": RNS.log("Signing key already exists, not overwriting!") RNS.log("Manually delete this key to create a new one.") else: - file = open("./firmware/signing.key", "w") + file = open("./firmware/signing.key", "wb") file.write(private_bytes) file.close() @@ -651,7 +672,7 @@ if __name__ == "__main__": if not args.nocheck: try: RNS.log("Downloading latest firmware from GitHub...") - urllib.urlretrieve(firmware_update_url, "update/rnode_update.hex") + urlretrieve(firmware_update_url, "update/rnode_update.hex") RNS.log("Firmware download completed") except Exception as e: RNS.log("Could not download firmware update") @@ -742,7 +763,7 @@ if __name__ == "__main__": timestamp = time.time() filename = str(time.strftime("%Y-%m-%d_%H-%M-%S")) path = "./eeprom/"+filename+".eeprom" - file = open(path, "w") + file = open(path, "wb") file.write(rnode.eeprom) file.close() RNS.log("EEPROM backup written to: "+path) @@ -762,9 +783,9 @@ if __name__ == "__main__": RNS.log("") RNS.log("Board info:") RNS.log("\tFirmware version:\t"+rnode.version) - RNS.log("\tProduct code:\t\t"+RNS.hexrep(rnode.product)) - RNS.log("\tModel code:\t\t"+RNS.hexrep(rnode.model)) - RNS.log("\tHardware revision:\t"+RNS.hexrep(rnode.hw_rev)) + RNS.log("\tProduct code:\t\t"+bytes([rnode.product]).hex()) + RNS.log("\tModel code:\t\t"+bytes([rnode.model]).hex()) + RNS.log("\tHardware revision:\t"+bytes([rnode.hw_rev]).hex()) RNS.log("\tSerial number:\t\t"+RNS.hexrep(rnode.serialno)) RNS.log("\tFrequency range:\t"+str(rnode.min_freq/1e6)+" MHz - "+str(rnode.max_freq/1e6)+" MHz") RNS.log("\tMax TX power:\t\t"+str(rnode.max_output)+" dBm") @@ -772,19 +793,22 @@ if __name__ == "__main__": if rnode.configured: rnode.bandwidth = rnode.conf_bandwidth + rnode.r_bandwidth = rnode.conf_bandwidth rnode.sf = rnode.conf_sf + rnode.r_sf = rnode.conf_sf rnode.cr = rnode.conf_cr + rnode.r_cr = rnode.conf_cr rnode.updateBitrate() + txp_mw = round(pow(10, (rnode.conf_txpower/10)), 3) RNS.log("\tDevice signature:\t"+sigstring) RNS.log(""); RNS.log("\tDevice mode:\t\tTNC") RNS.log("\t Frequency:\t\t"+str((rnode.conf_frequency/1000000.0))+" MHz") RNS.log("\t Bandwidth:\t\t"+str(rnode.conf_bandwidth/1000.0)+" KHz") - RNS.log("\t TX power:\t\t"+str(rnode.conf_txpower)+" dBm") + RNS.log("\t TX power:\t\t"+str(rnode.conf_txpower)+" dBm ("+str(txp_mw)+" mW)") RNS.log("\t Spreading factor:\t"+str(rnode.conf_sf)) RNS.log("\t Coding rate:\t\t"+str(rnode.conf_cr)) RNS.log("\t On-air bitrate:\t"+str(rnode.bitrate_kbps)+" kbps") - else: RNS.log("\tDevice mode:\t\tNormal (host-controlled)") RNS.log("\tDevice signature:\t"+sigstring) @@ -839,7 +863,9 @@ if __name__ == "__main__": file.write(str(serialno)) file.close() - info_chunk = ROM.PRODUCT_RNODE+model+hwrev+serial_bytes+time_bytes + info_chunk = b"" + bytes([ROM.PRODUCT_RNODE, model, ord(hwrev)]) + info_chunk += serial_bytes + info_chunk += time_bytes digest = hashes.Hash(hashes.MD5(), backend=default_backend()) digest.update(info_chunk) checksum = digest.finalize() @@ -849,7 +875,7 @@ if __name__ == "__main__": key_path = "./firmware/signing.key" if os.path.isfile(key_path): try: - file = open(key_path, "r") + file = open(key_path, "rb") private_bytes = file.read() file.close() private_key = serialization.load_der_private_key( @@ -884,31 +910,31 @@ if __name__ == "__main__": time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_MODEL, model) time.sleep(0.006) - rnode.write_eeprom(ROM.ADDR_HW_REV, hwrev) + rnode.write_eeprom(ROM.ADDR_HW_REV, ord(hwrev)) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_SERIAL, serial_bytes[0]) time.sleep(0.006) - rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+1), serial_bytes[1]) + rnode.write_eeprom(ROM.ADDR_SERIAL+1, serial_bytes[1]) time.sleep(0.006) - rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+2), serial_bytes[2]) + rnode.write_eeprom(ROM.ADDR_SERIAL+2, serial_bytes[2]) time.sleep(0.006) - rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+3), serial_bytes[3]) + rnode.write_eeprom(ROM.ADDR_SERIAL+3, serial_bytes[3]) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_MADE, time_bytes[0]) time.sleep(0.006) - rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+1), time_bytes[1]) + rnode.write_eeprom(ROM.ADDR_MADE+1, time_bytes[1]) time.sleep(0.006) - rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+2), time_bytes[2]) + rnode.write_eeprom(ROM.ADDR_MADE+2, time_bytes[2]) time.sleep(0.006) - rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+3), time_bytes[3]) + rnode.write_eeprom(ROM.ADDR_MADE+3, time_bytes[3]) time.sleep(0.006) for i in range(0,16): - rnode.write_eeprom(chr(ord(ROM.ADDR_CHKSUM)+i), checksum[i]) + rnode.write_eeprom(ROM.ADDR_CHKSUM+i, checksum[i]) time.sleep(0.006) for i in range(0,128): - rnode.write_eeprom(chr(ord(ROM.ADDR_SIGNATURE)+i), signature[i]) + rnode.write_eeprom(ROM.ADDR_SIGNATURE+i, signature[i]) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_INFO_LOCK, ROM.INFO_LOCK_BYTE) @@ -918,8 +944,9 @@ if __name__ == "__main__": if rnode.provisioned: RNS.log("EEPROM Bootstrapping successful!") try: - file = open("./firmware/device_db/"+serial_bytes.encode("hex"), "w") - file.write(rnode.eeprom) + file = open("./firmware/device_db/"+serial_bytes.hex(), "wb") + written = file.write(rnode.eeprom) + file.close() except Exception as e: RNS.log("WARNING: Could not backup device EEPROM to disk") exit() @@ -927,8 +954,9 @@ if __name__ == "__main__": RNS.log("EEPROM was written, but validation failed. Check your settings.") exit() except Exception as e: - RNS.log("An error occured while writing EEPROM. The contained exception was:") + RNS.log("An error occurred while writing EEPROM. The contained exception was:") RNS.log(str(e)) + raise e else: RNS.log("Invalid data specified, cancelling EEPROM write")