|
|
|
@ -1,3 +1,4 @@
|
|
|
|
|
#!/usr/bin/python3
|
|
|
|
|
from time import sleep
|
|
|
|
|
import argparse
|
|
|
|
|
import threading
|
|
|
|
@ -8,7 +9,7 @@ import datetime
|
|
|
|
|
import time
|
|
|
|
|
import math
|
|
|
|
|
import traceback
|
|
|
|
|
import importlib
|
|
|
|
|
from importlib import util
|
|
|
|
|
|
|
|
|
|
class RNS():
|
|
|
|
|
@staticmethod
|
|
|
|
@ -48,6 +49,7 @@ class KISS():
|
|
|
|
|
CMD_RADIO_STATE = 0x06
|
|
|
|
|
CMD_RADIO_LOCK = 0x07
|
|
|
|
|
CMD_DETECT = 0x08
|
|
|
|
|
CMD_IMPLICIT = 0x09
|
|
|
|
|
CMD_PROMISC = 0x0E
|
|
|
|
|
CMD_READY = 0x0F
|
|
|
|
|
CMD_STAT_RX = 0x21
|
|
|
|
@ -114,6 +116,8 @@ class RNode():
|
|
|
|
|
self.r_lock = None
|
|
|
|
|
self.r_stat_rssi = 0
|
|
|
|
|
self.r_stat_snr = 0
|
|
|
|
|
self.r_implicit_length = 0
|
|
|
|
|
|
|
|
|
|
self.rssi_offset = 157
|
|
|
|
|
|
|
|
|
|
self.sf = None
|
|
|
|
@ -121,6 +125,7 @@ class RNode():
|
|
|
|
|
self.txpower = None
|
|
|
|
|
self.frequency = None
|
|
|
|
|
self.bandwidth = None
|
|
|
|
|
self.implicit_length = 0
|
|
|
|
|
|
|
|
|
|
self.detected = None
|
|
|
|
|
|
|
|
|
@ -253,6 +258,10 @@ class RNode():
|
|
|
|
|
self.r_cr = byte
|
|
|
|
|
RNS.log("Radio reporting coding rate is "+str(self.r_cr))
|
|
|
|
|
self.updateBitrate()
|
|
|
|
|
elif (command == KISS.CMD_IMPLICIT):
|
|
|
|
|
self.r_implicit_length = byte
|
|
|
|
|
if self.r_implicit_length != 0:
|
|
|
|
|
RNS.log("Radio in implicit header mode, listening for packets with a length of "+str(self.r_implicit_length)+" bytes")
|
|
|
|
|
elif (command == KISS.CMD_RADIO_STATE):
|
|
|
|
|
self.r_state = byte
|
|
|
|
|
elif (command == KISS.CMD_RADIO_LOCK):
|
|
|
|
@ -317,6 +326,7 @@ class RNode():
|
|
|
|
|
self.setTXPower()
|
|
|
|
|
self.setSpreadingFactor()
|
|
|
|
|
self.setCodingRate()
|
|
|
|
|
self.setImplicitLength()
|
|
|
|
|
self.setRadioState(KISS.RADIO_STATE_ON)
|
|
|
|
|
|
|
|
|
|
def setFrequency(self):
|
|
|
|
@ -367,6 +377,15 @@ class RNode():
|
|
|
|
|
if written != len(kiss_command):
|
|
|
|
|
raise IOError("An IO error occurred while configuring coding rate for "+self(str))
|
|
|
|
|
|
|
|
|
|
def setImplicitLength(self):
|
|
|
|
|
if self.implicit_length != 0:
|
|
|
|
|
length = KISS.escape(bytes([self.implicit_length]))
|
|
|
|
|
|
|
|
|
|
kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_IMPLICIT])+length+bytes([KISS.FEND])
|
|
|
|
|
written = self.serial.write(kiss_command)
|
|
|
|
|
if written != len(kiss_command):
|
|
|
|
|
raise IOError("An IO error occurred while configuring implicit header mode for "+self(str))
|
|
|
|
|
|
|
|
|
|
def setRadioState(self, state):
|
|
|
|
|
kiss_command = bytes([KISS.FEND])+bytes([KISS.CMD_RADIO_STATE])+bytes([state])+bytes([KISS.FEND])
|
|
|
|
|
written = self.serial.write(kiss_command)
|
|
|
|
@ -396,11 +415,18 @@ def device_probe(rnode):
|
|
|
|
|
|
|
|
|
|
def packet_captured(data, rnode_instance):
|
|
|
|
|
if rnode_instance.console_output:
|
|
|
|
|
RNS.log("["+str(rnode_instance.r_stat_rssi)+" dBm] [SNR "+str(rnode_instance.r_stat_snr)+" dB] ["+str(len(data))+" bytes]\t"+str(data));
|
|
|
|
|
if rnode_instance.print_hex:
|
|
|
|
|
if len(data) == 1:
|
|
|
|
|
data = [data]
|
|
|
|
|
datastring = "\n"+RNS.hexrep(data)+"\n"
|
|
|
|
|
else:
|
|
|
|
|
datastring = str(data)
|
|
|
|
|
|
|
|
|
|
RNS.log("["+str(rnode_instance.r_stat_rssi)+" dBm] [SNR "+str(rnode_instance.r_stat_snr)+" dB] ["+str(len(data))+" bytes]\t"+datastring);
|
|
|
|
|
if rnode_instance.write_to_disk:
|
|
|
|
|
try:
|
|
|
|
|
filename = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S.%f")+".pkt"
|
|
|
|
|
file = open(rnode_instance.write_dir+"/"+filename, "w")
|
|
|
|
|
file = open(rnode_instance.write_dir+"/"+filename, "wb")
|
|
|
|
|
file.write(data)
|
|
|
|
|
file.close()
|
|
|
|
|
except Exception as e:
|
|
|
|
@ -410,7 +436,7 @@ def packet_captured(data, rnode_instance):
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
try:
|
|
|
|
|
if not importlib.util.find_spec("serial"):
|
|
|
|
|
if not util.find_spec("serial"):
|
|
|
|
|
raise ImportError("Serial module could not be found")
|
|
|
|
|
except ImportError:
|
|
|
|
|
print("")
|
|
|
|
@ -424,12 +450,14 @@ def main():
|
|
|
|
|
try:
|
|
|
|
|
parser = argparse.ArgumentParser(description="LoRa packet sniffer for RNode hardware.")
|
|
|
|
|
parser.add_argument("-C", "--console", action="store_true", help="Print captured packets to the console")
|
|
|
|
|
parser.add_argument("-H", "--hex", action="store_true", help="Print out packets as hexadecimal")
|
|
|
|
|
parser.add_argument("-W", action="store", metavar="directory", type=str, default=None, help="Write captured packets to a directory")
|
|
|
|
|
parser.add_argument("--freq", action="store", metavar="Hz", type=int, default=None, help="Frequency in Hz")
|
|
|
|
|
parser.add_argument("--bw", action="store", metavar="Hz", type=int, default=None, help="Bandwidth in Hz")
|
|
|
|
|
parser.add_argument("--txp", action="store", metavar="dBm", type=int, default=None, help="TX power in dBm")
|
|
|
|
|
parser.add_argument("--sf", action="store", metavar="factor", type=int, default=None, help="Spreading factor")
|
|
|
|
|
parser.add_argument("--cr", action="store", metavar="rate", type=int, default=None, help="Coding rate")
|
|
|
|
|
parser.add_argument("--implicit", action="store", metavar="length", type=int, default=None, help="Packet length in implicit header mode")
|
|
|
|
|
|
|
|
|
|
parser.add_argument("port", nargs="?", default=None, help="Serial port where RNode is attached", type=str)
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
@ -512,10 +540,10 @@ def main():
|
|
|
|
|
print("Bandwidth in Hz:\t", end=' ')
|
|
|
|
|
rnode.bandwidth = int(input())
|
|
|
|
|
|
|
|
|
|
if args.txp:
|
|
|
|
|
if args.txp and (args.txp >= 0 and args.txp <= 17):
|
|
|
|
|
rnode.txpower = args.txp
|
|
|
|
|
else:
|
|
|
|
|
rnode.txpower = 2
|
|
|
|
|
rnode.txpower = 0
|
|
|
|
|
|
|
|
|
|
if args.sf:
|
|
|
|
|
rnode.sf = args.sf
|
|
|
|
@ -529,6 +557,16 @@ def main():
|
|
|
|
|
print("Coding rate:\t\t", end=' ')
|
|
|
|
|
rnode.cr = int(input())
|
|
|
|
|
|
|
|
|
|
if args.implicit:
|
|
|
|
|
rnode.implicit_length = args.implicit
|
|
|
|
|
else:
|
|
|
|
|
rnode.implicit_length = 0
|
|
|
|
|
|
|
|
|
|
if args.hex:
|
|
|
|
|
rnode.print_hex = True
|
|
|
|
|
else:
|
|
|
|
|
rnode.print_hex = False
|
|
|
|
|
|
|
|
|
|
rnode.initRadio()
|
|
|
|
|
rnode.setPromiscuousMode(True)
|
|
|
|
|
sleep(0.5)
|
|
|
|
|