Commit 68fd43e3 authored by Don Gagne's avatar Don Gagne

Merge pull request #1692 from DonLakeFlyer/SerialMainThread

Serial link runs on main thread
parents 919955e7 ffa4a2c6
......@@ -100,9 +100,9 @@ public:
*
* @return The data rate of the interface in bits per second, 0 if unknown
**/
qint64 getCurrentInDataRate() const
qint64 getCurrentInputDataRate() const
{
return getCurrentDataRate(inDataIndex, inDataWriteTimes, inDataWriteAmounts);
return _getCurrentDataRate(_inDataIndex, _inDataWriteTimes, _inDataWriteAmounts);
}
/**
......@@ -113,9 +113,9 @@ public:
*
* @return The data rate of the interface in bits per second, 0 if unknown
**/
qint64 getCurrentOutDataRate() const
qint64 getCurrentOutputDataRate() const
{
return getCurrentDataRate(outDataIndex, outDataWriteTimes, outDataWriteAmounts);
return _getCurrentDataRate(_outDataIndex, _outDataWriteTimes, _outDataWriteAmounts);
}
/// mavlink channel to use for this link, as used by mavlink_parse_char. The mavlink channel is only
......@@ -141,25 +141,6 @@ public slots:
**/
virtual void writeBytes(const char *bytes, qint64 length) = 0;
protected:
// Links are only created by LinkManager so constructor is not public
LinkInterface() :
QThread(0),
_mavlinkChannelSet(false)
{
// Initialize everything for the data rate calculation buffers.
inDataIndex = 0;
outDataIndex = 0;
// Initialize our data rate buffers.
memset(inDataWriteAmounts, 0, sizeof(inDataWriteAmounts));
memset(inDataWriteTimes, 0, sizeof(inDataWriteTimes));
memset(outDataWriteAmounts,0, sizeof(outDataWriteAmounts));
memset(outDataWriteTimes, 0, sizeof(outDataWriteTimes));
qRegisterMetaType<LinkInterface*>("LinkInterface*");
}
signals:
/**
......@@ -195,25 +176,51 @@ signals:
void communicationUpdate(const QString& linkname, const QString& text);
protected:
// Links are only created by LinkManager so constructor is not public
LinkInterface() :
QThread(0),
_mavlinkChannelSet(false)
{
// Initialize everything for the data rate calculation buffers.
_inDataIndex = 0;
_outDataIndex = 0;
// Initialize our data rate buffers.
memset(_inDataWriteAmounts, 0, sizeof(_inDataWriteAmounts));
memset(_inDataWriteTimes, 0, sizeof(_inDataWriteTimes));
memset(_outDataWriteAmounts,0, sizeof(_outDataWriteAmounts));
memset(_outDataWriteTimes, 0, sizeof(_outDataWriteTimes));
qRegisterMetaType<LinkInterface*>("LinkInterface*");
}
static const int dataRateBufferSize = 20; ///< Specify how many data points to capture for data rate calculations.
static const qint64 dataRateCurrentTimespan = 500; ///< Set the maximum age of samples to use for data calculations (ms).
// Implement a simple circular buffer for storing when and how much data was received.
// Used for calculating the incoming data rate. Use with *StatsBuffer() functions.
int inDataIndex;
quint64 inDataWriteAmounts[dataRateBufferSize]; // In bytes
qint64 inDataWriteTimes[dataRateBufferSize]; // in ms
// Implement a simple circular buffer for storing when and how much data was transmit.
// Used for calculating the outgoing data rate. Use with *StatsBuffer() functions.
int outDataIndex;
quint64 outDataWriteAmounts[dataRateBufferSize]; // In bytes
qint64 outDataWriteTimes[dataRateBufferSize]; // in ms
/// This function logs the send times and amounts of datas for input. Data is used for calculating
/// the transmission rate.
/// @param byteCount Number of bytes received
/// @param time Time in ms send occured
void _logInputDataRate(quint64 byteCount, qint64 time) {
_logDataRateToBuffer(_inDataWriteAmounts, _inDataWriteTimes, &_inDataIndex, byteCount, time);
}
/// This function logs the send times and amounts of datas for output. Data is used for calculating
/// the transmission rate.
/// @param byteCount Number of bytes sent
/// @param time Time in ms receive occured
void _logOutputDataRate(quint64 byteCount, qint64 time) {
_logDataRateToBuffer(_outDataWriteAmounts, _outDataWriteTimes, &_outDataIndex, byteCount, time);
}
protected slots:
mutable QMutex dataRateMutex; // Mutex for accessing the data rate member variables
/**
* @brief Read a number of bytes from the interface.
*
* @param bytes The pointer to write the bytes to
* @param maxLength The maximum length which can be written
**/
virtual void readBytes() = 0;
private:
/**
* @brief logDataRateToBuffer Stores transmission times/amounts for statistics
*
......@@ -226,8 +233,10 @@ protected:
* @param bytes The amount of bytes transmit.
* @param time The time (in ms) this transmission occurred.
*/
static void logDataRateToBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time)
void _logDataRateToBuffer(quint64 *bytesBuffer, qint64 *timeBuffer, int *writeIndex, quint64 bytes, qint64 time)
{
QMutexLocker dataRateLocker(&_dataRateMutex);
int i = *writeIndex;
// Now write into the buffer, if there's no room, we just overwrite the first data point.
......@@ -236,7 +245,7 @@ protected:
// Increment and wrap the write index
++i;
if (i == dataRateBufferSize)
if (i == _dataRateBufferSize)
{
i = 0;
}
......@@ -248,7 +257,7 @@ protected:
*
* This function attempts to use the times and number of bytes transmit into a current data rate
* estimation. Since it needs to use timestamps to get the timeperiods over when the data was sent,
* this is effectively a global data rate over the last dataRateBufferSize - 1 data points. Also note
* this is effectively a global data rate over the last _dataRateBufferSize - 1 data points. Also note
* that data points older than NOW - dataRateCurrentTimespan are ignored.
*
* @param index The first valid sample in the data rate buffer. Refers to the oldest time sample.
......@@ -256,22 +265,22 @@ protected:
* @param dataWriteAmounts The amount of data (in bits) that was transferred.
* @return The bits per second of data transferrence of the interface over the last [-statsCurrentTimespan, 0] timespan.
*/
qint64 getCurrentDataRate(int index, const qint64 dataWriteTimes[], const quint64 dataWriteAmounts[]) const
qint64 _getCurrentDataRate(int index, const qint64 dataWriteTimes[], const quint64 dataWriteAmounts[]) const
{
const qint64 now = QDateTime::currentMSecsSinceEpoch();
// Limit the time we calculate to the recent past
const qint64 cutoff = now - dataRateCurrentTimespan;
const qint64 cutoff = now - _dataRateCurrentTimespan;
// Grab the mutex for working with the stats variables
QMutexLocker dataRateLocker(&dataRateMutex);
QMutexLocker dataRateLocker(&_dataRateMutex);
// Now iterate through the buffer of all received data packets adding up all values
// within now and our cutof.
qint64 totalBytes = 0;
qint64 totalTime = 0;
qint64 lastTime = 0;
int size = dataRateBufferSize;
int size = _dataRateBufferSize;
while (size-- > 0)
{
// If this data is within our cutoff time, include it in our calculations.
......@@ -286,7 +295,7 @@ protected:
lastTime = dataWriteTimes[index];
// Increment and wrap the index if necessary.
if (++index == dataRateBufferSize)
if (++index == _dataRateBufferSize)
{
index = 0;
}
......@@ -299,17 +308,6 @@ protected:
return dataRate;
}
protected slots:
/**
* @brief Read a number of bytes from the interface.
*
* @param bytes The pointer to write the bytes to
* @param maxLength The maximum length which can be written
**/
virtual void readBytes() = 0;
private:
/**
* @brief Connect this interface logically
*
......@@ -329,6 +327,24 @@ private:
bool _mavlinkChannelSet; ///< true: _mavlinkChannel has been set
uint8_t _mavlinkChannel; ///< mavlink channel to use for this link, as used by mavlink_parse_char
static const int _dataRateBufferSize = 20; ///< Specify how many data points to capture for data rate calculations.
static const qint64 _dataRateCurrentTimespan = 500; ///< Set the maximum age of samples to use for data calculations (ms).
// Implement a simple circular buffer for storing when and how much data was received.
// Used for calculating the incoming data rate. Use with *StatsBuffer() functions.
int _inDataIndex;
quint64 _inDataWriteAmounts[_dataRateBufferSize]; // In bytes
qint64 _inDataWriteTimes[_dataRateBufferSize]; // in ms
// Implement a simple circular buffer for storing when and how much data was transmit.
// Used for calculating the outgoing data rate. Use with *StatsBuffer() functions.
int _outDataIndex;
quint64 _outDataWriteAmounts[_dataRateBufferSize]; // In bytes
qint64 _outDataWriteTimes[_dataRateBufferSize]; // in ms
mutable QMutex _dataRateMutex; // Mutex for accessing the data rate member variables
};
typedef QSharedPointer<LinkInterface> SharedLinkInterface;
......
......@@ -37,9 +37,6 @@ SerialLink::SerialLink(SerialConfiguration* config)
Q_ASSERT(config != NULL);
_config = config;
_config->setLink(this);
// We're doing it wrong - because the Qt folks got the API wrong:
// http://blog.qt.digia.com/blog/2010/06/17/youre-doing-it-wrong/
moveToThread(this);
qCDebug(SerialLinkLog) << "Create SerialLink " << config->portName() << config->baud() << config->flowControl()
<< config->parity() << config->dataBits() << config->stopBits();
......@@ -59,10 +56,6 @@ SerialLink::~SerialLink()
_disconnect();
if(_port) delete _port;
_port = NULL;
// Tell the thread to exit
quit();
// Wait for it to exit
wait();
}
bool SerialLink::_isBootloader()
......@@ -87,130 +80,11 @@ bool SerialLink::_isBootloader()
return false;
}
/**
* @brief Runs the thread
*
**/
void SerialLink::run()
{
#ifndef __android__
// Initialize the connection
if (!_hardwareConnect(_type)) {
// Need to error out here.
QString err("Could not create port.");
if (_port) {
err = _port->errorString();
}
_emitLinkError("Error connecting: " + err);
return;
}
#endif
qint64 msecs = QDateTime::currentMSecsSinceEpoch();
qint64 initialmsecs = QDateTime::currentMSecsSinceEpoch();
quint64 bytes = 0;
qint64 timeout = 5000;
int linkErrorCount = 0;
// Qt way to make clear what a while(1) loop does
forever {
{
QMutexLocker locker(&this->_stoppMutex);
if (_stopp) {
_stopp = false;
break; // exit the thread
}
}
// Write all our buffered data out the serial port.
if (_transmitBuffer.count() > 0) {
_writeMutex.lock();
int numWritten = _port->write(_transmitBuffer);
bool txSuccess = _port->flush();
txSuccess |= _port->waitForBytesWritten(10);
if (!txSuccess || (numWritten != _transmitBuffer.count())) {
linkErrorCount++;
qCDebug(SerialLinkLog) << "TX Error! written:" << txSuccess << "wrote" << numWritten << ", asked for " << _transmitBuffer.count() << "bytes";
}
else {
// Since we were successful, reset out error counter.
linkErrorCount = 0;
}
// Now that we transmit all of the data in the transmit buffer, flush it.
_transmitBuffer = _transmitBuffer.remove(0, numWritten);
_writeMutex.unlock();
// Log this written data for this timestep. If this value ends up being 0 due to
// write() failing, that's what we want as well.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, numWritten, QDateTime::currentMSecsSinceEpoch());
}
//wait n msecs for data to be ready
//[TODO][BB] lower to SerialLink::poll_interval?
_dataMutex.lock();
bool success = _port->waitForReadyRead(20);
if (success) {
QByteArray readData = _port->readAll();
while (_port->waitForReadyRead(10))
readData += _port->readAll();
_dataMutex.unlock();
if (readData.length() > 0) {
emit bytesReceived(this, readData);
// Log this data reception for this timestep
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, readData.length(), QDateTime::currentMSecsSinceEpoch());
// Track the total amount of data read.
_bytesRead += readData.length();
linkErrorCount = 0;
}
}
else {
_dataMutex.unlock();
linkErrorCount++;
}
if (bytes != _bytesRead) { // i.e things are good and data is being read.
bytes = _bytesRead;
msecs = QDateTime::currentMSecsSinceEpoch();
}
else {
if (QDateTime::currentMSecsSinceEpoch() - msecs > timeout) {
//It's been 10 seconds since the last data came in. Reset and try again
msecs = QDateTime::currentMSecsSinceEpoch();
if (msecs - initialmsecs > 25000) {
//After initial 25 seconds, timeouts are increased to 30 seconds.
//This prevents temporary silences from things like calibration commands
//from screwing things up. In all reality, timeouts should be enabled/disabled
//for events like that on a case by case basis.
//TODO ^^
timeout = 30000;
}
}
}
QGC::SLEEP::msleep(SerialLink::poll_interval);
} // end of forever
if (_port) {
qCDebug(SerialLinkLog) << "Closing Port #" << __LINE__ << _port->portName();
_port->close();
delete _port;
_port = NULL;
}
}
void SerialLink::writeBytes(const char* data, qint64 size)
{
if(_port && _port->isOpen()) {
QByteArray byteArray(data, size);
_writeMutex.lock();
_transmitBuffer.append(byteArray);
_writeMutex.unlock();
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
_port->write(data, size);
} else {
// Error occured
_emitLinkError(tr("Could not send data - link %1 is disconnected!").arg(getName()));
......@@ -231,10 +105,12 @@ void SerialLink::readBytes()
_dataMutex.lock();
qint64 numBytes = _port->bytesAvailable();
if(numBytes > 0) {
if (numBytes > 0) {
/* Read as much data in buffer as possible without overflow */
if(maxLength < numBytes) numBytes = maxLength;
_logInputDataRate(numBytes, QDateTime::currentMSecsSinceEpoch());
_port->read(data, numBytes);
QByteArray b(data, numBytes);
emit bytesReceived(this, b);
......@@ -250,17 +126,12 @@ void SerialLink::readBytes()
**/
bool SerialLink::_disconnect(void)
{
if (isRunning())
{
{
QMutexLocker locker(&_stoppMutex);
_stopp = true;
}
wait(); // This will terminate the thread and close the serial port
} else {
_transmitBuffer.clear(); //clear the output buffer to avoid sending garbage at next connect
qCDebug(SerialLinkLog) << "Already disconnected";
if (_port) {
_port->close();
delete _port;
_port = NULL;
}
#ifdef __android__
LinkManager::instance()->suspendConfigurationUpdates(false);
#endif
......@@ -275,14 +146,13 @@ bool SerialLink::_disconnect(void)
bool SerialLink::_connect(void)
{
qCDebug(SerialLinkLog) << "CONNECT CALLED";
if (isRunning())
_disconnect();
{
QMutexLocker locker(&this->_stoppMutex);
_stopp = false;
}
_disconnect();
#ifdef __android__
LinkManager::instance()->suspendConfigurationUpdates(true);
#endif
// Initialize the connection
if (!_hardwareConnect(_type)) {
// Need to error out here.
......@@ -293,8 +163,6 @@ bool SerialLink::_connect(void)
_emitLinkError("Error connecting: " + err);
return false;
}
#endif
start(HighPriority);
return true;
}
......@@ -343,12 +211,12 @@ bool SerialLink::_hardwareConnect(QString &type)
emit communicationUpdate(getName(),"Error opening port: " + _config->portName());
return false; // couldn't create serial port.
}
_port->moveToThread(this);
// We need to catch this signal and then emit disconnected. You can't connect
// signal to signal otherwise disonnected will have the wrong QObject::Sender
QObject::connect(_port, SIGNAL(aboutToClose()), this, SLOT(_rerouteDisconnected()));
QObject::connect(_port, SIGNAL(error(QSerialPort::SerialPortError)), this, SLOT(linkError(QSerialPort::SerialPortError)));
QObject::connect(_port, &QIODevice::readyRead, this, &SerialLink::_readBytes);
// port->setCommTimeouts(QSerialPort::CtScheme_NonBlockingRead);
......@@ -371,6 +239,8 @@ bool SerialLink::_hardwareConnect(QString &type)
if (!_port->isOpen() ) {
emit communicationUpdate(getName(),"Error opening port: " + _port->errorString());
_port->close();
delete _port;
_port = NULL;
return false; // couldn't open serial port
}
......@@ -390,6 +260,18 @@ bool SerialLink::_hardwareConnect(QString &type)
return true; // successful connection
}
void SerialLink::_readBytes(void)
{
qint64 byteCount = _port->bytesAvailable();
if (byteCount)
{
QByteArray buffer;
buffer.resize(byteCount);
_port->read(buffer.data(), buffer.size());
emit bytesReceived(this, buffer);
}
}
void SerialLink::linkError(QSerialPort::SerialPortError error)
{
if (error != QSerialPort::NoError) {
......@@ -493,6 +375,7 @@ void SerialLink::_rerouteDisconnected(void)
void SerialLink::_emitLinkError(const QString& errorMsg)
{
QString msg("Error on link %1. %2");
qDebug() << errorMsg;
emit communicationError(tr("Link Error"), msg.arg(getName()).arg(errorMsg));
}
......
......@@ -124,10 +124,6 @@ public:
bool connect(void);
bool disconnect(void);
void run();
static const int poll_interval = SERIAL_POLL_INTERVAL; ///< Polling interval, defined in QGCConfig.h
signals: //[TODO] Refactor to Linkinterface
void updateLink(LinkInterface*);
......@@ -154,6 +150,7 @@ protected:
private slots:
void _rerouteDisconnected(void);
void _readBytes(void);
private:
// Links are only created/destroyed by LinkManager so constructor/destructor is not public
......
......@@ -94,9 +94,7 @@ void TCPLink::writeBytes(const char* data, qint64 size)
_writeDebugBytes(data, size);
#endif
_socket->write(data, size);
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
}
/**
......@@ -114,9 +112,7 @@ void TCPLink::readBytes()
buffer.resize(byteCount);
_socket->read(buffer.data(), buffer.size());
emit bytesReceived(this, buffer);
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, byteCount, QDateTime::currentMSecsSinceEpoch());
_logInputDataRate(byteCount, QDateTime::currentMSecsSinceEpoch());
#ifdef TCPLINK_READWRITE_DEBUG
writeDebugBytes(buffer.data(), buffer.size());
#endif
......
......@@ -214,9 +214,7 @@ void UDPLink::_sendBytes(const char* data, qint64 size)
// This host is gone. Add to list to be removed
goneHosts.append(host);
}
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
} while (_config->nextHost(host, port));
//-- Remove hosts that are no longer there
foreach (QString ghost, goneHosts) {
......@@ -242,9 +240,7 @@ void UDPLink::readBytes()
// FIXME TODO Check if this method is better than retrieving the data by individual processes
emit bytesReceived(this, datagram);
// Log this data reception for this timestep
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, datagram.length(), QDateTime::currentMSecsSinceEpoch());
_logInputDataRate(datagram.length(), QDateTime::currentMSecsSinceEpoch());
// // Echo data for debugging purposes
// std::cerr << __FILE__ << __LINE__ << "Received datagram:" << std::endl;
......
......@@ -184,9 +184,7 @@ void XbeeLink::writeBytes(const char *bytes, qint64 length) // TO DO: delete th
}
if(!xbee_nsenddata(this->m_xbeeCon,data,length)) // return value of 0 is successful written
{
// Log the amount and time written out for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, length, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(length, QDateTime::currentMSecsSinceEpoch());
}
else
{
......@@ -207,11 +205,8 @@ void XbeeLink::readBytes()
data.push_back(xbeePkt->data[i]);
}
emit bytesReceived(this, data);
// Log the amount and time received for future data rate calculations.
QMutexLocker dataRateLocker(&dataRateMutex);
logDataRateToBuffer(inDataWriteAmounts, inDataWriteTimes, &inDataIndex, data.length(), QDateTime::currentMSecsSinceEpoch());
_logInputDataRate(data.length(), QDateTime::currentMSecsSinceEpoch());
emit bytesReceived(this, data);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment