/*
 * (c) Copyright, Real-Time Innovations, 2021. All rights reserved.
 * RTI grants Licensee a license to use, modify, compile, and create derivative
 * works of the software solely for use with RTI Connext DDS. Licensee may
 * redistribute copies of the software provided that all such copies are subject
 * to this license. The software is provided "as is", with no warranty of any
 * type, including any warranty for fitness for any purpose. RTI is under no
 * obligation to maintain or support the software. RTI shall not be liable for
 * any incidental or consequential damages arising out of the use or inability
 * to use the software.
 */

using System;
using System.Globalization;
using System.Threading;
using Omg.Dds.Core;
using Rti.Dds.Core;
using Rti.Dds.Domain;
using Rti.Dds.Publication;
using Rti.Dds.Topics;

namespace FlowControllerExample
{
    /// <summary>
    /// Example application that publishes
    /// <see cref="FlowControllerExample.HelloWorld"/>.
    /// </summary>
    public static class HelloWorldPublisher
    {
        // Number of samples written back-to-back before the writer idles.
        private const int BurstSize = 10;

        // Maximum time to wait for each of the two end-of-run flush steps.
        private const int FlushTimeoutSeconds = 10;

        // Idle time inserted before every burst of samples.
        private static readonly TimeSpan BurstInterval = TimeSpan.FromSeconds(1);

        /// <summary>
        /// Runs the publisher example.
        /// </summary>
        /// <param name="domainId">
        /// Domain ID passed to the participant factory.
        /// </param>
        /// <param name="sampleCount">
        /// Maximum number of samples to write before returning.
        /// </param>
        /// <param name="tokenBucketPeriodMs">
        /// Token bucket period, in milliseconds, to apply to the custom flow
        /// controller. When 0, the participant QoS from the default profile
        /// is used unmodified.
        /// </param>
        /// <param name="cancellationToken">
        /// Token that stops the write loop early when cancellation is
        /// requested. The token is also observed while idling between bursts.
        /// </param>
        public static void RunPublisher(
            int domainId,
            int sampleCount,
            uint tokenBucketPeriodMs,
            CancellationToken cancellationToken)
        {
            // Create a custom flow controller configuration
            DomainParticipantQos participantQos =
                ConfigureFlowController(tokenBucketPeriodMs);

            // Start communicating in a domain, usually one participant per
            // application. The using declaration disposes the participant
            // when this method returns.
            using DomainParticipant participant =
                DomainParticipantFactory.Instance.CreateParticipant(
                    domainId,
                    participantQos);

            // A Topic has a name and a datatype. The type argument must be
            // given explicitly: it cannot be inferred from the topic name.
            Topic<HelloWorld> topic =
                participant.CreateTopic<HelloWorld>("Example cfc");

            // Create a Publisher
            Publisher publisher = participant.CreatePublisher();

            // Create a writer with the QoS specified in the default profile
            // in USER_QOS_PROFILES.xml, which sets up the publish mode policy
            DataWriter<HelloWorld> writer = publisher.CreateDataWriter(topic);

            // Create a sample to write with a long payload
            var sample = new HelloWorld { str = new string('a', 999) };

            for (int count = 0;
                count < sampleCount && !cancellationToken.IsCancellationRequested;
                count++)
            {
                // Simulate a bursty writer by sending BurstSize samples at a
                // time, after idling for a period. Waiting on the token's
                // handle (instead of Thread.Sleep) lets a cancellation request
                // end the idle period immediately; WaitOne returns true only
                // when the token was signaled before the interval elapsed.
                if (count % BurstSize == 0
                    && cancellationToken.WaitHandle.WaitOne(BurstInterval))
                {
                    break;
                }

                // Modify the data to be sent here
                sample.x = count;

                Console.WriteLine($"Writing x={sample.x}");

                // With asynchronous publish mode, the Write() puts the sample
                // in the queue but doesn't send it until the flow controller
                // indicates so.
                writer.Write(sample);
            }

            try
            {
                Duration flushTimeout = Duration.FromSeconds(FlushTimeoutSeconds);

                // Wait until all written samples have been actually published
                writer.WaitForAsynchronousPublishing(maxWait: flushTimeout);

                // And wait until the DataReader has acknowledged them
                writer.WaitForAcknowledgments(maxWait: flushTimeout);
            }
            catch (TimeoutException)
            {
                Console.WriteLine("Timed out waiting to publish all samples");
            }
        }

        /// <summary>
        /// Builds the participant QoS used by the example, optionally
        /// overriding the token bucket period of the custom flow controller.
        /// </summary>
        /// <param name="tokenBucketPeriodMs">
        /// Token bucket period in milliseconds; 0 keeps the value that comes
        /// from the default QoS profile.
        /// </param>
        /// <returns>
        /// The default-profile participant QoS, or a copy of it with the
        /// token bucket period properties replaced.
        /// </returns>
        private static DomainParticipantQos ConfigureFlowController(
            uint tokenBucketPeriodMs)
        {
            // We'll get the QoS defined in the XML profile "cfc_Profile"
            // (the default profile in USER_QOS_PROFILES.xml) and tweak the
            // token bucket period
            DomainParticipantQos baseQos =
                QosProvider.Default.GetDomainParticipantQos();

            if (tokenBucketPeriodMs == 0)
            {
                return baseQos;
            }

            // This is the name we use in the XML file
            const string flowControllerName =
                "dds.flow_controller.token_bucket.custom_flowcontroller";

            // baseQos.WithProperty(...) creates a new DomainParticipantQos
            // object with the changes we specify in the lambda function.
            return baseQos.WithProperty(property =>
            {
                // In WithProperty the input argument 'property' contains the
                // current values of baseQos.Property. We will just modify
                // these two properties.
                var period = Duration.FromMilliseconds(tokenBucketPeriodMs);

                // Property values are machine-readable strings, so format
                // them independently of the current thread culture.
                property[$"{flowControllerName}.token_bucket.period.sec"] =
                    period.Seconds.ToString(CultureInfo.InvariantCulture);
                property[$"{flowControllerName}.token_bucket.period.nanosec"] =
                    period.Nanoseconds.ToString(CultureInfo.InvariantCulture);
            });
        }
    }
} // namespace FlowControllerExample