# -*- coding: utf-8 -*- """ Created on Fri Dec 25 21:25:14 2020 @author: DJ2LS """ import time from datetime import datetime,timezone import structlog import numpy as np import threading import hashlib import hmac import os import sys from pathlib import Path import platform import subprocess import psutil import glob log = structlog.get_logger("helpers") def wait(seconds: float) -> bool: """ Args: seconds: Returns: """ timeout = time.time() + seconds while time.time() < timeout: threading.Event().wait(0.01) return True def get_crc_8(data: str) -> bytes: """ Calculate CRC-8-CCITT checksum for the given data using the ITU I.432.1 specification. Args: data (str): Input data as a string. Returns: bytes: CRC-8-CCITT checksum of the provided data. """ crc = 0x00 polynomial = 0x07 xor_out = 0x55 if not isinstance(data, (bytes, bytearray)): data = bytes(data, "utf-8") for byte in data: crc ^= byte for _ in range(8): if crc & 0x80: crc = (crc << 1) ^ polynomial else: crc <<= 1 crc &= 0xFF # Final XOR value crc ^= xor_out return crc.to_bytes(1, byteorder="big") def get_crc_16(data: str) -> bytes: """ Calculate CRC-16-CCITT-FALSE checksum for the given data using the provided specification. Args: data (str): Input data as a string. Returns: bytes: CRC-16-CCITT-FALSE checksum of the provided data. """ crc = 0xFFFF polynomial = 0x1021 xor_out = 0 if not isinstance(data, (bytes, bytearray)): data = bytes(data, "utf-8") for byte in data: crc ^= byte << 8 for _ in range(8): if crc & 0x8000: crc = (crc << 1) ^ polynomial else: crc <<= 1 crc &= 0xFFFF # Final XOR value crc ^= xor_out return crc.to_bytes(2, byteorder="big") def get_crc_24(data: str) -> bytes: """ Calculate CRC-24-OPENPGP checksum for the given data using the provided specification. Args: data (str): Input data as a string. Returns: bytes: CRC-24-OPENPGP checksum of the provided data. """ crc = 0xB704CE polynomial = 0x864CFB xor_out = 0 if not isinstance(data, (bytes, bytearray)): data = bytes(data, "utf-8") for byte in data: crc ^= byte << 16 for _ in range(8): if crc & 0x800000: crc = (crc << 1) ^ polynomial else: crc <<= 1 crc &= 0xFFFFFF # Final XOR value crc ^= xor_out return crc.to_bytes(3, byteorder="big") def get_crc_32(data: str) -> bytes: """ Calculate CRC-32 checksum for the given data using the Ethernet specification. Args: data (str): Input data as a string. Returns: bytes: CRC-32 checksum of the provided data. """ def reflect(data, width): """ Reflects the bits in the given data. Args: data (int): The data to reflect. width (int): The bit width of the data. Returns: int: The reflected data. """ reflected_data = 0 for i in range(width): if data & (1 << i): reflected_data |= (1 << (width - 1 - i)) return reflected_data crc = 0xFFFFFFFF polynomial = 0x04C11DB7 xor_out = 0 if not isinstance(data, (bytes, bytearray)): data = bytes(data, "utf-8") for byte in data: byte = reflect(byte, 8) crc ^= byte << 24 for _ in range(8): if crc & 0x80000000: crc = (crc << 1) ^ polynomial else: crc <<= 1 crc &= 0xFFFFFFFF crc = reflect(crc, 32) crc ^= 0xFFFFFFFF crc ^= xor_out return crc.to_bytes(4, byteorder="big") from datetime import datetime, timezone import time def add_to_heard_stations(dxcallsign, dxgrid, datatype, snr, offset, frequency, heard_stations_list, distance_km=None, distance_miles=None, away_from_key=False): """ Args: dxcallsign (str): The callsign of the DX station. dxgrid (str): The Maidenhead grid square of the DX station. datatype (str): The type of data received (e.g., FT8, CW). snr (int): Signal-to-noise ratio of the received signal. offset (float): Frequency offset. frequency (float): Base frequency of the received signal. heard_stations_list (list): List containing heard stations. distance_km (float): Distance to the DX station in kilometers. distance_miles (float): Distance to the DX station in miles. away_from_key (bool): Away from key indicator Returns: Nothing. The function updates the heard_stations_list in-place. """ # Convert current timestamp to an integer current_timestamp = int(datetime.now(timezone.utc).timestamp()) # Initialize the new entry new_entry = [ dxcallsign, dxgrid, current_timestamp, datatype, snr, offset, frequency, distance_km, distance_miles, away_from_key ] # Check if the buffer is empty or if the callsign is not already in the list if not any(dxcallsign == station[0] for station in heard_stations_list): heard_stations_list.append(new_entry) else: # Search for the existing entry and update for i, entry in enumerate(heard_stations_list): if entry[0] == dxcallsign: heard_stations_list[i] = new_entry break def callsign_to_bytes(callsign: str) -> bytes: """ Args: callsign: Returns: """ # http://www.aprs.org/aprs11/SSIDs.txt # -0 Your primary station usually fixed and message capable # -1 generic additional station, digi, mobile, wx, etc # -2 generic additional station, digi, mobile, wx, etc # -3 generic additional station, digi, mobile, wx, etc # -4 generic additional station, digi, mobile, wx, etc # -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc) # -6 Special activity, Satellite ops, camping or 6 meters, etc # -7 walkie talkies, HT's or other human portable # -8 boats, sailboats, RV's or second main mobile # -9 Primary Mobile (usually message capable) # -10 internet, Igates, echolink, winlink, AVRS, APRN, etc # -11 balloons, aircraft, spacecraft, etc # -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc # -13 Weather stations # -14 Truckers or generally full time drivers # -15 generic additional station, digi, mobile, wx, etc # Try converting to bytestring if possible type string try: callsign = callsign.encode("utf-8") except TypeError: # This is expected depending on the type of the `callsign` argument. # log.debug("[HLP] callsign_to_bytes: Error converting callsign to bytes:", e=err) pass except Exception as err: log.debug("[HLP] callsign_to_bytes: Error converting callsign to bytes:", e=err, data=callsign) # Need this step to reduce the needed payload by the callsign # (stripping "-" out of the callsign) callsign = callsign.split(b"-") ssid = 0 try: ssid = int(callsign[1]) except IndexError: # This is expected when callsign doesn't have a dash. # log.debug("[HLP] callsign_to_bytes: Error callsign SSID to integer:", e=err) pass except Exception as err: log.debug("[HLP] callsign_to_bytes: Error splitting callsign/ssid:", e=err) # callsign = callsign[0] # bytestring = bytearray(8) # bytestring[:len(callsign)] = callsign # bytestring[7:8] = bytes([ssid]) # ---- callsign with encoding always 6 bytes long callsign = callsign[0].decode("utf-8") ssid = bytes([ssid]).decode("utf-8") return encode_call(callsign + ssid) # return bytes(bytestring) def bytes_to_callsign(bytestring: bytes) -> bytes: """ Convert our callsign, received by a frame to a callsign in a human readable format Args: bytestring: Returns: bytes """ # http://www.aprs.org/aprs11/SSIDs.txt # -0 Your primary station usually fixed and message capable # -1 generic additional station, digi, mobile, wx, etc # -2 generic additional station, digi, mobile, wx, etc # -3 generic additional station, digi, mobile, wx, etc # -4 generic additional station, digi, mobile, wx, etc # -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc) # -6 Special activity, Satellite ops, camping or 6 meters, etc # -7 walkie talkies, HT's or other human portable # -8 boats, sailboats, RV's or second main mobile # -9 Primary Mobile (usually message capable) # -10 internet, Igates, echolink, winlink, AVRS, APRN, etc # -11 balloons, aircraft, spacecraft, etc # -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc # -13 Weather stations # -14 Truckers or generally full time drivers # -15 generic additional station, digi, mobile, wx, etc # we need to do this step to reduce the needed paypload by the callsign ( stripping "-" out of the callsign ) """ callsign = bytes(bytestring[:7]) callsign = callsign.rstrip(b"\x00") ssid = int.from_bytes(bytes(bytestring[7:8]), "big") callsign = callsign + b"-" callsign = callsign.decode("utf-8") callsign = callsign + str(ssid) callsign = callsign.encode("utf-8") return bytes(callsign) """ decoded = decode_call(bytestring) callsign = decoded[:-1] ssid = ord(bytes(decoded[-1], "utf-8")) return bytes(f"{callsign}-{ssid}", "utf-8") def separate_callsign_from_ssid(callsign:bytes): # We want the callsign without SSID splitted_callsign = callsign.split(b"-") callsign = splitted_callsign[0] ssid = splitted_callsign[1].decode() return callsign, ssid def check_callsign(callsign: str, crc_to_check: bytes, ssid_list): """ Function to check a crc against a callsign to calculate the ssid by generating crc until we find the correct SSID Args: callsign: Callsign which we want to check crc_to_check: The CRC which we want the callsign to check against Returns: [True, Callsign + SSID] False """ if not isinstance(callsign, (bytes)): callsign = bytes(callsign,'utf-8') try: # We want the callsign without SSID splitted_callsign = callsign.split(b"-") callsign = splitted_callsign[0] ssid = splitted_callsign[1].decode() except IndexError: # This is expected when `callsign` doesn't have a dash. ssid = 0 except Exception as err: log.debug("[HLP] check_callsign: Error converting to bytes:", e=err) # ensure, we are always have the own ssid in ssid_list even if it is empty if ssid not in ssid_list: ssid_list.append(str(ssid)) for ssid in ssid_list: call_with_ssid = callsign + b'-' + (str(ssid)).encode('utf-8') callsign_crc = get_crc_24(call_with_ssid) callsign_crc = callsign_crc.hex() if callsign_crc == crc_to_check: log.debug("[HLP] check_callsign matched:", call_with_ssid=call_with_ssid, checksum=crc_to_check) return [True, call_with_ssid.decode()] if get_crc_24(callsign).hex() == crc_to_check: log.debug("[HLP] check_callsign matched:", call_without_ssid=callsign, checksum=crc_to_check) return [True, callsign.decode()] log.debug("[HLP] check_callsign: Check failed:", callsign=callsign, crc_to_check=crc_to_check, own_crc=callsign_crc) return [False, b''] def check_session_id(id: bytes, id_to_check: bytes): """ Funktion to check if we received the correct session id Args: id: our own session id id_to_check: The session id byte we want to check Returns: True False """ if id_to_check == b'\x00': return False log.debug("[HLP] check_sessionid: Checking:", ownid=id, check=id_to_check) return id == id_to_check def encode_grid(grid): """ @author: DB1UJ Args: grid:string: maidenhead QTH locater [a-r][a-r][0-9][0-9][a-x][a-x] Returns: 4 bytes contains 26 bit valid data with encoded grid locator """ out_code_word = 0 grid = grid.upper() # upper case to be save int_first = ord(grid[0]) - 65 # -65 offset for "A" become zero, utf8 table int_sec = ord(grid[1]) - 65 # -65 offset for "A" become zero, utf8 table int_val = (int_first * 18) + int_sec # encode for modulo devision, 2 numbers in 1 out_code_word = int_val & 0b111111111 # only 9 bit LSB A - R * A - R is needed out_code_word <<= 9 # shift 9 bit left having space next bits, letter A-R * A-R int_val = int(grid[2:4]) # number string to number int, highest value 99 out_code_word |= int_val & 0b1111111 # using bit OR to add new value out_code_word <<= 7 # shift 7 bit left having space next bits, letter A-X int_val = ord(grid[4]) - 65 # -65 offset for 'A' become zero, utf8 table out_code_word |= int_val & 0b11111 # using bit OR to add new value out_code_word <<= 5 # shift 5 bit left having space next bits, letter A-X int_val = ord(grid[5]) - 65 # -65 offset for 'A' become zero, utf8 table out_code_word |= int_val & 0b11111 # using bit OR to add new value return out_code_word.to_bytes(length=4, byteorder="big") def decode_grid(b_code_word: bytes): """ @author: DB1UJ Args: b_code_word:bytes: 4 bytes with 26 bit valid data LSB Returns: grid:str: upper case maidenhead QTH locater [A-R][A-R][0-9][0-9][A-X][A-X] """ code_word = int.from_bytes(b_code_word, byteorder="big", signed=False) grid = chr((code_word & 0b11111) + 65) code_word >>= 5 grid = chr((code_word & 0b11111) + 65) + grid code_word >>= 7 grid = str(int(code_word & 0b1111111)) + grid if (code_word & 0b1111111) < 10: grid = f"0{grid}" code_word >>= 9 int_val = int(code_word & 0b111111111) int_first, int_sec = divmod(int_val, 18) return chr(int(int_first) + 65) + chr(int(int_sec) + 65) + grid def encode_call(call): """ @author: DB1UJ Args: call:string: ham radio call sign [A-Z,0-9], last char SSID 0-63 Returns: 6 bytes contains 6 bits/sign encoded 8 char call sign with binary SSID (only upper letters + numbers, SSID) """ out_code_word = 0 call = call.upper() # upper case to be save for char in call: int_val = ord(char) - 48 # -48 reduce bits, begin with first number utf8 table out_code_word <<= 6 # shift left 6 bit, making space for a new char out_code_word |= ( int_val & 0b111111 ) # bit OR adds the new char, masked with AND 0b111111 out_code_word >>= 6 # clean last char out_code_word <<= 6 # make clean space out_code_word |= ord(call[-1]) & 0b111111 # add the SSID uncoded only 0 - 63 return out_code_word.to_bytes(length=6, byteorder="big") def decode_call(b_code_word: bytes): """ @author: DB1UJ Args: b_code_word:bytes: 6 bytes with 6 bits/sign valid data char signs LSB Returns: call:str: upper case ham radio call sign [A-Z,0-9] + binary SSID """ code_word = int.from_bytes(b_code_word, byteorder="big", signed=False) ssid = chr(code_word & 0b111111) # save the uncoded binary SSID call = str() while code_word != 0: call = chr((code_word & 0b111111) + 48) + call code_word >>= 6 call = call[:-1] + ssid # remove the last char from call and replace with SSID return call def snr_to_bytes(snr): """create a byte from snr value """ # make sure we have onl 1 byte snr # min max = -12.7 / 12.7 # enough for detecting if a channel is good or bad snr = snr * 10 snr = np.clip(snr, -127, 127) snr = int(snr).to_bytes(1, byteorder='big', signed=True) return snr def snr_from_bytes(snr): """create int from snr byte""" snr = int.from_bytes(snr, byteorder='big', signed=True) snr = snr / 10 return snr def safe_execute(default, exception, function, *args): """ https://stackoverflow.com/a/36671208 from json import loads safe_execute("Oh no, explosions occurred!", TypeError, loads, None) """ try: return function(*args) except exception: return default def return_key_from_object(default, obj, key): try: return obj[key] except KeyError: return default def bool_to_string(state): return "True" if state else "False" def get_hmac_salt(dxcallsign: bytes, mycallsign: bytes): filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt" if sys.platform in ["linux"]: if hasattr(sys, "_MEIPASS"): filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename else: subfolder = Path('hmac') filepath = subfolder / filename elif sys.platform in ["darwin"]: if hasattr(sys, "_MEIPASS"): filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename else: subfolder = Path('hmac') filepath = subfolder / filename elif sys.platform in ["win32", "win64"]: if hasattr(sys, "_MEIPASS"): filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename else: subfolder = Path('hmac') filepath = subfolder / filename else: try: subfolder = Path('hmac') filepath = subfolder / filename except Exception as e: log.error( "[Modem] [HMAC] File lookup error", file=filepath, ) # check if file exists else return false if not check_if_file_exists(filepath): return False log.info("[SCK] [HMAC] File lookup", file=filepath) try: with open(filepath, "r") as file: line = file.readlines() hmac_salt = bytes(line[-1], "utf-8").split(b'\n') hmac_salt = hmac_salt[0] return hmac_salt if delete_last_line_from_hmac_list(filepath, -1) else False except Exception as e: log.warning("[SCK] [HMAC] File lookup failed", file=filepath, e=e) return False def search_hmac_salt(dxcallsign: bytes, mycallsign: bytes, search_token, data_frame, token_iters): filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt" if sys.platform in ["linux"]: if hasattr(sys, "_MEIPASS"): filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename else: subfolder = Path('hmac') filepath = subfolder / filename elif sys.platform in ["darwin"]: if hasattr(sys, "_MEIPASS"): filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename else: subfolder = Path('hmac') filepath = subfolder / filename elif sys.platform in ["win32", "win64"]: if hasattr(sys, "_MEIPASS"): filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename else: subfolder = Path('hmac') filepath = subfolder / filename else: try: subfolder = Path('hmac') filepath = subfolder / filename except Exception as e: log.error( "[Modem] [HMAC] File lookup error", file=filepath, ) # check if file exists else return false if not check_if_file_exists(filepath): log.warning( "[Modem] [HMAC] Token file not found", file=filepath, ) return False try: with open(filepath, "r") as file: token_list = file.readlines() token_iters = min(token_iters, len(token_list)) for _ in range(1, token_iters + 1): key = token_list[len(token_list) - _][:-1] key = bytes(key, "utf-8") search_digest = hmac.new(key, data_frame, hashlib.sha256).digest()[:4] # TODO Remove this debugging information if not needed anymore # print("-----------------------------------------") # print(_) # print(f" key-------------{key}") # print(f" key-------------{token_list[len(token_list) - _][:-1]}") # print(f" key-------------{key.hex()}") # print(f" search token----{search_token.hex()}") # print(f" search digest---{search_digest.hex()}") if search_token.hex() == search_digest.hex(): token_position = len(token_list) - _ delete_last_line_from_hmac_list(filepath, token_position) log.info( "[Modem] [HMAC] Signature found", expected=search_token.hex(), ) return True log.warning( "[Modem] [HMAC] Signature not found", expected=search_token.hex(), filepath=filepath, ) return False except Exception as e: log.warning( "[Modem] [HMAC] Lookup failed", e=e, expected=search_token, ) return False def delete_last_line_from_hmac_list(filepath, position): # check if file exists else return false if not check_if_file_exists(filepath): return False try: linearray = [] with open(filepath, "r") as file: linearray = file.readlines()[:position] #print(linearray) with open(filepath, "w") as file: #print(linearray) for line in linearray: file.write(line) return True except Exception: return False def check_if_file_exists(path): try: # check if file size is present and filesize > 0 if os.path.isfile(path): filesize = os.path.getsize(path) if filesize > 0: return True else: return False else: return False except Exception as e: log.warning( "[Modem] [FILE] Lookup failed", e=e, path=path, ) return False def set_bit(byte, position, value): """Set the bit at 'position' to 'value' in the given byte.""" if not 0 <= position <= 7: raise ValueError("Position must be between 0 and 7") if value: return byte | (1 << position) else: return byte & ~(1 << position) def get_bit(byte, position): """Get the boolean value of the bit at 'position' in the given byte.""" if not 0 <= position <= 7: raise ValueError("Position must be between 0 and 7") return (byte & (1 << position)) != 0 def set_flag(byte, flag_name, value, flag_dict): """Set the flag in the byte according to the flag dictionary. # Define a dictionary mapping flag names to their bit positions flag_dict = { 'FLAG1': 0, # Bit position for FLAG1 'FLAG2': 1, # Bit position for FLAG2, etc. 'FLAG3': 2 } """ if flag_name not in flag_dict: raise ValueError(f"Unknown flag name: {flag_name}") position = flag_dict[flag_name] return set_bit(byte, position, value) def get_flag(byte, flag_name, flag_dict): """Get the value of the flag from the byte according to the flag dictionary.""" if flag_name not in flag_dict: raise ValueError(f"Unknown flag name: {flag_name}") position = flag_dict[flag_name] return get_bit(byte, position) def find_binary_paths(binary_name="rigctld", search_system_wide=False): """ Search for a binary within the current working directory, its subdirectories, and optionally, system-wide locations and the PATH environment variable. :param binary_name: The base name of the binary to search for, without extension. :param search_system_wide: Boolean flag to enable or disable system-wide search. :return: A list of full paths to the binary if found, otherwise an empty list. """ binary_paths = [] # Initialize an empty list to store found paths # Adjust binary name for Windows if platform.system() == 'Windows': binary_name += ".exe" # Search in the current working directory and subdirectories root_path = os.getcwd() for dirpath, dirnames, filenames in os.walk(root_path): if binary_name in filenames: binary_paths.append(os.path.join(dirpath, binary_name)) # If system-wide search is enabled, look in system locations and PATH if search_system_wide: system_paths = os.environ.get('PATH', '').split(os.pathsep) # Optionally add common binary locations for Unix-like and Windows systems if platform.system() != 'Windows': system_paths.extend(['/usr/bin', '/usr/local/bin', '/bin']) else: system_paths.extend(glob.glob("C:\\Program Files\\Hamlib*\\bin")) system_paths.extend(glob.glob("C:\\Program Files (x86)\\Hamlib*\\bin")) for path in system_paths: potential_path = os.path.join(path, binary_name) if os.path.isfile(potential_path): binary_paths.append(potential_path) return binary_paths def kill_and_execute(binary_path, additional_args=None): """ Kills any running instances of the binary across Linux, macOS, and Windows, then starts a new one non-blocking. :param binary_path: The full path to the binary to execute. :param additional_args: A list of additional arguments to pass to the binary. :return: subprocess.Popen object of the started process """ # Kill any existing instances of the binary for proc in psutil.process_iter(attrs=['pid', 'name', 'cmdline']): try: cmdline = proc.info['cmdline'] # Ensure cmdline is iterable and not None if cmdline and binary_path in ' '.join(cmdline): proc.kill() print(f"Killed running instance with PID: {proc.info['pid']}") except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass # Process no longer exists or no permission to kill # Execute the binary with additional arguments non-blocking command = [binary_path] + (additional_args if additional_args else []) return subprocess.Popen(command) def kill_process(proc): try: ps_proc = psutil.Process(proc.pid) ps_proc.kill() print(f"Killed running instance with PID: {proc.pid}") except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess) as e: print(f"Failed to kill process: {e}")