TopicPublisher.h 1.72 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1 2 3 4 5 6 7 8 9 10 11
#pragma once

#include "ros_bridge/include/TypeFactory.h"
#include "ros_bridge/include/CasePacker.h"
#include "ros_bridge/include/ComPrivateInclude.h"
#include "ros_bridge/include/RosBridgeClient.h"

#include <thread>
#include <deque>
#include <atomic>
#include <mutex>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
12
#include <set>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
13 14 15 16

namespace ROSBridge {
namespace ComPrivate {

Valentin Platzgummer's avatar
Valentin Platzgummer committed
17
class TopicPublisher
Valentin Platzgummer's avatar
Valentin Platzgummer committed
18 19 20 21
{
    typedef std::unique_ptr<std::thread> ThreadPtr;
public:

Valentin Platzgummer's avatar
Valentin Platzgummer committed
22 23 24 25 26
    TopicPublisher() = delete;
    TopicPublisher(CasePacker *casePacker,
                   RosbridgeWsClient *rbc);

    ~TopicPublisher();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
27

28
    //! @brief Starts the publisher.
Valentin Platzgummer's avatar
Valentin Platzgummer committed
29
    void start();
30 31 32 33

    //! @brief Resets the publisher.
    void reset();

Valentin Platzgummer's avatar
Valentin Platzgummer committed
34
    template<class T>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
35
    void publish(const T &msg, const std::string &topic){
Valentin Platzgummer's avatar
Valentin Platzgummer committed
36
        JsonDocUPtr docPtr(_casePacker->pack(msg, topic));
Valentin Platzgummer's avatar
Valentin Platzgummer committed
37
        publish(std::move(docPtr));
Valentin Platzgummer's avatar
Valentin Platzgummer committed
38
    }
Valentin Platzgummer's avatar
Valentin Platzgummer committed
39
    void publish(JsonDocUPtr docPtr){
Valentin Platzgummer's avatar
Valentin Platzgummer committed
40 41 42 43 44 45 46
        std::lock_guard<std::mutex> lock(_queueMutex);
        _queue.push_back(std::move(docPtr));
    }

private:
    JsonQueue           _queue;
    std::mutex          _queueMutex;
47
    std::atomic<bool>   _running;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
48 49 50
    CasePacker         *_casePacker;
    ThreadPtr           _pThread;
    RosbridgeWsClient  *_rbc;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
51 52
    HashSet            _advertisedTopicsHashList; // Not thread save! This container
                                                  // is manipulated by transmittLoop only!
Valentin Platzgummer's avatar
Valentin Platzgummer committed
53 54 55 56 57 58 59
};


void transmittLoop(const ROSBridge::CasePacker &casePacker,
                   RosbridgeWsClient &rbc,
                   ROSBridge::ComPrivate::JsonQueue &queue,
                   std::mutex &queueMutex,
Valentin Platzgummer's avatar
Valentin Platzgummer committed
60
                   HashSet &advertisedTopicsHashList,
61
                   const std::atomic<bool> &running);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
62 63 64 65


} // namespace CommunicatorDetail
} // namespace ROSBridge