topic_publisher.cpp 4.1 KB
Newer Older
1 2
#include "topic_publisher.h"

Valentin Platzgummer's avatar
Valentin Platzgummer committed
3
static const char *topicAdvertiserKey = "topic_advertiser";
4

Valentin Platzgummer's avatar
Valentin Platzgummer committed
5
#include <unordered_map>
6

Valentin Platzgummer's avatar
Valentin Platzgummer committed
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
ros_bridge::com_private::TopicPublisher::TopicPublisher(RosbridgeWsClient &rbc)
    : _stopped(std::make_shared<std::atomic_bool>(true)), _rbc(rbc) {}

ros_bridge::com_private::TopicPublisher::~TopicPublisher() { this->reset(); }

void ros_bridge::com_private::TopicPublisher::start() {
  if (!_stopped->load()) // start called while thread running.
    return;
  _stopped->store(false);
  _pThread = std::make_unique<std::thread>([this] {
    // Main Loop.
    while (!this->_stopped->load()) {
      std::unique_lock<std::mutex> lk(this->_mutex);
      // Check if new data available, wait if not.
      if (this->_queue.empty()) {
        if (_stopped->load()) // Check condition again while holding the lock.
          break;
        this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter
                            // in this case.
        continue;
      }
      // Pop message from queue.
      JsonDocUPtr pJsonDoc = std::move(this->_queue.front());
      this->_queue.pop_front();
      lk.unlock();

      // Get topic and type from Json message and remove it.
      std::string topic;
      bool ret = com_private::getTopic(*pJsonDoc, topic);
      assert(ret);
      (void)ret;

      // Debug
      rapidjson::StringBuffer sb;
      rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
      pJsonDoc->Accept(writer);
      std::cout << "message: " << sb.GetString() << std::endl;
      std::cout << "topic: " << topic << std::endl;

      ret = com_private::removeTopic(*pJsonDoc);
      assert(ret);
      (void)ret;

      // Wait for topic (Rosbridge needs some time to process a advertise()
      // request).
      this->_rbc.waitForTopic(topic, [this] { return this->_stopped->load(); });

      // Publish Json message.
      if (!this->_stopped->load())
        this->_rbc.publish(topic, *pJsonDoc);
    } // while loop
Valentin Platzgummer's avatar
Valentin Platzgummer committed
58
#ifdef ROS_BRIDGE_DEBUG
Valentin Platzgummer's avatar
Valentin Platzgummer committed
59
    std::cout << "TopicPublisher: publisher thread terminated." << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
60
#endif
Valentin Platzgummer's avatar
Valentin Platzgummer committed
61
  });
62 63
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
void ros_bridge::com_private::TopicPublisher::reset() {
  if (_stopped->load()) // stop called while thread not running.
    return;
  std::unique_lock<std::mutex> lk(_mutex);
  _stopped->store(true);
  _cv.notify_one(); // Wake publisher thread.
  lk.unlock();

  if (!_pThread)
    return;
  _pThread->join();

  lk.lock();
  // Tidy up.
  for (auto &it : this->_topicMap) {
    this->_rbc.unadvertise(it.first);
    this->_rbc.removeClient(it.second);
  }
  this->_topicMap.clear();
  _queue.clear();
84 85 86
}

void ros_bridge::com_private::TopicPublisher::publish(
Valentin Platzgummer's avatar
Valentin Platzgummer committed
87 88 89 90 91 92 93
    ros_bridge::com_private::JsonDocUPtr docPtr, const char *topic) {
  // Check if topic was advertised.
  std::string t(topic);
  std::unique_lock<std::mutex> lk(this->_mutex);
  auto it = this->_topicMap.find(t);
  if (it == this->_topicMap.end()) { // Not yet advertised?
    lk.unlock();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
94
#ifdef ROS_BRIDGE_DEBUG
Valentin Platzgummer's avatar
Valentin Platzgummer committed
95 96
    std::cerr << "TopicPublisher: topic " << t << " not yet advertised"
              << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
97
#endif
Valentin Platzgummer's avatar
Valentin Platzgummer committed
98 99 100 101 102 103 104 105 106 107 108 109 110 111
    return;
  }
  lk.unlock();

  // Add topic information to json doc.
  auto &allocator = docPtr->GetAllocator();
  rapidjson::Value key("topic", allocator);
  rapidjson::Value value(topic, allocator);
  docPtr->AddMember(key, value, allocator);

  lk.lock();
  _queue.push_back(std::move(docPtr));
  lk.unlock();
  _cv.notify_one(); // Wake publisher thread.
112 113
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
114 115 116 117 118 119 120 121 122 123 124 125 126 127
bool ros_bridge::com_private::TopicPublisher::advertise(const char *topic,
                                                        const char *type) {
  std::unique_lock<std::mutex> lk(this->_mutex);
  std::string t(topic);
  auto it = this->_topicMap.find(t);
  if (it == this->_topicMap.end()) { // Need to advertise topic?
    std::string clientName = std::string(topicAdvertiserKey) + t;
    this->_topicMap.insert(std::make_pair(t, clientName));
    lk.unlock();
    this->_rbc.addClient(clientName);
    this->_rbc.advertise(clientName, t, type);
    return true;
  } else {
    lk.unlock();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
128
#if ROS_BRIDGE_DEBUG
Valentin Platzgummer's avatar
Valentin Platzgummer committed
129 130
    std::cerr << "TopicPublisher: topic " << topic << " already advertised"
              << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
131
#endif
Valentin Platzgummer's avatar
Valentin Platzgummer committed
132 133
    return false;
  }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
134
}