diff --git a/src/comm/ros_bridge/include/RosBridgeClient.h b/src/comm/ros_bridge/include/RosBridgeClient.h index 3d4ac6c1d3a1450cfa947e82711cd2f44bd2e470..ae1aeff3685431d6e78a6ea4f9bce90d8cd34c17 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.h +++ b/src/comm/ros_bridge/include/RosBridgeClient.h @@ -47,6 +47,9 @@ class RosbridgeWsClient std::deque advertised_topics_list; std::mutex mutex; + // Set to true on construction and to false on destruction. Used to notify independet threads. + std::shared_ptr alive; + void start(const std::string& client_name, std::shared_ptr client, const std::string& message) { if (!client->on_open) @@ -109,7 +112,8 @@ class RosbridgeWsClient } public: - RosbridgeWsClient(const std::string& server_port_path) + RosbridgeWsClient(const std::string& server_port_path) : + alive(std::make_shared(true)) { this->server_port_path = server_port_path; } @@ -124,6 +128,7 @@ public: { removeClient(client.first); } + alive->store(false); } // The execution can take up to 100 ms! @@ -235,6 +240,50 @@ public: #endif } + //! + //! \brief Returns a string containing all advertised topics. + //! \return Returns a string containing all advertised topics. + //! + //! \note This function will wait until the /rosapi/topics service is available. + //! \note Call connected() in advance to ensure that a connection was established. + //! + std::string get_advertised_topics(){ + this->waitForService("/rosapi/topics"); + std::shared_ptr> promise; + auto future = promise->get_future(); + // Call /rosapi/topics to see if topic is already available. + this->callService("/rosapi/topics", [promise]( + std::shared_ptr connection, + std::shared_ptr in_message){ + promise->set_value(in_message->string()); + connection->send_close(1000); + }); + future.wait(); + return future.get(); + } + + //! + //! \brief Returns a string containing all advertised services. + //! \return Returns a string containing all advertised services. + //! + //! \note This function will wait until the /rosapi/services service is available. + //! \note Call connected() in advance to ensure that a connection was established. + //! + std::string get_advertised_services(){ + this->waitForService("/rosapi/services"); + std::shared_ptr> promise; + auto future = promise->get_future(); + // Call /rosapi/services to see if topic is already available. + this->callService("/rosapi/services", [promise]( + std::shared_ptr connection, + std::shared_ptr in_message){ + promise->set_value(in_message->string()); + connection->send_close(1000); + }); + future.wait(); + return future.get(); + } + // Gets the client from client_map and starts it. Advertising is essentially sending a message. // One client per topic! void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") @@ -266,9 +315,9 @@ public: message = "{" + message + "}"; #ifdef DEBUG - client->on_open = [client_name, message, ready](std::shared_ptr connection) { + client->on_open = [this, topic, message, ready, client_name](std::shared_ptr connection) { #else - client->on_open = [message, ready](std::shared_ptr connection) { + client->on_open = [this, topic, message, ready](std::shared_ptr connection) { #endif #ifdef DEBUG @@ -276,11 +325,22 @@ public: std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); - // Wait for rosbridge_server to process the request and mark topic as "ready". - // This could be avoided by waiting for a status message. However, at the time of - // writing rosbridge_server status messages are still experimental. - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - ready->store(true); + // Now wait until topic is successfully advertised. + while (true){ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::string advertised_topics = this->get_advertised_topics(); + auto pos = advertised_topics.find(topic); + if (pos != std::string::npos){ // topic advertised! +#ifdef DEBUG + std::cout << client_name << ": topic " << topic << "successfully advertised" << std::endl; +#endif + break; + } +#ifdef DEBUG + std::cout << client_name << ": waiting for topic " << topic << "to be advertised" << std::endl; +#endif + } + ready->store(true); // Mark topic as successfully advertised. }; start(client_name, client, message); @@ -386,8 +446,8 @@ public: std::cout << client_name << ": Sending message." << std::endl; //std::cout << client_name << ": Sending message: " << message << std::endl; #endif - while ( !ready->load() ){ // Wait for the topic to be advertised. - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + while ( !ready->load() ){ // Wait for the topic to be successfully advertised. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); } connection->send(message); @@ -430,10 +490,34 @@ public: { message += ", \"compression\":\"" + compression + "\""; } - message = "{" + message + "}"; + message = "{" + message + "}"; + + auto client = it->second; + client->on_message = callback; + const auto cAlive = this->alive; + // Start thread to wait for topic to be advertised. + std::thread t([this, client_name, client, message, cAlive, topic](){ + while (cAlive->load()){ // RosBridgeClient alive? + auto advertised_topics = this->get_advertised_topics(); + auto pos = advertised_topics.find(topic); + if (pos != std::string::npos){ // desired topic available! +#ifdef DEBUG + std::cout << client_name << ": topic " << topic << "available" << std::endl; +#endif + break; + } +#ifdef DEBUG + std::cout << client_name << ": waiting for topic " << topic << std::endl; +#endif + std::this_thread::sleep_for(std::chrono::seconds(1)); + } - it->second->on_message = callback; - start(client_name, it->second, message); + if ( cAlive->load() ){ + this->start(client_name, client, message); // subscribe to topic + } + }); + + t.detach(); } #ifdef DEBUG else