#include "TopicPublisher.h"

/// Bundle of references handed (by value) to the publisher thread.
///
/// Every member refers to state owned by, or passed into, the TopicPublisher
/// that spawned the thread. TopicPublisher::reset() joins the thread before
/// returning (and the destructor calls reset()), so the worker is joined
/// before the publisher's own members go away.
struct ROSBridge::ComPrivate::ThreadData
{
    const ROSBridge::CasePacker      &casePacker;               ///< Reads/removes the tag of queued messages.
    RosbridgeWsClient                &rbc;                      ///< Client used to advertise and publish.
    std::mutex                       &rbcMutex;                 ///< Guards every call on rbc.
    ROSBridge::ComPrivate::JsonQueue &queue;                    ///< Pending outgoing Json documents.
    std::mutex                       &queueMutex;               ///< Guards queue; paired with cv.
    ROSBridge::ComPrivate::HashSet   &advertisedTopicsHashList; ///< Hashes of topics already advertised.
    const std::atomic<bool>          &running;                  ///< Cleared by reset() to stop the loop.
    std::condition_variable          &cv;                       ///< Signalled to wake the loop.
};

/// Stores the collaborators; no thread is started until start() is called.
///
/// @param casePacker  Dereferenced in start() and used by the publisher thread.
/// @param rbc         Rosbridge client used for addClient/removeClient and by the thread.
/// @param rbcMutex    Mutex locked around every use of rbc.
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
                                                      RosbridgeWsClient *rbc,
                                                      std::mutex *rbcMutex)
    : _running(false)
    , _casePacker(casePacker)
    , _rbc(rbc)
    , _rbcMutex(rbcMutex)
{
}

/// Stops the publisher thread (if running) via reset().
ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    this->reset();
}

/// Registers the advertiser/publisher clients and launches the transmit thread.
/// Does nothing if the publisher is already running.
void ROSBridge::ComPrivate::TopicPublisher::start()
{
    if ( _running.load() ) // start called while thread running.
        return;
    _running.store(true);

    {
        std::lock_guard<std::mutex> rbcLock(*_rbcMutex);
        _rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
        _rbc->addClient(ROSBridge::ComPrivate::_topicPublisherKey);
    }

    ROSBridge::ComPrivate::ThreadData data{
        *_casePacker,
        *_rbc,
        *_rbcMutex,
        _queue,
        _queueMutex,
        _advertisedTopicsHashList,
        _running,
        _cv
    };
    _pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
}

/// Stops the transmit thread, unregisters both clients and clears the queue
/// and the advertised-topics set. Does nothing if the publisher is not running.
void ROSBridge::ComPrivate::TopicPublisher::reset()
{
    if ( !_running.load() ) // stop called while thread not running.
        return;

    {
        // Clear the flag while holding the queue mutex. The worker evaluates
        // its wait predicate under the same mutex, so it either observes
        // running == false before it sleeps, or it is already waiting when
        // the notification below is sent. Without the lock the notification
        // could fire between the worker's check and its wait, and join()
        // would block forever.
        std::lock_guard<std::mutex> queueLock(_queueMutex);
        _running.store(false);
    }
    _cv.notify_one(); // Wake publisher thread.

    if ( !_pThread )
        return;
    _pThread->join();

    {
        std::lock_guard<std::mutex> rbcLock(*_rbcMutex);
        _rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
        _rbc->removeClient(ROSBridge::ComPrivate::_topicPublisherKey);
    }
    _queue.clear();
    _advertisedTopicsHashList.clear();
}

/// Body of the publisher thread.
///
/// Repeatedly pops one Json document from the queue, extracts and strips its
/// tag, advertises the tag's topic the first time it is seen, and publishes
/// the document. Returns as soon as data.running is observed to be false;
/// documents still queued at that point are not sent.
///
/// @param data  References to the shared state; see ThreadData.
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
{
    while ( true ) {
        std::unique_lock<std::mutex> queueLock(data.queueMutex);

        // Sleep until there is work or a stop was requested. The predicate is
        // evaluated with the queue mutex held, which makes spurious wakeups
        // harmless and closes the window for a missed stop notification.
        data.cv.wait(queueLock, [&data] {
            return !data.running.load() || !data.queue.empty();
        });
        if ( !data.running.load() )
            break;

        // Pop message from queue; release the lock before doing any I/O.
        JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
        data.queue.pop_front();
        queueLock.unlock();

        // Debug output.
        // std::cout << "Transmitter loop json document:" << std::endl;
        // rapidjson::OStreamWrapper out(std::cout);
        // rapidjson::Writer<rapidjson::OStreamWrapper> writer(out);
        // pJsonDoc->Accept(writer);
        // std::cout << std::endl << std::endl;

        // Get tag from Json message and remove it.
        Tag tag;
        const bool hasTag = data.casePacker.getTag(pJsonDoc, tag);
        assert(hasTag); // Json message does not contain a tag.
        if ( !hasTag )
            continue;   // With asserts disabled: drop the message instead of
                        // advertising/publishing with a default-constructed tag.
        data.casePacker.removeTag(pJsonDoc);

        // Check if topic must be advertised.
        // Advertised topics are stored in advertisedTopicsHashList as a hash.
        HashType hash = ROSBridge::ComPrivate::getHash(tag.topic());
        if ( data.advertisedTopicsHashList.count(hash) == 0 ) {
            data.advertisedTopicsHashList.insert(hash);
            {
                std::lock_guard<std::mutex> rbcLock(data.rbcMutex);
                data.rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
                                   tag.topic(),
                                   tag.messageType());
            }
        }
        // Debug output.
        // std::cout << "Hash Set size: " << data.advertisedTopicsHashList.size() << std::endl;

        // Send Json message.
        {
            std::lock_guard<std::mutex> rbcLock(data.rbcMutex);
            data.rbc.publish(tag.topic(), *pJsonDoc);
        }
    } // while loop
}