Migrated to Python 3

pull/2/head
Mark Qvist 2020-05-27 19:14:17 +02:00
parent 4f4ba852fd
commit 1872e512b5
2 changed files with 96 additions and 67 deletions

1
.gitignore vendored
View File

@ -11,6 +11,7 @@ romwipe
otatest
boardinfo
RNode.py
.DS_Store
# Byte-compiled / optimized / DLL files
__pycache__/

View File

@ -1,14 +1,14 @@
#!/usr/bin/python
#!python3
from time import sleep
import argparse
import threading
import os.path
import struct
import datetime
import urllib
import time
import math
import imp
from urllib.request import urlretrieve
from importlib import util
rnode = None
rnode_serial = None
@ -30,13 +30,13 @@ class RNS():
delimiter = ":"
if not delimit:
delimiter = ""
hexrep = delimiter.join("{:02x}".format(ord(c)) for c in data)
hexrep = delimiter.join("{:02x}".format(c) for c in data)
return hexrep
@staticmethod
def prettyhexrep(data):
delimiter = ""
hexrep = "<"+delimiter.join("{:02x}".format(ord(c)) for c in data)+">"
hexrep = "<"+delimiter.join("{:02x}".format(c) for c in data)+">"
return hexrep
class KISS():
@ -59,6 +59,7 @@ class KISS():
CMD_STAT_RX = 0x21
CMD_STAT_TX = 0x22
CMD_STAT_RSSI = 0x23
CMD_STAT_SNR = 0x24
CMD_BLINK = 0x30
CMD_RANDOM = 0x40
CMD_FW_VERSION = 0x50
@ -170,9 +171,9 @@ class RNode():
byte = ord(self.serial.read(1))
last_read_ms = int(time.time()*1000)
if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA):
if (in_frame and byte == KISS.FEND and command == KISS.CMD_ROM_READ):
self.eeprom = data_buffer
in_frame = False
self.processIncoming(data_buffer)
data_buffer = b""
command_buffer = b""
elif (byte == KISS.FEND):
@ -183,6 +184,17 @@ class RNode():
elif (in_frame and len(data_buffer) < 512):
if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN):
command = byte
elif (command == KISS.CMD_ROM_READ):
if (byte == KISS.FESC):
escape = True
else:
if (escape):
if (byte == KISS.TFEND):
byte = KISS.FEND
if (byte == KISS.TFESC):
byte = KISS.FESC
escape = False
data_buffer = data_buffer+bytes([byte])
elif (command == KISS.CMD_DATA):
if (byte == KISS.FESC):
escape = True
@ -236,7 +248,7 @@ class RNode():
if (byte == KISS.TFESC):
byte = KISS.FESC
escape = False
command_buffer = command_buffer+byte
command_buffer = command_buffer+bytes([byte])
if (len(command_buffer) == 2):
self.major_version = command_buffer[0]
self.minor_version = command_buffer[1]
@ -297,6 +309,11 @@ class RNode():
RNS.log(str(self)+" hardware TX error (code "+RNS.hexrep(byte)+")")
else:
RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+")")
elif (command == KISS.CMD_DETECT):
if byte == KISS.DETECT_RESP:
self.detected = True
else:
self.detected = False
else:
time_since_last = int(time.time()*1000) - last_read_ms
@ -309,13 +326,14 @@ class RNode():
sleep(0.08)
except Exception as e:
raise e
exit()
def updateBitrate(self):
try:
self.bitrate = self.r_sf * ( (4.0/self.cr) / (math.pow(2,self.r_sf)/(self.r_bandwidth/1000)) ) * 1000
self.bitrate = self.r_sf * ( (4.0/self.r_cr) / (math.pow(2,self.r_sf)/(self.r_bandwidth/1000)) ) * 1000
self.bitrate_kbps = round(self.bitrate/1000.0, 2)
except:
except Exception as e:
self.bitrate = 0
def updateVersion(self):
@ -409,7 +427,9 @@ class RNode():
sleep(13);
def write_eeprom(self, addr, byte):
kiss_command = bytes([KISS.FEND, KISS.CMD_ROM_WRITE, KISS.escape(addr), KISS.escape(byte), KISS.FEND])
write_payload = b"" + bytes([addr, byte])
write_payload = KISS.escape(write_payload)
kiss_command = bytes([KISS.FEND, KISS.CMD_ROM_WRITE]) + write_payload + bytes([KISS.FEND])
written = self.serial.write(kiss_command)
if written != len(kiss_command):
raise IOError("An IO error occurred while writing EEPROM")
@ -429,28 +449,30 @@ class RNode():
self.parse_eeprom()
def parse_eeprom(self):
if self.eeprom[ord(ROM.ADDR_INFO_LOCK)] == ROM.INFO_LOCK_BYTE:
if self.eeprom[ROM.ADDR_INFO_LOCK] == ROM.INFO_LOCK_BYTE:
# TODO: Remove
RNS.log("Provisioned device")
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
self.provisioned = True
self.product = self.eeprom[ord(ROM.ADDR_PRODUCT)]
self.model = self.eeprom[ord(ROM.ADDR_MODEL)]
self.hw_rev = self.eeprom[ord(ROM.ADDR_HW_REV)]
self.serialno = "" + self.eeprom[ord(ROM.ADDR_SERIAL)] + self.eeprom[ord(ROM.ADDR_SERIAL)+1] + self.eeprom[ord(ROM.ADDR_SERIAL)+2] + self.eeprom[ord(ROM.ADDR_SERIAL)+3]
self.made = "" + self.eeprom[ord(ROM.ADDR_MADE)] + self.eeprom[ord(ROM.ADDR_MADE)+1] + self.eeprom[ord(ROM.ADDR_MADE)+2] + self.eeprom[ord(ROM.ADDR_MADE)+3]
self.checksum = ""
self.product = self.eeprom[ROM.ADDR_PRODUCT]
self.model = self.eeprom[ROM.ADDR_MODEL]
self.hw_rev = self.eeprom[ROM.ADDR_HW_REV]
self.serialno = bytes([self.eeprom[ROM.ADDR_SERIAL], self.eeprom[ROM.ADDR_SERIAL+1], self.eeprom[ROM.ADDR_SERIAL+2], self.eeprom[ROM.ADDR_SERIAL+3]])
self.made = bytes([self.eeprom[ROM.ADDR_MADE], self.eeprom[ROM.ADDR_MADE+1], self.eeprom[ROM.ADDR_MADE+2], self.eeprom[ROM.ADDR_MADE+3]])
self.checksum = b""
self.min_freq = ranges[ord(self.model)][0]
self.max_freq = ranges[ord(self.model)][1]
self.max_output = ranges[ord(self.model)][2]
self.min_freq = ranges[self.model][0]
self.max_freq = ranges[self.model][1]
self.max_output = ranges[self.model][2]
try:
self.min_freq = ranges[ord(self.model)][0]
self.max_freq = ranges[ord(self.model)][1]
self.max_output = ranges[ord(self.model)][2]
self.min_freq = ranges[self.model][0]
self.max_freq = ranges[self.model][1]
self.max_output = ranges[self.model][2]
except Exception as e:
RNS.log("Exception")
RNS.log(str(e))
@ -459,20 +481,17 @@ class RNode():
self.max_output = 0
for i in range(0,16):
self.checksum = self.checksum+self.eeprom[ord(ROM.ADDR_CHKSUM)+i]
self.checksum = self.checksum+bytes([self.eeprom[ROM.ADDR_CHKSUM+i]])
self.signature = ""
self.signature = b""
for i in range(0,128):
self.signature = self.signature+self.eeprom[ord(ROM.ADDR_SIGNATURE)+i]
self.signature = self.signature+bytes([self.eeprom[ROM.ADDR_SIGNATURE+i]])
checksummed_info = self.product+self.model+self.hw_rev+self.serialno+self.made
checksummed_info = b"" + bytes([self.product]) + bytes([self.model]) + bytes([self.hw_rev]) + self.serialno + self.made
digest = hashes.Hash(hashes.MD5(), backend=default_backend())
digest.update(checksummed_info)
checksum = digest.finalize()
#RNS.log("EEPROM checksum: "+RNS.hexrep(self.checksum))
#RNS.log("Calculated checksum: "+RNS.hexrep(checksum))
if self.checksum != checksum:
self.provisioned = False
RNS.log("EEPROM checksum mismatch")
@ -485,7 +504,7 @@ class RNode():
for known in known_keys:
vendor = known[0]
public_hexrep = known[1]
public_bytes = public_hexrep.decode("hex")
public_bytes = bytes.fromhex(public_hexrep)
public_key = load_der_public_key(public_bytes, backend=default_backend())
try:
public_key.verify(
@ -502,13 +521,13 @@ class RNode():
except Exception as e:
RNS.log("Board signature validation failed")
if self.eeprom[ord(ROM.ADDR_CONF_OK)] == ROM.CONF_OK_BYTE:
if self.eeprom[ROM.ADDR_CONF_OK] == ROM.CONF_OK_BYTE:
self.configured = True
self.conf_sf = ord(self.eeprom[ord(ROM.ADDR_CONF_SF)])
self.conf_cr = ord(self.eeprom[ord(ROM.ADDR_CONF_CR)])
self.conf_txpower = ord(self.eeprom[ord(ROM.ADDR_CONF_TXP)])
self.conf_frequency = ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)]) << 24 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+1]) << 16 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+2]) << 8 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+3])
self.conf_bandwidth = ord(self.eeprom[ord(ROM.ADDR_CONF_BW)]) << 24 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+1]) << 16 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+2]) << 8 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+3])
self.conf_sf = self.eeprom[ROM.ADDR_CONF_SF]
self.conf_cr = self.eeprom[ROM.ADDR_CONF_CR]
self.conf_txpower = self.eeprom[ROM.ADDR_CONF_TXP]
self.conf_frequency = self.eeprom[ROM.ADDR_CONF_FREQ] << 24 | self.eeprom[ROM.ADDR_CONF_FREQ+1] << 16 | self.eeprom[ROM.ADDR_CONF_FREQ+2] << 8 | self.eeprom[ROM.ADDR_CONF_FREQ+3]
self.conf_bandwidth = self.eeprom[ROM.ADDR_CONF_BW] << 24 | self.eeprom[ROM.ADDR_CONF_BW+1] << 16 | self.eeprom[ROM.ADDR_CONF_BW+2] << 8 | self.eeprom[ROM.ADDR_CONF_BW+3]
else:
self.configured = False
else:
@ -531,20 +550,22 @@ def config_interface():
if __name__ == "__main__":
try:
imp.find_module("serial")
if not util.find_spec("serial"):
raise ImportError("Serial module could not be found")
except ImportError:
print("")
print("RNode Config Utility needs pyserial to work.")
print("You can install it with: pip install pyserial")
print("You can install it with: pip3 install pyserial")
print("")
exit()
try:
imp.find_module("cryptography")
if not util.find_spec("cryptography"):
raise ImportError("Cryptography module could not be found")
except ImportError:
print("")
print("RNode Config Utility needs the cryptography module to work.")
print("You can install it with: pip install cryptography")
print("You can install it with: pip3 install cryptography")
print("")
exit()
@ -565,8 +586,8 @@ if __name__ == "__main__":
parser.add_argument("--freq", action="store", metavar="Hz", type=int, default=None, help="Frequency in Hz for TNC mode")
parser.add_argument("--bw", action="store", metavar="Hz", type=int, default=None, help="Bandwidth in Hz for TNC mode")
parser.add_argument("--txp", action="store", metavar="dBm", type=int, default=None, help="TX power in dBm for TNC mode")
parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor for TNC mode")
parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate for TNC mode")
parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor for TNC mode (7 - 12)")
parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate for TNC mode (5 - 8)")
parser.add_argument("--model", action="store", metavar="model", type=str, default=None, help="Model code for EEPROM bootstrap")
parser.add_argument("--hwrev", action="store", metavar="revision", type=int, default=None, help="Hardware revision EEPROM bootstrap")
parser.add_argument("--nocheck", action="store_true", help="Don't check for firmware updates online")
@ -587,7 +608,7 @@ if __name__ == "__main__":
if args.public:
private_bytes = None
try:
file = open("./firmware/signing.key", "r")
file = open("./firmware/signing.key", "rb")
private_bytes = file.read()
file.close()
except Exception as e:
@ -634,7 +655,7 @@ if __name__ == "__main__":
RNS.log("Signing key already exists, not overwriting!")
RNS.log("Manually delete this key to create a new one.")
else:
file = open("./firmware/signing.key", "w")
file = open("./firmware/signing.key", "wb")
file.write(private_bytes)
file.close()
@ -651,7 +672,7 @@ if __name__ == "__main__":
if not args.nocheck:
try:
RNS.log("Downloading latest firmware from GitHub...")
urllib.urlretrieve(firmware_update_url, "update/rnode_update.hex")
urlretrieve(firmware_update_url, "update/rnode_update.hex")
RNS.log("Firmware download completed")
except Exception as e:
RNS.log("Could not download firmware update")
@ -742,7 +763,7 @@ if __name__ == "__main__":
timestamp = time.time()
filename = str(time.strftime("%Y-%m-%d_%H-%M-%S"))
path = "./eeprom/"+filename+".eeprom"
file = open(path, "w")
file = open(path, "wb")
file.write(rnode.eeprom)
file.close()
RNS.log("EEPROM backup written to: "+path)
@ -762,9 +783,9 @@ if __name__ == "__main__":
RNS.log("")
RNS.log("Board info:")
RNS.log("\tFirmware version:\t"+rnode.version)
RNS.log("\tProduct code:\t\t"+RNS.hexrep(rnode.product))
RNS.log("\tModel code:\t\t"+RNS.hexrep(rnode.model))
RNS.log("\tHardware revision:\t"+RNS.hexrep(rnode.hw_rev))
RNS.log("\tProduct code:\t\t"+bytes([rnode.product]).hex())
RNS.log("\tModel code:\t\t"+bytes([rnode.model]).hex())
RNS.log("\tHardware revision:\t"+bytes([rnode.hw_rev]).hex())
RNS.log("\tSerial number:\t\t"+RNS.hexrep(rnode.serialno))
RNS.log("\tFrequency range:\t"+str(rnode.min_freq/1e6)+" MHz - "+str(rnode.max_freq/1e6)+" MHz")
RNS.log("\tMax TX power:\t\t"+str(rnode.max_output)+" dBm")
@ -772,19 +793,22 @@ if __name__ == "__main__":
if rnode.configured:
rnode.bandwidth = rnode.conf_bandwidth
rnode.r_bandwidth = rnode.conf_bandwidth
rnode.sf = rnode.conf_sf
rnode.r_sf = rnode.conf_sf
rnode.cr = rnode.conf_cr
rnode.r_cr = rnode.conf_cr
rnode.updateBitrate()
txp_mw = round(pow(10, (rnode.conf_txpower/10)), 3)
RNS.log("\tDevice signature:\t"+sigstring)
RNS.log("");
RNS.log("\tDevice mode:\t\tTNC")
RNS.log("\t Frequency:\t\t"+str((rnode.conf_frequency/1000000.0))+" MHz")
RNS.log("\t Bandwidth:\t\t"+str(rnode.conf_bandwidth/1000.0)+" KHz")
RNS.log("\t TX power:\t\t"+str(rnode.conf_txpower)+" dBm")
RNS.log("\t TX power:\t\t"+str(rnode.conf_txpower)+" dBm ("+str(txp_mw)+" mW)")
RNS.log("\t Spreading factor:\t"+str(rnode.conf_sf))
RNS.log("\t Coding rate:\t\t"+str(rnode.conf_cr))
RNS.log("\t On-air bitrate:\t"+str(rnode.bitrate_kbps)+" kbps")
else:
RNS.log("\tDevice mode:\t\tNormal (host-controlled)")
RNS.log("\tDevice signature:\t"+sigstring)
@ -839,7 +863,9 @@ if __name__ == "__main__":
file.write(str(serialno))
file.close()
info_chunk = ROM.PRODUCT_RNODE+model+hwrev+serial_bytes+time_bytes
info_chunk = b"" + bytes([ROM.PRODUCT_RNODE, model, ord(hwrev)])
info_chunk += serial_bytes
info_chunk += time_bytes
digest = hashes.Hash(hashes.MD5(), backend=default_backend())
digest.update(info_chunk)
checksum = digest.finalize()
@ -849,7 +875,7 @@ if __name__ == "__main__":
key_path = "./firmware/signing.key"
if os.path.isfile(key_path):
try:
file = open(key_path, "r")
file = open(key_path, "rb")
private_bytes = file.read()
file.close()
private_key = serialization.load_der_private_key(
@ -884,31 +910,31 @@ if __name__ == "__main__":
time.sleep(0.006)
rnode.write_eeprom(ROM.ADDR_MODEL, model)
time.sleep(0.006)
rnode.write_eeprom(ROM.ADDR_HW_REV, hwrev)
rnode.write_eeprom(ROM.ADDR_HW_REV, ord(hwrev))
time.sleep(0.006)
rnode.write_eeprom(ROM.ADDR_SERIAL, serial_bytes[0])
time.sleep(0.006)
rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+1), serial_bytes[1])
rnode.write_eeprom(ROM.ADDR_SERIAL+1, serial_bytes[1])
time.sleep(0.006)
rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+2), serial_bytes[2])
rnode.write_eeprom(ROM.ADDR_SERIAL+2, serial_bytes[2])
time.sleep(0.006)
rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+3), serial_bytes[3])
rnode.write_eeprom(ROM.ADDR_SERIAL+3, serial_bytes[3])
time.sleep(0.006)
rnode.write_eeprom(ROM.ADDR_MADE, time_bytes[0])
time.sleep(0.006)
rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+1), time_bytes[1])
rnode.write_eeprom(ROM.ADDR_MADE+1, time_bytes[1])
time.sleep(0.006)
rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+2), time_bytes[2])
rnode.write_eeprom(ROM.ADDR_MADE+2, time_bytes[2])
time.sleep(0.006)
rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+3), time_bytes[3])
rnode.write_eeprom(ROM.ADDR_MADE+3, time_bytes[3])
time.sleep(0.006)
for i in range(0,16):
rnode.write_eeprom(chr(ord(ROM.ADDR_CHKSUM)+i), checksum[i])
rnode.write_eeprom(ROM.ADDR_CHKSUM+i, checksum[i])
time.sleep(0.006)
for i in range(0,128):
rnode.write_eeprom(chr(ord(ROM.ADDR_SIGNATURE)+i), signature[i])
rnode.write_eeprom(ROM.ADDR_SIGNATURE+i, signature[i])
time.sleep(0.006)
rnode.write_eeprom(ROM.ADDR_INFO_LOCK, ROM.INFO_LOCK_BYTE)
@ -918,8 +944,9 @@ if __name__ == "__main__":
if rnode.provisioned:
RNS.log("EEPROM Bootstrapping successful!")
try:
file = open("./firmware/device_db/"+serial_bytes.encode("hex"), "w")
file.write(rnode.eeprom)
file = open("./firmware/device_db/"+serial_bytes.hex(), "wb")
written = file.write(rnode.eeprom)
file.close()
except Exception as e:
RNS.log("WARNING: Could not backup device EEPROM to disk")
exit()
@ -927,8 +954,9 @@ if __name__ == "__main__":
RNS.log("EEPROM was written, but validation failed. Check your settings.")
exit()
except Exception as e:
RNS.log("An error occured while writing EEPROM. The contained exception was:")
RNS.log("An error occurred while writing EEPROM. The contained exception was:")
RNS.log(str(e))
raise e
else:
RNS.log("Invalid data specified, cancelling EEPROM write")