Commit 25095ced authored by Valentin Platzgummer's avatar Valentin Platzgummer

123

parent 3a20c1ae
......@@ -5,6 +5,14 @@
#include <thread>
#include <future>
struct Task{
std::function<bool(void)> condition; // Condition under which command should be executed.
std::function<void(void)> command; // Command to execute.
std::function<bool(void)> expired; // Returns true if the command is expired.
std::function<void(void)> clear_up; // operation to perform if task expired.
};
void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shared_ptr<WsClient> client, const std::__cxx11::string &message)
{
#ifndef DEBUG
......@@ -101,43 +109,69 @@ RosbridgeWsClient::RosbridgeWsClient(const std::__cxx11::string &server_port_pat
status_client->on_error = NULL;
});
auto status = future_status.wait_for(std::chrono::milliseconds(200));
status_client->stop();
status_thread.join();
this->hasConnection->store(status == std::future_status::ready);
std::list<std::pair<std::tuple<bool(void)>, /*condition*/
std::function<void(void)>, /*command*/
std::function<void(void)> /*clear_up*/
>> to_do_list;
auto pConnection_task = std::make_shared<Task>;
pConnection_task->condition = [&future_status]{
return future_status.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
pConnection_task->command = [&status_client, &status_thread, &future_status, this]{
status_client->stop();
status_thread.join();
auto status = future_status.wait_for(std::chrono::milliseconds(0));
this->hasConnection->store(status == std::future_status::ready);
};
auto now = std::chrono::high_resolution_clock::now();
auto t_expire = now + std::chrono::seconds(1);
pConnection_task->expired = [t_expire]{
return std::chrono::high_resolution_clock::now() > t_expire;
}
pConnection_task->clear_up = []{
}
to_do_list.push_back(pConnection_task);
std::future<std::string> future_topics;
std::future<std::string> future_services;
if ( this->hasConnection->load() ){
// Fetch available topics
auto pPromise_topics(std::make_shared<std::promise<std::string>>());
auto future_topics = pPromise_topics->get_future();
future_topics = pPromise_topics->get_future();
this->callService("/rosapi/topics", [pPromise_topics](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise_topics->set_value(in_message->string());
connection->send_close(1000);
});
auto status_topics = future_topics.wait_for(std::chrono::milliseconds(200));
condition = [&future_topics]{
return future_topics.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
command = [&future_topics, this]{
std::lock_guard<std::mutex> lk(this->mutex);
this->available_topics = future_topics.get();
}
to_do_list.push_back(std::make_pair(condition, command));
// Fetch available services
auto pPromise_services(std::make_shared<std::promise<std::string>>());
auto future_services = pPromise_services->get_future();
future_services = pPromise_services->get_future();
this->callService("/rosapi/services", [pPromise_services](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
pPromise_services->set_value(in_message->string());
connection->send_close(1000);
});
auto status_services = future_services.wait_for(std::chrono::milliseconds(200));
// Store topics and services
{
condition = [&future_services]{
return future_services.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
};
command = [&future_services, this]{
std::lock_guard<std::mutex> lk(this->mutex);
this->available_topics = status_topics == std::future_status::ready ?
future_topics.get() : std::string();
this->available_services = status_services == std::future_status::ready ?
future_services.get() : std::string();
this->available_services = future_services.get();
}
to_do_list.push_back(std::make_pair(condition, command));
}
auto end = std::chrono::high_resolution_clock::now();
......
......@@ -44,7 +44,8 @@ class RosbridgeWsClient
ServiceTopicName = 1,
ClientName = 2,
WPClient = 3
};
};
struct Task;
const std::string server_port_path;
std::unordered_map<std::string /*client name*/,
......@@ -59,6 +60,12 @@ class RosbridgeWsClient
std::string available_services;
std::shared_ptr<std::thread> pWorker_thread;
std::list<std::shared_ptr<Task>> to_do_list;
std::mutex task_mutex;
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message);
......
......@@ -54,6 +54,7 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
}
_topicMap.insert(std::make_pair(clientName, std::string(topic)));
// Wrap callback.
using namespace std::placeholders;
auto callbackWrapper = [callback](
std::shared_ptr<WsClient::Connection>,
......@@ -88,6 +89,7 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
if ( !this->_stopped->load() ){
this->_rbc.addClient(clientName);
this->_rbc.subscribe(clientName, topic, callbackWrapper);
std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl;
}
std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl;
});
......@@ -95,6 +97,7 @@ void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
} else {
_rbc.addClient(clientName);
_rbc.subscribe(clientName, topic, callbackWrapper);
std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment