TopicPublisher.cpp 3.1 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1
#include "TopicPublisher.h"
Valentin Platzgummer's avatar
Valentin Platzgummer committed
2 3


4

5 6 7 8 9 10 11 12 13 14 15 16


// Bundle of references handed to transmittLoop(). The struct owns nothing:
// every member refers to an object supplied by whoever builds the bundle, so
// those objects must outlive the thread running transmittLoop().
// Member order matters: the bundle is brace-initialised positionally.
struct ROSBridge::ComPrivate::ThreadData
{
    // Used by the loop to read the tag of a queued message and to strip it.
    const ROSBridge::CasePacker      &casePacker;
    // Client used to add/remove clients, advertise topics and publish.
    RosbridgeWsClient                &rbc;
    // Pending messages; the loop takes elements from the front.
    ROSBridge::ComPrivate::JsonQueue &queue;
    // Locked by the loop while it inspects or pops `queue`.
    std::mutex                       &queueMutex;
    // Loop keeps iterating while this reads true.
    const std::atomic<bool>          &running;
    // The loop waits on this (with `queueMutex`) while `queue` is empty.
    std::condition_variable          &cv;
};

17 18
// Creates a publisher in the stopped state. Only references to `casePacker`
// and `rbc` are stored; no thread is launched until start() is called.
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker,
                                                      RosbridgeWsClient &rbc)
    : _running(false),
      _casePacker(casePacker),
      _rbc(rbc)
{
}

// Shuts the publisher down via reset() before the members are destroyed.
ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    reset();
}

// Launches the publisher thread running transmittLoop().
// Calling start() while the publisher is already running is a no-op.
// If the thread cannot be created the exception is propagated and the
// publisher is left in the stopped state.
void ROSBridge::ComPrivate::TopicPublisher::start()
{
    // Claim the flag in a single atomic step. A separate load() followed by
    // store() would let two concurrent callers both pass the check and both
    // spawn a thread.
    if ( _running.exchange(true) )  // start called while thread running.
        return;

    // The bundle only holds references to members of *this; the thread
    // receives its own copy of the bundle.
    ROSBridge::ComPrivate::ThreadData data{
                _casePacker,
                _rbc,
                _queue,
                _queueMutex,
                _running,
                _cv
    };

    try {
        _pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
    } catch (...) {
        // No thread was started: undo the claim so a later start() can retry
        // and reset() does not try to stop a thread that does not exist.
        _running.store(false);
        throw;
    }
}

void ROSBridge::ComPrivate::TopicPublisher::reset()
{
    if ( !_running.load() )  // stop called while thread not running.
        return;
    _running.store(false);
52
    _cv.notify_one(); // Wake publisher thread.
53 54 55 56 57 58
    if ( !_pThread )
        return;
    _pThread->join();
    _queue.clear();
}

59
// Body of the publisher thread.
//
// Repeats until `data.running` reads false:
//   1. wait until a message is queued (or a stop is requested),
//   2. pop the message, read its tag and strip the tag from the message,
//   3. on first use of a topic, add a client for it and advertise the topic,
//   4. publish the message on the tag's topic.
// On exit, every client added by this loop is removed again.
//
// `data` is taken by value, but it only holds references; the referenced
// objects must stay alive for as long as this function runs.
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
{
    // Clients created by this loop, keyed by the hash of the client name.
    ClientMap clientMap;

    // Main Loop.
    while ( data.running.load() ) {
        std::unique_lock<std::mutex> lk(data.queueMutex);

        // Sleep until there is work or a stop was requested. The predicate is
        // evaluated with the mutex held and also looks at the running flag,
        // so the thread does not go back to sleep once the flag is cleared.
        // To be fully race-free the stopping side must change the flag while
        // holding the same mutex.
        data.cv.wait(lk, [&data] {
            return !data.queue.empty() || !data.running.load();
        });
        if ( data.queue.empty() )
            continue; // Woken without work; the loop condition decides.

        // Pop message from queue, then release the lock before the slower
        // tag handling and network calls.
        JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
        data.queue.pop_front();
        lk.unlock();

        // Get tag from Json message and remove it.
        Tag tag;
        const bool ret = data.casePacker.getTag(pJsonDoc, tag);
        assert(ret); // Json message does not contain a tag;
        if ( !ret )
            continue; // With NDEBUG the assert vanishes: drop the message
                      // instead of publishing it with an unset tag.
        data.casePacker.removeTag(pJsonDoc);

        // Check if topic must be advertised.
        // Clients of already advertised topics are stored in clientMap,
        // keyed by the hash of the client name.
        const std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey
                                     + tag.topic();
        const HashType hash = ROSBridge::ComPrivate::getHash(clientName);
        if ( clientMap.find(hash) == clientMap.end() ) { // Need to advertise topic?
            clientMap.insert(std::make_pair(hash, clientName));
            data.rbc.addClient(clientName);
            data.rbc.advertise(clientName,
                               tag.topic(),
                               tag.messageType() );
        }

        // Publish Json message.
        data.rbc.publish(tag.topic(), *pJsonDoc);
    } // while loop

    // Tidy up: remove every client this loop added.
    for (auto &entry : clientMap)
        data.rbc.removeClient(entry.second);
}