#include "topic_subscriber.h" #include ros_bridge::com_private::TopicSubscriber::TopicSubscriber(RosbridgeWsClient &rbc) : _rbc(rbc) , _stopped(std::make_shared(true)) { } ros_bridge::com_private::TopicSubscriber::~TopicSubscriber() { this->reset(); } void ros_bridge::com_private::TopicSubscriber::start() { _stopped->store(false); } void ros_bridge::com_private::TopicSubscriber::reset() { if ( !_stopped->load() ){ _stopped->store(true); { for (auto &item : _topicMap){ _rbc.unsubscribe(item.second); _rbc.removeClient(item.first); } } _topicMap.clear(); } } void ros_bridge::com_private::TopicSubscriber::subscribe( const char *topic, const std::function &callback) { if ( _stopped->load() ) return; std::string clientName = ros_bridge::com_private::_topicSubscriberKey + std::string(topic); auto it = _topicMap.find(clientName); if ( it != _topicMap.end() ){ // Topic already subscribed? return; } _topicMap.insert(std::make_pair(clientName, std::string(topic))); // Wrap callback. auto callbackWrapper = [callback]( std::shared_ptr, std::shared_ptr in_message) { // Parse document. JsonDoc docFull; docFull.Parse(in_message->string().c_str()); if ( docFull.HasParseError() ) { std::cout << "Json document has parse error: " << in_message->string() << std::endl; return; } else if (!docFull.HasMember("msg")) { std::cout << "Json document does not contain a message (\"msg\"): " << in_message->string() << std::endl; return; } // Extract message and call callback. JsonDocUPtr pDoc(new JsonDoc()); pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator()); callback(std::move(pDoc)); }; if ( !_rbc.topicAvailable(topic) ){ // Wait until topic is available. std::cout << "TopicSubscriber: Starting wait thread, " << clientName << std::endl; std::thread t([this, clientName, topic, callbackWrapper]{ this->_rbc.waitForTopic(topic, [this]{ return this->_stopped->load(); }); if ( !this->_stopped->load() ){ this->_rbc.addClient(clientName); this->_rbc.subscribe(clientName, topic, callbackWrapper); std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl; } std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl; }); t.detach(); } else { _rbc.addClient(clientName); _rbc.subscribe(clientName, topic, callbackWrapper); std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl; } }