#include "topic_publisher.h" #include ros_bridge::com_private::TopicPublisher::TopicPublisher(CasePacker &casePacker, RosbridgeWsClient &rbc) : _stopped(std::make_shared(true)) , _casePacker(casePacker) , _rbc(rbc) { } ros_bridge::com_private::TopicPublisher::~TopicPublisher() { this->reset(); } void ros_bridge::com_private::TopicPublisher::start() { if ( !_stopped->load() ) // start called while thread running. return; _stopped->store(false); _pThread = std::make_unique([this]{ // Init. std::unordered_map topicMap; // Main Loop. while( !this->_stopped->load() ){ std::unique_lock lk(this->_mutex); // Check if new data available, wait if not. if (this->_queue.empty()){ if ( _stopped->load() ) // Check condition again while holding the lock. break; this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. continue; } // Pop message from queue. JsonDocUPtr pJsonDoc = std::move(this->_queue.front()); this->_queue.pop_front(); lk.unlock(); // Get topic and type from Json message and remove it. std::string topic; assert(com_private::getTopic(*pJsonDoc, topic)); assert(com_private::removeTopic(*pJsonDoc)); std::string type; assert(com_private::getType(*pJsonDoc, type)); assert(com_private::removeType(*pJsonDoc)); // Check if topic must be advertised. std::string clientName = ros_bridge::com_private::_topicAdvertiserKey + topic; auto it = topicMap.find(clientName); if ( it == topicMap.end()) { // Need to advertise topic? topicMap.insert(std::make_pair(clientName, topic)); this->_rbc.addClient(clientName); this->_rbc.advertise(clientName, topic, type); this->_rbc.waitForTopic(topic, [this]{ return this->_stopped->load(); }); // Wait until topic is advertised. } // Publish Json message. if ( !this->_stopped->load() ) this->_rbc.publish(topic, *pJsonDoc); } // while loop // Tidy up. for (auto& it : topicMap){ this->_rbc.unadvertise(it.second); this->_rbc.removeClient(it.first); } std::cout << "TopicPublisher: publisher thread terminated." << std::endl; }); } void ros_bridge::com_private::TopicPublisher::reset() { if ( _stopped->load() ) // stop called while thread not running. return; std::unique_lock lk(_mutex); _stopped->store(true); _cv.notify_one(); // Wake publisher thread. lk.unlock(); if ( !_pThread ) return; _pThread->join(); lk.lock(); _queue.clear(); } void ros_bridge::com_private::TopicPublisher::publish( ros_bridge::com_private::JsonDocUPtr docPtr, const char *topic){ docPtr->AddMember("topic", topic, doc->GetAllocator()); std::unique_lock lock(_mutex); _queue.push_back(std::move(docPtr)); lock.unlock(); _cv.notify_one(); // Wake publisher thread. }