Commit 5c6b9306 authored by Valentin Platzgummer's avatar Valentin Platzgummer

rosbridge works good now

parent 585c0b82
......@@ -45,7 +45,7 @@ public:
bool isRunning();
private:
bool _running;
std::shared_ptr<std::atomic_bool> _stopped;
TypeFactory _typeFactory;
JsonFactory _jsonFactory;
CasePacker _casePacker;
......
......@@ -30,26 +30,30 @@ constexpr typename std::underlying_type<T>::type integral(T value)
class RosbridgeWsClient
{
using TopicData = std::tuple<std::string /*topic*/,
std::string /*client name*/,
std::shared_ptr<WsClient> /*client*/,
std::shared_ptr<std::atomic_bool> /*topic ready*/>;
enum class TopicEnum{
TopicName = 0,
ClientName = 1,
Client = 2,
Ready = 3
};
enum class EntryType{
SubscribedTopic,
AdvertisedTopic,
AdvertisedService,
};
using EntryData = std::tuple<EntryType,
std::string /*service/topic name*/,
std::string /*client name*/,
std::weak_ptr<WsClient> /*client*/>;
enum class EntryEnum{
EntryType = 0,
ServiceTopicName = 1,
ClientName = 2,
WPClient = 3
};
std::string server_port_path;
std::unordered_map<std::string /*client name*/,
std::shared_ptr<WsClient> /*client*/> client_map;
std::deque<TopicData> advertised_topics_list;
std::deque<EntryData> service_topic_list;
std::mutex mutex;
// Set to true on construction and to false on destruction. Used to notify independet threads.
std::shared_ptr<std::atomic_bool> alive;
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
{
if (!client->on_open)
......@@ -112,23 +116,20 @@ class RosbridgeWsClient
}
public:
RosbridgeWsClient(const std::string& server_port_path) :
alive(std::make_shared<std::atomic_bool>(true))
RosbridgeWsClient(const std::string& server_port_path)
{
this->server_port_path = server_port_path;
}
~RosbridgeWsClient()
{
for (auto& topicData : advertised_topics_list){
unadvertise(std::get<integral(TopicEnum::ClientName)>(topicData),
std::get<integral(TopicEnum::TopicName)>(topicData));
}
unsubscribeAll();
unadvertiseAll();
unadvertiseAllServices();
for (auto& client : client_map)
{
removeClient(client.first);
}
alive->store(false);
}
// The execution can take up to 100 ms!
......@@ -225,6 +226,10 @@ public:
#endif
std::this_thread::sleep_for(std::chrono::milliseconds(100));
client->stop();
client->on_open = NULL;
client->on_message = NULL;
client->on_close = NULL;
client->on_error = NULL;
#ifdef DEBUG
std::cout << client_name << " has been removed" << std::endl;
#endif
......@@ -246,16 +251,15 @@ public:
//!
//! \note This function will wait until the /rosapi/topics service is available.
//! \note Call connected() in advance to ensure that a connection was established.
//! \pre Connection must be established, \see \fn connected().
//!
std::string get_advertised_topics(){
this->waitForService("/rosapi/topics");
std::shared_ptr<std::promise<std::string>> promise;
auto future = promise->get_future();
// Call /rosapi/topics to see if topic is already available.
this->callService("/rosapi/topics", [promise](
auto pPromise(std::make_shared<std::promise<std::string>>());
auto future = pPromise->get_future();
this->callService("/rosapi/topics", [pPromise](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
promise->set_value(in_message->string());
pPromise->set_value(in_message->string());
connection->send_close(1000);
});
future.wait();
......@@ -270,20 +274,28 @@ public:
//! \note Call connected() in advance to ensure that a connection was established.
//!
std::string get_advertised_services(){
this->waitForService("/rosapi/services");
std::shared_ptr<std::promise<std::string>> promise;
auto future = promise->get_future();
auto pPromise(std::make_shared<std::promise<std::string>>());
auto future = pPromise->get_future();
// Call /rosapi/services to see if topic is already available.
this->callService("/rosapi/services", [promise](
this->callService("/rosapi/services", [pPromise](
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
promise->set_value(in_message->string());
pPromise->set_value(in_message->string());
connection->send_close(1000);
});
future.wait();
return future.get();
}
bool topicAvailable(const std::string &topic){
#ifdef DEBUG
std::cout << "checking if topic " << topic << " is available" << std::endl;
#endif
std::string advertised_topics = this->get_advertised_topics();
auto pos = advertised_topics.find(topic);
return pos != std::string::npos ? true : false;
}
// Gets the client from client_map and starts it. Advertising is essentially sending a message.
// One client per topic!
void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
......@@ -292,20 +304,20 @@ public:
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
if (it_client != client_map.end())
{
auto it_topic = std::find_if(advertised_topics_list.begin(),
advertised_topics_list.end(),
[topic](const TopicData &td){
return topic == std::get<integral(TopicEnum::TopicName)>(td);
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[topic](const EntryData &td){
return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_topic != advertised_topics_list.end()){
if ( it_ser_top != service_topic_list.end()){
#ifdef DEBUG
std::cerr << "topic: " << topic << " already advertised" << std::endl;
#endif
return;
}
auto client = it_client->second;
auto ready = std::make_shared<std::atomic_bool>(false);
advertised_topics_list.push_back(std::make_tuple(topic, client_name, client, ready));
std::weak_ptr<WsClient> wpClient = client;
service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedTopic, topic, client_name, wpClient));
std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\"";
if (id.compare("") != 0)
......@@ -315,7 +327,7 @@ public:
message = "{" + message + "}";
#ifdef DEBUG
client->on_open = [this, topic, message, ready, client_name](std::shared_ptr<WsClient::Connection> connection) {
client->on_open = [this, topic, message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [this, topic, message, ready](std::shared_ptr<WsClient::Connection> connection) {
#endif
......@@ -325,22 +337,6 @@ public:
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
connection->send(message);
// Now wait until topic is successfully advertised.
while (true){
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::string advertised_topics = this->get_advertised_topics();
auto pos = advertised_topics.find(topic);
if (pos != std::string::npos){ // topic advertised!
#ifdef DEBUG
std::cout << client_name << ": topic " << topic << "successfully advertised" << std::endl;
#endif
break;
}
#ifdef DEBUG
std::cout << client_name << ": waiting for topic " << topic << "to be advertised" << std::endl;
#endif
}
ready->store(true); // Mark topic as successfully advertised.
};
start(client_name, client, message);
......@@ -353,73 +349,79 @@ public:
#endif
}
void unadvertise(const std::string& client_name,
const std::string& topic,
//!
//! \brief Unadvertises the topice \p topic
//! \param topic The topic to be unadvertised
//! \param id
//! \pre The topic \p topic must be advertised, \see topicAvailable().
//!
void unadvertise(const std::string& topic,
const std::string& id = ""){
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
if (it_client != client_map.end())
{
// Topic advertised?
auto it_topic = std::find_if(advertised_topics_list.begin(),
advertised_topics_list.end(),
[topic](const TopicData &td){
return topic == std::get<integral(TopicEnum::TopicName)>(td);
});
if ( it_topic == advertised_topics_list.end()){
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[topic](const EntryData &td){
return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_ser_top == service_topic_list.end()){
#ifdef DEBUG
std::cerr << "topic: " << topic << " not advertised" << std::endl;
std::cerr << "topic: " << topic << " not advertised" << std::endl;
#endif
return;
}
return;
}
std::string message = "\"op\":\"unadvertise\"";
if (id.compare("") != 0)
{
std::string message = "\"op\":\"unadvertise\"";
if (id.compare("") != 0)
{
message += ", \"id\":\"" + id + "\"";
}
message += ", \"topic\":\"" + topic + "\"";
message = "{" + message + "}";
}
message += ", \"topic\":\"" + topic + "\"";
message = "{" + message + "}";
auto client = it_client->second;
auto ready = std::get<integral(TopicEnum::Ready)>(*it_topic);
std::string client_name = "topic_unadvertiser" + topic;
auto client = std::make_shared<WsClient>(server_port_path);
#ifdef DEBUG
client->on_open = [client_name, message, ready](std::shared_ptr<WsClient::Connection> connection) {
client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [message, ready](std::shared_ptr<WsClient::Connection> connection) {
client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
while ( !ready->load() ){ // Wait for topic to be advertised.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
connection->send(message);
connection->send(message); // unadvertise
connection->send_close(1000);
};
start(client_name, client, message);
advertised_topics_list.erase(it_topic);
}
#ifdef DEBUG
else
{
std::cerr << client_name << "has not been created" << std::endl;
}
#endif
service_topic_list.erase(it_ser_top);
}
void unadvertiseAll(){
for (auto entry : service_topic_list){
if ( std::get<integral(EntryEnum::EntryType)>(entry) == EntryType::AdvertisedTopic ){
unadvertise(std::get<integral(EntryEnum::ServiceTopicName)>(entry));
}
}
}
//!
//! \brief Publishes the message \p msg to the topic \p topic.
//! \param topic The topic to publish the message.
//! \param msg The message to publish.
//! \param id
//! \pre The topic \p topic must be advertised, \see topicAvailable().
//!
void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "")
{
std::lock_guard<std::mutex> lk(mutex);
auto it_topic = std::find_if(advertised_topics_list.begin(),
advertised_topics_list.end(),
[&topic](const TopicData &td){
return topic == std::get<integral(TopicEnum::TopicName)>(td);
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[&topic](const EntryData &td){
return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_topic == advertised_topics_list.end() ){
if ( it_ser_top == service_topic_list.end() ){
#ifdef DEBUG
std::cerr << "topic: " << topic << " not yet advertised" << std::endl;
#endif
......@@ -439,16 +441,16 @@ public:
message = "{" + message + "}";
std::shared_ptr<WsClient> publish_client = std::make_shared<WsClient>(server_port_path);
auto ready = std::get<integral(TopicEnum::Ready)>(*it_topic);
publish_client->on_open = [message, client_name, ready](std::shared_ptr<WsClient::Connection> connection) {
#ifdef DEBUG
publish_client->on_open = [message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#else
publish_client->on_open = [message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message." << std::endl;
//std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
while ( !ready->load() ){ // Wait for the topic to be successfully advertised.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
connection->send(message);
// TODO: This could be improved by creating a thread to keep publishing the message instead of closing it right away
......@@ -458,12 +460,40 @@ public:
start(client_name, publish_client, message);
}
//!
//! \brief Subscribes the client \p client_name to the topic \p topic.
//! \param client_name
//! \param topic
//! \param callback
//! \param id
//! \param type
//! \param throttle_rate
//! \param queue_length
//! \param fragment_size
//! \param compression
//! \pre The topic \p topic must be advertised, \see topicAvailable().
//!
void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "")
{
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end())
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
if (it_client != client_map.end())
{
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[topic](const EntryData &td){
return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_ser_top != service_topic_list.end()){
#ifdef DEBUG
std::cerr << "topic: " << topic << " already advertised" << std::endl;
#endif
return;
}
auto client = it_client->second;
std::weak_ptr<WsClient> wpClient = client;
service_topic_list.push_back(std::make_tuple(EntryType::SubscribedTopic, topic, client_name, wpClient));
std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\"";
if (id.compare("") != 0)
......@@ -490,34 +520,10 @@ public:
{
message += ", \"compression\":\"" + compression + "\"";
}
message = "{" + message + "}";
message = "{" + message + "}";
auto client = it->second;
client->on_message = callback;
const auto cAlive = this->alive;
// Start thread to wait for topic to be advertised.
std::thread t([this, client_name, client, message, cAlive, topic](){
while (cAlive->load()){ // RosBridgeClient alive?
auto advertised_topics = this->get_advertised_topics();
auto pos = advertised_topics.find(topic);
if (pos != std::string::npos){ // desired topic available!
#ifdef DEBUG
std::cout << client_name << ": topic " << topic << "available" << std::endl;
#endif
break;
}
#ifdef DEBUG
std::cout << client_name << ": waiting for topic " << topic << std::endl;
#endif
std::this_thread::sleep_for(std::chrono::seconds(1));
}
if ( cAlive->load() ){
this->start(client_name, client, message); // subscribe to topic
}
});
t.detach();
this->start(client_name, client, message); // subscribe to topic
}
#ifdef DEBUG
else
......@@ -527,16 +533,82 @@ public:
#endif
}
void unsubscribe(const std::string& topic,
const std::string& id = ""){
std::lock_guard<std::mutex> lk(mutex);
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[topic](const EntryData &td){
return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_ser_top == service_topic_list.end()){
#ifdef DEBUG
std::cerr << "topic: " << topic << " not advertised" << std::endl;
#endif
return;
}
std::string message = "\"op\":\"unsubscribe\"";
if (id.compare("") != 0)
{
message += ", \"id\":\"" + id + "\"";
}
message += ", \"topic\":\"" + topic + "\"";
message = "{" + message + "}";
std::string client_name = "topic_unsubscriber" + topic;
auto client = std::make_shared<WsClient>(server_port_path);
#ifdef DEBUG
client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
connection->send(message); // unadvertise
connection->send_close(1000);
};
start(client_name, client, message);
service_topic_list.erase(it_ser_top);
}
void unsubscribeAll(){
for (auto entry : service_topic_list){
if( std::get<integral(EntryEnum::EntryType)>(entry) == EntryType::SubscribedTopic ) {
unsubscribe(std::get<integral(EntryEnum::ServiceTopicName)>(entry));
}
}
}
void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback)
{
std::lock_guard<std::mutex> lk(mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end())
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
if (it_client != client_map.end())
{
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[service](const EntryData &td){
return service == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_ser_top != service_topic_list.end()){
#ifdef DEBUG
std::cerr << "service: " << service << " already advertised" << std::endl;
#endif
return;
}
auto client = it_client->second;
std::weak_ptr<WsClient> wpClient = client;
service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedService, service, client_name, wpClient));
std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}";
it->second->on_message = callback;
start(client_name, it->second, message);
it_client->second->on_message = callback;
start(client_name, it_client->second, message);
}
#ifdef DEBUG
else
......@@ -546,6 +618,52 @@ public:
#endif
}
void unadvertiseService(const std::string& service){
std::lock_guard<std::mutex> lk(mutex);
auto it_ser_top = std::find_if(service_topic_list.begin(),
service_topic_list.end(),
[service](const EntryData &td){
return service == std::get<integral(EntryEnum::ServiceTopicName)>(td);
});
if ( it_ser_top == service_topic_list.end()){
#ifdef DEBUG
std::cerr << "service: " << service << " not advertised" << std::endl;
#endif
return;
}
std::string message = "\"op\":\"unadvertise_service\"";
message += ", \"service\":\"" + service + "\"";
message = "{" + message + "}";
std::string client_name = "topic_unsubscriber" + service;
auto client = std::make_shared<WsClient>(server_port_path);
#ifdef DEBUG
client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
#else
client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
connection->send(message); // unadvertise
connection->send_close(1000);
};
start(client_name, client, message);
service_topic_list.erase(it_ser_top);
}
void unadvertiseAllServices(){
for (auto entry : service_topic_list){
if( std::get<integral(EntryEnum::EntryType)>(entry) == EntryType::AdvertisedService ) {
unadvertiseService(std::get<integral(EntryEnum::ServiceTopicName)>(entry));
}
}
}
void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values)
{
std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false");
......@@ -562,19 +680,24 @@ public:
message += ", \"values\":" + std::string(strbuf.GetString());
message = "{" + message + "}";
std::string client_name = "service_response_client" + service;
std::shared_ptr<WsClient> service_response_client = std::make_shared<WsClient>(server_port_path);
#ifdef DEBUG
service_response_client->on_open = [message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#else
service_response_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#endif
#ifdef DEBUG
std::cout << "service_response_client: Opened connection" << std::endl;
std::cout << "service_response_client: Sending message: " << message << std::endl;
std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
connection->send(message);
connection->send_close(1000);
};
start("service_response_client", service_response_client, message);
start(client_name, service_response_client, message);
}
void callService(const std::string& service, const InMessage& callback, const rapidjson::Document& args = {}, const std::string& id = "", int fragment_size = -1, const std::string& compression = "")
......@@ -603,6 +726,7 @@ public:
}
message = "{" + message + "}";
std::string client_name = "call_service_client" + service;
std::shared_ptr<WsClient> call_service_client = std::make_shared<WsClient>(server_port_path);
if (callback)
......@@ -611,16 +735,16 @@ public:
}
else
{
call_service_client->on_message = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) {
call_service_client->on_message = [client_name](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) {
#ifdef DEBUG
std::cout << "call_service_client: Message received: " << in_message->string() << std::endl;
std::cout << "call_service_client: Sending close connection" << std::endl;
std::cout << client_name << ": Message received: " << in_message->string() << std::endl;
std::cout << client_name << ": Sending close connection" << std::endl;
#endif
connection->send_close(1000);
};
}
start("call_service_client", call_service_client, message);
start(client_name, call_service_client, message);
}
//!
......@@ -631,70 +755,12 @@ public:
//!
bool serviceAvailable(const std::string& service)
{
const rapidjson::Document args = {};
const std::string& id = "";
const int fragment_size = -1;
const std::string& compression = "";
std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\"";
std::string client_name("wait_for_service_client");
if (!args.IsNull())
{
rapidjson::StringBuffer strbuf;
rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
args.Accept(writer);
message += ", \"args\":" + std::string(strbuf.GetString());
}
if (id.compare("") != 0)
{
message += ", \"id\":\"" + id + "\"";
}
if (fragment_size > -1)
{
message += ", \"fragment_size\":" + std::to_string(fragment_size);
}
if (compression.compare("none") == 0 || compression.compare("png") == 0)
{
message += ", \"compression\":\"" + compression + "\"";
}
message = "{" + message + "}";
std::shared_ptr<WsClient> wait_for_service_client = std::make_shared<WsClient>(server_port_path);
std::shared_ptr<std::promise<bool>> p(std::make_shared<std::promise<bool>>());
auto future = p->get_future();
#ifdef DEBUG
wait_for_service_client->on_message = [p, service, client_name](
#else
wait_for_service_client->on_message = [p](
#endif
std::shared_ptr<WsClient::Connection> connection,
std::shared_ptr<WsClient::InMessage> in_message){
#ifdef DEBUG
#endif
rapidjson::Document doc;
doc.Parse(in_message->string().c_str());
if ( !doc.HasParseError()
&& doc.HasMember("result")
&& doc["result"].IsBool()
&& doc["result"] == true )
{
#ifdef DEBUG
std::cout << client_name << ": "
<< "service " << service
<< " available." << std::endl;
std::cout << client_name << ": "
<< "message: " << in_message->string()
<< std::endl;
#endif
p->set_value(true);
} else {
p->set_value(false);
}
connection->send_close(1000);
};
start(client_name, wait_for_service_client, message);
return future.get();
std::cout << "checking if service " << service << " is available" << std::endl;
#endif
std::string advertised_services = this->get_advertised_services();
auto pos = advertised_services.find(service);
return pos != std::string::npos ? true : false;
}
//!
......@@ -704,28 +770,80 @@ public:
//!
void waitForService(const std::string& service)
{
auto stop = std::make_shared<std::atomic_bool>(false);
waitForService(service, stop);
}
//!
//! \brief Waits until the service with the name \p service is available.
//! \param service Service name.
//! \param stop Flag to stop waiting.
//! @note This function will block as long as the service is not available or \p stop is false.
//!
void waitForService(const std::string& service, const std::shared_ptr<std::atomic_bool> stop)
{
#ifdef DEBUG
auto s = std::chrono::high_resolution_clock::now();
long counter = 0;
#endif
while( !stop->load() )
{
#ifdef DEBUG
++counter;
#endif
if ( serviceAvailable(service) ){
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling.
};
#ifdef DEBUG
auto e = std::chrono::high_resolution_clock::now();
std::cout << "waitForService() " << service << " time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(e-s).count()
<< " ms." << std::endl;
std::cout << "waitForTopic() " << service << ": number of calls to topicAvailable: "
<< counter << std::endl;
#endif
}
//!
//! \brief Waits until the topic with the name \p topic is available.
//! \param topic Topic name.
//! @note This function will block as long as the topic is not available.
//!
void waitForTopic(const std::string& topic)
{
auto stop = std::make_shared<std::atomic_bool>(false);
waitForTopic(topic, stop);
}
//!
//! \brief Waits until the topic with the name \p topic is available.
//! \param topic Topic name.
//! \param stop Flag to stop waiting.
//! @note This function will block as long as the topic is not available or \p stop is false.
//!
void waitForTopic(const std::string& topic, const std::shared_ptr<std::atomic_bool> stop)
{
#ifdef DEBUG
auto s = std::chrono::high_resolution_clock::now();
long counter = 0;
#endif
while(true)
while( !stop->load())
{
#ifdef DEBUG
++counter;
#endif
if (serviceAvailable(service)){
if ( topicAvailable(topic) ){
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Avoid excessive polling.
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling.
};
#ifdef DEBUG
auto e = std::chrono::high_resolution_clock::now();
std::cout << "waitForService(): time: "
std::cout << "waitForTopic() " << topic << " time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(e-s).count()
<< " ms." << std::endl;
std::cout << "waitForService(): clients launched: "
std::cout << "waitForTopic() " << topic << ": number of calls to topicAvailable: "
<< counter << std::endl;
#endif
}
......
......@@ -5,6 +5,7 @@
ROSBridge::ComPrivate::Server::Server(CasePacker &casePacker, RosbridgeWsClient &rbc) :
_rbc(rbc)
, _casePacker(casePacker)
, _stopped(std::make_shared<std::atomic_bool>(true))
{
}
......@@ -14,10 +15,10 @@ void ROSBridge::ComPrivate::Server::advertiseService(const std::string &service,
const Server::CallbackType &userCallback)
{
std::string clientName = _serviceAdvertiserKey + service;
auto it = std::find(_clientList.begin(), _clientList.end(), clientName);
if (it != _clientList.end())
auto it = _serviceMap.find(clientName);
if (it != _serviceMap.end())
return; // Service allready advertised.
_clientList.push_back(clientName);
_serviceMap.insert(std::make_pair(clientName, service));
_rbc.addClient(clientName);
auto rbc = &_rbc;
......@@ -80,14 +81,16 @@ ROSBridge::ComPrivate::Server::~Server()
void ROSBridge::ComPrivate::Server::start()
{
_running = true;
_stopped->store(false);
}
void ROSBridge::ComPrivate::Server::reset()
{
if (!_running)
if ( _stopped->load() )
return;
for (const auto& str : _clientList)
_rbc.removeClient(str);
_clientList.clear();
for (const auto& item : _serviceMap){
_rbc.unadvertiseService(item.second);
_rbc.removeClient(item.first);
}
_serviceMap.clear();
}
......@@ -5,12 +5,14 @@
#include "ros_bridge/include/TypeFactory.h"
#include "ros_bridge/include/CasePacker.h"
#include <unordered_map>
namespace ROSBridge {
namespace ComPrivate {
class Server
{
typedef std::vector<std::string> ClientList;
typedef std::unordered_map<std::string, std::string> ServiceMap;
public:
typedef std::function<JsonDocUPtr(JsonDocUPtr)> CallbackType;
......@@ -33,8 +35,8 @@ private:
RosbridgeWsClient &_rbc;
CasePacker &_casePacker;
ClientList _clientList;
bool _running;
ServiceMap _serviceMap;
std::shared_ptr<std::atomic_bool> _stopped;
};
} // namespace ComPrivate
} // namespace ROSBridge
#include "TopicPublisher.h"
struct ROSBridge::ComPrivate::ThreadData
{
const ROSBridge::CasePacker &casePacker;
RosbridgeWsClient &rbc;
ROSBridge::ComPrivate::JsonQueue &queue;
std::mutex &queueMutex;
const std::atomic<bool> &running;
std::condition_variable &cv;
};
#include <unordered_map>
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker,
RosbridgeWsClient &rbc) :
_running(false)
_stopped(std::make_shared<std::atomic_bool>(true))
, _casePacker(casePacker)
, _rbc(rbc)
{
......@@ -30,25 +18,65 @@ ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
void ROSBridge::ComPrivate::TopicPublisher::start()
{
if ( _running.load() ) // start called while thread running.
if ( !_stopped->load() ) // start called while thread running.
return;
_running.store(true);
ROSBridge::ComPrivate::ThreadData data{
_casePacker,
_rbc,
_queue,
_queueMutex,
_running,
_cv
};
_pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
_stopped->store(false);
_pThread = std::make_unique<std::thread>([this]{
// Init.
std::unordered_map<std::string, std::string> topicMap;
// Main Loop.
while( !this->_stopped->load()){
std::unique_lock<std::mutex> lk(this->_mutex);
// Check if new data available, wait if not.
if (this->_queue.empty()){
this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
continue;
}
// Pop message from queue.
JsonDocUPtr pJsonDoc(std::move(this->_queue.front()));
this->_queue.pop_front();
lk.unlock();
// Get tag from Json message and remove it.
Tag tag;
bool ret = this->_casePacker.getTag(pJsonDoc, tag);
assert(ret); // Json message does not contain a tag;
(void)ret;
this->_casePacker.removeTag(pJsonDoc);
// Check if topic must be advertised.
// Advertised topics are stored in advertisedTopicsHashList as
// a hash.
std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey
+ tag.topic();
auto it = topicMap.find(clientName);
if ( it == topicMap.end()) { // Need to advertise topic?
topicMap.insert(std::make_pair(clientName, tag.topic()));
this->_rbc.addClient(clientName);
this->_rbc.advertise(clientName,
tag.topic(),
tag.messageType() );
this->_rbc.waitForTopic(tag.topic(), this->_stopped); // Wait until topic is advertised.
}
// Publish Json message.
if ( !this->_stopped->load() )
this->_rbc.publish(tag.topic(), *pJsonDoc.get());
} // while loop
// Tidy up.
for (auto& it : topicMap){
this->_rbc.unadvertise(it.second);
this->_rbc.removeClient(it.first);
}
});
}
void ROSBridge::ComPrivate::TopicPublisher::reset()
{
if ( !_running.load() ) // stop called while thread not running.
if ( _stopped->load() ) // stop called while thread not running.
return;
_running.store(false);
_stopped->store(true);
_cv.notify_one(); // Wake publisher thread.
if ( !_pThread )
return;
......@@ -56,51 +84,3 @@ void ROSBridge::ComPrivate::TopicPublisher::reset()
_queue.clear();
}
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
{
// Init.
ClientMap clientMap;
// Main Loop.
while(data.running.load()){
std::unique_lock<std::mutex> lk(data.queueMutex);
// Check if new data available, wait if not.
if (data.queue.empty()){
data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
continue;
}
// Pop message from queue.
JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
data.queue.pop_front();
lk.unlock();
// Get tag from Json message and remove it.
Tag tag;
bool ret = data.casePacker.getTag(pJsonDoc, tag);
assert(ret); // Json message does not contain a tag;
(void)ret;
data.casePacker.removeTag(pJsonDoc);
// Check if topic must be advertised.
// Advertised topics are stored in advertisedTopicsHashList as
// a hash.
std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey
+ tag.topic();
HashType hash = ROSBridge::ComPrivate::getHash(clientName);
auto it = clientMap.find(hash);
if ( it == clientMap.end()) { // Need to advertise topic?
clientMap.insert(std::make_pair(hash, clientName));
data.rbc.addClient(clientName);
data.rbc.advertise(clientName,
tag.topic(),
tag.messageType() );
}
// Publish Json message.
data.rbc.publish(tag.topic(), *pJsonDoc.get());
} // while loop
// Tidy up.
for (auto& it : clientMap)
data.rbc.removeClient(it.second);
}
......@@ -37,7 +37,7 @@ public:
void publish(JsonDocUPtr docPtr){
{
std::lock_guard<std::mutex> lock(_queueMutex);
std::lock_guard<std::mutex> lock(_mutex);
_queue.push_back(std::move(docPtr));
}
_cv.notify_one(); // Wake publisher thread.
......@@ -51,8 +51,8 @@ public:
private:
JsonQueue _queue;
std::mutex _queueMutex;
std::atomic<bool> _running;
std::mutex _mutex;
std::shared_ptr<std::atomic_bool> _stopped;
CasePacker &_casePacker;
RosbridgeWsClient &_rbc;
CondVar _cv;
......@@ -60,8 +60,5 @@ private:
};
void transmittLoop(ThreadData data);
} // namespace CommunicatorDetail
} // namespace ROSBridge
......@@ -5,7 +5,7 @@ ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
RosbridgeWsClient &rbc) :
_casePacker(casePacker)
, _rbc(rbc)
, _running(false)
, _stopped(std::make_shared<std::atomic_bool>(true))
{
}
......@@ -17,120 +17,78 @@ ROSBridge::ComPrivate::TopicSubscriber::~TopicSubscriber()
void ROSBridge::ComPrivate::TopicSubscriber::start()
{
_running = true;
_stopped->store(false);
}
void ROSBridge::ComPrivate::TopicSubscriber::reset()
{
if ( _running ){
_running = false;
if ( !_stopped->load() ){
_stopped->store(true);
{
for (std::string clientName : _clientList){
_rbc.removeClient(clientName);
for (auto &item : _topicMap){
_rbc.unsubscribe(item.second);
_rbc.removeClient(item.first);
}
}
{
std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
_callbackMapStruct.map.clear();
}
_clientList.clear();
_topicMap.clear();
}
}
bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
const char *topic,
const std::function<void(JsonDocUPtr)> &callback)
{
if ( !_running )
return false;
if ( _stopped->load() )
return;
std::string clientName = ROSBridge::ComPrivate::_topicSubscriberKey
+ std::string(topic);
_clientList.push_back(clientName);
HashType hash = getHash(clientName);
{
std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
auto ret = _callbackMapStruct.map.insert(std::make_pair(hash, callback)); //
if ( !ret.second )
return false; // Topic subscription already present.
auto it = _topicMap.find(clientName);
if ( it != _topicMap.end() ){ // Topic already subscribed?
return;
}
_topicMap.insert(std::make_pair(clientName, std::string(topic)));
using namespace std::placeholders;
auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback,
hash,
std::ref(_callbackMapStruct),
_1, _2);
// std::cout << std::endl;
// std::cout << "topic subscription" << std::endl;
// std::cout << "client name: " << clientName << std::endl;
// std::cout << "topic: " << topic << std::endl;
{
auto start = std::chrono::high_resolution_clock::now();
auto callbackWrapper = [callback](
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message){
// Parse document.
JsonDoc docFull;
docFull.Parse(in_message->string().c_str());
if ( docFull.HasParseError() ) {
std::cout << "Json document has parse error: "
<< in_message->string()
<< std::endl;
return;
} else if (!docFull.HasMember("msg")) {
std::cout << "Json document does not contain a message (\"msg\"): "
<< in_message->string()
<< std::endl;
return;
}
// Extract message and call callback.
JsonDocUPtr pDoc(new JsonDoc());
pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
callback(std::move(pDoc));
return;
};
if ( !_rbc.topicAvailable(topic) ){
// Wait until topic is available.
std::thread t([this, clientName, topic, callbackWrapper]{
this->_rbc.waitForTopic(topic, this->_stopped);
if ( !this->_stopped->load() ){
this->_rbc.addClient(clientName);
this->_rbc.subscribe(clientName, topic, callbackWrapper);
}
});
t.detach();
} else {
_rbc.addClient(clientName);
auto end = std::chrono::high_resolution_clock::now();
std::cout << "add client time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms" << std::endl;
start = std::chrono::high_resolution_clock::now();
_rbc.subscribe(clientName,
topic,
f );
end = std::chrono::high_resolution_clock::now();
std::cout << "subscribe time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms" << std::endl;
_rbc.subscribe(clientName, topic, callbackWrapper);
}
return true;
}
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
void ROSBridge::ComPrivate::subscriberCallback(
const HashType &hash,
ROSBridge::ComPrivate::MapStruct &mapWrapper,
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message)
{
// Parse document.
JsonDoc docFull;
docFull.Parse(in_message->string().c_str());
if ( docFull.HasParseError() ) {
std::cout << "Json document has parse error: "
<< in_message->string()
<< std::endl;
return;
} else if (!docFull.HasMember("msg")) {
std::cout << "Json document does not contain a message (\"msg\"): "
<< in_message->string()
<< std::endl;
return;
}
// std::cout << "Json document: "
// << in_message->string()
// << std::endl;
// Search callback.
CallbackType callback;
{
std::lock_guard<std::mutex> lk(mapWrapper.mutex);
auto it = mapWrapper.map.find(hash);
if (it == mapWrapper.map.end()) {
//assert(false); // callback not found
return;
}
callback = it->second;
}
// Extract message and call callback.
JsonDocUPtr pDoc(new JsonDoc());
pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
callback(std::move(pDoc));
return;
}
......@@ -5,20 +5,17 @@
#include "ros_bridge/include/TypeFactory.h"
#include "ros_bridge/include/CasePacker.h"
#include <unordered_map>
namespace ROSBridge {
namespace ComPrivate {
typedef std::function<void(JsonDocUPtr)> CallbackType;
struct MapStruct{
typedef std::map<HashType, CallbackType> Map;
Map map;
std::mutex mutex;
};
class TopicSubscriber
{
typedef std::vector<std::string> ClientList;
typedef std::unordered_map<std::string, std::string> TopicMap;
public:
TopicSubscriber() = delete;
......@@ -34,7 +31,7 @@ public:
//! @return Returns false if a subscription to this topic allready exists.
//!
//! @note Only one callback can be registered.
bool subscribe(const char* topic, const CallbackType &callback);
void subscribe(const char* topic, const CallbackType &callback);
private:
......@@ -42,14 +39,9 @@ private:
CasePacker &_casePacker;
RosbridgeWsClient &_rbc;
MapStruct _callbackMapStruct;
ClientList _clientList;
bool _running;
TopicMap _topicMap;
std::shared_ptr<std::atomic_bool> _stopped;
};
void subscriberCallback(const HashType &hash,
MapStruct &mapWrapper,
std::shared_ptr<WsClient::Connection> /*connection*/,
std::shared_ptr<WsClient::InMessage> in_message);
} // namespace ComPrivate
} // namespace ROSBridge
#include "ros_bridge/include/ROSBridge.h"
ROSBridge::ROSBridge::ROSBridge() :
_running(false)
_stopped(std::make_shared<std::atomic_bool>(true))
, _casePacker(&_typeFactory, &_jsonFactory)
, _rbc("localhost:9090")
, _topicPublisher(_casePacker, _rbc)
......@@ -38,7 +38,7 @@ void ROSBridge::ROSBridge::start()
_topicPublisher.start();
_topicSubscriber.start();
_server.start();
_running = true;
_stopped->store(false);
}
void ROSBridge::ROSBridge::reset()
......@@ -46,7 +46,7 @@ void ROSBridge::ROSBridge::reset()
_topicPublisher.reset();
_topicSubscriber.reset();
_server.reset();
_running = false;
_stopped->store(true);
}
bool ROSBridge::ROSBridge::ping()
......@@ -56,6 +56,6 @@ bool ROSBridge::ROSBridge::ping()
bool ROSBridge::ROSBridge::isRunning()
{
return _running;
return !_stopped->load();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment