From 25095cedd74d671895fead33ffb582a840372c61 Mon Sep 17 00:00:00 2001 From: Valentin Platzgummer Date: Tue, 18 Aug 2020 09:52:26 +0200 Subject: [PATCH] 123 --- .../ros_bridge/include/RosBridgeClient.cpp | 66 ++++++++++++++----- src/comm/ros_bridge/include/RosBridgeClient.h | 9 ++- .../ros_bridge/include/TopicSubscriber.cpp | 3 + 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/src/comm/ros_bridge/include/RosBridgeClient.cpp b/src/comm/ros_bridge/include/RosBridgeClient.cpp index d72a58a27..70cafb1b1 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.cpp +++ b/src/comm/ros_bridge/include/RosBridgeClient.cpp @@ -5,6 +5,14 @@ #include #include + +struct Task{ + std::function condition; // Condition under which command should be executed. + std::function command; // Command to execute. + std::function expired; // Returns true if the command is expired. + std::function clear_up; // operation to perform if task expired. +}; + void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shared_ptr client, const std::__cxx11::string &message) { #ifndef DEBUG @@ -101,43 +109,69 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat status_client->on_error = NULL; }); - auto status = future_status.wait_for(std::chrono::milliseconds(200)); - status_client->stop(); - status_thread.join(); - this->hasConnection->store(status == std::future_status::ready); + std::list, /*condition*/ + std::function, /*command*/ + std::function /*clear_up*/ + >> to_do_list; + + auto pConnection_task = std::make_shared; + pConnection_task->condition = [&future_status]{ + return future_status.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + }; + pConnection_task->command = [&status_client, &status_thread, &future_status, this]{ + status_client->stop(); + status_thread.join(); + auto status = future_status.wait_for(std::chrono::milliseconds(0)); + this->hasConnection->store(status == std::future_status::ready); + }; + auto now = std::chrono::high_resolution_clock::now(); + auto t_expire = now + std::chrono::seconds(1); + pConnection_task->expired = [t_expire]{ + return std::chrono::high_resolution_clock::now() > t_expire; + } + pConnection_task->clear_up = []{ + + } + to_do_list.push_back(pConnection_task); + std::future future_topics; + std::future future_services; if ( this->hasConnection->load() ){ // Fetch available topics auto pPromise_topics(std::make_shared>()); - auto future_topics = pPromise_topics->get_future(); + future_topics = pPromise_topics->get_future(); this->callService("/rosapi/topics", [pPromise_topics]( std::shared_ptr connection, std::shared_ptr in_message){ pPromise_topics->set_value(in_message->string()); connection->send_close(1000); }); - auto status_topics = future_topics.wait_for(std::chrono::milliseconds(200)); + condition = [&future_topics]{ + return future_topics.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + }; + command = [&future_topics, this]{ + std::lock_guard lk(this->mutex); + this->available_topics = future_topics.get(); + } + to_do_list.push_back(std::make_pair(condition, command)); // Fetch available services auto pPromise_services(std::make_shared>()); - auto future_services = pPromise_services->get_future(); + future_services = pPromise_services->get_future(); this->callService("/rosapi/services", [pPromise_services]( std::shared_ptr connection, std::shared_ptr in_message){ pPromise_services->set_value(in_message->string()); connection->send_close(1000); }); - auto status_services = future_services.wait_for(std::chrono::milliseconds(200)); - - // Store topics and services - { + condition = [&future_services]{ + return future_services.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + }; + command = [&future_services, this]{ std::lock_guard lk(this->mutex); - this->available_topics = status_topics == std::future_status::ready ? - future_topics.get() : std::string(); - this->available_services = status_services == std::future_status::ready ? - future_services.get() : std::string(); - + this->available_services = future_services.get(); } + to_do_list.push_back(std::make_pair(condition, command)); } auto end = std::chrono::high_resolution_clock::now(); diff --git a/src/comm/ros_bridge/include/RosBridgeClient.h b/src/comm/ros_bridge/include/RosBridgeClient.h index 433a096ac..d9850c30f 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.h +++ b/src/comm/ros_bridge/include/RosBridgeClient.h @@ -44,7 +44,8 @@ class RosbridgeWsClient ServiceTopicName = 1, ClientName = 2, WPClient = 3 - }; + }; + struct Task; const std::string server_port_path; std::unordered_map pWorker_thread; + + std::list> to_do_list; + std::mutex task_mutex; + + void start(const std::string& client_name, std::shared_ptr client, const std::string& message); diff --git a/src/comm/ros_bridge/include/TopicSubscriber.cpp b/src/comm/ros_bridge/include/TopicSubscriber.cpp index 7e20672a2..2314c1126 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.cpp +++ b/src/comm/ros_bridge/include/TopicSubscriber.cpp @@ -54,6 +54,7 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe( } _topicMap.insert(std::make_pair(clientName, std::string(topic))); + // Wrap callback. using namespace std::placeholders; auto callbackWrapper = [callback]( std::shared_ptr, @@ -88,6 +89,7 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe( if ( !this->_stopped->load() ){ this->_rbc.addClient(clientName); this->_rbc.subscribe(clientName, topic, callbackWrapper); + std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl; } std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl; }); @@ -95,6 +97,7 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe( } else { _rbc.addClient(clientName); _rbc.subscribe(clientName, topic, callbackWrapper); + std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl; } } -- 2.22.0