Commit be175073 authored by Valentin Platzgummer's avatar Valentin Platzgummer

service calls are working now

parent b4b4aede
...@@ -14,6 +14,8 @@ ...@@ -14,6 +14,8 @@
#include "QVector3D" #include "QVector3D"
#include <QScopedPointer> #include <QScopedPointer>
#include <memory>
// const char* WimaController::wimaFileExtension = "wima"; // const char* WimaController::wimaFileExtension = "wima";
...@@ -80,6 +82,7 @@ WimaController::WimaController(QObject *parent) ...@@ -80,6 +82,7 @@ WimaController::WimaController(QObject *parent)
, _fallbackStatus (0 /*status: not connected*/) , _fallbackStatus (0 /*status: not connected*/)
, _bridgeSetupDone (false) , _bridgeSetupDone (false)
, _pRosBridge (new ROSBridge::ROSBridge()) , _pRosBridge (new ROSBridge::ROSBridge())
, _pCasePacker (_pRosBridge->casePacker())
{ {
// Set up facts. // Set up facts.
_showAllMissionItems.setRawValue(true); _showAllMissionItems.setRawValue(true);
...@@ -480,7 +483,9 @@ bool WimaController::setWimaPlanData(const WimaPlanData &planData) ...@@ -480,7 +483,9 @@ bool WimaController::setWimaPlanData(const WimaPlanData &planData)
_scenario.addArea(corridor); _scenario.addArea(corridor);
// Check if scenario is defined. // Check if scenario is defined.
if ( !_isScenarioDefinedErrorMessage() ) { if ( !_scenario.defined(_snakeTileWidth.rawValue().toUInt(),
_snakeTileHeight.rawValue().toUInt(),
_snakeMinTileArea.rawValue().toUInt()) ) {
Q_ASSERT(false); Q_ASSERT(false);
return false; return false;
} }
...@@ -721,28 +726,10 @@ void WimaController::_eventTimerHandler() ...@@ -721,28 +726,10 @@ void WimaController::_eventTimerHandler()
if ( _enableSnake.rawValue().toBool() if ( _enableSnake.rawValue().toBool()
&& _pRosBridge->connected() ) { && _pRosBridge->connected() ) {
if ( !_bridgeSetupDone ) { if ( !_bridgeSetupDone ) {
qWarning() << "ROS Bridge setup. ";
auto start = std::chrono::high_resolution_clock::now();
_pRosBridge->start(); _pRosBridge->start();
auto end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->start() time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->publish(_snakeOrigin, "/snake/origin"); _pRosBridge->publish(_snakeOrigin, "/snake/origin");
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->publish(_snakeOrigin, \"/snake/origin\") time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->publish(_snakeTilesLocal, "/snake/tiles"); _pRosBridge->publish(_snakeTilesLocal, "/snake/tiles");
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->publish(_snakeTilesLocal, \"/snake/tiles\") time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->subscribe("/nemo/progress", _pRosBridge->subscribe("/nemo/progress",
/* callback */ [this](JsonDocUPtr pDoc){ /* callback */ [this](JsonDocUPtr pDoc){
int requiredSize = this->_snakeTilesLocal.polygons().size(); int requiredSize = this->_snakeTilesLocal.polygons().size();
...@@ -754,12 +741,6 @@ void WimaController::_eventTimerHandler() ...@@ -754,12 +741,6 @@ void WimaController::_eventTimerHandler()
emit WimaController::nemoProgressChanged(); emit WimaController::nemoProgressChanged();
}); });
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->subscribe(\"/nemo/progress\" time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->subscribe("/nemo/heartbeat", _pRosBridge->subscribe("/nemo/heartbeat",
/* callback */ [this, &nemoStatusTicker](JsonDocUPtr pDoc){ /* callback */ [this, &nemoStatusTicker](JsonDocUPtr pDoc){
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) { if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) {
...@@ -773,10 +754,35 @@ void WimaController::_eventTimerHandler() ...@@ -773,10 +754,35 @@ void WimaController::_eventTimerHandler()
emit WimaController::nemoStatusChanged(); emit WimaController::nemoStatusChanged();
emit WimaController::nemoStatusStringChanged(); emit WimaController::nemoStatusStringChanged();
}); });
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->subscribe(\"/nemo/heartbeat\" time: " auto pOrigin = &_snakeOrigin;
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count() auto casePacker = _pCasePacker;
<< " ms"; _pRosBridge->advertiseService("/snake/get_origin", "snake_msgs/GetOrigin",
[casePacker, pOrigin](JsonDocUPtr) -> JsonDocUPtr {
JsonDocUPtr pDoc = casePacker->pack(*pOrigin, "");
casePacker->removeTag(pDoc);
rapidjson::Document value(rapidjson::kObjectType);
JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType));
value.CopyFrom(*pDoc, pReturn->GetAllocator());
pReturn->AddMember("origin", value, pReturn->GetAllocator());
return pReturn;
});
auto pTiles = &_snakeTilesLocal;
_pRosBridge->advertiseService("/snake/get_tiles", "snake_msgs/GetTiles",
[casePacker, pTiles](JsonDocUPtr) -> JsonDocUPtr {
JsonDocUPtr pDoc = casePacker->pack(*pTiles, "");
casePacker->removeTag(pDoc);
rapidjson::Document value(rapidjson::kObjectType);
JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType));
value.CopyFrom(*pDoc, pReturn->GetAllocator());
pReturn->AddMember("tiles", value, pReturn->GetAllocator());
return pReturn;
});
_bridgeSetupDone = true; _bridgeSetupDone = true;
} }
} else if ( _bridgeSetupDone) { } else if ( _bridgeSetupDone) {
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "Snake/QNemoHeartbeat.h" #include "Snake/QNemoHeartbeat.h"
#include "ros_bridge/include/ROSBridge.h" #include "ros_bridge/include/ROSBridge.h"
#include "ros_bridge/include/CasePacker.h"
#include "WaypointManager/DefaultManager.h" #include "WaypointManager/DefaultManager.h"
#include "WaypointManager/RTLManager.h" #include "WaypointManager/RTLManager.h"
...@@ -349,6 +350,7 @@ private slots: ...@@ -349,6 +350,7 @@ private slots:
private: private:
using StatusMap = std::map<int, QString>; using StatusMap = std::map<int, QString>;
using CasePacker = ROSBridge::CasePacker;
// Controllers. // Controllers.
PlanMasterController *_masterController; PlanMasterController *_masterController;
...@@ -397,8 +399,6 @@ private: ...@@ -397,8 +399,6 @@ private:
// Waypoint statistics. // Waypoint statistics.
double _measurementPathLength; // the lenght of the phase in meters double _measurementPathLength; // the lenght of the phase in meters
// double _phaseDistance; // the lenth in meters of the current phase
// double _phaseDuration; // the phase duration in seconds
// Snake // Snake
bool _snakeCalcInProgress; bool _snakeCalcInProgress;
...@@ -414,6 +414,7 @@ private: ...@@ -414,6 +414,7 @@ private:
QNemoHeartbeat _nemoHeartbeat; // measurement progress QNemoHeartbeat _nemoHeartbeat; // measurement progress
int _fallbackStatus; int _fallbackStatus;
ROSBridgePtr _pRosBridge; ROSBridgePtr _pRosBridge;
const CasePacker *_pCasePacker;
bool _bridgeSetupDone; bool _bridgeSetupDone;
static StatusMap _nemoStatusMap; static StatusMap _nemoStatusMap;
......
...@@ -20,7 +20,7 @@ using ClientMap = std::unordered_map<HashType, std::string>; ...@@ -20,7 +20,7 @@ using ClientMap = std::unordered_map<HashType, std::string>;
static const char* _topicAdvertiserKey = "topic_advertiser"; static const char* _topicAdvertiserKey = "topic_advertiser";
static const char* _topicPublisherKey = "topic_publisher"; static const char* _topicPublisherKey = "topic_publisher";
//static const char* _topicAdvertiserKey = "service_advertiser"; static const char* _serviceAdvertiserKey = "service_advertiser";
static const char* _topicSubscriberKey = "topic_subscriber"; static const char* _topicSubscriberKey = "topic_subscriber";
std::size_t getHash(const std::string &str); std::size_t getHash(const std::string &str);
......
...@@ -6,11 +6,10 @@ ...@@ -6,11 +6,10 @@
#include "ros_bridge/include/JsonFactory.h" #include "ros_bridge/include/JsonFactory.h"
#include "ros_bridge/include/TopicPublisher.h" #include "ros_bridge/include/TopicPublisher.h"
#include "ros_bridge/include/TopicSubscriber.h" #include "ros_bridge/include/TopicSubscriber.h"
#include "ros_bridge/include/Server.h"
#include <memory> #include <memory>
#include <tuple> #include <functional>
#include "boost/lockfree/queue.hpp"
namespace ROSBridge { namespace ROSBridge {
class ROSBridge class ROSBridge
...@@ -27,8 +26,11 @@ public: ...@@ -27,8 +26,11 @@ public:
_topicPublisher.publish(msg, topic); _topicPublisher.publish(msg, topic);
} }
void publish(JsonDocUPtr doc); void publish(JsonDocUPtr doc);
void subscribe(const char *topic,
void subscribe(const char *topic, const std::function<void(JsonDocUPtr)> &callBack); const std::function<void(JsonDocUPtr)> &callBack);
void advertiseService(const std::string &service,
const std::string &type,
const std::function<JsonDocUPtr(JsonDocUPtr)> &callback);
const CasePacker *casePacker() const; const CasePacker *casePacker() const;
...@@ -45,9 +47,9 @@ private: ...@@ -45,9 +47,9 @@ private:
JsonFactory _jsonFactory; JsonFactory _jsonFactory;
CasePacker _casePacker; CasePacker _casePacker;
RosbridgeWsClient _rbc; RosbridgeWsClient _rbc;
std::mutex _rbcMutex;
ComPrivate::TopicPublisher _topicPublisher; ComPrivate::TopicPublisher _topicPublisher;
ComPrivate::TopicSubscriber _topicSubscriber; ComPrivate::TopicSubscriber _topicSubscriber;
ComPrivate::Server _server;
}; };
} }
...@@ -329,7 +329,7 @@ public: ...@@ -329,7 +329,7 @@ public:
#endif #endif
} }
void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values = {}) void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values)
{ {
std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false"); std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false");
......
#include "Server.h" #include "Server.h"
Server::Server() #include "rapidjson/include/rapidjson/ostreamwrapper.h"
ROSBridge::ComPrivate::Server::Server(CasePacker &casePacker, RosbridgeWsClient &rbc) :
_rbc(rbc)
, _casePacker(casePacker)
{
}
void ROSBridge::ComPrivate::Server::advertiseService(const std::string &service,
const std::string &type,
const Server::CallbackType &userCallback)
{
std::string clientName = _serviceAdvertiserKey + service;
auto it = std::find(_clientList.begin(), _clientList.end(), clientName);
if (it != _clientList.end())
return; // Service allready advertised.
_clientList.push_back(clientName);
_rbc.addClient(clientName);
auto rbc = &_rbc;
auto casePacker = &_casePacker;
_rbc.advertiseService(clientName, service, type,
[userCallback, rbc, casePacker](
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message){
// message->string() is destructive, so we have to buffer it first
std::string messagebuf = in_message->string();
//std::cout << "advertiseServiceCallback(): Message Received: "
// << messagebuf << std::endl;
rapidjson::Document document;
if (document.Parse(messagebuf.c_str()).HasParseError())
{
std::cerr << "advertiseServiceCallback(): Error in parsing service request message: "
<< messagebuf << std::endl;
return;
}
// Extract message and call callback.
if ( !document.HasMember("args") || !document["args"].IsObject()){
std::cerr << "advertiseServiceCallback(): message has no args: "
<< messagebuf << std::endl;
return;
}
JsonDocUPtr pDoc(new JsonDoc());
std::cout << "args member count: " << document["args"].MemberCount();
if ( document["args"].MemberCount() > 0)
pDoc->CopyFrom(document["args"].Move(), document.GetAllocator());
JsonDocUPtr pDocResponse = userCallback(std::move(pDoc));
rapidjson::OStreamWrapper out(std::cout);
// Write document...
rapidjson::Writer<rapidjson::OStreamWrapper> writer(out);
std::cout << "Ros Server: ";
(*pDocResponse).Accept(writer);
std::cout << std::endl;
rbc->serviceResponse(
document["service"].GetString(),
document["id"].GetString(),
true,
*pDocResponse);
return;
});
}
ROSBridge::ComPrivate::Server::~Server()
{ {
this->reset();
}
void ROSBridge::ComPrivate::Server::start()
{
_running = true;
}
void ROSBridge::ComPrivate::Server::reset()
{
if (!_running)
return;
for (const auto& str : _clientList)
_rbc.removeClient(str);
_clientList.clear();
} }
#ifndef SERVER_H #pragma once
#define SERVER_H
#include "ros_bridge/include/ComPrivateInclude.h"
#include "ros_bridge/include/RosBridgeClient.h"
#include "ros_bridge/include/TypeFactory.h"
#include "ros_bridge/include/CasePacker.h"
namespace ROSBridge {
namespace ComPrivate {
class Server class Server
{ {
typedef std::vector<std::string> ClientList;
public: public:
Server(); typedef std::function<JsonDocUPtr(JsonDocUPtr)> CallbackType;
};
Server() = delete;
Server(CasePacker &casePacker, RosbridgeWsClient &rbc);
~Server();
//! @brief Starts the subscriber.
void start();
#endif // SERVER_H //! @brief Resets the subscriber.
void reset();
void advertiseService(const std::string& service,
const std::string& type,
const CallbackType& userCallback);
private:
RosbridgeWsClient &_rbc;
CasePacker &_casePacker;
ClientList _clientList;
bool _running;
};
} // namespace ComPrivate
} // namespace ROSBridge
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
namespace ROSBridge {
template <class T>
class ThreadSafeQueue
{
public:
ThreadSafeQueue();
~ThreadSafeQueue();
T pop_front();
void push_back(const T& item);
void push_back(T&& item);
int size();
bool empty();
private:
std::deque<T> _queue;
std::mutex _mutex;
std::condition_variable _cond;
};
template <typename T>
ThreadSafeQueue<T>::ThreadSafeQueue(){}
template <typename T>
ThreadSafeQueue<T>::~ThreadSafeQueue(){}
template <typename T>
T ThreadSafeQueue<T>::pop_front()
{
std::unique_lock<std::mutex> mlock(_mutex);
while (_queue.empty())
{
_cond.wait(mlock);
}
T t = std::move(_queue.front());
_queue.pop_front();
return t;
}
template <typename T>
void ThreadSafeQueue<T>::push_back(const T& item)
{
std::unique_lock<std::mutex> mlock(_mutex);
_queue.push_back(item);
mlock.unlock(); // unlock before notificiation to minimize mutex con
_cond.notify_one(); // notify one waiting thread
}
template <typename T>
void ThreadSafeQueue<T>::push_back(T&& item)
{
std::unique_lock<std::mutex> mlock(_mutex);
_queue.push_back(std::move(item));
mlock.unlock(); // unlock before notificiation to minimize mutex con
_cond.notify_one(); // notify one waiting thread
}
template <typename T>
int ThreadSafeQueue<T>::size()
{
std::unique_lock<std::mutex> mlock(_mutex);
int size = _queue.size();
mlock.unlock();
return size;
}
} // namespace
...@@ -8,20 +8,17 @@ struct ROSBridge::ComPrivate::ThreadData ...@@ -8,20 +8,17 @@ struct ROSBridge::ComPrivate::ThreadData
{ {
const ROSBridge::CasePacker &casePacker; const ROSBridge::CasePacker &casePacker;
RosbridgeWsClient &rbc; RosbridgeWsClient &rbc;
std::mutex &rbcMutex;
ROSBridge::ComPrivate::JsonQueue &queue; ROSBridge::ComPrivate::JsonQueue &queue;
std::mutex &queueMutex; std::mutex &queueMutex;
const std::atomic<bool> &running; const std::atomic<bool> &running;
std::condition_variable &cv; std::condition_variable &cv;
}; };
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker, ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker,
RosbridgeWsClient *rbc, RosbridgeWsClient &rbc) :
std::mutex *rbcMutex) :
_running(false) _running(false)
, _casePacker(casePacker) , _casePacker(casePacker)
, _rbc(rbc) , _rbc(rbc)
, _rbcMutex(rbcMutex)
{ {
} }
...@@ -37,9 +34,8 @@ void ROSBridge::ComPrivate::TopicPublisher::start() ...@@ -37,9 +34,8 @@ void ROSBridge::ComPrivate::TopicPublisher::start()
return; return;
_running.store(true); _running.store(true);
ROSBridge::ComPrivate::ThreadData data{ ROSBridge::ComPrivate::ThreadData data{
*_casePacker, _casePacker,
*_rbc, _rbc,
*_rbcMutex,
_queue, _queue,
_queueMutex, _queueMutex,
_running, _running,
...@@ -64,11 +60,6 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data ...@@ -64,11 +60,6 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data
{ {
// Init. // Init.
ClientMap clientMap; ClientMap clientMap;
// {
// std::lock_guard<std::mutex> lk(data.rbcMutex);
// data.rbc.addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
// data.rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);
// }
// Main Loop. // Main Loop.
while(data.running.load()){ while(data.running.load()){
std::unique_lock<std::mutex> lk(data.queueMutex); std::unique_lock<std::mutex> lk(data.queueMutex);
...@@ -101,27 +92,18 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data ...@@ -101,27 +92,18 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data
std::cout << clientName << ";" std::cout << clientName << ";"
<< tag.topic() << ";" << tag.topic() << ";"
<< tag.messageType() << ";" << std::endl; << tag.messageType() << ";" << std::endl;
{ data.rbc.addClient(clientName);
std::lock_guard<std::mutex> lk(data.rbcMutex); data.rbc.advertise(clientName,
data.rbc.addClient(clientName); tag.topic(),
data.rbc.advertise(clientName, tag.messageType() );
tag.topic(),
tag.messageType() );
}
} }
// Publish Json message. // Publish Json message.
{ data.rbc.publish(tag.topic(), *pJsonDoc.get());
std::lock_guard<std::mutex> lk(data.rbcMutex);
data.rbc.publish(tag.topic(), *pJsonDoc.get());
}
} // while loop } // while loop
// Tidy up. // Tidy up.
{ for (auto& it : clientMap)
std::lock_guard<std::mutex> lk(data.rbcMutex); data.rbc.removeClient(it.second);
for (auto& it : clientMap)
data.rbc.removeClient(it.second);
}
} }
...@@ -24,9 +24,8 @@ class TopicPublisher ...@@ -24,9 +24,8 @@ class TopicPublisher
public: public:
TopicPublisher() = delete; TopicPublisher() = delete;
TopicPublisher(CasePacker *casePacker, TopicPublisher(CasePacker &casePacker,
RosbridgeWsClient *rbc, RosbridgeWsClient &rbc);
std::mutex *rbcMutex);
~TopicPublisher(); ~TopicPublisher();
...@@ -46,7 +45,7 @@ public: ...@@ -46,7 +45,7 @@ public:
template<class T> template<class T>
void publish(const T &msg, const std::string &topic){ void publish(const T &msg, const std::string &topic){
JsonDocUPtr docPtr(_casePacker->pack(msg, topic)); JsonDocUPtr docPtr(_casePacker.pack(msg, topic));
publish(std::move(docPtr)); publish(std::move(docPtr));
} }
...@@ -54,9 +53,8 @@ private: ...@@ -54,9 +53,8 @@ private:
JsonQueue _queue; JsonQueue _queue;
std::mutex _queueMutex; std::mutex _queueMutex;
std::atomic<bool> _running; std::atomic<bool> _running;
CasePacker *_casePacker; CasePacker &_casePacker;
RosbridgeWsClient *_rbc; RosbridgeWsClient &_rbc;
std::mutex *_rbcMutex;
CondVar _cv; CondVar _cv;
ThreadPtr _pThread; ThreadPtr _pThread;
}; };
......
#include "TopicSubscriber.h" #include "TopicSubscriber.h"
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(ROSBridge::CasePacker *casePacker, ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
RosbridgeWsClient *rbc, std::mutex *rbcMutex) : RosbridgeWsClient &rbc) :
_casePacker(casePacker) _casePacker(casePacker)
, _rbc(rbc) , _rbc(rbc)
, _rbcMutex(rbcMutex)
, _running(false) , _running(false)
{ {
...@@ -26,14 +25,13 @@ void ROSBridge::ComPrivate::TopicSubscriber::reset() ...@@ -26,14 +25,13 @@ void ROSBridge::ComPrivate::TopicSubscriber::reset()
if ( _running ){ if ( _running ){
_running = false; _running = false;
{ {
std::lock_guard<std::mutex> lk(*_rbcMutex);
for (std::string clientName : _clientList){ for (std::string clientName : _clientList){
_rbc->removeClient(clientName); _rbc.removeClient(clientName);
} }
} }
{ {
std::lock_guard<std::mutex> lk(_callbackMap.mutex); std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
_callbackMap.map.clear(); _callbackMapStruct.map.clear();
} }
_clientList.clear(); _clientList.clear();
} }
...@@ -52,8 +50,8 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -52,8 +50,8 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
HashType hash = getHash(clientName); HashType hash = getHash(clientName);
{ {
std::lock_guard<std::mutex> lk(_callbackMap.mutex); std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
auto ret = _callbackMap.map.insert(std::make_pair(hash, callback)); // auto ret = _callbackMapStruct.map.insert(std::make_pair(hash, callback)); //
if ( !ret.second ) if ( !ret.second )
return false; // Topic subscription already present. return false; // Topic subscription already present.
...@@ -62,7 +60,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -62,7 +60,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
using namespace std::placeholders; using namespace std::placeholders;
auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback, auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback,
hash, hash,
std::ref(_callbackMap), std::ref(_callbackMapStruct),
_1, _2); _1, _2);
// std::cout << std::endl; // std::cout << std::endl;
...@@ -70,11 +68,20 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -70,11 +68,20 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
// std::cout << "client name: " << clientName << std::endl; // std::cout << "client name: " << clientName << std::endl;
// std::cout << "topic: " << topic << std::endl; // std::cout << "topic: " << topic << std::endl;
{ {
std::lock_guard<std::mutex> lk(*_rbcMutex); auto start = std::chrono::high_resolution_clock::now();
_rbc->addClient(clientName); _rbc.addClient(clientName);
_rbc->subscribe(clientName, auto end = std::chrono::high_resolution_clock::now();
std::cout << "add client time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms" << std::endl;
start = std::chrono::high_resolution_clock::now();
_rbc.subscribe(clientName,
topic, topic,
f ); f );
end = std::chrono::high_resolution_clock::now();
std::cout << "subscribe time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms" << std::endl;
} }
return true; return true;
...@@ -84,7 +91,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -84,7 +91,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>; using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
void ROSBridge::ComPrivate::subscriberCallback( void ROSBridge::ComPrivate::subscriberCallback(
const HashType &hash, const HashType &hash,
ROSBridge::ComPrivate::CallbackMapWrapper &mapWrapper, ROSBridge::ComPrivate::MapStruct &mapWrapper,
std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message) std::shared_ptr<WsClient::InMessage> in_message)
{ {
......
...@@ -10,7 +10,7 @@ namespace ComPrivate { ...@@ -10,7 +10,7 @@ namespace ComPrivate {
typedef std::function<void(JsonDocUPtr)> CallbackType; typedef std::function<void(JsonDocUPtr)> CallbackType;
struct CallbackMapWrapper{ struct MapStruct{
typedef std::map<HashType, CallbackType> Map; typedef std::map<HashType, CallbackType> Map;
Map map; Map map;
std::mutex mutex; std::mutex mutex;
...@@ -22,7 +22,7 @@ class TopicSubscriber ...@@ -22,7 +22,7 @@ class TopicSubscriber
public: public:
TopicSubscriber() = delete; TopicSubscriber() = delete;
TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc, std::mutex *rbcMutex); TopicSubscriber(CasePacker &casePacker, RosbridgeWsClient &rbc);
~TopicSubscriber(); ~TopicSubscriber();
//! @brief Starts the subscriber. //! @brief Starts the subscriber.
...@@ -40,16 +40,15 @@ private: ...@@ -40,16 +40,15 @@ private:
CasePacker *_casePacker; CasePacker &_casePacker;
RosbridgeWsClient *_rbc; RosbridgeWsClient &_rbc;
std::mutex *_rbcMutex; MapStruct _callbackMapStruct;
CallbackMapWrapper _callbackMap;
ClientList _clientList; ClientList _clientList;
bool _running; bool _running;
}; };
void subscriberCallback(const HashType &hash, void subscriberCallback(const HashType &hash,
CallbackMapWrapper &mapWrapper, MapStruct &mapWrapper,
std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::Connection> /*connection*/,
std::shared_ptr<WsClient::InMessage> in_message); std::shared_ptr<WsClient::InMessage> in_message);
} // namespace ComPrivate } // namespace ComPrivate
......
...@@ -3,8 +3,9 @@ ...@@ -3,8 +3,9 @@
ROSBridge::ROSBridge::ROSBridge() : ROSBridge::ROSBridge::ROSBridge() :
_casePacker(&_typeFactory, &_jsonFactory) _casePacker(&_typeFactory, &_jsonFactory)
, _rbc("localhost:9090") , _rbc("localhost:9090")
, _topicPublisher(&_casePacker, &_rbc, &_rbcMutex) , _topicPublisher(_casePacker, _rbc)
, _topicSubscriber(&_casePacker, &_rbc, &_rbcMutex) , _topicSubscriber(_casePacker, _rbc)
, _server(_casePacker, _rbc)
{ {
} }
...@@ -19,6 +20,13 @@ void ROSBridge::ROSBridge::subscribe(const char *topic, const std::function<void ...@@ -19,6 +20,13 @@ void ROSBridge::ROSBridge::subscribe(const char *topic, const std::function<void
_topicSubscriber.subscribe(topic, callBack); _topicSubscriber.subscribe(topic, callBack);
} }
void ROSBridge::ROSBridge::advertiseService(const std::string &service,
const std::string &type,
const std::function<JsonDocUPtr(JsonDocUPtr)> &callback)
{
_server.advertiseService(service, type, callback);
}
const ROSBridge::CasePacker *ROSBridge::ROSBridge::casePacker() const const ROSBridge::CasePacker *ROSBridge::ROSBridge::casePacker() const
{ {
return &_casePacker; return &_casePacker;
...@@ -28,17 +36,18 @@ void ROSBridge::ROSBridge::start() ...@@ -28,17 +36,18 @@ void ROSBridge::ROSBridge::start()
{ {
_topicPublisher.start(); _topicPublisher.start();
_topicSubscriber.start(); _topicSubscriber.start();
_server.start();
} }
void ROSBridge::ROSBridge::reset() void ROSBridge::ROSBridge::reset()
{ {
_topicPublisher.reset(); _topicPublisher.reset();
_topicSubscriber.reset(); _topicSubscriber.reset();
_server.reset();
} }
bool ROSBridge::ROSBridge::connected() bool ROSBridge::ROSBridge::connected()
{ {
std::lock_guard<std::mutex> lk(_rbcMutex);
return _rbc.connected(); return _rbc.connected();
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment