#include "TopicSubscriber.h"

// Websocket client type whose Connection / InMessage types appear in the
// signature of subscriberCallback().
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;

/// @brief Creates a subscriber in the stopped state.
///
/// The three pointers are stored as given. Nothing in this file deletes them,
/// so they are expected to outlive this object.
///
/// @param casePacker Stored in _casePacker (not used elsewhere in this file).
/// @param rbc        Rosbridge websocket client used to add/remove clients and
///                   to subscribe to topics.
/// @param rbcMutex   Mutex that is locked around every call made on @p rbc.
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(ROSBridge::CasePacker *casePacker,
                                                        RosbridgeWsClient *rbc,
                                                        std::mutex *rbcMutex)
    : _casePacker(casePacker)
    , _rbc(rbc)
    , _rbcMutex(rbcMutex)
    , _running(false)
{
}

/// @brief Removes all clients registered by this object (see reset()).
ROSBridge::ComPrivate::TopicSubscriber::~TopicSubscriber()
{
    this->reset();
}

/// @brief Marks the subscriber as running; subscribe() is rejected before this call.
void ROSBridge::ComPrivate::TopicSubscriber::start()
{
    _running = true;
}

/// @brief Stops the subscriber and drops every subscription made so far.
///
/// Does nothing when the subscriber is not running. Otherwise every client
/// name recorded by subscribe() is removed from the websocket client, the
/// running flag is cleared, and the callback map and client list are emptied.
void ROSBridge::ComPrivate::TopicSubscriber::reset()
{
    if (!_running)
        return;

    {
        std::lock_guard<std::mutex> lk(*_rbcMutex);
        // Iterate by const reference: the names are only read.
        for (const std::string &clientName : _clientList) {
            _rbc->removeClient(clientName);
        }
    }

    _running = false;

    {
        std::lock_guard<decltype(_callbackMap.mutex)> lk(_callbackMap.mutex);
        _callbackMap.map.clear();
    }
    _clientList.clear();
}

/// @brief Subscribes to @p topic and routes incoming messages to @p callback.
///
/// A websocket client named `_topicSubscriberKey + topic` is added and
/// subscribed. Incoming messages are dispatched through subscriberCallback(),
/// which looks the callback up by the hash of that client name.
///
/// @param topic    Null-terminated topic name.
/// @param callback Invoked with the "msg" member of every received message.
/// @return false if the subscriber is not running or @p topic is null,
///         true otherwise.
///
/// @note If a callback is already registered for the same client name, the map
///       insertion leaves the existing callback in place; the client is
///       nevertheless added and subscribed again and true is returned.
bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
        const char *topic,
        const CallbackType &callback)
{
    // Constructing std::string from a null pointer is undefined behaviour.
    if (!_running || topic == nullptr)
        return false;

    std::string clientName = ROSBridge::ComPrivate::_topicSubscriberKey
                           + std::string(topic);
    _clientList.push_back(clientName);

    HashType hash = getHash(clientName);
    {
        std::lock_guard<decltype(_callbackMap.mutex)> lk(_callbackMap.mutex);
        // insert() does not overwrite an entry that is already present.
        _callbackMap.map.insert(std::make_pair(hash, callback));
    }

    // The bound functor stores a copy of the hash and a reference to
    // _callbackMap, so it must not be invoked after this object is destroyed.
    using namespace std::placeholders;
    auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback,
                       hash,
                       std::ref(_callbackMap),
                       _1, _2);

    {
        std::lock_guard<std::mutex> lk(*_rbcMutex);
        _rbc->addClient(clientName);
        _rbc->subscribe(clientName, topic, f);
    }

    return true;
}

/// @brief Parses an incoming websocket message and forwards its "msg" member.
///
/// The payload is parsed as JSON. Payloads that fail to parse, or that carry no
/// "msg" member, are reported on std::cout and dropped. If no callback is
/// registered under @p hash the message is dropped silently. Otherwise the
/// "msg" value is deep-copied into a new document which is handed to the
/// callback.
///
/// @param hash       Key of the callback inside @p mapWrapper.
/// @param mapWrapper Callback map together with the mutex guarding it.
/// @param in_message Received websocket message (the connection argument is unused).
void ROSBridge::ComPrivate::subscriberCallback(
        const HashType &hash,
        ROSBridge::ComPrivate::CallbackMapWrapper &mapWrapper,
        std::shared_ptr<WsClient::Connection>,
        std::shared_ptr<WsClient::InMessage> in_message)
{
    // Read the payload once and reuse it for parsing and for the error logs.
    const std::string payload = in_message->string();

    // Parse document.
    JsonDoc docFull;
    docFull.Parse(payload.c_str());
    if (docFull.HasParseError()) {
        std::cout << "Json document has parse error: "
                  << payload
                  << std::endl;
        return;
    }
    // HasMember() may only be called on JSON objects, so check the type first.
    if (!docFull.IsObject() || !docFull.HasMember("msg")) {
        std::cout << "Json document does not contain a message (\"msg\"): "
                  << payload
                  << std::endl;
        return;
    }

    // Search callback. It is copied out so that it runs without the lock held.
    CallbackType callback;
    {
        std::lock_guard<decltype(mapWrapper.mutex)> lk(mapWrapper.mutex);
        auto it = mapWrapper.map.find(hash);
        if (it == mapWrapper.map.end()) {
            return; // callback not found
        }
        callback = it->second;
    }

    // Extract message and call callback. The copy must be allocated from the
    // new document's own allocator: docFull (and its allocator) is destroyed
    // when this function returns, while the callback may keep the document.
    JsonDocUPtr pDoc(new JsonDoc());
    pDoc->CopyFrom(docFull["msg"], pDoc->GetAllocator());
    callback(std::move(pDoc));
}