VideoReceiver.cc 16 KB
Newer Older
1 2 3 4 5 6 7 8
/****************************************************************************
 *
 *   (c) 2009-2016 QGROUNDCONTROL PROJECT <http://www.qgroundcontrol.org>
 *
 * QGroundControl is licensed according to the terms in the file
 * COPYING.md in the root of the source code directory.
 *
 ****************************************************************************/
Gus Grubba's avatar
Gus Grubba committed
9 10 11 12 13 14 15 16 17 18


/**
 * @file
 *   @brief QGC Video Receiver
 *   @author Gus Grubba <mavlink@grubba.com>
 */

#include "VideoReceiver.h"
#include <QDebug>
19
#include <QUrl>
20 21 22
#include <QDir>
#include <QDateTime>

23
VideoReceiver::Sink* VideoReceiver::_sink = NULL;
24 25
GstElement*          VideoReceiver::_pipeline = NULL;
GstElement*          VideoReceiver::_pipeline2 = NULL;
26
GstElement*          VideoReceiver::_tee = NULL;
27

28 29 30
// -EOS has appeared on the bus of the temporary pipeline
// -At this point all of the recoring elements have been flushed, and the video file has been finalized
// -Now we can remove the temporary pipeline and its elements
31 32 33 34 35 36
gboolean VideoReceiver::_eosCB(GstBus* bus, GstMessage* message, gpointer user_data)
{
    Q_UNUSED(bus);
    Q_UNUSED(message);
    Q_UNUSED(user_data);

37 38 39
    gst_bin_remove(GST_BIN(_pipeline2), _sink->queue);
    gst_bin_remove(GST_BIN(_pipeline2), _sink->mux);
    gst_bin_remove(GST_BIN(_pipeline2), _sink->filesink);
40 41 42 43

    gst_element_set_state(_pipeline2, GST_STATE_NULL);
    gst_object_unref(_pipeline2);

44 45 46
    gst_element_set_state(_sink->filesink, GST_STATE_NULL);
    gst_element_set_state(_sink->mux, GST_STATE_NULL);
    gst_element_set_state(_sink->queue, GST_STATE_NULL);
47

48 49 50
    gst_object_unref(_sink->queue);
    gst_object_unref(_sink->mux);
    gst_object_unref(_sink->filesink);
51

52 53
    delete _sink;
    _sink = NULL;
54 55 56 57

    return true;
}

58 59 60 61
// -Unlink the recording branch from the tee in the main pipeline
// -Create a second temporary pipeline, and place the recording branch elements into that pipeline
// -Setup watch and handler for EOS event on the temporary pipeline's bus
// -Send an EOS event at the beginning of that pipeline and set up a callback for
62 63 64 65 66 67
GstPadProbeReturn VideoReceiver::_unlinkCB(GstPad* pad, GstPadProbeInfo* info, gpointer user_data)
{
    Q_UNUSED(pad);
    Q_UNUSED(info);
    Q_UNUSED(user_data);

68 69
    // We will only execute once
    if(!g_atomic_int_compare_and_exchange(&_sink->removing, FALSE, TRUE))
70 71 72
        return GST_PAD_PROBE_OK;

    // Also unlinks and unrefs
73 74 75 76 77
    gst_bin_remove_many(GST_BIN (_pipeline), _sink->queue, _sink->mux, _sink->filesink, NULL);

    // Give tee its pad back
    gst_element_release_request_pad(_tee, _sink->teepad);
    gst_object_unref(_sink->teepad);
78

79
    // Create temporary pipeline
80 81
    _pipeline2 = gst_pipeline_new("pipe2");

82 83 84
    // Put our elements from the recording branch into the temporary pipeline
    gst_bin_add_many(GST_BIN(_pipeline2), _sink->queue, _sink->mux, _sink->filesink, NULL);
    gst_element_link_many(_sink->queue, _sink->mux, _sink->filesink, NULL);
85

86
    // Add watch for EOS event
87 88 89
    GstBus* bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline2));
    gst_bus_add_signal_watch(bus);
    g_signal_connect(bus, "message::eos", G_CALLBACK(_eosCB), NULL);
90
    gst_object_unref(bus);
91 92 93 94 95

    if(gst_element_set_state(_pipeline2, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
        qDebug() << "problem starting pipeline2";
    }

96 97 98 99
    // Send EOS at the beginning of the pipeline
    GstPad* sinkpad = gst_element_get_static_pad(_sink->queue, "sink");
    gst_pad_send_event(sinkpad, gst_event_new_eos());
    gst_object_unref(sinkpad);
100 101 102 103

    return GST_PAD_PROBE_REMOVE;
}

104 105 106 107 108 109 110 111 112 113 114 115
// When we finish our pipeline will look like this:
//
//                                   +-->queue-->decoder-->_videosink
//                                   |
//    datasource-->demux-->parser-->tee
//                                   |
//                                   |    +--------------_sink-------------------+
//                                   |    |                                      |
//   we are adding these elements->  +->teepad-->queue-->matroskamux-->_filesink |
//                                        |                                      |
//                                        +--------------------------------------+
void VideoReceiver::startRecording(void)
116 117 118 119 120 121
{
    // exit immediately if we are already recording
    if(_pipeline == NULL || _recording) {
        return;
    }

122
    _sink = g_new0(Sink, 1);
123

124 125 126 127 128
    _sink->teepad = gst_element_get_request_pad(_tee, "src_%u");
    _sink->queue = gst_element_factory_make("queue", NULL);
    _sink->mux = gst_element_factory_make("matroskamux", NULL);
    _sink->filesink = gst_element_factory_make("filesink", NULL);
    _sink->removing = false;
129 130

    QString filename = QDir::homePath() + "/" + QDateTime::currentDateTime().toString() + ".mkv";
131
    g_object_set(G_OBJECT(_sink->filesink), "location", qPrintable(filename), NULL);
132

133 134 135
    gst_object_ref(_sink->queue);
    gst_object_ref(_sink->mux);
    gst_object_ref(_sink->filesink);
136

137 138
    gst_bin_add_many(GST_BIN(_pipeline), _sink->queue, _sink->mux, _sink->filesink, NULL);
    gst_element_link_many(_sink->queue, _sink->mux, _sink->filesink, NULL);
139

140 141 142
    gst_element_sync_state_with_parent(_sink->queue);
    gst_element_sync_state_with_parent(_sink->mux);
    gst_element_sync_state_with_parent(_sink->filesink);
143

144 145
    GstPad* sinkpad = gst_element_get_static_pad(_sink->queue, "sink");
    gst_pad_link(_sink->teepad, sinkpad);
146 147 148 149 150 151
    gst_object_unref(sinkpad);

    _recording = true;
    emit recordingChanged();
}

152
void VideoReceiver::stopRecording(void)
153 154 155 156 157 158
{
    // exit immediately if we are not recording
    if(_pipeline == NULL || !_recording) {
        return;
    }

159
    gst_pad_add_probe(_sink->teepad, GST_PAD_PROBE_TYPE_IDLE, _unlinkCB, _sink, NULL);
160 161 162 163

    _recording = false;
    emit recordingChanged();
}
Gus Grubba's avatar
Gus Grubba committed
164 165 166

VideoReceiver::VideoReceiver(QObject* parent)
    : QObject(parent)
167
#if defined(QGC_GST_STREAMING)
168
    , _recording(false)
Gus Grubba's avatar
Gus Grubba committed
169
    , _videoSink(NULL)
170 171
    , _socket(NULL)
    , _serverPresent(false)
172
#endif
Gus Grubba's avatar
Gus Grubba committed
173
{
174 175 176 177
#if defined(QGC_GST_STREAMING)
    _timer.setSingleShot(true);
    connect(&_timer, &QTimer::timeout, this, &VideoReceiver::_timeout);
#endif
Gus Grubba's avatar
Gus Grubba committed
178 179 180 181
}

VideoReceiver::~VideoReceiver()
{
182
#if defined(QGC_GST_STREAMING)
183 184 185 186 187 188
//    stop();
//    setVideoSink(NULL);
//    if(_socket) {
//        delete _socket;
//    }
    EOS();
189
#endif
Gus Grubba's avatar
Gus Grubba committed
190 191
}

192
#if defined(QGC_GST_STREAMING)
Gus Grubba's avatar
Gus Grubba committed
193 194 195 196 197 198 199 200 201 202 203
void VideoReceiver::setVideoSink(GstElement* sink)
{
    if (_videoSink) {
        gst_object_unref(_videoSink);
        _videoSink = NULL;
    }
    if (sink) {
        _videoSink = sink;
        gst_object_ref_sink(_videoSink);
    }
}
204
#endif
Gus Grubba's avatar
Gus Grubba committed
205

206
#if defined(QGC_GST_STREAMING)
207 208 209 210 211 212 213 214 215 216 217 218 219 220
static void newPadCB(GstElement * element, GstPad* pad, gpointer data)
{
    gchar *name;
    name = gst_pad_get_name(pad);
    g_print("A new pad %s was created\n", name);
    GstCaps * p_caps = gst_pad_get_pad_template_caps (pad);
    gchar * description = gst_caps_to_string(p_caps);
    qDebug() << p_caps << ", " << description;
    g_free(description);
    GstElement * p_rtph264depay = GST_ELEMENT(data);
    if(gst_element_link_pads(element, name, p_rtph264depay, "sink") == false)
        qCritical() << "newPadCB : failed to link elements\n";
    g_free(name);
}
221
#endif
222

223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
#if defined(QGC_GST_STREAMING)
void VideoReceiver::_connected()
{
    //-- Server showed up. Now we start the stream.
    _timer.stop();
    delete _socket;
    _socket = NULL;
    _serverPresent = true;
    start();
}
#endif

#if defined(QGC_GST_STREAMING)
void VideoReceiver::_socketError(QAbstractSocket::SocketError socketError)
{
    Q_UNUSED(socketError);
    delete _socket;
    _socket = NULL;
    //-- Try again in 5 seconds
    _timer.start(5000);
}
#endif

#if defined(QGC_GST_STREAMING)
void VideoReceiver::_timeout()
{
    //-- If socket is live, we got no connection nor a socket error
    if(_socket) {
        delete _socket;
        _socket = NULL;
    }
    //-- RTSP will try to connect to the server. If it cannot connect,
    //   it will simply give up and never try again. Instead, we keep
    //   attempting a connection on this timer. Once a connection is
    //   found to be working, only then we actually start the stream.
    QUrl url(_uri);
    _socket = new QTcpSocket;
    connect(_socket, static_cast<void (QTcpSocket::*)(QAbstractSocket::SocketError)>(&QTcpSocket::error), this, &VideoReceiver::_socketError);
    connect(_socket, &QTcpSocket::connected, this, &VideoReceiver::_connected);
    //qDebug() << "Trying to connect to:" << url.host() << url.port();
    _socket->connectToHost(url.host(), url.port());
    _timer.start(5000);
}
#endif

268 269 270 271 272 273 274 275 276 277

// When we finish our pipeline will look like this:
//
//                                   +-->queue-->decoder-->_videosink
//                                   |
//    datasource-->demux-->parser-->tee
//
//                                   ^
//                                   |
//                                   +-Here we will later link elements for recording
Gus Grubba's avatar
Gus Grubba committed
278 279
void VideoReceiver::start()
{
280
#if defined(QGC_GST_STREAMING)
Gus Grubba's avatar
Gus Grubba committed
281 282 283 284 285 286 287 288 289
    if (_uri.isEmpty()) {
        qCritical() << "VideoReceiver::start() failed because URI is not specified";
        return;
    }
    if (_videoSink == NULL) {
        qCritical() << "VideoReceiver::start() failed because video sink is not set";
        return;
    }

290 291
    bool isUdp = _uri.contains("udp://");

Gus Grubba's avatar
Gus Grubba committed
292 293
    stop();

294 295 296 297 298 299
    //-- For RTSP, check to see if server is there first
    if(!_serverPresent && !isUdp) {
        _timer.start(100);
        return;
    }

Gus Grubba's avatar
Gus Grubba committed
300 301 302 303 304 305 306
    bool running = false;

    GstElement*     dataSource  = NULL;
    GstCaps*        caps        = NULL;
    GstElement*     demux       = NULL;
    GstElement*     parser      = NULL;
    GstElement*     decoder     = NULL;
307 308 309 310 311 312
    GstElement*     queue1      = NULL;

    // Pads to link queues and tee
    GstPad*         teeSrc1     = NULL; // tee source pad 1
    GstPad*         q1Sink      = NULL; // queue1 sink pad

Gus Grubba's avatar
Gus Grubba committed
313 314
    do {
        if ((_pipeline = gst_pipeline_new("receiver")) == NULL) {
315
            qCritical() << "VideoReceiver::start() failed. Error with gst_pipeline_new()";
Gus Grubba's avatar
Gus Grubba committed
316 317 318
            break;
        }

319 320 321 322
        if(isUdp) {
            dataSource = gst_element_factory_make("udpsrc", "udp-source");
        } else {
            dataSource = gst_element_factory_make("rtspsrc", "rtsp-source");
Gus Grubba's avatar
Gus Grubba committed
323 324
        }

325 326
        if (!dataSource) {
            qCritical() << "VideoReceiver::start() failed. Error with data source for gst_element_factory_make()";
Gus Grubba's avatar
Gus Grubba committed
327 328 329
            break;
        }

330 331 332 333 334 335 336
        if(isUdp) {
            if ((caps = gst_caps_from_string("application/x-rtp, media=(string)video, clock-rate=(int)90000, encoding-name=(string)H264")) == NULL) {
                qCritical() << "VideoReceiver::start() failed. Error with gst_caps_from_string()";
                break;
            }
            g_object_set(G_OBJECT(dataSource), "uri", qPrintable(_uri), "caps", caps, NULL);
        } else {
337
            g_object_set(G_OBJECT(dataSource), "location", qPrintable(_uri), "latency", 0, "udp-reconnect", 1, "timeout", 5000000, NULL);
338
        }
Gus Grubba's avatar
Gus Grubba committed
339 340

        if ((demux = gst_element_factory_make("rtph264depay", "rtp-h264-depacketizer")) == NULL) {
341
            qCritical() << "VideoReceiver::start() failed. Error with gst_element_factory_make('rtph264depay')";
Gus Grubba's avatar
Gus Grubba committed
342 343 344
            break;
        }

345 346 347 348
        if(!isUdp) {
            g_signal_connect(dataSource, "pad-added", G_CALLBACK(newPadCB), demux);
        }

Gus Grubba's avatar
Gus Grubba committed
349
        if ((parser = gst_element_factory_make("h264parse", "h264-parser")) == NULL) {
350
            qCritical() << "VideoReceiver::start() failed. Error with gst_element_factory_make('h264parse')";
Gus Grubba's avatar
Gus Grubba committed
351 352 353 354
            break;
        }

        if ((decoder = gst_element_factory_make("avdec_h264", "h264-decoder")) == NULL) {
355
            qCritical() << "VideoReceiver::start() failed. Error with gst_element_factory_make('avdec_h264')";
Gus Grubba's avatar
Gus Grubba committed
356 357 358
            break;
        }

359
        if((_tee = gst_element_factory_make("tee", "stream-file-tee")) == NULL)  {
360 361 362
            qCritical() << "VideoReceiver::start() failed. Error with gst_element_factory_make('tee')";
            break;
        }
Gus Grubba's avatar
Gus Grubba committed
363

364 365 366 367
        if((queue1 = gst_element_factory_make("queue", NULL)) == NULL)  {
            qCritical() << "VideoReceiver::start() failed. Error with gst_element_factory_make('queue1')";
            break;
        }
368

369
        gst_bin_add_many(GST_BIN(_pipeline), dataSource, demux, parser, _tee, queue1, decoder, _videoSink, NULL);
370 371 372 373 374 375 376 377

//        if(isUdp) {
//            res = gst_element_link_many(dataSource, demux, parser, decoder, tee, _videoSink, NULL);
//        } else {
//            res = gst_element_link_many(demux, parser, decoder, tee, _videoSink, NULL);
//        }

        // Link the pipeline in front of the tee
378
        if(!gst_element_link_many(dataSource, demux, parser, _tee, NULL)) {
379 380
            qCritical() << "Unable to link datasource and tee.";
            break;
381 382
        }

383 384 385
        // Link the videostream to queue1
        if(!gst_element_link_many(queue1, decoder, _videoSink, NULL)) {
            qCritical() << "Unable to link queue1 and videosink.";
Gus Grubba's avatar
Gus Grubba committed
386 387 388
            break;
        }

389
        // Link the queues to the tee
390
        teeSrc1 = gst_element_get_request_pad(_tee, "src_%u");
391 392 393 394 395 396 397 398 399 400 401 402 403
        q1Sink = gst_element_get_static_pad(queue1, "sink");

        // Link the tee to queue1
        if (gst_pad_link(teeSrc1, q1Sink) != GST_PAD_LINK_OK ){
            qCritical() << "Tee for queue1 could not be linked.\n";
            break;
        }

        gst_object_unref(teeSrc1);
        gst_object_unref(q1Sink);

        teeSrc1 = q1Sink = NULL;
        queue1 = NULL;
Gus Grubba's avatar
Gus Grubba committed
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
        dataSource = demux = parser = decoder = NULL;

        GstBus* bus = NULL;

        if ((bus = gst_pipeline_get_bus(GST_PIPELINE(_pipeline))) != NULL) {
            gst_bus_add_watch(bus, _onBusMessage, this);
            gst_object_unref(bus);
            bus = NULL;
        }

        running = gst_element_set_state(_pipeline, GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE;

    } while(0);

    if (caps != NULL) {
        gst_caps_unref(caps);
        caps = NULL;
    }

    if (!running) {
        qCritical() << "VideoReceiver::start() failed";

        if (decoder != NULL) {
            gst_object_unref(decoder);
            decoder = NULL;
        }

        if (parser != NULL) {
            gst_object_unref(parser);
            parser = NULL;
        }

        if (demux != NULL) {
            gst_object_unref(demux);
            demux = NULL;
        }

        if (dataSource != NULL) {
            gst_object_unref(dataSource);
            dataSource = NULL;
        }

446 447
        if (_tee != NULL) {
            gst_object_unref(_tee);
448 449 450 451 452 453 454 455
            dataSource = NULL;
        }

        if (queue1 != NULL) {
            gst_object_unref(queue1);
            dataSource = NULL;
        }

Gus Grubba's avatar
Gus Grubba committed
456 457 458 459 460
        if (_pipeline != NULL) {
            gst_object_unref(_pipeline);
            _pipeline = NULL;
        }
    }
461 462

    qDebug() << "Video Receiver started.";
463
#endif
Gus Grubba's avatar
Gus Grubba committed
464 465
}

466 467 468 469
void VideoReceiver::EOS() {
    gst_element_send_event(_pipeline, gst_event_new_eos());
}

Gus Grubba's avatar
Gus Grubba committed
470 471
void VideoReceiver::stop()
{
472
#if defined(QGC_GST_STREAMING)
473
    qDebug() << "stop()";
Gus Grubba's avatar
Gus Grubba committed
474
    if (_pipeline != NULL) {
475
        qDebug() << "Stopping pipeline";
Gus Grubba's avatar
Gus Grubba committed
476 477 478
        gst_element_set_state(_pipeline, GST_STATE_NULL);
        gst_object_unref(_pipeline);
        _pipeline = NULL;
479
        _serverPresent = false;
Gus Grubba's avatar
Gus Grubba committed
480
    }
481
#endif
Gus Grubba's avatar
Gus Grubba committed
482 483 484 485 486 487 488 489
}

void VideoReceiver::setUri(const QString & uri)
{
    stop();
    _uri = uri;
}

490
#if defined(QGC_GST_STREAMING)
Gus Grubba's avatar
Gus Grubba committed
491 492
void VideoReceiver::_onBusMessage(GstMessage* msg)
{
493 494
    //qDebug() << "Got bus message";

Gus Grubba's avatar
Gus Grubba committed
495 496
    switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_EOS:
497 498
        qDebug() << "Got EOS";
        //stop();
Gus Grubba's avatar
Gus Grubba committed
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
        break;
    case GST_MESSAGE_ERROR:
        do {
            gchar* debug;
            GError* error;
            gst_message_parse_error(msg, &error, &debug);
            g_free(debug);
            qCritical() << error->message;
            g_error_free(error);
        } while(0);
        stop();
        break;
    default:
        break;
    }
}
515
#endif
Gus Grubba's avatar
Gus Grubba committed
516

517
#if defined(QGC_GST_STREAMING)
Gus Grubba's avatar
Gus Grubba committed
518 519 520 521 522 523 524 525
gboolean VideoReceiver::_onBusMessage(GstBus* bus, GstMessage* msg, gpointer data)
{
    Q_UNUSED(bus)
    Q_ASSERT(msg != NULL && data != NULL);
    VideoReceiver* pThis = (VideoReceiver*)data;
    pThis->_onBusMessage(msg);
    return TRUE;
}
526
#endif