Commit 1d0bf627 authored by Nate Weibley's avatar Nate Weibley

Force link writes to occur via a signal/slot connection.

This mechanism automatically queues data into the correct thread where necessary, otherwise the method is invoked directly with low overhead.

At the same time remove the UDP socket event workaround. It is not a problem.
parent 20a51dfe
......@@ -733,7 +733,7 @@ void Vehicle::_sendMessageOnLink(LinkInterface* link, mavlink_message_t message)
uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
int len = mavlink_msg_to_send_buffer(buffer, &message);
link->writeBytes((const char*)buffer, len);
link->writeBytesSafe((const char*)buffer, len);
_messagesSent++;
emit messagesSentChanged();
}
......
......@@ -89,18 +89,13 @@ QString BluetoothLink::getName() const
return _config->name();
}
void BluetoothLink::writeBytes(const char* data, qint64 size)
{
_sendBytes(data, size);
}
void BluetoothLink::_sendBytes(const char* data, qint64 size)
void BluetoothLink::writeBytes(const QByteArray bytes)
{
if(_targetSocket)
{
if(_targetSocket->isWritable())
{
if(_targetSocket->write(data, size) > 0) {
if(_targetSocket->write(bytes) > 0) {
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
}
else
......
......@@ -165,9 +165,7 @@ public:
LinkConfiguration* getLinkConfiguration() { return _config; }
public slots:
void readBytes ();
void writeBytes (const char* data, qint64 length);
void deviceConnected ();
void deviceDisconnected ();
void deviceError (QBluetoothSocket::SocketError error);
......@@ -193,7 +191,11 @@ private:
bool _hardwareConnect ();
void _restartConnection ();
void _sendBytes (const char* data, qint64 size);
private slots:
void writeBytes (const QByteArray bytes);
private:
void _createSocket ();
private:
......
......@@ -151,16 +151,24 @@ public slots:
*
* If the underlying communication is packet oriented,
* one write command equals a datagram. In case of serial
* communication arbitrary byte lengths can be written
* communication arbitrary byte lengths can be written. The method ensures
* thread safety regardless of the underlying LinkInterface implementation.
*
* @param bytes The pointer to the byte array containing the data
* @param length The length of the data array
**/
virtual void writeBytes(const char *bytes, qint64 length) = 0;
void writeBytesSafe(const char *bytes, int length)
{
emit _invokeWriteBytes(QByteArray(bytes, length));
}
private slots:
virtual void writeBytes(const QByteArray) = 0;
signals:
void autoconnectChanged(bool autoconnect);
void activeChanged(bool active);
void _invokeWriteBytes(QByteArray);
/**
* @brief New data arrived
......@@ -212,6 +220,7 @@ protected:
memset(_outDataWriteAmounts,0, sizeof(_outDataWriteAmounts));
memset(_outDataWriteTimes, 0, sizeof(_outDataWriteTimes));
QObject::connect(this, &LinkInterface::_invokeWriteBytes, this, &LinkInterface::writeBytes);
qRegisterMetaType<LinkInterface*>("LinkInterface*");
}
......
......@@ -881,7 +881,7 @@ void LinkManager::_activeLinkCheck(void)
if (!found && link) {
// See if we can get an NSH prompt on this link
bool foundNSHPrompt = false;
link->writeBytes("\r", 1);
link->writeBytesSafe("\r", 1);
QSignalSpy spy(link, SIGNAL(bytesReceived(LinkInterface*, QByteArray)));
if (spy.wait(100)) {
QList<QVariant> arguments = spy.takeFirst();
......
......@@ -151,10 +151,9 @@ void LogReplayLink::_replayError(const QString& errorMsg)
}
/// Since this is log replay, we just drops writes on the floor
void LogReplayLink::writeBytes(const char* bytes, qint64 cBytes)
void LogReplayLink::writeBytes(const QByteArray bytes)
{
Q_UNUSED(bytes);
Q_UNUSED(cBytes);
}
/// Parses a BigEndian quint64 timestamp
......
......@@ -98,8 +98,8 @@ public:
bool connect(void);
bool disconnect(void);
public slots:
virtual void writeBytes(const char *bytes, qint64 cBytes);
private slots:
virtual void writeBytes(const QByteArray bytes);
signals:
void logFileStats(bool logTimestamped, int logDurationSecs, int binaryBaudRate);
......
......@@ -438,7 +438,7 @@ void MAVLinkProtocol::_sendMessage(LinkInterface* link, mavlink_message_t messag
if (link->isConnected())
{
// Send the portion of the buffer now occupied by the message
link->writeBytes((const char*)buffer, len);
link->writeBytesSafe((const char*)buffer, len);
}
}
......@@ -461,7 +461,7 @@ void MAVLinkProtocol::_sendMessage(LinkInterface* link, mavlink_message_t messag
if (link->isConnected())
{
// Send the portion of the buffer now occupied by the message
link->writeBytes((const char*)buffer, len);
link->writeBytesSafe((const char*)buffer, len);
}
}
......
......@@ -115,7 +115,6 @@ MockLink::MockLink(MockConfiguration* config)
moveToThread(this);
_loadParams();
QObject::connect(this, &MockLink::_incomingBytes, this, &MockLink::_handleIncomingBytes);
}
MockLink::~MockLink(void)
......@@ -314,16 +313,7 @@ void MockLink::respondWithMavlinkMessage(const mavlink_message_t& msg)
}
/// @brief Called when QGC wants to write bytes to the MAV
void MockLink::writeBytes(const char* bytes, qint64 cBytes)
{
// Package up the data so we can signal it over to the right thread
QByteArray byteArray(bytes, cBytes);
emit _incomingBytes(byteArray);
}
/// @brief Handles bytes from QGC on the thread
void MockLink::_handleIncomingBytes(const QByteArray bytes)
void MockLink::writeBytes(const QByteArray bytes)
{
if (_inNSH) {
_handleIncomingNSHBytes(bytes.constData(), bytes.count());
......
......@@ -150,12 +150,8 @@ public:
static MockLink* startAPMArduCopterMockLink (bool sendStatusText);
static MockLink* startAPMArduPlaneMockLink (bool sendStatusText);
signals:
/// @brief Used internally to move data to the thread.
void _incomingBytes(const QByteArray bytes);
public slots:
virtual void writeBytes(const char *bytes, qint64 cBytes);
private slots:
virtual void writeBytes(const QByteArray bytes);
private slots:
void _run1HzTasks(void);
......@@ -172,7 +168,6 @@ private:
// MockLink methods
void _sendHeartBeat(void);
void _handleIncomingBytes(const QByteArray bytes);
void _handleIncomingNSHBytes(const char* bytes, int cBytes);
void _handleIncomingMavlinkBytes(const uint8_t* bytes, int cBytes);
void _loadParams(void);
......
......@@ -234,7 +234,7 @@ void QGCFlightGearLink::updateControls(quint64 time, float rollAilerons, float p
{
QString state("%1\t%2\t%3\t%4\t%5\n");
state = state.arg(rollAilerons).arg(pitchElevator).arg(yawRudder).arg(true).arg(throttle);
writeBytes(state.toLatin1().constData(), state.length());
emit _invokeWriteBytes(state.toLatin1());
//qDebug() << "Updated controls" << rollAilerons << pitchElevator << yawRudder << throttle;
//qDebug() << "Updated controls" << state;
}
......@@ -244,13 +244,13 @@ void QGCFlightGearLink::updateControls(quint64 time, float rollAilerons, float p
}
}
void QGCFlightGearLink::writeBytes(const char* data, qint64 size)
void QGCFlightGearLink::writeBytes(const QByteArray data)
{
//#define QGCFlightGearLink_DEBUG
#ifdef QGCFlightGearLink_DEBUG
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
for (int i=0, size = data.size(); i<size; i++)
{
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
......@@ -267,7 +267,7 @@ void QGCFlightGearLink::writeBytes(const char* data, qint64 size)
qDebug() << bytes;
qDebug() << "ASCII:" << ascii;
#endif
if (connectState && _udpCommSocket) _udpCommSocket->writeDatagram(data, size, currentHost, currentPort);
if (connectState && _udpCommSocket) _udpCommSocket->writeDatagram(data, currentHost, currentPort);
}
/**
......
......@@ -121,13 +121,17 @@ public slots:
}
void readBytes();
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void writeBytes(const QByteArray data);
public slots:
bool connectSimulation();
bool disconnectSimulation();
......
......@@ -64,13 +64,26 @@ public slots:
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
virtual void writeBytes(const char* data, qint64 length) = 0;
void writeBytesSafe(const char* data, int length)
{
emit _invokeWriteBytes(QByteArray(data, length));
}
virtual bool connectSimulation() = 0;
virtual bool disconnectSimulation() = 0;
private slots:
virtual void writeBytes(const QByteArray) = 0;
protected:
virtual void setName(QString name) = 0;
QGCHilLink() :
QThread()
{
connect(this, &QGCHilLink::_invokeWriteBytes, this, &QGCHilLink::writeBytes);
}
signals:
/**
* @brief This signal is emitted instantly when the link is connected
......@@ -130,6 +143,9 @@ signals:
/** @brief Sensor leve HIL state changed */
void sensorHilChanged(bool enabled);
/** @brief Helper signal to force execution on the correct thread */
void _invokeWriteBytes(QByteArray);
};
#endif // QGCHILLINK_H
......@@ -246,7 +246,7 @@ void QGCJSBSimLink::updateControls(quint64 time, float rollAilerons, float pitch
{
QString state("%1\t%2\t%3\t%4\t%5\n");
state = state.arg(rollAilerons).arg(pitchElevator).arg(yawRudder).arg(true).arg(throttle);
writeBytes(state.toLatin1().constData(), state.length());
emit _invokeWriteBytes(state.toLatin1());
}
else
{
......@@ -255,13 +255,13 @@ void QGCJSBSimLink::updateControls(quint64 time, float rollAilerons, float pitch
//qDebug() << "Updated controls" << state;
}
void QGCJSBSimLink::writeBytes(const char* data, qint64 size)
void QGCJSBSimLink::writeBytes(const QByteArray data)
{
//#define QGCJSBSimLink_DEBUG
#ifdef QGCJSBSimLink_DEBUG
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
for (int i=0, size = data.size(); i<size; i++)
{
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
......@@ -278,7 +278,7 @@ void QGCJSBSimLink::writeBytes(const char* data, qint64 size)
qDebug() << bytes;
qDebug() << "ASCII:" << ascii;
#endif
if (connectState && socket) socket->writeDatagram(data, size, currentHost, currentPort);
if (connectState && socket) socket->writeDatagram(data, currentHost, currentPort);
}
/**
......
......@@ -114,13 +114,17 @@ public slots:
}
void readBytes();
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void writeBytes(const QByteArray data);
public slots:
bool connectSimulation();
bool disconnectSimulation();
......
......@@ -229,7 +229,7 @@ void QGCXPlaneLink::run()
strncpy(ip.str_port_them, localPortStr.toLatin1(), qMin((int)sizeof(ip.str_port_them), 6));
ip.use_ip = 1;
writeBytes((const char*)&ip, sizeof(ip));
writeBytesSafe((const char*)&ip, sizeof(ip));
_should_exit = false;
......@@ -395,10 +395,10 @@ void QGCXPlaneLink::updateControls(quint64 time, float rollAilerons, float pitch
{
// Ail / Elevon / Rudder
p.index = 12; // XPlane, wing sweep
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
p.index = 8; // XPlane, joystick? why?
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
p.index = 25; // Thrust
memset(p.f, 0, sizeof(p.f));
......@@ -408,13 +408,13 @@ void QGCXPlaneLink::updateControls(quint64 time, float rollAilerons, float pitch
p.f[3] = throttle;
// Throttle
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
}
else
{
qDebug() << "Transmitting p.index = 25";
p.index = 25; // XPlane, throttle command.
writeBytes((const char*)&p, sizeof(p));
writeBytesSafe((const char*)&p, sizeof(p));
}
}
......@@ -448,14 +448,14 @@ Eigen::Matrix3f euler_to_wRo(double yaw, double pitch, double roll) {
return wRo;
}
void QGCXPlaneLink::writeBytes(const char* data, qint64 size)
void QGCXPlaneLink::writeBytes(const QByteArray data)
{
if (!data) return;
if (data.isEmpty()) return;
// If socket exists and is connected, transmit the data
if (socket && connectState)
{
socket->writeDatagram(data, size, remoteHost, remotePort);
socket->writeDatagram(data, remoteHost, remotePort);
}
}
......@@ -899,7 +899,7 @@ void QGCXPlaneLink::setPositionAttitude(double lat, double lon, double alt, doub
pos.gear_flap_vect[1] = 0.0f;
pos.gear_flap_vect[2] = 0.0f;
writeBytes((const char*)&pos, sizeof(pos));
writeBytesSafe((const char*)&pos, sizeof(pos));
// pos.header[0] = 'V';
// pos.header[1] = 'E';
......@@ -917,7 +917,7 @@ void QGCXPlaneLink::setPositionAttitude(double lat, double lon, double alt, doub
// pos.gear_flap_vect[1] = -999;
// pos.gear_flap_vect[2] = -999;
// writeBytes((const char*)&pos, sizeof(pos));
// writeBytesSafe((const char*)&pos, sizeof(pos));
}
/**
......
......@@ -126,13 +126,17 @@ public slots:
void processError(QProcess::ProcessError err);
void readBytes();
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void writeBytes(const QByteArray data);
public slots:
bool connectSimulation();
bool disconnectSimulation();
/**
......
......@@ -82,11 +82,11 @@ bool SerialLink::_isBootloader()
return false;
}
void SerialLink::writeBytes(const char* data, qint64 size)
void SerialLink::writeBytes(const QByteArray data)
{
if(_port && _port->isOpen()) {
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
_port->write(data, size);
_logOutputDataRate(data.size(), QDateTime::currentMSecsSinceEpoch());
_port->write(data);
} else {
// Error occured
_emitLinkError(tr("Could not send data - link %1 is disconnected!").arg(getName()));
......
......@@ -157,15 +157,16 @@ public:
bool connect(void);
bool disconnect(void);
public slots:
private slots:
/**
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void writeBytes(const QByteArray data);
public slots:
void linkError(QSerialPort::SerialPortError error);
protected:
......
......@@ -65,11 +65,11 @@ void TCPLink::run()
}
#ifdef TCPLINK_READWRITE_DEBUG
void TCPLink::_writeDebugBytes(const char *data, qint16 size)
void TCPLink::_writeDebugBytes(const QByteArray data)
{
QString bytes;
QString ascii;
for (int i=0; i<size; i++)
for (int i=0, size = data.size(); i<size; i++)
{
unsigned char v = data[i];
bytes.append(QString().sprintf("%02x ", v));
......@@ -88,13 +88,13 @@ void TCPLink::_writeDebugBytes(const char *data, qint16 size)
}
#endif
void TCPLink::writeBytes(const char* data, qint64 size)
void TCPLink::writeBytes(const QByteArray data)
{
#ifdef TCPLINK_READWRITE_DEBUG
_writeDebugBytes(data, size);
_writeDebugBytes(data);
#endif
_socket->write(data, size);
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
_socket->write(data);
_logOutputDataRate(data.size(), QDateTime::currentMSecsSinceEpoch());
}
/**
......
......@@ -152,10 +152,11 @@ public:
bool connect(void);
bool disconnect(void);
public slots:
private slots:
// From LinkInterface
void writeBytes(const char* data, qint64 length);
void writeBytes(const QByteArray data);
public slots:
void waitForBytesWritten(int msecs);
void waitForReadyRead(int msecs);
......@@ -182,7 +183,7 @@ private:
void _restartConnection();
#ifdef TCPLINK_READWRITE_DEBUG
void _writeDebugBytes(const char *data, qint16 size);
void _writeDebugBytes(const QByteArray data);
#endif
TCPConfiguration* _config;
......
......@@ -29,12 +29,6 @@ This file is part of the QGROUNDCONTROL project
*/
#include <QtGlobal>
#if QT_VERSION > 0x050401
#define UDP_BROKEN_SIGNAL 1
#else
#define UDP_BROKEN_SIGNAL 0
#endif
#include <QTimer>
#include <QList>
#include <QDebug>
......@@ -111,9 +105,6 @@ UDPLink::~UDPLink()
quit();
// Wait for it to exit
wait();
while(_outQueue.count() > 0) {
delete _outQueue.dequeue();
}
this->deleteLater();
}
......@@ -124,25 +115,7 @@ UDPLink::~UDPLink()
void UDPLink::run()
{
if(_hardwareConnect()) {
if(UDP_BROKEN_SIGNAL) {
bool loop = false;
while(true) {
//-- Anything to read?
loop = _socket->hasPendingDatagrams();
if(loop) {
readBytes();
}
//-- Loop right away if busy
if((_dequeBytes() || loop) && _running)
continue;
if(!_running)
break;
//-- Settle down (it gets here if there is nothing to read or write)
_socket->waitForReadyRead(5);
}
} else {
exec();
}
exec();
}
if (_socket) {
_deregisterZeroconf();
......@@ -174,35 +147,7 @@ void UDPLink::removeHost(const QString& host)
_config->removeHost(host);
}
void UDPLink::writeBytes(const char* data, qint64 size)
{
if (!_socket) {
return;
}
if(UDP_BROKEN_SIGNAL) {
QByteArray* qdata = new QByteArray(data, size);
QMutexLocker lock(&_mutex);
_outQueue.enqueue(qdata);
} else {
_sendBytes(data, size);
}
}
bool UDPLink::_dequeBytes()
{
QMutexLocker lock(&_mutex);
if(_outQueue.count() > 0) {
QByteArray* qdata = _outQueue.dequeue();
lock.unlock();
_sendBytes(qdata->data(), qdata->size());
delete qdata;
lock.relock();
}
return (_outQueue.count() > 0);
}
void UDPLink::_sendBytes(const char* data, qint64 size)
void UDPLink::writeBytes(const QByteArray data)
{
QStringList goneHosts;
// Send to all connected systems
......@@ -211,7 +156,7 @@ void UDPLink::_sendBytes(const char* data, qint64 size)
if(_config->firstHost(host, port)) {
do {
QHostAddress currentHost(host);
if(_socket->writeDatagram(data, size, currentHost, (quint16)port) < 0) {
if(_socket->writeDatagram(data, currentHost, (quint16)port) < 0) {
// This host is gone. Add to list to be removed
// We should keep track of hosts that were manually added (static) and
// hosts that were added because we heard from them (dynamic). Only
......@@ -225,7 +170,7 @@ void UDPLink::_sendBytes(const char* data, qint64 size)
// "host not there" takes time too regardless of size of data. In fact,
// 1 byte or "UDP frame size" bytes are the same as that's the data
// unit sent by UDP.
_logOutputDataRate(size, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(data.size(), QDateTime::currentMSecsSinceEpoch());
}
} while (_config->nextHost(host, port));
//-- Remove hosts that are no longer there
......@@ -260,8 +205,6 @@ void UDPLink::readBytes()
// would trigger this.
// Add host to broadcast list if not yet present, or update its port
_config->addHost(sender.toString(), (int)senderPort);
if(UDP_BROKEN_SIGNAL && !_running)
break;
}
//-- Send whatever is left
if(databuffer.size()) {
......@@ -326,10 +269,7 @@ bool UDPLink::_hardwareConnect()
_socket->setSocketOption(QAbstractSocket::ReceiveBufferSizeSocketOption, 512 * 1024);
#endif
_registerZeroconf(_config->localPort(), kZeroconfRegistration);
//-- Connect signal if this version of Qt is not broken
if(!UDP_BROKEN_SIGNAL) {
QObject::connect(_socket, &QUdpSocket::readyRead, this, &UDPLink::readBytes);
}
QObject::connect(_socket, &QUdpSocket::readyRead, this, &UDPLink::readBytes);
emit connected();
} else {
emit communicationError("UDP Link Error", "Error binding UDP port");
......
......@@ -201,13 +201,14 @@ public slots:
void readBytes();
private slots:
/*!
* @brief Write a number of bytes to the interface.
*
* @param data Pointer to the data byte array
* @param size The size of the bytes array
**/
void writeBytes(const char* data, qint64 length);
void writeBytes(const QByteArray data);
protected:
......@@ -235,12 +236,6 @@ private:
#endif
bool _running;
QMutex _mutex;
QQueue<QByteArray*> _outQueue;
bool _dequeBytes ();
void _sendBytes (const char* data, qint64 size);
};
#endif // UDPLINK_H
......@@ -172,17 +172,11 @@ void XbeeLink::_disconnect(void)
emit disconnected();
}
void XbeeLink::writeBytes(const char *bytes, qint64 length) // TO DO: delete the data array
void XbeeLink::writeBytes(const QByteArray bytes)
{
char *data;
data = new char[length];
for(long i=0;i<length;i++)
if(!xbee_nsenddata(this->m_xbeeCon,const_cast<char*>(bytes.data()),bytes.size())) // return value of 0 is successful written
{
data[i] = bytes[i];
}
if(!xbee_nsenddata(this->m_xbeeCon,data,length)) // return value of 0 is successful written
{
_logOutputDataRate(length, QDateTime::currentMSecsSinceEpoch());
_logOutputDataRate(bytes.size(), QDateTime::currentMSecsSinceEpoch());
}
else
{
......
......@@ -45,8 +45,8 @@ public:
qint64 getCurrentOutDataRate() const;
qint64 getCurrentInDataRate() const;
public slots: // virtual functions from LinkInterface
void writeBytes(const char *bytes, qint64 length);
private slots: // virtual functions from LinkInterface
void writeBytes(const QByteArray bytes);
protected slots: // virtual functions from LinkInterface
void readBytes();
......
......@@ -154,7 +154,7 @@ void TCPLinkUnitTest::_connectSucceed_test(void)
const char* bytesWrittenSignal = SIGNAL(bytesWritten(qint64));
MultiSignalSpy bytesWrittenSpy;
QCOMPARE(bytesWrittenSpy.init(_link->getSocket(), &bytesWrittenSignal, 1), true);
_link->writeBytes(bytesOut.data(), bytesOut.size());
_link->writeBytesSafe(bytesOut.data(), bytesOut.size());
_multiSpy->clearAllSignals();
// We emit this signal such that it will be queued and run on the TCPLink thread. This in turn
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment