Commit 29b83181 authored by Valentin Platzgummer's avatar Valentin Platzgummer

enable snake bug solved (was data race)

parent f391ef7a
......@@ -251,7 +251,8 @@ FlightMap {
delegate: MapCircle{
center: modelData
border.color: "transparent"
border.color: "black"
border.width: 1
color: getColor(wimaController.nemoProgress[index])
radius: 0.6
opacity: 1
......
......@@ -715,6 +715,10 @@ void WimaController::_eventTimerHandler()
if (_enableSnake.rawValue().toBool() && rosBridgeTicker.ready()) {
qWarning() << "Connectable: " << _pRosBridge->connected() << "\n";
auto start = std::chrono::high_resolution_clock::now();
_pRosBridge->publish(_snakeTilesLocal, "/snake/tiles");
_pRosBridge->publish(_snakeOrigin, "/snake/origin");
......@@ -736,6 +740,10 @@ void WimaController::_eventTimerHandler()
emit WimaController::nemoStatusChanged();
emit WimaController::nemoStatusStringChanged();
});
auto end = std::chrono::high_resolution_clock::now();
qWarning() << "Duration: " << std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms\n";
}
}
......@@ -862,7 +870,7 @@ void WimaController::_snakeStoreWorkerResults()
const WorkerResult_t &r = _snakeWorker.getResult();
_setSnakeCalcInProgress(false);
if (!r.success) {
qgcApp()->showMessage(r.errorMessage);
//qgcApp()->showMessage(r.errorMessage);
return;
}
......
......@@ -353,7 +353,7 @@ public:
typedef MessageGroups::HeartbeatGroup Group;
Heartbeat(){}
Heartbeat(int status) :_status(status){}
Heartbeat(const Heartbeat &heartbeat) :_status(heartbeat.status()){}
Heartbeat(const Heartbeat &heartbeat) : MessageBaseClass(), _status(heartbeat.status()){}
~Heartbeat() = default;
virtual Heartbeat *Clone() const override { return new Heartbeat(*this); }
......
......@@ -79,6 +79,17 @@ struct GeometryMsgs {
};
};
struct GeographicMsgs {
static StringType label() {return "geographic_msgs";}
//!
//! \brief The GeoPointGroup struct is used the mark a class as a ROS geographic_msgs/GeoPoint message.
struct GeoPointGroup {
static StringType label() {return "GeoPoint";}
};
};
struct JSKRecognitionMsgs {
static StringType label() {return "jsk_recognition_msgs";}
......@@ -118,8 +129,8 @@ typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs,
MessageGroups::GeometryMsgs::Point32Group>
Point32Group;
typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs,
MessageGroups::GeometryMsgs::GeoPointGroup>
typedef MessageGroups::MessageGroup<MessageGroups::GeographicMsgs,
MessageGroups::GeographicMsgs::GeoPointGroup>
GeoPointGroup;
typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs,
......
......@@ -34,14 +34,18 @@ public:
//! @brief Start ROS bridge.
void start();
//! @brief Reset ROS bridge.
//! @brief Stops ROS bridge.
void reset();
//! @return Returns true if connected to the rosbridge_server, false either.
//! @note This function can block up to 100ms. However normal execution time is smaller.
bool connected();
private:
TypeFactory _typeFactory;
JsonFactory _jsonFactory;
CasePacker _casePacker;
RosbridgeWsClient _rbc;
std::mutex _rbcMutex;
ComPrivate::TopicPublisher _topicPublisher;
ComPrivate::TopicSubscriber _topicSubscriber;
......
......@@ -15,6 +15,8 @@
#include <chrono>
#include <functional>
#include <thread>
#include <future>
#include <functional>
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>;
......@@ -24,6 +26,7 @@ class RosbridgeWsClient
std::string server_port_path;
std::unordered_map<std::string, std::shared_ptr<WsClient>> client_map;
// Starts the client.
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
{
if (!client->on_open)
......@@ -103,6 +106,34 @@ public:
}
}
// The execution can take up to 100 ms!
bool connected(){
auto p = std::make_shared<std::promise<void>>();
auto future = p->get_future();
auto callback = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<std::promise<void>> p) {
p->set_value();
connection->send_close(1000);
};
std::shared_ptr<WsClient> status_client = std::make_shared<WsClient>(server_port_path);
status_client->on_open = std::bind(callback, std::placeholders::_1, p);
std::async([status_client]{
status_client->start();
status_client->on_open = NULL;
status_client->on_message = NULL;
status_client->on_close = NULL;
status_client->on_error = NULL;
});
auto status = future.wait_for(std::chrono::milliseconds(100));
return status == std::future_status::ready;
}
// Adds a client to the client_map.
void addClient(const std::string& client_name)
{
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
......@@ -170,6 +201,7 @@ public:
#endif
}
// Gets the client from client_map and starts it. Advertising is essentially sending a message.
void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
{
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
......
......@@ -2,11 +2,27 @@
struct ROSBridge::ComPrivate::ThreadData
{
const ROSBridge::CasePacker &casePacker;
RosbridgeWsClient &rbc;
std::mutex &rbcMutex;
ROSBridge::ComPrivate::JsonQueue &queue;
std::mutex &queueMutex;
ROSBridge::ComPrivate::HashSet &advertisedTopicsHashList;
const std::atomic<bool> &running;
std::condition_variable &cv;
};
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
RosbridgeWsClient *rbc) :
RosbridgeWsClient *rbc,
std::mutex *rbcMutex) :
_running(false)
, _casePacker(casePacker)
, _rbc(rbc)
, _rbcMutex(rbcMutex)
{
}
......@@ -21,14 +37,22 @@ void ROSBridge::ComPrivate::TopicPublisher::start()
if ( _running.load() ) // start called while thread running.
return;
_running.store(true);
_rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
_pThread.reset(new std::thread(&ROSBridge::ComPrivate::transmittLoop,
std::cref(*_casePacker),
std::ref(*_rbc),
std::ref(_queue),
std::ref(_queueMutex),
std::ref(_advertisedTopicsHashList),
std::cref(_running)));
{
std::lock_guard<std::mutex> lk(*_rbcMutex);
_rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
_rbc->addClient(ROSBridge::ComPrivate::_topicPublisherKey);
}
ROSBridge::ComPrivate::ThreadData data{
*_casePacker,
*_rbc,
*_rbcMutex,
_queue,
_queueMutex,
_advertisedTopicsHashList,
_running,
_cv
};
_pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
}
void ROSBridge::ComPrivate::TopicPublisher::reset()
......@@ -36,36 +60,32 @@ void ROSBridge::ComPrivate::TopicPublisher::reset()
if ( !_running.load() ) // stop called while thread not running.
return;
_running.store(false);
_cv.notify_one(); // Wake publisher thread.
if ( !_pThread )
return;
_pThread->join();
_pThread.reset();
_rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
{
std::lock_guard<std::mutex> lk(*_rbcMutex);
_rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
_rbc->removeClient(ROSBridge::ComPrivate::_topicPublisherKey);
}
_queue.clear();
_advertisedTopicsHashList.clear();
}
void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacker,
RosbridgeWsClient &rbc,
ROSBridge::ComPrivate::JsonQueue &queue,
std::mutex &queueMutex,
HashSet &advertisedTopicsHashList,
const std::atomic<bool> &running)
void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
{
rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey);
while(running.load()){
// Pop message from queue.
queueMutex.lock();
//std::cout << "Queue size: " << queue.size() << std::endl;
if (queue.empty()){
queueMutex.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(20));
while(data.running.load()){
std::unique_lock<std::mutex> lk(data.queueMutex);
// Check if new data available, wait if not.
if (data.queue.empty()){
data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
continue;
}
JsonDocUPtr pJsonDoc(std::move(queue.front()));
queue.pop_front();
queueMutex.unlock();
// Pop message from queue.
JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
data.queue.pop_front();
lk.unlock();
// Debug output.
// std::cout << "Transmitter loop json document:" << std::endl;
......@@ -76,26 +96,32 @@ void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacke
// Get tag from Json message and remove it.
Tag tag;
bool ret = casePacker.getTag(pJsonDoc, tag);
bool ret = data.casePacker.getTag(pJsonDoc, tag);
assert(ret); // Json message does not contain a tag;
(void)ret;
casePacker.removeTag(pJsonDoc);
data.casePacker.removeTag(pJsonDoc);
// Check if topic must be advertised.
// Advertised topics are stored in advertisedTopicsHashList as
// a hash.
HashType hash = ROSBridge::ComPrivate::getHash(tag.topic());
if ( advertisedTopicsHashList.count(hash) == 0) {
advertisedTopicsHashList.insert(hash);
rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
tag.topic(),
tag.messageType() );
if ( data.advertisedTopicsHashList.count(hash) == 0) {
data.advertisedTopicsHashList.insert(hash);
{
std::lock_guard<std::mutex> lk(data.rbcMutex);
data.rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey,
tag.topic(),
tag.messageType() );
}
}
// Debug output.
//std::cout << "Hash Set size: " << advertisedTopicsHashList.size() << std::endl;
// Send Json message.
rbc.publish(tag.topic(), *pJsonDoc.get());
{
std::lock_guard<std::mutex> lk(data.rbcMutex);
data.rbc.publish(tag.topic(), *pJsonDoc.get());
}
} // while loop
}
......@@ -10,18 +10,23 @@
#include <atomic>
#include <mutex>
#include <set>
#include <condition_variable>
namespace ROSBridge {
namespace ComPrivate {
struct ThreadData;
class TopicPublisher
{
typedef std::unique_ptr<std::thread> ThreadPtr;
using CondVar = std::condition_variable;
public:
TopicPublisher() = delete;
TopicPublisher(CasePacker *casePacker,
RosbridgeWsClient *rbc);
RosbridgeWsClient *rbc,
std::mutex *rbcMutex);
~TopicPublisher();
......@@ -37,8 +42,11 @@ public:
publish(std::move(docPtr));
}
void publish(JsonDocUPtr docPtr){
{
std::lock_guard<std::mutex> lock(_queueMutex);
_queue.push_back(std::move(docPtr));
}
_cv.notify_one(); // Wake publisher thread.
}
private:
......@@ -46,19 +54,16 @@ private:
std::mutex _queueMutex;
std::atomic<bool> _running;
CasePacker *_casePacker;
ThreadPtr _pThread;
RosbridgeWsClient *_rbc;
std::mutex *_rbcMutex;
HashSet _advertisedTopicsHashList; // Not thread save! This container
// is manipulated by transmittLoop only!
CondVar _cv;
ThreadPtr _pThread;
};
void transmittLoop(const ROSBridge::CasePacker &casePacker,
RosbridgeWsClient &rbc,
ROSBridge::ComPrivate::JsonQueue &queue,
std::mutex &queueMutex,
HashSet &advertisedTopicsHashList,
const std::atomic<bool> &running);
void transmittLoop(ThreadData data);
} // namespace CommunicatorDetail
......
#include "TopicSubscriber.h"
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(
ROSBridge::CasePacker *casePacker,
RosbridgeWsClient *rbc) :
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(ROSBridge::CasePacker *casePacker,
RosbridgeWsClient *rbc, std::mutex *rbcMutex) :
_casePacker(casePacker)
, _rbc(rbc)
, _rbcMutex(rbcMutex)
, _running(false)
{
......@@ -24,10 +24,17 @@ void ROSBridge::ComPrivate::TopicSubscriber::start()
void ROSBridge::ComPrivate::TopicSubscriber::reset()
{
if ( _running ){
for (std::string clientName : _clientList)
_rbc->removeClient(clientName);
{
std::lock_guard<std::mutex> lk(*_rbcMutex);
for (std::string clientName : _clientList){
_rbc->removeClient(clientName);
}
}
_running = false;
_callbackMap.clear();
{
std::lock_guard<std::mutex> lk(_callbackMap.mutex);
_callbackMap.map.clear();
}
_clientList.clear();
}
}
......@@ -44,25 +51,30 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
_clientList.push_back(clientName);
HashType hash = getHash(clientName);
auto ret = _callbackMap.insert(std::make_pair(hash, callback)); //
if ( !ret.second )
return false; // Topic subscription already present.
{
std::lock_guard<std::mutex> lk(_callbackMap.mutex);
auto ret = _callbackMap.map.insert(std::make_pair(hash, callback)); //
if ( !ret.second )
return false; // Topic subscription already present.
}
using namespace std::placeholders;
auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback,
hash,
std::cref(_callbackMap),
std::ref(_callbackMap),
_1, _2);
// std::cout << std::endl;
// std::cout << "topic subscription" << std::endl;
// std::cout << "client name: " << clientName << std::endl;
// std::cout << "topic: " << topic << std::endl;
_rbc->addClient(clientName);
_rbc->subscribe(clientName,
topic,
f );
{
std::lock_guard<std::mutex> lk(*_rbcMutex);
_rbc->addClient(clientName);
_rbc->subscribe(clientName,
topic,
f );
}
return true;
......@@ -72,7 +84,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
void ROSBridge::ComPrivate::subscriberCallback(
const HashType &hash,
const ROSBridge::ComPrivate::TopicSubscriber::CallbackMap &map,
ROSBridge::ComPrivate::CallbackMapWrapper &mapWrapper,
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message)
{
......@@ -97,26 +109,21 @@ void ROSBridge::ComPrivate::subscriberCallback(
// << std::endl;
// Search callback.
auto it = map.find(hash);
if (it == map.end()) {
assert(false); // callback not found
return;
CallbackType callback;
{
std::lock_guard<std::mutex> lk(mapWrapper.mutex);
auto it = mapWrapper.map.find(hash);
if (it == mapWrapper.map.end()) {
//assert(false); // callback not found
return;
}
callback = it->second;
}
// Extract message and call callback.
JsonDocUPtr pDoc(new JsonDoc());
pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
it->second(std::move(pDoc)); // Call callback.
callback(std::move(pDoc));
return;
}
void ROSBridge::ComPrivate::test(
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message)
{
std::cout << "test: Json document: "
<< in_message->string()
<< std::endl;
}
......@@ -8,15 +8,21 @@
namespace ROSBridge {
namespace ComPrivate {
typedef std::function<void(JsonDocUPtr)> CallbackType;
struct CallbackMapWrapper{
typedef std::map<HashType, CallbackType> Map;
Map map;
std::mutex mutex;
};
class TopicSubscriber
{
typedef std::vector<std::string> ClientList;
public:
typedef std::function<void(JsonDocUPtr)> CallbackType;
typedef std::map<HashType, CallbackType> CallbackMap;
public:
TopicSubscriber() = delete;
TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc);
TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc, std::mutex *rbcMutex);
~TopicSubscriber();
//! @brief Starts the subscriber.
......@@ -31,19 +37,20 @@ public:
bool subscribe(const char* topic, const CallbackType &callback);
private:
CasePacker *_casePacker;
RosbridgeWsClient *_rbc;
CallbackMap _callbackMap;
std::mutex *_rbcMutex;
CallbackMapWrapper _callbackMap;
ClientList _clientList;
bool _running;
};
void subscriberCallback(const HashType &hash,
const TopicSubscriber::CallbackMap &map,
CallbackMapWrapper &mapWrapper,
std::shared_ptr<WsClient::Connection> /*connection*/,
std::shared_ptr<WsClient::InMessage> in_message);
void test(std::shared_ptr<WsClient::Connection> /*connection*/,
std::shared_ptr<WsClient::InMessage> in_message);
} // namespace ComPrivate
} // namespace ROSBridge
......@@ -3,8 +3,8 @@
ROSBridge::ROSBridge::ROSBridge() :
_casePacker(&_typeFactory, &_jsonFactory)
, _rbc("localhost:9090")
, _topicPublisher(&_casePacker, &_rbc)
, _topicSubscriber(&_casePacker, &_rbc)
, _topicPublisher(&_casePacker, &_rbc, &_rbcMutex)
, _topicSubscriber(&_casePacker, &_rbc, &_rbcMutex)
{
}
......@@ -36,3 +36,9 @@ void ROSBridge::ROSBridge::reset()
_topicSubscriber.reset();
}
bool ROSBridge::ROSBridge::connected()
{
std::lock_guard<std::mutex> lk(_rbcMutex);
return _rbc.connected();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment