// RosBridgeClient.h
#pragma once

/*
*  Created on: Apr 16, 2018
*      Author: Poom Pianpak
*/

#include "Simple-WebSocket-Server/client_ws.hpp"

#include "rapidjson/include/rapidjson/document.h"
#include "rapidjson/include/rapidjson/writer.h"
#include "rapidjson/include/rapidjson/stringbuffer.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <vector>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
21 22 23 24

using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
using InMessage =  std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>;

25 26 27 28 29 30
//! Converts an enumerator to the value of its underlying integer type,
//! e.g. to use a scoped enumerator as index for std::get.
template <typename T>
constexpr auto integral(T value) -> typename std::underlying_type<T>::type
{
    using Underlying = typename std::underlying_type<T>::type;
    return static_cast<Underlying>(value);
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
31 32
class RosbridgeWsClient
{
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
    //! Kind of registration stored in an EntryData record.
    enum class EntryType{
        SubscribedTopic,
        AdvertisedTopic,
        AdvertisedService,
    };

    //! One bookkeeping record: what was registered, under which name, and by which client.
    //! The client is held weakly, so a record does not keep the client alive.
    using EntryData = std::tuple<EntryType,
                             std::string /*service/topic name*/,
                             std::string /*client name*/,
                             std::weak_ptr<WsClient> /*client*/>;

    //! Field positions inside EntryData; pass through integral() to obtain a std::get index.
    enum class EntryEnum{
        EntryType = 0,
        ServiceTopicName = 1,
        ClientName = 2,
        WPClient = 3
    };
50

51
  //! Server address, passed verbatim to the constructor of every WsClient created by this class.
  std::string server_port_path;
  //! Named clients created through addClient().
  std::unordered_map<std::string /*client name*/,
                     std::shared_ptr<WsClient> /*client*/> client_map;
  //! Records of all topics/services subscribed or advertised through this object.
  std::deque<EntryData> service_topic_list;
  //! Locked by the member functions that access client_map and service_topic_list.
  std::mutex mutex;
56 57 58 59 60 61 62

  void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
  {
    if (!client->on_open)
    {
#ifdef DEBUG
      client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
63
#else
64
      client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
65 66
#endif

67 68 69
#ifdef DEBUG
        std::cout << client_name << ": Opened connection" << std::endl;
        std::cout << client_name << ": Sending message: " << message << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
70
#endif
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
        connection->send(message);
      };
    }

#ifdef DEBUG
    if (!client->on_message)
    {
      client->on_message = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::InMessage> in_message) {
        std::cout << client_name << ": Message received: " << in_message->string() << std::endl;
      };
    }

    if (!client->on_close)
    {
      client->on_close = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, int status, const std::string & /*reason*/) {
        std::cout << client_name << ": Closed connection with status code " << status << std::endl;
      };
    }

    if (!client->on_error)
    {
      // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings
      client->on_error = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, const SimpleWeb::error_code &ec) {
        std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl;
      };
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
97

98
#endif
99 100
#ifdef DEBUG
    std::thread client_thread([client_name, client]() {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
101
#else
102
    std::thread client_thread([client]() {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
103
#endif
104
      client->start();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
105

106 107
#ifdef DEBUG
      std::cout << client_name << ": Terminated" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
108
#endif
109 110 111 112 113
      client->on_open = NULL;
      client->on_message = NULL;
      client->on_close = NULL;
      client->on_error = NULL;
    });
Valentin Platzgummer's avatar
Valentin Platzgummer committed
114

115 116
    client_thread.detach();
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
117 118

public:
119
  RosbridgeWsClient(const std::string& server_port_path)
120 121 122 123 124 125
  {
    this->server_port_path = server_port_path;
  }

  //! Unsubscribes/unadvertises everything registered through this object, then removes all clients.
  ~RosbridgeWsClient()
  {
    unsubscribeAll();
    unadvertiseAll();
    unadvertiseAllServices();

    // removeClient() erases from client_map. Iterating the map itself while
    // calling it would continue from an invalidated iterator, so work on a
    // snapshot of the names instead.
    std::vector<std::string> client_names;
    {
      std::lock_guard<std::mutex> lk(mutex);
      client_names.reserve(client_map.size());
      for (const auto& entry : client_map)
      {
        client_names.push_back(entry.first);
      }
    }

    for (const auto& client_name : client_names)
    {
      removeClient(client_name);
    }
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
134

135 136 137
 //!
 //! \brief Reports whether a connection to the server is available.
 //! \return Currently always true.
 //! \note The active probe is disabled. The former implementation is kept below,
 //!       commented out: it opened a temporary client and waited 20 ms for its
 //!       on_open handler to fire.
 //!
 bool connected(){

//    auto p = std::make_shared<std::promise<void>>();
//    auto future = p->get_future();

//    auto status_client = std::make_shared<WsClient>(server_port_path);
//    status_client->on_open = [p](std::shared_ptr<WsClient::Connection>) {
//            p->set_value();
//    };

//    std::thread t([status_client]{
//        status_client->start();
//        status_client->on_open = NULL;
//        status_client->on_message = NULL;
//        status_client->on_close = NULL;
//        status_client->on_error = NULL;
//    });

//    auto status = future.wait_for(std::chrono::milliseconds(20));
//    status_client->stop();
//    t.join();
//    bool connected = status == std::future_status::ready;
//    //std::cout << "connected(): " << connected << std::endl;
//    return connected;
     return true;
 }

163 164 165
  // Adds a client to the client_map.
  void addClient(const std::string& client_name)
  {
166
    std::lock_guard<std::mutex> lk(mutex);
167 168 169 170 171 172 173 174 175 176
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it == client_map.end())
    {
      client_map[client_name] = std::make_shared<WsClient>(server_port_path);
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << " has already been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
177
#endif
178 179 180 181
  }

  std::shared_ptr<WsClient> getClient(const std::string& client_name)
  {
182
    std::lock_guard<std::mutex> lk(mutex);
183 184 185 186 187 188 189 190 191 192
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      return it->second;
    }
    return NULL;
  }

  void stopClient(const std::string& client_name)
  {
193
    std::lock_guard<std::mutex> lk(mutex);
194 195 196 197 198 199 200 201 202 203
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      it->second->stop();
      it->second->on_open = NULL;
      it->second->on_message = NULL;
      it->second->on_close = NULL;
      it->second->on_error = NULL;
#ifdef DEBUG
      std::cout << client_name << " has been stopped" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
204
#endif
205 206 207 208 209 210
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << " has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
211
#endif
212 213 214 215
  }

  void removeClient(const std::string& client_name)
  {
216
    std::lock_guard<std::mutex> lk(mutex);
217 218 219 220 221
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      // Stop the client asynchronously in 100 ms.
      // This is to ensure, that all threads involving the client have been launched.
222 223 224 225 226 227 228 229
      std::shared_ptr<WsClient> client = it->second;
#ifdef DEBUG
      std::thread t([client, client_name](){
#else
      std::thread t([client](){
#endif
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
          client->stop();
230 231 232 233
          client->on_open = NULL;
          client->on_message = NULL;
          client->on_close = NULL;
          client->on_error = NULL;
234 235
#ifdef DEBUG
      std::cout << client_name << " has been removed" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
236
#endif
237
      });
238
      it->second.reset();
239 240
      client_map.erase(it);
      t.detach();
241 242 243 244 245 246
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << " has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
247
#endif
248 249
  }

250 251 252 253 254 255
  //!
  //! \brief Returns a string containing all advertised topics.
  //! \return Returns a string containing all advertised topics.
  //!
  //! \note This function will wait until the /rosapi/topics service is available.
  //! \note Call connected() in advance to ensure that a connection was established.
256
  //! \pre Connection must be established, \see \fn connected().
257 258
  //!
  std::string get_advertised_topics(){
259 260 261
      auto pPromise(std::make_shared<std::promise<std::string>>());
      auto future = pPromise->get_future();
      this->callService("/rosapi/topics", [pPromise](
262 263
                        std::shared_ptr<WsClient::Connection> connection,
                        std::shared_ptr<WsClient::InMessage> in_message){
264
          pPromise->set_value(in_message->string());
265 266 267 268 269 270 271 272 273 274 275 276 277 278
          connection->send_close(1000);
      });
      future.wait();
      return future.get();
  }

  //!
  //! \brief Returns a string containing all advertised services.
  //! \return Returns a string containing all advertised services.
  //!
  //! \note This function will wait until the /rosapi/services service is available.
  //! \note Call connected() in advance to ensure that a connection was established.
  //!
  std::string get_advertised_services(){
279 280
      auto pPromise(std::make_shared<std::promise<std::string>>());
      auto future = pPromise->get_future();
281
      // Call /rosapi/services to see if topic is already available.
282
      this->callService("/rosapi/services", [pPromise](
283 284
                        std::shared_ptr<WsClient::Connection> connection,
                        std::shared_ptr<WsClient::InMessage> in_message){
285
          pPromise->set_value(in_message->string());
286 287 288 289 290 291
          connection->send_close(1000);
      });
      future.wait();
      return future.get();
  }

292 293 294 295 296 297 298 299 300
  //! Returns true if \p topic occurs as substring in the response of get_advertised_topics().
  bool topicAvailable(const std::string &topic){
#ifdef DEBUG
      std::cout << "checking if topic " << topic << " is available" << std::endl;
#endif
      const std::string response = this->get_advertised_topics();
      return response.find(topic) != std::string::npos;
  }

301 302 303 304
  // Gets the client from client_map and starts it. Advertising is essentially sending a message.
  // One client per topic!
  void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
  {
305 306 307
    std::lock_guard<std::mutex> lk(mutex);
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
    if (it_client != client_map.end())
308
    {
309 310 311 312
      auto it_ser_top = std::find_if(service_topic_list.begin(),
                                   service_topic_list.end(),
                                   [topic](const EntryData &td){
          return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
313
      });
314
      if ( it_ser_top != service_topic_list.end()){
315 316 317 318 319 320
#ifdef DEBUG
            std::cerr << "topic: " << topic << " already advertised" << std::endl;
#endif
            return;
      }
      auto client = it_client->second;
321 322
      std::weak_ptr<WsClient> wpClient = client;
      service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedTopic, topic, client_name, wpClient));
323

324
      std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\"";
325 326 327 328 329 330
      if (id.compare("") != 0)
      {
        message += ", \"id\":\"" + id + "\"";
      }
      message = "{" + message + "}";

331
#ifdef DEBUG
332
      client->on_open = [this, topic, message, client_name](std::shared_ptr<WsClient::Connection> connection) {
333
#else
334
      client->on_open = [this, topic, message](std::shared_ptr<WsClient::Connection> connection) {
335 336 337 338 339 340 341 342 343 344
#endif

#ifdef DEBUG
        std::cout << client_name << ": Opened connection" << std::endl;
        std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
        connection->send(message);
      };

      start(client_name, client, message);
345 346 347 348 349 350
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << "has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
351
#endif
352
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
353

354 355 356 357 358 359 360
  //!
  //! \brief Unadvertises the topice \p topic
  //! \param topic The topic to be unadvertised
  //! \param id
  //! \pre The topic \p topic must be advertised, \see topicAvailable().
  //!
  void unadvertise(const std::string& topic,
361 362
                   const std::string& id = ""){
      std::lock_guard<std::mutex> lk(mutex);
363 364 365 366 367 368
      auto it_ser_top = std::find_if(service_topic_list.begin(),
                                   service_topic_list.end(),
                                   [topic](const EntryData &td){
          return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
      });
      if ( it_ser_top == service_topic_list.end()){
369
  #ifdef DEBUG
370
          std::cerr << "topic: " << topic << " not advertised" << std::endl;
371
  #endif
372 373
          return;
      }
374

375 376 377
      std::string message = "\"op\":\"unadvertise\"";
      if (id.compare("") != 0)
      {
378
          message += ", \"id\":\"" + id + "\"";
379 380 381
      }
      message += ", \"topic\":\"" + topic + "\"";
      message = "{" + message + "}";
382

383 384
      std::string client_name = "topic_unadvertiser" + topic;
      auto client = std::make_shared<WsClient>(server_port_path);
385
  #ifdef DEBUG
386
      client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
387
  #else
388
      client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
389 390 391 392 393 394
  #endif

  #ifdef DEBUG
          std::cout << client_name << ": Opened connection" << std::endl;
          std::cout << client_name << ": Sending message: " << message << std::endl;
  #endif
395 396
          connection->send(message); // unadvertise
          connection->send_close(1000);
397 398 399
        };

        start(client_name, client, message);
400 401
        service_topic_list.erase(it_ser_top);
  }
402

403 404 405 406 407 408
  void unadvertiseAll(){
      for (auto entry : service_topic_list){
          if ( std::get<integral(EntryEnum::EntryType)>(entry) == EntryType::AdvertisedTopic ){
              unadvertise(std::get<integral(EntryEnum::ServiceTopicName)>(entry));
          }
      }
409 410
  }

411 412 413 414 415 416 417
    //!
    //! \brief Publishes the message \p msg to the topic \p topic.
    //! \param topic The topic to publish the message.
    //! \param msg The message to publish.
    //! \param id
    //! \pre The topic \p topic must be advertised, \see topicAvailable().
    //!
418 419
  void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "")
  {
420
    std::lock_guard<std::mutex> lk(mutex);
421 422 423 424
    auto it_ser_top = std::find_if(service_topic_list.begin(),
                                 service_topic_list.end(),
                                 [&topic](const EntryData &td){
        return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
425
    });
426
    if ( it_ser_top == service_topic_list.end() ){
427 428 429 430 431
#ifdef DEBUG
        std::cerr << "topic: " << topic << " not yet advertised" << std::endl;
#endif
        return;
    }
432 433 434
    rapidjson::StringBuffer strbuf;
    rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
    msg.Accept(writer);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
435

436
    std::string client_name = "publish_client" + topic;
437
    std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
438

439 440 441 442 443
    if (id.compare("") != 0)
    {
      message += ", \"id\":\"" + id + "\"";
    }
    message = "{" + message + "}";
Valentin Platzgummer's avatar
Valentin Platzgummer committed
444

445
    std::shared_ptr<WsClient> publish_client = std::make_shared<WsClient>(server_port_path);
446 447 448 449 450
#ifdef DEBUG
    publish_client->on_open = [message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#else
    publish_client->on_open = [message, client_name](std::shared_ptr<WsClient::Connection> connection) {
#endif
451
#ifdef DEBUG
452 453 454
      std::cout << client_name << ": Opened connection" << std::endl;
      std::cout << client_name << ": Sending message." << std::endl;
      //std::cout << client_name << ": Sending message: " << message << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
455
#endif
456
      connection->send(message);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
457

458 459 460
      // TODO: This could be improved by creating a thread to keep publishing the message instead of closing it right away
      connection->send_close(1000);
    };
Valentin Platzgummer's avatar
Valentin Platzgummer committed
461

462
    start(client_name, publish_client, message);
463 464
  }

465 466 467 468 469 470 471 472 473 474 475 476 477
  //!
  //! \brief Subscribes the client \p client_name to the topic \p topic.
  //! \param client_name
  //! \param topic
  //! \param callback
  //! \param id
  //! \param type
  //! \param throttle_rate
  //! \param queue_length
  //! \param fragment_size
  //! \param compression
  //! \pre The topic \p topic must be advertised, \see topicAvailable().
  //!
478 479
  void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "")
  {
480
    std::lock_guard<std::mutex> lk(mutex);
481 482
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
    if (it_client != client_map.end())
483
    {
484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
        auto it_ser_top = std::find_if(service_topic_list.begin(),
                                     service_topic_list.end(),
                                     [topic](const EntryData &td){
            return topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
        });
        if ( it_ser_top != service_topic_list.end()){
  #ifdef DEBUG
              std::cerr << "topic: " << topic << " already advertised" << std::endl;
  #endif
              return;
        }
        auto client = it_client->second;
        std::weak_ptr<WsClient> wpClient = client;
        service_topic_list.push_back(std::make_tuple(EntryType::SubscribedTopic, topic, client_name, wpClient));

499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524
      std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\"";

      if (id.compare("") != 0)
      {
        message += ", \"id\":\"" + id + "\"";
      }
      if (type.compare("") != 0)
      {
        message += ", \"type\":\"" + type + "\"";
      }
      if (throttle_rate > -1)
      {
        message += ", \"throttle_rate\":" + std::to_string(throttle_rate);
      }
      if (queue_length > -1)
      {
        message += ", \"queue_length\":" + std::to_string(queue_length);
      }
      if (fragment_size > -1)
      {
        message += ", \"fragment_size\":" + std::to_string(fragment_size);
      }
      if (compression.compare("none") == 0 || compression.compare("png") == 0)
      {
        message += ", \"compression\":\"" + compression + "\"";
      }
525
      message = "{" + message + "}";
526 527

      client->on_message = callback;
528
      this->start(client_name, client, message); // subscribe to topic
529 530 531 532 533 534
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << "has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
535
#endif
536 537
  }

538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588
  //!
  //! \brief Unsubscribes from the topic \p topic.
  //! \param topic The topic to unsubscribe from.
  //! \param id Optional id added to the request; omitted when empty.
  //! \pre The topic \p topic must have been subscribed through subscribe().
  //! \note NOTE(review): the request is sent through a temporary client, i.e. over a
  //!       different connection than the one that subscribed - confirm the server honours that.
  //!
  void unsubscribe(const std::string& topic,
                   const std::string& id = ""){
      std::lock_guard<std::mutex> lk(mutex);
      // Match the entry type as well, otherwise an advertised topic or service
      // with the same name would be dropped from the bookkeeping.
      const auto it_ser_top = std::find_if(service_topic_list.begin(),
                                           service_topic_list.end(),
                                           [&topic](const EntryData &td){
          return std::get<integral(EntryEnum::EntryType)>(td) == EntryType::SubscribedTopic
              && topic == std::get<integral(EntryEnum::ServiceTopicName)>(td);
      });
      if ( it_ser_top == service_topic_list.end()){
#ifdef DEBUG
          std::cerr << "topic: " << topic << " not subscribed" << std::endl;
#endif
          return;
      }

      std::string message = "\"op\":\"unsubscribe\"";
      if (!id.empty())
      {
          message += ", \"id\":\"" + id + "\"";
      }
      message += ", \"topic\":\"" + topic + "\"";
      message = "{" + message + "}";

      const std::string client_name = "topic_unsubscriber" + topic;
      auto client = std::make_shared<WsClient>(server_port_path);
#ifdef DEBUG
      client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
          std::cout << client_name << ": Opened connection" << std::endl;
          std::cout << client_name << ": Sending message: " << message << std::endl;
          connection->send(message); // unsubscribe
          connection->send_close(1000);
      };
#else
      client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
          connection->send(message); // unsubscribe
          connection->send_close(1000);
      };
#endif

      start(client_name, client, message);
      service_topic_list.erase(it_ser_top);
  }

  void unsubscribeAll(){
      for (auto entry : service_topic_list){
        if( std::get<integral(EntryEnum::EntryType)>(entry) == EntryType::SubscribedTopic ) {
            unsubscribe(std::get<integral(EntryEnum::ServiceTopicName)>(entry));
        }
      }
  }

589 590
  void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback)
  {
591
    std::lock_guard<std::mutex> lk(mutex);
592 593
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
    if (it_client != client_map.end())
594
    {
595 596 597 598 599 600 601 602 603 604 605 606 607 608 609
        auto it_ser_top = std::find_if(service_topic_list.begin(),
                                  service_topic_list.end(),
                                  [service](const EntryData &td){
         return service == std::get<integral(EntryEnum::ServiceTopicName)>(td);
     });
     if ( it_ser_top != service_topic_list.end()){
#ifdef DEBUG
           std::cerr << "service: " << service << " already advertised" << std::endl;
#endif
           return;
     }
     auto client = it_client->second;
     std::weak_ptr<WsClient> wpClient = client;
     service_topic_list.push_back(std::make_tuple(EntryType::AdvertisedService, service, client_name, wpClient));

610 611
      std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}";

612 613
      it_client->second->on_message = callback;
      start(client_name, it_client->second, message);
614 615 616 617 618 619
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << "has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
620
#endif
621
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
622

623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668
  //!
  //! \brief Unadvertises the service \p service.
  //! \param service The service to be unadvertised.
  //! \pre The service must have been advertised through advertiseService().
  //! \note The request is sent through a temporary client that closes its connection afterwards.
  //!
  void unadvertiseService(const std::string& service){
      std::lock_guard<std::mutex> lk(mutex);
      // Match the entry type as well, otherwise a topic with the same name
      // would be dropped from the bookkeeping.
      const auto it_ser_top = std::find_if(service_topic_list.begin(),
                                           service_topic_list.end(),
                                           [&service](const EntryData &td){
          return std::get<integral(EntryEnum::EntryType)>(td) == EntryType::AdvertisedService
              && service == std::get<integral(EntryEnum::ServiceTopicName)>(td);
      });
      if ( it_ser_top == service_topic_list.end()){
#ifdef DEBUG
          std::cerr << "service: " << service << " not advertised" << std::endl;
#endif
          return;
      }

      std::string message = "\"op\":\"unadvertise_service\"";
      message += ", \"service\":\"" + service + "\"";
      message = "{" + message + "}";

      // The name only labels the DEBUG output of the temporary client.
      const std::string client_name = "service_unadvertiser" + service;
      auto client = std::make_shared<WsClient>(server_port_path);
#ifdef DEBUG
      client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
          std::cout << client_name << ": Opened connection" << std::endl;
          std::cout << client_name << ": Sending message: " << message << std::endl;
          connection->send(message); // unadvertise service
          connection->send_close(1000);
      };
#else
      client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
          connection->send(message); // unadvertise service
          connection->send_close(1000);
      };
#endif

      start(client_name, client, message);
      service_topic_list.erase(it_ser_top);
  }

  void unadvertiseAllServices(){
      for (auto entry : service_topic_list){
        if( std::get<integral(EntryEnum::EntryType)>(entry) == EntryType::AdvertisedService ) {
            unadvertiseService(std::get<integral(EntryEnum::ServiceTopicName)>(entry));
        }
      }
  }

669
  //!
  //! \brief Sends a service_response for \p service through a temporary client.
  //! \param service Name of the service the response belongs to.
  //! \param id Id of the request being answered; always part of the message.
  //! \param result Written as JSON true/false into the "result" field.
  //! \param values Document serialised into the "values" field.
  //!
  void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values)
  {
    // Convert JSON document to string
    rapidjson::StringBuffer strbuf;
    rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
    values.Accept(writer);

    std::string message = "{\"op\":\"service_response\", \"service\":\"";
    message += service;
    message += "\", \"result\":";
    message += result ? "true" : "false";
    // Rosbridge somehow does not allow service_response to be sent without id and values
    // , so we cannot omit them even though the documentation says they are optional.
    message += ", \"id\":\"" + id + "\"";
    message += ", \"values\":";
    message += strbuf.GetString();
    message += "}";

    const std::string client_name = "service_response_client" + service;
    const auto service_response_client = std::make_shared<WsClient>(server_port_path);

#ifdef DEBUG
    service_response_client->on_open = [message, client_name](std::shared_ptr<WsClient::Connection> connection) {
      std::cout << client_name << ": Opened connection" << std::endl;
      std::cout << client_name << ": Sending message: " << message << std::endl;
      connection->send(message);
      connection->send_close(1000);
    };
#else
    service_response_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
      connection->send(message);
      connection->send_close(1000);
    };
#endif

    start(client_name, service_response_client, message);
  }

  //!
  //! \brief Calls the service \p service through a temporary client.
  //! \param service Name of the service to call.
  //! \param callback on_message handler for the response. If empty, a default handler closes the connection.
  //! \param args Service arguments; omitted from the request when the document is null.
  //! \param id Optional id added to the request; omitted when empty.
  //! \param fragment_size Added to the request when greater than -1.
  //! \param compression Added to the request when it is "none" or "png".
  //!
  void callService(const std::string& service, const InMessage& callback, const rapidjson::Document& args = {}, const std::string& id = "", int fragment_size = -1, const std::string& compression = "")
  {
    std::string message = "{\"op\":\"call_service\", \"service\":\"" + service + "\"";

    if (!args.IsNull())
    {
      rapidjson::StringBuffer strbuf;
      rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
      args.Accept(writer);

      message += ", \"args\":";
      message += strbuf.GetString();
    }
    if (!id.empty())
    {
      message += ", \"id\":\"" + id + "\"";
    }
    if (fragment_size > -1)
    {
      message += ", \"fragment_size\":" + std::to_string(fragment_size);
    }
    if (compression == "none" || compression == "png")
    {
      message += ", \"compression\":\"" + compression + "\"";
    }
    message += "}";

    const std::string client_name = "call_service_client" + service;
    const auto call_service_client = std::make_shared<WsClient>(server_port_path);

    if (!callback)
    {
      // Without a callback the response is only logged (DEBUG) and the connection closed.
      call_service_client->on_message = [client_name](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) {
#ifdef DEBUG
        std::cout << client_name << ": Message received: " << in_message->string() << std::endl;
        std::cout << client_name << ": Sending close connection" << std::endl;
#endif
        connection->send_close(1000);
      };
    }
    else
    {
      call_service_client->on_message = callback;
    }

    start(client_name, call_service_client, message);
  }

  //!
  //! \brief Checks if the service \p service is available.
  //! \param service Service name.
  //! \return True if \p service occurs as substring in the response of get_advertised_services(), false otherwise.
  //! \note Don't call this function too frequently. Use waitForService() instead.
  //!
  bool serviceAvailable(const std::string& service)
  {
#ifdef DEBUG
      std::cout << "checking if service " << service << " is available" << std::endl;
#endif
      const std::string response = this->get_advertised_services();
      return response.find(service) != std::string::npos;
  }

  //!
  //! \brief Waits until the service with the name \p service is available.
  //! \param service Service name.
  //! @note This function will block as long as the service is not available.
  //!
  void waitForService(const std::string& service)
  {
775 776 777
      auto stop = std::make_shared<std::atomic_bool>(false);
      waitForService(service, stop);
  }
778

779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
  //!
  //! \brief Waits until the service with the name \p service is available.
  //! \param service Service name.
  //! \param stop Flag to stop waiting.
  //! \note This function blocks until the service is available or \p stop becomes true.
  //!       The service is polled once per second.
  //!
  void waitForService(const std::string& service, const std::shared_ptr<std::atomic_bool> stop)
  {
#ifdef DEBUG
    auto s = std::chrono::high_resolution_clock::now();
    long counter = 0;
#endif
    while( !stop->load() )
    {
#ifdef DEBUG
        ++counter;
#endif
        if ( serviceAvailable(service) ){
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling.
    }
#ifdef DEBUG
    auto e = std::chrono::high_resolution_clock::now();
    std::cout << "waitForService() " << service << " time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(e-s).count()
              << " ms." << std::endl;
    std::cout << "waitForService() " << service << ": number of calls to serviceAvailable: "
              << counter << std::endl;
#endif
  }
810

811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
  //!
  //! \brief Waits until the topic with the name \p topic is available.
  //! \param topic Topic name.
  //! \note This function blocks as long as the topic is not available.
  //!
  void waitForTopic(const std::string& topic)
  {
      // The flag is never set, so the wait cannot be cancelled.
      waitForTopic(topic, std::make_shared<std::atomic_bool>(false));
  }
  //!
  //! \brief Waits until the topic with the name \p topic is available.
  //! \param topic Topic name.
  //! \param stop Flag to stop waiting.
  //! @note This function will block as long as the topic is not available or \p stop is false.
  //!
  void waitForTopic(const std::string& topic, const std::shared_ptr<std::atomic_bool> stop)
  {
829 830 831 832
#ifdef DEBUG
    auto s = std::chrono::high_resolution_clock::now();
    long counter = 0;
#endif
833
    while( !stop->load())
834 835 836 837
    {
#ifdef DEBUG
        ++counter;
#endif
838
        if ( topicAvailable(topic) ){
839 840
            break;
        }
841
        std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // Avoid excessive polling.
842 843 844
    };
#ifdef DEBUG
    auto e = std::chrono::high_resolution_clock::now();
845
    std::cout << "waitForTopic() " << topic << " time: "
846 847
              << std::chrono::duration_cast<std::chrono::milliseconds>(e-s).count()
              << " ms." << std::endl;
848
    std::cout << "waitForTopic() " << topic << ": number of calls to topicAvailable: "
849 850 851
              << counter << std::endl;
#endif
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
852
};