#include "TopicPublisher.h" struct ROSBridge::ComPrivate::ThreadData { const ROSBridge::CasePacker &casePacker; RosbridgeWsClient &rbc; ROSBridge::ComPrivate::JsonQueue &queue; std::mutex &queueMutex; const std::atomic &running; std::condition_variable &cv; }; ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker, RosbridgeWsClient &rbc) : _running(false) , _casePacker(casePacker) , _rbc(rbc) { } ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher() { this->reset(); } void ROSBridge::ComPrivate::TopicPublisher::start() { if ( _running.load() ) // start called while thread running. return; _running.store(true); ROSBridge::ComPrivate::ThreadData data{ _casePacker, _rbc, _queue, _queueMutex, _running, _cv }; _pThread = std::make_unique(&ROSBridge::ComPrivate::transmittLoop, data); } void ROSBridge::ComPrivate::TopicPublisher::reset() { if ( !_running.load() ) // stop called while thread not running. return; _running.store(false); _cv.notify_one(); // Wake publisher thread. if ( !_pThread ) return; _pThread->join(); _queue.clear(); } void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data) { // Init. ClientMap clientMap; // Main Loop. while(data.running.load()){ std::unique_lock lk(data.queueMutex); // Check if new data available, wait if not. if (data.queue.empty()){ data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. continue; } // Pop message from queue. JsonDocUPtr pJsonDoc(std::move(data.queue.front())); data.queue.pop_front(); lk.unlock(); // Get tag from Json message and remove it. Tag tag; bool ret = data.casePacker.getTag(pJsonDoc, tag); assert(ret); // Json message does not contain a tag; (void)ret; data.casePacker.removeTag(pJsonDoc); // Check if topic must be advertised. // Advertised topics are stored in advertisedTopicsHashList as // a hash. std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey + tag.topic(); HashType hash = ROSBridge::ComPrivate::getHash(clientName); auto it = clientMap.find(hash); if ( it == clientMap.end()) { // Need to advertise topic? clientMap.insert(std::make_pair(hash, clientName)); data.rbc.addClient(clientName); data.rbc.advertise(clientName, tag.topic(), tag.messageType() ); } // Publish Json message. data.rbc.publish(tag.topic(), *pJsonDoc.get()); } // while loop // Tidy up. for (auto& it : clientMap) data.rbc.removeClient(it.second); }