From b02ca6869aa65ec60c95c5b2e5d4c6ab8539b539 Mon Sep 17 00:00:00 2001 From: Valentin Platzgummer Date: Fri, 7 Aug 2020 19:10:40 +0200 Subject: [PATCH] 123 --- src/Wima/WimaController.cc | 2 +- .../ros_bridge/include/RosBridgeClient.cpp | 134 +++++++++++------- src/comm/ros_bridge/include/RosBridgeClient.h | 7 + 3 files changed, 94 insertions(+), 49 deletions(-) diff --git a/src/Wima/WimaController.cc b/src/Wima/WimaController.cc index 6943e7b4c..9daf5ee3d 100644 --- a/src/Wima/WimaController.cc +++ b/src/Wima/WimaController.cc @@ -79,8 +79,8 @@ WimaController::WimaController(QObject *parent) , _snakeMinTileArea (settingsGroup, _metaDataMap[snakeMinTileAreaName]) , _snakeLineDistance (settingsGroup, _metaDataMap[snakeLineDistanceName]) , _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName]) - , _measurementPathLength (-1) , _lowBatteryHandlingTriggered (false) + , _measurementPathLength (-1) , _snakeCalcInProgress (false) , _nemoHeartbeat (0 /*status: not connected*/) , _fallbackStatus (0 /*status: not connected*/) diff --git a/src/comm/ros_bridge/include/RosBridgeClient.cpp b/src/comm/ros_bridge/include/RosBridgeClient.cpp index 5977cec46..d72a58a27 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.cpp +++ b/src/comm/ros_bridge/include/RosBridgeClient.cpp @@ -78,21 +78,22 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat , stopped(std::make_shared(false)) , hasConnection(std::make_shared(false)) { - // Start thread to monitor connection status. - // There might be a better way to do this. - std::thread t([this] () { - const auto t_max = std::chrono::milliseconds(500); - while ( !this->stopped->load() ){ + // Start poll thread to monitor connection status, advertised topics etc. + pPoll_thread = std::make_shared ([this] { + const auto poll_interval = std::chrono::milliseconds(1000); + while ( !this->stopped->load() ) { auto start = std::chrono::high_resolution_clock::now(); - auto pPromise = std::make_shared>(); - auto future = pPromise->get_future(); + auto pPromise_status = std::make_shared>(); + + // Check connection status. + auto future_status = pPromise_status->get_future(); auto status_client = std::make_shared(this->server_port_path); - status_client->on_open = [pPromise](std::shared_ptr) { - pPromise->set_value(); + status_client->on_open = [pPromise_status](std::shared_ptr) { + pPromise_status->set_value(); }; - std::thread poll_thread([status_client]{ + std::thread status_thread([status_client]{ status_client->start(); status_client->on_open = NULL; status_client->on_message = NULL; @@ -100,20 +101,53 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat status_client->on_error = NULL; }); - - auto status = future.wait_for(std::chrono::milliseconds(t_max)); + auto status = future_status.wait_for(std::chrono::milliseconds(200)); status_client->stop(); - poll_thread.join(); + status_thread.join(); this->hasConnection->store(status == std::future_status::ready); + + if ( this->hasConnection->load() ){ + // Fetch available topics + auto pPromise_topics(std::make_shared>()); + auto future_topics = pPromise_topics->get_future(); + this->callService("/rosapi/topics", [pPromise_topics]( + std::shared_ptr connection, + std::shared_ptr in_message){ + pPromise_topics->set_value(in_message->string()); + connection->send_close(1000); + }); + auto status_topics = future_topics.wait_for(std::chrono::milliseconds(200)); + + // Fetch available services + auto pPromise_services(std::make_shared>()); + auto future_services = pPromise_services->get_future(); + this->callService("/rosapi/services", [pPromise_services]( + std::shared_ptr connection, + std::shared_ptr in_message){ + pPromise_services->set_value(in_message->string()); + connection->send_close(1000); + }); + auto status_services = future_services.wait_for(std::chrono::milliseconds(200)); + + // Store topics and services + { + std::lock_guard lk(this->mutex); + this->available_topics = status_topics == std::future_status::ready ? + future_topics.get() : std::string(); + this->available_services = status_services == std::future_status::ready ? + future_services.get() : std::string(); + + } + } + auto end = std::chrono::high_resolution_clock::now(); - auto t_sleep = std::chrono::milliseconds(t_max)-(end-start); + auto t_sleep = poll_interval-(end-start); if ( t_sleep > std::chrono::milliseconds(0)){ std::this_thread::sleep_for(t_sleep); } } std::cout << "connection monitor thread end" << std::endl; }); - t.detach(); } RosbridgeWsClient::~RosbridgeWsClient() @@ -126,6 +160,7 @@ RosbridgeWsClient::~RosbridgeWsClient() { removeClient(client.first); } + pPoll_thread->join(); } bool RosbridgeWsClient::connected(){ @@ -220,38 +255,24 @@ void RosbridgeWsClient::removeClient(const std::string &client_name) } std::string RosbridgeWsClient::getAdvertisedTopics(){ - auto pPromise(std::make_shared>()); - auto future = pPromise->get_future(); - this->callService("/rosapi/topics", [pPromise]( - std::shared_ptr connection, - std::shared_ptr in_message){ - pPromise->set_value(in_message->string()); - connection->send_close(1000); - }); - auto status = future.wait_for(std::chrono::milliseconds(20)); // enought? - return status == std::future_status::ready ? future.get() : std::string(); + std::lock_guard lk(mutex); + return available_topics; } std::string RosbridgeWsClient::getAdvertisedServices(){ - auto pPromise(std::make_shared>()); - auto future = pPromise->get_future(); - // Call /rosapi/services to see if topic is already available. - this->callService("/rosapi/services", [pPromise]( - std::shared_ptr connection, - std::shared_ptr in_message){ - pPromise->set_value(in_message->string()); - connection->send_close(1000); - }); - auto status = future.wait_for(std::chrono::milliseconds(25)); // enought? - return status == std::future_status::ready ? future.get() : std::string(); + std::lock_guard lk(mutex); + return available_services; } bool RosbridgeWsClient::topicAvailable(const std::string &topic){ #ifdef DEBUG std::cout << "checking if topic " << topic << " is available" << std::endl; #endif - std::string advertised_topics = this->getAdvertisedTopics(); - auto pos = advertised_topics.find(topic); + size_t pos; + { + std::lock_guard lk(mutex); + pos = available_topics.find(topic); + } return pos != std::string::npos ? true : false; } @@ -683,8 +704,11 @@ bool RosbridgeWsClient::serviceAvailable(const std::string &service) #ifdef DEBUG std::cout << "checking if service " << service << " is available" << std::endl; #endif - std::string advertised_services = this->getAdvertisedServices(); - auto pos = advertised_services.find(service); + size_t pos; + { + std::lock_guard lk(mutex); + pos = available_services.find(service); + } return pos != std::string::npos ? true : false; } @@ -699,15 +723,22 @@ void RosbridgeWsClient::waitForService(const std::string &service, const std::sh auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif + const auto poll_interval = std::chrono::milliseconds(1000); + auto end = std::chrono::high_resolution_clock::now() + poll_interval; while( !stop->load() ) { + if ( std::chrono::high_resolution_clock::now() > end ){ #ifdef DEBUG - ++counter; + ++counter; #endif - if ( serviceAvailable(service) ){ - break; + if ( serviceAvailable(service) ){ + break; + } else { + end = std::chrono::high_resolution_clock::now() + poll_interval; + } + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avoid excessive polling. }; #ifdef DEBUG auto e = std::chrono::high_resolution_clock::now(); @@ -731,15 +762,22 @@ void RosbridgeWsClient::waitForTopic(const std::string &topic, const std::shared auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif + const auto poll_interval = std::chrono::milliseconds(1000); + auto end = std::chrono::high_resolution_clock::now() + poll_interval; while( !stop->load() ) { + if ( std::chrono::high_resolution_clock::now() > end ){ #ifdef DEBUG - ++counter; + ++counter; #endif - if ( topicAvailable(topic) ){ - break; + if ( topicAvailable(topic) ){ + break; + } else { + end = std::chrono::high_resolution_clock::now() + poll_interval; + } + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avoid excessive polling. }; #ifdef DEBUG auto e = std::chrono::high_resolution_clock::now(); diff --git a/src/comm/ros_bridge/include/RosBridgeClient.h b/src/comm/ros_bridge/include/RosBridgeClient.h index e548d58c9..433a096ac 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.h +++ b/src/comm/ros_bridge/include/RosBridgeClient.h @@ -15,6 +15,7 @@ #include #include #include +#include using WsClient = SimpleWeb::SocketClient; using InMessage = std::function, std::shared_ptr)>; @@ -53,6 +54,12 @@ class RosbridgeWsClient std::shared_ptr hasConnection; std::shared_ptr stopped; + std::shared_ptr pPoll_thread; + std::string available_topics; + std::string available_services; + + + void start(const std::string& client_name, std::shared_ptr client, const std::string& message); public: -- 2.22.0