Commit 2739ae1c authored by Don Gagne's avatar Don Gagne

Restructure input/output data rate apis

parent ff9ba294
......@@ -100,9 +100,9 @@ public:
*
* @return The data rate of the interface in bits per second, 0 if unknown
**/
qint64 getCurrentInDataRate() const
qint64 getCurrentInputDataRate() const
{
return getCurrentDataRate(inDataIndex, inDataWriteTimes, inDataWriteAmounts);
return _getCurrentDataRate(_inDataIndex, _inDataWriteTimes, _inDataWriteAmounts);
}
/**
......@@ -113,9 +113,9 @@ public:
*
* @return The data rate of the interface in bits per second, 0 if unknown
**/
qint64 getCurrentOutDataRate() const
qint64 getCurrentOutputDataRate() const
{
return getCurrentDataRate(outDataIndex, outDataWriteTimes, outDataWriteAmounts);
return _getCurrentDataRate(_outDataIndex, _outDataWriteTimes, _outDataWriteAmounts);
}
/// mavlink channel to use for this link, as used by mavlink_parse_char. The mavlink channel is only
......@@ -141,25 +141,6 @@ public slots:
**/
virtual void writeBytes(const char *bytes, qint64 length) = 0;
protected:
// Links are only created by LinkManager so constructor is not public
LinkInterface() :
QThread(0),
_mavlinkChannelSet(false)
{
// Initialize everything for the data rate calculation buffers.
inDataIndex = 0;
outDataIndex = 0;
// Initialize our data rate buffers.
memset(inDataWriteAmounts, 0, sizeof(inDataWriteAmounts));
memset(inDataWriteTimes, 0, sizeof(inDataWriteTimes));
memset(outDataWriteAmounts,0, sizeof(outDataWriteAmounts));
memset(outDataWriteTimes, 0, sizeof(outDataWriteTimes));
qRegisterMetaType<LinkInterface*>("LinkInterface*");
}
signals:
/**
......@@ -195,25 +176,51 @@ signals:
void communicationUpdate(const QString& linkname, const QString& text);
protected:
// Links are only created by LinkManager so constructor is not public
LinkInterface() :
QThread(0),
_mavlinkChannelSet(false)
{
// Initialize everything for the data rate calculation buffers.
_inDataIndex = 0;
_outDataIndex = 0;
// Initialize our data rate buffers.
memset(_inDataWriteAmounts, 0, sizeof(_inDataWriteAmounts));
memset(_inDataWriteTimes, 0, sizeof(_inDataWriteTimes));
memset(_outDataWriteAmounts,0, sizeof(_outDataWriteAmounts));
memset(_outDataWriteTimes, 0, sizeof(_outDataWriteTimes));
qRegisterMetaType<LinkInterface*>("LinkInterface*");
}
static const int dataRateBufferSize = 20; ///< Specify how many data points to capture for data rate calculations.
static const qint64 dataRateCurrentTimespan = 500; ///< Set the maximum age of samples to use for data calculations (ms).
// Implement a simple circular buffer for storing when and how much data was received.
// Used for calculating the incoming data rate. Use with *StatsBuffer() functions.
int inDataIndex;
quint64 inDataWriteAmounts[dataRateBufferSize]; // In bytes
qint64 inDataWriteTimes[dataRateBufferSize]; // in ms
// Implement a simple circular buffer for storing when and how much data was transmit.
// Used for calculating the outgoing data rate. Use with *StatsBuffer() functions.
int outDataIndex;
quint64 outDataWriteAmounts[dataRateBufferSize]; // In bytes
qint64 outDataWriteTimes[dataRateBufferSize]; // in ms
/// This function logs the send times and amounts of datas for input. Data is used for calculating
/// the transmission rate.
/// @param byteCount Number of bytes received
/// @param time Time in ms send occured
void _logInputDataRate(quint64 byteCount, qint64 time) {
_logDataRateToBuffer(_inDataWriteAmounts, _inDataWriteTimes, &_inDataIndex, byteCount, time);
}
/// This function logs the send times and amounts of datas for output. Data is used for calculating
/// the transmission rate.
/// @param byteCount Number of bytes sent
/// @param time Time in ms receive occured
void _logOutputDataRate(quint64 byteCount, qint64 time) {
_logDataRateToBuffer(_outDataWriteAmounts, _outDataWriteTimes, &_outDataIndex, byteCount, time);
}
protected slots:
mutable QMutex dataRateMutex; // Mutex for accessing the data rate member variables
/**
* @brief Read a number of bytes from the interface.
*
* @param bytes The pointer to write the bytes to
* @param maxLength The maximum length which can be written
**/
virtual void readBytes() = 0;
private:
/**
* @brief logDataRateToBuffer Stores transmission times/amounts for statistics
*
......@@ -226,8 +233,10 @@ protected:
* @param bytes The amount of bytes transmit.
* @param time The time (in ms) this transmission occurred.
*/
static void logDataRateToBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time)
void _logDataRateToBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time)
{
QMutexLocker dataRateLocker(&_dataRateMutex);
int i = *writeIndex;
// Now write into the buffer, if there's no room, we just overwrite the first data point.
......@@ -236,7 +245,7 @@ protected:
// Increment and wrap the write index
++i;
if (i == dataRateBufferSize)
if (i == _dataRateBufferSize)
{
i = 0;
}
......@@ -248,7 +257,7 @@ protected:
*
* This function attempts to use the times and number of bytes transmit into a current data rate
* estimation. Since it needs to use timestamps to get the timeperiods over when the data was sent,
* this is effectively a global data rate over the last dataRateBufferSize - 1 data points. Also note
* this is effectively a global data rate over the last _dataRateBufferSize - 1 data points. Also note
* that data points older than NOW - dataRateCurrentTimespan are ignored.
*
* @param index The first valid sample in the data rate buffer. Refers to the oldest time sample.
......@@ -256,22 +265,22 @@ protected:
* @param dataWriteAmounts The amount of data (in bits) that was transferred.
* @return The bits per second of data transferrence of the interface over the last [-statsCurrentTimespan, 0] timespan.
*/
qint64 getCurrentDataRate(int index, const qint64 dataWriteTimes[], const quint64 dataWriteAmounts[]) const
qint64 _getCurrentDataRate(int index, const qint64 dataWriteTimes[], const quint64 dataWriteAmounts[]) const
{
const qint64 now = QDateTime::currentMSecsSinceEpoch();
// Limit the time we calculate to the recent past
const qint64 cutoff = now - dataRateCurrentTimespan;
const qint64 cutoff = now - _dataRateCurrentTimespan;
// Grab the mutex for working with the stats variables
QMutexLocker dataRateLocker(&dataRateMutex);
QMutexLocker dataRateLocker(&_dataRateMutex);
// Now iterate through the buffer of all received data packets adding up all values
// within now and our cutof.
qint64 totalBytes = 0;
qint64 totalTime = 0;
qint64 lastTime = 0;
int size = dataRateBufferSize;
int size = _dataRateBufferSize;
while (size-- > 0)
{
// If this data is within our cutoff time, include it in our calculations.
......@@ -286,7 +295,7 @@ protected:
lastTime = dataWriteTimes[index];
// Increment and wrap the index if necessary.
if (++index == dataRateBufferSize)
if (++index == _dataRateBufferSize)
{
index = 0;
}
......@@ -299,17 +308,6 @@ protected:
return dataRate;
}
protected slots:
/**
* @brief Read a number of bytes from the interface.
*
* @param bytes The pointer to write the bytes to
* @param maxLength The maximum length which can be written
**/
virtual void readBytes() = 0;
private:
/**
* @brief Connect this interface logically
*
......@@ -329,6 +327,24 @@ private:
bool _mavlinkChannelSet; ///< true: _mavlinkChannel has been set
uint8_t _mavlinkChannel; ///< mavlink channel to use for this link, as used by mavlink_parse_char
static const int _dataRateBufferSize = 20; ///< Specify how many data points to capture for data rate calculations.
static const qint64 _dataRateCurrentTimespan = 500; ///< Set the maximum age of samples to use for data calculations (ms).
// Implement a simple circular buffer for storing when and how much data was received.
// Used for calculating the incoming data rate. Use with *StatsBuffer() functions.
int _inDataIndex;
quint64 _inDataWriteAmounts[_dataRateBufferSize]; // In bytes
qint64 _inDataWriteTimes[_dataRateBufferSize]; // in ms
// Implement a simple circular buffer for storing when and how much data was transmit.
// Used for calculating the outgoing data rate. Use with *StatsBuffer() functions.
int _outDataIndex;
quint64 _outDataWriteAmounts[_dataRateBufferSize]; // In bytes
qint64 _outDataWriteTimes[_dataRateBufferSize]; // in ms
mutable QMutex _dataRateMutex; // Mutex for accessing the data rate member variables
};
typedef QSharedPointer<LinkInterface> SharedLinkInterface;
......
......@@ -94,9 +94,7 @@ void TCPLink::writeBytes(const char* data, qint64 size)
_writeDebugBytes(data, size);
#endif
_socket->write(data, size);
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
}
/**
......@@ -114,9 +112,7 @@ void TCPLink::readBytes()
buffer.resize(byteCount);
_socket->read(buffer.data(), buffer.size());
emit bytesReceived(this, buffer);
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, byteCount, QDateTime::currentMSecsSinceEpoch());
_logInputDataRate(byteCount, QDateTime::currentMSecsSinceEpoch());
#ifdef TCPLINK_READWRITE_DEBUG
writeDebugBytes(buffer.data(), buffer.size());
#endif
......
......@@ -214,9 +214,7 @@ void UDPLink::_sendBytes(const char* data, qint64 size)
// This host is gone. Add to list to be removed
goneHosts.append(host);
}
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
} while (_config->nextHost(host, port));
//-- Remove hosts that are no longer there
foreach (QString ghost, goneHosts) {
......@@ -242,9 +240,7 @@ void UDPLink::readBytes()
// FIXME TODO Check if this method is better than retrieving the data by individual processes
emit bytesReceived(this, datagram);
// Log this data reception for this timestep
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, datagram.length(), QDateTime::currentMSecsSinceEpoch());
_logInputDataRate(datagram.length(), QDateTime::currentMSecsSinceEpoch());
// // Echo data for debugging purposes
// std::cerr << __FILE__ << __LINE__ << "Received datagram:" << std::endl;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment