package com.lmco.chassis.aar;

import com.rti.*;
import static com.rti.RTIRecorderAdminMessageType.*;
import static com.rti.RTIRecorderResponseKind.RTI_RECORDER_RESPONSE_OK;
import com.rti.dds.infrastructure.InstanceHandle_t;
import com.rti.dds.domain.DomainParticipant;
import com.rti.dds.domain.DomainParticipantFactory;
import com.rti.dds.infrastructure.RETCODE_NO_DATA;
import com.rti.dds.infrastructure.StatusKind;
import com.rti.dds.topic.*;
import com.rti.dds.publication.*;
import com.rti.dds.subscription.*;
import static com.rti.dds.infrastructure.StatusKind.STATUS_MASK_NONE;
import java.io.IOException;
import java.util.Date;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Sample showing how to launch the RTI Recording Service (rtirecord) as a child process and
 * drive it (start / pause / resume / stop / shutdown) through its remote administration topics.
 *
 * <p>Typical use: construct, call {@link #init()}, issue commands, then call {@link #shutdown()}.
 * Instances are not designed for concurrent use: the admin message id counter and the DDS
 * entities are plain, unsynchronized fields.
 */
public class RtiRecordControlSample {

    private static final Logger logger = LogManager.getLogger(RtiRecordControlSample.class);

    /**
     * The difference between the domain id of the sim being recorded and the domain id of the RTI
     * remote recording API.
     */
    private static final int RTI_RECORD_SVC_DOMAIN_OFFSET_FROM_SIM_DOMAIN = 100;

    private static final String RTI_CONFIG_FILENAME = "rtirecord.xml";

    /** Absolute path of the recording service executable that is launched by this class. */
    private static final String RTI_RECORD_EXECUTABLE =
            "/opt/rti/rti_connext_dds-5.2.0/bin/rtirecord";

    /** QoS library / profile used for both the admin reader and the admin writer. */
    private static final String QOS_LIBRARY = "BuiltinQosLibExp";
    private static final String QOS_PROFILE = "Generic.StrictReliable";

    /** How long to sleep between polls of the status topic when no sample is available. */
    private static final long POLL_INTERVAL_MS = 500;

    /** How often a "still waiting" warning is logged while polling the status topic. */
    private static final long WARN_INTERVAL_MS = 5000;

    private final int simulatorId;
    private DomainParticipant domainParticipant;
    private Topic topicCommand = null;
    private Topic topicStatus = null;
    private RTIRecorderAdminMessageDataReader dataReader;
    private RTIRecorderAdminMessageDataWriter dataWriter;
    private Process rtiRecordingProcess = null;
    private int adminMsgId = 1;

    /**
     * @param simulatorId id of the simulator to record; selects both the DDS admin domain
     *                    (simulatorId + offset) and the recorder configuration name ("sim" + id).
     */
    public RtiRecordControlSample(int simulatorId) {
        this.simulatorId = simulatorId;
    }

    /**
     * Initialize the RTI record service and connect to it using the DDS API.
     *
     * <p>Creates the admin participant, the status reader and the command writer, launches the
     * recorder process and then waits for its first info message.
     *
     * @return {@code true} when the DDS entities were created and the recorder process was
     *         launched; {@code false} when either step failed (DDS entities are released again).
     */
    public boolean init() {
        domainParticipant = DomainParticipantFactory.TheParticipantFactory.create_participant(
                simulatorId + RTI_RECORD_SVC_DOMAIN_OFFSET_FROM_SIM_DOMAIN,
                DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                null /* listener */,
                StatusKind.STATUS_MASK_NONE);
        if (domainParticipant == null) {
            logger.error("Unable to create the DDS participant for the RTI record admin domain.");
            return false;
        }

        RTIRecorderAdminMessageTypeSupport.register_type(
                domainParticipant, RTI_RECORDER_COMMAND_TYPE.VALUE);
        TopicQos tqos = new TopicQos();

        // Status side: responses and info messages published by the recording service.
        topicStatus = domainParticipant.create_topic(
                RTI_RECORDER_COMMAND_RESPONSE_TOPIC.VALUE, RTI_RECORDER_COMMAND_TYPE.VALUE,
                tqos, null, STATUS_MASK_NONE);
        Subscriber subscriber = domainParticipant.create_subscriber(
                DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null, STATUS_MASK_NONE);
        dataReader = (RTIRecorderAdminMessageDataReader) subscriber.create_datareader_with_profile(
                topicStatus, QOS_LIBRARY, QOS_PROFILE, null, STATUS_MASK_NONE);

        // Command side: requests sent to the recording service.
        topicCommand = domainParticipant.create_topic(
                RTI_RECORDER_COMMAND_REQUEST_TOPIC.VALUE, RTI_RECORDER_COMMAND_TYPE.VALUE,
                tqos, null, STATUS_MASK_NONE);
        Publisher publisher = domainParticipant.create_publisher(
                DomainParticipant.PUBLISHER_QOS_DEFAULT, null, STATUS_MASK_NONE);
        dataWriter = (RTIRecorderAdminMessageDataWriter) publisher.create_datawriter_with_profile(
                topicCommand, QOS_LIBRARY, QOS_PROFILE, null, STATUS_MASK_NONE);

        if (dataReader == null || dataWriter == null) {
            logger.error("Unable to create the DDS reader/writer for the RTI record admin topics.");
            releaseDdsEntities();
            return false;
        }

        if (!startRecorderProcess()) {
            // Without a recorder process nobody will ever answer; waiting would block forever.
            releaseDdsEntities();
            return false;
        }
        waitForInfo();
        return true;
    }

    /**
     * Start the recording service from RTI.
     *
     * @return {@code true} when the process was launched, {@code false} when launching failed.
     */
    private boolean startRecorderProcess() {
        // Launch the recorder. Configuration has remote control enabled.
        // There are multiple configurations in the config file, one per simulator.
        // They are all identical, just the domain ID is different.
        String configuration = "sim" + Integer.toString(simulatorId);
        try {
            // Start the RTI Recording service. Command line looks like:
            //   rtirecord -cfgFile rtirecord.xml -cfgName sim1
            // NOTE(review): the child's stdout/stderr are left as unread pipes; if rtirecord is
            // chatty this could eventually block it - consider redirecting the output.
            rtiRecordingProcess = new ProcessBuilder()
                    .command(RTI_RECORD_EXECUTABLE,
                            "-cfgFile", RTI_CONFIG_FILENAME,
                            "-cfgName", configuration)
                    .start();
            return true;
        } catch (IOException ioException) {
            logger.error("Error attempting to start rtirecord. Is it installed at "
                    + RTI_RECORD_EXECUTABLE + "?", ioException);
            return false;
        }
    }

    /**
     * Wait for the next info message from the record service (any message id) and log the
     * service state it reports.
     */
    private void waitForInfo() {
        RTIRecorderAdminMessage recordAdmin = waitFor(0, RTI_RECORDER_ADMIN_INFO);
        if (recordAdmin != null) {
            logger.info("RTI record service info: " + recordAdmin.msg.info.state.toString());
            logger.debug(new Date().toString() + " - Info MSG received: "
                    + recordAdmin.msg.info.state.toString());
        } else {
            logger.debug(new Date().toString() + " - Never received info message!");
        }
    }

    /**
     * Wait for the response to the command with the given id and log whether it was OK.
     *
     * @param msgId id of the command whose response is awaited.
     */
    private void waitForOK(int msgId) {
        RTIRecorderAdminMessage recordAdmin = waitFor(msgId, RTI_RECORDER_ADMIN_RESPONSE);
        if (recordAdmin == null || recordAdmin.msg.result.kind != RTI_RECORDER_RESPONSE_OK) {
            logger.error("Failed to receive response from RTI Record service.");
            logger.debug(new Date().toString() + " - MISSING OK RESPONSE For msg "
                    + Integer.toString(msgId));
        } else {
            logger.debug(new Date().toString() + " - GOT OK RESPONSE For msg "
                    + Integer.toString(msgId));
        }
    }

    /**
     * Wait for a specific type of message to arrive in the status topic.
     *
     * <p>There is no overall timeout: the status topic is polled until a matching sample arrives,
     * and a warning is logged every {@value #WARN_INTERVAL_MS} ms while still waiting. The wait
     * ends early only when the calling thread is interrupted.
     *
     * @param msgId   0 to ignore msg_id comparison, non zero to return the matching response.
     * @param msgType the kind of admin message to wait for.
     * @return the first matching message, or {@code null} if the thread was interrupted while
     *         waiting (the interrupt flag is restored in that case).
     */
    private RTIRecorderAdminMessage waitFor(int msgId, RTIRecorderAdminMessageType msgType) {
        int wrongData = 0;
        int noData = 0;
        long lastWarning = System.currentTimeMillis();
        while (true) {
            try {
                RTIRecorderAdminMessage recordAdmin = new RTIRecorderAdminMessage();
                SampleInfo info = new SampleInfo();
                dataReader.take_next_sample(recordAdmin, info);
                if (!info.valid_data) {
                    logger.error("Unexpected invalid data for sample - ignoring");
                } else if (recordAdmin.msg._d == msgType
                        && (msgId == 0 || recordAdmin.msg_id == msgId)) {
                    return recordAdmin;
                } else {
                    wrongData++;
                }
            } catch (RETCODE_NO_DATA ex) {
                // This is expected frequently. When a command is issued, it often takes the
                // service a second to open, close, or perform other DB I/O functions.
                noData++;
                try {
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException interrupted) {
                    // Restore the flag and give up; looping on would either spin (sleep throws
                    // immediately while the flag is set) or silently discard the interruption.
                    Thread.currentThread().interrupt();
                    logger.error("Sleep interrupted while waiting to get next sample.",
                            interrupted);
                    return null;
                }
            }
            long now = System.currentTimeMillis();
            if (now - lastWarning > WARN_INTERVAL_MS) {
                logger.warn(String.format(
                        "Unusually long delay while waiting for %s msg with id %d; "
                        + "Checking every %dms, Wrong Data: %d, No Data: %d",
                        msgType, msgId, POLL_INTERVAL_MS, wrongData, noData));
                lastWarning = now;
            }
        }
    }

    /**
     * Publish one admin command and wait for its response.
     *
     * @param commandType the command to send.
     * @throws IllegalStateException if {@link #init()} has not completed successfully (or the
     *                               service has already been shut down).
     */
    private void sendCommand(RTIRecorderAdminMessageType commandType) {
        if (dataWriter == null || dataReader == null) {
            throw new IllegalStateException(
                    "RTI record control is not initialized; init() must succeed first.");
        }
        int msgId = adminMsgId++;
        RTIRecorderAdminMessage recordAdmin =
                (RTIRecorderAdminMessage) RTIRecorderAdminMessage.create();
        recordAdmin.msg._d = commandType;
        recordAdmin.msg_id = msgId;
        logger.debug(new Date().toString() + " - Sending command: " + commandType.toString());
        dataWriter.write(recordAdmin, InstanceHandle_t.HANDLE_NIL);
        waitForOK(msgId);
    }

    /**
     * Delete the participant and everything it contains, and forget the stale references.
     * Safe to call when nothing was created.
     */
    private void releaseDdsEntities() {
        if (domainParticipant != null) {
            // Remove the publisher, subscriber, topics, reader, and writer
            domainParticipant.delete_contained_entities();
            // and finally the participant itself.
            DomainParticipantFactory.TheParticipantFactory.delete_participant(domainParticipant);
            domainParticipant = null;
        }
        dataReader = null;
        dataWriter = null;
        topicCommand = null;
        topicStatus = null;
    }

    /**
     * Start the recording the exercise.
     */
    public void start() {
        sendCommand(RTI_RECORDER_ADMIN_START);
    }

    /**
     * Stop recording the exercise. This should close the recording data base file, so it can be
     * post-processed.
     */
    public void stop() {
        sendCommand(RTI_RECORDER_ADMIN_STOP);
    }

    /**
     * Shutdown the recording service gracefully, we don't need it anymore. Also releases all DDS
     * entities created by {@link #init()}.
     */
    public void shutdown() {
        sendCommand(RTI_RECORDER_ADMIN_SHUTDOWN);
        releaseDdsEntities();
    }

    /**
     * Pause the recording.
     */
    public void pause() {
        sendCommand(RTI_RECORDER_ADMIN_PAUSE);
    }

    /**
     * Resume a paused recording.
     */
    public void resume() {
        sendCommand(RTI_RECORDER_ADMIN_RESUME);
    }

    /**
     * Demo driver: records simulator 1 for a few seconds, exercising pause and resume.
     *
     * @param args unused.
     */
    public static void main(String[] args) {
        try {
            // Run rtiddsping with sample period of 0.01 (100 samples per second)
            RtiRecordControlSample recorder = new RtiRecordControlSample(1);
            if (!recorder.init()) {
                logger.error("Unable to initialize the RTI record service; aborting sample.");
                return;
            }
            recorder.start();
            Thread.sleep(3000);
            recorder.pause();
            Thread.sleep(3000);
            recorder.resume();
            Thread.sleep(3000);
            recorder.stop();
            recorder.shutdown();
        } catch (InterruptedException ex) {
            // Preserve the interruption for whoever owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
            logger.warn("Recording sample interrupted before completion.", ex);
        }
    }
}