Handle MQTT client memory leak

pull/74/merge
Mark Qvist 2025-03-27 22:16:54 +01:00
parent b2c3411c90
commit a0a6b0fd55
2 changed files with 36 additions and 5 deletions

View File

@ -8,6 +8,7 @@ import sqlite3
import random import random
import shlex import shlex
import re import re
import gc
import RNS.vendor.umsgpack as msgpack import RNS.vendor.umsgpack as msgpack
import RNS.Interfaces.Interface as Interface import RNS.Interfaces.Interface as Interface
@ -268,6 +269,7 @@ class SidebandCore():
self.webshare_ssl_cert_path = self.app_dir+"/app_storage/ssl_cert.pem" self.webshare_ssl_cert_path = self.app_dir+"/app_storage/ssl_cert.pem"
self.mqtt = None self.mqtt = None
self.mqtt_handle_lock = threading.Lock()
self.first_run = True self.first_run = True
self.saving_configuration = False self.saving_configuration = False
@ -2141,6 +2143,7 @@ class SidebandCore():
dbc = db.cursor() dbc = db.cursor()
dbc.execute("CREATE TABLE IF NOT EXISTS telemetry (id INTEGER PRIMARY KEY, dest_context BLOB, ts INTEGER, data BLOB)") dbc.execute("CREATE TABLE IF NOT EXISTS telemetry (id INTEGER PRIMARY KEY, dest_context BLOB, ts INTEGER, data BLOB)")
dbc.execute("CREATE INDEX IF NOT EXISTS idx_telemetry_ts ON telemetry(ts)")
db.commit() db.commit()
def _db_upgradetables(self): def _db_upgradetables(self):
@ -3269,13 +3272,38 @@ class SidebandCore():
self.setstate("app.flags.last_telemetry", time.time()) self.setstate("app.flags.last_telemetry", time.time())
def mqtt_handle_telemetry(self, context_dest, telemetry): def mqtt_handle_telemetry(self, context_dest, telemetry):
with self.mqtt_handle_lock:
# TODO: Remove debug
if hasattr(self, "last_mqtt_recycle") and time.time() > self.last_mqtt_recycle + 60*4:
# RNS.log("Recycling MQTT handler", RNS.LOG_DEBUG)
self.mqtt.stop()
self.mqtt.client = None
self.mqtt = None
gc.collect()
if self.mqtt == None: if self.mqtt == None:
self.mqtt = MQTT() self.mqtt = MQTT()
self.last_mqtt_recycle = time.time()
self.mqtt.set_server(self.config["telemetry_mqtt_host"], self.config["telemetry_mqtt_port"]) self.mqtt.set_server(self.config["telemetry_mqtt_host"], self.config["telemetry_mqtt_port"])
self.mqtt.set_auth(self.config["telemetry_mqtt_user"], self.config["telemetry_mqtt_pass"]) self.mqtt.set_auth(self.config["telemetry_mqtt_user"], self.config["telemetry_mqtt_pass"])
self.mqtt.handle(context_dest, telemetry) self.mqtt.handle(context_dest, telemetry)
# TODO: Remove debug
# if not hasattr(self, "memtr"):
# from pympler import muppy
# from pympler import summary
# import resource
# self.res = resource
# self.ms = summary; self.mp = muppy
# self.memtr = self.ms.summarize(self.mp.get_objects())
# RNS.log(f"RSS: {RNS.prettysize(self.res.getrusage(self.res.RUSAGE_SELF).ru_maxrss*1000)}")
# else:
# memsum = self.ms.summarize(self.mp.get_objects())
# memdiff = self.ms.get_diff(self.memtr, memsum)
# self.ms.print_(memdiff)
# RNS.log(f"RSS: {RNS.prettysize(self.res.getrusage(self.res.RUSAGE_SELF).ru_maxrss*1000)}")
def update_telemetry(self): def update_telemetry(self):
try: try:
try: try:

View File

@ -49,6 +49,9 @@ class MQTT():
time.sleep(MQTT.SCHEDULER_SLEEP) time.sleep(MQTT.SCHEDULER_SLEEP)
try: self.disconnect()
except Exception as e: RNS.log(f"An error occurred while disconnecting MQTT server: {e}", RNS.LOG_ERROR)
RNS.log("Stopped MQTT scheduler", RNS.LOG_DEBUG) RNS.log("Stopped MQTT scheduler", RNS.LOG_DEBUG)
def connect_failed(self, client, userdata): def connect_failed(self, client, userdata):