Commit 4ce68a44 authored by Jacob Walser's avatar Jacob Walser

Handle all message processing in _onBusMessage

And don't pass along message pointers
parent b67ca0e3
......@@ -43,8 +43,9 @@ VideoReceiver::VideoReceiver(QObject* parent)
#if defined(QGC_GST_STREAMING)
_timer.setSingleShot(true);
connect(&_timer, &QTimer::timeout, this, &VideoReceiver::_timeout);
connect(this, &VideoReceiver::recordingEOSReceived, this, &VideoReceiver::_eosCB);
connect(this, &VideoReceiver::busMessage, this, &VideoReceiver::_handleBusMessage);
connect(this, &VideoReceiver::msgErrorReceived, this, &VideoReceiver::_handleError);
connect(this, &VideoReceiver::msgEOSReceived, this, &VideoReceiver::_handleEOS);
connect(this, &VideoReceiver::msgStateChangedReceived, this, &VideoReceiver::_handleStateChanged);
#endif
}
......@@ -328,7 +329,12 @@ void VideoReceiver::stop()
GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline));
GstMessage* message = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS|GST_MESSAGE_ERROR));
gst_object_unref(bus);
_handleBusMessage(message);
if(GST_MESSAGE_TYPE(message) == GST_MESSAGE_ERROR) {
_shutdownPipeline();
qCritical() << "Error stopping pipeline!";
} else if(GST_MESSAGE_TYPE(message) == GST_MESSAGE_EOS) {
_handleEOS();
}
gst_message_unref(message);
}
#endif
......@@ -345,58 +351,83 @@ void VideoReceiver::setVideoSavePath(const QString & path)
qCDebug(VideoReceiverLog) << "New Path:" << _path;
}
void VideoReceiver::_shutdownPipeline() {
GstBus* bus = NULL;
if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != NULL) {
gst_bus_disable_sync_message_emission(bus);
gst_object_unref(bus);
bus = NULL;
}
gst_element_set_state(_pipeline, GST_STATE_NULL);
gst_bin_remove(GST_BIN(_pipeline), _videoSink);
gst_object_unref(_pipeline);
_pipeline = NULL;
delete _sink;
_sink = NULL;
_serverPresent = false;
_streaming = false;
_recording = false;
_stopping = false;
_running = false;
emit recordingChanged();
}
#if defined(QGC_GST_STREAMING)
void VideoReceiver::_handleBusMessage(GstMessage* msg)
{
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_ERROR:
do {
gchar* debug;
GError* error;
gst_message_parse_error(msg, &error, &debug);
g_free(debug);
qCritical() << error->message;
g_error_free(error);
} while(0);
// No break!
case GST_MESSAGE_EOS:
{
GstBus* bus = NULL;
if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != NULL) {
gst_bus_disable_sync_message_emission(bus);
gst_object_unref(bus);
bus = NULL;
}
gst_element_set_state(_pipeline, GST_STATE_NULL);
gst_bin_remove(GST_BIN(_pipeline), _videoSink);
gst_object_unref(_pipeline);
_pipeline = NULL;
_serverPresent = false;
_streaming = false;
_recording = false;
_stopping = false;
_running = false;
emit recordingChanged();
void VideoReceiver::_handleError() {
qCDebug(VideoReceiverLog) << "Gstreamer error!";
_shutdownPipeline();
}
#endif
#if defined(QGC_GST_STREAMING)
void VideoReceiver::_handleEOS() {
if(_stopping) {
_shutdownPipeline();
qCDebug(VideoReceiverLog) << "Stopped";
}
break;
case GST_MESSAGE_STATE_CHANGED:
_streaming = GST_STATE(_pipeline) == GST_STATE_PLAYING;
qCDebug(VideoReceiverLog) << "State changed, _streaming:" << _streaming;
break;
default:
break;
} else if(_recording && _sink->removing) {
_shutdownRecordingBranch();
} else {
qCritical() << "VideoReceiver: Unexpected EOS!";
_shutdownPipeline();
}
}
#endif
#if defined(QGC_GST_STREAMING)
void VideoReceiver::_handleStateChanged() {
_streaming = GST_STATE(_pipeline) == GST_STATE_PLAYING;
qCDebug(VideoReceiverLog) << "State changed, _streaming:" << _streaming;
}
#endif
#if defined(QGC_GST_STREAMING)
gboolean VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
{
Q_UNUSED(bus)
Q_ASSERT(msg != NULL && data != NULL);
VideoReceiver* pThis = (VideoReceiver*)data;
pThis->busMessage(msg);
switch(GST_MESSAGE_TYPE(msg)) {
case(GST_MESSAGE_ERROR): {
gchar* debug;
GError* error;
gst_message_parse_error(msg, &error, &debug);
g_free(debug);
qCritical() << error->message;
g_error_free(error);
pThis->msgErrorReceived();
}
break;
case(GST_MESSAGE_EOS):
pThis->msgEOSReceived();
break;
case(GST_MESSAGE_STATE_CHANGED):
pThis->msgStateChangedReceived();
break;
default:
break;
}
return TRUE;
}
#endif
......@@ -486,10 +517,8 @@ void VideoReceiver::stopRecording(void)
// -At this point all of the recoring elements have been flushed, and the video file has been finalized
// -Now we can remove the temporary pipeline and its elements
#if defined(QGC_GST_STREAMING)
void VideoReceiver::_eosCB(GstMessage* message)
void VideoReceiver::_shutdownRecordingBranch()
{
Q_UNUSED(message)
gst_bin_remove(GST_BIN(_pipelineStopRec), _sink->queue);
gst_bin_remove(GST_BIN(_pipelineStopRec), _sink->mux);
gst_bin_remove(GST_BIN(_pipelineStopRec), _sink->filesink);
......@@ -520,7 +549,7 @@ void VideoReceiver::_eosCB(GstMessage* message)
// -Setup watch and handler for EOS event on the temporary pipeline's bus
// -Send an EOS event at the beginning of that pipeline
#if defined(QGC_GST_STREAMING)
void VideoReceiver::_unlinkCB(GstPadProbeInfo* info)
void VideoReceiver::_detachRecordingBranch(GstPadProbeInfo* info)
{
Q_UNUSED(info)
......@@ -541,7 +570,7 @@ void VideoReceiver::_unlinkCB(GstPadProbeInfo* info)
// Add handler for EOS event
GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(_pipelineStopRec));
gst_bus_enable_sync_message_emission(bus);
g_signal_connect(bus, "sync-message", G_CALLBACK(_eosCallBack), this);
g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
gst_object_unref(bus);
if(gst_element_set_state(_pipelineStopRec, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
......@@ -556,31 +585,16 @@ void VideoReceiver::_unlinkCB(GstPadProbeInfo* info)
}
#endif
// This is only installed on the transient _pipelineStopRec in order
// to finalize a video file. It is not used for the main _pipeline.
#if defined(QGC_GST_STREAMING)
gboolean VideoReceiver::_eosCallBack(GstBus* bus, GstMessage* message, gpointer user_data)
{
Q_UNUSED(bus)
Q_ASSERT(message != NULL && user_data != NULL);
if(GST_MESSAGE_TYPE(message) == GST_MESSAGE_EOS) {
VideoReceiver* pThis = (VideoReceiver*)user_data;
pThis->recordingEOSReceived(message);
}
return FALSE;
}
#endif
#if defined(QGC_GST_STREAMING)
GstPadProbeReturn VideoReceiver::_unlinkCallBack(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
Q_UNUSED(pad);
Q_ASSERT(info != NULL && user_data != NULL);
VideoReceiver* pThis = (VideoReceiver*)user_data;
// We will only execute once
if(!g_atomic_int_compare_and_exchange(&pThis->_sink->removing, FALSE, TRUE))
return GST_PAD_PROBE_REMOVE;
pThis->_unlinkCB(info);
// We will only act once
if(g_atomic_int_compare_and_exchange(&pThis->_sink->removing, FALSE, TRUE))
pThis->_detachRecordingBranch(info);
return GST_PAD_PROBE_REMOVE;
}
#endif
......@@ -49,8 +49,11 @@ public:
signals:
void recordingChanged();
void recordingEOSReceived(GstMessage* message);
void busMessage(GstMessage* message);
#if defined(QGC_GST_STREAMING)
void msgErrorReceived();
void msgEOSReceived();
void msgStateChangedReceived();
#endif
public slots:
void start ();
......@@ -63,10 +66,12 @@ public slots:
private slots:
#if defined(QGC_GST_STREAMING)
void _eosCB(GstMessage* message);
void _timeout ();
void _connected ();
void _socketError (QAbstractSocket::SocketError socketError);
void _handleError();
void _handleEOS();
void _handleStateChanged();
#endif
private:
......@@ -88,11 +93,12 @@ private:
Sink* _sink;
GstElement* _tee;
void _handleBusMessage(GstMessage* message);
void _unlinkCB(GstPadProbeInfo* info);
static gboolean _onBusMessage(GstBus* bus, GstMessage* message, gpointer user_data);
static gboolean _eosCallBack(GstBus* bus, GstMessage* message, gpointer user_data);
static GstPadProbeReturn _unlinkCallBack(GstPad* pad, GstPadProbeInfo* info, gpointer user_data);
void _detachRecordingBranch(GstPadProbeInfo* info);
void _shutdownRecordingBranch();
void _shutdownPipeline();
#endif
QString _uri;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment