Commit 70124a1e authored by Don Gagne's avatar Don Gagne

Merge pull request #3079 from NaterGator/linkthreadfix

[WIP] Link thread safety enforcement
parents e0efcb20 fe8ff69e
......@@ -735,7 +735,7 @@ void Vehicle::_sendMessageOnLink(LinkInterface* link, mavlink_message_t message)
uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
int len = mavlink_msg_to_send_buffer(buffer, &message);
link->writeBytes((const char*)buffer, len);
link->writeBytesSafe((const char*)buffer, len);
_messagesSent++;
emit messagesSentChanged();
}
......
......@@ -89,19 +89,14 @@ QString BluetoothLink::getName() const
return _config->name();
}
void BluetoothLink::writeBytes(const char* data, qint64 size)
{
_sendBytes(data, size);
}
void BluetoothLink::_sendBytes(const char* data, qint64 size)
void BluetoothLink::_writeBytes(const QByteArray bytes)
{
if(_targetSocket)
{
if(_targetSocket->isWritable())
{
if(_targetSocket->write(data, size) > 0) {
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
if(_targetSocket->write(bytes) > 0) {
_logOutputDataRate(bytes.size(), QDateTime::currentMSecsSinceEpoch());
}
else
qWarning() << "Bluetooth write error";
......
......@@ -165,9 +165,7 @@ public:
LinkConfiguration* getLinkConfiguration() { return _config; }
public slots:
void readBytes ();
void writeBytes (const char* data, qint64 length);
void deviceConnected ();
void deviceDisconnected ();
void deviceError (QBluetoothSocket::SocketError error);
......@@ -193,7 +191,11 @@ private:
bool _hardwareConnect ();
void _restartConnection ();
void _sendBytes (const char* data, qint64 size);
private slots:
void _writeBytes (const QByteArray bytes);
private:
void _createSocket ();
private:
......
......@@ -151,16 +151,24 @@ public slots:
*
* If the underlying communication is packet oriented,
* one write command equals a datagram. In case of serial
* communication arbitrary byte lengths can be written
* communication arbitrary byte lengths can be written. The method ensures
* thread safety regardless of the underlying LinkInterface implementation.
*
* @param bytes The pointer to the byte array containing the data
* @param length The length of the data array
**/
virtual void writeBytes(const char *bytes, qint64 length) = 0;
void writeBytesSafe(const char *bytes, int length)
{
emit _invokeWriteBytes(QByteArray(bytes, length));
}
private slots:
virtual void _writeBytes(const QByteArray) = 0;
signals:
void autoconnectChanged(bool autoconnect);
void activeChanged(bool active);
void _invokeWriteBytes(QByteArray);
/**
* @brief New data arrived
......@@ -212,6 +220,7 @@ protected:
memset(_outDataWriteAmounts,0, sizeof(_outDataWriteAmounts));
memset(_outDataWriteTimes, 0, sizeof(_outDataWriteTimes));
QObject::connect(this, &LinkInterface::_invokeWriteBytes, this, &LinkInterface::_writeBytes);
qRegisterMetaType<LinkInterface*>("LinkInterface*");
}
......
......@@ -881,7 +881,7 @@ void LinkManager::_activeLinkCheck(void)
if (!found && link) {
// See if we can get an NSH prompt on this link
bool foundNSHPrompt = false;
link->writeBytes("\r", 1);
link->writeBytesSafe("\r", 1);
QSignalSpy spy(link, SIGNAL(bytesReceived(LinkInterface*, QByteArray)));
if (spy.wait(100)) {
QList<QVariant> arguments = spy.takeFirst();
......
......@@ -151,10 +151,9 @@ void LogReplayLink::_replayError(const QString& errorMsg)
}
/// Since this is log replay, we just drops writes on the floor
void LogReplayLink::writeBytes(const char* bytes, qint64 cBytes)
void LogReplayLink::_writeBytes(const QByteArray bytes)
{
Q_UNUSED(bytes);
Q_UNUSED(cBytes);
}
/// Parses a BigEndian quint64 timestamp
......
......@@ -98,8 +98,8 @@ public:
bool connect(void);
bool disconnect(void);
public slots:
virtual void writeBytes(const char *bytes, qint64 cBytes);
private slots:
virtual void _writeBytes(const QByteArray bytes);
signals:
void logFileStats(bool logTimestamped, int logDurationSecs, int binaryBaudRate);
......
......@@ -438,7 +438,7 @@ void MAVLinkProtocol::_sendMessage(LinkInterface* link, mavlink_message_t messag
if (link->isConnected())
{
// Send the portion of the buffer now occupied by the message
link->writeBytes((const char*)buffer, len);
link->writeBytesSafe((const char*)buffer, len);
}
}
......@@ -461,7 +461,7 @@ void MAVLinkProtocol::_sendMessage(LinkInterface* link, mavlink_message_t messag
if (link->isConnected())
{
// Send the portion of the buffer now occupied by the message
link->writeBytes((const char*)buffer, len);
link->writeBytesSafe((const char*)buffer, len);
}
}
......
......@@ -115,7 +115,6 @@ MockLink::MockLink(MockConfiguration* config)
moveToThread(this);
_loadParams();
QObject::connect(this, &MockLink::_incomingBytes, this, &MockLink::_handleIncomingBytes);
}
MockLink::~MockLink(void)
......@@ -314,16 +313,7 @@ void MockLink::respondWithMavlinkMessage(const mavlink_message_t& msg)
}
/// @brief Called when QGC wants to write bytes to the MAV
void MockLink::writeBytes(const char* bytes, qint64 cBytes)
{
// Package up the data so we can signal it over to the right thread
QByteArray byteArray(bytes, cBytes);
emit _incomingBytes(byteArray);
}
/// @brief Handles bytes from QGC on the thread
void MockLink::_handleIncomingBytes(const QByteArray bytes)
void MockLink::_writeBytes(const QByteArray bytes)
{
if (_inNSH) {
_handleIncomingNSHBytes(bytes.constData(), bytes.count());
......
......@@ -150,12 +150,8 @@ public:
static MockLink* startAPMArduCopterMockLink (bool sendStatusText);
static MockLink* startAPMArduPlaneMockLink (bool sendStatusText);
signals:
/// @brief Used internally to move data to the thread.
void _incomingBytes(const QByteArray bytes);
public slots:
virtual void writeBytes(const char *bytes, qint64 cBytes);
private slots:
virtual void _writeBytes(const QByteArray bytes);
private slots:
void _run1HzTasks(void);
......@@ -172,7 +168,6 @@ private:
// MockLink methods
void _sendHeartBeat(void);
void _handleIncomingBytes(const QByteArray bytes);
void _handleIncomingNSHBytes(const char* bytes, int cBytes);
void _handleIncomingMavlinkBytes(const uint8_t* bytes, int cBytes);
void _loadParams(void);
......
......@@ -234,7 +234,7 @@ void QGCFlightGearLink::updateControls(quint64 time, float rollAilerons, float p
{
QString state("%1\t%2\t%3\t%4\t%5\n");
state = state.arg(rollAilerons).arg(pitchElevator).arg(yawRudder).arg(true).arg(throttle);
writeBytes(state.toLatin1().constData(), state.length());
emit _invokeWriteBytes(state.toLatin1());
//qDebug() << "Updated controls" << rollAilerons << pitchElevator << yawRudder << throttle;
//qDebug() << "Updated controls" << state;
}
......@@ -244,13 +244,13 @@ void QGCFlightGearLink::updateControls(quint64 time, float rollAilerons, float p
}
}
void QGCFlightGearLink::writeBytes(const char* data, qint64 size)
void QGCFlightGearLink::_writeBytes(const QByteArray data)
{
//#define QGCFlightGearLink_DEBUG
#ifdef QGCFlightGearLink_DEBUG
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
for (int i=0, size = data.size(); i<size; i++)
{
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
......@@ -267,7 +267,7 @@ void QGCFlightGearLink::writeBytes(const char* data, qint64 size)
qDebug() << bytes;
qDebug() << "ASCII:" << ascii;
#endif
if (connectState && _udpCommSocket) _udpCommSocket->writeDatagram(data, size, currentHost, currentPort);
if (connectState && _udpCommSocket) _udpCommSocket->writeDatagram(data, currentHost, currentPort);
}
/**
......
......@@ -121,13 +121,17 @@ public slots:
}
void readBytes();
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void _writeBytes(const QByteArray data);
public slots:
bool connectSimulation();
bool disconnectSimulation();
......
......@@ -64,13 +64,26 @@ public slots:
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
virtual void writeBytes(const char* data, qint64 length) = 0;
void writeBytesSafe(const char* data, int length)
{
emit _invokeWriteBytes(QByteArray(data, length));
}
virtual bool connectSimulation() = 0;
virtual bool disconnectSimulation() = 0;
private slots:
virtual void _writeBytes(const QByteArray) = 0;
protected:
virtual void setName(QString name) = 0;
QGCHilLink() :
QThread()
{
connect(this, &QGCHilLink::_invokeWriteBytes, this, &QGCHilLink::_writeBytes);
}
signals:
/**
* @brief This signal is emitted instantly when the link is connected
......@@ -130,6 +143,9 @@ signals:
/** @brief Sensor leve HIL state changed */
void sensorHilChanged(bool enabled);
/** @brief Helper signal to force execution on the correct thread */
void _invokeWriteBytes(QByteArray);
};
#endif // QGCHILLINK_H
......@@ -246,7 +246,7 @@ void QGCJSBSimLink::updateControls(quint64 time, float rollAilerons, float pitch
{
QString state("%1\t%2\t%3\t%4\t%5\n");
state = state.arg(rollAilerons).arg(pitchElevator).arg(yawRudder).arg(true).arg(throttle);
writeBytes(state.toLatin1().constData(), state.length());
emit _invokeWriteBytes(state.toLatin1());
}
else
{
......@@ -255,13 +255,13 @@ void QGCJSBSimLink::updateControls(quint64 time, float rollAilerons, float pitch
//qDebug() << "Updated controls" << state;
}
void QGCJSBSimLink::writeBytes(const char* data, qint64 size)
void QGCJSBSimLink::_writeBytes(const QByteArray data)
{
//#define QGCJSBSimLink_DEBUG
#ifdef QGCJSBSimLink_DEBUG
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
for (int i=0, size = data.size(); i<size; i++)
{
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
......@@ -278,7 +278,7 @@ void QGCJSBSimLink::writeBytes(const char* data, qint64 size)
qDebug() << bytes;
qDebug() << "ASCII:" << ascii;
#endif
if (connectState && socket) socket->writeDatagram(data, size, currentHost, currentPort);
if (connectState && socket) socket->writeDatagram(data, currentHost, currentPort);
}
/**
......
......@@ -114,13 +114,17 @@ public slots:
}
void readBytes();
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void _writeBytes(const QByteArray data);
public slots:
bool connectSimulation();
bool disconnectSimulation();
......
......@@ -229,7 +229,7 @@ void QGCXPlaneLink::run()
strncpy(ip.str_port_them, localPortStr.toLatin1(), qMin((int)sizeof(ip.str_port_them), 6));
ip.use_ip = 1;
writeBytes((const char*)&ip, sizeof(ip));
writeBytesSafe((const char*)&ip, sizeof(ip));
_should_exit = false;
......@@ -395,10 +395,10 @@ void QGCXPlaneLink::updateControls(quint64 time, float rollAilerons, float pitch
{
// Ail / Elevon / Rudder
p.index = 12; // XPlane, wing sweep
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
p.index = 8; // XPlane, joystick? why?
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
p.index = 25; // Thrust
memset(p.f, 0, sizeof(p.f));
......@@ -408,13 +408,13 @@ void QGCXPlaneLink::updateControls(quint64 time, float rollAilerons, float pitch
p.f[3] = throttle;
// Throttle
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
}
else
{
qDebug() << "Transmitting p.index = 25";
p.index = 25; // XPlane, throttle command.
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
}
}
......@@ -448,14 +448,14 @@ Eigen::Matrix3f euler_to_wRo(double yaw, double pitch, double roll) {
return wRo;
}
void QGCXPlaneLink::writeBytes(const char* data, qint64 size)
void QGCXPlaneLink::_writeBytes(const QByteArray data)
{
if (!data) return;
if (data.isEmpty()) return;
// If socket exists and is connected, transmit the data
if (socket && connectState)
{
socket->writeDatagram(data, size, remoteHost, remotePort);
socket->writeDatagram(data, remoteHost, remotePort);
}
}
......@@ -899,7 +899,7 @@ void QGCXPlaneLink::setPositionAttitude(double lat, double lon, double alt, doub
pos.gear_flap_vect[1] = 0.0f;
pos.gear_flap_vect[2] = 0.0f;
writeBytes((const char*)&pos, sizeof(pos));
writeBytesSafe((const char*)&pos, sizeof(pos));
// pos.header[0] = 'V';
// pos.header[1] = 'E';
......@@ -917,7 +917,7 @@ void QGCXPlaneLink::setPositionAttitude(double lat, double lon, double alt, doub
// pos.gear_flap_vect[1] = -999;
// pos.gear_flap_vect[2] = -999;
// writeBytes((const char*)&pos, sizeof(pos));
// writeBytesSafe((const char*)&pos, sizeof(pos));
}
/**
......
......@@ -126,13 +126,17 @@ public slots:
void processError(QProcess::ProcessError err);
void readBytes();
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void _writeBytes(const QByteArray data);
public slots:
bool connectSimulation();
bool disconnectSimulation();
/**
......
......@@ -82,11 +82,11 @@ bool SerialLink::_isBootloader()
return false;
}
void SerialLink::writeBytes(const char* data, qint64 size)
void SerialLink::_writeBytes(const QByteArray data)
{
if(_port && _port->isOpen()) {
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
_port->write(data, size);
_logOutputDataRate(data.size(), QDateTime::currentMSecsSinceEpoch());
_port->write(data);
} else {
// Error occured
_emitLinkError(tr("Could not send data - link %1 is disconnected!").arg(getName()));
......
......@@ -157,15 +157,16 @@ public:
bool connect(void);
bool disconnect(void);
public slots:
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void _writeBytes(const QByteArray data);
public slots:
void linkError(QSerialPort::SerialPortError error);
protected:
......
......@@ -65,11 +65,11 @@ void TCPLink::run()
}
#ifdef TCPLINK_READWRITE_DEBUG
void TCPLink::_writeDebugBytes(const char *data, qint16 size)
void TCPLink::_writeDebugBytes(const QByteArray data)
{
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
for (int i=0, size = data.size(); i<size; i++)
{
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
......@@ -88,13 +88,16 @@ void TCPLink::_writeDebugBytes(const char *data, qint16 size)
}
#endif
void TCPLink::writeBytes(const char* data, qint64 size)
void TCPLink::_writeBytes(const QByteArray data)
{
#ifdef TCPLINK_READWRITE_DEBUG
_writeDebugBytes(data, size);
_writeDebugBytes(data);
#endif
_socket->write(data, size);
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
if (!_socket)
return;
_socket->write(data);
_logOutputDataRate(data.size(), QDateTime::currentMSecsSinceEpoch());
}
/**
......
......@@ -152,10 +152,11 @@ public:
bool connect(void);
bool disconnect(void);
public slots:
private slots:
// From LinkInterface
void writeBytes(const char* data, qint64 length);
void _writeBytes(const QByteArray data);
public slots:
void waitForBytesWritten(int msecs);
void waitForReadyRead(int msecs);
......@@ -182,7 +183,7 @@ private:
void _restartConnection();
#ifdef TCPLINK_READWRITE_DEBUG
void _writeDebugBytes(const char *data, qint16 size);
void _writeDebugBytes(const QByteArray data);
#endif
TCPConfiguration* _config;
......
......@@ -29,12 +29,6 @@ This file is part of the QGROUNDCONTROL project
*/
#include <QtGlobal>
#if QT_VERSION > 0x050401
#define UDP_BROKEN_SIGNAL 1
#else
#define UDP_BROKEN_SIGNAL 0
#endif
#include <QTimer>
#include <QList>
#include <QDebug>
......@@ -111,9 +105,6 @@ UDPLink::~UDPLink()
quit();
// Wait for it to exit
wait();
while(_outQueue.count() > 0) {
delete _outQueue.dequeue();
}
this->deleteLater();
}
......@@ -124,25 +115,7 @@ UDPLink::~UDPLink()
void UDPLink::run()
{
if(_hardwareConnect()) {
if(UDP_BROKEN_SIGNAL) {
bool loop = false;
while(true) {
//-- Anything to read?
loop = _socket->hasPendingDatagrams();
if(loop) {
readBytes();
}
//-- Loop right away if busy
if((_dequeBytes() || loop) && _running)
continue;
if(!_running)
break;
//-- Settle down (it gets here if there is nothing to read or write)
_socket->waitForReadyRead(5);
}
} else {
exec();
}
exec();
}
if (_socket) {
_deregisterZeroconf();
......@@ -174,36 +147,11 @@ void UDPLink::removeHost(const QString& host)
_config->removeHost(host);
}
void UDPLink::writeBytes(const char* data, qint64 size)
void UDPLink::_writeBytes(const QByteArray data)
{
if (!_socket) {
if (!_socket)
return;
}
if(UDP_BROKEN_SIGNAL) {
QByteArray* qdata = new QByteArray(data, size);
QMutexLocker lock(&_mutex);
_outQueue.enqueue(qdata);
} else {
_sendBytes(data, size);
}
}
bool UDPLink::_dequeBytes()
{
QMutexLocker lock(&_mutex);
if(_outQueue.count() > 0) {
QByteArray* qdata = _outQueue.dequeue();
lock.unlock();
_sendBytes(qdata->data(), qdata->size());
delete qdata;
lock.relock();
}
return (_outQueue.count() > 0);
}
void UDPLink::_sendBytes(const char* data, qint64 size)
{
QStringList goneHosts;
// Send to all connected systems
QString host;
......@@ -211,7 +159,7 @@ void UDPLink::_sendBytes(const char* data, qint64 size)
if(_config->firstHost(host, port)) {
do {
QHostAddress currentHost(host);
if(_socket->writeDatagram(data, size, currentHost, (quint16)port) < 0) {
if(_socket->writeDatagram(data, currentHost, (quint16)port) < 0) {
// This host is gone. Add to list to be removed
// We should keep track of hosts that were manually added (static) and
// hosts that were added because we heard from them (dynamic). Only
......@@ -225,7 +173,7 @@ void UDPLink::_sendBytes(const char* data, qint64 size)
// "host not there" takes time too regardless of size of data. In fact,
// 1 byte or "UDP frame size" bytes are the same as that's the data
// unit sent by UDP.
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(data.size(), QDateTime::currentMSecsSinceEpoch());
}
} while (_config->nextHost(host, port));
//-- Remove hosts that are no longer there
......@@ -260,8 +208,6 @@ void UDPLink::readBytes()
// would trigger this.
// Add host to broadcast list if not yet present, or update its port
_config->addHost(sender.toString(), (int)senderPort);
if(UDP_BROKEN_SIGNAL && !_running)
break;
}
//-- Send whatever is left
if(databuffer.size()) {
......@@ -326,10 +272,7 @@ bool UDPLink::_hardwareConnect()
_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, 512 * 1024);
#endif
_registerZeroconf(_config->localPort(), kZeroconfRegistration);
//-- Connect signal if this version of Qt is not broken
if(!UDP_BROKEN_SIGNAL) {
QObject::connect(_socket, &QUdpSocket::readyRead, this, &UDPLink::readBytes);
}
QObject::connect(_socket, &QUdpSocket::readyRead, this, &UDPLink::readBytes);
emit connected();
} else {
emit communicationError("UDP Link Error", "Error binding UDP port");
......
......@@ -201,13 +201,14 @@ public slots:
void readBytes();
private slots:
/*!
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void _writeBytes(const QByteArray data);
protected:
......@@ -235,12 +236,6 @@ private:
#endif
bool _running;
QMutex _mutex;
QQueue<QByteArray*> _outQueue;
bool _dequeBytes ();
void _sendBytes (const char* data, qint64 size);
};
#endif // UDPLINK_H
......@@ -172,17 +172,11 @@ void XbeeLink::_disconnect(void)
emit disconnected();
}
void XbeeLink::writeBytes(const char *bytes, qint64 length) // TO DO: delete the data array
void XbeeLink::_writeBytes(const QByteArray bytes)
{
char *data;
data = new char[length];
for(long i=0;i<length;i++)
if(!xbee_nsenddata(this->m_xbeeCon,const_cast<char*>(bytes.data()),bytes.size())) // return value of 0 is successful written
{
data[i] = bytes[i];
}
if(!xbee_nsenddata(this->m_xbeeCon,data,length)) // return value of 0 is successful written
{
_logOutputDataRate(length, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(bytes.size(), QDateTime::currentMSecsSinceEpoch());
}
else
{
......
......@@ -45,8 +45,8 @@ public:
qint64 getCurrentOutDataRate() const;
qint64 getCurrentInDataRate() const;
public slots: // virtual functions from LinkInterface
void writeBytes(const char *bytes, qint64 length);
private slots: // virtual functions from LinkInterface
void _writeBytes(const QByteArray bytes);
protected slots: // virtual functions from LinkInterface
void readBytes();
......
......@@ -154,7 +154,7 @@ void TCPLinkUnitTest::_connectSucceed_test(void)
const char* bytesWrittenSignal = SIGNAL(bytesWritten(qint64));
MultiSignalSpy bytesWrittenSpy;
QCOMPARE(bytesWrittenSpy.init(_link->getSocket(), &bytesWrittenSignal, 1), true);
_link->writeBytes(bytesOut.data(), bytesOut.size());
_link->writeBytesSafe(bytesOut.data(), bytesOut.size());
_multiSpy->clearAllSignals();
// We emit this signal such that it will be queued and run on the TCPLink thread. This in turn
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment