diff --git a/src/comm/ros_bridge/include/ROSBridge.h b/src/comm/ros_bridge/include/ROSBridge.h index d1b50d05aa2ffff4be8cf0e8bcc51d57b3748b4b..c0d6af5c7c40693f0077e4656a9eb326f5b8715a 100644 --- a/src/comm/ros_bridge/include/ROSBridge.h +++ b/src/comm/ros_bridge/include/ROSBridge.h @@ -45,7 +45,7 @@ public: bool isRunning(); private: - bool _running; + std::shared_ptr _stopped; TypeFactory _typeFactory; JsonFactory _jsonFactory; CasePacker _casePacker; diff --git a/src/comm/ros_bridge/include/RosBridgeClient.h b/src/comm/ros_bridge/include/RosBridgeClient.h index ae1aeff3685431d6e78a6ea4f9bce90d8cd34c17..0abad5746e321fda19cfbab5fee880da7f4638c3 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.h +++ b/src/comm/ros_bridge/include/RosBridgeClient.h @@ -30,26 +30,30 @@ constexpr typename std::underlying_type::type integral(T value) class RosbridgeWsClient { - using TopicData = std::tuple /*client*/, - std::shared_ptr /*topic ready*/>; - enum class TopicEnum{ - TopicName = 0, - ClientName = 1, - Client = 2, - Ready = 3 - }; + enum class EntryType{ + SubscribedTopic, + AdvertisedTopic, + AdvertisedService, + }; + + using EntryData = std::tuple /*client*/>; + + enum class EntryEnum{ + EntryType = 0, + ServiceTopicName = 1, + ClientName = 2, + WPClient = 3 + }; std::string server_port_path; std::unordered_map /*client*/> client_map; - std::deque advertised_topics_list; + std::deque service_topic_list; std::mutex mutex; - // Set to true on construction and to false on destruction. Used to notify independet threads. - std::shared_ptr alive; - void start(const std::string& client_name, std::shared_ptr client, const std::string& message) { if (!client->on_open) @@ -112,23 +116,20 @@ class RosbridgeWsClient } public: - RosbridgeWsClient(const std::string& server_port_path) : - alive(std::make_shared(true)) + RosbridgeWsClient(const std::string& server_port_path) { this->server_port_path = server_port_path; } ~RosbridgeWsClient() { - for (auto& topicData : advertised_topics_list){ - unadvertise(std::get(topicData), - std::get(topicData)); - } + unsubscribeAll(); + unadvertiseAll(); + unadvertiseAllServices(); for (auto& client : client_map) { removeClient(client.first); } - alive->store(false); } // The execution can take up to 100 ms! @@ -225,6 +226,10 @@ public: #endif std::this_thread::sleep_for(std::chrono::milliseconds(100)); client->stop(); + client->on_open = NULL; + client->on_message = NULL; + client->on_close = NULL; + client->on_error = NULL; #ifdef DEBUG std::cout << client_name << " has been removed" << std::endl; #endif @@ -246,16 +251,15 @@ public: //! //! \note This function will wait until the /rosapi/topics service is available. //! \note Call connected() in advance to ensure that a connection was established. + //! \pre Connection must be established, \see \fn connected(). //! std::string get_advertised_topics(){ - this->waitForService("/rosapi/topics"); - std::shared_ptr> promise; - auto future = promise->get_future(); - // Call /rosapi/topics to see if topic is already available. - this->callService("/rosapi/topics", [promise]( + auto pPromise(std::make_shared>()); + auto future = pPromise->get_future(); + this->callService("/rosapi/topics", [pPromise]( std::shared_ptr connection, std::shared_ptr in_message){ - promise->set_value(in_message->string()); + pPromise->set_value(in_message->string()); connection->send_close(1000); }); future.wait(); @@ -270,20 +274,28 @@ public: //! \note Call connected() in advance to ensure that a connection was established. //! std::string get_advertised_services(){ - this->waitForService("/rosapi/services"); - std::shared_ptr> promise; - auto future = promise->get_future(); + auto pPromise(std::make_shared>()); + auto future = pPromise->get_future(); // Call /rosapi/services to see if topic is already available. - this->callService("/rosapi/services", [promise]( + this->callService("/rosapi/services", [pPromise]( std::shared_ptr connection, std::shared_ptr in_message){ - promise->set_value(in_message->string()); + pPromise->set_value(in_message->string()); connection->send_close(1000); }); future.wait(); return future.get(); } + bool topicAvailable(const std::string &topic){ +#ifdef DEBUG + std::cout << "checking if topic " << topic << " is available" << std::endl; +#endif + std::string advertised_topics = this->get_advertised_topics(); + auto pos = advertised_topics.find(topic); + return pos != std::string::npos ? true : false; + } + // Gets the client from client_map and starts it. Advertising is essentially sending a message. // One client per topic! void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") @@ -292,20 +304,20 @@ public: std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { - auto it_topic = std::find_if(advertised_topics_list.begin(), - advertised_topics_list.end(), - [topic](const TopicData &td){ - return topic == std::get(td); + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [topic](const EntryData &td){ + return topic == std::get(td); }); - if ( it_topic != advertised_topics_list.end()){ + if ( it_ser_top != service_topic_list.end()){ #ifdef DEBUG std::cerr << "topic: " << topic << " already advertised" << std::endl; #endif return; } auto client = it_client->second; - auto ready = std::make_shared(false); - advertised_topics_list.push_back(std::make_tuple(topic, client_name, client, ready)); + std::weak_ptr wpClient = client; + service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedTopic, topic, client_name, wpClient)); std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\""; if (id.compare("") != 0) @@ -315,7 +327,7 @@ public: message = "{" + message + "}"; #ifdef DEBUG - client->on_open = [this, topic, message, ready, client_name](std::shared_ptr connection) { + client->on_open = [this, topic, message, client_name](std::shared_ptr connection) { #else client->on_open = [this, topic, message, ready](std::shared_ptr connection) { #endif @@ -325,22 +337,6 @@ public: std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); - // Now wait until topic is successfully advertised. - while (true){ - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - std::string advertised_topics = this->get_advertised_topics(); - auto pos = advertised_topics.find(topic); - if (pos != std::string::npos){ // topic advertised! -#ifdef DEBUG - std::cout << client_name << ": topic " << topic << "successfully advertised" << std::endl; -#endif - break; - } -#ifdef DEBUG - std::cout << client_name << ": waiting for topic " << topic << "to be advertised" << std::endl; -#endif - } - ready->store(true); // Mark topic as successfully advertised. }; start(client_name, client, message); @@ -353,73 +349,79 @@ public: #endif } - void unadvertise(const std::string& client_name, - const std::string& topic, + //! + //! \brief Unadvertises the topice \p topic + //! \param topic The topic to be unadvertised + //! \param id + //! \pre The topic \p topic must be advertised, \see topicAvailable(). + //! + void unadvertise(const std::string& topic, const std::string& id = ""){ std::lock_guard lk(mutex); - std::unordered_map>::iterator it_client = client_map.find(client_name); - if (it_client != client_map.end()) - { - // Topic advertised? - auto it_topic = std::find_if(advertised_topics_list.begin(), - advertised_topics_list.end(), - [topic](const TopicData &td){ - return topic == std::get(td); - }); - if ( it_topic == advertised_topics_list.end()){ + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [topic](const EntryData &td){ + return topic == std::get(td); + }); + if ( it_ser_top == service_topic_list.end()){ #ifdef DEBUG - std::cerr << "topic: " << topic << " not advertised" << std::endl; + std::cerr << "topic: " << topic << " not advertised" << std::endl; #endif - return; - } + return; + } - std::string message = "\"op\":\"unadvertise\""; - if (id.compare("") != 0) - { + std::string message = "\"op\":\"unadvertise\""; + if (id.compare("") != 0) + { message += ", \"id\":\"" + id + "\""; - } - message += ", \"topic\":\"" + topic + "\""; - message = "{" + message + "}"; + } + message += ", \"topic\":\"" + topic + "\""; + message = "{" + message + "}"; - auto client = it_client->second; - auto ready = std::get(*it_topic); + std::string client_name = "topic_unadvertiser" + topic; + auto client = std::make_shared(server_port_path); #ifdef DEBUG - client->on_open = [client_name, message, ready](std::shared_ptr connection) { + client->on_open = [client_name, message](std::shared_ptr connection) { #else - client->on_open = [message, ready](std::shared_ptr connection) { + client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif - while ( !ready->load() ){ // Wait for topic to be advertised. - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } - connection->send(message); + connection->send(message); // unadvertise + connection->send_close(1000); }; start(client_name, client, message); - advertised_topics_list.erase(it_topic); - } - #ifdef DEBUG - else - { - std::cerr << client_name << "has not been created" << std::endl; - } - #endif + service_topic_list.erase(it_ser_top); + } + void unadvertiseAll(){ + for (auto entry : service_topic_list){ + if ( std::get(entry) == EntryType::AdvertisedTopic ){ + unadvertise(std::get(entry)); + } + } } + //! + //! \brief Publishes the message \p msg to the topic \p topic. + //! \param topic The topic to publish the message. + //! \param msg The message to publish. + //! \param id + //! \pre The topic \p topic must be advertised, \see topicAvailable(). + //! void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "") { std::lock_guard lk(mutex); - auto it_topic = std::find_if(advertised_topics_list.begin(), - advertised_topics_list.end(), - [&topic](const TopicData &td){ - return topic == std::get(td); + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [&topic](const EntryData &td){ + return topic == std::get(td); }); - if ( it_topic == advertised_topics_list.end() ){ + if ( it_ser_top == service_topic_list.end() ){ #ifdef DEBUG std::cerr << "topic: " << topic << " not yet advertised" << std::endl; #endif @@ -439,16 +441,16 @@ public: message = "{" + message + "}"; std::shared_ptr publish_client = std::make_shared(server_port_path); - auto ready = std::get(*it_topic); - publish_client->on_open = [message, client_name, ready](std::shared_ptr connection) { +#ifdef DEBUG + publish_client->on_open = [message, client_name](std::shared_ptr connection) { +#else + publish_client->on_open = [message, client_name](std::shared_ptr connection) { +#endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message." << std::endl; //std::cout << client_name << ": Sending message: " << message << std::endl; #endif - while ( !ready->load() ){ // Wait for the topic to be successfully advertised. - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } connection->send(message); // TODO: This could be improved by creating a thread to keep publishing the message instead of closing it right away @@ -458,12 +460,40 @@ public: start(client_name, publish_client, message); } + //! + //! \brief Subscribes the client \p client_name to the topic \p topic. + //! \param client_name + //! \param topic + //! \param callback + //! \param id + //! \param type + //! \param throttle_rate + //! \param queue_length + //! \param fragment_size + //! \param compression + //! \pre The topic \p topic must be advertised, \see topicAvailable(). + //! void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "") { std::lock_guard lk(mutex); - std::unordered_map>::iterator it = client_map.find(client_name); - if (it != client_map.end()) + std::unordered_map>::iterator it_client = client_map.find(client_name); + if (it_client != client_map.end()) { + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [topic](const EntryData &td){ + return topic == std::get(td); + }); + if ( it_ser_top != service_topic_list.end()){ + #ifdef DEBUG + std::cerr << "topic: " << topic << " already advertised" << std::endl; + #endif + return; + } + auto client = it_client->second; + std::weak_ptr wpClient = client; + service_topic_list.push_back(std::make_tuple(EntryType::SubscribedTopic, topic, client_name, wpClient)); + std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\""; if (id.compare("") != 0) @@ -490,34 +520,10 @@ public: { message += ", \"compression\":\"" + compression + "\""; } - message = "{" + message + "}"; + message = "{" + message + "}"; - auto client = it->second; client->on_message = callback; - const auto cAlive = this->alive; - // Start thread to wait for topic to be advertised. - std::thread t([this, client_name, client, message, cAlive, topic](){ - while (cAlive->load()){ // RosBridgeClient alive? - auto advertised_topics = this->get_advertised_topics(); - auto pos = advertised_topics.find(topic); - if (pos != std::string::npos){ // desired topic available! -#ifdef DEBUG - std::cout << client_name << ": topic " << topic << "available" << std::endl; -#endif - break; - } -#ifdef DEBUG - std::cout << client_name << ": waiting for topic " << topic << std::endl; -#endif - std::this_thread::sleep_for(std::chrono::seconds(1)); - } - - if ( cAlive->load() ){ - this->start(client_name, client, message); // subscribe to topic - } - }); - - t.detach(); + this->start(client_name, client, message); // subscribe to topic } #ifdef DEBUG else @@ -527,16 +533,82 @@ public: #endif } + void unsubscribe(const std::string& topic, + const std::string& id = ""){ + std::lock_guard lk(mutex); + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [topic](const EntryData &td){ + return topic == std::get(td); + }); + if ( it_ser_top == service_topic_list.end()){ + #ifdef DEBUG + std::cerr << "topic: " << topic << " not advertised" << std::endl; + #endif + return; + } + + std::string message = "\"op\":\"unsubscribe\""; + if (id.compare("") != 0) + { + message += ", \"id\":\"" + id + "\""; + } + message += ", \"topic\":\"" + topic + "\""; + message = "{" + message + "}"; + + std::string client_name = "topic_unsubscriber" + topic; + auto client = std::make_shared(server_port_path); + #ifdef DEBUG + client->on_open = [client_name, message](std::shared_ptr connection) { + #else + client->on_open = [message](std::shared_ptr connection) { + #endif + + #ifdef DEBUG + std::cout << client_name << ": Opened connection" << std::endl; + std::cout << client_name << ": Sending message: " << message << std::endl; + #endif + connection->send(message); // unadvertise + connection->send_close(1000); + }; + + start(client_name, client, message); + service_topic_list.erase(it_ser_top); + } + + void unsubscribeAll(){ + for (auto entry : service_topic_list){ + if( std::get(entry) == EntryType::SubscribedTopic ) { + unsubscribe(std::get(entry)); + } + } + } + void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback) { std::lock_guard lk(mutex); - std::unordered_map>::iterator it = client_map.find(client_name); - if (it != client_map.end()) + std::unordered_map>::iterator it_client = client_map.find(client_name); + if (it_client != client_map.end()) { + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [service](const EntryData &td){ + return service == std::get(td); + }); + if ( it_ser_top != service_topic_list.end()){ +#ifdef DEBUG + std::cerr << "service: " << service << " already advertised" << std::endl; +#endif + return; + } + auto client = it_client->second; + std::weak_ptr wpClient = client; + service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedService, service, client_name, wpClient)); + std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}"; - it->second->on_message = callback; - start(client_name, it->second, message); + it_client->second->on_message = callback; + start(client_name, it_client->second, message); } #ifdef DEBUG else @@ -546,6 +618,52 @@ public: #endif } + void unadvertiseService(const std::string& service){ + std::lock_guard lk(mutex); + auto it_ser_top = std::find_if(service_topic_list.begin(), + service_topic_list.end(), + [service](const EntryData &td){ + return service == std::get(td); + }); + if ( it_ser_top == service_topic_list.end()){ + #ifdef DEBUG + std::cerr << "service: " << service << " not advertised" << std::endl; + #endif + return; + } + + std::string message = "\"op\":\"unadvertise_service\""; + message += ", \"service\":\"" + service + "\""; + message = "{" + message + "}"; + + std::string client_name = "topic_unsubscriber" + service; + auto client = std::make_shared(server_port_path); + #ifdef DEBUG + client->on_open = [client_name, message](std::shared_ptr connection) { + #else + client->on_open = [message](std::shared_ptr connection) { + #endif + + #ifdef DEBUG + std::cout << client_name << ": Opened connection" << std::endl; + std::cout << client_name << ": Sending message: " << message << std::endl; + #endif + connection->send(message); // unadvertise + connection->send_close(1000); + }; + + start(client_name, client, message); + service_topic_list.erase(it_ser_top); + } + + void unadvertiseAllServices(){ + for (auto entry : service_topic_list){ + if( std::get(entry) == EntryType::AdvertisedService ) { + unadvertiseService(std::get(entry)); + } + } + } + void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values) { std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false"); @@ -562,19 +680,24 @@ public: message += ", \"values\":" + std::string(strbuf.GetString()); message = "{" + message + "}"; + std::string client_name = "service_response_client" + service; std::shared_ptr service_response_client = std::make_shared(server_port_path); +#ifdef DEBUG + service_response_client->on_open = [message, client_name](std::shared_ptr connection) { +#else service_response_client->on_open = [message](std::shared_ptr connection) { +#endif #ifdef DEBUG - std::cout << "service_response_client: Opened connection" << std::endl; - std::cout << "service_response_client: Sending message: " << message << std::endl; + std::cout << client_name << ": Opened connection" << std::endl; + std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); connection->send_close(1000); }; - start("service_response_client", service_response_client, message); + start(client_name, service_response_client, message); } void callService(const std::string& service, const InMessage& callback, const rapidjson::Document& args = {}, const std::string& id = "", int fragment_size = -1, const std::string& compression = "") @@ -603,6 +726,7 @@ public: } message = "{" + message + "}"; + std::string client_name = "call_service_client" + service; std::shared_ptr call_service_client = std::make_shared(server_port_path); if (callback) @@ -611,16 +735,16 @@ public: } else { - call_service_client->on_message = [](std::shared_ptr connection, std::shared_ptr in_message) { + call_service_client->on_message = [client_name](std::shared_ptr connection, std::shared_ptr in_message) { #ifdef DEBUG - std::cout << "call_service_client: Message received: " << in_message->string() << std::endl; - std::cout << "call_service_client: Sending close connection" << std::endl; + std::cout << client_name << ": Message received: " << in_message->string() << std::endl; + std::cout << client_name << ": Sending close connection" << std::endl; #endif connection->send_close(1000); }; } - start("call_service_client", call_service_client, message); + start(client_name, call_service_client, message); } //! @@ -631,70 +755,12 @@ public: //! bool serviceAvailable(const std::string& service) { - const rapidjson::Document args = {}; - const std::string& id = ""; - const int fragment_size = -1; - const std::string& compression = ""; - std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\""; - std::string client_name("wait_for_service_client"); - - if (!args.IsNull()) - { - rapidjson::StringBuffer strbuf; - rapidjson::Writer writer(strbuf); - args.Accept(writer); - - message += ", \"args\":" + std::string(strbuf.GetString()); - } - if (id.compare("") != 0) - { - message += ", \"id\":\"" + id + "\""; - } - if (fragment_size > -1) - { - message += ", \"fragment_size\":" + std::to_string(fragment_size); - } - if (compression.compare("none") == 0 || compression.compare("png") == 0) - { - message += ", \"compression\":\"" + compression + "\""; - } - message = "{" + message + "}"; - - std::shared_ptr wait_for_service_client = std::make_shared(server_port_path); - std::shared_ptr> p(std::make_shared>()); - auto future = p->get_future(); #ifdef DEBUG - wait_for_service_client->on_message = [p, service, client_name]( -#else - wait_for_service_client->on_message = [p]( -#endif - std::shared_ptr connection, - std::shared_ptr in_message){ -#ifdef DEBUG -#endif - rapidjson::Document doc; - doc.Parse(in_message->string().c_str()); - if ( !doc.HasParseError() - && doc.HasMember("result") - && doc["result"].IsBool() - && doc["result"] == true ) - { -#ifdef DEBUG - std::cout << client_name << ": " - << "service " << service - << " available." << std::endl; - std::cout << client_name << ": " - << "message: " << in_message->string() - << std::endl; -#endif - p->set_value(true); - } else { - p->set_value(false); - } - connection->send_close(1000); - }; - start(client_name, wait_for_service_client, message); - return future.get(); + std::cout << "checking if service " << service << " is available" << std::endl; +#endif + std::string advertised_services = this->get_advertised_services(); + auto pos = advertised_services.find(service); + return pos != std::string::npos ? true : false; } //! @@ -704,28 +770,80 @@ public: //! void waitForService(const std::string& service) { + auto stop = std::make_shared(false); + waitForService(service, stop); + } + //! + //! \brief Waits until the service with the name \p service is available. + //! \param service Service name. + //! \param stop Flag to stop waiting. + //! @note This function will block as long as the service is not available or \p stop is false. + //! + void waitForService(const std::string& service, const std::shared_ptr stop) + { +#ifdef DEBUG + auto s = std::chrono::high_resolution_clock::now(); + long counter = 0; +#endif + while( !stop->load() ) + { +#ifdef DEBUG + ++counter; +#endif + if ( serviceAvailable(service) ){ + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling. + }; +#ifdef DEBUG + auto e = std::chrono::high_resolution_clock::now(); + std::cout << "waitForService() " << service << " time: " + << std::chrono::duration_cast(e-s).count() + << " ms." << std::endl; + std::cout << "waitForTopic() " << service << ": number of calls to topicAvailable: " + << counter << std::endl; +#endif + } + //! + //! \brief Waits until the topic with the name \p topic is available. + //! \param topic Topic name. + //! @note This function will block as long as the topic is not available. + //! + void waitForTopic(const std::string& topic) + { + auto stop = std::make_shared(false); + waitForTopic(topic, stop); + } + //! + //! \brief Waits until the topic with the name \p topic is available. + //! \param topic Topic name. + //! \param stop Flag to stop waiting. + //! @note This function will block as long as the topic is not available or \p stop is false. + //! + void waitForTopic(const std::string& topic, const std::shared_ptr stop) + { #ifdef DEBUG auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif - while(true) + while( !stop->load()) { #ifdef DEBUG ++counter; #endif - if (serviceAvailable(service)){ + if ( topicAvailable(topic) ){ break; } - std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Avoid excessive polling. + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling. }; #ifdef DEBUG auto e = std::chrono::high_resolution_clock::now(); - std::cout << "waitForService(): time: " + std::cout << "waitForTopic() " << topic << " time: " << std::chrono::duration_cast(e-s).count() << " ms." << std::endl; - std::cout << "waitForService(): clients launched: " + std::cout << "waitForTopic() " << topic << ": number of calls to topicAvailable: " << counter << std::endl; #endif } diff --git a/src/comm/ros_bridge/include/Server.cpp b/src/comm/ros_bridge/include/Server.cpp index 8341f78739429102f1c5e3fcb43db6f61e1092a9..e1650be408a34f051afb4c52341eac6d99b688eb 100644 --- a/src/comm/ros_bridge/include/Server.cpp +++ b/src/comm/ros_bridge/include/Server.cpp @@ -5,6 +5,7 @@ ROSBridge::ComPrivate::Server::Server(CasePacker &casePacker, RosbridgeWsClient &rbc) : _rbc(rbc) , _casePacker(casePacker) + , _stopped(std::make_shared(true)) { } @@ -14,10 +15,10 @@ void ROSBridge::ComPrivate::Server::advertiseService(const std::string &service, const Server::CallbackType &userCallback) { std::string clientName = _serviceAdvertiserKey + service; - auto it = std::find(_clientList.begin(), _clientList.end(), clientName); - if (it != _clientList.end()) + auto it = _serviceMap.find(clientName); + if (it != _serviceMap.end()) return; // Service allready advertised. - _clientList.push_back(clientName); + _serviceMap.insert(std::make_pair(clientName, service)); _rbc.addClient(clientName); auto rbc = &_rbc; @@ -80,14 +81,16 @@ ROSBridge::ComPrivate::Server::~Server() void ROSBridge::ComPrivate::Server::start() { - _running = true; + _stopped->store(false); } void ROSBridge::ComPrivate::Server::reset() { - if (!_running) + if ( _stopped->load() ) return; - for (const auto& str : _clientList) - _rbc.removeClient(str); - _clientList.clear(); + for (const auto& item : _serviceMap){ + _rbc.unadvertiseService(item.second); + _rbc.removeClient(item.first); + } + _serviceMap.clear(); } diff --git a/src/comm/ros_bridge/include/Server.h b/src/comm/ros_bridge/include/Server.h index 85f93c86c742ef4770b4f2a9a913e438724501b6..4f622ca54ad6a97982592bdaad002217ec9243c8 100644 --- a/src/comm/ros_bridge/include/Server.h +++ b/src/comm/ros_bridge/include/Server.h @@ -5,12 +5,14 @@ #include "ros_bridge/include/TypeFactory.h" #include "ros_bridge/include/CasePacker.h" +#include + namespace ROSBridge { namespace ComPrivate { class Server { - typedef std::vector ClientList; + typedef std::unordered_map ServiceMap; public: typedef std::function CallbackType; @@ -33,8 +35,8 @@ private: RosbridgeWsClient &_rbc; CasePacker &_casePacker; - ClientList _clientList; - bool _running; + ServiceMap _serviceMap; + std::shared_ptr _stopped; }; } // namespace ComPrivate } // namespace ROSBridge diff --git a/src/comm/ros_bridge/include/TopicPublisher.cpp b/src/comm/ros_bridge/include/TopicPublisher.cpp index 687be18d3be9b096d1556b29ba87631ba29f470e..430fe7ca021053fe8788fe246e2c4c9496c95caa 100644 --- a/src/comm/ros_bridge/include/TopicPublisher.cpp +++ b/src/comm/ros_bridge/include/TopicPublisher.cpp @@ -1,22 +1,10 @@ #include "TopicPublisher.h" - - - - -struct ROSBridge::ComPrivate::ThreadData -{ - const ROSBridge::CasePacker &casePacker; - RosbridgeWsClient &rbc; - ROSBridge::ComPrivate::JsonQueue &queue; - std::mutex &queueMutex; - const std::atomic &running; - std::condition_variable &cv; -}; +#include ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker, RosbridgeWsClient &rbc) : - _running(false) + _stopped(std::make_shared(true)) , _casePacker(casePacker) , _rbc(rbc) { @@ -30,25 +18,65 @@ ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher() void ROSBridge::ComPrivate::TopicPublisher::start() { - if ( _running.load() ) // start called while thread running. + if ( !_stopped->load() ) // start called while thread running. return; - _running.store(true); - ROSBridge::ComPrivate::ThreadData data{ - _casePacker, - _rbc, - _queue, - _queueMutex, - _running, - _cv - }; - _pThread = std::make_unique(&ROSBridge::ComPrivate::transmittLoop, data); + _stopped->store(false); + _pThread = std::make_unique([this]{ + // Init. + std::unordered_map topicMap; + // Main Loop. + while( !this->_stopped->load()){ + std::unique_lock lk(this->_mutex); + // Check if new data available, wait if not. + if (this->_queue.empty()){ + this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. + continue; + } + // Pop message from queue. + JsonDocUPtr pJsonDoc(std::move(this->_queue.front())); + this->_queue.pop_front(); + lk.unlock(); + + // Get tag from Json message and remove it. + Tag tag; + bool ret = this->_casePacker.getTag(pJsonDoc, tag); + assert(ret); // Json message does not contain a tag; + (void)ret; + this->_casePacker.removeTag(pJsonDoc); + + // Check if topic must be advertised. + // Advertised topics are stored in advertisedTopicsHashList as + // a hash. + std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey + + tag.topic(); + auto it = topicMap.find(clientName); + if ( it == topicMap.end()) { // Need to advertise topic? + topicMap.insert(std::make_pair(clientName, tag.topic())); + this->_rbc.addClient(clientName); + this->_rbc.advertise(clientName, + tag.topic(), + tag.messageType() ); + this->_rbc.waitForTopic(tag.topic(), this->_stopped); // Wait until topic is advertised. + } + + // Publish Json message. + if ( !this->_stopped->load() ) + this->_rbc.publish(tag.topic(), *pJsonDoc.get()); + } // while loop + + // Tidy up. + for (auto& it : topicMap){ + this->_rbc.unadvertise(it.second); + this->_rbc.removeClient(it.first); + } + }); } void ROSBridge::ComPrivate::TopicPublisher::reset() { - if ( !_running.load() ) // stop called while thread not running. + if ( _stopped->load() ) // stop called while thread not running. return; - _running.store(false); + _stopped->store(true); _cv.notify_one(); // Wake publisher thread. if ( !_pThread ) return; @@ -56,51 +84,3 @@ void ROSBridge::ComPrivate::TopicPublisher::reset() _queue.clear(); } -void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data) -{ - // Init. - ClientMap clientMap; - // Main Loop. - while(data.running.load()){ - std::unique_lock lk(data.queueMutex); - // Check if new data available, wait if not. - if (data.queue.empty()){ - data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. - continue; - } - // Pop message from queue. - JsonDocUPtr pJsonDoc(std::move(data.queue.front())); - data.queue.pop_front(); - lk.unlock(); - - // Get tag from Json message and remove it. - Tag tag; - bool ret = data.casePacker.getTag(pJsonDoc, tag); - assert(ret); // Json message does not contain a tag; - (void)ret; - data.casePacker.removeTag(pJsonDoc); - - // Check if topic must be advertised. - // Advertised topics are stored in advertisedTopicsHashList as - // a hash. - std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey - + tag.topic(); - HashType hash = ROSBridge::ComPrivate::getHash(clientName); - auto it = clientMap.find(hash); - if ( it == clientMap.end()) { // Need to advertise topic? - clientMap.insert(std::make_pair(hash, clientName)); - data.rbc.addClient(clientName); - data.rbc.advertise(clientName, - tag.topic(), - tag.messageType() ); - } - - // Publish Json message. - data.rbc.publish(tag.topic(), *pJsonDoc.get()); - } // while loop - - // Tidy up. - for (auto& it : clientMap) - data.rbc.removeClient(it.second); -} - diff --git a/src/comm/ros_bridge/include/TopicPublisher.h b/src/comm/ros_bridge/include/TopicPublisher.h index 1c9dc680aec09f50ba5d9a64c3185e7d00ec6b07..9ca78b948d05aba710ba5caeb17934e40837a4bf 100644 --- a/src/comm/ros_bridge/include/TopicPublisher.h +++ b/src/comm/ros_bridge/include/TopicPublisher.h @@ -37,7 +37,7 @@ public: void publish(JsonDocUPtr docPtr){ { - std::lock_guard lock(_queueMutex); + std::lock_guard lock(_mutex); _queue.push_back(std::move(docPtr)); } _cv.notify_one(); // Wake publisher thread. @@ -51,8 +51,8 @@ public: private: JsonQueue _queue; - std::mutex _queueMutex; - std::atomic _running; + std::mutex _mutex; + std::shared_ptr _stopped; CasePacker &_casePacker; RosbridgeWsClient &_rbc; CondVar _cv; @@ -60,8 +60,5 @@ private: }; -void transmittLoop(ThreadData data); - - } // namespace CommunicatorDetail } // namespace ROSBridge diff --git a/src/comm/ros_bridge/include/TopicSubscriber.cpp b/src/comm/ros_bridge/include/TopicSubscriber.cpp index 7a1d8311fbdfa364ec6bf22c4413ed075d1d2aa8..e4fc27b5cf7da5dc1577bf1157faf412a514a738 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.cpp +++ b/src/comm/ros_bridge/include/TopicSubscriber.cpp @@ -5,7 +5,7 @@ ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker, RosbridgeWsClient &rbc) : _casePacker(casePacker) , _rbc(rbc) - , _running(false) + , _stopped(std::make_shared(true)) { } @@ -17,120 +17,78 @@ ROSBridge::ComPrivate::TopicSubscriber::~TopicSubscriber() void ROSBridge::ComPrivate::TopicSubscriber::start() { - _running = true; + _stopped->store(false); } void ROSBridge::ComPrivate::TopicSubscriber::reset() { - if ( _running ){ - _running = false; + if ( !_stopped->load() ){ + _stopped->store(true); { - for (std::string clientName : _clientList){ - _rbc.removeClient(clientName); + for (auto &item : _topicMap){ + _rbc.unsubscribe(item.second); + _rbc.removeClient(item.first); } } - { - std::lock_guard lk(_callbackMapStruct.mutex); - _callbackMapStruct.map.clear(); - } - _clientList.clear(); + _topicMap.clear(); } } -bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( +void ROSBridge::ComPrivate::TopicSubscriber::subscribe( const char *topic, const std::function &callback) { - if ( !_running ) - return false; + if ( _stopped->load() ) + return; std::string clientName = ROSBridge::ComPrivate::_topicSubscriberKey + std::string(topic); - _clientList.push_back(clientName); - - HashType hash = getHash(clientName); - { - std::lock_guard lk(_callbackMapStruct.mutex); - auto ret = _callbackMapStruct.map.insert(std::make_pair(hash, callback)); // - if ( !ret.second ) - return false; // Topic subscription already present. - + auto it = _topicMap.find(clientName); + if ( it != _topicMap.end() ){ // Topic already subscribed? + return; } + _topicMap.insert(std::make_pair(clientName, std::string(topic))); using namespace std::placeholders; - auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback, - hash, - std::ref(_callbackMapStruct), - _1, _2); - -// std::cout << std::endl; -// std::cout << "topic subscription" << std::endl; -// std::cout << "client name: " << clientName << std::endl; -// std::cout << "topic: " << topic << std::endl; - { - auto start = std::chrono::high_resolution_clock::now(); + auto callbackWrapper = [callback]( + std::shared_ptr, + std::shared_ptr in_message){ + // Parse document. + JsonDoc docFull; + docFull.Parse(in_message->string().c_str()); + if ( docFull.HasParseError() ) { + std::cout << "Json document has parse error: " + << in_message->string() + << std::endl; + return; + } else if (!docFull.HasMember("msg")) { + std::cout << "Json document does not contain a message (\"msg\"): " + << in_message->string() + << std::endl; + return; + } + + // Extract message and call callback. + JsonDocUPtr pDoc(new JsonDoc()); + pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator()); + callback(std::move(pDoc)); + return; + }; + + if ( !_rbc.topicAvailable(topic) ){ + // Wait until topic is available. + std::thread t([this, clientName, topic, callbackWrapper]{ + this->_rbc.waitForTopic(topic, this->_stopped); + if ( !this->_stopped->load() ){ + this->_rbc.addClient(clientName); + this->_rbc.subscribe(clientName, topic, callbackWrapper); + } + }); + t.detach(); + } else { _rbc.addClient(clientName); - auto end = std::chrono::high_resolution_clock::now(); - std::cout << "add client time: " - << std::chrono::duration_cast(end-start).count() - << " ms" << std::endl; - start = std::chrono::high_resolution_clock::now(); - _rbc.subscribe(clientName, - topic, - f ); - end = std::chrono::high_resolution_clock::now(); - std::cout << "subscribe time: " - << std::chrono::duration_cast(end-start).count() - << " ms" << std::endl; + _rbc.subscribe(clientName, topic, callbackWrapper); } - - return true; } -using WsClient = SimpleWeb::SocketClient; -void ROSBridge::ComPrivate::subscriberCallback( - const HashType &hash, - ROSBridge::ComPrivate::MapStruct &mapWrapper, - std::shared_ptr, - std::shared_ptr in_message) -{ - // Parse document. - JsonDoc docFull; - docFull.Parse(in_message->string().c_str()); - if ( docFull.HasParseError() ) { - std::cout << "Json document has parse error: " - << in_message->string() - << std::endl; - return; - } else if (!docFull.HasMember("msg")) { - std::cout << "Json document does not contain a message (\"msg\"): " - << in_message->string() - << std::endl; - return; - } - - -// std::cout << "Json document: " -// << in_message->string() -// << std::endl; - - // Search callback. - CallbackType callback; - { - std::lock_guard lk(mapWrapper.mutex); - auto it = mapWrapper.map.find(hash); - if (it == mapWrapper.map.end()) { - //assert(false); // callback not found - return; - } - callback = it->second; - } - - // Extract message and call callback. - JsonDocUPtr pDoc(new JsonDoc()); - pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator()); - callback(std::move(pDoc)); - return; - -} diff --git a/src/comm/ros_bridge/include/TopicSubscriber.h b/src/comm/ros_bridge/include/TopicSubscriber.h index 585e364771181201657952739438a5fea7b9653b..5aae3cd510da6b0cee4fde85c48a6cd384a29a18 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.h +++ b/src/comm/ros_bridge/include/TopicSubscriber.h @@ -5,20 +5,17 @@ #include "ros_bridge/include/TypeFactory.h" #include "ros_bridge/include/CasePacker.h" +#include + namespace ROSBridge { namespace ComPrivate { typedef std::function CallbackType; -struct MapStruct{ - typedef std::map Map; - Map map; - std::mutex mutex; -}; class TopicSubscriber { - typedef std::vector ClientList; + typedef std::unordered_map TopicMap; public: TopicSubscriber() = delete; @@ -34,7 +31,7 @@ public: //! @return Returns false if a subscription to this topic allready exists. //! //! @note Only one callback can be registered. - bool subscribe(const char* topic, const CallbackType &callback); + void subscribe(const char* topic, const CallbackType &callback); private: @@ -42,14 +39,9 @@ private: CasePacker &_casePacker; RosbridgeWsClient &_rbc; - MapStruct _callbackMapStruct; - ClientList _clientList; - bool _running; + TopicMap _topicMap; + std::shared_ptr _stopped; }; -void subscriberCallback(const HashType &hash, - MapStruct &mapWrapper, - std::shared_ptr /*connection*/, - std::shared_ptr in_message); } // namespace ComPrivate } // namespace ROSBridge diff --git a/src/comm/ros_bridge/src/ROSBridge.cpp b/src/comm/ros_bridge/src/ROSBridge.cpp index adac201da53bd77c40e6dd4cdb4ff3a0a4e17576..69b226b218bd5d4615f9909b47d42ab531a55636 100644 --- a/src/comm/ros_bridge/src/ROSBridge.cpp +++ b/src/comm/ros_bridge/src/ROSBridge.cpp @@ -1,7 +1,7 @@ #include "ros_bridge/include/ROSBridge.h" ROSBridge::ROSBridge::ROSBridge() : - _running(false) + _stopped(std::make_shared(true)) , _casePacker(&_typeFactory, &_jsonFactory) , _rbc("localhost:9090") , _topicPublisher(_casePacker, _rbc) @@ -38,7 +38,7 @@ void ROSBridge::ROSBridge::start() _topicPublisher.start(); _topicSubscriber.start(); _server.start(); - _running = true; + _stopped->store(false); } void ROSBridge::ROSBridge::reset() @@ -46,7 +46,7 @@ void ROSBridge::ROSBridge::reset() _topicPublisher.reset(); _topicSubscriber.reset(); _server.reset(); - _running = false; + _stopped->store(true); } bool ROSBridge::ROSBridge::ping() @@ -56,6 +56,6 @@ bool ROSBridge::ROSBridge::ping() bool ROSBridge::ROSBridge::isRunning() { - return _running; + return !_stopped->load(); }