Commit 4f76dc12 authored by Valentin Platzgummer's avatar Valentin Platzgummer

cannot infere topic of type... issue solved

parent be175073
...@@ -27,7 +27,7 @@ QGCROOT = $$PWD ...@@ -27,7 +27,7 @@ QGCROOT = $$PWD
DebugBuild { DebugBuild {
DESTDIR = $${OUT_PWD}/debug DESTDIR = $${OUT_PWD}/debug
#DEFINES += DEBUG DEFINES += DEBUG
#DEFINES += ROS_BRIDGE_CLIENT_DEBUG #DEFINES += ROS_BRIDGE_CLIENT_DEBUG
} }
else { else {
...@@ -50,6 +50,7 @@ MacBuild { ...@@ -50,6 +50,7 @@ MacBuild {
LinuxBuild { LinuxBuild {
CONFIG += qesp_linux_udev CONFIG += qesp_linux_udev
CONFIG += ccache # improves build time
} }
WindowsBuild { WindowsBuild {
......
...@@ -43,16 +43,15 @@ void Slicer::_updateIdx(long size) ...@@ -43,16 +43,15 @@ void Slicer::_updateIdx(long size)
{ {
_overlap = _overlap < _N ? _overlap : _N-1; _overlap = _overlap < _N ? _overlap : _N-1;
long maxStart = size-_N; _idxStart = _idxStart < size ? _idxStart : size-1;
_idxStart = _idxStart <= maxStart ? _idxStart : maxStart; _idxStart = _idxStart < 0 ? 0 : _idxStart;
_idxStart = _idxStart < 0 ? 0 : _idxStart;
_idxEnd = _idxStart + _N - 1; _idxEnd = _idxStart + _N - 1;
_idxEnd = _idxEnd < size ? _idxEnd : size-1; _idxEnd = _idxEnd < size ? _idxEnd : size-1;
_idxNext = _idxEnd + 1 - _overlap; _idxNext = _idxEnd == size -1 ? _idxStart : _idxEnd + 1 - _overlap;
_idxNext = _idxNext < 0 ? 0 : _idxNext; _idxNext = _idxNext < 0 ? 0 : _idxNext;
_idxNext = _idxNext < size ? _idxNext : size-1; _idxNext = _idxNext < size ? _idxNext : size-1;
_idxPrevious = _idxStart + _overlap - _N; _idxPrevious = _idxStart + _overlap - _N;
_idxPrevious = _idxPrevious < 0 ? 0 : _idxPrevious; _idxPrevious = _idxPrevious < 0 ? 0 : _idxPrevious;
......
...@@ -80,9 +80,11 @@ WimaController::WimaController(QObject *parent) ...@@ -80,9 +80,11 @@ WimaController::WimaController(QObject *parent)
, _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName]) , _snakeMinTransectLength (settingsGroup, _metaDataMap[snakeMinTransectLengthName])
, _nemoHeartbeat (0 /*status: not connected*/) , _nemoHeartbeat (0 /*status: not connected*/)
, _fallbackStatus (0 /*status: not connected*/) , _fallbackStatus (0 /*status: not connected*/)
, _bridgeSetupDone (false)
, _pRosBridge (new ROSBridge::ROSBridge()) , _pRosBridge (new ROSBridge::ROSBridge())
, _pCasePacker (_pRosBridge->casePacker()) , _pCasePacker (_pRosBridge->casePacker())
, _batteryLevelTicker (EVENT_TIMER_INTERVAL, 1000 /*ms*/)
, _snakeTicker (EVENT_TIMER_INTERVAL, 200 /*ms*/)
, _nemoTimeoutTicker (EVENT_TIMER_INTERVAL, 5000 /*ms*/)
{ {
// Set up facts. // Set up facts.
_showAllMissionItems.setRawValue(true); _showAllMissionItems.setRawValue(true);
...@@ -120,8 +122,6 @@ WimaController::WimaController(QObject *parent) ...@@ -120,8 +122,6 @@ WimaController::WimaController(QObject *parent)
connect(this, &QObject::destroyed, &this->_snakeWorker, &SnakeWorker::quit); connect(this, &QObject::destroyed, &this->_snakeWorker, &SnakeWorker::quit);
// Snake. // Snake.
connect(&_enableSnake, &Fact::rawValueChanged, this, &WimaController::_startStopRosBridge);
_startStopRosBridge();
connect(&_enableSnake, &Fact::rawValueChanged, this, &WimaController::_initStartSnakeWorker); connect(&_enableSnake, &Fact::rawValueChanged, this, &WimaController::_initStartSnakeWorker);
_initStartSnakeWorker(); _initStartSnakeWorker();
connect(&_enableSnake, &Fact::rawValueChanged, this, &WimaController::_switchSnakeManager); connect(&_enableSnake, &Fact::rawValueChanged, this, &WimaController::_switchSnakeManager);
...@@ -701,13 +701,9 @@ void WimaController::_checkBatteryLevel() ...@@ -701,13 +701,9 @@ void WimaController::_checkBatteryLevel()
void WimaController::_eventTimerHandler() void WimaController::_eventTimerHandler()
{ {
static EventTicker batteryLevelTicker(EVENT_TIMER_INTERVAL, CHECK_BATTERY_INTERVAL);
static EventTicker snakeTicker(EVENT_TIMER_INTERVAL, SNAKE_EVENT_LOOP_INTERVAL);
static EventTicker nemoStatusTicker(EVENT_TIMER_INTERVAL, 5000);
// Battery level check necessary? // Battery level check necessary?
Fact *enableLowBatteryHandling = qgcApp()->toolbox()->settingsManager()->wimaSettings()->enableLowBatteryHandling(); Fact *enableLowBatteryHandling = qgcApp()->toolbox()->settingsManager()->wimaSettings()->enableLowBatteryHandling();
if ( enableLowBatteryHandling->rawValue().toBool() && batteryLevelTicker.ready() ) if ( enableLowBatteryHandling->rawValue().toBool() && _batteryLevelTicker.ready() )
_checkBatteryLevel(); _checkBatteryLevel();
// Snake flight plan update necessary? // Snake flight plan update necessary?
...@@ -716,78 +712,25 @@ void WimaController::_eventTimerHandler() ...@@ -716,78 +712,25 @@ void WimaController::_eventTimerHandler()
// } // }
// } // }
if ( nemoStatusTicker.ready() ) { if ( _nemoTimeoutTicker.ready() && _enableSnake.rawValue().toBool() ) {
this->_nemoHeartbeat.setStatus(_fallbackStatus); this->_nemoHeartbeat.setStatus(_fallbackStatus);
emit WimaController::nemoStatusChanged(); emit WimaController::nemoStatusChanged();
emit WimaController::nemoStatusStringChanged(); emit WimaController::nemoStatusStringChanged();
} }
if ( snakeTicker.ready() ) { if ( _snakeTicker.ready() ) {
if ( _enableSnake.rawValue().toBool() if ( _enableSnake.rawValue().toBool() ) {
&& _pRosBridge->connected() ) { if ( !_pRosBridge->isRunning() && _pRosBridge->ping() ) {
if ( !_bridgeSetupDone ) { _setupTopicService();
}
_pRosBridge->start();
_pRosBridge->publish(_snakeOrigin, "/snake/origin"); if ( !_pRosBridge->ping() ){
_pRosBridge->publish(_snakeTilesLocal, "/snake/tiles"); _pRosBridge->reset();
_pRosBridge->subscribe("/nemo/progress",
/* callback */ [this](JsonDocUPtr pDoc){
int requiredSize = this->_snakeTilesLocal.polygons().size();
auto& progress = this->_nemoProgress;
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, progress)
|| progress.progress().size() != requiredSize ) {
progress.progress().fill(0, requiredSize);
}
emit WimaController::nemoProgressChanged();
});
_pRosBridge->subscribe("/nemo/heartbeat",
/* callback */ [this, &nemoStatusTicker](JsonDocUPtr pDoc){
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) {
if ( this->_nemoHeartbeat.status() == this->_fallbackStatus )
return;
this->_nemoHeartbeat.setStatus(this->_fallbackStatus);
}
nemoStatusTicker.reset();
this->_fallbackStatus = -1; /*Timeout*/
emit WimaController::nemoStatusChanged();
emit WimaController::nemoStatusStringChanged();
});
auto pOrigin = &_snakeOrigin;
auto casePacker = _pCasePacker;
_pRosBridge->advertiseService("/snake/get_origin", "snake_msgs/GetOrigin",
[casePacker, pOrigin](JsonDocUPtr) -> JsonDocUPtr {
JsonDocUPtr pDoc = casePacker->pack(*pOrigin, "");
casePacker->removeTag(pDoc);
rapidjson::Document value(rapidjson::kObjectType);
JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType));
value.CopyFrom(*pDoc, pReturn->GetAllocator());
pReturn->AddMember("origin", value, pReturn->GetAllocator());
return pReturn;
});
auto pTiles = &_snakeTilesLocal;
_pRosBridge->advertiseService("/snake/get_tiles", "snake_msgs/GetTiles",
[casePacker, pTiles](JsonDocUPtr) -> JsonDocUPtr {
JsonDocUPtr pDoc = casePacker->pack(*pTiles, "");
casePacker->removeTag(pDoc);
rapidjson::Document value(rapidjson::kObjectType);
JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType));
value.CopyFrom(*pDoc, pReturn->GetAllocator());
pReturn->AddMember("tiles", value, pReturn->GetAllocator());
return pReturn;
});
_bridgeSetupDone = true;
} }
} else if ( _bridgeSetupDone) {
_pRosBridge->reset(); } else {
_bridgeSetupDone = false; if ( _pRosBridge->isRunning() )
_pRosBridge->reset();
} }
} }
} }
...@@ -933,15 +876,6 @@ void WimaController::_snakeStoreWorkerResults() ...@@ -933,15 +876,6 @@ void WimaController::_snakeStoreWorkerResults()
qWarning() << "WimaController::_snakeStoreWorkerResults execution time: " << duration << " ms."; qWarning() << "WimaController::_snakeStoreWorkerResults execution time: " << duration << " ms.";
} }
void WimaController::_startStopRosBridge()
{
if ( _enableSnake.rawValue().toBool() ) {
_pRosBridge->start();
} else {
_pRosBridge->reset();
}
}
void WimaController::_initStartSnakeWorker() void WimaController::_initStartSnakeWorker()
{ {
if ( !_enableSnake.rawValue().toBool() ) if ( !_enableSnake.rawValue().toBool() )
...@@ -971,3 +905,63 @@ void WimaController::_switchSnakeManager(QVariant variant) ...@@ -971,3 +905,63 @@ void WimaController::_switchSnakeManager(QVariant variant)
_switchWaypointManager(_defaultManager); _switchWaypointManager(_defaultManager);
} }
} }
void WimaController::_setupTopicService()
{
_pRosBridge->start();
_pRosBridge->publish(_snakeOrigin, "/snake/origin");
_pRosBridge->publish(_snakeTilesLocal, "/snake/tiles");
_pRosBridge->subscribe("/nemo/progress",
/* callback */ [this](JsonDocUPtr pDoc){
int requiredSize = this->_snakeTilesLocal.polygons().size();
auto& progress = this->_nemoProgress;
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, progress)
|| progress.progress().size() != requiredSize ) {
progress.progress().fill(0, requiredSize);
}
emit WimaController::nemoProgressChanged();
});
_pRosBridge->subscribe("/nemo/heartbeat",
/* callback */ [this](JsonDocUPtr pDoc){
if ( !this->_pRosBridge->casePacker()->unpack(pDoc, this->_nemoHeartbeat) ) {
if ( this->_nemoHeartbeat.status() == this->_fallbackStatus )
return;
this->_nemoHeartbeat.setStatus(this->_fallbackStatus);
}
this->_nemoTimeoutTicker.reset();
this->_fallbackStatus = -1; /*Timeout*/
emit WimaController::nemoStatusChanged();
emit WimaController::nemoStatusStringChanged();
});
auto pOrigin = &_snakeOrigin;
auto casePacker = _pCasePacker;
_pRosBridge->advertiseService("/snake/get_origin", "snake_msgs/GetOrigin",
[casePacker, pOrigin](JsonDocUPtr) -> JsonDocUPtr {
JsonDocUPtr pDoc = casePacker->pack(*pOrigin, "");
casePacker->removeTag(pDoc);
rapidjson::Document value(rapidjson::kObjectType);
JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType));
value.CopyFrom(*pDoc, pReturn->GetAllocator());
pReturn->AddMember("origin", value, pReturn->GetAllocator());
return pReturn;
});
auto pTiles = &_snakeTilesLocal;
_pRosBridge->advertiseService("/snake/get_tiles", "snake_msgs/GetTiles",
[casePacker, pTiles](JsonDocUPtr) -> JsonDocUPtr {
JsonDocUPtr pDoc = casePacker->pack(*pTiles, "");
casePacker->removeTag(pDoc);
rapidjson::Document value(rapidjson::kObjectType);
JsonDocUPtr pReturn(new rapidjson::Document(rapidjson::kObjectType));
value.CopyFrom(*pDoc, pReturn->GetAllocator());
pReturn->AddMember("tiles", value, pReturn->GetAllocator());
return pReturn;
});
}
...@@ -43,11 +43,9 @@ ...@@ -43,11 +43,9 @@
#include <map> #include <map>
#define CHECK_BATTERY_INTERVAL 1000 // ms
#define SMART_RTL_MAX_ATTEMPTS 3 // times #define SMART_RTL_MAX_ATTEMPTS 3 // times
#define SMART_RTL_ATTEMPT_INTERVAL 200 // ms #define SMART_RTL_ATTEMPT_INTERVAL 200 // ms
#define EVENT_TIMER_INTERVAL 50 // ms #define EVENT_TIMER_INTERVAL 50 // ms
#define SNAKE_EVENT_LOOP_INTERVAL 1000 // ms
using namespace snake; using namespace snake;
...@@ -340,9 +338,9 @@ private slots: ...@@ -340,9 +338,9 @@ private slots:
// Snake. // Snake.
void _setSnakeCalcInProgress (bool inProgress); void _setSnakeCalcInProgress (bool inProgress);
void _snakeStoreWorkerResults (); void _snakeStoreWorkerResults ();
void _startStopRosBridge ();
void _initStartSnakeWorker (); void _initStartSnakeWorker ();
void _switchSnakeManager (QVariant variant); void _switchSnakeManager (QVariant variant);
void _setupTopicService ();
// Periodic tasks. // Periodic tasks.
void _eventTimerHandler (void); void _eventTimerHandler (void);
// Waypoint manager handling. // Waypoint manager handling.
...@@ -415,11 +413,13 @@ private: ...@@ -415,11 +413,13 @@ private:
int _fallbackStatus; int _fallbackStatus;
ROSBridgePtr _pRosBridge; ROSBridgePtr _pRosBridge;
const CasePacker *_pCasePacker; const CasePacker *_pCasePacker;
bool _bridgeSetupDone;
static StatusMap _nemoStatusMap; static StatusMap _nemoStatusMap;
// Periodic tasks. // Periodic tasks.
QTimer _eventTimer; QTimer _eventTimer;
EventTicker _batteryLevelTicker;
EventTicker _snakeTicker;
EventTicker _nemoTimeoutTicker;
}; };
......
...@@ -40,9 +40,12 @@ public: ...@@ -40,9 +40,12 @@ public:
void reset(); void reset();
//! @return Returns true if connected to the rosbridge_server, false either. //! @return Returns true if connected to the rosbridge_server, false either.
//! @note This function can block up to 100ms. However normal execution time is smaller. //! @note This function can block up to 100ms. However normal execution time is smaller.
bool connected(); bool ping();
bool isRunning();
private: private:
bool _running;
TypeFactory _typeFactory; TypeFactory _typeFactory;
JsonFactory _jsonFactory; JsonFactory _jsonFactory;
CasePacker _casePacker; CasePacker _casePacker;
......
...@@ -16,15 +16,36 @@ ...@@ -16,15 +16,36 @@
#include <thread> #include <thread>
#include <future> #include <future>
#include <mutex> #include <mutex>
#include <tuple>
#include <deque>
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>; using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>; using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>;
template <typename T>
constexpr typename std::underlying_type<T>::type integral(T value)
{
return static_cast<typename std::underlying_type<T>::type>(value);
}
class RosbridgeWsClient class RosbridgeWsClient
{ {
using TopicData = std::tuple<std::string /*topic*/,
std::string /*client name*/,
std::shared_ptr<WsClient> /*client*/,
std::shared_ptr<std::atomic_bool> /*topic ready*/>;
enum class TopicEnum{
TopicName = 0,
ClientName = 1,
Client = 2,
Ready = 3
};
std::string server_port_path; std::string server_port_path;
std::unordered_map<std::string, std::shared_ptr<WsClient>> client_map; std::unordered_map<std::string /*client name*/,
std::mutex map_mutex; std::shared_ptr<WsClient> /*client*/> client_map;
std::deque<TopicData> advertised_topics_list;
std::mutex mutex;
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message) void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
{ {
...@@ -66,8 +87,8 @@ class RosbridgeWsClient ...@@ -66,8 +87,8 @@ class RosbridgeWsClient
std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl; std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl;
}; };
} }
#endif
#endif
#ifdef DEBUG #ifdef DEBUG
std::thread client_thread([client_name, client]() { std::thread client_thread([client_name, client]() {
#else #else
...@@ -95,11 +116,13 @@ public: ...@@ -95,11 +116,13 @@ public:
~RosbridgeWsClient() ~RosbridgeWsClient()
{ {
std::lock_guard<std::mutex> lk(map_mutex); // neccessary? for (auto& topicData : advertised_topics_list){
unadvertise(std::get<integral(TopicEnum::ClientName)>(topicData),
std::get<integral(TopicEnum::TopicName)>(topicData));
}
for (auto& client : client_map) for (auto& client : client_map)
{ {
client.second->stop(); removeClient(client.first);
client.second.reset();
} }
} }
...@@ -133,7 +156,7 @@ public: ...@@ -133,7 +156,7 @@ public:
// Adds a client to the client_map. // Adds a client to the client_map.
void addClient(const std::string& client_name) void addClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex); std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it == client_map.end()) if (it == client_map.end())
{ {
...@@ -149,7 +172,7 @@ public: ...@@ -149,7 +172,7 @@ public:
std::shared_ptr<WsClient> getClient(const std::string& client_name) std::shared_ptr<WsClient> getClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex); std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -160,7 +183,7 @@ public: ...@@ -160,7 +183,7 @@ public:
void stopClient(const std::string& client_name) void stopClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex); std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -183,23 +206,26 @@ public: ...@@ -183,23 +206,26 @@ public:
void removeClient(const std::string& client_name) void removeClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex); std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
// Stop the client asynchronously in 100 ms. // Stop the client asynchronously in 100 ms.
// This is to ensure, that all threads involving the client have been launched. // This is to ensure, that all threads involving the client have been launched.
std::thread t([](std::shared_ptr<WsClient> client){ std::shared_ptr<WsClient> client = it->second;
std::this_thread::sleep_for(std::chrono::milliseconds(100)); #ifdef DEBUG
client->stop(); std::thread t([client, client_name](){
client.reset(); #else
}, std::thread t([client](){
it->second /*lambda param*/ ); #endif
client_map.erase(it); std::this_thread::sleep_for(std::chrono::milliseconds(100));
t.detach(); client->stop();
#ifdef DEBUG #ifdef DEBUG
std::cout << client_name << " has been removed" << std::endl; std::cout << client_name << " has been removed" << std::endl;
#endif #endif
});
client_map.erase(it);
t.detach();
} }
#ifdef DEBUG #ifdef DEBUG
else else
...@@ -213,19 +239,51 @@ public: ...@@ -213,19 +239,51 @@ public:
// One client per topic! // One client per topic!
void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
{ {
std::lock_guard<std::mutex> lk(map_mutex); std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
if (it != client_map.end()) if (it_client != client_map.end())
{ {
std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\""; auto it_topic = std::find_if(advertised_topics_list.begin(),
advertised_topics_list.end(),
[topic](const TopicData &td){
return topic == std::get<integral(TopicEnum::TopicName)>(td);
});
if ( it_topic != advertised_topics_list.end()){
#ifdef DEBUG
std::cerr << "topic: " << topic << " already advertised" << std::endl;
#endif
return;
}
auto client = it_client->second;
auto ready = std::make_shared<std::atomic_bool>(false);
advertised_topics_list.push_back(std::make_tuple(topic, client_name, client, ready));
std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\"";
if (id.compare("") != 0) if (id.compare("") != 0)
{ {
message += ", \"id\":\"" + id + "\""; message += ", \"id\":\"" + id + "\"";
} }
message = "{" + message + "}"; message = "{" + message + "}";
start(client_name, it->second, message); #ifdef DEBUG
client->on_open = [client_name, message, ready](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [message, ready](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
connection->send(message);
// Wait for rosbridge_server to process the request and mark topic as "ready".
// This could be avoided by waiting for a status message. However, at the time of
// writing rosbridge_server status messages are still experimental.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
ready->store(true);
};
start(client_name, client, message);
} }
#ifdef DEBUG #ifdef DEBUG
else else
...@@ -235,12 +293,83 @@ public: ...@@ -235,12 +293,83 @@ public:
#endif #endif
} }
void unadvertise(const std::string& client_name,
const std::string& topic,
const std::string& id = ""){
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
if (it_client != client_map.end())
{
// Topic advertised?
auto it_topic = std::find_if(advertised_topics_list.begin(),
advertised_topics_list.end(),
[topic](const TopicData &td){
return topic == std::get<integral(TopicEnum::TopicName)>(td);
});
if ( it_topic == advertised_topics_list.end()){
#ifdef DEBUG
std::cerr << "topic: " << topic << " not advertised" << std::endl;
#endif
return;
}
std::string message = "\"op\":\"unadvertise\"";
if (id.compare("") != 0)
{
message += ", \"id\":\"" + id + "\"";
}
message += ", \"topic\":\"" + topic + "\"";
message = "{" + message + "}";
auto client = it_client->second;
auto ready = std::get<integral(TopicEnum::Ready)>(*it_topic);
#ifdef DEBUG
client->on_open = [client_name, message, ready](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [message, ready](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
while ( !ready->load() ){ // Wait for topic to be advertised.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
connection->send(message);
};
start(client_name, client, message);
advertised_topics_list.erase(it_topic);
}
#ifdef DEBUG
else
{
std::cerr << client_name << "has not been created" << std::endl;
}
#endif
}
void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "") void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "")
{ {
std::lock_guard<std::mutex> lk(mutex);
auto it_topic = std::find_if(advertised_topics_list.begin(),
advertised_topics_list.end(),
[&topic](const TopicData &td){
return topic == std::get<integral(TopicEnum::TopicName)>(td);
});
if ( it_topic == advertised_topics_list.end() ){
#ifdef DEBUG
std::cerr << "topic: " << topic << " not yet advertised" << std::endl;
#endif
return;
}
rapidjson::StringBuffer strbuf; rapidjson::StringBuffer strbuf;
rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf); rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
msg.Accept(writer); msg.Accept(writer);
std::string client_name = "publish_client" + topic;
std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString(); std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString();
if (id.compare("") != 0) if (id.compare(""