Commit b02ca686 authored by Valentin Platzgummer's avatar Valentin Platzgummer

123

parent 32ddf1bc
...@@ -79,8 +79,8 @@ WimaController::WimaController(QObject *parent) ...@@ -79,8 +79,8 @@ WimaController::WimaController(QObject *parent)
, _snakeMinTileArea (settingsGroup, _metaDataMap[snakeMinTileAreaName]) , _snakeMinTileArea (settingsGroup, _metaDataMap[snakeMinTileAreaName])
, _snakeLineDistance (settingsGroup, _metaDataMap[snakeLineDistanceName]) , _snakeLineDistance (settingsGroup, _metaDataMap[snakeLineDistanceName])
, _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName]) , _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName])
, _measurementPathLength (-1)
, _lowBatteryHandlingTriggered (false) , _lowBatteryHandlingTriggered (false)
, _measurementPathLength (-1)
, _snakeCalcInProgress (false) , _snakeCalcInProgress (false)
, _nemoHeartbeat (0 /*status: not connected*/) , _nemoHeartbeat (0 /*status: not connected*/)
, _fallbackStatus (0 /*status: not connected*/) , _fallbackStatus (0 /*status: not connected*/)
......
...@@ -78,21 +78,22 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat ...@@ -78,21 +78,22 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat
, stopped(std::make_shared<std::atomic_bool>(false)) , stopped(std::make_shared<std::atomic_bool>(false))
, hasConnection(std::make_shared<std::atomic_bool>(false)) , hasConnection(std::make_shared<std::atomic_bool>(false))
{ {
// Start thread to monitor connection status. // Start poll thread to monitor connection status, advertised topics etc.
// There might be a better way to do this. pPoll_thread = std::make_shared<std::thread> ([this] {
std::thread t([this] () { const auto poll_interval = std::chrono::milliseconds(1000);
const auto t_max = std::chrono::milliseconds(500); while ( !this->stopped->load() ) {
while ( !this->stopped->load() ){
auto start = std::chrono::high_resolution_clock::now(); auto start = std::chrono::high_resolution_clock::now();
auto pPromise = std::make_shared<std::promise<void>>(); auto pPromise_status = std::make_shared<std::promise<void>>();
auto future = pPromise->get_future();
// Check connection status.
auto future_status = pPromise_status->get_future();
auto status_client = std::make_shared<WsClient>(this->server_port_path); auto status_client = std::make_shared<WsClient>(this->server_port_path);
status_client->on_open = [pPromise](std::shared_ptr<WsClient::Connection>) { status_client->on_open = [pPromise_status](std::shared_ptr<WsClient::Connection>) {
pPromise->set_value(); pPromise_status->set_value();
}; };
std::thread poll_thread([status_client]{ std::thread status_thread([status_client]{
status_client->start(); status_client->start();
status_client->on_open = NULL; status_client->on_open = NULL;
status_client->on_message = NULL; status_client->on_message = NULL;
...@@ -100,20 +101,53 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat ...@@ -100,20 +101,53 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat
status_client->on_error = NULL; status_client->on_error = NULL;
}); });
auto status = future_status.wait_for(std::chrono::milliseconds(200));
auto status = future.wait_for(std::chrono::milliseconds(t_max));
status_client->stop(); status_client->stop();
poll_thread.join(); status_thread.join();
this->hasConnection->store(status == std::future_status::ready); this->hasConnection->store(status == std::future_status::ready);
if ( this->hasConnection->load() ){
// Fetch available topics
auto pPromise_topics(std::make_shared<std::promise<std::string>>());
auto future_topics = pPromise_topics->get_future();
this->callService("/rosapi/topics", [pPromise_topics](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise_topics->set_value(in_message->string());
connection->send_close(1000);
});
auto status_topics = future_topics.wait_for(std::chrono::milliseconds(200));
// Fetch available services
auto pPromise_services(std::make_shared<std::promise<std::string>>());
auto future_services = pPromise_services->get_future();
this->callService("/rosapi/services", [pPromise_services](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise_services->set_value(in_message->string());
connection->send_close(1000);
});
auto status_services = future_services.wait_for(std::chrono::milliseconds(200));
// Store topics and services
{
std::lock_guard<std::mutex> lk(this->mutex);
this->available_topics = status_topics == std::future_status::ready ?
future_topics.get() : std::string();
this->available_services = status_services == std::future_status::ready ?
future_services.get() : std::string();
}
}
auto end = std::chrono::high_resolution_clock::now(); auto end = std::chrono::high_resolution_clock::now();
auto t_sleep = std::chrono::milliseconds(t_max)-(end-start); auto t_sleep = poll_interval-(end-start);
if ( t_sleep > std::chrono::milliseconds(0)){ if ( t_sleep > std::chrono::milliseconds(0)){
std::this_thread::sleep_for(t_sleep); std::this_thread::sleep_for(t_sleep);
} }
} }
std::cout << "connection monitor thread end" << std::endl; std::cout << "connection monitor thread end" << std::endl;
}); });
t.detach();
} }
RosbridgeWsClient::~RosbridgeWsClient() RosbridgeWsClient::~RosbridgeWsClient()
...@@ -126,6 +160,7 @@ RosbridgeWsClient::~RosbridgeWsClient() ...@@ -126,6 +160,7 @@ RosbridgeWsClient::~RosbridgeWsClient()
{ {
removeClient(client.first); removeClient(client.first);
} }
pPoll_thread->join();
} }
bool RosbridgeWsClient::connected(){ bool RosbridgeWsClient::connected(){
...@@ -220,38 +255,24 @@ void RosbridgeWsClient::removeClient(const std::string &client_name) ...@@ -220,38 +255,24 @@ void RosbridgeWsClient::removeClient(const std::string &client_name)
} }
std::string RosbridgeWsClient::getAdvertisedTopics(){ std::string RosbridgeWsClient::getAdvertisedTopics(){
auto pPromise(std::make_shared<std::promise<std::string>>()); std::lock_guard<std::mutex> lk(mutex);
auto future = pPromise->get_future(); return available_topics;
this->callService("/rosapi/topics", [pPromise](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise->set_value(in_message->string());
connection->send_close(1000);
});
auto status = future.wait_for(std::chrono::milliseconds(20)); // enought?
return status == std::future_status::ready ? future.get() : std::string();
} }
std::string RosbridgeWsClient::getAdvertisedServices(){ std::string RosbridgeWsClient::getAdvertisedServices(){
auto pPromise(std::make_shared<std::promise<std::string>>()); std::lock_guard<std::mutex> lk(mutex);
auto future = pPromise->get_future(); return available_services;
// Call /rosapi/services to see if topic is already available.
this->callService("/rosapi/services", [pPromise](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise->set_value(in_message->string());
connection->send_close(1000);
});
auto status = future.wait_for(std::chrono::milliseconds(25)); // enought?
return status == std::future_status::ready ? future.get() : std::string();
} }
bool RosbridgeWsClient::topicAvailable(const std::string &topic){ bool RosbridgeWsClient::topicAvailable(const std::string &topic){
#ifdef DEBUG #ifdef DEBUG
std::cout << "checking if topic " << topic << " is available" << std::endl; std::cout << "checking if topic " << topic << " is available" << std::endl;
#endif #endif
std::string advertised_topics = this->getAdvertisedTopics(); size_t pos;
auto pos = advertised_topics.find(topic); {
std::lock_guard<std::mutex> lk(mutex);
pos = available_topics.find(topic);
}
return pos != std::string::npos ? true : false; return pos != std::string::npos ? true : false;
} }
...@@ -683,8 +704,11 @@ bool RosbridgeWsClient::serviceAvailable(const std::string &service) ...@@ -683,8 +704,11 @@ bool RosbridgeWsClient::serviceAvailable(const std::string &service)
#ifdef DEBUG #ifdef DEBUG
std::cout << "checking if service " << service << " is available" << std::endl; std::cout << "checking if service " << service << " is available" << std::endl;
#endif #endif
std::string advertised_services = this->getAdvertisedServices(); size_t pos;
auto pos = advertised_services.find(service); {
std::lock_guard<std::mutex> lk(mutex);
pos = available_services.find(service);
}
return pos != std::string::npos ? true : false; return pos != std::string::npos ? true : false;
} }
...@@ -699,15 +723,22 @@ void RosbridgeWsClient::waitForService(const std::string &service, const std::sh ...@@ -699,15 +723,22 @@ void RosbridgeWsClient::waitForService(const std::string &service, const std::sh
auto s = std::chrono::high_resolution_clock::now(); auto s = std::chrono::high_resolution_clock::now();
long counter = 0; long counter = 0;
#endif #endif
const auto poll_interval = std::chrono::milliseconds(1000);
auto end = std::chrono::high_resolution_clock::now() + poll_interval;
while( !stop->load() ) while( !stop->load() )
{ {
if ( std::chrono::high_resolution_clock::now() > end ){
#ifdef DEBUG #ifdef DEBUG
++counter; ++counter;
#endif #endif
if ( serviceAvailable(service) ){ if ( serviceAvailable(service) ){
break; break;
} else {
end = std::chrono::high_resolution_clock::now() + poll_interval;
}
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avoid excessive polling.
}; };
#ifdef DEBUG #ifdef DEBUG
auto e = std::chrono::high_resolution_clock::now(); auto e = std::chrono::high_resolution_clock::now();
...@@ -731,15 +762,22 @@ void RosbridgeWsClient::waitForTopic(const std::string &topic, const std::shared ...@@ -731,15 +762,22 @@ void RosbridgeWsClient::waitForTopic(const std::string &topic, const std::shared
auto s = std::chrono::high_resolution_clock::now(); auto s = std::chrono::high_resolution_clock::now();
long counter = 0; long counter = 0;
#endif #endif
const auto poll_interval = std::chrono::milliseconds(1000);
auto end = std::chrono::high_resolution_clock::now() + poll_interval;
while( !stop->load() ) while( !stop->load() )
{ {
if ( std::chrono::high_resolution_clock::now() > end ){
#ifdef DEBUG #ifdef DEBUG
++counter; ++counter;
#endif #endif
if ( topicAvailable(topic) ){ if ( topicAvailable(topic) ){
break; break;
} else {
end = std::chrono::high_resolution_clock::now() + poll_interval;
}
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avoid excessive polling.
}; };
#ifdef DEBUG #ifdef DEBUG
auto e = std::chrono::high_resolution_clock::now(); auto e = std::chrono::high_resolution_clock::now();
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <mutex> #include <mutex>
#include <tuple> #include <tuple>
#include <deque> #include <deque>
#include <thread>
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>; using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>; using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>;
...@@ -53,6 +54,12 @@ class RosbridgeWsClient ...@@ -53,6 +54,12 @@ class RosbridgeWsClient
std::shared_ptr<std::atomic_bool> hasConnection; std::shared_ptr<std::atomic_bool> hasConnection;
std::shared_ptr<std::atomic_bool> stopped; std::shared_ptr<std::atomic_bool> stopped;
std::shared_ptr<std::thread> pPoll_thread;
std::string available_topics;
std::string available_services;
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message); void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message);
public: public:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment