TopicPublisher.cpp 4.07 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1
#include "TopicPublisher.h"
Valentin Platzgummer's avatar
Valentin Platzgummer committed
2 3


4

5 6 7 8 9 10 11 12 13 14 15 16 17 18


struct ROSBridge::ComPrivate::ThreadData
{
    const ROSBridge::CasePacker      &casePacker;
    RosbridgeWsClient                &rbc;
    std::mutex                       &rbcMutex;
    ROSBridge::ComPrivate::JsonQueue &queue;
    std::mutex                       &queueMutex;
    ROSBridge::ComPrivate::HashSet   &advertisedTopicsHashList;
    const std::atomic<bool>          &running;
    std::condition_variable          &cv;
};

19
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
20 21
                                                      RosbridgeWsClient *rbc,
                                                      std::mutex *rbcMutex) :
22 23 24
      _running(false)
    , _casePacker(casePacker)
    , _rbc(rbc)
25
    , _rbcMutex(rbcMutex)
26 27 28 29 30 31 32 33 34 35 36 37 38 39
{

}

ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    this->reset();
}

void ROSBridge::ComPrivate::TopicPublisher::start()
{
    if ( _running.load() )  // start called while thread running.
        return;
    _running.store(true);
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
    {
        std::lock_guard<std::mutex> lk(*_rbcMutex);
        _rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
        _rbc->addClient(ROSBridge::ComPrivate::_topicPublisherKey);
    }
    ROSBridge::ComPrivate::ThreadData data{
                *_casePacker,
                *_rbc,
                *_rbcMutex,
                _queue,
                _queueMutex,
                _advertisedTopicsHashList,
                _running,
                _cv
    };
    _pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
56 57 58 59 60 61 62
}

void ROSBridge::ComPrivate::TopicPublisher::reset()
{
    if ( !_running.load() )  // stop called while thread not running.
        return;
    _running.store(false);
63
    _cv.notify_one(); // Wake publisher thread.
64 65 66
    if ( !_pThread )
        return;
    _pThread->join();
67 68 69 70 71
    {
        std::lock_guard<std::mutex> lk(*_rbcMutex);
        _rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
        _rbc->removeClient(ROSBridge::ComPrivate::_topicPublisherKey);
    }
72 73 74 75
    _queue.clear();
    _advertisedTopicsHashList.clear();
}

76
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
Valentin Platzgummer's avatar
Valentin Platzgummer committed
77
{
78 79 80 81 82
    while(data.running.load()){
        std::unique_lock<std::mutex> lk(data.queueMutex);
        // Check if new data available, wait if not.
        if (data.queue.empty()){
            data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
Valentin Platzgummer's avatar
Valentin Platzgummer committed
83 84
            continue;
        }
85 86 87 88
        // Pop message from queue.
        JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
        data.queue.pop_front();
        lk.unlock();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
89 90 91 92 93 94 95 96 97 98

        // Debug output.
//        std::cout << "Transmitter loop json document:" << std::endl;
//        rapidjson::OStreamWrapper out(std::cout);
//        rapidjson::Writer<rapidjson::OStreamWrapper> writer(out);
//        pJsonDoc->Accept(writer);
//        std::cout << std::endl << std::endl;

        // Get tag from Json message and remove it.
        Tag tag;
99
        bool ret = data.casePacker.getTag(pJsonDoc, tag);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
100 101
        assert(ret); // Json message does not contain a tag;
        (void)ret;
102
        data.casePacker.removeTag(pJsonDoc);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
103 104 105 106 107

        // Check if topic must be advertised.
        // Advertised topics are stored in advertisedTopicsHashList as
        // a hash.
        HashType hash = ROSBridge::ComPrivate::getHash(tag.topic());
108 109 110 111 112 113 114 115
        if ( data.advertisedTopicsHashList.count(hash) == 0) {
            data.advertisedTopicsHashList.insert(hash);
            {
                std::lock_guard<std::mutex> lk(data.rbcMutex);
                data.rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
                                   tag.topic(),
                                   tag.messageType() );
            }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
116 117 118
        }
        // Debug output.
        //std::cout << "Hash Set size: " << advertisedTopicsHashList.size() << std::endl;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
119 120

        // Send Json message.
121 122 123 124
        {
            std::lock_guard<std::mutex> lk(data.rbcMutex);
            data.rbc.publish(tag.topic(), *pJsonDoc.get());
        }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
125
    } // while loop
Valentin Platzgummer's avatar
Valentin Platzgummer committed
126 127
}