// RosBridgeClient.h
#pragma once

/*
*  Created on: Apr 16, 2018
*      Author: Poom Pianpak
*/

#include "Simple-WebSocket-Server/client_ws.hpp"

#include "rapidjson/include/rapidjson/document.h"
#include "rapidjson/include/rapidjson/writer.h"
#include "rapidjson/include/rapidjson/stringbuffer.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

// WebSocket client type used for every connection opened by this header.
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
// Signature of a message handler: it receives the connection the message
// arrived on and the incoming message itself.
using InMessage =  std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>;

// Converts an enumerator to the value of its underlying integer type.
// Usable in constant expressions, e.g. as the index argument of std::get<>.
template <typename T>
constexpr auto integral(T value) -> typename std::underlying_type<T>::type
{
    using Underlying = typename std::underlying_type<T>::type;
    return static_cast<Underlying>(value);
}

class RosbridgeWsClient
{
33 34 35 36 37 38 39 40 41 42 43
  using TopicData = std::tuple<std::string /*topic*/,
                               std::string /*client name*/,
                               std::shared_ptr<WsClient> /*client*/,
                               std::shared_ptr<std::atomic_bool> /*topic ready*/>;
  enum class TopicEnum{
      TopicName = 0,
      ClientName = 1,
      Client = 2,
      Ready = 3
  };

44
  std::string server_port_path;
45 46 47 48
  std::unordered_map<std::string /*client name*/,
                     std::shared_ptr<WsClient> /*client*/> client_map;
  std::deque<TopicData> advertised_topics_list;
  std::mutex mutex;
49 50 51 52 53 54 55

  void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
  {
    if (!client->on_open)
    {
#ifdef DEBUG
      client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
56
#else
57
      client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
58 59
#endif

60 61 62
#ifdef DEBUG
        std::cout << client_name << ": Opened connection" << std::endl;
        std::cout << client_name << ": Sending message: " << message << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
63
#endif
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
        connection->send(message);
      };
    }

#ifdef DEBUG
    if (!client->on_message)
    {
      client->on_message = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::InMessage> in_message) {
        std::cout << client_name << ": Message received: " << in_message->string() << std::endl;
      };
    }

    if (!client->on_close)
    {
      client->on_close = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, int status, const std::string & /*reason*/) {
        std::cout << client_name << ": Closed connection with status code " << status << std::endl;
      };
    }

    if (!client->on_error)
    {
      // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings
      client->on_error = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, const SimpleWeb::error_code &ec) {
        std::cout << client_name << ": Error: " << ec << ", error message: " << ec.message() << std::endl;
      };
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
90

91
#endif
92 93
#ifdef DEBUG
    std::thread client_thread([client_name, client]() {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
94
#else
95
    std::thread client_thread([client]() {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
96
#endif
97
      client->start();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
98

99 100
#ifdef DEBUG
      std::cout << client_name << ": Terminated" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
101
#endif
102 103 104 105 106
      client->on_open = NULL;
      client->on_message = NULL;
      client->on_close = NULL;
      client->on_error = NULL;
    });
Valentin Platzgummer's avatar
Valentin Platzgummer committed
107

108 109
    client_thread.detach();
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
110 111

public:
112 113 114 115 116 117 118
  RosbridgeWsClient(const std::string& server_port_path)
  {
    this->server_port_path = server_port_path;
  }

  ~RosbridgeWsClient()
  {
119 120 121 122
    for (auto& topicData : advertised_topics_list){
        unadvertise(std::get<integral(TopicEnum::ClientName)>(topicData),
                    std::get<integral(TopicEnum::TopicName)>(topicData));
    }
123 124
    for (auto& client : client_map)
    {
125
      removeClient(client.first);
126 127
    }
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
128

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
 // The execution can take up to 100 ms!
 bool connected(){

    auto p = std::make_shared<std::promise<void>>();
    auto future = p->get_future();

    auto callback = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<std::promise<void>> p) {
        p->set_value();
        connection->send_close(1000);
    };
    std::shared_ptr<WsClient> status_client = std::make_shared<WsClient>(server_port_path);
    status_client->on_open = std::bind(callback, std::placeholders::_1, p);

    std::async([status_client]{

        status_client->start();
        status_client->on_open = NULL;
        status_client->on_message = NULL;
        status_client->on_close = NULL;
        status_client->on_error = NULL;

    });

    auto status = future.wait_for(std::chrono::milliseconds(100));
    return status == std::future_status::ready;
 }

156 157 158
  // Adds a client to the client_map.
  void addClient(const std::string& client_name)
  {
159
    std::lock_guard<std::mutex> lk(mutex);
160 161 162 163 164 165 166 167 168 169
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it == client_map.end())
    {
      client_map[client_name] = std::make_shared<WsClient>(server_port_path);
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << " has already been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
170
#endif
171 172 173 174
  }

  std::shared_ptr<WsClient> getClient(const std::string& client_name)
  {
175
    std::lock_guard<std::mutex> lk(mutex);
176 177 178 179 180 181 182 183 184 185
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      return it->second;
    }
    return NULL;
  }

  void stopClient(const std::string& client_name)
  {
186
    std::lock_guard<std::mutex> lk(mutex);
187 188 189 190 191 192 193 194 195 196
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      it->second->stop();
      it->second->on_open = NULL;
      it->second->on_message = NULL;
      it->second->on_close = NULL;
      it->second->on_error = NULL;
#ifdef DEBUG
      std::cout << client_name << " has been stopped" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
197
#endif
198 199 200 201 202 203
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << " has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
204
#endif
205 206 207 208
  }

  void removeClient(const std::string& client_name)
  {
209
    std::lock_guard<std::mutex> lk(mutex);
210 211 212 213 214
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      // Stop the client asynchronously in 100 ms.
      // This is to ensure, that all threads involving the client have been launched.
215 216 217 218 219 220 221 222
      std::shared_ptr<WsClient> client = it->second;
#ifdef DEBUG
      std::thread t([client, client_name](){
#else
      std::thread t([client](){
#endif
          std::this_thread::sleep_for(std::chrono::milliseconds(100));
          client->stop();
223 224
#ifdef DEBUG
      std::cout << client_name << " has been removed" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
225
#endif
226 227 228
      });
      client_map.erase(it);
      t.detach();
229 230 231 232 233 234
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << " has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
235
#endif
236 237 238 239 240 241
  }

  // Gets the client from client_map and starts it. Advertising is essentially sending a message.
  // One client per topic!
  void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
  {
242 243 244
    std::lock_guard<std::mutex> lk(mutex);
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
    if (it_client != client_map.end())
245
    {
246 247 248 249 250 251 252 253 254 255 256 257 258 259
      auto it_topic = std::find_if(advertised_topics_list.begin(),
                                   advertised_topics_list.end(),
                                   [topic](const TopicData &td){
          return topic == std::get<integral(TopicEnum::TopicName)>(td);
      });
      if ( it_topic != advertised_topics_list.end()){
#ifdef DEBUG
            std::cerr << "topic: " << topic << " already advertised" << std::endl;
#endif
            return;
      }
      auto client = it_client->second;
      auto ready = std::make_shared<std::atomic_bool>(false);
      advertised_topics_list.push_back(std::make_tuple(topic, client_name, client, ready));
260

261
      std::string message = "\"op\":\"advertise\", \"topic\":\"" + topic + "\", \"type\":\"" + type + "\"";
262 263 264 265 266 267
      if (id.compare("") != 0)
      {
        message += ", \"id\":\"" + id + "\"";
      }
      message = "{" + message + "}";

268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
#ifdef DEBUG
      client->on_open = [client_name, message, ready](std::shared_ptr<WsClient::Connection> connection) {
#else
      client->on_open = [message, ready](std::shared_ptr<WsClient::Connection> connection) {
#endif

#ifdef DEBUG
        std::cout << client_name << ": Opened connection" << std::endl;
        std::cout << client_name << ": Sending message: " << message << std::endl;
#endif
        connection->send(message);
        // Wait for rosbridge_server to process the request and mark topic as "ready".
        // This could be avoided by waiting for a status message. However, at the time of
        // writing rosbridge_server status messages are still experimental.
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
        ready->store(true);
      };

      start(client_name, client, message);
287 288 289 290 291 292
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << "has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
293
#endif
294
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
295

296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
  void unadvertise(const std::string& client_name,
                   const std::string& topic,
                   const std::string& id = ""){
      std::lock_guard<std::mutex> lk(mutex);
      std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it_client = client_map.find(client_name);
      if (it_client != client_map.end())
      {
          // Topic advertised?
          auto it_topic = std::find_if(advertised_topics_list.begin(),
                                       advertised_topics_list.end(),
                                       [topic](const TopicData &td){
              return topic == std::get<integral(TopicEnum::TopicName)>(td);
          });
        if ( it_topic == advertised_topics_list.end()){
  #ifdef DEBUG
              std::cerr << "topic: " << topic << " not advertised" << std::endl;
  #endif
              return;
        }

        std::string message = "\"op\":\"unadvertise\"";
        if (id.compare("") != 0)
        {
          message += ", \"id\":\"" + id + "\"";
        }
        message += ", \"topic\":\"" + topic + "\"";
        message = "{" + message + "}";

        auto client = it_client->second;
        auto ready = std::get<integral(TopicEnum::Ready)>(*it_topic);
  #ifdef DEBUG
        client->on_open = [client_name, message, ready](std::shared_ptr<WsClient::Connection> connection) {
  #else
        client->on_open = [message, ready](std::shared_ptr<WsClient::Connection> connection) {
  #endif

  #ifdef DEBUG
          std::cout << client_name << ": Opened connection" << std::endl;
          std::cout << client_name << ": Sending message: " << message << std::endl;
  #endif
          while ( !ready->load() ){ // Wait for topic to be advertised.
              std::this_thread::sleep_for(std::chrono::milliseconds(500));
          }
          connection->send(message);
        };

        start(client_name, client, message);
        advertised_topics_list.erase(it_topic);
      }
  #ifdef DEBUG
      else
      {
        std::cerr << client_name << "has not been created" << std::endl;
      }
  #endif

  }

354 355
  void publish(const std::string& topic, const rapidjson::Document& msg, const std::string& id = "")
  {
356 357 358 359 360 361 362 363 364 365 366 367
    std::lock_guard<std::mutex> lk(mutex);
    auto it_topic = std::find_if(advertised_topics_list.begin(),
                                 advertised_topics_list.end(),
                                 [&topic](const TopicData &td){
        return topic == std::get<integral(TopicEnum::TopicName)>(td);
    });
    if ( it_topic == advertised_topics_list.end() ){
#ifdef DEBUG
        std::cerr << "topic: " << topic << " not yet advertised" << std::endl;
#endif
        return;
    }
368 369 370
    rapidjson::StringBuffer strbuf;
    rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
    msg.Accept(writer);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
371

372
    std::string client_name = "publish_client" + topic;
373
    std::string message = "\"op\":\"publish\", \"topic\":\"" + topic + "\", \"msg\":" + strbuf.GetString();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
374

375 376 377 378 379
    if (id.compare("") != 0)
    {
      message += ", \"id\":\"" + id + "\"";
    }
    message = "{" + message + "}";
Valentin Platzgummer's avatar
Valentin Platzgummer committed
380

381
    std::shared_ptr<WsClient> publish_client = std::make_shared<WsClient>(server_port_path);
382 383
    auto ready = std::get<integral(TopicEnum::Ready)>(*it_topic);
    publish_client->on_open = [message, client_name, ready](std::shared_ptr<WsClient::Connection> connection) {
384
#ifdef DEBUG
385 386 387
      std::cout << client_name << ": Opened connection" << std::endl;
      std::cout << client_name << ": Sending message." << std::endl;
      //std::cout << client_name << ": Sending message: " << message << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
388
#endif
389 390 391
      while ( !ready->load() ){ // Wait for the topic to be advertised.
          std::this_thread::sleep_for(std::chrono::milliseconds(500));
      }
392
      connection->send(message);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
393

394 395 396
      // TODO: This could be improved by creating a thread to keep publishing the message instead of closing it right away
      connection->send_close(1000);
    };
Valentin Platzgummer's avatar
Valentin Platzgummer committed
397

398
    start(client_name, publish_client, message);
399 400 401 402
  }

  void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "")
  {
403
    std::lock_guard<std::mutex> lk(mutex);
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      std::string message = "\"op\":\"subscribe\", \"topic\":\"" + topic + "\"";

      if (id.compare("") != 0)
      {
        message += ", \"id\":\"" + id + "\"";
      }
      if (type.compare("") != 0)
      {
        message += ", \"type\":\"" + type + "\"";
      }
      if (throttle_rate > -1)
      {
        message += ", \"throttle_rate\":" + std::to_string(throttle_rate);
      }
      if (queue_length > -1)
      {
        message += ", \"queue_length\":" + std::to_string(queue_length);
      }
      if (fragment_size > -1)
      {
        message += ", \"fragment_size\":" + std::to_string(fragment_size);
      }
      if (compression.compare("none") == 0 || compression.compare("png") == 0)
      {
        message += ", \"compression\":\"" + compression + "\"";
      }
      message = "{" + message + "}";

      it->second->on_message = callback;
      start(client_name, it->second, message);
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << "has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
443
#endif
444 445 446 447
  }

  void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback)
  {
448
    std::lock_guard<std::mutex> lk(mutex);
449 450 451 452 453 454 455 456 457 458 459 460 461
    std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
    if (it != client_map.end())
    {
      std::string message = "{\"op\":\"advertise_service\", \"service\":\"" + service + "\", \"type\":\"" + type + "\"}";

      it->second->on_message = callback;
      start(client_name, it->second, message);
    }
#ifdef DEBUG
    else
    {
      std::cerr << client_name << "has not been created" << std::endl;
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
462
#endif
463
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
464

465
  void serviceResponse(const std::string& service, const std::string& id, bool result, const rapidjson::Document& values)
466 467
  {
    std::string message = "\"op\":\"service_response\", \"service\":\"" + service + "\", \"result\":" + ((result)? "true" : "false");
Valentin Platzgummer's avatar
Valentin Platzgummer committed
468

469 470 471
    // Rosbridge somehow does not allow service_response to be sent without id and values
    // , so we cannot omit them even though the documentation says they are optional.
    message += ", \"id\":\"" + id + "\"";
Valentin Platzgummer's avatar
Valentin Platzgummer committed
472

473 474 475 476
    // Convert JSON document to string
    rapidjson::StringBuffer strbuf;
    rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
    values.Accept(writer);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
477

478 479
    message += ", \"values\":" + std::string(strbuf.GetString());
    message = "{" + message + "}";
Valentin Platzgummer's avatar
Valentin Platzgummer committed
480

481
    std::shared_ptr<WsClient> service_response_client = std::make_shared<WsClient>(server_port_path);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
482

483 484 485 486
    service_response_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#ifdef DEBUG
      std::cout << "service_response_client: Opened connection" << std::endl;
      std::cout << "service_response_client: Sending message: " << message << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
487
#endif
488
      connection->send(message);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
489

490 491
      connection->send_close(1000);
    };
Valentin Platzgummer's avatar
Valentin Platzgummer committed
492

493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
    start("service_response_client", service_response_client, message);
  }

  void callService(const std::string& service, const InMessage& callback, const rapidjson::Document& args = {}, const std::string& id = "", int fragment_size = -1, const std::string& compression = "")
  {
    std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\"";

    if (!args.IsNull())
    {
      rapidjson::StringBuffer strbuf;
      rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
      args.Accept(writer);

      message += ", \"args\":" + std::string(strbuf.GetString());
    }
    if (id.compare("") != 0)
    {
      message += ", \"id\":\"" + id + "\"";
    }
    if (fragment_size > -1)
    {
      message += ", \"fragment_size\":" + std::to_string(fragment_size);
    }
    if (compression.compare("none") == 0 || compression.compare("png") == 0)
    {
      message += ", \"compression\":\"" + compression + "\"";
    }
    message = "{" + message + "}";

    std::shared_ptr<WsClient> call_service_client = std::make_shared<WsClient>(server_port_path);

    if (callback)
    {
      call_service_client->on_message = callback;
    }
    else
    {
      call_service_client->on_message = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) {
#ifdef DEBUG
        std::cout << "call_service_client: Message received: " << in_message->string() << std::endl;
        std::cout << "call_service_client: Sending close connection" << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
534
#endif
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647
        connection->send_close(1000);
      };
    }

    start("call_service_client", call_service_client, message);
  }

  //!
  //! \brief Checks if the service \p service is available.
  //! \param service Service name.
  //! \return Returns true if the service is available, false either.
  //! \note Don't call this function to frequently. Use \fn waitForService() instead.
  //!
  bool serviceAvailable(const std::string& service)
  {
    const rapidjson::Document args = {};
    const std::string& id = "";
    const int fragment_size = -1;
    const std::string& compression = "";
    std::string message = "\"op\":\"call_service\", \"service\":\"" + service + "\"";
    std::string client_name("wait_for_service_client");

    if (!args.IsNull())
    {
      rapidjson::StringBuffer strbuf;
      rapidjson::Writer<rapidjson::StringBuffer> writer(strbuf);
      args.Accept(writer);

      message += ", \"args\":" + std::string(strbuf.GetString());
    }
    if (id.compare("") != 0)
    {
      message += ", \"id\":\"" + id + "\"";
    }
    if (fragment_size > -1)
    {
      message += ", \"fragment_size\":" + std::to_string(fragment_size);
    }
    if (compression.compare("none") == 0 || compression.compare("png") == 0)
    {
      message += ", \"compression\":\"" + compression + "\"";
    }
    message = "{" + message + "}";

    std::shared_ptr<WsClient> wait_for_service_client = std::make_shared<WsClient>(server_port_path);
    std::shared_ptr<std::promise<bool>> p(std::make_shared<std::promise<bool>>());
    auto future = p->get_future();
#ifdef DEBUG
    wait_for_service_client->on_message = [p, service, client_name](
#else
    wait_for_service_client->on_message = [p](
#endif
            std::shared_ptr<WsClient::Connection> connection,
            std::shared_ptr<WsClient::InMessage> in_message){
#ifdef DEBUG
#endif
        rapidjson::Document doc;
        doc.Parse(in_message->string().c_str());
        if ( !doc.HasParseError()
             && doc.HasMember("result")
             && doc["result"].IsBool()
             && doc["result"] == true )
        {
#ifdef DEBUG
            std::cout << client_name << ": "
                      << "service " << service
                      << " available." << std::endl;
            std::cout << client_name << ": "
                      << "message: " << in_message->string()
                      << std::endl;
#endif
            p->set_value(true);
        } else {
            p->set_value(false);
        }
        connection->send_close(1000);
    };
    start(client_name, wait_for_service_client, message);
    return future.get();
  }

  //!
  //! \brief Waits until the service with the name \p service is available.
  //! \param service Service name.
  //! @note This function will block as long as the service is not available.
  //!
  void waitForService(const std::string& service)
  {


#ifdef DEBUG
    auto s = std::chrono::high_resolution_clock::now();
    long counter = 0;
#endif
    while(true)
    {
#ifdef DEBUG
        ++counter;
#endif
        if (serviceAvailable(service)){
            break;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Avoid excessive polling.
    };
#ifdef DEBUG
    auto e = std::chrono::high_resolution_clock::now();
    std::cout << "waitForService(): time: "
              << std::chrono::duration_cast<std::chrono::milliseconds>(e-s).count()
              << " ms." << std::endl;
    std::cout << "waitForService(): clients launched: "
              << counter << std::endl;
#endif
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
648
};