diff --git a/src/comm/UDPLink.cc b/src/comm/UDPLink.cc index 2462df8d6eebf97d5d660b3da3c1bd8a692ce90c..fac059cb896e34b44e320d68952f1dd224a32c8e 100644 --- a/src/comm/UDPLink.cc +++ b/src/comm/UDPLink.cc @@ -80,6 +80,7 @@ UDPLink::UDPLink(UDPConfiguration* config) #if defined(QGC_ZEROCONF_ENABLED) , _dnssServiceRef(NULL) #endif + , _running(false) { Q_ASSERT(config != NULL); _config = config; @@ -98,9 +99,12 @@ UDPLink::~UDPLink() _config->setLink(NULL); _disconnect(); // Tell the thread to exit - quit(); + _running = false; // Wait for it to exit wait(); + while(_outQueue.count() > 0) { + delete _outQueue.dequeue(); + } this->deleteLater(); } @@ -110,8 +114,16 @@ UDPLink::~UDPLink() **/ void UDPLink::run() { - _hardwareConnect(); - exec(); + if(_hardwareConnect()) { + while(true) { + readBytes(); + if(_writeBytes() && _running) + continue; + if(!_running) + break; + this->msleep(50); + } + } if (_socket) { _deregisterZeroconf(); _socket->close(); @@ -149,39 +161,42 @@ void UDPLink::writeBytes(const char* data, qint64 size) if (!_socket) { return; } - - // Broadcast to all connected systems - QString host; - int port; - if(_config->firstHost(host, port)) { - do { - if(UDPLINK_DEBUG) { - QString bytes; - QString ascii; - for (int i=0; i 31 && data[i] < 127) - { - ascii.append(data[i]); - } - else - { - ascii.append(219); - } + QByteArray* qdata = new QByteArray(data, size); + QMutexLocker lock(&_mutex); + _outQueue.enqueue(qdata); +} + + +bool UDPLink::_writeBytes() +{ + QMutexLocker lock(&_mutex); + if(_outQueue.count() > 0) { + QByteArray* qdata = _outQueue.dequeue(); + lock.unlock(); + QStringList goneHosts; + // Send to all connected systems + QString host; + int port; + if(_config->firstHost(host, port)) { + do { + QHostAddress currentHost(host); + if(_socket->writeDatagram(qdata->data(), qdata->size(), currentHost, (quint16)port) < 0) { + // This host is gone. Add to list to be removed + goneHosts.append(host); } - qDebug() << "Sent" << size << "bytes to" << host << ":" << port << "data:"; - qDebug() << bytes; - qDebug() << "ASCII:" << ascii; + // Log the amount and time written out for future data rate calculations. + QMutexLocker dataRateLocker(&dataRateMutex); + logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, qdata->size(), QDateTime::currentMSecsSinceEpoch()); + } while (_config->nextHost(host, port)); + //-- Remove hosts that are no longer there + foreach (QString ghost, goneHosts) { + _config->removeHost(host); } - QHostAddress currentHost(host); - _socket->writeDatagram(data, size, currentHost, (quint16)port); - // Log the amount and time written out for future data rate calculations. - QMutexLocker dataRateLocker(&dataRateMutex); - logDataRateToBuffer(outDataWriteAmounts, outDataWriteTimes, &outDataIndex, size, QDateTime::currentMSecsSinceEpoch()); - } while (_config->nextHost(host, port)); + } + delete qdata; + lock.relock(); } + return (_outQueue.count() > 0); } /** @@ -220,6 +235,8 @@ void UDPLink::readBytes() // would trigger this. // Add host to broadcast list if not yet present, or update its port _config->addHost(sender.toString(), (int)senderPort); + if(!_running) + break; } } @@ -230,8 +247,8 @@ void UDPLink::readBytes() **/ bool UDPLink::_disconnect(void) { - this->quit(); - this->wait(); + _running = false; + wait(); if (_socket) { // Make sure delete happen on correct thread _socket->deleteLater(); @@ -252,12 +269,13 @@ bool UDPLink::_connect(void) { if(this->isRunning()) { - this->quit(); - this->wait(); + _running = false; + wait(); } // TODO When would this ever return false? bool connected = true; // I see no reason to run this in "HighPriority" + _running = true; start(NormalPriority); return connected; } @@ -268,13 +286,12 @@ bool UDPLink::_hardwareConnect() delete _socket; _socket = NULL; } - QHostAddress host = QHostAddress::Any; + QHostAddress host = QHostAddress::AnyIPv4; _socket = new QUdpSocket(); _socket->setProxy(QNetworkProxy::NoProxy); _connectState = _socket->bind(host, _config->localPort(), QAbstractSocket::ReuseAddressHint); if (_connectState) { _registerZeroconf(_config->localPort(), kZeroconfRegistration); - QObject::connect(_socket, SIGNAL(readyRead()), this, SLOT(readBytes())); emit connected(); } else { emit communicationError("UDP Link Error", "Error binding UDP port"); @@ -408,7 +425,7 @@ void UDPConfiguration::addHost(const QString& host, int port) qWarning() << "UDP:" << "Could not resolve host:" << host << "port:" << port; } else { _hosts[ipAdd] = port; - //qDebug() << "UDP:" << "Adding Host:" << ipAdd << ":" << port; + qDebug() << "UDP:" << "Adding Host:" << ipAdd << ":" << port; } } } @@ -423,7 +440,10 @@ void UDPConfiguration::removeHost(const QString& host) tHost = tHost.trimmed(); QMap::iterator i = _hosts.find(tHost); if(i != _hosts.end()) { + qDebug() << "UDP:" << "Removed host:" << host; _hosts.erase(i); + } else { + qWarning() << "UDP:" << "Could not remove unknown host:" << host; } } diff --git a/src/comm/UDPLink.h b/src/comm/UDPLink.h index 76c0c347901291f7c06db837ce7a28095fc83477..4c2bd258dabfb14bc500615a9f203081e1ced159 100644 --- a/src/comm/UDPLink.h +++ b/src/comm/UDPLink.h @@ -36,6 +36,9 @@ This file is part of the QGROUNDCONTROL project #include #include #include +#include +#include +#include #if defined(QGC_ZEROCONF_ENABLED) #include @@ -212,8 +215,11 @@ private: DNSServiceRef _dnssServiceRef; #endif -signals: - //Signals are defined by LinkInterface + bool _running; + QMutex _mutex; + QQueue _outQueue; + + bool _writeBytes (); };