#include "TopicPublisher.h" struct ROSBridge::ComPrivate::ThreadData { const ROSBridge::CasePacker &casePacker; RosbridgeWsClient &rbc; std::mutex &rbcMutex; ROSBridge::ComPrivate::JsonQueue &queue; std::mutex &queueMutex; ROSBridge::ComPrivate::HashSet &advertisedTopicsHashList; const std::atomic &running; std::condition_variable &cv; }; ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker, RosbridgeWsClient *rbc, std::mutex *rbcMutex) : _running(false) , _casePacker(casePacker) , _rbc(rbc) , _rbcMutex(rbcMutex) { } ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher() { this->reset(); } void ROSBridge::ComPrivate::TopicPublisher::start() { if ( _running.load() ) // start called while thread running. return; _running.store(true); ROSBridge::ComPrivate::ThreadData data{ *_casePacker, *_rbc, *_rbcMutex, _queue, _queueMutex, _advertisedTopicsHashList, _running, _cv }; _pThread = std::make_unique(&ROSBridge::ComPrivate::transmittLoop, data); } void ROSBridge::ComPrivate::TopicPublisher::reset() { if ( !_running.load() ) // stop called while thread not running. return; _running.store(false); _cv.notify_one(); // Wake publisher thread. if ( !_pThread ) return; _pThread->join(); _queue.clear(); _advertisedTopicsHashList.clear(); } void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data) { // Init. { std::lock_guard lk(data.rbcMutex); data.rbc.addClient(ROSBridge::ComPrivate::_topicAdvertiserKey); data.rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey); } // Main Loop. while(data.running.load()){ std::unique_lock lk(data.queueMutex); // Check if new data available, wait if not. if (data.queue.empty()){ data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. continue; } // Pop message from queue. JsonDocUPtr pJsonDoc(std::move(data.queue.front())); data.queue.pop_front(); lk.unlock(); // Get tag from Json message and remove it. Tag tag; bool ret = data.casePacker.getTag(pJsonDoc, tag); assert(ret); // Json message does not contain a tag; (void)ret; data.casePacker.removeTag(pJsonDoc); // Check if topic must be advertised. // Advertised topics are stored in advertisedTopicsHashList as // a hash. HashType hash = ROSBridge::ComPrivate::getHash(tag.topic()); if ( data.advertisedTopicsHashList.count(hash) == 0) { data.advertisedTopicsHashList.insert(hash); { std::cout << ROSBridge::ComPrivate::_topicAdvertiserKey << ";" << tag.topic() << ";" << tag.messageType() << ";" << std::endl; std::lock_guard lk(data.rbcMutex); data.rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey, tag.topic(), tag.messageType() ); } } // Publish Json message. { std::lock_guard lk(data.rbcMutex); data.rbc.publish(tag.topic(), *pJsonDoc.get()); } } // while loop // Tidy up. { std::lock_guard lk(data.rbcMutex); data.rbc.removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey); data.rbc.removeClient(ROSBridge::ComPrivate::_topicPublisherKey); } }