Commit 07a48251 authored by Valentin Platzgummer's avatar Valentin Platzgummer

Not advertised topic issue solved (needed one client per topic)....

Not advertised topic issue solved (needed one client per topic). ROSBridge::subscribe is very slow (300ms).
parent d47dc9a9
......@@ -27,6 +27,8 @@ QGCROOT = $$PWD
DebugBuild {
DESTDIR = $${OUT_PWD}/debug
#DEFINES += DEBUG
#DEFINES += ROS_BRIDGE_CLIENT_DEBUG
}
else {
DESTDIR = $${OUT_PWD}/release
......
......@@ -75,7 +75,8 @@ WimaController::WimaController(QObject *parent)
, _snakeLineDistance (settingsGroup, _metaDataMap[snakeLineDistanceName])
, _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName])
, _nemoHeartbeat (0 /*status: not connected*/)
, _fallbackStatus (0 /*status: not connected*/)
, _fallbackStatus (0 /*status: not connected*/)
, _bridgeSetupDone (false)
, _pRosBridge (new ROSBridge::ROSBridge())
{
// Set up facts.
......@@ -692,8 +693,7 @@ void WimaController::_checkBatteryLevel()
void WimaController::_eventTimerHandler()
{
static EventTicker batteryLevelTicker(EVENT_TIMER_INTERVAL, CHECK_BATTERY_INTERVAL);
static EventTicker snakeEventLoopTicker(EVENT_TIMER_INTERVAL, SNAKE_EVENT_LOOP_INTERVAL);
static EventTicker rosBridgeTicker(EVENT_TIMER_INTERVAL, 1000);
static EventTicker snakeTicker(EVENT_TIMER_INTERVAL, SNAKE_EVENT_LOOP_INTERVAL);
static EventTicker nemoStatusTicker(EVENT_TIMER_INTERVAL, 5000);
// Battery level check necessary?
......@@ -713,37 +713,83 @@ void WimaController::_eventTimerHandler()
emit WimaController::nemoStatusStringChanged();
}
if (_enableSnake.rawValue().toBool() && rosBridgeTicker.ready()) {
if ( snakeTicker.ready() ) {
if ( _enableSnake.rawValue().toBool()
&& _pRosBridge->connected() ) {
if ( !_bridgeSetupDone ) {
qWarning() << "ROS Bridge setup. ";
auto start = std::chrono::high_resolution_clock::now();
_pRosBridge->start();
auto end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->start() time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->publish(_snakeOrigin, "/snake/origin");
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->publish(_snakeOrigin, \"/snake/origin\") time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->publish(_snakeTilesLocal, "/snake/tiles");
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->publish(_snakeTilesLocal, \"/snake/tiles\") time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->subscribe("/nemo/progress",
/* callback */ [this](JsonDocUPtr pDoc){
int requiredSize = this->_snakeTilesLocal.polygons().size();
auto& progress = this->_nemoProgress;
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, progress)
|| progress.progress().size() != requiredSize ) {
progress.progress().fill(0, requiredSize);
}
emit WimaController::nemoProgressChanged();
});
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->subscribe(\"/nemo/progress\" time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
start = std::chrono::high_resolution_clock::now();
_pRosBridge->subscribe("/nemo/heartbeat",
/* callback */ [this, &nemoStatusTicker](JsonDocUPtr pDoc){
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) {
if ( this->_nemoHeartbeat.status() == this->_fallbackStatus )
return;
this->_nemoHeartbeat.setStatus(this->_fallbackStatus);
}
nemoStatusTicker.reset();
this->_fallbackStatus = -1; /*Timeout*/
emit WimaController::nemoStatusChanged();
emit WimaController::nemoStatusStringChanged();
});
end = std::chrono::high_resolution_clock::now();
qWarning() << "_pRosBridge->subscribe(\"/nemo/heartbeat\" time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms";
_bridgeSetupDone = true;
}
} else if ( _bridgeSetupDone) {
_pRosBridge->reset();
_bridgeSetupDone = false;
}
qWarning() << "Connectable: " << _pRosBridge->connected() << "\n";
//qWarning() << "Connectable: " << _pRosBridge->connected() << "\n";
auto start = std::chrono::high_resolution_clock::now();
_pRosBridge->publish(_snakeOrigin, "/snake/origin");
_pRosBridge->publish(_snakeTilesLocal, "/snake/tiles");
//auto start = std::chrono::high_resolution_clock::now();
// using namespace std::placeholders;
// auto progressCallBack = std::bind(&WimaController::_progressFromJson,
// this,
// _1,
// std::ref(_nemoProgress));
// _pRosBridge->subscribe("/nemo/progress", progressCallBack);
// _pRosBridge->subscribe("/nemo/heartbeat", [this, &nemoStatusTicker](JsonDocUPtr pDoc){
// if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) {
// if ( this->_nemoHeartbeat.status() == this->_fallbackStatus )
// return;
// this->_nemoHeartbeat.setStatus(this->_fallbackStatus);
// }
// nemoStatusTicker.reset();
// this->_fallbackStatus = -1; /*Timeout*/
// emit WimaController::nemoStatusChanged();
// emit WimaController::nemoStatusStringChanged();
// });
auto end = std::chrono::high_resolution_clock::now();
qWarning() << "Duration: " << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms\n";
//auto end = std::chrono::high_resolution_clock::now();
//qWarning() << "Duration: " << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
//<< " ms\n";
}
}
......@@ -937,16 +983,3 @@ void WimaController::_switchSnakeManager(QVariant variant)
_switchWaypointManager(_defaultManager);
}
}
void WimaController::_progressFromJson(JsonDocUPtr pDoc,
QNemoProgress &progress)
{
int requiredSize = _snakeTilesLocal.polygons().size();
if ( !_pRosBridge->casePacker()->unpack(pDoc, progress)
|| progress.progress().size() != requiredSize ) {
progress.progress().fill(0, requiredSize);
}
emit WimaController::nemoProgressChanged();
}
......@@ -46,7 +46,7 @@
#define SMART_RTL_MAX_ATTEMPTS 3 // times
#define SMART_RTL_ATTEMPT_INTERVAL 200 // ms
#define EVENT_TIMER_INTERVAL 50 // ms
#define SNAKE_EVENT_LOOP_INTERVAL 5000 // ms
#define SNAKE_EVENT_LOOP_INTERVAL 1000 // ms
using namespace snake;
......@@ -351,9 +351,6 @@ private slots:
private:
using StatusMap = std::map<int, QString>;
// Snake.
void _progressFromJson (JsonDocUPtr pDoc,
QNemoProgress &progress);
// Controllers.
PlanMasterController *_masterController;
......@@ -418,6 +415,7 @@ private:
QNemoHeartbeat _nemoHeartbeat; // measurement progress
int _fallbackStatus;
ROSBridgePtr _pRosBridge;
bool _bridgeSetupDone;
static StatusMap _nemoStatusMap;
// Periodic tasks.
......
......@@ -5,7 +5,7 @@
#include <memory>
#include <deque>
#include <set>
#include <unordered_map>
namespace ROSBridge {
namespace ComPrivate {
......@@ -15,7 +15,8 @@ typedef rapidjson::Document JsonDoc;
typedef std::unique_ptr<JsonDoc> JsonDocUPtr;
typedef std::deque<JsonDocUPtr> JsonQueue;
typedef std::size_t HashType;
typedef std::set<HashType> HashSet;
using ClientMap = std::unordered_map<HashType, std::string>;
static const char* _topicAdvertiserKey = "topic_advertiser";
static const char* _topicPublisherKey = "topic_publisher";
......
......@@ -31,13 +31,13 @@ class RosbridgeWsClient
{
if (!client->on_open)
{
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
......@@ -45,7 +45,7 @@ class RosbridgeWsClient
};
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
if (!client->on_message)
{
client->on_message = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::InMessage> in_message) {
......@@ -69,14 +69,14 @@ class RosbridgeWsClient
}
#endif
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::thread client_thread([client_name, client]() {
#else
std::thread client_thread([client]() {
#endif
client->start();
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << ": Terminated" << std::endl;
#endif
client->on_open = NULL;
......@@ -141,7 +141,7 @@ public:
{
client_map[client_name] = std::make_shared<WsClient>(server_port_path);
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
else
{
std::cerr << client_name << " has already been created" << std::endl;
......@@ -169,11 +169,11 @@ public:
it->second->on_message = NULL;
it->second->on_close = NULL;
it->second->on_error = NULL;
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << " has been stopped" << std::endl;
#endif
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
else
{
std::cerr << client_name << " has not been created" << std::endl;
......@@ -189,11 +189,11 @@ public:
it->second->stop();
it->second.reset();
client_map.erase(it);
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << " has been removed" << std::endl;
#endif
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
else
{
std::cerr << client_name << " has not been created" << std::endl;
......@@ -202,6 +202,7 @@ public:
}
// Gets the client from client_map and starts it. Advertising is essentially sending a message.
// One client per topic!
void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
{
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
......@@ -217,7 +218,7 @@ public:
start(client_name, it->second, message);
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
else
{
std::cerr << client_name << "has not been created" << std::endl;
......@@ -242,7 +243,7 @@ public:
std::shared_ptr<WsClient> publish_client = std::make_shared<WsClient>(server_port_path);
publish_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << "publish_client: Opened connection" << std::endl;
std::cout << "publish_client: Sending message: " << message << std::endl;
#endif
......@@ -291,7 +292,7 @@ public:
it->second->on_message = callback;
start(client_name, it->second, message);
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
else
{
std::cerr << client_name << "has not been created" << std::endl;
......@@ -309,7 +310,7 @@ public:
it->second->on_message = callback;
start(client_name, it->second, message);
}
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
else
{
std::cerr << client_name << "has not been created" << std::endl;
......@@ -336,7 +337,7 @@ public:
std::shared_ptr<WsClient> service_response_client = std::make_shared<WsClient>(server_port_path);
service_response_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << "service_response_client: Opened connection" << std::endl;
std::cout << "service_response_client: Sending message: " << message << std::endl;
#endif
......@@ -383,7 +384,7 @@ public:
else
{
call_service_client->on_message = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) {
#ifdef DEBUG
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << "call_service_client: Message received: " << in_message->string() << std::endl;
std::cout << "call_service_client: Sending close connection" << std::endl;
#endif
......
......@@ -11,7 +11,6 @@ struct ROSBridge::ComPrivate::ThreadData
std::mutex &rbcMutex;
ROSBridge::ComPrivate::JsonQueue &queue;
std::mutex &queueMutex;
ROSBridge::ComPrivate::HashSet &advertisedTopicsHashList;
const std::atomic<bool> &running;
std::condition_variable &cv;
};
......@@ -43,7 +42,6 @@ void ROSBridge::ComPrivate::TopicPublisher::start()
*_rbcMutex,
_queue,
_queueMutex,
_advertisedTopicsHashList,
_running,
_cv
};
......@@ -60,17 +58,17 @@ void ROSBridge::ComPrivate::TopicPublisher::reset()
return;
_pThread->join();
_queue.clear();
_advertisedTopicsHashList.clear();
}
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
{
// Init.
{
std::lock_guard<std::mutex> lk(data.rbcMutex);
data.rbc.addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
data.rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);
}
ClientMap clientMap;
// {
// std::lock_guard<std::mutex> lk(data.rbcMutex);
// data.rbc.addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
// data.rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);
// }
// Main Loop.
while(data.running.load()){
std::unique_lock<std::mutex> lk(data.queueMutex);
......@@ -94,15 +92,19 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data
// Check if topic must be advertised.
// Advertised topics are stored in advertisedTopicsHashList as
// a hash.
HashType hash = ROSBridge::ComPrivate::getHash(tag.topic());
if ( data.advertisedTopicsHashList.count(hash) == 0) {
data.advertisedTopicsHashList.insert(hash);
std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey
+ tag.topic();
HashType hash = ROSBridge::ComPrivate::getHash(clientName);
auto it = clientMap.find(hash);
if ( it == clientMap.end()) { // Need to advertise topic?
clientMap.insert(std::make_pair(hash, clientName));
std::cout << clientName << ";"
<< tag.topic() << ";"
<< tag.messageType() << ";" << std::endl;
{
std::cout << ROSBridge::ComPrivate::_topicAdvertiserKey << ";"
<< tag.topic() << ";"
<< tag.messageType() << ";" << std::endl;
std::lock_guard<std::mutex> lk(data.rbcMutex);
data.rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
data.rbc.addClient(clientName);
data.rbc.advertise(clientName,
tag.topic(),
tag.messageType() );
}
......@@ -118,8 +120,8 @@ void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data
// Tidy up.
{
std::lock_guard<std::mutex> lk(data.rbcMutex);
data.rbc.removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
data.rbc.removeClient(ROSBridge::ComPrivate::_topicPublisherKey);
for (auto& it : clientMap)
data.rbc.removeClient(it.second);
}
}
......@@ -57,8 +57,6 @@ private:
CasePacker *_casePacker;
RosbridgeWsClient *_rbc;
std::mutex *_rbcMutex;
HashSet _advertisedTopicsHashList; // Not thread save! This container
// is manipulated by transmittLoop only!
CondVar _cv;
ThreadPtr _pThread;
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment