# EdgeVPNio # Copyright 2020, University of Florida # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. import importlib import time import unittest from queue import Queue from time import sleep from unittest.mock import MagicMock, Mock, patch from slixmpp import register_stanza_plugin, Message, Callback, StanzaPath, JID from framework.CBT import CBT from framework.CFxHandle import CFxHandle from modules.Signal import XmppTransport, JidCache, EvioSignal class SignalTest(unittest.TestCase): def setup_vars_mocks(self): """ Setup the variables and the mocks required by the unit tests. :return: The signal object and signal dictionary """ cfx_handle = Mock() module = importlib.import_module("modules.{0}" .format("Signal")) module_class = getattr(module, "Signal") sig_dict = { "Signal": { "Enabled": True, "PresenceInterval": 10, "CacheExpiry": 5, "Overlays": { "A0FB389": { "HostAddress": "1.1.1.1", "Port": "5222", "Username": "raj", "Password": "raj", "AuthenticationMethod": "PASSWORD" } }, "NodeId": "1234434323" } } signal = module_class(cfx_handle, sig_dict["Signal"], "Signal") cfx_handle._cm_instance = signal cfx_handle._cm_config = sig_dict["Signal"] return sig_dict, signal def testtransport_start_event_handler(self): """ Test to check the start of the event handler of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory(1, sig_dict["Signal"]["Overlays"]["A0FB389"], signal, None, None, None) transport._sig.sig_log = MagicMock() transport.add_event_handler = MagicMock() transport.register_handler = MagicMock() transport.get_roster = MagicMock() transport.start_event_handler(event=None) transport.add_event_handler.assert_called_once() transport.get_roster.assert_called_once() print("Passed : testtransport_start_event_handler") def testtransport_connect_to_server(self): """ Test to check the connect to server of the transport instance of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory(1, sig_dict["Signal"]["Overlays"]["A0FB389"], signal, None, None, None) transport._sig.sig_log = MagicMock() transport.connect = MagicMock() XmppTransport.connect_to_server(transport) transport._sig.sig_log.assert_called_once() transport.connect.assert_called_once() print("Passed : testtransport_connect_to_server") def testtransport_start_process(self): """ Test to check the start_process method of the transport instance of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory(1, sig_dict["Signal"]["Overlays"]["A0FB389"], signal, None, None, None) transport.loop.run_forever = MagicMock() transport.start_process() transport.loop.run_forever.assert_called_once() print("Passed : testtransport_start_process") def testtransport_factory_with_password(self): """ Test to check the factory method of the transport instance of the signal class. """ sig_dict, signal = self.setup_vars_mocks() self.assertTrue(isinstance( XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None), XmppTransport)) print("Passed : testtransport_factory_with_password") def testtransport_factory_with_x509(self): """ Test to check the factory method of the transport instance with x509 auth_method of the signal class. """ sig_dict, signal = self.setup_vars_mocks() sig_dict["Signal"]["Overlays"]["A0FB389"]["AuthenticationMethod"] = "x509" sig_dict["Signal"]["Overlays"]["A0FB389"]["TrustStore"] = {} sig_dict["Signal"]["Overlays"]["A0FB389"]["CertDirectory"] = "/home/cert" sig_dict["Signal"]["Overlays"]["A0FB389"]["CertFile"] = "file1" sig_dict["Signal"]["Overlays"]["A0FB389"]["KeyFile"] = "keyfile" transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) self.assertTrue(isinstance(transport, XmppTransport)) print(transport.certfile) print(transport.keyfile) assert transport.certfile == "/home/cert\\file1" assert transport.keyfile == "/home/cert\keyfile" print("Passed : testtransport_factory_with_x509") def testtransport_factory_without_password(self): """ Test to check the factory method of the transport instance without the password of the signal class. """ sig_dict, signal = self.setup_vars_mocks() sig_dict["Signal"]["Overlays"]["A0FB389"]["Password"] = None transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.add_event_handler = MagicMock() transport.register_handler = MagicMock() transport.get_roster = MagicMock() self.assertTrue(isinstance(transport, XmppTransport)) print("Passed : testtransport_factory_without_password") def testtransport_factory_without_user(self): """ Test to check the factory method of the transport instance without the username of the signal class. """ sig_dict, signal = self.setup_vars_mocks() sig_dict["Signal"]["Overlays"]["A0FB389"]["Username"] = None with self.assertRaises(RuntimeError): transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) print("Passed : testtransport_factory_without_user") def testtransport_presence_event_handler(self): """ Test to check the presence method with ident of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) presence = {"from": "raj", "to": "raj@ipop", "status": "ident#12344323"} transport.boundjid = JID("raj@ipop/ipop") transport.send_msg = MagicMock() jid_cache = Mock() presence_publisher = Mock() transport._presence_publisher = presence_publisher transport._presence_publisher.post_update = MagicMock() transport._jid_cache = jid_cache transport._jid_cache.add_entry = MagicMock() transport.presence_event_handler(presence) transport.send_msg.assert_called_once() print("Passed : testtransport_presence_event_handler") def testtransport_presence_event_handler_with_uid(self): """ Test to check the presence method with uid of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) presence = {"from": "raj", "to": "raj@ipop", "status": "uid?#1234434323"} transport.boundjid = JID("raj@ipop/ipop") transport.send_msg = MagicMock() jid_cache = Mock() presence_publisher = Mock() transport._presence_publisher = presence_publisher transport._presence_publisher.post_update = MagicMock() transport._jid_cache = jid_cache transport._jid_cache.add_entry = MagicMock() transport.presence_event_handler(presence) transport.send_msg.assert_called_once() print("Passed : testtransport_presence_event_handler_with_uid") def testtransport_presence_event_handler_with_no_status(self): """ Test to check the presence method with no valid status of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) presence = {"from": "raj", "to": "raj@ipop", "status": "ipop?#1234434323"} transport.boundjid = JID("raj@ipop/ipop") jid_cache = Mock() presence_publisher = Mock() transport._presence_publisher = presence_publisher transport._presence_publisher.post_update = MagicMock() transport._jid_cache = jid_cache transport._jid_cache.add_entry = MagicMock() transport._sig.sig_log = MagicMock() transport.presence_event_handler(presence) transport._sig.sig_log.assert_called_once() print("Passed : testtransport_presence_event_handler_with_uid") def testtransport_presence_event_handler_with_exception(self): """ Test to check the presence method with an exception raised of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) presence = {"from": "raj", "to": "raj@ipop", "status": "uid?#1234434323"} transport.boundjid = JID("raj@ipop/ipop") transport.send_msg = MagicMock() transport.send_msg.side_effect = Exception() jid_cache = Mock() presence_publisher = Mock() transport._presence_publisher = presence_publisher transport._presence_publisher.post_update = MagicMock() transport._jid_cache = jid_cache transport._jid_cache.add_entry = MagicMock() transport._sig.sig_log = MagicMock() transport.presence_event_handler(presence) transport.send_msg.assert_called_once() transport._sig.sig_log.assert_called_once() print("Passed : testtransport_presence_event_handler_with_uid") def testtransport_message_listener_with_uid(self): """ Test to check the message_listener method with uid of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport._jid_cache = JidCache(signal, 30) register_stanza_plugin(Message, EvioSignal) msg = Message() msg["from"] = "ipop" transport.boundjid.full = "edgevpn" msg["evio"]["type"] = "uid!" msg["evio"]["payload"] = "123#456" item = {0: "invk", 1: {"ActionTag": "1"}, 2: 5} q = Queue() q.put_nowait(item) outgoing_rem_acts = {"456": q} transport._outgoing_rem_acts = outgoing_rem_acts transport.send_msg = MagicMock() transport.message_listener(msg) assert transport._jid_cache.lookup("456") == "123" transport.send_msg.assert_called_once() print("Passed : testtransport_presence_event_handler_with_uid") def testtransport_message_listener_with_announce_to_same_node(self): """ Test to check the message_listener method with announce to the same of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport._jid_cache = JidCache(signal, 30) register_stanza_plugin(Message, EvioSignal) msg = Message() msg["from"] = "ipop" transport.boundjid.full = "edgevpn" msg["evio"]["type"] = "announce" msg["evio"]["payload"] = "123#456" sig_dict["Signal"]["NodeId"] = "456" transport.send_msg = MagicMock() transport._presence_publisher = Mock() transport._presence_publisher.post_update = MagicMock() transport.message_listener(msg) self.assertEqual(transport._jid_cache.lookup("456"), None) transport.send_msg.assert_not_called() transport._presence_publisher.post_update.assert_not_called() print("Passed : testtransport_message_listener_with_announce_to_same_node") def testtransport_message_listener_with_announce_to_different_node(self): """ Test to check the message_listener method with announce to a different node of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport._jid_cache = JidCache(signal, 30) register_stanza_plugin(Message, EvioSignal) msg = Message() msg["from"] = "ipop" transport.boundjid.full = "edgevpn" msg["evio"]["type"] = "announce" msg["evio"]["payload"] = "123#456" transport._presence_publisher = Mock() transport._presence_publisher.post_update = MagicMock() transport.message_listener(msg) self.assertEqual(transport._jid_cache.lookup("456"), "123") transport._presence_publisher.post_update.assert_called_once() print("Passed : testtransport_message_listener_with_announce_to_same_node") @patch('json.loads') def testtransport_message_listener_with_invk(self, mock_loads): """ Test to check the message_listener method with invk to a different node of the signal class. """ sig_dict, signal = self.setup_vars_mocks() mock_loads.return_value = MagicMock() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport._jid_cache = JidCache(signal, 30) register_stanza_plugin(Message, EvioSignal) msg = Message() msg["from"] = "ipop" transport.boundjid.full = "edgevpn" msg["evio"]["type"] = "invk" msg["evio"]["payload"] = {"Action": "announce"} transport._sig.handle_remote_action = MagicMock() transport.message_listener(msg) mock_loads.assert_called_once() transport._sig.handle_remote_action.assert_called_once() print("Passed : testtransport_message_listener_with_invk") def testsignal_initialize(self): """ Test to check the initialize method of the signal class. """ sig_dict, signal = self.setup_vars_mocks() signal._create_transport_instance = MagicMock() signal.initialize() signal._create_transport_instance.assert_called_once() print("Passed : testsignal_initialize") @patch("modules.Signal.XmppTransport.factory") def testsignal_create_transport(self, mock_factory): """ Test to check the create transport method of the signal class. """ cfx_handle = Mock() cfx_handle.query_param.return_value = 30 module = importlib.import_module("modules.{0}" .format("Signal")) module_class = getattr(module, "Signal") sig_dict = {"Signal": {"Enabled": True, "Overlays": { "A0FB389": { "HostAddress": "1.1.1.1", "Port": "5222", "Username": "raj", "Password": "raj", "AuthenticationMethod": "PASSWORD" } }, "NodeId": "1234434323" } } signal = module_class(cfx_handle, sig_dict["Signal"], "Signal") cfx_handle._cm_instance = signal cfx_handle._cm_config = sig_dict transport = XmppTransport(sig_dict["Signal"]["Overlays"]["A0FB389"]["Username"], sig_dict["Signal"]["Overlays"]["A0FB389"]["Password"], sasl_mech="PLAIN") mock_factory.return_value = transport transport.connect_to_server = MagicMock() assert signal._create_transport_instance("1", sig_dict["Signal"]["Overlays"]["A0FB389"], None, None) == transport print("Passed : testsignal_create_transport") def testsignal_handle_remote_action_invoke(self): """ Test to check the handling of remote action with action as invoke in the signal class. """ sig_dict, signal = self.setup_vars_mocks() rem_act = {"RecipientCM": "1234434323", "Action": "Sleep", "Params": "None", "OverlayId": "A0FB389", "RecipientId": "1234434323"} signal.submit_cbt = MagicMock() signal.handle_remote_action("A0FB389", rem_act, "invk") signal.submit_cbt.assert_called_once() print("Passed : testsignal_handle_remote_action_invoke") def testsignal_handle_remote_action_complete(self): """ Test to check the handling of remote action with action as complete in the signal class. """ sig_dict, signal = self.setup_vars_mocks() rem_act = {"RecipientCM": "1234434323", "Action": "Sleep", "Params": "None", "OverlayId": "A0FB389", "InitiatorId": "1234434323", "ActionTag": "None", "Status": "Active"} signal.complete_cbt = MagicMock() signal.handle_remote_action("A0FB389", rem_act, "cmpt") signal.complete_cbt.assert_called_once() print("Passed : testsignal_handle_remote_action_complete") @patch("modules.Signal.XmppTransport.Message") def testtransport_send_message(self, msg_mock): """ Test to check the send message method of transport instance of the signal class. """ register_stanza_plugin(Message, EvioSignal) sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.loop.call_soon_threadsafe = MagicMock() transport.register_handler( Callback("ipop", StanzaPath("message/ipop"), transport.message_listener)) msg = transport.Message() msg_mock.return_value = msg msg.send = MagicMock() transport.send_msg("2", "invk", "Data") transport.loop.call_soon_threadsafe.assert_called_once() transport.loop.call_soon_threadsafe.assert_called_with(msg.send) print("Passed : testtransport_send_message") def testjid_cache_add_lookup_entry(self): """ Test to check the lookup method of the jid-cache of the signal class. """ sig_dict, signal = self.setup_vars_mocks() jid_cache = JidCache(signal, 30) jid_cache.add_entry("123", "2345") assert jid_cache.lookup("123") == "2345" print("Passed : testjid_cache_add_lookup_entry") def testjid_cache_scavenge(self): """ Test to check the scavenge method of the jid-cache of the signal class. """ sig_dict, signal = self.setup_vars_mocks() jid_cache = JidCache(signal, 5) jid_cache.add_entry("123", "2345") assert jid_cache.lookup("123") == "2345" sleep(5) jid_cache.scavenge() assert jid_cache.lookup("123") is None print("Passed : testjid_cache_scavenge") def testsignal_req_handler_initiate_remote_action(self): """ Test to check the handling remote action method with a request of the signal class. """ sig_dict, signal = self.setup_vars_mocks() jid_cache = JidCache(signal, 5) jid_cache.add_entry("1", "2345") transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.send_msg = MagicMock() signal._circles = {"A0FB389": {"JidCache": jid_cache, "Transport": transport}} cbt = CBT() cbt.request.params = {"RecipientId": "1", "OverlayId": "A0FB389"} signal.req_handler_initiate_remote_action(cbt) transport.send_msg.assert_called_once() print("Passed : testsignal_req_handler_initiate_remote_action") def testsignal_resp_handler_remote_action(self): """ Test to check the handling remote action method with a response of the signal class. """ sig_dict, signal = self.setup_vars_mocks() cbt = CBT() cbt.request.params = {"RecipientId": "1", "OverlayId": "A0FB389"} jid_cache = JidCache(signal, 5) jid_cache.add_entry("1", "2345") transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.send_msg = MagicMock() signal._circles = {"A0FB389": {"JidCache": jid_cache, "Transport": transport}} cbt.tag = "1" signal.submit_cbt(cbt) resp = CBT.Response() cbt.response = resp rem_act = {"InitiatorId": "1", "OverlayId": "A0FB389"} signal._remote_acts["1"] = rem_act signal.submit_cbt(cbt) signal.transmit_remote_act = MagicMock() signal.free_cbt = MagicMock() signal.resp_handler_remote_action(cbt) signal.transmit_remote_act.assert_called_once() signal.free_cbt.assert_called_once() print("Passed : testsignal_resp_handler_remote_action") def testsignal_req_handler_query_reporting_data(self): """ Test to check the reporting of data method of the signal class. """ sig_dict, signal = self.setup_vars_mocks() cbt = CBT() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport._host = "IPOP" transport.boundjid.full = "ipopuser" signal._circles = {"A0FB389": {"Transport": transport}} signal.complete_cbt = MagicMock() signal.req_handler_query_reporting_data(cbt) signal.complete_cbt.assert_called_once() print("Passed : testsignal_req_handler_query_reporting_data") def testtransmit_remote_act(self): """ Test to check the transmit remote action method of the signal class. """ rem_act = {"InitiatorId": "1", "OverlayId": "A0FB389"} sig_dict, signal = self.setup_vars_mocks() jid_cache = JidCache(signal, 5) jid_cache.add_entry("1", "2345") transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.send_msg = MagicMock() signal._circles = {"A0FB389": {"JidCache": jid_cache, "Transport": transport}} signal.transmit_remote_act(rem_act, "1", "invk") transport.send_msg.assert_called_once() print("Passed : testtransmit_remote_act") def testtransmit_remote_act_nopeer_jid(self): """ Test to check the transmit remote action method with no peer jid of the signal class. """ rem_act = {"InitiatorId": "1", "OverlayId": "A0FB389"} sig_dict, signal = self.setup_vars_mocks() jid_cache = JidCache(signal, 5) transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.send_presence = MagicMock() signal._circles = {"A0FB389": {"JidCache": jid_cache, "Transport": transport, "OutgoingRemoteActs": {}}} signal.transmit_remote_act(rem_act, "1", "invk") transport.send_presence.assert_called_once() print("Passed : testtransmit_remote_act_nopeer_jid") def testsignal_process_cbt_request_rem_act(self): """ Test to check the process cbt method with a request to initiate a remote action of the signal class. """ sig_dict, signal = self.setup_vars_mocks() cbt = CBT() cbt.op_type = "Request" cbt.request.action = "SIG_REMOTE_ACTION" signal.req_handler_initiate_remote_action = MagicMock() signal.process_cbt(cbt) signal.req_handler_initiate_remote_action.assert_called_once() print("Passed : testprocess_cbt_request_rem_act") def testsignal_process_cbt_request_rep_data(self): """ Test to check the process cbt method with a request to report data of the signal class. """ sig_dict, signal = self.setup_vars_mocks() cbt = CBT() cbt.op_type = "Request" cbt.request.action = "SIG_QUERY_REPORTING_DATA" signal.req_handler_query_reporting_data = MagicMock() signal.process_cbt(cbt) signal.req_handler_query_reporting_data.assert_called_once() print("Passed : testprocess_cbt_request_rep_data") def testsignal_process_cbt_resp_tag_present(self): """ Test to check the process cbt method with a response with the cbt tag present. """ sig_dict, signal = self.setup_vars_mocks() signal._remote_acts = {"1"} signal.resp_handler_remote_action = MagicMock() cbt = CBT() cbt.op_type = "Response" cbt.tag = "1" signal.process_cbt(cbt) signal.resp_handler_remote_action.assert_called_once() print("Passed : testprocess_cbt_resp_tag_present") def testsignal_process_cbt_resp_with_parent(self): """ Test to check the process cbt method with a response with the parent present and no tag. """ sig_dict, signal = self.setup_vars_mocks() cbt1 = CBT() cbt = CBT() cbt.op_type = "Response" cbt.parent = cbt1 resp = CBT.Response() resp.data = "Data" cbt.response = resp cbt.response.status = "OK" cbt1.child_count = 1 signal.free_cbt = MagicMock() signal.complete_cbt = MagicMock() signal.process_cbt(cbt) signal.free_cbt.assert_called_once() signal.complete_cbt.assert_called_once() print("Passed : test_process_cbt_resp_with_parent") def testsignal_process_cbt_resp_with_parent_more_children(self): """ Test to check the process cbt method with a response with the parent present and no tag and more than 1 child. """ sig_dict, signal = self.setup_vars_mocks() cbt1 = CBT() cbt = CBT() cbt.op_type = "Response" cbt.parent = cbt1 resp = CBT.Response() resp.data = "Data" cbt.response = resp cbt.response.status = "OK" cbt1.child_count = 2 signal.free_cbt = MagicMock() signal.complete_cbt = MagicMock() signal.process_cbt(cbt) signal.free_cbt.assert_called_once() signal.complete_cbt.assert_not_called() print("Passed : test_process_cbt_resp_with_parent") def testsignal_terminate(self): """ Test to check the terminate method of signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) transport.shutdown = MagicMock() signal._circles = {"A0FB389": {"Transport": transport, "OutgoingRemoteActs": {}}} signal.terminate() transport.shutdown.assert_called_once() print("Passed : test_signal_terminate") def testsignal_scavenge_pending_cbts(self): """ Test to check if scavenge of pending CBT works with one above the request timeout. """ cfxObject = Mock() cfx_handle = CFxHandle(cfxObject) module = importlib.import_module("modules.{0}" .format("Signal")) module_class = getattr(module, "Signal") sig_dict = {"Signal": {"Enabled": True, "Overlays": { "A0FB389": { "HostAddress": "1.1.1.1", "Port": "5222", "Username": "raj", "Password": "raj", "AuthenticationMethod": "PASSWORD" } } }, "NodeId": "1234434323" } signal = module_class(cfx_handle, sig_dict, "Signal") cfx_handle._cm_instance = signal cfx_handle._cm_config = sig_dict cbt1 = CBT() cbt1.tag = "1" cbt1.time_submit = time.time() - 5 signal.request_timeout = 5 cbt2 = CBT() cbt2.tag = "2" cbt2.time_submit = time.time() - 1 signal.complete_cbt = MagicMock() signal._cfx_handle._pending_cbts.update({"0": cbt1}) signal._cfx_handle._pending_cbts.update({"1": cbt2}) assert len(signal._cfx_handle._pending_cbts.items()) == 2 signal.scavenge_pending_cbts() assert len(signal._cfx_handle._pending_cbts.items()) == 1 items = {} items.update({"1": cbt2}) assert signal._cfx_handle._pending_cbts == items print("Passed : testsignal_scavenge_pending_cbts") def testsignal_scavenge_expired_outgoing_rem_acts_single_entry(self): """ Test to check scavenge remote actions method with a single entry of the signal class. """ sig_dict, signal = self.setup_vars_mocks() signal.request_timeout = 3 item = {0: "invk", 1: {"ActionTag": "1"}, 2: 5} q = Queue() q.put_nowait(item) outgoing_rem_acts = {"1": q} signal.complete_cbt = MagicMock() signal.scavenge_expired_outgoing_rem_acts(outgoing_rem_acts) signal.complete_cbt.assert_called_once() print("Passed : testsignal_scavenge_expired_outgoing_rem_acts_single_entry") def testsignal_scavenge_expired_outgoing_rem_acts_multiple_entries(self): """ Test to check scavenge remote actions method with multiple entries of the signal class. """ sig_dict, signal = self.setup_vars_mocks() signal.request_timeout = 3 item1 = {0: "invk", 1: {"ActionTag": "1"}, 2: 5} item2 = {0: "invk", 1: {"ActionTag": "2"}, 2: 6} q = Queue() q.put_nowait(item1) q.put_nowait(item2) outgoing_rem_acts = {"1": q} assert outgoing_rem_acts["1"].qsize() == 2 signal.complete_cbt = MagicMock() signal.scavenge_expired_outgoing_rem_acts(outgoing_rem_acts) assert len(outgoing_rem_acts) == 0 assert signal.complete_cbt.call_count == 2 print("Passed : testsignal_scavenge_expired_outgoing_rem_acts_multiple_entries") def testsignal_timer_method(self): """ Test to check the timer method of the signal class. """ sig_dict, signal = self.setup_vars_mocks() transport = XmppTransport.factory("1", sig_dict["Signal"]["Overlays"]["A0FB389"], signal, signal._presence_publisher, None, None) rem_acts = {} jid_cache = JidCache(signal, 5) jid_cache.scavenge = MagicMock() signal.scavenge_pending_cbts = MagicMock() transport.event_loop = MagicMock() signal._circles = { "A0FB389": {"Announce": 0, "Transport": transport, "OutgoingRemoteActs": rem_acts, "JidCache": jid_cache}} signal._circles["A0FB389"]["Transport"].event_loop.call_soon_threadsafe = MagicMock() signal.timer_method() signal._circles["A0FB389"]["Transport"].event_loop.call_soon_threadsafe.assert_called_once() jid_cache.scavenge.assert_called_once() signal.scavenge_pending_cbts.assert_called_once() print("Passed : testsignal_timer_method") if __name__ == '__main__': unittest.main()