Added ability to include metadata on resource transfers

pull/824/head
Mark Qvist 2025-05-10 15:38:06 +02:00
parent 5fb6abd019
commit 763078a1ae
1 changed files with 98 additions and 44 deletions

View File

@ -33,6 +33,7 @@ import os
import bz2 import bz2
import math import math
import time import time
import struct
import tempfile import tempfile
import threading import threading
from threading import Lock from threading import Lock
@ -119,6 +120,9 @@ class Resource:
# fit in 3 bytes in resource advertisements. # fit in 3 bytes in resource advertisements.
MAX_EFFICIENT_SIZE = 16 * 1024 * 1024 - 1 MAX_EFFICIENT_SIZE = 16 * 1024 * 1024 - 1
RESPONSE_MAX_GRACE_TIME = 10 RESPONSE_MAX_GRACE_TIME = 10
# Max metadata size is 16777215 (0xFFFFFF) bytes
METADATA_MAX_SIZE = 16 * 1024 * 1024 - 1
# The maximum size to auto-compress with # The maximum size to auto-compress with
# bz2 before sending. # bz2 before sending.
@ -196,12 +200,15 @@ class Resource:
resource.started_transferring = resource.last_activity resource.started_transferring = resource.last_activity
resource.storagepath = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex() resource.storagepath = RNS.Reticulum.resourcepath+"/"+resource.original_hash.hex()
resource.meta_storagepath = resource.storagepath+".meta"
resource.segment_index = adv.i resource.segment_index = adv.i
resource.total_segments = adv.l resource.total_segments = adv.l
if adv.l > 1:
resource.split = True if adv.l > 1: resource.split = True
else: else: resource.split = False
resource.split = False
if adv.x: resource.has_metadata = True
else: resource.has_metadata = False
resource.hashmap = [None] * resource.total_parts resource.hashmap = [None] * resource.total_parts
resource.hashmap_height = 0 resource.hashmap_height = 0
@ -227,9 +234,7 @@ class Resource:
RNS.log("Error while executing resource started callback from "+str(resource)+". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error while executing resource started callback from "+str(resource)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
resource.hashmap_update(0, resource.hashmap_raw) resource.hashmap_update(0, resource.hashmap_raw)
resource.watchdog_job() resource.watchdog_job()
return resource return resource
else: else:
@ -243,15 +248,32 @@ class Resource:
# Create a resource for transmission to a remote destination # Create a resource for transmission to a remote destination
# The data passed can be either a bytes-array or a file opened # The data passed can be either a bytes-array or a file opened
# in binary read mode. # in binary read mode.
def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False): def __init__(self, data, link, metadata=None, advertise=True, auto_compress=True, callback=None, progress_callback=None,
timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False, sent_metadata_size=0):
data_size = None data_size = None
resource_data = None resource_data = None
self.assembly_lock = False self.assembly_lock = False
self.preparing_next_segment = False self.preparing_next_segment = False
self.next_segment = None self.next_segment = None
self.metadata = None
self.has_metadata = False
self.metadata_size = sent_metadata_size
if metadata != None:
packed_metadata = umsgpack.packb(metadata)
metadata_size = len(packed_metadata)
if metadata_size > Resource.METADATA_MAX_SIZE:
raise SystemError("Resource metadata size exceeded")
else:
self.metadata = struct.pack(">I", metadata_size)[1:] + packed_metadata
self.metadata_size = len(self.metadata)
self.has_metadata = True
else:
self.metadata = b""
if data != None: if data != None:
if not hasattr(data, "read") and len(data) > Resource.MAX_EFFICIENT_SIZE: if not hasattr(data, "read") and self.metadata_size + len(data) > Resource.MAX_EFFICIENT_SIZE:
original_data = data original_data = data
data_size = len(original_data) data_size = len(original_data)
data = tempfile.TemporaryFile() data = tempfile.TemporaryFile()
@ -259,31 +281,43 @@ class Resource:
del original_data del original_data
if hasattr(data, "read"): if hasattr(data, "read"):
if data_size == None: if data_size == None: data_size = os.stat(data.name).st_size
data_size = os.stat(data.name).st_size self.total_size = data_size + self.metadata_size
self.total_size = data_size if self.total_size <= Resource.MAX_EFFICIENT_SIZE:
if data_size <= Resource.MAX_EFFICIENT_SIZE:
self.total_segments = 1 self.total_segments = 1
self.segment_index = 1 self.segment_index = 1
self.split = False self.split = False
resource_data = data.read() resource_data = data.read()
data.close() data.close()
else: else:
self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1 # self.total_segments = ((data_size-1)//Resource.MAX_EFFICIENT_SIZE)+1
# self.segment_index = segment_index
# self.split = True
# seek_index = segment_index-1
# seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE
self.total_segments = ((self.total_size-1)//Resource.MAX_EFFICIENT_SIZE)+1
self.segment_index = segment_index self.segment_index = segment_index
self.split = True self.split = True
seek_index = segment_index-1 seek_index = segment_index-1
seek_position = seek_index*Resource.MAX_EFFICIENT_SIZE first_read_size = Resource.MAX_EFFICIENT_SIZE - self.metadata_size
if segment_index == 1:
seek_position = 0
segment_read_size = first_read_size
else:
seek_position = first_read_size + ((seek_index-1)*Resource.MAX_EFFICIENT_SIZE)
segment_read_size = Resource.MAX_EFFICIENT_SIZE
data.seek(seek_position) data.seek(seek_position)
resource_data = data.read(Resource.MAX_EFFICIENT_SIZE) resource_data = data.read(segment_read_size)
self.input_file = data self.input_file = data
elif isinstance(data, bytes): elif isinstance(data, bytes):
data_size = len(data) data_size = len(data)
self.total_size = data_size self.total_size = data_size + self.metadata_size
resource_data = data resource_data = data
self.total_segments = 1 self.total_segments = 1
@ -296,7 +330,9 @@ class Resource:
else: else:
raise TypeError("Invalid data instance type passed to resource initialisation") raise TypeError("Invalid data instance type passed to resource initialisation")
data = resource_data if resource_data:
if self.has_metadata: data = self.metadata + resource_data
else: data = resource_data
self.status = Resource.NONE self.status = Resource.NONE
self.link = link self.link = link
@ -559,6 +595,7 @@ class Resource:
else: else:
sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time() sleep_time = self.last_activity + self.part_timeout_factor*((3*self.sdu)/self.eifr) + Resource.RETRY_GRACE_TIME + extra_wait - time.time()
# TODO: Remove debug at some point
# RNS.log(f"EIFR {RNS.prettyspeed(self.eifr)}, ETOF {RNS.prettyshorttime(expected_tof_remaining)} ", RNS.LOG_DEBUG, pt=True) # RNS.log(f"EIFR {RNS.prettyspeed(self.eifr)}, ETOF {RNS.prettyshorttime(expected_tof_remaining)} ", RNS.LOG_DEBUG, pt=True)
# RNS.log(f"Resource ST {RNS.prettyshorttime(sleep_time)}, RTT {RNS.prettyshorttime(self.rtt or self.link.rtt)}, {self.outstanding_parts} left", RNS.LOG_DEBUG, pt=True) # RNS.log(f"Resource ST {RNS.prettyshorttime(sleep_time)}, RTT {RNS.prettyshorttime(self.rtt or self.link.rtt)}, {self.outstanding_parts} left", RNS.LOG_DEBUG, pt=True)
@ -628,22 +665,27 @@ class Resource:
self.status = Resource.ASSEMBLING self.status = Resource.ASSEMBLING
stream = b"".join(self.parts) stream = b"".join(self.parts)
if self.encrypted: if self.encrypted: data = self.link.decrypt(stream)
data = self.link.decrypt(stream) else: data = stream
else:
data = stream
# Strip off random hash # Strip off random hash
data = data[Resource.RANDOM_HASH_SIZE:] data = data[Resource.RANDOM_HASH_SIZE:]
if self.compressed: if self.compressed: self.data = bz2.decompress(data)
self.data = bz2.decompress(data) else: self.data = data
else:
self.data = data
calculated_hash = RNS.Identity.full_hash(self.data+self.random_hash) calculated_hash = RNS.Identity.full_hash(self.data+self.random_hash)
if calculated_hash == self.hash: if calculated_hash == self.hash:
if self.has_metadata and self.segment_index == 1:
# TODO: Add early metadata_ready callback
metadata_size = data[0] << 16 | data[1] << 8 | data[2]
packed_metadata = data[3:3+metadata_size]
metadata_file = open(self.meta_storagepath, "wb")
metadata_file.write(packed_metadata)
metadata_file.close()
del packed_metadata
data = data[3+metadata_size:]
self.file = open(self.storagepath, "ab") self.file = open(self.storagepath, "ab")
self.file.write(self.data) self.file.write(self.data)
self.file.close() self.file.close()
@ -662,21 +704,27 @@ class Resource:
if self.segment_index == self.total_segments: if self.segment_index == self.total_segments:
if self.callback != None: if self.callback != None:
if not os.path.isfile(self.meta_storagepath):
self.metadata = None
else:
metadata_file = open(self.meta_storagepath, "rb")
self.metadata = umsgpack.unpackb(metadata_file.read())
metadata_file.close()
try: os.unlink(self.meta_storagepath)
except Exception as e:
RNS.log(f"Error while cleaning up resource metadata file, the contained exception was: {e}", RNS.LOG_ERROR)
self.data = open(self.storagepath, "rb") self.data = open(self.storagepath, "rb")
try: try: self.callback(self)
self.callback(self)
except Exception as e: except Exception as e:
RNS.log("Error while executing resource assembled callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error while executing resource assembled callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
try: try:
if hasattr(self.data, "close") and callable(self.data.close): if hasattr(self.data, "close") and callable(self.data.close): self.data.close()
self.data.close()
os.unlink(self.storagepath) os.unlink(self.storagepath)
except Exception as e: except Exception as e:
RNS.log("Error while cleaning up resource files, the contained exception was:", RNS.LOG_ERROR) RNS.log(f"Error while cleaning up resource files, the contained exception was: {e}", RNS.LOG_ERROR)
RNS.log(str(e))
else: else:
RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_DEBUG) RNS.log("Resource segment "+str(self.segment_index)+" of "+str(self.total_segments)+" received, waiting for next segment to be announced", RNS.LOG_DEBUG)
@ -708,6 +756,7 @@ class Resource:
is_response = self.is_response, is_response = self.is_response,
advertise = False, advertise = False,
auto_compress = self.auto_compress, auto_compress = self.auto_compress,
sent_metadata_size = self.metadata_size,
) )
def validate_proof(self, proof_data): def validate_proof(self, proof_data):
@ -720,18 +769,18 @@ class Resource:
# If all segments were processed, we'll # If all segments were processed, we'll
# signal that the resource sending concluded # signal that the resource sending concluded
if self.callback != None: if self.callback != None:
try: try: self.callback(self)
self.callback(self) except Exception as e: RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
except Exception as e:
RNS.log("Error while executing resource concluded callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
finally: finally:
try: try:
if hasattr(self, "input_file"): if hasattr(self, "input_file"):
if hasattr(self.input_file, "close") and callable(self.input_file.close): if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close()
self.input_file.close() except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
else:
except Exception as e: try:
RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR) if hasattr(self, "input_file"):
if hasattr(self.input_file, "close") and callable(self.input_file.close): self.input_file.close()
except Exception as e: RNS.log("Error while closing resource input file: "+str(e), RNS.LOG_ERROR)
else: else:
# Otherwise we'll recursively create the # Otherwise we'll recursively create the
# next segment of the resource # next segment of the resource
@ -1202,6 +1251,7 @@ class ResourceAdvertisement:
self.c = resource.compressed # Compression flag self.c = resource.compressed # Compression flag
self.e = resource.encrypted # Encryption flag self.e = resource.encrypted # Encryption flag
self.s = resource.split # Split flag self.s = resource.split # Split flag
self.x = resource.has_metadata # Metadata flag
self.i = resource.segment_index # Segment index self.i = resource.segment_index # Segment index
self.l = resource.total_segments # Total segments self.l = resource.total_segments # Total segments
self.q = resource.request_id # ID of associated request self.q = resource.request_id # ID of associated request
@ -1217,7 +1267,7 @@ class ResourceAdvertisement:
self.p = True self.p = True
# Flags # Flags
self.f = 0x00 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e self.f = 0x00 | self.x << 5 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e
def get_transfer_size(self): def get_transfer_size(self):
return self.t return self.t
@ -1237,6 +1287,9 @@ class ResourceAdvertisement:
def is_compressed(self): def is_compressed(self):
return self.c return self.c
def has_metadata(self):
return self.x
def get_link(self): def get_link(self):
return self.link return self.link
@ -1286,5 +1339,6 @@ class ResourceAdvertisement:
adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False
adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False
adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False
adv.x = True if ((adv.f >> 5) & 0x01) == 0x01 else False
return adv return adv