rnodeconfigutil/rnodeconf

942 lines
30 KiB
Python
Executable File

#!/usr/bin/python
from time import sleep
import argparse
import threading
import os.path
import struct
import datetime
import time
import math
import imp
# RNode instance for the attached device; assigned in the __main__ block once
# the serial port has been opened, and read by device_probe().
rnode = None
# Open serial port handle; assigned in the __main__ block and handed to RNode.
rnode_serial = None
# Baud rate passed to serial.Serial() when opening the port.
rnode_baudrate = 115200
# Trusted signing keys as [vendor name, hex-encoded DER public key] pairs.
# RNode.parse_eeprom() checks the EEPROM signature against each entry.
known_keys = [["unsigned.io", "30819f300d06092a864886f70d010101050003818d0030818902818100e5d46084e445595376bf7efd9c6ccf19d39abbc59afdb763207e4ff68b8d00ebffb63847aa2fe6dd10783d3ea63b55ac66f71ad885c20e223709f0d51ed5c6c0d0b093be9e1d165bb8a483a548b67a3f7a1e4580f50e75b306593fa6067ae259d3e297717bd7ff8c8f5b07f2bed89929a9a0321026cf3699524db98e2d18fb2d020300ff39"]]
# Per-model limits, keyed by the numeric EEPROM model code:
# [minimum frequency in Hz, maximum frequency in Hz, maximum TX power in dBm].
ranges = { 0xA4: [410000000, 525000000, 14], 0xA9: [820000000, 1020000000, 17] }
class RNS():
    """Small logging and hex-formatting helpers used throughout this utility."""

    @staticmethod
    def log(msg):
        """Print msg to stdout, prefixed with the current local time."""
        stamp = time.strftime("%Y-%m-%d %H:%M:%S")
        print("["+stamp+"] "+msg)

    @staticmethod
    def hexrep(data, delimit=True):
        """Return data as two-digit lowercase hex values.

        The values are joined with ":" unless delimit is false, in which
        case they are simply concatenated.
        """
        separator = ":" if delimit else ""
        return separator.join(["{:02x}".format(ord(char)) for char in data])

    @staticmethod
    def prettyhexrep(data):
        """Return data as undelimited hex wrapped in angle brackets."""
        digits = ["{:02x}".format(ord(char)) for char in data]
        return "<"+"".join(digits)+">"
class KISS():
    """Byte constants for the KISS-framed serial protocol spoken by the device."""

    # Framing: FEND delimits frames, FESC introduces an escaped byte, and
    # TFEND/TFESC are the escaped stand-ins for FEND/FESC inside a payload.
    FEND                = chr(0xC0)
    FESC                = chr(0xDB)
    TFEND               = chr(0xDC)
    TFESC               = chr(0xDD)

    # Command bytes (the first byte of a frame after FEND).
    CMD_UNKNOWN         = chr(0xFE)
    CMD_DATA            = chr(0x00)
    CMD_FREQUENCY       = chr(0x01)
    CMD_BANDWIDTH       = chr(0x02)
    CMD_TXPOWER         = chr(0x03)
    CMD_SF              = chr(0x04)
    CMD_CR              = chr(0x05)
    CMD_RADIO_STATE     = chr(0x06)
    CMD_RADIO_LOCK      = chr(0x07)
    CMD_DETECT          = chr(0x08)
    CMD_READY           = chr(0x0F)
    CMD_STAT_RX         = chr(0x21)
    CMD_STAT_TX         = chr(0x22)
    CMD_STAT_RSSI       = chr(0x23)
    CMD_BLINK           = chr(0x30)
    CMD_RANDOM          = chr(0x40)
    CMD_FW_VERSION      = chr(0x50)
    CMD_ROM_READ        = chr(0x51)
    CMD_ROM_WRITE       = chr(0x52)
    CMD_CONF_SAVE       = chr(0x53)
    CMD_CONF_DELETE     = chr(0x54)

    # Payload bytes exchanged with CMD_DETECT.
    DETECT_REQ          = chr(0x73)
    DETECT_RESP         = chr(0x46)

    # Payload bytes used with CMD_RADIO_STATE.
    RADIO_STATE_OFF     = chr(0x00)
    RADIO_STATE_ON      = chr(0x01)
    RADIO_STATE_ASK     = chr(0xFF)

    # Error reporting: CMD_ERROR frames carry one of the ERROR_* codes.
    CMD_ERROR           = chr(0x90)
    ERROR_INITRADIO     = chr(0x01)
    ERROR_TXFAILED      = chr(0x02)
    ERROR_EEPROM_LOCKED = chr(0x03)

    @staticmethod
    def escape(data):
        """Return data with every FESC and FEND byte replaced by its escape pair."""
        # FESC has to be handled first: escaping FEND introduces new FESC
        # bytes, and those must not be escaped a second time.
        without_fesc = data.replace(KISS.FESC, KISS.FESC+KISS.TFESC)
        return without_fesc.replace(KISS.FEND, KISS.FESC+KISS.TFEND)
class ROM():
    """Layout of the device EEPROM: identification codes, addresses and markers."""

    # Identification codes stored in the product and model fields.
    PRODUCT_RNODE  = chr(0x03)
    MODEL_A4       = chr(0xA4)
    MODEL_A9       = chr(0xA9)

    # Addresses of the device information fields.
    ADDR_PRODUCT   = chr(0x00)
    ADDR_MODEL     = chr(0x01)
    ADDR_HW_REV    = chr(0x02)
    ADDR_SERIAL    = chr(0x03)
    ADDR_MADE      = chr(0x07)
    ADDR_CHKSUM    = chr(0x0B)
    ADDR_SIGNATURE = chr(0x1B)
    ADDR_INFO_LOCK = chr(0x9B)

    # Addresses of the stored radio configuration.
    ADDR_CONF_SF   = chr(0x9C)
    ADDR_CONF_CR   = chr(0x9D)
    ADDR_CONF_TXP  = chr(0x9E)
    ADDR_CONF_BW   = chr(0x9F)
    ADDR_CONF_FREQ = chr(0xA3)
    ADDR_CONF_OK   = chr(0xA7)

    # Marker values expected at ADDR_INFO_LOCK and ADDR_CONF_OK when the
    # corresponding section of the EEPROM has been written.
    INFO_LOCK_BYTE = chr(0x73)
    CONF_OK_BYTE   = chr(0x73)
class RNode():
    """Host-side handle for one RNode attached to a serial port.

    Commands are sent as KISS frames by the set*/write_eeprom/download_eeprom
    methods. Replies are decoded by readLoop(), which is meant to run in its
    own thread and publishes what it receives through attributes on this
    object: r_* for values reported by the radio, eeprom for the raw EEPROM
    image, and detected/version for the probe reply.
    """

    # Largest payload readLoop() will buffer for a single frame.
    MAX_FRAME_LENGTH = 512

    def __init__(self, serial_instance):
        """Create a handle that talks to the device through serial_instance."""
        self.serial = serial_instance
        # Milliseconds of silence after which a partially received frame
        # is discarded by readLoop().
        self.timeout = 100

        # Values most recently reported by the device (set by readLoop).
        self.r_frequency = None
        self.r_bandwidth = None
        self.r_txpower = None
        self.r_sf = None
        self.r_cr = None
        self.r_state = None
        self.r_lock = None

        # Values to send to the device with the set* methods.
        self.sf = None
        self.cr = None
        self.txpower = None
        self.frequency = None
        self.bandwidth = None

        # Derived by updateBitrate() from sf, cr and bandwidth.
        self.bitrate = 0
        self.bitrate_kbps = 0

        # Probe results.
        self.detected = None
        self.eeprom = None
        self.major_version = None
        self.minor_version = None
        self.version = None

        # Device information parsed from the EEPROM image.
        self.provisioned = None
        self.product = None
        self.model = None
        self.hw_rev = None
        self.made = None
        self.serialno = None
        self.checksum = None
        self.signature = None
        self.signature_valid = False
        self.vendor = None
        self.min_freq = None
        self.max_freq = None
        self.max_output = None

        # Stored radio configuration parsed from the EEPROM image.
        self.configured = None
        self.conf_sf = None
        self.conf_cr = None
        self.conf_txpower = None
        self.conf_frequency = None
        self.conf_bandwidth = None

    @staticmethod
    def _encode_uint32(value):
        """Return value as four big-endian byte characters."""
        return chr(value >> 24)+chr(value >> 16 & 0xFF)+chr(value >> 8 & 0xFF)+chr(value & 0xFF)

    @staticmethod
    def _decode_uint32(data):
        """Return the big-endian unsigned integer held in the first four bytes of data."""
        return ord(data[0]) << 24 | ord(data[1]) << 16 | ord(data[2]) << 8 | ord(data[3])

    @staticmethod
    def _unescape(byte, escaped):
        """Apply KISS un-escaping to one received byte.

        Returns a (byte, escaped) pair. byte is None when the input was the
        FESC marker itself, which carries no payload; escaped is the state
        to pass in together with the next received byte.
        """
        if byte == KISS.FESC:
            return None, True
        if escaped:
            if byte == KISS.TFEND:
                byte = KISS.FEND
            elif byte == KISS.TFESC:
                byte = KISS.FESC
        return byte, False

    def _send(self, kiss_command, action):
        """Write kiss_command to the port, raising IOError on a short write.

        action completes the error message ("An IO error occurred while ...").
        """
        written = self.serial.write(kiss_command)
        if written != len(kiss_command):
            raise IOError("An IO error occurred while "+action)

    def _handle_multibyte(self, command, payload):
        """Act on the payload collected so far for a multi-byte reply."""
        if command == KISS.CMD_FW_VERSION:
            if len(payload) == 2:
                self.major_version = ord(payload[0])
                self.minor_version = ord(payload[1])
                self.updateVersion()
        elif len(payload) == 4:
            value = self._decode_uint32(payload)
            if command == KISS.CMD_FREQUENCY:
                self.r_frequency = value
                RNS.log("Radio reporting frequency is "+str(self.r_frequency/1000000.0)+" MHz")
            else:
                self.r_bandwidth = value
                RNS.log("Radio reporting bandwidth is "+str(self.r_bandwidth/1000.0)+" KHz")
            self.updateBitrate()

    def _handle_single_byte(self, command, byte):
        """Act on a reply whose payload is a single byte; other commands are ignored."""
        if command == KISS.CMD_TXPOWER:
            self.r_txpower = ord(byte)
            RNS.log("Radio reporting TX power is "+str(self.r_txpower)+" dBm")
        elif command == KISS.CMD_SF:
            self.r_sf = ord(byte)
            RNS.log("Radio reporting spreading factor is "+str(self.r_sf))
            self.updateBitrate()
        elif command == KISS.CMD_CR:
            self.r_cr = ord(byte)
            RNS.log("Radio reporting coding rate is "+str(self.r_cr))
            self.updateBitrate()
        elif command == KISS.CMD_RADIO_STATE:
            self.r_state = ord(byte)
        elif command == KISS.CMD_RADIO_LOCK:
            self.r_lock = ord(byte)
        elif command == KISS.CMD_ERROR:
            if byte == KISS.ERROR_INITRADIO:
                RNS.log(str(self)+" hardware initialisation error (code "+RNS.hexrep(byte)+")")
            elif byte == KISS.ERROR_TXFAILED:
                RNS.log(str(self)+" hardware TX error (code "+RNS.hexrep(byte)+")")
            else:
                RNS.log(str(self)+" hardware error (code "+RNS.hexrep(byte)+")")
        elif command == KISS.CMD_DETECT:
            self.detected = (byte == KISS.DETECT_RESP)

    def readLoop(self):
        """Read and decode KISS frames from the port until it is closed.

        Intended as a thread target. Decoded values are stored on this
        object. Any exception ends the loop quietly.
        """
        multibyte_commands = (KISS.CMD_FREQUENCY, KISS.CMD_BANDWIDTH, KISS.CMD_FW_VERSION)
        try:
            in_frame = False
            escape = False
            command = KISS.CMD_UNKNOWN
            data_buffer = ""
            command_buffer = ""
            last_read_ms = int(time.time()*1000)

            while self.serial.is_open:
                if self.serial.in_waiting:
                    byte = self.serial.read(1)
                    last_read_ms = int(time.time()*1000)

                    if in_frame and byte == KISS.FEND and command == KISS.CMD_ROM_READ:
                        # The closing FEND of a ROM read completes the EEPROM image.
                        self.eeprom = data_buffer
                        in_frame = False
                        data_buffer = ""
                        command_buffer = ""
                    elif byte == KISS.FEND:
                        in_frame = True
                        command = KISS.CMD_UNKNOWN
                        data_buffer = ""
                        command_buffer = ""
                    elif in_frame and len(data_buffer) < self.MAX_FRAME_LENGTH:
                        if len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN:
                            # The first byte after FEND selects the command.
                            command = byte
                        elif command == KISS.CMD_ROM_READ:
                            byte, escape = self._unescape(byte, escape)
                            if byte is not None:
                                data_buffer = data_buffer+byte
                        elif command in multibyte_commands:
                            byte, escape = self._unescape(byte, escape)
                            if byte is not None:
                                command_buffer = command_buffer+byte
                                self._handle_multibyte(command, command_buffer)
                        else:
                            self._handle_single_byte(command, byte)
                else:
                    time_since_last = int(time.time()*1000) - last_read_ms
                    if len(data_buffer) > 0 and time_since_last > self.timeout:
                        RNS.log(str(self)+" serial read timeout")
                        data_buffer = ""
                        in_frame = False
                        command = KISS.CMD_UNKNOWN
                        escape = False
                    sleep(0.08)
        except Exception:
            # Ends only this reader thread. Returning (rather than calling
            # exit()) avoids closing stdin, which the main thread may still
            # need for interactive prompts.
            return

    def updateBitrate(self):
        """Recompute bitrate (bit/s) and bitrate_kbps from sf, cr and bandwidth.

        Both are set to 0 when the inputs are missing or unusable.
        """
        try:
            # 1000.0 forces float division, so bandwidths that are not a
            # whole number of kHz are not truncated under Python 2.
            self.bitrate = self.sf * ( (4.0/self.cr) / (math.pow(2,self.sf)/(self.bandwidth/1000.0)) ) * 1000
            self.bitrate_kbps = round(self.bitrate/1000.0, 2)
        except (TypeError, ValueError, ArithmeticError):
            self.bitrate = 0
            self.bitrate_kbps = 0

    def updateVersion(self):
        """Set version to "<major>.<minor>", zero-padding minor to two digits."""
        minstr = str(self.minor_version)
        if len(minstr) == 1:
            minstr = "0"+minstr
        self.version = str(self.major_version)+"."+minstr

    def detect(self):
        """Send a detect request followed by a firmware version request.

        Raises IOError if the command could not be written completely.
        """
        kiss_command = KISS.FEND+KISS.CMD_DETECT+KISS.DETECT_REQ+KISS.FEND+KISS.CMD_FW_VERSION+chr(0x00)+KISS.FEND
        self._send(kiss_command, "detecting hardware for "+str(self))

    def initRadio(self):
        """Send frequency, bandwidth, TX power, SF and CR, then switch the radio on."""
        self.setFrequency()
        self.setBandwidth()
        self.setTXPower()
        self.setSpreadingFactor()
        self.setCodingRate()
        self.setRadioState(KISS.RADIO_STATE_ON)

    def setFrequency(self):
        """Send self.frequency to the device as a 32-bit big-endian value."""
        data = KISS.escape(self._encode_uint32(self.frequency))
        kiss_command = KISS.FEND+KISS.CMD_FREQUENCY+data+KISS.FEND
        self._send(kiss_command, "configuring frequency for "+str(self))

    def setBandwidth(self):
        """Send self.bandwidth to the device as a 32-bit big-endian value."""
        data = KISS.escape(self._encode_uint32(self.bandwidth))
        kiss_command = KISS.FEND+KISS.CMD_BANDWIDTH+data+KISS.FEND
        self._send(kiss_command, "configuring bandwidth")

    def setTXPower(self):
        """Send self.txpower to the device as a single byte."""
        kiss_command = KISS.FEND+KISS.CMD_TXPOWER+chr(self.txpower)+KISS.FEND
        self._send(kiss_command, "configuring TX power")

    def setSpreadingFactor(self):
        """Send self.sf to the device as a single byte."""
        kiss_command = KISS.FEND+KISS.CMD_SF+chr(self.sf)+KISS.FEND
        self._send(kiss_command, "configuring spreading factor")

    def setCodingRate(self):
        """Send self.cr to the device as a single byte."""
        kiss_command = KISS.FEND+KISS.CMD_CR+chr(self.cr)+KISS.FEND
        self._send(kiss_command, "configuring coding rate")

    def setRadioState(self, state):
        """Send a radio state command; state is one of KISS.RADIO_STATE_*."""
        kiss_command = KISS.FEND+KISS.CMD_RADIO_STATE+state+KISS.FEND
        self._send(kiss_command, "configuring radio state")

    def setNormalMode(self):
        """Send CMD_CONF_DELETE, which the tool uses to select normal mode."""
        kiss_command = KISS.FEND+KISS.CMD_CONF_DELETE+chr(0x00)+KISS.FEND
        self._send(kiss_command, "configuring device mode")

    def setTNCMode(self):
        """Send CMD_CONF_SAVE, which the tool uses to select TNC mode."""
        kiss_command = KISS.FEND+KISS.CMD_CONF_SAVE+chr(0x00)+KISS.FEND
        self._send(kiss_command, "configuring device mode")

    def write_eeprom(self, addr, byte):
        """Ask the device to write one byte at the one-byte EEPROM address addr."""
        kiss_command = KISS.FEND+KISS.CMD_ROM_WRITE+KISS.escape(addr)+KISS.escape(byte)+KISS.FEND
        self._send(kiss_command, "writing EEPROM")

    def download_eeprom(self):
        """Request the EEPROM image and parse it.

        Waits 0.2 seconds for readLoop() to receive the image. If no image
        is available after that, an error is logged and the program exits.
        """
        kiss_command = KISS.FEND+KISS.CMD_ROM_READ+chr(0x00)+KISS.FEND
        self._send(kiss_command, "reading EEPROM")

        sleep(0.2)
        if self.eeprom is None:
            RNS.log("Could not download EEPROM from device. Is a valid firmware installed?")
            exit()
        else:
            self.parse_eeprom()

    def parse_eeprom(self):
        """Decode self.eeprom into the device information and configuration fields.

        Sets provisioned to False when the image is too short or carries no
        info-lock marker. Exits the program when the stored checksum does not
        match the device information. The signature is checked against every
        entry in known_keys; signature_valid and vendor are set on a match.
        """
        # Every field read below lies at or before ADDR_CONF_OK, so a shorter
        # image cannot be a valid one and must not be indexed.
        if len(self.eeprom) <= ord(ROM.ADDR_CONF_OK):
            self.provisioned = False
            return

        if self.eeprom[ord(ROM.ADDR_INFO_LOCK)] != ROM.INFO_LOCK_BYTE:
            self.provisioned = False
            return

        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.backends import default_backend

        serial_at = ord(ROM.ADDR_SERIAL)
        made_at = ord(ROM.ADDR_MADE)
        checksum_at = ord(ROM.ADDR_CHKSUM)
        signature_at = ord(ROM.ADDR_SIGNATURE)

        self.provisioned = True
        self.product = self.eeprom[ord(ROM.ADDR_PRODUCT)]
        self.model = self.eeprom[ord(ROM.ADDR_MODEL)]
        self.hw_rev = self.eeprom[ord(ROM.ADDR_HW_REV)]
        self.serialno = self.eeprom[serial_at:serial_at+4]
        self.made = self.eeprom[made_at:made_at+4]
        self.checksum = self.eeprom[checksum_at:checksum_at+16]
        self.signature = self.eeprom[signature_at:signature_at+128]

        # An unknown model code is reported and leaves the limits at zero.
        try:
            self.min_freq, self.max_freq, self.max_output = ranges[ord(self.model)][0:3]
        except Exception as e:
            RNS.log("Exception")
            RNS.log(str(e))
            self.min_freq = 0
            self.max_freq = 0
            self.max_output = 0

        checksummed_info = self.product+self.model+self.hw_rev+self.serialno+self.made
        digest = hashes.Hash(hashes.MD5(), backend=default_backend())
        digest.update(checksummed_info)
        checksum = digest.finalize()

        if self.checksum != checksum:
            self.provisioned = False
            RNS.log("EEPROM checksum mismatch")
            exit()

        RNS.log("EEPROM checksum correct")

        from cryptography.hazmat.primitives.serialization import load_der_public_key
        from cryptography.hazmat.primitives.asymmetric import padding

        for known in known_keys:
            vendor = known[0]
            public_bytes = known[1].decode("hex")
            public_key = load_der_public_key(public_bytes, backend=default_backend())
            try:
                public_key.verify(
                    self.signature,
                    self.checksum,
                    padding.PSS(
                        mgf=padding.MGF1(hashes.SHA256()),
                        salt_length=padding.PSS.MAX_LENGTH
                    ),
                    hashes.SHA256())
            except Exception:
                # Not signed by this key; try the next one.
                continue

            RNS.log("Board signature validated")
            self.signature_valid = True
            self.vendor = vendor
            break
        else:
            RNS.log("Board signature validation failed")

        if self.eeprom[ord(ROM.ADDR_CONF_OK)] == ROM.CONF_OK_BYTE:
            freq_at = ord(ROM.ADDR_CONF_FREQ)
            bw_at = ord(ROM.ADDR_CONF_BW)
            self.configured = True
            self.conf_sf = ord(self.eeprom[ord(ROM.ADDR_CONF_SF)])
            self.conf_cr = ord(self.eeprom[ord(ROM.ADDR_CONF_CR)])
            self.conf_txpower = ord(self.eeprom[ord(ROM.ADDR_CONF_TXP)])
            self.conf_frequency = self._decode_uint32(self.eeprom[freq_at:freq_at+4])
            self.conf_bandwidth = self._decode_uint32(self.eeprom[bw_at:bw_at+4])
        else:
            self.configured = False

    # TODO: remove this
    def test(self):
        """Push a fixed radio configuration to the device, then switch the radio off."""
        self.frequency = 868000000
        self.bandwidth = 125000
        self.txpower = 2
        self.sf = 7
        self.cr = 5
        self.initRadio()
        self.setRadioState(KISS.RADIO_STATE_OFF)
def device_probe():
    """Check that the device on the open serial port answers as an RNode.

    Sends a detect request through the module-level rnode and waits briefly
    for the reader thread to record the reply.

    Returns True when the device answered and reported a firmware version.
    Raises IOError otherwise.
    """
    # NOTE(review): presumably this gives the device time to start up after
    # the port was opened; confirm before shortening.
    sleep(2.5)
    rnode.detect()
    sleep(0.1)

    if not rnode.detected:
        raise IOError("Got invalid response while detecting device")

    # Without this check the log line below would fail with a TypeError on
    # string concatenation, hiding the real cause from the caller.
    if rnode.version is None:
        raise IOError("Device was detected, but did not report a firmware version")

    RNS.log("Device connected")
    RNS.log("Firmware version: "+rnode.version)
    return True
def config_interface():
    """Placeholder for an interactive configuration interface; does nothing."""
    return None
if __name__ == "__main__":
    # Command-line entry point. Depending on the options given this prints a
    # signing key, generates one, flashes firmware, bootstraps the EEPROM,
    # shows device information, or switches the device between normal and
    # TNC mode.

    # Both third-party packages are checked up front so that a missing one
    # produces an install hint instead of a traceback.
    try:
        imp.find_module("serial")
    except ImportError:
        print("")
        print("RNode Config Utility needs pyserial to work.")
        print("You can install it with: pip install pyserial")
        print("")
        exit()

    try:
        imp.find_module("cryptography")
    except ImportError:
        print("")
        print("RNode Config Utility needs the cryptography module to work.")
        print("You can install it with: pip install cryptography")
        print("")
        exit()

    import serial

    try:
        parser = argparse.ArgumentParser(description="RNode Configuration and firmware utility. This program allows you to change various settings and startup modes of RNode. It can also flash and update the firmware, and manage device EEPROM.")
        parser.add_argument("-i", "--info", action="store_true", help="Show device info")
        parser.add_argument("-T", "--tnc", action="store_true", help="Switch device to TNC mode")
        parser.add_argument("-N", "--normal", action="store_true", help="Switch device to normal mode")
        parser.add_argument("-b", "--backup", action="store_true", help="Backup EEPROM to file")
        parser.add_argument("-d", "--dump", action="store_true", help="Dump EEPROM to console")
        parser.add_argument("-f", "--flash", action="store_true", help="Flash firmware and bootstrap EEPROM")
        parser.add_argument("-r", "--rom", action="store_true", help="Bootstrap EEPROM without flashing firmware")
        parser.add_argument("-u", "--update", action="store_true", help="Update firmware")
        parser.add_argument("-k", "--key", action="store_true", help="Generate a new signing key and exit")
        parser.add_argument("-p", "--public", action="store_true", help="Display public part of signing key")
        parser.add_argument("--model", action="store", metavar="model", type=str, default=None, help="Model code for EEPROM bootstrap")
        parser.add_argument("--hwrev", action="store", metavar="revision", type=int, default=None, help="Hardware revision EEPROM bootstrap")
        parser.add_argument("--freq", action="store", metavar="Hz", type=int, default=None, help="Frequency in Hz for TNC mode")
        parser.add_argument("--bw", action="store", metavar="Hz", type=int, default=None, help="Bandwidth in Hz for TNC mode")
        parser.add_argument("--txp", action="store", metavar="dBm", type=int, default=None, help="TX power in dBm for TNC mode")
        parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor for TNC mode")
        parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate for TNC mode")
        parser.add_argument("port", nargs="?", default=None, help="serial port where RNode is attached", type=str)
        args = parser.parse_args()

        # The signing-related operations share these imports.
        if args.public or args.key or args.flash or args.rom:
            from cryptography.hazmat.primitives import hashes
            from cryptography.hazmat.backends import default_backend
            from cryptography.hazmat.primitives import serialization
            from cryptography.hazmat.primitives.serialization import load_der_public_key
            from cryptography.hazmat.primitives.serialization import load_der_private_key
            from cryptography.hazmat.primitives.asymmetric import rsa
            from cryptography.hazmat.primitives.asymmetric import padding

        # --public: print the public half of the stored signing key and exit.
        if args.public:
            try:
                # The key is DER encoded, so it must be read as binary.
                with open("./firmware/signing.key", "rb") as key_file:
                    private_bytes = key_file.read()
            except Exception as e:
                RNS.log("Could not load signing key")
                RNS.log(str(e))
                exit()

            try:
                private_key = serialization.load_der_private_key(
                    private_bytes,
                    password=None,
                    backend=default_backend()
                )
                public_key = private_key.public_key()
                public_bytes = public_key.public_bytes(
                    encoding=serialization.Encoding.DER,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo
                )
                RNS.log("Public key:")
                RNS.log(RNS.hexrep(public_bytes, delimit=False))
            except Exception as e:
                RNS.log("Could not deserialize signing key")
                RNS.log(str(e))

            exit()

        # --key: generate a signing key unless one already exists, then exit.
        if args.key:
            RNS.log("Generating a new signing key...")
            # The EEPROM bootstrap below stores exactly 128 signature bytes,
            # which is what a 1024-bit key produces, so the key size stays.
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=1024,
                backend=default_backend()
            )
            private_bytes = private_key.private_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption()
            )
            public_key = private_key.public_key()
            public_bytes = public_key.public_bytes(
                encoding=serialization.Encoding.DER,
                format=serialization.PublicFormat.SubjectPublicKeyInfo
            )
            if os.path.isdir("./firmware"):
                if os.path.isfile("./firmware/signing.key"):
                    RNS.log("Signing key already exists, not overwriting!")
                    RNS.log("Manually delete this key to create a new one.")
                else:
                    with open("./firmware/signing.key", "wb") as key_file:
                        key_file.write(private_bytes)

                    RNS.log("Wrote signing key")
                    RNS.log("Public key:")
                    RNS.log(RNS.hexrep(public_bytes, delimit=False))
            else:
                RNS.log("The firmware directory does not exist, can't write key!")

            exit()

        if args.port:
            # --flash: write the firmware with avrdude, then continue with
            # the EEPROM bootstrap as if --rom had been given.
            if args.flash:
                if os.path.isfile("./firmware/firmware.hex"):
                    try:
                        RNS.log("Flashing RNode firmware to device on "+args.port)
                        from subprocess import call
                        flash_status = call(["avrdude", "-P", args.port, "-p", "m1284p", "-c", "arduino", "-b", "115200", "-U", "flash:w:firmware/firmware.hex"])
                        if flash_status == 0:
                            RNS.log("Done flashing")
                            args.rom = True
                        else:
                            exit()
                    except Exception as e:
                        RNS.log("Error while flashing")
                        RNS.log(str(e))
                else:
                    RNS.log("Firmware file not found")
                    exit()

            RNS.log("Opening serial port "+args.port+"...")
            try:
                rnode_serial = serial.Serial(
                    port = args.port,
                    baudrate = rnode_baudrate,
                    bytesize = 8,
                    parity = serial.PARITY_NONE,
                    stopbits = 1,
                    xonxoff = False,
                    rtscts = False,
                    timeout = 0,
                    inter_byte_timeout = None,
                    write_timeout = None,
                    dsrdtr = False
                )
            except Exception as e:
                RNS.log("Could not open the specified serial port. The contained exception was:")
                RNS.log(str(e))
                exit()

            # Replies from the device are decoded on a background thread.
            rnode = RNode(rnode_serial)
            thread = threading.Thread(target=rnode.readLoop)
            thread.setDaemon(True)
            thread.start()

            try:
                device_probe()
            except Exception as e:
                RNS.log("Serial port opened, but RNode did not respond. Is a valid firmware installed?")
                print(e)
                exit()

            RNS.log("Reading EEPROM...")
            rnode.download_eeprom()

            # --dump: print the raw EEPROM image and exit.
            if args.dump:
                RNS.log("EEPROM contents:")
                RNS.log(RNS.hexrep(rnode.eeprom))
                exit()

            # --backup: save the raw EEPROM image to a timestamped file and exit.
            if args.backup:
                try:
                    filename = str(time.strftime("%Y-%m-%d_%H-%M-%S"))
                    path = "./eeprom/"+filename+".eeprom"
                    # The image is raw bytes, so it must be written as binary.
                    with open(path, "wb") as backup_file:
                        backup_file.write(rnode.eeprom)
                    RNS.log("EEPROM backup written to: "+path)
                except Exception as e:
                    RNS.log("EEPROM was successfully downloaded from device,")
                    RNS.log("but file could not be written to disk.")
                    RNS.log(str(e))
                exit()

            # --info: print the parsed device information and exit.
            if args.info:
                if not rnode.provisioned:
                    RNS.log("EEPROM is invalid, no further information available")
                    exit()

                timestamp = struct.unpack(">I", rnode.made)[0]
                timestring = datetime.datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
                sigstring = "Unverified"
                if rnode.signature_valid:
                    sigstring = "Genuine board, vendor is "+rnode.vendor

                RNS.log("")
                RNS.log("Board info:")
                RNS.log("\tFirmware version:\t"+rnode.version)
                RNS.log("\tProduct code:\t\t"+RNS.hexrep(rnode.product))
                RNS.log("\tModel code:\t\t"+RNS.hexrep(rnode.model))
                RNS.log("\tHardware revision:\t"+RNS.hexrep(rnode.hw_rev))
                RNS.log("\tSerial number:\t\t"+RNS.hexrep(rnode.serialno))
                RNS.log("\tFrequency range:\t"+str(rnode.min_freq/1e6)+" MHz - "+str(rnode.max_freq/1e6)+" MHz")
                RNS.log("\tMax TX power:\t\t"+str(rnode.max_output)+" dBm")
                RNS.log("\tManufactured:\t\t"+timestring)

                if rnode.configured:
                    # The bitrate is derived from the stored configuration.
                    rnode.bandwidth = rnode.conf_bandwidth
                    rnode.sf = rnode.conf_sf
                    rnode.cr = rnode.conf_cr
                    rnode.updateBitrate()

                    RNS.log("\tDevice signature:\t"+sigstring)
                    RNS.log("")
                    RNS.log("\tDevice mode:\t\tTNC")
                    RNS.log("\t  Frequency:\t\t"+str((rnode.conf_frequency/1000000.0))+" MHz")
                    RNS.log("\t  Bandwidth:\t\t"+str(rnode.conf_bandwidth/1000.0)+" KHz")
                    RNS.log("\t  TX power:\t\t"+str(rnode.conf_txpower)+" dBm")
                    RNS.log("\t  Spreading factor:\t"+str(rnode.conf_sf))
                    RNS.log("\t  Coding rate:\t\t"+str(rnode.conf_cr))
                    RNS.log("\t  On-air bitrate:\t"+str(rnode.bitrate_kbps)+" kbps")
                else:
                    RNS.log("\tDevice mode:\t\tNormal (host-controlled)")
                    RNS.log("\tDevice signature:\t"+sigstring)

                print("")
                exit()

            # --rom: write device information, checksum and signature to an
            # EEPROM that has not been provisioned yet.
            if args.rom:
                if rnode.provisioned:
                    RNS.log("EEPROM bootstrap was requested, but a valid EEPROM was already present.")
                    RNS.log("No changes are being made.")
                    exit()
                else:
                    counter = None
                    counter_path = "./firmware/serial.counter"
                    try:
                        if os.path.isfile(counter_path):
                            with open(counter_path, "r") as counter_file:
                                counter = int(counter_file.read())
                        else:
                            counter = 0
                    except Exception as e:
                        RNS.log("Could not create device serial number, exiting")
                        RNS.log(str(e))
                        exit()

                    serialno = counter+1
                    model = None
                    hwrev = None
                    if args.model == "a4":
                        model = ROM.MODEL_A4
                    if args.model == "a9":
                        model = ROM.MODEL_A9
                    if args.hwrev is not None and 0 < args.hwrev < 256:
                        hwrev = chr(args.hwrev)

                    if serialno > 0 and model is not None and hwrev is not None:
                        try:
                            timestamp = int(time.time())
                            time_bytes = struct.pack(">I", timestamp)
                            serial_bytes = struct.pack(">I", serialno)

                            info_chunk = ROM.PRODUCT_RNODE+model+hwrev+serial_bytes+time_bytes
                            digest = hashes.Hash(hashes.MD5(), backend=default_backend())
                            digest.update(info_chunk)
                            checksum = digest.finalize()

                            RNS.log("Loading signing key...")
                            signature = None
                            key_path = "./firmware/signing.key"
                            if not os.path.isfile(key_path):
                                RNS.log("No signing key found")
                                exit()

                            try:
                                with open(key_path, "rb") as key_file:
                                    private_bytes = key_file.read()
                                private_key = serialization.load_der_private_key(
                                    private_bytes,
                                    password=None,
                                    backend=default_backend()
                                )
                                signature = private_key.sign(
                                    checksum,
                                    padding.PSS(
                                        mgf=padding.MGF1(hashes.SHA256()),
                                        salt_length=padding.PSS.MAX_LENGTH
                                    ),
                                    hashes.SHA256()
                                )
                            except Exception as e:
                                RNS.log("Error while signing EEPROM")
                                RNS.log(str(e))

                            # Without a signature nothing must be written:
                            # a half-written EEPROM is worse than an empty one.
                            if signature is None:
                                exit()

                            # The serial number is only consumed once signing
                            # has succeeded.
                            with open(counter_path, "w") as counter_file:
                                counter_file.write(str(serialno))

                            RNS.log("Bootstrapping device EEPROM...")

                            # Each field is written one byte at a time, with
                            # a short pause after every write.
                            eeprom_fields = [
                                (ROM.ADDR_PRODUCT, ROM.PRODUCT_RNODE),
                                (ROM.ADDR_MODEL, model),
                                (ROM.ADDR_HW_REV, hwrev),
                                (ROM.ADDR_SERIAL, serial_bytes),
                                (ROM.ADDR_MADE, time_bytes),
                                (ROM.ADDR_CHKSUM, checksum[:16]),
                                (ROM.ADDR_SIGNATURE, signature[:128]),
                            ]
                            for base_addr, field in eeprom_fields:
                                for offset in range(len(field)):
                                    rnode.write_eeprom(chr(ord(base_addr)+offset), field[offset])
                                    time.sleep(0.006)

                            # The lock marker goes last, so an interrupted
                            # bootstrap never looks provisioned.
                            rnode.write_eeprom(ROM.ADDR_INFO_LOCK, ROM.INFO_LOCK_BYTE)

                            RNS.log("EEPROM written! Validating...")
                            rnode.download_eeprom()
                            if rnode.provisioned:
                                RNS.log("EEPROM Bootstrapping successful!")
                                exit()
                            else:
                                RNS.log("EEPROM was written, but validation failed. Check your settings.")
                                exit()
                        except Exception as e:
                            RNS.log("An error occured while writing EEPROM. The contained exception was:")
                            RNS.log(str(e))
                    else:
                        RNS.log("Invalid data specified, cancelling EEPROM write")
                        exit()

            if rnode.provisioned:
                # --normal: return the device to host-controlled mode.
                if args.normal:
                    rnode.setNormalMode()
                    RNS.log("Device set to normal (host-controlled) operating mode")
                    exit()

                # --tnc: configure the radio and save the configuration.
                # Parameters not given on the command line are prompted for.
                # "is not None" is used so that an explicit 0 (for example
                # 0 dBm TX power) counts as provided.
                if args.tnc:
                    tnc_args = (args.freq, args.bw, args.txp, args.sf, args.cr)
                    if any(value is None for value in tnc_args):
                        RNS.log("Please input startup configuration:")

                    print("")
                    if args.freq is not None:
                        rnode.frequency = args.freq
                    else:
                        rnode.frequency = int(raw_input("Frequency in Hz:\t"))

                    if args.bw is not None:
                        rnode.bandwidth = args.bw
                    else:
                        rnode.bandwidth = int(raw_input("Bandwidth in Hz:\t"))

                    if args.txp is not None:
                        rnode.txpower = args.txp
                    else:
                        rnode.txpower = int(raw_input("TX Power in dBm:\t"))

                    if args.sf is not None:
                        rnode.sf = args.sf
                    else:
                        rnode.sf = int(raw_input("Spreading factor:\t"))

                    if args.cr is not None:
                        rnode.cr = args.cr
                    else:
                        rnode.cr = int(raw_input("Coding rate:\t\t"))

                    print("")

                    rnode.initRadio()
                    sleep(0.5)
                    rnode.setTNCMode()
                    RNS.log("Device set to TNC operating mode")
                    sleep(1.0)
                    exit()

                config_interface()
            else:
                RNS.log("This device contains a valid firmware, but EEPROM is invalid.")
                RNS.log("Probably the device has not been initialised, or the EEPROM has been erased.")
                RNS.log("Please correctly initialise the device and try again!")
        else:
            print("")
            parser.print_help()
            print("")
            exit()

    except KeyboardInterrupt:
        print("")
        exit()