/* * (c) Copyright, Real-Time Innovations, 2021. All rights reserved. * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the software solely for use with RTI Connext DDS. Licensee may * redistribute copies of the software provided that all such copies are subject * to this license. The software is provided "as is", with no warranty of any * type, including any warranty for fitness for any purpose. RTI is under no * obligation to maintain or support the software. RTI shall not be liable for * any incidental or consequential damages arising out of the use or inability * to use the software. */ using System; using System.Threading; using Omg.Dds.Core; using Rti.Dds.Core; using Rti.Dds.Core.Policy; using Rti.Dds.Domain; using Rti.Dds.Publication; using Rti.Dds.Topics; namespace AsyncPublicationExample { /// /// Example application that publishes AsyncPublicationExample.HelloWorld. /// public static class HelloWorldPublisher { /// /// Runs the publisher example. /// public static void RunPublisher( int domainId, int sampleCount, CancellationToken cancellationToken, bool useXmlQos = false) { // Start communicating in a domain, usually one participant per application // Load default QoS profile from USER_QOS_PROFILES.xml file using DomainParticipant participant = DomainParticipantFactory.Instance.CreateParticipant(domainId); // A Topic has a name and a datatype. Topic topic = participant.CreateTopic("Example async"); // Create a Publisher Publisher publisher = participant.CreatePublisher(); DataWriterQos writerQos; if (useXmlQos) { // Retrieve the DataWriterQos from the default profile in // USER_QOS_PROFILES.xml writerQos = QosProvider.Default.GetDataWriterQos(); } else { // Configure the DataWriterQos in code, to the same effect as // the XML file. writerQos = publisher.DefaultDataWriterQos .WithReliability(policy => { policy.Kind = ReliabilityKind.Reliable; policy.MaxBlockingTime = Duration.FromSeconds(60); }) .WithHistory(policy => { policy.Kind = HistoryKind.KeepLast; policy.Depth = 12; }) .WithProtocol(policy => { policy.RtpsReliableWriter.MinSendWindowSize = 50; policy.RtpsReliableWriter.MaxSendWindowSize = 50; }) .WithPublishMode(policy => { policy.Kind = PublishModeKind.Asynchronous; // The built-in fixed-rate flow controller publishes // all written samples once per second policy.FlowControllerName = PublishMode.FixedRateFlowControllerName; }); } DataWriter writer = publisher.CreateDataWriter(topic, writerQos); var sample = new HelloWorld(); for (int count = 0; count < sampleCount && !cancellationToken.IsCancellationRequested; count++) { // Modify the data to be sent here sample.x = count; Console.WriteLine($"Writing x={sample.x}"); // With asynchronous publish mode, the Write() puts the sample // in the queue but doesn't send it until the flow controller // indicates so. writer.Write(sample); Thread.Sleep(100); } // You can wait until all written samples have been actually published // (note that this doesn't ensure that they have actually been received // if the sample required retransmission) writer.WaitForAsynchronousPublishing(Duration.FromSeconds(10)); } } } // namespace AsyncPublicationExample