#include "TopicSubscriber.h"

#include <atomic>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <utility>

// NOTE(review): in the reviewed text every "<...>" sequence had been stripped
// (the second #include had no header name and all template arguments were
// missing). The standard headers above and the template arguments used below
// (std::atomic_bool, void(JsonDocUPtr), WsClient::Connection,
// WsClient::InMessage) were reconstructed from how the values are used in
// this file - confirm them against TopicSubscriber.h before merging.

/// Stores references to the case packer and the rosbridge client and creates
/// the shared stop flag in the "stopped" state, so subscribe() is a no-op
/// until start() has been called.
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
                                                        RosbridgeWsClient &rbc)
    : _casePacker(casePacker)
    , _rbc(rbc)
    , _stopped(std::make_shared<std::atomic_bool>(true))
{
}

/// Unsubscribes from every registered topic (see reset()).
ROSBridge::ComPrivate::TopicSubscriber::~TopicSubscriber()
{
    this->reset();
}

/// Clears the stop flag so that subsequent subscribe() calls take effect.
void ROSBridge::ComPrivate::TopicSubscriber::start()
{
    _stopped->store(false);
}

/// Sets the stop flag, then unsubscribes and removes the rosbridge client of
/// every entry in the topic map and clears the map. Does nothing when the
/// subscriber is already stopped.
void ROSBridge::ComPrivate::TopicSubscriber::reset()
{
    if ( _stopped->load() )
        return;

    std::cout << "TopicSubscriber: _stopped->store(true) " << std::endl;
    // Raising the flag first lets pending wait threads (see subscribe())
    // observe the stop request.
    _stopped->store(true);

    // Map layout: key = client name, value = topic name.
    for (auto &item : _topicMap){
        std::cout << "TopicSubscriber: unsubscribe " << item.second << std::endl;
        _rbc.unsubscribe(item.second);
        std::cout << "TopicSubscriber: removeClient " << item.first << std::endl;
        _rbc.removeClient(item.first);
    }
    _topicMap.clear();
    std::cout << "TopicSubscriber: _topicMap cleared " << std::endl;
}

/// Subscribes @p callback to @p topic.
///
/// Nothing happens when the subscriber is stopped, when @p topic is null, or
/// when the topic has already been registered through this object. If the
/// topic is not yet available, a detached thread waits for it (or for the
/// stop flag) and subscribes afterwards.
///
/// @param topic    Null-terminated topic name. It is copied, so the caller's
///                 buffer does not need to outlive this call.
/// @param callback Invoked with a newly allocated document holding a copy of
///                 the "msg" member of every incoming JSON message that
///                 parses and contains that member.
void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
        const char *topic,
        const std::function<void(JsonDocUPtr)> &callback)
{
    // Constructing a std::string from a null pointer is undefined behaviour.
    if ( topic == nullptr || _stopped->load() )
        return;

    const std::string topicName(topic);
    const std::string clientName = ROSBridge::ComPrivate::_topicSubscriberKey + topicName;
    if ( _topicMap.find(clientName) != _topicMap.end() )
        return; // Topic already subscribed.
    _topicMap.insert(std::make_pair(clientName, topicName));

    // Wrap callback: parse the raw message and forward only its "msg" member.
    auto callbackWrapper = [callback](
            std::shared_ptr<WsClient::Connection>,
            std::shared_ptr<WsClient::InMessage> in_message)
    {
        // Read the payload exactly once and reuse it for parsing and logging.
        // NOTE(review): the message appears to be stream backed, so a second
        // string() call may return an empty string - confirm.
        const std::string payload = in_message->string();

        // Parse document.
        JsonDoc docFull;
        docFull.Parse(payload.c_str());
        if ( docFull.HasParseError() ) {
            std::cout << "Json document has parse error: "
                      << payload << std::endl;
            return;
        }
        if ( !docFull.HasMember("msg") ) {
            std::cout << "Json document does not contain a message (\"msg\"): "
                      << payload << std::endl;
            return;
        }

        // Extract message and call callback. The copy must be allocated from
        // pDoc's own allocator: docFull is destroyed when this wrapper
        // returns, while the callback may keep the document it receives.
        JsonDocUPtr pDoc(new JsonDoc());
        pDoc->CopyFrom(docFull["msg"], pDoc->GetAllocator());
        callback(std::move(pDoc));
    };

    if ( !_rbc.topicAvailable(topic) ){
        // Wait until topic is available.
        std::cout << "TopicSubscriber: Starting wait thread, " << clientName << std::endl;

        // The thread is detached and can outlive both this call and this
        // object, so it must not touch `this` or the caller's `topic` buffer.
        // It gets its own copy of the shared stop flag and of the strings.
        // NOTE(review): the client referenced by _rbc still has to outlive
        // the thread; nothing visible here enforces that.
        auto stopped = _stopped;
        auto *rbc = &_rbc;
        std::thread t([stopped, rbc, clientName, topicName, callbackWrapper]{
            // Topic is passed as const char*, exactly as the original call did.
            rbc->waitForTopic(topicName.c_str(), [stopped]{
                return stopped->load();
            });
            if ( !stopped->load() ){
                rbc->addClient(clientName);
                rbc->subscribe(clientName, topicName.c_str(), callbackWrapper);
                std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl;
            }
            std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl;
        });
        t.detach();
    } else {
        _rbc.addClient(clientName);
        _rbc.subscribe(clientName, topic, callbackWrapper);
        std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl;
    }
}