/* example_subscriber.cxx */ #include #include #ifdef RTI_VX653 #include #endif #include "example.h" #include "exampleSupport.h" #include "ndds/ndds_cpp.h" #include "ndds/ndds_namespace_cpp.h" using namespace DDS; class HelloDataProcessor { public: virtual void data_available(DataReader* reader) { printf("data available for DataReader: %p on topic \"%s\"\n", reader, reader->get_topicdescription()->get_name()); HelloStructDataReader *HelloStruct_reader = NULL; HelloStructSeq data_seq; SampleInfoSeq info_seq; ReturnCode_t retcode; int i; HelloStruct_reader = HelloStructDataReader::narrow(reader); if (HelloStruct_reader == NULL) { printf("DataReader narrow error\n"); return; } retcode = HelloStruct_reader->take( data_seq, info_seq, LENGTH_UNLIMITED, ANY_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE); if (retcode == RETCODE_NO_DATA) { return; } else if (retcode != RETCODE_OK) { printf("take error %d\n", retcode); return; } for (i = 0; i < data_seq.length(); ++i) { if (info_seq[i].valid_data) { HelloStructTypeSupport::print_data(&data_seq[i]); } } retcode = HelloStruct_reader->return_loan(data_seq, info_seq); if (retcode != RETCODE_OK) { printf("return loan error %d\n", retcode); } } }; /* Delete all entities */ static int subscriber_shutdown( DomainParticipant *participant) { ReturnCode_t retcode; int status = 0; if (participant != NULL) { retcode = participant->delete_contained_entities(); if (retcode != RETCODE_OK) { printf("delete_contained_entities error %d\n", retcode); status = -1; } retcode = TheParticipantFactory->delete_participant(participant); if (retcode != RETCODE_OK) { printf("delete_participant error %d\n", retcode); status = -1; } } /* RTI Connext provides the finalize_instance() method on domain participant factory for people who want to release memory used by the participant factory. Uncomment the following block of code for clean destruction of the singleton. */ /* retcode = DomainParticipantFactory::finalize_instance(); if (retcode != RETCODE_OK) { printf("finalize_instance error %d\n", retcode); status = -1; } */ return status; } extern "C" int subscriber_main(int domainId, int sample_count) { DomainParticipant *participant = NULL; Subscriber *subscriber = NULL; Topic *topic = NULL; DataReader *reader = NULL; ReturnCode_t retcode; const char *type_name = NULL; int count = 0; Duration_t receive_period = {4,0}; int status = 0; /* To customize the participant QoS, use the configuration file USER_QOS_PROFILES.xml */ participant = TheParticipantFactory->create_participant( domainId, PARTICIPANT_QOS_DEFAULT, NULL /* listener */, STATUS_MASK_NONE); if (participant == NULL) { printf("create_participant error\n"); subscriber_shutdown(participant); return -1; } /* To customize the subscriber QoS, use the configuration file USER_QOS_PROFILES.xml */ subscriber = participant->create_subscriber( SUBSCRIBER_QOS_DEFAULT, NULL /* listener */, STATUS_MASK_NONE); if (subscriber == NULL) { printf("create_subscriber error\n"); subscriber_shutdown(participant); return -1; } /* Register the type before creating the topic */ type_name = HelloStructTypeSupport::get_type_name(); retcode = HelloStructTypeSupport::register_type( participant, type_name); if (retcode != RETCODE_OK) { printf("register_type error %d\n", retcode); subscriber_shutdown(participant); return -1; } /* To customize the topic QoS, use the configuration file USER_QOS_PROFILES.xml */ topic = participant->create_topic( "Example HelloStruct", type_name, TOPIC_QOS_DEFAULT, NULL /* listener */, STATUS_MASK_NONE); if (topic == NULL) { printf("create_topic error\n"); subscriber_shutdown(participant); return -1; } /* To customize the data reader QoS, use the configuration file USER_QOS_PROFILES.xml */ reader = subscriber->create_datareader( topic, DATAREADER_QOS_DEFAULT, NULL, STATUS_MASK_ALL); if (reader == NULL) { printf("create_datareader error\n"); subscriber_shutdown(participant); return -1; } DDS::WaitSet *myWaitSet = new WaitSet(); HelloStructDataReader *myDataReader = HelloStructDataReader::narrow(reader); DDS::StatusCondition *myCondition = myDataReader->get_statuscondition(); myCondition->set_enabled_statuses( DDS_SUBSCRIPTION_MATCHED_STATUS | DDS_LIVELINESS_CHANGED_STATUS | DDS_DATA_AVAILABLE_STATUS); myWaitSet->attach_condition(myCondition); //Now you can wait for any events on the DataReader DDSConditionSeq active_conditions; DDS_Duration_t timeout = { 30, 0 }; // 1 second DDS_SubscriptionMatchedStatus subsMatchedStatus; DDS_LivelinessChangedStatus livelinessChangedStatus; /* Create a helper class to process the data */ HelloDataProcessor *data_processor = new HelloDataProcessor(); // Dedicate the thread to handle the events as they come for (count=0; (sample_count == 0) || (count < sample_count); ++count) { DDS::ReturnCode_t retcode = myWaitSet->wait(active_conditions, timeout); if (retcode == DDS_RETCODE_TIMEOUT) { // Handle timeout printf("Wait TIMEOUT\n"); } else if (retcode == DDS_RETCODE_OK) { // Since this example has only one condition this will always be myDataReader, but // in the more general case of multiple conditions we can use the code below to access // the DataReader assocated with each active conditions returned by the myWaitSet->wait() // on the output parameter 'active_conditions' DDS::DataReader *activeDataReader = (DDS::DataReader *)myCondition->get_entity(); // IMPORTANT: must call dataReader->get_***_status() for all the triggered statuses // on the StatusCondion myCondition that caused the WaitSet to wakeup. Otherwise // the trigger status of the condition is not cleared and the next time we call // the 'wait' operation on the WaitSet it will wakeup immediately DDS_StatusMask triggered_statuses = activeDataReader->get_status_changes() & myCondition->get_enabled_statuses(); if ( triggered_statuses & DDS_SUBSCRIPTION_MATCHED_STATUS ) { // Handle DDS_SUBSCRIPTION_MATCHED_STATUS // WARNING. Must call activeDataReader->get_subscription_matched_status(); to clear it printf("Got: DDS_SUBSCRIPTION_MATCHED_STATUS for DataReader: %p on topic \"%s\"\n", activeDataReader, activeDataReader->get_topicdescription()->get_name()); activeDataReader->get_subscription_matched_status(subsMatchedStatus); } if ( triggered_statuses & DDS_LIVELINESS_CHANGED_STATUS ) { // Handle DDS_LIVELINESS_CHANGED_STATUS // WARNING. Must call myCondition->get_liveliness_changed_status(); to clear it printf("Got: DDS_LIVELINESS_CHANGED_STATUS for DataReader: %p on topic \"%s\"\n", activeDataReader, activeDataReader->get_topicdescription()->get_name()); activeDataReader->get_liveliness_changed_status(livelinessChangedStatus); } if ( triggered_statuses & DDS_DATA_AVAILABLE_STATUS ) { // Handle DDS_DATA_AVAILABLE_STATUS // WARNING. Must call activeDataReader->read() or similar to clear it; printf("Got: DDS_DATA_AVAILABLE_STATUS for DataReader: %p on topic \"%s\"\n", activeDataReader, activeDataReader->get_topicdescription()->get_name()); data_processor->data_available(myDataReader); } } } /* Delete all entities */ status = subscriber_shutdown(participant); delete data_processor; return status; } #if defined(RTI_WINCE) int wmain(int argc, wchar_t** argv) { int domainId = 0; int sample_count = 0; /* infinite loop */ if (argc >= 2) { domainId = _wtoi(argv[1]); } if (argc >= 3) { sample_count = _wtoi(argv[2]); } /* Uncomment this to turn on additional logging NDDSConfigLogger::get_instance()-> set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); */ return subscriber_main(domainId, sample_count); } #elif !(defined(RTI_VXWORKS) && !defined(__RTP__)) && !defined(RTI_PSOS) int main(int argc, char *argv[]) { int domainId = 0; int sample_count = 0; /* infinite loop */ if (argc >= 2) { domainId = atoi(argv[1]); } if (argc >= 3) { sample_count = atoi(argv[2]); } /* Uncomment this to turn on additional logging NDDSConfigLogger::get_instance()-> set_verbosity_by_category(NDDS_CONFIG_LOG_CATEGORY_API, NDDS_CONFIG_LOG_VERBOSITY_STATUS_ALL); */ return subscriber_main(domainId, sample_count); } #endif #ifdef RTI_VX653 const unsigned char* __ctype = *(__ctypePtrGet()); extern "C" void usrAppInit () { #ifdef USER_APPL_INIT USER_APPL_INIT; /* for backwards compatibility */ #endif /* add application specific code here */ taskSpawn("sub", RTI_OSAPI_THREAD_PRIORITY_NORMAL, 0x8, 0x150000, (FUNCPTR)subscriber_main, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0); } #endif