TopicPublisher.h 1.72 KB
Newer Older
Valentin Platzgummer's avatar
Valentin Platzgummer committed
1 2 3 4 5 6 7 8 9 10 11
#pragma once

#include "ros_bridge/include/TypeFactory.h"
#include "ros_bridge/include/CasePacker.h"
#include "ros_bridge/include/ComPrivateInclude.h"
#include "ros_bridge/include/RosBridgeClient.h"

#include <thread>
#include <deque>
#include <atomic>
#include <mutex>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
12
#include <set>
13
#include <condition_variable>
Valentin Platzgummer's avatar
Valentin Platzgummer committed
14 15 16 17

namespace ROSBridge {
namespace ComPrivate {

18 19
struct ThreadData;

Valentin Platzgummer's avatar
Valentin Platzgummer committed
20
class TopicPublisher
Valentin Platzgummer's avatar
Valentin Platzgummer committed
21 22
{
    typedef std::unique_ptr<std::thread> ThreadPtr;
23
    using CondVar = std::condition_variable;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
24 25
public:

Valentin Platzgummer's avatar
Valentin Platzgummer committed
26 27
    TopicPublisher() = delete;
    TopicPublisher(CasePacker *casePacker,
28 29
                   RosbridgeWsClient *rbc,
                   std::mutex *rbcMutex);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
30 31

    ~TopicPublisher();
Valentin Platzgummer's avatar
Valentin Platzgummer committed
32

33
    //! @brief Starts the publisher.
Valentin Platzgummer's avatar
Valentin Platzgummer committed
34
    void start();
35 36 37 38

    //! @brief Resets the publisher.
    void reset();

Valentin Platzgummer's avatar
Valentin Platzgummer committed
39
    void publish(JsonDocUPtr docPtr){
40
        {
Valentin Platzgummer's avatar
Valentin Platzgummer committed
41 42
        std::lock_guard<std::mutex> lock(_queueMutex);
        _queue.push_back(std::move(docPtr));
43 44
        }
        _cv.notify_one();  // Wake publisher thread.
Valentin Platzgummer's avatar
Valentin Platzgummer committed
45 46
    }

47 48 49 50 51 52
    template<class T>
    void publish(const T &msg, const std::string &topic){
        JsonDocUPtr docPtr(_casePacker->pack(msg, topic));
        publish(std::move(docPtr));
    }

Valentin Platzgummer's avatar
Valentin Platzgummer committed
53 54 55
private:
    JsonQueue           _queue;
    std::mutex          _queueMutex;
56
    std::atomic<bool>   _running;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
57 58
    CasePacker         *_casePacker;
    RosbridgeWsClient  *_rbc;
59
    std::mutex         *_rbcMutex;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
60 61
    HashSet            _advertisedTopicsHashList; // Not thread save! This container
                                                  // is manipulated by transmittLoop only!
62 63
    CondVar             _cv;
    ThreadPtr           _pThread;
Valentin Platzgummer's avatar
Valentin Platzgummer committed
64 65 66
};


67
void transmittLoop(ThreadData data);
Valentin Platzgummer's avatar
Valentin Platzgummer committed
68 69 70 71


} // namespace CommunicatorDetail
} // namespace ROSBridge