Commit 47586728 authored by Valentin Platzgummer's avatar Valentin Platzgummer

temp

parent 22c1b55e
......@@ -33,7 +33,7 @@
QGC_LOGGING_CATEGORY(NemoInterfaceLog, "NemoInterfaceLog")
#define SYNC_INTERVAL 1000 // ms
#define NO_HEARTBEAT_TIMEOUT 10000 // ms
#define NO_HEARTBEAT_TIMEOUT 5000 // ms
#define CONNECTION_TIMER_INTERVAL 1000 // ms
static constexpr auto maxResponseTime = std::chrono::milliseconds(10000);
......@@ -56,8 +56,8 @@ class NemoInterface::Impl {
START_BRIDGE,
WEBSOCKET_DETECTED,
TRY_TOPIC_SERVICE_SETUP,
SYNC_USER,
SYNC_SYS,
USER_SYNC,
SYS_SYNC,
READY,
WEBSOCKET_TIMEOUT,
HEARTBEAT_TIMEOUT
......@@ -112,6 +112,14 @@ private:
QVariant _callClearTiles();
// called from dispatcher thread!
QVariant _callGetProgress(std::shared_ptr<IDArray> pIdArray);
QVariant _callGetAllProgress();
enum class CALL_NAME {
ADD_TILES,
REMOVE_TILES,
CLEAR_TILES,
GET_PROGRESS,
GET_ALL_PROGRESS
};
void _addTilesRemote(
std::shared_ptr<QVector<std::shared_ptr<const Tile>>> pTileArray);
......@@ -134,6 +142,7 @@ private:
static QString _toString(NemoInterface::STATUS s);
std::atomic<STATE> _state;
std::atomic<CALL_NAME> _lastCall;
ROSBridgePtr _pRosBridge;
TileMap _remoteTiles;
TileMapConst _localTiles;
......@@ -169,20 +178,20 @@ NemoInterface::Impl::Impl(NemoInterface *p)
auto connectionStringFact = wimaSettings->rosbridgeConnectionString();
auto setConnectionString = [connectionStringFact, this] {
auto connectionString = connectionStringFact->rawValue().toString();
if (is_valid_port_path(connectionString.toLocal8Bit().data())) {
} else {
if (!is_valid_port_path(connectionString.toLocal8Bit().data())) {
qgcApp()->warningMessageBoxOnMainThread(
"Nemo Interface",
"Websocket connection string possibly invalid: " + connectionString +
". Trying to connect anyways.");
}
if (this->_pRosBridge) {
this->_pRosBridge->reset();
}
bool wasRunning = this->running();
this->stop();
this->_pRosBridge = std::make_shared<RosbridgeWsClient>(
connectionString.toLocal8Bit().data());
this->_pRosBridge->reset();
if (wasRunning) {
this->start();
}
};
connect(connectionStringFact, &SettingsFact::rawValueChanged,
setConnectionString);
......@@ -241,6 +250,7 @@ NemoInterface::Impl::addTiles(const TilePtrArray &tileArray) {
// copy unknown tiles
auto pTileArray = std::make_shared<QVector<std::shared_ptr<const Tile>>>();
auto pIdArray = std::make_shared<IDArray>();
for (const auto *pTile : tileArray) {
auto id = pTile->id();
const auto it = this->_localTiles.find(id);
......@@ -250,6 +260,7 @@ NemoInterface::Impl::addTiles(const TilePtrArray &tileArray) {
std::make_shared<const Tile>(pTile->coordinateList(), 0.0, id);
_localTiles.insert(std::make_pair(id, pTileCopy));
pTileArray->push_back(pTileCopy);
pIdArray->push_back(id);
} else {
qCDebug(NemoInterfaceLog)
<< "addTiles(): tile with id: " << pTile->id() << "already added.";
......@@ -259,18 +270,32 @@ NemoInterface::Impl::addTiles(const TilePtrArray &tileArray) {
// ready for send?
if (pTileArray->size() > 0 && (this->ready() || this->_userSync())) {
this->_setState(STATE::SYNC_USER);
this->_setState(STATE::USER_SYNC);
this->_doAction();
// create command.
auto sendTilesCommand = std::make_unique<Task>(
// create add tiles command.
auto pTask = std::make_unique<Task>(
std::bind(&Impl::_callAddTiles, this, pTileArray));
// dispatch command and return.
auto ret = _dispatcher.dispatch(std::move(sendTilesCommand));
auto sfut = ret.share();
_futureWatcher.setFuture(sfut);
return sfut;
// dispatch command.
auto ret = _dispatcher.dispatch(std::move(pTask));
auto addFuture = ret.share();
// create get progress cmd.
pTask = std::make_unique<Task>([this, addFuture, pIdArray] {
addFuture.wait();
if (addFuture.get().toBool()) {
return this->_callGetProgress(pIdArray);
} else {
return QVariant(false);
}
});
// dispatch command.
ret = _dispatcher.dispatch(std::move(pTask));
auto progressFuture = ret.share();
_futureWatcher.setFuture(progressFuture);
return progressFuture;
}
}
......@@ -301,7 +326,7 @@ NemoInterface::Impl::removeTiles(const IDArray &idArray) {
// ready for send?
if (pIdArray->size() > 0 && (this->ready() || this->_userSync())) {
this->_setState(STATE::SYNC_USER);
this->_setState(STATE::USER_SYNC);
this->_doAction();
// create command.
......@@ -329,7 +354,7 @@ std::shared_future<QVariant> NemoInterface::Impl::clearTiles() {
if (this->_localTiles.size() > 0 && (this->ready() || this->_userSync())) {
this->_setState(STATE::SYNC_USER);
this->_setState(STATE::USER_SYNC);
this->_doAction();
// create command.
......@@ -448,27 +473,11 @@ bool NemoInterface::Impl::ready() { return _ready(this->_state.load()); }
bool NemoInterface::Impl::_sysSync() { return _sysSync(this->_state); }
void NemoInterface::Impl::_onFutureWatcherFinished() {
auto lastTransactionSuccessfull = _futureWatcher.result().toBool();
if (!lastTransactionSuccessfull) {
if (this->_userSync()) {
_trySynchronize();
} else if (this->_sysSync()) {
if (this->_userSync() || this->_sysSync()) {
auto lastTransactionSuccessfull = _futureWatcher.result().toBool();
if (!lastTransactionSuccessfull) {
QTimer::singleShot(1000, [this] { this->_trySynchronize(); });
}
} else {
// fetch progress
auto pIdArray = std::make_shared<IDArray>();
for (const auto &pair : _remoteTiles) {
pIdArray->push_back(pair.first);
}
auto pTask = std::make_unique<nemo_interface::Task>(
std::bind(&Impl::_callGetProgress, this, pIdArray));
// dispatch command.
auto ret = _dispatcher.dispatch(std::move(pTask));
Q_ASSERT(false);
_futureWatcher.setFuture(ret.share());
}
}
......@@ -628,8 +637,10 @@ void NemoInterface::Impl::_doTopicServiceSetup() {
}
void NemoInterface::Impl::_trySynchronize() {
if (!_isSynchronized()) {
this->_setState(STATE::SYNC_SYS);
if ((this->_state == STATE::READY || this->_state == STATE::SYS_SYNC ||
this->_state == STATE::USER_SYNC) &&
!_isSynchronized()) {
this->_setState(STATE::SYS_SYNC);
this->_doAction();
// create clear cmd.
......@@ -637,9 +648,11 @@ void NemoInterface::Impl::_trySynchronize() {
std::bind(&Impl::_callClearTiles, this));
// dispatch command.
qCritical() << "this assert is triggered sometimes! sdf92894";
_dispatcher.clear();
_dispatcher.stop();
Q_ASSERT(_dispatcher.pendingTasks() == 0);
auto ret = _dispatcher.dispatch(std::move(pTask));
auto clearFuture = ret.share();
// create tile array.
auto pTileArray = std::make_shared<QVector<std::shared_ptr<const Tile>>>();
......@@ -648,11 +661,32 @@ void NemoInterface::Impl::_trySynchronize() {
}
// create addTiles cmd.
auto sendTilesCommand = std::make_unique<nemo_interface::Task>(
std::bind(&Impl::_callAddTiles, this, pTileArray));
pTask =
std::make_unique<nemo_interface::Task>([this, pTileArray, clearFuture] {
clearFuture.wait();
if (clearFuture.get().toBool()) {
return this->_callAddTiles(pTileArray);
} else {
return QVariant(false);
}
});
// dispatch command.
ret = _dispatcher.dispatch(std::move(pTask));
auto addFuture = ret.share();
// create GetAllProgress cmd.
pTask = std::make_unique<nemo_interface::Task>([this, addFuture] {
addFuture.wait();
if (addFuture.get().toBool()) {
return this->_callGetAllProgress();
} else {
return QVariant(false);
}
});
// dispatch command.
ret = _dispatcher.dispatch(std::move(sendTilesCommand));
ret = _dispatcher.dispatch(std::move(pTask));
_futureWatcher.setFuture(ret.share());
}
}
......@@ -693,8 +727,8 @@ void NemoInterface::Impl::_doAction() {
case STATE::READY:
_trySynchronize();
break;
case STATE::SYNC_USER:
case STATE::SYNC_SYS:
case STATE::USER_SYNC:
case STATE::SYS_SYNC:
break;
case STATE::HEARTBEAT_TIMEOUT:
this->_clearTilesRemote();
......@@ -711,8 +745,10 @@ void NemoInterface::Impl::_doAction() {
}
QVariant NemoInterface::Impl::_callAddTiles(
std::shared_ptr<QVector<std::shared_ptr<const Tile>>> pTileArray) {
this->_lastCall = CALL_NAME::ADD_TILES;
// create json object
rapidjson::Document request(rapidjson::kObjectType);
auto &allocator = request.GetAllocator();
......@@ -805,6 +841,9 @@ QVariant NemoInterface::Impl::_callAddTiles(
QVariant
NemoInterface::Impl::_callRemoveTiles(std::shared_ptr<IDArray> pIdArray) {
this->_lastCall = CALL_NAME::REMOVE_TILES;
// create json object
rapidjson::Document request(rapidjson::kObjectType);
auto &allocator = request.GetAllocator();
......@@ -890,6 +929,9 @@ NemoInterface::Impl::_callRemoveTiles(std::shared_ptr<IDArray> pIdArray) {
}
QVariant NemoInterface::Impl::_callClearTiles() {
this->_lastCall = CALL_NAME::CLEAR_TILES;
// create response handler.
auto promise_response = std::make_shared<std::promise<bool>>();
auto future_response = promise_response->get_future();
......@@ -955,6 +997,9 @@ QVariant NemoInterface::Impl::_callClearTiles() {
QVariant
NemoInterface::Impl::_callGetProgress(std::shared_ptr<IDArray> pIdArray) {
this->_lastCall = CALL_NAME::GET_PROGRESS;
// create json object
rapidjson::Document request(rapidjson::kObjectType);
auto &allocator = request.GetAllocator();
......@@ -1027,7 +1072,92 @@ NemoInterface::Impl::_callGetProgress(std::shared_ptr<IDArray> pIdArray) {
if (abort) {
qCWarning(NemoInterfaceLog)
<< "remove_tiles(): Websocket not responding to request.";
<< "all_remove_tiles(): Websocket not responding to request.";
return QVariant(false);
}
// transaction error?
auto pArray = future_response.get();
if (pArray == nullptr) {
return QVariant(false);
}
// remove remote tiles (_remoteTiles)
QMetaObject::invokeMethod(this->_parent /* context */,
[this, pArray] { this->_updateProgress(pArray); });
// return success
return QVariant(true);
}
QVariant NemoInterface::Impl::_callGetAllProgress() {
this->_lastCall = CALL_NAME::GET_ALL_PROGRESS;
// create json object
rapidjson::Document request(rapidjson::kObjectType);
// create response handler.
typedef std::shared_ptr<ProgressArray> ResponseType;
auto promise_response = std::make_shared<std::promise<ResponseType>>();
auto future_response = promise_response->get_future();
auto responseHandler = [promise_response](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage>
in_message) mutable {
// check if transaction was successfull
auto msg = in_message->string();
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
ros_bridge::messages::nemo_msgs::progress_array::ProgressArray
progressArrayMsg;
if (ros_bridge::messages::nemo_msgs::progress_array::fromJson(
d["values"], progressArrayMsg)) {
auto pArray = std::make_shared<ProgressArray>();
*pArray = std::move(progressArrayMsg.progress_array());
promise_response->set_value(pArray);
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/all_get_progress error while creating ProgressArray "
"from json.";
promise_response->set_value(nullptr);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/all_get_progress no \"values\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(nullptr);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/all_get_progress message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
promise_response->set_value(nullptr);
}
connection->send_close(1000);
};
// call service.
this->_pRosBridge->callService("/nemo/get_all_progress", responseHandler,
request);
// wait for response.
auto tStart = hrc::now();
bool abort = true;
do {
auto status = future_response.wait_for(std::chrono::milliseconds(100));
if (status == std::future_status::ready) {
abort = false;
break;
}
} while (hrc::now() - tStart < maxResponseTime ||
this->_dispatcher.isInterruptionRequested());
if (abort) {
qCWarning(NemoInterfaceLog)
<< "all_remove_tiles(): Websocket not responding to request.";
return QVariant(false);
}
......@@ -1062,8 +1192,6 @@ void NemoInterface::Impl::_addTilesRemote(
for (auto pTile : *pTileArray) {
auto id = pTile->id();
auto it = _remoteTiles.find(id);
qCritical() << "this assert is triggered sometimes! 1212341242";
Q_ASSERT(it == _remoteTiles.end());
if (Q_LIKELY(it == _remoteTiles.end())) {
auto ret = _remoteTiles.insert(std::make_pair(id, pTile));
Q_ASSERT(ret.second == true);
......@@ -1089,7 +1217,6 @@ void NemoInterface::Impl::_removeTilesRemote(std::shared_ptr<IDArray> idArray) {
for (const auto id : *idArray) {
auto it = _remoteTiles.find(id);
Q_ASSERT(it != _remoteTiles.end());
if (Q_LIKELY(it != _remoteTiles.end())) {
_remoteTiles.erase(it);
anyChange = true;
......@@ -1129,8 +1256,6 @@ bool NemoInterface::Impl::_setState(STATE newState) {
auto oldStatus = _status(oldState);
auto newStatus = _status(newState);
if (oldStatus != newStatus) {
qCDebug(NemoInterfaceLog) << "status: " << _toString(oldStatus) << " -> "
<< _toString(newStatus);
emit this->_parent->statusChanged();
}
......@@ -1149,11 +1274,11 @@ bool NemoInterface::Impl::_ready(NemoInterface::Impl::STATE s) {
}
bool NemoInterface::Impl::_userSync(NemoInterface::Impl::STATE s) {
return s == STATE::SYNC_USER;
return s == STATE::USER_SYNC;
}
bool NemoInterface::Impl::_sysSync(NemoInterface::Impl::STATE s) {
return s == STATE::SYNC_SYS;
return s == STATE::SYS_SYNC;
}
bool NemoInterface::Impl::_running(NemoInterface::Impl::STATE s) {
......@@ -1179,8 +1304,8 @@ NemoInterface::Impl::_status(NemoInterface::Impl::STATE state) {
case STATE::READY:
status = NemoInterface::STATUS::READY;
break;
case STATE::SYNC_USER:
case STATE::SYNC_SYS:
case STATE::USER_SYNC:
case STATE::SYS_SYNC:
status = NemoInterface::STATUS::SYNC;
break;
case STATE::WEBSOCKET_TIMEOUT:
......@@ -1204,9 +1329,9 @@ QString NemoInterface::Impl::_toString(NemoInterface::Impl::STATE s) {
return QString("TRY_TOPIC_SERVICE_SETUP");
case STATE::READY:
return QString("READY");
case STATE::SYNC_USER:
case STATE::USER_SYNC:
return QString("SYNC_USER");
case STATE::SYNC_SYS:
case STATE::SYS_SYNC:
return QString("SYNC_SYS");
case STATE::WEBSOCKET_TIMEOUT:
return QString("WEBSOCKET_TIMEOUT");
......
......@@ -325,22 +325,15 @@ bool MeasurementArea::isCorrect() {
void MeasurementArea::updateProgress(const ProgressArray &array) {
if (ready() && !_holdProgress && array.size() > 0) {
bool anyChanges = false;
long counter = 0;
for (const auto &lp : array) {
qDebug() << "MeasurementArea::updateProgress: counter = " << counter++;
auto it = _indexMap.find(lp.id());
if (it != _indexMap.end()) {
int tileIndex = it->second;
auto *tile = _tiles->value<MeasurementTile *>(tileIndex);
qDebug() << "MeasurementArea::updateProgress: progress before = "
<< tile->progress();
if (!qFuzzyCompare(lp.progress(), tile->progress())) {
tile->setProgress(lp.progress());
anyChanges = true;
}
qDebug() << "MeasurementArea::updateProgress: progress after = "
<< tile->progress();
}
}
......
......@@ -28,7 +28,10 @@ public:
_init();
}
FutureType future() const { return _future; }
FutureType future() const {
Q_ASSERT(this->_state != STATE::EMPTY);
return _future;
}
void setFuture(const FutureType &future) {
_future = future;
......@@ -37,9 +40,13 @@ public:
emit started();
}
T result() { return _future.get(); }
T result() {
Q_ASSERT(this->_state != STATE::EMPTY);
return _future.get();
}
virtual void waitForFinished() override {
Q_ASSERT(this->_state != STATE::EMPTY);
if (_state == STATE::STARTED) {
_timer.stop();
_future.wait();
......@@ -53,9 +60,11 @@ public:
private:
void _onTimeout() {
Q_ASSERT(this->_state == STATE::STARTED);
if (_state == STATE::STARTED) {
auto status = _future.wait_for(std::chrono::seconds(0));
if (status == std::future_status::ready) {
_timer.stop();
_state = STATE::FINISHED;
emit finished();
}
......
......@@ -91,6 +91,7 @@ void TaskDispatcher::run() {
ULock lk1(this->_mutex);
if (this->_taskQueue.size() > 0 && this->_running) {
Q_ASSERT(this->_taskQueue.size() == this->_promiseQueue.size());
// pop task and promise
auto pTask = std::move(this->_taskQueue.front());
auto promise = std::move(this->_promiseQueue.front());
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment