#!/usr/bin/python from time import sleep import argparse import threading import os.path import struct import datetime import time import math import imp rnode = None rnode_serial = None rnode_baudrate = 115200 known_keys = [["unsigned.io", "30819f300d06092a864886f70d010101050003818d0030818902818100e5d46084e445595376bf7efd9c6ccf19d39abbc59afdb763207e4ff68b8d00ebffb63847aa2fe6dd10783d3ea63b55ac66f71ad885c20e223709f0d51ed5c6c0d0b093be9e1d165bb8a483a548b67a3f7a1e4580f50e75b306593fa6067ae259d3e297717bd7ff8c8f5b07f2bed89929a9a0321026cf3699524db98e2d18fb2d020300ff39"]] class RNS(): @staticmethod def log(msg): logtimefmt = "%Y-%m-%d %H:%M:%S" timestamp = time.time() logstring = "["+time.strftime(logtimefmt)+"] "+msg print(logstring) @staticmethod def hexrep(data, delimit=True): delimiter = ":" if not delimit: delimiter = "" hexrep = delimiter.join("{:02x}".format(ord(c)) for c in data) return hexrep @staticmethod def prettyhexrep(data): delimiter = "" hexrep = "<"+delimiter.join("{:02x}".format(ord(c)) for c in data)+">" return hexrep class KISS(): FEND = chr(0xC0) FESC = chr(0xDB) TFEND = chr(0xDC) TFESC = chr(0xDD) CMD_UNKNOWN = chr(0xFE) CMD_DATA = chr(0x00) CMD_FREQUENCY = chr(0x01) CMD_BANDWIDTH = chr(0x02) CMD_TXPOWER = chr(0x03) CMD_SF = chr(0x04) CMD_CR = chr(0x05) CMD_RADIO_STATE = chr(0x06) CMD_RADIO_LOCK = chr(0x07) CMD_DETECT = chr(0x08) CMD_READY = chr(0x0F) CMD_STAT_RX = chr(0x21) CMD_STAT_TX = chr(0x22) CMD_STAT_RSSI = chr(0x23) CMD_BLINK = chr(0x30) CMD_RANDOM = chr(0x40) CMD_FW_VERSION = chr(0x50) CMD_ROM_READ = chr(0x51) CMD_ROM_WRITE = chr(0x52) CMD_CONF_SAVE = chr(0x53) CMD_CONF_DELETE = chr(0x54) DETECT_REQ = chr(0x73) DETECT_RESP = chr(0x46) RADIO_STATE_OFF = chr(0x00) RADIO_STATE_ON = chr(0x01) RADIO_STATE_ASK = chr(0xFF) CMD_ERROR = chr(0x90) ERROR_INITRADIO = chr(0x01) ERROR_TXFAILED = chr(0x02) ERROR_EEPROM_LOCKED = chr(0x03) @staticmethod def escape(data): data = data.replace(chr(0xdb), chr(0xdb)+chr(0xdd)) data = data.replace(chr(0xc0), chr(0xdb)+chr(0xdc)) return data class ROM(): PRODUCT_RNODE = chr(0x03) MODEL_A4 = chr(0xA4) MODEL_A9 = chr(0xA9) ADDR_PRODUCT = chr(0x00) ADDR_MODEL = chr(0x01) ADDR_HW_REV = chr(0x02) ADDR_SERIAL = chr(0x03) ADDR_MADE = chr(0x07) ADDR_CHKSUM = chr(0x0B) ADDR_SIGNATURE = chr(0x1B) ADDR_INFO_LOCK = chr(0x9B) ADDR_CONF_SF = chr(0x9C) ADDR_CONF_CR = chr(0x9D) ADDR_CONF_TXP = chr(0x9E) ADDR_CONF_BW = chr(0x9F) ADDR_CONF_FREQ = chr(0xA3) ADDR_CONF_OK = chr(0xA7) INFO_LOCK_BYTE = chr(0x73) CONF_OK_BYTE = chr(0x73) class RNode(): def __init__(self, serial_instance): self.serial = serial_instance self.timeout = 100 self.r_frequency = None self.r_bandwidth = None self.r_txpower = None self.r_sf = None self.r_state = None self.r_lock = None self.sf = None self.cr = None self.txpower = None self.frequency = None self.bandwidth = None self.detected = None self.eeprom = None self.major_version = None self.minor_version = None self.version = None self.provisioned = None self.product = None self.model = None self.hw_rev = None self.made = None self.serialno = None self.checksum = None self.signature = None self.signature_valid = False self.vendor = None self.configured = None self.conf_sf = None self.conf_cr = None self.conf_txpower = None self.conf_frequency = None self.conf_bandwidth = None def readLoop(self): try: in_frame = False escape = False command = KISS.CMD_UNKNOWN data_buffer = "" command_buffer = "" last_read_ms = int(time.time()*1000) while rnode_serial.is_open: if rnode_serial.in_waiting: byte = rnode_serial.read(1) last_read_ms = int(time.time()*1000) if (in_frame and byte == KISS.FEND and command == KISS.CMD_ROM_READ): self.eeprom = data_buffer #RNS.log("Read "+str(len(self.eeprom))+" bytes of EEPROM from device") #RNS.log(RNS.hexrep(self.eeprom)) in_frame = False data_buffer = "" command_buffer = "" elif (byte == KISS.FEND): in_frame = True command = KISS.CMD_UNKNOWN data_buffer = "" command_buffer = "" elif (in_frame and len(data_buffer) < 512): if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN): command = byte elif (command == KISS.CMD_ROM_READ): if (byte == KISS.FESC): escape = True else: if (escape): if (byte == KISS.TFEND): byte = KISS.FEND if (byte == KISS.TFESC): byte = KISS.FESC escape = False data_buffer = data_buffer+byte elif (command == KISS.CMD_FREQUENCY): if (byte == KISS.FESC): escape = True else: if (escape): if (byte == KISS.TFEND): byte = KISS.FEND if (byte == KISS.TFESC): byte = KISS.FESC escape = False command_buffer = command_buffer+byte if (len(command_buffer) == 4): self.r_frequency = ord(command_buffer[0]) << 24 | ord(command_buffer[1]) << 16 | ord(command_buffer[2]) << 8 | ord(command_buffer[3]) RNS.log("Radio reporting frequency is "+str(self.r_frequency/1000000.0)+" MHz") self.updateBitrate() elif (command == KISS.CMD_BANDWIDTH): if (byte == KISS.FESC): escape = True else: if (escape): if (byte == KISS.TFEND): byte = KISS.FEND if (byte == KISS.TFESC): byte = KISS.FESC escape = False command_buffer = command_buffer+byte if (len(command_buffer) == 4): self.r_bandwidth = ord(command_buffer[0]) << 24 | ord(command_buffer[1]) << 16 | ord(command_buffer[2]) << 8 | ord(command_buffer[3]) RNS.log("Radio reporting bandwidth is "+str(self.r_bandwidth/1000.0)+" KHz") self.updateBitrate() elif (command == KISS.CMD_FW_VERSION): if (byte == KISS.FESC): escape = True else: if (escape): if (byte == KISS.TFEND): byte = KISS.FEND if (byte == KISS.TFESC): byte = KISS.FESC escape = False command_buffer = command_buffer+byte if (len(command_buffer) == 2): self.major_version = ord(command_buffer[0]) self.minor_version = ord(command_buffer[1]) self.updateVersion() elif (command == KISS.CMD_TXPOWER): self.r_txpower = ord(byte) RNS.log("Radio reporting TX power is "+str(self.r_txpower)+" dBm") elif (command == KISS.CMD_SF): self.r_sf = ord(byte) RNS.log("Radio reporting spreading factor is "+str(self.r_sf)) self.updateBitrate() elif (command == KISS.CMD_CR): self.r_cr = ord(byte) RNS.log("Radio reporting coding rate is "+str(self.r_cr)) self.updateBitrate() elif (command == KISS.CMD_RADIO_STATE): self.r_state = ord(byte) elif (command == KISS.CMD_RADIO_LOCK): self.r_lock = ord(byte) elif (command == KISS.CMD_ERROR): if (byte == KISS.ERROR_INITRADIO): RNS.log(str(self)+" hardware initialisation error (code "+RNS.hexrep(byte)+")") elif (byte == KISS.ERROR_INITRADIO): RNS.log(str(self)+" hardware TX error (code "+RNS.hexrep(byte)+")") else: RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+")") elif (command == KISS.CMD_DETECT): if byte == KISS.DETECT_RESP: self.detected = True else: self.detected = False else: time_since_last = int(time.time()*1000) - last_read_ms if len(data_buffer) > 0 and time_since_last > self.timeout: RNS.log(str(self)+" serial read timeout") data_buffer = "" in_frame = False command = KISS.CMD_UNKNOWN escape = False sleep(0.08) except Exception as e: exit() def updateBitrate(self): try: self.bitrate = self.sf * ( (4.0/self.cr) / (math.pow(2,self.sf)/(self.bandwidth/1000)) ) * 1000 self.bitrate_kbps = round(self.bitrate/1000.0, 2) except: self.bitrate = 0 def updateVersion(self): minstr = str(self.minor_version) if len(minstr) == 1: minstr = "0"+minstr self.version = str(self.major_version)+"."+minstr def detect(self): kiss_command = KISS.FEND+KISS.CMD_DETECT+KISS.DETECT_REQ+KISS.FEND+KISS.CMD_FW_VERSION+chr(0x00)+KISS.FEND written = rnode_serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring spreading factor for "+self(str)) def initRadio(self): self.setFrequency() self.setBandwidth() self.setTXPower() self.setSpreadingFactor() self.setCodingRate() self.setRadioState(KISS.RADIO_STATE_ON) def setFrequency(self): c1 = self.frequency >> 24 c2 = self.frequency >> 16 & 0xFF c3 = self.frequency >> 8 & 0xFF c4 = self.frequency & 0xFF data = KISS.escape(chr(c1)+chr(c2)+chr(c3)+chr(c4)) kiss_command = KISS.FEND+KISS.CMD_FREQUENCY+data+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring frequency for "+self(str)) def setBandwidth(self): c1 = self.bandwidth >> 24 c2 = self.bandwidth >> 16 & 0xFF c3 = self.bandwidth >> 8 & 0xFF c4 = self.bandwidth & 0xFF data = KISS.escape(chr(c1)+chr(c2)+chr(c3)+chr(c4)) kiss_command = KISS.FEND+KISS.CMD_BANDWIDTH+data+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring bandwidth") def setTXPower(self): txp = chr(self.txpower) kiss_command = KISS.FEND+KISS.CMD_TXPOWER+txp+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring TX power") def setSpreadingFactor(self): sf = chr(self.sf) kiss_command = KISS.FEND+KISS.CMD_SF+sf+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring spreading factor") def setCodingRate(self): cr = chr(self.cr) kiss_command = KISS.FEND+KISS.CMD_CR+cr+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring coding rate") def setRadioState(self, state): kiss_command = KISS.FEND+KISS.CMD_RADIO_STATE+state+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring radio state") def setNormalMode(self): kiss_command = KISS.FEND+KISS.CMD_CONF_DELETE+chr(0x00)+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring device mode") def setTNCMode(self): kiss_command = KISS.FEND+KISS.CMD_CONF_SAVE+chr(0x00)+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring device mode") def write_eeprom(self, addr, byte): kiss_command = KISS.FEND+KISS.CMD_ROM_WRITE+KISS.escape(addr)+KISS.escape(byte)+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while writing EEPROM") def download_eeprom(self): kiss_command = KISS.FEND+KISS.CMD_ROM_READ+chr(0x00)+KISS.FEND written = self.serial.write(kiss_command) if written != len(kiss_command): raise IOError("An IO error occurred while configuring radio state") sleep(0.2) if self.eeprom == None: RNS.log("Could not download EEPROM from device. Is a valid firmware installed?") exit() else: self.parse_eeprom() def parse_eeprom(self): if self.eeprom[ord(ROM.ADDR_INFO_LOCK)] == ROM.INFO_LOCK_BYTE: from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend self.provisioned = True self.product = self.eeprom[ord(ROM.ADDR_PRODUCT)] self.model = self.eeprom[ord(ROM.ADDR_MODEL)] self.hw_rev = self.eeprom[ord(ROM.ADDR_HW_REV)] self.serialno = "" + self.eeprom[ord(ROM.ADDR_SERIAL)] + self.eeprom[ord(ROM.ADDR_SERIAL)+1] + self.eeprom[ord(ROM.ADDR_SERIAL)+2] + self.eeprom[ord(ROM.ADDR_SERIAL)+3] self.made = "" + self.eeprom[ord(ROM.ADDR_MADE)] + self.eeprom[ord(ROM.ADDR_MADE)+1] + self.eeprom[ord(ROM.ADDR_MADE)+2] + self.eeprom[ord(ROM.ADDR_MADE)+3] self.checksum = "" for i in range(0,16): self.checksum = self.checksum+self.eeprom[ord(ROM.ADDR_CHKSUM)+i] self.signature = "" for i in range(0,128): self.signature = self.signature+self.eeprom[ord(ROM.ADDR_SIGNATURE)+i] checksummed_info = self.product+self.model+self.hw_rev+self.serialno+self.made digest = hashes.Hash(hashes.MD5(), backend=default_backend()) digest.update(checksummed_info) checksum = digest.finalize() #RNS.log("EEPROM checksum: "+RNS.hexrep(self.checksum)) #RNS.log("Calculated checksum: "+RNS.hexrep(checksum)) if self.checksum != checksum: self.provisioned = False RNS.log("EEPROM checksum mismatch") exit() else: RNS.log("EEPROM checksum correct") from cryptography.hazmat.primitives.serialization import load_der_public_key from cryptography.hazmat.primitives.serialization import load_der_private_key from cryptography.hazmat.primitives.asymmetric import padding for known in known_keys: vendor = known[0] public_hexrep = known[1] public_bytes = public_hexrep.decode("hex") public_key = load_der_public_key(public_bytes, backend=default_backend()) try: public_key.verify( self.signature, self.checksum, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256()) RNS.log("Board signature validated") self.signature_valid = True self.vendor = vendor except Exception as e: RNS.log("Board signature validation failed") if self.eeprom[ord(ROM.ADDR_CONF_OK)] == ROM.CONF_OK_BYTE: self.configured = True self.conf_sf = ord(self.eeprom[ord(ROM.ADDR_CONF_SF)]) self.conf_cr = ord(self.eeprom[ord(ROM.ADDR_CONF_CR)]) self.conf_txpower = ord(self.eeprom[ord(ROM.ADDR_CONF_TXP)]) self.conf_frequency = ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)]) << 24 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+1]) << 16 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+2]) << 8 | ord(self.eeprom[ord(ROM.ADDR_CONF_FREQ)+3]) self.conf_bandwidth = ord(self.eeprom[ord(ROM.ADDR_CONF_BW)]) << 24 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+1]) << 16 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+2]) << 8 | ord(self.eeprom[ord(ROM.ADDR_CONF_BW)+3]) else: self.configured = False else: self.provisioned = False # TODO: remove this def test(self): self.frequency = 868000000 self.bandwidth = 125000 self.txpower = 2 self.sf = 7 self.cr = 5 self.initRadio() self.setRadioState(KISS.RADIO_STATE_OFF) def device_probe(): sleep(2.5) rnode.detect() sleep(0.1) if rnode.detected == True: RNS.log("Device connected") RNS.log("Firmware version: "+rnode.version) return True else: raise IOError("Got invalid response while detecting device") def config_interface(): pass if __name__ == "__main__": try: imp.find_module("serial") except ImportError: print("") print("RNode Config Utility needs pyserial to work.") print("You can install it with: pip install pyserial") print("") exit() try: imp.find_module("cryptography") except ImportError: print("") print("RNode Config Utility needs the cryptography module to work.") print("You can install it with: pip install cryptography") print("") exit() import serial try: parser = argparse.ArgumentParser(description="RNode Configuration and firmware utility. This program allows you to change various settings and startup modes of RNode. It can also flash and update the firmware, and manage device EEPROM.") parser.add_argument("-i", "--info", action="store_true", help="Show device info") parser.add_argument("-T", "--tnc", action="store_true", help="Switch device to TNC mode") parser.add_argument("-N", "--normal", action="store_true", help="Switch device to normal mode") parser.add_argument("-b", "--backup", action="store_true", help="Backup EEPROM to file") parser.add_argument("-d", "--dump", action="store_true", help="Dump EEPROM to console") parser.add_argument("-f", "--flash", action="store_true", help="Flash firmware and bootstrap EEPROM") parser.add_argument("-r", "--rom", action="store_true", help="Bootstrap EEPROM without flashing firmware") parser.add_argument("-u", "--update", action="store_true", help="Update firmware") parser.add_argument("-k", "--key", action="store_true", help="Generate a new signing key and exit") parser.add_argument("-p", "--public", action="store_true", help="Display public part of signing key") parser.add_argument("--model", action="store", metavar="model", type=str, default=None, help="Model code for EEPROM bootstrap") parser.add_argument("--hwrev", action="store", metavar="revision", type=int, default=None, help="Hardware revision EEPROM bootstrap") parser.add_argument("--freq", action="store", metavar="Hz", type=int, default=None, help="Frequency in Hz for TNC mode") parser.add_argument("--bw", action="store", metavar="Hz", type=int, default=None, help="Bandwidth in Hz for TNC mode") parser.add_argument("--txp", action="store", metavar="dBm", type=int, default=None, help="TX power in dBm for TNC mode") parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor for TNC mode") parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate for TNC mode") parser.add_argument("port", nargs="?", default=None, help="serial port where RNode is attached", type=str) args = parser.parse_args() if args.public or args.key or args.flash or args.rom: from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.serialization import load_der_public_key from cryptography.hazmat.primitives.serialization import load_der_private_key from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import padding if args.public: private_bytes = None try: file = open("./firmware/signing.key", "r") private_bytes = file.read() file.close() except Exception as e: RNS.log("Could not load signing key") try: private_key = serialization.load_der_private_key( private_bytes, password=None, backend=default_backend() ) public_key = private_key.public_key() public_bytes = public_key.public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) RNS.log("Public key:") RNS.log(RNS.hexrep(public_bytes, delimit=False)) except Exception as e: RNS.log("Could not deserialize signing key") RNS.log(str(e)) exit() if args.key: RNS.log("Generating a new signing key...") private_key = rsa.generate_private_key( public_exponent=65337, key_size=1024, backend=default_backend() ) private_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) public_key = private_key.public_key() public_bytes = public_key.public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) if os.path.isdir("./firmware"): if os.path.isfile("./firmware/signing.key"): RNS.log("Signing key already exists, not overwriting!") RNS.log("Manually delete this key to create a new one.") else: file = open("./firmware/signing.key", "w") file.write(private_bytes) file.close() RNS.log("Wrote signing key") RNS.log("Public key:") RNS.log(RNS.hexrep(public_bytes, delimit=False)) else: RNS.log("The firmware directory does not exist, can't write key!") exit() if args.port: if args.flash: if os.path.isfile("./firmware/firmware.hex"): try: RNS.log("Flashing RNode firmware to device on "+args.port) from subprocess import call flash_status = call(["avrdude", "-P", args.port, "-p", "m1284p", "-c", "arduino", "-b", "115200", "-U", "flash:w:firmware/firmware.hex"]) if flash_status == 0: RNS.log("Done flashing") args.rom = True else: exit() except Exception as e: RNS.log("Error while flashing") RNS.log(str(e)) else: RNS.log("Firmware file not found") exit() RNS.log("Opening serial port "+args.port+"...") try: rnode_serial = serial.Serial( port = args.port, baudrate = rnode_baudrate, bytesize = 8, parity = serial.PARITY_NONE, stopbits = 1, xonxoff = False, rtscts = False, timeout = 0, inter_byte_timeout = None, write_timeout = None, dsrdtr = False ) except Exception as e: RNS.log("Could not open the specified serial port. The contained exception was:") RNS.log(str(e)) exit() rnode = RNode(rnode_serial) thread = threading.Thread(target=rnode.readLoop) thread.setDaemon(True) thread.start() try: device_probe() except Exception as e: RNS.log("Serial port opened, but RNode did not respond. Is a valid firmware installed?") print(e) exit() RNS.log("Reading EEPROM...") rnode.download_eeprom() if args.dump: RNS.log("EEPROM contents:") RNS.log(RNS.hexrep(rnode.eeprom)) exit() if args.backup: try: timestamp = time.time() filename = str(time.strftime("%Y-%m-%d_%H-%M-%S")) path = "./eeprom/"+filename+".eeprom" file = open(path, "w") file.write(rnode.eeprom) file.close() RNS.log("EEPROM backup written to: "+path) except Exception as e: RNS.log("EEPROM was successfully downloaded from device,") RNS.log("but file could not be written to disk.") exit() if args.info: if rnode.provisioned: timestamp = struct.unpack(">I", rnode.made)[0] timestring = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") sigstring = "Unverified" if rnode.signature_valid: sigstring = "Genuine board, vendor is "+rnode.vendor RNS.log("") RNS.log("Board info:") RNS.log("\tFirmware version:\t"+rnode.version) RNS.log("\tProduct code:\t\t"+RNS.hexrep(rnode.product)) RNS.log("\tModel code:\t\t"+RNS.hexrep(rnode.model)) RNS.log("\tHardware revision:\t"+RNS.hexrep(rnode.hw_rev)) RNS.log("\tSerial number:\t\t"+RNS.hexrep(rnode.serialno)) RNS.log("\tManufactured:\t\t"+timestring) if rnode.configured: rnode.bandwidth = rnode.conf_bandwidth rnode.sf = rnode.conf_sf rnode.cr = rnode.conf_cr rnode.updateBitrate() RNS.log("\tDevice signature:\t"+sigstring) RNS.log(""); RNS.log("\tDevice mode:\t\tTNC") RNS.log("\t Frequency:\t\t"+str((rnode.conf_frequency/1000000.0))+" MHz") RNS.log("\t Bandwidth:\t\t"+str(rnode.conf_bandwidth/1000.0)+" KHz") RNS.log("\t TX power:\t\t"+str(rnode.conf_txpower)+" dBm") RNS.log("\t Spreading factor:\t"+str(rnode.conf_sf)) RNS.log("\t Coding rate:\t\t"+str(rnode.conf_cr)) RNS.log("\t On-air bitrate:\t"+str(rnode.bitrate_kbps)+" kbps") else: RNS.log("\tDevice mode:\t\tNormal (host-controlled)") RNS.log("\tDevice signature:\t"+sigstring) print("") exit() else: RNS.log("EEPROM is invalid, no further information available") exit() if args.rom: if rnode.provisioned: RNS.log("EEPROM bootstrap was requested, but a valid EEPROM was already present.") RNS.log("No changes are being made.") exit() else: counter = None counter_path = "./firmware/serial.counter" try: if os.path.isfile(counter_path): file = open(counter_path, "r") counter_str = file.read() counter = int(counter_str) file.close() else: counter = 0 except Exception as e: RNS.log("Could not create device serial number, exiting") RNS.log(str(e)) exit() serialno = counter+1 model = None hwrev = None if args.model == "a4": model = ROM.MODEL_A4 if args.model == "a9": model = ROM.MODEL_A9 if args.hwrev > 0 and args.hwrev < 256: hwrev = chr(args.hwrev) if serialno > 0 and model != None and hwrev != None: try: from cryptography.hazmat.primitives import hashes from cryptography.hazmat.backends import default_backend timestamp = int(time.time()) time_bytes = struct.pack(">I", timestamp) serial_bytes = struct.pack(">I", serialno) file = open(counter_path, "w") file.write(str(serialno)) file.close() info_chunk = ROM.PRODUCT_RNODE+model+hwrev+serial_bytes+time_bytes digest = hashes.Hash(hashes.MD5(), backend=default_backend()) digest.update(info_chunk) checksum = digest.finalize() RNS.log("Loading signing key...") signature = None key_path = "./firmware/signing.key" if os.path.isfile(key_path): try: file = open(key_path, "r") private_bytes = file.read() file.close() private_key = serialization.load_der_private_key( private_bytes, password=None, backend=default_backend() ) public_key = private_key.public_key() public_bytes = public_key.public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) signature = private_key.sign( checksum, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH ), hashes.SHA256() ) except Exception as e: RNS.log("Error while signing EEPROM") RNS.log(str(e)) else: RNS.log("No signing key found") exit() RNS.log("Bootstrapping device EEPROM...") rnode.write_eeprom(ROM.ADDR_PRODUCT, ROM.PRODUCT_RNODE) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_MODEL, model) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_HW_REV, hwrev) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_SERIAL, serial_bytes[0]) time.sleep(0.006) rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+1), serial_bytes[1]) time.sleep(0.006) rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+2), serial_bytes[2]) time.sleep(0.006) rnode.write_eeprom(chr(ord(ROM.ADDR_SERIAL)+3), serial_bytes[3]) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_MADE, time_bytes[0]) time.sleep(0.006) rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+1), time_bytes[1]) time.sleep(0.006) rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+2), time_bytes[2]) time.sleep(0.006) rnode.write_eeprom(chr(ord(ROM.ADDR_MADE)+3), time_bytes[3]) time.sleep(0.006) for i in range(0,16): rnode.write_eeprom(chr(ord(ROM.ADDR_CHKSUM)+i), checksum[i]) time.sleep(0.006) for i in range(0,128): rnode.write_eeprom(chr(ord(ROM.ADDR_SIGNATURE)+i), signature[i]) time.sleep(0.006) rnode.write_eeprom(ROM.ADDR_INFO_LOCK, ROM.INFO_LOCK_BYTE) RNS.log("EEPROM written! Validating...") rnode.download_eeprom() if rnode.provisioned: RNS.log("EEPROM Bootstrapping successful!") exit() else: RNS.log("EEPROM was written, but validation failed. Check your settings.") exit() except Exception as e: RNS.log("An error occured while writing EEPROM. The contained exception was:") RNS.log(str(e)) else: RNS.log("Invalid data specified, cancelling EEPROM write") exit() if rnode.provisioned: if args.normal: rnode.setNormalMode() RNS.log("Device set to normal (host-controlled) operating mode") exit() if args.tnc: if not (args.freq and args.bw and args.txp and args.sf and args.cr): RNS.log("Please input startup configuration:") print("") if args.freq: rnode.frequency = args.freq else: print "Frequency in Hz:\t", rnode.frequency = int(raw_input()) if args.bw: rnode.bandwidth = args.bw else: print "Bandwidth in Hz:\t", rnode.bandwidth = int(raw_input()) if args.txp: rnode.txpower = args.txp else: print "TX Power in dBm:\t", rnode.txpower = int(raw_input()) if args.sf: rnode.sf = args.sf else: print "Spreading factor:\t", rnode.sf = int(raw_input()) if args.cr: rnode.cr = args.cr else: print "Coding rate:\t\t", rnode.cr = int(raw_input()) print("") rnode.initRadio() sleep(0.5) rnode.setTNCMode() RNS.log("Device set to TNC operating mode") sleep(1.0) exit() config_interface() else: RNS.log("This device contains a valid firmware, but EEPROM is invalid.") RNS.log("Probably the device has not been initialised, or the EEPROM has been erased.") RNS.log("Please correctly initialise the device and try again!") else: print("") parser.print_help() print("") exit() except KeyboardInterrupt: print("") exit()