Commit 50929c20 authored by Valentin Platzgummer's avatar Valentin Platzgummer

1243

parent 65679db5
......@@ -28,11 +28,13 @@ QGCROOT = $$PWD
DebugBuild {
DESTDIR = $${OUT_PWD}/debug
DEFINES += DEBUG
DEFINES += SNAKE_SHOW_TIME
#DEFINES += ROS_BRIDGE_DEBUG
}
else {
DESTDIR = $${OUT_PWD}/release
#DEFINES += ROS_BRIDGE_DEBUG
DEFINES += SNAKE_SHOW_TIME
DEFINES += NDEBUG
}
......@@ -438,11 +440,11 @@ HEADERS += \
src/Wima/Snake/QNemoHeartbeat.h \
src/Wima/Snake/QNemoProgress.h \
src/Wima/Snake/QNemoProgress.h \
src/Wima/Snake/SnakeThread.h \
src/Wima/Snake/SnakeTile.h \
src/Wima/Snake/SnakeTileLocal.h \
src/Wima/Snake/SnakeTiles.h \
src/Wima/Snake/SnakeTilesLocal.h \
src/Wima/Snake/SnakeDataManager.h \
src/Wima/WaypointManager/AreaInterface.h \
src/Wima/WaypointManager/DefaultManager.h \
src/Wima/WaypointManager/GenericWaypointManager.h \
......@@ -503,7 +505,7 @@ SOURCES += \
src/Wima/Geometry/GeoPoint3D.cpp \
src/Wima/Snake/NemoInterface.cpp \
src/Wima/Snake/QNemoProgress.cc \
src/Wima/Snake/SnakeDataManager.cc \
src/Wima/Snake/SnakeThread.cc \
src/Wima/Snake/SnakeTile.cpp \
src/Wima/WaypointManager/AreaInterface.cpp \
src/Wima/WaypointManager/DefaultManager.cpp \
......
......@@ -22,7 +22,7 @@
using namespace operations_research;
#ifndef NDEBUG
//#define SHOW_TIME
//#define SNAKE_SHOW_TIME
#endif
namespace bg = boost::geometry;
......@@ -104,7 +104,7 @@ bool minimalBoundingBox(const BoostPolygon &polygon, BoundingBox &minBBox) {
//# Compute edges (x2-x1,y2-y1)
std::vector<BoostPoint> edges;
auto convex_hull_outer = convex_hull.outer();
const auto &convex_hull_outer = convex_hull.outer();
for (long i = 0; i < long(convex_hull_outer.size()) - 1; ++i) {
BoostPoint p1 = convex_hull_outer.at(i);
BoostPoint p2 = convex_hull_outer.at(i + 1);
......@@ -711,13 +711,12 @@ bool flight_plan::transectsFromScenario(Length distance, Length minLength,
// Rotate measurement area by angle and calculate bounding box.
auto &mArea = scenario.measurementArea();
BoostPolygon mAreaRotated;
trans::rotate_transformer<bg::degree, double, 2, 2> rotate(-angle.value() *
trans::rotate_transformer<bg::degree, double, 2, 2> rotate(angle.value() *
180 / M_PI);
bg::transform(mArea, mAreaRotated, rotate);
BoostBox box;
boost::geometry::envelope(mAreaRotated, box);
double x0 = box.min_corner().get<0>();
double y0 = box.min_corner().get<1>();
double x1 = box.max_corner().get<0>();
......@@ -725,8 +724,6 @@ bool flight_plan::transectsFromScenario(Length distance, Length minLength,
// Generate transects and convert them to clipper path.
size_t num_t = int(ceil((y1 - y0) / distance.value())); // number of transects
trans::rotate_transformer<bg::degree, double, 2, 2> rotate_back(
angle.value() * 180 / M_PI);
vector<ClipperLib::Path> transectsClipper;
transectsClipper.reserve(num_t);
for (size_t i = 0; i < num_t; ++i) {
......@@ -736,17 +733,20 @@ bool flight_plan::transectsFromScenario(Length distance, Length minLength,
BoostLineString transect;
transect.push_back(v1);
transect.push_back(v2);
// transform back
BoostLineString temp_transect;
trans::rotate_transformer<bg::degree, double, 2, 2> rotate_back(
-angle.value() * 180 / M_PI);
bg::transform(transect, temp_transect, rotate_back);
ClipperLib::IntPoint c1{
static_cast<ClipperLib::cInt>(transect[0].get<0>() * CLIPPER_SCALE),
static_cast<ClipperLib::cInt>(transect[0].get<1>() * CLIPPER_SCALE)};
ClipperLib::IntPoint c2{
static_cast<ClipperLib::cInt>(transect[1].get<0>() * CLIPPER_SCALE),
static_cast<ClipperLib::cInt>(transect[1].get<1>() * CLIPPER_SCALE)};
// to clipper
ClipperLib::IntPoint c1{static_cast<ClipperLib::cInt>(
temp_transect[0].get<0>() * CLIPPER_SCALE),
static_cast<ClipperLib::cInt>(
temp_transect[0].get<1>() * CLIPPER_SCALE)};
ClipperLib::IntPoint c2{static_cast<ClipperLib::cInt>(
temp_transect[1].get<0>() * CLIPPER_SCALE),
static_cast<ClipperLib::cInt>(
temp_transect[1].get<1>() * CLIPPER_SCALE)};
ClipperLib::Path path{c1, c2};
transectsClipper.push_back(path);
}
......@@ -852,22 +852,22 @@ void generateRoutingModel(const BoostLineString &vertices,
const BoostPolygon &polygonOffset, size_t n0,
RoutingDataModel &dataModel, Matrix<double> &graph) {
#ifdef SHOW_TIME
#ifdef SNAKE_SHOW_TIME
auto start = std::chrono::high_resolution_clock::now();
#endif
graphFromPolygon(polygonOffset, vertices, graph);
#ifdef SHOW_TIME
#ifdef SNAKE_SHOW_TIME
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start);
cout << "Execution time graphFromPolygon(): " << delta.count() << " ms"
<< endl;
#endif
Matrix<double> distanceMatrix(graph);
#ifdef SHOW_TIME
#ifdef SNAKE_SHOW_TIME
start = std::chrono::high_resolution_clock::now();
#endif
toDistanceMatrix(distanceMatrix);
#ifdef SHOW_TIME
#ifdef SNAKE_SHOW_TIME
delta = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start);
cout << "Execution time toDistanceMatrix(): " << delta.count() << " ms"
......@@ -934,12 +934,12 @@ bool flight_plan::route(const BoostPolygon &area,
// Offset joined area.
BoostPolygon areaOffset;
offsetPolygon(area, areaOffset, detail::offsetConstant);
#ifdef SHOW_TIME
start = std::chrono::high_resolution_clock::now();
#ifdef SNAKE_SHOW_TIME
auto start = std::chrono::high_resolution_clock::now();
#endif
generateRoutingModel(vertices, areaOffset, n0, dataModel, connectionGraph);
#ifdef SHOW_TIME
delta = std::chrono::duration_cast<std::chrono::milliseconds>(
#ifdef SNAKE_SHOW_TIME
auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start);
cout << "Execution time _generateRoutingModel(): " << delta.count() << " ms"
<< endl;
......@@ -995,11 +995,11 @@ bool flight_plan::route(const BoostPolygon &area,
searchParameters.set_allocated_time_limit(tMax);
// Solve the problem.
#ifdef SHOW_TIME
#ifdef SNAKE_SHOW_TIME
start = std::chrono::high_resolution_clock::now();
#endif
const Assignment *solution = routing.SolveWithParameters(searchParameters);
#ifdef SHOW_TIME
#ifdef SNAKE_SHOW_TIME
delta = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start);
cout << "Execution time routing.SolveWithParameters(): " << delta.count()
......
......@@ -173,7 +173,7 @@ public:
const BoundingBox &measurementAreaBBox() const;
const BoostPoint &homePositon() const;
bool update() const;
bool update();
mutable string errorString;
......
......@@ -213,6 +213,7 @@ bool NemoInterface::Impl::doTopicServiceSetup() {
this->pRosBridge->subscribe(
"/nemo/heartbeat",
/* callback */ [this](JsonDocUPtr pDoc) {
// auto start = std::chrono::high_resolution_clock::now();
UniqueLock lk(this->heartbeatMutex);
auto &heartbeatMsg = this->heartbeat;
......@@ -223,8 +224,15 @@ bool NemoInterface::Impl::doTopicServiceSetup() {
this->nextTimeout =
std::chrono::high_resolution_clock::now() + timeoutInterval;
}
lk.unlock();
emit this->parent->statusChanged();
// auto delta =
// std::chrono::duration_cast<std::chrono::milliseconds>(
// std::chrono::high_resolution_clock::now() - start);
// std::cout << "/nemo/heartbeat callback time: " <<
// delta.count() << " ms"
// << std::endl;
});
// Advertise /snake/get_origin.
......@@ -289,11 +297,12 @@ void NemoInterface::Impl::loop() {
}
// Check if heartbeat timeout occured.
if (this->running && this->topicServiceSetupDone &&
this->nextTimeout != TimePoint::max()) {
if (this->nextTimeout < std::chrono::high_resolution_clock::now()) {
UniqueLock lk(this->heartbeatMutex);
if (this->running && this->topicServiceSetupDone) {
UniqueLock lk(this->heartbeatMutex);
if (this->nextTimeout != TimePoint::max() &&
this->nextTimeout < std::chrono::high_resolution_clock::now()) {
this->heartbeat.setStatus(integral(NemoStatus::Timeout));
lk.unlock();
emit this->parent->statusChanged();
}
}
......@@ -321,6 +330,13 @@ void NemoInterface::Impl::publishENUOrigin() {
this->pRosBridge->publish(std::move(jOrigin), "/snake/origin");
}
// ===============================================================
// NemoInterface
NemoInterface::NemoInterface(QObject *parent)
: QObject(parent), pImpl(std::make_unique<NemoInterface::Impl>(this)) {}
NemoInterface::~NemoInterface() {}
void NemoInterface::start() { this->pImpl->start(); }
void NemoInterface::stop() { this->pImpl->stop(); }
......@@ -333,8 +349,8 @@ void NemoInterface::setENUOrigin(const QGeoCoordinate &ENUOrigin) {
this->pImpl->setENUOrigin(ENUOrigin);
}
NemoInterface::NemoStatus NemoInterface::status() {
NemoInterface::NemoStatus NemoInterface::status() const {
return this->pImpl->status();
}
QVector<int> NemoInterface::progress() { return this->pImpl->progress(); }
QVector<int> NemoInterface::progress() const { return this->pImpl->progress(); }
......@@ -21,6 +21,7 @@ public:
};
explicit NemoInterface(QObject *parent = nullptr);
~NemoInterface() override;
void start();
void stop();
......@@ -28,8 +29,8 @@ public:
void setTilesENU(const SnakeTilesLocal &tilesENU);
void setENUOrigin(const QGeoCoordinate &ENUOrigin);
NemoStatus status();
QVector<int> progress();
NemoStatus status() const;
QVector<int> progress() const;
signals:
void statusChanged();
......
......@@ -15,29 +15,34 @@ using namespace boost::units;
using Length = quantity<si::length>;
using Area = quantity<si::area>;
class SnakeDataManager : public QThread {
class SnakeThread : public QThread {
Q_OBJECT
class Impl;
using PImpl = std::unique_ptr<Impl>;
public:
SnakeDataManager(QObject *parent = nullptr);
~SnakeDataManager() override;
SnakeThread(QObject *parent = nullptr);
~SnakeThread() override;
void setMeasurementArea(const QList<QGeoCoordinate> &measurementArea);
void setServiceArea(const QList<QGeoCoordinate> &serviceArea);
void setCorridor(const QList<QGeoCoordinate> &corridor);
void setProgress(const QVector<int> &progress);
SnakeTiles tiles() const;
SnakeTilesLocal tilesENU() const;
QGeoCoordinate ENUOrigin() const;
QVariantList tileCenterPoints() const;
QNemoProgress progress() const;
QVector<int> progress() const;
bool calcInProgress() const;
QString errorMessage() const;
bool success() const;
bool tilesUpdated();
bool waypointsUpdated();
bool progressUpdated();
QVector<QGeoCoordinate> waypoints() const;
QVector<QGeoCoordinate> arrivalPath() const;
QVector<QGeoCoordinate> returnPath() const;
......
#include "SnakeTile.h"
SnakeTile::SnakeTile() : WimaAreaData()
{
SnakeTile::SnakeTile() : WimaAreaData() {}
SnakeTile::SnakeTile(const SnakeTile &other, QObject *parent)
: WimaAreaData(parent) {
*this = other;
}
SnakeTile::SnakeTile(const SnakeTile &other) : WimaAreaData()
{
*this = other;
SnakeTile &SnakeTile::operator=(const SnakeTile &other) {
this->assign(other);
return *this;
}
SnakeTile &SnakeTile::operator=(const SnakeTile &other)
{
this->assign(other);
return *this;
}
void SnakeTile::assign(const SnakeTile &other)
{
WimaAreaData::assign(other);
}
void SnakeTile::assign(const SnakeTile &other) { WimaAreaData::assign(other); }
......@@ -2,18 +2,16 @@
#include "Wima/Geometry/WimaAreaData.h"
class SnakeTile : public WimaAreaData
{
class SnakeTile : public WimaAreaData {
public:
SnakeTile();
SnakeTile(const SnakeTile&other);
SnakeTile();
SnakeTile(const SnakeTile &other, QObject *parent = nullptr);
QString type() const {return "Tile";}
SnakeTile* Clone() const {return new SnakeTile(*this);}
QString type() const { return "Tile"; }
SnakeTile *Clone() const { return new SnakeTile(*this); }
SnakeTile& operator=(const SnakeTile &other);
SnakeTile &operator=(const SnakeTile &other);
protected:
void assign(const SnakeTile &other);
void assign(const SnakeTile &other);
};
This diff is collapsed.
......@@ -27,7 +27,8 @@
#include "SurveyComplexItem.h"
#include "Geometry/GeoPoint3D.h"
#include "Snake/SnakeDataManager.h"
#include "Snake/NemoInterface.h"
#include "Snake/SnakeThread.h"
#include "Snake/SnakeTiles.h"
#include "Snake/SnakeTilesLocal.h"
......@@ -237,19 +238,16 @@ private slots:
void _initSmartRTL();
void _smartRTLCleanUp(bool flying);
// Snake.
void _DMFinishedHandler();
void _threadFinishedHandler();
void _switchWaypointManager(WaypointManager::ManagerBase &manager);
void _switchToSnakeWaypointManager(QVariant variant);
void _switchDataManager(SnakeDataManager &dataManager);
void _switchThreadObject(SnakeThread &thread);
void _progressChangedHandler();
void _enableSnakeChangedHandler();
// Periodic tasks.
void _eventTimerHandler(void);
// Waypoint manager handling.
void _switchWaypointManager(WaypointManager::ManagerBase &manager);
private:
using StatusMap = std::map<int, QString>;
// Controllers.
PlanMasterController *_masterController;
MissionController *_missionController;
......@@ -310,9 +308,12 @@ private:
double _measurementPathLength; // the lenght of the phase in meters
// Snake
SnakeDataManager _snakeDM; // Snake Data Manager
SnakeDataManager _emptyDM;
SnakeDataManager *_currentDM;
QmlObjectListModel tiles;
SnakeThread _snakeThread; // Snake Data Manager
SnakeThread _emptyThread;
SnakeThread *_currentThread;
NemoInterface _nemoInterface;
using StatusMap = std::map<int, QString>;
static StatusMap _nemoStatusMap;
// Periodic tasks.
......
......@@ -2,8 +2,8 @@
#include "ros_bridge/rapidjson/include/rapidjson/document.h"
#include <memory>
#include <deque>
#include <memory>
#include <unordered_map>
namespace ros_bridge {
......@@ -16,11 +16,6 @@ typedef std::size_t HashType;
using ClientMap = std::unordered_map<HashType, std::string>;
static const char* _topicAdvertiserKey = "topic_advertiser";
static const char* _topicPublisherKey = "topic_publisher";
static const char* _serviceAdvertiserKey = "service_advertiser";
static const char* _topicSubscriberKey = "topic_subscriber";
std::size_t getHash(const std::string &str);
std::size_t getHash(const char *str);
......@@ -30,5 +25,5 @@ bool getType(const JsonDoc &doc, std::string &type);
bool removeTopic(JsonDoc &doc);
bool removeType(JsonDoc &doc);
}
}
} // namespace com_private
} // namespace ros_bridge
......@@ -2,98 +2,87 @@
#include "rapidjson/include/rapidjson/ostreamwrapper.h"
ros_bridge::com_private::Server::Server(RosbridgeWsClient &rbc) :
_rbc(rbc)
, _stopped(std::make_shared<std::atomic_bool>(true))
{
static const char *serviceAdvertiserKey = "service_advertiser";
}
ros_bridge::com_private::Server::Server(RosbridgeWsClient &rbc)
: _rbc(rbc), _stopped(std::make_shared<std::atomic_bool>(true)) {}
void ros_bridge::com_private::Server::advertiseService(const std::string &service,
const std::string &type,
const Server::CallbackType &userCallback)
{
std::string clientName = _serviceAdvertiserKey + service;
auto it = _serviceMap.find(clientName);
if (it != _serviceMap.end())
return; // Service allready advertised.
_serviceMap.insert(std::make_pair(clientName, service));
_rbc.addClient(clientName);
void ros_bridge::com_private::Server::advertiseService(
const std::string &service, const std::string &type,
const Server::CallbackType &userCallback) {
std::string clientName = serviceAdvertiserKey + service;
auto it = _serviceMap.find(clientName);
if (it != _serviceMap.end())
return; // Service allready advertised.
_serviceMap.insert(std::make_pair(clientName, service));
_rbc.addClient(clientName);
auto rbc = &_rbc;
_rbc.advertiseService(clientName, service, type,
[userCallback, rbc](
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message){
auto rbc = &_rbc;
_rbc.advertiseService(
clientName, service, type,
[userCallback, rbc](std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message) {
// message->string() is destructive, so we have to buffer it first
std::string messagebuf = in_message->string();
//std::cout << "advertiseServiceCallback(): Message Received: "
// << messagebuf << std::endl;
// std::cout << "advertiseServiceCallback(): Message Received: "
// << messagebuf << std::endl;
rapidjson::Document document;
if (document.Parse(messagebuf.c_str()).HasParseError())
{
std::cerr << "advertiseServiceCallback(): Error in parsing service request message: "
if (document.Parse(messagebuf.c_str()).HasParseError()) {
std::cerr << "advertiseServiceCallback(): Error in parsing service "
"request message: "
<< messagebuf << std::endl;
return;
}
if ( !document.HasMember("args") || !document["args"].IsObject()){
std::cerr << "advertiseServiceCallback(): message has no args member: "
<< messagebuf << std::endl;
return;
if (!document.HasMember("args") || !document["args"].IsObject()) {
std::cerr
<< "advertiseServiceCallback(): message has no args member: "
<< messagebuf << std::endl;
return;
}
if ( !document.HasMember("service") || !document["service"].IsString()){
std::cerr << "advertiseServiceCallback(): message has no service member: "
<< messagebuf << std::endl;
return;
if (!document.HasMember("service") || !document["service"].IsString()) {
std::cerr
<< "advertiseServiceCallback(): message has no service member: "
<< messagebuf << std::endl;
return;
}
if ( !document.HasMember("id") || !document["id"].IsString()){
std::cerr << "advertiseServiceCallback(): message has no id member: "
<< messagebuf << std::endl;
return;
if (!document.HasMember("id") || !document["id"].IsString()) {
std::cerr << "advertiseServiceCallback(): message has no id member: "
<< messagebuf << std::endl;
return;
}
JsonDocUPtr pDoc(new JsonDoc());
std::cout << "args member count: " << document["args"].MemberCount();
if ( document["args"].MemberCount() > 0)
pDoc->CopyFrom(document["args"].Move(), document.GetAllocator());
if (document["args"].MemberCount() > 0)
pDoc->CopyFrom(document["args"].Move(), document.GetAllocator());
JsonDocUPtr pDocResponse = userCallback(std::move(pDoc));
rbc->serviceResponse(
document["service"].GetString(),
document["id"].GetString(),
pDocResponse->MemberCount() > 0 ? true : false,
*pDocResponse);
document["service"].GetString(), document["id"].GetString(),
pDocResponse->MemberCount() > 0 ? true : false, *pDocResponse);
return;
});
});
}
ros_bridge::com_private::Server::~Server()
{
this->reset();
}
ros_bridge::com_private::Server::~Server() { this->reset(); }
void ros_bridge::com_private::Server::start()
{
_stopped->store(false);
}
void ros_bridge::com_private::Server::start() { _stopped->store(false); }
void ros_bridge::com_private::Server::reset()
{
if ( _stopped->load() )
return;
std::cout << "Server: _stopped->store(true)" << std::endl;
_stopped->store(true);
for (const auto& item : _serviceMap){
std::cout << "Server: unadvertiseService " << item.second << std::endl;
_rbc.unadvertiseService(item.second);
std::cout << "Server: removeClient " << item.first << std::endl;
_rbc.removeClient(item.first);
}
std::cout << "Server: _serviceMap cleared " << std::endl;
_serviceMap.clear();
void ros_bridge::com_private::Server::reset() {
if (_stopped->load())
return;
std::cout << "Server: _stopped->store(true)" << std::endl;
_stopped->store(true);
for (const auto &item : _serviceMap) {
std::cout << "Server: unadvertiseService " << item.second << std::endl;
_rbc.unadvertiseService(item.second);
std::cout << "Server: removeClient " << item.first << std::endl;
_rbc.removeClient(item.first);
}
std::cout << "Server: _serviceMap cleared " << std::endl;
_serviceMap.clear();
}
#include "topic_publisher.h"
#include <unordered_map>
ros_bridge::com_private::TopicPublisher::TopicPublisher(RosbridgeWsClient &rbc) :
_stopped(std::make_shared<std::atomic_bool>(true))
, _rbc(rbc)
{
}
static const char *topicAdvertiserKey = "topic_advertiser";
ros_bridge::com_private::TopicPublisher::~TopicPublisher()
{
this->reset();
}
#include <unordered_map>
void ros_bridge::com_private::TopicPublisher::start()
{
if ( !_stopped->load() ) // start called while thread running.
return;
_stopped->store(false);
_pThread = std::make_unique<std::thread>([this]{
// Main Loop.
while( !this->_stopped->load() ){
std::unique_lock<std::mutex> lk(this->_mutex);
// Check if new data available, wait if not.
if (this->_queue.empty()){
if ( _stopped->load() ) // Check condition again while holding the lock.
break;
this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
continue;
}
// Pop message from queue.
JsonDocUPtr pJsonDoc = std::move(this->_queue.front());
this->_queue.pop_front();
lk.unlock();
// Get topic and type from Json message and remove it.
std::string topic;
bool ret = com_private::getTopic(*pJsonDoc, topic);
assert(ret);
(void)ret;
// Debug
rapidjson::StringBuffer sb;
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
pJsonDoc->Accept(writer);
std::cout << "message: " << sb.GetString() << std::endl;
std::cout << "topic: " << topic << std::endl;
ret = com_private::removeTopic(*pJsonDoc);
assert(ret);
(void)ret;
// Wait for topic (Rosbridge needs some time to process a advertise() request).
this->_rbc.waitForTopic(topic, [this]{
return this->_stopped->load();
});
// Publish Json message.
if ( !this->_stopped->load() )
this->_rbc.publish(topic, *pJsonDoc);
} // while loop
ros_bridge::com_private::TopicPublisher::TopicPublisher(RosbridgeWsClient &rbc)
: _stopped(std::make_shared<std::atomic_bool>(true)), _rbc(rbc) {}
ros_bridge::com_private::TopicPublisher::~TopicPublisher() { this->reset(); }
void ros_bridge::com_private::TopicPublisher::start() {
if (!_stopped->load()) // start called while thread running.
return;
_stopped->store(false);
_pThread = std::make_unique<std::thread>([this] {
// Main Loop.
while (!this->_stopped->load()) {
std::unique_lock<std::mutex> lk(this->_mutex);
// Check if new data available, wait if not.
if (this->_queue.empty()) {
if (_stopped->load()) // Check condition again while holding the lock.
break;
this->_cv.wait(lk); // Wait for condition, spurious wakeups don't matter
// in this case.
continue;
}
// Pop message from queue.
JsonDocUPtr pJsonDoc = std::move(this->_queue.front());
this->_queue.pop_front();
lk.unlock();
// Get topic and type from Json message and remove it.
std::string topic;
bool ret = com_private::getTopic(*pJsonDoc, topic);
assert(ret);
(void)ret;
// Debug
rapidjson::StringBuffer sb;
rapidjson::Writer<rapidjson::StringBuffer> writer(sb);
pJsonDoc->Accept(writer);
std::cout << "message: " << sb.GetString() << std::endl;
std::cout << "topic: " << topic << std::endl;
ret = com_private::removeTopic(*pJsonDoc);
assert(ret);
(void)ret;
// Wait for topic (Rosbridge needs some time to process a advertise()
// request).
this->_rbc.waitForTopic(topic, [this] { return this->_stopped->load(); });
// Publish Json message.
if (!this->_stopped->load())
this->_rbc.publish(topic, *pJsonDoc);
} // while loop
#ifdef ROS_BRIDGE_DEBUG
std::cout << "TopicPublisher: publisher thread terminated." << std::endl;
std::cout << "TopicPublisher: publisher thread terminated." << std::endl;
#endif
});
});
}
void ros_bridge::com_private::TopicPublisher::reset()
{
if ( _stopped->load() ) // stop called while thread not running.
return;
std::unique_lock<std::mutex> lk(_mutex);
_stopped->store(true);
_cv.notify_one(); // Wake publisher thread.
lk.unlock();
if ( !_pThread )
return;
_pThread->join();
lk.lock();
// Tidy up.
for (auto& it : this->_topicMap){
this->_rbc.unadvertise(it.first);
this->_rbc.removeClient(it.second);
}
this->_topicMap.clear();
_queue.clear();
void ros_bridge::com_private::TopicPublisher::reset() {
if (_stopped->load()) // stop called while thread not running.
return;
std::unique_lock<std::mutex> lk(_mutex);
_stopped->store(true);
_cv.notify_one(); // Wake publisher thread.
lk.unlock();
if (!_pThread)
return;
_pThread->join();
lk.lock();
// Tidy up.
for (auto &it : this->_topicMap) {
this->_rbc.unadvertise(it.first);
this->_rbc.removeClient(it.second);
}
this->_topicMap.clear();
_queue.clear();
}
void ros_bridge::com_private::TopicPublisher::publish(
ros_bridge::com_private::JsonDocUPtr docPtr,
const char *topic)
{
// Check if topic was advertised.
std::string t(topic);
std::unique_lock<std::mutex> lk(this->_mutex);
auto it = this->_topicMap.find(t);
if ( it == this->_topicMap.end()) { // Not yet advertised?
lk.unlock();
ros_bridge::com_private::JsonDocUPtr docPtr, const char *topic) {
// Check if topic was advertised.
std::string t(topic);
std::unique_lock<std::mutex> lk(this->_mutex);
auto it = this->_topicMap.find(t);
if (it == this->_topicMap.end()) { // Not yet advertised?
lk.unlock();
#ifdef ROS_BRIDGE_DEBUG
std::cerr << "TopicPublisher: topic "
<< t << " not yet advertised" << std::endl;
std::cerr << "TopicPublisher: topic " << t << " not yet advertised"
<< std::endl;
#endif
return;
}
lk.unlock();
// Add topic information to json doc.
auto &allocator = docPtr->GetAllocator();
rapidjson::Value key("topic", allocator);
rapidjson::Value value(topic, allocator);
docPtr->AddMember(key, value, allocator);
lk.lock();
_queue.push_back(std::move(docPtr));
lk.unlock();
_cv.notify_one(); // Wake publisher thread.
return;
}
lk.unlock();
// Add topic information to json doc.
auto &allocator = docPtr->GetAllocator();
rapidjson::Value key("topic", allocator);
rapidjson::Value value(topic, allocator);
docPtr->AddMember(key, value, allocator);
lk.lock();
_queue.push_back(std::move(docPtr));
lk.unlock();
_cv.notify_one(); // Wake publisher thread.
}
bool ros_bridge::com_private::TopicPublisher::advertise(const char *topic, const char *type)
{
std::unique_lock<std::mutex> lk(this->_mutex);
std::string t(topic);
auto it = this->_topicMap.find(t);
if ( it == this->_topicMap.end()) { // Need to advertise topic?
std::string clientName =
std::string(ros_bridge::com_private::_topicAdvertiserKey)
+ t;
this->_topicMap.insert(std::make_pair(t, clientName));
lk.unlock();
this->_rbc.addClient(clientName);
this->_rbc.advertise(clientName, t, type);
return true;
} else {
lk.unlock();
bool ros_bridge::com_private::TopicPublisher::advertise(const char *topic,
const char *type) {
std::unique_lock<std::mutex> lk(this->_mutex);
std::string t(topic);
auto it = this->_topicMap.find(t);
if (it == this->_topicMap.end()) { // Need to advertise topic?
std::string clientName = std::string(topicAdvertiserKey) + t;
this->_topicMap.insert(std::make_pair(t, clientName));
lk.unlock();
this->_rbc.addClient(clientName);
this->_rbc.advertise(clientName, t, type);
return true;
} else {
lk.unlock();
#if ROS_BRIDGE_DEBUG
std::cerr << "TopicPublisher: topic " << topic << " already advertised" << std::endl;
std::cerr << "TopicPublisher: topic " << topic << " already advertised"
<< std::endl;
#endif
return false;
}
return false;
}
}
......@@ -2,98 +2,86 @@
#include <thread>
ros_bridge::com_private::TopicSubscriber::TopicSubscriber(RosbridgeWsClient &rbc) :
_rbc(rbc)
, _stopped(std::make_shared<std::atomic_bool>(true))
{
static const char *topicSubscriberKey = "topic_subscriber";
}
ros_bridge::com_private::TopicSubscriber::TopicSubscriber(
RosbridgeWsClient &rbc)
: _rbc(rbc), _stopped(std::make_shared<std::atomic_bool>(true)) {}
ros_bridge::com_private::TopicSubscriber::~TopicSubscriber()
{
this->reset();
}
ros_bridge::com_private::TopicSubscriber::~TopicSubscriber() { this->reset(); }
void ros_bridge::com_private::TopicSubscriber::start()
{
_stopped->store(false);
void ros_bridge::com_private::TopicSubscriber::start() {
_stopped->store(false);
}
void ros_bridge::com_private::TopicSubscriber::reset()
{
if ( !_stopped->load() ){
_stopped->store(true);
{
for (auto &item : _topicMap){
_rbc.unsubscribe(item.second);
_rbc.removeClient(item.first);
}
}
_topicMap.clear();
void ros_bridge::com_private::TopicSubscriber::reset() {
if (!_stopped->load()) {
_stopped->store(true);
{
for (auto &item : _topicMap) {
_rbc.unsubscribe(item.second);
_rbc.removeClient(item.first);
}
}
_topicMap.clear();
}
}
void ros_bridge::com_private::TopicSubscriber::subscribe(
const char *topic,
const std::function<void(JsonDocUPtr)> &callback)
{
if ( _stopped->load() )
return;
const char *topic, const std::function<void(JsonDocUPtr)> &callback) {
if (_stopped->load())
return;
std::string clientName = ros_bridge::com_private::_topicSubscriberKey
+ std::string(topic);
auto it = _topicMap.find(clientName);
if ( it != _topicMap.end() ){ // Topic already subscribed?
return;
}
_topicMap.insert(std::make_pair(clientName, std::string(topic)));
std::string clientName = topicSubscriberKey + std::string(topic);
auto it = _topicMap.find(clientName);
if (it != _topicMap.end()) { // Topic already subscribed?
return;
}
_topicMap.insert(std::make_pair(clientName, std::string(topic)));
// Wrap callback.
auto callbackWrapper = [callback](
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message)
{
// Wrap callback.
auto callbackWrapper =
[callback](std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message) {
// Parse document.
JsonDoc docFull;
docFull.Parse(in_message->string().c_str());
if ( docFull.HasParseError() ) {
std::cout << "Json document has parse error: "
<< in_message->string()
<< std::endl;
return;
if (docFull.HasParseError()) {
std::cout << "Json document has parse error: " << in_message->string()
<< std::endl;
return;
} else if (!docFull.HasMember("msg")) {
std::cout << "Json document does not contain a message (\"msg\"): "
<< in_message->string()
<< std::endl;
return;
std::cout << "Json document does not contain a message (\"msg\"): "
<< in_message->string() << std::endl;
return;
}
// Extract message and call callback.
JsonDocUPtr pDoc(new JsonDoc());
pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
callback(std::move(pDoc));
};
};
if ( !_rbc.topicAvailable(topic) ){
// Wait until topic is available.
std::cout << "TopicSubscriber: Starting wait thread, " << clientName << std::endl;
std::thread t([this, clientName, topic, callbackWrapper]{
this->_rbc.waitForTopic(topic, [this]{
return this->_stopped->load();
});
if ( !this->_stopped->load() ){
this->_rbc.addClient(clientName);
this->_rbc.subscribe(clientName, topic, callbackWrapper);
std::cout << "TopicSubscriber: wait thread subscription successfull: " << clientName << std::endl;
}
std::cout << "TopicSubscriber: wait thread end, " << clientName << std::endl;
});
t.detach();
} else {
_rbc.addClient(clientName);
_rbc.subscribe(clientName, topic, callbackWrapper);
std::cout << "TopicSubscriber: subscription successfull: " << clientName << std::endl;
}
if (!_rbc.topicAvailable(topic)) {
// Wait until topic is available.
std::cout << "TopicSubscriber: Starting wait thread, " << clientName
<< std::endl;
std::thread t([this, clientName, topic, callbackWrapper] {
this->_rbc.waitForTopic(topic, [this] { return this->_stopped->load(); });
if (!this->_stopped->load()) {
this->_rbc.addClient(clientName);
this->_rbc.subscribe(clientName, topic, callbackWrapper);
std::cout << "TopicSubscriber: wait thread subscription successfull: "
<< clientName << std::endl;
}
std::cout << "TopicSubscriber: wait thread end, " << clientName
<< std::endl;
});
t.detach();
} else {
_rbc.addClient(clientName);
_rbc.subscribe(clientName, topic, callbackWrapper);
std::cout << "TopicSubscriber: subscription successfull: " << clientName
<< std::endl;
}
}
......@@ -29,6 +29,7 @@ Item {
QGCPalette { id: qgcPal; colorGroupEnabled: true }
property real fps: 0.0
property var currentPopUp: null
property real currentCenterX: 0
property var activeVehicle: QGroundControl.multiVehicleManager.activeVehicle
......
......@@ -39,6 +39,7 @@ Window {
Loader {
id: mainWindowInner
anchors.fill: parent
anchors.margins: 10
source: "MainWindowInner.qml"
Connections {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment