From be17507325c6871b4b0254f4792187d222ac0e5e Mon Sep 17 00:00:00 2001 From: Valentin Platzgummer Date: Tue, 4 Aug 2020 15:44:06 +0200 Subject: [PATCH] service calls are working now --- src/Wima/WimaController.cc | 66 +++++++------- src/Wima/WimaController.h | 5 +- .../ros_bridge/include/ComPrivateInclude.h | 2 +- src/comm/ros_bridge/include/ROSBridge.h | 14 +-- src/comm/ros_bridge/include/RosBridgeClient.h | 2 +- src/comm/ros_bridge/include/Server.cpp | 85 ++++++++++++++++++- src/comm/ros_bridge/include/Server.h | 39 +++++++-- src/comm/ros_bridge/include/ThreadSafeQueue.h | 79 +++++++++++++++++ .../ros_bridge/include/TopicPublisher.cpp | 40 +++------ src/comm/ros_bridge/include/TopicPublisher.h | 12 ++- .../ros_bridge/include/TopicSubscriber.cpp | 35 +++++--- src/comm/ros_bridge/include/TopicSubscriber.h | 13 ++- src/comm/ros_bridge/src/ROSBridge.cpp | 15 +++- 13 files changed, 301 insertions(+), 106 deletions(-) create mode 100644 src/comm/ros_bridge/include/ThreadSafeQueue.h diff --git a/src/Wima/WimaController.cc b/src/Wima/WimaController.cc index c13aae906..4da4157d9 100644 --- a/src/Wima/WimaController.cc +++ b/src/Wima/WimaController.cc @@ -14,6 +14,8 @@ #include "QVector3D" #include +#include + // const char* WimaController::wimaFileExtension = "wima"; @@ -80,6 +82,7 @@ WimaController::WimaController(QObject *parent) , _fallbackStatus (0 /*status: not connected*/) , _bridgeSetupDone (false) , _pRosBridge (new ROSBridge::ROSBridge()) + , _pCasePacker (_pRosBridge->casePacker()) { // Set up facts. _showAllMissionItems.setRawValue(true); @@ -480,7 +483,9 @@ bool WimaController::setWimaPlanData(const WimaPlanData &planData) _scenario.addArea(corridor); // Check if scenario is defined. - if ( !_isScenarioDefinedErrorMessage() ) { + if ( !_scenario.defined(_snakeTileWidth.rawValue().toUInt(), + _snakeTileHeight.rawValue().toUInt(), + _snakeMinTileArea.rawValue().toUInt()) ) { Q_ASSERT(false); return false; } @@ -721,28 +726,10 @@ void WimaController::_eventTimerHandler() if ( _enableSnake.rawValue().toBool() && _pRosBridge->connected() ) { if ( !_bridgeSetupDone ) { - qWarning() << "ROS Bridge setup. "; - auto start = std::chrono::high_resolution_clock::now(); + _pRosBridge->start(); - auto end = std::chrono::high_resolution_clock::now(); - qWarning() << "_pRosBridge->start() time: " - << std::chrono::duration_cast(end-start).count() - << " ms"; - start = std::chrono::high_resolution_clock::now(); _pRosBridge->publish(_snakeOrigin, "/snake/origin"); - end = std::chrono::high_resolution_clock::now(); - qWarning() << "_pRosBridge->publish(_snakeOrigin, \"/snake/origin\") time: " - << std::chrono::duration_cast(end-start).count() - << " ms"; - start = std::chrono::high_resolution_clock::now(); _pRosBridge->publish(_snakeTilesLocal, "/snake/tiles"); - end = std::chrono::high_resolution_clock::now(); - qWarning() << "_pRosBridge->publish(_snakeTilesLocal, \"/snake/tiles\") time: " - << std::chrono::duration_cast(end-start).count() - << " ms"; - - - start = std::chrono::high_resolution_clock::now(); _pRosBridge->subscribe("/nemo/progress", /* callback */ [this](JsonDocUPtr pDoc){ int requiredSize = this->_snakeTilesLocal.polygons().size(); @@ -754,12 +741,6 @@ void WimaController::_eventTimerHandler() emit WimaController::nemoProgressChanged(); }); - end = std::chrono::high_resolution_clock::now(); - qWarning() << "_pRosBridge->subscribe(\"/nemo/progress\" time: " - << std::chrono::duration_cast(end-start).count() - << " ms"; - - start = std::chrono::high_resolution_clock::now(); _pRosBridge->subscribe("/nemo/heartbeat", /* callback */ [this, &nemoStatusTicker](JsonDocUPtr pDoc){ if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) { @@ -773,10 +754,35 @@ void WimaController::_eventTimerHandler() emit WimaController::nemoStatusChanged(); emit WimaController::nemoStatusStringChanged(); }); - end = std::chrono::high_resolution_clock::now(); - qWarning() << "_pRosBridge->subscribe(\"/nemo/heartbeat\" time: " - << std::chrono::duration_cast(end-start).count() - << " ms"; + + auto pOrigin = &_snakeOrigin; + auto casePacker = _pCasePacker; + _pRosBridge->advertiseService("/snake/get_origin", "snake_msgs/GetOrigin", + [casePacker, pOrigin](JsonDocUPtr) -> JsonDocUPtr { + JsonDocUPtr pDoc = casePacker->pack(*pOrigin, ""); + casePacker->removeTag(pDoc); + rapidjson::Document value(rapidjson::kObjectType); + JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType)); + value.CopyFrom(*pDoc, pReturn->GetAllocator()); + pReturn->AddMember("origin", value, pReturn->GetAllocator()); + + return pReturn; + + }); + + auto pTiles = &_snakeTilesLocal; + _pRosBridge->advertiseService("/snake/get_tiles", "snake_msgs/GetTiles", + [casePacker, pTiles](JsonDocUPtr) -> JsonDocUPtr { + + JsonDocUPtr pDoc = casePacker->pack(*pTiles, ""); + casePacker->removeTag(pDoc); + rapidjson::Document value(rapidjson::kObjectType); + JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType)); + value.CopyFrom(*pDoc, pReturn->GetAllocator()); + pReturn->AddMember("tiles", value, pReturn->GetAllocator()); + return pReturn; + + }); _bridgeSetupDone = true; } } else if ( _bridgeSetupDone) { diff --git a/src/Wima/WimaController.h b/src/Wima/WimaController.h index 063c13a7b..e39c95321 100644 --- a/src/Wima/WimaController.h +++ b/src/Wima/WimaController.h @@ -36,6 +36,7 @@ #include "Snake/QNemoHeartbeat.h" #include "ros_bridge/include/ROSBridge.h" +#include "ros_bridge/include/CasePacker.h" #include "WaypointManager/DefaultManager.h" #include "WaypointManager/RTLManager.h" @@ -349,6 +350,7 @@ private slots: private: using StatusMap = std::map; + using CasePacker = ROSBridge::CasePacker; // Controllers. PlanMasterController *_masterController; @@ -397,8 +399,6 @@ private: // Waypoint statistics. double _measurementPathLength; // the lenght of the phase in meters -// double _phaseDistance; // the lenth in meters of the current phase -// double _phaseDuration; // the phase duration in seconds // Snake bool _snakeCalcInProgress; @@ -414,6 +414,7 @@ private: QNemoHeartbeat _nemoHeartbeat; // measurement progress int _fallbackStatus; ROSBridgePtr _pRosBridge; + const CasePacker *_pCasePacker; bool _bridgeSetupDone; static StatusMap _nemoStatusMap; diff --git a/src/comm/ros_bridge/include/ComPrivateInclude.h b/src/comm/ros_bridge/include/ComPrivateInclude.h index d62855099..ba6e137f6 100644 --- a/src/comm/ros_bridge/include/ComPrivateInclude.h +++ b/src/comm/ros_bridge/include/ComPrivateInclude.h @@ -20,7 +20,7 @@ using ClientMap = std::unordered_map; static const char* _topicAdvertiserKey = "topic_advertiser"; static const char* _topicPublisherKey = "topic_publisher"; -//static const char* _topicAdvertiserKey = "service_advertiser"; +static const char* _serviceAdvertiserKey = "service_advertiser"; static const char* _topicSubscriberKey = "topic_subscriber"; std::size_t getHash(const std::string &str); diff --git a/src/comm/ros_bridge/include/ROSBridge.h b/src/comm/ros_bridge/include/ROSBridge.h index 589101a99..49e3ec497 100644 --- a/src/comm/ros_bridge/include/ROSBridge.h +++ b/src/comm/ros_bridge/include/ROSBridge.h @@ -6,11 +6,10 @@ #include "ros_bridge/include/JsonFactory.h" #include "ros_bridge/include/TopicPublisher.h" #include "ros_bridge/include/TopicSubscriber.h" +#include "ros_bridge/include/Server.h" #include -#include - -#include "boost/lockfree/queue.hpp" +#include namespace ROSBridge { class ROSBridge @@ -27,8 +26,11 @@ public: _topicPublisher.publish(msg, topic); } void publish(JsonDocUPtr doc); - - void subscribe(const char *topic, const std::function &callBack); + void subscribe(const char *topic, + const std::function &callBack); + void advertiseService(const std::string &service, + const std::string &type, + const std::function &callback); const CasePacker *casePacker() const; @@ -45,9 +47,9 @@ private: JsonFactory _jsonFactory; CasePacker _casePacker; RosbridgeWsClient _rbc; - std::mutex _rbcMutex; ComPrivate::TopicPublisher _topicPublisher; ComPrivate::TopicSubscriber _topicSubscriber; + ComPrivate::Server _server; }; } diff --git a/src/comm/ros_bridge/include/RosBridgeClient.h b/src/comm/ros_bridge/include/RosBridgeClient.h index 83e6febe0..d28fda0fa 100644 --- a/src/comm/ros_bridge/include/RosBridgeClient.h +++ b/src/comm/ros_bridge/include/RosBridgeClient.h @@ -329,7 +329,7 @@ public: #endif } - void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values = {}) + void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values) { std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false"); diff --git a/src/comm/ros_bridge/include/Server.cpp b/src/comm/ros_bridge/include/Server.cpp index a4907064d..abfb4b1ba 100644 --- a/src/comm/ros_bridge/include/Server.cpp +++ b/src/comm/ros_bridge/include/Server.cpp @@ -1,6 +1,89 @@ #include "Server.h" -Server::Server() +#include "rapidjson/include/rapidjson/ostreamwrapper.h" + +ROSBridge::ComPrivate::Server::Server(CasePacker &casePacker, RosbridgeWsClient &rbc) : + _rbc(rbc) + , _casePacker(casePacker) +{ + +} + +void ROSBridge::ComPrivate::Server::advertiseService(const std::string &service, + const std::string &type, + const Server::CallbackType &userCallback) +{ + std::string clientName = _serviceAdvertiserKey + service; + auto it = std::find(_clientList.begin(), _clientList.end(), clientName); + if (it != _clientList.end()) + return; // Service allready advertised. + _clientList.push_back(clientName); + _rbc.addClient(clientName); + + auto rbc = &_rbc; + auto casePacker = &_casePacker; + _rbc.advertiseService(clientName, service, type, + [userCallback, rbc, casePacker]( + std::shared_ptr, + std::shared_ptr in_message){ + // message->string() is destructive, so we have to buffer it first + std::string messagebuf = in_message->string(); + //std::cout << "advertiseServiceCallback(): Message Received: " + // << messagebuf << std::endl; + + rapidjson::Document document; + if (document.Parse(messagebuf.c_str()).HasParseError()) + { + std::cerr << "advertiseServiceCallback(): Error in parsing service request message: " + << messagebuf << std::endl; + return; + } + + // Extract message and call callback. + if ( !document.HasMember("args") || !document["args"].IsObject()){ + std::cerr << "advertiseServiceCallback(): message has no args: " + << messagebuf << std::endl; + return; + } + JsonDocUPtr pDoc(new JsonDoc()); + std::cout << "args member count: " << document["args"].MemberCount(); + if ( document["args"].MemberCount() > 0) + pDoc->CopyFrom(document["args"].Move(), document.GetAllocator()); + JsonDocUPtr pDocResponse = userCallback(std::move(pDoc)); + + rapidjson::OStreamWrapper out(std::cout); + // Write document... + rapidjson::Writer writer(out); + std::cout << "Ros Server: "; + (*pDocResponse).Accept(writer); + std::cout << std::endl; + + rbc->serviceResponse( + document["service"].GetString(), + document["id"].GetString(), + true, + *pDocResponse); + + return; + + }); +} + +ROSBridge::ComPrivate::Server::~Server() { + this->reset(); +} +void ROSBridge::ComPrivate::Server::start() +{ + _running = true; +} + +void ROSBridge::ComPrivate::Server::reset() +{ + if (!_running) + return; + for (const auto& str : _clientList) + _rbc.removeClient(str); + _clientList.clear(); } diff --git a/src/comm/ros_bridge/include/Server.h b/src/comm/ros_bridge/include/Server.h index 5cbd2374b..85f93c86c 100644 --- a/src/comm/ros_bridge/include/Server.h +++ b/src/comm/ros_bridge/include/Server.h @@ -1,11 +1,40 @@ -#ifndef SERVER_H -#define SERVER_H +#pragma once +#include "ros_bridge/include/ComPrivateInclude.h" +#include "ros_bridge/include/RosBridgeClient.h" +#include "ros_bridge/include/TypeFactory.h" +#include "ros_bridge/include/CasePacker.h" + +namespace ROSBridge { +namespace ComPrivate { class Server { + typedef std::vector ClientList; public: - Server(); -}; + typedef std::function CallbackType; + + + Server() = delete; + Server(CasePacker &casePacker, RosbridgeWsClient &rbc); + ~Server(); + + //! @brief Starts the subscriber. + void start(); -#endif // SERVER_H + //! @brief Resets the subscriber. + void reset(); + + void advertiseService(const std::string& service, + const std::string& type, + const CallbackType& userCallback); + +private: + + RosbridgeWsClient &_rbc; + CasePacker &_casePacker; + ClientList _clientList; + bool _running; +}; +} // namespace ComPrivate +} // namespace ROSBridge diff --git a/src/comm/ros_bridge/include/ThreadSafeQueue.h b/src/comm/ros_bridge/include/ThreadSafeQueue.h new file mode 100644 index 000000000..03d653eee --- /dev/null +++ b/src/comm/ros_bridge/include/ThreadSafeQueue.h @@ -0,0 +1,79 @@ +#pragma once + +#include +#include +#include + +namespace ROSBridge { + +template +class ThreadSafeQueue +{ +public: + ThreadSafeQueue(); + ~ThreadSafeQueue(); + + T pop_front(); + + void push_back(const T& item); + void push_back(T&& item); + + int size(); + bool empty(); + +private: + std::deque _queue; + std::mutex _mutex; + std::condition_variable _cond; +}; + +template +ThreadSafeQueue::ThreadSafeQueue(){} + +template +ThreadSafeQueue::~ThreadSafeQueue(){} + +template +T ThreadSafeQueue::pop_front() +{ + std::unique_lock mlock(_mutex); + while (_queue.empty()) + { + _cond.wait(mlock); + } + T t = std::move(_queue.front()); + _queue.pop_front(); + return t; +} + +template +void ThreadSafeQueue::push_back(const T& item) +{ + std::unique_lock mlock(_mutex); + _queue.push_back(item); + mlock.unlock(); // unlock before notificiation to minimize mutex con + _cond.notify_one(); // notify one waiting thread + +} + +template +void ThreadSafeQueue::push_back(T&& item) +{ + std::unique_lock mlock(_mutex); + _queue.push_back(std::move(item)); + mlock.unlock(); // unlock before notificiation to minimize mutex con + _cond.notify_one(); // notify one waiting thread + +} + +template +int ThreadSafeQueue::size() +{ + std::unique_lock mlock(_mutex); + int size = _queue.size(); + mlock.unlock(); + return size; +} + +} // namespace + diff --git a/src/comm/ros_bridge/include/TopicPublisher.cpp b/src/comm/ros_bridge/include/TopicPublisher.cpp index 18cda8de9..1e44a7a74 100644 --- a/src/comm/ros_bridge/include/TopicPublisher.cpp +++ b/src/comm/ros_bridge/include/TopicPublisher.cpp @@ -8,20 +8,17 @@ struct ROSBridge::ComPrivate::ThreadData { const ROSBridge::CasePacker &casePacker; RosbridgeWsClient &rbc; - std::mutex &rbcMutex; ROSBridge::ComPrivate::JsonQueue &queue; std::mutex &queueMutex; const std::atomic &running; std::condition_variable &cv; }; -ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker, - RosbridgeWsClient *rbc, - std::mutex *rbcMutex) : +ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker, + RosbridgeWsClient &rbc) : _running(false) , _casePacker(casePacker) , _rbc(rbc) - , _rbcMutex(rbcMutex) { } @@ -37,9 +34,8 @@ void ROSBridge::ComPrivate::TopicPublisher::start() return; _running.store(true); ROSBridge::ComPrivate::ThreadData data{ - *_casePacker, - *_rbc, - *_rbcMutex, + _casePacker, + _rbc, _queue, _queueMutex, _running, @@ -64,11 +60,6 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data { // Init. ClientMap clientMap; -// { -// std::lock_guard lk(data.rbcMutex); -// data.rbc.addClient(ROSBridge::ComPrivate::_topicAdvertiserKey); -// data.rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey); -// } // Main Loop. while(data.running.load()){ std::unique_lock lk(data.queueMutex); @@ -101,27 +92,18 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data std::cout << clientName << ";" << tag.topic() << ";" << tag.messageType() << ";" << std::endl; - { - std::lock_guard lk(data.rbcMutex); - data.rbc.addClient(clientName); - data.rbc.advertise(clientName, - tag.topic(), - tag.messageType() ); - } + data.rbc.addClient(clientName); + data.rbc.advertise(clientName, + tag.topic(), + tag.messageType() ); } // Publish Json message. - { - std::lock_guard lk(data.rbcMutex); - data.rbc.publish(tag.topic(), *pJsonDoc.get()); - } + data.rbc.publish(tag.topic(), *pJsonDoc.get()); } // while loop // Tidy up. - { - std::lock_guard lk(data.rbcMutex); - for (auto& it : clientMap) - data.rbc.removeClient(it.second); - } + for (auto& it : clientMap) + data.rbc.removeClient(it.second); } diff --git a/src/comm/ros_bridge/include/TopicPublisher.h b/src/comm/ros_bridge/include/TopicPublisher.h index ce7e14285..1c9dc680a 100644 --- a/src/comm/ros_bridge/include/TopicPublisher.h +++ b/src/comm/ros_bridge/include/TopicPublisher.h @@ -24,9 +24,8 @@ class TopicPublisher public: TopicPublisher() = delete; - TopicPublisher(CasePacker *casePacker, - RosbridgeWsClient *rbc, - std::mutex *rbcMutex); + TopicPublisher(CasePacker &casePacker, + RosbridgeWsClient &rbc); ~TopicPublisher(); @@ -46,7 +45,7 @@ public: template void publish(const T &msg, const std::string &topic){ - JsonDocUPtr docPtr(_casePacker->pack(msg, topic)); + JsonDocUPtr docPtr(_casePacker.pack(msg, topic)); publish(std::move(docPtr)); } @@ -54,9 +53,8 @@ private: JsonQueue _queue; std::mutex _queueMutex; std::atomic _running; - CasePacker *_casePacker; - RosbridgeWsClient *_rbc; - std::mutex *_rbcMutex; + CasePacker &_casePacker; + RosbridgeWsClient &_rbc; CondVar _cv; ThreadPtr _pThread; }; diff --git a/src/comm/ros_bridge/include/TopicSubscriber.cpp b/src/comm/ros_bridge/include/TopicSubscriber.cpp index dce6371ec..7a1d8311f 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.cpp +++ b/src/comm/ros_bridge/include/TopicSubscriber.cpp @@ -1,11 +1,10 @@ #include "TopicSubscriber.h" -ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(ROSBridge::CasePacker *casePacker, - RosbridgeWsClient *rbc, std::mutex *rbcMutex) : +ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker, + RosbridgeWsClient &rbc) : _casePacker(casePacker) , _rbc(rbc) - , _rbcMutex(rbcMutex) , _running(false) { @@ -26,14 +25,13 @@ void ROSBridge::ComPrivate::TopicSubscriber::reset() if ( _running ){ _running = false; { - std::lock_guard lk(*_rbcMutex); for (std::string clientName : _clientList){ - _rbc->removeClient(clientName); + _rbc.removeClient(clientName); } } { - std::lock_guard lk(_callbackMap.mutex); - _callbackMap.map.clear(); + std::lock_guard lk(_callbackMapStruct.mutex); + _callbackMapStruct.map.clear(); } _clientList.clear(); } @@ -52,8 +50,8 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( HashType hash = getHash(clientName); { - std::lock_guard lk(_callbackMap.mutex); - auto ret = _callbackMap.map.insert(std::make_pair(hash, callback)); // + std::lock_guard lk(_callbackMapStruct.mutex); + auto ret = _callbackMapStruct.map.insert(std::make_pair(hash, callback)); // if ( !ret.second ) return false; // Topic subscription already present. @@ -62,7 +60,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( using namespace std::placeholders; auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback, hash, - std::ref(_callbackMap), + std::ref(_callbackMapStruct), _1, _2); // std::cout << std::endl; @@ -70,11 +68,20 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( // std::cout << "client name: " << clientName << std::endl; // std::cout << "topic: " << topic << std::endl; { - std::lock_guard lk(*_rbcMutex); - _rbc->addClient(clientName); - _rbc->subscribe(clientName, + auto start = std::chrono::high_resolution_clock::now(); + _rbc.addClient(clientName); + auto end = std::chrono::high_resolution_clock::now(); + std::cout << "add client time: " + << std::chrono::duration_cast(end-start).count() + << " ms" << std::endl; + start = std::chrono::high_resolution_clock::now(); + _rbc.subscribe(clientName, topic, f ); + end = std::chrono::high_resolution_clock::now(); + std::cout << "subscribe time: " + << std::chrono::duration_cast(end-start).count() + << " ms" << std::endl; } return true; @@ -84,7 +91,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( using WsClient = SimpleWeb::SocketClient; void ROSBridge::ComPrivate::subscriberCallback( const HashType &hash, - ROSBridge::ComPrivate::CallbackMapWrapper &mapWrapper, + ROSBridge::ComPrivate::MapStruct &mapWrapper, std::shared_ptr, std::shared_ptr in_message) { diff --git a/src/comm/ros_bridge/include/TopicSubscriber.h b/src/comm/ros_bridge/include/TopicSubscriber.h index 4ce462313..585e36477 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.h +++ b/src/comm/ros_bridge/include/TopicSubscriber.h @@ -10,7 +10,7 @@ namespace ComPrivate { typedef std::function CallbackType; -struct CallbackMapWrapper{ +struct MapStruct{ typedef std::map Map; Map map; std::mutex mutex; @@ -22,7 +22,7 @@ class TopicSubscriber public: TopicSubscriber() = delete; - TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc, std::mutex *rbcMutex); + TopicSubscriber(CasePacker &casePacker, RosbridgeWsClient &rbc); ~TopicSubscriber(); //! @brief Starts the subscriber. @@ -40,16 +40,15 @@ private: - CasePacker *_casePacker; - RosbridgeWsClient *_rbc; - std::mutex *_rbcMutex; - CallbackMapWrapper _callbackMap; + CasePacker &_casePacker; + RosbridgeWsClient &_rbc; + MapStruct _callbackMapStruct; ClientList _clientList; bool _running; }; void subscriberCallback(const HashType &hash, - CallbackMapWrapper &mapWrapper, + MapStruct &mapWrapper, std::shared_ptr /*connection*/, std::shared_ptr in_message); } // namespace ComPrivate diff --git a/src/comm/ros_bridge/src/ROSBridge.cpp b/src/comm/ros_bridge/src/ROSBridge.cpp index 9b8d07fa5..f552faf9d 100644 --- a/src/comm/ros_bridge/src/ROSBridge.cpp +++ b/src/comm/ros_bridge/src/ROSBridge.cpp @@ -3,8 +3,9 @@ ROSBridge::ROSBridge::ROSBridge() : _casePacker(&_typeFactory, &_jsonFactory) , _rbc("localhost:9090") - , _topicPublisher(&_casePacker, &_rbc, &_rbcMutex) - , _topicSubscriber(&_casePacker, &_rbc, &_rbcMutex) + , _topicPublisher(_casePacker, _rbc) + , _topicSubscriber(_casePacker, _rbc) + , _server(_casePacker, _rbc) { } @@ -19,6 +20,13 @@ void ROSBridge::ROSBridge::subscribe(const char *topic, const std::function &callback) +{ + _server.advertiseService(service, type, callback); +} + const ROSBridge::CasePacker *ROSBridge::ROSBridge::casePacker() const { return &_casePacker; @@ -28,17 +36,18 @@ void ROSBridge::ROSBridge::start() { _topicPublisher.start(); _topicSubscriber.start(); + _server.start(); } void ROSBridge::ROSBridge::reset() { _topicPublisher.reset(); _topicSubscriber.reset(); + _server.reset(); } bool ROSBridge::ROSBridge::connected() { - std::lock_guard lk(_rbcMutex); return _rbc.connected(); } -- 2.22.0