#include "topic_publisher.h" #include ros_bridge::com_private::TopicPublisher::TopicPublisher(RosbridgeWsClient &rbc) : _stopped(std::make_shared(true)) , _rbc(rbc) { } ros_bridge::com_private::TopicPublisher::~TopicPublisher() { this->reset(); } void ros_bridge::com_private::TopicPublisher::start() { if ( !_stopped->load() ) // start called while thread running. return; _stopped->store(false); _pThread = std::make_unique([this]{ // Main Loop. while( !this->_stopped->load() ){ std::unique_lock lk(this->_mutex); // Check if new data available, wait if not. if (this->_queue.empty()){ if ( _stopped->load() ) // Check condition again while holding the lock. break; this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. continue; } // Pop message from queue. JsonDocUPtr pJsonDoc = std::move(this->_queue.front()); this->_queue.pop_front(); lk.unlock(); // Get topic and type from Json message and remove it. std::string topic; bool ret = com_private::getTopic(*pJsonDoc, topic); assert(ret); (void)ret; ret = com_private::removeTopic(*pJsonDoc); assert(ret); (void)ret; // Wait for topic (Rosbridge needs some time to process a advertise() request). this->_rbc.waitForTopic(topic, [this]{ return this->_stopped->load(); }); // Publish Json message. if ( !this->_stopped->load() ) this->_rbc.publish(topic, *pJsonDoc); } // while loop #ifdef ROS_BRIDGE_DEBUG std::cout << "TopicPublisher: publisher thread terminated." << std::endl; #endif }); } void ros_bridge::com_private::TopicPublisher::reset() { if ( _stopped->load() ) // stop called while thread not running. return; std::unique_lock lk(_mutex); _stopped->store(true); _cv.notify_one(); // Wake publisher thread. lk.unlock(); if ( !_pThread ) return; _pThread->join(); lk.lock(); // Tidy up. for (auto& it : this->_topicMap){ this->_rbc.unadvertise(it.first); this->_rbc.removeClient(it.second); } this->_topicMap.clear(); _queue.clear(); } void ros_bridge::com_private::TopicPublisher::publish( ros_bridge::com_private::JsonDocUPtr docPtr, const char *topic) { // Check if topic was advertised. std::string t(topic); std::unique_lock lk(this->_mutex); auto it = this->_topicMap.find(t); if ( it == this->_topicMap.end()) { // Not yet advertised? lk.unlock(); #ifdef ROS_BRIDGE_DEBUG std::cerr << "TopicPublisher: topic " << t << " not yet advertised" << std::endl; #endif return; } lk.unlock(); // Add topic information to json doc. auto &allocator = docPtr->GetAllocator(); rapidjson::Value key("topic", allocator); rapidjson::Value value(topic, allocator); docPtr->AddMember(key, value, allocator); lk.lock(); _queue.push_back(std::move(docPtr)); lk.unlock(); _cv.notify_one(); // Wake publisher thread. } bool ros_bridge::com_private::TopicPublisher::advertise(const char *topic, const char *type) { std::unique_lock lk(this->_mutex); std::string t(topic); auto it = this->_topicMap.find(t); if ( it == this->_topicMap.end()) { // Need to advertise topic? std::string clientName = std::string(ros_bridge::com_private::_topicAdvertiserKey) + t; this->_topicMap.insert(std::make_pair(t, clientName)); lk.unlock(); this->_rbc.addClient(clientName); this->_rbc.advertise(clientName, t, type); return true; } else { lk.unlock(); #if ROS_BRIDGE_DEBUG std::cerr << "TopicPublisher: topic " << topic << " already advertised" << std::endl; #endif return false; } }