diff --git a/src/FlightDisplay/FlightDisplayViewMap.qml b/src/FlightDisplay/FlightDisplayViewMap.qml index e51f73b4486c934049c99529efdb8eae78a3c101..3a08cab9fbd6d87c0a12795b91a5235844324951 100644 --- a/src/FlightDisplay/FlightDisplayViewMap.qml +++ b/src/FlightDisplay/FlightDisplayViewMap.qml @@ -251,7 +251,8 @@ FlightMap { delegate: MapCircle{ center: modelData - border.color: "transparent" + border.color: "black" + border.width: 1 color: getColor(wimaController.nemoProgress[index]) radius: 0.6 opacity: 1 diff --git a/src/Wima/WimaController.cc b/src/Wima/WimaController.cc index b3fb20ac580791c9709bc80625b45c13ed4cec86..6ad642c1af08e472fcfa174724ba49c18ca9a926 100644 --- a/src/Wima/WimaController.cc +++ b/src/Wima/WimaController.cc @@ -715,6 +715,10 @@ void WimaController::_eventTimerHandler() if (_enableSnake.rawValue().toBool() && rosBridgeTicker.ready()) { + qWarning() << "Connectable: " << _pRosBridge->connected() << "\n"; + + + auto start = std::chrono::high_resolution_clock::now(); _pRosBridge->publish(_snakeTilesLocal, "/snake/tiles"); _pRosBridge->publish(_snakeOrigin, "/snake/origin"); @@ -736,6 +740,10 @@ void WimaController::_eventTimerHandler() emit WimaController::nemoStatusChanged(); emit WimaController::nemoStatusStringChanged(); }); + + auto end = std::chrono::high_resolution_clock::now(); + qWarning() << "Duration: " << std::chrono::duration_cast(end-start).count() + << " ms\n"; } } @@ -862,7 +870,7 @@ void WimaController::_snakeStoreWorkerResults() const WorkerResult_t &r = _snakeWorker.getResult(); _setSnakeCalcInProgress(false); if (!r.success) { - qgcApp()->showMessage(r.errorMessage); + //qgcApp()->showMessage(r.errorMessage); return; } diff --git a/src/comm/ros_bridge/include/GenericMessages.h b/src/comm/ros_bridge/include/GenericMessages.h index ae381409e1b54eb5472210b332985ed9c7b68d67..7e19254cd27b243f9574cddb217849a034a070d9 100644 --- a/src/comm/ros_bridge/include/GenericMessages.h +++ b/src/comm/ros_bridge/include/GenericMessages.h @@ -353,7 +353,7 @@ public: typedef MessageGroups::HeartbeatGroup Group; Heartbeat(){} Heartbeat(int status) :_status(status){} - Heartbeat(const Heartbeat &heartbeat) :_status(heartbeat.status()){} + Heartbeat(const Heartbeat &heartbeat) : MessageBaseClass(), _status(heartbeat.status()){} ~Heartbeat() = default; virtual Heartbeat *Clone() const override { return new Heartbeat(*this); } diff --git a/src/comm/ros_bridge/include/MessageGroups.h b/src/comm/ros_bridge/include/MessageGroups.h index 8572183e1e3a9cbd039d2ddadc8d1a79c0b8cd29..7b95eef76aa215455731100b6007595b588266bb 100644 --- a/src/comm/ros_bridge/include/MessageGroups.h +++ b/src/comm/ros_bridge/include/MessageGroups.h @@ -79,6 +79,17 @@ struct GeometryMsgs { }; }; +struct GeographicMsgs { + + static StringType label() {return "geographic_msgs";} + + //! + //! \brief The GeoPointGroup struct is used the mark a class as a ROS geographic_msgs/GeoPoint message. + struct GeoPointGroup { + static StringType label() {return "GeoPoint";} + }; +}; + struct JSKRecognitionMsgs { static StringType label() {return "jsk_recognition_msgs";} @@ -118,8 +129,8 @@ typedef MessageGroups::MessageGroup Point32Group; -typedef MessageGroups::MessageGroup +typedef MessageGroups::MessageGroup GeoPointGroup; typedef MessageGroups::MessageGroup #include #include +#include +#include using WsClient = SimpleWeb::SocketClient; using InMessage = std::function, std::shared_ptr)>; @@ -24,6 +26,7 @@ class RosbridgeWsClient std::string server_port_path; std::unordered_map> client_map; + // Starts the client. void start(const std::string& client_name, std::shared_ptr client, const std::string& message) { if (!client->on_open) @@ -103,6 +106,34 @@ public: } } + // The execution can take up to 100 ms! + bool connected(){ + + auto p = std::make_shared>(); + auto future = p->get_future(); + + auto callback = [](std::shared_ptr connection, std::shared_ptr> p) { + p->set_value(); + connection->send_close(1000); + }; + std::shared_ptr status_client = std::make_shared(server_port_path); + status_client->on_open = std::bind(callback, std::placeholders::_1, p); + + std::async([status_client]{ + + status_client->start(); + status_client->on_open = NULL; + status_client->on_message = NULL; + status_client->on_close = NULL; + status_client->on_error = NULL; + + }); + + auto status = future.wait_for(std::chrono::milliseconds(100)); + return status == std::future_status::ready; + } + + // Adds a client to the client_map. void addClient(const std::string& client_name) { std::unordered_map>::iterator it = client_map.find(client_name); @@ -170,6 +201,7 @@ public: #endif } + // Gets the client from client_map and starts it. Advertising is essentially sending a message. void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") { std::unordered_map>::iterator it = client_map.find(client_name); diff --git a/src/comm/ros_bridge/include/TopicPublisher.cpp b/src/comm/ros_bridge/include/TopicPublisher.cpp index 3b8d62639e94466610fdcf6fdadef5cdeb7cabdb..5e36c2e6864a3fea629400b82f6e2e3d7252d173 100644 --- a/src/comm/ros_bridge/include/TopicPublisher.cpp +++ b/src/comm/ros_bridge/include/TopicPublisher.cpp @@ -2,11 +2,27 @@ + + +struct ROSBridge::ComPrivate::ThreadData +{ + const ROSBridge::CasePacker &casePacker; + RosbridgeWsClient &rbc; + std::mutex &rbcMutex; + ROSBridge::ComPrivate::JsonQueue &queue; + std::mutex &queueMutex; + ROSBridge::ComPrivate::HashSet &advertisedTopicsHashList; + const std::atomic &running; + std::condition_variable &cv; +}; + ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker, - RosbridgeWsClient *rbc) : + RosbridgeWsClient *rbc, + std::mutex *rbcMutex) : _running(false) , _casePacker(casePacker) , _rbc(rbc) + , _rbcMutex(rbcMutex) { } @@ -21,14 +37,22 @@ void ROSBridge::ComPrivate::TopicPublisher::start() if ( _running.load() ) // start called while thread running. return; _running.store(true); - _rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey); - _pThread.reset(new std::thread(&ROSBridge::ComPrivate::transmittLoop, - std::cref(*_casePacker), - std::ref(*_rbc), - std::ref(_queue), - std::ref(_queueMutex), - std::ref(_advertisedTopicsHashList), - std::cref(_running))); + { + std::lock_guard lk(*_rbcMutex); + _rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey); + _rbc->addClient(ROSBridge::ComPrivate::_topicPublisherKey); + } + ROSBridge::ComPrivate::ThreadData data{ + *_casePacker, + *_rbc, + *_rbcMutex, + _queue, + _queueMutex, + _advertisedTopicsHashList, + _running, + _cv + }; + _pThread = std::make_unique(&ROSBridge::ComPrivate::transmittLoop, data); } void ROSBridge::ComPrivate::TopicPublisher::reset() @@ -36,36 +60,32 @@ void ROSBridge::ComPrivate::TopicPublisher::reset() if ( !_running.load() ) // stop called while thread not running. return; _running.store(false); + _cv.notify_one(); // Wake publisher thread. if ( !_pThread ) return; _pThread->join(); - _pThread.reset(); - _rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey); + { + std::lock_guard lk(*_rbcMutex); + _rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey); + _rbc->removeClient(ROSBridge::ComPrivate::_topicPublisherKey); + } _queue.clear(); _advertisedTopicsHashList.clear(); } -void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacker, - RosbridgeWsClient &rbc, - ROSBridge::ComPrivate::JsonQueue &queue, - std::mutex &queueMutex, - HashSet &advertisedTopicsHashList, - const std::atomic &running) +void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data) { - rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey); - - while(running.load()){ - // Pop message from queue. - queueMutex.lock(); - //std::cout << "Queue size: " << queue.size() << std::endl; - if (queue.empty()){ - queueMutex.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(20)); + while(data.running.load()){ + std::unique_lock lk(data.queueMutex); + // Check if new data available, wait if not. + if (data.queue.empty()){ + data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case. continue; } - JsonDocUPtr pJsonDoc(std::move(queue.front())); - queue.pop_front(); - queueMutex.unlock(); + // Pop message from queue. + JsonDocUPtr pJsonDoc(std::move(data.queue.front())); + data.queue.pop_front(); + lk.unlock(); // Debug output. // std::cout << "Transmitter loop json document:" << std::endl; @@ -76,26 +96,32 @@ void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacke // Get tag from Json message and remove it. Tag tag; - bool ret = casePacker.getTag(pJsonDoc, tag); + bool ret = data.casePacker.getTag(pJsonDoc, tag); assert(ret); // Json message does not contain a tag; (void)ret; - casePacker.removeTag(pJsonDoc); + data.casePacker.removeTag(pJsonDoc); // Check if topic must be advertised. // Advertised topics are stored in advertisedTopicsHashList as // a hash. HashType hash = ROSBridge::ComPrivate::getHash(tag.topic()); - if ( advertisedTopicsHashList.count(hash) == 0) { - advertisedTopicsHashList.insert(hash); - rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey, - tag.topic(), - tag.messageType() ); + if ( data.advertisedTopicsHashList.count(hash) == 0) { + data.advertisedTopicsHashList.insert(hash); + { + std::lock_guard lk(data.rbcMutex); + data.rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey, + tag.topic(), + tag.messageType() ); + } } // Debug output. //std::cout << "Hash Set size: " << advertisedTopicsHashList.size() << std::endl; // Send Json message. - rbc.publish(tag.topic(), *pJsonDoc.get()); + { + std::lock_guard lk(data.rbcMutex); + data.rbc.publish(tag.topic(), *pJsonDoc.get()); + } } // while loop } diff --git a/src/comm/ros_bridge/include/TopicPublisher.h b/src/comm/ros_bridge/include/TopicPublisher.h index a762eafd42e01197d9431929ff28ecb29ebb96ba..87ede08a3925dda3eae266e7dbab771bb4778756 100644 --- a/src/comm/ros_bridge/include/TopicPublisher.h +++ b/src/comm/ros_bridge/include/TopicPublisher.h @@ -10,18 +10,23 @@ #include #include #include +#include namespace ROSBridge { namespace ComPrivate { +struct ThreadData; + class TopicPublisher { typedef std::unique_ptr ThreadPtr; + using CondVar = std::condition_variable; public: TopicPublisher() = delete; TopicPublisher(CasePacker *casePacker, - RosbridgeWsClient *rbc); + RosbridgeWsClient *rbc, + std::mutex *rbcMutex); ~TopicPublisher(); @@ -37,8 +42,11 @@ public: publish(std::move(docPtr)); } void publish(JsonDocUPtr docPtr){ + { std::lock_guard lock(_queueMutex); _queue.push_back(std::move(docPtr)); + } + _cv.notify_one(); // Wake publisher thread. } private: @@ -46,19 +54,16 @@ private: std::mutex _queueMutex; std::atomic _running; CasePacker *_casePacker; - ThreadPtr _pThread; RosbridgeWsClient *_rbc; + std::mutex *_rbcMutex; HashSet _advertisedTopicsHashList; // Not thread save! This container // is manipulated by transmittLoop only! + CondVar _cv; + ThreadPtr _pThread; }; -void transmittLoop(const ROSBridge::CasePacker &casePacker, - RosbridgeWsClient &rbc, - ROSBridge::ComPrivate::JsonQueue &queue, - std::mutex &queueMutex, - HashSet &advertisedTopicsHashList, - const std::atomic &running); +void transmittLoop(ThreadData data); } // namespace CommunicatorDetail diff --git a/src/comm/ros_bridge/include/TopicSubscriber.cpp b/src/comm/ros_bridge/include/TopicSubscriber.cpp index a76971253cf90b17be7f03919a9522b11be67e15..f58c1088592a8f02c997b917772c82b50fd3433a 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.cpp +++ b/src/comm/ros_bridge/include/TopicSubscriber.cpp @@ -1,11 +1,11 @@ #include "TopicSubscriber.h" -ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber( - ROSBridge::CasePacker *casePacker, - RosbridgeWsClient *rbc) : +ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(ROSBridge::CasePacker *casePacker, + RosbridgeWsClient *rbc, std::mutex *rbcMutex) : _casePacker(casePacker) , _rbc(rbc) + , _rbcMutex(rbcMutex) , _running(false) { @@ -24,10 +24,17 @@ void ROSBridge::ComPrivate::TopicSubscriber::start() void ROSBridge::ComPrivate::TopicSubscriber::reset() { if ( _running ){ - for (std::string clientName : _clientList) - _rbc->removeClient(clientName); + { + std::lock_guard lk(*_rbcMutex); + for (std::string clientName : _clientList){ + _rbc->removeClient(clientName); + } + } _running = false; - _callbackMap.clear(); + { + std::lock_guard lk(_callbackMap.mutex); + _callbackMap.map.clear(); + } _clientList.clear(); } } @@ -44,25 +51,30 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( _clientList.push_back(clientName); HashType hash = getHash(clientName); - auto ret = _callbackMap.insert(std::make_pair(hash, callback)); // - if ( !ret.second ) - return false; // Topic subscription already present. - { + std::lock_guard lk(_callbackMap.mutex); + auto ret = _callbackMap.map.insert(std::make_pair(hash, callback)); // + if ( !ret.second ) + return false; // Topic subscription already present. + + } + using namespace std::placeholders; auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback, hash, - std::cref(_callbackMap), + std::ref(_callbackMap), _1, _2); // std::cout << std::endl; // std::cout << "topic subscription" << std::endl; // std::cout << "client name: " << clientName << std::endl; // std::cout << "topic: " << topic << std::endl; - _rbc->addClient(clientName); - _rbc->subscribe(clientName, - topic, - f ); + { + std::lock_guard lk(*_rbcMutex); + _rbc->addClient(clientName); + _rbc->subscribe(clientName, + topic, + f ); } return true; @@ -72,7 +84,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( using WsClient = SimpleWeb::SocketClient; void ROSBridge::ComPrivate::subscriberCallback( const HashType &hash, - const ROSBridge::ComPrivate::TopicSubscriber::CallbackMap &map, + ROSBridge::ComPrivate::CallbackMapWrapper &mapWrapper, std::shared_ptr, std::shared_ptr in_message) { @@ -97,26 +109,21 @@ void ROSBridge::ComPrivate::subscriberCallback( // << std::endl; // Search callback. - auto it = map.find(hash); - if (it == map.end()) { - assert(false); // callback not found - return; + CallbackType callback; + { + std::lock_guard lk(mapWrapper.mutex); + auto it = mapWrapper.map.find(hash); + if (it == mapWrapper.map.end()) { + //assert(false); // callback not found + return; + } + callback = it->second; } // Extract message and call callback. JsonDocUPtr pDoc(new JsonDoc()); pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator()); - it->second(std::move(pDoc)); // Call callback. + callback(std::move(pDoc)); return; } - -void ROSBridge::ComPrivate::test( - std::shared_ptr, - std::shared_ptr in_message) -{ - - std::cout << "test: Json document: " - << in_message->string() - << std::endl; -} diff --git a/src/comm/ros_bridge/include/TopicSubscriber.h b/src/comm/ros_bridge/include/TopicSubscriber.h index 3c05edf7a3ba1e48c48a6d35abea63eef588a9bc..4ce46231398d5d752560a35d0a34799fc8b4d79b 100644 --- a/src/comm/ros_bridge/include/TopicSubscriber.h +++ b/src/comm/ros_bridge/include/TopicSubscriber.h @@ -8,15 +8,21 @@ namespace ROSBridge { namespace ComPrivate { + +typedef std::function CallbackType; +struct CallbackMapWrapper{ + typedef std::map Map; + Map map; + std::mutex mutex; +}; + class TopicSubscriber { typedef std::vector ClientList; -public: - typedef std::function CallbackType; - typedef std::map CallbackMap; +public: TopicSubscriber() = delete; - TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc); + TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc, std::mutex *rbcMutex); ~TopicSubscriber(); //! @brief Starts the subscriber. @@ -31,19 +37,20 @@ public: bool subscribe(const char* topic, const CallbackType &callback); private: + + + CasePacker *_casePacker; RosbridgeWsClient *_rbc; - CallbackMap _callbackMap; + std::mutex *_rbcMutex; + CallbackMapWrapper _callbackMap; ClientList _clientList; bool _running; }; void subscriberCallback(const HashType &hash, - const TopicSubscriber::CallbackMap &map, + CallbackMapWrapper &mapWrapper, std::shared_ptr /*connection*/, std::shared_ptr in_message); - -void test(std::shared_ptr /*connection*/, - std::shared_ptr in_message); } // namespace ComPrivate } // namespace ROSBridge diff --git a/src/comm/ros_bridge/src/ROSBridge.cpp b/src/comm/ros_bridge/src/ROSBridge.cpp index f18a1da6235242f8544a4f3ad6d54f38a70df619..9b8d07fa537a0cb1986473f2cbacb1467c98e704 100644 --- a/src/comm/ros_bridge/src/ROSBridge.cpp +++ b/src/comm/ros_bridge/src/ROSBridge.cpp @@ -3,8 +3,8 @@ ROSBridge::ROSBridge::ROSBridge() : _casePacker(&_typeFactory, &_jsonFactory) , _rbc("localhost:9090") - , _topicPublisher(&_casePacker, &_rbc) - , _topicSubscriber(&_casePacker, &_rbc) + , _topicPublisher(&_casePacker, &_rbc, &_rbcMutex) + , _topicSubscriber(&_casePacker, &_rbc, &_rbcMutex) { } @@ -36,3 +36,9 @@ void ROSBridge::ROSBridge::reset() _topicSubscriber.reset(); } +bool ROSBridge::ROSBridge::connected() +{ + std::lock_guard lk(_rbcMutex); + return _rbc.connected(); +} +