#include "RosBridgeClient.h" #include #include #include #include #include struct Task { std::function ready; // Condition under which command should be executed. std::function execute; // Command to execute. std::function expired; // Returns true if the command is expired. std::function clear_up; // operation to perform if task expired. std::string name; }; void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shared_ptr client, const std::__cxx11::string &message) { #ifndef ROS_BRIDGE_DEBUG (void)client_name; #endif if (!client->on_open) { #ifdef ROS_BRIDGE_DEBUG client->on_open = [client_name, message]( std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); }; } #ifdef ROS_BRIDGE_DEBUG if (!client->on_message) { client->on_message = [client_name](std::shared_ptr /*connection*/, std::shared_ptr in_message) { std::cout << client_name << ": Message received: " << in_message->string() << std::endl; }; } if (!client->on_close) { client->on_close = [client_name](std::shared_ptr /*connection*/, int status, const std::string & /*reason*/) { std::cout << client_name << ": Closed connection with status code " << status << std::endl; }; } if (!client->on_error) { // See // http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, // Error Codes for error code meanings client->on_error = [client_name](std::shared_ptr /*connection*/, const SimpleWeb::error_code &ec) { std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl; }; } #endif #ifdef ROS_BRIDGE_DEBUG std::thread client_thread([client_name, client]() { #else std::thread client_thread([client]() { #endif client->start(); #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Terminated" << std::endl; #endif client->on_open = NULL; client->on_message = NULL; client->on_close = NULL; client->on_error = NULL; #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << " thread end" << std::endl; #endif }); client_thread.detach(); } RosbridgeWsClient::RosbridgeWsClient(const std::string &server_port_path, bool run) : server_port_path(server_port_path), isConnected(std::make_shared(false)), stopped(std::make_shared(true)) { if (run) this->run(); } RosbridgeWsClient::RosbridgeWsClient( const std::__cxx11::string &server_port_path) : RosbridgeWsClient::RosbridgeWsClient(server_port_path, true) {} RosbridgeWsClient::~RosbridgeWsClient() { reset(); } bool RosbridgeWsClient::connected() { return isConnected->load(); } void RosbridgeWsClient::run() { if (!stopped->load()) return; stopped->store(false); // Start periodic thread to monitor connection status, advertised topics etc. periodic_thread = std::make_shared([this] { std::list task_list; constexpr auto poll_interval = std::chrono::seconds(1); auto poll_time_point = std::chrono::high_resolution_clock::now(); while (!this->stopped->load()) { // ==================================================================================== // Add tasks. // ==================================================================================== if (std::chrono::high_resolution_clock::now() > poll_time_point) { poll_time_point = std::chrono::high_resolution_clock::now() + poll_interval; std::string reset_status_task_name = "reset_status_task"; // Add status task if necessary. auto const it = std::find_if(task_list.begin(), task_list.end(), [&reset_status_task_name](const Task &t) { return t.name == reset_status_task_name; }); if (it == task_list.end()) { // Check connection status. auto status_set = std::make_shared(false); auto status_client = std::make_shared(this->server_port_path); status_client->on_open = [status_set, this](std::shared_ptr) { this->isConnected->store(true); status_set->store(true); }; std::thread status_thread( [status_client] { status_client->start(); }); status_thread.detach(); // Create task to reset isConnected after one second. Task reset_task; reset_task.name = reset_status_task_name; // condition auto now = std::chrono::high_resolution_clock::now(); const auto t_trigger = now + std::chrono::seconds(1); reset_task.ready = [t_trigger] { return std::chrono::high_resolution_clock::now() > t_trigger; }; // command reset_task.execute = [status_client, status_set, this] { status_client->stop(); this->isConnected->store(false); status_set->store(true); }; // expired reset_task.expired = [status_set] { return status_set->load(); }; // clear up reset_task.clear_up = [status_client, this] { status_client->stop(); }; task_list.push_back(reset_task); } if (this->isConnected->load()) { // Add available topics task if neccessary. std::string reset_topics_task_name = "reset_topics_task"; auto const topics_it = std::find_if(task_list.begin(), task_list.end(), [&reset_topics_task_name](const Task &t) { return t.name == reset_topics_task_name; }); if (topics_it == task_list.end()) { // Call /rosapi/topics service. auto topics_set = std::make_shared(false); this->callService( "/rosapi/topics", [topics_set, this](std::shared_ptr connection, std::shared_ptr in_message) { std::unique_lock lk(this->mutex); this->available_topics = in_message->string(); lk.unlock(); topics_set->store(true); connection->send_close(1000); }); // Create task to reset topics after one second. Task reset_task; reset_task.name = reset_topics_task_name; // condition auto now = std::chrono::high_resolution_clock::now(); auto t_trigger = now + std::chrono::seconds(1); reset_task.ready = [t_trigger] { return std::chrono::high_resolution_clock::now() > t_trigger; }; // command reset_task.execute = [topics_set, this] { std::unique_lock lk(this->mutex); this->available_topics.clear(); lk.unlock(); topics_set->store(true); }; // expired reset_task.expired = [topics_set] { return topics_set->load(); }; // clear up reset_task.clear_up = [this] { return; }; task_list.push_back(reset_task); } // Add available services task if neccessary. std::string reset_services_name = "reset_services_task"; auto const services_it = std::find_if(task_list.begin(), task_list.end(), [&reset_services_name](const Task &t) { return t.name == reset_services_name; }); if (services_it == task_list.end()) { // Call /rosapi/services service. auto services_set = std::make_shared(false); this->callService( "/rosapi/services", [this, services_set]( std::shared_ptr connection, std::shared_ptr in_message) { std::unique_lock lk(this->mutex); this->available_services = in_message->string(); lk.unlock(); services_set->store(true); connection->send_close(1000); }); // Create task to reset services after one second. Task reset_task; reset_task.name = reset_services_name; // condition auto now = std::chrono::high_resolution_clock::now(); auto t_trigger = now + std::chrono::seconds(1); reset_task.ready = [t_trigger] { return std::chrono::high_resolution_clock::now() > t_trigger; }; // command reset_task.execute = [services_set, this] { std::unique_lock lk(this->mutex); this->available_services.clear(); lk.unlock(); services_set->store(true); }; // expired reset_task.expired = [services_set] { return services_set->load(); }; // clear up reset_task.clear_up = [this] { return; }; task_list.push_back(reset_task); } } else { std::lock_guard lk(mutex); available_topics.clear(); available_services.clear(); } } // ==================================================================================== // Process tasks. // ==================================================================================== for (auto task_it = task_list.begin(); task_it != task_list.end();) { if (!task_it->expired()) { if (task_it->ready()) { task_it->execute(); task_it = task_list.erase(task_it); } else { ++task_it; } } else { task_it->clear_up(); task_it = task_list.erase(task_it); } } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } // Clear up remaining tasks. for (auto task_it = task_list.begin(); task_it != task_list.end(); ++task_it) { task_it->clear_up(); } task_list.clear(); #ifdef ROS_BRIDGE_DEBUG std::cout << "periodic thread end" << std::endl; #endif }); } bool RosbridgeWsClient::running() { return !this->stopped->load(); } void RosbridgeWsClient::stop() { if (stopped->load()) return; stopped->store(true); periodic_thread->join(); } void RosbridgeWsClient::reset() { stop(); unsubscribeAll(); unadvertiseAll(); unadvertiseAllServices(); std::unique_lock lk(mutex); for (auto it = client_map.begin(); it != client_map.end();) { std::string client_name = it->first; lk.unlock(); removeClient(client_name); lk.lock(); it = client_map.begin(); } } void RosbridgeWsClient::addClient(const std::string &client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it == client_map.end()) { client_map[client_name] = std::make_shared(server_port_path); } #ifdef ROS_BRIDGE_DEBUG else { std::cerr << client_name << " has already been created" << std::endl; } #endif } std::shared_ptr RosbridgeWsClient::getClient(const std::string &client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { return it->second; } return NULL; } void RosbridgeWsClient::stopClient(const std::string &client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { // Stop the client asynchronously in 100 ms. // This is to ensure, that all threads involving the client have been // launched. std::shared_ptr client = it->second; #ifdef ROS_BRIDGE_DEBUG std::thread t([client, client_name]() { #else std::thread t([client]() { #endif std::this_thread::sleep_for(std::chrono::milliseconds(100)); client->stop(); // The next lines of code seem to cause a double free or corruption error, // why? // client->on_open = NULL; // client->on_message = NULL; // client->on_close = NULL; // client->on_error = NULL; #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << " has been removed" << std::endl; #endif }); t.detach(); } #ifdef ROS_BRIDGE_DEBUG else { std::cerr << client_name << " has not been created" << std::endl; } #endif } void RosbridgeWsClient::removeClient(const std::string &client_name) { stopClient(client_name); { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { client_map.erase(it); } #ifdef ROS_BRIDGE_DEBUG else { std::cerr << client_name << " has not been created" << std::endl; } #endif } } std::string RosbridgeWsClient::getAdvertisedTopics() { std::lock_guard lk(mutex); return available_topics; } std::string RosbridgeWsClient::getAdvertisedServices() { std::lock_guard lk(mutex); return available_services; } bool RosbridgeWsClient::topicAvailable(const std::string &topic) { std::lock_guard lk(mutex); #ifdef ROS_BRIDGE_DEBUG std::cout << "checking if topic " << topic << " is available" << std::endl; std::cout << "available topics: " << available_topics << std::endl; #endif size_t pos; { pos = available_topics.find(topic); } bool ret = pos != std::string::npos ? true : false; #ifdef ROS_BRIDGE_DEBUG if (ret) { std::cout << "topic " << topic << " is available" << std::endl; } else { std::cout << "topic " << topic << " is not available" << std::endl; } #endif return ret; } void RosbridgeWsClient::advertise(const std::string &client_name, const std::string &topic, const std::string &type, const std::string &id) { std::lock_guard lk(mutex); std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td) { return topic == std::get(td); }); if (it_ser_top != service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "topic: " << topic << " already advertised" << std::endl; #endif return; } auto client = it_client->second; std::weak_ptr wpClient = client; service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedTopic, topic, client_name, wpClient)); std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message = "{" + message + "}"; #ifdef ROS_BRIDGE_DEBUG client->on_open = [this, topic, message, client_name]( std::shared_ptr connection) { #else client->on_open = [this, topic, message]( std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); }; start(client_name, client, message); } #ifdef ROS_BRIDGE_DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void RosbridgeWsClient::unadvertise(const std::string &topic, const std::string &id) { std::lock_guard lk(mutex); auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [&topic](const EntryData &td) { return topic == std::get(td); }); if (it_ser_top == service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "topic: " << topic << " not advertised" << std::endl; #endif return; } std::string message = "\"op\":\"unadvertise\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message += ", \"topic\":\"" + topic + "\""; message = "{" + message + "}"; std::string client_name = "topic_unadvertiser" + topic; auto client = std::make_shared(server_port_path); #ifdef ROS_BRIDGE_DEBUG client->on_open = [client_name, message]( std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // unadvertise connection->send_close(1000); }; start(client_name, client, message); service_topic_list.erase(it_ser_top); } void RosbridgeWsClient::unadvertiseAll() { for (auto entry : service_topic_list) { if (std::get(entry) == EntryType::AdvertisedTopic) { unadvertise(std::get(entry)); } } } void RosbridgeWsClient::publish(const std::string &topic, const rapidjson::Document &msg, const std::string &id) { std::lock_guard lk(mutex); auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [&topic](const EntryData &td) { return topic == std::get(td); }); if (it_ser_top == service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "topic: " << topic << " not yet advertised" << std::endl; #endif return; } rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); msg.Accept(writer); std::string client_name = "publish_client" + topic; std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString(); if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message = "{" + message + "}"; std::shared_ptr publish_client = std::make_shared(server_port_path); #ifdef ROS_BRIDGE_DEBUG publish_client->on_open = [message, client_name](std::shared_ptr connection) { #else publish_client->on_open = [message, client_name](std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message." << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // TODO: This could be improved by creating a thread to keep publishing // the message instead of closing it right away connection->send_close(1000); }; start(client_name, publish_client, message); } void RosbridgeWsClient::subscribe(const std::string &client_name, const std::string &topic, const InMessage &callback, const std::string &id, const std::string &type, int throttle_rate, int queue_length, int fragment_size, const std::string &compression) { std::lock_guard lk(mutex); std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td) { return topic == std::get(td); }); if (it_ser_top != service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "topic: " << topic << " already advertised" << std::endl; #endif return; } auto client = it_client->second; std::weak_ptr wpClient = client; service_topic_list.push_back(std::make_tuple(EntryType::SubscribedTopic, topic, client_name, wpClient)); std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } if (type.compare("") != 0) { message += ", \"type\":\"" + type + "\""; } if (throttle_rate > -1) { message += ", \"throttle_rate\":" + std::to_string(throttle_rate); } if (queue_length > -1) { message += ", \"queue_length\":" + std::to_string(queue_length); } if (fragment_size > -1) { message += ", \"fragment_size\":" + std::to_string(fragment_size); } if (compression.compare("none") == 0 || compression.compare("png") == 0) { message += ", \"compression\":\"" + compression + "\""; } message = "{" + message + "}"; client->on_message = callback; this->start(client_name, client, message); // subscribe to topic } #ifdef ROS_BRIDGE_DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void RosbridgeWsClient::unsubscribe(const std::string &topic, const std::string &id) { std::lock_guard lk(mutex); auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td) { return topic == std::get(td); }); if (it_ser_top == service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "topic: " << topic << " not advertised" << std::endl; #endif return; } std::string message = "\"op\":\"unsubscribe\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message += ", \"topic\":\"" + topic + "\""; message = "{" + message + "}"; std::string client_name = "topic_unsubscriber" + topic; auto client = std::make_shared(server_port_path); #ifdef ROS_BRIDGE_DEBUG client->on_open = [client_name, message]( std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // unadvertise connection->send_close(1000); }; start(client_name, client, message); service_topic_list.erase(it_ser_top); } void RosbridgeWsClient::unsubscribeAll() { for (auto entry : service_topic_list) { if (std::get(entry) == EntryType::SubscribedTopic) { unsubscribe(std::get(entry)); } } } void RosbridgeWsClient::advertiseService(const std::string &client_name, const std::string &service, const std::string &type, const InMessage &callback) { std::lock_guard lk(mutex); std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [service](const EntryData &td) { return service == std::get(td); }); if (it_ser_top != service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "service: " << service << " already advertised" << std::endl; #endif return; } auto client = it_client->second; std::weak_ptr wpClient = client; service_topic_list.push_back(std::make_tuple( EntryType::AdvertisedService, service, client_name, wpClient)); std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}"; it_client->second->on_message = callback; start(client_name, it_client->second, message); } #ifdef ROS_BRIDGE_DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void RosbridgeWsClient::unadvertiseService(const std::string &service) { std::lock_guard lk(mutex); auto it_ser_top = std::find_if( service_topic_list.begin(), service_topic_list.end(), [service](const EntryData &td) { return service == std::get(td); }); if (it_ser_top == service_topic_list.end()) { #ifdef ROS_BRIDGE_DEBUG std::cerr << "service: " << service << " not advertised" << std::endl; #endif return; } std::string message = "\"op\":\"unadvertise_service\""; message += ", \"service\":\"" + service + "\""; message = "{" + message + "}"; std::string client_name = "service_unadvertiser" + service; auto client = std::make_shared(server_port_path); #ifdef ROS_BRIDGE_DEBUG client->on_open = [client_name, message]( std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // unadvertise connection->send_close(1000); }; start(client_name, client, message); service_topic_list.erase(it_ser_top); } void RosbridgeWsClient::unadvertiseAllServices() { for (auto entry : service_topic_list) { if (std::get(entry) == EntryType::AdvertisedService) { unadvertiseService( std::get(entry)); } } } void RosbridgeWsClient::serviceResponse(const std::string &service, const std::string &id, bool result, const rapidjson::Document &values) { std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result) ? "true" : "false"); // Rosbridge somehow does not allow service_response to be sent without id and // values , so we cannot omit them even though the documentation says they are // optional. message += ", \"id\":\"" + id + "\""; // Convert JSON document to string rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); values.Accept(writer); message += ", \"values\":" + std::string(strbuf.GetString()); message = "{" + message + "}"; std::string client_name = "service_response_client" + service; std::shared_ptr service_response_client = std::make_shared(server_port_path); #ifdef ROS_BRIDGE_DEBUG service_response_client->on_open = [message, client_name](std::shared_ptr connection) { #else service_response_client->on_open = [message](std::shared_ptr connection) { #endif #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); connection->send_close(1000); }; start(client_name, service_response_client, message); } void RosbridgeWsClient::callService(const std::string &service, const InMessage &callback, const rapidjson::Document &args, const std::string &id, int fragment_size, const std::string &compression) { std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\""; if (!args.IsNull()) { rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); args.Accept(writer); message += ", \"args\":" + std::string(strbuf.GetString()); } if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } if (fragment_size > -1) { message += ", \"fragment_size\":" + std::to_string(fragment_size); } if (compression.compare("none") == 0 || compression.compare("png") == 0) { message += ", \"compression\":\"" + compression + "\""; } message = "{" + message + "}"; std::string client_name = "call_service_client" + service; std::shared_ptr call_service_client = std::make_shared(server_port_path); if (callback) { call_service_client->on_message = callback; } else { call_service_client->on_message = [client_name](std::shared_ptr connection, std::shared_ptr in_message) { #ifdef ROS_BRIDGE_DEBUG std::cout << client_name << ": Message received: " << in_message->string() << std::endl; std::cout << client_name << ": Sending close connection" << std::endl; #else (void)in_message; #endif connection->send_close(1000); }; } start(client_name, call_service_client, message); } bool RosbridgeWsClient::serviceAvailable(const std::string &service) { #ifdef ROS_BRIDGE_DEBUG std::cout << "checking if service " << service << " is available" << std::endl; #endif size_t pos; { std::lock_guard lk(mutex); pos = available_services.find(service); } bool ret = pos != std::string::npos ? true : false; #ifdef ROS_BRIDGE_DEBUG if (ret) { std::cout << "service " << service << " is available" << std::endl; } else { std::cout << "service " << service << " is not available" << std::endl; } #endif return ret; } void RosbridgeWsClient::waitForService(const std::string &service) { waitForService(service, [] { return false; // never stop }); } void RosbridgeWsClient::waitForService(const std::string &service, const std::function stop) { #ifdef ROS_BRIDGE_DEBUG auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif const auto poll_interval = std::chrono::milliseconds(1000); auto poll_time_point = std::chrono::high_resolution_clock::now(); while (!stop()) { if (std::chrono::high_resolution_clock::now() > poll_time_point) { #ifdef ROS_BRIDGE_DEBUG ++counter; #endif if (serviceAvailable(service)) { break; } poll_time_point = std::chrono::high_resolution_clock::now() + poll_interval; } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } }; #ifdef ROS_BRIDGE_DEBUG auto e = std::chrono::high_resolution_clock::now(); std::cout << "waitForService() " << service << " time: " << std::chrono::duration_cast(e - s).count() << " ms." << std::endl; std::cout << "waitForTopic() " << service << ": number of calls to topicAvailable: " << counter << std::endl; #endif } void RosbridgeWsClient::waitForTopic(const std::string &topic) { waitForTopic(topic, [] { return false; // never stop }); } void RosbridgeWsClient::waitForTopic(const std::string &topic, const std::function stop) { #ifdef ROS_BRIDGE_DEBUG auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif const auto poll_interval = std::chrono::milliseconds(1000); auto poll_time_point = std::chrono::high_resolution_clock::now(); while (!stop()) { if (std::chrono::high_resolution_clock::now() > poll_time_point) { #ifdef ROS_BRIDGE_DEBUG ++counter; #endif if (topicAvailable(topic)) { break; } poll_time_point = std::chrono::high_resolution_clock::now() + poll_interval; } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); } }; #ifdef ROS_BRIDGE_DEBUG auto e = std::chrono::high_resolution_clock::now(); std::cout << "waitForTopic() " << topic << " time: " << std::chrono::duration_cast(e - s).count() << " ms." << std::endl; std::cout << "waitForTopic() " << topic << ": number of calls to topicAvailable: " << counter << std::endl; #endif } bool is_valid_port_path(std::string server_port_path) { std::regex url_regex( "^(((([a-z]|[A-z])+([0-9]|_)*\\.*([a-z]|[A-z])+([0-9]|_)*))" "|(((1?[0-9]{1,2}|2[0-4][0-9]|25[0-5])\\.){3}(1?[0-9]{1,2}|2[0-4][0-9]|" "25[0-5]){1}))" ":[0-9]+$"); return std::regex_match(server_port_path, url_regex); }