#include "NemoInterface.h"
#include "QGCApplication.h"
#include "QGCLoggingCategory.h"
#include "QGCToolbox.h"
#include "SettingsFact.h"
#include "SettingsManager.h"
#include "WimaSettings.h"
#include
#include
#include
#include
#include
#include "GenericSingelton.h"
#include "geometry/MeasurementArea.h"
#include "geometry/geometry.h"
#include "nemo_interface/FutureWatcher.h"
#include "nemo_interface/MeasurementTile.h"
#include "nemo_interface/QNemoHeartbeat.h"
#include "nemo_interface/TaskDispatcher.h"
#include "ros_bridge/include/messages/geographic_msgs/geopoint.h"
#include "ros_bridge/include/messages/nemo_msgs/heartbeat.h"
#include "ros_bridge/include/messages/nemo_msgs/progress_array.h"
#include "ros_bridge/include/messages/nemo_msgs/tile.h"
#include "ros_bridge/include/messages/nemo_msgs/tile_array.h"
#include "rosbridge/rosbridge.h"

QGC_LOGGING_CATEGORY(NemoInterfaceLog, "NemoInterfaceLog")

// Period the heartbeat watchdog (_timeoutTimer) is armed with; when it fires
// the interface enters STATE::HEARTBEAT_TIMEOUT.
#define NO_HEARTBEAT_TIMEOUT 5000 // ms
// Period of _restartTimer while the interface is running.
#define RESTART_INTERVAl 600000 // ms == 10 min
// Period _restartTimer is re-armed with when the dispatcher is busy.
#define RESTART_RETRY_INTERVAl 2000 // ms
// Period of _syncTimer (periodic remote tile query).
#define SYNC_INTERVAL 10000 // ms
// Delay before a deferred or failed synchronization is attempted again.
#define SYNC_RETRY_INTERVAL 2000 // ms

// Upper bound for waiting on a single rosbridge service response.
static constexpr auto maxResponseTime = std::chrono::milliseconds(10000);
// Topic names subscribed to during setup.
static const char *progressTopic = "/nemo/progress";
static const char *heartbeatTopic = "/nemo/heartbeat";

using hrc = std::chrono::high_resolution_clock;
using ROSBridgePtr = std::shared_ptr;
typedef ros_bridge::messages::nemo_msgs::tile::GenericTile Tile;
typedef std::map> TileMap;
typedef std::map> TileMapConst;
typedef ros_bridge::messages::nemo_msgs::heartbeat::Heartbeat Heartbeat;
typedef nemo_interface::TaskDispatcher Dispatcher;
typedef nemo_interface::FutureWatcher FutureWatcher;

// Private implementation of NemoInterface.
//
// Holds the rosbridge connection, a local tile map (_localTiles) and a mirror
// of the tiles acknowledged by the device (_remoteTiles), and drives a state
// machine (STATE) whose transitions are executed by _doAction(). Service calls
// are wrapped in tasks and handed to _dispatcher; the _call*() members are the
// bodies of those tasks.
class NemoInterface::Impl {
  // Internal states of the connection state machine; mapped to the public
  // NemoInterface::STATUS by _status().
  enum class STATE {
    STOPPED,
    START_BRIDGE,
    WEBSOCKET_DETECTED,
    TRY_SETUP,
    USER_SYNC,
    SYS_SYNC,
    READY,
    WEBSOCKET_TIMEOUT,
    HEARTBEAT_TIMEOUT
  };

public:
  Impl(NemoInterface *p);
  ~Impl();

  // Enter START_BRIDGE / STOPPED respectively (no-op if already there).
  void start();
  void stop();

  // Tile editing.
  // Functions that require communication to device.
  // Each returns a future that yields QVariant(false) immediately when
  // nothing was dispatched.
  std::shared_future addTiles(const TilePtrArray &tileArray);
  std::shared_future removeTiles(const IDArray &idArray);
  std::shared_future clearTiles();

  // Functions that don't require communication to device.
  TileArray getTiles(const IDArray &idArray) const;
  TileArray getAllTiles() const;
  LogicalArray containsTiles(const IDArray &idArray) const;
  std::size_t size() const;
  bool empty() const;

  // Progress.
  ProgressArray getProgress() const;
  ProgressArray getProgress(const IDArray &idArray) const;
  NemoInterface::STATUS status() const;
  bool running() const; // thread safe
  bool ready() const;   // thread safe

  const QString &infoString() const;
  const QString &warningString() const;

private:
  // Setup steps executed while in STATE::TRY_SETUP.
  void _doTopicServiceSetup();
  void _checkVersion();
  void _subscribeProgressTopic();
  void _subscribeHearbeatTopic();

  // Executes the side effects belonging to the current state.
  void _doAction();
  void _trySynchronize();
  void _synchronizeIfNeccessary();
  void _tryRestart();
  bool _isSynchronized() const;
  bool _userSync() const;          // thread safe
  bool _sysSync() const;           // thread safe
  void _onFutureWatcherFinished(); // thread safe
  void _onHeartbeatTimeout();      // thread safe
  void _onRosbridgeStateChanged();

  // called from dispatcher thread!
  QVariant _callAddTiles(std::shared_ptr>> pTileArray);
  // called from dispatcher thread!
  QVariant _callRemoveTiles(std::shared_ptr pIdArray);
  // called from dispatcher thread!
  QVariant _callClearTiles();
  // called from dispatcher thread!
  QVariant _callGetProgress(std::shared_ptr pIdArray);
  QVariant _callGetAllProgress();
  QVariant _callGetAllTiles();
  QVariant _callGetVersion();

  // Identifies the most recent service call (stored in _lastCall and used for
  // logging failed transactions).
  enum class CALL_NAME {
    ADD_TILES,
    REMOVE_TILES,
    CLEAR_TILES,
    GET_PROGRESS,
    GET_ALL_TILES,
    GET_ALL_PROGRESS,
    GET_VERSION
  };
  QString _toString(CALL_NAME name);

  // Handlers that take a promise: the _call*() tasks queue them onto the
  // parent object via QMetaObject::invokeMethod and wait on the matching
  // future for the result.
  void _addTilesRemote(std::shared_ptr>> pTileArray,
                       std::promise promise);
  void _addTilesRemote2(std::shared_ptr>> pTileArray,
                        std::promise promise);
  void _setTiles(std::shared_ptr>> pTileArray, std::promise promise);
  void _setVersion(QString version, std::promise promise);
  void _removeTilesRemote(std::shared_ptr idArray, std::promise promise);
  void _clearTilesRemote(std::promise promise);
  void _updateProgress(std::shared_ptr pArray, std::promise promise);
  void _onHeartbeatReceived(const QNemoHeartbeat &hb, std::promise promise);

  // Setters that emit the matching *Changed signal on the parent when the
  // text actually changes.
  void _setInfoString(const QString &info);
  void _setWarningString(const QString &warning);

  bool _setState(STATE newState); // not thread safe

  // Pure classification helpers on a STATE value.
  static bool _ready(STATE s);
  static bool _userSync(STATE s);
  static bool _sysSync(STATE s);
  static bool _running(STATE s);
  static NemoInterface::STATUS _status(STATE state);
  static QString _toString(STATE s);
  static QString _toString(NemoInterface::STATUS s);

  static QString _localVersion;
  QString _remoteVersion;
  std::atomic _state;
  // Setup progress flags; all three are reset in STOPPED and
  // WEBSOCKET_TIMEOUT.
  std::atomic_bool _versionOK;
  std::atomic_bool _progressTopicOK;
  std::atomic_bool _heartbeatTopicOK;
  std::atomic _lastCall;
  ROSBridgePtr _pRosbridge;
  TileMap _remoteTiles;     // tiles confirmed by the device
  TileMapConst _localTiles; // tiles requested by the user
  NemoInterface *const _parent;
  Dispatcher _dispatcher;
  QString _infoString;
  QString _warningString;
  QTimer _timeoutTimer; // heartbeat watchdog
  QTimer _syncTimer;    // periodic synchronization check
  QTimer _restartTimer; // periodic restart
  QNemoHeartbeat _lastHeartbeat;
  FutureWatcher _futureWatcher;
};

QString NemoInterface::Impl::_localVersion("V_1.0");

// Human readable names of the public status values.
using StatusMap = std::map;
static StatusMap statusMap{
    std::make_pair(NemoInterface::STATUS::NOT_CONNECTED, "Not Connected"),
    std::make_pair(NemoInterface::STATUS::ERROR, "ERROR"),
std::make_pair(NemoInterface::STATUS::SYNC, "Synchronizing"), std::make_pair(NemoInterface::STATUS::READY, "Ready"), std::make_pair( NemoInterface::STATUS::TIMEOUT, "Timeout"), std::make_pair( NemoInterface::STATUS::WEBSOCKET_DETECTED, "Websocket Detected")}; NemoInterface::Impl::Impl(NemoInterface *p) : _state(STATE::STOPPED), _versionOK(false), _progressTopicOK(false), _heartbeatTopicOK(false), _parent(p) { // ROS Bridge. WimaSettings *wimaSettings = qgcApp()->toolbox()->settingsManager()->wimaSettings(); auto connectionStringFact = wimaSettings->rosbridgeConnectionString(); auto setConnectionString = [connectionStringFact, this] { auto connectionString = connectionStringFact->rawValue().toString(); bool wasRunning = this->running(); this->stop(); this->_pRosbridge = std::make_shared( QUrl(QString("ws://") + connectionString.toLocal8Bit().data())); if (wasRunning) { this->start(); } }; connect(connectionStringFact, &SettingsFact::rawValueChanged, setConnectionString); setConnectionString(); // Heartbeat timeout. 
connect(&this->_timeoutTimer, &QTimer::timeout, std::bind(&Impl::_onHeartbeatTimeout, this)); connect(this->_pRosbridge.get(), &Rosbridge::stateChanged, this->_parent, [this] { this->_onRosbridgeStateChanged(); }); connect(&this->_futureWatcher, &FutureWatcher::finished, this->_parent, [this] { this->_onFutureWatcherFinished(); }); connect(&this->_restartTimer, &QTimer::timeout, this->_parent, [this] { this->_tryRestart(); }); connect(&this->_syncTimer, &QTimer::timeout, this->_parent, [this] { this->_synchronizeIfNeccessary(); }); } NemoInterface::Impl::~Impl() { this->_pRosbridge->stop(); } void NemoInterface::Impl::start() { if (!running()) { this->_setState(STATE::START_BRIDGE); this->_doAction(); } } void NemoInterface::Impl::stop() { if (running()) { this->_setState(STATE::STOPPED); this->_doAction(); } } std::shared_future NemoInterface::Impl::addTiles(const TilePtrArray &tileArray) { using namespace nemo_interface; // qDebug() << "addTiles called"; if (tileArray.size() > 0) { // copy unknown tiles auto pTileArray = std::make_shared>>(); auto pIdArray = std::make_shared(); for (const auto *pTile : tileArray) { auto id = pTile->id(); const auto it = this->_localTiles.find(id); Q_ASSERT(it == _localTiles.end()); if (Q_LIKELY(it == _localTiles.end())) { auto pTileCopy = std::make_shared(pTile->coordinateList(), 0.0, id); _localTiles.insert(std::make_pair(id, pTileCopy)); pTileArray->push_back(pTileCopy); pIdArray->push_back(id); } else { qCDebug(NemoInterfaceLog) << "addTiles(): tile with id: " << pTile->id() << "already added."; } } if (pTileArray->size() > 0) { emit this->_parent->tilesChanged(); } // ready for send? if (pTileArray->size() > 0 && (this->ready() || this->_userSync())) { this->_setState(STATE::USER_SYNC); this->_doAction(); // create add tiles command. auto pTask = std::make_unique( std::bind(&Impl::_callAddTiles, this, pTileArray)); // dispatch command. 
auto ret = _dispatcher.dispatch(std::move(pTask)); auto addFuture = ret.share(); // create get progress cmd. pTask = std::make_unique([this, addFuture, pIdArray] { addFuture.wait(); if (addFuture.get().toBool()) { return this->_callGetProgress(pIdArray); } else { return QVariant(false); } }); // dispatch command. ret = _dispatcher.dispatch(std::move(pTask)); auto progressFuture = ret.share(); _futureWatcher.setFuture(progressFuture); return progressFuture; } } std::promise p; p.set_value(QVariant(false)); return p.get_future(); } std::shared_future NemoInterface::Impl::removeTiles(const IDArray &idArray) { using namespace nemo_interface; // qDebug() << "removeTiles called"; if (idArray.size() > 0) { // copy known ids auto pIdArray = std::make_shared(); for (const auto &id : idArray) { const auto it = this->_localTiles.find(id); Q_ASSERT(it != _localTiles.end()); if (Q_LIKELY(it != _localTiles.end())) { _localTiles.erase(it); pIdArray->push_back(id); } else { qCDebug(NemoInterfaceLog) << "removeTiles(): unknown id: " << id << "."; } } if (pIdArray->size() > 0) { emit this->_parent->tilesChanged(); } // ready for send? if (pIdArray->size() > 0 && (this->ready() || this->_userSync())) { this->_setState(STATE::USER_SYNC); this->_doAction(); // create command. auto cmd = std::make_unique( std::bind(&Impl::_callRemoveTiles, this, pIdArray)); // dispatch command and return. auto ret = _dispatcher.dispatch(std::move(cmd)); auto sfut = ret.share(); _futureWatcher.setFuture(sfut); return sfut; } } std::promise p; p.set_value(QVariant(false)); return p.get_future(); } std::shared_future NemoInterface::Impl::clearTiles() { using namespace nemo_interface; // qDebug() << "clearTiles called"; // clear local tiles (_localTiles) if (!_localTiles.empty()) { this->_localTiles.clear(); emit this->_parent->tilesChanged(); } if (this->ready() || this->_userSync()) { this->_setState(STATE::USER_SYNC); this->_doAction(); // create command. 
auto pTask = std::make_unique(std::bind(&Impl::_callClearTiles, this)); // dispatch command and return. auto ret = _dispatcher.dispatch(std::move(pTask)); auto sfut = ret.share(); _futureWatcher.setFuture(sfut); return sfut; } else { std::promise p; p.set_value(QVariant(false)); return p.get_future(); } } TileArray NemoInterface::Impl::getTiles(const IDArray &idArray) const { TileArray tileArray; if (this->ready()) { for (const auto &id : idArray) { const auto it = _remoteTiles.find(id); if (it != _remoteTiles.end()) { MeasurementTile copy; copy.setId(it->second->id()); copy.setProgress(it->second->progress()); copy.setPath(it->second->tile()); tileArray.append(std::move(copy)); } } } else { for (const auto &id : idArray) { const auto it = _localTiles.find(id); if (it != _localTiles.end()) { MeasurementTile copy; copy.setId(it->second->id()); copy.setProgress(it->second->progress()); copy.setPath(it->second->tile()); tileArray.append(std::move(copy)); } } } return tileArray; } TileArray NemoInterface::Impl::getAllTiles() const { TileArray tileArray; if (this->ready()) { for (const auto &entry : _remoteTiles) { auto pTile = entry.second; MeasurementTile copy; copy.setId(pTile->id()); copy.setProgress(pTile->progress()); copy.setPath(pTile->tile()); tileArray.append(std::move(copy)); } } else { for (const auto &entry : _localTiles) { auto pTile = entry.second; MeasurementTile copy; copy.setId(pTile->id()); copy.setProgress(pTile->progress()); copy.setPath(pTile->tile()); tileArray.append(std::move(copy)); } } return tileArray; } LogicalArray NemoInterface::Impl::containsTiles(const IDArray &idArray) const { LogicalArray logicalArray; for (const auto &id : idArray) { const auto &it = _localTiles.find(id); logicalArray.append(it != _localTiles.end()); } return logicalArray; } std::size_t NemoInterface::Impl::size() const { return _localTiles.size(); } bool NemoInterface::Impl::empty() const { return _localTiles.empty(); } ProgressArray 
NemoInterface::Impl::getProgress() const { ProgressArray progressArray; if (this->_isSynchronized()) { for (const auto &entry : _remoteTiles) { progressArray.append( LabeledProgress{entry.second->progress(), entry.second->id()}); } } else { for (const auto &entry : _localTiles) { progressArray.append( LabeledProgress{entry.second->progress(), entry.second->id()}); } } return progressArray; } ProgressArray NemoInterface::Impl::getProgress(const IDArray &idArray) const { ProgressArray progressArray; if (this->_isSynchronized()) { for (const auto &id : idArray) { const auto it = _remoteTiles.find(id); if (it != _remoteTiles.end()) { progressArray.append( LabeledProgress{it->second->progress(), it->second->id()}); } } } else { for (const auto &id : idArray) { const auto it = _localTiles.find(id); if (it != _localTiles.end()) { progressArray.append( LabeledProgress{it->second->progress(), it->second->id()}); } } } return progressArray; } NemoInterface::STATUS NemoInterface::Impl::status() const { return _status(this->_state); } bool NemoInterface::Impl::running() const { return _running(this->_state); } bool NemoInterface::Impl::ready() const { return _ready(this->_state.load()); } bool NemoInterface::Impl::_sysSync() const { return _sysSync(this->_state); } void NemoInterface::Impl::_onFutureWatcherFinished() { if (this->ready() || this->_userSync() || this->_sysSync()) { static long tries = 0; auto lastTransactionSuccessfull = _futureWatcher.result().toBool(); if (!lastTransactionSuccessfull) { ++tries; qCDebug(NemoInterfaceLog) << "last transaction unsuccessfull: " << _toString(_lastCall); if (tries < 5) { QTimer::singleShot(SYNC_RETRY_INTERVAL, this->_parent, [this] { this->_trySynchronize(); }); } else { _setWarningString("The last 5 transactions failed! 
Please check the " "connection and consider reseting the connection."); tries = 0; } } else { tries = 0; } } } void NemoInterface::Impl::_onHeartbeatTimeout() { this->_setState(STATE::HEARTBEAT_TIMEOUT); this->_doAction(); } void NemoInterface::Impl::_onRosbridgeStateChanged() { auto state = this->_pRosbridge->state(); if (state == Rosbridge::STATE::CONNECTED) { if (this->_state == STATE::START_BRIDGE || this->_state == STATE::WEBSOCKET_TIMEOUT) { this->_setState(STATE::WEBSOCKET_DETECTED); this->_doAction(); } } else if (state == Rosbridge::STATE::TIMEOUT) { if (this->_state == STATE::TRY_SETUP || this->_state == STATE::READY || this->_state == STATE::WEBSOCKET_DETECTED || this->_state == STATE::HEARTBEAT_TIMEOUT) { this->_setState(STATE::WEBSOCKET_TIMEOUT); this->_doAction(); } } } bool NemoInterface::Impl::_userSync() const { return _userSync(this->_state); } const QString &NemoInterface::Impl::infoString() const { return _infoString; } const QString &NemoInterface::Impl::warningString() const { return _warningString; } void NemoInterface::Impl::_updateProgress(std::shared_ptr pArray, std::promise promise) { // qDebug() << "_updateProgress called"; bool error = false; for (auto itLP = pArray->begin(); itLP != pArray->end();) { auto it = _remoteTiles.find(itLP->id()); if (Q_LIKELY(it != _remoteTiles.end())) { it->second->setProgress(itLP->progress()); ++itLP; } else { qCDebug(NemoInterfaceLog) << "_updateProgress(): tile with id " << itLP->id() << " not found."; itLP = pArray->erase(itLP); error = true; } } if (pArray->size() > 0) { emit _parent->progressChanged(*pArray); } promise.set_value(!error); } void NemoInterface::Impl::_onHeartbeatReceived(const QNemoHeartbeat &hb, std::promise promise) { _lastHeartbeat = hb; this->_timeoutTimer.start(NO_HEARTBEAT_TIMEOUT); if (this->_state == STATE::TRY_SETUP) { this->_setState(STATE::READY); this->_doAction(); } else if (this->_state == STATE::HEARTBEAT_TIMEOUT) { this->_setState(STATE::READY); this->_doAction(); } 
promise.set_value(true);
}

// Stores the info text and emits infoStringChanged on the parent if the text
// differs from the current one.
void NemoInterface::Impl::_setInfoString(const QString &info) {
  if (_infoString != info) {
    _infoString = info;
    emit this->_parent->infoStringChanged();
  }
}

// Stores the warning text and emits warningStringChanged on the parent if the
// text differs from the current one.
void NemoInterface::Impl::_setWarningString(const QString &warning) {
  if (_warningString != warning) {
    _warningString = warning;
    emit this->_parent->warningStringChanged();
  }
}

// Intentionally empty.
void NemoInterface::Impl::_doTopicServiceSetup() {}

// Dispatches a task that waits until the service /nemo/get_version is
// available and then calls _callGetVersion(). The task yields QVariant(false)
// if the dispatcher requests an interruption or if the service did not become
// available within maxTries queries.
void NemoInterface::Impl::_checkVersion() {
  auto pTask = std::make_unique([this] {
    // wait for service
    std::future fut;
    long tries = 0;
    long maxTries = 50;
    do {
      fut = this->_pRosbridge->serviceAvailable("/nemo/get_version");

      // wait
      // Poll in 5 ms slices so that an interruption request is noticed
      // quickly.
      while (true) {
        auto status = fut.wait_for(std::chrono::milliseconds(5));
        if (this->_dispatcher.isInterruptionRequested()) {
          return QVariant(false);
        }
        if (status == std::future_status::ready) {
          break;
        }
      }
      ++tries;
      if (tries > maxTries) {
        qCWarning(NemoInterfaceLog)
            << "_checkVersion: service /nemo/get_version not available.";
        // The warning string is set through the parent as context object
        // rather than directly from this task.
        bool value = QMetaObject::invokeMethod(this->_parent /* context */,
                                               [this]() {
                                                 this->_setWarningString(
                                                     "Version checking failed.");
                                               });
        Q_ASSERT(value == true);
        return QVariant(false);
      }
    } while (!fut.get()); // repeat the query until it reports availability

    // call service
    return this->_callGetVersion();
  });

  this->_dispatcher.dispatch(std::move(pTask));
}

// Dispatches a task that waits until the progress topic is available,
// subscribes to it, sets _progressTopicOK and queues _doAction() on the
// parent. The task yields QVariant(false) on interruption or if the topic did
// not become available within maxTries queries.
void NemoInterface::Impl::_subscribeProgressTopic() {
  auto pTask = std::make_unique([this] {
    // wait for service
    std::future fut;
    long tries = 0;
    long maxTries = 50;
    do {
      fut = this->_pRosbridge->topicAvailable(progressTopic);

      // wait
      while (true) {
        auto status = fut.wait_for(std::chrono::milliseconds(5));
        if (this->_dispatcher.isInterruptionRequested()) {
          return QVariant(false);
        }
        if (status == std::future_status::ready) {
          break;
        }
      }
      ++tries;
      if (tries > maxTries) {
        qCWarning(NemoInterfaceLog)
            << "_subscribeProgressTopic: topic /nemo/progress not available.";
        bool value = QMetaObject::invokeMethod(
            this->_parent /* context */, [this]() {
              this->_setWarningString("Progress topic not available.");
            });
        Q_ASSERT(value == true);
        return QVariant(false);
      }
    } while (!fut.get());

    // subscribe
    this->_pRosbridge->subscribeTopic(progressTopic, [this](
                                                         const QJsonObject &o) {
      ros_bridge::messages::nemo_msgs::progress_array::ProgressArray
          progressArray;
      if (ros_bridge::messages::nemo_msgs::progress_array::fromJson(
              o, progressArray)) {

        // correct range errors of progress
        // Values are clamped to the interval [0, 100].
        for (auto &lp : progressArray.progress_array()) {
          bool rangeError = false;
          if (lp.progress() < 0) {
            lp.setProgress(0);
            rangeError = true;
          }
          if (lp.progress() > 100) {
            lp.setProgress(100);
            rangeError = true;
          }
          if (rangeError) {
            qCWarning(NemoInterfaceLog) << "/nemo/progress progress out "
                                           "of range, value was set to: "
                                        << lp.progress();
          }
        }
        auto p = std::make_shared();
        *p = std::move(progressArray.progress_array());
        // Hand the array over to _updateProgress() via the parent and block
        // this callback until the update has been carried out.
        std::promise promise;
        auto future = promise.get_future();
        bool value = QMetaObject::invokeMethod(
            this->_parent, [this, p, promise = std::move(promise)]() mutable {
              this->_updateProgress(p, std::move(promise));
            });
        Q_ASSERT(value == true);
        future.wait();
      } else {
        qCWarning(NemoInterfaceLog) << "/nemo/progress not able to "
                                       "create ProgressArray form json: "
                                    << o;
      }
    });

    this->_progressTopicOK = true;
    // Let the state machine continue with the next setup step.
    bool value = QMetaObject::invokeMethod(this->_parent /* context */,
                                           [this]() { this->_doAction(); });
    Q_ASSERT(value == true);

    return QVariant(true);
  });

  this->_dispatcher.dispatch(std::move(pTask));
}

// Dispatches a task that waits until the heartbeat topic is available,
// subscribes to it, sets _heartbeatTopicOK and queues _doAction() on the
// parent. The task yields QVariant(false) on interruption or if the topic did
// not become available within maxTries queries.
void NemoInterface::Impl::_subscribeHearbeatTopic() {
  auto pTask = std::make_unique([this] {
    // wait for service
    std::future fut;
    long tries = 0;
    long maxTries = 50;
    do {
      fut = this->_pRosbridge->topicAvailable(heartbeatTopic);

      // wait
      while (true) {
        auto status = fut.wait_for(std::chrono::milliseconds(5));
        if (this->_dispatcher.isInterruptionRequested()) {
          return QVariant(false);
        }
        if (status == std::future_status::ready) {
          break;
        }
      }
      ++tries;
      if (tries > maxTries) {
        qCWarning(NemoInterfaceLog)
            << "_subscribeHeartbeatTopic: topic /nemo/hearbeat not available.";
        bool value = QMetaObject::invokeMethod(
            this->_parent /* context */, [this]() {
              this->_setWarningString("Heartbeat topic not available.");
            });
        Q_ASSERT(value == true);
        return QVariant(false);
      }
    } while (!fut.get());

    // subscribe
    using namespace ros_bridge::messages;
    this->_pRosbridge->subscribeTopic(
        heartbeatTopic, [this](const QJsonObject &o) {
          nemo_msgs::heartbeat::Heartbeat heartbeat;
          if (nemo_msgs::heartbeat::fromJson(o, heartbeat)) {
            // Forward the heartbeat via the parent and block this callback
            // until it has been processed.
            std::promise promise;
            auto future = promise.get_future();
            bool value = QMetaObject::invokeMethod(
                this->_parent,
                [this, heartbeat, promise = std::move(promise)]() mutable {
                  this->_onHeartbeatReceived(heartbeat, std::move(promise));
                });
            Q_ASSERT(value == true);
            future.wait();
          } else {
            qCWarning(NemoInterfaceLog) << "/nemo/heartbeat not able to "
                                           "create Heartbeat form json: "
                                        << o;
          }
        });

    this->_heartbeatTopicOK = true;
    // Let the state machine continue with the next setup step.
    bool value = QMetaObject::invokeMethod(this->_parent /* context */,
                                           [this]() { this->_doAction(); });
    Q_ASSERT(value == true);

    return QVariant(true);
  });

  this->_dispatcher.dispatch(std::move(pTask));
}

// Pushes the local tile set to the device if local and remote tiles differ:
// clear, then add all local tiles, then query all progress values. The three
// tasks are chained through their futures and the last one is watched by
// _futureWatcher. If the dispatcher is busy the attempt is postponed by
// SYNC_RETRY_INTERVAL.
void NemoInterface::Impl::_trySynchronize() {
  if ((this->_state == STATE::READY || this->_state == STATE::SYS_SYNC ||
       this->_state == STATE::USER_SYNC) &&
      !_isSynchronized()) {

    if (!_dispatcher.idle()) {
      QTimer::singleShot(SYNC_RETRY_INTERVAL, this->_parent,
                         [this] { this->_trySynchronize(); });
      qCWarning(NemoInterfaceLog) << "synchronization defered";
      return;
    }

    qCWarning(NemoInterfaceLog) << "trying to synchronize";

    this->_setState(STATE::SYS_SYNC);
    this->_doAction();

    // create clear cmd.
    auto pTask = std::make_unique(
        std::bind(&Impl::_callClearTiles, this));

    // dispatch command.
    Q_ASSERT(_dispatcher.pendingTasks() == 0);
    auto ret = _dispatcher.dispatch(std::move(pTask));
    auto clearFuture = ret.share();

    // create tile array.
    auto pTileArray = std::make_shared>>();
    for (const auto &pair : _localTiles) {
      pTileArray->push_back(pair.second);
    }

    // create addTiles cmd.
pTask = std::make_unique([this, pTileArray, clearFuture] { clearFuture.wait(); if (clearFuture.get().toBool()) { return this->_callAddTiles(pTileArray); } else { return QVariant(false); } }); // dispatch command. ret = _dispatcher.dispatch(std::move(pTask)); auto addFuture = ret.share(); // create GetAllProgress cmd. pTask = std::make_unique([this, addFuture] { addFuture.wait(); if (addFuture.get().toBool()) { return this->_callGetAllProgress(); } else { return QVariant(false); } }); // dispatch command. ret = _dispatcher.dispatch(std::move(pTask)); _futureWatcher.setFuture(ret.share()); } } void NemoInterface::Impl::_synchronizeIfNeccessary() { if (_dispatcher.idle()) { // create getAllTiles cmd. auto pTask = std::make_unique( [this] { return this->_callGetAllTiles(); }); // dispatch command. auto ret = _dispatcher.dispatch(std::move(pTask)); auto fut = ret.share(); _futureWatcher.setFuture(fut); _syncTimer.start(SYNC_INTERVAL); } else { _syncTimer.start(SYNC_RETRY_INTERVAL); } } void NemoInterface::Impl::_tryRestart() { if (this->running()) { if (_dispatcher.idle()) { qDebug() << "_tryRestart: restarting"; this->stop(); this->start(); _restartTimer.start(RESTART_INTERVAl); } else { _restartTimer.start(RESTART_RETRY_INTERVAl); } } } bool NemoInterface::Impl::_isSynchronized() const { return _localTiles.size() > 0 && _remoteTiles.size() > 0 && std::equal( _localTiles.begin(), _localTiles.end(), _remoteTiles.begin(), [](const auto &a, const auto &b) { return a.first == b.first; }); } void NemoInterface::Impl::_doAction() { static bool resetDone = false; switch (this->_state) { case STATE::STOPPED: _setWarningString(""); _setInfoString(""); this->_timeoutTimer.stop(); this->_restartTimer.stop(); this->_syncTimer.stop(); this->_clearTilesRemote(std::promise()); if (this->_pRosbridge->state() != Rosbridge::STATE::STOPPED) { this->_pRosbridge->stop(); } _versionOK = false; _progressTopicOK = false; _heartbeatTopicOK = false; break; case STATE::START_BRIDGE: 
this->_pRosbridge->start(); this->_restartTimer.start(RESTART_INTERVAl); break; case STATE::WEBSOCKET_DETECTED: resetDone = false; this->_setState(STATE::TRY_SETUP); this->_doAction(); break; case STATE::TRY_SETUP: if (!_versionOK) { this->_checkVersion(); } else if (!_progressTopicOK) { this->_subscribeProgressTopic(); } else if (!_heartbeatTopicOK) { this->_subscribeHearbeatTopic(); } else { this->_timeoutTimer.start(NO_HEARTBEAT_TIMEOUT); } break; case STATE::READY: this->_trySynchronize(); this->_syncTimer.start(SYNC_INTERVAL); break; case STATE::USER_SYNC: case STATE::SYS_SYNC: break; case STATE::HEARTBEAT_TIMEOUT: this->_clearTilesRemote(std::promise()); this->_syncTimer.stop(); break; case STATE::WEBSOCKET_TIMEOUT: if (!resetDone) { resetDone = true; this->_pRosbridge->stop(); this->_pRosbridge->start(); } this->_timeoutTimer.stop(); this->_syncTimer.stop(); this->_clearTilesRemote(std::promise()); _versionOK = false; _progressTopicOK = false; _heartbeatTopicOK = false; break; }; } QVariant NemoInterface::Impl::_callAddTiles( std::shared_ptr>> pTileArray) { // qDebug() << "_callAddTiles called"; this->_lastCall = CALL_NAME::ADD_TILES; // create json object QJsonArray jsonTileArray; for (auto &&tile : *pTileArray) { using namespace ros_bridge::messages; QJsonObject jsonTile; if (!nemo_msgs::tile::toJson(*tile, jsonTile)) { qCDebug(NemoInterfaceLog) << "addTiles(): not able to create json object: tile id: " << tile->id() << " progress: " << tile->progress() << " points: " << tile->tile(); } jsonTileArray.append(std::move(jsonTile)); } // for QJsonObject req; req["in_tile_array"] = std::move(jsonTileArray); // create response handler. 
auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &o) mutable { // check if transaction was successfull if (o.contains("success") && o["success"].isBool()) { promise_response->set_value(o["success"].toBool()); } else { qCWarning(NemoInterfaceLog) << "/nemo/add_tiles no \"success\" key or wrong type: " << o; promise_response->set_value(false); } }; // call service. this->_pRosbridge->callService("/nemo/add_tiles", responseHandler, req); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "addTiles(): Websocket not responding to request."; return QVariant(false); } // transaction error? if (!future_response.get()) { return QVariant(false); } // add remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent /* context */, [this, pTileArray, promise = std::move(promise)]() mutable { this->_addTilesRemote(pTileArray, std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QVariant NemoInterface::Impl::_callRemoveTiles(std::shared_ptr pIdArray) { // qDebug() << "_callRemoveTiles called"; this->_lastCall = CALL_NAME::REMOVE_TILES; // create json object QJsonArray jsonIdArray; for (auto &&id : *pIdArray) { using namespace ros_bridge::messages; jsonIdArray.append(id); } // for QJsonObject req; req["ids"] = std::move(jsonIdArray); // create response handler. 
auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &o) mutable { // check if transaction was successfull QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); if (o.contains("success") && o["success"].isBool()) { promise_response->set_value(o["success"].toBool()); } else { qCWarning(NemoInterfaceLog) << "/nemo/remove_tiles no \"success\" key or wrong type: " << msg; promise_response->set_value(false); } }; // call service. this->_pRosbridge->callService("/nemo/remove_tiles", responseHandler, req); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "remove_tiles(): Websocket not responding to request."; return QVariant(false); } // transaction error? if (!future_response.get()) { return QVariant(false); } // remove remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent /* context */, [this, pIdArray, promise = std::move(promise)]() mutable { this->_removeTilesRemote(pIdArray, std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QVariant NemoInterface::Impl::_callClearTiles() { // qDebug() << "_callClearTiles called"; this->_lastCall = CALL_NAME::CLEAR_TILES; // create response handler. auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &) mutable { // check if transaction was successfull promise_response->set_value(true); }; // call service. 
this->_pRosbridge->callService("/nemo/clear_tiles", responseHandler, QJsonObject()); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "Websocket not responding to request."; return QVariant(false); } // transaction failed? if (!future_response.get()) { return QVariant(false); } // clear remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent, [this, promise = std::move(promise)]() mutable { this->_clearTilesRemote(std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QVariant NemoInterface::Impl::_callGetProgress(std::shared_ptr pIdArray) { // qDebug() << "_callGetProgress called"; this->_lastCall = CALL_NAME::GET_PROGRESS; // create json object QJsonArray jsonIdArray; for (auto &&id : *pIdArray) { using namespace ros_bridge::messages; jsonIdArray.append(id); } // for QJsonObject req; req["ids"] = std::move(jsonIdArray); // create response handler. 
typedef std::shared_ptr ResponseType; auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &o) mutable { // check if transaction was successfull ros_bridge::messages::nemo_msgs::progress_array::ProgressArray progressArrayMsg; if (ros_bridge::messages::nemo_msgs::progress_array::fromJson( o, progressArrayMsg)) { auto pArray = std::make_shared(); *pArray = std::move(progressArrayMsg.progress_array()); promise_response->set_value(pArray); } else { qCWarning(NemoInterfaceLog) << "/nemo/get_progress error while creating ProgressArray " "from json."; promise_response->set_value(nullptr); } }; // call service. this->_pRosbridge->callService("/nemo/get_progress", responseHandler, req); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "all_remove_tiles(): Websocket not responding to request."; return QVariant(false); } // transaction error? auto pArray = future_response.get(); if (pArray == nullptr) { return QVariant(false); } // remove remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent, [this, pArray, promise = std::move(promise)]() mutable { this->_updateProgress(pArray, std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QVariant NemoInterface::Impl::_callGetAllProgress() { // qDebug() << "_callGetAllProgress called"; this->_lastCall = CALL_NAME::GET_ALL_PROGRESS; // create response handler. 
typedef std::shared_ptr ResponseType; auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &o) mutable { // check if transaction was successfull ros_bridge::messages::nemo_msgs::progress_array::ProgressArray progressArrayMsg; if (ros_bridge::messages::nemo_msgs::progress_array::fromJson( o, progressArrayMsg)) { auto pArray = std::make_shared(); *pArray = std::move(progressArrayMsg.progress_array()); promise_response->set_value(pArray); } else { qCWarning(NemoInterfaceLog) << "/nemo/get_all_progress error while creating ProgressArray " "from json. msg: " << o; promise_response->set_value(nullptr); } }; // call service. this->_pRosbridge->callService("/nemo/get_all_progress", responseHandler, QJsonObject()); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "all_remove_tiles(): Websocket not responding to request."; return QVariant(false); } // transaction error? auto pArray = future_response.get(); if (pArray == nullptr) { return QVariant(false); } // remove remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent, [this, pArray, promise = std::move(promise)]() mutable { this->_updateProgress(pArray, std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QVariant NemoInterface::Impl::_callGetAllTiles() { // qDebug() << "_callGetAllProgress called"; this->_lastCall = CALL_NAME::GET_ALL_TILES; // create response handler. 
typedef std::shared_ptr>> ResponseType; auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &o) mutable { const char *const tileArrayKey = "tile_array"; // check if transaction was successfull if (o.contains(tileArrayKey) && o[tileArrayKey].isArray()) { auto pArray = std::make_shared>>(); const auto jsonArray = o[tileArrayKey].toArray(); for (int i = 0; i < jsonArray.size(); ++i) { if (jsonArray[i].isObject()) { QJsonObject o = jsonArray[i].toObject(); auto tile = std::make_shared(); if (ros_bridge::messages::nemo_msgs::tile::fromJson(o, *tile)) { pArray->push_back(tile); } else { qCWarning(NemoInterfaceLog) << "/nemo/get_all_tiles error while creating tile."; promise_response->set_value(nullptr); } } else { qCWarning(NemoInterfaceLog) << "/nemo/get_all_tiles json array does not contain objects."; promise_response->set_value(nullptr); } } // success! promise_response->set_value(pArray); } else { qCWarning(NemoInterfaceLog) << "/nemo/get_all_tiles no tile_array key or wrong type."; promise_response->set_value(nullptr); } }; // call service. this->_pRosbridge->callService("/nemo/get_all_tiles", responseHandler, QJsonObject()); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "all_remove_tiles(): Websocket not responding to request."; return QVariant(false); } // transaction error? 
auto pArray = future_response.get(); if (pArray == nullptr) { return QVariant(false); } // remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent, [this, pArray, promise = std::move(promise)]() mutable { this->_setTiles(pArray, std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QVariant NemoInterface::Impl::_callGetVersion() { this->_lastCall = CALL_NAME::GET_VERSION; // create response handler. typedef QString ResponseType; auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); auto responseHandler = [promise_response](const QJsonObject &o) mutable { const char *const versionKey = "version"; // check if transaction was successfull if (o.contains(versionKey) && o[versionKey].isString()) { const auto version = o[versionKey].toString(); promise_response->set_value(version); } else { qCWarning(NemoInterfaceLog) << "/nemo/get_version no version key or wrong type."; promise_response->set_value("error!"); } }; // call service. this->_pRosbridge->callService("/nemo/get_version", responseHandler, QJsonObject()); // wait for response. auto tStart = hrc::now(); bool abort = true; do { auto status = future_response.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { abort = false; break; } } while (hrc::now() - tStart < maxResponseTime || this->_dispatcher.isInterruptionRequested()); if (abort) { qCWarning(NemoInterfaceLog) << "all_remove_tiles(): Websocket not responding to request."; return QVariant(false); } // transaction error? 
auto version = future_response.get(); if (version == "error!") { return QVariant(false); } // remote tiles (_remoteTiles) std::promise promise; auto future = promise.get_future(); bool value = QMetaObject::invokeMethod( this->_parent, [this, version, promise = std::move(promise)]() mutable { this->_setVersion(version, std::move(promise)); }); Q_ASSERT(value == true); // return success return QVariant(future.get()); } QString NemoInterface::Impl::_toString(NemoInterface::Impl::CALL_NAME name) { switch (name) { case CALL_NAME::ADD_TILES: return QString("ADD_TILES"); case CALL_NAME::REMOVE_TILES: return QString("REMOVE_TILES"); case CALL_NAME::CLEAR_TILES: return QString("CLEAR_TILES"); case CALL_NAME::GET_PROGRESS: return QString("GET_PROGRESS"); case CALL_NAME::GET_ALL_PROGRESS: return QString("GET_ALL_PROGRESS"); case CALL_NAME::GET_ALL_TILES: return QString("GET_ALL_TILES"); case CALL_NAME::GET_VERSION: return QString("GET_VERSION"); } return QString("unknown CALL_NAME"); } void NemoInterface::Impl::_addTilesRemote( std::shared_ptr>> pTileArray, std::promise promise) { // qDebug() << "_addTilesRemote called"; auto pArrayDup = std::make_shared>>(); for (auto &&pTile : *pTileArray) { pArrayDup->push_back(std::make_shared(*pTile)); } _addTilesRemote2(pArrayDup, std::move(promise)); } void NemoInterface::Impl::_addTilesRemote2( std::shared_ptr>> pTileArray, std::promise promise) { // qDebug() << "_addTilesRemote2 called"; bool anyChange = false; bool error = false; for (auto &&pTile : *pTileArray) { auto id = pTile->id(); auto it = _remoteTiles.find(id); if (Q_LIKELY(it == _remoteTiles.end())) { auto ret = _remoteTiles.insert(std::make_pair(id, pTile)); Q_ASSERT(ret.second == true); Q_UNUSED(ret); anyChange = true; } else { qCWarning(NemoInterfaceLog) << "_addTilesRemote: tile with id " << id << " already added."; if (pTile->tile() != it->second->tile()) { error = true; } } } if (anyChange) { if (this->_isSynchronized()) { this->_setState(STATE::READY); 
this->_doAction(); } } promise.set_value(!error); } void NemoInterface::Impl::_setTiles( std::shared_ptr>> pTileArray, std::promise promise) { bool error = false; _remoteTiles.clear(); for (auto &&pTile : *pTileArray) { auto id = pTile->id(); auto it = _remoteTiles.find(id); if (Q_LIKELY(it == _remoteTiles.end())) { auto ret = _remoteTiles.insert(std::make_pair(id, pTile)); Q_ASSERT(ret.second == true); Q_UNUSED(ret); } else { qCWarning(NemoInterfaceLog) << "_addTilesRemote: tile with id " << id << " already added."; error = true; } } if (error || !this->_isSynchronized()) { qDebug() << "_setTiles: trying to synchronize"; _trySynchronize(); } promise.set_value(true); } void NemoInterface::Impl::_setVersion(QString version, std::promise promise) { _remoteVersion = version; if (_remoteVersion != _localVersion) { _setWarningString("Local protocol version (" + _localVersion + ") does not match remote version (" + _remoteVersion + ")."); } else { _versionOK = true; _doAction(); } promise.set_value(true); } void NemoInterface::Impl::_removeTilesRemote(std::shared_ptr idArray, std::promise promise) { // qDebug() << "_removeTilesRemote called"; bool anyChange = false; for (auto &&id : *idArray) { auto it = _remoteTiles.find(id); if (Q_LIKELY(it != _remoteTiles.end())) { _remoteTiles.erase(it); anyChange = true; } else { qCWarning(NemoInterfaceLog) << "_removeTilesRemote: tile with unknown id " << id << "."; } } if (anyChange) { if (this->_isSynchronized()) { this->_setState(STATE::READY); this->_doAction(); } } promise.set_value(true); } void NemoInterface::Impl::_clearTilesRemote(std::promise promise) { // qDebug() << "_clearTilesRemote called"; if (_remoteTiles.size() > 0) { _remoteTiles.clear(); if (this->_isSynchronized()) { this->_setState(STATE::READY); this->_doAction(); } } promise.set_value(true); } bool NemoInterface::Impl::_setState(STATE newState) { if (newState != this->_state) { auto oldState = this->_state.load(); this->_state = newState; 
qCDebug(NemoInterfaceLog) << "state: " << _toString(oldState) << " -> " << _toString(newState); auto oldStatus = _status(oldState); auto newStatus = _status(newState); if (oldStatus != newStatus) { emit this->_parent->statusChanged(); } if (_running(oldState) != _running(newState)) { emit this->_parent->runningChanged(); } return true; } else { return false; } } bool NemoInterface::Impl::_ready(NemoInterface::Impl::STATE s) { return s == STATE::READY; } bool NemoInterface::Impl::_userSync(NemoInterface::Impl::STATE s) { return s == STATE::USER_SYNC; } bool NemoInterface::Impl::_sysSync(NemoInterface::Impl::STATE s) { return s == STATE::SYS_SYNC; } bool NemoInterface::Impl::_running(NemoInterface::Impl::STATE s) { return s != STATE::STOPPED; } NemoInterface::STATUS NemoInterface::Impl::_status(NemoInterface::Impl::STATE state) { NemoInterface::STATUS status; switch (state) { case STATE::STOPPED: status = NemoInterface::STATUS::NOT_CONNECTED; break; case STATE::START_BRIDGE: status = NemoInterface::STATUS::NOT_CONNECTED; break; case STATE::WEBSOCKET_DETECTED: status = NemoInterface::STATUS::WEBSOCKET_DETECTED; break; case STATE::TRY_SETUP: status = NemoInterface::STATUS::WEBSOCKET_DETECTED; break; case STATE::READY: status = NemoInterface::STATUS::READY; break; case STATE::USER_SYNC: case STATE::SYS_SYNC: status = NemoInterface::STATUS::SYNC; break; case STATE::WEBSOCKET_TIMEOUT: case STATE::HEARTBEAT_TIMEOUT: status = NemoInterface::STATUS::TIMEOUT; break; } return status; } QString NemoInterface::Impl::_toString(NemoInterface::Impl::STATE s) { switch (s) { case STATE::STOPPED: return QString("STOPPED"); case STATE::START_BRIDGE: return QString("START_BRIDGE"); case STATE::WEBSOCKET_DETECTED: return QString("WEBSOCKET_DETECTED"); case STATE::TRY_SETUP: return QString("TRY_TOPIC_SERVICE_SETUP"); case STATE::READY: return QString("READY"); case STATE::USER_SYNC: return QString("SYNC_USER"); case STATE::SYS_SYNC: return QString("SYNC_SYS"); case 
STATE::WEBSOCKET_TIMEOUT: return QString("WEBSOCKET_TIMEOUT"); case STATE::HEARTBEAT_TIMEOUT: return QString("HEARTBEAT_TIMEOUT"); } return "unknown state!"; } QString NemoInterface::Impl::_toString(NemoInterface::STATUS s) { switch (s) { case NemoInterface::STATUS::NOT_CONNECTED: return QString("NOT_CONNECTED"); case NemoInterface::STATUS::ERROR: return QString("ERROR"); case NemoInterface::STATUS::READY: return QString("READY"); case NemoInterface::STATUS::TIMEOUT: return QString("TIMEOUT"); case NemoInterface::STATUS::WEBSOCKET_DETECTED: return QString("WEBSOCKET_DETECTED"); case NemoInterface::STATUS::SYNC: return QString("SYNC"); } return "unknown state!"; } // =============================================================== // NemoInterface NemoInterface::NemoInterface() : QObject(), pImpl(std::make_unique(this)) {} NemoInterface *NemoInterface::createInstance() { return new NemoInterface(); } NemoInterface *NemoInterface::instance() { return GenericSingelton::instance( NemoInterface::createInstance); } NemoInterface::~NemoInterface() {} void NemoInterface::start() { this->pImpl->start(); } void NemoInterface::stop() { this->pImpl->stop(); } std::shared_future NemoInterface::addTiles(const TileArray &tileArray) { TilePtrArray ptrArray; for (const auto &tile : tileArray) { ptrArray.push_back(const_cast(&tile)); } return this->pImpl->addTiles(ptrArray); } std::shared_future NemoInterface::addTiles(const TilePtrArray &tileArray) { return this->pImpl->addTiles(tileArray); } std::shared_future NemoInterface::removeTiles(const IDArray &idArray) { return this->pImpl->removeTiles(idArray); } std::shared_future NemoInterface::clearTiles() { return this->pImpl->clearTiles(); } TileArray NemoInterface::getTiles(const IDArray &idArray) const { return this->pImpl->getTiles(idArray); } TileArray NemoInterface::getAllTiles() const { return this->pImpl->getAllTiles(); } LogicalArray NemoInterface::containsTiles(const IDArray &idArray) const { return 
this->pImpl->containsTiles(idArray); } std::size_t NemoInterface::size() const { return this->pImpl->size(); } bool NemoInterface::empty() const { return this->pImpl->empty(); } ProgressArray NemoInterface::getProgress() const { return this->pImpl->getProgress(); } ProgressArray NemoInterface::getProgress(const IDArray &idArray) const { return this->pImpl->getProgress(idArray); } NemoInterface::STATUS NemoInterface::status() const { return this->pImpl->status(); } QString NemoInterface::statusString() const { return statusMap.at(this->pImpl->status()); } QString NemoInterface::infoString() const { return this->pImpl->infoString(); } QString NemoInterface::warningString() const { return this->pImpl->warningString(); } QString NemoInterface::editorQml() { return QStringLiteral("NemoInterface.qml"); } bool NemoInterface::running() const { return this->pImpl->running(); }