#include "topic_subscriber.h"

#include <atomic>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>

// Prefix prepended to a topic name to form the client name under which the
// topic is registered with the RosbridgeWsClient.
static const char *topicSubscriberKey = "topic_subscriber";

/// Creates a subscriber bound to the given rosbridge client.
/// The subscriber starts in the stopped state; call start() before
/// subscribe().
ros_bridge::com_private::TopicSubscriber::TopicSubscriber(
    RosbridgeWsClient &rbc)
    : _rbc(rbc), _stopped(std::make_shared<std::atomic_bool>(true)) {}

/// Unsubscribes from all topics (via reset()) on destruction.
ros_bridge::com_private::TopicSubscriber::~TopicSubscriber() {
  this->reset();
}

/// Clears the stopped flag so that subscribe() accepts new subscriptions.
void ros_bridge::com_private::TopicSubscriber::start() {
  _stopped->store(false);
}

/// Stops the subscriber: sets the stopped flag, unsubscribes every topic
/// recorded in the topic map, removes the matching clients and clears the
/// map. Does nothing if the subscriber is already stopped.
void ros_bridge::com_private::TopicSubscriber::reset() {
  if (_stopped->load()) {
    return;
  }

  // Raise the flag first so that pending wait threads observe it and give up.
  _stopped->store(true);

  // Map layout: key = client name, value = topic name.
  for (auto &item : _topicMap) {
    _rbc.unsubscribe(item.second);
    _rbc.removeClient(item.first);
  }
  _topicMap.clear();
}

/// Subscribes to a topic and forwards the "msg" member of every incoming
/// JSON message to the callback.
///
/// Returns silently if the subscriber is stopped, if the topic pointer is
/// null, or if the topic is already subscribed. If the topic is not yet
/// available, a detached thread waits for it and subscribes once it shows up
/// (unless the subscriber is stopped in the meantime).
///
/// @param topic    Null-terminated topic name. It is copied, so the buffer
///                 need not outlive this call.
/// @param callback Invoked with a document holding a copy of the "msg"
///                 member of each received message.
void ros_bridge::com_private::TopicSubscriber::subscribe(
    const char *topic, const std::function<void(JsonDocUPtr)> &callback) {
  if (_stopped->load())
    return;

  // Constructing a std::string from a null pointer is undefined behaviour.
  if (topic == nullptr)
    return;

  // Own the topic name; the caller's buffer may vanish after we return.
  const std::string topicName(topic);
  const std::string clientName = topicSubscriberKey + topicName;

  if (_topicMap.find(clientName) != _topicMap.end()) {
    // Topic already subscribed.
    return;
  }
  _topicMap.insert(std::make_pair(clientName, topicName));

  // Wrap the user callback: parse the raw message, validate it and hand the
  // "msg" payload over as a separate document.
  auto callbackWrapper =
      [callback](std::shared_ptr<WsClient::Connection>,
                 std::shared_ptr<WsClient::InMessage> in_message) {
        // Parse document.
        JsonDoc docFull;
        docFull.Parse(in_message->string().c_str());
        if (docFull.HasParseError()) {
          std::cout << "Json document has parse error: "
                    << in_message->string() << std::endl;
          return;
        } else if (!docFull.HasMember("msg")) {
          std::cout << "Json document does not contain a message (\"msg\"): "
                    << in_message->string() << std::endl;
          return;
        }

        // Extract message and call callback. The copy is allocated from
        // pDoc's own allocator (not docFull's), so that the document handed
        // to the callback stays valid after docFull is destroyed at the end
        // of this lambda.
        // NOTE(review): assumes JsonDoc follows rapidjson's CopyFrom
        // (deep copy into the supplied allocator) semantics - confirm.
        JsonDocUPtr pDoc(new JsonDoc());
        pDoc->CopyFrom(docFull["msg"], pDoc->GetAllocator());
        callback(std::move(pDoc));
      };

  if (!_rbc.topicAvailable(topic)) {
    // Wait until topic is available.
    std::cout << "TopicSubscriber: Starting wait thread, " << clientName
              << std::endl;

    // The thread is detached and may outlive this object. It therefore must
    // not touch `this`: it shares ownership of the stop flag and only keeps a
    // pointer to the rosbridge client, which this class merely references.
    // NOTE(review): assumes the RosbridgeWsClient outlives the wait thread -
    // confirm against the owner of the client.
    std::shared_ptr<std::atomic_bool> stopped = _stopped;
    RosbridgeWsClient *rbc = &_rbc;

    std::thread t([stopped, rbc, clientName, topicName, callbackWrapper] {
      rbc->waitForTopic(topicName.c_str(),
                        [stopped] { return stopped->load(); });
      if (!stopped->load()) {
        rbc->addClient(clientName);
        rbc->subscribe(clientName, topicName.c_str(), callbackWrapper);
        std::cout << "TopicSubscriber: wait thread subscription successfull: "
                  << clientName << std::endl;
      }
      std::cout << "TopicSubscriber: wait thread end, " << clientName
                << std::endl;
    });
    t.detach();
  } else {
    _rbc.addClient(clientName);
    _rbc.subscribe(clientName, topic, callbackWrapper);
    std::cout << "TopicSubscriber: subscription successfull: " << clientName
              << std::endl;
  }
}