From 9e620065df0ff5e4fa03a08401f989812f316dc1 Mon Sep 17 00:00:00 2001 From: Valentin Platzgummer Date: Mon, 25 Jan 2021 18:58:43 +0100 Subject: [PATCH] new rosbridge impl added, not tested --- qgroundcontrol.pro | 19 +- src/MeasurementComplexItem/NemoInterface.cpp | 531 +++++++++--------- .../rosbridge/rosbridge.cpp | 253 +++++++++ .../rosbridge/rosbridge.h | 110 ++++ .../rosbridge/rosbridgeimpl.cpp | 508 +++++++++++++++++ .../rosbridge/rosbridgeimpl.h | 84 +++ 6 files changed, 1212 insertions(+), 293 deletions(-) create mode 100644 src/MeasurementComplexItem/rosbridge/rosbridge.cpp create mode 100644 src/MeasurementComplexItem/rosbridge/rosbridge.h create mode 100644 src/MeasurementComplexItem/rosbridge/rosbridgeimpl.cpp create mode 100644 src/MeasurementComplexItem/rosbridge/rosbridgeimpl.h diff --git a/qgroundcontrol.pro b/qgroundcontrol.pro index 43d99bb4d..3b6e1c298 100644 --- a/qgroundcontrol.pro +++ b/qgroundcontrol.pro @@ -267,7 +267,8 @@ QT += \ widgets \ xml \ texttospeech \ - core-private + core-private \ + websockets # Multimedia only used if QVC is enabled !contains (DEFINES, QGC_DISABLE_UVC) { @@ -420,6 +421,7 @@ INCLUDEPATH += \ src/Audio \ src/comm \ src/MeasurementComplexItem \ + src/MeasurementComplexItem/rosbridge \ src/MeasurementComplexItem/geometry \ src/MeasurementComplexItem/nemo_interface \ src/comm/ros_bridge \ @@ -454,6 +456,8 @@ HEADERS += \ src/MeasurementComplexItem/geometry/TileDiff.h \ src/MeasurementComplexItem/geometry/geometry.h \ src/MeasurementComplexItem/HashFunctions.h \ + src/MeasurementComplexItem/rosbridge/rosbridge.h \ + src/MeasurementComplexItem/rosbridge/rosbridgeimpl.h \ src/MeasurementComplexItem/nemo_interface/FutureWatcher.h \ src/MeasurementComplexItem/nemo_interface/FutureWatcherInterface.h \ src/MeasurementComplexItem/nemo_interface/Task.h \ @@ -508,18 +512,13 @@ HEADERS += \ src/GPS/Drivers/src/base_station.h \ src/Settings/WimaSettings.h \ src/comm/QmlObjectListHelper.h \ - src/comm/ros_bridge/include/RosBridgeClient.h \ - src/comm/ros_bridge/include/com_private.h \ src/comm/ros_bridge/include/message_traits.h \ src/comm/ros_bridge/include/messages/geographic_msgs/geopoint.h \ src/comm/ros_bridge/include/messages/nemo_msgs/heartbeat.h \ src/comm/ros_bridge/include/messages/nemo_msgs/progress_array.h \ src/comm/ros_bridge/include/messages/nemo_msgs/tile.h \ src/comm/ros_bridge/include/messages/std_msgs/header.h \ - src/comm/ros_bridge/include/server.h \ src/comm/ros_bridge/include/messages/std_msgs/time.h \ - src/comm/ros_bridge/include/topic_publisher.h \ - src/comm/ros_bridge/include/topic_subscriber.h \ src/comm/utilities.h contains (DEFINES, QGC_ENABLE_PAIRING) { @@ -534,6 +533,8 @@ SOURCES += \ src/MeasurementComplexItem/geometry/SafeArea.cc \ src/MeasurementComplexItem/geometry/geometry.cpp \ src/MeasurementComplexItem/HashFunctions.cpp \ + src/MeasurementComplexItem/rosbridge/rosbridge.cpp \ + src/MeasurementComplexItem/rosbridge/rosbridgeimpl.cpp \ src/MeasurementComplexItem/nemo_interface/FutureWatcherInterface.cpp \ src/MeasurementComplexItem/nemo_interface/MeasurementTile.cpp \ src/MeasurementComplexItem/nemo_interface/Task.cpp \ @@ -554,8 +555,6 @@ SOURCES += \ src/MeasurementComplexItem/geometry/GeoPoint3D.cpp \ src/MeasurementComplexItem/NemoInterface.cpp \ src/comm/QmlObjectListHelper.cpp \ - src/comm/ros_bridge/include/RosBridgeClient.cpp \ - src/comm/ros_bridge/include/com_private.cpp \ src/comm/ros_bridge/include/messages/geographic_msgs/geopoint.cpp \ src/comm/ros_bridge/include/messages/nemo_msgs/heartbeat.cpp \ src/comm/ros_bridge/include/messages/nemo_msgs/labeled_progress.cpp \ @@ -563,11 +562,7 @@ SOURCES += \ src/comm/ros_bridge/include/messages/nemo_msgs/tile.cpp \ src/comm/ros_bridge/include/messages/std_msgs/header.cpp \ src/comm/ros_bridge/include/messages/std_msgs/time.cpp \ - src/comm/ros_bridge/include/server.cpp \ - src/comm/ros_bridge/include/topic_publisher.cpp \ - src/comm/ros_bridge/include/topic_subscriber.cpp \ src/Settings/WimaSettings.cc \ - src/comm/ros_bridge/src/ros_bridge.cpp contains (DEFINES, QGC_ENABLE_PAIRING) { SOURCES += \ diff --git a/src/MeasurementComplexItem/NemoInterface.cpp b/src/MeasurementComplexItem/NemoInterface.cpp index 9d4d39c24..171c3fc0a 100644 --- a/src/MeasurementComplexItem/NemoInterface.cpp +++ b/src/MeasurementComplexItem/NemoInterface.cpp @@ -10,6 +10,7 @@ #include #include +#include #include "GenericSingelton.h" #include "geometry/MeasurementArea.h" @@ -19,26 +20,26 @@ #include "nemo_interface/QNemoHeartbeat.h" #include "nemo_interface/TaskDispatcher.h" -#include "ros_bridge/include/messages/nemo_msgs/progress_array.h" - -#include "ros_bridge/include/RosBridgeClient.h" #include "ros_bridge/include/messages/geographic_msgs/geopoint.h" #include "ros_bridge/include/messages/nemo_msgs/heartbeat.h" +#include "ros_bridge/include/messages/nemo_msgs/progress_array.h" #include "ros_bridge/include/messages/nemo_msgs/tile.h" -#include "ros_bridge/include/ros_bridge.h" #include "ros_bridge/rapidjson/include/rapidjson/document.h" #include "ros_bridge/rapidjson/include/rapidjson/ostreamwrapper.h" #include "ros_bridge/rapidjson/include/rapidjson/writer.h" +#include "rosbridge/rosbridge.h" QGC_LOGGING_CATEGORY(NemoInterfaceLog, "NemoInterfaceLog") -#define SYNC_INTERVAL 1000 // ms -#define NO_HEARTBEAT_TIMEOUT 5000 // ms -#define CONNECTION_TIMER_INTERVAL 1000 // ms +#define SYNC_INTERVAL 1000 // ms +#define NO_HEARTBEAT_TIMEOUT 5000 // ms static constexpr auto maxResponseTime = std::chrono::milliseconds(10000); +static const char *progressTopic = "/nemo/progress"; +static const char *heartbeatTopic = "/nemo/heartbeat"; + using hrc = std::chrono::high_resolution_clock; -using ROSBridgePtr = std::shared_ptr; +using ROSBridgePtr = std::shared_ptr; typedef ros_bridge::messages::nemo_msgs::tile::GenericTile @@ -103,6 +104,7 @@ private: bool _sysSync() const; // thread safe void _onFutureWatcherFinished(); // thread safe void _onHeartbeatTimeout(); // thread safe + void _onRosbridgeStateChanged(); // called from dispatcher thread! QVariant _callAddTiles( @@ -150,7 +152,7 @@ private: std::atomic _state; std::atomic _lastCall; - ROSBridgePtr _pRosBridge; + ROSBridgePtr _pRosbridge; TileMap _remoteTiles; TileMapConst _localTiles; NemoInterface *const _parent; @@ -158,7 +160,6 @@ private: QString _infoString; QString _warningString; QTimer _timeoutTimer; - QTimer _connectionTimer; QNemoHeartbeat _lastHeartbeat; FutureWatcher _futureWatcher; }; @@ -185,17 +186,11 @@ NemoInterface::Impl::Impl(NemoInterface *p) auto connectionStringFact = wimaSettings->rosbridgeConnectionString(); auto setConnectionString = [connectionStringFact, this] { auto connectionString = connectionStringFact->rawValue().toString(); - if (!is_valid_port_path(connectionString.toLocal8Bit().data())) { - qgcApp()->warningMessageBoxOnMainThread( - "Nemo Interface", - "Websocket connection string possibly invalid: " + connectionString + - ". Trying to connect anyways."); - } bool wasRunning = this->running(); this->stop(); - this->_pRosBridge = std::make_shared( - connectionString.toLocal8Bit().data()); + this->_pRosbridge = std::make_shared( + QUrl(QString("ws://") + connectionString.toLocal8Bit().data())); if (wasRunning) { this->start(); } @@ -209,29 +204,14 @@ NemoInterface::Impl::Impl(NemoInterface *p) std::bind(&Impl::_onHeartbeatTimeout, this)); // Connection timer (temporary workaround) - connect(&this->_connectionTimer, &QTimer::timeout, [this] { - if (this->_pRosBridge->connected()) { - if (this->_state == STATE::START_BRIDGE || - this->_state == STATE::WEBSOCKET_TIMEOUT) { - this->_setState(STATE::WEBSOCKET_DETECTED); - this->_doAction(); - } - } else { - if (this->_state == STATE::TRY_TOPIC_SERVICE_SETUP || - this->_state == STATE::READY || - this->_state == STATE::WEBSOCKET_DETECTED || - this->_state == STATE::HEARTBEAT_TIMEOUT) { - this->_setState(STATE::WEBSOCKET_TIMEOUT); - this->_doAction(); - } - } - }); + connect(this->_pRosbridge.get(), &Rosbridge::stateChanged, + [this] { this->_onRosbridgeStateChanged(); }); connect(&this->_futureWatcher, &FutureWatcher::finished, [this] { this->_onFutureWatcherFinished(); }); } -NemoInterface::Impl::~Impl() { this->_pRosBridge->reset(); } +NemoInterface::Impl::~Impl() { this->_pRosbridge->stop(); } void NemoInterface::Impl::start() { if (!running()) { @@ -533,6 +513,25 @@ void NemoInterface::Impl::_onHeartbeatTimeout() { this->_doAction(); } +void NemoInterface::Impl::_onRosbridgeStateChanged() { + auto state = this->_pRosbridge->state(); + if (state == Rosbridge::STATE::CONNECTED) { + if (this->_state == STATE::START_BRIDGE || + this->_state == STATE::WEBSOCKET_TIMEOUT) { + this->_setState(STATE::WEBSOCKET_DETECTED); + this->_doAction(); + } + } else if (state == Rosbridge::STATE::TIMEOUT) { + if (this->_state == STATE::TRY_TOPIC_SERVICE_SETUP || + this->_state == STATE::READY || + this->_state == STATE::WEBSOCKET_DETECTED || + this->_state == STATE::HEARTBEAT_TIMEOUT) { + this->_setState(STATE::WEBSOCKET_TIMEOUT); + this->_doAction(); + } + } +} + bool NemoInterface::Impl::_userSync() const { return _userSync(this->_state); } const QString &NemoInterface::Impl::infoString() const { return _infoString; } @@ -600,113 +599,101 @@ void NemoInterface::Impl::_doTopicServiceSetup() { using namespace ros_bridge::messages; // Subscribe nemo progress. - const char *progressClient = "client:/nemo/progress"; - this->_pRosBridge->addClient(progressClient); - this->_pRosBridge->subscribe( - progressClient, "/nemo/progress", - [this](std::shared_ptr, - std::shared_ptr in_message) { - auto msg = in_message->string(); - - // parse in_message - rapidjson::Document d; - d.Parse(msg.c_str()); - if (!d.HasParseError()) { - if (d.HasMember("msg") && d["msg"].IsObject()) { - - // create obj from json - nemo_msgs::progress_array::ProgressArray progressArray; - if (nemo_msgs::progress_array::fromJson(d["msg"], progressArray)) { - - // correct range errors of progress - for (auto &lp : progressArray.progress_array()) { - bool rangeError = false; - if (lp.progress() < 0) { - lp.setProgress(0); - rangeError = true; - } - if (lp.progress() > 100) { - lp.setProgress(100); - rangeError = true; - } - - if (rangeError) { - qCWarning(NemoInterfaceLog) << "/nemo/progress progress out " - "of range, value was set to: " - << lp.progress(); - } - } - - auto p = std::make_shared(); - *p = std::move(progressArray.progress_array()); - std::promise promise; - auto future = promise.get_future(); - bool value = QMetaObject::invokeMethod( - this->_parent, - [this, p, promise = std::move(promise)]() mutable { - this->_updateProgress(p, std::move(promise)); - }); - Q_ASSERT(value == true); - future.wait(); - } else { - qCWarning(NemoInterfaceLog) << "/nemo/progress not able to " - "create ProgressArray form json: " - << msg.c_str(); + this->_pRosbridge->subscribeTopic(progressTopic, [this]( + const QJsonObject &o) { + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + + // parse in_message + rapidjson::Document d; + d.Parse(msg.toUtf8()); + if (!d.HasParseError()) { + if (d.HasMember("msg") && d["msg"].IsObject()) { + + // create obj from json + nemo_msgs::progress_array::ProgressArray progressArray; + if (nemo_msgs::progress_array::fromJson(d["msg"], progressArray)) { + + // correct range errors of progress + for (auto &lp : progressArray.progress_array()) { + bool rangeError = false; + if (lp.progress() < 0) { + lp.setProgress(0); + rangeError = true; + } + if (lp.progress() > 100) { + lp.setProgress(100); + rangeError = true; + } + + if (rangeError) { + qCWarning(NemoInterfaceLog) << "/nemo/progress progress out " + "of range, value was set to: " + << lp.progress(); } - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/progress no \"msg\" key or wrong type: " - << msg.c_str(); } + + auto p = std::make_shared(); + *p = std::move(progressArray.progress_array()); + std::promise promise; + auto future = promise.get_future(); + bool value = QMetaObject::invokeMethod( + this->_parent, [this, p, promise = std::move(promise)]() mutable { + this->_updateProgress(p, std::move(promise)); + }); + Q_ASSERT(value == true); + future.wait(); } else { - qCWarning(NemoInterfaceLog) - << "/nemo/progress message parse error (" << d.GetParseError() - << "): " << msg.c_str(); + qCWarning(NemoInterfaceLog) << "/nemo/progress not able to " + "create ProgressArray form json: " + << msg; } - }); + } else { + qCWarning(NemoInterfaceLog) + << "/nemo/progress no \"msg\" key or wrong type: " << msg; + } + } else { + qCWarning(NemoInterfaceLog) << "/nemo/progress message parse error (" + << d.GetParseError() << "): " << msg; + } + }); + // Subscribe heartbeat msg. - const char *heartbeatClient = "client:/nemo/heartbeat"; - this->_pRosBridge->addClient(heartbeatClient); - this->_pRosBridge->subscribe( - heartbeatClient, "/nemo/heartbeat", - [this](std::shared_ptr, - std::shared_ptr in_message) { - auto msg = in_message->string(); - - // parse in_message - rapidjson::Document d; - d.Parse(msg.c_str()); - if (!d.HasParseError()) { - if (d.HasMember("msg") && d["msg"].IsObject()) { - - // create obj from json - nemo_msgs::heartbeat::Heartbeat heartbeat; - if (nemo_msgs::heartbeat::fromJson(d["msg"], heartbeat)) { - std::promise promise; - auto future = promise.get_future(); - bool value = QMetaObject::invokeMethod( - this->_parent, - [this, heartbeat, promise = std::move(promise)]() mutable { - this->_onHeartbeatReceived(heartbeat, std::move(promise)); - }); - Q_ASSERT(value == true); - future.wait(); - } else { - qCWarning(NemoInterfaceLog) << "/nemo/heartbeat not able to " - "create Heartbeat form json: " - << msg.c_str(); - } - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/heartbeat no \"msg\" key or wrong type: " - << msg.c_str(); - } + this->_pRosbridge->subscribeTopic(heartbeatTopic, [this]( + const QJsonObject &o) { + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + + // parse in_message + rapidjson::Document d; + d.Parse(msg.toUtf8()); + if (!d.HasParseError()) { + if (d.HasMember("msg") && d["msg"].IsObject()) { + + // create obj from json + nemo_msgs::heartbeat::Heartbeat heartbeat; + if (nemo_msgs::heartbeat::fromJson(d["msg"], heartbeat)) { + std::promise promise; + auto future = promise.get_future(); + bool value = QMetaObject::invokeMethod( + this->_parent, + [this, heartbeat, promise = std::move(promise)]() mutable { + this->_onHeartbeatReceived(heartbeat, std::move(promise)); + }); + Q_ASSERT(value == true); + future.wait(); } else { - qCWarning(NemoInterfaceLog) - << "/nemo/heartbeat message parse error (" << d.GetParseError() - << "): " << msg.c_str(); + qCWarning(NemoInterfaceLog) << "/nemo/heartbeat not able to " + "create Heartbeat form json: " + << msg; } - }); + } else { + qCWarning(NemoInterfaceLog) + << "/nemo/heartbeat no \"msg\" key or wrong type: " << msg; + } + } else { + qCWarning(NemoInterfaceLog) << "/nemo/heartbeat message parse error (" + << d.GetParseError() << "): " << msg; + } + }); } void NemoInterface::Impl::_trySynchronize() { @@ -778,17 +765,15 @@ void NemoInterface::Impl::_doAction() { static bool resetDone = false; switch (this->_state) { case STATE::STOPPED: - this->_connectionTimer.stop(); this->_timeoutTimer.stop(); this->_clearTilesRemote(std::promise()); - if (this->_pRosBridge->running()) { - this->_pRosBridge->reset(); + if (this->_pRosbridge->state() != Rosbridge::STATE::STOPPED) { + this->_pRosbridge->stop(); } break; case STATE::START_BRIDGE: - this->_pRosBridge->run(); - this->_connectionTimer.start(CONNECTION_TIMER_INTERVAL); + this->_pRosbridge->start(); break; break; case STATE::WEBSOCKET_DETECTED: @@ -812,8 +797,8 @@ void NemoInterface::Impl::_doAction() { case STATE::WEBSOCKET_TIMEOUT: if (!resetDone) { resetDone = true; - this->_pRosBridge->reset(); - this->_pRosBridge->run(); + this->_pRosbridge->stop(); + this->_pRosbridge->start(); } this->_timeoutTimer.stop(); this->_clearTilesRemote(std::promise()); @@ -843,48 +828,48 @@ QVariant NemoInterface::Impl::_callAddTiles( } jsonTileArray.PushBack(jsonTile, allocator); } // for + rapidjson::Value tileKey("in_tile_array"); request.AddMember(tileKey, jsonTileArray, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + request.Accept(writer); + QJsonDocument req = QJsonDocument::fromJson(buffer.GetString()); + // create response handler. auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); - auto responseHandler = - [promise_response]( - std::shared_ptr connection, - std::shared_ptr in_message) mutable { - // check if transaction was successfull - auto msg = in_message->string(); - rapidjson::Document d; - d.Parse(msg.c_str()); - if (!d.HasParseError()) { - if (d.HasMember("values") && d["values"].IsObject()) { - auto values = d["values"].GetObject(); - if (values.HasMember("success") && values["success"].IsBool()) { - promise_response->set_value(values["success"].GetBool()); - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/add_tiles no \"success\" key or wrong type: " - << msg.c_str(); - promise_response->set_value(false); - } - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/add_tiles no \"values\" key or wrong type: " - << msg.c_str(); - promise_response->set_value(false); - } + auto responseHandler = [promise_response](const QJsonObject &o) mutable { + // check if transaction was successfull + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + rapidjson::Document d; + d.Parse(msg.toUtf8()); + if (!d.HasParseError()) { + if (d.HasMember("values") && d["values"].IsObject()) { + auto values = d["values"].GetObject(); + if (values.HasMember("success") && values["success"].IsBool()) { + promise_response->set_value(values["success"].GetBool()); } else { qCWarning(NemoInterfaceLog) - << "/nemo/add_tiles message parse error (" << d.GetParseError() - << "): " << msg.c_str(); + << "/nemo/add_tiles no \"success\" key or wrong type: " << msg; promise_response->set_value(false); } - connection->send_close(1000); - }; + } else { + qCWarning(NemoInterfaceLog) + << "/nemo/add_tiles no \"values\" key or wrong type: " << msg; + promise_response->set_value(false); + } + } else { + qCWarning(NemoInterfaceLog) << "/nemo/add_tiles message parse error (" + << d.GetParseError() << "): " << msg; + promise_response->set_value(false); + } + }; // call service. - this->_pRosBridge->callService("/nemo/add_tiles", responseHandler, request); + this->_pRosbridge->callService("/nemo/add_tiles", responseHandler, + req.object()); // wait for response. auto tStart = hrc::now(); @@ -940,46 +925,44 @@ NemoInterface::Impl::_callRemoveTiles(std::shared_ptr pIdArray) { rapidjson::Value tileKey("ids"); request.AddMember(tileKey, jsonIdArray, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + request.Accept(writer); + QJsonDocument req = QJsonDocument::fromJson(buffer.GetString()); + // create response handler. auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); - auto responseHandler = - [promise_response]( - std::shared_ptr connection, - std::shared_ptr in_message) mutable { - // check if transaction was successfull - auto msg = in_message->string(); - rapidjson::Document d; - d.Parse(msg.c_str()); - if (!d.HasParseError()) { - if (d.HasMember("values") && d["values"].IsObject()) { - auto values = d["values"].GetObject(); - if (values.HasMember("success") && values["success"].IsBool()) { - promise_response->set_value(values["success"].GetBool()); - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/remove_tiles no \"success\" key or wrong type: " - << msg.c_str(); - promise_response->set_value(false); - } - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/remove_tiles no \"values\" key or wrong type: " - << msg.c_str(); - promise_response->set_value(false); - } + auto responseHandler = [promise_response](const QJsonObject &o) mutable { + // check if transaction was successfull + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + rapidjson::Document d; + d.Parse(msg.toUtf8()); + if (!d.HasParseError()) { + if (d.HasMember("values") && d["values"].IsObject()) { + auto values = d["values"].GetObject(); + if (values.HasMember("success") && values["success"].IsBool()) { + promise_response->set_value(values["success"].GetBool()); } else { qCWarning(NemoInterfaceLog) - << "/nemo/remove_tiles message parse error (" << d.GetParseError() - << "): " << msg.c_str(); + << "/nemo/remove_tiles no \"success\" key or wrong type: " << msg; promise_response->set_value(false); } - connection->send_close(1000); - }; + } else { + qCWarning(NemoInterfaceLog) + << "/nemo/remove_tiles no \"values\" key or wrong type: " << msg; + promise_response->set_value(false); + } + } else { + qCWarning(NemoInterfaceLog) << "/nemo/remove_tiles message parse error (" + << d.GetParseError() << "): " << msg; + promise_response->set_value(false); + } + }; // call service. - this->_pRosBridge->callService("/nemo/remove_tiles", responseHandler, - request); + this->_pRosbridge->callService("/nemo/remove_tiles", responseHandler, + req.object()); // wait for response. auto tStart = hrc::now(); @@ -1026,35 +1009,29 @@ QVariant NemoInterface::Impl::_callClearTiles() { // create response handler. auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); - auto responseHandler = - [promise_response]( - std::shared_ptr connection, - std::shared_ptr in_message) mutable { - // check if transaction was successfull - auto msg = in_message->string(); - rapidjson::Document d; - d.Parse(msg.c_str()); - if (!d.HasParseError()) { - if (d.HasMember("result") && d["result"].IsBool()) { - promise_response->set_value(d["result"].GetBool()); - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/clear_tiles no \"result\" key or wrong type: " - << msg.c_str(); - promise_response->set_value(false); - } - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/clear_tiles message parse error (" << d.GetParseError() - << "): " << msg.c_str(); - promise_response->set_value(false); - } - connection->send_close(1000); - }; + auto responseHandler = [promise_response](const QJsonObject &o) mutable { + // check if transaction was successfull + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + rapidjson::Document d; + d.Parse(msg.toUtf8()); + if (!d.HasParseError()) { + if (d.HasMember("result") && d["result"].IsBool()) { + promise_response->set_value(d["result"].GetBool()); + } else { + qCWarning(NemoInterfaceLog) + << "/nemo/clear_tiles no \"result\" key or wrong type: " << msg; + promise_response->set_value(false); + } + } else { + qCWarning(NemoInterfaceLog) << "/nemo/clear_tiles message parse error (" + << d.GetParseError() << "): " << msg; + promise_response->set_value(false); + } + }; // call service. - this->_pRosBridge->callService("/nemo/clear_tiles", responseHandler, - rapidjson::Document(rapidjson::kObjectType)); + this->_pRosbridge->callService("/nemo/clear_tiles", responseHandler, + QJsonObject()); // wait for response. auto tStart = hrc::now(); @@ -1108,52 +1085,51 @@ NemoInterface::Impl::_callGetProgress(std::shared_ptr pIdArray) { rapidjson::Value tileKey("ids"); request.AddMember(tileKey, jsonIdArray, allocator); + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + request.Accept(writer); + QJsonDocument req = QJsonDocument::fromJson(buffer.GetString()); + // create response handler. typedef std::shared_ptr ResponseType; auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); - auto responseHandler = - [promise_response]( - std::shared_ptr connection, - std::shared_ptr in_message) mutable { - // check if transaction was successfull - auto msg = in_message->string(); - rapidjson::Document d; - d.Parse(msg.c_str()); - if (!d.HasParseError()) { - if (d.HasMember("values") && d["values"].IsObject()) { - auto values = d["values"].GetObject(); - ros_bridge::messages::nemo_msgs::progress_array::ProgressArray - progressArrayMsg; - if (ros_bridge::messages::nemo_msgs::progress_array::fromJson( - d["values"], progressArrayMsg)) { - auto pArray = std::make_shared(); - *pArray = std::move(progressArrayMsg.progress_array()); - promise_response->set_value(pArray); - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/get_progress error while creating ProgressArray " - "from json."; - promise_response->set_value(nullptr); - } - } else { - qCWarning(NemoInterfaceLog) - << "/nemo/get_progress no \"values\" key or wrong type: " - << msg.c_str(); - promise_response->set_value(nullptr); - } + auto responseHandler = [promise_response](const QJsonObject &o) mutable { + // check if transaction was successfull + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + rapidjson::Document d; + d.Parse(msg.toUtf8()); + if (!d.HasParseError()) { + if (d.HasMember("values") && d["values"].IsObject()) { + auto values = d["values"].GetObject(); + ros_bridge::messages::nemo_msgs::progress_array::ProgressArray + progressArrayMsg; + if (ros_bridge::messages::nemo_msgs::progress_array::fromJson( + d["values"], progressArrayMsg)) { + auto pArray = std::make_shared(); + *pArray = std::move(progressArrayMsg.progress_array()); + promise_response->set_value(pArray); } else { qCWarning(NemoInterfaceLog) - << "/nemo/get_progress message parse error (" << d.GetParseError() - << "): " << msg.c_str(); + << "/nemo/get_progress error while creating ProgressArray " + "from json."; promise_response->set_value(nullptr); } - connection->send_close(1000); - }; + } else { + qCWarning(NemoInterfaceLog) + << "/nemo/get_progress no \"values\" key or wrong type: " << msg; + promise_response->set_value(nullptr); + } + } else { + qCWarning(NemoInterfaceLog) << "/nemo/get_progress message parse error (" + << d.GetParseError() << "): " << msg; + promise_response->set_value(nullptr); + } + }; // call service. - this->_pRosBridge->callService("/nemo/get_progress", responseHandler, - request); + this->_pRosbridge->callService("/nemo/get_progress", responseHandler, + req.object()); // wait for response. auto tStart = hrc::now(); @@ -1197,21 +1173,15 @@ QVariant NemoInterface::Impl::_callGetAllProgress() { this->_lastCall = CALL_NAME::GET_ALL_PROGRESS; - // create json object - rapidjson::Document request(rapidjson::kObjectType); - // create response handler. typedef std::shared_ptr ResponseType; auto promise_response = std::make_shared>(); auto future_response = promise_response->get_future(); - auto responseHandler = [promise_response]( - std::shared_ptr connection, - std::shared_ptr - in_message) mutable { + auto responseHandler = [promise_response](const QJsonObject &o) mutable { // check if transaction was successfull - auto msg = in_message->string(); + QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); rapidjson::Document d; - d.Parse(msg.c_str()); + d.Parse(msg.toUtf8()); if (!d.HasParseError()) { if (d.HasMember("values") && d["values"].IsObject()) { ros_bridge::messages::nemo_msgs::progress_array::ProgressArray @@ -1230,21 +1200,20 @@ QVariant NemoInterface::Impl::_callGetAllProgress() { } else { qCWarning(NemoInterfaceLog) << "/nemo/all_get_progress no \"values\" key or wrong type: " - << msg.c_str(); + << msg; promise_response->set_value(nullptr); } } else { qCWarning(NemoInterfaceLog) << "/nemo/all_get_progress message parse error (" << d.GetParseError() - << "): " << msg.c_str(); + << "): " << msg; promise_response->set_value(nullptr); } - connection->send_close(1000); }; // call service. - this->_pRosBridge->callService("/nemo/get_all_progress", responseHandler, - request); + this->_pRosbridge->callService("/nemo/get_all_progress", responseHandler, + QJsonObject()); // wait for response. auto tStart = hrc::now(); diff --git a/src/MeasurementComplexItem/rosbridge/rosbridge.cpp b/src/MeasurementComplexItem/rosbridge/rosbridge.cpp new file mode 100644 index 000000000..e8aac4b95 --- /dev/null +++ b/src/MeasurementComplexItem/rosbridge/rosbridge.cpp @@ -0,0 +1,253 @@ +#include "rosbridge.h" +#include "rosbridgeimpl.h" + +#include +#include +#include +#include + +Q_DECLARE_METATYPE(Rosbridge::CallBack) +Q_DECLARE_METATYPE(Rosbridge::CallBackWReturn) + +static const char *topicsKey = "topics"; +static const char *servicesKey = "services"; +static const char *rosServices = "/rosapi/services"; +static const char *rosTopics = "/rosapi/topics"; + +static constexpr auto pollIntervall = std::chrono::milliseconds(200); + +Rosbridge::Rosbridge(const QUrl url, QObject *parent) + : QObject(parent), _url(url), _running(false) { + static std::once_flag flag; + std::call_once(flag, [] { + qRegisterMetaType(); + qRegisterMetaType(); + }); +} + +Rosbridge::~Rosbridge() { stop(); } + +void Rosbridge::start() { + if (!_running) { + _running = true; + _t = new QThread(); + _impl = new RosbridgeImpl(_url); + _impl->moveToThread(_t); + + connect(this, &Rosbridge::_start, _impl, &RosbridgeImpl::start); + connect(this, &Rosbridge::_stop, _impl, &RosbridgeImpl::stop); + + connect(this, &Rosbridge::_advertiseTopic, _impl, + &RosbridgeImpl::advertiseTopic); + connect(this, &Rosbridge::_publish, _impl, &RosbridgeImpl::publish); + connect(this, &Rosbridge::_unadvertiseTopic, _impl, + &RosbridgeImpl::unadvertiseTopic); + connect(this, &Rosbridge::_unadvertiseAllTopics, _impl, + &RosbridgeImpl::unadvertiseAllTopics); + + connect(this, &Rosbridge::_subscribeTopic, _impl, + &RosbridgeImpl::subscribeTopic); + connect(this, &Rosbridge::_unsubscribeTopic, _impl, + &RosbridgeImpl::unsubscribeTopic); + connect(this, &Rosbridge::_unsubscribeAllTopics, _impl, + &RosbridgeImpl::unsubscribeAllTopics); + + connect(this, &Rosbridge::_callService, _impl, &RosbridgeImpl::callService); + connect(this, &Rosbridge::_advertiseService, _impl, + &RosbridgeImpl::advertiseService); + connect(this, &Rosbridge::_unadvertiseService, _impl, + &RosbridgeImpl::unadvertiseService); + connect(this, &Rosbridge::_unadvertiseAllServices, _impl, + &RosbridgeImpl::unadvertiseAllServices); + + _t->start(); + emit _start(); + } +} + +void Rosbridge::stop() { + if (_running) { + _running = false; + _impl->deleteLater(); + _t->quit(); + _t->wait(); + _t->deleteLater(); + } +} + +Rosbridge::STATE Rosbridge::state() { + if (_running) { + switch (_impl->state()) { + case RosbridgeImpl::STATE::STOPPED: + case RosbridgeImpl::STATE::STOPPING: + return STATE::STOPPED; + case RosbridgeImpl::STATE::STARTING: + return STATE::STARTED; + case RosbridgeImpl::STATE::CONNECTED: + return STATE::CONNECTED; + case RosbridgeImpl::STATE::TIMEOUT: + return STATE::TIMEOUT; + break; + } + qCritical() << "unreachable branch!"; + return STATE::STOPPED; + } else { + return STATE::STOPPED; + } +} + +void Rosbridge::waitConnected() { + while (this->state() != Rosbridge::STATE::CONNECTED) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + +void Rosbridge::advertiseTopic(const QString &topic, const QString &type) { + emit _advertiseTopic(topic, type); +} + +void Rosbridge::publish(const QString &topic, const QJsonObject &msg) { + emit _publish(topic, msg); +} + +void Rosbridge::unadvertiseTopic(const QString &topic) { + emit _unadvertiseTopic(topic); +} + +void Rosbridge::unadvertiseAllTopics() { emit _unadvertiseAllTopics(); } + +void Rosbridge::subscribeTopic(const QString &topic, + Rosbridge::CallBack callback) { + emit _subscribeTopic(topic, callback); +} + +std::future Rosbridge::topicAvailable(const QString &topic) { + auto pPromise = std::make_shared>(); + auto future = pPromise->get_future(); + if (this->state() == STATE::CONNECTED) { + auto setBool = [pPromise, + topic](const QJsonObject &topics) mutable -> void { + if (topics.contains(topicsKey) && topics[topicsKey].isArray()) { + QJsonValue v(topic); + pPromise->set_value(topics[topicsKey].toArray().contains(v)); + } else { + pPromise->set_value(false); + } + }; + emit this->_callService(rosTopics, setBool, QJsonObject()); + } else { + pPromise->set_value(false); + } + return future; +} + +std::future Rosbridge::getAdvertisedTopics() { + auto pPromise = std::make_shared>(); + auto future = pPromise->get_future(); + if (this->state() == STATE::CONNECTED) { + auto setString = [pPromise](const QJsonObject &topics) mutable { + pPromise->set_value(topics); + }; + emit this->_callService(rosTopics, setString, QJsonObject()); + } else { + pPromise->set_value(QJsonObject()); + } + return future; +} + +void Rosbridge::unsubscribeTopic(const QString &topic) { + emit _unsubscribeTopic(topic); +} + +void Rosbridge::unsubscribeAllTopics() { emit _unsubscribeAllTopics(); } + +void Rosbridge::waitForTopic(const QString &topic) { + if (this->state() == STATE::CONNECTED) { + std::future future; + while (true) { + future = topicAvailable(topic); + auto available = future.get(); + if (available) { + break; + } + std::this_thread::sleep_for(pollIntervall); + if (this->state() != STATE::CONNECTED) { + qDebug() << "waitForTopic: Rosbridge not connected."; + break; + } + } + } else { + qDebug() << "waitForTopic: Rosbridge not connected."; + } +} + +void Rosbridge::advertiseService(const QString &service, const QString &type, + CallBackWReturn callback) { + emit _advertiseService(service, type, callback); +} + +void Rosbridge::callService(const QString &service, + Rosbridge::CallBack callback, + const QJsonObject &args) { + emit _callService(service, callback, args); +} + +std::future Rosbridge::serviceAvailable(const QString &service) { + auto pPromise = std::make_shared>(); + auto future = pPromise->get_future(); + if (this->state() == STATE::CONNECTED) { + auto setBool = [pPromise, + service](const QJsonObject &services) mutable -> void { + if (services.contains(servicesKey) && services[servicesKey].isArray()) { + QJsonValue v(service); + pPromise->set_value(services[servicesKey].toArray().contains(v)); + } else { + pPromise->set_value(false); + } + }; + emit this->_callService(rosServices, setBool, QJsonObject()); + } else { + pPromise->set_value(false); + } + return future; +} + +std::future Rosbridge::getAdvertisedServices() { + auto pPromise = std::make_shared>(); + auto future = pPromise->get_future(); + if (this->state() == STATE::CONNECTED) { + auto setString = [pPromise](const QJsonObject &services) mutable { + pPromise->set_value(services); + }; + emit this->_callService(rosServices, setString, QJsonObject()); + } else { + pPromise->set_value(QJsonObject()); + } + return future; +} + +void Rosbridge::unadvertiseService(const QString &service) { + emit _unadvertiseService(service); +} + +void Rosbridge::unadvertiseAllServices() { emit _unadvertiseAllServices(); } + +void Rosbridge::waitForService(const QString &service) { + if (this->state() == STATE::CONNECTED) { + std::future future; + while (true) { + future = serviceAvailable(service); + auto available = future.get(); + if (available) { + break; + } + std::this_thread::sleep_for(pollIntervall); + if (this->state() != STATE::CONNECTED) { + qDebug() << "waitForService: Rosbridge not connected."; + break; + } + } + } else { + qDebug() << "waitForService: Rosbridge not connected."; + } +} diff --git a/src/MeasurementComplexItem/rosbridge/rosbridge.h b/src/MeasurementComplexItem/rosbridge/rosbridge.h new file mode 100644 index 000000000..48e88cc61 --- /dev/null +++ b/src/MeasurementComplexItem/rosbridge/rosbridge.h @@ -0,0 +1,110 @@ +#ifndef ROSBRIDGE_H +#define ROSBRIDGE_H + +#include +#include +#include + +#include +#include +#include +#include +#include + +class RosbridgeImpl; +class QThread; + +class Rosbridge : public QObject { + Q_OBJECT +public: + enum class STATE { + STOPPED, + STARTED, + CONNECTED, + TIMEOUT, + }; + + typedef std::function CallBack; + typedef std::function CallBackWReturn; + + Rosbridge(QUrl _url, QObject *parent = nullptr); + ~Rosbridge(); + + void start(); + void stop(); + STATE state(); + //! + //! \brief wait Waits for connection. + //! Waits for connection. This function can be called after start() to wait + //! for the state() to change to STATE::CONNECTED. + //! + void waitConnected(); + + // Topics. + void advertiseTopic(const QString &topic, const QString &type); + + void publish(const QString &topic, const QJsonObject &msg); + void unadvertiseTopic(const QString &topic); + void unadvertiseAllTopics(); + + void subscribeTopic(const QString &topic, Rosbridge::CallBack callback); + + std::future topicAvailable(const QString &topic); + + std::future getAdvertisedTopics(); + + void unsubscribeTopic(const QString &topic); + + void unsubscribeAllTopics(); + + void waitForTopic(const QString &topic); + + // Services. + void advertiseService(const QString &service, const QString &type, + Rosbridge::CallBackWReturn callback); + + void unadvertiseService(const QString &service); + + void unadvertiseAllServices(); + + void callService(const QString &service, Rosbridge::CallBack callback, + const QJsonObject &args = QJsonObject()); + + std::future serviceAvailable(const QString &service); + + std::future getAdvertisedServices(); + + void waitForService(const QString &service); + +signals: + void stateChanged(); + +private: +signals: + void _start(); + void _stop(); + // Topics + void _advertiseTopic(const QString &topic, const QString &type); + void _publish(const QString &topic, const QJsonObject &msg); + void _unadvertiseTopic(const QString &topic); + void _unadvertiseAllTopics(); + void _subscribeTopic(const QString &topic, Rosbridge::CallBack callback); + void _unsubscribeTopic(const QString &topic); + void _unsubscribeAllTopics(); + + // Services. + void _advertiseService(const QString &service, const QString &type, + Rosbridge::CallBackWReturn callback); + void _callService(const QString &service, Rosbridge::CallBack callback, + const QJsonObject &args = QJsonObject()); + void _unadvertiseService(const QString &service); + void _unadvertiseAllServices(); + +private: + RosbridgeImpl *_impl; + QThread *_t; + QUrl _url; + std::atomic_bool _running; +}; + +#endif // ROSBRIDGE_H diff --git a/src/MeasurementComplexItem/rosbridge/rosbridgeimpl.cpp b/src/MeasurementComplexItem/rosbridge/rosbridgeimpl.cpp new file mode 100644 index 000000000..9a7e756ed --- /dev/null +++ b/src/MeasurementComplexItem/rosbridge/rosbridgeimpl.cpp @@ -0,0 +1,508 @@ +#include "rosbridgeimpl.h" + +#include +#include +#include + +static const char *advertiseOpKey = "advertise"; +static const char *subscribeOpKey = "subscribe"; +static const char *unsubscribeOpKey = "unsubscribe"; +static const char *unadvertiseOpKey = "unadvertise"; +static const char *publishOpKey = "publish"; +static const char *serviceResponseOpKey = "service_response"; +static const char *unadvertiseServiceOpKey = "unadvertise_service"; +static const char *advertiseServiceOpKey = "advertise_service"; +static const char *callServiceOpKey = "call_service"; +static const char *topicKey = "topic"; +static const char *serviceKey = "service"; +static const char *msgKey = "msg"; +static const char *opKey = "op"; +static const char *idKey = "id"; +static const char *argsKey = "args"; +static const char *resultKey = "result"; +static const char *valuesKey = "values"; +static const char *typeKey = "type"; + +struct ServiceCall { + ServiceCall() {} + ServiceCall(const Rosbridge::CallBack &c, QString i, QString s) + : callback(c), id(i), service(s) {} + + Rosbridge::CallBack callback; + QString id; + QString service; +}; + +RosbridgeImpl::RosbridgeImpl(const QUrl &url, QObject *parent) + : QObject(parent), + _webSocket(QString(), QWebSocketProtocol::VersionLatest, this), _url(url), + _state(STATE::STOPPED) { + connect(this, &RosbridgeImpl::stateChanged, this, &RosbridgeImpl::_doAction); + connect(&_webSocket, &QWebSocket::connected, this, + &RosbridgeImpl::_onConnected); + connect(&_webSocket, &QWebSocket::disconnected, this, + &RosbridgeImpl::_onDisconnected); + connect(&_webSocket, &QWebSocket::textMessageReceived, this, + &RosbridgeImpl::_onTextMessageReceived); + qRegisterMetaType(); + qRegisterMetaType(); +} + +RosbridgeImpl::~RosbridgeImpl() { + if (_state == STATE::CONNECTED) { + stop(); + } +} + +RosbridgeImpl::STATE RosbridgeImpl::state() { return _state.load(); } + +void RosbridgeImpl::start() { + if (_state == STATE::STOPPED) { + _setState(STATE::STARTING); + } +} + +void RosbridgeImpl::stop() { + if (_state != STATE::STOPPED) { + _setState(STATE::STOPPING); + } +} + +void RosbridgeImpl::advertiseTopic(const QString &topic, const QString &type) { + if (_state == STATE::CONNECTED) { + auto it = _advertisedTopics.find(topic); + if (Q_LIKELY(it == _advertisedTopics.end())) { + _advertisedTopics.insert(topic); + + QJsonObject o; + o[opKey] = advertiseOpKey; + o[topicKey] = topic; + o[typeKey] = type; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "Topic " << topic << " already advertised."; + } + } else { + qDebug() << "advertiseTopic: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::publish(const QString &topic, const QJsonObject &msg) { + if (_state == STATE::CONNECTED) { + auto it = _advertisedTopics.find(topic); + if (Q_LIKELY(it != _advertisedTopics.end())) { + QJsonObject o; + o[opKey] = publishOpKey; + o[topicKey] = topic; + o[msgKey] = msg; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "Topic " << topic << " not advertised."; + } + } else { + qDebug() << "publish: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::unadvertiseTopic(const QString &topic) { + if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { + auto it = _advertisedTopics.find(topic); + if (Q_LIKELY(it != _advertisedTopics.end())) { + _advertisedTopics.erase(it); + + QJsonObject o; + o[opKey] = unadvertiseOpKey; + o[topicKey] = topic; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "Topic " << topic << " not advertised."; + } + } else { + qDebug() << "unadvertiseTopic: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::unadvertiseAllTopics() { + if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { + while (!_advertisedTopics.empty()) { + const auto topic = _advertisedTopics.begin(); + unadvertiseTopic(*topic); + } + } else { + qDebug() << "unadvertiseAllTopic: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::subscribeTopic(const QString &topic, + const Rosbridge::CallBack &callback) { + if (_state == STATE::CONNECTED) { + auto it = _subscribedTopics.find(topic); + if (Q_LIKELY(it == _subscribedTopics.end())) { + auto ret = _subscribedTopics.insert(std::make_pair(topic, callback)); + Q_ASSERT(ret.second == true); + + QJsonObject o; + o[opKey] = subscribeOpKey; + o[topicKey] = topic; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + + } else { + qDebug() << "subscribeTopic: topic " << topic << " already subscribed!"; + } + } else { + qDebug() << "subscribeTopic: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::unsubscribeTopic(const QString &topic) { + if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { + auto it = _subscribedTopics.find(topic); + if (Q_LIKELY(it != _subscribedTopics.end())) { + _subscribedTopics.erase(it); + + QJsonObject o; + o[opKey] = unsubscribeOpKey; + o[topicKey] = topic; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "unsubscribeTopic: topic " << topic << " already subscribed!"; + } + } else { + qDebug() << "unsubscribeTopic: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::unsubscribeAllTopics() { + if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { + while (!_subscribedTopics.empty()) { + auto it = _subscribedTopics.begin(); + unsubscribeTopic(it->first); + } + } else { + qDebug() << "unsubscribeAll: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::advertiseService( + const QString &service, const QString &type, + const Rosbridge::CallBackWReturn &callback) { + if (_state == STATE::CONNECTED) { + auto it = _advertisedServices.find(service); + if (it == _advertisedServices.end()) { + auto ret = _advertisedServices.insert(std::make_pair(service, callback)); + Q_ASSERT(ret.second == true); + + QJsonObject o; + o[opKey] = advertiseServiceOpKey; + o[serviceKey] = service; + o[typeKey] = type; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "advertiseService: Service " << service + << " already advertised. Use unadvertiseService first to " + "unadvertise the service."; + } + } else { + qDebug() << "advertiseService: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::_serviceResponse(const QString &service, const QString &id, + bool result, const QJsonObject &values) { + + if (_state == STATE::CONNECTED) { + auto it = _advertisedServices.find(service); + if (it != _advertisedServices.end()) { + + QJsonObject o; + o[opKey] = serviceResponseOpKey; + o[serviceKey] = service; + o[resultKey] = result; + o[idKey] = id; + o[valuesKey] = values; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "serviceResponse: Service " << service << " not advertised."; + } + } else { + qDebug() << "serviceResponse: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::_clearAllPendingServiceCalls() { + while (!_pendingServiceCalls.empty()) { + auto it = _pendingServiceCalls.begin(); + it = _pendingServiceCalls.erase(it); + } +} + +void RosbridgeImpl::callService(const QString &service, + const Rosbridge::CallBack &callback, + const QJsonObject &req) { + if (_state == STATE::CONNECTED) { + auto it = _pendingServiceCalls.find(service); + if (it == _pendingServiceCalls.end()) { + auto ret = _pendingServiceCalls.insert( + std::make_pair(service, std::deque>())); + Q_ASSERT(ret.second == true); + it = ret.first; + } + + QString id(QString::number(_getMessageId())); + auto p = + std::unique_ptr(new ServiceCall(callback, id, service)); + it->second.push_back(std::move(p)); + + QJsonObject o; + o[opKey] = callServiceOpKey; + o[serviceKey] = service; + o[idKey] = id; + o[argsKey] = req; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "callService: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::unadvertiseService(const QString &service) { + + if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { + auto it = _advertisedServices.find(service); + if (it != _advertisedServices.end()) { + it = _advertisedServices.erase(it); + + QJsonObject o; + o[opKey] = unadvertiseServiceOpKey; + o[serviceKey] = service; + + QString payload = + QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); + _webSocket.sendTextMessage(payload); + } else { + qDebug() << "unadvertiseService: Service " << service + << " not advertised."; + } + } else { + qDebug() << "unadvertiseService: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::unadvertiseAllServices() { + if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { + while (!_advertisedServices.empty()) { + auto it = _advertisedServices.begin(); + unadvertiseService(it->first); + } + } else { + qDebug() << "unadvertiseAllService: Rosbridge not connected!"; + } +} + +void RosbridgeImpl::_onConnected() { + if (this->_state == STATE::STARTING) { + _setState(STATE::CONNECTED); + } +} + +void RosbridgeImpl::_onDisconnected() { + if (this->_state == STATE::CONNECTED) { + _setState(STATE::TIMEOUT); + } +} + +void RosbridgeImpl::_setState(RosbridgeImpl::STATE newState) { + if (_state != newState) { + _state = newState; + emit stateChanged(); + } +} + +int RosbridgeImpl::_getMessageId() { + static int c = 0; + return c++; +} + +void RosbridgeImpl::_doAction() { + switch (_state.load()) { + case STATE::STOPPED: + break; + case STATE::STOPPING: + unadvertiseAllTopics(); + unadvertiseAllServices(); + unsubscribeAllTopics(); + _clearAllPendingServiceCalls(); + _webSocket.close(); + _setState(STATE::STOPPED); + break; + case STATE::STARTING: + _webSocket.open(_url); + break; + case STATE::TIMEOUT: + break; + case STATE::CONNECTED: + break; + } +} + +void RosbridgeImpl::_onTextMessageReceived(const QString &message) { + QJsonParseError e; + auto d = QJsonDocument::fromJson(message.toUtf8(), &e); + if (!d.isNull()) { + if (d.isObject()) { + QJsonObject o = d.object(); + if (o.contains(opKey) && o[opKey].isString()) { + auto opCode = o[opKey].toString(); + if (opCode == publishOpKey) { + _processTopic(o); + return; + } else if (opCode == serviceResponseOpKey) { + _processServiceResponse(o); + return; + } else if (opCode == callServiceOpKey) { + _processServiceCall(o); + return; + } else { + qDebug() << "_onTextMessageReceived: unknown op code: " + << o[opKey].toString(); + } + } else { + qDebug() + << "_onTextMessageReceived: json document doesn't contain op code (" + << opKey << ") or op code has wrong type."; + } + } else { + qDebug() << "_onTextMessageReceived: json document is not an object."; + } + } else { + qDebug() << "_onTextMessageReceived: parse error: " << e.errorString(); + } + qDebug() << "_onTextMessageReceived: message: " << message; +} + +void RosbridgeImpl::_processTopic(const QJsonObject &o) { + if (o.contains(topicKey) && o[topicKey].isString()) { + if (o.contains(msgKey) && o[msgKey].isObject()) { + auto topic = o[topicKey].toString(); + auto it = _subscribedTopics.find(topic); + if (Q_LIKELY(it != _subscribedTopics.end())) { + it->second(o[msgKey].toObject(QJsonObject())); + return; + } else { + qDebug() << "_processTopic: unknown topic " << topic; + } + } else { + qDebug() << "_processTopic: message key (" << msgKey << ") missing"; + } + } else { + qDebug() << "_processTopic: json document doesn't contain topic key (" + << topicKey << ") or topic key has wrong type."; + } + qDebug() << "_processTopic: msg: " << o; +} + +void RosbridgeImpl::_processServiceResponse(const QJsonObject &o) { + if (o.contains(serviceKey) && o[serviceKey].isString()) { + if (o.contains(valuesKey) && o[valuesKey].isObject()) { + auto service = o[serviceKey].toString(); + auto it = _pendingServiceCalls.find(service); + if (Q_LIKELY(it != _pendingServiceCalls.end())) { + auto p = it->second.begin(); + + // check if ids match + if (o.contains(idKey) && o[idKey].isString()) { + auto id = o[idKey].toString(); + if (id != (*p)->id) { + qDebug() << "_processServiceResponse: ids, don't match, searching " + "correct callback..."; + auto match = + std::find_if(it->second.begin(), it->second.end(), + [&id](const std::unique_ptr &c) { + return c->id == id; + }); + if (match != it->second.end()) { + p = match; + } else { + qDebug() + << "_processServiceResponse: unknown id, can't determine " + "callback"; + return; + } + } + } + + ((*p)->callback)(o[valuesKey].toObject()); // callback + p = it->second.erase(p); + if (it->second.size() == 0) { + _pendingServiceCalls.erase(it); + } + return; + } else { + qDebug() << "_processServiceResponse: unknown service " << service; + } + } else { + qDebug() << "_processServiceResponse: json document doesn't contain " + "values key (" + << valuesKey << ") or values key has wrong type."; + } + } else { + qDebug() << "_processServiceResponse: json document doesn't contain " + "service key (" + << serviceKey << ") or service key has wrong type."; + } + qDebug() << "_processServiceResponse: msg: " << o; +} + +void RosbridgeImpl::_processServiceCall(const QJsonObject &o) { + if (o.contains(serviceKey) && o[serviceKey].isString()) { + auto service = o[serviceKey].toString(); + if (o.contains(idKey) && o[idKey].isString()) { + if (o.contains(argsKey) && o[argsKey].isObject()) { + auto id = o[idKey].toString(); + auto it = _advertisedServices.find(service); + if (Q_LIKELY(it != _advertisedServices.end())) { + auto resp = it->second(o[argsKey].toObject()); + auto result = !resp.empty(); + _serviceResponse(service, id, result, resp); + return; + } else { + qDebug() << "_processServiceCall: unknown service " << service; + } + } else { + qDebug() << "_processServiceCall: args key (" << idKey + << ") missing, response not possible."; + } + } else { + qDebug() << "_processServiceCall: id key (" << idKey + << ") missing, response not possible."; + } + } else { + qDebug() + << "_processServiceCall: json document doesn't contain service key (" + << serviceKey << ") or service key has wrong type."; + } + qDebug() << "_processServiceCall: msg: " << o; +} diff --git a/src/MeasurementComplexItem/rosbridge/rosbridgeimpl.h b/src/MeasurementComplexItem/rosbridge/rosbridgeimpl.h new file mode 100644 index 000000000..e5e7ff85f --- /dev/null +++ b/src/MeasurementComplexItem/rosbridge/rosbridgeimpl.h @@ -0,0 +1,84 @@ +#ifndef ROSBRIDGEIMPL_H +#define ROSBRIDGEIMPL_H + +#include "rosbridge.h" + +#include +#include + +#include +#include +#include + +struct ServiceCall; + +class RosbridgeImpl : public QObject { + Q_OBJECT +public: + enum class STATE { STOPPED, STOPPING, STARTING, CONNECTED, TIMEOUT }; + + RosbridgeImpl(const QUrl &url, QObject *parent = nullptr); + ~RosbridgeImpl(); + + STATE state(); // thread safe +public slots: + void start(); + void stop(); + + // Topics. + void advertiseTopic(const QString &topic, const QString &type); + + void publish(const QString &topic, const QJsonObject &msg); + + void unadvertiseTopic(const QString &topic); + + void unadvertiseAllTopics(); + + void subscribeTopic(const QString &topic, + const Rosbridge::CallBack &callback); + + void unsubscribeTopic(const QString &topic); + + void unsubscribeAllTopics(); + + // Services. + void advertiseService(const QString &service, const QString &type, + const Rosbridge::CallBackWReturn &callback); + + void callService(const QString &service, const Rosbridge::CallBack &callback, + const QJsonObject &req = QJsonObject()); + + void unadvertiseService(const QString &service); + + void unadvertiseAllServices(); + +signals: + void stateChanged(); + +private slots: + void _onConnected(); + void _onDisconnected(); + void _doAction(); + void _onTextMessageReceived(const QString &message); + +private: + void _processTopic(const QJsonObject &o); + void _processServiceResponse(const QJsonObject &o); + void _processServiceCall(const QJsonObject &o); + void _setState(STATE newState); + int _getMessageId(); + void _serviceResponse(const QString &service, const QString &id, bool result, + const QJsonObject &values); + void _clearAllPendingServiceCalls(); + + QWebSocket _webSocket; + QUrl _url; + std::set _advertisedTopics; + std::map _subscribedTopics; + std::map _advertisedServices; + std::map>> + _pendingServiceCalls; + std::atomic _state; +}; + +#endif // ROSBRIDGEIMPL_H -- 2.22.0