GstVideoReceiver.cc 46.9 KB
Newer Older
1 2
/****************************************************************************
 *
Gus Grubba's avatar
Gus Grubba committed
3
 * (c) 2009-2020 QGROUNDCONTROL PROJECT <http://www.qgroundcontrol.org>
4 5 6 7 8
 *
 * QGroundControl is licensed according to the terms in the file
 * COPYING.md in the root of the source code directory.
 *
 ****************************************************************************/
Gus Grubba's avatar
Gus Grubba committed
9 10 11 12 13


/**
 * @file
 *   @brief QGC Video Receiver
Gus Grubba's avatar
Gus Grubba committed
14
 *   @author Gus Grubba <gus@auterion.com>
Gus Grubba's avatar
Gus Grubba committed
15 16
 */

17
#include "GstVideoReceiver.h"
18

Gus Grubba's avatar
Gus Grubba committed
19
#include <QDebug>
20
#include <QUrl>
21
#include <QDateTime>
22
#include <QSysInfo>
23

24 25
QGC_LOGGING_CATEGORY(VideoReceiverLog, "VideoReceiverLog")

26 27 28 29 30 31 32 33 34
//-----------------------------------------------------------------------------
// Our pipeline look like this:
//
//              +-->queue-->_decoderValve[-->_decoder-->_videoSink]
//              |
// _source-->_tee
//              |
//              +-->queue-->_recorderValve[-->_fileSink]
//
35

36 37
GstVideoReceiver::GstVideoReceiver(QObject* parent)
    : VideoReceiver(parent)
    , _streaming(false)
    , _decoding(false)
    , _recording(false)
    , _removingDecoder(false)
    , _removingRecorder(false)
    , _source(nullptr)
    , _tee(nullptr)
    , _decoderValve(nullptr)
    , _recorderValve(nullptr)
    , _decoder(nullptr)
    , _videoSink(nullptr)
    , _fileSink(nullptr)
    , _pipeline(nullptr)
    , _lastSourceFrameTime(0)
    , _lastVideoFrameTime(0)
    , _resetVideoSink(true)
    , _videoSinkProbeId(0)
    , _udpReconnect_us(5000000)
    , _signalDepth(0)
    , _endOfStream(false)
{
    // Spin up the worker that serializes all pipeline operations, then arm a
    // 1 Hz watchdog which detects stalled sources/decoders (see _watchdog()).
    _slotHandler.start();
    connect(&_watchdogTimer, &QTimer::timeout, this, &GstVideoReceiver::_watchdog);
    _watchdogTimer.start(1000);
}

64
GstVideoReceiver::~GstVideoReceiver(void)
{
    // Drain and stop the worker thread before the receiver is destroyed.
    _slotHandler.shutdown();
}

69
void
70
GstVideoReceiver::start(const QString& uri, unsigned timeout, int buffer)
71
{
72
    if (_needDispatch()) {
73
        QString cachedUri = uri;
74 75
        _slotHandler.dispatch([this, cachedUri, timeout, buffer]() {
            start(cachedUri, timeout, buffer);
76 77 78
        });
        return;
    }
79

80
    if(_pipeline) {
81
        qCDebug(VideoReceiverLog) << "Already running!" << _uri;
82
        _dispatchSignal([this](){
83
            emit onStartComplete(STATUS_INVALID_STATE);
84
        });
85 86
        return;
    }
87

88 89
    if (uri.isEmpty()) {
        qCDebug(VideoReceiverLog) << "Failed because URI is not specified";
90
        _dispatchSignal([this](){
91
            emit onStartComplete(STATUS_INVALID_URL);
92
        });
93
        return;
94 95
    }

96
    _uri = uri;
97 98
    _timeout = timeout;
    _buffer = buffer;
99

100
    qCDebug(VideoReceiverLog) << "Starting" << _uri << ", buffer" << _buffer;
101

102 103
    _endOfStream = false;

104 105
    bool running    = false;
    bool pipelineUp = false;
106

107 108
    GstElement* decoderQueue = nullptr;
    GstElement* recorderQueue = nullptr;
109

110 111 112 113 114
    do {
        if((_tee = gst_element_factory_make("tee", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('tee') failed";
            break;
        }
115

116
        GstPad* pad;
117

118 119 120 121
        if ((pad = gst_element_get_static_pad(_tee, "sink")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
            break;
        }
122

123
        _lastSourceFrameTime = 0;
124

125 126 127
        gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe, this, nullptr);
        gst_object_unref(pad);
        pad = nullptr;
128

129 130 131 132
        if((decoderQueue = gst_element_factory_make("queue", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed";
            break;
        }
133

134 135 136 137
        if((_decoderValve = gst_element_factory_make("valve", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('valve') failed";
            break;
        }
138

139
        g_object_set(_decoderValve, "drop", TRUE, nullptr);
140

141 142 143 144
        if((recorderQueue = gst_element_factory_make("queue", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed";
            break;
        }
145

146 147 148 149
        if((_recorderValve = gst_element_factory_make("valve", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('valve') failed";
            break;
        }
150

151
        g_object_set(_recorderValve, "drop", TRUE, nullptr);
152

153 154 155 156
        if ((_pipeline = gst_pipeline_new("receiver")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_pipeline_new() failed";
            break;
        }
157

158
        g_object_set(_pipeline, "message-forward", TRUE, nullptr);
159

160 161 162 163
        if ((_source = _makeSource(uri)) == nullptr) {
            qCCritical(VideoReceiverLog) << "_makeSource() failed";
            break;
        }
164

165
        gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve, nullptr);
166

167
        pipelineUp = true;
168

169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
        GstPad* srcPad = nullptr;

        GstIterator* it;

        if ((it = gst_element_iterate_src_pads(_source)) != nullptr) {
            GValue vpad = G_VALUE_INIT;

            if (gst_iterator_next(it, &vpad) == GST_ITERATOR_OK) {
                srcPad = GST_PAD(g_value_get_object(&vpad));
                gst_object_ref(srcPad);
                g_value_reset(&vpad);
            }

            gst_iterator_free(it);
            it = nullptr;
        }

        if (srcPad != nullptr) {
            _onNewSourcePad(srcPad);
            gst_object_unref(srcPad);
            srcPad = nullptr;
        } else {
            g_signal_connect(_source, "pad-added", G_CALLBACK(_onNewPad), this);
        }

194 195 196 197
        if(!gst_element_link_many(_tee, decoderQueue, _decoderValve, nullptr)) {
            qCCritical(VideoReceiverLog) << "Unable to link decoder queue";
            break;
        }
198

199 200 201 202
        if(!gst_element_link_many(_tee, recorderQueue, _recorderValve, nullptr)) {
            qCCritical(VideoReceiverLog) << "Unable to link recorder queue";
            break;
        }
203

204
        GstBus* bus = nullptr;
205

206 207 208 209 210 211
        if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) {
            gst_bus_enable_sync_message_emission(bus);
            g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
            gst_object_unref(bus);
            bus = nullptr;
        }
212

213 214 215 216 217 218 219 220 221 222 223 224
        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-initial");
        running = gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE;
    } while(0);

    if (!running) {
        qCCritical(VideoReceiverLog) << "Failed";

        // In newer versions, the pipeline will clean up all references that are added to it
        if (_pipeline != nullptr) {
            gst_element_set_state(_pipeline, GST_STATE_NULL);
            gst_object_unref(_pipeline);
            _pipeline = nullptr;
225 226
        }

227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
        // If we failed before adding items to the pipeline, then clean up
        if (!pipelineUp) {
            if (_recorderValve != nullptr) {
                gst_object_unref(_recorderValve);
                _recorderValve = nullptr;
            }

            if (recorderQueue != nullptr) {
                gst_object_unref(recorderQueue);
                recorderQueue = nullptr;
            }

            if (_decoderValve != nullptr) {
                gst_object_unref(_decoderValve);
                _decoderValve = nullptr;
            }

            if (decoderQueue != nullptr) {
                gst_object_unref(decoderQueue);
                decoderQueue = nullptr;
            }

            if (_tee != nullptr) {
                gst_object_unref(_tee);
                _tee = nullptr;
            }

            if (_source != nullptr) {
                gst_object_unref(_source);
                _source = nullptr;
            }
        }
259

260
        _dispatchSignal([this](){
261
            emit onStartComplete(STATUS_FAIL);
262
        });
263 264
    } else {
        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started");
265
        qCDebug(VideoReceiverLog) << "Started" << _uri;
266

267
        _dispatchSignal([this](){
268
            emit onStartComplete(STATUS_OK);
269
        });
270
    }
271
}
272

273
void
274
GstVideoReceiver::stop(void)
275
{
276 277
    if (_needDispatch()) {
        _slotHandler.dispatch([this]() {
278 279 280 281
            stop();
        });
        return;
    }
282

283
    if (_uri.isEmpty()) {
284
        qCWarning(VideoReceiverLog) << "Stop called on empty URI";
285 286 287 288
        return;
    }

    qCDebug(VideoReceiverLog) << "Stopping" << _uri;
289

290 291
    if (_pipeline != nullptr) {
        GstBus* bus;
292

293 294
        if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) {
            gst_bus_disable_sync_message_emission(bus);
295

296 297
            g_signal_handlers_disconnect_by_data(bus, this);

298
            gboolean recordingValveClosed = TRUE;
299

300
            g_object_get(_recorderValve, "drop", &recordingValveClosed, nullptr);
301

302 303 304 305
            if (recordingValveClosed == FALSE) {
                gst_element_send_event(_pipeline, gst_event_new_eos());

                GstMessage* msg;
306

307 308 309 310 311 312 313 314 315 316 317 318
                if((msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS|GST_MESSAGE_ERROR))) != nullptr) {
                    if(GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) {
                        qCCritical(VideoReceiverLog) << "Error stopping pipeline!";
                    } else if(GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) {
                        qCDebug(VideoReceiverLog) << "End of stream received!";
                    }

                    gst_message_unref(msg);
                    msg = nullptr;
                } else {
                    qCCritical(VideoReceiverLog) << "gst_bus_timed_pop_filtered() failed";
                }
319
            }
320 321 322

            gst_object_unref(bus);
            bus = nullptr;
323
        } else {
324
            qCCritical(VideoReceiverLog) << "gst_pipeline_get_bus() failed";
325 326
        }

327
        gst_element_set_state(_pipeline, GST_STATE_NULL);
328

329 330 331 332
        // FIXME: check if branch is connected and remove all elements from branch
        if (_fileSink != nullptr) {
           _shutdownRecordingBranch();
        }
333

334 335 336
        if (_videoSink != nullptr) {
            _shutdownDecodingBranch();
        }
337

338
        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-stopped");
339

340 341
        gst_object_unref(_pipeline);
        _pipeline = nullptr;
342

343 344 345 346
        _recorderValve = nullptr;
        _decoderValve = nullptr;
        _tee = nullptr;
        _source = nullptr;
347

348
        _lastSourceFrameTime = 0;
349

350 351
        if (_streaming) {
            _streaming = false;
352
            qCDebug(VideoReceiverLog) << "Streaming stopped" << _uri;
353
            _dispatchSignal([this](){
354
                emit streamingChanged(_streaming);
355
            });
356
        } else {
357
            qCDebug(VideoReceiverLog) << "Streaming did not start" << _uri;
358
        }
359 360
    }

361
    qCDebug(VideoReceiverLog) << "Stopped" << _uri;
362

363
    _dispatchSignal([this](){
364 365
        emit onStopComplete(STATUS_OK);
    });
366 367
}

368
void
369
GstVideoReceiver::startDecoding(void* sink)
370
{
371
    if (sink == nullptr) {
372
        qCCritical(VideoReceiverLog) << "VideoSink is NULL" << _uri;
373 374 375
        return;
    }

376
    if (_needDispatch()) {
377
        GstElement* videoSink = GST_ELEMENT(sink);
378
        gst_object_ref(videoSink);
379
        _slotHandler.dispatch([this, videoSink]() mutable {
380 381 382 383
            startDecoding(videoSink);
            gst_object_unref(videoSink);
        });
        return;
384 385
    }

386
    qCDebug(VideoReceiverLog) << "Starting decoding" << _uri;
387

388 389 390 391 392 393
    if (_pipeline == nullptr) {
        if (_videoSink != nullptr) {
            gst_object_unref(_videoSink);
            _videoSink = nullptr;
        }
    }
394

395 396
    GstElement* videoSink = GST_ELEMENT(sink);

397
    if(_videoSink != nullptr || _decoding) {
398
        qCDebug(VideoReceiverLog) << "Already decoding!" << _uri;
399
        _dispatchSignal([this](){
400 401
            emit onStartDecodingComplete(STATUS_INVALID_STATE);
        });
402 403
        return;
    }
404

405 406 407
    GstPad* pad;

    if ((pad = gst_element_get_static_pad(videoSink, "sink")) == nullptr) {
408
        qCCritical(VideoReceiverLog) << "Unable to find sink pad of video sink" << _uri;
409
        _dispatchSignal([this](){
410 411
            emit onStartDecodingComplete(STATUS_FAIL);
        });
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
        return;
    }

    _lastVideoFrameTime = 0;
    _resetVideoSink = true;

    _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe, this, nullptr);
    gst_object_unref(pad);
    pad = nullptr;

    _videoSink = videoSink;
    gst_object_ref(_videoSink);

    _removingDecoder = false;

    if (!_streaming) {
428
        _dispatchSignal([this](){
429 430
            emit onStartDecodingComplete(STATUS_OK);
        });
431 432 433 434
        return;
    }

    if (!_addDecoder(_decoderValve)) {
435
        qCCritical(VideoReceiverLog) << "_addDecoder() failed" << _uri;
436
        _dispatchSignal([this](){
437 438
            emit onStartDecodingComplete(STATUS_FAIL);
        });
439 440 441 442 443
        return;
    }

    g_object_set(_decoderValve, "drop", FALSE, nullptr);

444
    qCDebug(VideoReceiverLog) << "Decoding started" << _uri;
445

446
    _dispatchSignal([this](){
447 448
        emit onStartDecodingComplete(STATUS_OK);
    });
449 450 451
}

void
452
GstVideoReceiver::stopDecoding(void)
453
{
454 455
    if (_needDispatch()) {
        _slotHandler.dispatch([this]() {
456 457 458 459 460
            stopDecoding();
        });
        return;
    }

461
    qCDebug(VideoReceiverLog) << "Stopping decoding" << _uri;
462 463 464

    // exit immediately if we are not decoding
    if (_pipeline == nullptr || !_decoding) {
465
        qCDebug(VideoReceiverLog) << "Not decoding!" << _uri;
466
        _dispatchSignal([this](){
467 468
            emit onStopDecodingComplete(STATUS_INVALID_STATE);
        });
469 470 471 472 473 474 475
        return;
    }

    g_object_set(_decoderValve, "drop", TRUE, nullptr);

    _removingDecoder = true;

476 477 478 479
    bool ret = _unlinkBranch(_decoderValve);

    // FIXME: AV: it is much better to emit onStopDecodingComplete() after decoding is really stopped
    // (which happens later due to async design) but as for now it is also not so bad...
480
    _dispatchSignal([this, ret](){
481 482
        emit onStopDecodingComplete(ret ? STATUS_OK : STATUS_FAIL);
    });
483 484 485
}

void
486
GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
487
{
488
    if (_needDispatch()) {
489
        QString cachedVideoFile = videoFile;
490
        _slotHandler.dispatch([this, cachedVideoFile, format]() {
491
            startRecording(cachedVideoFile, format);
492 493 494 495
        });
        return;
    }

496
    qCDebug(VideoReceiverLog) << "Starting recording" << _uri;
497

498
    if (_pipeline == nullptr) {
499
        qCDebug(VideoReceiverLog) << "Streaming is not active!" << _uri;
500
        _dispatchSignal([this](){
501 502
            emit onStartRecordingComplete(STATUS_INVALID_STATE);
        });
503 504 505 506
        return;
    }

    if (_recording) {
507
        qCDebug(VideoReceiverLog) << "Already recording!" << _uri;
508
        _dispatchSignal([this](){
509 510
            emit onStartRecordingComplete(STATUS_INVALID_STATE);
        });
511 512 513
        return;
    }

514
    qCDebug(VideoReceiverLog) << "New video file:" << videoFile <<  "" << _uri;
515 516

    if ((_fileSink = _makeFileSink(videoFile, format)) == nullptr) {
517
        qCCritical(VideoReceiverLog) << "_makeFileSink() failed" << _uri;
518
        _dispatchSignal([this](){
519 520
            emit onStartRecordingComplete(STATUS_FAIL);
        });
521 522 523 524 525 526 527 528 529 530
        return;
    }

    _removingRecorder = false;

    gst_object_ref(_fileSink);

    gst_bin_add(GST_BIN(_pipeline), _fileSink);

    if (!gst_element_link(_recorderValve, _fileSink)) {
531
        qCCritical(VideoReceiverLog) << "Failed to link valve and file sink" << _uri;
532
        _dispatchSignal([this](){
533 534
            emit onStartRecordingComplete(STATUS_FAIL);
        });
535 536 537 538 539 540 541 542 543 544 545 546 547
        return;
    }

    gst_element_sync_state_with_parent(_fileSink);

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-filesink");

    // Install a probe on the recording branch to drop buffers until we hit our first keyframe
    // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time
    // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback
    GstPad* probepad;

    if ((probepad  = gst_element_get_static_pad(_recorderValve, "src")) == nullptr) {
548
        qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed" << _uri;
549
        _dispatchSignal([this](){
550 551
            emit onStartRecordingComplete(STATUS_FAIL);
        });
552 553 554 555 556 557 558 559 560 561
        return;
    }

    gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr); // to drop the buffers until key frame is received
    gst_object_unref(probepad);
    probepad = nullptr;

    g_object_set(_recorderValve, "drop", FALSE, nullptr);

    _recording = true;
562
    qCDebug(VideoReceiverLog) << "Recording started" << _uri;
563
    _dispatchSignal([this](){
564
        emit onStartRecordingComplete(STATUS_OK);
565
        emit recordingChanged(_recording);
566
    });
567 568 569 570
}

//-----------------------------------------------------------------------------
void
571
GstVideoReceiver::stopRecording(void)
572
{
573 574
    if (_needDispatch()) {
        _slotHandler.dispatch([this]() {
575 576 577 578 579
            stopRecording();
        });
        return;
    }

580
    qCDebug(VideoReceiverLog) << "Stopping recording" << _uri;
581 582 583

    // exit immediately if we are not recording
    if (_pipeline == nullptr || !_recording) {
584
        qCDebug(VideoReceiverLog) << "Not recording!" << _uri;
585
        _dispatchSignal([this](){
586 587
            emit onStopRecordingComplete(STATUS_INVALID_STATE);
        });
588 589 590 591 592 593 594
        return;
    }

    g_object_set(_recorderValve, "drop", TRUE, nullptr);

    _removingRecorder = true;

595 596 597 598
    bool ret = _unlinkBranch(_recorderValve);

    // FIXME: AV: it is much better to emit onStopRecordingComplete() after recording is really stopped
    // (which happens later due to async design) but as for now it is also not so bad...
599
    _dispatchSignal([this, ret](){
600 601
        emit onStopRecordingComplete(ret ? STATUS_OK : STATUS_FAIL);
    });
602 603 604
}

void
605
GstVideoReceiver::takeScreenshot(const QString& imageFile)
606
{
607
    if (_needDispatch()) {
608
        QString cachedImageFile = imageFile;
609
        _slotHandler.dispatch([this, cachedImageFile]() {
610
            takeScreenshot(cachedImageFile);
611 612 613 614 615
        });
        return;
    }

    // FIXME: AV: record screenshot here
616
    _dispatchSignal([this](){
617
        emit onTakeScreenshotComplete(STATUS_NOT_IMPLEMENTED);
618
    });
619 620
}

621
// Muxer element factory name for each supported FILE_FORMAT, indexed by
// (format - FILE_FORMAT_MIN). Order must match the FILE_FORMAT enum.
const char* GstVideoReceiver::_kFileMux[FILE_FORMAT_MAX - FILE_FORMAT_MIN] = {
    "matroskamux",
    "qtmux",
    "mp4mux"
};

void
628
GstVideoReceiver::_watchdog(void)
629
{
630
    _slotHandler.dispatch([this](){
631 632 633 634 635 636 637 638 639 640 641
        if(_pipeline == nullptr) {
            return;
        }

        const qint64 now = QDateTime::currentSecsSinceEpoch();

        if (_lastSourceFrameTime == 0) {
            _lastSourceFrameTime = now;
        }

        if (now - _lastSourceFrameTime > _timeout) {
642
            qCDebug(VideoReceiverLog) << "Stream timeout, no frames for " << now - _lastSourceFrameTime << "" << _uri;
643
            _dispatchSignal([this](){
644 645
                emit timeout();
            });
646
            stop();
647 648 649 650 651 652 653 654
        }

        if (_decoding && !_removingDecoder) {
            if (_lastVideoFrameTime == 0) {
                _lastVideoFrameTime = now;
            }

            if (now - _lastVideoFrameTime > _timeout * 2) {
655
                qCDebug(VideoReceiverLog) << "Video decoder timeout, no frames for " << now - _lastVideoFrameTime << " " << _uri;
656
                _dispatchSignal([this](){
657 658
                    emit timeout();
                });
659
                stop();
660 661 662 663 664 665
            }
        }
    });
}

void
666
GstVideoReceiver::_handleEOS(void)
667 668 669 670 671
{
    if(_pipeline == nullptr) {
        return;
    }

672
    if (_endOfStream) {
673 674 675 676 677 678
        stop();
    } else {
        if(_decoding && _removingDecoder) {
            _shutdownDecodingBranch();
        } else if(_recording && _removingRecorder) {
            _shutdownRecordingBranch();
679
        } /*else {
680 681
            qCWarning(VideoReceiverLog) << "Unexpected EOS!";
            stop();
682
        }*/
683 684 685 686
    }
}

GstElement*
GstVideoReceiver::_makeSource(const QString& uri)
{
    if (uri.isEmpty()) {
        qCCritical(VideoReceiverLog) << "Failed because URI is not specified";
        return nullptr;
    }

    // Classify the URI scheme to pick the matching GStreamer source element.
    const bool isTaisync   = uri.contains("tsusb://",  Qt::CaseInsensitive);
    const bool isUdp264    = uri.contains("udp://",    Qt::CaseInsensitive);
    const bool isRtsp      = uri.contains("rtsp://",   Qt::CaseInsensitive);
    const bool isUdp265    = uri.contains("udp265://", Qt::CaseInsensitive);
    const bool isTcpMPEGTS = uri.contains("tcp://",    Qt::CaseInsensitive);
    const bool isUdpMPEGTS = uri.contains("mpegts://", Qt::CaseInsensitive);

    GstElement* source  = nullptr;
    GstElement* buffer  = nullptr;
    GstElement* tsdemux = nullptr;
    GstElement* parser  = nullptr;
    GstElement* bin     = nullptr;
    GstElement* srcbin  = nullptr;

    do {
        QUrl url(uri);

        if (isTcpMPEGTS) {
            source = gst_element_factory_make("tcpclientsrc", "source");
            if (source != nullptr) {
                g_object_set(static_cast<gpointer>(source), "host", qPrintable(url.host()), "port", url.port(), nullptr);
            }
        } else if (isRtsp) {
            source = gst_element_factory_make("rtspsrc", "source");
            if (source != nullptr) {
                g_object_set(static_cast<gpointer>(source), "location", qPrintable(uri), "latency", 17, "udp-reconnect", 1, "timeout", _udpReconnect_us, NULL);
            }
        } else if (isUdp264 || isUdp265 || isUdpMPEGTS || isTaisync) {
            source = gst_element_factory_make("udpsrc", "source");
            if (source != nullptr) {
                g_object_set(static_cast<gpointer>(source), "uri", QString("udp://%1:%2").arg(qPrintable(url.host()), QString::number(url.port())).toUtf8().data(), nullptr);

                // RTP payloads need explicit caps so downstream elements know
                // the encoding.
                GstCaps* caps = nullptr;

                if (isUdp264) {
                    caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264");
                    if (caps == nullptr) {
                        qCCritical(VideoReceiverLog) << "gst_caps_from_string() failed";
                        break;
                    }
                } else if (isUdp265) {
                    caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H265");
                    if (caps == nullptr) {
                        qCCritical(VideoReceiverLog) << "gst_caps_from_string() failed";
                        break;
                    }
                }

                if (caps != nullptr) {
                    g_object_set(static_cast<gpointer>(source), "caps", caps, nullptr);
                    gst_caps_unref(caps);
                    caps = nullptr;
                }
            }
        } else {
            qCDebug(VideoReceiverLog) << "URI is not recognized";
        }

        if (!source) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make() for data source failed";
            break;
        }

        bin = gst_bin_new("sourcebin");

        if (bin == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_bin_new('sourcebin') failed";
            break;
        }

        parser = gst_element_factory_make("parsebin", "parser");

        if (parser == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('parsebin') failed";
            break;
        }

        g_signal_connect(parser, "autoplug-query", G_CALLBACK(_filterParserCaps), nullptr);

        gst_bin_add_many(GST_BIN(bin), source, parser, nullptr);

        // NOTE(review): from this point the bin owns 'source' and 'parser',
        // yet a later 'break' still unrefs them in the cleanup below -
        // pre-existing behavior kept as-is; confirm against upstream.

        // FIXME: AV: Android does not determine MPEG2-TS via parsebin - have to explicitly state which demux to use
        // FIXME: AV: tsdemux handling is a bit ugly - let's try to find elegant solution for that later
        if (isTcpMPEGTS || isUdpMPEGTS) {
            tsdemux = gst_element_factory_make("tsdemux", nullptr);

            if (tsdemux == nullptr) {
                qCCritical(VideoReceiverLog) << "gst_element_factory_make('tsdemux') failed";
                break;
            }

            gst_bin_add(GST_BIN(bin), tsdemux);

            if (!gst_element_link(source, tsdemux)) {
                qCCritical(VideoReceiverLog) << "gst_element_link() failed";
                break;
            }

            // Treat the demuxer as the effective source from here on.
            source = tsdemux;
            tsdemux = nullptr;
        }

        int probeRes = 0;

        // Probe existing src pads; flag bits are produced by _padProbe
        // (presumably: bit 0 = a src pad exists, bit 1 = pad carries RTP -
        // confirm against _padProbe).
        gst_element_foreach_src_pad(source, _padProbe, &probeRes);

        if (probeRes & 1) {
            if (probeRes & 2 && _buffer >= 0) {
                buffer = gst_element_factory_make("rtpjitterbuffer", nullptr);

                if (buffer == nullptr) {
                    qCCritical(VideoReceiverLog) << "gst_element_factory_make('rtpjitterbuffer') failed";
                    break;
                }

                gst_bin_add(GST_BIN(bin), buffer);

                if (!gst_element_link_many(source, buffer, parser, nullptr)) {
                    qCCritical(VideoReceiverLog) << "gst_element_link() failed";
                    break;
                }
            } else {
                if (!gst_element_link(source, parser)) {
                    qCCritical(VideoReceiverLog) << "gst_element_link() failed";
                    break;
                }
            }
        } else {
            // Pads appear dynamically - link them as they show up.
            g_signal_connect(source, "pad-added", G_CALLBACK(_linkPad), parser);
        }

        g_signal_connect(parser, "pad-added", G_CALLBACK(_wrapWithGhostPad), nullptr);

        // Success: everything is owned by the bin now.
        source = tsdemux = buffer = parser = nullptr;

        srcbin = bin;
        bin = nullptr;
    } while(0);

    if (bin != nullptr) {
        gst_object_unref(bin);
        bin = nullptr;
    }

    if (parser != nullptr) {
        gst_object_unref(parser);
        parser = nullptr;
    }

    if (tsdemux != nullptr) {
        gst_object_unref(tsdemux);
        tsdemux = nullptr;
    }

    if (buffer != nullptr) {
        gst_object_unref(buffer);
        buffer = nullptr;
    }

    if (source != nullptr) {
        gst_object_unref(source);
        source = nullptr;
    }

    return srcbin;
}

848
GstElement*
GstVideoReceiver::_makeDecoder(GstCaps* caps, GstElement* videoSink)
{
    Q_UNUSED(caps);

    GstElement* decoder = nullptr;

    do {
        decoder = gst_element_factory_make("decodebin", nullptr);

        if (decoder == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('decodebin') failed";
            break;
        }

        // Let the video sink participate in decoder auto-plugging decisions.
        g_signal_connect(decoder, "autoplug-query", G_CALLBACK(_autoplugQuery), videoSink);
    } while(0);

    return decoder;
}

867
GstElement*
GstVideoReceiver::_makeFileSink(const QString& videoFile, FILE_FORMAT format)
{
    GstElement* fileSink = nullptr;
    GstElement* mux = nullptr;
    GstElement* sink = nullptr;
    GstElement* bin = nullptr;
    bool releaseElements = true;

    do {
        if (format < FILE_FORMAT_MIN || format >= FILE_FORMAT_MAX) {
            qCCritical(VideoReceiverLog) << "Unsupported file format";
            break;
        }

        mux = gst_element_factory_make(_kFileMux[format - FILE_FORMAT_MIN], nullptr);

        if (mux == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('" << _kFileMux[format - FILE_FORMAT_MIN] << "') failed";
            break;
        }

        sink = gst_element_factory_make("filesink", nullptr);

        if (sink == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('filesink') failed";
            break;
        }

        g_object_set(static_cast<gpointer>(sink), "location", qPrintable(videoFile), nullptr);

        bin = gst_bin_new("sinkbin");

        if (bin == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_bin_new('sinkbin') failed";
            break;
        }

        GstPadTemplate* padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u");

        if (padTemplate == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_class_get_pad_template(mux) failed";
            break;
        }

        // FIXME: AV: pad handling is potentially leaking (and other similar places too!)
        GstPad* muxPad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr);

        if (muxPad == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_request_pad(mux) failed";
            break;
        }

        gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr);

        // The bin owns mux and sink from here on.
        releaseElements = false;

        // Expose the muxer's request pad as the bin's single "sink" pad.
        GstPad* ghostpad = gst_ghost_pad_new("sink", muxPad);

        gst_element_add_pad(bin, ghostpad);

        gst_object_unref(muxPad);
        muxPad = nullptr;

        if (!gst_element_link(mux, sink)) {
            qCCritical(VideoReceiverLog) << "gst_element_link() failed";
            break;
        }

        fileSink = bin;
        bin = nullptr;
    } while(0);

    if (releaseElements) {
        if (sink != nullptr) {
            gst_object_unref(sink);
            sink = nullptr;
        }

        if (mux != nullptr) {
            gst_object_unref(mux);
            mux = nullptr;
        }
    }

    if (bin != nullptr) {
        gst_object_unref(bin);
        bin = nullptr;
    }

    return fileSink;
}
953

954
void
GstVideoReceiver::_onNewSourcePad(GstPad* pad)
{
    // Called when the source element exposes a new pad: links the source
    // into the tee, flags streaming, installs the EOS probe and (if a video
    // sink is already set) brings up the decoding branch.

    // FIXME: check for caps - if this is not video stream (and preferably - one of these which we have to support) then simply skip it
    if(!gst_element_link(_source, _tee)) {
        qCCritical(VideoReceiverLog) << "Unable to link source";
        return;
    }

    // First pad after (re)start: announce that the stream is live.
    if (!_streaming) {
        _streaming = true;
        qCDebug(VideoReceiverLog) << "Streaming started" << _uri;
        _dispatchSignal([this](){
            emit streamingChanged(_streaming);
        });
    }

    // Watch downstream events on this pad so EOS can be detected (_eosProbe).
    gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe, this, nullptr);

    // No video sink yet - nothing to decode into; the decoder is added later
    // when a sink is provided.
    if (_videoSink == nullptr) {
        return;
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-source-pad");

    if (!_addDecoder(_decoderValve)) {
        qCCritical(VideoReceiverLog) << "_addDecoder() failed";
        return;
    }

    // Open the decoder valve so buffers start flowing into the new branch.
    g_object_set(_decoderValve, "drop", FALSE, nullptr);

    qCDebug(VideoReceiverLog) << "Decoding started" << _uri;
}
988

989
void
990
GstVideoReceiver::_onNewDecoderPad(GstPad* pad)
991 992
{
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-decoder-pad");
993

994 995
    qCDebug(VideoReceiverLog) << "_onNewDecoderPad" << _uri;

996 997
    if (!_addVideoSink(pad)) {
        qCCritical(VideoReceiverLog) << "_addVideoSink() failed";
998
    }
999
}
1000

1001
bool
1002
GstVideoReceiver::_addDecoder(GstElement* src)
1003 1004 1005 1006 1007 1008
{
    GstPad* srcpad;

    if ((srcpad = gst_element_get_static_pad(src, "src")) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
        return false;
Gus Grubba's avatar
Gus Grubba committed
1009
    }
1010

1011 1012 1013 1014 1015 1016 1017
    GstCaps* caps;

    if ((caps = gst_pad_query_caps(srcpad, nullptr)) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_pad_query_caps() failed";
        gst_object_unref(srcpad);
        srcpad = nullptr;
        return false;
Gus Grubba's avatar
Gus Grubba committed
1018
    }
1019 1020 1021 1022 1023 1024 1025 1026 1027

    gst_object_unref(srcpad);
    srcpad = nullptr;

    if ((_decoder = _makeDecoder(caps, _videoSink)) == nullptr) {
        qCCritical(VideoReceiverLog) << "_makeDecoder() failed";
        gst_caps_unref(caps);
        caps = nullptr;
        return false;
1028
    }
Gus Grubba's avatar
Gus Grubba committed
1029

1030
    gst_object_ref(_decoder);
1031

1032 1033
    gst_caps_unref(caps);
    caps = nullptr;
1034

1035
    gst_bin_add(GST_BIN(_pipeline), _decoder);
1036

1037
    gst_element_sync_state_with_parent(_decoder);
Gus Grubba's avatar
Gus Grubba committed
1038

1039
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-decoder");
1040

1041 1042 1043 1044
    if (!gst_element_link(src, _decoder)) {
        qCCritical(VideoReceiverLog) << "Unable to link decoder";
        return false;
    }
Gus Grubba's avatar
Gus Grubba committed
1045

1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070
    GstPad* srcPad = nullptr;

    GstIterator* it;

    if ((it = gst_element_iterate_src_pads(_decoder)) != nullptr) {
        GValue vpad = G_VALUE_INIT;

        if (gst_iterator_next(it, &vpad) == GST_ITERATOR_OK) {
            srcPad = GST_PAD(g_value_get_object(&vpad));
            gst_object_ref(srcPad);
            g_value_reset(&vpad);
        }

        gst_iterator_free(it);
        it = nullptr;
    }

    if (srcPad != nullptr) {
        _onNewDecoderPad(srcPad);
        gst_object_unref(srcPad);
        srcPad = nullptr;
    } else {
        g_signal_connect(_decoder, "pad-added", G_CALLBACK(_onNewPad), this);
    }

1071 1072
    return true;
}
Gus Grubba's avatar
Gus Grubba committed
1073

1074
bool
1075
GstVideoReceiver::_addVideoSink(GstPad* pad)
1076 1077
{
    GstCaps* caps = gst_pad_query_caps(pad, nullptr);
1078

1079 1080 1081 1082 1083 1084 1085 1086 1087 1088
    gst_object_ref(_videoSink); // gst_bin_add() will steal one reference

    gst_bin_add(GST_BIN(_pipeline), _videoSink);

    if(!gst_element_link(_decoder, _videoSink)) {
        gst_bin_remove(GST_BIN(_pipeline), _videoSink);
        qCCritical(VideoReceiverLog) << "Unable to link video sink";
        if (caps != nullptr) {
            gst_caps_unref(caps);
            caps = nullptr;
1089
        }
1090 1091
        return false;
    }
1092

1093
    gst_element_sync_state_with_parent(_videoSink);
1094

1095 1096
    g_object_set(_videoSink, "sync", _buffer >= 0, NULL);

1097
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-videosink");
1098

1099 1100
    if (caps != nullptr) {
        GstStructure* s = gst_caps_get_structure(caps, 0);
1101

1102 1103 1104 1105
        if (s != nullptr) {
            gint width, height;
            gst_structure_get_int(s, "width", &width);
            gst_structure_get_int(s, "height", &height);
1106 1107 1108
            _dispatchSignal([this, width, height](){
                emit videoSizeChanged(QSize(width, height));
            });
1109 1110
        }

1111 1112 1113
        gst_caps_unref(caps);
        caps = nullptr;
    } else {
1114 1115 1116
        _dispatchSignal([this](){
            emit videoSizeChanged(QSize(0, 0));
        });
1117
    }
1118

1119 1120
    return true;
}
Gus Grubba's avatar
Gus Grubba committed
1121

1122
void
1123
GstVideoReceiver::_noteTeeFrame(void)
1124 1125 1126
{
    _lastSourceFrameTime = QDateTime::currentSecsSinceEpoch();
}
Gus Grubba's avatar
Gus Grubba committed
1127

1128
void
1129
GstVideoReceiver::_noteVideoSinkFrame(void)
1130 1131
{
    _lastVideoFrameTime = QDateTime::currentSecsSinceEpoch();
1132 1133 1134 1135 1136 1137 1138
    if (!_decoding) {
        _decoding = true;
        qCDebug(VideoReceiverLog) << "Decoding started";
        _dispatchSignal([this](){
            emit decodingChanged(_decoding);
        });
    }
1139
}
Gus Grubba's avatar
Gus Grubba committed
1140

1141
void
GstVideoReceiver::_noteEndOfStream(void)
{
    // Set from the EOS pad probe (_eosProbe) when an EOS event is observed,
    // so later logic can tell a deliberate end-of-stream from a stall.
    _endOfStream = true;
}
Gus Grubba's avatar
Gus Grubba committed
1146

1147 1148
// -Unlink the branch from the src pad
// -Send an EOS event at the beginning of that branch
1149
bool
1150
GstVideoReceiver::_unlinkBranch(GstElement* from)
1151 1152
{
    GstPad* src;
Gus Grubba's avatar
Gus Grubba committed
1153

1154 1155
    if ((src = gst_element_get_static_pad(from, "src")) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
1156
        return false;
1157
    }
Gus Grubba's avatar
Gus Grubba committed
1158

1159
    GstPad* sink;
Gus Grubba's avatar
Gus Grubba committed
1160

1161 1162 1163 1164
    if ((sink = gst_pad_get_peer(src)) == nullptr) {
        gst_object_unref(src);
        src = nullptr;
        qCCritical(VideoReceiverLog) << "gst_pad_get_peer() failed";
1165
        return false;
1166
    }
1167

1168 1169 1170 1171 1172 1173
    if (!gst_pad_unlink(src, sink)) {
        gst_object_unref(src);
        src = nullptr;
        gst_object_unref(sink);
        sink = nullptr;
        qCCritical(VideoReceiverLog) << "gst_pad_unlink() failed";
1174
        return false;
1175
    }
1176

1177 1178
    gst_object_unref(src);
    src = nullptr;
1179

1180 1181
    // Send EOS at the beginning of the branch
    const gboolean ret = gst_pad_send_event(sink, gst_event_new_eos());
1182

1183 1184 1185
    gst_object_unref(sink);
    sink = nullptr;

1186
    if (!ret) {
1187
        qCCritical(VideoReceiverLog) << "Branch EOS was NOT sent";
1188
        return false;
Gus Grubba's avatar
Gus Grubba committed
1189
    }
1190 1191 1192 1193

    qCDebug(VideoReceiverLog) << "Branch EOS was sent";

    return true;
Gus Grubba's avatar
Gus Grubba committed
1194 1195
}

1196
void
GstVideoReceiver::_shutdownDecodingBranch(void)
{
    // Tears down the decoder + video sink branch: removes both from the
    // pipeline, drops probes and references, and signals decodingChanged.
    if (_decoder != nullptr) {
        GstObject* parent;

        // Only remove from the bin if the decoder actually has a parent
        // (i.e. it was added to the pipeline).
        if ((parent = gst_element_get_parent(_decoder)) != nullptr) {
            gst_bin_remove(GST_BIN(_pipeline), _decoder);
            gst_element_set_state(_decoder, GST_STATE_NULL);
            gst_object_unref(parent);
            parent = nullptr;
        }

        // Release the extra reference taken in _addDecoder().
        gst_object_unref(_decoder);
        _decoder = nullptr;
    }

    // Remove the frame-watch probe installed on the video sink, if any.
    if (_videoSinkProbeId != 0) {
        GstPad* sinkpad;
        if ((sinkpad = gst_element_get_static_pad(_videoSink, "sink")) != nullptr) {
            gst_pad_remove_probe(sinkpad, _videoSinkProbeId);
            gst_object_unref(sinkpad);
            sinkpad = nullptr;
        }
        _videoSinkProbeId = 0;
    }

    _lastVideoFrameTime = 0;

    GstObject* parent;

    // Same parent check for the video sink before pulling it out of the bin.
    if ((parent = gst_element_get_parent(_videoSink)) != nullptr) {
        gst_bin_remove(GST_BIN(_pipeline), _videoSink);
        gst_element_set_state(_videoSink, GST_STATE_NULL);
        gst_object_unref(parent);
        parent = nullptr;
    }

    gst_object_unref(_videoSink);
    _videoSink = nullptr;

    _removingDecoder = false;

    if (_decoding) {
        _decoding = false;
        qCDebug(VideoReceiverLog) << "Decoding stopped";
        _dispatchSignal([this](){
            emit decodingChanged(_decoding);
        });
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-decoding-stopped");
}

1250
void
GstVideoReceiver::_shutdownRecordingBranch(void)
{
    // Tears down the recording branch: removes the file sink bin from the
    // pipeline, releases it, and signals recordingChanged.
    // NOTE(review): unlike _shutdownDecodingBranch(), no parent check is done
    // here - presumably _fileSink is always in the pipeline at this point.
    gst_bin_remove(GST_BIN(_pipeline), _fileSink);
    gst_element_set_state(_fileSink, GST_STATE_NULL);
    gst_object_unref(_fileSink);
    _fileSink = nullptr;

    _removingRecorder = false;

    if (_recording) {
        _recording = false;
        qCDebug(VideoReceiverLog) << "Recording stopped";
        _dispatchSignal([this](){
            emit recordingChanged(_recording);
        });
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped");
}

1271 1272 1273 1274 1275 1276
bool
GstVideoReceiver::_needDispatch(void)
{
    // Delegates to the slot handler to decide whether a call must be queued
    // (presumably when invoked from a foreign thread - see SlotHandler).
    return _slotHandler.needDispatch();
}

1277
void
GstVideoReceiver::_dispatchSignal(std::function<void()> emitter)
{
    // Runs the given signal-emitting callable while tracking emission depth
    // in _signalDepth (incremented around the call, so nested emissions can
    // be detected by code that reads it).
    // NOTE(review): not exception safe - if emitter() throws, _signalDepth
    // is never decremented. Qt signal emission is not expected to throw.
    _signalDepth += 1;
    emitter();
    _signalDepth -= 1;
}

1285
gboolean
GstVideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
{
    // GStreamer bus watch: handles pipeline errors, pipeline-level EOS, and
    // EOS messages forwarded from child bins ("GstBinForwarded"). All actual
    // work is queued onto the receiver's thread via _slotHandler.
    Q_UNUSED(bus)
    Q_ASSERT(msg != nullptr && data != nullptr);
    GstVideoReceiver* pThis = (GstVideoReceiver*)data;

    switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_ERROR:
        do {
            gchar* debug;
            GError* error;

            // Both out-params are owned by us after parsing and must be freed.
            gst_message_parse_error(msg, &error, &debug);

            if (debug != nullptr) {
                g_free(debug);
                debug = nullptr;
            }

            if (error != nullptr) {
                qCCritical(VideoReceiverLog) << "GStreamer error:" << error->message;
                g_error_free(error);
                error = nullptr;
            }

            // Any pipeline error stops the receiver (on its own thread).
            pThis->_slotHandler.dispatch([pThis](){
                qCDebug(VideoReceiverLog) << "Stopping because of error";
                pThis->stop();
            });
        } while(0);
        break;
    case GST_MESSAGE_EOS:
        pThis->_slotHandler.dispatch([pThis](){
            qCDebug(VideoReceiverLog) << "Received EOS";
            pThis->_handleEOS();
        });
        break;
    case GST_MESSAGE_ELEMENT:
        do {
            const GstStructure* s = gst_message_get_structure (msg);

            // Only interested in messages forwarded up from child bins.
            if (!gst_structure_has_name (s, "GstBinForwarded")) {
                break;
            }

            GstMessage* forward_msg = nullptr;

            gst_structure_get(s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);

            if (forward_msg == nullptr) {
                break;
            }

            // A branch (e.g. the recorder bin) finished draining after EOS.
            if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
                pThis->_slotHandler.dispatch([pThis](){
                    qCDebug(VideoReceiverLog) << "Received branch EOS";
                    pThis->_handleEOS();
                });
            }

            gst_message_unref(forward_msg);
            forward_msg = nullptr;
        } while(0);
        break;
    default:
        break;
    }

    // TRUE keeps the bus watch installed.
    return TRUE;
}
1356

1357
void
1358
GstVideoReceiver::_onNewPad(GstElement* element, GstPad* pad, gpointer data)
1359
{
1360
    GstVideoReceiver* self = static_cast<GstVideoReceiver*>(data);
1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371

    if (element == self->_source) {
        self->_onNewSourcePad(pad);
    } else if (element == self->_decoder) {
        self->_onNewDecoderPad(pad);
    } else {
        qCDebug(VideoReceiverLog) << "Unexpected call!";
    }
}

void
GstVideoReceiver::_wrapWithGhostPad(GstElement* element, GstPad* pad, gpointer data)
{
    // "pad-added" callback that proxies the new pad to the element's parent
    // bin as a same-named ghost pad, making it reachable from outside the bin.
    Q_UNUSED(data)

    gchar* name;

    // gst_pad_get_name() returns a copy that we must g_free().
    if ((name = gst_pad_get_name(pad)) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_pad_get_name() failed";
        return;
    }

    GstPad* ghostpad;

    if ((ghostpad = gst_ghost_pad_new(name, pad)) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_ghost_pad_new() failed";
        g_free(name);
        name = nullptr;
        return;
    }

    g_free(name);
    name = nullptr;

    // Ghost pads must be activated before being added to a running element.
    gst_pad_set_active(ghostpad, TRUE);

    // On success the parent takes ownership of ghostpad.
    if (!gst_element_add_pad(GST_ELEMENT_PARENT(element), ghostpad)) {
        qCCritical(VideoReceiverLog) << "gst_element_add_pad() failed";
    }
}

1402
void
1403
GstVideoReceiver::_linkPad(GstElement* element, GstPad* pad, gpointer data)
1404
{
1405
    gchar* name;
1406

1407 1408 1409
    if ((name = gst_pad_get_name(pad)) != nullptr) {
        if(gst_element_link_pads(element, name, GST_ELEMENT(data), "sink") == false) {
            qCCritical(VideoReceiverLog) << "gst_element_link_pads() failed";
1410 1411
        }

1412 1413 1414 1415
        g_free(name);
        name = nullptr;
    } else {
        qCCritical(VideoReceiverLog) << "gst_pad_get_name() failed";
1416 1417 1418
    }
}

1419
gboolean
GstVideoReceiver::_padProbe(GstElement* element, GstPad* pad, gpointer user_data)
{
    Q_UNUSED(element)

    // Pad-iteration callback. The int pointed to by user_data collects:
    // bit 0 - the probe ran at all; bit 1 - the pad advertises RTP caps.
    int* result = (int*)user_data;

    *result |= 1;

    GstCaps* rtpFilter = gst_caps_from_string("application/x-rtp");

    if (rtpFilter == nullptr) {
        return TRUE;
    }

    GstCaps* padCaps = gst_pad_query_caps(pad, nullptr);

    if (padCaps != nullptr) {
        const bool isRtp = !gst_caps_is_any(padCaps) && gst_caps_can_intersect(padCaps, rtpFilter);

        if (isRtp) {
            *result |= 2;
        }

        gst_caps_unref(padCaps);
        padCaps = nullptr;
    }

    gst_caps_unref(rtpFilter);
    rtpFilter = nullptr;

    return TRUE;
}
1448

1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500
gboolean
GstVideoReceiver::_filterParserCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    // CAPS-query filter: constrains parser output to the packetized stream
    // formats we can handle (H.264 "avc", H.265 "hvc1"). Returns TRUE only
    // when a constrained result was set on the query.
    Q_UNUSED(bin)
    Q_UNUSED(pad)
    Q_UNUSED(element)
    Q_UNUSED(data)

    if (GST_QUERY_TYPE(query) != GST_QUERY_CAPS) {
        return FALSE;
    }

    GstCaps* srcCaps;

    gst_query_parse_caps(query, &srcCaps);

    if (srcCaps == nullptr || gst_caps_is_any(srcCaps)) {
        return FALSE;
    }

    GstCaps* sinkCaps = nullptr;

    GstCaps* filter;

    if (sinkCaps == nullptr && (filter = gst_caps_from_string("video/x-h264")) != nullptr) {
        if (gst_caps_can_intersect(srcCaps, filter)) {
            sinkCaps = gst_caps_from_string("video/x-h264,stream-format=avc");
        }

        gst_caps_unref(filter);
        filter = nullptr;
    }

    // FIX: this used to be an 'else if'. Since gst_caps_from_string("video/x-h264")
    // virtually always succeeds, the first branch was always entered and the
    // H.265 case below was unreachable; H.265 streams never got the hvc1
    // stream-format constraint. Two independent ifs restore the intent.
    if (sinkCaps == nullptr && (filter = gst_caps_from_string("video/x-h265")) != nullptr) {
        if (gst_caps_can_intersect(srcCaps, filter)) {
            sinkCaps = gst_caps_from_string("video/x-h265,stream-format=hvc1");
        }

        gst_caps_unref(filter);
        filter = nullptr;
    }

    if (sinkCaps == nullptr) {
        return FALSE;
    }

    gst_query_set_caps_result(query, sinkCaps);

    gst_caps_unref(sinkCaps);
    sinkCaps = nullptr;

    return TRUE;
}

1501
gboolean
1502
GstVideoReceiver::_autoplugQueryCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
1503
{
1504 1505 1506 1507
    Q_UNUSED(bin)
    Q_UNUSED(pad)
    Q_UNUSED(element)

1508
    GstElement* glupload = (GstElement* )data;
1509

1510
    GstPad* sinkpad;
1511

1512 1513 1514
    if ((sinkpad = gst_element_get_static_pad(glupload, "sink")) == nullptr) {
        qCCritical(VideoReceiverLog) << "No sink pad found";
        return FALSE;
1515
    }
1516

1517
    GstCaps* filter;
1518

1519
    gst_query_parse_caps(query, &filter);
1520

1521
    GstCaps* sinkcaps = gst_pad_query_caps(sinkpad, filter);
1522

1523
    gst_query_set_caps_result(query, sinkcaps);
1524

1525
    const gboolean ret = !gst_caps_is_empty(sinkcaps);
1526

1527 1528
    gst_caps_unref(sinkcaps);
    sinkcaps = nullptr;
1529

1530 1531
    gst_object_unref(sinkpad);
    sinkpad = nullptr;
1532

1533
    return ret;
1534 1535
}

1536
gboolean
GstVideoReceiver::_autoplugQueryContext(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    Q_UNUSED(bin)
    Q_UNUSED(pad)
    Q_UNUSED(element)

    // Forward the CONTEXT autoplug query to the sink pad of the GL sink
    // element passed via 'data', and report whether it was handled.
    GstElement* glsink = (GstElement* )data;

    GstPad* sinkpad = gst_element_get_static_pad(glsink, "sink");

    if (sinkpad == nullptr) {
        qCCritical(VideoReceiverLog) << "No sink pad found";
        return FALSE;
    }

    const gboolean handled = gst_pad_query(sinkpad, query);

    gst_object_unref(sinkpad);
    sinkpad = nullptr;

    return handled;
}

1560
gboolean
GstVideoReceiver::_autoplugQuery(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    // "autoplug-query" dispatcher: CAPS and CONTEXT queries are answered on
    // behalf of the downstream sink; everything else is left unhandled.
    const GstQueryType type = GST_QUERY_TYPE(query);

    if (type == GST_QUERY_CAPS) {
        return _autoplugQueryCaps(bin, pad, element, query, data);
    }

    if (type == GST_QUERY_CONTEXT) {
        return _autoplugQueryContext(bin, pad, element, query, data);
    }

    return FALSE;
}

1580
GstPadProbeReturn
GstVideoReceiver::_teeProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    Q_UNUSED(pad)
    Q_UNUSED(info)

    // Buffer probe on the tee: just stamp the frame arrival time.
    GstVideoReceiver* receiver = static_cast<GstVideoReceiver*>(user_data);

    if (receiver != nullptr) {
        receiver->_noteTeeFrame();
    }

    return GST_PAD_PROBE_OK;
}
1593

1594
GstPadProbeReturn
GstVideoReceiver::_videoSinkProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    // Buffer probe on the video sink's sink pad: consumes the one-shot
    // _resetVideoSink flag and records frame arrival via _noteVideoSinkFrame().
    Q_UNUSED(pad)
    Q_UNUSED(info)

    if(user_data != nullptr) {
        GstVideoReceiver* pThis = static_cast<GstVideoReceiver*>(user_data);

        if (pThis->_resetVideoSink) {
            pThis->_resetVideoSink = false;

// FIXME: AV: this makes MPEG2-TS playing smooth but breaks RTSP
//            gst_pad_send_event(pad, gst_event_new_flush_start());
//            gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE));

//            GstBuffer* buf;

//            if ((buf = gst_pad_probe_info_get_buffer(info)) != nullptr) {
//                GstSegment* seg;

//                if ((seg = gst_segment_new()) != nullptr) {
//                    gst_segment_init(seg, GST_FORMAT_TIME);

//                    seg->start = buf->pts;

//                    gst_pad_send_event(pad, gst_event_new_segment(seg));

//                    gst_segment_free(seg);
//                    seg = nullptr;
//                }

//                gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
//            }
        }

        pThis->_noteVideoSinkFrame();
    }

    return GST_PAD_PROBE_OK;
}

1636
GstPadProbeReturn
GstVideoReceiver::_eosProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    Q_UNUSED(pad);
    Q_ASSERT(user_data != nullptr);

    // Downstream event probe: when an EOS event passes by, record it so the
    // receiver can tell a clean end-of-stream from a stall.
    if (info == nullptr) {
        return GST_PAD_PROBE_OK;
    }

    GstEvent* event = gst_pad_probe_info_get_event(info);

    if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
        static_cast<GstVideoReceiver*>(user_data)->_noteEndOfStream();
    }

    return GST_PAD_PROBE_OK;
}

1654
GstPadProbeReturn
GstVideoReceiver::_keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    // Buffer probe installed when recording starts: drops buffers until the
    // first keyframe, aligns the recording branch timeline to it, then
    // removes itself and signals recordingStarted.
    if (info == nullptr || user_data == nullptr) {
        qCCritical(VideoReceiverLog) << "Invalid arguments";
        return GST_PAD_PROBE_DROP;
    }

    GstBuffer* buf = gst_pad_probe_info_get_buffer(info);

    // FIX: gst_pad_probe_info_get_buffer() returns NULL when the probe info
    // carries no buffer; the flag check and buf->pts below would dereference
    // it. Drop such callbacks instead of crashing.
    if (buf == nullptr) {
        return GST_PAD_PROBE_DROP;
    }

    if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) { // wait for a keyframe
        return GST_PAD_PROBE_DROP;
    }

    // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream!
    gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));

    GstVideoReceiver* pThis = static_cast<GstVideoReceiver*>(user_data);

    qCDebug(VideoReceiverLog) << "Got keyframe, stop dropping buffers";

    pThis->_dispatchSignal([pThis]() {
        pThis->recordingStarted();
    });

    return GST_PAD_PROBE_REMOVE;
}