Commit 4f4af952 authored by Andrew Voznytsa's avatar Andrew Voznytsa

Simplify file recording handling

parent a6d15868
...@@ -62,7 +62,6 @@ VideoReceiver::VideoReceiver(QObject* parent) ...@@ -62,7 +62,6 @@ VideoReceiver::VideoReceiver(QObject* parent)
, _sink(nullptr) , _sink(nullptr)
, _tee(nullptr) , _tee(nullptr)
, _pipeline(nullptr) , _pipeline(nullptr)
, _pipelineStopRec(nullptr)
, _videoSink(nullptr) , _videoSink(nullptr)
, _lastFrameId(G_MAXUINT64) , _lastFrameId(G_MAXUINT64)
, _lastFrameTime(0) , _lastFrameTime(0)
...@@ -571,6 +570,8 @@ VideoReceiver::start() ...@@ -571,6 +570,8 @@ VideoReceiver::start()
break; break;
} }
g_object_set(_pipeline, "message-forward", TRUE, nullptr);
if ((source = _makeSource(uri)) == nullptr) { if ((source = _makeSource(uri)) == nullptr) {
qCritical() << "VideoReceiver::start() failed. Error with _makeSource()"; qCritical() << "VideoReceiver::start() failed. Error with _makeSource()";
break; break;
...@@ -747,6 +748,9 @@ VideoReceiver::_handleError() { ...@@ -747,6 +748,9 @@ VideoReceiver::_handleError() {
void void
VideoReceiver::_handleEOS() { VideoReceiver::_handleEOS() {
if(_stopping) { if(_stopping) {
if(_recording && _sink->removing) {
_shutdownRecordingBranch();
}
_shutdownPipeline(); _shutdownPipeline();
qCDebug(VideoReceiverLog) << "Stopped"; qCDebug(VideoReceiverLog) << "Stopped";
} else if(_recording && _sink->removing) { } else if(_recording && _sink->removing) {
...@@ -795,6 +799,22 @@ VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data) ...@@ -795,6 +799,22 @@ VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
case(GST_MESSAGE_STATE_CHANGED): case(GST_MESSAGE_STATE_CHANGED):
pThis->msgStateChangedReceived(); pThis->msgStateChangedReceived();
break; break;
case(GST_MESSAGE_ELEMENT): {
const GstStructure *s = gst_message_get_structure (msg);
if (gst_structure_has_name (s, "GstBinForwarded")) {
GstMessage *forward_msg = NULL;
gst_structure_get (s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
if (forward_msg != nullptr) {
if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
pThis->msgEOSReceived();
}
gst_message_unref(forward_msg);
forward_msg = nullptr;
}
}
}
break;
default: default:
break; break;
} }
...@@ -887,6 +907,88 @@ VideoReceiver::setVideoSink(GstElement* videoSink) ...@@ -887,6 +907,88 @@ VideoReceiver::setVideoSink(GstElement* videoSink)
// we are adding these elements-> +->teepad-->queue-->matroskamux-->_filesink | // we are adding these elements-> +->teepad-->queue-->matroskamux-->_filesink |
// | | // | |
// +--------------------------------------+ // +--------------------------------------+
GstElement*
VideoReceiver::_makeFileSink(const QString& videoFile, unsigned format)
{
GstElement* fileSink = nullptr;
GstElement* mux = nullptr;
GstElement* sink = nullptr;
GstElement* bin = nullptr;
bool releaseElements = true;
do{
if ((mux = gst_element_factory_make(kVideoMuxes[format], "mux")) == nullptr) {
qCritical() << "VideoReceiver::_makeFileSink() failed. Error with gst_element_factory_make('" << kVideoMuxes[format] << "')";
break;
}
if ((sink = gst_element_factory_make("filesink", "filesink")) == nullptr) {
qCritical() << "VideoReceiver::_makeFileSink() failed. Error with gst_element_factory_make('filesink')";
break;
}
g_object_set(static_cast<gpointer>(sink), "location", qPrintable(videoFile), nullptr);
if ((bin = gst_bin_new("sinkbin")) == nullptr) {
qCritical() << "VideoReceiver::_makeFileSink() failed. Error with gst_bin_new('sinkbin')";
break;
}
GstPadTemplate* padTemplate;
if ((padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u")) == nullptr) {
qCritical() << "VideoReceiver::_makeFileSink() failed. Error with gst_element_class_get_pad_template(mux)";
break;
}
// FIXME: AV: pad handling is potentially leaking (and other similar places too!)
GstPad* pad;
if ((pad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr)) == nullptr) {
qCritical() << "VideoReceiver::_makeFileSink() failed. Error with gst_element_request_pad(mux)";
break;
}
gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr);
releaseElements = false;
GstPad* ghostpad = gst_ghost_pad_new("sink", pad);
gst_element_add_pad(bin, ghostpad);
gst_object_unref(pad);
pad = nullptr;
if (!gst_element_link(mux, sink)) {
qCritical() << "VideoReceiver::_makeFileSink() failed. Error with gst_element_link()";
break;
}
fileSink = bin;
bin = nullptr;
} while(0);
if (releaseElements) {
if (sink != nullptr) {
gst_object_unref(sink);
sink = nullptr;
}
if (mux != nullptr) {
gst_object_unref(mux);
mux = nullptr;
}
}
if (bin != nullptr) {
gst_object_unref(bin);
bin = nullptr;
}
return fileSink;
}
void void
VideoReceiver::startRecording(const QString &videoFile) VideoReceiver::startRecording(const QString &videoFile)
{ {
...@@ -908,18 +1010,6 @@ VideoReceiver::startRecording(const QString &videoFile) ...@@ -908,18 +1010,6 @@ VideoReceiver::startRecording(const QString &videoFile)
//-- Disk usage maintenance //-- Disk usage maintenance
_cleanupOldVideos(); _cleanupOldVideos();
_sink = new Sink();
_sink->teepad = gst_element_get_request_pad(_tee, "src_%u");
_sink->queue = gst_element_factory_make("queue", nullptr);
_sink->mux = gst_element_factory_make(kVideoMuxes[muxIdx], nullptr);
_sink->filesink = gst_element_factory_make("filesink", nullptr);
_sink->removing = false;
if(!_sink->teepad || !_sink->queue || !_sink->mux || !_sink->filesink) {
qCritical() << "VideoReceiver::startRecording() failed to make _sink elements";
return;
}
if(videoFile.isEmpty()) { if(videoFile.isEmpty()) {
QString savePath = qgcApp()->toolbox()->settingsManager()->appSettings()->videoSavePath(); QString savePath = qgcApp()->toolbox()->settingsManager()->appSettings()->videoSavePath();
if(savePath.isEmpty()) { if(savePath.isEmpty()) {
...@@ -930,20 +1020,28 @@ VideoReceiver::startRecording(const QString &videoFile) ...@@ -930,20 +1020,28 @@ VideoReceiver::startRecording(const QString &videoFile)
} else { } else {
_videoFile = videoFile; _videoFile = videoFile;
} }
qDebug() << "New video file:" << _videoFile;
emit videoFileChanged(); emit videoFileChanged();
g_object_set(static_cast<gpointer>(_sink->filesink), "location", qPrintable(_videoFile), nullptr); _sink = new Sink();
qCDebug(VideoReceiverLog) << "New video file:" << _videoFile; _sink->teepad = gst_element_get_request_pad(_tee, "src_%u");
_sink->queue = gst_element_factory_make("queue", nullptr);
_sink->filesink = _makeFileSink(_videoFile, muxIdx);
_sink->removing = false;
if(!_sink->teepad || !_sink->queue || !_sink->filesink) {
qCritical() << "VideoReceiver::startRecording() failed to make _sink elements";
return;
}
gst_object_ref(_sink->queue); gst_object_ref(_sink->queue);
gst_object_ref(_sink->mux);
gst_object_ref(_sink->filesink); gst_object_ref(_sink->filesink);
gst_bin_add_many(GST_BIN(_pipeline), _sink->queue, _sink->mux, nullptr); gst_bin_add(GST_BIN(_pipeline), _sink->queue);
gst_element_link_many(_sink->queue, _sink->mux, nullptr);
gst_element_sync_state_with_parent(_sink->queue); gst_element_sync_state_with_parent(_sink->queue);
gst_element_sync_state_with_parent(_sink->mux);
// Install a probe on the recording branch to drop buffers until we hit our first keyframe // Install a probe on the recording branch to drop buffers until we hit our first keyframe
// When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time
...@@ -951,7 +1049,7 @@ VideoReceiver::startRecording(const QString &videoFile) ...@@ -951,7 +1049,7 @@ VideoReceiver::startRecording(const QString &videoFile)
// Once we have this valid frame, we attach the filesink. // Once we have this valid frame, we attach the filesink.
// Attaching it here would cause the filesink to fail to preroll and to stall the pipeline for a few seconds. // Attaching it here would cause the filesink to fail to preroll and to stall the pipeline for a few seconds.
GstPad* probepad = gst_element_get_static_pad(_sink->queue, "src"); GstPad* probepad = gst_element_get_static_pad(_sink->queue, "src");
gst_pad_add_probe(probepad, (GstPadProbeType)(GST_PAD_PROBE_TYPE_BUFFER /* | GST_PAD_PROBE_TYPE_BLOCK */), _keyframeWatch, this, nullptr); // to drop the buffer or to block the buffer? gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr); // to drop the buffer
gst_object_unref(probepad); gst_object_unref(probepad);
// Link the recording branch to the pipeline // Link the recording branch to the pipeline
...@@ -959,6 +1057,13 @@ VideoReceiver::startRecording(const QString &videoFile) ...@@ -959,6 +1057,13 @@ VideoReceiver::startRecording(const QString &videoFile)
gst_pad_link(_sink->teepad, sinkpad); gst_pad_link(_sink->teepad, sinkpad);
gst_object_unref(sinkpad); gst_object_unref(sinkpad);
// // Add the filesink once we have a valid I-frame
// gst_bin_add(GST_BIN(_pipeline), _sink->filesink);
// if (!gst_element_link(_sink->queue, _sink->filesink)) {
// qCritical() << "Failed to link queue and file sink";
// }
// gst_element_sync_state_with_parent(_sink->filesink);
GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording"); GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording");
_recording = true; _recording = true;
...@@ -995,20 +1100,13 @@ VideoReceiver::stopRecording(void) ...@@ -995,20 +1100,13 @@ VideoReceiver::stopRecording(void)
void void
VideoReceiver::_shutdownRecordingBranch() VideoReceiver::_shutdownRecordingBranch()
{ {
gst_bin_remove(GST_BIN(_pipelineStopRec), _sink->queue); gst_bin_remove(GST_BIN(_pipeline), _sink->queue);
gst_bin_remove(GST_BIN(_pipelineStopRec), _sink->mux); gst_bin_remove(GST_BIN(_pipeline), _sink->filesink);
gst_bin_remove(GST_BIN(_pipelineStopRec), _sink->filesink);
gst_element_set_state(_pipelineStopRec, GST_STATE_NULL);
gst_object_unref(_pipelineStopRec);
_pipelineStopRec = nullptr;
gst_element_set_state(_sink->filesink, GST_STATE_NULL);
gst_element_set_state(_sink->mux, GST_STATE_NULL);
gst_element_set_state(_sink->queue, GST_STATE_NULL); gst_element_set_state(_sink->queue, GST_STATE_NULL);
gst_element_set_state(_sink->filesink, GST_STATE_NULL);
gst_object_unref(_sink->queue); gst_object_unref(_sink->queue);
gst_object_unref(_sink->mux);
gst_object_unref(_sink->filesink); gst_object_unref(_sink->filesink);
delete _sink; delete _sink;
...@@ -1027,39 +1125,18 @@ VideoReceiver::_shutdownRecordingBranch() ...@@ -1027,39 +1125,18 @@ VideoReceiver::_shutdownRecordingBranch()
// -Send an EOS event at the beginning of that pipeline // -Send an EOS event at the beginning of that pipeline
#if defined(QGC_GST_STREAMING) #if defined(QGC_GST_STREAMING)
void void
VideoReceiver::_detachRecordingBranch(GstPadProbeInfo* info) VideoReceiver::_unlinkRecordingBranch(GstPadProbeInfo* info)
{ {
Q_UNUSED(info) Q_UNUSED(info)
// Also unlinks and unrefs
gst_bin_remove_many(GST_BIN(_pipeline), _sink->queue, _sink->mux, _sink->filesink, nullptr);
// Give tee its pad back
gst_element_release_request_pad(_tee, _sink->teepad);
gst_object_unref(_sink->teepad);
// Create temporary pipeline
_pipelineStopRec = gst_pipeline_new("pipeStopRec");
// Put our elements from the recording branch into the temporary pipeline
gst_bin_add_many(GST_BIN(_pipelineStopRec), _sink->queue, _sink->mux, _sink->filesink, nullptr);
gst_element_link_many(_sink->queue, _sink->mux, _sink->filesink, nullptr);
// Add handler for EOS event
GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(_pipelineStopRec));
gst_bus_enable_sync_message_emission(bus);
g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
gst_object_unref(bus);
if(gst_element_set_state(_pipelineStopRec, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
qCDebug(VideoReceiverLog) << "problem starting _pipelineStopRec";
}
// Send EOS at the beginning of the pipeline // Send EOS at the beginning of the pipeline
GstPad* sinkpad = gst_element_get_static_pad(_sink->queue, "sink"); GstPad* sinkpad = gst_element_get_static_pad(_sink->queue, "sink");
gst_pad_unlink(_sink->teepad, sinkpad);
gst_pad_send_event(sinkpad, gst_event_new_eos()); gst_pad_send_event(sinkpad, gst_event_new_eos());
gst_object_unref(sinkpad); gst_object_unref(sinkpad);
qCDebug(VideoReceiverLog) << "Recording branch unlinked"; qCDebug(VideoReceiverLog) << "Recording EOS was sent";
// Give tee its pad back
gst_element_release_request_pad(_tee, _sink->teepad);
gst_object_unref(_sink->teepad);
} }
#endif #endif
...@@ -1073,7 +1150,7 @@ VideoReceiver::_unlinkCallBack(GstPad* pad, GstPadProbeInfo* info, gpointer user ...@@ -1073,7 +1150,7 @@ VideoReceiver::_unlinkCallBack(GstPad* pad, GstPadProbeInfo* info, gpointer user
VideoReceiver* pThis = static_cast<VideoReceiver*>(user_data); VideoReceiver* pThis = static_cast<VideoReceiver*>(user_data);
// We will only act once // We will only act once
if(g_atomic_int_compare_and_exchange(&pThis->_sink->removing, FALSE, TRUE)) { if(g_atomic_int_compare_and_exchange(&pThis->_sink->removing, FALSE, TRUE)) {
pThis->_detachRecordingBranch(info); pThis->_unlinkRecordingBranch(info);
} }
} }
return GST_PAD_PROBE_REMOVE; return GST_PAD_PROBE_REMOVE;
...@@ -1134,8 +1211,10 @@ VideoReceiver::_keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_ ...@@ -1134,8 +1211,10 @@ VideoReceiver::_keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_
gst_pad_set_offset(pad, position); gst_pad_set_offset(pad, position);
// Add the filesink once we have a valid I-frame // Add the filesink once we have a valid I-frame
gst_bin_add_many(GST_BIN(pThis->_pipeline), pThis->_sink->filesink, nullptr); gst_bin_add(GST_BIN(pThis->_pipeline), pThis->_sink->filesink);
gst_element_link_many(pThis->_sink->mux, pThis->_sink->filesink, nullptr); if (!gst_element_link(pThis->_sink->queue, pThis->_sink->filesink)) {
qCritical() << "Failed to link queue and file sink";
}
gst_element_sync_state_with_parent(pThis->_sink->filesink); gst_element_sync_state_with_parent(pThis->_sink->filesink);
qCDebug(VideoReceiverLog) << "Got keyframe, stop dropping buffers"; qCDebug(VideoReceiverLog) << "Got keyframe, stop dropping buffers";
......
...@@ -85,6 +85,7 @@ protected slots: ...@@ -85,6 +85,7 @@ protected slots:
virtual void _updateTimer (); virtual void _updateTimer ();
#if defined(QGC_GST_STREAMING) #if defined(QGC_GST_STREAMING)
GstElement* _makeSource (const QString& uri); GstElement* _makeSource (const QString& uri);
GstElement* _makeFileSink (const QString& videoFile, unsigned format);
virtual void _restart_timeout (); virtual void _restart_timeout ();
virtual void _tcp_timeout (); virtual void _tcp_timeout ();
virtual void _connected (); virtual void _connected ();
...@@ -101,7 +102,6 @@ protected: ...@@ -101,7 +102,6 @@ protected:
{ {
GstPad* teepad; GstPad* teepad;
GstElement* queue; GstElement* queue;
GstElement* mux;
GstElement* filesink; GstElement* filesink;
gboolean removing; gboolean removing;
} Sink; } Sink;
...@@ -122,13 +122,12 @@ protected: ...@@ -122,13 +122,12 @@ protected:
static GstPadProbeReturn _videoSinkProbe (GstPad* pad, GstPadProbeInfo* info, gpointer user_data); static GstPadProbeReturn _videoSinkProbe (GstPad* pad, GstPadProbeInfo* info, gpointer user_data);
static GstPadProbeReturn _keyframeWatch (GstPad* pad, GstPadProbeInfo* info, gpointer user_data); static GstPadProbeReturn _keyframeWatch (GstPad* pad, GstPadProbeInfo* info, gpointer user_data);
virtual void _detachRecordingBranch (GstPadProbeInfo* info); virtual void _unlinkRecordingBranch (GstPadProbeInfo* info);
virtual void _shutdownRecordingBranch(); virtual void _shutdownRecordingBranch();
virtual void _shutdownPipeline (); virtual void _shutdownPipeline ();
virtual void _cleanupOldVideos (); virtual void _cleanupOldVideos ();
GstElement* _pipeline; GstElement* _pipeline;
GstElement* _pipelineStopRec;
GstElement* _videoSink; GstElement* _videoSink;
guint64 _lastFrameId; guint64 _lastFrameId;
qint64 _lastFrameTime; qint64 _lastFrameTime;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment