Commit 823c2823 authored by Valentin Platzgummer's avatar Valentin Platzgummer

thread errors now, error when unadvertising services

parent 5c6b9306
......@@ -135,28 +135,29 @@ public:
// The execution can take up to 100 ms!
bool connected(){
auto p = std::make_shared<std::promise<void>>();
auto future = p->get_future();
auto callback = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<std::promise<void>> p) {
p->set_value();
connection->send_close(1000);
};
std::shared_ptr<WsClient> status_client = std::make_shared<WsClient>(server_port_path);
status_client->on_open = std::bind(callback, std::placeholders::_1, p);
std::async([status_client]{
status_client->start();
status_client->on_open = NULL;
status_client->on_message = NULL;
status_client->on_close = NULL;
status_client->on_error = NULL;
});
auto status = future.wait_for(std::chrono::milliseconds(100));
return status == std::future_status::ready;
// auto p = std::make_shared<std::promise<void>>();
// auto future = p->get_future();
// auto status_client = std::make_shared<WsClient>(server_port_path);
// status_client->on_open = [p](std::shared_ptr<WsClient::Connection>) {
// p->set_value();
// };
// std::thread t([status_client]{
// status_client->start();
// status_client->on_open = NULL;
// status_client->on_message = NULL;
// status_client->on_close = NULL;
// status_client->on_error = NULL;
// });
// auto status = future.wait_for(std::chrono::milliseconds(20));
// status_client->stop();
// t.join();
// bool connected = status == std::future_status::ready;
// //std::cout << "connected(): " << connected << std::endl;
// return connected;
return true;
}
// Adds a client to the client_map.
......@@ -234,6 +235,7 @@ public:
std::cout << client_name << " has been removed" << std::endl;
#endif
});
it->second.reset();
client_map.erase(it);
t.detach();
}
......@@ -329,7 +331,7 @@ public:
#ifdef DEBUG
client->on_open = [this, topic, message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [this, topic, message, ready](std::shared_ptr<WsClient::Connection> connection) {
client->on_open = [this, topic, message](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
......
......@@ -88,9 +88,14 @@ void ROSBridge::ComPrivate::Server::reset()
{
if ( _stopped->load() )
return;
std::cout << "Server: _stopped->store(true)" << std::endl;
_stopped->store(true);
for (const auto& item : _serviceMap){
std::cout << "Server: unadvertiseService " << item.second << std::endl;
_rbc.unadvertiseService(item.second);
std::cout << "Server: removeClient " << item.first << std::endl;
_rbc.removeClient(item.first);
}
std::cout << "Server: _serviceMap cleared " << std::endl;
_serviceMap.clear();
}
......@@ -69,6 +69,8 @@ void ROSBridge::ComPrivate::TopicPublisher::start()
this->_rbc.unadvertise(it.second);
this->_rbc.removeClient(it.first);
}
std::cout << "TopicPublisher: publisher thread terminated." << std::endl;
});
}
......@@ -76,11 +78,15 @@ void ROSBridge::ComPrivate::TopicPublisher::reset()
{
if ( _stopped->load() ) // stop called while thread not running.
return;
std::cout << "TopicPublisher: _stopped->store(true)." << std::endl;
_stopped->store(true);
std::cout << "TopicPublisher: ask publisher thread to stop." << std::endl;
_cv.notify_one(); // Wake publisher thread.
if ( !_pThread )
return;
_pThread->join();
std::cout << "TopicPublisher: publisher thread joined." << std::endl;
_queue.clear();
std::cout << "TopicPublisher: queue cleard." << std::endl;
}
......@@ -23,14 +23,18 @@ void ROSBridge::ComPrivate::TopicSubscriber::start()
void ROSBridge::ComPrivate::TopicSubscriber::reset()
{
if ( !_stopped->load() ){
std::cout << "TopicSubscriber: _stopped->store(true) " << std::endl;
_stopped->store(true);
{
for (auto &item : _topicMap){
std::cout << "TopicSubscriber: unsubscribe " << item.second << std::endl;
_rbc.unsubscribe(item.second);
std::cout << "TopicSubscriber: removeClient " << item.first << std::endl;
_rbc.removeClient(item.first);
}
}
_topicMap.clear();
std::cout << "TopicSubscriber: _topicMap cleared " << std::endl;
}
}
......
......@@ -43,9 +43,13 @@ void ROSBridge::ROSBridge::start()
void ROSBridge::ROSBridge::reset()
{
std::cout << "ROSBridge::reset: reset publisher" << std::endl;
_topicPublisher.reset();
std::cout << "ROSBridge::reset: reset subscriber" << std::endl;
_topicSubscriber.reset();
std::cout << "ROSBridge::reset: reset server" << std::endl;
_server.reset();
std::cout << "ROSBridge::reset: _stopped->store(true)" << std::endl;
_stopped->store(true);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment