TopicSubscriber.cpp 3.48 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1 2
#include "TopicSubscriber.h"

3
#include <thread>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
4

5 6
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
        RosbridgeWsClient &rbc) :
Valentin Platzgummer's avatar
Valentin Platzgummer committed
7 8
    _casePacker(casePacker)
  , _rbc(rbc)
9
  , _stopped(std::make_shared<std::atomic_bool>(true))
Valentin Platzgummer's avatar
Valentin Platzgummer committed
10 11 12 13
{

}

14 15 16 17 18
ROSBridge::ComPrivate::TopicSubscriber::~TopicSubscriber()
{
    this->reset();
}

Valentin Platzgummer's avatar
Valentin Platzgummer committed
19 20
void ROSBridge::ComPrivate::TopicSubscriber::start()
{
21
    _stopped->store(false);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
22 23
}

24
void ROSBridge::ComPrivate::TopicSubscriber::reset()
Valentin Platzgummer's avatar
Valentin Platzgummer committed
25
{
26
    if ( !_stopped->load() ){
27
        std::cout << "TopicSubscriber: _stopped->store(true) " << std::endl;
28
        _stopped->store(true);
29
        {
30
            for (auto &item : _topicMap){
31
                std::cout << "TopicSubscriber: unsubscribe " << item.second << std::endl;
32
                _rbc.unsubscribe(item.second);
33
                std::cout << "TopicSubscriber: removeClient " << item.first << std::endl;
34
                _rbc.removeClient(item.first);
35 36
            }
        }
37
        _topicMap.clear();
38
        std::cout << "TopicSubscriber: _topicMap cleared " << std::endl;
39
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
40 41
}

42
void ROSBridge::ComPrivate::TopicSubscriber::subscribe(
Valentin Platzgummer's avatar
Valentin Platzgummer committed
43 44 45
        const char *topic,
        const std::function<void(JsonDocUPtr)> &callback)
{
46 47
    if ( _stopped->load() )
        return;
48 49 50

    std::string clientName = ROSBridge::ComPrivate::_topicSubscriberKey
                           + std::string(topic);
51 52 53
    auto it = _topicMap.find(clientName);
    if ( it != _topicMap.end() ){ // Topic already subscribed?
        return;
54
    }
55
    _topicMap.insert(std::make_pair(clientName, std::string(topic)));
56

Valentin Platzgummer's avatar
Valentin Platzgummer committed
57
    // Wrap callback.
58
    using namespace std::placeholders;
59 60
    auto callbackWrapper = [callback](
            std::shared_ptr<WsClient::Connection>,
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
            std::shared_ptr<WsClient::InMessage> in_message)
    {
        // Parse document.
        JsonDoc docFull;
        docFull.Parse(in_message->string().c_str());
        if ( docFull.HasParseError() ) {
            std::cout << "Json document has parse error: "
                      << in_message->string()
                      << std::endl;
            return;
        } else if (!docFull.HasMember("msg")) {
            std::cout << "Json document does not contain a message (\"msg\"): "
                      << in_message->string()
                      << std::endl;
            return;
        }
77

78 79 80 81
        // Extract message and call callback.
        JsonDocUPtr pDoc(new JsonDoc());
        pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
        callback(std::move(pDoc));
82 83 84 85
    };

    if ( !_rbc.topicAvailable(topic) ){
        // Wait until topic is available.
86
        std::cout << "TopicSubscriber: Starting wait thread, " << clientName << std::endl;
87
        std::thread t([this, clientName, topic, callbackWrapper]{
88 89 90
            this->_rbc.waitForTopic(topic, [this]{
                return this->_stopped->load();
            });
91 92 93
            if ( !this->_stopped->load() ){
                this->_rbc.addClient(clientName);
                this->_rbc.subscribe(clientName, topic, callbackWrapper);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
94
                std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl;
95 96
            }            
            std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl;
97 98 99
        });
        t.detach();
    } else {
100
        _rbc.addClient(clientName);
101
        _rbc.subscribe(clientName, topic, callbackWrapper);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
102
        std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl;
103
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
104 105 106
}