Commit 9e620065 authored by Valentin Platzgummer's avatar Valentin Platzgummer

new rosbridge impl added, not tested

parent b94dfebd
......@@ -267,7 +267,8 @@ QT += \
widgets \
xml \
texttospeech \
core-private
core-private \
websockets
# Multimedia only used if QVC is enabled
!contains (DEFINES, QGC_DISABLE_UVC) {
......@@ -420,6 +421,7 @@ INCLUDEPATH += \
src/Audio \
src/comm \
src/MeasurementComplexItem \
src/MeasurementComplexItem/rosbridge \
src/MeasurementComplexItem/geometry \
src/MeasurementComplexItem/nemo_interface \
src/comm/ros_bridge \
......@@ -454,6 +456,8 @@ HEADERS += \
src/MeasurementComplexItem/geometry/TileDiff.h \
src/MeasurementComplexItem/geometry/geometry.h \
src/MeasurementComplexItem/HashFunctions.h \
src/MeasurementComplexItem/rosbridge/rosbridge.h \
src/MeasurementComplexItem/rosbridge/rosbridgeimpl.h \
src/MeasurementComplexItem/nemo_interface/FutureWatcher.h \
src/MeasurementComplexItem/nemo_interface/FutureWatcherInterface.h \
src/MeasurementComplexItem/nemo_interface/Task.h \
......@@ -508,18 +512,13 @@ HEADERS += \
src/GPS/Drivers/src/base_station.h \
src/Settings/WimaSettings.h \
src/comm/QmlObjectListHelper.h \
src/comm/ros_bridge/include/RosBridgeClient.h \
src/comm/ros_bridge/include/com_private.h \
src/comm/ros_bridge/include/message_traits.h \
src/comm/ros_bridge/include/messages/geographic_msgs/geopoint.h \
src/comm/ros_bridge/include/messages/nemo_msgs/heartbeat.h \
src/comm/ros_bridge/include/messages/nemo_msgs/progress_array.h \
src/comm/ros_bridge/include/messages/nemo_msgs/tile.h \
src/comm/ros_bridge/include/messages/std_msgs/header.h \
src/comm/ros_bridge/include/server.h \
src/comm/ros_bridge/include/messages/std_msgs/time.h \
src/comm/ros_bridge/include/topic_publisher.h \
src/comm/ros_bridge/include/topic_subscriber.h \
src/comm/utilities.h
contains (DEFINES, QGC_ENABLE_PAIRING) {
......@@ -534,6 +533,8 @@ SOURCES += \
src/MeasurementComplexItem/geometry/SafeArea.cc \
src/MeasurementComplexItem/geometry/geometry.cpp \
src/MeasurementComplexItem/HashFunctions.cpp \
src/MeasurementComplexItem/rosbridge/rosbridge.cpp \
src/MeasurementComplexItem/rosbridge/rosbridgeimpl.cpp \
src/MeasurementComplexItem/nemo_interface/FutureWatcherInterface.cpp \
src/MeasurementComplexItem/nemo_interface/MeasurementTile.cpp \
src/MeasurementComplexItem/nemo_interface/Task.cpp \
......@@ -554,8 +555,6 @@ SOURCES += \
src/MeasurementComplexItem/geometry/GeoPoint3D.cpp \
src/MeasurementComplexItem/NemoInterface.cpp \
src/comm/QmlObjectListHelper.cpp \
src/comm/ros_bridge/include/RosBridgeClient.cpp \
src/comm/ros_bridge/include/com_private.cpp \
src/comm/ros_bridge/include/messages/geographic_msgs/geopoint.cpp \
src/comm/ros_bridge/include/messages/nemo_msgs/heartbeat.cpp \
src/comm/ros_bridge/include/messages/nemo_msgs/labeled_progress.cpp \
......@@ -563,11 +562,7 @@ SOURCES += \
src/comm/ros_bridge/include/messages/nemo_msgs/tile.cpp \
src/comm/ros_bridge/include/messages/std_msgs/header.cpp \
src/comm/ros_bridge/include/messages/std_msgs/time.cpp \
src/comm/ros_bridge/include/server.cpp \
src/comm/ros_bridge/include/topic_publisher.cpp \
src/comm/ros_bridge/include/topic_subscriber.cpp \
src/Settings/WimaSettings.cc \
src/comm/ros_bridge/src/ros_bridge.cpp
contains (DEFINES, QGC_ENABLE_PAIRING) {
SOURCES += \
......
......@@ -10,6 +10,7 @@
#include <shared_mutex>
#include <QTimer>
#include <QUrl>
#include "GenericSingelton.h"
#include "geometry/MeasurementArea.h"
......@@ -19,26 +20,26 @@
#include "nemo_interface/QNemoHeartbeat.h"
#include "nemo_interface/TaskDispatcher.h"
#include "ros_bridge/include/messages/nemo_msgs/progress_array.h"
#include "ros_bridge/include/RosBridgeClient.h"
#include "ros_bridge/include/messages/geographic_msgs/geopoint.h"
#include "ros_bridge/include/messages/nemo_msgs/heartbeat.h"
#include "ros_bridge/include/messages/nemo_msgs/progress_array.h"
#include "ros_bridge/include/messages/nemo_msgs/tile.h"
#include "ros_bridge/include/ros_bridge.h"
#include "ros_bridge/rapidjson/include/rapidjson/document.h"
#include "ros_bridge/rapidjson/include/rapidjson/ostreamwrapper.h"
#include "ros_bridge/rapidjson/include/rapidjson/writer.h"
#include "rosbridge/rosbridge.h"
QGC_LOGGING_CATEGORY(NemoInterfaceLog, "NemoInterfaceLog")
#define SYNC_INTERVAL 1000 // ms
#define NO_HEARTBEAT_TIMEOUT 5000 // ms
#define CONNECTION_TIMER_INTERVAL 1000 // ms
#define SYNC_INTERVAL 1000 // ms
#define NO_HEARTBEAT_TIMEOUT 5000 // ms
static constexpr auto maxResponseTime = std::chrono::milliseconds(10000);
static const char *progressTopic = "/nemo/progress";
static const char *heartbeatTopic = "/nemo/heartbeat";
using hrc = std::chrono::high_resolution_clock;
using ROSBridgePtr = std::shared_ptr<RosbridgeWsClient>;
using ROSBridgePtr = std::shared_ptr<Rosbridge>;
typedef ros_bridge::messages::nemo_msgs::tile::GenericTile<QGeoCoordinate,
QList>
......@@ -103,6 +104,7 @@ private:
bool _sysSync() const; // thread safe
void _onFutureWatcherFinished(); // thread safe
void _onHeartbeatTimeout(); // thread safe
void _onRosbridgeStateChanged();
// called from dispatcher thread!
QVariant _callAddTiles(
......@@ -150,7 +152,7 @@ private:
std::atomic<STATE> _state;
std::atomic<CALL_NAME> _lastCall;
ROSBridgePtr _pRosBridge;
ROSBridgePtr _pRosbridge;
TileMap _remoteTiles;
TileMapConst _localTiles;
NemoInterface *const _parent;
......@@ -158,7 +160,6 @@ private:
QString _infoString;
QString _warningString;
QTimer _timeoutTimer;
QTimer _connectionTimer;
QNemoHeartbeat _lastHeartbeat;
FutureWatcher _futureWatcher;
};
......@@ -185,17 +186,11 @@ NemoInterface::Impl::Impl(NemoInterface *p)
auto connectionStringFact = wimaSettings->rosbridgeConnectionString();
auto setConnectionString = [connectionStringFact, this] {
auto connectionString = connectionStringFact->rawValue().toString();
if (!is_valid_port_path(connectionString.toLocal8Bit().data())) {
qgcApp()->warningMessageBoxOnMainThread(
"Nemo Interface",
"Websocket connection string possibly invalid: " + connectionString +
". Trying to connect anyways.");
}
bool wasRunning = this->running();
this->stop();
this->_pRosBridge = std::make_shared<RosbridgeWsClient>(
connectionString.toLocal8Bit().data());
this->_pRosbridge = std::make_shared<Rosbridge>(
QUrl(QString("ws://") + connectionString.toLocal8Bit().data()));
if (wasRunning) {
this->start();
}
......@@ -209,29 +204,14 @@ NemoInterface::Impl::Impl(NemoInterface *p)
std::bind(&Impl::_onHeartbeatTimeout, this));
// Connection timer (temporary workaround)
connect(&this->_connectionTimer, &QTimer::timeout, [this] {
if (this->_pRosBridge->connected()) {
if (this->_state == STATE::START_BRIDGE ||
this->_state == STATE::WEBSOCKET_TIMEOUT) {
this->_setState(STATE::WEBSOCKET_DETECTED);
this->_doAction();
}
} else {
if (this->_state == STATE::TRY_TOPIC_SERVICE_SETUP ||
this->_state == STATE::READY ||
this->_state == STATE::WEBSOCKET_DETECTED ||
this->_state == STATE::HEARTBEAT_TIMEOUT) {
this->_setState(STATE::WEBSOCKET_TIMEOUT);
this->_doAction();
}
}
});
connect(this->_pRosbridge.get(), &Rosbridge::stateChanged,
[this] { this->_onRosbridgeStateChanged(); });
connect(&this->_futureWatcher, &FutureWatcher::finished,
[this] { this->_onFutureWatcherFinished(); });
}
NemoInterface::Impl::~Impl() { this->_pRosBridge->reset(); }
NemoInterface::Impl::~Impl() { this->_pRosbridge->stop(); }
void NemoInterface::Impl::start() {
if (!running()) {
......@@ -533,6 +513,25 @@ void NemoInterface::Impl::_onHeartbeatTimeout() {
this->_doAction();
}
void NemoInterface::Impl::_onRosbridgeStateChanged() {
auto state = this->_pRosbridge->state();
if (state == Rosbridge::STATE::CONNECTED) {
if (this->_state == STATE::START_BRIDGE ||
this->_state == STATE::WEBSOCKET_TIMEOUT) {
this->_setState(STATE::WEBSOCKET_DETECTED);
this->_doAction();
}
} else if (state == Rosbridge::STATE::TIMEOUT) {
if (this->_state == STATE::TRY_TOPIC_SERVICE_SETUP ||
this->_state == STATE::READY ||
this->_state == STATE::WEBSOCKET_DETECTED ||
this->_state == STATE::HEARTBEAT_TIMEOUT) {
this->_setState(STATE::WEBSOCKET_TIMEOUT);
this->_doAction();
}
}
}
bool NemoInterface::Impl::_userSync() const { return _userSync(this->_state); }
const QString &NemoInterface::Impl::infoString() const { return _infoString; }
......@@ -600,113 +599,101 @@ void NemoInterface::Impl::_doTopicServiceSetup() {
using namespace ros_bridge::messages;
// Subscribe nemo progress.
const char *progressClient = "client:/nemo/progress";
this->_pRosBridge->addClient(progressClient);
this->_pRosBridge->subscribe(
progressClient, "/nemo/progress",
[this](std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message) {
auto msg = in_message->string();
// parse in_message
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("msg") && d["msg"].IsObject()) {
// create obj from json
nemo_msgs::progress_array::ProgressArray progressArray;
if (nemo_msgs::progress_array::fromJson(d["msg"], progressArray)) {
// correct range errors of progress
for (auto &lp : progressArray.progress_array()) {
bool rangeError = false;
if (lp.progress() < 0) {
lp.setProgress(0);
rangeError = true;
}
if (lp.progress() > 100) {
lp.setProgress(100);
rangeError = true;
}
if (rangeError) {
qCWarning(NemoInterfaceLog) << "/nemo/progress progress out "
"of range, value was set to: "
<< lp.progress();
}
}
auto p = std::make_shared<ProgressArray>();
*p = std::move(progressArray.progress_array());
std::promise<bool> promise;
auto future = promise.get_future();
bool value = QMetaObject::invokeMethod(
this->_parent,
[this, p, promise = std::move(promise)]() mutable {
this->_updateProgress(p, std::move(promise));
});
Q_ASSERT(value == true);
future.wait();
} else {
qCWarning(NemoInterfaceLog) << "/nemo/progress not able to "
"create ProgressArray form json: "
<< msg.c_str();
this->_pRosbridge->subscribeTopic(progressTopic, [this](
const QJsonObject &o) {
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
// parse in_message
rapidjson::Document d;
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("msg") && d["msg"].IsObject()) {
// create obj from json
nemo_msgs::progress_array::ProgressArray progressArray;
if (nemo_msgs::progress_array::fromJson(d["msg"], progressArray)) {
// correct range errors of progress
for (auto &lp : progressArray.progress_array()) {
bool rangeError = false;
if (lp.progress() < 0) {
lp.setProgress(0);
rangeError = true;
}
if (lp.progress() > 100) {
lp.setProgress(100);
rangeError = true;
}
if (rangeError) {
qCWarning(NemoInterfaceLog) << "/nemo/progress progress out "
"of range, value was set to: "
<< lp.progress();
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/progress no \"msg\" key or wrong type: "
<< msg.c_str();
}
auto p = std::make_shared<ProgressArray>();
*p = std::move(progressArray.progress_array());
std::promise<bool> promise;
auto future = promise.get_future();
bool value = QMetaObject::invokeMethod(
this->_parent, [this, p, promise = std::move(promise)]() mutable {
this->_updateProgress(p, std::move(promise));
});
Q_ASSERT(value == true);
future.wait();
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/progress message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
qCWarning(NemoInterfaceLog) << "/nemo/progress not able to "
"create ProgressArray form json: "
<< msg;
}
});
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/progress no \"msg\" key or wrong type: " << msg;
}
} else {
qCWarning(NemoInterfaceLog) << "/nemo/progress message parse error ("
<< d.GetParseError() << "): " << msg;
}
});
// Subscribe heartbeat msg.
const char *heartbeatClient = "client:/nemo/heartbeat";
this->_pRosBridge->addClient(heartbeatClient);
this->_pRosBridge->subscribe(
heartbeatClient, "/nemo/heartbeat",
[this](std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message) {
auto msg = in_message->string();
// parse in_message
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("msg") && d["msg"].IsObject()) {
// create obj from json
nemo_msgs::heartbeat::Heartbeat heartbeat;
if (nemo_msgs::heartbeat::fromJson(d["msg"], heartbeat)) {
std::promise<bool> promise;
auto future = promise.get_future();
bool value = QMetaObject::invokeMethod(
this->_parent,
[this, heartbeat, promise = std::move(promise)]() mutable {
this->_onHeartbeatReceived(heartbeat, std::move(promise));
});
Q_ASSERT(value == true);
future.wait();
} else {
qCWarning(NemoInterfaceLog) << "/nemo/heartbeat not able to "
"create Heartbeat form json: "
<< msg.c_str();
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/heartbeat no \"msg\" key or wrong type: "
<< msg.c_str();
}
this->_pRosbridge->subscribeTopic(heartbeatTopic, [this](
const QJsonObject &o) {
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
// parse in_message
rapidjson::Document d;
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("msg") && d["msg"].IsObject()) {
// create obj from json
nemo_msgs::heartbeat::Heartbeat heartbeat;
if (nemo_msgs::heartbeat::fromJson(d["msg"], heartbeat)) {
std::promise<bool> promise;
auto future = promise.get_future();
bool value = QMetaObject::invokeMethod(
this->_parent,
[this, heartbeat, promise = std::move(promise)]() mutable {
this->_onHeartbeatReceived(heartbeat, std::move(promise));
});
Q_ASSERT(value == true);
future.wait();
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/heartbeat message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
qCWarning(NemoInterfaceLog) << "/nemo/heartbeat not able to "
"create Heartbeat form json: "
<< msg;
}
});
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/heartbeat no \"msg\" key or wrong type: " << msg;
}
} else {
qCWarning(NemoInterfaceLog) << "/nemo/heartbeat message parse error ("
<< d.GetParseError() << "): " << msg;
}
});
}
void NemoInterface::Impl::_trySynchronize() {
......@@ -778,17 +765,15 @@ void NemoInterface::Impl::_doAction() {
static bool resetDone = false;
switch (this->_state) {
case STATE::STOPPED:
this->_connectionTimer.stop();
this->_timeoutTimer.stop();
this->_clearTilesRemote(std::promise<bool>());
if (this->_pRosBridge->running()) {
this->_pRosBridge->reset();
if (this->_pRosbridge->state() != Rosbridge::STATE::STOPPED) {
this->_pRosbridge->stop();
}
break;
case STATE::START_BRIDGE:
this->_pRosBridge->run();
this->_connectionTimer.start(CONNECTION_TIMER_INTERVAL);
this->_pRosbridge->start();
break;
break;
case STATE::WEBSOCKET_DETECTED:
......@@ -812,8 +797,8 @@ void NemoInterface::Impl::_doAction() {
case STATE::WEBSOCKET_TIMEOUT:
if (!resetDone) {
resetDone = true;
this->_pRosBridge->reset();
this->_pRosBridge->run();
this->_pRosbridge->stop();
this->_pRosbridge->start();
}
this->_timeoutTimer.stop();
this->_clearTilesRemote(std::promise<bool>());
......@@ -843,48 +828,48 @@ QVariant NemoInterface::Impl::_callAddTiles(
}
jsonTileArray.PushBack(jsonTile, allocator);
} // for
rapidjson::Value tileKey("in_tile_array");
request.AddMember(tileKey, jsonTileArray, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
request.Accept(writer);
QJsonDocument req = QJsonDocument::fromJson(buffer.GetString());
// create response handler.
auto promise_response = std::make_shared<std::promise<bool>>();
auto future_response = promise_response->get_future();
auto responseHandler =
[promise_response](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message) mutable {
// check if transaction was successfull
auto msg = in_message->string();
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
auto values = d["values"].GetObject();
if (values.HasMember("success") && values["success"].IsBool()) {
promise_response->set_value(values["success"].GetBool());
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/add_tiles no \"success\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(false);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/add_tiles no \"values\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(false);
}
auto responseHandler = [promise_response](const QJsonObject &o) mutable {
// check if transaction was successfull
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
rapidjson::Document d;
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
auto values = d["values"].GetObject();
if (values.HasMember("success") && values["success"].IsBool()) {
promise_response->set_value(values["success"].GetBool());
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/add_tiles message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
<< "/nemo/add_tiles no \"success\" key or wrong type: " << msg;
promise_response->set_value(false);
}
connection->send_close(1000);
};
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/add_tiles no \"values\" key or wrong type: " << msg;
promise_response->set_value(false);
}
} else {
qCWarning(NemoInterfaceLog) << "/nemo/add_tiles message parse error ("
<< d.GetParseError() << "): " << msg;
promise_response->set_value(false);
}
};
// call service.
this->_pRosBridge->callService("/nemo/add_tiles", responseHandler, request);
this->_pRosbridge->callService("/nemo/add_tiles", responseHandler,
req.object());
// wait for response.
auto tStart = hrc::now();
......@@ -940,46 +925,44 @@ NemoInterface::Impl::_callRemoveTiles(std::shared_ptr<IDArray> pIdArray) {
rapidjson::Value tileKey("ids");
request.AddMember(tileKey, jsonIdArray, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
request.Accept(writer);
QJsonDocument req = QJsonDocument::fromJson(buffer.GetString());
// create response handler.
auto promise_response = std::make_shared<std::promise<bool>>();
auto future_response = promise_response->get_future();
auto responseHandler =
[promise_response](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message) mutable {
// check if transaction was successfull
auto msg = in_message->string();
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
auto values = d["values"].GetObject();
if (values.HasMember("success") && values["success"].IsBool()) {
promise_response->set_value(values["success"].GetBool());
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/remove_tiles no \"success\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(false);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/remove_tiles no \"values\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(false);
}
auto responseHandler = [promise_response](const QJsonObject &o) mutable {
// check if transaction was successfull
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
rapidjson::Document d;
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
auto values = d["values"].GetObject();
if (values.HasMember("success") && values["success"].IsBool()) {
promise_response->set_value(values["success"].GetBool());
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/remove_tiles message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
<< "/nemo/remove_tiles no \"success\" key or wrong type: " << msg;
promise_response->set_value(false);
}
connection->send_close(1000);
};
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/remove_tiles no \"values\" key or wrong type: " << msg;
promise_response->set_value(false);
}
} else {
qCWarning(NemoInterfaceLog) << "/nemo/remove_tiles message parse error ("
<< d.GetParseError() << "): " << msg;
promise_response->set_value(false);
}
};
// call service.
this->_pRosBridge->callService("/nemo/remove_tiles", responseHandler,
request);
this->_pRosbridge->callService("/nemo/remove_tiles", responseHandler,
req.object());
// wait for response.
auto tStart = hrc::now();
......@@ -1026,35 +1009,29 @@ QVariant NemoInterface::Impl::_callClearTiles() {
// create response handler.
auto promise_response = std::make_shared<std::promise<bool>>();
auto future_response = promise_response->get_future();
auto responseHandler =
[promise_response](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message) mutable {
// check if transaction was successfull
auto msg = in_message->string();
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("result") && d["result"].IsBool()) {
promise_response->set_value(d["result"].GetBool());
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/clear_tiles no \"result\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(false);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/clear_tiles message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
promise_response->set_value(false);
}
connection->send_close(1000);
};
auto responseHandler = [promise_response](const QJsonObject &o) mutable {
// check if transaction was successfull
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
rapidjson::Document d;
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("result") && d["result"].IsBool()) {
promise_response->set_value(d["result"].GetBool());
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/clear_tiles no \"result\" key or wrong type: " << msg;
promise_response->set_value(false);
}
} else {
qCWarning(NemoInterfaceLog) << "/nemo/clear_tiles message parse error ("
<< d.GetParseError() << "): " << msg;
promise_response->set_value(false);
}
};
// call service.
this->_pRosBridge->callService("/nemo/clear_tiles", responseHandler,
rapidjson::Document(rapidjson::kObjectType));
this->_pRosbridge->callService("/nemo/clear_tiles", responseHandler,
QJsonObject());
// wait for response.
auto tStart = hrc::now();
......@@ -1108,52 +1085,51 @@ NemoInterface::Impl::_callGetProgress(std::shared_ptr<IDArray> pIdArray) {
rapidjson::Value tileKey("ids");
request.AddMember(tileKey, jsonIdArray, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
request.Accept(writer);
QJsonDocument req = QJsonDocument::fromJson(buffer.GetString());
// create response handler.
typedef std::shared_ptr<ProgressArray> ResponseType;
auto promise_response = std::make_shared<std::promise<ResponseType>>();
auto future_response = promise_response->get_future();
auto responseHandler =
[promise_response](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message) mutable {
// check if transaction was successfull
auto msg = in_message->string();
rapidjson::Document d;
d.Parse(msg.c_str());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
auto values = d["values"].GetObject();
ros_bridge::messages::nemo_msgs::progress_array::ProgressArray
progressArrayMsg;
if (ros_bridge::messages::nemo_msgs::progress_array::fromJson(
d["values"], progressArrayMsg)) {
auto pArray = std::make_shared<ProgressArray>();
*pArray = std::move(progressArrayMsg.progress_array());
promise_response->set_value(pArray);
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/get_progress error while creating ProgressArray "
"from json.";
promise_response->set_value(nullptr);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/get_progress no \"values\" key or wrong type: "
<< msg.c_str();
promise_response->set_value(nullptr);
}
auto responseHandler = [promise_response](const QJsonObject &o) mutable {
// check if transaction was successfull
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
rapidjson::Document d;
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
auto values = d["values"].GetObject();
ros_bridge::messages::nemo_msgs::progress_array::ProgressArray
progressArrayMsg;
if (ros_bridge::messages::nemo_msgs::progress_array::fromJson(
d["values"], progressArrayMsg)) {
auto pArray = std::make_shared<ProgressArray>();
*pArray = std::move(progressArrayMsg.progress_array());
promise_response->set_value(pArray);
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/get_progress message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
<< "/nemo/get_progress error while creating ProgressArray "
"from json.";
promise_response->set_value(nullptr);
}
connection->send_close(1000);
};
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/get_progress no \"values\" key or wrong type: " << msg;
promise_response->set_value(nullptr);
}
} else {
qCWarning(NemoInterfaceLog) << "/nemo/get_progress message parse error ("
<< d.GetParseError() << "): " << msg;
promise_response->set_value(nullptr);
}
};
// call service.
this->_pRosBridge->callService("/nemo/get_progress", responseHandler,
request);
this->_pRosbridge->callService("/nemo/get_progress", responseHandler,
req.object());
// wait for response.
auto tStart = hrc::now();
......@@ -1197,21 +1173,15 @@ QVariant NemoInterface::Impl::_callGetAllProgress() {
this->_lastCall = CALL_NAME::GET_ALL_PROGRESS;
// create json object
rapidjson::Document request(rapidjson::kObjectType);
// create response handler.
typedef std::shared_ptr<ProgressArray> ResponseType;
auto promise_response = std::make_shared<std::promise<ResponseType>>();
auto future_response = promise_response->get_future();
auto responseHandler = [promise_response](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage>
in_message) mutable {
auto responseHandler = [promise_response](const QJsonObject &o) mutable {
// check if transaction was successfull
auto msg = in_message->string();
QString msg = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
rapidjson::Document d;
d.Parse(msg.c_str());
d.Parse(msg.toUtf8());
if (!d.HasParseError()) {
if (d.HasMember("values") && d["values"].IsObject()) {
ros_bridge::messages::nemo_msgs::progress_array::ProgressArray
......@@ -1230,21 +1200,20 @@ QVariant NemoInterface::Impl::_callGetAllProgress() {
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/all_get_progress no \"values\" key or wrong type: "
<< msg.c_str();
<< msg;
promise_response->set_value(nullptr);
}
} else {
qCWarning(NemoInterfaceLog)
<< "/nemo/all_get_progress message parse error (" << d.GetParseError()
<< "): " << msg.c_str();
<< "): " << msg;
promise_response->set_value(nullptr);
}
connection->send_close(1000);
};
// call service.
this->_pRosBridge->callService("/nemo/get_all_progress", responseHandler,
request);
this->_pRosbridge->callService("/nemo/get_all_progress", responseHandler,
QJsonObject());
// wait for response.
auto tStart = hrc::now();
......
#include "rosbridge.h"
#include "rosbridgeimpl.h"
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <QThread>
Q_DECLARE_METATYPE(Rosbridge::CallBack)
Q_DECLARE_METATYPE(Rosbridge::CallBackWReturn)
static const char *topicsKey = "topics";
static const char *servicesKey = "services";
static const char *rosServices = "/rosapi/services";
static const char *rosTopics = "/rosapi/topics";
static constexpr auto pollIntervall = std::chrono::milliseconds(200);
Rosbridge::Rosbridge(const QUrl url, QObject *parent)
: QObject(parent), _url(url), _running(false) {
static std::once_flag flag;
std::call_once(flag, [] {
qRegisterMetaType<Rosbridge::CallBack>();
qRegisterMetaType<Rosbridge::CallBackWReturn>();
});
}
Rosbridge::~Rosbridge() { stop(); }
void Rosbridge::start() {
if (!_running) {
_running = true;
_t = new QThread();
_impl = new RosbridgeImpl(_url);
_impl->moveToThread(_t);
connect(this, &Rosbridge::_start, _impl, &RosbridgeImpl::start);
connect(this, &Rosbridge::_stop, _impl, &RosbridgeImpl::stop);
connect(this, &Rosbridge::_advertiseTopic, _impl,
&RosbridgeImpl::advertiseTopic);
connect(this, &Rosbridge::_publish, _impl, &RosbridgeImpl::publish);
connect(this, &Rosbridge::_unadvertiseTopic, _impl,
&RosbridgeImpl::unadvertiseTopic);
connect(this, &Rosbridge::_unadvertiseAllTopics, _impl,
&RosbridgeImpl::unadvertiseAllTopics);
connect(this, &Rosbridge::_subscribeTopic, _impl,
&RosbridgeImpl::subscribeTopic);
connect(this, &Rosbridge::_unsubscribeTopic, _impl,
&RosbridgeImpl::unsubscribeTopic);
connect(this, &Rosbridge::_unsubscribeAllTopics, _impl,
&RosbridgeImpl::unsubscribeAllTopics);
connect(this, &Rosbridge::_callService, _impl, &RosbridgeImpl::callService);
connect(this, &Rosbridge::_advertiseService, _impl,
&RosbridgeImpl::advertiseService);
connect(this, &Rosbridge::_unadvertiseService, _impl,
&RosbridgeImpl::unadvertiseService);
connect(this, &Rosbridge::_unadvertiseAllServices, _impl,
&RosbridgeImpl::unadvertiseAllServices);
_t->start();
emit _start();
}
}
void Rosbridge::stop() {
if (_running) {
_running = false;
_impl->deleteLater();
_t->quit();
_t->wait();
_t->deleteLater();
}
}
Rosbridge::STATE Rosbridge::state() {
if (_running) {
switch (_impl->state()) {
case RosbridgeImpl::STATE::STOPPED:
case RosbridgeImpl::STATE::STOPPING:
return STATE::STOPPED;
case RosbridgeImpl::STATE::STARTING:
return STATE::STARTED;
case RosbridgeImpl::STATE::CONNECTED:
return STATE::CONNECTED;
case RosbridgeImpl::STATE::TIMEOUT:
return STATE::TIMEOUT;
break;
}
qCritical() << "unreachable branch!";
return STATE::STOPPED;
} else {
return STATE::STOPPED;
}
}
void Rosbridge::waitConnected() {
while (this->state() != Rosbridge::STATE::CONNECTED) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
void Rosbridge::advertiseTopic(const QString &topic, const QString &type) {
emit _advertiseTopic(topic, type);
}
void Rosbridge::publish(const QString &topic, const QJsonObject &msg) {
emit _publish(topic, msg);
}
void Rosbridge::unadvertiseTopic(const QString &topic) {
emit _unadvertiseTopic(topic);
}
void Rosbridge::unadvertiseAllTopics() { emit _unadvertiseAllTopics(); }
void Rosbridge::subscribeTopic(const QString &topic,
Rosbridge::CallBack callback) {
emit _subscribeTopic(topic, callback);
}
std::future<bool> Rosbridge::topicAvailable(const QString &topic) {
auto pPromise = std::make_shared<std::promise<bool>>();
auto future = pPromise->get_future();
if (this->state() == STATE::CONNECTED) {
auto setBool = [pPromise,
topic](const QJsonObject &topics) mutable -> void {
if (topics.contains(topicsKey) && topics[topicsKey].isArray()) {
QJsonValue v(topic);
pPromise->set_value(topics[topicsKey].toArray().contains(v));
} else {
pPromise->set_value(false);
}
};
emit this->_callService(rosTopics, setBool, QJsonObject());
} else {
pPromise->set_value(false);
}
return future;
}
std::future<QJsonObject> Rosbridge::getAdvertisedTopics() {
auto pPromise = std::make_shared<std::promise<QJsonObject>>();
auto future = pPromise->get_future();
if (this->state() == STATE::CONNECTED) {
auto setString = [pPromise](const QJsonObject &topics) mutable {
pPromise->set_value(topics);
};
emit this->_callService(rosTopics, setString, QJsonObject());
} else {
pPromise->set_value(QJsonObject());
}
return future;
}
void Rosbridge::unsubscribeTopic(const QString &topic) {
emit _unsubscribeTopic(topic);
}
void Rosbridge::unsubscribeAllTopics() { emit _unsubscribeAllTopics(); }
void Rosbridge::waitForTopic(const QString &topic) {
if (this->state() == STATE::CONNECTED) {
std::future<bool> future;
while (true) {
future = topicAvailable(topic);
auto available = future.get();
if (available) {
break;
}
std::this_thread::sleep_for(pollIntervall);
if (this->state() != STATE::CONNECTED) {
qDebug() << "waitForTopic: Rosbridge not connected.";
break;
}
}
} else {
qDebug() << "waitForTopic: Rosbridge not connected.";
}
}
void Rosbridge::advertiseService(const QString &service, const QString &type,
CallBackWReturn callback) {
emit _advertiseService(service, type, callback);
}
void Rosbridge::callService(const QString &service,
Rosbridge::CallBack callback,
const QJsonObject &args) {
emit _callService(service, callback, args);
}
std::future<bool> Rosbridge::serviceAvailable(const QString &service) {
auto pPromise = std::make_shared<std::promise<bool>>();
auto future = pPromise->get_future();
if (this->state() == STATE::CONNECTED) {
auto setBool = [pPromise,
service](const QJsonObject &services) mutable -> void {
if (services.contains(servicesKey) && services[servicesKey].isArray()) {
QJsonValue v(service);
pPromise->set_value(services[servicesKey].toArray().contains(v));
} else {
pPromise->set_value(false);
}
};
emit this->_callService(rosServices, setBool, QJsonObject());
} else {
pPromise->set_value(false);
}
return future;
}
std::future<QJsonObject> Rosbridge::getAdvertisedServices() {
auto pPromise = std::make_shared<std::promise<QJsonObject>>();
auto future = pPromise->get_future();
if (this->state() == STATE::CONNECTED) {
auto setString = [pPromise](const QJsonObject &services) mutable {
pPromise->set_value(services);
};
emit this->_callService(rosServices, setString, QJsonObject());
} else {
pPromise->set_value(QJsonObject());
}
return future;
}
void Rosbridge::unadvertiseService(const QString &service) {
emit _unadvertiseService(service);
}
void Rosbridge::unadvertiseAllServices() { emit _unadvertiseAllServices(); }
void Rosbridge::waitForService(const QString &service) {
if (this->state() == STATE::CONNECTED) {
std::future<bool> future;
while (true) {
future = serviceAvailable(service);
auto available = future.get();
if (available) {
break;
}
std::this_thread::sleep_for(pollIntervall);
if (this->state() != STATE::CONNECTED) {
qDebug() << "waitForService: Rosbridge not connected.";
break;
}
}
} else {
qDebug() << "waitForService: Rosbridge not connected.";
}
}
#ifndef ROSBRIDGE_H
#define ROSBRIDGE_H
#include <QJsonObject>
#include <QObject>
#include <QtWebSockets/QWebSocket>
#include <deque>
#include <functional>
#include <future>
#include <map>
#include <memory>
class RosbridgeImpl;
class QThread;
class Rosbridge : public QObject {
Q_OBJECT
public:
enum class STATE {
STOPPED,
STARTED,
CONNECTED,
TIMEOUT,
};
typedef std::function<void(const QJsonObject &)> CallBack;
typedef std::function<QJsonObject(const QJsonObject &)> CallBackWReturn;
Rosbridge(QUrl _url, QObject *parent = nullptr);
~Rosbridge();
void start();
void stop();
STATE state();
//!
//! \brief wait Waits for connection.
//! Waits for connection. This function can be called after start() to wait
//! for the state() to change to STATE::CONNECTED.
//!
void waitConnected();
// Topics.
void advertiseTopic(const QString &topic, const QString &type);
void publish(const QString &topic, const QJsonObject &msg);
void unadvertiseTopic(const QString &topic);
void unadvertiseAllTopics();
void subscribeTopic(const QString &topic, Rosbridge::CallBack callback);
std::future<bool> topicAvailable(const QString &topic);
std::future<QJsonObject> getAdvertisedTopics();
void unsubscribeTopic(const QString &topic);
void unsubscribeAllTopics();
void waitForTopic(const QString &topic);
// Services.
void advertiseService(const QString &service, const QString &type,
Rosbridge::CallBackWReturn callback);
void unadvertiseService(const QString &service);
void unadvertiseAllServices();
void callService(const QString &service, Rosbridge::CallBack callback,
const QJsonObject &args = QJsonObject());
std::future<bool> serviceAvailable(const QString &service);
std::future<QJsonObject> getAdvertisedServices();
void waitForService(const QString &service);
signals:
void stateChanged();
private:
signals:
void _start();
void _stop();
// Topics
void _advertiseTopic(const QString &topic, const QString &type);
void _publish(const QString &topic, const QJsonObject &msg);
void _unadvertiseTopic(const QString &topic);
void _unadvertiseAllTopics();
void _subscribeTopic(const QString &topic, Rosbridge::CallBack callback);
void _unsubscribeTopic(const QString &topic);
void _unsubscribeAllTopics();
// Services.
void _advertiseService(const QString &service, const QString &type,
Rosbridge::CallBackWReturn callback);
void _callService(const QString &service, Rosbridge::CallBack callback,
const QJsonObject &args = QJsonObject());
void _unadvertiseService(const QString &service);
void _unadvertiseAllServices();
private:
RosbridgeImpl *_impl;
QThread *_t;
QUrl _url;
std::atomic_bool _running;
};
#endif // ROSBRIDGE_H
#include "rosbridgeimpl.h"
#include <QDebug>
#include <QJsonDocument>
#include <QJsonObject>
static const char *advertiseOpKey = "advertise";
static const char *subscribeOpKey = "subscribe";
static const char *unsubscribeOpKey = "unsubscribe";
static const char *unadvertiseOpKey = "unadvertise";
static const char *publishOpKey = "publish";
static const char *serviceResponseOpKey = "service_response";
static const char *unadvertiseServiceOpKey = "unadvertise_service";
static const char *advertiseServiceOpKey = "advertise_service";
static const char *callServiceOpKey = "call_service";
static const char *topicKey = "topic";
static const char *serviceKey = "service";
static const char *msgKey = "msg";
static const char *opKey = "op";
static const char *idKey = "id";
static const char *argsKey = "args";
static const char *resultKey = "result";
static const char *valuesKey = "values";
static const char *typeKey = "type";
struct ServiceCall {
ServiceCall() {}
ServiceCall(const Rosbridge::CallBack &c, QString i, QString s)
: callback(c), id(i), service(s) {}
Rosbridge::CallBack callback;
QString id;
QString service;
};
RosbridgeImpl::RosbridgeImpl(const QUrl &url, QObject *parent)
: QObject(parent),
_webSocket(QString(), QWebSocketProtocol::VersionLatest, this), _url(url),
_state(STATE::STOPPED) {
connect(this, &RosbridgeImpl::stateChanged, this, &RosbridgeImpl::_doAction);
connect(&_webSocket, &QWebSocket::connected, this,
&RosbridgeImpl::_onConnected);
connect(&_webSocket, &QWebSocket::disconnected, this,
&RosbridgeImpl::_onDisconnected);
connect(&_webSocket, &QWebSocket::textMessageReceived, this,
&RosbridgeImpl::_onTextMessageReceived);
qRegisterMetaType<QAbstractSocket::SocketState>();
qRegisterMetaType<QAbstractSocket::SocketError>();
}
RosbridgeImpl::~RosbridgeImpl() {
if (_state == STATE::CONNECTED) {
stop();
}
}
RosbridgeImpl::STATE RosbridgeImpl::state() { return _state.load(); }
void RosbridgeImpl::start() {
if (_state == STATE::STOPPED) {
_setState(STATE::STARTING);
}
}
void RosbridgeImpl::stop() {
if (_state != STATE::STOPPED) {
_setState(STATE::STOPPING);
}
}
void RosbridgeImpl::advertiseTopic(const QString &topic, const QString &type) {
if (_state == STATE::CONNECTED) {
auto it = _advertisedTopics.find(topic);
if (Q_LIKELY(it == _advertisedTopics.end())) {
_advertisedTopics.insert(topic);
QJsonObject o;
o[opKey] = advertiseOpKey;
o[topicKey] = topic;
o[typeKey] = type;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "Topic " << topic << " already advertised.";
}
} else {
qDebug() << "advertiseTopic: Rosbridge not connected!";
}
}
void RosbridgeImpl::publish(const QString &topic, const QJsonObject &msg) {
if (_state == STATE::CONNECTED) {
auto it = _advertisedTopics.find(topic);
if (Q_LIKELY(it != _advertisedTopics.end())) {
QJsonObject o;
o[opKey] = publishOpKey;
o[topicKey] = topic;
o[msgKey] = msg;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "Topic " << topic << " not advertised.";
}
} else {
qDebug() << "publish: Rosbridge not connected!";
}
}
void RosbridgeImpl::unadvertiseTopic(const QString &topic) {
if (_state == STATE::CONNECTED || _state == STATE::STOPPING) {
auto it = _advertisedTopics.find(topic);
if (Q_LIKELY(it != _advertisedTopics.end())) {
_advertisedTopics.erase(it);
QJsonObject o;
o[opKey] = unadvertiseOpKey;
o[topicKey] = topic;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "Topic " << topic << " not advertised.";
}
} else {
qDebug() << "unadvertiseTopic: Rosbridge not connected!";
}
}
void RosbridgeImpl::unadvertiseAllTopics() {
if (_state == STATE::CONNECTED || _state == STATE::STOPPING) {
while (!_advertisedTopics.empty()) {
const auto topic = _advertisedTopics.begin();
unadvertiseTopic(*topic);
}
} else {
qDebug() << "unadvertiseAllTopic: Rosbridge not connected!";
}
}
void RosbridgeImpl::subscribeTopic(const QString &topic,
const Rosbridge::CallBack &callback) {
if (_state == STATE::CONNECTED) {
auto it = _subscribedTopics.find(topic);
if (Q_LIKELY(it == _subscribedTopics.end())) {
auto ret = _subscribedTopics.insert(std::make_pair(topic, callback));
Q_ASSERT(ret.second == true);
QJsonObject o;
o[opKey] = subscribeOpKey;
o[topicKey] = topic;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "subscribeTopic: topic " << topic << " already subscribed!";
}
} else {
qDebug() << "subscribeTopic: Rosbridge not connected!";
}
}
void RosbridgeImpl::unsubscribeTopic(const QString &topic) {
if (_state == STATE::CONNECTED || _state == STATE::STOPPING) {
auto it = _subscribedTopics.find(topic);
if (Q_LIKELY(it != _subscribedTopics.end())) {
_subscribedTopics.erase(it);
QJsonObject o;
o[opKey] = unsubscribeOpKey;
o[topicKey] = topic;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "unsubscribeTopic: topic " << topic << " already subscribed!";
}
} else {
qDebug() << "unsubscribeTopic: Rosbridge not connected!";
}
}
void RosbridgeImpl::unsubscribeAllTopics() {
if (_state == STATE::CONNECTED || _state == STATE::STOPPING) {
while (!_subscribedTopics.empty()) {
auto it = _subscribedTopics.begin();
unsubscribeTopic(it->first);
}
} else {
qDebug() << "unsubscribeAll: Rosbridge not connected!";
}
}
void RosbridgeImpl::advertiseService(
const QString &service, const QString &type,
const Rosbridge::CallBackWReturn &callback) {
if (_state == STATE::CONNECTED) {
auto it = _advertisedServices.find(service);
if (it == _advertisedServices.end()) {
auto ret = _advertisedServices.insert(std::make_pair(service, callback));
Q_ASSERT(ret.second == true);
QJsonObject o;
o[opKey] = advertiseServiceOpKey;
o[serviceKey] = service;
o[typeKey] = type;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "advertiseService: Service " << service
<< " already advertised. Use unadvertiseService first to "
"unadvertise the service.";
}
} else {
qDebug() << "advertiseService: Rosbridge not connected!";
}
}
void RosbridgeImpl::_serviceResponse(const QString &service, const QString &id,
bool result, const QJsonObject &values) {
if (_state == STATE::CONNECTED) {
auto it = _advertisedServices.find(service);
if (it != _advertisedServices.end()) {
QJsonObject o;
o[opKey] = serviceResponseOpKey;
o[serviceKey] = service;
o[resultKey] = result;
o[idKey] = id;
o[valuesKey] = values;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "serviceResponse: Service " << service << " not advertised.";
}
} else {
qDebug() << "serviceResponse: Rosbridge not connected!";
}
}
void RosbridgeImpl::_clearAllPendingServiceCalls() {
while (!_pendingServiceCalls.empty()) {
auto it = _pendingServiceCalls.begin();
it = _pendingServiceCalls.erase(it);
}
}
void RosbridgeImpl::callService(const QString &service,
const Rosbridge::CallBack &callback,
const QJsonObject &req) {
if (_state == STATE::CONNECTED) {
auto it = _pendingServiceCalls.find(service);
if (it == _pendingServiceCalls.end()) {
auto ret = _pendingServiceCalls.insert(
std::make_pair(service, std::deque<std::unique_ptr<ServiceCall>>()));
Q_ASSERT(ret.second == true);
it = ret.first;
}
QString id(QString::number(_getMessageId()));
auto p =
std::unique_ptr<ServiceCall>(new ServiceCall(callback, id, service));
it->second.push_back(std::move(p));
QJsonObject o;
o[opKey] = callServiceOpKey;
o[serviceKey] = service;
o[idKey] = id;
o[argsKey] = req;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "callService: Rosbridge not connected!";
}
}
void RosbridgeImpl::unadvertiseService(const QString &service) {
if (_state == STATE::CONNECTED || _state == STATE::STOPPING) {
auto it = _advertisedServices.find(service);
if (it != _advertisedServices.end()) {
it = _advertisedServices.erase(it);
QJsonObject o;
o[opKey] = unadvertiseServiceOpKey;
o[serviceKey] = service;
QString payload =
QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact);
_webSocket.sendTextMessage(payload);
} else {
qDebug() << "unadvertiseService: Service " << service
<< " not advertised.";
}
} else {
qDebug() << "unadvertiseService: Rosbridge not connected!";
}
}
void RosbridgeImpl::unadvertiseAllServices() {
if (_state == STATE::CONNECTED || _state == STATE::STOPPING) {
while (!_advertisedServices.empty()) {
auto it = _advertisedServices.begin();
unadvertiseService(it->first);
}
} else {
qDebug() << "unadvertiseAllService: Rosbridge not connected!";
}
}
void RosbridgeImpl::_onConnected() {
if (this->_state == STATE::STARTING) {
_setState(STATE::CONNECTED);
}
}
void RosbridgeImpl::_onDisconnected() {
if (this->_state == STATE::CONNECTED) {
_setState(STATE::TIMEOUT);
}
}
void RosbridgeImpl::_setState(RosbridgeImpl::STATE newState) {
if (_state != newState) {
_state = newState;
emit stateChanged();
}
}
int RosbridgeImpl::_getMessageId() {
static int c = 0;
return c++;
}
void RosbridgeImpl::_doAction() {
switch (_state.load()) {
case STATE::STOPPED:
break;
case STATE::STOPPING:
unadvertiseAllTopics();
unadvertiseAllServices();
unsubscribeAllTopics();
_clearAllPendingServiceCalls();
_webSocket.close();
_setState(STATE::STOPPED);
break;
case STATE::STARTING:
_webSocket.open(_url);
break;
case STATE::TIMEOUT:
break;
case STATE::CONNECTED:
break;
}
}
void RosbridgeImpl::_onTextMessageReceived(const QString &message) {
QJsonParseError e;
auto d = QJsonDocument::fromJson(message.toUtf8(), &e);
if (!d.isNull()) {
if (d.isObject()) {
QJsonObject o = d.object();
if (o.contains(opKey) && o[opKey].isString()) {
auto opCode = o[opKey].toString();
if (opCode == publishOpKey) {
_processTopic(o);
return;
} else if (opCode == serviceResponseOpKey) {
_processServiceResponse(o);
return;
} else if (opCode == callServiceOpKey) {
_processServiceCall(o);
return;
} else {
qDebug() << "_onTextMessageReceived: unknown op code: "
<< o[opKey].toString();
}
} else {
qDebug()
<< "_onTextMessageReceived: json document doesn't contain op code ("
<< opKey << ") or op code has wrong type.";
}
} else {
qDebug() << "_onTextMessageReceived: json document is not an object.";
}
} else {
qDebug() << "_onTextMessageReceived: parse error: " << e.errorString();
}
qDebug() << "_onTextMessageReceived: message: " << message;
}
void RosbridgeImpl::_processTopic(const QJsonObject &o) {
if (o.contains(topicKey) && o[topicKey].isString()) {
if (o.contains(msgKey) && o[msgKey].isObject()) {
auto topic = o[topicKey].toString();
auto it = _subscribedTopics.find(topic);
if (Q_LIKELY(it != _subscribedTopics.end())) {
it->second(o[msgKey].toObject(QJsonObject()));
return;
} else {
qDebug() << "_processTopic: unknown topic " << topic;
}
} else {
qDebug() << "_processTopic: message key (" << msgKey << ") missing";
}
} else {
qDebug() << "_processTopic: json document doesn't contain topic key ("
<< topicKey << ") or topic key has wrong type.";
}
qDebug() << "_processTopic: msg: " << o;
}
void RosbridgeImpl::_processServiceResponse(const QJsonObject &o) {
if (o.contains(serviceKey) && o[serviceKey].isString()) {
if (o.contains(valuesKey) && o[valuesKey].isObject()) {
auto service = o[serviceKey].toString();
auto it = _pendingServiceCalls.find(service);
if (Q_LIKELY(it != _pendingServiceCalls.end())) {
auto p = it->second.begin();
// check if ids match
if (o.contains(idKey) && o[idKey].isString()) {
auto id = o[idKey].toString();
if (id != (*p)->id) {
qDebug() << "_processServiceResponse: ids, don't match, searching "
"correct callback...";
auto match =
std::find_if(it->second.begin(), it->second.end(),
[&id](const std::unique_ptr<ServiceCall> &c) {
return c->id == id;
});
if (match != it->second.end()) {
p = match;
} else {
qDebug()
<< "_processServiceResponse: unknown id, can't determine "
"callback";
return;
}
}
}
((*p)->callback)(o[valuesKey].toObject()); // callback
p = it->second.erase(p);
if (it->second.size() == 0) {
_pendingServiceCalls.erase(it);
}
return;
} else {
qDebug() << "_processServiceResponse: unknown service " << service;
}
} else {
qDebug() << "_processServiceResponse: json document doesn't contain "
"values key ("
<< valuesKey << ") or values key has wrong type.";
}
} else {
qDebug() << "_processServiceResponse: json document doesn't contain "
"service key ("
<< serviceKey << ") or service key has wrong type.";
}
qDebug() << "_processServiceResponse: msg: " << o;
}
void RosbridgeImpl::_processServiceCall(const QJsonObject &o) {
if (o.contains(serviceKey) && o[serviceKey].isString()) {
auto service = o[serviceKey].toString();
if (o.contains(idKey) && o[idKey].isString()) {
if (o.contains(argsKey) && o[argsKey].isObject()) {
auto id = o[idKey].toString();
auto it = _advertisedServices.find(service);
if (Q_LIKELY(it != _advertisedServices.end())) {
auto resp = it->second(o[argsKey].toObject());
auto result = !resp.empty();
_serviceResponse(service, id, result, resp);
return;
} else {
qDebug() << "_processServiceCall: unknown service " << service;
}
} else {
qDebug() << "_processServiceCall: args key (" << idKey
<< ") missing, response not possible.";
}
} else {
qDebug() << "_processServiceCall: id key (" << idKey
<< ") missing, response not possible.";
}
} else {
qDebug()
<< "_processServiceCall: json document doesn't contain service key ("
<< serviceKey << ") or service key has wrong type.";
}
qDebug() << "_processServiceCall: msg: " << o;
}
#ifndef ROSBRIDGEIMPL_H
#define ROSBRIDGEIMPL_H
#include "rosbridge.h"
#include <QObject>
#include <QtWebSockets/QWebSocket>
#include <deque>
#include <memory>
#include <set>
struct ServiceCall;
class RosbridgeImpl : public QObject {
Q_OBJECT
public:
enum class STATE { STOPPED, STOPPING, STARTING, CONNECTED, TIMEOUT };
RosbridgeImpl(const QUrl &url, QObject *parent = nullptr);
~RosbridgeImpl();
STATE state(); // thread safe
public slots:
void start();
void stop();
// Topics.
void advertiseTopic(const QString &topic, const QString &type);
void publish(const QString &topic, const QJsonObject &msg);
void unadvertiseTopic(const QString &topic);
void unadvertiseAllTopics();
void subscribeTopic(const QString &topic,
const Rosbridge::CallBack &callback);
void unsubscribeTopic(const QString &topic);
void unsubscribeAllTopics();
// Services.
void advertiseService(const QString &service, const QString &type,
const Rosbridge::CallBackWReturn &callback);
void callService(const QString &service, const Rosbridge::CallBack &callback,
const QJsonObject &req = QJsonObject());
void unadvertiseService(const QString &service);
void unadvertiseAllServices();
signals:
void stateChanged();
private slots:
void _onConnected();
void _onDisconnected();
void _doAction();
void _onTextMessageReceived(const QString &message);
private:
void _processTopic(const QJsonObject &o);
void _processServiceResponse(const QJsonObject &o);
void _processServiceCall(const QJsonObject &o);
void _setState(STATE newState);
int _getMessageId();
void _serviceResponse(const QString &service, const QString &id, bool result,
const QJsonObject &values);
void _clearAllPendingServiceCalls();
QWebSocket _webSocket;
QUrl _url;
std::set<QString> _advertisedTopics;
std::map<QString, Rosbridge::CallBack> _subscribedTopics;
std::map<QString, Rosbridge::CallBackWReturn> _advertisedServices;
std::map<QString, std::deque<std::unique_ptr<ServiceCall>>>
_pendingServiceCalls;
std::atomic<STATE> _state;
};
#endif // ROSBRIDGEIMPL_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment