Recovery improvements

master
Ken Subratie 2023-11-06 18:38:38 -05:00
parent dd5ca986ef
commit 23684083e0
14 changed files with 208 additions and 223 deletions

View File

@ -63,7 +63,6 @@ __all__ = [
"GENEVE_SETUP_TIMEOUT",
"MIN_SUCCESSORS",
"MAX_ON_DEMAND_EDGES",
"PEER_DISCOVERY_COALESCE",
"EXCLUSION_BASE_INTERVAL",
"MAX_SUCCESSIVE_FAILS",
"TRIM_CHECK_INTERVAL",
@ -121,7 +120,6 @@ SDN_CONTROLLER_PORT: Literal[6633] = 6633
GENEVE_SETUP_TIMEOUT: Literal[180] = 180
MIN_SUCCESSORS: Literal[2] = 2
MAX_ON_DEMAND_EDGES: Literal[3] = 3
PEER_DISCOVERY_COALESCE: Literal[1] = 1
EXCLUSION_BASE_INTERVAL: Literal[60] = 60
MAX_SUCCESSIVE_FAILS: Literal[4] = 4
TRIM_CHECK_INTERVAL: Literal[300] = 300

View File

@ -205,8 +205,6 @@ class Broker:
self._que_listener.start()
for k, v in self.cfg_controllers.items():
ctr_lgl = self._config["Broker"].get("LogLevel", LOG_LEVEL)
if "LogLevel" in self._controller_config(k):
ctr_lgl = self._controller_config(k)["LogLevel"]
self._setup_controller_logger((k, v["Module"]), formatter, ctr_lgl)
def _setup_controller_logger(
@ -250,8 +248,8 @@ class Broker:
for ctrl_name in self._load_order:
self.load_module(ctrl_name)
self._ipc.start()
self._timers.start()
self._ipc.start()
# initialize the CMs via their respective nexus
for ctrl_name in self._load_order:
self._nexus_map[ctrl_name].initialize()
@ -368,8 +366,6 @@ class Broker:
def terminate(self):
with self._nexus_lock:
self._timers.terminate()
self._ipc.terminate()
for ctrl_name in reversed(self._load_order):
wn = self._nexus_map[ctrl_name]._cm_thread.name
self._nexus_map[ctrl_name].work_queue.put(None)
@ -377,6 +373,8 @@ class Broker:
wn = self._nexus_map[ctrl_name]._cm_thread.name
self._nexus_map[ctrl_name]._cm_thread.join()
self.logger.info("%s exited", wn)
self._ipc.terminate()
self._timers.terminate()
for ql in self._cm_qlisteners:
ql.stop()
self._que_listener.stop()
@ -505,6 +503,9 @@ class Broker:
def register_timed_transaction(self, entry: Transaction):
self._timers.register(entry)
def register_dpc(self, delay, call, params=()):
self._timers.register_dpc(delay, call, params)
def dispach_proxy_msg(self, msg: ProxyMsg):
# task structure
# dict(Request=dict(Target=CM, Action=None, Params=None),

View File

@ -90,9 +90,9 @@ class CBT:
return introspect(self)
def __itr__(self):
yield ("status", self.status)
yield ("initiator", self.initiator)
yield ("recipient", self.recipient)
yield ("status", self.status)
yield ("data", self.data)
def update(self, data, status: bool):
@ -116,7 +116,10 @@ class CBT:
self.response: Optional[CBT.Response] = None
self.context: dict = {}
for k, v in kwargs.items():
self.context[k] = v
if hasattr(self, k):
setattr(self, k, v)
else:
self.context[k] = v
self.lifespan = CBT_LIFESPAN
self.time_created: float = 0.0
self.time_submited: float = 0.0

View File

@ -50,9 +50,9 @@ class ControllerModule:
def _setup_logger(self):
self.logger = logging.getLogger(self.__class__.__name__)
# if "LogLevel" in self.config:
# self.logger.setLevel(self.config["LogLevel"])
# return
if "LogLevel" in self.config:
self.logger.setLevel(self.config["LogLevel"])
return
@abstractmethod
def initialize(self):
@ -260,6 +260,9 @@ class ControllerModule:
raise ValueError(f"Object already marked as completed {obj}")
self._nexus.register_timed_transaction(obj, is_completed, on_expired, lifespan)
def register_deferred_call(self, delay, call, params=()):
self._nexus.register_deferred_call(delay, call, params)
@abstractmethod
def handle_ipc(self, msg: ProxyMsg):
NotImplemented
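
Reviewer note: this commit replaces the per-controller Transaction timer with a deferred-procedure-call (DPC) chain: ControllerModule.register_deferred_call forwards to Nexus.register_deferred_call, which calls Broker.register_dpc, which schedules the callable on the shared sched-based TimedTransactions queue. Below is a minimal, self-contained sketch of the self-rescheduling pattern only; DpcScheduler and ExampleController are illustrative stand-ins, not evio classes.

import sched
import threading
import time

# Minimal stand-ins for the broker's DPC path and a controller that re-arms its
# own periodic callback, mirroring what Nexus.on_timer does in this change.
class DpcScheduler:
    def __init__(self):
        self._sched = sched.scheduler(time.time, time.sleep)
        self._exit_ev = threading.Event()

    def register_dpc(self, delay, call, params=()):
        # One-shot deferred procedure call; priority 15 as in TimedTransactions.
        if self._exit_ev.is_set():
            return
        self._sched.enter(delay, 15, call, params)

    def run(self):
        while not self._exit_ev.is_set():
            self._sched.run(blocking=False)  # fire anything that is due
            time.sleep(0.1)

    def stop(self):
        self._exit_ev.set()

class ExampleController:
    TIMER_INTERVAL = 1.0

    def __init__(self, dpc):
        self._dpc = dpc

    def on_timer(self):
        try:
            print("periodic work at", time.time())
        except Exception as err:
            print("on_timer exception:", err)
        # Re-arm only after the callback has run, so a slow or failing pass
        # cannot stack up multiple outstanding timer events.
        self._dpc.register_dpc(self.TIMER_INTERVAL, self.on_timer)

if __name__ == "__main__":
    dpc = DpcScheduler()
    ctrl = ExampleController(dpc)
    dpc.register_dpc(ctrl.TIMER_INTERVAL, ctrl.on_timer)
    worker = threading.Thread(target=dpc.run, daemon=True)
    worker.start()
    time.sleep(3.5)
    dpc.stop()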

View File

@ -23,7 +23,7 @@ import queue
import threading
import time
from . import CONTROLLER_TIMER_INTERVAL, EVENT_PERIOD, statement_false
from . import CONTROLLER_TIMER_INTERVAL, EVENT_PERIOD
from .cbt import CBT
from .process_proxy import ProxyMsg
from .timed_transactions import Transaction
@ -44,10 +44,6 @@ class Nexus:
)
self._timer_loop_cnt: int = 1
self._pending_cbts: dict[int, CBT] = {}
self._last_ctlr_update_ts = time.time()
self._ctlr_update = Transaction(
self, statement_false, self.on_timer, self._timer_interval
)
@property
def controller(self):
@ -131,7 +127,7 @@ class Nexus:
def start_controller(self):
self._cm_thread.start()
self._broker.register_timed_transaction(self._ctlr_update)
self._broker.register_dpc(self._timer_interval, self.on_timer)
def __worker(self):
# get CBT from the local queue and call process_cbt() of the
@ -182,11 +178,7 @@ class Nexus:
f"Unexpected CBT state for expired event. {cbt}"
)
def _schedule_ctlr_update(self):
self._ctlr_update.lifespan = self._timer_interval
self._broker.register_timed_transaction(self._ctlr_update)
def on_timer(self, nexus, time_expired: float):
def on_timer(self):
try:
self._controller.log_state()
self._controller.on_timer_event()
@ -194,7 +186,7 @@ class Nexus:
self._controller.logger.warning(
"on_timer exception: %s", err, exc_info=True
)
self._schedule_ctlr_update()
self._broker.register_dpc(self._timer_interval, self.on_timer)
def query_param(self, param_name=""):
return self._broker.query_param(param_name)
@ -235,5 +227,8 @@ class Nexus:
)
)
def register_deferred_call(self, delay, call, params):
self._broker.register_dpc(delay, call, params)
def send_ipc(self, msg: ProxyMsg):
self._broker.send_ipc(msg)

View File

@ -53,16 +53,14 @@ class ProxyMsg:
def data(self, payload: bytearray):
self.ts = time.time()
self._json = None
if payload is None:
return
self.payload = payload
def __repr__(self) -> str:
return f"{self.fileno}:{self.payload.decode('utf-8')}"
return f"{self.fileno}:{self.json}"
@property
def json(self):
if self._json is None:
if self._json is None and self.payload:
self._json = json.loads(self.payload.decode("utf-8"))
return self._json
@ -124,6 +122,8 @@ class ProcessProxy:
while not self._exit_ev.is_set():
while self.tx_que.qsize() > 0:
msg: ProxyMsg = self.tx_que.get_nowait()
if not msg.data:
continue
node = connections.get(msg.fileno, None)
if node is not None:
if not node.event & select.EPOLLOUT:
@ -154,6 +154,7 @@ class ProcessProxy:
"Node %s IPC read hangup", node.skt.fileno()
)
if not node.tx_deque:
connections.pop(fileno)
self.close_client(node)
elif event & select.EPOLLHUP:
node = connections.pop(fileno)
@ -164,10 +165,12 @@ class ProcessProxy:
bufsz = int.from_bytes(
connections[fileno].skt.recv(2), sys.byteorder
)
if bufsz <= 0:
node = connections[fileno]
node.skt.shutdown(socket.SHUT_WR)
elif bufsz > 65507:
if bufsz == 0:
connections[fileno].skt.recv(0)
self.logger.warning(
"Zero byte read buffer size received"
)
elif bufsz < 0 or bufsz > 65507:
node = connections[fileno]
connections.pop(fileno)
self.close_client(node)
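
Reviewer note: the epoll loop frames each IPC message with a 2-byte, native-endian length header (the recv(2) read above) and now rejects zero-length and oversized (> 65507) headers instead of shutting the socket down. The following is a rough sketch of that framing as pure functions, assuming the same header format; frame and unframe are illustrative helpers, not part of ProcessProxy.

import json
import sys

MAX_FRAME = 65507  # the same upper bound the proxy enforces on the size header

def frame(payload: bytes) -> bytes:
    # Prefix the payload with a 2-byte, native-endian length header, matching
    # the recv(2) read in the epoll loop.
    if not payload or len(payload) > MAX_FRAME:
        raise ValueError("payload empty or larger than %d bytes" % MAX_FRAME)
    return len(payload).to_bytes(2, sys.byteorder) + payload

def unframe(buf: bytes) -> bytes:
    bufsz = int.from_bytes(buf[:2], sys.byteorder)
    if bufsz == 0:
        # Mirrors the new zero-length guard: report it rather than treating it
        # as a peer shutdown.
        raise ValueError("zero byte read buffer size received")
    if bufsz > MAX_FRAME:
        raise ValueError("frame size %d exceeds %d" % (bufsz, MAX_FRAME))
    return buf[2 : 2 + bufsz]

if __name__ == "__main__":
    msg = json.dumps({"Request": {"Action": "Echo"}}).encode("utf-8")
    assert unframe(frame(msg)) == msg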

View File

@ -31,7 +31,12 @@ class TimedTransactions:
def register(self, entry: Transaction):
if self._exit_ev.is_set():
return
self._sched.enter(entry.lifespan, entry.priority, self._get_expired, [entry])
self._sched.enter(entry.lifespan, entry.priority, self._get_expired, (entry,))
def register_dpc(self, delay: float, call, params: tuple):
if self._exit_ev.is_set():
return
self._sched.enter(delay, 15, call, params)
def _get_expired(self, entry):
if not entry.is_completed():

View File

@ -27,6 +27,7 @@ except ImportError:
import copy
import logging
import signal
import subprocess
import threading
from abc import ABCMeta, abstractmethod
@ -49,6 +50,7 @@ from broker.cbt import CBT
from broker.controller_module import ControllerModule
from broker.process_proxy import ProxyMsg
from pyroute2 import IPRoute
from pyroute2.netlink.exceptions import NetlinkError
from .tunnel import TUNNEL_EVENTS
@ -171,10 +173,12 @@ class OvsBridge(BridgeABC):
def add_port(self, port_name, port_descr):
self.del_port(port_name)
self.flush_ip_addresses(self.name)
with IPRoute() as ipr:
idx = ipr.link_lookup(ifname=port_name)[0]
ipr.link("set", index=idx, mtu=self.mtu)
idx = ipr.link_lookup(ifname=port_name)
if len(idx) < 1:
raise NetlinkError(19, "No such device")
ipr.flush_addr(index=idx[0])
ipr.link("set", index=idx[0], mtu=self.mtu)
broker.run_proc(
[OvsBridge.brctl, "--may-exist", "add-port", self.name, port_name]
)
@ -263,9 +267,12 @@ class LinuxBridge(BridgeABC):
def add_port(self, port_name, port_descr):
with IPRoute() as ipr:
idx = ipr.link_lookup(ifname=port_name)[0]
ipr.link("set", index=idx, mtu=self.mtu)
ipr.link("set", index=idx, master=ipr.link_lookup(ifname=self.name)[0])
idx = ipr.link_lookup(ifname=port_name)
if len(idx) < 1:
raise NetlinkError(19, "No such device")
ipr.flush_addr(index=idx[0])
ipr.link("set", index=idx[0], mtu=self.mtu)
ipr.link("set", index=idx[0], master=ipr.link_lookup(ifname=self.name)[0])
self.ports.add(port_name)
self.port_descriptors[port_name] = port_descr
@ -307,16 +314,20 @@ class VNIC(BridgeABC):
def add_port(self, port_name):
self.name = port_name
with IPRoute() as ipr:
idx = ipr.link_lookup(ifname=port_name)[0]
ipr.link("set", index=idx, mtu=self.mtu)
ipr.addr("add", index=idx, address=self.ip_addr, mask=self.prefix_len)
ipr.link("set", index=idx, state="up")
idx = ipr.link_lookup(ifname=port_name)
if len(idx) < 1:
raise NetlinkError(19, "No such device")
ipr.link("set", index=idx[0], mtu=self.mtu)
ipr.addr("add", index=idx[0], address=self.ip_addr, mask=self.prefix_len)
ipr.link("set", index=idx[0], state="up")
def del_port(self, port_name):
with IPRoute() as ipr:
idx = ipr.link_lookup(ifname="port_name")[0]
ipr.link("set", index=idx, state="down")
ipr.link("del", index=idx)
idx = ipr.link_lookup(ifname=port_name)
if len(idx) < 1:
raise NetlinkError(19, "No such device")
ipr.link("set", index=idx[0], state="down")
ipr.link("del", index=idx[0])
###################################################################################################
@ -447,7 +458,7 @@ class BridgeController(ControllerModule):
net_ovl["NetDevice"] = {}
# start the BF proxy if at least one overlay is configured for it
if "BoundedFlood" in self.config:
self._start_bf_proxy_server()
self._start_boundedflood()
# create and configure the bridge for each overlay
_ = self._create_overlay_bridges()
publishers = self.get_registered_publishers()
@ -490,7 +501,7 @@ class BridgeController(ControllerModule):
"TOP_REQUEST_OND_TUNNEL": self.resp_handler_default,
}
def _start_bf_proxy_server(self):
def _start_boundedflood(self):
bf_config = self.config["BoundedFlood"]
bf_config["NodeId"] = self.node_id
bf_ovls = bf_config.pop("Overlays")
@ -502,7 +513,7 @@ class BridgeController(ControllerModule):
)
bf_config[br_name] = bf_ovls[olid]
bf_config[br_name]["OverlayId"] = olid
self.start_bf_client_module(bf_config)
self._start_bf_module(bf_config)
def _create_overlay_bridges(self) -> dict:
ign_br_names = {}
@ -548,7 +559,7 @@ class BridgeController(ControllerModule):
except Exception as err:
self._tunnels[overlay_id].pop(port_name, None)
bridge.del_port(port_name)
self.logger.info("Failed to add port %s. %s", tnl_data, err, exc_info=True)
self.logger.info("Failed to add port %s. %s", tnl_data, err)
def req_handler_manage_bridge(self, cbt: CBT):
try:
@ -558,9 +569,12 @@ class BridgeController(ControllerModule):
if cbt.request.params["UpdateType"] == TUNNEL_EVENTS.Connected:
self._add_tunnel_port(olid, port_name, cbt.request.params)
elif cbt.request.params["UpdateType"] == TUNNEL_EVENTS.Removed:
self._tunnels[olid].pop(port_name, None)
bridge.del_port(port_name)
self.logger.info("Port %s removed from bridge %s", port_name, bridge)
if port_name:
self._tunnels[olid].pop(port_name, None)
bridge.del_port(port_name)
self.logger.info(
"Port %s removed from bridge %s", port_name, bridge
)
except Exception as err:
self.logger.warning("Manage bridge error %s", err, exc_info=True)
cbt.set_response(None, True)
@ -569,10 +583,17 @@ class BridgeController(ControllerModule):
def on_timer_event(self):
for tnl in self._tunnels.values():
tnl.trim()
if self._bf_proc is not None:
exit_code = self._bf_proc.poll()
if exit_code:
self.logger.info(
"BF module terminated unexpectedly (%s), restarting.", exit_code
)
self._start_boundedflood()
def terminate(self):
try:
self.stop_bf_module()
self._stop_bf_module()
for olid, bridge in self._ovl_net.items():
if self.overlays[olid]["NetDevice"].get(
"AutoDelete", BRIDGE_AUTO_DELETE
@ -671,7 +692,7 @@ class BridgeController(ControllerModule):
msg.data = json.dumps(task).encode("utf-8")
self.send_ipc(msg)
def start_bf_client_module(self, bf_config):
def _start_bf_module(self, bf_config):
RyuManager = spawn.find_executable("ryu-manager")
if RyuManager is None:
raise RuntimeError("RyuManager was not found, is it installed?")
@ -687,13 +708,13 @@ class BridgeController(ControllerModule):
]
self._bf_proc = subprocess.Popen(cmd)
def stop_bf_module(self, wt: int = 1.15):
def _stop_bf_module(self, wt: int = 1.15):
if self._bf_proc is not None:
try:
exit_code = self._bf_proc.poll()
if exit_code is None:
self._bf_proc.terminate()
self._bf_proc.wait()
self._bf_proc.send_signal(signal.SIGINT)
self._bf_proc.wait(wt)
else:
self.logger.debug(
"BoundedFlood process %s already exited with %s",
@ -707,7 +728,5 @@ class BridgeController(ControllerModule):
"Killing unresponsive BoundedFlood: %s", self._bf_proc.pid
)
self._bf_proc.kill()
self._bf_proc = None
self.logger.info("BoundedFlood terminated")
# Todo: check if BF process exited and restart if not shutting down
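
Reviewer note: on_timer_event now polls the BoundedFlood child each cycle and restarts it when it has exited on its own with a failure code, while _stop_bf_module prefers SIGINT with a bounded wait before falling back to kill(). Below is a standalone sketch of that poll/restart/stop pattern, using a placeholder child command rather than the real ryu-manager invocation.

import signal
import subprocess

# A placeholder long-running child; the real controller launches ryu-manager.
CMD = ["sleep", "300"]

class ChildSupervisor:
    def __init__(self):
        self.proc = None

    def start(self):
        self.proc = subprocess.Popen(CMD)

    def check_and_restart(self):
        # Called periodically (cf. on_timer_event): restart only when the
        # child has exited unexpectedly with a failure code.
        if self.proc is None:
            return
        exit_code = self.proc.poll()
        if exit_code:
            print("child exited unexpectedly (%s), restarting" % exit_code)
            self.start()

    def stop(self, wt=1.15):
        # Prefer SIGINT plus a bounded wait so the child can shut down cleanly;
        # escalate to kill() only if it stays unresponsive.
        if self.proc is None:
            return
        try:
            if self.proc.poll() is None:
                self.proc.send_signal(signal.SIGINT)
                self.proc.wait(wt)
        except subprocess.TimeoutExpired:
            self.proc.kill()
        finally:
            self.proc = None

if __name__ == "__main__":
    sup = ChildSupervisor()
    sup.start()
    sup.check_and_restart()
    sup.stop()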

View File

@ -444,4 +444,4 @@ class GeneveTunnel(ControllerModule):
if tnl.state == TUNNEL_STATES.AUTHORIZED:
self._deauth_tnl(tnl)
else:
self._rollback_tnl([tnl])
self._rollback_tnl(tnl)

View File

@ -83,6 +83,9 @@ class Tunnel:
def tunnel_state(self, new_state):
self.state = new_state
def is_tnl_online(self) -> bool:
return bool(self.tunnel_state == TUNNEL_STATES.ONLINE)
class LinkManager(ControllerModule):
TAPNAME_MAXLEN = 15
@ -375,13 +378,15 @@ class LinkManager(ControllerModule):
self._tunnels[tnlid].link.status_retry = 0
elif cbt.request.params["Command"] == "LinkDisconnected":
if self._tunnels[tnlid].tunnel_state != TUNNEL_STATES.QUERYING:
self.logger.debug("Link %s is disconnected", tnlid)
# issue a link state check only if it not already being done
self.logger.debug("Link %s is disconnected", tnlid)
self._tunnels[tnlid].tunnel_state = TUNNEL_STATES.QUERYING
cbt.set_response(data=None, status=True)
self.register_cbt(
"TincanTunnel", "TCI_QUERY_LINK_INFO", {"TunnelId": tnlid}
)
self.register_deferred_call(
5,
self.register_cbt,
("TincanTunnel", "TCI_QUERY_LINK_INFO", {"TunnelId": tnlid}),
) # issue link stat check in 5 secs as the link can reconnect
elif cbt.request.params["Command"] == "TincanTunnelFailed":
lnkid = self.link_id(tnlid)
if lnkid:
@ -736,8 +741,7 @@ class LinkManager(ControllerModule):
return ign_netinf
def is_tnl_online(self, tnl: Tunnel) -> bool:
return bool(tnl.tunnel_state == TUNNEL_STATES.ONLINE)
# return bool(tnl.link and tnl.link.creation_state == 0xC0)
return tnl.is_tnl_online()
def _remove_link_from_tunnel(self, tnlid):
tnl = self._tunnels.get(tnlid)
@ -855,7 +859,8 @@ class LinkManager(ControllerModule):
self.complete_cbt(parent_cbt)
return
lnkid = self.link_id(tnlid)
self._tunnels[tnlid].link.creation_state = 0xC0
tnl = self._tunnels[tnlid]
tnl.link.creation_state = 0xC0
self.logger.debug(
"Creating link %s to peer %s (5/5 Initiator)", tnlid[:7], peer_id[:7]
)
@ -868,6 +873,13 @@ class LinkManager(ControllerModule):
self.node_id[:7],
peer_id[:7],
)
if not tnl.is_tnl_online():
self.register_timed_transaction(
tnl,
self.is_tnl_online,
self.on_tnl_timeout,
LINK_SETUP_TIMEOUT,
)
def _complete_link_endpt_request(self, cbt: CBT):
# Create Link: Phase 4 Node B
@ -1036,40 +1048,3 @@ class LinkManager(ControllerModule):
self._tunnels.pop(tnl.tnlid, None)
if tnl.link:
self._links.pop(tnl.link.lnkid, None)
""" TODO: OUTDATED, NEED TO BE UPDATED
###################################################################################################
Link Manager state and event specifications
###################################################################################################
If LM fails a CBT there will be no further events fired for the tunnel.
Once tunnel goes online an explicit CBT LNK_REMOVE_TUNNEL is required.
Partially created tunnels that fails will be removed automatically by LM.
Events
(1) TunnelEvents.AuthExpired - After a successful completion of CBT LNK_AUTH_TUNNEL, the tunnel
descriptor is created and TunnelEvents.Authorized is fired.
(2) TunnelEvents.AuthExpired - If no action is taken on the tunnel within LinkSetupTimeout LM will
fire TunnelEvents.AuthExpired and remove the associated tunnel descriptor.
(3) ##REMOVED## TunnelEvents.Created - On both nodes A & B, on a successful completion of CBT
TCI_CREATE_TUNNEL, the TAP device exists and TunnelEvents.Created is fired.
(4) TunnelEvents.Connected - After Tincan delivers the online event to LM TunnelEvents.Connected
is fired.
(5) TunnelEvents.Disconnected - After Tincan signals link offline or QUERYy_LNK_STATUS discovers
offline TunnelEvents.Disconnected is fired.
(6) TunnelEvents.Removed - After the TAP device is removed TunnelEvents.Removed is fired and the
tunnel descriptor is removed. Tunnel must be in TUNNEL_STATES.ONLINE or TUNNEL_STATES.OFFLINE
Internal States
(1) TUNNEL_STATES.AUTHORIZED - After a successful completion of CBT LNK_AUTH_TUNNEL, the tunnel
descriptor exists.
(2) TUNNEL_STATES.CREATING - entered on reception of CBT LNK_CREATE_TUNNEL.
(3) TUNNEL_STATES.QUERYING - entered before issuing CBT TCI_QUERY_LINK_INFO. Happens when
LinkStateChange is LINK_STATE_DOWN and state is not already TUNNEL_STATES.QUERYING; OR
TCI_QUERY_LINK_INFO is OFFLINE and state is not already TUNNEL_STATES.QUERYING.
(4) TUNNEL_STATES.ONLINE - entered when CBT TCI_QUERY_LINK_INFO is ONLINE or LinkStateChange is
LINK_STATE_UP.
(5) TUNNEL_STATES.OFFLINE - entered when QUERY_LNK_STATUS is OFFLINE or LinkStateChange is
LINK_STATE_DOWN event.
"""

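Reviewer note: the initiator now arms a timed transaction after requesting link creation, guarded by is_tnl_online, so on_tnl_timeout fires only if the tunnel has not reached ONLINE within LINK_SETUP_TIMEOUT. The toy sketch below shows that is_completed/on_expired contract; ToyTimedTransactions and ToyTunnel are stand-ins for the broker's TimedTransactions and the Tunnel descriptor, not the real classes.

import sched
import threading
import time

# Toy stand-in for the broker's TimedTransactions: an entry expires after
# `lifespan` seconds unless its completion predicate returns True first.
class TimedEntry:
    def __init__(self, item, is_completed, on_expired, lifespan):
        self.item = item
        self.is_completed = is_completed
        self.on_expired = on_expired
        self.lifespan = lifespan

class ToyTimedTransactions:
    def __init__(self):
        self._sched = sched.scheduler(time.time, time.sleep)

    def register(self, entry):
        self._sched.enter(entry.lifespan, 1, self._get_expired, (entry,))

    def _get_expired(self, entry):
        # Fire the expiry callback only when the item never completed in time.
        if not entry.is_completed(entry.item):
            entry.on_expired(entry.item, time.time())

    def run(self):
        self._sched.run()

class ToyTunnel:
    def __init__(self):
        self.online = False

def is_tnl_online(tnl):
    return tnl.online

def on_tnl_timeout(tnl, expired_at):
    print("tunnel setup timed out at", expired_at)

if __name__ == "__main__":
    timers = ToyTimedTransactions()
    tnl = ToyTunnel()
    timers.register(TimedEntry(tnl, is_tnl_online, on_tnl_timeout, lifespan=1.0))
    # The tunnel comes online before the lifespan elapses, so no timeout fires.
    threading.Timer(0.5, lambda: setattr(tnl, "online", True)).start()
    timers.run()
    print("scheduler drained; timeout suppressed because the tunnel completed")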
View File

@ -39,7 +39,7 @@ from typing import Optional, Tuple, Union
import broker
import slixmpp
from broker import CACHE_EXPIRY_INTERVAL, PRESENCE_INTERVAL, statement_false
from broker import CACHE_EXPIRY_INTERVAL, PRESENCE_INTERVAL
from broker.cbt import CBT
from broker.controller_module import ControllerModule
from broker.remote_action import RemoteAction
@ -403,17 +403,13 @@ class XmppTransport(slixmpp.ClientXMPP):
self.loop.close()
self.logger.debug("Event loop closed on XMPP overlay=%s", self._overlay_id)
def shutdown(
self,
):
self.logger.debug("Initiating shutdown of XMPP overlay=%s", self._overlay_id)
self.disconnect(reason="controller shutdown", ignore_send_queue=True)
# self.loop.call_soon_threadsafe(self.disconnect(reason="controller shutdown", ignore_send_queue=True))
self.logger.debug("Disconnect of XMPP overlay=%s", self._overlay_id)
def shutdown(self):
self.logger.debug("Initiating shutdown of XMPP overlay %s", self._overlay_id)
self.loop.call_soon_threadsafe(self.disconnect, 2, "controller shutdown", True)
class XmppCircle:
_REFLECT: list[str] = ["xport", "_transmission_queue", "jid_cache"]
_REFLECT: list[str] = ["xport", "transmit_queues", "jid_cache"]
def __init__(
self, node_id: str, overlay_id: str, ovl_config: dict, **kwargs
@ -507,11 +503,9 @@ class Signal(ControllerModule):
)
self._circles[olid] = xcir
xcir.start()
self.register_timed_transaction(
self,
statement_false,
self.on_exp_presence,
self.register_deferred_call(
PRESENCE_INTERVAL * random.randint(1, 5),
self.on_exp_presence,
)
self.logger.info("Controller module loaded")
@ -520,16 +514,13 @@ class Signal(ControllerModule):
20, 50
)
def on_exp_presence(self, *_):
def on_exp_presence(self):
with self._lck:
for circ in self._circles.values():
if circ.xport and circ.xport.is_connected():
circ.xport.send_presence_safe(pstatus="ident#" + self.node_id)
self.register_timed_transaction(
self,
statement_false,
self.on_exp_presence,
self._next_anc_interval(),
self.register_deferred_call(
self._next_anc_interval(), self.on_exp_presence
)
def on_presence(self, msg):
@ -709,33 +700,26 @@ class Signal(ControllerModule):
def scavenge_expired_outgoing_rem_acts(self, outgoing_rem_acts: dict[str, Queue]):
# clear out the JID Refresh queue for a peer if the oldest entry age exceeds the limit
peer_ids = []
for peer_id in outgoing_rem_acts:
peer_qlen = outgoing_rem_acts[peer_id].qsize()
if not outgoing_rem_acts[peer_id].queue:
continue
remact_descr = outgoing_rem_acts[peer_id].queue[
0
] # peek at the first/oldest entry
if time.time() - remact_descr[2] >= self._jid_resolution_timeout:
peer_ids.append(peer_id)
self.logger.debug(
"Remote acts scavenged for removal peer_id %s qlength %d",
peer_id,
peer_qlen,
)
for peer_id, transmit_queue in outgoing_rem_acts.items():
if transmit_queue.queue:
remact_descr = transmit_queue.queue[0] # peek at the first/oldest entry
if time.time() - remact_descr[2] >= self._jid_resolution_timeout:
peer_ids.append(peer_id)
self.logger.debug(
"Remote act scavenged for removal %s", remact_descr
)
for peer_id in peer_ids:
remact_que = outgoing_rem_acts.pop(peer_id, Queue())
while True:
try:
remact = remact_que.get_nowait()
except Empty:
break
else:
tag = remact[1].action_tag
cbt = self._cbts_pending_remote_resp.pop(tag, None)
cbt.set_response("Peer lookup failed", False)
self.complete_cbt(cbt)
remact_que.task_done()
transmit_queue: Queue = outgoing_rem_acts[peer_id]
try:
remact = transmit_queue.get_nowait()
except Empty:
continue
tag = remact[1].action_tag
cbt = self._cbts_pending_remote_resp.pop(tag, None)
if cbt:
cbt.set_response("Peer lookup failed", False)
self.complete_cbt(cbt)
transmit_queue.task_done()
def _setup_circle(self, overlay_id: str):
xcir = XmppCircle(

View File

@ -26,10 +26,11 @@ except ImportError:
import subprocess
import time
from copy import deepcopy
from threading import Event
import broker
from broker import TINCAN_CHK_INTERVAL, statement_false
from broker import TINCAN_CHK_INTERVAL
from broker.cbt import CBT
from broker.controller_module import ControllerModule
from broker.process_proxy import ProxyMsg
@ -83,7 +84,7 @@ class TincanTunnel(ControllerModule):
self._register_req_handlers()
self._register_resp_handlers()
self._tci_publisher = self.publish_subscription("TCI_TUNNEL_EVENT")
self.on_expire_chk_tincan()
self.register_deferred_call(TINCAN_CHK_INTERVAL, self.on_expire_chk_tincan)
self.logger.info("Controller module loaded")
def _register_abort_handlers(self):
@ -135,7 +136,7 @@ class TincanTunnel(ControllerModule):
def _create_tunnel(self, cbt: CBT):
msg = cbt.request.params
tnlid = msg["TunnelId"]
ctl = broker.CTL_CREATE_TUNNEL
ctl = deepcopy(broker.CTL_CREATE_TUNNEL)
ctl["TransactionId"] = cbt.tag
req = ctl["Request"]
req["StunServers"] = msg["StunServers"]
@ -147,9 +148,6 @@ class TincanTunnel(ControllerModule):
tc_proc = self._tc_proc_tbl[tnlid]
self._tnl_cbts[cbt.tag] = cbt
self.send_control(tc_proc.ipc_id, json.dumps(ctl))
for turn in req["TurnServers"]:
turn["User"] = "***"
turn["Password"] = "***"
def req_handler_create_link(self, cbt: CBT):
try:
@ -159,6 +157,8 @@ class TincanTunnel(ControllerModule):
cbt.add_context("OnRegister", self._create_link)
self._tnl_cbts[tnlid] = cbt
self._start_tincan(tnlid)
self._tc_proc_tbl[tnlid].ovlid = msg["OverlayId"]
self._tc_proc_tbl[tnlid].tap_name = msg["TapName"]
else:
self._create_link(cbt)
except Exception:
@ -169,7 +169,7 @@ class TincanTunnel(ControllerModule):
def _create_link(self, cbt: CBT):
msg = cbt.request.params
tnlid = msg["TunnelId"]
ctl = broker.CTL_CREATE_LINK
ctl = deepcopy(broker.CTL_CREATE_LINK)
ctl["TransactionId"] = cbt.tag
req = ctl["Request"]
req["TunnelId"] = tnlid
@ -187,9 +187,6 @@ class TincanTunnel(ControllerModule):
tc_proc = self._tc_proc_tbl[tnlid]
self._tnl_cbts[cbt.tag] = cbt
self.send_control(tc_proc.ipc_id, json.dumps(ctl))
for turn in req["TurnServers"]:
turn["User"] = "***"
turn["Password"] = "***"
def req_handler_query_candidate_address_set(self, cbt: CBT):
msg = cbt.request.params
@ -198,7 +195,7 @@ class TincanTunnel(ControllerModule):
err_msg = f"No tunnel exists for tunnel ID: {tnlid[:7]}"
cbt.set_response({"ErrorMsg": err_msg, "Status": False})
return
ctl = broker.CTL_QUERY_CAS
ctl = deepcopy(broker.CTL_QUERY_CAS)
ctl["TransactionId"] = cbt.tag
ctl["Request"]["TunnelId"] = tnlid
tc_proc = self._tc_proc_tbl[tnlid]
@ -206,17 +203,16 @@ class TincanTunnel(ControllerModule):
self.send_control(tc_proc.ipc_id, json.dumps(ctl))
def req_handler_query_link_stats(self, cbt: CBT):
msg = cbt.request.params
tnlid = msg["TunnelId"]
if tnlid not in self._tc_proc_tbl:
tnlid = cbt.request.params["TunnelId"]
tc_proc = self._tc_proc_tbl.get(tnlid)
if not tc_proc:
err_msg = f"No tunnel exists for tunnel ID: {tnlid[:7]}"
cbt.set_response({"ErrorMsg": err_msg, "Status": False})
self.complete_cbt(cbt)
return
ctl = broker.CTL_QUERY_LINK_STATS
ctl = deepcopy(broker.CTL_QUERY_LINK_STATS)
ctl["TransactionId"] = cbt.tag
ctl["Request"]["TunnelId"] = tnlid
tc_proc = self._tc_proc_tbl[tnlid]
self._tnl_cbts[cbt.tag] = cbt
self.send_control(tc_proc.ipc_id, json.dumps(ctl))
@ -228,8 +224,9 @@ class TincanTunnel(ControllerModule):
cbt.set_response(err_msg, True)
self.complete_cbt(cbt)
return
self.logger.debug("Removing tunnel %s", tnlid)
self.logger.info("Removing tunnel %s", tnlid)
tc_proc = self._tc_proc_tbl.pop(tnlid, None)
self._pids.pop(tc_proc.proc.pid, None)
self._stop_tincan(tc_proc)
cbt.set_response("Tunnel removed", True)
self.complete_cbt(cbt)
@ -242,7 +239,7 @@ class TincanTunnel(ControllerModule):
cbt.set_response({"ErrorMsg": err_msg, "Status": False})
self.complete_cbt(cbt)
return
ctl = broker.CTL_REMOVE_LINK
ctl = deepcopy(broker.CTL_REMOVE_LINK)
ctl["TransactionId"] = cbt.tag
req = ctl["Request"]
req["TunnelId"] = tnlid
@ -252,16 +249,18 @@ class TincanTunnel(ControllerModule):
self.send_control(tc_proc.ipc_id, json.dumps(ctl))
def req_handler_send_echo(self, cbt: CBT):
ctl = broker.CTL_ECHO
ctl = deepcopy(broker.CTL_ECHO)
ctl["TransactionId"] = cbt.tag
tnlid = cbt.request.params
tc_proc = self._tc_proc_tbl.get(tnlid)
if tc_proc.do_chk and tc_proc.echo_replies > 0:
if tc_proc and tc_proc.do_chk and tc_proc.echo_replies > 0:
tc_proc.echo_replies -= 1
ctl["Request"]["Message"] = tc_proc.tnlid
self._tnl_cbts[cbt.tag] = cbt
self.send_control(tc_proc.ipc_id, json.dumps(ctl))
else:
tc_proc.do_chk = False
cbt.set_response(f"Cannot send echo to {tc_proc}", False)
self.complete_cbt(cbt)
@ -269,6 +268,9 @@ class TincanTunnel(ControllerModule):
tnlid = cbt.response.data
if cbt.response.status and tnlid in self._tc_proc_tbl:
self._tc_proc_tbl[tnlid].echo_replies = broker.MAX_HEARTBEATS
self.register_internal_cbt(
"TCI_QUERY_LINK_INFO", {"TunnelId": tnlid}, lifespan=15
)
else:
self.logger.info(cbt.response.data)
self.free_cbt(cbt)
@ -288,32 +290,35 @@ class TincanTunnel(ControllerModule):
else:
# tincan process unresponsive
self.logger.warning(
"unnel: %s health check failed, terminating process: %s",
"Tunnel: %s health check failed, terminating process: %s",
tnlid,
tc_proc,
)
self._stop_tincan(tc_proc)
self._notify_tincan_terminated(tnlid)
self._pids.pop(tc_proc.proc.pid, None)
self._tc_proc_tbl.pop(tnlid, None)
self._stop_tincan(tc_proc)
self._notify_tincan_terminated(tc_proc)
def req_handler_check_process(self, cbt):
if self.exit_ev.is_set():
return
def req_handler_check_process(self, cbt: CBT):
exit_code = None
rmv = []
for tnlid, tc_proc in self._tc_proc_tbl.items():
for tc_proc in self._tc_proc_tbl.values():
exit_code = tc_proc.proc.poll()
if exit_code:
# tincan process crashed
rmv.append(tnlid)
for tnlid in rmv:
self.logger.warning(
"Tincan process %s exited unexpectedly with code, %s",
tc_proc.proc.pid,
exit_code,
)
self._notify_tincan_terminated(tnlid)
self._tc_proc_tbl.pop(tnlid, None)
self.logger.warning(
"Tincan process %s exited unexpectedly with code %s",
tc_proc.proc.pid,
exit_code,
)
rmv.append(tc_proc)
for tc_proc in rmv:
self._pids.pop(tc_proc.proc.pid)
self._tc_proc_tbl.pop(tc_proc.tnlid, None)
self._remove_tap(tc_proc.tap_name)
self._notify_tincan_terminated(tc_proc)
cbt.set_response(rmv, True)
self.complete_cbt(cbt)
def on_timer_event(self):
if self.exit_ev.is_set():
@ -321,18 +326,14 @@ class TincanTunnel(ControllerModule):
# send an echo health check every timer interval, e.g., 30s
for tnlid, tc_proc in self._tc_proc_tbl.items():
if tc_proc.do_chk:
self.register_internal_cbt("_TCI_SEND_ECHO", tnlid)
self.register_internal_cbt("_TCI_SEND_ECHO", tnlid, lifespan=10)
def on_expire_chk_tincan(self, *_):
def on_expire_chk_tincan(self):
if self.exit_ev.is_set():
return
self.register_internal_cbt("_TCI_CHK_PROCESS")
self.register_timed_transaction(
self,
statement_false,
self.on_expire_chk_tincan,
TINCAN_CHK_INTERVAL,
)
if self._tc_proc_tbl:
self.register_internal_cbt("_TCI_CHK_PROCESS")
self.register_deferred_call(TINCAN_CHK_INTERVAL, self.on_expire_chk_tincan)
def terminate(self):
self.exit_ev.set()
@ -399,34 +400,35 @@ class TincanTunnel(ControllerModule):
except subprocess.TimeoutExpired:
exit_code = tc_proc.proc.poll()
if exit_code is None:
self._remove_tap()
self._remove_tap(tc_proc.tap_name)
tc_proc.proc.kill()
self._kill_times.append(self._kill_times[-1] + time.time() - ts)
self.logger.debug("Killed unresponsive Tincan: %s", tc_proc.proc.pid)
self._pids.pop(tc_proc.proc.pid)
self.logger.info(
"Process %s for tunnel %s terminated", tc_proc.proc.pid, tc_proc.tnlid
)
def _notify_tincan_terminated(self, tnlid: str):
def _notify_tincan_terminated(self, tc_proc: TincanProcess):
self._tci_publisher.post_update(
{
"Command": "TincanTunnelFailed",
"Reason": "Tincan process terminated",
"OverlayId": self._tc_proc_tbl[tnlid].ovlid,
"TunnelId": tnlid,
"TapName": self._tc_proc_tbl[tnlid].tap_name,
"OverlayId": tc_proc.ovlid,
"TunnelId": tc_proc.tnlid,
"TapName": tc_proc.tap_name,
}
)
def handle_ipc(self, msg: ProxyMsg):
if self.exit_ev.is_set():
return
try:
ctl = msg.json
if ctl["ProtocolVersion"] != EVIO_VER_CTL:
raise ValueError("Invalid control version detected")
# self.logger.debug("Received dataplane control - %s", ctl)
# Get the original CBT if this is the response
if ctl["ControlType"] == "Response":
# self.logger.debug("Received Tincan control response: %s", ctl)
cbt = self._tnl_cbts.pop(ctl["TransactionId"])
cbt.set_response(
ctl["Response"]["Message"],

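Reviewer note: the CTL_* request messages are built from module-level template dicts; previously they were mutated in place and the TURN credentials were then scrubbed in the shared object, so later requests could see polluted or redacted fields. This change deep-copies each template before filling it. Below is a short sketch of the hazard and the fix, using an illustrative template rather than the real broker.CTL_CREATE_LINK.

from copy import deepcopy

# Illustrative control template; the real broker module defines CTL_CREATE_LINK
# and friends with more fields.
CTL_EXAMPLE = {"ProtocolVersion": 5, "Request": {"TunnelId": None, "TurnServers": []}}

def build_request_shared(tnlid, turn_servers):
    # Anti-pattern: mutates the shared template, so redactions and per-request
    # values leak into every later request built from it.
    ctl = CTL_EXAMPLE
    ctl["Request"]["TunnelId"] = tnlid
    ctl["Request"]["TurnServers"] = turn_servers
    for turn in ctl["Request"]["TurnServers"]:
        turn["Password"] = "***"  # scrubs the caller's dict too
    return ctl

def build_request_copied(tnlid, turn_servers):
    # Fix: work on a private deep copy, leaving the template and caller data intact.
    ctl = deepcopy(CTL_EXAMPLE)
    ctl["Request"]["TunnelId"] = tnlid
    ctl["Request"]["TurnServers"] = deepcopy(turn_servers)
    return ctl

if __name__ == "__main__":
    turns = [{"Address": "turn.example.org", "User": "u", "Password": "secret"}]
    build_request_shared("tnl-1", turns)
    print(turns[0]["Password"])                # "***" - caller's credential clobbered
    print(CTL_EXAMPLE["Request"]["TunnelId"])  # "tnl-1" - template polluted

    CTL_EXAMPLE["Request"]["TunnelId"] = None  # reset before the corrected version
    turns = [{"Address": "turn.example.org", "User": "u", "Password": "secret"}]
    build_request_copied("tnl-2", turns)
    print(turns[0]["Password"])                # "secret" - untouched
    print(CTL_EXAMPLE["Request"]["TunnelId"])  # None - template untouched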
View File

@ -31,14 +31,13 @@ from random import randint
from typing import Optional
import broker
from broker import (
from broker import ( # PEER_DISCOVERY_COALESCE,
CBT_LIFESPAN,
EXCLUSION_BASE_INTERVAL,
MAX_CONCURRENT_OPS,
MAX_ON_DEMAND_EDGES,
MAX_SUCCESSIVE_FAILS,
MIN_SUCCESSORS,
PEER_DISCOVERY_COALESCE,
STALE_INTERVAL,
SUCCESSIVE_FAIL_DECR,
SUCCESSIVE_FAIL_INCR,
@ -355,18 +354,7 @@ class Topology(ControllerModule):
disc = DiscoveredPeer(peer_id)
self._net_ovls[olid].known_peers[peer_id] = disc
disc.presence()
if disc.is_available:
self._net_ovls[olid].new_peer_count += 1
if self._net_ovls[olid].new_peer_count >= self.config.get(
"PeerDiscoveryCoalesce", PEER_DISCOVERY_COALESCE
):
self.logger.info(
"Coalesced %d of %d discovered peers, attempting update on overlay %s",
self._net_ovls[olid].new_peer_count,
self.config.get("PeerDiscoveryCoalesce", PEER_DISCOVERY_COALESCE),
olid,
)
self._update_overlay(olid)
self._update_overlay(olid)
cbt.set_response(None, True)
self.complete_cbt(cbt)
@ -678,6 +666,14 @@ class Topology(ControllerModule):
self._process_next_transition(ovl)
else:
self.free_cbt(cbt)
ce = ovl.adjacency_list.get(peer_id)
if ce and ce.edge_state != EDGE_STATES.Connected:
self.register_timed_transaction(
(ce, olid),
self._is_connedge_connected,
self._on_connedge_timeout,
30,
)
def resp_handler_remove_tnl(self, cbt: CBT):
params = cbt.request.params
@ -1074,9 +1070,9 @@ class Topology(ControllerModule):
raise ValueError(f"Invalid request: Undefined tunnel type {dataplane}")
def _initiate_remove_edge(self, net_ovl: NetworkOverlay, peer_id: str):
if peer_id not in net_ovl.adjacency_list:
raise RuntimeWarning("No connection edge to peer found")
ce = net_ovl.adjacency_list[peer_id]
ce = net_ovl.adjacency_list.get(peer_id)
if not ce:
return
if (
ce.edge_state == EDGE_STATES.Connected
and ce.role == CONNECTION_ROLE.Initiator
@ -1091,8 +1087,6 @@ class Topology(ControllerModule):
raise ValueError("Successor threshold not met")
self.logger.debug("Removing edge %s", ce)
self._remove_tunnel(net_ovl, ce.dataplane, ce.peer_id, ce.edge_id)
return True
return False
def _remove_tunnel(
self,

View File

@ -30,7 +30,7 @@ import urllib.request as request
from urllib.error import HTTPError, URLError
from broker.cbt import CBT
from broker.controller_module import ControllerModule
from broker.controller_module import ControllerModule, introspect
class UsageReport(ControllerModule):
@ -44,6 +44,9 @@ class UsageReport(ControllerModule):
"NodeId": hashlib.sha256(self.node_id.encode("utf-8")).hexdigest(),
}
def __repr__(self):
return introspect(self)
def initialize(self):
self.logger.info("Controller module loaded")