From fcf9d0dd23caa2648d988d119ce7e0e256ec5234 Mon Sep 17 00:00:00 2001
From: Andrew Voznytsa
Date: Tue, 31 Mar 2020 13:49:19 +0300
Subject: [PATCH] emit signals from slot handler worker

---
 src/VideoReceiver/GstVideoReceiver.cc | 113 ++++++++++++++------
 src/VideoReceiver/GstVideoReceiver.h  |   7 +-
 2 files changed, 64 insertions(+), 56 deletions(-)

diff --git a/src/VideoReceiver/GstVideoReceiver.cc b/src/VideoReceiver/GstVideoReceiver.cc
index f350304c8..6e6ea4085 100644
--- a/src/VideoReceiver/GstVideoReceiver.cc
+++ b/src/VideoReceiver/GstVideoReceiver.cc
@@ -50,10 +50,10 @@ GstVideoReceiver::GstVideoReceiver(QObject* parent)
     , _resetVideoSink(true)
     , _videoSinkProbeId(0)
     , _udpReconnect_us(5000000)
+    , _signalDepth(0)
     , _endOfStream(false)
 {
-    _apiHandler.start();
-    _notificationHandler.start();
+    _slotHandler.start();
     connect(&_watchdogTimer, &QTimer::timeout, this, &GstVideoReceiver::_watchdog);
     _watchdogTimer.start(1000);
 }
@@ -61,16 +61,15 @@ GstVideoReceiver::GstVideoReceiver(QObject* parent)
 GstVideoReceiver::~GstVideoReceiver(void)
 {
     stop();
-    _notificationHandler.shutdown();
-    _apiHandler.shutdown();
+    _slotHandler.shutdown();
 }
 
 void
 GstVideoReceiver::start(const QString& uri, unsigned timeout)
 {
-    if (_apiHandler.needDispatch()) {
+    if (_needDispatch()) {
         QString cachedUri = uri;
-        _apiHandler.dispatch([this, cachedUri, timeout]() {
+        _slotHandler.dispatch([this, cachedUri, timeout]() {
             start(cachedUri, timeout);
         });
         return;
@@ -78,7 +77,7 @@ GstVideoReceiver::start(const QString& uri, unsigned timeout)
     if(_pipeline) {
         qCDebug(VideoReceiverLog) << "Already running!";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartComplete(STATUS_INVALID_STATE);
         });
         return;
     }
@@ -86,7 +85,7 @@ GstVideoReceiver::start(const QString& uri, unsigned timeout)
     if (uri.isEmpty()) {
         qCDebug(VideoReceiverLog) << "Failed because URI is not specified";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartComplete(STATUS_INVALID_URL);
         });
         return;
     }
@@ -231,14 +230,14 @@ GstVideoReceiver::start(const QString& uri, unsigned timeout)
             }
         }
 
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartComplete(STATUS_FAIL);
         });
     } else {
         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started");
         qCDebug(VideoReceiverLog) << "Started";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartComplete(STATUS_OK);
         });
     }
 }
@@ -247,8 +246,8 @@
 void
 GstVideoReceiver::stop(void)
 {
-    if (_apiHandler.needDispatch()) {
-        _apiHandler.dispatch([this]() {
+    if (_needDispatch()) {
+        _slotHandler.dispatch([this]() {
             stop();
         });
         return;
@@ -319,12 +318,12 @@ GstVideoReceiver::stop(void)
         if (_streaming) {
             _streaming = false;
             qCDebug(VideoReceiverLog) << "Streaming stopped";
-            _dispatchNotification([this](){
+            _dispatchSignal([this](){
                 emit streamingChanged();
             });
         } else {
             qCDebug(VideoReceiverLog) << "Streaming did not start";
-            _dispatchNotification([this](){
+            _dispatchSignal([this](){
                 emit timeout();
             });
         }
@@ -332,7 +331,7 @@ GstVideoReceiver::stop(void)
 
     qCDebug(VideoReceiverLog) << "Stopped";
 
-    _dispatchNotification([this](){
+    _dispatchSignal([this](){
         emit onStopComplete(STATUS_OK);
     });
 }
@@ -345,10 +344,10 @@ GstVideoReceiver::startDecoding(void* sink)
         return;
     }
 
-    if (_apiHandler.needDispatch()) {
+    if (_needDispatch()) {
         GstElement* videoSink = GST_ELEMENT(sink);
         gst_object_ref(videoSink);
-        _apiHandler.dispatch([this, videoSink]() mutable {
+        _slotHandler.dispatch([this, videoSink]() mutable {
             startDecoding(videoSink);
             gst_object_unref(videoSink);
         });
@@ -368,7 +367,7 @@ GstVideoReceiver::startDecoding(void* sink)
     if(_videoSink != nullptr || _decoding) {
         qCDebug(VideoReceiverLog) << "Already decoding!";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartDecodingComplete(STATUS_INVALID_STATE);
         });
         return;
     }
@@ -378,7 +377,7 @@ GstVideoReceiver::startDecoding(void* sink)
     if ((pad = gst_element_get_static_pad(videoSink, "sink")) == nullptr) {
         qCCritical(VideoReceiverLog) << "Unable to find sink pad of video sink";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartDecodingComplete(STATUS_FAIL);
         });
         return;
     }
@@ -397,7 +396,7 @@ GstVideoReceiver::startDecoding(void* sink)
     _removingDecoder = false;
 
     if (!_streaming) {
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartDecodingComplete(STATUS_OK);
         });
         return;
     }
@@ -405,7 +404,7 @@ GstVideoReceiver::startDecoding(void* sink)
     if (!_addDecoder(_decoderValve)) {
         qCCritical(VideoReceiverLog) << "_addDecoder() failed";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartDecodingComplete(STATUS_FAIL);
         });
         return;
     }
@@ -415,7 +414,7 @@ GstVideoReceiver::startDecoding(void* sink)
 
     qCDebug(VideoReceiverLog) << "Decoding started";
 
-    _dispatchNotification([this](){
+    _dispatchSignal([this](){
         emit onStartDecodingComplete(STATUS_OK);
     });
 }
@@ -423,8 +422,8 @@
 void
 GstVideoReceiver::stopDecoding(void)
 {
-    if (_apiHandler.needDispatch()) {
-        _apiHandler.dispatch([this]() {
+    if (_needDispatch()) {
+        _slotHandler.dispatch([this]() {
             stopDecoding();
         });
         return;
@@ -435,7 +434,7 @@ GstVideoReceiver::stopDecoding(void)
     // exit immediately if we are not decoding
     if (_pipeline == nullptr || !_decoding) {
         qCDebug(VideoReceiverLog) << "Not decoding!";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStopDecodingComplete(STATUS_INVALID_STATE);
         });
         return;
     }
@@ -449,7 +448,7 @@ GstVideoReceiver::stopDecoding(void)
 
     // FIXME: AV: it is much better to emit onStopDecodingComplete() after decoding is really stopped
     // (which happens later due to async design) but as for now it is also not so bad...
-    _dispatchNotification([this, ret](){
+    _dispatchSignal([this, ret](){
         emit onStopDecodingComplete(ret ? STATUS_OK : STATUS_FAIL);
     });
 }
@@ -457,9 +456,9 @@
 void
 GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
 {
-    if (_apiHandler.needDispatch()) {
+    if (_needDispatch()) {
         QString cachedVideoFile = videoFile;
-        _apiHandler.dispatch([this, cachedVideoFile, format]() {
+        _slotHandler.dispatch([this, cachedVideoFile, format]() {
             startRecording(cachedVideoFile, format);
         });
         return;
@@ -469,7 +468,7 @@ GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
     if (_pipeline == nullptr) {
         qCDebug(VideoReceiverLog) << "Streaming is not active!";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartRecordingComplete(STATUS_INVALID_STATE);
         });
         return;
     }
@@ -477,7 +476,7 @@ GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
     if (_recording) {
         qCDebug(VideoReceiverLog) << "Already recording!";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartRecordingComplete(STATUS_INVALID_STATE);
         });
         return;
     }
@@ -487,7 +486,7 @@ GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
     if ((_fileSink = _makeFileSink(videoFile, format)) == nullptr) {
         qCCritical(VideoReceiverLog) << "_makeFileSink() failed";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartRecordingComplete(STATUS_FAIL);
         });
         return;
     }
@@ -501,7 +500,7 @@ GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
     if (!gst_element_link(_recorderValve, _fileSink)) {
         qCCritical(VideoReceiverLog) << "Failed to link valve and file sink";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartRecordingComplete(STATUS_FAIL);
         });
         return;
     }
@@ -518,7 +517,7 @@ GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
     if ((probepad = gst_element_get_static_pad(_recorderValve, "src")) == nullptr) {
         qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStartRecordingComplete(STATUS_FAIL);
         });
         return;
     }
@@ -532,7 +531,7 @@ GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
     _recording = true;
     qCDebug(VideoReceiverLog) << "Recording started";
-    _dispatchNotification([this](){
+    _dispatchSignal([this](){
         emit onStartRecordingComplete(STATUS_OK);
         emit recordingChanged();
     });
 }
@@ -542,8 +541,8 @@
 void
 GstVideoReceiver::stopRecording(void)
 {
-    if (_apiHandler.needDispatch()) {
-        _apiHandler.dispatch([this]() {
+    if (_needDispatch()) {
+        _slotHandler.dispatch([this]() {
             stopRecording();
         });
         return;
@@ -554,7 +553,7 @@ GstVideoReceiver::stopRecording(void)
     // exit immediately if we are not recording
     if (_pipeline == nullptr || !_recording) {
         qCDebug(VideoReceiverLog) << "Not recording!";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit onStopRecordingComplete(STATUS_INVALID_STATE);
         });
         return;
     }
@@ -568,7 +567,7 @@ GstVideoReceiver::stopRecording(void)
 
     // FIXME: AV: it is much better to emit onStopRecordingComplete() after recording is really stopped
     // (which happens later due to async design) but as for now it is also not so bad...
-    _dispatchNotification([this, ret](){
+    _dispatchSignal([this, ret](){
         emit onStopRecordingComplete(ret ? STATUS_OK : STATUS_FAIL);
     });
 }
@@ -576,16 +575,16 @@
 void
 GstVideoReceiver::takeScreenshot(const QString& imageFile)
 {
-    if (_apiHandler.needDispatch()) {
+    if (_needDispatch()) {
         QString cachedImageFile = imageFile;
-        _apiHandler.dispatch([this, cachedImageFile]() {
+        _slotHandler.dispatch([this, cachedImageFile]() {
             takeScreenshot(cachedImageFile);
         });
         return;
     }
 
     // FIXME: AV: record screenshot here
-    _dispatchNotification([this](){
+    _dispatchSignal([this](){
         emit onTakeScreenshotComplete(STATUS_NOT_IMPLEMENTED);
     });
 }
@@ -599,7 +598,7 @@ const char* GstVideoReceiver::_kFileMux[FILE_FORMAT_MAX - FILE_FORMAT_MIN] = {
 void
 GstVideoReceiver::_watchdog(void)
 {
-    _apiHandler.dispatch([this](){
+    _slotHandler.dispatch([this](){
         if(_pipeline == nullptr) {
             return;
         }
@@ -612,7 +611,7 @@ GstVideoReceiver::_watchdog(void)
 
         if (now - _lastSourceFrameTime > _timeout) {
             qCDebug(VideoReceiverLog) << "Stream timeout, no frames for " << now - _lastSourceFrameTime;
-            _dispatchNotification([this](){
+            _dispatchSignal([this](){
                 emit timeout();
             });
         }
@@ -624,7 +623,7 @@ GstVideoReceiver::_watchdog(void)
 
             if (now - _lastVideoFrameTime > _timeout * 2) {
                 qCDebug(VideoReceiverLog) << "Video decoder timeout, no frames for " << now - _lastVideoFrameTime;
-                _dispatchNotification([this](){
+                _dispatchSignal([this](){
                     emit timeout();
                 });
             }
@@ -932,7 +931,7 @@ GstVideoReceiver::_onNewSourcePad(GstPad* pad)
     if (!_streaming) {
         _streaming = true;
         qCDebug(VideoReceiverLog) << "Streaming started";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit streamingChanged();
         });
     }
@@ -1059,7 +1058,7 @@ GstVideoReceiver::_addVideoSink(GstPad* pad)
     _decoding = true;
     qCDebug(VideoReceiverLog) << "Decoding started";
 
-    _dispatchNotification([this](){
+    _dispatchSignal([this](){
         emit decodingChanged();
     });
 
@@ -1179,7 +1178,7 @@ GstVideoReceiver::_shutdownDecodingBranch(void)
     if (_decoding) {
         _decoding = false;
         qCDebug(VideoReceiverLog) << "Decoding stopped";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
             emit decodingChanged();
         });
     }
@@ -1200,7 +1199,7 @@ GstVideoReceiver::_shutdownRecordingBranch(void)
     if (_recording) {
         _recording = false;
         qCDebug(VideoReceiverLog) << "Recording stopped";
-        _dispatchNotification([this](){
+        _dispatchSignal([this](){
            emit recordingChanged();
         });
     }
@@ -1208,10 +1207,18 @@ GstVideoReceiver::_shutdownRecordingBranch(void)
     GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped");
 }
 
+bool
+GstVideoReceiver::_needDispatch(void)
+{
+    return _slotHandler.needDispatch();
+}
+
 void
-GstVideoReceiver::_dispatchNotification(std::function<void()> notification)
+GstVideoReceiver::_dispatchSignal(std::function<void()> emitter)
 {
-    _notificationHandler.dispatch(notification);
+    _signalDepth += 1;
+    emitter();
+    _signalDepth -= 1;
 }
 
 gboolean
@@ -1240,14 +1247,14 @@ GstVideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
                 error = nullptr;
             }
 
-            pThis->_apiHandler.dispatch([pThis](){
+            pThis->_slotHandler.dispatch([pThis](){
                 qCDebug(VideoReceiverLog) << "Stoppping because of error";
                 pThis->stop();
            });
         } while(0);
         break;
     case GST_MESSAGE_EOS:
-        pThis->_apiHandler.dispatch([pThis](){
+        pThis->_slotHandler.dispatch([pThis](){
             qCDebug(VideoReceiverLog) << "Received EOS";
             pThis->_handleEOS();
         });
@@ -1269,7 +1276,7 @@ GstVideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
             }
 
             if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
-                pThis->_apiHandler.dispatch([pThis](){
+                pThis->_slotHandler.dispatch([pThis](){
                     qCDebug(VideoReceiverLog) << "Received branch EOS";
                     pThis->_handleEOS();
                 });
diff --git a/src/VideoReceiver/GstVideoReceiver.h b/src/VideoReceiver/GstVideoReceiver.h
index 26fea20d3..a16d96eaf 100644
--- a/src/VideoReceiver/GstVideoReceiver.h
+++ b/src/VideoReceiver/GstVideoReceiver.h
@@ -122,7 +122,8 @@ protected:
     virtual void _shutdownDecodingBranch (void);
     virtual void _shutdownRecordingBranch(void);
 
-    void _dispatchNotification(std::function<void()> notification);
+    bool _needDispatch(void);
+    void _dispatchSignal(std::function<void()> emitter);
 
 private:
     static gboolean _onBusMessage(GstBus* bus, GstMessage* message, gpointer user_data);
@@ -161,8 +162,8 @@ private:
 
     unsigned _timeout;
 
-    Worker _apiHandler;
-    Worker _notificationHandler;
+    Worker _slotHandler;
+    uint32_t _signalDepth;
 
     bool _endOfStream;
-- 
2.22.0
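
Note: the patch above folds the old _apiHandler/_notificationHandler pair into a single _slotHandler worker and emits the Qt signals synchronously from that same worker via _dispatchSignal(), with _signalDepth counting emit nesting. Below is a minimal, self-contained sketch of that dispatch pattern; it is not part of the patch. The simplified Worker here uses std::thread rather than the Qt-based Worker class used in QGroundControl, and Receiver/main are made-up names for illustration only.

// Sketch: one worker thread services both the public entry points and the
// signal emissions, so callbacks run in a well-defined order on one thread.
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class Worker {
public:
    void start() { _thread = std::thread([this]{ run(); }); }

    void shutdown() {
        { std::lock_guard<std::mutex> lock(_mutex); _stop = true; }
        _cv.notify_one();
        if (_thread.joinable()) _thread.join();
    }

    // True when the caller is not already running on the worker thread.
    bool needDispatch() const {
        return std::this_thread::get_id() != _thread.get_id();
    }

    void dispatch(std::function<void()> task) {
        { std::lock_guard<std::mutex> lock(_mutex); _tasks.push(std::move(task)); }
        _cv.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _cv.wait(lock, [this]{ return _stop || !_tasks.empty(); });
                if (_stop && _tasks.empty()) {
                    return;              // drain remaining tasks before exiting
                }
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();                      // slots and their signal emissions both run here
        }
    }

    std::thread _thread;
    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _tasks;
    bool _stop = false;
};

class Receiver {
public:
    Receiver() { _slotHandler.start(); }
    ~Receiver() { _slotHandler.shutdown(); }

    void start() {
        if (_needDispatch()) {                          // called from another thread:
            _slotHandler.dispatch([this]{ start(); }); // re-enter on the worker
            return;
        }
        // ... real work would happen here, on the worker thread ...
        _dispatchSignal([]{ std::cout << "onStartComplete emitted\n"; });
    }

private:
    bool _needDispatch() { return _slotHandler.needDispatch(); }

    void _dispatchSignal(std::function<void()> emitter) {
        _signalDepth += 1;   // emit synchronously, right on the worker thread
        emitter();
        _signalDepth -= 1;
    }

    Worker _slotHandler;
    uint32_t _signalDepth = 0;
};

int main() {
    Receiver r;
    r.start();               // dispatched to the worker; drained before shutdown
    return 0;
}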