// TopicSubscriber.cpp
#include "TopicSubscriber.h"


/// Constructs a subscriber in the "not running" state.
///
/// @param casePacker  Used to initialise the _casePacker member.
/// @param rbc         Used to initialise the _rbc member; this is the object
///                    on which clients are later added, subscribed and removed.
///
/// start() has to be called before subscribe() accepts any request.
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
        RosbridgeWsClient &rbc)
    : _casePacker(casePacker),
      _rbc(rbc),
      _running(false)
{
}

/// Destructor: delegates all cleanup to reset(), which removes the registered
/// clients and clears the callback map if the subscriber is still running.
ROSBridge::ComPrivate::TopicSubscriber::~TopicSubscriber()
{
    reset();
}

void ROSBridge::ComPrivate::TopicSubscriber::start()
{
20
    _running = true;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
21 22
}

void ROSBridge::ComPrivate::TopicSubscriber::reset()
Valentin Platzgummer's avatar
Valentin Platzgummer committed
24
{
25
    if ( _running ){
26
        _running = false;
27 28
        {
            for (std::string clientName : _clientList){
29
                _rbc.removeClient(clientName);
30 31 32
            }
        }
        {
33 34
            std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
            _callbackMapStruct.map.clear();
35
        }
36 37
        _clientList.clear();
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
38 39 40 41 42 43
}

bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
        const char *topic,
        const std::function<void(JsonDocUPtr)> &callback)
{
44 45 46 47 48 49 50 51 52
    if ( !_running )
        return false;

    std::string clientName = ROSBridge::ComPrivate::_topicSubscriberKey
                           + std::string(topic);
    _clientList.push_back(clientName);

    HashType hash = getHash(clientName);
    {
53 54
        std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
        auto ret = _callbackMapStruct.map.insert(std::make_pair(hash, callback)); //
55 56 57 58 59
        if ( !ret.second )
            return false; // Topic subscription already present.

    }

60 61 62
    using namespace std::placeholders;
    auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback,
                      hash,
63
                      std::ref(_callbackMapStruct),
64 65 66 67 68 69
                      _1, _2);

//    std::cout << std::endl;
//    std::cout << "topic subscription" << std::endl;
//    std::cout << "client name: " << clientName << std::endl;
//    std::cout << "topic: " << topic << std::endl;
70
    {
71 72 73 74 75 76 77 78
        auto start = std::chrono::high_resolution_clock::now();
        _rbc.addClient(clientName);
        auto end = std::chrono::high_resolution_clock::now();
        std::cout  << "add client time: "
                   << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
                   << " ms" << std::endl;
        start = std::chrono::high_resolution_clock::now();
        _rbc.subscribe(clientName,
79 80
                        topic,
                        f );
81 82 83 84
        end = std::chrono::high_resolution_clock::now();
        std::cout  << "subscribe time: "
                   << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
                   << " ms" << std::endl;
85
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
86 87 88 89 90 91 92 93

    return true;
}


using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
void ROSBridge::ComPrivate::subscriberCallback(
        const HashType &hash,
94
        ROSBridge::ComPrivate::MapStruct &mapWrapper,
Valentin Platzgummer's avatar
Valentin Platzgummer committed
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
        std::shared_ptr<WsClient::Connection>,
        std::shared_ptr<WsClient::InMessage> in_message)
{
    // Parse document.
    JsonDoc docFull;
    docFull.Parse(in_message->string().c_str());
    if ( docFull.HasParseError() ) {
        std::cout << "Json document has parse error: "
                  << in_message->string()
                  << std::endl;
        return;
    } else if (!docFull.HasMember("msg")) {
        std::cout << "Json document does not contain a message (\"msg\"): "
                  << in_message->string()
                  << std::endl;
        return;
    }


//    std::cout << "Json document: "
//              << in_message->string()
//              << std::endl;

    // Search callback.
119 120 121 122 123 124 125 126 127
    CallbackType callback;
    {
        std::lock_guard<std::mutex> lk(mapWrapper.mutex);
        auto it = mapWrapper.map.find(hash);
        if (it == mapWrapper.map.end()) {
            //assert(false); // callback not found
            return;
        }
        callback = it->second;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
128 129 130 131 132
    }

    // Extract message and call callback.
    JsonDocUPtr pDoc(new JsonDoc());
    pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
133
    callback(std::move(pDoc));
Valentin Platzgummer's avatar
Valentin Platzgummer committed
134 135 136
    return;

}