/* * (c) Copyright, Real-Time Innovations, 2020. All rights reserved. * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the software solely for use with RTI Connext DDS. Licensee may * redistribute copies of the software provided that all such copies are subject * to this license. The software is provided "as is", with no warranty of any * type, including any warranty for fitness for any purpose. RTI is under no * obligation to maintain or support the software. RTI shall not be liable for * any incidental or consequential damages arising out of the use or inability * to use the software. */ #include #include #include #include #include // for logging // alternatively, to include all the standard APIs: // // or to include both the standard APIs and extensions: // // // For more information about the headers and namespaces, see: // https://community.rti.com/static/documentation/connext-dds/6.1.0/doc/api/connext_dds/api_cpp2/group__DDSNamespaceModule.html // For information on how to use extensions, see: // https://community.rti.com/static/documentation/connext-dds/6.1.0/doc/api/connext_dds/api_cpp2/group__DDSCpp2Conventions.html #include "Test.hpp" #include "application.hpp" // for command line parsing and ctrl-c int process_data(dds::sub::DataReader reader) { // Take all samples int count = 0; dds::sub::LoanedSamples samples = reader.take(); for (const auto& sample : samples) { if (sample.info().valid()) { count++; std::cout << sample.data() << std::endl; } else { std::cout << "Instance state changed to " << sample.info().state().instance_state() << std::endl; } } return count; } // The LoanedSamples destructor returns the loan template class PushReader { public: PushReader(dds::domain::DomainParticipant &participant); virtual ~PushReader(); public: // virtual void on_data_available(dds::sub::DataReader &reader); protected: class Listener : public dds::sub::NoOpDataReaderListener { void on_data_available(dds::sub::DataReader &reader); }; private: dds::sub::DataReader _reader; dds::sub::Subscriber _subscriber; dds::topic::Topic _topic; }; template PushReader::PushReader(dds::domain::DomainParticipant &participant) : _reader(dds::core::null), _subscriber(dds::core::null), _topic(dds::core::null) { _subscriber = dds::sub::Subscriber(participant); _topic = dds::topic::Topic(participant, "Example MyData"); _reader = dds::sub::DataReader(_subscriber, _topic); _reader.set_listener(std::make_shared(), dds::core::status::StatusMask::data_available()); } template PushReader::~PushReader() { if (_reader != dds::core::null) { if (_reader.get_listener() != nullptr) { std::cout << "set listener to null " << std::endl; _reader.set_listener(nullptr); } } } template void PushReader::Listener::on_data_available(dds::sub::DataReader &reader) { process_data(reader); } void run_subscriber_application(unsigned int domain_id, unsigned int sample_count) { // DDS objects behave like shared pointers or value types // (see https://community.rti.com/best-practices/use-modern-c-types-correctly) // Start communicating in a domain, usually one participant per application dds::domain::DomainParticipant participant(domain_id); // Create a Topic with a name and a datatype //dds::topic::Topic topic(participant, "Example MyData"); // Create a Subscriber and DataReader with default Qos //dds::sub::Subscriber subscriber(participant); //dds::sub::DataReader reader(subscriber, topic); PushReader *reader = new PushReader(participant); std::cout << "*** Reader Created" << std::endl; //// Create a ReadCondition for any data received on this reader and set a //// handler to process the data //unsigned int samples_read = 0; //dds::sub::cond::ReadCondition read_condition( // reader, // dds::sub::status::DataState::any(), // [reader, &samples_read]() { samples_read += process_data(reader); }); //// WaitSet will be woken when the attached condition is triggered //dds::core::cond::WaitSet waitset; //waitset += read_condition; //while (!application::shutdown_requested && samples_read < sample_count) { // std::cout << "MyData subscriber sleeping up to 1 sec..." << std::endl; // // Run the handlers of the active conditions. Wait for up to 1 second. // //waitset.dispatch(dds::core::Duration(1)); //} for (int i = 0; i<10; ++i) { dds::core::Duration sleep_time(1); std::cout << "*** Sleeping..." << std::endl; rti::util::sleep(sleep_time); } delete reader; std::cout << "*** Reader deleted" << std::endl; } int main(int argc, char *argv[]) { using namespace application; // Parse arguments and handle control-C auto arguments = parse_arguments(argc, argv); if (arguments.parse_result == ParseReturn::exit) { return EXIT_SUCCESS; } else if (arguments.parse_result == ParseReturn::failure) { return EXIT_FAILURE; } setup_signal_handlers(); // Sets Connext verbosity to help debugging rti::config::Logger::instance().verbosity(arguments.verbosity); try { run_subscriber_application(arguments.domain_id, arguments.sample_count); } catch (const std::exception& ex) { // This will catch DDS exceptions std::cerr << "Exception in run_subscriber_application(): " << ex.what() << std::endl; return EXIT_FAILURE; } // Releases the memory used by the participant factory. Optional at // application exit dds::domain::DomainParticipant::finalize_participant_factory(); return EXIT_SUCCESS; }