Commit 7308bf55 authored by Valentin Platzgummer's avatar Valentin Platzgummer

nemo interface improved

parent 29514d84
......@@ -463,7 +463,6 @@ HEADERS += \
src/MeasurementComplexItem/nemo_interface/Task.h \
src/MeasurementComplexItem/nemo_interface/TaskDispatcher.h \
src/MeasurementComplexItem/nemo_interface/tileHelper.h \
src/MeasurementComplexItem/routing.h \
src/comm/ros_bridge/include/messages/nemo_msgs/labeled_progress.h \
src/MeasurementComplexItem/nemo_interface/MeasurementTile.h \
src/QmlControls/QmlUnitsConversion.h \
......@@ -539,7 +538,6 @@ SOURCES += \
src/MeasurementComplexItem/nemo_interface/MeasurementTile.cpp \
src/MeasurementComplexItem/nemo_interface/Task.cpp \
src/MeasurementComplexItem/nemo_interface/TaskDispatcher.cpp \
src/MeasurementComplexItem/routing.cpp \
src/Vehicle/VehicleEscStatusFactGroup.cc \
src/MeasurementComplexItem/AreaData.cc \
src/api/QGCCorePlugin.cc \
......
......@@ -52,7 +52,11 @@ void TaskDispatcher::start() {
ULock lk1(this->_mutex);
if (!_running) {
this->_running = true;
_future = QtConcurrent::run([this]() mutable { return this->run(); });
auto p = std::make_shared<std::promise<void>>();
_threadFuture = p->get_future();
QtConcurrent::run([this, p = std::move(p)]() mutable {
return this->run(std::move(*p));
});
lk1.unlock();
}
}
......@@ -61,8 +65,8 @@ void TaskDispatcher::stop() {
ULock lk1(this->_mutex);
if (_running) {
this->_running = false;
_threadFuture.wait();
lk1.unlock();
this->_future.waitForFinished();
}
}
......@@ -88,7 +92,7 @@ std::size_t TaskDispatcher::pendingTasks() {
bool TaskDispatcher::idle() { return this->pendingTasks() == 0; }
void TaskDispatcher::run() {
void TaskDispatcher::run(std::promise<void> p) {
while (true) {
ULock lk1(this->_mutex);
......@@ -112,6 +116,7 @@ void TaskDispatcher::run() {
break;
}
}
p.set_value();
}
} // namespace nemo_interface
......@@ -56,13 +56,12 @@ public:
bool idle();
protected:
void run();
void run(std::promise<void> p);
private:
QFuture<void> _future;
std::deque<std::unique_ptr<Task>> _taskQueue;
std::deque<std::promise<QVariant>> _promiseQueue;
std::future<void> _threadFuture;
bool _running;
std::mutex _mutex;
};
......
......@@ -37,7 +37,7 @@ void Rosbridge::start() {
_impl->moveToThread(_t);
connect(_impl, &RosbridgeImpl::stateChanged, this,
&Rosbridge::stateChanged);
&Rosbridge::_onImplStateChanged);
connect(this, &Rosbridge::_start, _impl, &RosbridgeImpl::start);
connect(this, &Rosbridge::_stop, _impl, &RosbridgeImpl::stop);
......@@ -173,6 +173,26 @@ void Rosbridge::waitForTopic(const QString &topic) {
}
}
bool Rosbridge::waitForTopic(const QString &topic,
const std::function<bool()> &condition) {
std::future<bool> fut;
do {
fut = topicAvailable(topic);
// wait
while (true) {
auto status = fut.wait_for(std::chrono::milliseconds(5));
if (!condition()) {
return false;
}
if (status == std::future_status::ready) {
break;
}
}
} while (!fut.get());
return true;
}
void Rosbridge::advertiseService(const QString &service, const QString &type,
CallBackWReturn callback) {
emit _advertiseService(service, type, callback);
......@@ -244,6 +264,26 @@ void Rosbridge::waitForService(const QString &service) {
}
}
bool Rosbridge::waitForService(const QString &service,
const std::function<bool()> &condition) {
std::future<bool> fut;
do {
fut = serviceAvailable(service);
// wait
while (true) {
auto status = fut.wait_for(std::chrono::milliseconds(5));
if (!condition()) {
return false;
}
if (status == std::future_status::ready) {
break;
}
}
} while (!fut.get());
return true;
}
Rosbridge::STATE translate(RosbridgeImpl::STATE s) {
switch (s) {
case RosbridgeImpl::STATE::STOPPED:
......@@ -260,3 +300,4 @@ Rosbridge::STATE translate(RosbridgeImpl::STATE s) {
qCritical() << "unreachable branch!";
return Rosbridge::STATE::STOPPED;
}
void Rosbridge::_onImplStateChanged() { emit stateChanged(); }
......@@ -58,6 +58,8 @@ public:
void unsubscribeAllTopics();
void waitForTopic(const QString &topic);
bool waitForTopic(const QString &topic,
const std::function<bool()> &condition);
// Services.
void advertiseService(const QString &service, const QString &type,
......@@ -75,6 +77,8 @@ public:
std::future<QJsonObject> getAdvertisedServices();
void waitForService(const QString &service);
bool waitForService(const QString &service,
const std::function<bool()> &condition);
signals:
void stateChanged();
......@@ -99,6 +103,8 @@ signals:
const QJsonObject &args = QJsonObject());
void _unadvertiseService(const QString &service);
void _unadvertiseAllServices();
private slots:
void _onImplStateChanged();
private:
std::atomic<STATE> _state;
......
This diff is collapsed.
#ifndef ROUTING_H
#define ROUTING_H
#endif // ROUTING_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment