Commit a0ede0e6 authored by Valentin Platzgummer's avatar Valentin Platzgummer
parents 4bb718fe 96d3005b
...@@ -27,6 +27,8 @@ QGCROOT = $$PWD ...@@ -27,6 +27,8 @@ QGCROOT = $$PWD
DebugBuild { DebugBuild {
DESTDIR = $${OUT_PWD}/debug DESTDIR = $${OUT_PWD}/debug
#DEFINES += DEBUG
#DEFINES += ROS_BRIDGE_CLIENT_DEBUG
} }
else { else {
DESTDIR = $${OUT_PWD}/release DESTDIR = $${OUT_PWD}/release
......
...@@ -251,7 +251,8 @@ FlightMap { ...@@ -251,7 +251,8 @@ FlightMap {
delegate: MapCircle{ delegate: MapCircle{
center: modelData center: modelData
border.color: "transparent" border.color: "black"
border.width: 1
color: getColor(wimaController.nemoProgress[index]) color: getColor(wimaController.nemoProgress[index])
radius: 0.6 radius: 0.6
opacity: 1 opacity: 1
......
...@@ -4,6 +4,13 @@ ...@@ -4,6 +4,13 @@
#include "clipper/clipper.hpp" #include "clipper/clipper.hpp"
#define CLIPPER_SCALE 10000 #define CLIPPER_SCALE 10000
#include "ortools/constraint_solver/routing.h"
#include "ortools/constraint_solver/routing_enums.pb.h"
#include "ortools/constraint_solver/routing_index_manager.h"
#include "ortools/constraint_solver/routing_parameters.h"
using namespace operations_research;
#ifndef NDEBUG #ifndef NDEBUG
//#define SHOW_TIME //#define SHOW_TIME
#endif #endif
...@@ -37,7 +44,7 @@ bool Scenario::addArea(Area &area) ...@@ -37,7 +44,7 @@ bool Scenario::addArea(Area &area)
return false; return false;
} }
bool Scenario::defined(double tileWidth, double tileHeight, double minTileArea) bool Scenario::update(double tileWidth, double tileHeight, double minTileArea)
{ {
if (!_areas2enu()) if (!_areas2enu())
return false; return false;
...@@ -301,6 +308,14 @@ bool Scenario::_calculateJoinedArea() ...@@ -301,6 +308,14 @@ bool Scenario::_calculateJoinedArea()
return true; return true;
} }
struct FlightPlan::RoutingDataModel{
Matrix<int64_t> distanceMatrix;
long numVehicles;
RoutingIndexManager::NodeIndex depot;
};
FlightPlan::FlightPlan() FlightPlan::FlightPlan()
{ {
...@@ -356,7 +371,7 @@ bool FlightPlan::generate(double lineDistance, double minTransectLength) ...@@ -356,7 +371,7 @@ bool FlightPlan::generate(double lineDistance, double minTransectLength)
} }
size_t n1 = vertices.size(); size_t n1 = vertices.size();
// Generate routing model. // Generate routing model.
RoutingDataModel_t dataModel; RoutingDataModel dataModel;
Matrix<double> connectionGraph(n1, n1); Matrix<double> connectionGraph(n1, n1);
#ifdef SHOW_TIME #ifdef SHOW_TIME
...@@ -612,7 +627,7 @@ bool FlightPlan::_generateTransects(double lineDistance, double minTransectLengt ...@@ -612,7 +627,7 @@ bool FlightPlan::_generateTransects(double lineDistance, double minTransectLengt
void FlightPlan::_generateRoutingModel(const BoostLineString &vertices, void FlightPlan::_generateRoutingModel(const BoostLineString &vertices,
const BoostPolygon &polygonOffset, const BoostPolygon &polygonOffset,
size_t n0, size_t n0,
FlightPlan::RoutingDataModel_t &dataModel, FlightPlan::RoutingDataModel &dataModel,
Matrix<double> &graph) Matrix<double> &graph)
{ {
#ifdef SHOW_TIME #ifdef SHOW_TIME
......
...@@ -6,16 +6,9 @@ ...@@ -6,16 +6,9 @@
#include "snake_geometry.h" #include "snake_geometry.h"
#include "ortools/constraint_solver/routing.h"
#include "ortools/constraint_solver/routing_enums.pb.h"
#include "ortools/constraint_solver/routing_index_manager.h"
#include "ortools/constraint_solver/routing_parameters.h"
using namespace std; using namespace std;
using namespace snake_geometry; using namespace snake_geometry;
using namespace operations_research;
namespace snake { namespace snake {
enum AreaType {MeasurementArea, ServiceArea, Corridor}; enum AreaType {MeasurementArea, ServiceArea, Corridor};
...@@ -60,7 +53,7 @@ namespace snake { ...@@ -60,7 +53,7 @@ namespace snake {
const vector<GeoPoint3D> &getTileCenterPoints() {return _tileCenterPoints;} const vector<GeoPoint3D> &getTileCenterPoints() {return _tileCenterPoints;}
const GeoPoint3D &getHomePositon() {return _homePosition;} const GeoPoint3D &getHomePositon() {return _homePosition;}
bool defined(double tileWidth, double tileHeight, double minTileArea); bool update(double tileWidth, double tileHeight, double minTileArea);
string errorString; string errorString;
...@@ -104,8 +97,8 @@ namespace snake { ...@@ -104,8 +97,8 @@ namespace snake {
public: public:
FlightPlan(); FlightPlan();
void setScenario(const Scenario &scenario) {_scenario = scenario;} void setScenario(Scenario &scenario) {_scenario = scenario;}
void setProgress(const vector<int8_t> &progress) {_progress = progress;} void setProgress(vector<int> &progress) {_progress = progress;}
const Scenario &getScenario(void) const {return _scenario;} const Scenario &getScenario(void) const {return _scenario;}
const BoostLineString &getWaypointsENU(void) const {return _waypointsENU;} const BoostLineString &getWaypointsENU(void) const {return _waypointsENU;}
...@@ -124,28 +117,22 @@ namespace snake { ...@@ -124,28 +117,22 @@ namespace snake {
private: private:
// Search Filter to speed up routing.SolveWithParameters(...); // Search Filter to speed up routing.SolveWithParameters(...);
// Found here: http://www.lia.disi.unibo.it/Staff/MicheleLombardi/or-tools-doc/user_manual/manual/ls/ls_filtering.html // Found here: http://www.lia.disi.unibo.it/Staff/MicheleLombardi/or-tools-doc/user_manual/manual/ls/ls_filtering.html
class SearchFilter : public LocalSearchFilter{ class SearchFilter;
};
typedef struct{ struct RoutingDataModel;
Matrix<int64_t> distanceMatrix;
long numVehicles;
RoutingIndexManager::NodeIndex depot;
}RoutingDataModel_t;
bool _generateTransects(double lineDistance, double minTransectLength); bool _generateTransects(double lineDistance, double minTransectLength);
void _generateRoutingModel(const BoostLineString &vertices, void _generateRoutingModel(const BoostLineString &vertices,
const BoostPolygon &polygonOffset, const BoostPolygon &polygonOffset,
size_t n0, size_t n0,
RoutingDataModel_t &dataModel, RoutingDataModel &dataModel,
Matrix<double> &graph); Matrix<double> &graph);
Scenario _scenario; Scenario _scenario;
BoostLineString _waypointsENU; BoostLineString _waypointsENU;
GeoPoint2DList _waypoints; GeoPoint2DList _waypoints;
vector<BoostLineString> _transects; vector<BoostLineString> _transects;
vector<int8_t> _progress; vector<int> _progress;
vector<uint64_t> _arrivalPathIdx; vector<uint64_t> _arrivalPathIdx;
vector<uint64_t> _returnPathIdx; vector<uint64_t> _returnPathIdx;
......
#include "GeoPoint3D.h" #include "GeoPoint3D.h"
GeoPoint3D::GeoPoint3D(QObject *parent) GeoPoint::GeoPoint(QObject *parent)
: QObject(parent), ROSGeoPoint() {} : QObject(parent), ROSGeoPoint() {}
GeoPoint3D::GeoPoint3D(double latitude, double longitude, double altitude, QObject *parent) GeoPoint::GeoPoint(double latitude, double longitude, double altitude, QObject *parent)
: QObject(parent), ROSGeoPoint(latitude, longitude, altitude) : QObject(parent), ROSGeoPoint(latitude, longitude, altitude)
{} {}
GeoPoint3D::GeoPoint3D(const GeoPoint3D &p, QObject *parent) GeoPoint::GeoPoint(const GeoPoint &p, QObject *parent)
: QObject(parent), ROSGeoPoint(p.latitude(), p.longitude(), p.altitude()) : QObject(parent), ROSGeoPoint(p.latitude(), p.longitude(), p.altitude())
{} {}
GeoPoint3D::GeoPoint3D(const ROSGeoPoint &p, QObject *parent) GeoPoint::GeoPoint(const ROSGeoPoint &p, QObject *parent)
: QObject(parent), ROSGeoPoint(p.latitude(), p.longitude(), p.altitude()) : QObject(parent), ROSGeoPoint(p.latitude(), p.longitude(), p.altitude())
{} {}
GeoPoint3D::GeoPoint3D(const QGeoCoordinate &p, QObject *parent) GeoPoint::GeoPoint(const QGeoCoordinate &p, QObject *parent)
: QObject(parent), ROSGeoPoint(p.latitude(), p.longitude(), p.altitude()) : QObject(parent), ROSGeoPoint(p.latitude(), p.longitude(), p.altitude())
{} {}
GeoPoint3D *GeoPoint3D::Clone() const GeoPoint *GeoPoint::Clone() const
{ {
return new GeoPoint3D(*this); return new GeoPoint(*this);
} }
GeoPoint3D &GeoPoint3D::operator=(const GeoPoint3D &p) GeoPoint &GeoPoint::operator=(const GeoPoint &p)
{ {
this->setLatitude(p.latitude()); this->setLatitude(p.latitude());
this->setLongitude(p.longitude()); this->setLongitude(p.longitude());
...@@ -32,7 +32,7 @@ GeoPoint3D &GeoPoint3D::operator=(const GeoPoint3D &p) ...@@ -32,7 +32,7 @@ GeoPoint3D &GeoPoint3D::operator=(const GeoPoint3D &p)
return *this; return *this;
} }
GeoPoint3D &GeoPoint3D::operator=(const QGeoCoordinate &p) GeoPoint &GeoPoint::operator=(const QGeoCoordinate &p)
{ {
this->setLatitude(p.latitude()); this->setLatitude(p.latitude());
this->setLongitude(p.longitude()); this->setLongitude(p.longitude());
......
...@@ -11,27 +11,27 @@ ...@@ -11,27 +11,27 @@
typedef ROSBridge::MessageBaseClass ROSMsg; typedef ROSBridge::MessageBaseClass ROSMsg;
typedef ROSBridge::GenericMessages::GeographicMsgs::GeoPoint ROSGeoPoint; typedef ROSBridge::GenericMessages::GeographicMsgs::GeoPoint ROSGeoPoint;
namespace MsgGroups = ROSBridge::MessageGroups; namespace MsgGroups = ROSBridge::MessageGroups;
class GeoPoint3D : public QObject, public ROSGeoPoint class GeoPoint : public QObject, public ROSGeoPoint
{ {
Q_OBJECT Q_OBJECT
public: public:
typedef MsgGroups::GeoPointGroup Group; typedef MsgGroups::GeoPointGroup Group;
GeoPoint3D(QObject *parent = nullptr); GeoPoint(QObject *parent = nullptr);
GeoPoint3D(double latitude, GeoPoint(double latitude,
double longitude, double longitude,
double altitude, double altitude,
QObject *parent = nullptr); QObject *parent = nullptr);
GeoPoint3D(const GeoPoint3D& p, GeoPoint(const GeoPoint& p,
QObject *parent = nullptr); QObject *parent = nullptr);
GeoPoint3D(const ROSGeoPoint& p, GeoPoint(const ROSGeoPoint& p,
QObject *parent = nullptr); QObject *parent = nullptr);
GeoPoint3D(const QGeoCoordinate& p, GeoPoint(const QGeoCoordinate& p,
QObject *parent = nullptr); QObject *parent = nullptr);
virtual GeoPoint3D *Clone() const override; virtual GeoPoint *Clone() const override;
GeoPoint3D &operator=(const GeoPoint3D&p); GeoPoint &operator=(const GeoPoint&p);
GeoPoint3D &operator=(const QGeoCoordinate&p); GeoPoint &operator=(const QGeoCoordinate&p);
}; };
......
...@@ -2,58 +2,84 @@ ...@@ -2,58 +2,84 @@
#include <QGeoCoordinate> #include <QGeoCoordinate>
#include "snake_geometry.h"
#include "snake.h"
using namespace snake;
using namespace snake_geometry;
SnakeWorker::SnakeWorker(QObject *parent) : QThread(parent) SnakeWorker::SnakeWorker(QObject *parent) : QThread(parent)
, _lineDistance (3) // meter , _lineDistance(3)
, _minTransectLength (2) // meter , _minTransectLength(2)
, _tileWidth(5)
, _tileHeight(5)
, _minTileArea(3)
{ {
} }
SnakeWorker::~SnakeWorker() SnakeWorker::~SnakeWorker()
{ {
}
void SnakeWorker::setScenario(const Scenario &scenario)
{
_scenario = scenario;
} }
void SnakeWorker::setProgress(const QVector<int> &progress) WorkerResult_t &SnakeWorker::result()
{ {
_progress.clear(); return _result;
for (auto p : progress) {
assert(p >= -1 && p <= 100);
_progress.push_back(p);
}
} }
void SnakeWorker::setLineDistance(double lineDistance) bool SnakeWorker::precondition() const
{ {
assert(lineDistance > 0);
_lineDistance = lineDistance;
}
void SnakeWorker::setMinTransectLength(double minTransectLength) return _mArea.size() > 0
{ && _sArea.size() > 0
assert(minTransectLength > 0); && _corridor.size() > 0
_minTransectLength = minTransectLength; && _progress.size() > 0;
} }
const WorkerResult_t &SnakeWorker::getResult() const double SnakeWorker::lineDistance() const
{ {
return _result; return _lineDistance;
} }
void SnakeWorker::run() void SnakeWorker::run()
{ {
// Reset _result struct. // Reset _result struct.
_result.success = false; _result.clear();
_result.errorMessage = "";
_result.waypoints.clear(); if ( !precondition())
_result.returnPathIdx.clear(); return;
_result.arrivalPathIdx.clear();
Scenario scenario;// Initialize scenario.
Area mArea;
for (auto vertex : _mArea){
mArea.geoPolygon.push_back(GeoPoint2D{vertex.latitude(), vertex.longitude()});
}
mArea.type = AreaType::MeasurementArea;
Area sArea;
for (auto vertex : _sArea){
sArea.geoPolygon.push_back(GeoPoint2D{vertex.latitude(), vertex.longitude()});
}
sArea.type = AreaType::ServiceArea;
Area corridor;
for (auto vertex : _corridor){
corridor.geoPolygon.push_back(GeoPoint2D{vertex.latitude(), vertex.longitude()});
}
corridor.type = AreaType::Corridor;
scenario.addArea(mArea);
scenario.addArea(sArea);
scenario.addArea(corridor);
if ( !scenario.update(_tileWidth, _tileHeight, _minTileArea) )
return;
FlightPlan flightplan; FlightPlan flightplan;
flightplan.setScenario(_scenario); flightplan.setScenario(scenario);
flightplan.setProgress(_progress); flightplan.setProgress(_progress);
// Trying to generate flight plan. // Trying to generate flight plan.
...@@ -67,20 +93,116 @@ void SnakeWorker::run() ...@@ -67,20 +93,116 @@ void SnakeWorker::run()
_result.success = true; _result.success = true;
// Fill result struct. // Fill result struct.
auto waypoints = flightplan.getWaypoints(); auto& waypoints = flightplan.getWaypoints();
for (auto vertex : waypoints){ for (auto vertex : waypoints){
_result.waypoints.append(QVariant::fromValue(QGeoCoordinate(vertex[0], vertex[1], 0))); _result.waypoints.append(QGeoCoordinate(vertex[0], vertex[1], 0));
} }
auto arrivalPathIdx = flightplan.getArrivalPathIdx(); auto& arrivalPathIdx = flightplan.getArrivalPathIdx();
for (auto idx : arrivalPathIdx){ for (auto idx : arrivalPathIdx){
_result.arrivalPathIdx.append(idx); _result.arrivalPathIdx.append(idx);
} }
auto returnPathIdx = flightplan.getReturnPathIdx(); auto& returnPathIdx = flightplan.getReturnPathIdx();
for (auto idx : returnPathIdx){ for (auto idx : returnPathIdx){
_result.returnPathIdx.append(idx); _result.returnPathIdx.append(idx);
} }
// Get tiles and origin.
auto origin = scenario.getOrigin();
_result.origin.setLatitude(origin[0]);
_result.origin.setLongitude(origin[1]);
_result.origin.setAltitude(origin[2]);
const auto &tiles = scenario.getTiles();
const auto &cps = scenario.getTileCenterPoints();
for ( unsigned int i=0; i < tiles.size(); ++i ) {
const auto &tile = tiles[i];
SnakeTile Tile;
for ( const auto &vertex : tile) {
QGeoCoordinate QVertex(vertex[0], vertex[1], vertex[2]);
Tile.append(QVertex);
}
const auto &centerPoint = cps[i];
QGeoCoordinate QCenterPoint(centerPoint[0], centerPoint[1], centerPoint[2]);
Tile.setCenter(QCenterPoint);
_result.tiles.polygons().append(Tile);
_result.tileCenterPoints.append(QVariant::fromValue(QCenterPoint));
}
// Get local tiles.
const auto &tilesENU = scenario.getTilesENU();
for ( unsigned int i=0; i < tilesENU.size(); ++i ) {
const auto &tile = tilesENU[i];
Polygon2D Tile;
for ( const auto &vertex : tile.outer()) {
QPointF QVertex(vertex.get<0>(), vertex.get<1>());
Tile.path().append(QVertex);
}
_result.tilesLocal.polygons().append(Tile);
}
} }
} }
double SnakeWorker::minTransectLength() const
{
return _minTransectLength;
}
void SnakeWorker::setMinTransectLength(double minTransectLength)
{
if (minTransectLength > 0)
_minTransectLength = minTransectLength;
}
double SnakeWorker::minTileArea() const
{
return _minTileArea;
}
void SnakeWorker::setLineDistance(double lineDistance)
{
if (lineDistance > 0)
_lineDistance = lineDistance;
}
double SnakeWorker::tileWidth() const
{
return _tileWidth;
}
void SnakeWorker::setTileWidth(double tileWidth)
{
if (tileWidth > 0)
_tileWidth = tileWidth;
}
double SnakeWorker::tileHeight() const
{
return _tileHeight;
}
void SnakeWorker::setTileHeight(double tileHeight)
{
if (tileHeight > 0)
_tileHeight = tileHeight;
}
void SnakeWorker::setMinTileArea(double minTileArea)
{
if (minTileArea > 0)
_minTileArea = minTileArea;
}
void Result::clear()
{
this->success = false;
this->errorMessage = "";
this->waypoints.clear();
this->returnPathIdx.clear();
this->arrivalPathIdx.clear();
this->tiles.polygons().clear();
this->tilesLocal.polygons().clear();
this->tileCenterPoints.clear();
}
...@@ -4,23 +4,27 @@ ...@@ -4,23 +4,27 @@
#include <QVariant> #include <QVariant>
#include <QThread> #include <QThread>
#include <QVector> #include <QVector>
#include <QGeoCoordinate>
#include <vector>
#include "snake_geometry.h" #include "SnakeTiles.h"
#include "snake.h" #include "SnakeTilesLocal.h"
using namespace snake;
using namespace snake_geometry;
using namespace std;
typedef QList<QVariant> QVariantList; typedef QList<QVariant> QVariantList;
typedef struct Result{ typedef struct Result{
QVariantList waypoints; QVector<QGeoCoordinate> waypoints;
SnakeTiles tiles;
SnakeTilesLocal tilesLocal;
QVariantList tileCenterPoints;
QGeoCoordinate origin;
QVector<unsigned long> arrivalPathIdx; QVector<unsigned long> arrivalPathIdx;
QVector<unsigned long> returnPathIdx; QVector<unsigned long> returnPathIdx;
bool success; bool success;
QString errorMessage; QString errorMessage;
void clear();
}WorkerResult_t; }WorkerResult_t;
class SnakeWorker : public QThread{ class SnakeWorker : public QThread{
...@@ -30,21 +34,86 @@ public: ...@@ -30,21 +34,86 @@ public:
SnakeWorker(QObject *parent = nullptr); SnakeWorker(QObject *parent = nullptr);
~SnakeWorker() override; ~SnakeWorker() override;
void setScenario (const Scenario &scenario);
void setProgress (const QVector<int> &progress);
void setLineDistance (double lineDistance);
void setMinTransectLength (double minTransectLength);
const WorkerResult_t &getResult() const; template<template<class, class...> class Container>
void setMeasurementArea (const Container<QGeoCoordinate> &measurementArea);
template<template<class, class...> class Container>
void setServiceArea (const Container<QGeoCoordinate> &serviceArea);
template<template<class, class...> class Container>
void setCorridor (const Container<QGeoCoordinate> &corridor);
template<template<class, class...> class Container, class IntType>
void setProgress (const Container<IntType> &progress);
WorkerResult_t &result();
double lineDistance() const;
void setLineDistance(double lineDistance);
double minTransectLength() const;
void setMinTransectLength(double minTransectLength);
double minTileArea() const;
void setMinTileArea(double minTileArea);
double tileHeight() const;
void setTileHeight(double tileHeight);
double tileWidth() const;
void setTileWidth(double tileWidth);
protected: protected:
void run() override; void run() override;
private: private:
Scenario _scenario; bool precondition() const;
vector<int8_t> _progress;
double _lineDistance;
double _minTransectLength; std::vector<QGeoCoordinate> _mArea;
WorkerResult_t _result; std::vector<QGeoCoordinate> _sArea;
std::vector<QGeoCoordinate> _corridor;
std::vector<int> _progress;
double _lineDistance;
double _minTransectLength;
double _tileWidth;
double _tileHeight;
double _minTileArea;
WorkerResult_t _result;
}; };
template<template<class, class...> class Container>
void SnakeWorker::setCorridor(const Container<QGeoCoordinate> &corridor)
{
_corridor.clear();
for (auto vertex : corridor) {
_corridor.push_back(vertex);
}
}
template<template<class, class...> class Container>
void SnakeWorker::setMeasurementArea(const Container<QGeoCoordinate> &measurementArea)
{
_mArea.clear();
for (auto vertex : measurementArea) {
_mArea.push_back(vertex);
}
}
template<template<class, class...> class Container>
void SnakeWorker::setServiceArea(const Container<QGeoCoordinate> &serviceArea)
{
_sArea.clear();
for (auto vertex : serviceArea) {
_sArea.push_back(vertex);
}
}
template<template<class, class...> class Container, class IntType>
void SnakeWorker::setProgress(const Container<IntType> &progress)
{
_progress.clear();
for (auto p : progress) {
assert(p >= -1 && p <= 100);
_progress.push_back(p);
}
}
...@@ -43,16 +43,15 @@ void Slicer::_updateIdx(long size) ...@@ -43,16 +43,15 @@ void Slicer::_updateIdx(long size)
{ {
_overlap = _overlap < _N ? _overlap : _N-1; _overlap = _overlap < _N ? _overlap : _N-1;
long maxStart = size-_N; _idxStart = _idxStart < size ? _idxStart : size-1;
_idxStart = _idxStart <= maxStart ? _idxStart : maxStart; _idxStart = _idxStart < 0 ? 0 : _idxStart;
_idxStart = _idxStart < 0 ? 0 : _idxStart;
_idxEnd = _idxStart + _N - 1; _idxEnd = _idxStart + _N - 1;
_idxEnd = _idxEnd < size ? _idxEnd : size-1; _idxEnd = _idxEnd < size ? _idxEnd : size-1;
_idxNext = _idxEnd + 1 - _overlap; _idxNext = _idxEnd == size -1 ? _idxStart : _idxEnd + 1 - _overlap;
_idxNext = _idxNext < 0 ? 0 : _idxNext; _idxNext = _idxNext < 0 ? 0 : _idxNext;
_idxNext = _idxNext < size ? _idxNext : size-1; _idxNext = _idxNext < size ? _idxNext : size-1;
_idxPrevious = _idxStart + _overlap - _N; _idxPrevious = _idxStart + _overlap - _N;
_idxPrevious = _idxPrevious < 0 ? 0 : _idxPrevious; _idxPrevious = _idxPrevious < 0 ? 0 : _idxPrevious;
......
This diff is collapsed.
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
#define SMART_RTL_MAX_ATTEMPTS 3 // times #define SMART_RTL_MAX_ATTEMPTS 3 // times
#define SMART_RTL_ATTEMPT_INTERVAL 200 // ms #define SMART_RTL_ATTEMPT_INTERVAL 200 // ms
#define EVENT_TIMER_INTERVAL 50 // ms #define EVENT_TIMER_INTERVAL 50 // ms
#define SNAKE_EVENT_LOOP_INTERVAL 5000 // ms #define SNAKE_EVENT_LOOP_INTERVAL 1000 // ms
using namespace snake; using namespace snake;
...@@ -338,10 +338,7 @@ private slots: ...@@ -338,10 +338,7 @@ private slots:
void _smartRTLCleanUp (bool flying); void _smartRTLCleanUp (bool flying);
// Snake. // Snake.
void _setSnakeCalcInProgress (bool inProgress); void _setSnakeCalcInProgress (bool inProgress);
bool _isScenarioDefined (void);
bool _isScenarioDefinedErrorMessage (void);
void _snakeStoreWorkerResults (); void _snakeStoreWorkerResults ();
void _startStopRosBridge ();
void _initStartSnakeWorker (); void _initStartSnakeWorker ();
void _switchSnakeManager (QVariant variant); void _switchSnakeManager (QVariant variant);
// Periodic tasks. // Periodic tasks.
...@@ -351,16 +348,12 @@ private slots: ...@@ -351,16 +348,12 @@ private slots:
private: private:
using StatusMap = std::map<int, QString>; using StatusMap = std::map<int, QString>;
// Snake.
void _progressFromJson (JsonDocUPtr pDoc,
QNemoProgress &progress);
// Controllers. // Controllers.
PlanMasterController *_masterController; PlanMasterController *_masterController;
MissionController *_missionController; MissionController *_missionController;
// Wima Data. // Wima Data.
QmlObjectListModel _areas; // contains all visible areas QmlObjectListModel _areas; // contains all visible areas
WimaJoinedAreaData _joinedArea; // joined area fromed by opArea, serArea, _corridor WimaJoinedAreaData _joinedArea; // joined area fromed by opArea, serArea, _corridor
WimaMeasurementAreaData _measurementArea; // measurement area WimaMeasurementAreaData _measurementArea; // measurement area
...@@ -375,6 +368,8 @@ private: ...@@ -375,6 +368,8 @@ private:
WaypointManager::DefaultManager _snakeManager; WaypointManager::DefaultManager _snakeManager;
WaypointManager::RTLManager _rtlManager; WaypointManager::RTLManager _rtlManager;
WaypointManager::ManagerBase *_currentManager; WaypointManager::ManagerBase *_currentManager;
using ManagerList = QList<WaypointManager::ManagerBase*>;
ManagerList _managerList;
// Settings Facts. // Settings Facts.
QMap<QString, FactMetaData*> _metaDataMap; QMap<QString, FactMetaData*> _metaDataMap;
...@@ -406,11 +401,8 @@ private: ...@@ -406,11 +401,8 @@ private:
// Snake // Snake
bool _snakeCalcInProgress; bool _snakeCalcInProgress;
bool _snakeRecalcNecessary;
bool _scenarioDefinedBool;
SnakeWorker _snakeWorker; SnakeWorker _snakeWorker;
Scenario _scenario; GeoPoint _snakeOrigin;
::GeoPoint3D _snakeOrigin;
SnakeTiles _snakeTiles; // tiles SnakeTiles _snakeTiles; // tiles
SnakeTilesLocal _snakeTilesLocal; // tiles local coordinate system SnakeTilesLocal _snakeTilesLocal; // tiles local coordinate system
QVariantList _snakeTileCenterPoints; QVariantList _snakeTileCenterPoints;
...@@ -418,6 +410,7 @@ private: ...@@ -418,6 +410,7 @@ private:
QNemoHeartbeat _nemoHeartbeat; // measurement progress QNemoHeartbeat _nemoHeartbeat; // measurement progress
int _fallbackStatus; int _fallbackStatus;
ROSBridgePtr _pRosBridge; ROSBridgePtr _pRosBridge;
bool _bridgeSetupDone;
static StatusMap _nemoStatusMap; static StatusMap _nemoStatusMap;
// Periodic tasks. // Periodic tasks.
......
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
#include <memory> #include <memory>
#include <deque> #include <deque>
#include <set> #include <unordered_map>
namespace ROSBridge { namespace ROSBridge {
namespace ComPrivate { namespace ComPrivate {
...@@ -15,7 +15,8 @@ typedef rapidjson::Document JsonDoc; ...@@ -15,7 +15,8 @@ typedef rapidjson::Document JsonDoc;
typedef std::unique_ptr<JsonDoc> JsonDocUPtr; typedef std::unique_ptr<JsonDoc> JsonDocUPtr;
typedef std::deque<JsonDocUPtr> JsonQueue; typedef std::deque<JsonDocUPtr> JsonQueue;
typedef std::size_t HashType; typedef std::size_t HashType;
typedef std::set<HashType> HashSet;
using ClientMap = std::unordered_map<HashType, std::string>;
static const char* _topicAdvertiserKey = "topic_advertiser"; static const char* _topicAdvertiserKey = "topic_advertiser";
static const char* _topicPublisherKey = "topic_publisher"; static const char* _topicPublisherKey = "topic_publisher";
......
...@@ -79,6 +79,17 @@ struct GeometryMsgs { ...@@ -79,6 +79,17 @@ struct GeometryMsgs {
}; };
}; };
struct GeographicMsgs {
static StringType label() {return "geographic_msgs";}
//!
//! \brief The GeoPointGroup struct is used the mark a class as a ROS geographic_msgs/GeoPoint message.
struct GeoPointGroup {
static StringType label() {return "GeoPoint";}
};
};
struct JSKRecognitionMsgs { struct JSKRecognitionMsgs {
static StringType label() {return "jsk_recognition_msgs";} static StringType label() {return "jsk_recognition_msgs";}
...@@ -118,8 +129,8 @@ typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs, ...@@ -118,8 +129,8 @@ typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs,
MessageGroups::GeometryMsgs::Point32Group> MessageGroups::GeometryMsgs::Point32Group>
Point32Group; Point32Group;
typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs, typedef MessageGroups::MessageGroup<MessageGroups::GeographicMsgs,
MessageGroups::GeometryMsgs::GeoPointGroup> MessageGroups::GeographicMsgs::GeoPointGroup>
GeoPointGroup; GeoPointGroup;
typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs, typedef MessageGroups::MessageGroup<MessageGroups::GeometryMsgs,
......
...@@ -34,8 +34,11 @@ public: ...@@ -34,8 +34,11 @@ public:
//! @brief Start ROS bridge. //! @brief Start ROS bridge.
void start(); void start();
//! @brief Reset ROS bridge. //! @brief Stops ROS bridge.
void reset(); void reset();
//! @return Returns true if connected to the rosbridge_server, false either.
//! @note This function can block up to 100ms. However normal execution time is smaller.
bool connected();
private: private:
TypeFactory _typeFactory; TypeFactory _typeFactory;
......
...@@ -15,6 +15,9 @@ ...@@ -15,6 +15,9 @@
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <thread> #include <thread>
#include <future>
#include <functional>
#include <mutex>
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>; using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>; using InMessage = std::function<void(std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::InMessage>)>;
...@@ -23,18 +26,20 @@ class RosbridgeWsClient ...@@ -23,18 +26,20 @@ class RosbridgeWsClient
{ {
std::string server_port_path; std::string server_port_path;
std::unordered_map<std::string, std::shared_ptr<WsClient>> client_map; std::unordered_map<std::string, std::shared_ptr<WsClient>> client_map;
std::mutex map_mutex;
// Starts the client.
void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message) void start(const std::string& client_name, std::shared_ptr<WsClient> client, const std::string& message)
{ {
if (!client->on_open) if (!client->on_open)
{ {
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) { client->on_open = [client_name, message](std::shared_ptr<WsClient::Connection> connection) {
#else #else
client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) { client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#endif #endif
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << ": Opened connection" << std::endl; std::cout << client_name << ": Opened connection" << std::endl;
std::cout << client_name << ": Sending message: " << message << std::endl; std::cout << client_name << ": Sending message: " << message << std::endl;
#endif #endif
...@@ -42,7 +47,7 @@ class RosbridgeWsClient ...@@ -42,7 +47,7 @@ class RosbridgeWsClient
}; };
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
if (!client->on_message) if (!client->on_message)
{ {
client->on_message = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::InMessage> in_message) { client->on_message = [client_name](std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::InMessage> in_message) {
...@@ -66,14 +71,14 @@ class RosbridgeWsClient ...@@ -66,14 +71,14 @@ class RosbridgeWsClient
} }
#endif #endif
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::thread client_thread([client_name, client]() { std::thread client_thread([client_name, client]() {
#else #else
std::thread client_thread([client]() { std::thread client_thread([client]() {
#endif #endif
client->start(); client->start();
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << ": Terminated" << std::endl; std::cout << client_name << ": Terminated" << std::endl;
#endif #endif
client->on_open = NULL; client->on_open = NULL;
...@@ -83,9 +88,6 @@ class RosbridgeWsClient ...@@ -83,9 +88,6 @@ class RosbridgeWsClient
}); });
client_thread.detach(); client_thread.detach();
// This is to make sure that the thread got fully launched before we do anything to it (e.g. remove)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} }
public: public:
...@@ -96,6 +98,8 @@ public: ...@@ -96,6 +98,8 @@ public:
~RosbridgeWsClient() ~RosbridgeWsClient()
{ {
std::lock_guard<std::mutex> lk(map_mutex); // neccessary?
for (auto& client : client_map) for (auto& client : client_map)
{ {
client.second->stop(); client.second->stop();
...@@ -103,14 +107,44 @@ public: ...@@ -103,14 +107,44 @@ public:
} }
} }
// The execution can take up to 100 ms!
bool connected(){
auto p = std::make_shared<std::promise<void>>();
auto future = p->get_future();
auto callback = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<std::promise<void>> p) {
p->set_value();
connection->send_close(1000);
};
std::shared_ptr<WsClient> status_client = std::make_shared<WsClient>(server_port_path);
status_client->on_open = std::bind(callback, std::placeholders::_1, p);
std::async([status_client]{
status_client->start();
status_client->on_open = NULL;
status_client->on_message = NULL;
status_client->on_close = NULL;
status_client->on_error = NULL;
});
auto status = future.wait_for(std::chrono::milliseconds(100));
return status == std::future_status::ready;
}
// Adds a client to the client_map.
void addClient(const std::string& client_name) void addClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it == client_map.end()) if (it == client_map.end())
{ {
client_map[client_name] = std::make_shared<WsClient>(server_port_path); client_map[client_name] = std::make_shared<WsClient>(server_port_path);
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
else else
{ {
std::cerr << client_name << " has already been created" << std::endl; std::cerr << client_name << " has already been created" << std::endl;
...@@ -120,6 +154,8 @@ public: ...@@ -120,6 +154,8 @@ public:
std::shared_ptr<WsClient> getClient(const std::string& client_name) std::shared_ptr<WsClient> getClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -130,6 +166,8 @@ public: ...@@ -130,6 +166,8 @@ public:
void stopClient(const std::string& client_name) void stopClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -138,11 +176,11 @@ public: ...@@ -138,11 +176,11 @@ public:
it->second->on_message = NULL; it->second->on_message = NULL;
it->second->on_close = NULL; it->second->on_close = NULL;
it->second->on_error = NULL; it->second->on_error = NULL;
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << " has been stopped" << std::endl; std::cout << client_name << " has been stopped" << std::endl;
#endif #endif
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
else else
{ {
std::cerr << client_name << " has not been created" << std::endl; std::cerr << client_name << " has not been created" << std::endl;
...@@ -152,17 +190,27 @@ public: ...@@ -152,17 +190,27 @@ public:
void removeClient(const std::string& client_name) void removeClient(const std::string& client_name)
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
it->second->stop(); // Stop the client asynchronously in 100 ms.
it->second.reset(); // This is to ensure, that all threads involving client have been launched.
std::thread t([](std::shared_ptr<WsClient> client){
std::this_thread::sleep_for(std::chrono::milliseconds(100));
client->stop();
client.reset();
},
it->second /*lambda param*/ );
client_map.erase(it); client_map.erase(it);
#ifdef DEBUG t.detach();
#ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << client_name << " has been removed" << std::endl; std::cout << client_name << " has been removed" << std::endl;
#endif #endif
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
else else
{ {
std::cerr << client_name << " has not been created" << std::endl; std::cerr << client_name << " has not been created" << std::endl;
...@@ -170,8 +218,12 @@ public: ...@@ -170,8 +218,12 @@ public:
#endif #endif
} }
// Gets the client from client_map and starts it. Advertising is essentially sending a message.
// One client per topic!
void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "") void advertise(const std::string& client_name, const std::string& topic, const std::string& type, const std::string& id = "")
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -185,7 +237,7 @@ public: ...@@ -185,7 +237,7 @@ public:
start(client_name, it->second, message); start(client_name, it->second, message);
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
else else
{ {
std::cerr << client_name << "has not been created" << std::endl; std::cerr << client_name << "has not been created" << std::endl;
...@@ -210,7 +262,7 @@ public: ...@@ -210,7 +262,7 @@ public:
std::shared_ptr<WsClient> publish_client = std::make_shared<WsClient>(server_port_path); std::shared_ptr<WsClient> publish_client = std::make_shared<WsClient>(server_port_path);
publish_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) { publish_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << "publish_client: Opened connection" << std::endl; std::cout << "publish_client: Opened connection" << std::endl;
std::cout << "publish_client: Sending message: " << message << std::endl; std::cout << "publish_client: Sending message: " << message << std::endl;
#endif #endif
...@@ -225,6 +277,8 @@ public: ...@@ -225,6 +277,8 @@ public:
void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "") void subscribe(const std::string& client_name, const std::string& topic, const InMessage& callback, const std::string& id = "", const std::string& type = "", int throttle_rate = -1, int queue_length = -1, int fragment_size = -1, const std::string& compression = "")
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -259,7 +313,7 @@ public: ...@@ -259,7 +313,7 @@ public:
it->second->on_message = callback; it->second->on_message = callback;
start(client_name, it->second, message); start(client_name, it->second, message);
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
else else
{ {
std::cerr << client_name << "has not been created" << std::endl; std::cerr << client_name << "has not been created" << std::endl;
...@@ -269,6 +323,8 @@ public: ...@@ -269,6 +323,8 @@ public:
void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback) void advertiseService(const std::string& client_name, const std::string& service, const std::string& type, const InMessage& callback)
{ {
std::lock_guard<std::mutex> lk(map_mutex);
std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name); std::unordered_map<std::string, std::shared_ptr<WsClient>>::iterator it = client_map.find(client_name);
if (it != client_map.end()) if (it != client_map.end())
{ {
...@@ -277,7 +333,7 @@ public: ...@@ -277,7 +333,7 @@ public:
it->second->on_message = callback; it->second->on_message = callback;
start(client_name, it->second, message); start(client_name, it->second, message);
} }
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
else else
{ {
std::cerr << client_name << "has not been created" << std::endl; std::cerr << client_name << "has not been created" << std::endl;
...@@ -304,7 +360,7 @@ public: ...@@ -304,7 +360,7 @@ public:
std::shared_ptr<WsClient> service_response_client = std::make_shared<WsClient>(server_port_path); std::shared_ptr<WsClient> service_response_client = std::make_shared<WsClient>(server_port_path);
service_response_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) { service_response_client->on_open = [message](std::shared_ptr<WsClient::Connection> connection) {
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << "service_response_client: Opened connection" << std::endl; std::cout << "service_response_client: Opened connection" << std::endl;
std::cout << "service_response_client: Sending message: " << message << std::endl; std::cout << "service_response_client: Sending message: " << message << std::endl;
#endif #endif
...@@ -351,7 +407,7 @@ public: ...@@ -351,7 +407,7 @@ public:
else else
{ {
call_service_client->on_message = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) { call_service_client->on_message = [](std::shared_ptr<WsClient::Connection> connection, std::shared_ptr<WsClient::InMessage> in_message) {
#ifdef DEBUG #ifdef ROS_BRIDGE_CLIENT_DEBUG
std::cout << "call_service_client: Message received: " << in_message->string() << std::endl; std::cout << "call_service_client: Message received: " << in_message->string() << std::endl;
std::cout << "call_service_client: Sending close connection" << std::endl; std::cout << "call_service_client: Sending close connection" << std::endl;
#endif #endif
......
...@@ -2,8 +2,20 @@ ...@@ -2,8 +2,20 @@
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker *casePacker,
RosbridgeWsClient *rbc) :
struct ROSBridge::ComPrivate::ThreadData
{
const ROSBridge::CasePacker &casePacker;
RosbridgeWsClient &rbc;
ROSBridge::ComPrivate::JsonQueue &queue;
std::mutex &queueMutex;
const std::atomic<bool> &running;
std::condition_variable &cv;
};
ROSBridge::ComPrivate::TopicPublisher::TopicPublisher(CasePacker &casePacker,
RosbridgeWsClient &rbc) :
_running(false) _running(false)
, _casePacker(casePacker) , _casePacker(casePacker)
, _rbc(rbc) , _rbc(rbc)
...@@ -21,14 +33,15 @@ void ROSBridge::ComPrivate::TopicPublisher::start() ...@@ -21,14 +33,15 @@ void ROSBridge::ComPrivate::TopicPublisher::start()
if ( _running.load() ) // start called while thread running. if ( _running.load() ) // start called while thread running.
return; return;
_running.store(true); _running.store(true);
_rbc->addClient(ROSBridge::ComPrivate::_topicAdvertiserKey); ROSBridge::ComPrivate::ThreadData data{
_pThread.reset(new std::thread(&ROSBridge::ComPrivate::transmittLoop, _casePacker,
std::cref(*_casePacker), _rbc,
std::ref(*_rbc), _queue,
std::ref(_queue), _queueMutex,
std::ref(_queueMutex), _running,
std::ref(_advertisedTopicsHashList), _cv
std::cref(_running))); };
_pThread = std::make_unique<std::thread>(&ROSBridge::ComPrivate::transmittLoop, data);
} }
void ROSBridge::ComPrivate::TopicPublisher::reset() void ROSBridge::ComPrivate::TopicPublisher::reset()
...@@ -36,66 +49,61 @@ void ROSBridge::ComPrivate::TopicPublisher::reset() ...@@ -36,66 +49,61 @@ void ROSBridge::ComPrivate::TopicPublisher::reset()
if ( !_running.load() ) // stop called while thread not running. if ( !_running.load() ) // stop called while thread not running.
return; return;
_running.store(false); _running.store(false);
_cv.notify_one(); // Wake publisher thread.
if ( !_pThread ) if ( !_pThread )
return; return;
_pThread->join(); _pThread->join();
_pThread.reset();
_rbc->removeClient(ROSBridge::ComPrivate::_topicAdvertiserKey);
_queue.clear(); _queue.clear();
_advertisedTopicsHashList.clear();
} }
void ROSBridge::ComPrivate::transmittLoop(const ROSBridge::CasePacker &casePacker, void ROSBridge::ComPrivate::transmittLoop(ROSBridge::ComPrivate::ThreadData data)
RosbridgeWsClient &rbc,
ROSBridge::ComPrivate::JsonQueue &queue,
std::mutex &queueMutex,
HashSet &advertisedTopicsHashList,
const std::atomic<bool> &running)
{ {
rbc.addClient(ROSBridge::ComPrivate::_topicPublisherKey); // Init.
ClientMap clientMap;
while(running.load()){ // Main Loop.
// Pop message from queue. while(data.running.load()){
queueMutex.lock(); std::unique_lock<std::mutex> lk(data.queueMutex);
//std::cout << "Queue size: " << queue.size() << std::endl; // Check if new data available, wait if not.
if (queue.empty()){ if (data.queue.empty()){
queueMutex.unlock(); data.cv.wait(lk); // Wait for condition, spurious wakeups don't matter in this case.
std::this_thread::sleep_for(std::chrono::milliseconds(20));
continue; continue;
} }
JsonDocUPtr pJsonDoc(std::move(queue.front())); // Pop message from queue.
queue.pop_front(); JsonDocUPtr pJsonDoc(std::move(data.queue.front()));
queueMutex.unlock(); data.queue.pop_front();
lk.unlock();
// Debug output.
// std::cout << "Transmitter loop json document:" << std::endl;
// rapidjson::OStreamWrapper out(std::cout);
// rapidjson::Writer<rapidjson::OStreamWrapper> writer(out);
// pJsonDoc->Accept(writer);
// std::cout << std::endl << std::endl;
// Get tag from Json message and remove it. // Get tag from Json message and remove it.
Tag tag; Tag tag;
bool ret = casePacker.getTag(pJsonDoc, tag); bool ret = data.casePacker.getTag(pJsonDoc, tag);
assert(ret); // Json message does not contain a tag; assert(ret); // Json message does not contain a tag;
(void)ret; (void)ret;
casePacker.removeTag(pJsonDoc); data.casePacker.removeTag(pJsonDoc);
// Check if topic must be advertised. // Check if topic must be advertised.
// Advertised topics are stored in advertisedTopicsHashList as // Advertised topics are stored in advertisedTopicsHashList as
// a hash. // a hash.
HashType hash = ROSBridge::ComPrivate::getHash(tag.topic()); std::string clientName = ROSBridge::ComPrivate::_topicAdvertiserKey
if ( advertisedTopicsHashList.count(hash) == 0) { + tag.topic();
advertisedTopicsHashList.insert(hash); HashType hash = ROSBridge::ComPrivate::getHash(clientName);
rbc.advertise(ROSBridge::ComPrivate::_topicAdvertiserKey, auto it = clientMap.find(hash);
tag.topic(), if ( it == clientMap.end()) { // Need to advertise topic?
tag.messageType() ); clientMap.insert(std::make_pair(hash, clientName));
std::cout << clientName << ";"
<< tag.topic() << ";"
<< tag.messageType() << ";" << std::endl;
data.rbc.addClient(clientName);
data.rbc.advertise(clientName,
tag.topic(),
tag.messageType() );
} }
// Debug output.
//std::cout << "Hash Set size: " << advertisedTopicsHashList.size() << std::endl;
// Send Json message. // Publish Json message.
rbc.publish(tag.topic(), *pJsonDoc.get()); data.rbc.publish(tag.topic(), *pJsonDoc.get());
} // while loop } // while loop
// Tidy up.
for (auto& it : clientMap)
data.rbc.removeClient(it.second);
} }
...@@ -10,18 +10,22 @@ ...@@ -10,18 +10,22 @@
#include <atomic> #include <atomic>
#include <mutex> #include <mutex>
#include <set> #include <set>
#include <condition_variable>
namespace ROSBridge { namespace ROSBridge {
namespace ComPrivate { namespace ComPrivate {
struct ThreadData;
class TopicPublisher class TopicPublisher
{ {
typedef std::unique_ptr<std::thread> ThreadPtr; typedef std::unique_ptr<std::thread> ThreadPtr;
using CondVar = std::condition_variable;
public: public:
TopicPublisher() = delete; TopicPublisher() = delete;
TopicPublisher(CasePacker *casePacker, TopicPublisher(CasePacker &casePacker,
RosbridgeWsClient *rbc); RosbridgeWsClient &rbc);
~TopicPublisher(); ~TopicPublisher();
...@@ -31,34 +35,32 @@ public: ...@@ -31,34 +35,32 @@ public:
//! @brief Resets the publisher. //! @brief Resets the publisher.
void reset(); void reset();
template<class T>
void publish(const T &msg, const std::string &topic){
JsonDocUPtr docPtr(_casePacker->pack(msg, topic));
publish(std::move(docPtr));
}
void publish(JsonDocUPtr docPtr){ void publish(JsonDocUPtr docPtr){
{
std::lock_guard<std::mutex> lock(_queueMutex); std::lock_guard<std::mutex> lock(_queueMutex);
_queue.push_back(std::move(docPtr)); _queue.push_back(std::move(docPtr));
}
_cv.notify_one(); // Wake publisher thread.
}
template<class T>
void publish(const T &msg, const std::string &topic){
JsonDocUPtr docPtr(_casePacker.pack(msg, topic));
publish(std::move(docPtr));
} }
private: private:
JsonQueue _queue; JsonQueue _queue;
std::mutex _queueMutex; std::mutex _queueMutex;
std::atomic<bool> _running; std::atomic<bool> _running;
CasePacker *_casePacker; CasePacker &_casePacker;
RosbridgeWsClient &_rbc;
CondVar _cv;
ThreadPtr _pThread; ThreadPtr _pThread;
RosbridgeWsClient *_rbc;
HashSet _advertisedTopicsHashList; // Not thread save! This container
// is manipulated by transmittLoop only!
}; };
void transmittLoop(const ROSBridge::CasePacker &casePacker, void transmittLoop(ThreadData data);
RosbridgeWsClient &rbc,
ROSBridge::ComPrivate::JsonQueue &queue,
std::mutex &queueMutex,
HashSet &advertisedTopicsHashList,
const std::atomic<bool> &running);
} // namespace CommunicatorDetail } // namespace CommunicatorDetail
......
#include "TopicSubscriber.h" #include "TopicSubscriber.h"
ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber( ROSBridge::ComPrivate::TopicSubscriber::TopicSubscriber(CasePacker &casePacker,
ROSBridge::CasePacker *casePacker, RosbridgeWsClient &rbc) :
RosbridgeWsClient *rbc) :
_casePacker(casePacker) _casePacker(casePacker)
, _rbc(rbc) , _rbc(rbc)
, _running(false) , _running(false)
...@@ -24,10 +23,16 @@ void ROSBridge::ComPrivate::TopicSubscriber::start() ...@@ -24,10 +23,16 @@ void ROSBridge::ComPrivate::TopicSubscriber::start()
void ROSBridge::ComPrivate::TopicSubscriber::reset() void ROSBridge::ComPrivate::TopicSubscriber::reset()
{ {
if ( _running ){ if ( _running ){
for (std::string clientName : _clientList)
_rbc->removeClient(clientName);
_running = false; _running = false;
_callbackMap.clear(); {
for (std::string clientName : _clientList){
_rbc.removeClient(clientName);
}
}
{
std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
_callbackMapStruct.map.clear();
}
_clientList.clear(); _clientList.clear();
} }
} }
...@@ -44,25 +49,39 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -44,25 +49,39 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
_clientList.push_back(clientName); _clientList.push_back(clientName);
HashType hash = getHash(clientName); HashType hash = getHash(clientName);
auto ret = _callbackMap.insert(std::make_pair(hash, callback)); //
if ( !ret.second )
return false; // Topic subscription already present.
{ {
std::lock_guard<std::mutex> lk(_callbackMapStruct.mutex);
auto ret = _callbackMapStruct.map.insert(std::make_pair(hash, callback)); //
if ( !ret.second )
return false; // Topic subscription already present.
}
using namespace std::placeholders; using namespace std::placeholders;
auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback, auto f = std::bind(&ROSBridge::ComPrivate::subscriberCallback,
hash, hash,
std::cref(_callbackMap), std::ref(_callbackMapStruct),
_1, _2); _1, _2);
// std::cout << std::endl; // std::cout << std::endl;
// std::cout << "topic subscription" << std::endl; // std::cout << "topic subscription" << std::endl;
// std::cout << "client name: " << clientName << std::endl; // std::cout << "client name: " << clientName << std::endl;
// std::cout << "topic: " << topic << std::endl; // std::cout << "topic: " << topic << std::endl;
_rbc->addClient(clientName); {
_rbc->subscribe(clientName, auto start = std::chrono::high_resolution_clock::now();
topic, _rbc.addClient(clientName);
f ); auto end = std::chrono::high_resolution_clock::now();
std::cout << "add client time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms" << std::endl;
start = std::chrono::high_resolution_clock::now();
_rbc.subscribe(clientName,
topic,
f );
end = std::chrono::high_resolution_clock::now();
std::cout << "subscribe time: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end-start).count()
<< " ms" << std::endl;
} }
return true; return true;
...@@ -72,7 +91,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe( ...@@ -72,7 +91,7 @@ bool ROSBridge::ComPrivate::TopicSubscriber::subscribe(
using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>; using WsClient = SimpleWeb::SocketClient<SimpleWeb::WS>;
void ROSBridge::ComPrivate::subscriberCallback( void ROSBridge::ComPrivate::subscriberCallback(
const HashType &hash, const HashType &hash,
const ROSBridge::ComPrivate::TopicSubscriber::CallbackMap &map, ROSBridge::ComPrivate::MapStruct &mapWrapper,
std::shared_ptr<WsClient::Connection>, std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message) std::shared_ptr<WsClient::InMessage> in_message)
{ {
...@@ -97,26 +116,21 @@ void ROSBridge::ComPrivate::subscriberCallback( ...@@ -97,26 +116,21 @@ void ROSBridge::ComPrivate::subscriberCallback(
// << std::endl; // << std::endl;
// Search callback. // Search callback.
auto it = map.find(hash); CallbackType callback;
if (it == map.end()) { {
assert(false); // callback not found std::lock_guard<std::mutex> lk(mapWrapper.mutex);
return; auto it = mapWrapper.map.find(hash);
if (it == mapWrapper.map.end()) {
//assert(false); // callback not found
return;
}
callback = it->second;
} }
// Extract message and call callback. // Extract message and call callback.
JsonDocUPtr pDoc(new JsonDoc()); JsonDocUPtr pDoc(new JsonDoc());
pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator()); pDoc->CopyFrom(docFull["msg"].Move(), docFull.GetAllocator());
it->second(std::move(pDoc)); // Call callback. callback(std::move(pDoc));
return; return;
} }
void ROSBridge::ComPrivate::test(
std::shared_ptr<WsClient::Connection>,
std::shared_ptr<WsClient::InMessage> in_message)
{
std::cout << "test: Json document: "
<< in_message->string()
<< std::endl;
}
...@@ -8,15 +8,21 @@ ...@@ -8,15 +8,21 @@
namespace ROSBridge { namespace ROSBridge {
namespace ComPrivate { namespace ComPrivate {
typedef std::function<void(JsonDocUPtr)> CallbackType;
struct MapStruct{
typedef std::map<HashType, CallbackType> Map;
Map map;
std::mutex mutex;
};
class TopicSubscriber class TopicSubscriber
{ {
typedef std::vector<std::string> ClientList; typedef std::vector<std::string> ClientList;
public: public:
typedef std::function<void(JsonDocUPtr)> CallbackType;
typedef std::map<HashType, CallbackType> CallbackMap;
TopicSubscriber() = delete; TopicSubscriber() = delete;
TopicSubscriber(CasePacker *casePacker, RosbridgeWsClient *rbc); TopicSubscriber(CasePacker &casePacker, RosbridgeWsClient &rbc);
~TopicSubscriber(); ~TopicSubscriber();
//! @brief Starts the subscriber. //! @brief Starts the subscriber.
...@@ -31,19 +37,19 @@ public: ...@@ -31,19 +37,19 @@ public:
bool subscribe(const char* topic, const CallbackType &callback); bool subscribe(const char* topic, const CallbackType &callback);
private: private:
CasePacker *_casePacker;
RosbridgeWsClient *_rbc;
CallbackMap _callbackMap;
CasePacker &_casePacker;
RosbridgeWsClient &_rbc;
MapStruct _callbackMapStruct;
ClientList _clientList; ClientList _clientList;
bool _running; bool _running;
}; };
void subscriberCallback(const HashType &hash, void subscriberCallback(const HashType &hash,
const TopicSubscriber::CallbackMap &map, MapStruct &mapWrapper,
std::shared_ptr<WsClient::Connection> /*connection*/, std::shared_ptr<WsClient::Connection> /*connection*/,
std::shared_ptr<WsClient::InMessage> in_message); std::shared_ptr<WsClient::InMessage> in_message);
void test(std::shared_ptr<WsClient::Connection> /*connection*/,
std::shared_ptr<WsClient::InMessage> in_message);
} // namespace ComPrivate } // namespace ComPrivate
} // namespace ROSBridge } // namespace ROSBridge
...@@ -3,8 +3,8 @@ ...@@ -3,8 +3,8 @@
ROSBridge::ROSBridge::ROSBridge() : ROSBridge::ROSBridge::ROSBridge() :
_casePacker(&_typeFactory, &_jsonFactory) _casePacker(&_typeFactory, &_jsonFactory)
, _rbc("localhost:9090") , _rbc("localhost:9090")
, _topicPublisher(&_casePacker, &_rbc) , _topicPublisher(_casePacker, _rbc)
, _topicSubscriber(&_casePacker, &_rbc) , _topicSubscriber(_casePacker, _rbc)
{ {
} }
...@@ -36,3 +36,8 @@ void ROSBridge::ROSBridge::reset() ...@@ -36,3 +36,8 @@ void ROSBridge::ROSBridge::reset()
_topicSubscriber.reset(); _topicSubscriber.reset();
} }
bool ROSBridge::ROSBridge::connected()
{
return _rbc.connected();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment