#include "TopicPublisher.h" #include ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker, RosbridgeWsClient &rbc) : _stopped(std::make_shared(true)) , _casePacker(casePacker) , _rbc(rbc) { } ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher() { this->reset(); } void ROSBridge::ComPrivate::TopicPublisher::start() { if ( !_stopped->load() ) // start called while thread running. return; _stopped->store(false); _pThread = std::make_unique([this]{ // Init. std::unordered_map topicMap; // Main Loop. while( !this->_stopped->load() ){ JsonDocUPtr pJsonDoc; { std::unique_lock lk(this->_mutex); // Check if new data available, wait if not. if (this->_queue.empty()){ if ( _stopped->load() ) // Check condition again while holding the lock. break; this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. continue; } // Pop message from queue. pJsonDoc = std::move(this->_queue.front()); this->_queue.pop_front(); } // Get tag from Json message and remove it. Tag tag; bool ret = this->_casePacker.getTag(pJsonDoc, tag); assert(ret); // Json message does not contain a tag; (void)ret; this->_casePacker.removeTag(pJsonDoc); // Check if topic must be advertised. // Advertised topics are stored in advertisedTopicsHashList as // a hash. std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey + tag.topic(); auto it = topicMap.find(clientName); if ( it == topicMap.end()) { // Need to advertise topic? topicMap.insert(std::make_pair(clientName, tag.topic())); this->_rbc.addClient(clientName); this->_rbc.advertise(clientName, tag.topic(), tag.messageType() ); this->_rbc.waitForTopic(tag.topic(), [this]{ return this->_stopped->load(); }); // Wait until topic is advertised. } // Publish Json message. if ( !this->_stopped->load() ) this->_rbc.publish(tag.topic(), *pJsonDoc.get()); } // while loop // Tidy up. for (auto& it : topicMap){ this->_rbc.unadvertise(it.second); this->_rbc.removeClient(it.first); } std::cout << "TopicPublisher: publisher thread terminated." << std::endl; }); } void ROSBridge::ComPrivate::TopicPublisher::reset() { if ( _stopped->load() ) // stop called while thread not running. return; { std::lock_guard lk(_mutex); std::cout << "TopicPublisher: _stopped->store(true)." << std::endl; _stopped->store(true); std::cout << "TopicPublisher: ask publisher thread to stop." << std::endl; _cv.notify_one(); // Wake publisher thread. } if ( !_pThread ) return; _pThread->join(); std::cout << "TopicPublisher: publisher thread joined." << std::endl; { _queue.clear(); std::cout << "TopicPublisher: queue cleard." << std::endl; } }