topic_subscriber.cpp 3.07 KB
Newer Older
1
#include "topic_subscriber.h"
Valentin Platzgummer's avatar
Valentin Platzgummer committed
2

3
#include <thread>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
4

5 6
// Binds the subscriber to the rosbridge client rbc. The stop flag starts out
// as true, so subscribe() returns early until start() has been called.
ros_bridge::com_private::TopicSubscriber::TopicSubscriber(RosbridgeWsClient &rbc)
    : _rbc(rbc),
      _stopped(std::make_shared<std::atomic_bool>(true))
{}

12
// Tears down all registered subscriptions by delegating to reset().
ros_bridge::com_private::TopicSubscriber::~TopicSubscriber()
{
    reset();
}

17
void ros_bridge::com_private::TopicSubscriber::start()
Valentin Platzgummer's avatar
Valentin Platzgummer committed
18
{
19
    _stopped->store(false);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
20 21
}

22
void ros_bridge::com_private::TopicSubscriber::reset()
Valentin Platzgummer's avatar
Valentin Platzgummer committed
23
{
24 25
    if ( !_stopped->load() ){
        _stopped->store(true);
26
        {
27 28 29
            for (auto &item : _topicMap){
                _rbc.unsubscribe(item.second);
                _rbc.removeClient(item.first);
30 31
            }
        }
32
        _topicMap.clear();
33
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
34 35
}

36
void ros_bridge::com_private::TopicSubscriber::subscribe(
Valentin Platzgummer's avatar
Valentin Platzgummer committed
37 38 39
        const char *topic,
        const std::function<void(JsonDocUPtr)> &callback)
{
40 41
    if ( _stopped->load() )
        return;
42

43
    std::string clientName = ros_bridge::com_private::_topicSubscriberKey
44
                           + std::string(topic);
45 46 47
    auto it = _topicMap.find(clientName);
    if ( it != _topicMap.end() ){ // Topic already subscribed?
        return;
48
    }
49
    _topicMap.insert(std::make_pair(clientName, std::string(topic)));
50

Valentin Platzgummer's avatar
Valentin Platzgummer committed
51
    // Wrap callback.
52 53
    auto callbackWrapper = [callback](
            std::shared_ptr<WsClient::Connection>,
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
            std::shared_ptr<WsClient::InMessage> in_message)
    {
        // Parse document.
        JsonDoc docFull;
        docFull.Parse(in_message->string().c_str());
        if ( docFull.HasParseError() ) {
            std::cout << "Json document has parse error: "
                      << in_message->string()
                      << std::endl;
            return;
        } else if (!docFull.HasMember("msg")) {
            std::cout << "Json document does not contain a message (\"msg\"): "
                      << in_message->string()
                      << std::endl;
            return;
        }
70

71 72 73 74
        // Extract message and call callback.
        JsonDocUPtr pDoc(new JsonDoc());
        pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
        callback(std::move(pDoc));
75 76 77 78
    };

    if ( !_rbc.topicAvailable(topic) ){
        // Wait until topic is available.
79
        std::cout << "TopicSubscriber: Starting wait thread, " << clientName << std::endl;
80
        std::thread t([this, clientName, topic, callbackWrapper]{
81 82 83
            this->_rbc.waitForTopic(topic, [this]{
                return this->_stopped->load();
            });
84 85 86
            if ( !this->_stopped->load() ){
                this->_rbc.addClient(clientName);
                this->_rbc.subscribe(clientName, topic, callbackWrapper);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
87
                std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl;
88 89
            }            
            std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl;
90 91 92
        });
        t.detach();
    } else {
93
        _rbc.addClient(clientName);
94
        _rbc.subscribe(clientName, topic, callbackWrapper);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
95
        std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl;
96
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
97 98 99
}