TopicPublisher.cpp 3.44 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1
#include "TopicPublisher.h"
Valentin Platzgummer's avatar
Valentin Platzgummer committed
2 3 4 5 6 7


/**
 * @brief Worker loop: drains the JSON message queue and publishes every
 *        message through the rosbridge client until @p stopFlag becomes true.
 *
 * Each iteration takes one document from the front of @p queue, reads and
 * strips its tag, advertises the tag's topic the first time its hash is seen
 * and finally publishes the document. When the queue is empty the loop sleeps
 * for 20 ms before polling again.
 *
 * @param casePacker               Used to read the tag of a message and to remove it.
 * @param rbc                      Rosbridge client used to advertise and publish.
 * @param queue                    Pending messages; only accessed while @p queueMutex is held.
 * @param queueMutex               Guards @p queue.
 * @param advertisedTopicsHashList Hashes of the topics already advertised by this loop.
 * @param stopFlag                 Set to true to make the loop return.
 *
 * NOTE(review): the client registered under _topicPublisherKey is added here
 * but is not removed anywhere in this file - confirm whether that is intended.
 */
void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacker,
                                          RosbridgeWsClient &rbc,
                                          ROSBridge::ComPrivate::JsonQueue &queue,
                                          std::mutex &queueMutex,
                                          HashSet &advertisedTopicsHashList,
                                          const std::atomic<bool> &stopFlag)
{
    rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);

    while (!stopFlag.load()) {
        // Pop message from queue. The lock is a scoped guard so the mutex is
        // released even if a queue operation throws; it is unlocked explicitly
        // before sleeping and before any network I/O to keep the critical
        // section short.
        std::unique_lock<std::mutex> queueLock(queueMutex);
        //std::cout << "Queue size: " << queue.size() << std::endl;
        if (queue.empty()) {
            queueLock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
            continue;
        }
        JsonDocUPtr pJsonDoc(std::move(queue.front()));
        queue.pop_front();
        queueLock.unlock();

        // Debug output.
//        std::cout << "Transmitter loop json document:" << std::endl;
//        rapidjson::OStreamWrapper out(std::cout);
//        rapidjson::Writer<rapidjson::OStreamWrapper> writer(out);
//        pJsonDoc->Accept(writer);
//        std::cout << std::endl << std::endl;

        // Get tag from Json message and remove it.
        Tag tag;
        const bool ret = casePacker.getTag(pJsonDoc, tag);
        assert(ret); // Json message does not contain a tag;
        if (!ret) {
            // With NDEBUG the assert above vanishes; drop the message instead
            // of advertising/publishing with a default-constructed tag.
            continue;
        }
        casePacker.removeTag(pJsonDoc);

        // Check if topic must be advertised.
        // Advertised topics are stored in advertisedTopicsHashList as
        // a hash.
        HashType hash = ROSBridge::ComPrivate::getHash(tag.topic());
        if (advertisedTopicsHashList.count(hash) == 0) {
            advertisedTopicsHashList.insert(hash);
            rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
                          tag.topic(),
                          tag.messageType());
        }
        // Debug output.
        //std::cout << "Hash Set size: " << advertisedTopicsHashList.size() << std::endl;

        // Send Json message.
        rbc.publish(tag.topic(), *pJsonDoc);
    } // while loop
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
58 59
/**
 * @brief Creates a publisher in the stopped state.
 *
 * The stop flag is initialised to true, so no worker thread exists until
 * start() is called.
 *
 * @param casePacker Stored as-is; dereferenced by start(), so it must be
 *                   valid by then.
 * @param rbc        Stored as-is; dereferenced by start() and stop().
 */
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
                                                      RosbridgeWsClient *rbc)
    : _stopFlag(true),
      _casePacker(casePacker),
      _rbc(rbc)
{
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
67 68 69 70 71 72
/// Shuts the publisher down by delegating to stop(), which returns
/// immediately if the publisher is already stopped.
ROSBridge::ComPrivate::TopicPublisher::~TopicPublisher()
{
    stop();
}

void ROSBridge::ComPrivate::TopicPublisher::start()
Valentin Platzgummer's avatar
Valentin Platzgummer committed
73 74 75
{
    if ( !_stopFlag.load() )  // start called while thread running.
        return;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
76 77
    _stopFlag.store(false);    
    _rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
78 79 80 81 82
    _pThread.reset(new std::thread(&ROSBridge::ComPrivate::transmittLoop,
                                   std::cref(*_casePacker),
                                   std::ref(*_rbc),
                                   std::ref(_queue),
                                   std::ref(_queueMutex),
Valentin Platzgummer's avatar
Valentin Platzgummer committed
83
                                   std::ref(_advertisedTopicsHashList),
Valentin Platzgummer's avatar
Valentin Platzgummer committed
84 85 86
                                   std::cref(_stopFlag)));
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
87
void ROSBridge::ComPrivate::TopicPublisher::stop()
Valentin Platzgummer's avatar
Valentin Platzgummer committed
88 89 90 91 92 93 94
{
    if ( _stopFlag.load() )  // start called while thread not running.
        return;
    _stopFlag.store(true);
    if ( !_pThread )
        return;
    _pThread->join();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
95 96
    _pThread.reset();    
    _rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
97
}