GstVideoReceiver.cc 46.8 KB
Newer Older
1 2
/****************************************************************************
 *
Gus Grubba's avatar
Gus Grubba committed
3
 * (c) 2009-2020 QGROUNDCONTROL PROJECT <http://www.qgroundcontrol.org>
4 5 6 7 8
 *
 * QGroundControl is licensed according to the terms in the file
 * COPYING.md in the root of the source code directory.
 *
 ****************************************************************************/
Gus Grubba's avatar
Gus Grubba committed
9 10 11 12 13


/**
 * @file
 *   @brief QGC Video Receiver
Gus Grubba's avatar
Gus Grubba committed
14
 *   @author Gus Grubba <gus@auterion.com>
Gus Grubba's avatar
Gus Grubba committed
15 16
 */

17
#include "GstVideoReceiver.h"
18

Gus Grubba's avatar
Gus Grubba committed
19
#include <QDebug>
20
#include <QUrl>
21
#include <QDateTime>
22
#include <QSysInfo>
23

24 25
// Logging category used by every qCDebug/qCWarning/qCCritical statement in this file.
QGC_LOGGING_CATEGORY(VideoReceiverLog, "VideoReceiverLog")

26 27 28 29 30 31 32 33 34
//-----------------------------------------------------------------------------
// Our pipeline look like this:
//
//              +-->queue-->_decoderValve[-->_decoder-->_videoSink]
//              |
// _source-->_tee
//              |
//              +-->queue-->_recorderValve[-->_fileSink]
//
35

36 37
// Construct the receiver in a fully idle state: no pipeline, no branches, all
// element pointers null.  Starts the internal slot-handler worker and a 1 Hz
// watchdog timer that monitors frame arrival (see _watchdog()).
GstVideoReceiver::GstVideoReceiver(QObject* parent)
    : VideoReceiver(parent)
    , _streaming(false)
    , _decoding(false)
    , _recording(false)
    , _removingDecoder(false)
    , _removingRecorder(false)
    , _source(nullptr)
    , _tee(nullptr)
    , _decoderValve(nullptr)
    , _recorderValve(nullptr)
    , _decoder(nullptr)
    , _videoSink(nullptr)
    , _fileSink(nullptr)
    , _pipeline(nullptr)
    , _lastSourceFrameTime(0)
    , _lastVideoFrameTime(0)
    , _resetVideoSink(true)
    , _videoSinkProbeId(0)
    , _udpReconnect_us(5000000)       // rtspsrc 'timeout' property value, microseconds
    , _signalDepth(0)
    , _endOfStream(false)
{
    // All public API calls are serialized onto this handler's thread
    // (see the _needDispatch()/_slotHandler.dispatch() pattern below).
    _slotHandler.start();
    connect(&_watchdogTimer, &QTimer::timeout, this, &GstVideoReceiver::_watchdog);
    _watchdogTimer.start(1000);       // check for stalled stream/decoder once per second
}

64
// Tear down any active pipeline, then stop the worker thread.
// stop() dispatches to the slot-handler thread and completes teardown there;
// _slotHandler.shutdown() is called afterwards so that queued work drains first.
GstVideoReceiver::~GstVideoReceiver(void)
{
    stop();
    _slotHandler.shutdown();
}

70
// Build and start the receiving pipeline for `uri`:
//
//   _source --> _tee --> queue --> _decoderValve   (decoder branch, initially dropping)
//                   \--> queue --> _recorderValve  (recorder branch, initially dropping)
//
// Both valves start closed ("drop" = TRUE); startDecoding()/startRecording()
// open them later.  Emits onStartComplete() with STATUS_OK, STATUS_FAIL,
// STATUS_INVALID_STATE (already running) or STATUS_INVALID_URL.
//
// @param uri     stream location (udp://, udp265://, rtsp://, tcp://, mpegts://, tsusb://)
// @param timeout watchdog timeout, seconds (compared against frame-arrival timestamps)
// @param buffer  jitter-buffer control passed through to _makeSource() via _buffer
void
GstVideoReceiver::start(const QString& uri, unsigned timeout, int buffer)
{
    // Re-dispatch onto the worker thread if called from elsewhere.
    if (_needDispatch()) {
        QString cachedUri = uri;
        _slotHandler.dispatch([this, cachedUri, timeout, buffer]() {
            start(cachedUri, timeout, buffer);
        });
        return;
    }

    if(_pipeline) {
        qCDebug(VideoReceiverLog) << "Already running!" << _uri;
        _dispatchSignal([this](){
            emit onStartComplete(STATUS_INVALID_STATE);
        });
        return;
    }

    if (uri.isEmpty()) {
        qCDebug(VideoReceiverLog) << "Failed because URI is not specified";
        _dispatchSignal([this](){
            emit onStartComplete(STATUS_INVALID_URL);
        });
        return;
    }

    _uri = uri;
    _timeout = timeout;
    _buffer = buffer;

    qCDebug(VideoReceiverLog) << "Starting" << _uri << ", buffer" << _buffer;

    _endOfStream = false;

    bool running    = false;
    bool pipelineUp = false;   // once true, the pipeline owns the elements and cleans them up

    GstElement* decoderQueue = nullptr;
    GstElement* recorderQueue = nullptr;

    // do/while(0) + break is used as structured goto-cleanup throughout this file.
    do {
        if((_tee = gst_element_factory_make("tee", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('tee') failed";
            break;
        }

        GstPad* pad;

        if ((pad = gst_element_get_static_pad(_tee, "sink")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
            break;
        }

        _lastSourceFrameTime = 0;

        // Probe on the tee's sink pad timestamps every incoming buffer
        // (feeds the watchdog's stream-timeout detection).
        gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _teeProbe, this, nullptr);
        gst_object_unref(pad);
        pad = nullptr;

        if((decoderQueue = gst_element_factory_make("queue", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed";
            break;
        }

        if((_decoderValve = gst_element_factory_make("valve", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('valve') failed";
            break;
        }

        // Decoder branch starts closed until startDecoding() attaches a sink.
        g_object_set(_decoderValve, "drop", TRUE, nullptr);

        if((recorderQueue = gst_element_factory_make("queue", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('queue') failed";
            break;
        }

        if((_recorderValve = gst_element_factory_make("valve", nullptr)) == nullptr)  {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('valve') failed";
            break;
        }

        // Recorder branch starts closed until startRecording() attaches a file sink.
        g_object_set(_recorderValve, "drop", TRUE, nullptr);

        if ((_pipeline = gst_pipeline_new("receiver")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_pipeline_new() failed";
            break;
        }

        // Forward element messages wrapped in GST_MESSAGE_ELEMENT so the bus
        // handler still sees EOS/errors from inside bins.
        g_object_set(_pipeline, "message-forward", TRUE, nullptr);

        if ((_source = _makeSource(uri)) == nullptr) {
            qCCritical(VideoReceiverLog) << "_makeSource() failed";
            break;
        }

        gst_bin_add_many(GST_BIN(_pipeline), _source, _tee, decoderQueue, _decoderValve, recorderQueue, _recorderValve, nullptr);

        // From here on the pipeline owns all added elements; on failure we only
        // unref the pipeline itself.
        pipelineUp = true;

        GstPad* srcPad = nullptr;

        GstIterator* it;

        // If the source bin already exposes a src pad, link immediately;
        // otherwise wait for the "pad-added" signal.
        if ((it = gst_element_iterate_src_pads(_source)) != nullptr) {
            GValue vpad = G_VALUE_INIT;

            if (gst_iterator_next(it, &vpad) == GST_ITERATOR_OK) {
                srcPad = GST_PAD(g_value_get_object(&vpad));
                gst_object_ref(srcPad);
                g_value_reset(&vpad);
            }

            gst_iterator_free(it);
            it = nullptr;
        }

        if (srcPad != nullptr) {
            _onNewSourcePad(srcPad);
            gst_object_unref(srcPad);
            srcPad = nullptr;
        } else {
            g_signal_connect(_source, "pad-added", G_CALLBACK(_onNewPad), this);
        }

        if(!gst_element_link_many(_tee, decoderQueue, _decoderValve, nullptr)) {
            qCCritical(VideoReceiverLog) << "Unable to link decoder queue";
            break;
        }

        if(!gst_element_link_many(_tee, recorderQueue, _recorderValve, nullptr)) {
            qCCritical(VideoReceiverLog) << "Unable to link recorder queue";
            break;
        }

        GstBus* bus = nullptr;

        if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) {
            // Synchronous emission: _onBusMessage runs on the streaming thread,
            // not via a GLib main loop.
            gst_bus_enable_sync_message_emission(bus);
            g_signal_connect(bus, "sync-message", G_CALLBACK(_onBusMessage), this);
            gst_object_unref(bus);
            bus = nullptr;
        }

        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-initial");
        running = gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE;
    } while(0);

    if (!running) {
        qCCritical(VideoReceiverLog) << "Failed";

        // In newer versions, the pipeline will clean up all references that are added to it
        if (_pipeline != nullptr) {
            gst_element_set_state(_pipeline, GST_STATE_NULL);
            gst_object_unref(_pipeline);
            _pipeline = nullptr;
        }

        // If we failed before adding items to the pipeline, then clean up
        if (!pipelineUp) {
            if (_recorderValve != nullptr) {
                gst_object_unref(_recorderValve);
                _recorderValve = nullptr;
            }

            if (recorderQueue != nullptr) {
                gst_object_unref(recorderQueue);
                recorderQueue = nullptr;
            }

            if (_decoderValve != nullptr) {
                gst_object_unref(_decoderValve);
                _decoderValve = nullptr;
            }

            if (decoderQueue != nullptr) {
                gst_object_unref(decoderQueue);
                decoderQueue = nullptr;
            }

            if (_tee != nullptr) {
                gst_object_unref(_tee);
                _tee = nullptr;
            }

            if (_source != nullptr) {
                gst_object_unref(_source);
                _source = nullptr;
            }
        }

        _dispatchSignal([this](){
            emit onStartComplete(STATUS_FAIL);
        });
    } else {
        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-started");
        qCDebug(VideoReceiverLog) << "Started" << _uri;

        _dispatchSignal([this](){
            emit onStartComplete(STATUS_OK);
        });
    }
}
273

274
// Stop the pipeline and release all elements.  If the recorder branch is open
// (valve not dropping), an EOS event is pushed first and the function blocks
// on the bus until EOS (or an error) is observed, so the muxer can finalize
// the file before the state drops to NULL.  Always ends by emitting
// onStopComplete(STATUS_OK); also emits streamingChanged() or timeout()
// depending on whether streaming had ever started.
void
GstVideoReceiver::stop(void)
{
    // Re-dispatch onto the worker thread if called from elsewhere.
    if (_needDispatch()) {
        _slotHandler.dispatch([this]() {
            stop();
        });
        return;
    }

    if (_uri.isEmpty()) {
        qCWarning(VideoReceiverLog) << "We should not be here";
        return;
    }

    qCDebug(VideoReceiverLog) << "Stopping" << _uri;

    if (_pipeline != nullptr) {
        GstBus* bus;

        if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != nullptr) {
            // Detach our sync handler before draining so _onBusMessage does not
            // race with the manual bus pop below.
            gst_bus_disable_sync_message_emission(bus);

            g_signal_handlers_disconnect_by_data(bus, this);

            gboolean recordingValveClosed = TRUE;

            g_object_get(_recorderValve, "drop", &recordingValveClosed, nullptr);

            if (recordingValveClosed == FALSE) {
                // Recording in progress: send EOS and wait for it to reach the
                // file sink so the container is closed properly.
                gst_element_send_event(_pipeline, gst_event_new_eos());

                GstMessage* msg;

                // NOTE(review): GST_CLOCK_TIME_NONE waits forever — a stuck
                // pipeline would block this thread indefinitely.
                if((msg = gst_bus_timed_pop_filtered(bus, GST_CLOCK_TIME_NONE, (GstMessageType)(GST_MESSAGE_EOS|GST_MESSAGE_ERROR))) != nullptr) {
                    if(GST_MESSAGE_TYPE(msg) == GST_MESSAGE_ERROR) {
                        qCCritical(VideoReceiverLog) << "Error stopping pipeline!";
                    } else if(GST_MESSAGE_TYPE(msg) == GST_MESSAGE_EOS) {
                        qCDebug(VideoReceiverLog) << "End of stream received!";
                    }

                    gst_message_unref(msg);
                    msg = nullptr;
                } else {
                    qCCritical(VideoReceiverLog) << "gst_bus_timed_pop_filtered() failed";
                }
            }

            gst_object_unref(bus);
            bus = nullptr;
        } else {
            qCCritical(VideoReceiverLog) << "gst_pipeline_get_bus() failed";
        }

        gst_element_set_state(_pipeline, GST_STATE_NULL);

        // FIXME: check if branch is connected and remove all elements from branch
        if (_fileSink != nullptr) {
           _shutdownRecordingBranch();
        }

        if (_videoSink != nullptr) {
            _shutdownDecodingBranch();
        }

        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-stopped");

        // Pipeline owns the elements added in start(); dropping it releases them,
        // so the raw pointers below are just cleared, not unref'd.
        gst_object_unref(_pipeline);
        _pipeline = nullptr;

        _recorderValve = nullptr;
        _decoderValve = nullptr;
        _tee = nullptr;
        _source = nullptr;

        _lastSourceFrameTime = 0;

        if (_streaming) {
            _streaming = false;
            qCDebug(VideoReceiverLog) << "Streaming stopped" << _uri;
            _dispatchSignal([this](){
                emit streamingChanged(_streaming);
            });
        } else {
            // Pipeline existed but no source pad ever appeared — report as timeout.
            qCDebug(VideoReceiverLog) << "Streaming did not start" << _uri;
           _dispatchSignal([this](){
               emit timeout();
           });
        }
    }

    qCDebug(VideoReceiverLog) << "Stopped" << _uri;

    _dispatchSignal([this](){
        emit onStopComplete(STATUS_OK);
    });
}

372
// Attach a video sink and start the decoder branch.
//
// @param sink  a GstElement* passed as void* (the public VideoReceiver
//              interface is GStreamer-agnostic); must not be null.
//
// Installs a buffer probe on the sink pad (feeds the decoder watchdog), stores
// the sink, and — only if the stream is already flowing — builds the decoder
// via _addDecoder() and opens the decoder valve.  If not streaming yet, the
// stored sink is picked up later when the stream appears.  Emits
// onStartDecodingComplete() with STATUS_OK / STATUS_FAIL / STATUS_INVALID_STATE.
void
GstVideoReceiver::startDecoding(void* sink)
{
    if (sink == nullptr) {
        qCCritical(VideoReceiverLog) << "VideoSink is NULL" << _uri;
        return;
    }

    // Re-dispatch onto the worker thread; keep the sink alive across the hop.
    if (_needDispatch()) {
        GstElement* videoSink = GST_ELEMENT(sink);
        gst_object_ref(videoSink);
        _slotHandler.dispatch([this, videoSink]() mutable {
            startDecoding(videoSink);
            gst_object_unref(videoSink);
        });
        return;
    }

    qCDebug(VideoReceiverLog) << "Starting decoding" << _uri;

    // No pipeline: drop any stale sink left over from a previous session.
    if (_pipeline == nullptr) {
        if (_videoSink != nullptr) {
            gst_object_unref(_videoSink);
            _videoSink = nullptr;
        }
    }

    GstElement* videoSink = GST_ELEMENT(sink);

    if(_videoSink != nullptr || _decoding) {
        qCDebug(VideoReceiverLog) << "Already decoding!" << _uri;
        _dispatchSignal([this](){
            emit onStartDecodingComplete(STATUS_INVALID_STATE);
        });
        return;
    }

    GstPad* pad;

    if ((pad = gst_element_get_static_pad(videoSink, "sink")) == nullptr) {
        qCCritical(VideoReceiverLog) << "Unable to find sink pad of video sink" << _uri;
        _dispatchSignal([this](){
            emit onStartDecodingComplete(STATUS_FAIL);
        });
        return;
    }

    _lastVideoFrameTime = 0;
    _resetVideoSink = true;

    // Probe timestamps every decoded buffer reaching the sink (watchdog input).
    _videoSinkProbeId = gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_BUFFER, _videoSinkProbe, this, nullptr);
    gst_object_unref(pad);
    pad = nullptr;

    _videoSink = videoSink;
    gst_object_ref(_videoSink);   // we hold our own reference until shutdown

    _removingDecoder = false;

    // Stream not flowing yet: the decoder is added later; report success now.
    if (!_streaming) {
        _dispatchSignal([this](){
            emit onStartDecodingComplete(STATUS_OK);
        });
        return;
    }

    if (!_addDecoder(_decoderValve)) {
        qCCritical(VideoReceiverLog) << "_addDecoder() failed" << _uri;
        _dispatchSignal([this](){
            emit onStartDecodingComplete(STATUS_FAIL);
        });
        return;
    }

    // Open the valve — buffers now flow into the decoder branch.
    g_object_set(_decoderValve, "drop", FALSE, nullptr);

    qCDebug(VideoReceiverLog) << "Decoding started" << _uri;

    _dispatchSignal([this](){
        emit onStartDecodingComplete(STATUS_OK);
    });
}

// Stop the decoder branch: close the decoder valve, flag the branch for
// removal, and unlink it (actual teardown finishes asynchronously when EOS
// propagates — see _handleEOS()).  Emits onStopDecodingComplete() with
// STATUS_INVALID_STATE if not decoding, else STATUS_OK/STATUS_FAIL from the
// unlink result.
void
GstVideoReceiver::stopDecoding(void)
{
    // Re-dispatch onto the worker thread if called from elsewhere.
    if (_needDispatch()) {
        _slotHandler.dispatch([this]() {
            stopDecoding();
        });
        return;
    }

    qCDebug(VideoReceiverLog) << "Stopping decoding" << _uri;

    // exit immediately if we are not decoding
    if (_pipeline == nullptr || !_decoding) {
        qCDebug(VideoReceiverLog) << "Not decoding!" << _uri;
        _dispatchSignal([this](){
            emit onStopDecodingComplete(STATUS_INVALID_STATE);
        });
        return;
    }

    // Close the valve first so no new buffers enter the branch being removed.
    g_object_set(_decoderValve, "drop", TRUE, nullptr);

    _removingDecoder = true;   // tells _handleEOS() to shut the branch down

    bool ret = _unlinkBranch(_decoderValve);

    // FIXME: AV: it is much better to emit onStopDecodingComplete() after decoding is really stopped
    // (which happens later due to async design) but as for now it is also not so bad...
    _dispatchSignal([this, ret](){
        emit onStopDecodingComplete(ret ? STATUS_OK : STATUS_FAIL);
    });
}

// Start recording the incoming (still-encoded) stream to `videoFile`.
//
// Builds a mux+filesink bin via _makeFileSink(), links it behind the recorder
// valve, installs a keyframe-watch probe so the file starts on a keyframe,
// then opens the valve.  Emits onStartRecordingComplete() with
// STATUS_INVALID_STATE (no pipeline / already recording), STATUS_FAIL, or
// STATUS_OK (followed by recordingChanged(true)).
//
// @param videoFile  output path handed to the filesink
// @param format     container selector, indexes _kFileMux
void
GstVideoReceiver::startRecording(const QString& videoFile, FILE_FORMAT format)
{
    // Re-dispatch onto the worker thread if called from elsewhere.
    if (_needDispatch()) {
        QString cachedVideoFile = videoFile;
        _slotHandler.dispatch([this, cachedVideoFile, format]() {
            startRecording(cachedVideoFile, format);
        });
        return;
    }

    qCDebug(VideoReceiverLog) << "Starting recording" << _uri;

    if (_pipeline == nullptr) {
        qCDebug(VideoReceiverLog) << "Streaming is not active!" << _uri;
        _dispatchSignal([this](){
            emit onStartRecordingComplete(STATUS_INVALID_STATE);
        });
        return;
    }

    if (_recording) {
        qCDebug(VideoReceiverLog) << "Already recording!" << _uri;
        _dispatchSignal([this](){
            emit onStartRecordingComplete(STATUS_INVALID_STATE);
        });
        return;
    }

    qCDebug(VideoReceiverLog) << "New video file:" << videoFile <<  "" << _uri;

    if ((_fileSink = _makeFileSink(videoFile, format)) == nullptr) {
        qCCritical(VideoReceiverLog) << "_makeFileSink() failed" << _uri;
        _dispatchSignal([this](){
            emit onStartRecordingComplete(STATUS_FAIL);
        });
        return;
    }

    _removingRecorder = false;

    // Extra ref held by us; the pipeline takes its own when the sink is added.
    gst_object_ref(_fileSink);

    gst_bin_add(GST_BIN(_pipeline), _fileSink);

    if (!gst_element_link(_recorderValve, _fileSink)) {
        qCCritical(VideoReceiverLog) << "Failed to link valve and file sink" << _uri;
        _dispatchSignal([this](){
            emit onStartRecordingComplete(STATUS_FAIL);
        });
        return;
    }

    gst_element_sync_state_with_parent(_fileSink);

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-filesink");

    // Install a probe on the recording branch to drop buffers until we hit our first keyframe
    // When we hit our first keyframe, we can offset the timestamps appropriately according to the first keyframe time
    // This will ensure the first frame is a keyframe at t=0, and decoding can begin immediately on playback
    GstPad* probepad;

    if ((probepad  = gst_element_get_static_pad(_recorderValve, "src")) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed" << _uri;
        _dispatchSignal([this](){
            emit onStartRecordingComplete(STATUS_FAIL);
        });
        return;
    }

    gst_pad_add_probe(probepad, GST_PAD_PROBE_TYPE_BUFFER, _keyframeWatch, this, nullptr); // to drop the buffers until key frame is received
    gst_object_unref(probepad);
    probepad = nullptr;

    // Open the valve — buffers now flow into the recorder branch.
    g_object_set(_recorderValve, "drop", FALSE, nullptr);

    _recording = true;
    qCDebug(VideoReceiverLog) << "Recording started" << _uri;
    _dispatchSignal([this](){
        emit onStartRecordingComplete(STATUS_OK);
        emit recordingChanged(_recording);
    });
}

//-----------------------------------------------------------------------------
// Stop the recorder branch: close the recorder valve, flag the branch for
// removal, and unlink it (finalizing the file happens asynchronously once EOS
// reaches the file sink — see _handleEOS()/_shutdownRecordingBranch()).
// Emits onStopRecordingComplete() with STATUS_INVALID_STATE if not recording,
// else STATUS_OK/STATUS_FAIL from the unlink result.
void
GstVideoReceiver::stopRecording(void)
{
    // Re-dispatch onto the worker thread if called from elsewhere.
    if (_needDispatch()) {
        _slotHandler.dispatch([this]() {
            stopRecording();
        });
        return;
    }

    qCDebug(VideoReceiverLog) << "Stopping recording" << _uri;

    // exit immediately if we are not recording
    if (_pipeline == nullptr || !_recording) {
        qCDebug(VideoReceiverLog) << "Not recording!" << _uri;
        _dispatchSignal([this](){
            emit onStopRecordingComplete(STATUS_INVALID_STATE);
        });
        return;
    }

    // Close the valve first so no new buffers enter the branch being removed.
    g_object_set(_recorderValve, "drop", TRUE, nullptr);

    _removingRecorder = true;   // tells _handleEOS() to shut the branch down

    bool ret = _unlinkBranch(_recorderValve);

    // FIXME: AV: it is much better to emit onStopRecordingComplete() after recording is really stopped
    // (which happens later due to async design) but as for now it is also not so bad...
    _dispatchSignal([this, ret](){
        emit onStopRecordingComplete(ret ? STATUS_OK : STATUS_FAIL);
    });
}

void
609
GstVideoReceiver::takeScreenshot(const QString& imageFile)
610
{
611
    if (_needDispatch()) {
612
        QString cachedImageFile = imageFile;
613
        _slotHandler.dispatch([this, cachedImageFile]() {
614
            takeScreenshot(cachedImageFile);
615 616 617 618 619
        });
        return;
    }

    // FIXME: AV: record screenshot here
620
    _dispatchSignal([this](){
621
        emit onTakeScreenshotComplete(STATUS_NOT_IMPLEMENTED);
622
    });
623 624
}

625
// GStreamer muxer factory names, indexed by (FILE_FORMAT - FILE_FORMAT_MIN).
// Order must stay in sync with the FILE_FORMAT enum declared in the header.
const char* GstVideoReceiver::_kFileMux[FILE_FORMAT_MAX - FILE_FORMAT_MIN] = {
    "matroskamux",   // .mkv
    "qtmux",         // .mov
    "mp4mux"         // .mp4
};

// Watchdog timer slot (fires every second on the Qt thread, then hops onto
// the worker thread).  Emits timeout() when no source buffer has arrived for
// more than _timeout seconds, and again when the decoder has produced no
// frame for more than 2*_timeout seconds while decoding is active.
void
GstVideoReceiver::_watchdog(void)
{
    _slotHandler.dispatch([this](){
        if(_pipeline == nullptr) {
            return;
        }

        const qint64 now = QDateTime::currentSecsSinceEpoch();

        // First tick after start: seed the timestamp instead of timing out.
        if (_lastSourceFrameTime == 0) {
            _lastSourceFrameTime = now;
        }

        if (now - _lastSourceFrameTime > _timeout) {
            qCDebug(VideoReceiverLog) << "Stream timeout, no frames for " << now - _lastSourceFrameTime << "" << _uri;
            _dispatchSignal([this](){
                emit timeout();
            });
        }

        // Decoder stall detection is skipped while the branch is being torn down.
        if (_decoding && !_removingDecoder) {
            if (_lastVideoFrameTime == 0) {
                _lastVideoFrameTime = now;
            }

            // Decoder gets a more generous budget than the raw stream.
            if (now - _lastVideoFrameTime > _timeout * 2) {
                qCDebug(VideoReceiverLog) << "Video decoder timeout, no frames for " << now - _lastVideoFrameTime << " " << _uri;
                _dispatchSignal([this](){
                    emit timeout();
                });
            }
        }
    });
}

// React to an EOS observed on the bus.  A "real" end of stream (_endOfStream)
// stops the whole receiver; otherwise EOS is the tail end of an asynchronous
// branch removal started by stopDecoding()/stopRecording(), and only the
// matching branch is shut down.
void
GstVideoReceiver::_handleEOS(void)
{
    if(_pipeline == nullptr) {
        qCWarning(VideoReceiverLog) << "We should not be here";
        stop();
        return;
    }

    if (_endOfStream) {
        stop();
    } else {
        if(_decoding && _removingDecoder) {
            _shutdownDecodingBranch();
        } else if(_recording && _removingRecorder) {
            _shutdownRecordingBranch();
        } /*else {
            qCWarning(VideoReceiverLog) << "Unexpected EOS!";
            stop();
        }*/
    }
}

// Build the "sourcebin": a network source element (chosen from the URI
// scheme), optionally a tsdemux (MPEG-TS transports) and an rtpjitterbuffer
// (RTP sources with buffering enabled), feeding a parsebin.  Returns a
// floating bin on success, nullptr on failure.  parsebin's "pad-added" is
// wrapped with a ghost pad so the bin exposes the parsed stream to the caller.
//
// Scheme mapping (substring match, so e.g. "udp265://" also satisfies the
// plain-"udp" branch ordering below):
//   tcp://     -> tcpclientsrc (MPEG-TS)
//   rtsp://    -> rtspsrc
//   udp://     -> udpsrc + H264 RTP caps
//   udp265://  -> udpsrc + H265 RTP caps
//   mpegts://  -> udpsrc (MPEG-TS)
//   tsusb://   -> udpsrc (Taisync), no caps
GstElement*
GstVideoReceiver::_makeSource(const QString& uri)
{
    if (uri.isEmpty()) {
        qCCritical(VideoReceiverLog) << "Failed because URI is not specified";
        return nullptr;
    }

    bool isTaisync  = uri.contains("tsusb://");
    bool isUdp264   = uri.contains("udp://");
    bool isRtsp     = uri.contains("rtsp://");
    bool isUdp265   = uri.contains("udp265://");
    bool isTcpMPEGTS= uri.contains("tcp://");
    bool isUdpMPEGTS= uri.contains("mpegts://");

    GstElement* source  = nullptr;
    GstElement* buffer  = nullptr;
    GstElement* tsdemux = nullptr;
    GstElement* parser  = nullptr;
    GstElement* bin     = nullptr;
    GstElement* srcbin  = nullptr;

    do {
        QUrl url(uri);

        if(isTcpMPEGTS) {
            if ((source = gst_element_factory_make("tcpclientsrc", "source")) != nullptr) {
                g_object_set(static_cast<gpointer>(source), "host", qPrintable(url.host()), "port", url.port(), nullptr);
            }
        } else if (isRtsp) {
            if ((source = gst_element_factory_make("rtspsrc", "source")) != nullptr) {
                g_object_set(static_cast<gpointer>(source), "location", qPrintable(uri), "latency", 17, "udp-reconnect", 1, "timeout", _udpReconnect_us, NULL);
            }
        } else if(isUdp264 || isUdp265 || isUdpMPEGTS || isTaisync) {
            if ((source = gst_element_factory_make("udpsrc", "source")) != nullptr) {
                // Rebuild a plain udp:// URI regardless of the original scheme.
                g_object_set(static_cast<gpointer>(source), "uri", QString("udp://%1:%2").arg(qPrintable(url.host()), QString::number(url.port())).toUtf8().data(), nullptr);

                GstCaps* caps = nullptr;

                // Raw UDP RTP streams carry no self-describing caps, so set them explicitly.
                if(isUdp264) {
                    if ((caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264")) == nullptr) {
                        qCCritical(VideoReceiverLog) << "gst_caps_from_string() failed";
                        break;
                    }
                } else if (isUdp265) {
                    if ((caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H265")) == nullptr) {
                        qCCritical(VideoReceiverLog) << "gst_caps_from_string() failed";
                        break;
                    }
                }

                if (caps != nullptr) {
                    g_object_set(static_cast<gpointer>(source), "caps", caps, nullptr);
                    gst_caps_unref(caps);
                    caps = nullptr;
                }
            }
        } else {
            qCDebug(VideoReceiverLog) << "URI is not recognized";
        }

        if (!source) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make() for data source failed";
            break;
        }

        if ((bin = gst_bin_new("sourcebin")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_bin_new('sourcebin') failed";
            break;
        }

        if ((parser = gst_element_factory_make("parsebin", "parser")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('parsebin') failed";
            break;
        }

        g_signal_connect(parser, "autoplug-query", G_CALLBACK(_filterParserCaps), nullptr);

        // The bin takes ownership of source and parser from here on.
        gst_bin_add_many(GST_BIN(bin), source, parser, nullptr);

        // FIXME: AV: Android does not determine MPEG2-TS via parsebin - have to explicitly state which demux to use
        // FIXME: AV: tsdemux handling is a bit ugly - let's try to find elegant solution for that later
        if (isTcpMPEGTS || isUdpMPEGTS) {
            if ((tsdemux = gst_element_factory_make("tsdemux", nullptr)) == nullptr) {
                qCCritical(VideoReceiverLog) << "gst_element_factory_make('tsdemux') failed";
                break;
            }

            gst_bin_add(GST_BIN(bin), tsdemux);

            if (!gst_element_link(source, tsdemux)) {
                qCCritical(VideoReceiverLog) << "gst_element_link() failed";
                break;
            }

            // Downstream linking continues from the demuxer.
            source = tsdemux;
            tsdemux = nullptr;
        }

        // _padProbe reports through bit flags: bit 0 = src pad(s) already
        // present, bit 1 = RTP stream detected (assumption from usage; see
        // _padProbe for the authoritative encoding).
        int probeRes = 0;

        gst_element_foreach_src_pad(source, _padProbe, &probeRes);

        if (probeRes & 1) {
            if (probeRes & 2 && _buffer >= 0) {
                // RTP with buffering enabled: insert a jitter buffer before parsing.
                if ((buffer = gst_element_factory_make("rtpjitterbuffer", nullptr)) == nullptr) {
                    qCCritical(VideoReceiverLog) << "gst_element_factory_make('rtpjitterbuffer') failed";
                    break;
                }

                gst_bin_add(GST_BIN(bin), buffer);

                if (!gst_element_link_many(source, buffer, parser, nullptr)) {
                    qCCritical(VideoReceiverLog) << "gst_element_link() failed";
                    break;
                }
            } else {
                if (!gst_element_link(source, parser)) {
                    qCCritical(VideoReceiverLog) << "gst_element_link() failed";
                    break;
                }
            }
        } else {
            // Pads appear later (e.g. rtspsrc, tsdemux): link on "pad-added".
            g_signal_connect(source, "pad-added", G_CALLBACK(_linkPad), parser);
        }

        g_signal_connect(parser, "pad-added", G_CALLBACK(_wrapWithGhostPad), nullptr);

        // Everything is owned by the bin now — clear locals so the cleanup
        // section below does not double-unref.
        source = tsdemux = buffer = parser = nullptr;

        srcbin = bin;
        bin = nullptr;
    } while(0);

    // Failure cleanup: unref whatever was created but never handed to the bin.
    if (bin != nullptr) {
        gst_object_unref(bin);
        bin = nullptr;
    }

    if (parser != nullptr) {
        gst_object_unref(parser);
        parser = nullptr;
    }

    if (tsdemux != nullptr) {
        gst_object_unref(tsdemux);
        tsdemux = nullptr;
    }

    if (buffer != nullptr) {
        gst_object_unref(buffer);
        buffer = nullptr;
    }

    if (source != nullptr) {
        gst_object_unref(source);
        source = nullptr;
    }

    return srcbin;
}

852
// Create the decoder element for the decoding branch.
//
// @param caps       unused — decodebin figures the format out on its own
// @param videoSink  passed to the "autoplug-query" handler so decoder
//                   selection can be filtered against the sink's capabilities
// @return a new (floating) decodebin, or nullptr on failure
GstElement*
GstVideoReceiver::_makeDecoder(GstCaps* caps, GstElement* videoSink)
{
    Q_UNUSED(caps);

    GstElement* decodeBin = gst_element_factory_make("decodebin", nullptr);

    if (decodeBin == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_element_factory_make('decodebin') failed";
        return nullptr;
    }

    g_signal_connect(decodeBin, "autoplug-query", G_CALLBACK(_autoplugQuery), videoSink);

    return decodeBin;
}

871
// Build the recording "sinkbin": [ mux -> filesink ] with the muxer's
// requested "video_%u" pad ghosted out as the bin's "sink" pad.
//
// @param videoFile  output path set on the filesink
// @param format     container selector; validated against FILE_FORMAT_MIN/MAX
// @return a new (floating) bin, or nullptr on failure
//
// Ownership: once gst_bin_add_many() succeeds, the bin owns mux and sink
// (releaseElements flips to false so the cleanup section skips them).
GstElement*
GstVideoReceiver::_makeFileSink(const QString& videoFile, FILE_FORMAT format)
{
    GstElement* fileSink = nullptr;
    GstElement* mux = nullptr;
    GstElement* sink = nullptr;
    GstElement* bin = nullptr;
    bool releaseElements = true;

    do{
        if (format < FILE_FORMAT_MIN || format >= FILE_FORMAT_MAX) {
            qCCritical(VideoReceiverLog) << "Unsupported file format";
            break;
        }

        if ((mux = gst_element_factory_make(_kFileMux[format - FILE_FORMAT_MIN], nullptr)) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('" << _kFileMux[format - FILE_FORMAT_MIN] << "') failed";
            break;
        }

        if ((sink = gst_element_factory_make("filesink", nullptr)) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_factory_make('filesink') failed";
            break;
        }

        g_object_set(static_cast<gpointer>(sink), "location", qPrintable(videoFile), nullptr);

        if ((bin = gst_bin_new("sinkbin")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_bin_new('sinkbin') failed";
            break;
        }

        GstPadTemplate* padTemplate;

        // The muxers in _kFileMux expose video inputs via the "video_%u"
        // request-pad template.
        if ((padTemplate = gst_element_class_get_pad_template(GST_ELEMENT_GET_CLASS(mux), "video_%u")) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_class_get_pad_template(mux) failed";
            break;
        }

        // FIXME: AV: pad handling is potentially leaking (and other similar places too!)
        GstPad* pad;

        if ((pad = gst_element_request_pad(mux, padTemplate, nullptr, nullptr)) == nullptr) {
            qCCritical(VideoReceiverLog) << "gst_element_request_pad(mux) failed";
            break;
        }

        gst_bin_add_many(GST_BIN(bin), mux, sink, nullptr);

        // From here the bin owns mux and sink.
        releaseElements = false;

        // Expose the muxer's request pad as the bin's external "sink" pad.
        GstPad* ghostpad = gst_ghost_pad_new("sink", pad);

        gst_element_add_pad(bin, ghostpad);

        gst_object_unref(pad);
        pad = nullptr;

        if (!gst_element_link(mux, sink)) {
            qCCritical(VideoReceiverLog) << "gst_element_link() failed";
            break;
        }

        fileSink = bin;
        bin = nullptr;
    } while(0);

    // Failure cleanup — only release elements not yet owned by the bin.
    if (releaseElements) {
        if (sink != nullptr) {
            gst_object_unref(sink);
            sink = nullptr;
        }

        if (mux != nullptr) {
            gst_object_unref(mux);
            mux = nullptr;
        }
    }

    if (bin != nullptr) {
        gst_object_unref(bin);
        bin = nullptr;
    }

    return fileSink;
}
957

958
void
GstVideoReceiver::_onNewSourcePad(GstPad* pad)
{
    // Called when the source element exposes a new pad: link it into the tee,
    // flag the stream as live, install the EOS probe and (if a sink is set)
    // bring up the decoding branch.
    // FIXME: check for caps - if this is not video stream (and preferably - one of these which we have to support) then simply skip it
    if(!gst_element_link(_source, _tee)) {
        qCCritical(VideoReceiverLog) << "Unable to link source";
        return;
    }

    if (!_streaming) {
        _streaming = true;
        qCDebug(VideoReceiverLog) << "Streaming started" << _uri;
        _dispatchSignal([this](){
            emit streamingChanged(_streaming);
        });
    }

    // Watch downstream events on this pad so EOS can be latched (_eosProbe).
    gst_pad_add_probe(pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, _eosProbe, this, nullptr);

    // No video sink configured - stream-only mode, nothing to decode into.
    if (_videoSink == nullptr) {
        return;
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-source-pad");

    if (!_addDecoder(_decoderValve)) {
        qCCritical(VideoReceiverLog) << "_addDecoder() failed";
        return;
    }

    // Open the valve so buffers start flowing into the decoder branch.
    g_object_set(_decoderValve, "drop", FALSE, nullptr);

    qCDebug(VideoReceiverLog) << "Decoding started" << _uri;
}
992

993
void
994
GstVideoReceiver::_onNewDecoderPad(GstPad* pad)
995 996
{
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-new-decoder-pad");
997

998 999
    qCDebug(VideoReceiverLog) << "_onNewDecoderPad" << _uri;

1000 1001
    if (!_addVideoSink(pad)) {
        qCCritical(VideoReceiverLog) << "_addVideoSink() failed";
1002
    }
1003
}
1004

1005
bool
GstVideoReceiver::_addDecoder(GstElement* src)
{
    // Creates the decoder matching src's output caps, inserts it into the
    // pipeline and links it. If the decoder already exposes a source pad the
    // video sink is attached immediately; otherwise we wait for "pad-added".
    // Returns false on any failure.
    GstPad* srcpad;

    if ((srcpad = gst_element_get_static_pad(src, "src")) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
        return false;
    }

    GstCaps* caps;

    if ((caps = gst_pad_query_caps(srcpad, nullptr)) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_pad_query_caps() failed";
        gst_object_unref(srcpad);
        srcpad = nullptr;
        return false;
    }

    gst_object_unref(srcpad);
    srcpad = nullptr;

    if ((_decoder = _makeDecoder(caps, _videoSink)) == nullptr) {
        qCCritical(VideoReceiverLog) << "_makeDecoder() failed";
        gst_caps_unref(caps);
        caps = nullptr;
        return false;
    }

    // Keep our own reference; gst_bin_add() below takes one over.
    gst_object_ref(_decoder);

    gst_caps_unref(caps);
    caps = nullptr;

    gst_bin_add(GST_BIN(_pipeline), _decoder);

    gst_element_sync_state_with_parent(_decoder);

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-decoder");

    if (!gst_element_link(src, _decoder)) {
        qCCritical(VideoReceiverLog) << "Unable to link decoder";
        return false;
    }

    // The decoder may already have a source pad (e.g. if it negotiated
    // synchronously) - grab the first one, if any.
    GstPad* srcPad = nullptr;

    GstIterator* it;

    if ((it = gst_element_iterate_src_pads(_decoder)) != nullptr) {
        GValue vpad = G_VALUE_INIT;

        if (gst_iterator_next(it, &vpad) == GST_ITERATOR_OK) {
            srcPad = GST_PAD(g_value_get_object(&vpad));
            gst_object_ref(srcPad);
            g_value_reset(&vpad);
        }

        gst_iterator_free(it);
        it = nullptr;
    }

    if (srcPad != nullptr) {
        // Pad already exists - attach the sink right away.
        _onNewDecoderPad(srcPad);
        gst_object_unref(srcPad);
        srcPad = nullptr;
    } else {
        // Otherwise attach lazily once decodebin exposes its pad.
        g_signal_connect(_decoder, "pad-added", G_CALLBACK(_onNewPad), this);
    }

    return true;
}
Gus Grubba's avatar
Gus Grubba committed
1077

1078
bool
1079
GstVideoReceiver::_addVideoSink(GstPad* pad)
1080 1081
{
    GstCaps* caps = gst_pad_query_caps(pad, nullptr);
1082

1083 1084 1085 1086 1087 1088 1089 1090 1091 1092
    gst_object_ref(_videoSink); // gst_bin_add() will steal one reference

    gst_bin_add(GST_BIN(_pipeline), _videoSink);

    if(!gst_element_link(_decoder, _videoSink)) {
        gst_bin_remove(GST_BIN(_pipeline), _videoSink);
        qCCritical(VideoReceiverLog) << "Unable to link video sink";
        if (caps != nullptr) {
            gst_caps_unref(caps);
            caps = nullptr;
1093
        }
1094 1095
        return false;
    }
1096

1097
    gst_element_sync_state_with_parent(_videoSink);
1098

1099 1100
    g_object_set(_videoSink, "sync", _buffer >= 0, NULL);

1101
    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-with-videosink");
1102

1103 1104
    if (caps != nullptr) {
        GstStructure* s = gst_caps_get_structure(caps, 0);
1105

1106 1107 1108 1109
        if (s != nullptr) {
            gint width, height;
            gst_structure_get_int(s, "width", &width);
            gst_structure_get_int(s, "height", &height);
1110 1111 1112
            _dispatchSignal([this, width, height](){
                emit videoSizeChanged(QSize(width, height));
            });
1113 1114
        }

1115 1116 1117
        gst_caps_unref(caps);
        caps = nullptr;
    } else {
1118 1119 1120
        _dispatchSignal([this](){
            emit videoSizeChanged(QSize(0, 0));
        });
1121
    }
1122

1123
    _decoding = true;
1124
    qCDebug(VideoReceiverLog) << "Decoding started";
1125
    _dispatchSignal([this](){
1126
        emit decodingChanged(_decoding);
1127
    });
Gus Grubba's avatar
Gus Grubba committed
1128

1129 1130
    return true;
}
Gus Grubba's avatar
Gus Grubba committed
1131

1132
void
1133
GstVideoReceiver::_noteTeeFrame(void)
1134 1135 1136
{
    _lastSourceFrameTime = QDateTime::currentSecsSinceEpoch();
}
Gus Grubba's avatar
Gus Grubba committed
1137

1138
void
1139
GstVideoReceiver::_noteVideoSinkFrame(void)
1140 1141 1142
{
    _lastVideoFrameTime = QDateTime::currentSecsSinceEpoch();
}
Gus Grubba's avatar
Gus Grubba committed
1143

1144
void
1145
GstVideoReceiver::_noteEndOfStream(void)
1146
{
1147
    _endOfStream = true;
1148
}
Gus Grubba's avatar
Gus Grubba committed
1149

1150 1151
// -Unlink the branch from the src pad
// -Send an EOS event at the beginning of that branch
bool
GstVideoReceiver::_unlinkBranch(GstElement* from)
{
    // Detaches the branch downstream of 'from' and pushes EOS into it so the
    // branch can drain and be torn down cleanly. Returns false on any failure.
    GstPad* src;

    if ((src = gst_element_get_static_pad(from, "src")) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_element_get_static_pad() failed";
        return false;
    }

    GstPad* sink;

    // The peer is the first pad of the branch we are about to remove.
    if ((sink = gst_pad_get_peer(src)) == nullptr) {
        gst_object_unref(src);
        src = nullptr;
        qCCritical(VideoReceiverLog) << "gst_pad_get_peer() failed";
        return false;
    }

    if (!gst_pad_unlink(src, sink)) {
        gst_object_unref(src);
        src = nullptr;
        gst_object_unref(sink);
        sink = nullptr;
        qCCritical(VideoReceiverLog) << "gst_pad_unlink() failed";
        return false;
    }

    gst_object_unref(src);
    src = nullptr;

    // Send EOS at the beginning of the branch
    const gboolean ret = gst_pad_send_event(sink, gst_event_new_eos());

    gst_object_unref(sink);
    sink = nullptr;

    if (!ret) {
        qCCritical(VideoReceiverLog) << "Branch EOS was NOT sent";
        return false;
    }

    qCDebug(VideoReceiverLog) << "Branch EOS was sent";

    return true;
}

1199
void
GstVideoReceiver::_shutdownDecodingBranch(void)
{
    // Tears down the decoder + video sink branch after it has drained:
    // removes elements from the pipeline, drops our references, clears the
    // probe and notifies listeners that decoding stopped.
    if (_decoder != nullptr) {
        GstObject* parent;

        // Only remove from the pipeline if it is actually parented there.
        if ((parent = gst_element_get_parent(_decoder)) != nullptr) {
            gst_bin_remove(GST_BIN(_pipeline), _decoder);
            gst_element_set_state(_decoder, GST_STATE_NULL);
            gst_object_unref(parent);
            parent = nullptr;
        }

        gst_object_unref(_decoder);
        _decoder = nullptr;
    }

    // Remove the frame-watch probe installed on the sink pad, if any.
    if (_videoSinkProbeId != 0) {
        GstPad* sinkpad;
        if ((sinkpad = gst_element_get_static_pad(_videoSink, "sink")) != nullptr) {
            gst_pad_remove_probe(sinkpad, _videoSinkProbeId);
            gst_object_unref(sinkpad);
            sinkpad = nullptr;
        }
        _videoSinkProbeId = 0;
    }

    _lastVideoFrameTime = 0;

    GstObject* parent;

    if ((parent = gst_element_get_parent(_videoSink)) != nullptr) {
        gst_bin_remove(GST_BIN(_pipeline), _videoSink);
        gst_element_set_state(_videoSink, GST_STATE_NULL);
        gst_object_unref(parent);
        parent = nullptr;
    }

    gst_object_unref(_videoSink);
    _videoSink = nullptr;

    _removingDecoder = false;

    if (_decoding) {
        _decoding = false;
        qCDebug(VideoReceiverLog) << "Decoding stopped";
        _dispatchSignal([this](){
            emit decodingChanged(_decoding);
        });
    }

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-decoding-stopped");
}

1253
void
1254
GstVideoReceiver::_shutdownRecordingBranch(void)
Gus Grubba's avatar
Gus Grubba committed
1255
{
1256 1257 1258 1259
    gst_bin_remove(GST_BIN(_pipeline), _fileSink);
    gst_element_set_state(_fileSink, GST_STATE_NULL);
    gst_object_unref(_fileSink);
    _fileSink = nullptr;
Gus Grubba's avatar
Gus Grubba committed
1260

1261 1262 1263 1264 1265
    _removingRecorder = false;

    if (_recording) {
        _recording = false;
        qCDebug(VideoReceiverLog) << "Recording stopped";
1266
        _dispatchSignal([this](){
1267
            emit recordingChanged(_recording);
1268
        });
1269
    }
1270 1271

    GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(_pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-recording-stopped");
1272 1273
}

1274 1275 1276 1277 1278 1279
bool
GstVideoReceiver::_needDispatch(void)
{
    // Delegate the decision to the slot handler.
    const bool dispatchRequired = _slotHandler.needDispatch();
    return dispatchRequired;
}

1280
void
1281
GstVideoReceiver::_dispatchSignal(std::function<void()> emitter)
1282
{
1283 1284 1285
    _signalDepth += 1;
    emitter();
    _signalDepth -= 1;
1286 1287
}

1288
gboolean
GstVideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
{
    // GStreamer bus watch: handles pipeline errors, pipeline EOS and branch
    // EOS forwarded by bins ("GstBinForwarded" element messages). All actual
    // work is dispatched onto the receiver's worker thread via _slotHandler.
    Q_UNUSED(bus)
    Q_ASSERT(msg != nullptr && data != nullptr);
    GstVideoReceiver* pThis = (GstVideoReceiver*)data;

    switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_ERROR:
        do {
            gchar* debug;
            GError* error;

            gst_message_parse_error(msg, &error, &debug);

            // Debug string is not logged - only freed.
            if (debug != nullptr) {
                g_free(debug);
                debug = nullptr;
            }

            if (error != nullptr) {
                qCCritical(VideoReceiverLog) << "GStreamer error:" << error->message;
                g_error_free(error);
                error = nullptr;
            }

            // Stop the receiver from its own thread, not the bus thread.
            pThis->_slotHandler.dispatch([pThis](){
                qCDebug(VideoReceiverLog) << "Stopping because of error";
                pThis->stop();
            });
        } while(0);
        break;
    case GST_MESSAGE_EOS:
        pThis->_slotHandler.dispatch([pThis](){
            qCDebug(VideoReceiverLog) << "Received EOS";
            pThis->_handleEOS();
        });
        break;
    case GST_MESSAGE_ELEMENT:
        do {
            const GstStructure* s = gst_message_get_structure (msg);

            // Only interested in messages forwarded by bins (message-forward).
            if (!gst_structure_has_name (s, "GstBinForwarded")) {
                break;
            }

            GstMessage* forward_msg = nullptr;

            gst_structure_get(s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);

            if (forward_msg == nullptr) {
                break;
            }

            // A branch (e.g. the recording bin) finished draining.
            if (GST_MESSAGE_TYPE(forward_msg) == GST_MESSAGE_EOS) {
                pThis->_slotHandler.dispatch([pThis](){
                    qCDebug(VideoReceiverLog) << "Received branch EOS";
                    pThis->_handleEOS();
                });
            }

            gst_message_unref(forward_msg);
            forward_msg = nullptr;
        } while(0);
        break;
    default:
        break;
    }

    // Keep the bus watch installed.
    return TRUE;
}
1359

1360
void
1361
GstVideoReceiver::_onNewPad(GstElement* element, GstPad* pad, gpointer data)
1362
{
1363
    GstVideoReceiver* self = static_cast<GstVideoReceiver*>(data);
1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374

    if (element == self->_source) {
        self->_onNewSourcePad(pad);
    } else if (element == self->_decoder) {
        self->_onNewDecoderPad(pad);
    } else {
        qCDebug(VideoReceiverLog) << "Unexpected call!";
    }
}

void
1375
GstVideoReceiver::_wrapWithGhostPad(GstElement* element, GstPad* pad, gpointer data)
1376
{
1377 1378
    Q_UNUSED(data)

1379 1380 1381 1382
    gchar* name;

    if ((name = gst_pad_get_name(pad)) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_pad_get_name() failed";
1383 1384 1385
        return;
    }

1386 1387 1388 1389 1390 1391 1392
    GstPad* ghostpad;

    if ((ghostpad = gst_ghost_pad_new(name, pad)) == nullptr) {
        qCCritical(VideoReceiverLog) << "gst_ghost_pad_new() failed";
        g_free(name);
        name = nullptr;
        return;
1393 1394
    }

1395 1396
    g_free(name);
    name = nullptr;
1397

1398
    gst_pad_set_active(ghostpad, TRUE);
1399

1400 1401
    if (!gst_element_add_pad(GST_ELEMENT_PARENT(element), ghostpad)) {
        qCCritical(VideoReceiverLog) << "gst_element_add_pad() failed";
1402 1403 1404
    }
}

1405
void
1406
GstVideoReceiver::_linkPad(GstElement* element, GstPad* pad, gpointer data)
1407
{
1408
    gchar* name;
1409

1410 1411 1412
    if ((name = gst_pad_get_name(pad)) != nullptr) {
        if(gst_element_link_pads(element, name, GST_ELEMENT(data), "sink") == false) {
            qCCritical(VideoReceiverLog) << "gst_element_link_pads() failed";
1413 1414
        }

1415 1416 1417 1418
        g_free(name);
        name = nullptr;
    } else {
        qCCritical(VideoReceiverLog) << "gst_pad_get_name() failed";
1419 1420 1421
    }
}

1422
gboolean
GstVideoReceiver::_padProbe(GstElement* element, GstPad* pad, gpointer user_data)
{
    Q_UNUSED(element)

    // Result flags accumulated into the caller-provided int:
    // bit 0 - a pad was visited; bit 1 - the pad can produce RTP caps.
    int* result = static_cast<int*>(user_data);

    *result |= 1;

    GstCaps* rtpFilter = gst_caps_from_string("application/x-rtp");

    if (rtpFilter != nullptr) {
        GstCaps* padCaps = gst_pad_query_caps(pad, nullptr);

        if (padCaps != nullptr) {
            const bool isRtp = !gst_caps_is_any(padCaps) && gst_caps_can_intersect(padCaps, rtpFilter);

            if (isRtp) {
                *result |= 2;
            }

            gst_caps_unref(padCaps);
            padCaps = nullptr;
        }

        gst_caps_unref(rtpFilter);
        rtpFilter = nullptr;
    }

    return TRUE;
}
1451

1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503
gboolean
GstVideoReceiver::_filterParserCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    // Forces a specific stream-format on parser output: avc for H.264,
    // hvc1 for H.265. Returns TRUE if the query was answered here.
    Q_UNUSED(bin)
    Q_UNUSED(pad)
    Q_UNUSED(element)
    Q_UNUSED(data)

    if (GST_QUERY_TYPE(query) != GST_QUERY_CAPS) {
        return FALSE;
    }

    GstCaps* srcCaps;

    gst_query_parse_caps(query, &srcCaps);

    if (srcCaps == nullptr || gst_caps_is_any(srcCaps)) {
        return FALSE;
    }

    GstCaps* sinkCaps = nullptr;

    GstCaps* filter;

    // H.264 -> avc stream-format.
    if ((filter = gst_caps_from_string("video/x-h264")) != nullptr) {
        if (gst_caps_can_intersect(srcCaps, filter)) {
            sinkCaps = gst_caps_from_string("video/x-h264,stream-format=avc");
        }

        gst_caps_unref(filter);
        filter = nullptr;
    }

    // H.265 -> hvc1 stream-format.
    // FIX: this used to be the 'else' arm of the H.264 check, whose condition
    // ('sinkCaps == nullptr && filter created') was always entered first - the
    // H.265 branch was unreachable and H.265 streams were never matched.
    if (sinkCaps == nullptr && (filter = gst_caps_from_string("video/x-h265")) != nullptr) {
        if (gst_caps_can_intersect(srcCaps, filter)) {
            sinkCaps = gst_caps_from_string("video/x-h265,stream-format=hvc1");
        }

        gst_caps_unref(filter);
        filter = nullptr;
    }

    if (sinkCaps == nullptr) {
        return FALSE;
    }

    gst_query_set_caps_result(query, sinkCaps);

    gst_caps_unref(sinkCaps);
    sinkCaps = nullptr;

    return TRUE;
}

1504
gboolean
GstVideoReceiver::_autoplugQueryCaps(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    Q_UNUSED(bin)
    Q_UNUSED(pad)
    Q_UNUSED(element)

    // Answer the caps query with whatever the sink element (passed via 'data')
    // accepts on its "sink" pad.
    GstElement* glupload = (GstElement* )data;

    GstPad* sinkpad = gst_element_get_static_pad(glupload, "sink");

    if (sinkpad == nullptr) {
        qCCritical(VideoReceiverLog) << "No sink pad found";
        return FALSE;
    }

    GstCaps* filter;

    gst_query_parse_caps(query, &filter);

    GstCaps* sinkcaps = gst_pad_query_caps(sinkpad, filter);

    gst_query_set_caps_result(query, sinkcaps);

    // Handled only if the intersection is non-empty.
    const gboolean handled = !gst_caps_is_empty(sinkcaps);

    gst_caps_unref(sinkcaps);
    sinkcaps = nullptr;

    gst_object_unref(sinkpad);
    sinkpad = nullptr;

    return handled;
}

1539
gboolean
GstVideoReceiver::_autoplugQueryContext(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    Q_UNUSED(bin)
    Q_UNUSED(pad)
    Q_UNUSED(element)

    // Forward the context query to the sink element passed via 'data'.
    GstElement* glsink = (GstElement* )data;

    GstPad* sinkpad = gst_element_get_static_pad(glsink, "sink");

    if (sinkpad == nullptr){
        qCCritical(VideoReceiverLog) << "No sink pad found";
        return FALSE;
    }

    const gboolean handled = gst_pad_query(sinkpad, query);

    gst_object_unref(sinkpad);
    sinkpad = nullptr;

    return handled;
}

1563
gboolean
GstVideoReceiver::_autoplugQuery(GstElement* bin, GstPad* pad, GstElement* element, GstQuery* query, gpointer data)
{
    // Route caps and context queries to their dedicated handlers;
    // anything else is left unhandled.
    const GstQueryType type = GST_QUERY_TYPE(query);

    if (type == GST_QUERY_CAPS) {
        return _autoplugQueryCaps(bin, pad, element, query, data);
    }

    if (type == GST_QUERY_CONTEXT) {
        return _autoplugQueryContext(bin, pad, element, query, data);
    }

    return FALSE;
}

1583
GstPadProbeReturn
GstVideoReceiver::_teeProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    Q_UNUSED(pad)
    Q_UNUSED(info)

    // Record that data passed the tee; feeds the stream timeout watchdog.
    GstVideoReceiver* receiver = static_cast<GstVideoReceiver*>(user_data);

    if (receiver != nullptr) {
        receiver->_noteTeeFrame();
    }

    return GST_PAD_PROBE_OK;
}
1596

1597
GstPadProbeReturn
GstVideoReceiver::_videoSinkProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    // Buffer probe on the video sink's pad: notes each frame for the decode
    // watchdog and consumes the one-shot _resetVideoSink flag (the actual
    // reset logic is currently disabled - see the FIXME block below).
    Q_UNUSED(pad)
    Q_UNUSED(info)

    if(user_data != nullptr) {
        GstVideoReceiver* pThis = static_cast<GstVideoReceiver*>(user_data);

        if (pThis->_resetVideoSink) {
            pThis->_resetVideoSink = false;

// FIXME: AV: this makes MPEG2-TS playing smooth but breaks RTSP
//            gst_pad_send_event(pad, gst_event_new_flush_start());
//            gst_pad_send_event(pad, gst_event_new_flush_stop(TRUE));

//            GstBuffer* buf;

//            if ((buf = gst_pad_probe_info_get_buffer(info)) != nullptr) {
//                GstSegment* seg;

//                if ((seg = gst_segment_new()) != nullptr) {
//                    gst_segment_init(seg, GST_FORMAT_TIME);

//                    seg->start = buf->pts;

//                    gst_pad_send_event(pad, gst_event_new_segment(seg));

//                    gst_segment_free(seg);
//                    seg = nullptr;
//                }

//                gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));
//            }
        }

        pThis->_noteVideoSinkFrame();
    }

    return GST_PAD_PROBE_OK;
}

1639
GstPadProbeReturn
GstVideoReceiver::_eosProbe(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    Q_UNUSED(pad);
    Q_ASSERT(user_data != nullptr);

    // Watch downstream events and latch the receiver's EOS flag when one passes.
    if (info == nullptr) {
        return GST_PAD_PROBE_OK;
    }

    GstEvent* event = gst_pad_probe_info_get_event(info);

    if (GST_EVENT_TYPE(event) == GST_EVENT_EOS) {
        GstVideoReceiver* receiver = static_cast<GstVideoReceiver*>(user_data);
        receiver->_noteEndOfStream();
    }

    return GST_PAD_PROBE_OK;
}

1657
GstPadProbeReturn
GstVideoReceiver::_keyframeWatch(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    // Buffer probe installed at the start of the recording branch: drops every
    // buffer until the first keyframe so the recorded file starts decodable,
    // then rebases the branch timeline and removes itself.
    if (info == nullptr || user_data == nullptr) {
        qCCritical(VideoReceiverLog) << "Invalid arguments";
        return GST_PAD_PROBE_DROP;
    }

    GstBuffer* buf = gst_pad_probe_info_get_buffer(info);

    if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLAG_DELTA_UNIT)) { // wait for a keyframe
        return GST_PAD_PROBE_DROP;
    }

    // set media file '0' offset to current timeline position - we don't want to touch other elements in the graph, except these which are downstream!
    gst_pad_set_offset(pad, -static_cast<gint64>(buf->pts));

    GstVideoReceiver* pThis = static_cast<GstVideoReceiver*>(user_data);

    qCDebug(VideoReceiverLog) << "Got keyframe, stop dropping buffers";

    // NOTE(review): emitted directly from the streaming thread, unlike other
    // signals which go through _dispatchSignal - confirm this is intentional.
    pThis->recordingStarted();

    // Remove the probe; subsequent buffers flow through untouched.
    return GST_PAD_PROBE_REMOVE;
}