#include "topic_publisher.h" static const char *topicAdvertiserKey = "topic_advertiser"; #include ros_bridge::com_private::TopicPublisher::TopicPublisher(RosbridgeWsClient &rbc) : _stopped(std::make_shared(true)), _rbc(rbc) {} ros_bridge::com_private::TopicPublisher::~TopicPublisher() { this->reset(); } void ros_bridge::com_private::TopicPublisher::start() { if (!_stopped->load()) // start called while thread running. return; _stopped->store(false); _pThread = std::make_unique([this] { // Main Loop. while (!this->_stopped->load()) { std::unique_lock lk(this->_mutex); // Check if new data available, wait if not. if (this->_queue.empty()) { if (_stopped->load()) // Check condition again while holding the lock. break; this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter // in this case. continue; } // Pop message from queue. JsonDocUPtr pJsonDoc = std::move(this->_queue.front()); this->_queue.pop_front(); lk.unlock(); // Get topic and type from Json message and remove it. std::string topic; bool ret = com_private::getTopic(*pJsonDoc, topic); assert(ret); (void)ret; // Debug rapidjson::StringBuffer sb; rapidjson::Writer writer(sb); pJsonDoc->Accept(writer); std::cout << "message: " << sb.GetString() << std::endl; std::cout << "topic: " << topic << std::endl; ret = com_private::removeTopic(*pJsonDoc); assert(ret); (void)ret; // Wait for topic (Rosbridge needs some time to process a advertise() // request). this->_rbc.waitForTopic(topic, [this] { return this->_stopped->load(); }); // Publish Json message. if (!this->_stopped->load()) this->_rbc.publish(topic, *pJsonDoc); } // while loop #ifdef ROS_BRIDGE_DEBUG std::cout << "TopicPublisher: publisher thread terminated." << std::endl; #endif }); } void ros_bridge::com_private::TopicPublisher::reset() { if (_stopped->load()) // stop called while thread not running. return; std::unique_lock lk(_mutex); _stopped->store(true); _cv.notify_one(); // Wake publisher thread. lk.unlock(); if (!_pThread) return; _pThread->join(); lk.lock(); // Tidy up. for (auto &it : this->_topicMap) { this->_rbc.unadvertise(it.first); this->_rbc.removeClient(it.second); } this->_topicMap.clear(); _queue.clear(); } void ros_bridge::com_private::TopicPublisher::publish( ros_bridge::com_private::JsonDocUPtr docPtr, const char *topic) { // Check if topic was advertised. std::string t(topic); std::unique_lock lk(this->_mutex); auto it = this->_topicMap.find(t); if (it == this->_topicMap.end()) { // Not yet advertised? lk.unlock(); #ifdef ROS_BRIDGE_DEBUG std::cerr << "TopicPublisher: topic " << t << " not yet advertised" << std::endl; #endif return; } lk.unlock(); // Add topic information to json doc. auto &allocator = docPtr->GetAllocator(); rapidjson::Value key("topic", allocator); rapidjson::Value value(topic, allocator); docPtr->AddMember(key, value, allocator); lk.lock(); _queue.push_back(std::move(docPtr)); lk.unlock(); _cv.notify_one(); // Wake publisher thread. } bool ros_bridge::com_private::TopicPublisher::advertise(const char *topic, const char *type) { std::unique_lock lk(this->_mutex); std::string t(topic); auto it = this->_topicMap.find(t); if (it == this->_topicMap.end()) { // Need to advertise topic? std::string clientName = std::string(topicAdvertiserKey) + t; this->_topicMap.insert(std::make_pair(t, clientName)); lk.unlock(); this->_rbc.addClient(clientName); this->_rbc.advertise(clientName, t, type); return true; } else { lk.unlock(); #if ROS_BRIDGE_DEBUG std::cerr << "TopicPublisher: topic " << topic << " already advertised" << std::endl; #endif return false; } }