#include "TopicPublisher.h" void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacker, RosbridgeWsClient &rbc, ROSBridge::ComPrivate::JsonQueue &queue, std::mutex &queueMutex, const std::atomic &stopFlag) { rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey); while(!stopFlag.load()){ // Pop message from queue. queueMutex.lock(); //std::cout << "Queue size: " << queue.size() << std::endl; if (queue.empty()){ queueMutex.unlock(); std::this_thread::sleep_for(std::chrono::milliseconds(20)); continue; } JsonDocUPtr pJsonDoc(std::move(queue.front())); queue.pop_front(); queueMutex.unlock(); // Debug output. // std::cout << "Transmitter loop json document:" << std::endl; // rapidjson::OStreamWrapper out(std::cout); // rapidjson::Writer writer(out); // pJsonDoc->Accept(writer); // std::cout << std::endl << std::endl; // Get tag from Json message and remove it. Tag tag; bool ret = casePacker.getTag(*pJsonDoc.get(), tag); assert(ret); // Json message does not contain a tag; (void)ret; casePacker.removeTag(*pJsonDoc.get()); // Send Json message. rbc.publish(tag.topic(), *pJsonDoc.get()); } } ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker, RosbridgeWsClient *rbc) : _stopFlag(true) , _casePacker(casePacker) , _rbc(rbc) { } ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher() { this->stop(); } void ROSBridge::ComPrivate::TopicPublisher::start() { if ( !_stopFlag.load() ) // start called while thread running. return; _stopFlag.store(false); _pThread.reset(new std::thread(&ROSBridge::ComPrivate::transmittLoop, std::cref(*_casePacker), std::ref(*_rbc), std::ref(_queue), std::ref(_queueMutex), std::cref(_stopFlag))); } void ROSBridge::ComPrivate::TopicPublisher::stop() { if ( _stopFlag.load() ) // start called while thread not running. return; _stopFlag.store(true); if ( !_pThread ) return; _pThread->join(); _pThread.reset(); }