#include "TopicPublisher.h"

#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

/// @brief Creates an idle publisher; no worker thread exists until start().
/// @param casePacker Used by the worker thread to read and strip message tags.
///                   Stored as a raw pointer and dereferenced in start().
/// @param rbc        Rosbridge client used to advertise and publish. Stored as
///                   a raw pointer and dereferenced in start() and reset().
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
                                                      RosbridgeWsClient *rbc) :
    _running(false)
  , _casePacker(casePacker)
  , _rbc(rbc)
{
}

/// @brief Stops the worker thread (if any) via reset().
ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    this->reset();
}

/// @brief Registers the advertiser client and launches the transmit thread.
///
/// Calling start() while the publisher is already running is a no-op.
void ROSBridge::ComPrivate::TopicPublisher::start()
{
    // exchange() tests and sets the flag in one atomic step, so two concurrent
    // callers can not both pass the guard and spawn two threads.
    if ( _running.exchange(true) ) // start called while thread running.
        return;

    _rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
    _pThread.reset(new std::thread(&ROSBridge::ComPrivate::transmittLoop,
                                   std::cref(*_casePacker),
                                   std::ref(*_rbc),
                                   std::ref(_queue),
                                   std::ref(_queueMutex),
                                   std::ref(_advertisedTopicsHashList),
                                   std::cref(_running)));
}

/// @brief Stops the transmit thread and discards all pending state.
///
/// Clears the running flag, joins the worker, removes the advertiser client,
/// and empties both the message queue and the set of advertised topics.
/// Calling reset() while the publisher is not running is a no-op.
void ROSBridge::ComPrivate::TopicPublisher::reset()
{
    // Atomic test-and-clear, see start().
    if ( !_running.exchange(false) ) // stop called while thread not running.
        return;
    if ( !_pThread )
        return;

    // The worker polls the flag, so it leaves its loop within one iteration.
    _pThread->join();
    _pThread.reset();

    _rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
    {
        // The queue is guarded by _queueMutex in the transmit loop; take the
        // same lock here so the clear can not interleave with another user.
        std::lock_guard<std::mutex> lock(_queueMutex);
        _queue.clear();
    }
    // Only the (now joined) worker touches the hash list while running.
    _advertisedTopicsHashList.clear();
}

/// @brief Worker loop: drains the queue and publishes each message.
///
/// Runs until @p running becomes false. Each iteration pops one document from
/// @p queue (sleeping 20 ms when it is empty), reads and strips its tag,
/// advertises the tag's topic the first time it is seen, then publishes.
///
/// @param casePacker               Reads and removes the tag of a document.
/// @param rbc                      Client used for advertise/publish.
/// @param queue                    Pending documents; accessed under @p queueMutex.
/// @param queueMutex               Guards @p queue.
/// @param advertisedTopicsHashList Hashes of the topics already advertised.
/// @param running                  Loop condition, cleared by the owner to stop.
void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacker,
                                          RosbridgeWsClient &rbc,
                                          ROSBridge::ComPrivate::JsonQueue &queue,
                                          std::mutex &queueMutex,
                                          HashSet &advertisedTopicsHashList,
                                          const std::atomic<bool> &running)
{
    rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);

    while(running.load()){
        // Pop message from queue. unique_lock releases the mutex on every
        // path out of this scope, including exceptions.
        std::unique_lock<std::mutex> lock(queueMutex);
        //std::cout << "Queue size: " << queue.size() << std::endl;
        if (queue.empty()){
            // Do not hold the lock while sleeping.
            lock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
            continue;
        }
        JsonDocUPtr pJsonDoc(std::move(queue.front()));
        queue.pop_front();
        lock.unlock();

        // Debug output.
        // std::cout << "Transmitter loop json document:" << std::endl;
        // rapidjson::OStreamWrapper out(std::cout);
        // rapidjson::Writer<rapidjson::OStreamWrapper> writer(out);
        // pJsonDoc->Accept(writer);
        // std::cout << std::endl << std::endl;

        // Get tag from Json message and remove it.
        Tag tag;
        const bool ret = casePacker.getTag(pJsonDoc, tag);
        assert(ret); // Json message does not contain a tag;
        if ( !ret ) {
            // With asserts compiled out, drop the message instead of
            // advertising/publishing with a tag that was never filled in.
            continue;
        }
        casePacker.removeTag(pJsonDoc);

        // Check if topic must be advertised.
        // Advertised topics are stored in advertisedTopicsHashList as
        // a hash.
        HashType hash = ROSBridge::ComPrivate::getHash(tag.topic());
        if ( advertisedTopicsHashList.count(hash) == 0) {
            advertisedTopicsHashList.insert(hash);
            rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
                          tag.topic(),
                          tag.messageType() );
        }
        // Debug output.
        //std::cout << "Hash Set size: " << advertisedTopicsHashList.size() << std::endl;

        // Send Json message.
        rbc.publish(tag.topic(), *pJsonDoc.get());
    } // while loop

    // Release the client registered at the top of this function, so a later
    // start() does not find a stale entry under the same key.
    rbc.removeClient(ROSBridge::ComPrivate::_topicPublisherKey);
}