From d6c2be7d93b9e4dff36ca4285c2dfeacffdfc1ce Mon Sep 17 00:00:00 2001 From: Ken Date: Thu, 25 Jan 2024 19:18:48 +0000 Subject: [PATCH] Update tunnel events --- evio/broker/__init__.py | 7 +-- evio/controllers/geneve_tunnel.py | 28 +++-------- evio/controllers/link_manager.py | 31 ++---------- evio/controllers/topology.py | 84 +++++++++++++------------------ evio/controllers/tunnel.py | 8 ++- 5 files changed, 54 insertions(+), 104 deletions(-) diff --git a/evio/broker/__init__.py b/evio/broker/__init__.py index e096012..230d1d0 100644 --- a/evio/broker/__init__.py +++ b/evio/broker/__init__.py @@ -90,6 +90,7 @@ __all__ = [ "delim_mac_str", "statement_false", "TC_REQUEST_TIMEOUT", + "EDGE_CONNECTED_TIMEOUT", ] EVIO_VER_REL: str = f"{EVIO_VER_MJR}.{EVIO_VER_MNR}.{EVIO_VER_REV}.{EVIO_VER_BLD}" LOG_DIRECTORY = "/var/log/evio/" @@ -124,16 +125,16 @@ CM_TIMER_EVENT_INTERVAL = 30 PRESENCE_UPDATE_INTERVAL = 30 PEER_EXCLUSION_INTERVAL = 60 TRIM_CHECK_INTERVAL = 300 -TC_PRCS_CHK_INTERVAL = 5 # tincan process checks +TC_PRCS_CHK_INTERVAL = 10 # tincan process checks CACHE_ENTRY_TIMEOUT = 60 PEER_CHKIN_TIMEOUT = 7200 # 2 hrs CBT_DFLT_TIMEOUT = 160 JID_RESOLUTION_TIMEOUT = 15 GENEVE_SETUP_TIMEOUT = 90 LINK_SETUP_TIMEOUT = 130 -TC_REQUEST_TIMEOUT = 30 # exipry of a req to tincan +TC_REQUEST_TIMEOUT = 40 # exipry of a req to tincan +EDGE_CONNECTED_TIMEOUT = 140 -# perfd = None perfd = PerformanceData() CONFIG = { diff --git a/evio/controllers/geneve_tunnel.py b/evio/controllers/geneve_tunnel.py index 8c09d8a..b24a888 100644 --- a/evio/controllers/geneve_tunnel.py +++ b/evio/controllers/geneve_tunnel.py @@ -73,16 +73,10 @@ class GeneveTunnel(ControllerModule): def _deauth_tnl(self, tnl: Tunnel): self._tunnels.pop(tnl.tnlid, None) self.logger.info("Deauthorizing expired tunnel %s", tnl) - param = { - "UpdateType": TUNNEL_EVENTS.AuthExpired, - "OverlayId": tnl.overlay_id, - "PeerId": tnl.peer_id, - "TunnelId": tnl.tnlid, - "TapName": tnl.tap_name, - } - self._gnv_updates_publisher.post_update(param) def _rollback_tnl(self, tnl: Tunnel): + if tnl.tnlid not in self._tunnels: + return self.logger.info("Removing expired tunnel %s", tnl) self._tunnels.pop(tnl.tnlid, None) self._remove_tunnel(tnl.tap_name) @@ -185,11 +179,10 @@ class GeneveTunnel(ControllerModule): DATAPLANE_TYPES.Geneve, ) self._tunnels[tnlid] = tnl - self.register_timed_transaction( - tnl, - self.is_tnl_completed, - self.on_tnl_timeout, + self.register_deferred_call( GENEVE_SETUP_TIMEOUT, + self.on_tnl_timeout, + (tnl,), ) self.logger.debug( "TunnelId:%s authorization for Peer:%s completed", @@ -197,13 +190,6 @@ class GeneveTunnel(ControllerModule): peer_id[:7], ) cbt.set_response({"Message": "Geneve tunnel authorization completed"}, True) - event_param = { - "UpdateType": TUNNEL_EVENTS.Authorized, - "OverlayId": olid, - "PeerId": peer_id, - "TunnelId": tnlid, - } - self._gnv_updates_publisher.post_update(event_param) self.complete_cbt(cbt) def req_handler_create_tunnel(self, cbt: CBT): @@ -468,7 +454,9 @@ class GeneveTunnel(ControllerModule): def is_tnl_completed(self, tnl: Tunnel) -> bool: return bool(tnl.state == TUNNEL_STATES.ONLINE) - def on_tnl_timeout(self, tnl: Tunnel, timeout: float): + def on_tnl_timeout(self, tnl: Tunnel): + if self.is_tnl_completed(tnl): + return if tnl.state == TUNNEL_STATES.AUTHORIZED: self._deauth_tnl(tnl) else: diff --git a/evio/controllers/link_manager.py b/evio/controllers/link_manager.py index a3813a5..9513015 100644 --- a/evio/controllers/link_manager.py +++ b/evio/controllers/link_manager.py @@ -176,14 +176,7 @@ class LinkManager(ControllerModule): cbt.set_response( "Authorization completed, TunnelId:{0}".format(tnlid[:7]), True ) - lnkupd_param = { - "UpdateType": TUNNEL_EVENTS.Authorized, - "OverlayId": olid, - "PeerId": peer_id, - "TunnelId": tnlid, - } self.complete_cbt(cbt) - self._link_updates_publisher.post_update(lnkupd_param) def req_handler_create_tunnel(self, cbt: CBT): """Create Link: Phase 1 Node A @@ -435,25 +428,19 @@ class LinkManager(ControllerModule): olid = cbt.request.params["OverlayId"] peer_id = cbt.request.params["PeerId"] tnlid = cbt.request.params["TunnelId"] - if tnlid not in self._tunnels: + tnl = self._tunnels.get(tnlid, None) + if tnl is None: cbt.set_response("No record", True) self.complete_cbt(cbt) - elif ( - self._tunnels[tnlid].tunnel_state == TUNNEL_STATES.AUTHORIZED - or self._tunnels[tnlid].tunnel_state == TUNNEL_STATES.ONLINE - or self._tunnels[tnlid].tunnel_state == TUNNEL_STATES.OFFLINE - ): - tn = self._tunnels[tnlid].tap_name + else: + tnl.tunnel_state = TUNNEL_STATES.ABORTED params = { "OverlayId": olid, "TunnelId": tnlid, "PeerId": peer_id, - "TapName": tn, + "TapName": tnl.tap_name, } self.register_cbt("TincanTunnel", "TCI_REMOVE_TUNNEL", params, cbt) - else: - cbt.set_response("Tunnel busy, retry operation", False) - self.complete_cbt(cbt) except KeyError as err: cbt.set_response(f"Insufficient parameters {err}", False) self.complete_cbt(cbt) @@ -1001,14 +988,6 @@ class LinkManager(ControllerModule): if not tnl: return self.logger.info("Deauthorizing tunnel %s", tnlid) - param = { - "UpdateType": TUNNEL_EVENTS.AuthExpired, - "OverlayId": tnl.overlay_id, - "PeerId": tnl.peer_id, - "TunnelId": tnlid, - "TapName": tnl.tap_name, - } - self._link_updates_publisher.post_update(param) self._cleanup_failed_tunnel_data(tnl) def _rollback_link_creation_changes(self, tnlid): diff --git a/evio/controllers/topology.py b/evio/controllers/topology.py index 500cbd2..fb19374 100644 --- a/evio/controllers/topology.py +++ b/evio/controllers/topology.py @@ -32,7 +32,7 @@ from typing import Optional import broker from broker import ( # PEER_DISCOVERY_COALESCE, - CBT_DFLT_TIMEOUT, + EDGE_CONNECTED_TIMEOUT, MAX_CONCURRENT_OPS, MAX_ON_DEMAND_EDGES, MAX_SUCCESSIVE_FAILS, @@ -381,26 +381,13 @@ class Topology(ControllerModule): peer_id = update["PeerId"] overlay_id = update["OverlayId"] ovl = self._net_ovls[overlay_id] - if event == TUNNEL_EVENTS.Authorized: - """Role B""" - ce = ovl.adjacency_list[peer_id] - if ce.edge_state != EDGE_STATES.PreAuth: - raise RuntimeError(f"Invalid edge state {ce}") - ce.edge_state = EDGE_STATES.Authorized - elif event == TUNNEL_EVENTS.AuthExpired: - """Role B""" - ce = ovl.adjacency_list[peer_id] - if ce.edge_state != EDGE_STATES.Authorized: - raise RuntimeError(f"Invalid edge state {ce}") - ce.edge_state = EDGE_STATES.Deleting - ovl.adjacency_list.pop(peer_id, None) - if peer_id in ovl.known_peers: - ovl.known_peers[peer_id].exclude() - elif event == TUNNEL_EVENTS.Connected: + if event == TUNNEL_EVENTS.Connected: """Roles A & B""" ce = ovl.adjacency_list[peer_id] if ce.edge_state != EDGE_STATES.Authorized: - raise RuntimeError(f"Invalid edge state {ce}") + self.logger.warning( + "Tunnel connct event is inconsistent with current edge state %s", ce + ) ce.edge_state = EDGE_STATES.Connected ce.connected_time = update["ConnectedTimestamp"] ovl.known_peers[peer_id].restore() @@ -417,8 +404,9 @@ class Topology(ControllerModule): elif event == TUNNEL_EVENTS.Disconnected: ce = ovl.adjacency_list[peer_id] if ce.edge_state != EDGE_STATES.Connected: - raise RuntimeError( - f"Tunnel disconnected event is invalid for edge state {ce}" + self.logger.warning( + "Tunnel disconnected event is inconsistent with current edge state %s", + ce, ) # the local topology did not request removal of the connection if ( @@ -440,8 +428,9 @@ class Topology(ControllerModule): self._remove_tunnel(ovl, ce.dataplane, peer_id, ce.edge_id, ce.edge_type) elif event == TUNNEL_EVENTS.Removed: """The removed event is also generated for tincan process failure""" - ce = ovl.adjacency_list.pop(peer_id, None) + ce: ConnectionEdge = ovl.adjacency_list.pop(peer_id, None) if ce: + ce.edge_state = EDGE_STATES.Deleting self.logger.info("Edge %s removed from adjacency list", ce.edge_id) else: self.logger.warning("Invalid UpdateType specified for event %s", event) @@ -541,11 +530,10 @@ class Topology(ControllerModule): ) ce.edge_state = EDGE_STATES.PreAuth net_ovl.adjacency_list[ce.peer_id] = ce - self.register_timed_transaction( - (ce, olid), - self._is_connedge_connected, + self.register_deferred_call( + EDGE_CONNECTED_TIMEOUT, self._on_connedge_timeout, - CBT_DFLT_TIMEOUT, + (ce, olid), ) self.logger.info( "Authorizing EdgeRequest %s/%s<-%s (%s)", @@ -599,6 +587,7 @@ class Topology(ControllerModule): olid = cbt.request.params["OverlayId"] peer_id = cbt.request.params["PeerId"] ce = self._net_ovls[olid].adjacency_list[peer_id] + ce.edge_state = EDGE_STATES.Authorized perfd.record( { "ReportedBy": self.name, @@ -661,11 +650,10 @@ class Topology(ControllerModule): self.free_cbt(cbt) ce = ovl.adjacency_list.get(peer_id) if ce.edge_state != EDGE_STATES.Connected: - self.register_timed_transaction( - (ce, olid), - self._is_connedge_connected, + self.register_deferred_call( + EDGE_CONNECTED_TIMEOUT, self._on_connedge_timeout, - 30, + (ce, olid), ) def resp_handler_remove_tnl(self, cbt: CBT): @@ -674,11 +662,8 @@ class Topology(ControllerModule): ovl = self._net_ovls[olid] peer_id = params["PeerId"] ce = ovl.adjacency_list.pop(peer_id, None) - if not cbt.response.status: - self.logger.warning( - "Failed to remove topology edge. Reason: %s", cbt.response.data - ) - else: + if ce is not None: + ce.edge_state = EDGE_STATES.Deleting # record tunnel terminated on successful removal of the tunnel perfd.record( { @@ -690,13 +675,6 @@ class Topology(ControllerModule): } ) self.free_cbt(cbt) - ce_state = "" - if ce: - ce_state = ce.edge_state - ce.edge_state = EDGE_STATES.Deleting - del ce - if ce_state == EDGE_STATES.Connected: - self._process_next_transition(ovl) ################################################################################################ @@ -775,6 +753,7 @@ class Topology(ControllerModule): if net_ovl.acquire(): tns = net_ovl.transformation.pop_head() if tns.operation == OP_TYPE.Add: + net_ovl.transformation.clear() # force regenerating graph self._initiate_negotiate_edge(net_ovl, tns.conn_edge) elif tns.operation == OP_TYPE.Remove: self._initiate_remove_edge(net_ovl, tns.conn_edge.peer_id) @@ -1114,15 +1093,20 @@ class Topology(ControllerModule): if peer_id in net_ovl.known_peers: net_ovl.known_peers[peer_id].exclude() - def _is_connedge_connected(self, ce: tuple[ConnectionEdge, str]) -> bool: - return bool( - ce[0].edge_state == EDGE_STATES.Connected and ce[0].connected_time != 0.0 - ) + def _is_connedge_connected(self, ce: ConnectionEdge) -> bool: + return bool(ce.edge_state == EDGE_STATES.Connected and ce.connected_time != 0.0) - def _on_connedge_timeout(self, ce: tuple[ConnectionEdge, str], timeout: float): - ce, olid = ce + def _on_connedge_timeout(self, ce: ConnectionEdge, olid: str): + if self._is_connedge_connected(ce): + return ovl = self._net_ovls[olid] ce.edge_state = EDGE_STATES.Deleting - ovl.adjacency_list.pop(ce.peer_id, None) - if ce.peer_id in ovl.known_peers: - ovl.known_peers[ce.peer_id].exclude() + # necessary to check the edge_id as the conn edge could have been + # recreated during the expiration period + if ( + ce.peer_id in ovl.adjacency_list + and ovl.adjacency_list[ce.peer_id].edge_id == ce.edge_id + ): + ovl.adjacency_list.pop(ce.peer_id, None) + if ce.peer_id in ovl.known_peers: + ovl.known_peers[ce.peer_id].exclude() diff --git a/evio/controllers/tunnel.py b/evio/controllers/tunnel.py index 0dbf404..c2147bd 100644 --- a/evio/controllers/tunnel.py +++ b/evio/controllers/tunnel.py @@ -28,11 +28,8 @@ from pyroute2 import IPRoute # type: ignore TunnelEvents = namedtuple( "TUNNEL_EVENTS", - ["Authorized", "AuthExpired", "Created", "Connected", "Disconnected", "Removed"], + ["Connected", "Disconnected", "Removed"], defaults=[ - "LnkEvAuthorized", - "LnkEvAuthExpired", - "LnkEvCreated", "LnkEvConnected", "LnkEvDisconnected", "LnkEvRemoved", @@ -42,7 +39,7 @@ TUNNEL_EVENTS = TunnelEvents() TunnelStates = namedtuple( "TUNNEL_STATES", - ["AUTHORIZED", "CREATING", "QUERYING", "ONLINE", "OFFLINE", "FAILED"], + ["AUTHORIZED", "CREATING", "QUERYING", "ONLINE", "OFFLINE", "FAILED", "ABORTED"], defaults=[ "TNL_AUTHORIZED", "TNL_CREATING", @@ -50,6 +47,7 @@ TunnelStates = namedtuple( "TNL_ONLINE", "TNL_OFFLINE", "TNL_FAILED", + "TNL_ABORTED", ], ) TUNNEL_STATES = TunnelStates()