Commit e59fa155 authored by Lorenz Meier's avatar Lorenz Meier
Browse files

Merge pull request #439 from Susurrus/simpleCheckRates

Add data rate calculations for all LinkInterface-derived classes
parents a7ffa8a8 05582636
......@@ -33,6 +33,9 @@ along with PIXHAWK. If not, see <>.
#include <QThread>
#include <QDateTime>
#include <QMutex>
#include <QMutexLocker>
* The link interface defines the interface for all links used to communicate
......@@ -43,8 +46,27 @@ class LinkInterface : public QThread
LinkInterface(QObject* parent = 0) : QThread(parent) {}
virtual ~LinkInterface() { emit this->deleteLink(this); }
LinkInterface() :
// Initialize everything for the data rate calculation buffers.
inDataIndex = 0;
outDataIndex = 0;
// Initialize our data rate buffers manually, cause C++<03 is dumb.
for (int i = 0; i < dataRateBufferSize; ++i)
inDataWriteAmounts[i] = 0;
inDataWriteTimes[i] = 0;
outDataWriteAmounts[i] = 0;
outDataWriteTimes[i] = 0;
virtual ~LinkInterface() {
emit this->deleteLink(this);
/* Connection management */
......@@ -91,7 +113,10 @@ public:
* @return The data rate of the interface in bits per second, 0 if unknown
virtual qint64 getCurrentInDataRate() const = 0;
qint64 getCurrentInDataRate() const
return getCurrentDataRate(inDataIndex, inDataWriteTimes, inDataWriteAmounts);
* @Brief Get the current outgoing data rate.
......@@ -101,7 +126,10 @@ public:
* @return The data rate of the interface in bits per second, 0 if unknown
virtual qint64 getCurrentOutDataRate() const = 0;
qint64 getCurrentOutDataRate() const
return getCurrentDataRate(outDataIndex, outDataWriteTimes, outDataWriteAmounts);
* @brief Connect this interface logically
......@@ -177,10 +205,114 @@ signals:
void communicationUpdate(const QString& linkname, const QString& text);
/** @brief destroying element */
void deleteLink(LinkInterface* const link);
/** @brief destroying element */
void deleteLink(LinkInterface* const link);
static const int dataRateBufferSize = 20; ///< Specify how many data points to capture for data rate calculations.
static const qint64 dataRateCurrentTimespan = 500; ///< Set the maximum age of samples to use for data calculations (ms).
// Implement a simple circular buffer for storing when and how much data was received.
// Used for calculating the incoming data rate. Use with *StatsBuffer() functions.
int inDataIndex;
quint64 inDataWriteAmounts[dataRateBufferSize]; // In bytes
qint64 inDataWriteTimes[dataRateBufferSize]; // in ms
// Implement a simple circular buffer for storing when and how much data was transmit.
// Used for calculating the outgoing data rate. Use with *StatsBuffer() functions.
int outDataIndex;
quint64 outDataWriteAmounts[dataRateBufferSize]; // In bytes
qint64 outDataWriteTimes[dataRateBufferSize]; // in ms
mutable QMutex dataRateMutex; // Mutex for accessing the data rate member variables
* @brief logDataRateToBuffer Stores transmission times/amounts for statistics
* This function logs the send times and amounts of datas to the given circular buffers.
* This data is used for calculating the transmission rate.
* @param bytesBuffer[out] The buffer to write the bytes value into.
* @param timeBuffer[out] The buffer to write the time value into
* @param writeIndex[out] The write index used for this buffer.
* @param bytes The amount of bytes transmit.
* @param time The time (in ms) this transmission occurred.
static void logDataRateToBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time)
int i = *writeIndex;
// Now write into the buffer, if there's no room, we just overwrite the first data point.
bytesBuffer[i] = bytes;
timeBuffer[i] = time;
// Increment and wrap the write index
if (i == dataRateBufferSize)
i = 0;
*writeIndex = i;
* @brief getCurrentDataRate Get the current data rate given a data rate buffer.
* This function attempts to use the times and number of bytes transmit into a current data rate
* estimation. Since it needs to use timestamps to get the timeperiods over when the data was sent,
* this is effectively a global data rate over the last dataRateBufferSize - 1 data points. Also note
* that data points older than NOW - dataRateCurrentTimespan are ignored.
* @param index The first valid sample in the data rate buffer. Refers to the oldest time sample.
* @param dataWriteTimes The time, in ms since epoch, that each data sample took place.
* @param dataWriteAmounts The amount of data (in bits) that was transferred.
* @return The bits per second of data transferrence of the interface over the last [-statsCurrentTimespan, 0] timespan.
qint64 getCurrentDataRate(int index, const qint64 dataWriteTimes[], const quint64 dataWriteAmounts[]) const
const qint64 now = QDateTime::currentMSecsSinceEpoch();
// Limit the time we calculate to the recent past
const qint64 cutoff = now - dataRateCurrentTimespan;
// Grab the mutex for working with the stats variables
QMutexLocker dataRateLocker(&dataRateMutex);
// Now iterate through the buffer of all received data packets adding up all values
// within now and our cutof.
qint64 totalBytes = 0;
qint64 totalTime = 0;
qint64 lastTime = 0;
int size = dataRateBufferSize;
while (size-- > 0)
// If this data is within our cutoff time, include it in our calculations.
// This also accounts for when the buffer is empty and filled with 0-times.
if (dataWriteTimes[index] > cutoff && lastTime > 0) {
// Track the total time, using the previous time as our timeperiod.
totalTime += dataWriteTimes[index] - lastTime;
totalBytes += dataWriteAmounts[index];
// Track the last time sample for doing timespan calculations
lastTime = dataWriteTimes[index];
// Increment and wrap the index if necessary.
if (++index == dataRateBufferSize)
index = 0;
// Return the final calculated value in bits / s, converted from bytes/ms.
qint64 dataRate = (totalTime != 0)?(qint64)((float)totalBytes * 8.0f / ((float)totalTime / 1000.0f)):0;
// Finally return our calculated data rate.
return dataRate;
static int getNextLinkId() {
static int nextId = 1;
return nextId++;
......@@ -198,7 +330,4 @@ protected slots:
/* Declare C++ interface as Qt interface */
//Q_DECLARE_INTERFACE(LinkInterface, "org.openground.comm.links.LinkInterface/1.0")
......@@ -55,7 +55,7 @@ This file is part of the QGROUNDCONTROL project
* @param writeFile The received messages are written to that file
* @param rate The rate at which the messages are sent (in intervals of milliseconds)
MAVLinkSimulationLink::MAVLinkSimulationLink(QString readFile, QString writeFile, int rate, QObject* parent) : LinkInterface(parent),
MAVLinkSimulationLink::MAVLinkSimulationLink(QString readFile, QString writeFile, int rate, QObject* parent) :
......@@ -857,6 +857,12 @@ void MAVLinkSimulationLink::writeBytes(const char* data, qint64 size)
// Log the amount and time written out for future data rate calculations.
// While this interface doesn't actually write any data to external systems,
// this data "transmit" here should still count towards the outgoing data rate.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
for (int i = 0; i < streampointer; i++)
......@@ -884,25 +890,12 @@ void MAVLinkSimulationLink::readBytes()
QByteArray b(data, len);
emit bytesReceived(this, b);
// if (len > 0)
// {
// qDebug() << "Simulation sent " << len << " bytes to groundstation: ";
// /* Increase write counter */
// //bitsSentTotal += size * 8;
// //Output all bytes as hex digits
// int i;
// for (i=0; i<len; i++)
// {
// unsigned int v=data[i];
// fprintf(stderr,"%02x ", v);
// }
// fprintf(stderr,"\n");
// }
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, len, QDateTime::currentMSecsSinceEpoch());
......@@ -254,6 +254,10 @@ void OpalLink::writeBytes(const char *bytes, qint64 length)
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
......@@ -263,6 +267,9 @@ void OpalLink::readBytes()
emit bytesReceived(this, receiveBuffer->dequeue());
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, s, QDateTime::currentMSecsSinceEpoch());
void OpalLink::receiveMessage(mavlink_message_t message)
......@@ -404,35 +404,16 @@ Eigen::Matrix3f euler_to_wRo(double yaw, double pitch, double roll) {
void QGCXPlaneLink::writeBytes(const char* data, qint64 size)
if (!data) return;
//#define QGCXPlaneLink_DEBUG
#if 1
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
// If socket exists and is connected, transmit the data
if (socket && connectState)
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
if (data[i] > 31 && data[i] < 127)
socket->writeDatagram(data, size, remoteHost, remotePort);
//qDebug() << "Sent" << size << "bytes to" << remoteHost.toString() << ":" << remotePort << "data:";
//qDebug() << bytes;
//qDebug() << "ASCII:" << ascii;
if (connectState && socket) socket->writeDatagram(data, size, remoteHost, remotePort);
* @brief Read a number of bytes from the interface.
* @param data Pointer to the data byte array to write the bytes to
* @param maxLength The maximum number of bytes to write
* @brief Read all pending packets from the interface.
void QGCXPlaneLink::readBytes()
......@@ -449,11 +430,6 @@ void QGCXPlaneLink::readBytes()
if (s > maxLength) std::cerr << __FILE__ << __LINE__ << " UDP datagram overflow, allowed to read less bytes than datagram size" << std::endl;
socket->readDatagram(data, maxLength, &sender, &senderPort);
QByteArray b(data, s);
/*// Print string
QString state(b)*/;
// Calculate the number of data segments a 36 bytes
// XPlane always has 5 bytes header: 'DATA@'
unsigned nsegs = (s-5)/36;
......@@ -23,19 +23,9 @@ SerialLink::SerialLink(QString portname, int baudRate, bool hardwareFlowControl,
int dataBits, int stopBits) :
// Initialize our arrays manually, cause C++<03 is dumb.
for (int i = 0; i < buffer_size; ++i)
inDataWriteAmounts[i] = 0;
inDataWriteTimes[i] = 0;
outDataWriteAmounts[i] = 0;
outDataWriteTimes[i] = 0;
// Get the name of the current port in use.
m_portName = portname.trimmed();
......@@ -212,8 +202,8 @@ void SerialLink::run()
// Log this written data for this timestep. If this value ends up being 0 due to
// write() failing, that's what we want as well.
QMutexLocker statsLocker(&m_statisticsMutex);
WriteDataStatsBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, numWritten, QDateTime::currentMSecsSinceEpoch());
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, numWritten, QDateTime::currentMSecsSinceEpoch());
//wait n msecs for data to be ready
......@@ -230,8 +220,8 @@ void SerialLink::run()
emit bytesReceived(this, readData);
// Log this data reception for this timestep
QMutexLocker statsLocker(&m_statisticsMutex);
WriteDataStatsBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, readData.length(), QDateTime::currentMSecsSinceEpoch());
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, readData.length(), QDateTime::currentMSecsSinceEpoch());
// Track the total amount of data read.
m_bytesRead += readData.length();
......@@ -293,23 +283,6 @@ void SerialLink::run()
void SerialLink::WriteDataStatsBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time)
int i = *writeIndex;
// Now write into the buffer, if there's no room, we just overwrite the first data point.
bytesBuffer[i] = bytes;
timeBuffer[i] = time;
// Increment and wrap the write index
if (i == buffer_size)
i = 0;
*writeIndex = i;
void SerialLink::writeBytes(const char* data, qint64 size)
if(m_port && m_port->isOpen()) {
......@@ -555,94 +528,6 @@ qint64 SerialLink::getConnectionSpeed() const
return dataRate;
qint64 SerialLink::getCurrentOutDataRate() const
const qint64 now = QDateTime::currentMSecsSinceEpoch();
// Limit the time we calculate to the recent past
const qint64 cutoff = now - stats_timespan;
// Grab the mutex for working with the stats variables
QMutexLocker statsLocker(&m_statisticsMutex);
// Now iterate through the buffer of all received data packets adding up all values
// within now and our cutof.
int index = outDataIndex;
qint64 totalBytes = 0;
qint64 totalTime = 0;
qint64 lastTime = 0;
int size = buffer_size;
while (size-- > 0)
// If this data is within our cutoff time, include it in our calculations.
// This also accounts for when the buffer is empty and filled with 0-times.
if (outDataWriteTimes[index] > cutoff && lastTime > 0) {
// Track the total time, using the previous time as our timeperiod.
totalTime += outDataWriteTimes[index] - lastTime;
totalBytes += outDataWriteAmounts[index];
// Track the last time sample for doing timespan calculations
lastTime = outDataWriteTimes[index];
// Increment and wrap the index if necessary.
if (++index == buffer_size)
index = 0;
// Return the final calculated value in bits / s, converted from bytes/ms.
qint64 dataRate = (totalTime != 0)?(qint64)((float)totalBytes * 8.0f / ((float)totalTime / 1000.0f)):0;
// Finally return our calculated data rate.
return dataRate;
qint64 SerialLink::getCurrentInDataRate() const
const qint64 now = QDateTime::currentMSecsSinceEpoch();
// Limit the time we calculate to the recent past
const qint64 cutoff = now - stats_timespan;
// Grab the mutex for working with the stats variables
QMutexLocker statsLocker(&m_statisticsMutex);
// Now iterate through the buffer of all received data packets adding up all values
// within now and our cutof.
int index = inDataIndex;
qint64 totalBytes = 0;
qint64 totalTime = 0;
qint64 lastTime = 0;
int size = buffer_size;
while (size-- > 0)
// If this data is within our cutoff time, include it in our calculations.
// This also accounts for when the buffer is empty and filled with 0-times.
if (inDataWriteTimes[index] > cutoff && lastTime > 0) {
// Track the total time, using the previous time as our timeperiod.
totalTime += inDataWriteTimes[index] - lastTime;
totalBytes += inDataWriteAmounts[index];
// Track the last time sample for doing timespan calculations
lastTime = inDataWriteTimes[index];
// Increment and wrap the index if necessary.
if (++index == buffer_size)
index = 0;
// Return the final calculated value in bits / s, converted from bytes/ms.
qint64 dataRate = (totalTime != 0)?(qint64)((float)totalBytes * 8.0f / ((float)totalTime / 1000.0f)):0;
// Finally return our calculated data rate.
return dataRate;
QString SerialLink::getPortName() const
return m_portName;
......@@ -68,10 +68,6 @@ public:
static const int poll_interval = SERIAL_POLL_INTERVAL; ///< Polling interval, defined in configuration.h
static const int buffer_size = 20; ///< Specify how many data points to capture for data rate calculations.
static const qint64 stats_timespan = 500; ///< Set the maximum age of samples to use for data calculations (ms).
/** @brief Get a list of the currently available ports */
QList<QString> getCurrentPorts();
......@@ -154,39 +150,11 @@ protected:
QString m_portName;
int m_timeout;
int m_id;
// Implement a simple circular buffer for storing when and how much data was received.
// Used for calculating the incoming data rate. Use with *StatsBuffer() functions.
int inDataIndex;
quint64 inDataWriteAmounts[buffer_size]; // In bytes
qint64 inDataWriteTimes[buffer_size]; // in ms
// Implement a simple circular buffer for storing when and how much data was transmit.
// Used for calculating the outgoing data rate. Use with *StatsBuffer() functions.
int outDataIndex;
quint64 outDataWriteAmounts[buffer_size]; // In bytes
qint64 outDataWriteTimes[buffer_size]; // in ms
quint64 m_connectionStartTime; // Connection start time (ms since epoch)
mutable QMutex m_statisticsMutex; // Mutex for accessing the statistics member variables (inData*,outData*,bitsSent,bitsReceived)
QMutex m_dataMutex; // Mutex for reading data from m_port
QMutex m_writeMutex; // Mutex for accessing the m_transmitBuffer.
QList<QString> m_ports;
* @brief WriteDataStatsBuffer Stores transmission times/amounts for statistics
* This function logs the send times and amounts of datas to the given circular buffers.
* This data is used for calculating the transmission rate.
* @param bytesBuffer The buffer to write the bytes value into.
* @param timeBuffer The buffer to write the time value into
* @param writeIndex The write index used for this buffer.
* @param bytes The amount of bytes transmit.
* @param time The time (in ms) this transmission occurred.
void WriteDataStatsBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time);
volatile bool m_stopp;
volatile bool m_reqReset;
QMutex m_stoppMutex; // Mutex for accessing m_stopp
......@@ -131,6 +131,10 @@ void TCPLink::writeBytes(const char* data, qint64 size)
writeDebugBytes(data, size);
socket->write(data, size);
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
......@@ -152,6 +156,10 @@ void TCPLink::readBytes()
emit bytesReceived(this, buffer);
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, byteCount, QDateTime::currentMSecsSinceEpoch());
writeDebugBytes(, buffer.size());
......@@ -39,9 +39,10 @@ This file is part of the QGROUNDCONTROL project
#include <QHostInfo>
//#include <netinet/in.h>
UDPLink::UDPLink(QHostAddress host, quint16 port)
: socket(NULL)
UDPLink::UDPLink(QHostAddress host, quint16 port) :
this->host = host;
this->port = port;
this->connectState = false;
......@@ -197,6 +198,10 @@ void UDPLink::writeBytes(const char* data, qint64 size)
qDebug() << "ASCII:" << ascii;
socket->writeDatagram(data, size, currentHost, currentPort);
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
......@@ -220,6 +225,11 @@ void UDPLink::readBytes()
// FIXME TODO Check if this method is better than retrieving the data by individual processes
emit bytesReceived(this, datagram);
// Log this data reception for this timestep
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, datagram.length(), QDateTime::currentMSecsSinceEpoch());
// // Echo data for debugging purposes
// std::cerr << __FILE__ << __LINE__ << "Received datagram:" << std::endl;
// int i;
......@@ -198,6 +198,9 @@ void XbeeLink::writeBytes(const char *bytes, qint64 length) // TO DO: delete th
if(!xbee_nsenddata(this->m_xbeeCon,data,length)) // return value of 0 is successful written
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, length, QDateTime::currentMSecsSinceEpoch());
......@@ -216,9 +219,13 @@ void XbeeLink::readBytes()
for(unsigned int i=0;i<=xbeePkt->datalen;i++)
qDebug() << data;
emit bytesReceived(this,data);
emit bytesReceived(this, data);
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, data.length(), QDateTime::currentMSecsSinceEpoch());
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment