topic_publisher.cpp 3.43 KB
Newer Older
1 2 3 4
#include "topic_publisher.h"

#include <unordered_map>

Valentin Platzgummer's avatar
Valentin Platzgummer committed
5
ros_bridge::com_private::TopicPublisher::TopicPublisher(RosbridgeWsClient &rbc) :
6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
      _stopped(std::make_shared<std::atomic_bool>(true))
    , _rbc(rbc)
{

}

ros_bridge::com_private::TopicPublisher::~TopicPublisher()
{
    this->reset();
}

void ros_bridge::com_private::TopicPublisher::start()
{
    if ( !_stopped->load() )  // start called while thread running.
        return;
    _stopped->store(false);
    _pThread = std::make_unique<std::thread>([this]{
        // Init.
        std::unordered_map<std::string, std::string> topicMap;
        // Main Loop.
        while( !this->_stopped->load() ){
            std::unique_lock<std::mutex> lk(this->_mutex);
            // Check if new data available, wait if not.
            if (this->_queue.empty()){
                if ( _stopped->load() ) // Check condition again while holding the lock.
                    break;
                this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
                continue;
            }
            // Pop message from queue.
            JsonDocUPtr pJsonDoc = std::move(this->_queue.front());
            this->_queue.pop_front();
            lk.unlock();

            // Get topic and type from Json message and remove it.
            std::string topic;
            assert(com_private::getTopic(*pJsonDoc, topic));
            assert(com_private::removeTopic(*pJsonDoc));
            std::string type;
            assert(com_private::getType(*pJsonDoc, type));
            assert(com_private::removeType(*pJsonDoc));

            // Check if topic must be advertised.
            std::string clientName =
                    ros_bridge::com_private::_topicAdvertiserKey
                    + topic;
            auto it =  topicMap.find(clientName);
            if ( it ==  topicMap.end()) { // Need to advertise topic?
                 topicMap.insert(std::make_pair(clientName, topic));
                 this->_rbc.addClient(clientName);
                 this->_rbc.advertise(clientName, topic, type);
                 this->_rbc.waitForTopic(topic, [this]{
                     return this->_stopped->load();
                 }); // Wait until topic is advertised.
            }

            // Publish Json message.
            if ( !this->_stopped->load() )
                this->_rbc.publish(topic, *pJsonDoc);
        } // while loop

        // Tidy up.
        for (auto& it :  topicMap){
            this->_rbc.unadvertise(it.second);
            this->_rbc.removeClient(it.first);
        }

        std::cout << "TopicPublisher: publisher thread terminated." << std::endl;
    });
}

void ros_bridge::com_private::TopicPublisher::reset()
{
    if ( _stopped->load() )  // stop called while thread not running.
        return;
    std::unique_lock<std::mutex> lk(_mutex);
    _stopped->store(true);
    _cv.notify_one(); // Wake publisher thread.
    lk.unlock();

    if ( !_pThread )
        return;
    _pThread->join();

    lk.lock();
    _queue.clear();
}

void ros_bridge::com_private::TopicPublisher::publish(
        ros_bridge::com_private::JsonDocUPtr docPtr,
Valentin Platzgummer's avatar
Valentin Platzgummer committed
96 97 98 99 100
        const char *topic)
{
    rapidjson::Value jTopic;
    jTopic.SetString(topic, docPtr->GetAllocator());
    docPtr->AddMember("topic", jTopic, docPtr->GetAllocator());
101 102 103 104 105 106
    std::unique_lock<std::mutex> lock(_mutex);
    _queue.push_back(std::move(docPtr));
    lock.unlock();
    _cv.notify_one();  // Wake publisher thread.
}