/* (c) Copyright, Real-Time Innovations, 2012. All rights reserved. * Author: Gerardo Pardo-Castellote */ package com.rti.dds.snippets; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.ConcurrentSkipListMap; import com.rti.dds.cdr.CdrBuffer; import com.rti.dds.cdr.CdrInputStream; import com.rti.dds.cdr.CdrOutputStream; import com.rti.dds.domain.*; import com.rti.dds.infrastructure.*; import com.rti.dds.publication.builtin.*; import com.rti.dds.subscription.*; import com.rti.dds.subscription.builtin.*; import com.rti.dds.typecode.TypeCode; import com.rti.dds.typecode.TypeCodeFactory; import com.rti.dds.util.NativeInterface; import com.rti.ndds.config.Version; public class MonitorDicoveredTypes { private static int verbosity = 1; private int domainId; private DomainParticipant participant; private PublicationBuiltinTopicDataDataReader publicationsDR; private SubscriptionBuiltinTopicDataDataReader subscriptionsDR; private final static int MAX_ACTIVE_CONDITIONS = 3; // We will only install e conditions on the private ConditionSeq activeConditionSeq; private WaitSet discoveryWaitSet; // This will hold the discovered types private ConcurrentSkipListMap discoveredTypes; public boolean start(int theDomainId) { Version version = Version.get_instance (); System.out.println("Running RTI Connext version: " + version); discoveredTypes = new ConcurrentSkipListMap(); domainId = theDomainId; DomainParticipantFactory factory = DomainParticipantFactory.get_instance(); DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos(); // This instructs the DomainParticipantFactory to not enable the DomainParticipant // entities it creates automatically. This is needed so that we have a chance to // retrieve the builtin data-readers before the participant starts receiving // discovery data. Later it is explained why this is needed factoryQos.entity_factory.autoenable_created_entities = false; DomainParticipantQos pQos = new DomainParticipantQos(); factory.get_default_participant_qos(pQos); pQos.participant_name.name = "RTI Connext Monitor Types Snippet"; try { participant = factory.create_participant( domainId, pQos, //DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT, null, // listener StatusKind.STATUS_MASK_NONE); } catch ( Exception e) { String lastStartError = "Error creating the DDS domain. Common causes are:" + "\n - Lack of a network. E.g disconected wireless." + "\n - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface " + "\n for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead."; System.out.println(lastStartError); return false; } // We count ourselves as a participant that is present // The "lookup_xxx" operations not only retrieve the builtin data-readers but also // activate the cacheing of discovered types. To save resources discovered types // are only saved in the builtin reader cache which is only active after calling // the corresponding "lookup_xxx" operation. // It is for this reason that we instructed the DomainParticipantFactory to not // automatically enabled the DomainParticipant. This gives us the opportuity to // retrieve the builtin entities and activate the cacheing of discovered types // beforfe we receive any discovery information. If we did not do this we may // miss the data-types of the first few entities discovered publicationsDR = (PublicationBuiltinTopicDataDataReader) participant.get_builtin_subscriber().lookup_datareader("DCPSPublication"); subscriptionsDR = (SubscriptionBuiltinTopicDataDataReader) participant.get_builtin_subscriber().lookup_datareader("DCPSSubscription"); // Enable the participant. This causes it to start receiving discovery traffic. // Note: Enable fails if there is no network of if any interfaces are not multicast enabled // In some platforms (e.g. MacOSX) having the VPN running causes enable() to fail try { participant.enable(); } catch ( Exception e) { String lastStartError = "Error enabling the DDS domain. Common causes are:" + "\n - Lack of a network. E.g disconected wireless." + "\n - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface " + "\n for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead."; System.out.println(lastStartError); return false; } // Create a WairSet object that can be used to block the calling thread until there is // discovery data to read. This avoids having to poll and this use CPU continuously. discoveryWaitSet = new WaitSet(); // Attach the conditions that would wakeup the waitset. In this case the arrival of data on // any of the builting datareaders discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition()); publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS); discoveryWaitSet.attach_condition(subscriptionsDR.get_statuscondition()); subscriptionsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS); activeConditionSeq = new ConditionSeq(MAX_ACTIVE_CONDITIONS); return true; } public void waitForDiscoveryData() { Duration_t waitDuration = new Duration_t(Duration_t.DURATION_INFINITE_SEC, Duration_t.DURATION_INFINITE_NSEC); System.out.println("waitForDiscoveryData"); discoveryWaitSet.wait(activeConditionSeq, waitDuration); } /** * Processes all the new TypeCodes discovered entering the new ones * into an internal map organized by their type_name and reporting situations * where the same type_name is used with different definitions of the TypeCode * * @return A list with the TypeCodes that were entered into the internal map * that is the ones that correspond to a type_name that was not seen before */ public ArrayList processDiscoveredTypes() { ArrayList newTypeCodes = new ArrayList(); System.out.println("processDiscoveredTypes"); if ( publicationsDR.get_statuscondition().get_trigger_value() ) { processTypesInDiscoveredDataWriters(newTypeCodes); } if ( subscriptionsDR.get_statuscondition().get_trigger_value() ) { processTypesInDiscoveredDataReaders(newTypeCodes); } return newTypeCodes; } private void processTypesInDiscoveredDataReaders(ArrayList newTypeCodes) { SubscriptionBuiltinTopicData subscriptionData = new SubscriptionBuiltinTopicData();; SampleInfo info = new SampleInfo();; try { while ( true ) { subscriptionsDR.take_next_sample(subscriptionData, info); // We are only interested in new DataReaders which we have not // seen before. This is because the data-type will never change for any // one DataReader to other updates indicating change of QoS or deletion of the // DataReader can be safely ignored if ( info.view_state == ViewStateKind.NEW_VIEW_STATE ) { System.out.println("DataReader (New)" + " name: \"" + subscriptionData.subscription_name.name + "\"" + " topic: \"" + subscriptionData.topic_name + "\"" + " type: \"" + subscriptionData.type_name + "\"" ); if (processType(subscriptionData.type_name, subscriptionData.type_code)) { newTypeCodes.add(subscriptionData.type_code); } } } } catch (RETCODE_NO_DATA noData) { } // catch (RETCODE_BAD_PARAMETER badParam) { } finally { } } private void processTypesInDiscoveredDataWriters(ArrayList newTypeCodes) { PublicationBuiltinTopicData publicationData = new PublicationBuiltinTopicData(); SampleInfo info = new SampleInfo(); try { while ( true ) { publicationsDR.take_next_sample(publicationData, info); // We are only interested in new DataWriters which we have not // seen before. This is because the data-type will never change for any // one DataWriter to other updates indicating change of QoS or deletion of the // DataWriter can be safely ignored if ( info.view_state == ViewStateKind.NEW_VIEW_STATE ) { System.out.println("DataWriter (New)" + " name: \"" + publicationData.publication_name.name + "\"" + " topic: \"" + publicationData.topic_name + "\"" + " type: \"" + publicationData.type_name + "\"" ); if ( processType(publicationData.type_name, publicationData.type_code) ) { newTypeCodes.add(publicationData.type_code); } } } } catch (RETCODE_NO_DATA noData) { } // catch (RETCODE_BAD_PARAMETER badParam) { } finally { } } /** This function illustrates how to serialize a TypeCode in Java such that it can be saved in * a file or sent out-of band. * * @param typeCode a TypeCode such as the one received via discovery in * the BuitinPublicationTopicData or BuitinSubscriptionTopicData * * @return A byte array containing a serialized raw-byte representation of the TypeCode */ public byte[] serializeTypeCode(TypeCode typeCode) { int bufferSize = typeCode.get_serialized_size(0); CdrOutputStream stream = new CdrOutputStream(bufferSize); typeCode.serialize(stream); return stream.getBuffer().getBuffer(); } /** * This function illustrates how to create a TypeCode in Java from the serialized representation * obtained by the call to serializeTypeCode(). * * @param serializedTypeCode an array of bytes containing the serialized representation of * a TypeCode such as the one returned by serializeTypeCode() * * @return A TypeCode that corresponds to that serialized representation */ // a file or sent out-of band. public TypeCode deserializeTypeCode(byte[] serializedTypeCode) { boolean needByteSwap = NativeInterface.getInstance().isNativeByteOrderLittleEndian(); CdrInputStream stream = new CdrInputStream(serializedTypeCode, needByteSwap); TypeCode typeCode = TypeCodeFactory.get_instance().create_tc_from_stream(stream); return typeCode; } /** * Adds the type_code into the discoveredTypes map when it is the first time that it * sees a type_code associated with the name type_name. It also reports errors if * if a type_code is found which has the same name as another but an different * definition. * * Looks up the type_code in the discoveredTypes Map to determine if there * is already one with that same type_name. * * If no TypeCode with name type_name is found, then it is entered into discoveredTypes * If a TypeCode if found with type_name then the TypeCodes are compared and if they are * not the same it prints an error and the two types. * * The function returns true if the type_code was entered into the discoveredTypes map, * that is, if it is teh first time that the type_name was seen. * * @param type_name the type name associated with the type_code * @param type_code the TypeCode that describes the structure of the type. * * @return true if this is a type that was not seen before. Otherwise returns false */ private boolean processType(String type_name, TypeCode type_code) { TypeCode existingType = null; System.out.println("Discovered type: " + type_name); if ( type_code == null ) { System.out.println("No type information was supplied for type: \"" + type_name + "\""); return false; } // See if we already had a type with the same name: existingType = discoveredTypes.get(type_name); if ( existingType == null ) { System.out.println("This type had not seen before. Its structure is:"); type_code.print_IDL(0); discoveredTypes.put(type_name, type_code); return true; } else { System.out.println("This type had been seen already. Comparing the type definitions..."); if ( existingType.equals(type_code) ) { System.out.println("The type matches the existing definition"); return false; } else { System.out.println("The type DOES NOT match the existing definition"); System.out.println("This is the existing definition:"); existingType.print_IDL(0); System.out.println("This is the definition of the type just discovered:"); type_code.print_IDL(0); return true; } } } public static void main(String args[]) throws InterruptedException { String NDDSHOME = System.getenv("NDDSHOME"); String DYLD_LIBRARY_PATH = System.getenv("DYLD_LIBRARY_PATH"); System.out.println("NDDSHOME="+ NDDSHOME); System.out.println("DYLD_LIBRARY_PATH="+ DYLD_LIBRARY_PATH); int domainId = 0; MonitorDicoveredTypes typesSpy = new MonitorDicoveredTypes(); if ( !typesSpy.start(domainId) ) { return; } while (true) { typesSpy.waitForDiscoveryData(); typesSpy.processDiscoveredTypes(); } } }