Commit 46b4c65d authored by Valentin Platzgummer's avatar Valentin Platzgummer

RosBridgeClient imporved

parent 25095ced
......@@ -7,10 +7,11 @@
struct Task{
std::function<bool(void)> condition; // Condition under which command should be executed.
std::function<void(void)> command; // Command to execute.
std::function<bool(void)> ready; // Condition under which command should be executed.
std::function<void(void)> execute; // Command to execute.
std::function<bool(void)> expired; // Returns true if the command is expired.
std::function<void(void)> clear_up; // operation to perform if task expired.
std::string name;
};
void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shared_ptr<WsClient> client, const std::__cxx11::string &message)
......@@ -83,104 +84,208 @@ void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shar
RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_path) :
server_port_path(server_port_path)
, isConnected(std::make_shared<std::atomic_bool>(false))
, stopped(std::make_shared<std::atomic_bool>(false))
, hasConnection(std::make_shared<std::atomic_bool>(false))
{
// Start poll thread to monitor connection status, advertised topics etc.
pPoll_thread = std::make_shared<std::thread> ([this] {
const auto poll_interval = std::chrono::milliseconds(1000);
// Start periodic thread to monitor connection status, advertised topics etc.
periodic_thread = std::make_shared<std::thread> ([this] {
std::list<Task> task_list;
constexpr auto poll_interval = std::chrono::seconds(1);
auto poll_time_point = std::chrono::high_resolution_clock::now();
while ( !this->stopped->load() ) {
auto start = std::chrono::high_resolution_clock::now();
auto pPromise_status = std::make_shared<std::promise<void>>();
// Check connection status.
auto future_status = pPromise_status->get_future();
auto status_client = std::make_shared<WsClient>(this->server_port_path);
status_client->on_open = [pPromise_status](std::shared_ptr<WsClient::Connection>) {
pPromise_status->set_value();
};
std::thread status_thread([status_client]{
status_client->start();
status_client->on_open = NULL;
status_client->on_message = NULL;
status_client->on_close = NULL;
status_client->on_error = NULL;
});
std::list<std::pair<std::tuple<bool(void)>, /*condition*/
std::function<void(void)>, /*command*/
std::function<void(void)> /*clear_up*/
>> to_do_list;
auto pConnection_task = std::make_shared<Task>;
pConnection_task->condition = [&future_status]{
return future_status.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
pConnection_task->command = [&status_client, &status_thread, &future_status, this]{
status_client->stop();
status_thread.join();
auto status = future_status.wait_for(std::chrono::milliseconds(0));
this->hasConnection->store(status == std::future_status::ready);
};
auto now = std::chrono::high_resolution_clock::now();
auto t_expire = now + std::chrono::seconds(1);
pConnection_task->expired = [t_expire]{
return std::chrono::high_resolution_clock::now() > t_expire;
}
pConnection_task->clear_up = []{
}
to_do_list.push_back(pConnection_task);
std::future<std::string> future_topics;
std::future<std::string> future_services;
if ( this->hasConnection->load() ){
// Fetch available topics
auto pPromise_topics(std::make_shared<std::promise<std::string>>());
future_topics = pPromise_topics->get_future();
this->callService("/rosapi/topics", [pPromise_topics](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise_topics->set_value(in_message->string());
connection->send_close(1000);
// ====================================================================================
// Add tasks.
// ====================================================================================
if ( std::chrono::high_resolution_clock::now() > poll_time_point) {
poll_time_point = std::chrono::high_resolution_clock::now() + poll_interval;
std::cout << "Starting new poll." << std::endl;
std::cout << "connected: " << this->isConnected->load() << std::endl;
std::string reset_status_task_name = "reset_status_task";
// Add status task if necessary.
auto const it = std::find_if(task_list.begin(), task_list.end(),
[&reset_status_task_name](const Task &t){
return t.name == reset_status_task_name;
});
condition = [&future_topics]{
return future_topics.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
command = [&future_topics, this]{
std::lock_guard<std::mutex> lk(this->mutex);
this->available_topics = future_topics.get();
if ( it == task_list.end() ){
std::cout << "Adding status_task" << std::endl;
// Check connection status.
auto status_set = std::make_shared<std::atomic_bool>(false);
auto status_client = std::make_shared<WsClient>(this->server_port_path);
status_client->on_open = [status_set, this](std::shared_ptr<WsClient::Connection>) {
std::cout << "status_client opened" << std::endl;
this->isConnected->store(true);
status_set->store(true);
};
std::thread status_thread([status_client]{
status_client->start();
status_client->on_open = NULL;
status_client->on_message = NULL;
status_client->on_close = NULL;
status_client->on_error = NULL;
});
status_thread.detach();
// Create task to reset isConnected after one second.
Task reset_task;
reset_task.name = reset_status_task_name;
// condition
auto now = std::chrono::high_resolution_clock::now();
const auto t_trigger = now + std::chrono::seconds(1);
reset_task.ready = [t_trigger]{
return std::chrono::high_resolution_clock::now() > t_trigger;
};
// command
reset_task.execute = [status_client, status_set, this]{
status_client->stop();
this->isConnected->store(false);
status_set->store(true);
};
// expired
reset_task.expired = [status_set]{
return status_set->load();
};
// clear up
reset_task.clear_up = [status_client, this]{
status_client->stop();
};
task_list.push_back(reset_task);
}
to_do_list.push_back(std::make_pair(condition, command));
// Fetch available services
auto pPromise_services(std::make_shared<std::promise<std::string>>());
future_services = pPromise_services->get_future();
this->callService("/rosapi/services", [pPromise_services](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise_services->set_value(in_message->string());
connection->send_close(1000);
});
condition = [&future_services]{
return future_services.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
command = [&future_services, this]{
std::lock_guard<std::mutex> lk(this->mutex);
this->available_services = future_services.get();
if ( this->isConnected->load() ){
// Add available topics task if neccessary.
std::string reset_topics_task_name = "reset_topics_task";
auto const topics_it = std::find_if(task_list.begin(), task_list.end(), [&reset_topics_task_name](const Task &t){
return t.name == reset_topics_task_name;
});
if ( topics_it == task_list.end() ){
// Call /rosapi/topics service.
std::cout << "Adding reset_topics_task" << std::endl;
auto topics_set = std::make_shared<std::atomic_bool>(false);
this->callService("/rosapi/topics", [topics_set, this](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
std::cout << "/rosapi/topics: " << in_message->string() << std::endl;
std::unique_lock<std::mutex> lk(this->mutex);
this->available_topics = in_message->string();
lk.unlock();
topics_set->store(true);
connection->send_close(1000);
});
// Create task to reset topics after one second.
Task reset_task;
reset_task.name = reset_topics_task_name;
// condition
auto now = std::chrono::high_resolution_clock::now();
auto t_trigger = now + std::chrono::seconds(1);
reset_task.ready = [t_trigger]{
return std::chrono::high_resolution_clock::now() > t_trigger;
};
// command
reset_task.execute = [topics_set, this]{
std::unique_lock<std::mutex> lk(this->mutex);
this->available_topics.clear();
lk.unlock();
topics_set->store(true);
};
// expired
reset_task.expired = [topics_set]{
return topics_set->load();
};
// clear up
reset_task.clear_up = [this]{
return;
};
task_list.push_back(reset_task);
}
// Add available services task if neccessary.
std::string reset_services_name = "reset_services_task";
auto const services_it = std::find_if(task_list.begin(), task_list.end(), [&reset_services_name](const Task &t){
return t.name == reset_services_name;
});
if ( services_it == task_list.end() ){
// Call /rosapi/services service.
std::cout << "Adding reset_services_task" << std::endl;
auto services_set = std::make_shared<std::atomic_bool>(false);
this->callService("/rosapi/services", [this, services_set](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
std::cout << "/rosapi/services: " << in_message->string() << std::endl;
std::unique_lock<std::mutex> lk(this->mutex);
this->available_services = in_message->string();
lk.unlock();
services_set->store(true);
connection->send_close(1000);
});
// Create task to reset services after one second.
Task reset_task;
reset_task.name = reset_services_name;
// condition
auto now = std::chrono::high_resolution_clock::now();
auto t_trigger = now + std::chrono::seconds(1);
reset_task.ready = [t_trigger]{
return std::chrono::high_resolution_clock::now() > t_trigger;
};
// command
reset_task.execute = [services_set, this]{
std::unique_lock<std::mutex> lk(this->mutex);
this->available_services.clear();
lk.unlock();
services_set->store(true);
};
// expired
reset_task.expired = [services_set]{
return services_set->load();
};
// clear up
reset_task.clear_up = [this]{
return;
};
task_list.push_back(reset_task);
}
} else {
std::lock_guard<std::mutex> lk(mutex);
available_topics.clear();
available_services.clear();
}
to_do_list.push_back(std::make_pair(condition, command));
}
auto end = std::chrono::high_resolution_clock::now();
auto t_sleep = poll_interval-(end-start);
if ( t_sleep > std::chrono::milliseconds(0)){
std::this_thread::sleep_for(t_sleep);
// ====================================================================================
// Process tasks.
// ====================================================================================
for ( auto task_it = task_list.begin(); task_it != task_list.end(); ){
std::cout << "processing task: " << task_it->name << std::endl;
if ( !task_it->expired() ){
if ( task_it->ready() ){
std::cout << "executing task: " << task_it->name << std::endl;
task_it->execute();
task_it = task_list.erase(task_it);
} else {
std::cout << "noting to do for task: " << task_it->name << std::endl;
++task_it;
}
} else {
std::cout << "task expired: " << task_it->name << std::endl;
task_it->clear_up();
task_it = task_list.erase(task_it);
}
std::cout << std::endl;
}
std::cout << "task list size: " << task_list.size() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
std::cout << "connection monitor thread end" << std::endl;
// Clear up remaining tasks.
for ( auto task_it = task_list.begin(); task_it != task_list.end(); ++task_it){
task_it->clear_up();
}
task_list.clear();
std::cout << "periodic thread end" << std::endl;
});
}
......@@ -194,13 +299,11 @@ RosbridgeWsClient::~RosbridgeWsClient()
{
removeClient(client.first);
}
pPoll_thread->join();
periodic_thread->join();
}
bool RosbridgeWsClient::connected(){
std::cout << "connected " << hasConnection->load() << std::endl;
return hasConnection->load();
return isConnected->load();
}
void RosbridgeWsClient::addClient(const std::string &client_name)
......
......@@ -44,26 +44,18 @@ class RosbridgeWsClient
ServiceTopicName = 1,
ClientName = 2,
WPClient = 3
};
struct Task;
};
const std::string server_port_path;
std::unordered_map<std::string /*client name*/,
std::shared_ptr<WsClient> /*client*/> client_map;
std::deque<EntryData> service_topic_list;
std::mutex mutex;
std::shared_ptr<std::atomic_bool> hasConnection;
std::shared_ptr<std::atomic_bool> isConnected;
std::shared_ptr<std::atomic_bool> stopped;
std::shared_ptr<std::thread> pPoll_thread;
std::string available_topics;
std::string available_services;
std::shared_ptr<std::thread> pWorker_thread;
std::list<std::shared_ptr<Task>> to_do_list;
std::mutex task_mutex;
std::shared_ptr<std::thread> periodic_thread;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment