Commit 585c0b82 authored by Valentin Platzgummer's avatar Valentin Platzgummer

RosBridgeClient::advertise mod

parent 4f76dc12
...@@ -47,6 +47,9 @@ class RosbridgeWsClient ...@@ -47,6 +47,9 @@ class RosbridgeWsClient
std::deque<TopicData> advertised_topics_list; std::deque<TopicData> advertised_topics_list;
std::mutex mutex; std::mutex mutex;
// Set to true on construction and to false on destruction. Used to notify independet threads.
std::shared_ptr<std::atomic_bool> alive;
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message) void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
{ {
if (!client->on_open) if (!client->on_open)
...@@ -109,7 +112,8 @@ class RosbridgeWsClient ...@@ -109,7 +112,8 @@ class RosbridgeWsClient
} }
public: public:
RosbridgeWsClient(const std::string& server_port_path) RosbridgeWsClient(const std::string& server_port_path) :
alive(std::make_shared<std::atomic_bool>(true))
{ {
this->server_port_path = server_port_path; this->server_port_path = server_port_path;
} }
...@@ -124,6 +128,7 @@ public: ...@@ -124,6 +128,7 @@ public:
{ {
removeClient(client.first); removeClient(client.first);
} }
alive->store(false);
} }
// The execution can take up to 100 ms! // The execution can take up to 100 ms!
...@@ -235,6 +240,50 @@ public: ...@@ -235,6 +240,50 @@ public:
#endif #endif
} }
//!
//! \brief Returns a string containing all advertised topics.
//! \return Returns a string containing all advertised topics.
//!
//! \note This function will wait until the /rosapi/topics service is available.
//! \note Call connected() in advance to ensure that a connection was established.
//!
std::string get_advertised_topics(){
this->waitForService("/rosapi/topics");
std::shared_ptr<std::promise<std::string>> promise;
auto future = promise->get_future();
// Call /rosapi/topics to see if topic is already available.
this->callService("/rosapi/topics", [promise](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
promise->set_value(in_message->string());
connection->send_close(1000);
});
future.wait();
return future.get();
}
//!
//! \brief Returns a string containing all advertised services.
//! \return Returns a string containing all advertised services.
//!
//! \note This function will wait until the /rosapi/services service is available.
//! \note Call connected() in advance to ensure that a connection was established.
//!
std::string get_advertised_services(){
this->waitForService("/rosapi/services");
std::shared_ptr<std::promise<std::string>> promise;
auto future = promise->get_future();
// Call /rosapi/services to see if topic is already available.
this->callService("/rosapi/services", [promise](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
promise->set_value(in_message->string());
connection->send_close(1000);
});
future.wait();
return future.get();
}
// Gets the client from client_map and starts it. Advertising is essentially sending a message. // Gets the client from client_map and starts it. Advertising is essentially sending a message.
// One client per topic! // One client per topic!
void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
...@@ -266,9 +315,9 @@ public: ...@@ -266,9 +315,9 @@ public:
message = "{" + message + "}"; message = "{" + message + "}";
#ifdef DEBUG #ifdef DEBUG
client->on_open = [client_name, message, ready](std::shared_ptr<WsClient::Connection> connection) { client->on_open = [this, topic, message, ready, client_name](std::shared_ptr<WsClient::Connection> connection) {
#else #else
client->on_open = [message, ready](std::shared_ptr<WsClient::Connection> connection) { client->on_open = [this, topic, message, ready](std::shared_ptr<WsClient::Connection> connection) {
#endif #endif
#ifdef DEBUG #ifdef DEBUG
...@@ -276,11 +325,22 @@ public: ...@@ -276,11 +325,22 @@ public:
std::cout << client_name << ": Sending message: " << message << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl;
#endif #endif
connection->send(message); connection->send(message);
// Wait for rosbridge_server to process the request and mark topic as "ready". // Now wait until topic is successfully advertised.
// This could be avoided by waiting for a status message. However, at the time of while (true){
// writing rosbridge_server status messages are still experimental. std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::string advertised_topics = this->get_advertised_topics();
ready->store(true); auto pos = advertised_topics.find(topic);
if (pos != std::string::npos){ // topic advertised!
#ifdef DEBUG
std::cout << client_name << ": topic " << topic << "successfully advertised" << std::endl;
#endif
break;
}
#ifdef DEBUG
std::cout << client_name << ": waiting for topic " << topic << "to be advertised" << std::endl;
#endif
}
ready->store(true); // Mark topic as successfully advertised.
}; };
start(client_name, client, message); start(client_name, client, message);
...@@ -386,8 +446,8 @@ public: ...@@ -386,8 +446,8 @@ public:
std::cout << client_name << ": Sending message." << std::endl; std::cout << client_name << ": Sending message." << std::endl;
//std::cout << client_name << ": Sending message: " << message << std::endl; //std::cout << client_name << ": Sending message: " << message << std::endl;
#endif #endif
while ( !ready->load() ){ // Wait for the topic to be advertised. while ( !ready->load() ){ // Wait for the topic to be successfully advertised.
std::this_thread::sleep_for(std::chrono::milliseconds(500)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
connection->send(message); connection->send(message);
...@@ -430,10 +490,34 @@ public: ...@@ -430,10 +490,34 @@ public:
{ {
message += ", \"compression\":\"" + compression + "\""; message += ", \"compression\":\"" + compression + "\"";
} }
message = "{" + message + "}"; message = "{" + message + "}";
auto client = it->second;
client->on_message = callback;
const auto cAlive = this->alive;
// Start thread to wait for topic to be advertised.
std::thread t([this, client_name, client, message, cAlive, topic](){
while (cAlive->load()){ // RosBridgeClient alive?
auto advertised_topics = this->get_advertised_topics();
auto pos = advertised_topics.find(topic);
if (pos != std::string::npos){ // desired topic available!
#ifdef DEBUG
std::cout << client_name << ": topic " << topic << "available" << std::endl;
#endif
break;
}
#ifdef DEBUG
std::cout << client_name << ": waiting for topic " << topic << std::endl;
#endif
std::this_thread::sleep_for(std::chrono::seconds(1));
}
it->second->on_message = callback; if ( cAlive->load() ){
start(client_name, it->second, message); this->start(client_name, client, message); // subscribe to topic
}
});
t.detach();
} }
#ifdef DEBUG #ifdef DEBUG
else else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment