From fb03713fc46ae557bd8300acab9ff9d1f192eb1d Mon Sep 17 00:00:00 2001 From: Andrew Voznytsa Date: Tue, 25 Feb 2020 22:11:37 +0200 Subject: [PATCH] Switch VideoReceiver to new API (big refactoring) --- src/Camera/QGCCameraControl.cc | 8 +- src/FlightDisplay/FlightDisplayViewVideo.qml | 8 +- src/FlightMap/Widgets/VideoPageWidget.qml | 10 +- src/VideoStreaming/SubtitleWriter.cc | 63 +- src/VideoStreaming/SubtitleWriter.h | 13 +- src/VideoStreaming/VideoManager.cc | 331 ++- src/VideoStreaming/VideoManager.h | 29 +- src/VideoStreaming/VideoReceiver.cc | 1953 ++++++++++-------- src/VideoStreaming/VideoReceiver.h | 274 ++- src/VideoStreaming/gstqgcvideosinkbin.c | 3 + 10 files changed, 1542 insertions(+), 1150 deletions(-) diff --git a/src/Camera/QGCCameraControl.cc b/src/Camera/QGCCameraControl.cc index 1fd5ce071..f46f974d5 100644 --- a/src/Camera/QGCCameraControl.cc +++ b/src/Camera/QGCCameraControl.cc @@ -388,11 +388,11 @@ QGCCameraControl::takePhoto() _setPhotoStatus(PHOTO_CAPTURE_IN_PROGRESS); _captureInfoRetries = 0; //-- Capture local image as well - if(qgcApp()->toolbox()->videoManager()->videoReceiver()) { + if(qgcApp()->toolbox()->videoManager()) { QString photoPath = qgcApp()->toolbox()->settingsManager()->appSettings()->savePath()->rawValue().toString() + QStringLiteral("/Photo"); QDir().mkpath(photoPath); photoPath += + "/" + QDateTime::currentDateTime().toString("yyyy-MM-dd_hh.mm.ss.zzz") + ".jpg"; - qgcApp()->toolbox()->videoManager()->videoReceiver()->grabImage(photoPath); + qgcApp()->toolbox()->videoManager()->grabImage(photoPath); } return true; } @@ -1542,11 +1542,11 @@ QGCCameraControl::handleCaptureStatus(const mavlink_camera_capture_status_t& cap //-- Time Lapse if(photoStatus() == PHOTO_CAPTURE_INTERVAL_IDLE || photoStatus() == PHOTO_CAPTURE_INTERVAL_IN_PROGRESS) { //-- Capture local image as well - if(qgcApp()->toolbox()->videoManager()->videoReceiver()) { + if(qgcApp()->toolbox()->videoManager()) { QString photoPath = qgcApp()->toolbox()->settingsManager()->appSettings()->savePath()->rawValue().toString() + QStringLiteral("/Photo"); QDir().mkpath(photoPath); photoPath += + "/" + QDateTime::currentDateTime().toString("yyyy-MM-dd_hh.mm.ss.zzz") + ".jpg"; - qgcApp()->toolbox()->videoManager()->videoReceiver()->grabImage(photoPath); + qgcApp()->toolbox()->videoManager()->grabImage(photoPath); } } } diff --git a/src/FlightDisplay/FlightDisplayViewVideo.qml b/src/FlightDisplay/FlightDisplayViewVideo.qml index 393886a9e..16c4a2afd 100644 --- a/src/FlightDisplay/FlightDisplayViewVideo.qml +++ b/src/FlightDisplay/FlightDisplayViewVideo.qml @@ -40,7 +40,7 @@ Item { id: noVideo anchors.fill: parent color: Qt.rgba(0,0,0,0.75) - visible: !(_videoReceiver && _videoReceiver.videoRunning) + visible: !(_videoReceiver && _videoReceiver.decoding) QGCLabel { text: QGroundControl.settingsManager.videoSettings.streamEnabled.rawValue ? qsTr("WAITING FOR VIDEO") : qsTr("VIDEO DISABLED") font.family: ScreenTools.demiboldFontFamily @@ -58,7 +58,7 @@ Item { Rectangle { anchors.fill: parent color: "black" - visible: _videoReceiver && _videoReceiver.videoRunning + visible: _videoReceiver && _videoReceiver.decoding function getWidth() { //-- Fit Width or Stretch if(_fitMode === 0 || _fitMode === 2) { @@ -86,7 +86,7 @@ Item { target: _videoReceiver onImageFileChanged: { videoContent.grabToImage(function(result) { - if (!result.saveToFile(_videoReceiver.imageFile)) { + if (!result.saveToFile(QGroundControl.videoManager.imageFile)) { console.error('Error capturing video frame'); } }); @@ -130,7 +130,7 @@ Item { height: parent.getHeight() width: parent.getWidth() anchors.centerIn: parent - visible: _videoReceiver && _videoReceiver.videoRunning + visible: _videoReceiver && _videoReceiver.decoding sourceComponent: videoBackgroundComponent property bool videoDisabled: QGroundControl.settingsManager.videoSettings.videoSource.rawValue === QGroundControl.settingsManager.videoSettings.disabledVideoSource diff --git a/src/FlightMap/Widgets/VideoPageWidget.qml b/src/FlightMap/Widgets/VideoPageWidget.qml index f835e896c..651b27a09 100644 --- a/src/FlightMap/Widgets/VideoPageWidget.qml +++ b/src/FlightMap/Widgets/VideoPageWidget.qml @@ -33,7 +33,7 @@ Item { property bool _communicationLost: activeVehicle ? activeVehicle.connectionLost : false property var _videoReceiver: QGroundControl.videoManager.videoReceiver property bool _recordingVideo: _videoReceiver && _videoReceiver.recording - property bool _videoRunning: _videoReceiver && _videoReceiver.videoRunning + property bool _decodingVideo: _videoReceiver && _videoReceiver.decoding property bool _streamingEnabled: QGroundControl.settingsManager.videoSettings.streamConfigured property var _dynamicCameras: activeVehicle ? activeVehicle.dynamicCameras : null property int _curCameraIndex: _dynamicCameras ? _dynamicCameras.currentCamera : 0 @@ -136,7 +136,7 @@ Item { anchors.bottom: parent.bottom width: height radius: _recordingVideo ? 0 : height - color: (_videoRunning && _streamingEnabled) ? "red" : "gray" + color: (_decodingVideo && _streamingEnabled) ? "red" : "gray" SequentialAnimation on opacity { running: _recordingVideo loops: Animation.Infinite @@ -157,14 +157,14 @@ Item { } MouseArea { anchors.fill: parent - enabled: _videoRunning && _streamingEnabled + enabled: _decodingVideo && _streamingEnabled onClicked: { if (_recordingVideo) { - _videoReceiver.stopRecording() + QGroundControl.videoManager.stopRecording() // reset blinking animation recordBtnBackground.opacity = 1 } else { - _videoReceiver.startRecording(videoFileName.text) + QGroundControl.videoManager.startRecording(videoFileName.text) } } } diff --git a/src/VideoStreaming/SubtitleWriter.cc b/src/VideoStreaming/SubtitleWriter.cc index 9a2a4908a..4e64e147f 100644 --- a/src/VideoStreaming/SubtitleWriter.cc +++ b/src/VideoStreaming/SubtitleWriter.cc @@ -15,9 +15,6 @@ */ #include "SubtitleWriter.h" -#include "SettingsManager.h" -#include "VideoReceiver.h" -#include "VideoManager.h" #include "QGCApplication.h" #include "QGCCorePlugin.h" #include @@ -31,48 +28,11 @@ const int SubtitleWriter::_sampleRate = 1; // Sample rate in Hz for getting tele SubtitleWriter::SubtitleWriter(QObject* parent) : QObject(parent) { + connect(&_timer, &QTimer::timeout, this, &SubtitleWriter::_captureTelemetry); } -void SubtitleWriter::setVideoReceiver(VideoReceiver* videoReceiver) +void SubtitleWriter::startCapturingTelemetry(const QString& videoFile) { - if(!videoReceiver) { - qCWarning(SubtitleWriterLog) << "Invalid VideoReceiver pointer! Aborting subtitle capture!"; - return; - } - _videoReceiver = videoReceiver; - -#if defined(QGC_GST_STREAMING) - // Only start writing subtitles once the recording pipeline actually starts - connect(_videoReceiver, &VideoReceiver::gotFirstRecordingKeyFrame, this, &SubtitleWriter::_startCapturingTelemetry); - - // Captures recordingChanged() signals to stop writing subtitles - connect(_videoReceiver, &VideoReceiver::recordingChanged, this, &SubtitleWriter::_onVideoRecordingChanged); -#endif - - // Timer for telemetry capture and writing to file - connect(&_timer, &QTimer::timeout, this, &SubtitleWriter::_captureTelemetry); -} - -void SubtitleWriter::_onVideoRecordingChanged() -{ -#if defined(QGC_GST_STREAMING) - // Stop capturing data if recording stopped - if(!_videoReceiver->recording()) { - qCDebug(SubtitleWriterLog) << "Stopping writing"; - _timer.stop(); - _file.close(); - } -#endif -} - -void SubtitleWriter::_startCapturingTelemetry() -{ - if(!_videoReceiver) { - qCWarning(SubtitleWriterLog) << "Invalid VideoReceiver pointer! Aborting subtitle capture!"; - _timer.stop(); - return; - } - // Get the facts displayed in the values widget and capture them, removing the "Vehicle." prefix. QSettings settings; settings.beginGroup("ValuesWidget"); @@ -81,8 +41,8 @@ void SubtitleWriter::_startCapturingTelemetry() _startTime = QDateTime::currentDateTime(); - QFileInfo videoFile(_videoReceiver->videoFile()); - QString subtitleFilePath = QStringLiteral("%1/%2.ass").arg(videoFile.path(), videoFile.completeBaseName()); + QFileInfo videoFileInfo(videoFile); + QString subtitleFilePath = QStringLiteral("%1/%2.ass").arg(videoFileInfo.path(), videoFileInfo.completeBaseName()); qCDebug(SubtitleWriterLog) << "Writing overlay to file:" << subtitleFilePath; _file.setFileName(subtitleFilePath); @@ -118,14 +78,17 @@ void SubtitleWriter::_startCapturingTelemetry() _timer.start(1000/_sampleRate); } -void SubtitleWriter::_captureTelemetry() +void SubtitleWriter::stopCapturingTelemetry() { - if(!_videoReceiver) { - qCWarning(SubtitleWriterLog) << "Invalid VideoReceiver pointer! Aborting subtitle capture!"; - _timer.stop(); - return; - } +#if defined(QGC_GST_STREAMING) + qCDebug(SubtitleWriterLog) << "Stopping writing"; + _timer.stop(); + _file.close(); +#endif +} +void SubtitleWriter::_captureTelemetry() +{ static const float nRows = 3; // number of rows used for displaying data static const int offsetFactor = 700; // Used to simulate a larger resolution and reduce the borders in the layout diff --git a/src/VideoStreaming/SubtitleWriter.h b/src/VideoStreaming/SubtitleWriter.h index 9d4047ff1..000c39765 100644 --- a/src/VideoStreaming/SubtitleWriter.h +++ b/src/VideoStreaming/SubtitleWriter.h @@ -17,7 +17,6 @@ #pragma once #include "QGCLoggingCategory.h" -#include "VideoReceiver.h" #include #include #include @@ -33,25 +32,19 @@ public: explicit SubtitleWriter(QObject* parent = nullptr); ~SubtitleWriter() = default; - void setVideoReceiver(VideoReceiver* videoReceiver); + // starts capturing vehicle telemetry. + void startCapturingTelemetry(const QString& videoFile); + void stopCapturingTelemetry(); private slots: - // Fires with every "videoRecordingChanged() signal, stops capturing telemetry if video stopped." - void _onVideoRecordingChanged(); - // Captures a snapshot of telemetry data from vehicle into the subtitles file. void _captureTelemetry(); - // starts capturing vehicle telemetry. - void _startCapturingTelemetry(); - private: QTimer _timer; QStringList _values; QDateTime _startTime; QFile _file; - VideoReceiver* _videoReceiver; - static const int _sampleRate; }; diff --git a/src/VideoStreaming/VideoManager.cc b/src/VideoStreaming/VideoManager.cc index 82948638a..705efa243 100644 --- a/src/VideoStreaming/VideoManager.cc +++ b/src/VideoStreaming/VideoManager.cc @@ -30,6 +30,12 @@ QGC_LOGGING_CATEGORY(VideoManagerLog, "VideoManagerLog") +static const char* kFileExtension[VideoReceiver::FILE_FORMAT_MAX - VideoReceiver::FILE_FORMAT_MIN] = { + "mkv", + "mov", + "mp4" +}; + //----------------------------------------------------------------------------- VideoManager::VideoManager(QGCApplication* app, QGCToolbox* toolbox) : QGCTool(app, toolbox) @@ -43,6 +49,17 @@ VideoManager::~VideoManager() _videoReceiver = nullptr; delete _thermalVideoReceiver; _thermalVideoReceiver = nullptr; +#if defined(QGC_GST_STREAMING) + if (_thermalVideoSink != nullptr) { + gst_object_unref(_thermalVideoSink); + _thermalVideoSink = nullptr; + } + + if (_videoSink != nullptr) { + gst_object_unref(_videoSink); + _videoSink = nullptr; + } +#endif } //----------------------------------------------------------------------------- @@ -74,60 +91,22 @@ VideoManager::setToolbox(QGCToolbox *toolbox) emit isGStreamerChanged(); qCDebug(VideoManagerLog) << "New Video Source:" << videoSource; _videoReceiver = toolbox->corePlugin()->createVideoReceiver(this); - _videoReceiver->setUnittestMode(qgcApp()->runningUnitTests()); _thermalVideoReceiver = toolbox->corePlugin()->createVideoReceiver(this); - _thermalVideoReceiver->setUnittestMode(qgcApp()->runningUnitTests()); - _videoReceiver->moveToThread(qgcApp()->thread()); - _thermalVideoReceiver->moveToThread(qgcApp()->thread()); - - // Those connects are temporary: In a perfect world those connections are going to be done on the Qml - // but because currently the videoReceiver is created in the C++ world, this is easier. - // The fact returning a QVariant is a quite annoying to use proper signal / slot connection. - _updateSettings(); - auto appSettings = toolbox->settingsManager()->appSettings(); - for (auto *videoReceiver : { _videoReceiver, _thermalVideoReceiver}) { - // First, Setup the current values from the settings. - videoReceiver->setRtspTimeout(_videoSettings->rtspTimeout()->rawValue().toInt()); - videoReceiver->setStreamEnabled(_videoSettings->streamEnabled()->rawValue().toBool()); - videoReceiver->setRecordingFormatId(_videoSettings->recordingFormat()->rawValue().toInt()); - videoReceiver->setStreamConfigured(_videoSettings->streamConfigured()); - - connect(_videoSettings->rtspTimeout(), &Fact::rawValueChanged, - videoReceiver, [videoReceiver](const QVariant &value) { - videoReceiver->setRtspTimeout(value.toInt()); - } - ); - - connect(_videoSettings->streamEnabled(), &Fact::rawValueChanged, - videoReceiver, [videoReceiver](const QVariant &value) { - videoReceiver->setStreamEnabled(value.toBool()); - } - ); - - connect(_videoSettings->recordingFormat(), &Fact::rawValueChanged, - videoReceiver, [videoReceiver](const QVariant &value) { - videoReceiver->setRecordingFormatId(value.toInt()); - } - ); - - // Why some options are facts while others aren't? - connect(_videoSettings, &VideoSettings::streamConfiguredChanged, videoReceiver, &VideoReceiver::setStreamConfigured); + connect(_videoReceiver, &VideoReceiver::timeout, this, &VideoManager::_restartVideo); + connect(_videoReceiver, &VideoReceiver::streamingChanged, this, &VideoManager::_streamingChanged); + connect(_videoReceiver, &VideoReceiver::recordingStarted, this, &VideoManager::_recordingStarted); + connect(_videoReceiver, &VideoReceiver::recordingChanged, this, &VideoManager::_recordingChanged); + connect(_videoReceiver, &VideoReceiver::screenshotComplete, this, &VideoManager::_screenshotComplete); - // Fix those. - // connect(appSettings, &Fact::rawValueChanged, videoReceiver, &VideoReceiver::setVideoPath); - // connect(appSettings->videoSavePath(), &Fact::rawValueChanged, videoReceiver, &VideoReceiver::setImagePath); - - // Connect the video receiver with the rest of the app. - connect(videoReceiver, &VideoReceiver::restartTimeout, this, &VideoManager::restartVideo); - connect(videoReceiver, &VideoReceiver::sendMessage, qgcApp(), &QGCApplication::showMessage); - connect(videoReceiver, &VideoReceiver::beforeRecording, this, &VideoManager::cleanupOldVideos); - } + // FIXME: AV: I believe _thermalVideoReceiver should be handled just like _videoReceiver in terms of event + // and I expect that it will be changed during multiple video stream activity + connect(_thermalVideoReceiver, &VideoReceiver::timeout, this, &VideoManager::_restartVideo); + connect(_thermalVideoReceiver, &VideoReceiver::streamingChanged, this, &VideoManager::_streamingChanged); _updateSettings(); if(isGStreamer()) { startVideo(); - _subtitleWriter.setVideoReceiver(_videoReceiver); } else { stopVideo(); } @@ -135,17 +114,7 @@ VideoManager::setToolbox(QGCToolbox *toolbox) #endif } -QStringList VideoManager::videoMuxes() -{ - return {"matroskamux", "qtmux", "mp4mux"}; -} - -QStringList VideoManager::videoExtensions() -{ - return {"mkv", "mov", "mp4"}; -} - -void VideoManager::cleanupOldVideos() +void VideoManager::_cleanupOldVideos() { #if defined(QGC_GST_STREAMING) //-- Only perform cleanup if storage limit is enabled @@ -158,9 +127,11 @@ void VideoManager::cleanupOldVideos() videoDir.setSorting(QDir::Time); QStringList nameFilters; - for(const QString& extension : videoExtensions()) { - nameFilters << QString("*.") + extension; + + for(size_t i = 0; i < sizeof(kFileExtension) / sizeof(kFileExtension[0]); i += 1) { + nameFilters << QString("*.") + kFileExtension[i]; } + videoDir.setNameFilters(nameFilters); //-- get the list of videos stored QFileInfoList vidList = videoDir.entryInfoList(); @@ -188,18 +159,114 @@ void VideoManager::cleanupOldVideos() void VideoManager::startVideo() { - if(_videoReceiver) _videoReceiver->start(); - if(_thermalVideoReceiver) _thermalVideoReceiver->start(); + if (qgcApp()->runningUnitTests()) { + return; + } + + if(!_videoSettings->streamEnabled()->rawValue().toBool() || !_videoSettings->streamConfigured()) { + qCDebug(VideoReceiverLog) << "Stream not enabled/configured"; + return; + } + +#if defined(QGC_GST_STREAMING) + const unsigned timeout = _videoSettings->rtspTimeout()->rawValue().toUInt(); + + if(_videoReceiver != nullptr) { + _videoReceiver->start(_videoUri, timeout); + if (_videoSink != nullptr) { + _videoReceiver->startDecoding(_videoSink); + } + } + + if(_thermalVideoReceiver != nullptr) { + _thermalVideoReceiver->start(_thermalVideoUri, timeout); + if (_thermalVideoSink != nullptr) { + _thermalVideoReceiver->startDecoding(_thermalVideoSink); + } + } +#endif } //----------------------------------------------------------------------------- void VideoManager::stopVideo() { + if (qgcApp()->runningUnitTests()) { + return; + } + if(_videoReceiver) _videoReceiver->stop(); if(_thermalVideoReceiver) _thermalVideoReceiver->stop(); } +void +VideoManager::startRecording(const QString& videoFile) +{ + if (qgcApp()->runningUnitTests()) { + return; + } + + if (!_videoReceiver) { + qgcApp()->showMessage(tr("Video receiver is not ready.")); + return; + } + + const VideoReceiver::FILE_FORMAT fileFormat = static_cast(_videoSettings->recordingFormat()->rawValue().toInt()); + + if(fileFormat < VideoReceiver::FILE_FORMAT_MIN || fileFormat >= VideoReceiver::FILE_FORMAT_MAX) { + qgcApp()->showMessage(tr("Invalid video format defined.")); + return; + } + + //-- Disk usage maintenance + _cleanupOldVideos(); + + QString savePath = qgcApp()->toolbox()->settingsManager()->appSettings()->videoSavePath(); + + if(savePath.isEmpty()) { + qgcApp()->showMessage(tr("Unabled to record video. Video save path must be specified in Settings.")); + return; + } + + _videoFile = savePath + "/" + + (videoFile.isEmpty() ? QDateTime::currentDateTime().toString("yyyy-MM-dd_hh.mm.ss") : videoFile) + + "." + kFileExtension[fileFormat - VideoReceiver::FILE_FORMAT_MIN]; + + _videoReceiver->startRecording(_videoFile, fileFormat); +} + +void +VideoManager::stopRecording() +{ + if (qgcApp()->runningUnitTests()) { + return; + } + + if (!_videoReceiver) { + return; + } + + _videoReceiver->stopRecording(); +} + +void +VideoManager::grabImage(const QString& imageFile) +{ + if (qgcApp()->runningUnitTests()) { + return; + } + + if (!_videoReceiver) { + return; + } + + _imageFile = imageFile; + + emit imageFileChanged(); + + _videoReceiver->takeScreenshot(_imageFile); +} + //----------------------------------------------------------------------------- double VideoManager::aspectRatio() { @@ -210,6 +277,7 @@ double VideoManager::aspectRatio() return pInfo->aspectRatio(); } } + // FIXME: AV: use _videoReceiver->videoSize() to calculate AR (if AR is not specified in the settings?) return _videoSettings->aspectRatio()->rawValue().toDouble(); } @@ -263,6 +331,13 @@ VideoManager::hasThermal() return false; } +//----------------------------------------------------------------------------- +QString +VideoManager::imageFile() +{ + return _imageFile; +} + //----------------------------------------------------------------------------- bool VideoManager::autoStreamConfigured() @@ -304,28 +379,28 @@ VideoManager::_videoSourceChanged() emit hasVideoChanged(); emit isGStreamerChanged(); emit isAutoStreamChanged(); - restartVideo(); + _restartVideo(); } //----------------------------------------------------------------------------- void VideoManager::_udpPortChanged() { - restartVideo(); + _restartVideo(); } //----------------------------------------------------------------------------- void VideoManager::_rtspUrlChanged() { - restartVideo(); + _restartVideo(); } //----------------------------------------------------------------------------- void VideoManager::_tcpUrlChanged() { - restartVideo(); + _restartVideo(); } //----------------------------------------------------------------------------- @@ -390,7 +465,7 @@ VideoManager::_makeVideoSink(gpointer widget) if ((sink = gst_element_factory_make("qgcvideosinkbin", nullptr)) != nullptr) { g_object_set(sink, "widget", widget, NULL); } else { - qCritical() << "VideoManager::_makeVideoSink() failed. Error with gst_element_factory_make('qgcvideosinkbin')"; + qCritical() << "gst_element_factory_make('qgcvideosinkbin') failed"; } return sink; @@ -405,24 +480,32 @@ VideoManager::_initVideo() QQuickItem* root = qgcApp()->mainRootWindow(); if (root == nullptr) { - qCDebug(VideoManagerLog) << "VideoManager::_makeVideoSink() failed. No root window"; + qCDebug(VideoManagerLog) << "mainRootWindow() failed. No root window"; return; } QQuickItem* widget = root->findChild("videoContent"); - if (widget != nullptr) { - _videoReceiver->setVideoSink(_makeVideoSink(widget)); + if (widget != nullptr && _videoReceiver != nullptr) { + if ((_videoSink = _makeVideoSink(widget)) != nullptr) { + _videoReceiver->startDecoding(_videoSink); + } else { + qCDebug(VideoManagerLog) << "_makeVideoSink() failed"; + } } else { - qCDebug(VideoManagerLog) << "VideoManager::_makeVideoSink() failed. 'videoContent' widget not found"; + qCDebug(VideoManagerLog) << "video receiver disabled"; } widget = root->findChild("thermalVideo"); - if (widget != nullptr) { - _thermalVideoReceiver->setVideoSink(_makeVideoSink(widget)); + if (widget != nullptr && _thermalVideoReceiver != nullptr) { + if ((_thermalVideoSink = _makeVideoSink(widget)) != nullptr) { + _thermalVideoReceiver->startDecoding(_thermalVideoSink); + } else { + qCDebug(VideoManagerLog) << "_makeVideoSink() failed"; + } } else { - qCDebug(VideoManagerLog) << "VideoManager::_makeVideoSink() failed. 'thermalVideo' widget not found"; + qCDebug(VideoManagerLog) << "thermal video receiver disabled"; } #endif } @@ -440,23 +523,23 @@ VideoManager::_updateSettings() qCDebug(VideoManagerLog) << "Configure primary stream: " << pInfo->uri(); switch(pInfo->type()) { case VIDEO_STREAM_TYPE_RTSP: - _videoReceiver->setUri(pInfo->uri()); + _setVideoUri(pInfo->uri()); _toolbox->settingsManager()->videoSettings()->videoSource()->setRawValue(VideoSettings::videoSourceRTSP); break; case VIDEO_STREAM_TYPE_TCP_MPEG: - _videoReceiver->setUri(pInfo->uri()); + _setVideoUri(pInfo->uri()); _toolbox->settingsManager()->videoSettings()->videoSource()->setRawValue(VideoSettings::videoSourceTCP); break; case VIDEO_STREAM_TYPE_RTPUDP: - _videoReceiver->setUri(QStringLiteral("udp://0.0.0.0:%1").arg(pInfo->uri())); + _setVideoUri(QStringLiteral("udp://0.0.0.0:%1").arg(pInfo->uri())); _toolbox->settingsManager()->videoSettings()->videoSource()->setRawValue(VideoSettings::videoSourceUDPH264); break; case VIDEO_STREAM_TYPE_MPEG_TS_H264: - _videoReceiver->setUri(QStringLiteral("mpegts://0.0.0.0:%1").arg(pInfo->uri())); + _setVideoUri(QStringLiteral("mpegts://0.0.0.0:%1").arg(pInfo->uri())); _toolbox->settingsManager()->videoSettings()->videoSource()->setRawValue(VideoSettings::videoSourceMPEGTS); break; default: - _videoReceiver->setUri(pInfo->uri()); + _setVideoUri(pInfo->uri()); break; } //-- Thermal stream (if any) @@ -466,16 +549,16 @@ VideoManager::_updateSettings() switch(pTinfo->type()) { case VIDEO_STREAM_TYPE_RTSP: case VIDEO_STREAM_TYPE_TCP_MPEG: - _thermalVideoReceiver->setUri(pTinfo->uri()); + _setThermalVideoUri(pTinfo->uri()); break; case VIDEO_STREAM_TYPE_RTPUDP: - _thermalVideoReceiver->setUri(QStringLiteral("udp://0.0.0.0:%1").arg(pTinfo->uri())); + _setThermalVideoUri(QStringLiteral("udp://0.0.0.0:%1").arg(pTinfo->uri())); break; case VIDEO_STREAM_TYPE_MPEG_TS_H264: - _thermalVideoReceiver->setUri(QStringLiteral("mpegts://0.0.0.0:%1").arg(pTinfo->uri())); + _setThermalVideoUri(QStringLiteral("mpegts://0.0.0.0:%1").arg(pTinfo->uri())); break; default: - _thermalVideoReceiver->setUri(pTinfo->uri()); + _setThermalVideoUri(pTinfo->uri()); break; } } @@ -484,20 +567,62 @@ VideoManager::_updateSettings() } QString source = _videoSettings->videoSource()->rawValue().toString(); if (source == VideoSettings::videoSourceUDPH264) - _videoReceiver->setUri(QStringLiteral("udp://0.0.0.0:%1").arg(_videoSettings->udpPort()->rawValue().toInt())); + _setVideoUri(QStringLiteral("udp://0.0.0.0:%1").arg(_videoSettings->udpPort()->rawValue().toInt())); else if (source == VideoSettings::videoSourceUDPH265) - _videoReceiver->setUri(QStringLiteral("udp265://0.0.0.0:%1").arg(_videoSettings->udpPort()->rawValue().toInt())); + _setVideoUri(QStringLiteral("udp265://0.0.0.0:%1").arg(_videoSettings->udpPort()->rawValue().toInt())); else if (source == VideoSettings::videoSourceMPEGTS) - _videoReceiver->setUri(QStringLiteral("mpegts://0.0.0.0:%1").arg(_videoSettings->udpPort()->rawValue().toInt())); + _setVideoUri(QStringLiteral("mpegts://0.0.0.0:%1").arg(_videoSettings->udpPort()->rawValue().toInt())); else if (source == VideoSettings::videoSourceRTSP) - _videoReceiver->setUri(_videoSettings->rtspUrl()->rawValue().toString()); + _setVideoUri(_videoSettings->rtspUrl()->rawValue().toString()); else if (source == VideoSettings::videoSourceTCP) - _videoReceiver->setUri(QStringLiteral("tcp://%1").arg(_videoSettings->tcpUrl()->rawValue().toString())); + _setVideoUri(QStringLiteral("tcp://%1").arg(_videoSettings->tcpUrl()->rawValue().toString())); +} + +void +VideoManager::_setVideoUri(const QString& uri) +{ +#if defined(QGC_GST_TAISYNC_ENABLED) && (defined(__android__) || defined(__ios__)) + //-- Taisync on iOS or Android sends a raw h.264 stream + if (isTaisync()) { + _videoUri = QString("tsusb://0.0.0.0:%1").arg(TAISYNC_VIDEO_UDP_PORT); + return; + } +#endif + _videoUri = uri; +} + +void +VideoManager::_setThermalVideoUri(const QString& uri) +{ +#if defined(QGC_GST_TAISYNC_ENABLED) && (defined(__android__) || defined(__ios__)) + //-- Taisync on iOS or Android sends a raw h.264 stream + if (isTaisync()) { + // FIXME: AV: TAISYNC_VIDEO_UDP_PORT is used by video stream, thermal stream should go via its own proxy + _thermalVideoUri = QString("tsusb://0.0.0.0:%1").arg(TAISYNC_VIDEO_UDP_PORT); + return; + } +#endif + _thermalVideoUri = uri; +} + +//----------------------------------------------------------------------------- +void +VideoManager::_streamingChanged() +{ +#if defined(QGC_GST_STREAMING) + // FIXME: AV: we need VideoReceiver::running() to avoid restarting if one of streams is not active + // but since VideoManager is going to be relpaced by Video Model during multiple video streaming development activity + // I'll leave it as is for week or two + if ((_videoReceiver && !_videoReceiver->streaming()) + /*|| (_thermalVideoReceiver && !_thermalVideoReceiver->streaming())*/) { + _restartVideo(); + } +#endif } //----------------------------------------------------------------------------- void -VideoManager::restartVideo() +VideoManager::_restartVideo() { #if defined(QGC_GST_STREAMING) qCDebug(VideoManagerLog) << "Restart video streaming"; @@ -508,6 +633,28 @@ VideoManager::restartVideo() #endif } +//----------------------------------------------------------------------------- +void +VideoManager::_recordingStarted() +{ + _subtitleWriter.startCapturingTelemetry(_videoFile); +} + +//----------------------------------------------------------------------------- +void +VideoManager::_recordingChanged() +{ + if (_videoReceiver && !_videoReceiver->recording()) { + _subtitleWriter.stopCapturingTelemetry(); + } +} + +//---------------------------------------------------------------------------------------- +void +VideoManager::_screenshotComplete() +{ +} + //---------------------------------------------------------------------------------------- void VideoManager::_setActiveVehicle(Vehicle* vehicle) @@ -519,14 +666,14 @@ VideoManager::_setActiveVehicle(Vehicle* vehicle) if(pCamera) { pCamera->stopStream(); } - disconnect(_activeVehicle->dynamicCameras(), &QGCCameraManager::streamChanged, this, &VideoManager::restartVideo); + disconnect(_activeVehicle->dynamicCameras(), &QGCCameraManager::streamChanged, this, &VideoManager::_restartVideo); } } _activeVehicle = vehicle; if(_activeVehicle) { connect(_activeVehicle, &Vehicle::connectionLostChanged, this, &VideoManager::_connectionLostChanged); if(_activeVehicle->dynamicCameras()) { - connect(_activeVehicle->dynamicCameras(), &QGCCameraManager::streamChanged, this, &VideoManager::restartVideo); + connect(_activeVehicle->dynamicCameras(), &QGCCameraManager::streamChanged, this, &VideoManager::_restartVideo); QGCCameraControl* pCamera = _activeVehicle->dynamicCameras()->currentCameraInstance(); if(pCamera) { pCamera->resumeStream(); @@ -537,7 +684,7 @@ VideoManager::_setActiveVehicle(Vehicle* vehicle) setfullScreen(false); } emit autoStreamConfiguredChanged(); - restartVideo(); + _restartVideo(); } //---------------------------------------------------------------------------------------- diff --git a/src/VideoStreaming/VideoManager.h b/src/VideoStreaming/VideoManager.h index 54e6051d3..70c5d8a79 100644 --- a/src/VideoStreaming/VideoManager.h +++ b/src/VideoStreaming/VideoManager.h @@ -50,6 +50,7 @@ public: Q_PROPERTY(double thermalHfov READ thermalHfov NOTIFY aspectRatioChanged) Q_PROPERTY(bool autoStreamConfigured READ autoStreamConfigured NOTIFY autoStreamConfiguredChanged) Q_PROPERTY(bool hasThermal READ hasThermal NOTIFY aspectRatioChanged) + Q_PROPERTY(QString imageFile READ imageFile NOTIFY imageFileChanged) virtual bool hasVideo (); virtual bool isGStreamer (); @@ -62,14 +63,12 @@ public: virtual double thermalHfov (); virtual bool autoStreamConfigured(); virtual bool hasThermal (); - virtual void restartVideo (); + virtual QString imageFile (); + virtual VideoReceiver* videoReceiver () { return _videoReceiver; } virtual VideoReceiver* thermalVideoReceiver () { return _thermalVideoReceiver; } - QStringList videoExtensions(); - QStringList videoMuxes(); - #if defined(QGC_DISABLE_UVC) virtual bool uvcEnabled () { return false; } #else @@ -85,7 +84,10 @@ public: Q_INVOKABLE void startVideo (); Q_INVOKABLE void stopVideo (); - void cleanupOldVideos(); + Q_INVOKABLE void startRecording (const QString& videoFile = QString()); + Q_INVOKABLE void stopRecording (); + + Q_INVOKABLE void grabImage(const QString& imageFile); signals: void hasVideoChanged (); @@ -96,6 +98,7 @@ signals: void isTaisyncChanged (); void aspectRatioChanged (); void autoStreamConfiguredChanged(); + void imageFileChanged (); protected slots: void _videoSourceChanged (); @@ -115,13 +118,29 @@ protected: #endif void _initVideo (); void _updateSettings (); + void _setVideoUri (const QString& uri); + void _setThermalVideoUri (const QString& uri); + void _cleanupOldVideos (); + void _restartVideo (); + void _streamingChanged (); + void _recordingStarted (); + void _recordingChanged (); + void _screenshotComplete (); protected: + QString _videoFile; + QString _imageFile; SubtitleWriter _subtitleWriter; bool _isTaisync = false; VideoReceiver* _videoReceiver = nullptr; VideoReceiver* _thermalVideoReceiver = nullptr; +#if defined(QGC_GST_STREAMING) + GstElement* _videoSink = nullptr; + GstElement* _thermalVideoSink = nullptr; +#endif VideoSettings* _videoSettings = nullptr; + QString _videoUri; + QString _thermalVideoUri; QString _videoSourceID; bool _fullScreen = false; Vehicle* _activeVehicle = nullptr; diff --git a/src/VideoStreaming/VideoReceiver.cc b/src/VideoStreaming/VideoReceiver.cc index 8608433c2..da89ec180 100644 --- a/src/VideoStreaming/VideoReceiver.cc +++ b/src/VideoStreaming/VideoReceiver.cc @@ -15,314 +15,607 @@ */ #include "VideoReceiver.h" -#include "VideoManager.h" -#ifdef QGC_GST_TAISYNC_ENABLED -#include "TaisyncHandler.h" -#endif + #include #include -#include #include #include QGC_LOGGING_CATEGORY(VideoReceiverLog, "VideoReceiverLog") -#if defined(QGC_GST_STREAMING) - -static const char* kVideoExtensions[] = -{ - "mkv", - "mov", - "mp4" -}; - -static const char* kVideoMuxes[] = -{ - "matroskamux", - "qtmux", - "mp4mux" -}; - -#define NUM_MUXES (sizeof(kVideoMuxes) / sizeof(char*)) - -#endif - +//----------------------------------------------------------------------------- +// Our pipeline look like this: +// +// +-->queue-->_decoderValve[-->_decoder-->_videoSink] +// | +// _source-->_tee +// | +// +-->queue-->_recorderValve[-->_fileSink] +// VideoReceiver::VideoReceiver(QObject* parent) - : QObject(parent) + : QThread(parent) #if defined(QGC_GST_STREAMING) - , _running(false) - , _recording(false) - , _streaming(false) - , _starting(false) - , _stopping(false) - , _stop(true) - , _sink(nullptr) + , _removingDecoder(false) + , _removingRecorder(false) + , _source(nullptr) , _tee(nullptr) - , _pipeline(nullptr) + , _decoderValve(nullptr) + , _recorderValve(nullptr) + , _decoder(nullptr) , _videoSink(nullptr) - , _lastFrameId(G_MAXUINT64) - , _lastFrameTime(0) - , _restart_time_ms(1389) + , _fileSink(nullptr) + , _pipeline(nullptr) + , _lastSourceFrameTime(0) + , _lastVideoFrameTime(0) + , _resetVideoSink(true) + , _videoSinkProbeId(0) , _udpReconnect_us(5000000) + , _shutdown(false) #endif - , _videoRunning(false) - , _showFullScreen(false) - , _streamEnabled(false) - , _streamConfigured(false) - , _unittTestMode(false) - , _isTaisync(false) + , _streaming(false) + , _decoding(false) + , _recording(false) { - // FIXME: AV: temporal workaround to allow for Qt::QueuedConnection for gstreamer signals. Need to evaluate proper solution - perhaps QtGst will be helpful #if defined(QGC_GST_STREAMING) - _restart_timer.setSingleShot(true); - connect(&_restart_timer, &QTimer::timeout, this, &VideoReceiver::restartTimeout); - connect(this, &VideoReceiver::msgErrorReceived, this, &VideoReceiver::_handleError, Qt::QueuedConnection); - connect(this, &VideoReceiver::msgEOSReceived, this, &VideoReceiver::_handleEOS, Qt::QueuedConnection); - connect(this, &VideoReceiver::msgStateChangedReceived, this, &VideoReceiver::_handleStateChanged, Qt::QueuedConnection); - connect(&_frameTimer, &QTimer::timeout, this, &VideoReceiver::_updateTimer); - _frameTimer.start(1000); + QThread::start(); + connect(&_watchdogTimer, &QTimer::timeout, this, &VideoReceiver::_watchdog); + _watchdogTimer.start(1000); #endif } -VideoReceiver::~VideoReceiver() +VideoReceiver::~VideoReceiver(void) { #if defined(QGC_GST_STREAMING) stop(); - setVideoSink(nullptr); + _post([this](){ + _shutdown = true; + }); + QThread::wait(); #endif } -//----------------------------------------------------------------------------- void -VideoReceiver::grabImage(QString imageFile) +VideoReceiver::start(const QString& uri, unsigned timeout) { - _imageFile = imageFile; - emit imageFileChanged(); -} - -//----------------------------------------------------------------------------- #if defined(QGC_GST_STREAMING) -static void -newPadCB(GstElement* element, GstPad* pad, gpointer data) -{ - gchar* name = gst_pad_get_name(pad); - //g_print("A new pad %s was created\n", name); - GstCaps* p_caps = gst_pad_get_pad_template_caps (pad); - gchar* description = gst_caps_to_string(p_caps); - qCDebug(VideoReceiverLog) << p_caps << ", " << description; - g_free(description); - GstElement* sink = GST_ELEMENT(data); - if(gst_element_link_pads(element, name, sink, "sink") == false) - qCCritical(VideoReceiverLog) << "Failed to link elements\n"; - g_free(name); -} - -static gboolean -autoplugQueryCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data) -{ - GstElement* glupload = (GstElement* )data; + if (!_isOurThread()) { + _post([this, uri, timeout]() { + start(uri, timeout); + }); + return; + } - GstPad* sinkpad = gst_element_get_static_pad(glupload, "sink"); + if(_pipeline) { + qCDebug(VideoReceiverLog) << "Already running!"; + return; + } - if (!sinkpad) { - qCCritical(VideoReceiverLog) << "No sink pad found"; - return FALSE; + if (uri.isEmpty()) { + qCDebug(VideoReceiverLog) << "Failed because URI is not specified"; + return; } - GstCaps* filter; + qCDebug(VideoReceiverLog) << "Starting"; - gst_query_parse_caps(query, &filter); + _timeout = timeout; - GstCaps* sinkcaps = gst_pad_query_caps(sinkpad, filter); + bool running = false; + bool pipelineUp = false; - gst_query_set_caps_result(query, sinkcaps); + GstElement* decoderQueue = nullptr; + GstElement* recorderQueue = nullptr; - const gboolean ret = !gst_caps_is_empty(sinkcaps); + do { + if((_tee = gst_element_factory_make("tee", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('tee') failed"; + break; + } - gst_caps_unref(sinkcaps); - sinkcaps = nullptr; + GstPad* pad; - gst_object_unref(sinkpad); - sinkpad = nullptr; + if ((pad = gst_element_get_static_pad(_tee, "sink")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed"; + break; + } - return ret; -} + _lastSourceFrameTime = 0; -static gboolean -autoplugQueryContext(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data) -{ - GstElement* glsink = (GstElement* )data; + gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe, this, nullptr); + gst_object_unref(pad); + pad = nullptr; - GstPad* sinkpad = gst_element_get_static_pad(glsink, "sink"); + if((decoderQueue = gst_element_factory_make("queue", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed"; + break; + } - if (!sinkpad){ - qCCritical(VideoReceiverLog) << "No sink pad found"; - return FALSE; - } + if((_decoderValve = gst_element_factory_make("valve", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('valve') failed"; + break; + } - const gboolean ret = gst_pad_query(sinkpad, query); + g_object_set(_decoderValve, "drop", TRUE, nullptr); - gst_object_unref(sinkpad); - sinkpad = nullptr; + if((recorderQueue = gst_element_factory_make("queue", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed"; + break; + } - return ret; -} + if((_recorderValve = gst_element_factory_make("valve", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('valve') failed"; + break; + } -static gboolean -autoplugQueryCB(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data) -{ - gboolean ret; + g_object_set(_recorderValve, "drop", TRUE, nullptr); - switch (GST_QUERY_TYPE (query)) { - case GST_QUERY_CAPS: - ret = autoplugQueryCaps(bin, pad, element, query, data); - break; - case GST_QUERY_CONTEXT: - ret = autoplugQueryContext(bin, pad, element, query, data); - break; - default: - ret = FALSE; - break; - } + if ((_pipeline = gst_pipeline_new("receiver")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_pipeline_new() failed"; + break; + } - return ret; -} + g_object_set(_pipeline, "message-forward", TRUE, nullptr); -//----------------------------------------------------------------------------- -static void -_wrapWithGhostPad(GstElement* element, GstPad* pad, gpointer data) -{ - gchar* name = gst_pad_get_name(pad); + if ((_source = _makeSource(uri)) == nullptr) { + qCCritical(VideoReceiverLog) << "_makeSource() failed"; + break; + } - GstPad* ghostpad = gst_ghost_pad_new(name, pad); + g_signal_connect(_source, "pad-added", G_CALLBACK(_onNewPad), this); - g_free(name); + gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve, nullptr); - gst_pad_set_active(ghostpad, TRUE); + pipelineUp = true; - if (!gst_element_add_pad(GST_ELEMENT_PARENT(element), ghostpad)) { - qCCritical(VideoReceiverLog) << "Failed to add ghost pad to source"; - } -} + if(!gst_element_link_many(_tee, decoderQueue, _decoderValve, nullptr)) { + qCCritical(VideoReceiverLog) << "Unable to link decoder queue"; + break; + } -static void -_linkPadWithOptionalBuffer(GstElement* element, GstPad* pad, gpointer data) -{ - gboolean isRtpPad = FALSE; + if(!gst_element_link_many(_tee, recorderQueue, _recorderValve, nullptr)) { + qCCritical(VideoReceiverLog) << "Unable to link recorder queue"; + break; + } - GstCaps* filter = gst_caps_from_string("application/x-rtp"); + GstBus* bus = nullptr; - if (filter != nullptr) { - GstCaps* caps = gst_pad_query_caps(pad, nullptr); + if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) { + gst_bus_enable_sync_message_emission(bus); + g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this); + gst_object_unref(bus); + bus = nullptr; + } - if (caps != nullptr) { - if (!gst_caps_is_any(caps) && gst_caps_can_intersect(caps, filter)) { - isRtpPad = TRUE; - } - gst_caps_unref(caps); - caps = nullptr; + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-initial"); + running = gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE; + } while(0); + + if (!running) { + qCCritical(VideoReceiverLog) << "Failed"; + + // In newer versions, the pipeline will clean up all references that are added to it + if (_pipeline != nullptr) { + gst_element_set_state(_pipeline, GST_STATE_NULL); + gst_object_unref(_pipeline); + _pipeline = nullptr; } - gst_caps_unref(filter); - filter = nullptr; + // If we failed before adding items to the pipeline, then clean up + if (!pipelineUp) { + if (_recorderValve != nullptr) { + gst_object_unref(_recorderValve); + _recorderValve = nullptr; + } + + if (recorderQueue != nullptr) { + gst_object_unref(recorderQueue); + recorderQueue = nullptr; + } + + if (_decoderValve != nullptr) { + gst_object_unref(_decoderValve); + _decoderValve = nullptr; + } + + if (decoderQueue != nullptr) { + gst_object_unref(decoderQueue); + decoderQueue = nullptr; + } + + if (_tee != nullptr) { + gst_object_unref(_tee); + _tee = nullptr; + } + + if (_source != nullptr) { + gst_object_unref(_source); + _source = nullptr; + } + } + } else { + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started"); + qCDebug(VideoReceiverLog) << "Started"; } +#else + Q_UNUSED(uri); + Q_UNUSED(timeout); +#endif +} - if (isRtpPad) { - GstElement* buffer; +void +VideoReceiver::stop(void) +{ +#if defined(QGC_GST_STREAMING) + if (!_isOurThread()) { + _post([this]() { + stop(); + }); + return; + } - if ((buffer = gst_element_factory_make("rtpjitterbuffer", nullptr)) != nullptr) { - gst_bin_add(GST_BIN(GST_ELEMENT_PARENT(element)), buffer); + qCDebug(VideoReceiverLog) << "Stopping"; - gst_element_sync_state_with_parent(buffer); + if (_pipeline != nullptr) { + GstBus* bus; - GstPad* sinkpad = gst_element_get_static_pad(buffer, "sink"); + if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) { + gst_bus_disable_sync_message_emission(bus); - if (sinkpad != nullptr) { - const GstPadLinkReturn ret = gst_pad_link(pad, sinkpad); + gst_element_send_event(_pipeline, gst_event_new_eos()); - gst_object_unref(sinkpad); - sinkpad = nullptr; + GstMessage* msg; - if (ret == GST_PAD_LINK_OK) { - pad = gst_element_get_static_pad(buffer, "src"); - element = buffer; - } else { - qCDebug(VideoReceiverLog) << "Partially failed - gst_pad_link()"; + if((msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS|GST_MESSAGE_ERROR))) != nullptr) { + if(GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) { + qCCritical(VideoReceiverLog) << "Error stopping pipeline!"; + } else if(GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) { + qCDebug(VideoReceiverLog) << "End of stream received!"; } + + gst_message_unref(msg); + msg = nullptr; } else { - qCDebug(VideoReceiverLog) << "Partially failed - gst_element_get_static_pad()"; + qCCritical(VideoReceiverLog) << "gst_bus_timed_pop_filtered() failed"; } + + gst_object_unref(bus); + bus = nullptr; } else { - qCDebug(VideoReceiverLog) << "Partially failed - gst_element_factory_make('rtpjitterbuffer')"; + qCCritical(VideoReceiverLog) << "gst_pipeline_get_bus() failed"; } - } - newPadCB(element, pad, data); -} + gst_element_set_state(_pipeline, GST_STATE_NULL); -static gboolean -_padProbe(GstElement* element, GstPad* pad, gpointer user_data) -{ - int* probeRes = (int*)user_data; + // FIXME: check if branch is connected and remove all elements from branch + if (_fileSink != nullptr) { + _shutdownRecordingBranch(); + } - *probeRes |= 1; + if (_videoSink != nullptr) { + _shutdownDecodingBranch(); + } - GstCaps* filter = gst_caps_from_string("application/x-rtp"); + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-stopped"); - if (filter != nullptr) { - GstCaps* caps = gst_pad_query_caps(pad, nullptr); + gst_object_unref(_pipeline); + _pipeline = nullptr; - if (caps != nullptr) { - if (!gst_caps_is_any(caps) && gst_caps_can_intersect(caps, filter)) { - *probeRes |= 2; - } + _recorderValve = nullptr; + _decoderValve = nullptr; + _tee = nullptr; + _source = nullptr; - gst_caps_unref(caps); - caps = nullptr; - } + _lastSourceFrameTime = 0; - gst_caps_unref(filter); - filter = nullptr; + if (_streaming) { + _streaming = false; + emit streamingChanged(); + qCDebug(VideoReceiverLog) << "Streaming stopped"; + } } - return TRUE; + qCDebug(VideoReceiverLog) << "Stopped"; +#endif } -GstElement* -VideoReceiver::_makeSource(const QString& uri) +void +VideoReceiver::startDecoding(VideoSink* videoSink) { - if (uri.isEmpty()) { - qCCritical(VideoReceiverLog) << "Failed because URI is not specified"; - return nullptr; +#if defined(QGC_GST_STREAMING) + if (!_isOurThread()) { + gst_object_ref(videoSink); + _post([this, videoSink]() { + startDecoding(videoSink); + gst_object_unref(videoSink); + }); + return; } - bool isTaisync = uri.contains("tsusb://"); - bool isUdp264 = uri.contains("udp://"); - bool isRtsp = uri.contains("rtsp://"); - bool isUdp265 = uri.contains("udp265://"); - bool isTcpMPEGTS= uri.contains("tcp://"); - bool isUdpMPEGTS= uri.contains("mpegts://"); + qCDebug(VideoReceiverLog) << "Starting decoding"; - GstElement* source = nullptr; - GstElement* buffer = nullptr; - GstElement* parser = nullptr; - GstElement* bin = nullptr; - GstElement* srcbin = nullptr; + if (_pipeline == nullptr) { + if (_videoSink != nullptr) { + gst_object_unref(_videoSink); + _videoSink = nullptr; + } + } - do { - QUrl url(uri); + if(_videoSink != nullptr || _decoding) { + qCDebug(VideoReceiverLog) << "Already decoding!"; + return; + } - if(isTcpMPEGTS) { - if ((source = gst_element_factory_make("tcpclientsrc", "source")) != nullptr) { - g_object_set(static_cast(source), "host", qPrintable(url.host()), "port", url.port(), nullptr); - } - } else if (isRtsp) { - if ((source = gst_element_factory_make("rtspsrc", "source")) != nullptr) { - g_object_set(static_cast(source), "location", qPrintable(uri), "latency", 17, "udp-reconnect", 1, "timeout", _udpReconnect_us, NULL); - } - } else if(isUdp264 || isUdp265 || isUdpMPEGTS || isTaisync) { - if ((source = gst_element_factory_make("udpsrc", "source")) != nullptr) { + GstPad* pad; + + if ((pad = gst_element_get_static_pad(videoSink, "sink")) == nullptr) { + qCCritical(VideoReceiverLog) << "Unable to find sink pad of video sink"; + return; + } + + _lastVideoFrameTime = 0; + _resetVideoSink = true; + + _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe, this, nullptr); + gst_object_unref(pad); + pad = nullptr; + + _videoSink = videoSink; + gst_object_ref(_videoSink); + + _removingDecoder = false; + + if (!_streaming) { + return; + } + + if (!_addDecoder(_decoderValve)) { + qCCritical(VideoReceiverLog) << "_addDecoder() failed"; + return; + } + + g_object_set(_decoderValve, "drop", FALSE, nullptr); + + qCDebug(VideoReceiverLog) << "Decoding started"; +#else + Q_UNUSED(videoSink) +#endif +} + +void +VideoReceiver::stopDecoding(void) +{ +#if defined(QGC_GST_STREAMING) + if (!_isOurThread()) { + _post([this]() { + stopDecoding(); + }); + return; + } + + qCDebug(VideoReceiverLog) << "Stopping decoding"; + + // exit immediately if we are not decoding + if (_pipeline == nullptr || !_decoding) { + qCDebug(VideoReceiverLog) << "Not decoding!"; + return; + } + + g_object_set(_decoderValve, "drop", TRUE, nullptr); + + _removingDecoder = true; + + _unlinkBranch(_decoderValve); +#endif +} + +void +VideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format) +{ +#if defined(QGC_GST_STREAMING) + if (!_isOurThread()) { + _post([this, videoFile, format]() { + startRecording(videoFile, format); + }); + return; + } + + qCDebug(VideoReceiverLog) << "Starting recording"; + + // exit immediately if we are already recording + if (_pipeline == nullptr || _recording) { + qCDebug(VideoReceiverLog) << "Already recording!"; + return; + } + + qCDebug(VideoReceiverLog) << "New video file:" << videoFile; + + if ((_fileSink = _makeFileSink(videoFile, format)) == nullptr) { + qCCritical(VideoReceiverLog) << "_makeFileSink() failed"; + return; + } + + _removingRecorder = false; + + gst_object_ref(_fileSink); + + gst_bin_add(GST_BIN(_pipeline), _fileSink); + + if (!gst_element_link(_recorderValve, _fileSink)) { + qCCritical(VideoReceiverLog) << "Failed to link valve and file sink"; + return; + } + + gst_element_sync_state_with_parent(_fileSink); + + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-filesink"); + + // Install a probe on the recording branch to drop buffers until we hit our first keyframe + // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time + // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback + GstPad* probepad; + + if ((probepad = gst_element_get_static_pad(_recorderValve, "src")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed"; + return; + } + + gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr); // to drop the buffers until key frame is received + gst_object_unref(probepad); + probepad = nullptr; + + g_object_set(_recorderValve, "drop", FALSE, nullptr); + + _recording = true; + + emit recordingChanged(); + + qCDebug(VideoReceiverLog) << "Recording started"; +#else + Q_UNUSED(videoFile) + Q_UNUSED(format) +#endif +} + +//----------------------------------------------------------------------------- +void +VideoReceiver::stopRecording(void) +{ +#if defined(QGC_GST_STREAMING) + if (!_isOurThread()) { + _post([this]() { + stopRecording(); + }); + return; + } + + qCDebug(VideoReceiverLog) << "Stopping recording"; + + // exit immediately if we are not recording + if (_pipeline == nullptr || !_recording) { + qCDebug(VideoReceiverLog) << "Not recording!"; + return; + } + + g_object_set(_recorderValve, "drop", TRUE, nullptr); + + _removingRecorder = true; + + _unlinkBranch(_recorderValve); +#endif +} + +void +VideoReceiver::takeScreenshot(const QString& imageFile) +{ +#if defined(QGC_GST_STREAMING) + if (!_isOurThread()) { + _post([this, imageFile]() { + takeScreenshot(imageFile); + }); + return; + } + + // FIXME: AV: record screenshot here + emit screenshotComplete(); +#else + Q_UNUSED(imageFile); +#endif +} + +#if defined(QGC_GST_STREAMING) +const char* VideoReceiver::_kFileMux[FILE_FORMAT_MAX - FILE_FORMAT_MIN] = { + "matroskamux", + "qtmux", + "mp4mux" +}; + +void +VideoReceiver::_watchdog(void) +{ + _post([this](){ + if(_pipeline == nullptr) { + return; + } + + const qint64 now = QDateTime::currentSecsSinceEpoch(); + + if (_lastSourceFrameTime == 0) { + _lastSourceFrameTime = now; + } + + if (now - _lastSourceFrameTime > _timeout) { + emit timeout(); + } + + if (_decoding && !_removingDecoder) { + if (_lastVideoFrameTime == 0) { + _lastVideoFrameTime = now; + } + + if (now - _lastVideoFrameTime > _timeout * 2) { + emit timeout(); + } + } + }); +} + +void +VideoReceiver::_handleEOS(void) +{ + if(_pipeline == nullptr) { + qCWarning(VideoReceiverLog) << "We should not be here"; + return; + } + + if (!_streaming) { + stop(); + } else { + if(_decoding && _removingDecoder) { + _shutdownDecodingBranch(); + } else if(_recording && _removingRecorder) { + _shutdownRecordingBranch(); + } else { + qCWarning(VideoReceiverLog) << "Unexpected EOS!"; + stop(); + } + } +} + +GstElement* +VideoReceiver::_makeSource(const QString& uri) +{ + if (uri.isEmpty()) { + qCCritical(VideoReceiverLog) << "Failed because URI is not specified"; + return nullptr; + } + + bool isTaisync = uri.contains("tsusb://"); + bool isUdp264 = uri.contains("udp://"); + bool isRtsp = uri.contains("rtsp://"); + bool isUdp265 = uri.contains("udp265://"); + bool isTcpMPEGTS= uri.contains("tcp://"); + bool isUdpMPEGTS= uri.contains("mpegts://"); + + GstElement* source = nullptr; + GstElement* buffer = nullptr; + GstElement* tsdemux = nullptr; + GstElement* parser = nullptr; + GstElement* bin = nullptr; + GstElement* srcbin = nullptr; + + do { + QUrl url(uri); + + if(isTcpMPEGTS) { + if ((source = gst_element_factory_make("tcpclientsrc", "source")) != nullptr) { + g_object_set(static_cast(source), "host", qPrintable(url.host()), "port", url.port(), nullptr); + } + } else if (isRtsp) { + if ((source = gst_element_factory_make("rtspsrc", "source")) != nullptr) { + g_object_set(static_cast(source), "location", qPrintable(uri), "latency", 17, "udp-reconnect", 1, "timeout", _udpReconnect_us, NULL); + } + } else if(isUdp264 || isUdp265 || isUdpMPEGTS || isTaisync) { + if ((source = gst_element_factory_make("udpsrc", "source")) != nullptr) { g_object_set(static_cast(source), "uri", QString("udp://%1:%2").arg(qPrintable(url.host()), QString::number(url.port())).toUtf8().data(), nullptr); GstCaps* caps = nullptr; @@ -332,7 +625,7 @@ VideoReceiver::_makeSource(const QString& uri) qCCritical(VideoReceiverLog) << "gst_caps_from_string() failed"; break; } - } else if (isUdp264) { + } else if (isUdp265) { if ((caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H265")) == nullptr) { qCCritical(VideoReceiverLog) << "gst_caps_from_string() failed"; break; @@ -354,26 +647,37 @@ VideoReceiver::_makeSource(const QString& uri) break; } + if ((bin = gst_bin_new("sourcebin")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_bin_new('sourcebin') failed"; + break; + } + + if ((parser = gst_element_factory_make("parsebin", "parser")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('parsebin') failed"; + break; + } + + gst_bin_add_many(GST_BIN(bin), source, parser, nullptr); + // FIXME: AV: Android does not determine MPEG2-TS via parsebin - have to explicitly state which demux to use + // FIXME: AV: tsdemux handling is a bit ugly - let's try to find elegant solution for that later if (isTcpMPEGTS || isUdpMPEGTS) { - if ((parser = gst_element_factory_make("tsdemux", "parser")) == nullptr) { - qCritical(VideoReceiverLog) << "gst_element_factory_make('tsdemux') failed"; + if ((tsdemux = gst_element_factory_make("tsdemux", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('tsdemux') failed"; break; } - } else { - if ((parser = gst_element_factory_make("parsebin", "parser")) == nullptr) { - qCritical() << "VideoReceiver::_makeSource() failed. Error with gst_element_factory_make('parsebin')"; + + gst_bin_add(GST_BIN(bin), tsdemux); + + if (!gst_element_link(source, tsdemux)) { + qCCritical(VideoReceiverLog) << "gst_element_link() failed"; break; } - } - if ((bin = gst_bin_new("sourcebin")) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_bin_new('sourcebin') failed"; - break; + source = tsdemux; + tsdemux = nullptr; } - gst_bin_add_many(GST_BIN(bin), source, parser, nullptr); - int probeRes = 0; gst_element_foreach_src_pad(source, _padProbe, &probeRes); @@ -403,7 +707,7 @@ VideoReceiver::_makeSource(const QString& uri) g_signal_connect(parser, "pad-added", G_CALLBACK(_wrapWithGhostPad), nullptr); - source = buffer = parser = nullptr; + source = tsdemux = buffer = parser = nullptr; srcbin = bin; bin = nullptr; @@ -419,6 +723,11 @@ VideoReceiver::_makeSource(const QString& uri) parser = nullptr; } + if (tsdemux != nullptr) { + gst_object_unref(tsdemux); + tsdemux = nullptr; + } + if (buffer != nullptr) { gst_object_unref(buffer); buffer = nullptr; @@ -432,376 +741,420 @@ VideoReceiver::_makeSource(const QString& uri) return srcbin; } -bool VideoReceiver::streamEnabled() const -{ - return _streamEnabled; -} - -void VideoReceiver::setStreamEnabled(bool enabled) +GstElement* +VideoReceiver::_makeDecoder(GstCaps* caps, GstElement* videoSink) { - if (_streamEnabled != enabled) { - _streamEnabled = enabled; - emit streamEnabledChanged(); - } -} + GstElement* decoder = nullptr; -bool VideoReceiver::streamConfigured() const -{ - return _streamConfigured; -} + do { + if ((decoder = gst_element_factory_make("decodebin", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('decodebin') failed"; + break; + } -void VideoReceiver::setStreamConfigured(bool enabled) -{ - if (_streamConfigured != enabled) { - _streamConfigured = enabled; - emit streamEnabledChanged(); - } -} + g_signal_connect(decoder, "autoplug-query", G_CALLBACK(_autoplugQuery), videoSink); + } while(0); -bool VideoReceiver::isTaisync() const -{ - return _isTaisync; + return decoder; } -void VideoReceiver::setIsTaysinc(bool enabled) +GstElement* +VideoReceiver::_makeFileSink(const QString& videoFile, FILE_FORMAT format) { - if (_isTaisync != enabled) { - _isTaisync = enabled; - emit isTaisyncChanged(); - } -} + GstElement* fileSink = nullptr; + GstElement* mux = nullptr; + GstElement* sink = nullptr; + GstElement* bin = nullptr; + bool releaseElements = true; -QString VideoReceiver::videoPath() const -{ - return _videoPath; -} + do{ + if (format < FILE_FORMAT_MIN || format >= FILE_FORMAT_MAX) { + qCCritical(VideoReceiverLog) << "Unsupported file format"; + break; + } -void VideoReceiver::setVideoPath(const QString& value) -{ - if (_videoPath != value) { - _videoPath = value; - emit videoPathChanged(); - } -} + if ((mux = gst_element_factory_make(_kFileMux[format - FILE_FORMAT_MIN], nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('" << _kFileMux[format - FILE_FORMAT_MIN] << "') failed"; + break; + } -QString VideoReceiver::imagePath() const -{ - return _imagePath; -} + if ((sink = gst_element_factory_make("filesink", nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_factory_make('filesink') failed"; + break; + } -void VideoReceiver::setImagePath(const QString& value) -{ - if (_imagePath != value) { - _imagePath = value; - emit imagePathChanged(); - } -} + g_object_set(static_cast(sink), "location", qPrintable(videoFile), nullptr); -int VideoReceiver::recordingFormatId() const -{ - return _recordingFormatId; -} + if ((bin = gst_bin_new("sinkbin")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_bin_new('sinkbin') failed"; + break; + } -void VideoReceiver::setRecordingFormatId(int value) -{ - if (_recordingFormatId != value && value < (int) NUM_MUXES) { - _recordingFormatId = value; - emit recordingFormatIdChanged(); - } -} + GstPadTemplate* padTemplate; -int VideoReceiver::rtspTimeout() const -{ - return _rtspTimeout; -} + if ((padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_class_get_pad_template(mux) failed"; + break; + } -void VideoReceiver::setRtspTimeout(int value) -{ - if (_rtspTimeout != value) { - _rtspTimeout = value; - emit rtspTimeoutChanged(); + // FIXME: AV: pad handling is potentially leaking (and other similar places too!) + GstPad* pad; + + if ((pad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_request_pad(mux) failed"; + break; + } + + gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr); + + releaseElements = false; + + GstPad* ghostpad = gst_ghost_pad_new("sink", pad); + + gst_element_add_pad(bin, ghostpad); + + gst_object_unref(pad); + pad = nullptr; + + if (!gst_element_link(mux, sink)) { + qCCritical(VideoReceiverLog) << "gst_element_link() failed"; + break; + } + + fileSink = bin; + bin = nullptr; + } while(0); + + if (releaseElements) { + if (sink != nullptr) { + gst_object_unref(sink); + sink = nullptr; + } + + if (mux != nullptr) { + gst_object_unref(mux); + mux = nullptr; + } } -} -void VideoReceiver::setUnittestMode(bool runUnitTests) -{ - _unittTestMode = runUnitTests; -} + if (bin != nullptr) { + gst_object_unref(bin); + bin = nullptr; + } -#endif + return fileSink; +} -//----------------------------------------------------------------------------- -// When we finish our pipeline will look like this: -// -// +-->queue-->decoder-->_videosink -// | -// datasource-->demux-->parser-->tee -// ^ -// | -// +-Here we will later link elements for recording void -VideoReceiver::start() +VideoReceiver::_onNewSourcePad(GstPad* pad) { - qCDebug(VideoReceiverLog) << "Starting " << _uri; - if(_unittTestMode) { + // FIXME: check for caps - if this is not video stream (and preferably - one of these which we have to support) then simply skip it + if(!gst_element_link(_source, _tee)) { + qCCritical(VideoReceiverLog) << "Unable to link source"; return; } - if(!_streamEnabled || !_streamConfigured) { - qCDebug(VideoReceiverLog) << "Stream not enabled/configured"; + + if (!_streaming) { + _streaming = true; + qCDebug(VideoReceiverLog) << "Streaming started"; + emit streamingChanged(); + } + + gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe, this, nullptr); + + if (_videoSink == nullptr) { return; } -#if defined(QGC_GST_STREAMING) - _stop = false; + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-source-pad"); + + if (!_addDecoder(_decoderValve)) { + qCCritical(VideoReceiverLog) << "_addDecoder() failed"; + return; + } + + g_object_set(_decoderValve, "drop", FALSE, nullptr); + + qCDebug(VideoReceiverLog) << "Decoding started"; +} - QString uri = _uri; +void +VideoReceiver::_onNewDecoderPad(GstPad* pad) +{ + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-decoder-pad"); -#if defined(QGC_GST_TAISYNC_ENABLED) && (defined(__android__) || defined(__ios__)) - //-- Taisync on iOS or Android sends a raw h.264 stream - if (_isTaisync) { - uri = QString("tsusb://0.0.0.0:%1").arg(TAISYNC_VIDEO_UDP_PORT); + if (!_addVideoSink(pad)) { + qCCritical(VideoReceiverLog) << "_addVideoSink() failed"; } -#endif +} - if (uri.isEmpty()) { - qCDebug(VideoReceiverLog) << "Failed because URI is not specified"; - return; +bool +VideoReceiver::_addDecoder(GstElement* src) +{ + GstPad* srcpad; + + if ((srcpad = gst_element_get_static_pad(src, "src")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed"; + return false; } - if (_videoSink == nullptr) { - qCWarning(VideoReceiverLog) << "Failed because video sink is not set"; - return; + GstCaps* caps; + + if ((caps = gst_pad_query_caps(srcpad, nullptr)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_pad_query_caps() failed"; + gst_object_unref(srcpad); + srcpad = nullptr; + return false; } - if(_running) { - qCDebug(VideoReceiverLog) << "Already running!"; - return; + + gst_object_unref(srcpad); + srcpad = nullptr; + + if ((_decoder = _makeDecoder(caps, _videoSink)) == nullptr) { + qCCritical(VideoReceiverLog) << "_makeDecoder() failed"; + gst_caps_unref(caps); + caps = nullptr; + return false; } - _starting = true; + gst_object_ref(_decoder); - _lastFrameId = G_MAXUINT64; - _lastFrameTime = 0; + gst_caps_unref(caps); + caps = nullptr; - bool running = false; - bool pipelineUp = false; + // FIXME: AV: check if srcpad exists - if it does then no need to wait for new pad + // int probeRes = 0; + // gst_element_foreach_src_pad(source, _padProbe, &probeRes); + g_signal_connect(_decoder, "pad-added", G_CALLBACK(_onNewPad), this); - GstElement* source = nullptr; - GstElement* queue = nullptr; - GstElement* decoder = nullptr; + gst_bin_add(GST_BIN(_pipeline), _decoder); - do { - if ((_pipeline = gst_pipeline_new("receiver")) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_pipeline_new() failed"; - break; - } + gst_element_sync_state_with_parent(_decoder); - g_object_set(_pipeline, "message-forward", TRUE, nullptr); + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-decoder"); - if ((source = _makeSource(uri)) == nullptr) { - qCCritical(VideoReceiverLog) << "_makeSource() failed"; - break; - } + if (!gst_element_link(src, _decoder)) { + qCCritical(VideoReceiverLog) << "Unable to link decoder"; + return false; + } - if((_tee = gst_element_factory_make("tee", nullptr)) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_element_factory_make('tee') failed"; - break; - } + return true; +} - if((queue = gst_element_factory_make("queue", nullptr)) == nullptr) { - // TODO: We may want to add queue2 max-size-buffers=1 to get lower latency - // We should compare gstreamer scripts to QGroundControl to determine the need - qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed"; - break; - } +bool +VideoReceiver::_addVideoSink(GstPad* pad) +{ + GstCaps* caps = gst_pad_query_caps(pad, nullptr); - if ((decoder = gst_element_factory_make("decodebin", "decoder")) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_element_factory_make('decodebin') failed"; - break; + gst_object_ref(_videoSink); // gst_bin_add() will steal one reference + + gst_bin_add(GST_BIN(_pipeline), _videoSink); + + if(!gst_element_link(_decoder, _videoSink)) { + gst_bin_remove(GST_BIN(_pipeline), _videoSink); + qCCritical(VideoReceiverLog) << "Unable to link video sink"; + if (caps != nullptr) { + gst_caps_unref(caps); + caps = nullptr; } + return false; + } - gst_bin_add_many(GST_BIN(_pipeline), source, _tee, queue, decoder, _videoSink, nullptr); + gst_element_sync_state_with_parent(_videoSink); - pipelineUp = true; + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-videosink"); - g_signal_connect(source, "pad-added", G_CALLBACK(newPadCB), _tee); + if (caps != nullptr) { + GstStructure* s = gst_caps_get_structure(caps, 0); - if(!gst_element_link_many(_tee, queue, decoder, nullptr)) { - qCCritical(VideoReceiverLog) << "Unable to receiver pipeline."; - break; + if (s != nullptr) { + gint width, height; + gst_structure_get_int(s, "width", &width); + gst_structure_get_int(s, "height", &height); + _setVideoSize(QSize(width, height)); } - g_signal_connect(decoder, "pad-added", G_CALLBACK(newPadCB), _videoSink); - g_signal_connect(decoder, "autoplug-query", G_CALLBACK(autoplugQueryCB), _videoSink); + gst_caps_unref(caps); + caps = nullptr; + } else { + _setVideoSize(QSize(0, 0)); + } - source = queue = decoder = nullptr; + _decoding = true; + emit decodingChanged(); - GstBus* bus = nullptr; + return true; +} - if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) { - gst_bus_enable_sync_message_emission(bus); - g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this); - gst_object_unref(bus); - bus = nullptr; - } +void +VideoReceiver::_noteTeeFrame(void) +{ + _lastSourceFrameTime = QDateTime::currentSecsSinceEpoch(); +} - GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-paused"); - running = gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE; +void +VideoReceiver::_noteVideoSinkFrame(void) +{ + _lastVideoFrameTime = QDateTime::currentSecsSinceEpoch(); +} - } while(0); +void +VideoReceiver::_noteEndOfStream(void) +{ + if (_streaming) { + _streaming = false; + } +} - if (!running) { - qCCritical(VideoReceiverLog) << "Failed"; +// -Unlink the branch from the src pad +// -Send an EOS event at the beginning of that branch +void +VideoReceiver::_unlinkBranch(GstElement* from) +{ + GstPad* src; - // In newer versions, the pipeline will clean up all references that are added to it - if (_pipeline != nullptr) { - gst_bin_remove(GST_BIN(_pipeline), _videoSink); - gst_object_unref(_pipeline); - _pipeline = nullptr; - } + if ((src = gst_element_get_static_pad(from, "src")) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed"; + return; + } - // If we failed before adding items to the pipeline, then clean up - if (!pipelineUp) { - if (decoder != nullptr) { - gst_object_unref(decoder); - decoder = nullptr; - } + GstPad* sink; - if (queue != nullptr) { - gst_object_unref(queue); - queue = nullptr; - } + if ((sink = gst_pad_get_peer(src)) == nullptr) { + gst_object_unref(src); + src = nullptr; + qCCritical(VideoReceiverLog) << "gst_pad_get_peer() failed"; + return; + } - if (source != nullptr) { - gst_object_unref(source); - source = nullptr; - } + if (!gst_pad_unlink(src, sink)) { + gst_object_unref(src); + src = nullptr; + gst_object_unref(sink); + sink = nullptr; + qCCritical(VideoReceiverLog) << "gst_pad_unlink() failed"; + return; + } - if (_tee != nullptr) { - gst_object_unref(_tee); - _tee = nullptr; - } + gst_object_unref(src); + src = nullptr; - } + // Send EOS at the beginning of the branch + const gboolean ret = gst_pad_send_event(sink, gst_event_new_eos()); - _running = false; + gst_object_unref(sink); + sink = nullptr; + + if (ret) { + qCDebug(VideoReceiverLog) << "Branch EOS was sent"; } else { - GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-playing"); - _running = true; - qCDebug(VideoReceiverLog) << "Running"; + qCCritical(VideoReceiverLog) << "Branch EOS was NOT sent"; } - _starting = false; -#endif } -//----------------------------------------------------------------------------- void -VideoReceiver::stop() +VideoReceiver::_shutdownDecodingBranch(void) { - if(_unittTestMode) { - return; + if (_decoder != nullptr) { + GstObject* parent; + + if ((parent = gst_element_get_parent(_decoder)) != nullptr) { + gst_bin_remove(GST_BIN(_pipeline), _decoder); + gst_element_set_state(_decoder, GST_STATE_NULL); + gst_object_unref(parent); + parent = nullptr; + } + + gst_object_unref(_decoder); + _decoder = nullptr; } -#if defined(QGC_GST_STREAMING) - _stop = true; - qCDebug(VideoReceiverLog) << "Stopping"; - if(!_streaming) { - _shutdownPipeline(); - } else if (_pipeline != nullptr && !_stopping) { - qCDebug(VideoReceiverLog) << "Stopping _pipeline"; - GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline)); - gst_bus_disable_sync_message_emission(bus); - gst_element_send_event(_pipeline, gst_event_new_eos()); - _stopping = true; - GstMessage* message = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS|GST_MESSAGE_ERROR)); - gst_object_unref(bus); - if(GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) { - _shutdownPipeline(); - qCCritical(VideoReceiverLog) << "Error stopping pipeline!"; - } else if(GST_MESSAGE_TYPE(message) == GST_MESSAGE_EOS) { - _handleEOS(); + + if (_videoSinkProbeId != 0) { + GstPad* sinkpad; + if ((sinkpad = gst_element_get_static_pad(_videoSink, "sink")) != nullptr) { + gst_pad_remove_probe(sinkpad, _videoSinkProbeId); + gst_object_unref(sinkpad); + sinkpad = nullptr; } - gst_message_unref(message); + _videoSinkProbeId = 0; } -#endif + + _lastVideoFrameTime = 0; + + GstObject* parent; + + if ((parent = gst_element_get_parent(_videoSink)) != nullptr) { + gst_bin_remove(GST_BIN(_pipeline), _videoSink); + gst_element_set_state(_videoSink, GST_STATE_NULL); + gst_object_unref(parent); + parent = nullptr; + } + + gst_object_unref(_videoSink); + _videoSink = nullptr; + + _removingDecoder = false; + + if (_decoding) { + _decoding = false; + emit decodingChanged(); + qCDebug(VideoReceiverLog) << "Decoding stopped"; + } + + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-decoding-stopped"); } -//----------------------------------------------------------------------------- void -VideoReceiver::setUri(const QString & uri) +VideoReceiver::_shutdownRecordingBranch(void) { - _uri = uri; -} + gst_bin_remove(GST_BIN(_pipeline), _fileSink); + gst_element_set_state(_fileSink, GST_STATE_NULL); + gst_object_unref(_fileSink); + _fileSink = nullptr; -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) -void -VideoReceiver::_shutdownPipeline() { - if(!_pipeline) { - qCDebug(VideoReceiverLog) << "No pipeline"; - return; + _removingRecorder = false; + + if (_recording) { + _recording = false; + emit recordingChanged(); + qCDebug(VideoReceiverLog) << "Recording stopped"; } - GstBus* bus = nullptr; - if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) { - gst_bus_disable_sync_message_emission(bus); - gst_object_unref(bus); - bus = nullptr; - } - gst_element_set_state(_pipeline, GST_STATE_NULL); - gst_bin_remove(GST_BIN(_pipeline), _videoSink); - _tee = nullptr; - gst_object_unref(_pipeline); - _pipeline = nullptr; - delete _sink; - _sink = nullptr; - _streaming = false; - _recording = false; - _stopping = false; - _running = false; - emit recordingChanged(); + + GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped"); } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) -void -VideoReceiver::_handleError() { - qCDebug(VideoReceiverLog) << "Gstreamer error!"; - stop(); - _restart_timer.start(_restart_time_ms); +bool +VideoReceiver::_isOurThread(void) +{ + return QThread::currentThread() == (QThread*)this; } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) void -VideoReceiver::_handleEOS() { - if(_stopping) { - if(_recording && _sink->removing) { - _shutdownRecordingBranch(); - } - _shutdownPipeline(); - qCDebug(VideoReceiverLog) << "Stopped"; - } else if(_recording && _sink->removing) { - _shutdownRecordingBranch(); - } else { - qCWarning(VideoReceiverLog) << "Unexpected EOS!"; - _handleError(); - } +VideoReceiver::_post(Task t) +{ + QMutexLocker lock(&_taskQueueSync); + _taskQueue.enqueue(t); + _taskQueueUpdate.wakeOne(); } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) void -VideoReceiver::_handleStateChanged() { - if(_pipeline) { - _streaming = GST_STATE(_pipeline) == GST_STATE_PLAYING; - //qCDebug(VideoReceiverLog) << "State changed, _streaming:" << _streaming; +VideoReceiver::run(void) +{ + while(!_shutdown) { + _taskQueueSync.lock(); + + while (_taskQueue.isEmpty()) { + _taskQueueUpdate.wait(&_taskQueueSync); + } + + Task t = _taskQueue.dequeue(); + + _taskQueueSync.unlock(); + + t(); } } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) gboolean VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data) { @@ -809,38 +1162,60 @@ VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data) Q_ASSERT(msg != nullptr && data != nullptr); VideoReceiver* pThis = (VideoReceiver*)data; - switch(GST_MESSAGE_TYPE(msg)) { - case(GST_MESSAGE_ERROR): { - gchar* debug; - GError* error; - gst_message_parse_error(msg, &error, &debug); - g_free(debug); - qCCritical(VideoReceiverLog) << error->message; - g_error_free(error); - pThis->msgErrorReceived(); - } - break; - case(GST_MESSAGE_EOS): - pThis->msgEOSReceived(); + switch (GST_MESSAGE_TYPE(msg)) { + case GST_MESSAGE_ERROR: + do { + gchar* debug; + GError* error; + + gst_message_parse_error(msg, &error, &debug); + + if (debug != nullptr) { + g_free(debug); + debug = nullptr; + } + + if (error != nullptr) { + qCCritical(VideoReceiverLog) << error->message; + g_error_free(error); + error = nullptr; + } + + pThis->_post([pThis](){ + pThis->stop(); + }); + } while(0); break; - case(GST_MESSAGE_STATE_CHANGED): - pThis->msgStateChangedReceived(); + case GST_MESSAGE_EOS: + pThis->_post([pThis](){ + pThis->_handleEOS(); + }); break; - case(GST_MESSAGE_ELEMENT): { - const GstStructure *s = gst_message_get_structure (msg); - - if (gst_structure_has_name (s, "GstBinForwarded")) { - GstMessage *forward_msg = NULL; - gst_structure_get (s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL); - if (forward_msg != nullptr) { - if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) { - pThis->msgEOSReceived(); - } - gst_message_unref(forward_msg); - forward_msg = nullptr; + case GST_MESSAGE_ELEMENT: + do { + const GstStructure* s = gst_message_get_structure (msg); + + if (!gst_structure_has_name (s, "GstBinForwarded")) { + break; } - } - } + + GstMessage* forward_msg = nullptr; + + gst_structure_get(s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL); + + if (forward_msg == nullptr) { + break; + } + + if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) { + pThis->_post([pThis](){ + pThis->_handleEOS(); + }); + } + + gst_message_unref(forward_msg); + forward_msg = nullptr; + } while(0); break; default: break; @@ -848,393 +1223,309 @@ VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data) return TRUE; } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) void -VideoReceiver::setVideoSink(GstElement* videoSink) +VideoReceiver::_onNewPad(GstElement* element, GstPad* pad, gpointer data) +{ + VideoReceiver* self = static_cast(data); + + if (element == self->_source) { + self->_onNewSourcePad(pad); + } else if (element == self->_decoder) { + self->_onNewDecoderPad(pad); + } else { + qCDebug(VideoReceiverLog) << "Unexpected call!"; + } +} + +void +VideoReceiver::_wrapWithGhostPad(GstElement* element, GstPad* pad, gpointer data) { - if(_pipeline != nullptr) { - qCDebug(VideoReceiverLog) << "Video receiver pipeline is active, video sink change is not possible"; + gchar* name; + + if ((name = gst_pad_get_name(pad)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_pad_get_name() failed"; return; } - if (_videoSink != nullptr) { - gst_object_unref(_videoSink); - _videoSink = nullptr; + GstPad* ghostpad; + + if ((ghostpad = gst_ghost_pad_new(name, pad)) == nullptr) { + qCCritical(VideoReceiverLog) << "gst_ghost_pad_new() failed"; + g_free(name); + name = nullptr; + return; } - if (videoSink != nullptr) { - _videoSink = videoSink; - gst_object_ref(_videoSink); + g_free(name); + name = nullptr; - GstPad* pad = gst_element_get_static_pad(_videoSink, "sink"); + gst_pad_set_active(ghostpad, TRUE); - if (pad != nullptr) { - gst_pad_add_probe(pad, (GstPadProbeType)(GST_PAD_PROBE_TYPE_BUFFER), _videoSinkProbe, this, nullptr); - gst_object_unref(pad); - pad = nullptr; - } else { - qCCritical(VideoReceiverLog) << "Unable to find sink pad of video sink"; - } + if (!gst_element_add_pad(GST_ELEMENT_PARENT(element), ghostpad)) { + qCCritical(VideoReceiverLog) << "gst_element_add_pad() failed"; } } -#endif -//----------------------------------------------------------------------------- -// When we finish our pipeline will look like this: -// -// +-->queue-->decoder-->_videosink -// | -// source-->tee -// | -// | +---------_sink----------+ -// | | | -// we are adding these elements-> +->teepad-->queue-->_filesink | -// | | -// +------------------------+ -#if defined(QGC_GST_STREAMING) -GstElement* -VideoReceiver::_makeFileSink(const QString& videoFile, unsigned format) +void +VideoReceiver::_linkPadWithOptionalBuffer(GstElement* element, GstPad* pad, gpointer data) { - GstElement* fileSink = nullptr; - GstElement* mux = nullptr; - GstElement* sink = nullptr; - GstElement* bin = nullptr; - bool releaseElements = true; - - do{ - if ((mux = gst_element_factory_make(kVideoMuxes[format], nullptr)) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_element_factory_make('" << kVideoMuxes[format] << "') failed"; - break; - } + bool isRtpPad = false; - if ((sink = gst_element_factory_make("filesink", nullptr)) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_element_factory_make('filesink') failed"; - break; - } - - g_object_set(static_cast(sink), "location", qPrintable(videoFile), nullptr); - - if ((bin = gst_bin_new("sinkbin")) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_bin_new('sinkbin') failed"; - break; - } + GstCaps* filter; - GstPadTemplate* padTemplate; + if ((filter = gst_caps_from_string("application/x-rtp")) != nullptr) { + GstCaps* caps = gst_pad_query_caps(pad, nullptr); - if ((padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u")) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_element_class_get_pad_template(mux) failed"; - break; + if (caps != nullptr) { + if (!gst_caps_is_any(caps) && gst_caps_can_intersect(caps, filter)) { + isRtpPad = true; + } + gst_caps_unref(caps); + caps = nullptr; } - // FIXME: AV: pad handling is potentially leaking (and other similar places too!) - GstPad* pad; + gst_caps_unref(filter); + filter = nullptr; + } - if ((pad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr)) == nullptr) { - qCCritical(VideoReceiverLog) << "gst_element_request_pad(mux) failed"; - break; - } + if (isRtpPad) { + GstElement* buffer; - gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr); + if ((buffer = gst_element_factory_make("rtpjitterbuffer", nullptr)) != nullptr) { + gst_bin_add(GST_BIN(GST_ELEMENT_PARENT(element)), buffer); - releaseElements = false; + gst_element_sync_state_with_parent(buffer); - GstPad* ghostpad = gst_ghost_pad_new("sink", pad); + GstPad* sinkpad = gst_element_get_static_pad(buffer, "sink"); - gst_element_add_pad(bin, ghostpad); + if (sinkpad != nullptr) { + const GstPadLinkReturn ret = gst_pad_link(pad, sinkpad); - gst_object_unref(pad); - pad = nullptr; + gst_object_unref(sinkpad); + sinkpad = nullptr; - if (!gst_element_link(mux, sink)) { - qCCritical(VideoReceiverLog) << "gst_element_link() failed"; - break; + if (ret == GST_PAD_LINK_OK) { + pad = gst_element_get_static_pad(buffer, "src"); + element = buffer; + } else { + qCDebug(VideoReceiverLog) << "Partially failed - gst_pad_link()"; + } + } else { + qCDebug(VideoReceiverLog) << "Partially failed - gst_element_get_static_pad()"; + } + } else { + qCDebug(VideoReceiverLog) << "Partially failed - gst_element_factory_make('rtpjitterbuffer')"; } + } - fileSink = bin; - bin = nullptr; - } while(0); - - if (releaseElements) { - if (sink != nullptr) { - gst_object_unref(sink); - sink = nullptr; - } + gchar* name; - if (mux != nullptr) { - gst_object_unref(mux); - mux = nullptr; + if ((name = gst_pad_get_name(pad)) != nullptr) { + if(gst_element_link_pads(element, name, GST_ELEMENT(data), "sink") == false) { + qCCritical(VideoReceiverLog) << "gst_element_link_pads() failed"; } - } - if (bin != nullptr) { - gst_object_unref(bin); - bin = nullptr; + g_free(name); + name = nullptr; + } else { + qCCritical(VideoReceiverLog) << "gst_pad_get_name() failed"; } - - return fileSink; } -#endif -void -VideoReceiver::startRecording(const QString &videoFile) +gboolean +VideoReceiver::_padProbe(GstElement* element, GstPad* pad, gpointer user_data) { -#if defined(QGC_GST_STREAMING) - emit beforeRecording(); + int* probeRes = (int*)user_data; - qCDebug(VideoReceiverLog) << "Starting recording"; - // exit immediately if we are already recording - if(_pipeline == nullptr || _recording) { - qCDebug(VideoReceiverLog) << "Already recording!"; - return; - } + *probeRes |= 1; - uint32_t muxIdx = _recordingFormatId; - if(muxIdx >= NUM_MUXES) { - emit sendMessage(tr("Invalid video format defined.")); - return; - } + GstCaps* filter = gst_caps_from_string("application/x-rtp"); - QString savePath = _videoPath; - if(savePath.isEmpty()) { - emit sendMessage(tr("Unabled to record video. Video save path must be specified in Settings.")); - return; - } + if (filter != nullptr) { + GstCaps* caps = gst_pad_query_caps(pad, nullptr); - _videoFile = savePath + "/" - + (videoFile.isEmpty() ? QDateTime::currentDateTime().toString("yyyy-MM-dd_hh.mm.ss") : videoFile) - + "." + kVideoExtensions[muxIdx]; + if (caps != nullptr) { + if (!gst_caps_is_any(caps) && gst_caps_can_intersect(caps, filter)) { + *probeRes |= 2; + } + + gst_caps_unref(caps); + caps = nullptr; + } + + gst_caps_unref(filter); + filter = nullptr; + } - qCDebug(VideoReceiverLog) << "New video file:" << _videoFile; + return TRUE; +} - emit videoFileChanged(); +gboolean +VideoReceiver::_autoplugQueryCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data) +{ + GstElement* glupload = (GstElement* )data; - _sink = new Sink(); - _sink->teepad = gst_element_get_request_pad(_tee, "src_%u"); - _sink->queue = gst_element_factory_make("queue", nullptr); - _sink->filesink = _makeFileSink(_videoFile, muxIdx); - _sink->removing = false; + GstPad* sinkpad; - if(!_sink->teepad || !_sink->queue || !_sink->filesink) { - qCCritical(VideoReceiverLog) << "Failed to make _sink elements"; - return; + if ((sinkpad = gst_element_get_static_pad(glupload, "sink")) == nullptr) { + qCCritical(VideoReceiverLog) << "No sink pad found"; + return FALSE; } - gst_object_ref(_sink->queue); - gst_object_ref(_sink->filesink); + GstCaps* filter; - gst_bin_add(GST_BIN(_pipeline), _sink->queue); + gst_query_parse_caps(query, &filter); - gst_element_sync_state_with_parent(_sink->queue); + GstCaps* sinkcaps = gst_pad_query_caps(sinkpad, filter); - // Install a probe on the recording branch to drop buffers until we hit our first keyframe - // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time - // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback - // Once we have this valid frame, we attach the filesink. - // Attaching it here would cause the filesink to fail to preroll and to stall the pipeline for a few seconds. - GstPad* probepad = gst_element_get_static_pad(_sink->queue, "src"); - gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr); // to drop the buffer - gst_object_unref(probepad); + gst_query_set_caps_result(query, sinkcaps); - // Link the recording branch to the pipeline - GstPad* sinkpad = gst_element_get_static_pad(_sink->queue, "sink"); - gst_pad_link(_sink->teepad, sinkpad); - gst_object_unref(sinkpad); + const gboolean ret = !gst_caps_is_empty(sinkcaps); -// // Add the filesink once we have a valid I-frame -// gst_bin_add(GST_BIN(_pipeline), _sink->filesink); -// if (!gst_element_link(_sink->queue, _sink->filesink)) { -// qCritical() << "Failed to link queue and file sink"; -// } -// gst_element_sync_state_with_parent(_sink->filesink); + gst_caps_unref(sinkcaps); + sinkcaps = nullptr; - GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording"); + gst_object_unref(sinkpad); + sinkpad = nullptr; - _recording = true; - emit recordingChanged(); - qCDebug(VideoReceiverLog) << "Recording started"; -#else - Q_UNUSED(videoFile) -#endif + return ret; } -//----------------------------------------------------------------------------- -void -VideoReceiver::stopRecording(void) +gboolean +VideoReceiver::_autoplugQueryContext(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data) { -#if defined(QGC_GST_STREAMING) - qCDebug(VideoReceiverLog) << "Stopping recording"; - // exit immediately if we are not recording - if(_pipeline == nullptr || !_recording) { - qCDebug(VideoReceiverLog) << "Not recording!"; - return; - } - // Wait for data block before unlinking - gst_pad_add_probe(_sink->teepad, GST_PAD_PROBE_TYPE_IDLE, _unlinkCallBack, this, nullptr); -#endif -} + GstElement* glsink = (GstElement* )data; -//----------------------------------------------------------------------------- -// This is only installed on the transient _pipelineStopRec in order -// to finalize a video file. It is not used for the main _pipeline. -// -EOS has appeared on the bus of the temporary pipeline -// -At this point all of the recoring elements have been flushed, and the video file has been finalized -// -Now we can remove the temporary pipeline and its elements -#if defined(QGC_GST_STREAMING) -void -VideoReceiver::_shutdownRecordingBranch() -{ - gst_bin_remove(GST_BIN(_pipeline), _sink->queue); - gst_bin_remove(GST_BIN(_pipeline), _sink->filesink); + GstPad* sinkpad; - gst_element_set_state(_sink->queue, GST_STATE_NULL); - gst_element_set_state(_sink->filesink, GST_STATE_NULL); + if ((sinkpad = gst_element_get_static_pad(glsink, "sink")) == nullptr){ + qCCritical(VideoReceiverLog) << "No sink pad found"; + return FALSE; + } - gst_object_unref(_sink->queue); - gst_object_unref(_sink->filesink); + const gboolean ret = gst_pad_query(sinkpad, query); - delete _sink; - _sink = nullptr; - _recording = false; + gst_object_unref(sinkpad); + sinkpad = nullptr; - emit recordingChanged(); - qCDebug(VideoReceiverLog) << "Recording stopped"; + return ret; } -#endif -//----------------------------------------------------------------------------- -// -Unlink the recording branch from the tee in the main _pipeline -// -Create a second temporary pipeline, and place the recording branch elements into that pipeline -// -Setup watch and handler for EOS event on the temporary pipeline's bus -// -Send an EOS event at the beginning of that pipeline -#if defined(QGC_GST_STREAMING) -void -VideoReceiver::_unlinkRecordingBranch(GstPadProbeInfo* info) +gboolean +VideoReceiver::_autoplugQuery(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data) { - Q_UNUSED(info) - // Send EOS at the beginning of the pipeline - GstPad* sinkpad = gst_element_get_static_pad(_sink->queue, "sink"); - gst_pad_unlink(_sink->teepad, sinkpad); - gst_pad_send_event(sinkpad, gst_event_new_eos()); - gst_object_unref(sinkpad); - qCDebug(VideoReceiverLog) << "Recording EOS was sent"; - // Give tee its pad back - gst_element_release_request_pad(_tee, _sink->teepad); - gst_object_unref(_sink->teepad); + gboolean ret; + + switch (GST_QUERY_TYPE(query)) { + case GST_QUERY_CAPS: + ret = _autoplugQueryCaps(bin, pad, element, query, data); + break; + case GST_QUERY_CONTEXT: + ret = _autoplugQueryContext(bin, pad, element, query, data); + break; + default: + ret = FALSE; + break; + } + + return ret; } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) GstPadProbeReturn -VideoReceiver::_unlinkCallBack(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) +VideoReceiver::_teeProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) { Q_UNUSED(pad); - if(info != nullptr && user_data != nullptr) { + Q_UNUSED(info) + + if(user_data != nullptr) { VideoReceiver* pThis = static_cast(user_data); - // We will only act once - if(g_atomic_int_compare_and_exchange(&pThis->_sink->removing, FALSE, TRUE)) { - pThis->_unlinkRecordingBranch(info); - } + pThis->_noteTeeFrame(); } - return GST_PAD_PROBE_REMOVE; + + return GST_PAD_PROBE_OK; } -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) GstPadProbeReturn VideoReceiver::_videoSinkProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) { - Q_UNUSED(pad); - if(info != nullptr && user_data != nullptr) { + if(user_data != nullptr) { VideoReceiver* pThis = static_cast(user_data); + + if (pThis->_resetVideoSink) { + pThis->_resetVideoSink = false; + +// FIXME: AV: this makes MPEG2-TS playing smooth but breaks RTSP +// gst_pad_send_event(pad, gst_event_new_flush_start()); +// gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE)); + +// GstBuffer* buf; + +// if ((buf = gst_pad_probe_info_get_buffer(info)) != nullptr) { +// GstSegment* seg; + +// if ((seg = gst_segment_new()) != nullptr) { +// gst_segment_init(seg, GST_FORMAT_TIME); + +// seg->start = buf->pts; + +// gst_pad_send_event(pad, gst_event_new_segment(seg)); + +// gst_segment_free(seg); +// seg = nullptr; +// } + +// gst_pad_set_offset(pad, -static_cast(buf->pts)); +// } + } + pThis->_noteVideoSinkFrame(); } return GST_PAD_PROBE_OK; } -#endif - -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) -void -VideoReceiver::_noteVideoSinkFrame() -{ - _lastFrameTime = QDateTime::currentSecsSinceEpoch(); -} -#endif -//----------------------------------------------------------------------------- -#if defined(QGC_GST_STREAMING) GstPadProbeReturn -VideoReceiver::_keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) +VideoReceiver::_eosProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) { Q_UNUSED(pad); - if(info != nullptr && user_data != nullptr) { - GstBuffer* buf = gst_pad_probe_info_get_buffer(info); - if(GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) { // wait for a keyframe - return GST_PAD_PROBE_DROP; - } else { - VideoReceiver* pThis = static_cast(user_data); + Q_ASSERT(user_data != nullptr); - // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream! - gst_pad_set_offset(pad, -buf->pts); + if(info != nullptr) { + GstEvent* event = gst_pad_probe_info_get_event(info); - // Add the filesink once we have a valid I-frame - gst_bin_add(GST_BIN(pThis->_pipeline), pThis->_sink->filesink); - if (!gst_element_link(pThis->_sink->queue, pThis->_sink->filesink)) { - qCCritical(VideoReceiverLog) << "Failed to link queue and file sink"; - } - gst_element_sync_state_with_parent(pThis->_sink->filesink); - - qCDebug(VideoReceiverLog) << "Got keyframe, stop dropping buffers"; - pThis->gotFirstRecordingKeyFrame(); + if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) { + VideoReceiver* pThis = static_cast(user_data); + pThis->_noteEndOfStream(); } } - return GST_PAD_PROBE_REMOVE; + return GST_PAD_PROBE_OK; } -#endif -//----------------------------------------------------------------------------- -void -VideoReceiver::_updateTimer() +GstPadProbeReturn +VideoReceiver::_keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_data) { -#if defined(QGC_GST_STREAMING) - if(_stopping || _starting) { - return; + if (info == nullptr || user_data == nullptr) { + qCCritical(VideoReceiverLog) << "Invalid arguments"; + return GST_PAD_PROBE_DROP; } - if(_streaming) { - if(!_videoRunning) { - _videoRunning = true; - emit videoRunningChanged(); - } - } else { - if(_videoRunning) { - _videoRunning = false; - emit videoRunningChanged(); - } + GstBuffer* buf = gst_pad_probe_info_get_buffer(info); + + if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) { // wait for a keyframe + return GST_PAD_PROBE_DROP; } - if(_videoRunning) { - uint32_t timeout = _rtspTimeout; - const qint64 now = QDateTime::currentSecsSinceEpoch(); + // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream! + gst_pad_set_offset(pad, -static_cast(buf->pts)); - if(now - _lastFrameTime > timeout) { - stop(); - // We want to start it back again with _updateTimer - _stop = false; - } - } else { - // FIXME: AV: if pipeline is _running but not _streaming for some time then we need to restart - if(!_stop && !_running && !_uri.isEmpty() && _streamEnabled) { - start(); - } - } -#endif -} + VideoReceiver* pThis = static_cast(user_data); + + qCDebug(VideoReceiverLog) << "Got keyframe, stop dropping buffers"; + pThis->recordingStarted(); + + return GST_PAD_PROBE_REMOVE; +} +#endif diff --git a/src/VideoStreaming/VideoReceiver.h b/src/VideoStreaming/VideoReceiver.h index dac93c3ac..d30e902bd 100644 --- a/src/VideoStreaming/VideoReceiver.h +++ b/src/VideoStreaming/VideoReceiver.h @@ -17,183 +17,159 @@ #include "QGCLoggingCategory.h" #include +#include #include #include +#include +#include +#include +#include #if defined(QGC_GST_STREAMING) #include +typedef GstElement VideoSink; +#else +typedef void VideoSink; #endif Q_DECLARE_LOGGING_CATEGORY(VideoReceiverLog) -class VideoSettings; - -class VideoReceiver : public QObject +class VideoReceiver : public QThread { Q_OBJECT -public: - -#if defined(QGC_GST_STREAMING) - Q_PROPERTY(bool recording READ recording NOTIFY recordingChanged) -#endif - Q_PROPERTY(bool videoRunning READ videoRunning NOTIFY videoRunningChanged) - Q_PROPERTY(QString imageFile READ imageFile NOTIFY imageFileChanged) - Q_PROPERTY(QString videoFile READ videoFile NOTIFY videoFileChanged) - Q_PROPERTY(QString imagePath READ imagePath NOTIFY imagePathChanged) - Q_PROPERTY(QString videoPath READ videoPath NOTIFY videoPathChanged) - - Q_PROPERTY(bool showFullScreen READ showFullScreen WRITE setShowFullScreen NOTIFY showFullScreenChanged) - Q_PROPERTY(bool streamEnabled READ streamEnabled WRITE setStreamEnabled NOTIFY streamEnabledChanged) - Q_PROPERTY(bool streamConfigured READ streamConfigured WRITE setStreamConfigured NOTIFY streamConfiguredChanged) - Q_PROPERTY(bool isTaisync READ isTaisync WRITE setIsTaysinc NOTIFY isTaisyncChanged) - - Q_PROPERTY(int recordingFormatId READ recordingFormatId WRITE setRecordingFormatId NOTIFY recordingFormatIdChanged) - Q_PROPERTY(int rtspTimeout READ rtspTimeout WRITE setRtspTimeout NOTIFY rtspTimeoutChanged) +public: explicit VideoReceiver(QObject* parent = nullptr); - ~VideoReceiver(); - - bool streamEnabled() const; - Q_SLOT void setStreamEnabled(bool enabled); - Q_SIGNAL void streamEnabledChanged(); - - bool streamConfigured() const; - Q_SLOT void setStreamConfigured(bool enabled); - Q_SIGNAL void streamConfiguredChanged(); - - bool isTaisync() const; - Q_SLOT void setIsTaysinc(bool value); - Q_SIGNAL void isTaisyncChanged(); - - QString videoPath() const; - Q_SLOT void setVideoPath(const QString& path); - Q_SIGNAL void videoPathChanged(); - - QString imagePath() const; - Q_SLOT void setImagePath(const QString& path); - Q_SIGNAL void imagePathChanged(); - - int recordingFormatId() const; - Q_SLOT void setRecordingFormatId(int value); - Q_SIGNAL void recordingFormatIdChanged(); - - int rtspTimeout() const; - Q_SLOT void setRtspTimeout(int value); - Q_SIGNAL void rtspTimeoutChanged(); - - Q_SIGNAL void restartTimeout(); - Q_SIGNAL void sendMessage(const QString& message); - - // Emitted before recording starts. - Q_SIGNAL void beforeRecording(); - void setUnittestMode(bool runUnitTests); -#if defined(QGC_GST_STREAMING) - virtual bool recording () { return _recording; } -#endif - - virtual bool videoRunning () { return _videoRunning; } - virtual QString imageFile () { return _imageFile; } - virtual QString videoFile () { return _videoFile; } - virtual bool showFullScreen () { return _showFullScreen; } - - virtual void grabImage (QString imageFile); - - virtual void setShowFullScreen (bool show) { _showFullScreen = show; emit showFullScreenChanged(); } - -#if defined(QGC_GST_STREAMING) - void setVideoSink (GstElement* videoSink); -#endif + ~VideoReceiver(void); + + typedef enum { + FILE_FORMAT_MIN = 0, + FILE_FORMAT_MKV = FILE_FORMAT_MIN, + FILE_FORMAT_MOV, + FILE_FORMAT_MP4, + FILE_FORMAT_MAX + } FILE_FORMAT; + + Q_PROPERTY(bool streaming READ streaming NOTIFY streamingChanged) + Q_PROPERTY(bool decoding READ decoding NOTIFY decodingChanged) + Q_PROPERTY(bool recording READ recording NOTIFY recordingChanged) + Q_PROPERTY(QSize videoSize READ videoSize NOTIFY videoSizeChanged) + + bool streaming(void) { + return _streaming; + } + + bool decoding(void) { + return _decoding; + } + + bool recording(void) { + return _recording; + } + + QSize videoSize(void) { + const quint32 size = _videoSize; + return QSize((size >> 16) & 0xFFFF, size & 0xFFFF); + } signals: - void videoRunningChanged (); - void imageFileChanged (); - void videoFileChanged (); - void showFullScreenChanged (); -#if defined(QGC_GST_STREAMING) - void recordingChanged (); - void msgErrorReceived (); - void msgEOSReceived (); - void msgStateChangedReceived (); - void gotFirstRecordingKeyFrame (); -#endif + void timeout(void); + void streamingChanged(void); + void decodingChanged(void); + void recordingChanged(void); + void recordingStarted(void); + void videoSizeChanged(void); + void screenshotComplete(void); public slots: - virtual void start (); - virtual void stop (); - virtual void setUri (const QString& uri); - virtual void stopRecording (); - virtual void startRecording (const QString& videoFile = QString()); + virtual void start(const QString& uri, unsigned timeout); + virtual void stop(void); + virtual void startDecoding(VideoSink* videoSink); + virtual void stopDecoding(void); + virtual void startRecording(const QString& videoFile, FILE_FORMAT format); + virtual void stopRecording(void); + virtual void takeScreenshot(const QString& imageFile); -protected slots: - virtual void _updateTimer (); #if defined(QGC_GST_STREAMING) - GstElement* _makeSource (const QString& uri); - GstElement* _makeFileSink (const QString& videoFile, unsigned format); - virtual void _handleError (); - virtual void _handleEOS (); - virtual void _handleStateChanged (); -#endif +protected slots: + virtual void _watchdog(void); + virtual void _handleEOS(void); protected: -#if defined(QGC_GST_STREAMING) - - typedef struct - { - GstPad* teepad; - GstElement* queue; - GstElement* filesink; - gboolean removing; - } Sink; - - bool _running; - bool _recording; - bool _streaming; - bool _starting; - bool _stopping; - bool _stop; - Sink* _sink; + void _setVideoSize(const QSize& size) { + _videoSize = ((quint32)size.width() << 16) | (quint32)size.height(); + emit videoSizeChanged(); + } + + virtual GstElement* _makeSource(const QString& uri); + virtual GstElement* _makeDecoder(GstCaps* caps, GstElement* videoSink); + virtual GstElement* _makeFileSink(const QString& videoFile, FILE_FORMAT format); + + virtual void _onNewSourcePad(GstPad* pad); + virtual void _onNewDecoderPad(GstPad* pad); + virtual bool _addDecoder(GstElement* src); + virtual bool _addVideoSink(GstPad* pad); + virtual void _noteTeeFrame(void); + virtual void _noteVideoSinkFrame(void); + virtual void _noteEndOfStream(void); + virtual void _unlinkBranch(GstElement* from); + virtual void _shutdownDecodingBranch (void); + virtual void _shutdownRecordingBranch(void); + + typedef std::function Task; + bool _isOurThread(void); + void _post(Task t); + void run(void); + +private: + static gboolean _onBusMessage(GstBus* bus, GstMessage* message, gpointer user_data); + static void _onNewPad(GstElement* element, GstPad* pad, gpointer data); + static void _wrapWithGhostPad(GstElement* element, GstPad* pad, gpointer data); + static void _linkPadWithOptionalBuffer(GstElement* element, GstPad* pad, gpointer data); + static gboolean _padProbe(GstElement* element, GstPad* pad, gpointer user_data); + static gboolean _autoplugQueryCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data); + static gboolean _autoplugQueryContext(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data); + static gboolean _autoplugQuery(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data); + static GstPadProbeReturn _teeProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data); + static GstPadProbeReturn _videoSinkProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data); + static GstPadProbeReturn _eosProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data); + static GstPadProbeReturn _keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_data); + + bool _removingDecoder; + bool _removingRecorder; + GstElement* _source; GstElement* _tee; + GstElement* _decoderValve; + GstElement* _recorderValve; + GstElement* _decoder; + GstElement* _videoSink; + GstElement* _fileSink; + GstElement* _pipeline; - void _noteVideoSinkFrame (); + qint64 _lastSourceFrameTime; + qint64 _lastVideoFrameTime; + bool _resetVideoSink; + gulong _videoSinkProbeId; - static gboolean _onBusMessage (GstBus* bus, GstMessage* message, gpointer user_data); - static GstPadProbeReturn _unlinkCallBack (GstPad* pad, GstPadProbeInfo* info, gpointer user_data); - static GstPadProbeReturn _videoSinkProbe (GstPad* pad, GstPadProbeInfo* info, gpointer user_data); - static GstPadProbeReturn _keyframeWatch (GstPad* pad, GstPadProbeInfo* info, gpointer user_data); + QTimer _watchdogTimer; - virtual void _unlinkRecordingBranch (GstPadProbeInfo* info); - virtual void _shutdownRecordingBranch(); - virtual void _shutdownPipeline (); + //-- RTSP UDP reconnect timeout + uint64_t _udpReconnect_us; - GstElement* _pipeline; - GstElement* _videoSink; - guint64 _lastFrameId; - qint64 _lastFrameTime; + unsigned _timeout; - //-- Wait for Video Server to show up before starting - QTimer _frameTimer; - QTimer _restart_timer; - int _restart_time_ms; + QWaitCondition _taskQueueUpdate; + QMutex _taskQueueSync; + QQueue _taskQueue; + bool _shutdown; - //-- RTSP UDP reconnect timeout - uint64_t _udpReconnect_us; + static const char* _kFileMux[FILE_FORMAT_MAX - FILE_FORMAT_MIN]; +#else +private: #endif - QString _uri; - QString _imageFile; - QString _videoFile; - QString _videoPath; - QString _imagePath; - - bool _videoRunning; - bool _showFullScreen; - bool _streamEnabled; - bool _streamConfigured; - bool _storageLimit; - bool _unittTestMode; - bool _isTaisync; - int _recordingFormatId; // 0 - 2, defined in VideoReceiver.cc / kVideoExtensions. TODO: use a better representation. - int _rtspTimeout; - + std::atomic _streaming; + std::atomic _decoding; + std::atomic _recording; + std::atomic_videoSize; }; - diff --git a/src/VideoStreaming/gstqgcvideosinkbin.c b/src/VideoStreaming/gstqgcvideosinkbin.c index 28e6dc207..943e66821 100644 --- a/src/VideoStreaming/gstqgcvideosinkbin.c +++ b/src/VideoStreaming/gstqgcvideosinkbin.c @@ -125,6 +125,9 @@ _vsb_init(GstQgcVideoSinkBin *vsb) break; } + // FIXME: AV: temporally disable sync due to MPEG2-TS sync issues + g_object_set(vsb->qmlglsink, "sync", FALSE, NULL); + if ((glcolorconvert = gst_element_factory_make("glcolorconvert", NULL)) == NULL) { GST_ERROR_OBJECT(vsb, "gst_element_factory_make('glcolorconvert' failed)"); break; -- 2.22.0