#ifndef MAIN_H #define MAIN_H /* * Created on: Apr 16, 2018 * Author: Poom Pianpak */ #include "Simple-WebSocket-Server/client_ws.hpp" #include "rapidjson/include/rapidjson/document.h" #include "rapidjson/include/rapidjson/writer.h" #include "rapidjson/include/rapidjson/stringbuffer.h" #include "rapidjson/include/rapidjson/ostreamwrapper.h" #include #include #include using WsClient = SimpleWeb::SocketClient; using InMessage = std::function, std::shared_ptr)>; class RosbridgeWsClient { std::string server_port_path; std::unordered_map> client_map; void start(const std::string& client_name, std::shared_ptr client, const std::string& message) { if (!client->on_open) { #ifdef DEBUG client->on_open = [client_name, message](std::shared_ptr connection) { #else client->on_open = [message](std::shared_ptr connection) { #endif #ifdef DEBUG std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl; #endif connection->send(message); }; } #ifdef DEBUG if (!client->on_message) { client->on_message = [client_name](std::shared_ptr /*connection*/, std::shared_ptr in_message) { std::cout << client_name << ": Message received: " << in_message->string() << std::endl; }; } if (!client->on_close) { client->on_close = [client_name](std::shared_ptr /*connection*/, int status, const std::string & /*reason*/) { std::cout << client_name << ": Closed connection with status code " << status << std::endl; }; } if (!client->on_error) { // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings client->on_error = [client_name](std::shared_ptr /*connection*/, const SimpleWeb::error_code &ec) { std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl; }; } #endif #ifdef DEBUG std::thread client_thread([client_name, client]() { #else std::thread client_thread([client]() { #endif client->start(); #ifdef DEBUG std::cout << client_name << ": Terminated" << std::endl; #endif client->on_open = NULL; client->on_message = NULL; client->on_close = NULL; client->on_error = NULL; }); client_thread.detach(); // This is to make sure that the thread got fully launched before we do anything to it (e.g. remove) std::this_thread::sleep_for(std::chrono::milliseconds(100)); } public: RosbridgeWsClient(const std::string& server_port_path) { this->server_port_path = server_port_path; } ~RosbridgeWsClient() { for (auto& client : client_map) { client.second->stop(); client.second.reset(); } } void addClient(const std::string& client_name) { std::unordered_map>::iterator it = client_map.find(client_name); if (it == client_map.end()) { client_map[client_name] = std::make_shared(server_port_path); } #ifdef DEBUG else { std::cerr << client_name << " has already been created" << std::endl; } #endif } std::shared_ptr getClient(const std::string& client_name) { std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { return it->second; } return NULL; } void stopClient(const std::string& client_name) { std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { it->second->stop(); it->second->on_open = NULL; it->second->on_message = NULL; it->second->on_close = NULL; it->second->on_error = NULL; #ifdef DEBUG std::cout << client_name << " has been stopped" << std::endl; #endif } #ifdef DEBUG else { std::cerr << client_name << " has not been created" << std::endl; } #endif } void removeClient(const std::string& client_name) { std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { it->second->stop(); it->second.reset(); client_map.erase(it); #ifdef DEBUG std::cout << client_name << " has been removed" << std::endl; #endif } #ifdef DEBUG else { std::cerr << client_name << " has not been created" << std::endl; } #endif } void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") { std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message = "{" + message + "}"; start(client_name, it->second, message); } #ifdef DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "") { rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); msg.Accept(writer); std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString(); if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } message = "{" + message + "}"; std::shared_ptr publish_client = std::make_shared(server_port_path); publish_client->on_open = [message](std::shared_ptr connection) { #ifdef DEBUG std::cout << "publish_client: Opened connection" << std::endl; std::cout << "publish_client: Sending message: " << message << std::endl; #endif connection->send(message); // TODO: This could be improved by creating a thread to keep publishing the message instead of closing it right away connection->send_close(1000); }; start("publish_client", publish_client, message); } void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "") { std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\""; if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } if (type.compare("") != 0) { message += ", \"type\":\"" + type + "\""; } if (throttle_rate > -1) { message += ", \"throttle_rate\":" + std::to_string(throttle_rate); } if (queue_length > -1) { message += ", \"queue_length\":" + std::to_string(queue_length); } if (fragment_size > -1) { message += ", \"fragment_size\":" + std::to_string(fragment_size); } if (compression.compare("none") == 0 || compression.compare("png") == 0) { message += ", \"compression\":\"" + compression + "\""; } message = "{" + message + "}"; it->second->on_message = callback; start(client_name, it->second, message); } #ifdef DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback) { std::unordered_map>::iterator it = client_map.find(client_name); if (it != client_map.end()) { std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}"; it->second->on_message = callback; start(client_name, it->second, message); } #ifdef DEBUG else { std::cerr << client_name << "has not been created" << std::endl; } #endif } void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values = {}) { std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false"); // Rosbridge somehow does not allow service_response to be sent without id and values // , so we cannot omit them even though the documentation says they are optional. message += ", \"id\":\"" + id + "\""; // Convert JSON document to string rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); values.Accept(writer); message += ", \"values\":" + std::string(strbuf.GetString()); message = "{" + message + "}"; std::shared_ptr service_response_client = std::make_shared(server_port_path); service_response_client->on_open = [message](std::shared_ptr connection) { #ifdef DEBUG std::cout << "service_response_client: Opened connection" << std::endl; std::cout << "service_response_client: Sending message: " << message << std::endl; #endif connection->send(message); connection->send_close(1000); }; start("service_response_client", service_response_client, message); } void callService(const std::string& service, const InMessage& callback, const rapidjson::Document& args = {}, const std::string& id = "", int fragment_size = -1, const std::string& compression = "") { std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\""; if (!args.IsNull()) { rapidjson::StringBuffer strbuf; rapidjson::Writer writer(strbuf); args.Accept(writer); message += ", \"args\":" + std::string(strbuf.GetString()); } if (id.compare("") != 0) { message += ", \"id\":\"" + id + "\""; } if (fragment_size > -1) { message += ", \"fragment_size\":" + std::to_string(fragment_size); } if (compression.compare("none") == 0 || compression.compare("png") == 0) { message += ", \"compression\":\"" + compression + "\""; } message = "{" + message + "}"; std::shared_ptr call_service_client = std::make_shared(server_port_path); if (callback) { call_service_client->on_message = callback; } else { call_service_client->on_message = [](std::shared_ptr connection, std::shared_ptr in_message) { #ifdef DEBUG std::cout << "call_service_client: Message received: " << in_message->string() << std::endl; std::cout << "call_service_client: Sending close connection" << std::endl; #endif connection->send_close(1000); }; } start("call_service_client", call_service_client, message); } }; #endif // MAIN_H