#include "TopicPublisher.h"

/// Bundle of references handed (by value) to the publisher thread.
///
/// The struct itself is copied into the thread, but every member is a
/// reference, so the referenced objects must outlive the thread.
/// TopicPublisher::start() binds `queue`, `queueMutex`, `running` and `cv` to
/// TopicPublisher members, and `casePacker`, `rbc` and `rbcMutex` to the
/// objects whose pointers were passed to the TopicPublisher constructor.
struct ROSBridge::ComPrivate::ThreadData
{
    const ROSBridge::CasePacker         &casePacker;  ///< Reads/removes the tag of each message.
    RosbridgeWsClient                   &rbc;         ///< Rosbridge client used to advertise/publish.
    std::mutex                          &rbcMutex;    ///< Guards every call on `rbc`.
    ROSBridge::ComPrivate::JsonQueue    &queue;       ///< Pending messages.
    std::mutex                          &queueMutex;  ///< Guards `queue`; also the mutex used with `cv`.
    const std::atomic<bool>             &running;     ///< Cleared by reset() to stop the thread.
    std::condition_variable             &cv;          ///< Signals the thread (new data or stop request).
};

/// Stores the given pointers; no thread is started until start() is called.
///
/// @param casePacker  Used by the publisher thread to read and strip message tags.
/// @param rbc         Rosbridge client the thread publishes through.
/// @param rbcMutex    Mutex that is locked around every use of @p rbc.
///
/// The pointers are dereferenced in start(), so they must be non-null and
/// valid for as long as the publisher thread runs.
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
                                                      RosbridgeWsClient *rbc,
                                                      std::mutex *rbcMutex) :
    _running(false)
  , _casePacker(casePacker)
  , _rbc(rbc)
  , _rbcMutex(rbcMutex)
{
}

/// Stops and joins the publisher thread (if it is running).
ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    this->reset();
}

/// Launches the publisher thread running transmittLoop().
/// Does nothing if the thread is already running.
void ROSBridge::ComPrivate::TopicPublisher::start()
{
    if ( _running.load() ) // start called while thread running.
        return;
    _running.store(true);

    ROSBridge::ComPrivate::ThreadData data{
        *_casePacker,
        *_rbc,
        *_rbcMutex,
        _queue,
        _queueMutex,
        _running,
        _cv
    };
    _pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
}

/// Stops the publisher thread, waits for it to finish and discards all
/// messages that are still queued. Does nothing if the thread is not running.
void ROSBridge::ComPrivate::TopicPublisher::reset()
{
    if ( !_running.load() ) // stop called while thread not running.
        return;

    {
        // Clear the flag while holding the queue mutex. The publisher thread
        // evaluates its wait condition under the same mutex, so it either sees
        // the cleared flag before it blocks or is already blocked when the
        // notification below is sent. Without the lock the notification could
        // slip in between the thread's check and its wait and be lost, leaving
        // join() blocked forever.
        std::lock_guard<std::mutex> lk(_queueMutex);
        _running.store(false);
    }
    _cv.notify_one(); // Wake publisher thread.

    if ( !_pThread )
        return;
    _pThread->join();

    {
        // The queue is guarded by _queueMutex everywhere else; keep it that way.
        std::lock_guard<std::mutex> lk(_queueMutex);
        _queue.clear();
    }
}

/// Body of the publisher thread.
///
/// Repeatedly takes the front message of `data.queue`, reads its tag and
/// removes it, advertises the tag's topic the first time it is seen and
/// publishes the message through `data.rbc`. The loop ends as soon as
/// `data.running` is observed to be false; messages still queued at that point
/// are not published. On exit every client that was added for advertising is
/// removed again.
///
/// @param data  References to the shared state, see ThreadData.
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
{
    // Init.
    // Maps the hash of a client name to the client name for every topic that
    // has already been advertised by this thread.
    ClientMap clientMap;
//    {
//        std::lock_guard<std::mutex> lk(data.rbcMutex);
//        data.rbc.addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
//        data.rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);
//    }

    // Main Loop.
    while ( data.running.load() ) {
        std::unique_lock<std::mutex> lk(data.queueMutex);

        // Sleep until there is a message to send or a stop was requested.
        // The predicate is evaluated with queueMutex held, which makes
        // spurious wakeups harmless and, together with reset() clearing the
        // flag under the same mutex, rules out a lost stop notification.
        data.cv.wait(lk, [&data] {
            return !data.running.load() || !data.queue.empty();
        });
        if ( !data.running.load() )
            break;

        // Pop message from queue.
        JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
        data.queue.pop_front();
        lk.unlock(); // Do not hold the queue while talking to the bridge.

        // Get tag from Json message and remove it.
        Tag tag;
        bool ret = data.casePacker.getTag(pJsonDoc, tag);
        assert(ret); // Json message does not contain a tag;
        (void)ret;   // Silence the unused-variable warning when assert is compiled out.
        data.casePacker.removeTag(pJsonDoc);

        // Check if topic must be advertised.
        // Advertised topics are stored in clientMap, keyed by the hash of
        // the client name.
        std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey + tag.topic();
        HashType hash = ROSBridge::ComPrivate::getHash(clientName);
        auto it = clientMap.find(hash);
        if ( it == clientMap.end() ) { // Need to advertise topic?
            clientMap.insert(std::make_pair(hash, clientName));
            std::cout << clientName << ";"
                      << tag.topic() << ";"
                      << tag.messageType() << ";"
                      << std::endl;
            {
                std::lock_guard<std::mutex> rbcLock(data.rbcMutex);
                data.rbc.addClient(clientName);
                data.rbc.advertise(clientName, tag.topic(), tag.messageType());
            }
        }

        // Publish Json message.
        {
            std::lock_guard<std::mutex> rbcLock(data.rbcMutex);
            data.rbc.publish(tag.topic(), *pJsonDoc.get());
        }
    } // while loop

    // Tidy up: remove every client that was added for advertising.
    {
        std::lock_guard<std::mutex> rbcLock(data.rbcMutex);
        for (auto &entry : clientMap)
            data.rbc.removeClient(entry.second);
    }
}