import time import multiprocessing import logging import settings import rticonnextdds_connector as rti from datetime import datetime, timedelta logging.basicConfig(filename=settings.LOG_FILE, level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S', format='%(asctime)s:%(levelname)s:%(message)s') class DDS_Algo: """ Create a DDS entity to exchange network information with other entities in a DDS domain :param DDS Id: Id to authenticate in DDS domain during discovery process :param DDS Participant Name: Participant name in DDS domain :param DDS Participant IP: Participant ip to communicate with other participants :param Main GC: GC that manages the region where the cluster is deployed :return: """ cluster = "" def __init__(self, dds_id, dds_participant_name, main_gc): self.own_key_name = "K8S_Master" self.main_gc = main_gc self.isActiveMainGC = True self.id = dds_id DDS_Algo.cluster = dds_participant_name self.sample_count = 0 self.connector = rti.Connector("MyParticipantLibrary::kubernetes-control-plane1", "topologia.xml") print(self.connector.get_version) self.writer = self.connector.get_output("kubernetes-control-plane1-pub::kubernetes-control-plane1-dw") self.reader = self.connector.get_input("kubernetes-control-plane1-sub::kubernetes-control-plane1-dr") def read_samples(self): from scheduler import Scheduler print("Waiting for publications...") self.reader.wait_for_publications() matched_pubs = self.reader.matched_publications for pub_info in matched_pubs: print(pub_info) print("Waiting for data...") while True: self.connector.wait() self.reader.take() for sample in self.reader.samples.valid_data_iter: print(sample.__dict__) data = sample.get_dictionary() print(data) identificador = data["Identificador"] sender = data["DestinationNode"] if identificador == DDS_Algo.cluster and sender != DDS_Algo.cluster: node = data["NodeId"] vnfName = data["TerminationPointId"] if Scheduler.monitor.multideployed_pods.isPodList(lambda x: x.metadata.name == vnfName): Scheduler.bind_to_node(vnfName, node) pod = Scheduler.monitor.multideployed_pods.getPod(lambda x: x.metadata.name == vnfName) Scheduler.monitor.multideployed_pods.items.remove(pod) is_scheduled = Scheduler.is_pod_deployed(pod) service = Scheduler.monitor.all_services.getService(lambda x: x.id_ == pod.service_id) if is_scheduled: self.writer.instance["Identificador"] = "VNF_Deployed" self.writer.instance["NodeId"] = vnfName self.writer.instance["TerminationPointId"] = service.name self.writer.instance["DestinationNode"] = DDS_Algo.cluster dt = int(datetime.now().timestamp() * 1000000000) self.writer.write(source_timestamp=dt)