TopicPublisher.cpp 3.58 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1
#include "TopicPublisher.h"
Valentin Platzgummer's avatar
Valentin Platzgummer committed
2

3
#include <unordered_map>
4

5 6
// Constructs a publisher in the stopped state.
//
// The stop flag is created set to true, so start() will launch the publisher
// thread and reset() is a no-op until then. No thread is created here.
//
// casePacker: used by the publisher thread to read and strip the tag of each
//             queued JSON message (see start()).
// rbc:        Rosbridge client used by the publisher thread to advertise
//             topics and publish messages (see start()).
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker,
                                                      RosbridgeWsClient &rbc) :
      _stopped(std::make_shared<std::atomic_bool>(true))
    , _casePacker(casePacker)
    , _rbc(rbc)
{

}

// Destructor: delegates to reset(), which stops and joins the publisher
// thread if it is currently running.
ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    reset();
}

void ROSBridge::ComPrivate::TopicPublisher::start()
{
21
    if ( !_stopped->load() )  // start called while thread running.
22
        return;
23 24 25 26 27
    _stopped->store(false);
    _pThread = std::make_unique<std::thread>([this]{
        // Init.
        std::unordered_map<std::string, std::string> topicMap;
        // Main Loop.
28 29 30 31 32 33 34 35 36 37 38 39 40 41
        while( !this->_stopped->load() ){
            JsonDocUPtr pJsonDoc;
            {
                std::unique_lock<std::mutex> lk(this->_mutex);
                // Check if new data available, wait if not.
                if (this->_queue.empty()){
                    if ( _stopped->load() ) // Check condition again while holding the lock.
                        break;
                    this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
                    continue;
                }
                // Pop message from queue.
                pJsonDoc = std::move(this->_queue.front());
                this->_queue.pop_front();
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
            }

            // Get tag from Json message and remove it.
            Tag tag;
            bool ret = this->_casePacker.getTag(pJsonDoc, tag);
            assert(ret); // Json message does not contain a tag;
            (void)ret;
            this->_casePacker.removeTag(pJsonDoc);

            // Check if topic must be advertised.
            // Advertised topics are stored in advertisedTopicsHashList as
            // a hash.
            std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey
                                   + tag.topic();
            auto it =  topicMap.find(clientName);
            if ( it ==  topicMap.end()) { // Need to advertise topic?
                 topicMap.insert(std::make_pair(clientName, tag.topic()));
                 this->_rbc.addClient(clientName);
                 this->_rbc.advertise(clientName,
                                   tag.topic(),
                                   tag.messageType() );
63 64 65
                 this->_rbc.waitForTopic(tag.topic(), [this]{
                     return this->_stopped->load();
                 }); // Wait until topic is advertised.
66 67 68 69 70 71 72 73 74 75 76 77
            }

            // Publish Json message.
            if ( !this->_stopped->load() )
                this->_rbc.publish(tag.topic(), *pJsonDoc.get());
        } // while loop

        // Tidy up.
        for (auto& it :  topicMap){
            this->_rbc.unadvertise(it.second);
            this->_rbc.removeClient(it.first);
        }
78 79

        std::cout << "TopicPublisher: publisher thread terminated." << std::endl;
80
    });
81 82 83 84
}

void ROSBridge::ComPrivate::TopicPublisher::reset()
{
85
    if ( _stopped->load() )  // stop called while thread not running.
86
        return;
87 88 89 90 91 92 93
    {
        std::lock_guard<std::mutex> lk(_mutex);
        std::cout << "TopicPublisher: _stopped->store(true)." << std::endl;
        _stopped->store(true);
        std::cout << "TopicPublisher: ask publisher thread to stop." << std::endl;
        _cv.notify_one(); // Wake publisher thread.
    }
94 95 96
    if ( !_pThread )
        return;
    _pThread->join();
97
    std::cout << "TopicPublisher: publisher thread joined." << std::endl;
98 99 100 101
    {
        _queue.clear();
        std::cout << "TopicPublisher: queue cleard." << std::endl;
    }
102 103
}