Commit 8a2b8575 authored by Valentin Platzgummer's avatar Valentin Platzgummer

thread issue seems to be solved

parent b9f31cb4
......@@ -16,6 +16,10 @@
#include <memory>
#define SMART_RTL_MAX_ATTEMPTS 3 // times
#define SMART_RTL_ATTEMPT_INTERVAL 200 // ms
#define EVENT_TIMER_INTERVAL 50 // ms
// const char* WimaController::wimaFileExtension = "wima";
......@@ -70,14 +74,14 @@ WimaController::WimaController(QObject *parent)
, _flightSpeed (settingsGroup, _metaDataMap[flightSpeedName])
, _arrivalReturnSpeed (settingsGroup, _metaDataMap[arrivalReturnSpeedName])
, _altitude (settingsGroup, _metaDataMap[altitudeName])
, _measurementPathLength (-1)
, _lowBatteryHandlingTriggered (false)
, _snakeCalcInProgress (false)
, _snakeTileWidth (settingsGroup, _metaDataMap[snakeTileWidthName])
, _snakeTileHeight (settingsGroup, _metaDataMap[snakeTileHeightName])
, _snakeMinTileArea (settingsGroup, _metaDataMap[snakeMinTileAreaName])
, _snakeLineDistance (settingsGroup, _metaDataMap[snakeLineDistanceName])
, _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName])
, _measurementPathLength (-1)
, _lowBatteryHandlingTriggered (false)
, _snakeCalcInProgress (false)
, _nemoHeartbeat (0 /*status: not connected*/)
, _fallbackStatus (0 /*status: not connected*/)
, _pRosBridge (new ROSBridge::ROSBridge())
......
......@@ -43,10 +43,6 @@
#include <map>
#define SMART_RTL_MAX_ATTEMPTS 3 // times
#define SMART_RTL_ATTEMPT_INTERVAL 200 // ms
#define EVENT_TIMER_INTERVAL 50 // ms
using namespace snake;
......@@ -400,8 +396,6 @@ private:
// Snake
bool _snakeCalcInProgress;
bool _snakeRecalcNecessary;
bool _scenarioDefinedBool;
SnakeWorker _snakeWorker;
Scenario _scenario;
::GeoPoint3D _snakeOrigin;
......
......@@ -328,9 +328,9 @@ template <class IntType = long, template <class, class...> class ContainterType
class GenericProgress : public MessageBaseClass{
public:
typedef MessageGroups::ProgressGroup Group;
GenericProgress(){}
GenericProgress(const ContainterType<IntType> &progress) :_progress(progress){}
GenericProgress(const GenericProgress &p) :_progress(p.progress()){}
GenericProgress() : MessageBaseClass() {}
GenericProgress(const ContainterType<IntType> &progress) : MessageBaseClass(), _progress(progress){}
GenericProgress(const GenericProgress &p) :MessageBaseClass(), _progress(p.progress()){}
~GenericProgress() {}
virtual GenericProgress *Clone() const override { return new GenericProgress(*this); }
......
......@@ -53,7 +53,7 @@ void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shar
#ifdef DEBUG
std::thread client_thread([client_name, client]() {
#else
std::thread client_thread([client, client_name]() {
std::thread client_thread([client]() {
#endif
client->start();
......@@ -64,8 +64,10 @@ void RosbridgeWsClient::start(const std::__cxx11::string &client_name, std::shar
client->on_message = NULL;
client->on_close = NULL;
client->on_error = NULL;
#ifdef DEBUG
std::cout << "start" << client_name << " client reference count:" << client.use_count() << std::endl;
std::cout << client_name << " thread end" << std::endl;
#endif
});
client_thread.detach();
......@@ -89,28 +91,28 @@ RosbridgeWsClient::~RosbridgeWsClient()
bool RosbridgeWsClient::connected(){
// auto p = std::make_shared<std::promise<void>>();
// auto future = p->get_future();
// auto status_client = std::make_shared<WsClient>(server_port_path);
// status_client->on_open = [p](std::shared_ptr<WsClient::Connection>) {
// p->set_value();
// };
// std::thread t([status_client]{
// status_client->start();
// status_client->on_open = NULL;
// status_client->on_message = NULL;
// status_client->on_close = NULL;
// status_client->on_error = NULL;
// });
// auto status = future.wait_for(std::chrono::milliseconds(20));
// status_client->stop();
// t.join();
// bool connected = status == std::future_status::ready;
// //std::cout << "connected(): " << connected << std::endl;
// return connected;
auto p = std::make_shared<std::promise<void>>();
auto future = p->get_future();
auto status_client = std::make_shared<WsClient>(server_port_path);
status_client->on_open = [p](std::shared_ptr<WsClient::Connection>) {
p->set_value();
};
std::thread t([status_client]{
status_client->start();
status_client->on_open = NULL;
status_client->on_message = NULL;
status_client->on_close = NULL;
status_client->on_error = NULL;
});
auto status = future.wait_for(std::chrono::milliseconds(20));
status_client->stop();
t.join();
bool connected = status == std::future_status::ready;
//std::cout << "connected(): " << connected << std::endl;
return connected;
return true;
}
......@@ -142,29 +144,6 @@ std::shared_ptr<WsClient> RosbridgeWsClient::getClient(const std::string &client
}
void RosbridgeWsClient::stopClient(const std::string &client_name)
{
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end())
{
it->second->stop();
it->second->on_open = NULL;
it->second->on_message = NULL;
it->second->on_close = NULL;
it->second->on_error = NULL;
#ifdef DEBUG
std::cout << client_name << " has been stopped" << std::endl;
#endif
}
#ifdef DEBUG
else
{
std::cerr << client_name << " has not been created" << std::endl;
}
#endif
}
void RosbridgeWsClient::removeClient(const std::string &client_name)
{
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
......@@ -176,23 +155,21 @@ void RosbridgeWsClient::removeClient(const std::string &client_name)
#ifdef DEBUG
std::thread t([client, client_name](){
#else
std::thread t([client, client_name](){
std::thread t([client](){
#endif
std::this_thread::sleep_for(std::chrono::milliseconds(100));
client->stop();
client->on_open = NULL;
client->on_message = NULL;
client->on_close = NULL;
client->on_error = NULL;
std::cout << "removeClient thread: " << client_name << " reference count: " << client.use_count() << std::endl;
std::cout << client_name << " has been removed" << std::endl;
// The next lines of code seem to cause a double free or corruption error, why?
// client->on_open = NULL;
// client->on_message = NULL;
// client->on_close = NULL;
// client->on_error = NULL;
#ifdef DEBUG
std::cout << "removeClient thread: " << client_name << " reference count: " << client.use_count() << std::endl;
std::cout << client_name << " has been removed" << std::endl;
#endif
});
client_map.erase(it);
t.detach();
std::cout << "removeClient: " << client_name << " reference count: " << client.use_count() << std::endl;
}
#ifdef DEBUG
else
......@@ -202,6 +179,28 @@ void RosbridgeWsClient::removeClient(const std::string &client_name)
#endif
}
void RosbridgeWsClient::removeClient(const std::string &client_name)
{
stopClient(client_name);
{
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end())
{
client_map.erase(it);
#ifdef DEBUG
std::cout << "removeClient: " << client_name << " reference count: " << client.use_count() << std::endl;
#endif
}
#ifdef DEBUG
else
{
std::cerr << client_name << " has not been created" << std::endl;
}
#endif
}
}
std::string RosbridgeWsClient::getAdvertisedTopics(){
auto pPromise(std::make_shared<std::promise<std::string>>());
auto future = pPromise->get_future();
......@@ -211,7 +210,7 @@ std::string RosbridgeWsClient::getAdvertisedTopics(){
pPromise->set_value(in_message->string());
connection->send_close(1000);
});
auto status = future.wait_for(std::chrono::milliseconds(25)); // enought?
auto status = future.wait_for(std::chrono::milliseconds(20)); // enought?
return status == std::future_status::ready ? future.get() : std::string();
}
......@@ -691,7 +690,7 @@ void RosbridgeWsClient::waitForService(const std::string &service, const std::sh
if ( serviceAvailable(service) ){
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Avoid excessive polling.
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avoid excessive polling.
};
#ifdef DEBUG
auto e = std::chrono::high_resolution_clock::now();
......@@ -723,7 +722,7 @@ void RosbridgeWsClient::waitForTopic(const std::string &topic, const std::shared
if ( topicAvailable(topic) ){
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Avoid excessive polling.
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Avoid excessive polling.
};
#ifdef DEBUG
auto e = std::chrono::high_resolution_clock::now();
......
......@@ -25,17 +25,21 @@ void ROSBridge::ComPrivate::TopicPublisher::start()
// Init.
std::unordered_map<std::string, std::string> topicMap;
// Main Loop.
while( !this->_stopped->load()){
std::unique_lock<std::mutex> lk(this->_mutex);
// Check if new data available, wait if not.
if (this->_queue.empty()){
this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
continue;
while( !this->_stopped->load() ){
JsonDocUPtr pJsonDoc;
{
std::unique_lock<std::mutex> lk(this->_mutex);
// Check if new data available, wait if not.
if (this->_queue.empty()){
if ( _stopped->load() ) // Check condition again while holding the lock.
break;
this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
continue;
}
// Pop message from queue.
pJsonDoc = std::move(this->_queue.front());
this->_queue.pop_front();
}
// Pop message from queue.
JsonDocUPtr pJsonDoc(std::move(this->_queue.front()));
this->_queue.pop_front();
lk.unlock();
// Get tag from Json message and remove it.
Tag tag;
......@@ -78,15 +82,20 @@ void ROSBridge::ComPrivate::TopicPublisher::reset()
{
if ( _stopped->load() ) // stop called while thread not running.
return;
std::cout << "TopicPublisher: _stopped->store(true)." << std::endl;
_stopped->store(true);
std::cout << "TopicPublisher: ask publisher thread to stop." << std::endl;
_cv.notify_one(); // Wake publisher thread.
{
std::lock_guard<std::mutex> lk(_mutex);
std::cout << "TopicPublisher: _stopped->store(true)." << std::endl;
_stopped->store(true);
std::cout << "TopicPublisher: ask publisher thread to stop." << std::endl;
_cv.notify_one(); // Wake publisher thread.
}
if ( !_pThread )
return;
_pThread->join();
std::cout << "TopicPublisher: publisher thread joined." << std::endl;
_queue.clear();
std::cout << "TopicPublisher: queue cleard." << std::endl;
{
_queue.clear();
std::cout << "TopicPublisher: queue cleard." << std::endl;
}
}
......@@ -46,16 +46,16 @@ void ROSBridge::ROSBridge::reset()
auto start = std::chrono::high_resolution_clock::now();
std::cout << "ROSBridge::reset: reset publisher" << std::endl;
_topicPublisher.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
//std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "ROSBridge::reset: reset subscriber" << std::endl;
_topicSubscriber.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
//std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "ROSBridge::reset: reset server" << std::endl;
_server.reset();
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
//std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "ROSBridge::reset: _stopped->store(true)" << std::endl;
_stopped->store(true);
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
//std::this_thread::sleep_for(std::chrono::milliseconds(2000));
auto end = std::chrono::high_resolution_clock::now();
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count();
std::cout << "ROSBridge reset duration: " << delta << " ms" << std::endl;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment