topic_subscriber.cpp 2.86 KB
Newer Older
1
#include "topic_subscriber.h"
Valentin Platzgummer's avatar
Valentin Platzgummer committed
2

3
#include <thread>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
4

Valentin Platzgummer's avatar
Valentin Platzgummer committed
5
// Prefix prepended to a topic name to form the rosbridge client name used by
// TopicSubscriber::subscribe(). The pointer itself is const so the prefix
// cannot be re-pointed by accident.
static const char *const topicSubscriberKey = "topic_subscriber";
Valentin Platzgummer's avatar
Valentin Platzgummer committed
6

Valentin Platzgummer's avatar
Valentin Platzgummer committed
7 8 9
// Builds a subscriber on top of the given rosbridge client.
//
// The stop flag is created in the "stopped" state (true), so subscribe()
// returns early until start() has been called.
ros_bridge::com_private::TopicSubscriber::TopicSubscriber(
    RosbridgeWsClient &rbc)
    : _rbc(rbc),
      _stopped(std::make_shared<std::atomic_bool>(true)) {
}
Valentin Platzgummer's avatar
Valentin Platzgummer committed
10

Valentin Platzgummer's avatar
Valentin Platzgummer committed
11
// Tears the subscriber down by delegating to reset().
ros_bridge::com_private::TopicSubscriber::~TopicSubscriber() {
  reset();
}
12

Valentin Platzgummer's avatar
Valentin Platzgummer committed
13 14
// Clears the stop flag so that subsequent subscribe() calls are processed.
void ros_bridge::com_private::TopicSubscriber::start() {
  // Same ordering as the default store(): sequentially consistent.
  _stopped->store(false, std::memory_order_seq_cst);
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
17 18 19 20 21 22 23 24
// Stops the subscriber and drops every registered topic.
//
// When the subscriber is running, the stop flag is raised first, then each
// entry of the topic map is unsubscribed (by its value, the topic name) and
// its client removed (by its key, the client name), and finally the map is
// emptied. When the subscriber is already stopped this is a no-op.
void ros_bridge::com_private::TopicSubscriber::reset() {
  if (_stopped->load()) {
    return; // Already stopped, nothing to tear down.
  }
  _stopped->store(true);

  for (auto entry = _topicMap.begin(); entry != _topicMap.end(); ++entry) {
    auto &clientName = entry->first;
    auto &topicName = entry->second;
    _rbc.unsubscribe(topicName);
    _rbc.removeClient(clientName);
  }

  _topicMap.clear();
}

30
void ros_bridge::com_private::TopicSubscriber::subscribe(
Valentin Platzgummer's avatar
Valentin Platzgummer committed
31 32 33
    const char *topic, const std::function<void(JsonDocUPtr)> &callback) {
  if (_stopped->load())
    return;
34

Valentin Platzgummer's avatar
Valentin Platzgummer committed
35 36 37 38 39 40
  std::string clientName = topicSubscriberKey + std::string(topic);
  auto it = _topicMap.find(clientName);
  if (it != _topicMap.end()) { // Topic already subscribed?
    return;
  }
  _topicMap.insert(std::make_pair(clientName, std::string(topic)));
41

Valentin Platzgummer's avatar
Valentin Platzgummer committed
42 43 44 45
  // Wrap callback.
  auto callbackWrapper =
      [callback](std::shared_ptr<WsClient::Connection>,
                 std::shared_ptr<WsClient::InMessage> in_message) {
46 47 48
        // Parse document.
        JsonDoc docFull;
        docFull.Parse(in_message->string().c_str());
Valentin Platzgummer's avatar
Valentin Platzgummer committed
49 50 51 52
        if (docFull.HasParseError()) {
          std::cout << "Json document has parse error: " << in_message->string()
                    << std::endl;
          return;
53
        } else if (!docFull.HasMember("msg")) {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
54 55 56
          std::cout << "Json document does not contain a message (\"msg\"): "
                    << in_message->string() << std::endl;
          return;
57
        }
58

59 60 61 62
        // Extract message and call callback.
        JsonDocUPtr pDoc(new JsonDoc());
        pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
        callback(std::move(pDoc));
Valentin Platzgummer's avatar
Valentin Platzgummer committed
63
      };
64

Valentin Platzgummer's avatar
Valentin Platzgummer committed
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
  if (!_rbc.topicAvailable(topic)) {
    // Wait until topic is available.
    std::cout << "TopicSubscriber: Starting wait thread, " << clientName
              << std::endl;
    std::thread t([this, clientName, topic, callbackWrapper] {
      this->_rbc.waitForTopic(topic, [this] { return this->_stopped->load(); });
      if (!this->_stopped->load()) {
        this->_rbc.addClient(clientName);
        this->_rbc.subscribe(clientName, topic, callbackWrapper);
        std::cout << "TopicSubscriber: wait thread subscription successfull: "
                  << clientName << std::endl;
      }
      std::cout << "TopicSubscriber: wait thread end, " << clientName
                << std::endl;
    });
    t.detach();
  } else {
    _rbc.addClient(clientName);
    _rbc.subscribe(clientName, topic, callbackWrapper);
    std::cout << "TopicSubscriber: subscription successfull: " << clientName
              << std::endl;
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
87
}