From 46b4c65d789be12fecd42fecb535524c9025eee2 Mon Sep 17 00:00:00 2001 From: Valentin Platzgummer Date: Tue, 18 Aug 2020 17:27:41 +0200 Subject: [PATCH] RosBridgeClient imporved --- .../ros_bridge/include/RosBridgeClient.cpp | 291 ++++++++++++------ src/comm/ros_bridge/include/RosBridgeClient.h | 14 +- 2 files changed, 200 insertions(+), 105 deletions(-) diff --git a/src/comm/ros_bridge/include/RosBridgeClient.cpp b/src/comm/ros_bridge/include/RosBridgeClient.cpp index 70cafb1b1..84de696fe 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.cpp +++ b/src/comm/ros_bridge/include/RosBridgeClient.cpp @@ -7,10 +7,11 @@ struct Task{ - std::function condition; // Condition under which command should be executed. - std::function command; // Command to execute. + std::function ready; // Condition under which command should be executed. + std::function execute; // Command to execute. std::function expired; // Returns true if the command is expired. std::function clear_up; // operation to perform if task expired. + std::string name; }; void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shared_ptr client, const std::__cxx11::string &message) @@ -83,104 +84,208 @@ void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shar RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_path) : server_port_path(server_port_path) + , isConnected(std::make_shared(false)) , stopped(std::make_shared(false)) - , hasConnection(std::make_shared(false)) { - // Start poll thread to monitor connection status, advertised topics etc. - pPoll_thread = std::make_shared ([this] { - const auto poll_interval = std::chrono::milliseconds(1000); + // Start periodic thread to monitor connection status, advertised topics etc. + periodic_thread = std::make_shared ([this] { + std::list task_list; + constexpr auto poll_interval = std::chrono::seconds(1); + auto poll_time_point = std::chrono::high_resolution_clock::now(); while ( !this->stopped->load() ) { - auto start = std::chrono::high_resolution_clock::now(); - auto pPromise_status = std::make_shared>(); - - // Check connection status. - auto future_status = pPromise_status->get_future(); - - auto status_client = std::make_shared(this->server_port_path); - status_client->on_open = [pPromise_status](std::shared_ptr) { - pPromise_status->set_value(); - }; - - std::thread status_thread([status_client]{ - status_client->start(); - status_client->on_open = NULL; - status_client->on_message = NULL; - status_client->on_close = NULL; - status_client->on_error = NULL; - }); - - std::list, /*condition*/ - std::function, /*command*/ - std::function /*clear_up*/ - >> to_do_list; - - auto pConnection_task = std::make_shared; - pConnection_task->condition = [&future_status]{ - return future_status.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - }; - pConnection_task->command = [&status_client, &status_thread, &future_status, this]{ - status_client->stop(); - status_thread.join(); - auto status = future_status.wait_for(std::chrono::milliseconds(0)); - this->hasConnection->store(status == std::future_status::ready); - }; - auto now = std::chrono::high_resolution_clock::now(); - auto t_expire = now + std::chrono::seconds(1); - pConnection_task->expired = [t_expire]{ - return std::chrono::high_resolution_clock::now() > t_expire; - } - pConnection_task->clear_up = []{ - - } - to_do_list.push_back(pConnection_task); - - std::future future_topics; - std::future future_services; - if ( this->hasConnection->load() ){ - // Fetch available topics - auto pPromise_topics(std::make_shared>()); - future_topics = pPromise_topics->get_future(); - this->callService("/rosapi/topics", [pPromise_topics]( - std::shared_ptr connection, - std::shared_ptr in_message){ - pPromise_topics->set_value(in_message->string()); - connection->send_close(1000); + // ==================================================================================== + // Add tasks. + // ==================================================================================== + if ( std::chrono::high_resolution_clock::now() > poll_time_point) { + poll_time_point = std::chrono::high_resolution_clock::now() + poll_interval; + std::cout << "Starting new poll." << std::endl; + std::cout << "connected: " << this->isConnected->load() << std::endl; + std::string reset_status_task_name = "reset_status_task"; + // Add status task if necessary. + auto const it = std::find_if(task_list.begin(), task_list.end(), + [&reset_status_task_name](const Task &t){ + return t.name == reset_status_task_name; }); - condition = [&future_topics]{ - return future_topics.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - }; - command = [&future_topics, this]{ - std::lock_guard lk(this->mutex); - this->available_topics = future_topics.get(); + if ( it == task_list.end() ){ + std::cout << "Adding status_task" << std::endl; + // Check connection status. + auto status_set = std::make_shared(false); + auto status_client = std::make_shared(this->server_port_path); + status_client->on_open = [status_set, this](std::shared_ptr) { + std::cout << "status_client opened" << std::endl; + this->isConnected->store(true); + status_set->store(true); + }; + + std::thread status_thread([status_client]{ + status_client->start(); + status_client->on_open = NULL; + status_client->on_message = NULL; + status_client->on_close = NULL; + status_client->on_error = NULL; + }); + status_thread.detach(); + + // Create task to reset isConnected after one second. + Task reset_task; + reset_task.name = reset_status_task_name; + // condition + auto now = std::chrono::high_resolution_clock::now(); + const auto t_trigger = now + std::chrono::seconds(1); + reset_task.ready = [t_trigger]{ + return std::chrono::high_resolution_clock::now() > t_trigger; + }; + // command + reset_task.execute = [status_client, status_set, this]{ + status_client->stop(); + this->isConnected->store(false); + status_set->store(true); + }; + // expired + reset_task.expired = [status_set]{ + return status_set->load(); + }; + // clear up + reset_task.clear_up = [status_client, this]{ + status_client->stop(); + }; + task_list.push_back(reset_task); } - to_do_list.push_back(std::make_pair(condition, command)); - - // Fetch available services - auto pPromise_services(std::make_shared>()); - future_services = pPromise_services->get_future(); - this->callService("/rosapi/services", [pPromise_services]( - std::shared_ptr connection, - std::shared_ptr in_message){ - pPromise_services->set_value(in_message->string()); - connection->send_close(1000); - }); - condition = [&future_services]{ - return future_services.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - }; - command = [&future_services, this]{ - std::lock_guard lk(this->mutex); - this->available_services = future_services.get(); + + if ( this->isConnected->load() ){ + // Add available topics task if neccessary. + std::string reset_topics_task_name = "reset_topics_task"; + auto const topics_it = std::find_if(task_list.begin(), task_list.end(), [&reset_topics_task_name](const Task &t){ + return t.name == reset_topics_task_name; + }); + if ( topics_it == task_list.end() ){ + // Call /rosapi/topics service. + std::cout << "Adding reset_topics_task" << std::endl; + auto topics_set = std::make_shared(false); + this->callService("/rosapi/topics", [topics_set, this]( + std::shared_ptr connection, + std::shared_ptr in_message){ + std::cout << "/rosapi/topics: " << in_message->string() << std::endl; + std::unique_lock lk(this->mutex); + this->available_topics = in_message->string(); + lk.unlock(); + topics_set->store(true); + connection->send_close(1000); + }); + + // Create task to reset topics after one second. + Task reset_task; + reset_task.name = reset_topics_task_name; + // condition + auto now = std::chrono::high_resolution_clock::now(); + auto t_trigger = now + std::chrono::seconds(1); + reset_task.ready = [t_trigger]{ + return std::chrono::high_resolution_clock::now() > t_trigger; + }; + // command + reset_task.execute = [topics_set, this]{ + std::unique_lock lk(this->mutex); + this->available_topics.clear(); + lk.unlock(); + topics_set->store(true); + }; + // expired + reset_task.expired = [topics_set]{ + return topics_set->load(); + }; + // clear up + reset_task.clear_up = [this]{ + return; + }; + task_list.push_back(reset_task); + } + + // Add available services task if neccessary. + std::string reset_services_name = "reset_services_task"; + auto const services_it = std::find_if(task_list.begin(), task_list.end(), [&reset_services_name](const Task &t){ + return t.name == reset_services_name; + }); + if ( services_it == task_list.end() ){ + // Call /rosapi/services service. + std::cout << "Adding reset_services_task" << std::endl; + auto services_set = std::make_shared(false); + this->callService("/rosapi/services", [this, services_set]( + std::shared_ptr connection, + std::shared_ptr in_message){ + std::cout << "/rosapi/services: " << in_message->string() << std::endl; + std::unique_lock lk(this->mutex); + this->available_services = in_message->string(); + lk.unlock(); + services_set->store(true); + connection->send_close(1000); + }); + + // Create task to reset services after one second. + Task reset_task; + reset_task.name = reset_services_name; + // condition + auto now = std::chrono::high_resolution_clock::now(); + auto t_trigger = now + std::chrono::seconds(1); + reset_task.ready = [t_trigger]{ + return std::chrono::high_resolution_clock::now() > t_trigger; + }; + // command + reset_task.execute = [services_set, this]{ + std::unique_lock lk(this->mutex); + this->available_services.clear(); + lk.unlock(); + services_set->store(true); + }; + // expired + reset_task.expired = [services_set]{ + return services_set->load(); + }; + // clear up + reset_task.clear_up = [this]{ + return; + }; + task_list.push_back(reset_task); + } + } else { + std::lock_guard lk(mutex); + available_topics.clear(); + available_services.clear(); } - to_do_list.push_back(std::make_pair(condition, command)); } - auto end = std::chrono::high_resolution_clock::now(); - auto t_sleep = poll_interval-(end-start); - if ( t_sleep > std::chrono::milliseconds(0)){ - std::this_thread::sleep_for(t_sleep); + // ==================================================================================== + // Process tasks. + // ==================================================================================== + for ( auto task_it = task_list.begin(); task_it != task_list.end(); ){ + std::cout << "processing task: " << task_it->name << std::endl; + if ( !task_it->expired() ){ + if ( task_it->ready() ){ + std::cout << "executing task: " << task_it->name << std::endl; + task_it->execute(); + task_it = task_list.erase(task_it); + } else { + std::cout << "noting to do for task: " << task_it->name << std::endl; + ++task_it; + } + } else { + std::cout << "task expired: " << task_it->name << std::endl; + task_it->clear_up(); + task_it = task_list.erase(task_it); + } + std::cout << std::endl; } + + std::cout << "task list size: " << task_list.size() << std::endl; + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - std::cout << "connection monitor thread end" << std::endl; + + // Clear up remaining tasks. + for ( auto task_it = task_list.begin(); task_it != task_list.end(); ++task_it){ + task_it->clear_up(); + } + task_list.clear(); + std::cout << "periodic thread end" << std::endl; }); } @@ -194,13 +299,11 @@ RosbridgeWsClient::~RosbridgeWsClient() { removeClient(client.first); } - pPoll_thread->join(); + periodic_thread->join(); } bool RosbridgeWsClient::connected(){ - - std::cout << "connected " << hasConnection->load() << std::endl; - return hasConnection->load(); + return isConnected->load(); } void RosbridgeWsClient::addClient(const std::string &client_name) diff --git a/src/comm/ros_bridge/include/RosBridgeClient.h b/src/comm/ros_bridge/include/RosBridgeClient.h index d9850c30f..82dc4862f 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.h +++ b/src/comm/ros_bridge/include/RosBridgeClient.h @@ -44,26 +44,18 @@ class RosbridgeWsClient ServiceTopicName = 1, ClientName = 2, WPClient = 3 - }; - struct Task; + }; const std::string server_port_path; std::unordered_map /*client*/> client_map; std::deque service_topic_list; std::mutex mutex; - std::shared_ptr hasConnection; + std::shared_ptr isConnected; std::shared_ptr stopped; - - std::shared_ptr pPoll_thread; std::string available_topics; std::string available_services; - - - std::shared_ptr pWorker_thread; - - std::list> to_do_list; - std::mutex task_mutex; + std::shared_ptr periodic_thread; -- 2.22.0