#pragma once /* * Created on: Apr 16, 2018 * Author: Poom Pianpak */ #include "Simple-WebSocket-Server/client_ws.hpp" #include "rapidjson/include/rapidjson/document.h" #include "rapidjson/include/rapidjson/writer.h" #include "rapidjson/include/rapidjson/stringbuffer.h" #include #include #include #include #include #include #include using WsClient = SimpleWeb::SocketClient; using InMessage = std::function, std::shared_ptr)>; template constexpr typename std::underlying_type::type integral(T value) { return static_cast::type>(value); } class RosbridgeWsClient { enum class EntryType{ SubscribedTopic, AdvertisedTopic, AdvertisedService, }; using EntryData = std::tuple /*client*/>; enum class EntryEnum{ EntryType = 0, ServiceTopicName = 1, ClientName = 2, WPClient = 3 }; std::string server_port_path; std::unordered_map /*client*/> client_map; std::deque service_topic_list; std::mutex mutex; void start(const std::string& client_name, std::shared_ptr client, const std::string& message) { if (!client->on_open) { #ifdef DEBUG client->on_open = [client_name, message](std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); }; } #ifdef DEBUG if (!client->on_message) { client->on_message = [client_name](std::shared_ptr /*connection*/, std::shared_ptr in_message) { std::cout << client_name << ": Message received: " << in_message->string() << std::endl; }; } if (!client->on_close) { client->on_close = [client_name](std::shared_ptr /*connection*/, int status, const std::string & /*reason*/) { std::cout << client_name << ": Closed connection with status code " << status << std::endl; }; } if (!client->on_error) { // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings client->on_error = [client_name](std::shared_ptr /*connection*/, const SimpleWeb::error_code &ec) { std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl; }; } #endif #ifdef DEBUG std::thread client_thread([client_name, client]() { #else std::thread client_thread([client]() { #endif client->start(); #ifdef DEBUG std::cout << client_name << ": Terminated" << std::endl; #endif client->on_open = NULL; client->on_message = NULL; client->on_close = NULL; client->on_error = NULL; }); client_thread.detach(); } public: RosbridgeWsClient(const std::string& server_port_path) { this->server_port_path = server_port_path; } ~RosbridgeWsClient() { unsubscribeAll(); unadvertiseAll(); unadvertiseAllServices(); for (auto& client : client_map) { removeClient(client.first); } } // The execution can take up to 100 ms! bool connected(){ // auto p = std::make_shared>(); // auto future = p->get_future(); // auto status_client = std::make_shared(server_port_path); // status_client->on_open = [p](std::shared_ptr) { // p->set_value(); // }; // std::thread t([status_client]{ // status_client->start(); // status_client->on_open = NULL; // status_client->on_message = NULL; // status_client->on_close = NULL; // status_client->on_error = NULL; // }); // auto status = future.wait_for(std::chrono::milliseconds(20)); // status_client->stop(); // t.join(); // bool connected = status == std::future_status::ready; // //std::cout << "connected(): " << connected << std::endl; // return connected; return true; } // Adds a client to the client_map. void addClient(const std::string& client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it == client_map.end()) { client_map[client_name] = std::make_shared(server_port_path); } #ifdef DEBUG else { std::cerr << client_name << " has already been created" << std::endl; } #endif } std::shared_ptr getClient(const std::string& client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { return it->second; } return NULL; } void stopClient(const std::string& client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { it->second->stop(); it->second->on_open = NULL; it->second->on_message = NULL; it->second->on_close = NULL; it->second->on_error = NULL; #ifdef DEBUG std::cout << client_name << " has been stopped" << std::endl; #endif } #ifdef DEBUG else { std::cerr << client_name << " has not been created" << std::endl; } #endif } void removeClient(const std::string& client_name) { std::lock_guard lk(mutex); std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { // Stop the client asynchronously in 100 ms. // This is to ensure, that all threads involving the client have been launched. std::shared_ptr client = it->second; #ifdef DEBUG std::thread t([client, client_name](){ #else std::thread t([client](){ #endif std::this_thread::sleep_for(std::chrono::milliseconds(100)); client->stop(); client->on_open = NULL; client->on_message = NULL; client->on_close = NULL; client->on_error = NULL; #ifdef DEBUG std::cout << client_name << " has been removed" << std::endl; #endif }); it->second.reset(); client_map.erase(it); t.detach(); } #ifdef DEBUG else { std::cerr << client_name << " has not been created" << std::endl; } #endif } //! //! \brief Returns a string containing all advertised topics. //! \return Returns a string containing all advertised topics. //! //! \note This function will wait until the /rosapi/topics service is available. //! \note Call connected() in advance to ensure that a connection was established. //! \pre Connection must be established, \see \fn connected(). //! std::string get_advertised_topics(){ auto pPromise(std::make_shared>()); auto future = pPromise->get_future(); this->callService("/rosapi/topics", [pPromise]( std::shared_ptr connection, std::shared_ptr in_message){ pPromise->set_value(in_message->string()); connection->send_close(1000); }); future.wait(); return future.get(); } //! //! \brief Returns a string containing all advertised services. //! \return Returns a string containing all advertised services. //! //! \note This function will wait until the /rosapi/services service is available. //! \note Call connected() in advance to ensure that a connection was established. //! std::string get_advertised_services(){ auto pPromise(std::make_shared>()); auto future = pPromise->get_future(); // Call /rosapi/services to see if topic is already available. this->callService("/rosapi/services", [pPromise]( std::shared_ptr connection, std::shared_ptr in_message){ pPromise->set_value(in_message->string()); connection->send_close(1000); }); future.wait(); return future.get(); } bool topicAvailable(const std::string &topic){ #ifdef DEBUG std::cout << "checking if topic " << topic << " is available" << std::endl; #endif std::string advertised_topics = this->get_advertised_topics(); auto pos = advertised_topics.find(topic); return pos != std::string::npos ? true : false; } // Gets the client from client_map and starts it. Advertising is essentially sending a message. // One client per topic! void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") { std::lock_guard lk(mutex); std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td){ return topic == std::get(td); }); if ( it_ser_top != service_topic_list.end()){ #ifdef DEBUG std::cerr << "topic: " << topic << " already advertised" << std::endl; #endif return; } auto client = it_client->second; std::weak_ptr wpClient = client; service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedTopic, topic, client_name, wpClient)); std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message = "{" + message + "}"; #ifdef DEBUG client->on_open = [this, topic, message, client_name](std::shared_ptr connection) { #else client->on_open = [this, topic, message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); }; start(client_name, client, message); } #ifdef DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } //! //! \brief Unadvertises the topice \p topic //! \param topic The topic to be unadvertised //! \param id //! \pre The topic \p topic must be advertised, \see topicAvailable(). //! void unadvertise(const std::string& topic, const std::string& id = ""){ std::lock_guard lk(mutex); auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td){ return topic == std::get(td); }); if ( it_ser_top == service_topic_list.end()){ #ifdef DEBUG std::cerr << "topic: " << topic << " not advertised" << std::endl; #endif return; } std::string message = "\"op\":\"unadvertise\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message += ", \"topic\":\"" + topic + "\""; message = "{" + message + "}"; std::string client_name = "topic_unadvertiser" + topic; auto client = std::make_shared(server_port_path); #ifdef DEBUG client->on_open = [client_name, message](std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // unadvertise connection->send_close(1000); }; start(client_name, client, message); service_topic_list.erase(it_ser_top); } void unadvertiseAll(){ for (auto entry : service_topic_list){ if ( std::get(entry) == EntryType::AdvertisedTopic ){ unadvertise(std::get(entry)); } } } //! //! \brief Publishes the message \p msg to the topic \p topic. //! \param topic The topic to publish the message. //! \param msg The message to publish. //! \param id //! \pre The topic \p topic must be advertised, \see topicAvailable(). //! void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "") { std::lock_guard lk(mutex); auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [&topic](const EntryData &td){ return topic == std::get(td); }); if ( it_ser_top == service_topic_list.end() ){ #ifdef DEBUG std::cerr << "topic: " << topic << " not yet advertised" << std::endl; #endif return; } rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); msg.Accept(writer); std::string client_name = "publish_client" + topic; std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString(); if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message = "{" + message + "}"; std::shared_ptr publish_client = std::make_shared(server_port_path); #ifdef DEBUG publish_client->on_open = [message, client_name](std::shared_ptr connection) { #else publish_client->on_open = [message, client_name](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message." << std::endl; //std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // TODO: This could be improved by creating a thread to keep publishing the message instead of closing it right away connection->send_close(1000); }; start(client_name, publish_client, message); } //! //! \brief Subscribes the client \p client_name to the topic \p topic. //! \param client_name //! \param topic //! \param callback //! \param id //! \param type //! \param throttle_rate //! \param queue_length //! \param fragment_size //! \param compression //! \pre The topic \p topic must be advertised, \see topicAvailable(). //! void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "") { std::lock_guard lk(mutex); std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td){ return topic == std::get(td); }); if ( it_ser_top != service_topic_list.end()){ #ifdef DEBUG std::cerr << "topic: " << topic << " already advertised" << std::endl; #endif return; } auto client = it_client->second; std::weak_ptr wpClient = client; service_topic_list.push_back(std::make_tuple(EntryType::SubscribedTopic, topic, client_name, wpClient)); std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } if (type.compare("") != 0) { message += ", \"type\":\"" + type + "\""; } if (throttle_rate > -1) { message += ", \"throttle_rate\":" + std::to_string(throttle_rate); } if (queue_length > -1) { message += ", \"queue_length\":" + std::to_string(queue_length); } if (fragment_size > -1) { message += ", \"fragment_size\":" + std::to_string(fragment_size); } if (compression.compare("none") == 0 || compression.compare("png") == 0) { message += ", \"compression\":\"" + compression + "\""; } message = "{" + message + "}"; client->on_message = callback; this->start(client_name, client, message); // subscribe to topic } #ifdef DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void unsubscribe(const std::string& topic, const std::string& id = ""){ std::lock_guard lk(mutex); auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [topic](const EntryData &td){ return topic == std::get(td); }); if ( it_ser_top == service_topic_list.end()){ #ifdef DEBUG std::cerr << "topic: " << topic << " not advertised" << std::endl; #endif return; } std::string message = "\"op\":\"unsubscribe\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message += ", \"topic\":\"" + topic + "\""; message = "{" + message + "}"; std::string client_name = "topic_unsubscriber" + topic; auto client = std::make_shared(server_port_path); #ifdef DEBUG client->on_open = [client_name, message](std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // unadvertise connection->send_close(1000); }; start(client_name, client, message); service_topic_list.erase(it_ser_top); } void unsubscribeAll(){ for (auto entry : service_topic_list){ if( std::get(entry) == EntryType::SubscribedTopic ) { unsubscribe(std::get(entry)); } } } void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback) { std::lock_guard lk(mutex); std::unordered_map>::iterator it_client = client_map.find(client_name); if (it_client != client_map.end()) { auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [service](const EntryData &td){ return service == std::get(td); }); if ( it_ser_top != service_topic_list.end()){ #ifdef DEBUG std::cerr << "service: " << service << " already advertised" << std::endl; #endif return; } auto client = it_client->second; std::weak_ptr wpClient = client; service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedService, service, client_name, wpClient)); std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}"; it_client->second->on_message = callback; start(client_name, it_client->second, message); } #ifdef DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void unadvertiseService(const std::string& service){ std::lock_guard lk(mutex); auto it_ser_top = std::find_if(service_topic_list.begin(), service_topic_list.end(), [service](const EntryData &td){ return service == std::get(td); }); if ( it_ser_top == service_topic_list.end()){ #ifdef DEBUG std::cerr << "service: " << service << " not advertised" << std::endl; #endif return; } std::string message = "\"op\":\"unadvertise_service\""; message += ", \"service\":\"" + service + "\""; message = "{" + message + "}"; std::string client_name = "topic_unsubscriber" + service; auto client = std::make_shared(server_port_path); #ifdef DEBUG client->on_open = [client_name, message](std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); // unadvertise connection->send_close(1000); }; start(client_name, client, message); service_topic_list.erase(it_ser_top); } void unadvertiseAllServices(){ for (auto entry : service_topic_list){ if( std::get(entry) == EntryType::AdvertisedService ) { unadvertiseService(std::get(entry)); } } } void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values) { std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false"); // Rosbridge somehow does not allow service_response to be sent without id and values // , so we cannot omit them even though the documentation says they are optional. message += ", \"id\":\"" + id + "\""; // Convert JSON document to string rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); values.Accept(writer); message += ", \"values\":" + std::string(strbuf.GetString()); message = "{" + message + "}"; std::string client_name = "service_response_client" + service; std::shared_ptr service_response_client = std::make_shared(server_port_path); #ifdef DEBUG service_response_client->on_open = [message, client_name](std::shared_ptr connection) { #else service_response_client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); connection->send_close(1000); }; start(client_name, service_response_client, message); } void callService(const std::string& service, const InMessage& callback, const rapidjson::Document& args = {}, const std::string& id = "", int fragment_size = -1, const std::string& compression = "") { std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\""; if (!args.IsNull()) { rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); args.Accept(writer); message += ", \"args\":" + std::string(strbuf.GetString()); } if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } if (fragment_size > -1) { message += ", \"fragment_size\":" + std::to_string(fragment_size); } if (compression.compare("none") == 0 || compression.compare("png") == 0) { message += ", \"compression\":\"" + compression + "\""; } message = "{" + message + "}"; std::string client_name = "call_service_client" + service; std::shared_ptr call_service_client = std::make_shared(server_port_path); if (callback) { call_service_client->on_message = callback; } else { call_service_client->on_message = [client_name](std::shared_ptr connection, std::shared_ptr in_message) { #ifdef DEBUG std::cout << client_name << ": Message received: " << in_message->string() << std::endl; std::cout << client_name << ": Sending close connection" << std::endl; #endif connection->send_close(1000); }; } start(client_name, call_service_client, message); } //! //! \brief Checks if the service \p service is available. //! \param service Service name. //! \return Returns true if the service is available, false either. //! \note Don't call this function to frequently. Use \fn waitForService() instead. //! bool serviceAvailable(const std::string& service) { #ifdef DEBUG std::cout << "checking if service " << service << " is available" << std::endl; #endif std::string advertised_services = this->get_advertised_services(); auto pos = advertised_services.find(service); return pos != std::string::npos ? true : false; } //! //! \brief Waits until the service with the name \p service is available. //! \param service Service name. //! @note This function will block as long as the service is not available. //! void waitForService(const std::string& service) { auto stop = std::make_shared(false); waitForService(service, stop); } //! //! \brief Waits until the service with the name \p service is available. //! \param service Service name. //! \param stop Flag to stop waiting. //! @note This function will block as long as the service is not available or \p stop is false. //! void waitForService(const std::string& service, const std::shared_ptr stop) { #ifdef DEBUG auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif while( !stop->load() ) { #ifdef DEBUG ++counter; #endif if ( serviceAvailable(service) ){ break; } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling. }; #ifdef DEBUG auto e = std::chrono::high_resolution_clock::now(); std::cout << "waitForService() " << service << " time: " << std::chrono::duration_cast(e-s).count() << " ms." << std::endl; std::cout << "waitForTopic() " << service << ": number of calls to topicAvailable: " << counter << std::endl; #endif } //! //! \brief Waits until the topic with the name \p topic is available. //! \param topic Topic name. //! @note This function will block as long as the topic is not available. //! void waitForTopic(const std::string& topic) { auto stop = std::make_shared(false); waitForTopic(topic, stop); } //! //! \brief Waits until the topic with the name \p topic is available. //! \param topic Topic name. //! \param stop Flag to stop waiting. //! @note This function will block as long as the topic is not available or \p stop is false. //! void waitForTopic(const std::string& topic, const std::shared_ptr stop) { #ifdef DEBUG auto s = std::chrono::high_resolution_clock::now(); long counter = 0; #endif while( !stop->load()) { #ifdef DEBUG ++counter; #endif if ( topicAvailable(topic) ){ break; } std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling. }; #ifdef DEBUG auto e = std::chrono::high_resolution_clock::now(); std::cout << "waitForTopic() " << topic << " time: " << std::chrono::duration_cast(e-s).count() << " ms." << std::endl; std::cout << "waitForTopic() " << topic << ": number of calls to topicAvailable: " << counter << std::endl; #endif } };