#include "rosbridgeimpl.h" #include #include #include static const char *advertiseOpKey = "advertise"; static const char *subscribeOpKey = "subscribe"; static const char *unsubscribeOpKey = "unsubscribe"; static const char *unadvertiseOpKey = "unadvertise"; static const char *publishOpKey = "publish"; static const char *serviceResponseOpKey = "service_response"; static const char *unadvertiseServiceOpKey = "unadvertise_service"; static const char *advertiseServiceOpKey = "advertise_service"; static const char *callServiceOpKey = "call_service"; static const char *topicKey = "topic"; static const char *serviceKey = "service"; static const char *msgKey = "msg"; static const char *opKey = "op"; static const char *idKey = "id"; static const char *argsKey = "args"; static const char *resultKey = "result"; static const char *valuesKey = "values"; static const char *typeKey = "type"; struct ServiceCall { ServiceCall() {} ServiceCall(const Rosbridge::CallBack &c, QString i, QString s) : callback(c), id(i), service(s) {} Rosbridge::CallBack callback; QString id; QString service; }; RosbridgeImpl::RosbridgeImpl(const QUrl &url, QObject *parent) : QObject(parent), _webSocket(QString(), QWebSocketProtocol::VersionLatest, this), _url(url), _state(STATE::STOPPED) { connect(this, &RosbridgeImpl::stateChanged, this, &RosbridgeImpl::_doAction); connect(&_webSocket, &QWebSocket::connected, this, &RosbridgeImpl::_onConnected); connect(&_webSocket, &QWebSocket::disconnected, this, &RosbridgeImpl::_onDisconnected); connect(&_webSocket, &QWebSocket::textMessageReceived, this, &RosbridgeImpl::_onTextMessageReceived); qRegisterMetaType(); qRegisterMetaType(); } RosbridgeImpl::~RosbridgeImpl() { if (_state == STATE::CONNECTED) { stop(); } } RosbridgeImpl::STATE RosbridgeImpl::state() { return _state.load(); } void RosbridgeImpl::start() { if (_state == STATE::STOPPED) { _setState(STATE::STARTING); } } void RosbridgeImpl::stop() { if (_state != STATE::STOPPED) { _setState(STATE::STOPPING); } } void RosbridgeImpl::advertiseTopic(const QString &topic, const QString &type) { if (_state == STATE::CONNECTED) { auto it = _advertisedTopics.find(topic); if (Q_LIKELY(it == _advertisedTopics.end())) { _advertisedTopics.insert(topic); QJsonObject o; o[opKey] = advertiseOpKey; o[topicKey] = topic; o[typeKey] = type; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); } else { qDebug() << "Topic " << topic << " already advertised."; } } else { qDebug() << "advertiseTopic: Rosbridge not connected!"; } } void RosbridgeImpl::publish(const QString &topic, const QJsonObject &msg) { if (_state == STATE::CONNECTED) { auto it = _advertisedTopics.find(topic); if (Q_LIKELY(it != _advertisedTopics.end())) { QJsonObject o; o[opKey] = publishOpKey; o[topicKey] = topic; o[msgKey] = msg; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); } else { qDebug() << "Topic " << topic << " not advertised."; } } else { qDebug() << "publish: Rosbridge not connected!"; } } void RosbridgeImpl::unadvertiseTopic(const QString &topic) { if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { auto it = _advertisedTopics.find(topic); if (Q_LIKELY(it != _advertisedTopics.end())) { QJsonObject o; o[opKey] = unadvertiseOpKey; o[topicKey] = topic; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); _advertisedTopics.erase(it); } else { qDebug() << "Topic " << topic << " not advertised."; } } else { qDebug() << "unadvertiseTopic: Rosbridge not connected!"; } } void RosbridgeImpl::unadvertiseAllTopics() { if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { while (!_advertisedTopics.empty()) { const auto topic = _advertisedTopics.begin(); unadvertiseTopic(*topic); } } else { qDebug() << "unadvertiseAllTopic: Rosbridge not connected!"; } } void RosbridgeImpl::subscribeTopic(const QString &topic, const Rosbridge::CallBack &callback) { if (_state == STATE::CONNECTED) { auto it = _subscribedTopics.find(topic); if (Q_LIKELY(it == _subscribedTopics.end())) { auto ret = _subscribedTopics.insert(std::make_pair(topic, callback)); Q_ASSERT(ret.second == true); QJsonObject o; o[opKey] = subscribeOpKey; o[topicKey] = topic; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); } else { qDebug() << "subscribeTopic: topic " << topic << " already subscribed!"; } } else { qDebug() << "subscribeTopic: Rosbridge not connected!"; } } void RosbridgeImpl::unsubscribeTopic(const QString &topic) { if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { auto it = _subscribedTopics.find(topic); if (Q_LIKELY(it != _subscribedTopics.end())) { QJsonObject o; o[opKey] = unsubscribeOpKey; o[topicKey] = topic; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); _subscribedTopics.erase(it); } else { qDebug() << "unsubscribeTopic: topic " << topic << " already subscribed!"; } } else { qDebug() << "unsubscribeTopic: Rosbridge not connected!"; } } void RosbridgeImpl::unsubscribeAllTopics() { if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { while (!_subscribedTopics.empty()) { auto it = _subscribedTopics.begin(); unsubscribeTopic(it->first); } } else { qDebug() << "unsubscribeAll: Rosbridge not connected!"; } } void RosbridgeImpl::advertiseService( const QString &service, const QString &type, const Rosbridge::CallBackWReturn &callback) { if (_state == STATE::CONNECTED) { auto it = _advertisedServices.find(service); if (it == _advertisedServices.end()) { auto ret = _advertisedServices.insert(std::make_pair(service, callback)); Q_ASSERT(ret.second == true); QJsonObject o; o[opKey] = advertiseServiceOpKey; o[serviceKey] = service; o[typeKey] = type; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); } else { qDebug() << "advertiseService: Service " << service << " already advertised. Use unadvertiseService first to " "unadvertise the service."; } } else { qDebug() << "advertiseService: Rosbridge not connected!"; } } void RosbridgeImpl::_serviceResponse(const QString &service, const QString &id, bool result, const QJsonObject &values) { if (_state == STATE::CONNECTED) { auto it = _advertisedServices.find(service); if (it != _advertisedServices.end()) { QJsonObject o; o[opKey] = serviceResponseOpKey; o[serviceKey] = service; o[resultKey] = result; o[idKey] = id; o[valuesKey] = values; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); } else { qDebug() << "serviceResponse: Service " << service << " not advertised."; } } else { qDebug() << "serviceResponse: Rosbridge not connected!"; } } void RosbridgeImpl::_clearAllPendingServiceCalls() { while (!_pendingServiceCalls.empty()) { auto it = _pendingServiceCalls.begin(); it = _pendingServiceCalls.erase(it); } } void RosbridgeImpl::callService(const QString &service, const Rosbridge::CallBack &callback, const QJsonObject &req) { if (_state == STATE::CONNECTED) { auto it = _pendingServiceCalls.find(service); if (it == _pendingServiceCalls.end()) { auto ret = _pendingServiceCalls.insert( std::make_pair(service, std::deque>())); Q_ASSERT(ret.second == true); it = ret.first; } QString id(QString::number(_getMessageId())); auto p = std::unique_ptr(new ServiceCall(callback, id, service)); it->second.push_back(std::move(p)); QJsonObject o; o[opKey] = callServiceOpKey; o[serviceKey] = service; o[idKey] = id; o[argsKey] = req; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); } else { qDebug() << "callService: Rosbridge not connected!"; } } void RosbridgeImpl::unadvertiseService(const QString &service) { if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { auto it = _advertisedServices.find(service); if (it != _advertisedServices.end()) { QJsonObject o; o[opKey] = unadvertiseServiceOpKey; o[serviceKey] = service; QString payload = QJsonDocument(o).toJson(QJsonDocument::JsonFormat::Compact); _webSocket.sendTextMessage(payload); it = _advertisedServices.erase(it); } else { qDebug() << "unadvertiseService: Service " << service << " not advertised."; } } else { qDebug() << "unadvertiseService: Rosbridge not connected!"; } } void RosbridgeImpl::unadvertiseAllServices() { if (_state == STATE::CONNECTED || _state == STATE::STOPPING) { while (!_advertisedServices.empty()) { auto it = _advertisedServices.begin(); unadvertiseService(it->first); } } else { qDebug() << "unadvertiseAllService: Rosbridge not connected!"; } } void RosbridgeImpl::_onConnected() { if (this->_state == STATE::STARTING) { _setState(STATE::CONNECTED); } } void RosbridgeImpl::_onDisconnected() { if (this->_state == STATE::CONNECTED) { _setState(STATE::TIMEOUT); } } void RosbridgeImpl::_setState(RosbridgeImpl::STATE newState) { if (_state != newState) { _state = newState; emit stateChanged(); } } int RosbridgeImpl::_getMessageId() { static int c = 0; return c++; } void RosbridgeImpl::_doAction() { switch (_state.load()) { case STATE::STOPPED: break; case STATE::STOPPING: unadvertiseAllTopics(); unadvertiseAllServices(); unsubscribeAllTopics(); _clearAllPendingServiceCalls(); _webSocket.close(); _setState(STATE::STOPPED); break; case STATE::STARTING: _webSocket.open(_url); break; case STATE::TIMEOUT: break; case STATE::CONNECTED: break; } } void RosbridgeImpl::_onTextMessageReceived(const QString &message) { qDebug() << "_onTextMessageReceived: " << message; QJsonParseError e; auto d = QJsonDocument::fromJson(message.toUtf8(), &e); if (!d.isNull()) { if (d.isObject()) { QJsonObject o = d.object(); if (o.contains(opKey) && o[opKey].isString()) { auto opCode = o[opKey].toString(); if (opCode == publishOpKey) { _processTopic(o); return; } else if (opCode == serviceResponseOpKey) { _processServiceResponse(o); return; } else if (opCode == callServiceOpKey) { _processServiceCall(o); return; } else { qDebug() << "_onTextMessageReceived: unknown op code: " << o[opKey].toString(); } } else { qDebug() << "_onTextMessageReceived: json document doesn't contain op code (" << opKey << ") or op code has wrong type."; } } else { qDebug() << "_onTextMessageReceived: json document is not an object."; } } else { qDebug() << "_onTextMessageReceived: parse error: " << e.errorString(); } qDebug() << "_onTextMessageReceived: message: " << message; } void RosbridgeImpl::_processTopic(const QJsonObject &o) { if (o.contains(topicKey) && o[topicKey].isString()) { if (o.contains(msgKey) && o[msgKey].isObject()) { auto topic = o[topicKey].toString(); auto it = _subscribedTopics.find(topic); if (Q_LIKELY(it != _subscribedTopics.end())) { it->second(o[msgKey].toObject(QJsonObject())); return; } else { qDebug() << "_processTopic: unknown topic " << topic; } } else { qDebug() << "_processTopic: message key (" << msgKey << ") missing"; } } else { qDebug() << "_processTopic: json document doesn't contain topic key (" << topicKey << ") or topic key has wrong type."; } qDebug() << "_processTopic: msg: " << o; } void RosbridgeImpl::_processServiceResponse(const QJsonObject &o) { if (o.contains(serviceKey) && o[serviceKey].isString()) { if (o.contains(valuesKey) && o[valuesKey].isObject()) { auto service = o[serviceKey].toString(); auto it = _pendingServiceCalls.find(service); if (Q_LIKELY(it != _pendingServiceCalls.end())) { auto p = it->second.begin(); // check if ids match if (o.contains(idKey) && o[idKey].isString()) { auto id = o[idKey].toString(); if (id != (*p)->id) { qDebug() << "_processServiceResponse: ids, don't match, searching " "correct callback..."; auto match = std::find_if(it->second.begin(), it->second.end(), [&id](const std::unique_ptr &c) { return c->id == id; }); if (match != it->second.end()) { p = match; } else { qDebug() << "_processServiceResponse: unknown id, can't determine " "callback"; return; } } } ((*p)->callback)(o[valuesKey].toObject()); // callback p = it->second.erase(p); if (it->second.size() == 0) { _pendingServiceCalls.erase(it); } return; } else { qDebug() << "_processServiceResponse: unknown service " << service; } } else { qDebug() << "_processServiceResponse: json document doesn't contain " "values key (" << valuesKey << ") or values key has wrong type."; } } else { qDebug() << "_processServiceResponse: json document doesn't contain " "service key (" << serviceKey << ") or service key has wrong type."; } qDebug() << "_processServiceResponse: msg: " << o; } void RosbridgeImpl::_processServiceCall(const QJsonObject &o) { if (o.contains(serviceKey) && o[serviceKey].isString()) { auto service = o[serviceKey].toString(); if (o.contains(idKey) && o[idKey].isString()) { if (o.contains(argsKey) && o[argsKey].isObject()) { auto id = o[idKey].toString(); auto it = _advertisedServices.find(service); if (Q_LIKELY(it != _advertisedServices.end())) { auto resp = it->second(o[argsKey].toObject()); auto result = !resp.empty(); _serviceResponse(service, id, result, resp); return; } else { qDebug() << "_processServiceCall: unknown service " << service; } } else { qDebug() << "_processServiceCall: args key (" << idKey << ") missing, response not possible."; } } else { qDebug() << "_processServiceCall: id key (" << idKey << ") missing, response not possible."; } } else { qDebug() << "_processServiceCall: json document doesn't contain service key (" << serviceKey << ") or service key has wrong type."; } qDebug() << "_processServiceCall: msg: " << o; }