// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. /*! * @file IPCPublisher.hpp * This header file contains the declaration of the publisher functions. * * This file was generated by the tool fastddsgen. */ #ifndef FAST_DDS_GENERATED__IPCPublisher_HPP #define FAST_DDS_GENERATED__IPCPublisher_HPP #include <condition_variable> #include <fastdds/dds/domain/DomainParticipant.hpp> #include <fastdds/dds/domain/DomainParticipantFactory.hpp> #include <fastdds/dds/publisher/DataWriterListener.hpp> #include <fastdds/dds/topic/TypeSupport.hpp> #include "IPublisher.h" #include "test.hpp" #include <string> #include <fastdds/dds/domain/DomainParticipantFactory.hpp> #include <fastdds/dds/log/Log.hpp> #include <fastdds/dds/publisher/DataWriter.hpp> #include <fastdds/dds/publisher/Publisher.hpp> #include <fastdds/dds/publisher/qos/DataWriterQos.hpp> #include <fastdds/dds/publisher/qos/PublisherQos.hpp> //#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.hpp> #include "testPubSubTypes.hpp" using namespace eprosima::fastdds::dds; class IPCPublisherBase { public: virtual ~IPCPublisherBase() = default; virtual void stop() = 0; }; template<typename MessageT> class IPCPublisher : public IPCPublisherBase, public eprosima::fastdds::dds::DataWriterListener { public: IPCPublisher(const int& domain_id, const std::string& topicName) : factory_(nullptr) , participant_(nullptr) , publisher_(nullptr) , topic_(nullptr) , writer_(nullptr) , type_(new typename MakePubSubType<MessageT>::Type) , matched_(0) , samples_sent_(0) , stop_(false) , topicName_(topicName) { // Create the participant DomainParticipantQos pqos = PARTICIPANT_QOS_DEFAULT; pqos.name("HelloSecurity_pub_participant"); factory_ = DomainParticipantFactory::get_shared_instance(); // PropertyPolicy property_policy; // property_policy.properties().properties().emplace_back("fastrtps.transport", "SHM"); participant_ = factory_->create_participant(domain_id, pqos, nullptr, StatusMask::none()); if (participant_ == nullptr) { throw std::runtime_error("HelloSecurity Participant initialization failed"); } // Register the type type_.register_type(participant_); // Create the publisher PublisherQos pub_qos = PUBLISHER_QOS_DEFAULT; participant_->get_default_publisher_qos(pub_qos); publisher_ = participant_->create_publisher(pub_qos, nullptr, StatusMask::none()); if (publisher_ == nullptr) { throw std::runtime_error("HelloSecurity Publisher initialization failed"); } // Create the topic TopicQos topic_qos = TOPIC_QOS_DEFAULT; participant_->get_default_topic_qos(topic_qos); topic_ = participant_->create_topic(topicName_, type_.get_type_name(), topic_qos); if (topic_ == nullptr) { throw std::runtime_error("HelloSecurity Topic initialization failed"); } // Create the data writer DataWriterQos writer_qos = DATAWRITER_QOS_DEFAULT; publisher_->get_default_datawriter_qos(writer_qos); writer_qos.reliability().kind = ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS; writer_qos.durability().kind = DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS; writer_qos.history().kind = HistoryQosPolicyKind::KEEP_ALL_HISTORY_QOS; writer_ = publisher_->create_datawriter(topic_, writer_qos, this, StatusMask::all()); if (writer_ == nullptr) { throw std::runtime_error("HelloSecurity DataWriter initialization failed"); } }; ~IPCPublisher() {} //! Publisher matched method void on_publication_matched( eprosima::fastdds::dds::DataWriter* writer, const eprosima::fastdds::dds::PublicationMatchedStatus& info) override { if (info.current_count_change == 1) { { std::lock_guard<std::mutex> lock(mutex_); matched_ = info.current_count; } std::cout << "HelloSecurity Publisher matched." << std::endl; cv_.notify_one(); } else if (info.current_count_change == -1) { { std::lock_guard<std::mutex> lock(mutex_); matched_ = info.current_count; } std::cout << "HelloSecurity Publisher unmatched." << std::endl; } else { std::cout << info.current_count_change << " is not a valid value for PublicationMatchedStatus current count change" << std::endl; } } //! Run publisher void run() {} //! Trigger the end of execution void stop() { stop_.store(true); cv_.notify_one(); } std::string getTopic() const { return topicName_; } //! Publish a sample bool publish(const MessageT& msg) { bool ret = false; // Wait for the data endpoints discovery std::unique_lock<std::mutex> matched_lock(mutex_); cv_.wait(matched_lock, [&]() { // at least one has been discovered return ((matched_ > 0) || is_stopped()); }); if (!is_stopped()) { ret = (RETCODE_OK == writer_->write(&msg)); } return ret; } private: //! Return the current state of execution bool is_stopped() { return stop_.load(); } std::shared_ptr<eprosima::fastdds::dds::DomainParticipantFactory> factory_; eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::Publisher* publisher_; eprosima::fastdds::dds::Topic* topic_; eprosima::fastdds::dds::DataWriter* writer_; eprosima::fastdds::dds::TypeSupport type_; std::condition_variable cv_; int32_t matched_; std::mutex mutex_; const uint32_t period_ms_ = 100; // in ms uint16_t samples_sent_; std::atomic<bool> stop_; std::string topicName_; }; #endif // FAST_DDS_GENERATED__IPCPublisher_HPP