/* (c) Copyright, Real-Time Innovations, 2012. All rights reserved. * Author: Gerardo Pardo-Castellote */ package com.rti.dds.snippets; import java.util.concurrent.ConcurrentSkipListMap; import com.rti.dds.domain.*; import com.rti.dds.dynamicdata.DynamicData; import com.rti.dds.dynamicdata.DynamicDataMemberInfo; import com.rti.dds.dynamicdata.DynamicDataReader; import com.rti.dds.dynamicdata.DynamicDataSeq; import com.rti.dds.dynamicdata.DynamicDataTypeSupport; import com.rti.dds.typecode.TCKind; import com.rti.dds.infrastructure.*; import com.rti.dds.publication.builtin.*; import com.rti.dds.subscription.*; import com.rti.dds.topic.Topic; import com.rti.dds.typecode.TypeCode; import com.rti.dds.typecode.TypeCodeFactory; import com.rti.ndds.config.Version; import java.util.HashMap; import java.util.Map; import java.util.Iterator; import java.util.Set; import java.sql.*; public class MonitorData { private static int verbosity = 1; private int domainId; private DomainParticipant participant; private PublicationBuiltinTopicDataDataReader publicationsDR; private final static int MAX_ACTIVE_CONDITIONS = 3; // We will only install e conditions on the private ConditionSeq activeConditionSeq; private WaitSet discoveryWaitSet; // This will hold the discovered types private ConcurrentSkipListMap discoveredTypes; // JDBC driver name and database URL static final String JDBC_DRIVER = "com.mysql.jdbc.Driver"; static final String DB_URL = "jdbc:mysql://localhost/STUDENTS"; // Database credentials static final String USER = "username"; static final String PASS = "password"; private static class PrintDynamicDataListener extends DataReaderAdapter { DynamicDataSeq _dataSeq = new DynamicDataSeq(); SampleInfoSeq _infoSeq = new SampleInfoSeq(); HashMap hmap = new HashMap(); String gh; public void on_data_available(DataReader reader) { DynamicDataReader dynamicDR = (DynamicDataReader)reader; try { dynamicDR.take( _dataSeq, _infoSeq, ResourceLimitsQosPolicy.LENGTH_UNLIMITED, SampleStateKind.ANY_SAMPLE_STATE, ViewStateKind.ANY_VIEW_STATE, InstanceStateKind.ANY_INSTANCE_STATE); for(int i = 0; i < _dataSeq.size(); ++i) { SampleInfo info = (SampleInfo)_infoSeq.get(i); if (info.valid_data) { DynamicData data = (DynamicData)_dataSeq.get(i); System.out.println("Received data..."); System.out.println("Type is:"); data.get_type().print_IDL(0); //System.out.println("Received data type..." +data.get_type()); /*System.out.println("Received data type..." +data); System.out.println("Received data kind..." +data.get_type_kind()); hmap.put(data.get_type(), data); gh = data.toString(); System.out.println("member count ..." +data.get_member_count ());*/ //typecode = data; // int bufferSize = typeCode.get_serialized_size(0); //int bufferSize = data.get_serialized_size(0); //CdrOutputStream stream = new CdrOutputStream(bufferSize); //typeCode.serialize(stream); //return stream.getBuffer().getBuffer(); StringBuilder stringBuilder = new StringBuilder(); for(int j = 0; j < data.get_member_count(); ++j) { System.out.println("member info .." +data.get_member_count()); DynamicDataMemberInfo _memberInfo = new DynamicDataMemberInfo(); data.get_member_info_by_index(_memberInfo, j) ; System.out.println(_memberInfo.member_kind); // Translate according to member kind if (_memberInfo.member_kind == TCKind.TK_BOOLEAN) { // Member present in this sample? if (_memberInfo.member_exists) { // Boolean type; assign directly stringBuilder.append(data.get_boolean( _memberInfo.member_name, _memberInfo.member_id)); } } else if (_memberInfo.member_kind == TCKind.TK_OCTET) { // Member present in this sample? if (_memberInfo.member_exists) { // Byte type; assign directly stringBuilder.append(data.get_byte( _memberInfo.member_name, _memberInfo.member_id)); } } else if (_memberInfo.member_kind == TCKind.TK_SEQUENCE) { // Sequence; only Octet Sequence if (_memberInfo.element_kind != TCKind.TK_OCTET) { continue; } // Member present in this sample? if (_memberInfo.member_exists) { // Get byte array from octet sequence data.get_byte_seq(_octetSequence, _memberInfo.member_name, _memberInfo.member_id); byte[] byteArray = null; byteArray = _octetSequence.toArrayByte(byteArray); // Assign byte array jmsMapMessage.setBytes(_memberInfo.member_name, byteArray); } else { // Member not present in this sample; set default jmsMapMessage.setBytes(_memberInfo.member_name, DEFAULT_OCTET_SEQUENCE_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_CHAR) { // Member present in this sample? if (_memberInfo.member_exists) { // Char type; assign directly jmsMapMessage.setChar(_memberInfo.member_name, dynamicDataSample.get_char( _memberInfo.member_name, _memberInfo.member_id)); } else { // Member not present in this sample; set default jmsMapMessage.setChar(_memberInfo.member_name, DEFAULT_CHAR_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_DOUBLE) { // Member present in this sample? if (_memberInfo.member_exists) { // Double type; assign directly jmsMapMessage.setDouble(_memberInfo.member_name, dynamicDataSample.get_double( _memberInfo.member_name, _memberInfo.member_id)); } else { // Member not present in this sample; set default jmsMapMessage.setDouble(_memberInfo.member_name, DEFAULT_DOUBLE_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_FLOAT) { // Member present in this sample? if (_memberInfo.member_exists) { // Float type; assign directly jmsMapMessage.setFloat(_memberInfo.member_name, dynamicDataSample.get_float( _memberInfo.member_name, _memberInfo.member_id)); } else { // Member not present in this sample; set default jmsMapMessage.setFloat(_memberInfo.member_name, DEFAULT_FLOAT_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_LONG) { // Member present in this sample? if (_memberInfo.member_exists) { // Long type; assign int from long jmsMapMessage.setInt(_memberInfo.member_name, dynamicDataSample.get_int( _memberInfo.member_name, _memberInfo.member_id)); } else { // Member not present in this sample; set default jmsMapMessage.setInt(_memberInfo.member_name, DEFAULT_LONG_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_LONGLONG) { // Member present in this sample? if (_memberInfo.member_exists) { // Long Long type; assign long from long long jmsMapMessage.setLong(_memberInfo.member_name, dynamicDataSample.get_long( _memberInfo.member_name, _memberInfo.member_id)); } else { // Member not present in this sample; set default jmsMapMessage.setLong(_memberInfo.member_name, DEFAULT_LONGLONG_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_SHORT) { // Member present in this sample? if (_memberInfo.member_exists) { // Short type; assign directly jmsMapMessage.setShort(_memberInfo.member_name, dynamicDataSample.get_short( _memberInfo.member_name, _memberInfo.member_id)); } else { // Member not present in this sample; set default jmsMapMessage.setShort(_memberInfo.member_name, DEFAULT_SHORT_VALUE); } } else if (_memberInfo.member_kind == TCKind.TK_STRING) { // Member present in this sample? if (_memberInfo.member_exists) { // String type; assign directly jmsMapMessage.setString(_memberInfo.member_name, new String(dynamicDataSample.get_string( _memberInfo.member_name, _memberInfo.member_id))); } //stringBuilder.append(data.get_string(dinfo.member_name, dinfo.member_id)); //System.out.println("Pankaj String is .. " +stringBuilder.toString()); /*String sql = "INSERT INTO table" + "(USER_ID, USERNAME, CREATED_BY, CREATED_DATE) " + "VALUES" + "(1,'mkyong','system', " + "to_date('" + getCurrentTimeStamp() + "', 'yyyy/mm/dd hh24:mi:ss'))"; + "VALUES (100, 'Zara', 'Ali', 18)"; stmt.executeUpdate(sql); "INSERT INTO incomeCalc(ID, TIPS, HOURS, GAS, HOURLY_EARNINGS) " + "VALUES (3, 75, 6, 25, 18.50)" }*/ } System.out.println("Calling DynamicData.print..."); data.print(null, 0); System.out.println("DynamicData.print complete"); long value1 = data.get_long(null, 1); long value2 = data.get_long(null, 2); System.out.println(" value1 = " + value1 + "\n value2 = " + value2); } } } catch (RETCODE_NO_DATA noData) { // No data to process } finally { dynamicDR.return_loan(_dataSeq, _infoSeq); } } } private boolean start(int theDomainId) { Version version = Version.get_instance (); System.out.println("Running RTI Connext version: " + version); discoveredTypes = new ConcurrentSkipListMap(); domainId = theDomainId; DomainParticipantFactory factory = DomainParticipantFactory.get_instance(); DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos(); // This instructs the DomainParticipantFactory to not enable the DomainParticipant // entities it creates automatically. This is needed so that we have a chance to // retrieve the builtin data-readers before the participant starts receiving // discovery data. Later it is explained why this is needed factoryQos.entity_factory.autoenable_created_entities = false; DomainParticipantQos pQos = new DomainParticipantQos(); factory.get_default_participant_qos(pQos); pQos.participant_name.name = "RTI Connext Monitor Types Snippet"; try { participant = factory.create_participant( domainId, pQos, //DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT, null, // listener StatusKind.STATUS_MASK_NONE); } catch ( Exception e) { String lastStartError = "Error creating the DDS domain. Common causes are:" + "\n - Lack of a network. E.g disconected wireless." + "\n - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface " + "\n for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead."; System.out.println(lastStartError); return false; } // We count ourselves as a participant that is present // The "lookup_xxx" operations not only retrieve the built-in data-readers but also // activate the caching of discovered types. To save resources discovered types // are only saved in the built-in reader cache which is only active after calling // the corresponding "lookup_xxx" operation. // It is for this reason that we instructed the DomainParticipantFactory to not // automatically enable the DomainParticipant. This gives us the opportunity to // retrieve the built-in entities and activate the caching of discovered types // before we receive any discovery information. If we did not do this we may // miss the data-types of the first few entities discovered publicationsDR = (PublicationBuiltinTopicDataDataReader) participant.get_builtin_subscriber().lookup_datareader("DCPSPublication"); // Enable the participant. This causes it to start receiving discovery traffic. // Note: Enable fails if there is no network of if any interfaces are not multicast enabled // In some platforms (e.g. MacOSX) having the VPN running causes enable() to fail try { participant.enable(); } catch ( Exception e) { String lastStartError = "Error enabling the DDS domain. Common causes are:" + "\n - Lack of a network. E.g disconected wireless." + "\n - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface " + "\n for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead."; System.out.println(lastStartError); return false; } // Create a WairSet object that can be used to block the calling thread until there is // discovery data to read. This avoids having to poll and this use CPU continuously. discoveryWaitSet = new WaitSet(); // Attach the conditions that would wakeup the waitset. In this case the arrival of data on // any of the built-in datareaders discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition()); publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS); activeConditionSeq = new ConditionSeq(MAX_ACTIVE_CONDITIONS); return true; } private void waitForDiscoveredDataWriters() { Duration_t waitDuration = new Duration_t(Duration_t.DURATION_INFINITE_SEC, Duration_t.DURATION_INFINITE_NSEC); System.out.println("waitForDiscoveredDataWriters"); try { discoveryWaitSet.wait(activeConditionSeq, waitDuration); } catch (RETCODE_TIMEOUT timeoutRetcode) {} } private void processDiscoveredDataWriters() { System.out.println("processDiscoveredDataWriters"); if ( publicationsDR.get_statuscondition().get_trigger_value() ) { processTypesInDiscoveredDataWriters(); } } private void processTypesInDiscoveredDataWriters() { PublicationBuiltinTopicData publicationData = new PublicationBuiltinTopicData(); SampleInfo info = new SampleInfo(); try { while ( true ) { publicationsDR.take_next_sample(publicationData, info); // We are only interested in new DataWriters which we have not // seen before. This is because the data-type will never change for any // one DataWriter to other updates indicating change of QoS or deletion of the // DataWriter can be safely ignored if ( info.view_state == ViewStateKind.NEW_VIEW_STATE ) { System.out.println("DataWriter (New)" + " name: \"" + publicationData.publication_name.name + "\"" + " topic: \"" + publicationData.topic_name + "\"" + " type: \"" + publicationData.type_name + "\"" ); processDiscoveredTopic(publicationData.topic_name, publicationData.type_name, publicationData.type_code); } } } catch (RETCODE_NO_DATA noData) { } // catch (RETCODE_BAD_PARAMETER badParam) { } finally { } } private void processDiscoveredTopic(String topic_name, String type_name, TypeCode type_code) { TypeCode existingType = null; DynamicDataTypeSupport typeSupport = null; System.out.println("Discovered topic: \"" + topic_name + "\" with type: \"" + type_name + "\""); if ( type_code == null ) { System.out.println("No type information was supplied for type: \"" + type_name + "\""); return; } // See if we already had a type with the same name: existingType = discoveredTypes.get(type_name); if ( existingType == null ) { System.out.println("This type had not seen before. Its structure is:"); type_code.print_IDL(0); discoveredTypes.put(type_name, type_code); // register the discovered type with the Participant typeSupport = new DynamicDataTypeSupport(type_code, DynamicDataTypeSupport.TYPE_PROPERTY_DEFAULT); typeSupport.register_type(participant, type_name); } else { System.out.println("This type had been seen already. Comparing the type definitions..."); if ( existingType.equals(type_code) ) { System.out.println("The type matches the existing definition"); } else { System.out.println("The type DOES NOT match the existing definition"); System.out.println("This is the existing definition:"); existingType.print_IDL(0); System.out.println("This is the definition of the type just discovered:"); type_code.print_IDL(0); } } // Check if we had already a topic created with that name if ( this.participant.lookup_topicdescription(topic_name) == null ) { System.out.println("This topic \"" + topic_name + "\" had not seen before. Creating it."); // Topic was not known. Create a Topic and DataReader try { Topic topic = participant.create_topic( topic_name, type_name, DomainParticipant.TOPIC_QOS_DEFAULT, null /* listener */, StatusKind.STATUS_MASK_NONE); PrintDynamicDataListener listener = new PrintDynamicDataListener(); DataReader dataReader = participant.create_datareader( topic, Subscriber.DATAREADER_QOS_DEFAULT, listener, StatusKind.DATA_AVAILABLE_STATUS); } catch ( Exception e ) { System.out.println("creation of Topic and DataReader failed with execption: " + e) ; } } } public static void main(String args[]) throws InterruptedException { String NDDSHOME = System.getenv("NDDSHOME"); String DYLD_LIBRARY_PATH = System.getenv("DYLD_LIBRARY_PATH"); System.out.println("NDDSHOME="+ NDDSHOME); System.out.println("DYLD_LIBRARY_PATH="+ DYLD_LIBRARY_PATH); int domainId = 0; Connection conn = null; Statement stmt = null; // stmt = conn.createStatement(); MonitorData dataSpy = new MonitorData(); if ( !dataSpy.start(domainId) ) { return; } while (true) { dataSpy.waitForDiscoveredDataWriters(); dataSpy.processDiscoveredDataWriters(); } } }