diff --git a/evio/broker/__init__.py b/evio/broker/__init__.py index 20618ae..cd1db57 100644 --- a/evio/broker/__init__.py +++ b/evio/broker/__init__.py @@ -63,7 +63,6 @@ __all__ = [ "GENEVE_SETUP_TIMEOUT", "MIN_SUCCESSORS", "MAX_ON_DEMAND_EDGES", - "PEER_DISCOVERY_COALESCE", "EXCLUSION_BASE_INTERVAL", "MAX_SUCCESSIVE_FAILS", "TRIM_CHECK_INTERVAL", @@ -121,7 +120,6 @@ SDN_CONTROLLER_PORT: Literal[6633] = 6633 GENEVE_SETUP_TIMEOUT: Literal[180] = 180 MIN_SUCCESSORS: Literal[2] = 2 MAX_ON_DEMAND_EDGES: Literal[3] = 3 -PEER_DISCOVERY_COALESCE: Literal[1] = 1 EXCLUSION_BASE_INTERVAL: Literal[60] = 60 MAX_SUCCESSIVE_FAILS: Literal[4] = 4 TRIM_CHECK_INTERVAL: Literal[300] = 300 diff --git a/evio/broker/broker.py b/evio/broker/broker.py index 78d1708..aba0aa9 100644 --- a/evio/broker/broker.py +++ b/evio/broker/broker.py @@ -205,8 +205,6 @@ class Broker: self._que_listener.start() for k, v in self.cfg_controllers.items(): ctr_lgl = self._config["Broker"].get("LogLevel", LOG_LEVEL) - if "LogLevel" in self._controller_config(k): - ctr_lgl = self._controller_config(k)["LogLevel"] self._setup_controller_logger((k, v["Module"]), formatter, ctr_lgl) def _setup_controller_logger( @@ -250,8 +248,8 @@ class Broker: for ctrl_name in self._load_order: self.load_module(ctrl_name) - self._ipc.start() self._timers.start() + self._ipc.start() # intialize the the CMs via their respective nexus for ctrl_name in self._load_order: self._nexus_map[ctrl_name].initialize() @@ -368,8 +366,6 @@ class Broker: def terminate(self): with self._nexus_lock: - self._timers.terminate() - self._ipc.terminate() for ctrl_name in reversed(self._load_order): wn = self._nexus_map[ctrl_name]._cm_thread.name self._nexus_map[ctrl_name].work_queue.put(None) @@ -377,6 +373,8 @@ class Broker: wn = self._nexus_map[ctrl_name]._cm_thread.name self._nexus_map[ctrl_name]._cm_thread.join() self.logger.info("%s exited", wn) + self._ipc.terminate() + self._timers.terminate() for ql in self._cm_qlisteners: ql.stop() self._que_listener.stop() @@ -505,6 +503,9 @@ class Broker: def register_timed_transaction(self, entry: Transaction): self._timers.register(entry) + def register_dpc(self, delay, call, params=()): + self._timers.register_dpc(delay, call, params) + def dispach_proxy_msg(self, msg: ProxyMsg): # task structure # dict(Request=dict(Target=CM, Action=None, Params=None), diff --git a/evio/broker/cbt.py b/evio/broker/cbt.py index 3c9ffcb..2244ea2 100644 --- a/evio/broker/cbt.py +++ b/evio/broker/cbt.py @@ -90,9 +90,9 @@ class CBT: return introspect(self) def __itr__(self): - yield ("status", self.status) yield ("initiator", self.initiator) yield ("recipient", self.recipient) + yield ("status", self.status) yield ("data", self.data) def update(self, data, status: bool): @@ -116,7 +116,10 @@ class CBT: self.response: Optional[CBT.Response] = None self.context: dict = {} for k, v in kwargs.items(): - self.context[k] = v + if hasattr(self, k): + self.k = v + else: + self.context[k] = v self.lifespan = CBT_LIFESPAN self.time_created: float = 0.0 self.time_submited: float = 0.0 diff --git a/evio/broker/controller_module.py b/evio/broker/controller_module.py index 4dabf8c..ebf87f2 100644 --- a/evio/broker/controller_module.py +++ b/evio/broker/controller_module.py @@ -50,9 +50,9 @@ class ControllerModule: def _setup_logger(self): self.logger = logging.getLogger(self.__class__.__name__) - # if "LogLevel" in self.config: - # self.logger.setLevel(self.config["LogLevel"]) - # return + if "LogLevel" in self.config: + self.logger.setLevel(self.config["LogLevel"]) + return @abstractmethod def initialize(self): @@ -260,6 +260,9 @@ class ControllerModule: raise ValueError(f"Object already marked as completed {obj}") self._nexus.register_timed_transaction(obj, is_completed, on_expired, lifespan) + def register_deferred_call(self, delay, call, params=()): + self._nexus.register_deferred_call(delay, call, params) + @abstractmethod def handle_ipc(self, msg: ProxyMsg): NotImplemented diff --git a/evio/broker/nexus.py b/evio/broker/nexus.py index 57057e5..8e27bf6 100644 --- a/evio/broker/nexus.py +++ b/evio/broker/nexus.py @@ -23,7 +23,7 @@ import queue import threading import time -from . import CONTROLLER_TIMER_INTERVAL, EVENT_PERIOD, statement_false +from . import CONTROLLER_TIMER_INTERVAL, EVENT_PERIOD from .cbt import CBT from .process_proxy import ProxyMsg from .timed_transactions import Transaction @@ -44,10 +44,6 @@ class Nexus: ) self._timer_loop_cnt: int = 1 self._pending_cbts: dict[int, CBT] = {} - self._last_ctlr_update_ts = time.time() - self._ctlr_update = Transaction( - self, statement_false, self.on_timer, self._timer_interval - ) @property def controller(self): @@ -131,7 +127,7 @@ class Nexus: def start_controller(self): self._cm_thread.start() - self._broker.register_timed_transaction(self._ctlr_update) + self._broker.register_dpc(self._timer_interval, self.on_timer) def __worker(self): # get CBT from the local queue and call process_cbt() of the @@ -182,11 +178,7 @@ class Nexus: f"Unexpected CBT state for expired event. {cbt}" ) - def _schedule_ctlr_update(self): - self._ctlr_update.lifespan = self._timer_interval - self._broker.register_timed_transaction(self._ctlr_update) - - def on_timer(self, nexus, time_expired: float): + def on_timer(self): try: self._controller.log_state() self._controller.on_timer_event() @@ -194,7 +186,7 @@ class Nexus: self._controller.logger.warning( "on_timer exception: %s", err, exc_info=True ) - self._schedule_ctlr_update() + self._broker.register_dpc(self._timer_interval, self.on_timer) def query_param(self, param_name=""): return self._broker.query_param(param_name) @@ -235,5 +227,8 @@ class Nexus: ) ) + def register_deferred_call(self, delay, call, params): + self._broker.register_dpc(delay, call, params) + def send_ipc(self, msg: ProxyMsg): self._broker.send_ipc(msg) diff --git a/evio/broker/process_proxy.py b/evio/broker/process_proxy.py index 447824d..aaf0af1 100644 --- a/evio/broker/process_proxy.py +++ b/evio/broker/process_proxy.py @@ -53,16 +53,14 @@ class ProxyMsg: def data(self, payload: bytearray): self.ts = time.time() self._json = None - if payload is None: - return self.payload = payload def __repr__(self) -> str: - return f"{self.fileno}:{self.payload.decode('utf-8')}" + return f"{self.fileno}:{self.json}" @property def json(self): - if self._json is None: + if self._json is None and self.payload: self._json = json.loads(self.payload.decode("utf-8")) return self._json @@ -124,6 +122,8 @@ class ProcessProxy: while not self._exit_ev.is_set(): while self.tx_que.qsize() > 0: msg: ProxyMsg = self.tx_que.get_nowait() + if not msg.data: + continue node = connections.get(msg.fileno, None) if node is not None: if not node.event & select.EPOLLOUT: @@ -154,6 +154,7 @@ class ProcessProxy: "Node %s IPC read hangup", node.skt.fileno() ) if not node.tx_deque: + connections.pop(fileno) self.close_client(node) elif event & select.EPOLLHUP: node = connections.pop(fileno) @@ -164,10 +165,12 @@ class ProcessProxy: bufsz = int.from_bytes( connections[fileno].skt.recv(2), sys.byteorder ) - if bufsz <= 0: - node = connections[fileno] - node.skt.shutdown(socket.SHUT_WR) - elif bufsz > 65507: + if bufsz == 0: + connections[fileno].skt.recv(0) + self.logger.warning( + "Zero byte read buffer size received" + ) + elif bufsz < 0 or bufsz > 65507: node = connections[fileno] connections.pop(fileno) self.close_client(node) diff --git a/evio/broker/timed_transactions.py b/evio/broker/timed_transactions.py index 95bcab5..10e8a76 100644 --- a/evio/broker/timed_transactions.py +++ b/evio/broker/timed_transactions.py @@ -31,7 +31,12 @@ class TimedTransactions: def register(self, entry: Transaction): if self._exit_ev.is_set(): return - self._sched.enter(entry.lifespan, entry.priority, self._get_expired, [entry]) + self._sched.enter(entry.lifespan, entry.priority, self._get_expired, (entry,)) + + def register_dpc(self, delay: float, call, params: tuple): + if self._exit_ev.is_set(): + return + self._sched.enter(delay, 15, call, params) def _get_expired(self, entry): if not entry.is_completed(): diff --git a/evio/controllers/bridge_controller.py b/evio/controllers/bridge_controller.py index 18b61f1..1005960 100644 --- a/evio/controllers/bridge_controller.py +++ b/evio/controllers/bridge_controller.py @@ -27,6 +27,7 @@ except ImportError: import copy import logging +import signal import subprocess import threading from abc import ABCMeta, abstractmethod @@ -49,6 +50,7 @@ from broker.cbt import CBT from broker.controller_module import ControllerModule from broker.process_proxy import ProxyMsg from pyroute2 import IPRoute +from pyroute2.netlink.exceptions import NetlinkError from .tunnel import TUNNEL_EVENTS @@ -171,10 +173,12 @@ class OvsBridge(BridgeABC): def add_port(self, port_name, port_descr): self.del_port(port_name) - self.flush_ip_addresses(self.name) with IPRoute() as ipr: - idx = ipr.link_lookup(ifname=port_name)[0] - ipr.link("set", index=idx, mtu=self.mtu) + idx = ipr.link_lookup(ifname=port_name) + if len(idx) < 1: + raise NetlinkError(19, "No such device") + ipr.flush_addr(index=idx[0]) + ipr.link("set", index=idx[0], mtu=self.mtu) broker.run_proc( [OvsBridge.brctl, "--may-exist", "add-port", self.name, port_name] ) @@ -263,9 +267,12 @@ class LinuxBridge(BridgeABC): def add_port(self, port_name, port_descr): with IPRoute() as ipr: - idx = ipr.link_lookup(ifname=port_name)[0] - ipr.link("set", index=idx, mtu=self.mtu) - ipr.link("set", index=idx, master=ipr.link_lookup(ifname=self.name)[0]) + idx = ipr.link_lookup(ifname=port_name) + if len(idx) < 1: + raise NetlinkError(19, "No such device") + ipr.flush_addr(index=idx[0]) + ipr.link("set", index=idx[0], mtu=self.mtu) + ipr.link("set", index=idx[0], master=ipr.link_lookup(ifname=self.name)[0]) self.ports.add(port_name) self.port_descriptors[port_name] = port_descr @@ -307,16 +314,20 @@ class VNIC(BridgeABC): def add_port(self, port_name): self.name = port_name with IPRoute() as ipr: - idx = ipr.link_lookup(ifname=port_name)[0] - ipr.link("set", index=idx, mtu=self.mtu) - ipr.addr("add", index=idx, address=self.ip_addr, mask=self.prefix_len) - ipr.link("set", index=idx, state="up") + idx = ipr.link_lookup(ifname=port_name) + if len(idx) < 1: + raise NetlinkError(19, "No such device") + ipr.link("set", index=idx[0], mtu=self.mtu) + ipr.addr("add", index=idx[0], address=self.ip_addr, mask=self.prefix_len) + ipr.link("set", index=idx[0], state="up") def del_port(self, port_name): with IPRoute() as ipr: - idx = ipr.link_lookup(ifname="port_name")[0] - ipr.link("set", index=idx, state="down") - ipr.link("del", index=idx) + idx = ipr.link_lookup(ifname=port_name) + if len(idx) < 1: + raise NetlinkError(19, "No such device") + ipr.link("set", index=idx[0], state="down") + ipr.link("del", index=idx[0]) ################################################################################################### @@ -447,7 +458,7 @@ class BridgeController(ControllerModule): net_ovl["NetDevice"] = {} # start the BF proxy if at least one overlay is configured for it if "BoundedFlood" in self.config: - self._start_bf_proxy_server() + self._start_boundedflood() # create and configure the bridge for each overlay _ = self._create_overlay_bridges() publishers = self.get_registered_publishers() @@ -490,7 +501,7 @@ class BridgeController(ControllerModule): "TOP_REQUEST_OND_TUNNEL": self.resp_handler_default, } - def _start_bf_proxy_server(self): + def _start_boundedflood(self): bf_config = self.config["BoundedFlood"] bf_config["NodeId"] = self.node_id bf_ovls = bf_config.pop("Overlays") @@ -502,7 +513,7 @@ class BridgeController(ControllerModule): ) bf_config[br_name] = bf_ovls[olid] bf_config[br_name]["OverlayId"] = olid - self.start_bf_client_module(bf_config) + self._start_bf_module(bf_config) def _create_overlay_bridges(self) -> dict: ign_br_names = {} @@ -548,7 +559,7 @@ class BridgeController(ControllerModule): except Exception as err: self._tunnels[overlay_id].pop(port_name, None) bridge.del_port(port_name) - self.logger.info("Failed to add port %s. %s", tnl_data, err, exc_info=True) + self.logger.info("Failed to add port %s. %s", tnl_data, err) def req_handler_manage_bridge(self, cbt: CBT): try: @@ -558,9 +569,12 @@ class BridgeController(ControllerModule): if cbt.request.params["UpdateType"] == TUNNEL_EVENTS.Connected: self._add_tunnel_port(olid, port_name, cbt.request.params) elif cbt.request.params["UpdateType"] == TUNNEL_EVENTS.Removed: - self._tunnels[olid].pop(port_name, None) - bridge.del_port(port_name) - self.logger.info("Port %s removed from bridge %s", port_name, bridge) + if port_name: + self._tunnels[olid].pop(port_name, None) + bridge.del_port(port_name) + self.logger.info( + "Port %s removed from bridge %s", port_name, bridge + ) except Exception as err: self.logger.warning("Manage bridge error %s", err, exc_info=True) cbt.set_response(None, True) @@ -569,10 +583,17 @@ class BridgeController(ControllerModule): def on_timer_event(self): for tnl in self._tunnels.values(): tnl.trim() + if self._bf_proc is not None: + exit_code = self._bf_proc.poll() + if exit_code: + self.logger.info( + "BF module terminated unexpectedly (%s), restarting.", exit_code + ) + self._start_boundedflood() def terminate(self): try: - self.stop_bf_module() + self._stop_bf_module() for olid, bridge in self._ovl_net.items(): if self.overlays[olid]["NetDevice"].get( "AutoDelete", BRIDGE_AUTO_DELETE @@ -671,7 +692,7 @@ class BridgeController(ControllerModule): msg.data = json.dumps(task).encode("utf-8") self.send_ipc(msg) - def start_bf_client_module(self, bf_config): + def _start_bf_module(self, bf_config): RyuManager = spawn.find_executable("ryu-manager") if RyuManager is None: raise RuntimeError("RyuManager was not found, is it installed?") @@ -687,13 +708,13 @@ class BridgeController(ControllerModule): ] self._bf_proc = subprocess.Popen(cmd) - def stop_bf_module(self, wt: int = 1.15): + def _stop_bf_module(self, wt: int = 1.15): if self._bf_proc is not None: try: exit_code = self._bf_proc.poll() if exit_code is None: - self._bf_proc.terminate() - self._bf_proc.wait() + self._bf_proc.send_signal(signal.SIGINT) + self._bf_proc.wait(wt) else: self.logger.debug( "BoundedFlood process %s already exited with %s", @@ -707,7 +728,5 @@ class BridgeController(ControllerModule): "Killing unresponsive BoundedFlood: %s", self._bf_proc.pid ) self._bf_proc.kill() + self._bf_proc = None self.logger.info("BoundedFlood terminated") - - -# Todo: check if BF process exited and restart if not shutting down diff --git a/evio/controllers/geneve_tunnel.py b/evio/controllers/geneve_tunnel.py index 22353fb..78bc3e5 100644 --- a/evio/controllers/geneve_tunnel.py +++ b/evio/controllers/geneve_tunnel.py @@ -444,4 +444,4 @@ class GeneveTunnel(ControllerModule): if tnl.state == TUNNEL_STATES.AUTHORIZED: self._deauth_tnl(tnl) else: - self._rollback_tnl([tnl]) + self._rollback_tnl(tnl) diff --git a/evio/controllers/link_manager.py b/evio/controllers/link_manager.py index 0d53b7c..71f477c 100644 --- a/evio/controllers/link_manager.py +++ b/evio/controllers/link_manager.py @@ -83,6 +83,9 @@ class Tunnel: def tunnel_state(self, new_state): self.state = new_state + def is_tnl_online(self) -> bool: + return bool(self.tunnel_state == TUNNEL_STATES.ONLINE) + class LinkManager(ControllerModule): TAPNAME_MAXLEN = 15 @@ -375,13 +378,15 @@ class LinkManager(ControllerModule): self._tunnels[tnlid].link.status_retry = 0 elif cbt.request.params["Command"] == "LinkDisconnected": if self._tunnels[tnlid].tunnel_state != TUNNEL_STATES.QUERYING: - self.logger.debug("Link %s is disconnected", tnlid) # issue a link state check only if it not already being done + self.logger.debug("Link %s is disconnected", tnlid) self._tunnels[tnlid].tunnel_state = TUNNEL_STATES.QUERYING cbt.set_response(data=None, status=True) - self.register_cbt( - "TincanTunnel", "TCI_QUERY_LINK_INFO", {"TunnelId": tnlid} - ) + self.register_deferred_call( + 5, + self.register_cbt, + ("TincanTunnel", "TCI_QUERY_LINK_INFO", {"TunnelId": tnlid}), + ) # issue link stat check in 5 secs as the link can reconnect elif cbt.request.params["Command"] == "TincanTunnelFailed": lnkid = self.link_id(tnlid) if lnkid: @@ -736,8 +741,7 @@ class LinkManager(ControllerModule): return ign_netinf def is_tnl_online(self, tnl: Tunnel) -> bool: - return bool(tnl.tunnel_state == TUNNEL_STATES.ONLINE) - # return bool(tnl.link and tnl.link.creation_state == 0xC0) + return tnl.is_tnl_online() def _remove_link_from_tunnel(self, tnlid): tnl = self._tunnels.get(tnlid) @@ -855,7 +859,8 @@ class LinkManager(ControllerModule): self.complete_cbt(parent_cbt) return lnkid = self.link_id(tnlid) - self._tunnels[tnlid].link.creation_state = 0xC0 + tnl = self._tunnels[tnlid] + tnl.link.creation_state = 0xC0 self.logger.debug( "Creating link %s to peer %s (5/5 Initiator)", tnlid[:7], peer_id[:7] ) @@ -868,6 +873,13 @@ class LinkManager(ControllerModule): self.node_id[:7], peer_id[:7], ) + if not tnl.is_tnl_online(): + self.register_timed_transaction( + tnl, + self.is_tnl_online, + self.on_tnl_timeout, + LINK_SETUP_TIMEOUT, + ) def _complete_link_endpt_request(self, cbt: CBT): # Create Link: Phase 4 Node B @@ -1036,40 +1048,3 @@ class LinkManager(ControllerModule): self._tunnels.pop(tnl.tnlid, None) if tnl.link: self._links.pop(tnl.link.lnkid, None) - - -""" TODO: OUTDATED, NEED TO BE UPDATED -################################################################################################### -Link Manager state and event specifications -################################################################################################### - -If LM fails a CBT there will be no further events fired for the tunnel. -Once tunnel goes online an explicit CBT LNK_REMOVE_TUNNEL is required. -Partially created tunnels that fails will be removed automatically by LM. - -Events -(1) TunnelEvents.AuthExpired - After a successful completion of CBT LNK_AUTH_TUNNEL, the tunnel -descriptor is created and TunnelEvents.Authorized is fired. -(2) TunnelEvents.AuthExpired - If no action is taken on the tunnel within LinkSetupTimeout LM will -fire TunnelEvents.AuthExpired and remove the associated tunnel descriptor. -(3) ##REMOVED## TunnelEvents.Created - On both nodes A & B, on a successful completion of CBT -TCI_CREATE_TUNNEL, the TAP device exists and TunnelEvents.Created is fired. -(4) TunnelEvents.Connected - After Tincan delivers the online event to LM TunnelEvents.Connected -is fired. -(5) TunnelEvents.Disconnected - After Tincan signals link offline or QUERYy_LNK_STATUS discovers -offline TunnelEvents.Disconnected is fired. -(6) TunnelEvents.Removed - After the TAP device is removed TunnelEvents.Removed is fired and the -tunnel descriptor is removed. Tunnel must be in TUNNEL_STATES.ONLINE or TUNNEL_STATES.OFFLINE - - Internal States -(1) TUNNEL_STATES.AUTHORIZED - After a successful completion of CBT LNK_AUTH_TUNNEL, the tunnel -descriptor exists. -(2) TUNNEL_STATES.CREATING - entered on reception of CBT LNK_CREATE_TUNNEL. -(3) TUNNEL_STATES.QUERYING - entered before issuing CBT TCI_QUERY_LINK_INFO. Happens when -LinkStateChange is LINK_STATE_DOWN and state is not already TUNNEL_STATES.QUERYING; OR -TCI_QUERY_LINK_INFO is OFFLINE and state is not already TUNNEL_STATES.QUERYING. -(4) TUNNEL_STATES.ONLINE - entered when CBT TCI_QUERY_LINK_INFO is ONLINE or LinkStateChange is -LINK_STATE_UP. -(5) TUNNEL_STATES.OFFLINE - entered when QUERY_LNK_STATUS is OFFLINE or LinkStateChange is -LINK_STATE_DOWN event. -""" diff --git a/evio/controllers/signal.py b/evio/controllers/signal.py index 37477a2..4c764ab 100644 --- a/evio/controllers/signal.py +++ b/evio/controllers/signal.py @@ -39,7 +39,7 @@ from typing import Optional, Tuple, Union import broker import slixmpp -from broker import CACHE_EXPIRY_INTERVAL, PRESENCE_INTERVAL, statement_false +from broker import CACHE_EXPIRY_INTERVAL, PRESENCE_INTERVAL from broker.cbt import CBT from broker.controller_module import ControllerModule from broker.remote_action import RemoteAction @@ -403,17 +403,13 @@ class XmppTransport(slixmpp.ClientXMPP): self.loop.close() self.logger.debug("Event loop closed on XMPP overlay=%s", self._overlay_id) - def shutdown( - self, - ): - self.logger.debug("Initiating shutdown of XMPP overlay=%s", self._overlay_id) - self.disconnect(reason="controller shutdown", ignore_send_queue=True) - # self.loop.call_soon_threadsafe(self.disconnect(reason="controller shutdown", ignore_send_queue=True)) - self.logger.debug("Disconnect of XMPP overlay=%s", self._overlay_id) + def shutdown(self): + self.logger.debug("Initiating shutdown of XMPP overlay %s", self._overlay_id) + self.loop.call_soon_threadsafe(self.disconnect, 2, "controller shutdown", True) class XmppCircle: - _REFLECT: list[str] = ["xport", "_transmission_queue", "jid_cache"] + _REFLECT: list[str] = ["xport", "transmit_queues", "jid_cache"] def __init__( self, node_id: str, overlay_id: str, ovl_config: dict, **kwargs @@ -507,11 +503,9 @@ class Signal(ControllerModule): ) self._circles[olid] = xcir xcir.start() - self.register_timed_transaction( - self, - statement_false, - self.on_exp_presence, + self.register_deferred_call( PRESENCE_INTERVAL * random.randint(1, 5), + self.on_exp_presence, ) self.logger.info("Controller module loaded") @@ -520,16 +514,13 @@ class Signal(ControllerModule): 20, 50 ) - def on_exp_presence(self, *_): + def on_exp_presence(self): with self._lck: for circ in self._circles.values(): if circ.xport and circ.xport.is_connected(): circ.xport.send_presence_safe(pstatus="ident#" + self.node_id) - self.register_timed_transaction( - self, - statement_false, - self.on_exp_presence, - self._next_anc_interval(), + self.register_deferred_call( + self._next_anc_interval(), self.on_exp_presence ) def on_presence(self, msg): @@ -709,33 +700,26 @@ class Signal(ControllerModule): def scavenge_expired_outgoing_rem_acts(self, outgoing_rem_acts: dict[str, Queue]): # clear out the JID Refresh queue for a peer if the oldest entry age exceeds the limit peer_ids = [] - for peer_id in outgoing_rem_acts: - peer_qlen = outgoing_rem_acts[peer_id].qsize() - if not outgoing_rem_acts[peer_id].queue: - continue - remact_descr = outgoing_rem_acts[peer_id].queue[ - 0 - ] # peek at the first/oldest entry - if time.time() - remact_descr[2] >= self._jid_resolution_timeout: - peer_ids.append(peer_id) - self.logger.debug( - "Remote acts scavenged for removal peer_id %s qlength %d", - peer_id, - peer_qlen, - ) + for peer_id, transmit_queue in outgoing_rem_acts.items(): + if transmit_queue.queue: + remact_descr = transmit_queue.queue[0] # peek at the first/oldest entry + if time.time() - remact_descr[2] >= self._jid_resolution_timeout: + peer_ids.append(peer_id) + self.logger.debug( + "Remote act scavenged for removal %s", remact_descr + ) for peer_id in peer_ids: - remact_que = outgoing_rem_acts.pop(peer_id, Queue()) - while True: - try: - remact = remact_que.get_nowait() - except Empty: - break - else: - tag = remact[1].action_tag - cbt = self._cbts_pending_remote_resp.pop(tag, None) - cbt.set_response("Peer lookup failed", False) - self.complete_cbt(cbt) - remact_que.task_done() + transmit_queue: Queue = outgoing_rem_acts[peer_id] + try: + remact = transmit_queue.get_nowait() + except Empty: + return + tag = remact[1].action_tag + cbt = self._cbts_pending_remote_resp.pop(tag, None) + # if cbt: + cbt.set_response("Peer lookup failed", False) + self.complete_cbt(cbt) + transmit_queue.task_done() def _setup_circle(self, overlay_id: str): xcir = XmppCircle( diff --git a/evio/controllers/tincan_tunnel.py b/evio/controllers/tincan_tunnel.py index 02b684f..20ff62b 100644 --- a/evio/controllers/tincan_tunnel.py +++ b/evio/controllers/tincan_tunnel.py @@ -26,10 +26,11 @@ except ImportError: import subprocess import time +from copy import deepcopy from threading import Event import broker -from broker import TINCAN_CHK_INTERVAL, statement_false +from broker import TINCAN_CHK_INTERVAL from broker.cbt import CBT from broker.controller_module import ControllerModule from broker.process_proxy import ProxyMsg @@ -83,7 +84,7 @@ class TincanTunnel(ControllerModule): self._register_req_handlers() self._register_resp_handlers() self._tci_publisher = self.publish_subscription("TCI_TUNNEL_EVENT") - self.on_expire_chk_tincan() + self.register_deferred_call(TINCAN_CHK_INTERVAL, self.on_expire_chk_tincan) self.logger.info("Controller module loaded") def _register_abort_handlers(self): @@ -135,7 +136,7 @@ class TincanTunnel(ControllerModule): def _create_tunnel(self, cbt: CBT): msg = cbt.request.params tnlid = msg["TunnelId"] - ctl = broker.CTL_CREATE_TUNNEL + ctl = deepcopy(broker.CTL_CREATE_TUNNEL) ctl["TransactionId"] = cbt.tag req = ctl["Request"] req["StunServers"] = msg["StunServers"] @@ -147,9 +148,6 @@ class TincanTunnel(ControllerModule): tc_proc = self._tc_proc_tbl[tnlid] self._tnl_cbts[cbt.tag] = cbt self.send_control(tc_proc.ipc_id, json.dumps(ctl)) - for turn in req["TurnServers"]: - turn["User"] = "***" - turn["Password"] = "***" def req_handler_create_link(self, cbt: CBT): try: @@ -159,6 +157,8 @@ class TincanTunnel(ControllerModule): cbt.add_context("OnRegister", self._create_link) self._tnl_cbts[tnlid] = cbt self._start_tincan(tnlid) + self._tc_proc_tbl[tnlid].ovlid = msg["OverlayId"] + self._tc_proc_tbl[tnlid].tap_name = msg["TapName"] else: self._create_link(cbt) except Exception: @@ -169,7 +169,7 @@ class TincanTunnel(ControllerModule): def _create_link(self, cbt: CBT): msg = cbt.request.params tnlid = msg["TunnelId"] - ctl = broker.CTL_CREATE_LINK + ctl = deepcopy(broker.CTL_CREATE_LINK) ctl["TransactionId"] = cbt.tag req = ctl["Request"] req["TunnelId"] = tnlid @@ -187,9 +187,6 @@ class TincanTunnel(ControllerModule): tc_proc = self._tc_proc_tbl[tnlid] self._tnl_cbts[cbt.tag] = cbt self.send_control(tc_proc.ipc_id, json.dumps(ctl)) - for turn in req["TurnServers"]: - turn["User"] = "***" - turn["Password"] = "***" def req_handler_query_candidate_address_set(self, cbt: CBT): msg = cbt.request.params @@ -198,7 +195,7 @@ class TincanTunnel(ControllerModule): err_msg = f"No tunnel exists for tunnel ID: {tnlid[:7]}" cbt.set_response({"ErrorMsg": err_msg, "Status": False}) return - ctl = broker.CTL_QUERY_CAS + ctl = deepcopy(broker.CTL_QUERY_CAS) ctl["TransactionId"] = cbt.tag ctl["Request"]["TunnelId"] = tnlid tc_proc = self._tc_proc_tbl[tnlid] @@ -206,17 +203,16 @@ class TincanTunnel(ControllerModule): self.send_control(tc_proc.ipc_id, json.dumps(ctl)) def req_handler_query_link_stats(self, cbt: CBT): - msg = cbt.request.params - tnlid = msg["TunnelId"] - if tnlid not in self._tc_proc_tbl: + tnlid = cbt.request.params["TunnelId"] + tc_proc = self._tc_proc_tbl.get(tnlid) + if not tc_proc: err_msg = f"No tunnel exists for tunnel ID: {tnlid[:7]}" cbt.set_response({"ErrorMsg": err_msg, "Status": False}) self.complete_cbt(cbt) return - ctl = broker.CTL_QUERY_LINK_STATS + ctl = deepcopy(broker.CTL_QUERY_LINK_STATS) ctl["TransactionId"] = cbt.tag ctl["Request"]["TunnelId"] = tnlid - tc_proc = self._tc_proc_tbl[tnlid] self._tnl_cbts[cbt.tag] = cbt self.send_control(tc_proc.ipc_id, json.dumps(ctl)) @@ -228,8 +224,9 @@ class TincanTunnel(ControllerModule): cbt.set_response(err_msg, True) self.complete_cbt(cbt) return - self.logger.debug("Removing tunnel %s", tnlid) + self.logger.info("Removing tunnel %s", tnlid) tc_proc = self._tc_proc_tbl.pop(tnlid, None) + self._pids.pop(tc_proc.proc.pid, None) self._stop_tincan(tc_proc) cbt.set_response("Tunnel removed", True) self.complete_cbt(cbt) @@ -242,7 +239,7 @@ class TincanTunnel(ControllerModule): cbt.set_response({"ErrorMsg": err_msg, "Status": False}) self.complete_cbt(cbt) return - ctl = broker.CTL_REMOVE_LINK + ctl = deepcopy(broker.CTL_REMOVE_LINK) ctl["TransactionId"] = cbt.tag req = ctl["Request"] req["TunnelId"] = tnlid @@ -252,16 +249,18 @@ class TincanTunnel(ControllerModule): self.send_control(tc_proc.ipc_id, json.dumps(ctl)) def req_handler_send_echo(self, cbt: CBT): - ctl = broker.CTL_ECHO + ctl = deepcopy(broker.CTL_ECHO) ctl["TransactionId"] = cbt.tag tnlid = cbt.request.params tc_proc = self._tc_proc_tbl.get(tnlid) - if tc_proc.do_chk and tc_proc.echo_replies > 0: + if tc_proc and tc_proc.do_chk and tc_proc.echo_replies > 0: tc_proc.echo_replies -= 1 ctl["Request"]["Message"] = tc_proc.tnlid self._tnl_cbts[cbt.tag] = cbt self.send_control(tc_proc.ipc_id, json.dumps(ctl)) + else: + tc_proc.do_chk = False cbt.set_response(f"Cannot send echo to {tc_proc}", False) self.complete_cbt(cbt) @@ -269,6 +268,9 @@ class TincanTunnel(ControllerModule): tnlid = cbt.response.data if cbt.response.status and tnlid in self._tc_proc_tbl: self._tc_proc_tbl[tnlid].echo_replies = broker.MAX_HEARTBEATS + self.register_internal_cbt( + "TCI_QUERY_LINK_INFO", {"TunnelId": tnlid}, lifespan=15 + ) else: self.logger.info(cbt.response.data) self.free_cbt(cbt) @@ -288,32 +290,35 @@ class TincanTunnel(ControllerModule): else: # tincan process unresponsive self.logger.warning( - "unnel: %s health check failed, terminating process: %s", + "Tunnel: %s health check failed, terminating process: %s", tnlid, tc_proc, ) - self._stop_tincan(tc_proc) - self._notify_tincan_terminated(tnlid) + self._pids.pop(tc_proc.proc.pid, None) self._tc_proc_tbl.pop(tnlid, None) + self._stop_tincan(tc_proc) + self._notify_tincan_terminated(tc_proc) - def req_handler_check_process(self, cbt): - if self.exit_ev.is_set(): - return + def req_handler_check_process(self, cbt: CBT): exit_code = None rmv = [] - for tnlid, tc_proc in self._tc_proc_tbl.items(): + for tc_proc in self._tc_proc_tbl.values(): exit_code = tc_proc.proc.poll() if exit_code: # tincan process crashed - rmv.append(tnlid) - for tnlid in rmv: - self.logger.warning( - "Tincan process %s exited unexpectedly with code, %s", - tc_proc.proc.pid, - exit_code, - ) - self._notify_tincan_terminated(tnlid) - self._tc_proc_tbl.pop(tnlid, None) + self.logger.warning( + "Tincan process %s exited unexpectedly with code %s", + tc_proc.proc.pid, + exit_code, + ) + rmv.append(tc_proc) + for tc_proc in rmv: + self._pids.pop(tc_proc.proc.pid) + self._tc_proc_tbl.pop(tc_proc.tnlid, None) + self._remove_tap(tc_proc.tap_name) + self._notify_tincan_terminated(tc_proc) + cbt.set_response(rmv, True) + self.complete_cbt(cbt) def on_timer_event(self): if self.exit_ev.is_set(): @@ -321,18 +326,14 @@ class TincanTunnel(ControllerModule): # send an echo health check every timer interval, eg., 30s for tnlid, tc_proc in self._tc_proc_tbl.items(): if tc_proc.do_chk: - self.register_internal_cbt("_TCI_SEND_ECHO", tnlid) + self.register_internal_cbt("_TCI_SEND_ECHO", tnlid, lifespan=10) - def on_expire_chk_tincan(self, *_): + def on_expire_chk_tincan(self): if self.exit_ev.is_set(): return - self.register_internal_cbt("_TCI_CHK_PROCESS") - self.register_timed_transaction( - self, - statement_false, - self.on_expire_chk_tincan, - TINCAN_CHK_INTERVAL, - ) + if self._tc_proc_tbl: + self.register_internal_cbt("_TCI_CHK_PROCESS") + self.register_deferred_call(TINCAN_CHK_INTERVAL, self.on_expire_chk_tincan) def terminate(self): self.exit_ev.set() @@ -399,34 +400,35 @@ class TincanTunnel(ControllerModule): except subprocess.TimeoutExpired: exit_code = tc_proc.proc.poll() if exit_code is None: - self._remove_tap() + self._remove_tap(tc_proc.tap_name) tc_proc.proc.kill() self._kill_times.append(self._kill_times[-1] + time.time() - ts) self.logger.debug("Killed unresponsive Tincan: %s", tc_proc.proc.pid) - self._pids.pop(tc_proc.proc.pid) self.logger.info( "Process %s for tunnel %s terminated", tc_proc.proc.pid, tc_proc.tnlid ) - def _notify_tincan_terminated(self, tnlid: str): + def _notify_tincan_terminated(self, tc_proc: TincanProcess): self._tci_publisher.post_update( { "Command": "TincanTunnelFailed", "Reason": "Tincan process terminated", - "OverlayId": self._tc_proc_tbl[tnlid].ovlid, - "TunnelId": tnlid, - "TapName": self._tc_proc_tbl[tnlid].tap_name, + "OverlayId": tc_proc.ovlid, + "TunnelId": tc_proc.tnlid, + "TapName": tc_proc.tap_name, } ) def handle_ipc(self, msg: ProxyMsg): + if self.exit_ev.is_set(): + return try: ctl = msg.json if ctl["ProtocolVersion"] != EVIO_VER_CTL: raise ValueError("Invalid control version detected") - # self.logger.debug("Received dataplane control - %s", ctl) # Get the original CBT if this is the response if ctl["ControlType"] == "Response": + # self.logger.debug("Received Tincan control response: %s", ctl) cbt = self._tnl_cbts.pop(ctl["TransactionId"]) cbt.set_response( ctl["Response"]["Message"], diff --git a/evio/controllers/topology.py b/evio/controllers/topology.py index 6e9cfb4..3adaabb 100644 --- a/evio/controllers/topology.py +++ b/evio/controllers/topology.py @@ -31,14 +31,13 @@ from random import randint from typing import Optional import broker -from broker import ( +from broker import ( # PEER_DISCOVERY_COALESCE, CBT_LIFESPAN, EXCLUSION_BASE_INTERVAL, MAX_CONCURRENT_OPS, MAX_ON_DEMAND_EDGES, MAX_SUCCESSIVE_FAILS, MIN_SUCCESSORS, - PEER_DISCOVERY_COALESCE, STALE_INTERVAL, SUCCESSIVE_FAIL_DECR, SUCCESSIVE_FAIL_INCR, @@ -355,18 +354,7 @@ class Topology(ControllerModule): disc = DiscoveredPeer(peer_id) self._net_ovls[olid].known_peers[peer_id] = disc disc.presence() - if disc.is_available: - self._net_ovls[olid].new_peer_count += 1 - if self._net_ovls[olid].new_peer_count >= self.config.get( - "PeerDiscoveryCoalesce", PEER_DISCOVERY_COALESCE - ): - self.logger.info( - "Coalesced %d of %d discovered peers, attempting update on overlay %s", - self._net_ovls[olid].new_peer_count, - self.config.get("PeerDiscoveryCoalesce", PEER_DISCOVERY_COALESCE), - olid, - ) - self._update_overlay(olid) + self._update_overlay(olid) cbt.set_response(None, True) self.complete_cbt(cbt) @@ -678,6 +666,14 @@ class Topology(ControllerModule): self._process_next_transition(ovl) else: self.free_cbt(cbt) + ce = ovl.adjacency_list.get(peer_id) + if ce.edge_state != EDGE_STATES.Connected: + self.register_timed_transaction( + (ce, olid), + self._is_connedge_connected, + self._on_connedge_timeout, + 30, + ) def resp_handler_remove_tnl(self, cbt: CBT): params = cbt.request.params @@ -1074,9 +1070,9 @@ class Topology(ControllerModule): raise ValueError(f"Invalid request: Undefinfed tunnel type {dataplane}") def _initiate_remove_edge(self, net_ovl: NetworkOverlay, peer_id: str): - if peer_id not in net_ovl.adjacency_list: - raise RuntimeWarning("No connection edge to peer found") - ce = net_ovl.adjacency_list[peer_id] + ce = net_ovl.adjacency_list.get(peer_id) + if not ce: + return if ( ce.edge_state == EDGE_STATES.Connected and ce.role == CONNECTION_ROLE.Initiator @@ -1091,8 +1087,6 @@ class Topology(ControllerModule): raise ValueError("Successor threshold not met") self.logger.debug("Removing edge %s", ce) self._remove_tunnel(net_ovl, ce.dataplane, ce.peer_id, ce.edge_id) - return True - return False def _remove_tunnel( self, diff --git a/evio/controllers/usage_report.py b/evio/controllers/usage_report.py index 8a31e2d..2850fda 100644 --- a/evio/controllers/usage_report.py +++ b/evio/controllers/usage_report.py @@ -30,7 +30,7 @@ import urllib.request as request from urllib.error import HTTPError, URLError from broker.cbt import CBT -from broker.controller_module import ControllerModule +from broker.controller_module import ControllerModule, introspect class UsageReport(ControllerModule): @@ -44,6 +44,9 @@ class UsageReport(ControllerModule): "NodeId": hashlib.sha256(self.node_id.encode("utf-8")).hexdigest(), } + def __repr__(self): + return introspect(self) + def initialize(self): self.logger.info("Controller module loaded")