entirely start/stop hls server pipeline depending on http requests (fixes idle cpu...
[dreamrtspserver.git] / src / dreamrtspserver.c
1 /*
2  * GStreamer dreamrtspserver
3  * Copyright 2015-2016 Andreas Frisch <fraxinas@opendreambox.org>
4  *
5  * This program is licensed under the Creative Commons
6  * Attribution-NonCommercial-ShareAlike 3.0 Unported
7  * License. To view a copy of this license, visit
8  * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
9  * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA.
10  *
11  * Alternatively, this program may be distributed and executed on
12  * hardware which is licensed by Dream Property GmbH.
13  *
14  * This program is NOT free software. It is open source, you are allowed
15  * to modify it (if you keep the license), but it may not be commercially
16  * distributed other than under the conditions noted above.
17  */
18
19 #include "dreamrtspserver.h"
20 #include "gstdreamrtsp.h"
21
22 #define QUEUE_DEBUG \
23                 guint cur_bytes, cur_buf; \
24                 guint64 cur_time; \
25                 g_object_get (t->tstcpq, "current-level-bytes", &cur_bytes, NULL); \
26                 g_object_get (t->tstcpq, "current-level-buffers", &cur_buf, NULL); \
27                 g_object_get (t->tstcpq, "current-level-time", &cur_time, NULL);
28
29
30 static void send_signal (App *app, const gchar *signal_name, GVariant *parameters)
31 {
32         if (app->dbus_connection)
33         {
34                 GST_DEBUG ("sending signal name=%s parameters=%s", signal_name, parameters?g_variant_print (parameters, TRUE):"[not given]");
35                 g_dbus_connection_emit_signal (app->dbus_connection, NULL, object_name, service, signal_name, parameters, NULL);
36         }
37         else
38                 GST_DEBUG ("no dbus connection, can't send signal %s", signal_name);
39 }
40
41 static gboolean gst_set_inputmode(App *app, inputMode input_mode)
42 {
43         if (!app->pipeline)
44                 return FALSE;
45
46         g_object_set (G_OBJECT (app->asrc), "input_mode", input_mode, NULL);
47         g_object_set (G_OBJECT (app->vsrc), "input_mode", input_mode, NULL);
48
49         inputMode ret1, ret2;
50         g_object_get (G_OBJECT (app->asrc), "input_mode", &ret1, NULL);
51         g_object_get (G_OBJECT (app->vsrc), "input_mode", &ret2, NULL);
52
53         if (input_mode != ret1 || input_mode != ret2)
54                 return FALSE;
55
56         GST_DEBUG("set input_mode %d", input_mode);
57         return TRUE;
58 }
59
60 static gboolean gst_set_framerate(App *app, int value)
61 {
62         GstCaps *oldcaps, *newcaps;
63         GstStructure *structure;
64         gboolean ret = FALSE;
65
66         if (!app->pipeline)
67                 goto out;
68
69         g_object_get (G_OBJECT (app->vsrc), "caps", &oldcaps, NULL);
70
71         if (!GST_IS_CAPS(oldcaps))
72                 goto out;
73
74         GST_DEBUG("set framerate %d fps... old caps %" GST_PTR_FORMAT, value, oldcaps);
75
76         newcaps = gst_caps_make_writable(oldcaps);
77         structure = gst_caps_steal_structure (newcaps, 0);
78         if (!structure)
79                 goto out;
80
81         if (value)
82                 gst_structure_set (structure, "framerate", GST_TYPE_FRACTION, value, 1, NULL);
83
84         gst_caps_append_structure (newcaps, structure);
85         GST_INFO("new caps %" GST_PTR_FORMAT, newcaps);
86         g_object_set (G_OBJECT (app->vsrc), "caps", newcaps, NULL);
87         ret = TRUE;
88
89 out:
90         if (GST_IS_CAPS(oldcaps))
91                 gst_caps_unref(oldcaps);
92         if (GST_IS_CAPS(newcaps))
93                 gst_caps_unref(newcaps);
94         return ret;
95 }
96
97 static gboolean gst_set_resolution(App *app, int width, int height)
98 {
99         GstCaps *oldcaps, *newcaps;
100         GstStructure *structure;
101         gboolean ret = FALSE;
102
103         if (!app->pipeline)
104                 goto out;
105
106         g_object_get (G_OBJECT (app->vsrc), "caps", &oldcaps, NULL);
107
108         if (!GST_IS_CAPS(oldcaps))
109                 goto out;
110
111         GST_DEBUG("set new resolution %ix%i... old caps %" GST_PTR_FORMAT, width, height, oldcaps);
112
113         newcaps = gst_caps_make_writable(oldcaps);
114         structure = gst_caps_steal_structure (newcaps, 0);
115         if (!structure)
116                 goto out;
117
118         if (width && height)
119         {
120                 gst_structure_set (structure, "width", G_TYPE_INT, width, NULL);
121                 gst_structure_set (structure, "height", G_TYPE_INT, height, NULL);
122         }
123         gst_caps_append_structure (newcaps, structure);
124         GST_INFO("new caps %" GST_PTR_FORMAT, newcaps);
125         g_object_set (G_OBJECT (app->vsrc), "caps", newcaps, NULL);
126         ret = TRUE;
127
128 out:
129         if (GST_IS_CAPS(oldcaps))
130                 gst_caps_unref(oldcaps);
131         if (GST_IS_CAPS(newcaps))
132                 gst_caps_unref(newcaps);
133         return ret;
134 }
135
136 static gboolean gst_get_capsprop(App *app, GstElement *element, const gchar* prop_name, guint32 *value)
137 {
138         GstCaps *caps = NULL;
139         const GstStructure *structure;
140         gboolean ret = FALSE;
141
142         if (!app->pipeline)
143                 goto out;
144
145         if (!GST_IS_ELEMENT(element))
146                 goto out;
147
148         g_object_get (G_OBJECT (element), "caps", &caps, NULL);
149
150         if (!GST_IS_CAPS(caps) || gst_caps_is_empty (caps) )
151                 goto out;
152
153         GST_DEBUG ("current caps %" GST_PTR_FORMAT, caps);
154
155         structure = gst_caps_get_structure (caps, 0);
156         if (!structure)
157                 goto out;
158
159         if (g_strcmp0 (prop_name, "framerate") == 0 && value)
160         {
161                 const GValue *framerate = gst_structure_get_value (structure, "framerate");
162                 if (GST_VALUE_HOLDS_FRACTION(framerate))
163                         *value = gst_value_get_fraction_numerator (framerate);
164                 else
165                         *value = 0;
166         }
167         else if ((g_strcmp0 (prop_name, "width") == 0 || g_strcmp0 (prop_name, "height") == 0) && value)
168         {
169                 if (!gst_structure_get_int (structure, prop_name, (guint*)value))
170                         *value = 0;
171         }
172         else
173                 goto out;
174
175         GST_DEBUG ("%" GST_PTR_FORMAT"'s %s = %i", element, prop_name, *value);
176         ret = TRUE;
177 out:
178         if (caps)
179                 gst_caps_unref(caps);
180         return ret;
181 }
182
183 static void get_source_properties (App *app)
184 {
185         SourceProperties *p = &app->source_properties;
186         if (app->asrc)
187                 g_object_get (G_OBJECT (app->asrc), "bitrate", &p->audioBitrate, NULL);
188         if (app->vsrc)
189         {
190                 g_object_get (G_OBJECT (app->vsrc), "bitrate", &p->videoBitrate, NULL);
191                 gst_get_capsprop(app, app->vsrc, "width", &p->width);
192                 gst_get_capsprop(app, app->vsrc, "height", &p->height);
193                 gst_get_capsprop(app, app->vsrc, "framerate", &p->framerate);
194         }
195 }
196
197 static void apply_source_properties (App *app)
198 {
199         SourceProperties *p = &app->source_properties;
200         if (app->asrc)
201         {
202                 if (p->audioBitrate)
203                         g_object_set (G_OBJECT (app->asrc), "bitrate", p->audioBitrate, NULL);
204         }
205         if (app->vsrc)
206         {
207                 if (p->videoBitrate)
208                         g_object_set (G_OBJECT (app->vsrc), "bitrate", p->videoBitrate, NULL);
209                 if (p->framerate)
210                         gst_set_framerate(app, p->framerate);
211                 if (p->width && p->height)
212                         gst_set_resolution(app,  p->width, p->height);
213         }
214 }
215
216 static gboolean gst_set_bitrate (App *app, GstElement *source, gint32 value)
217 {
218         if (!GST_IS_ELEMENT (source) || !value)
219                 return FALSE;
220
221         g_object_set (G_OBJECT (source), "bitrate", value, NULL);
222
223         gint32 checkvalue = 0;
224
225         g_object_get (G_OBJECT (source), "bitrate", &checkvalue, NULL);
226
227         if (value != checkvalue)
228                 return FALSE;
229
230         get_source_properties(app);
231         return TRUE;
232 }
233
234 gboolean upstream_resume_transmitting(App *app)
235 {
236         DreamTCPupstream *t = app->tcp_upstream;
237         GST_INFO_OBJECT (app, "resuming normal transmission...");
238         t->state = UPSTREAM_STATE_TRANSMITTING;
239         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_TRANSMITTING));
240         t->overrun_counter = 0;
241         t->overrun_period = GST_CLOCK_TIME_NONE;
242         t->id_signal_waiting = 0;
243         if (t->id_signal_keepalive)
244                 g_source_remove (t->id_signal_keepalive);
245         t->id_signal_keepalive = 0;
246         return G_SOURCE_REMOVE;
247 }
248
249 static GVariant *handle_get_property (GDBusConnection  *connection,
250                                       const gchar      *sender,
251                                       const gchar      *object_path,
252                                       const gchar      *interface_name,
253                                       const gchar      *property_name,
254                                       GError          **error,
255                                       gpointer          user_data)
256 {
257         App *app = user_data;
258
259         GST_DEBUG("dbus get property %s from %s", property_name, sender);
260
261         if (g_strcmp0 (property_name, "sourceState") == 0)
262         {
263                 if (app->pipeline)
264                 {
265                         GstState state = GST_STATE_VOID_PENDING;
266                         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 1*GST_USECOND);
267                         return g_variant_new_int32 ((int)state);
268                 }
269         }
270         else if (g_strcmp0 (property_name, "upstreamState") == 0)
271         {
272                 if (app->tcp_upstream)
273                         return g_variant_new_int32 (app->tcp_upstream->state);
274         }
275         else if (g_strcmp0 (property_name, "hlsState") == 0)
276         {
277                 if (app->hls_server)
278                         return g_variant_new_int32 (app->hls_server->state);
279         }
280         else if (g_strcmp0 (property_name, "inputMode") == 0)
281         {
282                 inputMode input_mode = -1;
283                 if (app->asrc)
284                 {
285                         g_object_get (G_OBJECT (app->asrc), "input_mode", &input_mode, NULL);
286                         return g_variant_new_int32 (input_mode);
287                 }
288         }
289         else if (g_strcmp0 (property_name, "rtspClientCount") == 0)
290         {
291                 if (app->rtsp_server)
292                         return g_variant_new_int32 (g_list_length(app->rtsp_server->clients_list));
293         }
294         else if (g_strcmp0 (property_name, "uriParameters") == 0)
295         {
296                 if (app->rtsp_server)
297                         return g_variant_new_string (app->rtsp_server->uri_parameters);
298         }
299         else if (g_strcmp0 (property_name, "audioBitrate") == 0)
300         {
301                 gint rate = 0;
302                 if (app->asrc)
303                 {
304                         g_object_get (G_OBJECT (app->asrc), "bitrate", &rate, NULL);
305                         return g_variant_new_int32 (rate);
306                 }
307         }
308         else if (g_strcmp0 (property_name, "videoBitrate") == 0)
309         {
310                 gint rate = 0;
311                 if (app->vsrc)
312                 {
313                         g_object_get (G_OBJECT (app->vsrc), "bitrate", &rate, NULL);
314                         return g_variant_new_int32 (rate);
315                 }
316         }
317         else if (g_strcmp0 (property_name, "width") == 0 || g_strcmp0 (property_name, "height") == 0 || g_strcmp0 (property_name, "framerate") == 0)
318         {
319                 guint32 value;
320                 if (gst_get_capsprop(app, app->vsrc, property_name, &value))
321                         return g_variant_new_int32(value);
322                 GST_WARNING("can't handle_get_property name=%s", property_name);
323         }
324         else if (g_strcmp0 (property_name, "autoBitrate") == 0)
325         {
326                 if (app->tcp_upstream)
327                         return g_variant_new_boolean(app->tcp_upstream->auto_bitrate);
328         }
329         else if (g_strcmp0 (property_name, "path") == 0)
330         {
331                 if (app->rtsp_server)
332                         return g_variant_new_string (app->rtsp_server->rtsp_ts_path);
333         }
334         g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] Invalid property '%s'", property_name);
335         return NULL;
336 } // handle_get_property
337
338 static gboolean handle_set_property (GDBusConnection  *connection,
339                                      const gchar      *sender,
340                                      const gchar      *object_path,
341                                      const gchar      *interface_name,
342                                      const gchar      *property_name,
343                                      GVariant         *value,
344                                      GError          **error,
345                                      gpointer          user_data)
346 {
347         App *app = user_data;
348
349         gchar *valstr = g_variant_print (value, TRUE);
350         GST_DEBUG("dbus set property %s = %s from %s", property_name, valstr, sender);
351         g_free (valstr);
352
353         if (g_strcmp0 (property_name, "inputMode") == 0)
354         {
355                 inputMode input_mode = g_variant_get_int32 (value);
356                 if (input_mode >= INPUT_MODE_LIVE && input_mode <= INPUT_MODE_BACKGROUND )
357                 {
358                         if (gst_set_inputmode(app, input_mode))
359                                 return 1;
360                 }
361                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] can't set input_mode to %d", input_mode);
362                 return 0;
363         }
364         else if (g_strcmp0 (property_name, "audioBitrate") == 0)
365         {
366                 if (gst_set_bitrate (app, app->asrc, g_variant_get_int32 (value)))
367                         return 1;
368         }
369         else if (g_strcmp0 (property_name, "videoBitrate") == 0)
370         {
371                 if (gst_set_bitrate (app, app->vsrc, g_variant_get_int32 (value)))
372                         return 1;
373         }
374         else if (g_strcmp0 (property_name, "framerate") == 0)
375         {
376                 if (gst_set_framerate(app, g_variant_get_int32 (value)))
377                         return 1;
378                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] can't set property '%s' to %d", property_name, g_variant_get_int32 (value));
379                 return 0;
380         }
381         else if (g_strcmp0 (property_name, "autoBitrate") == 0)
382         {
383                 if (app->tcp_upstream)
384                 {
385                         gboolean enable = g_variant_get_boolean(value);
386                         if (app->tcp_upstream->state == UPSTREAM_STATE_OVERLOAD)
387                                 upstream_resume_transmitting(app);
388                         app->tcp_upstream->auto_bitrate = enable;
389                         return 1;
390                 }
391         }
392         else
393         {
394                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] Invalid property: '%s'", property_name);
395                 return 0;
396         } // unknown property
397         g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] Wrong state - can't set property: '%s'", property_name);
398         return 0;
399 } // handle_set_property
400
401 static void handle_method_call (GDBusConnection       *connection,
402                                 const gchar           *sender,
403                                 const gchar           *object_path,
404                                 const gchar           *interface_name,
405                                 const gchar           *method_name,
406                                 GVariant              *parameters,
407                                 GDBusMethodInvocation *invocation,
408                                 gpointer               user_data)
409 {
410         App *app = user_data;
411
412         gchar *paramstr = g_variant_print (parameters, TRUE);
413         GST_DEBUG("dbus handle method %s %s from %s", method_name, paramstr, sender);
414         g_free (paramstr);
415         if (g_strcmp0 (method_name, "enableRTSP") == 0)
416         {
417                 gboolean result = FALSE;
418                 if (app->pipeline)
419                 {
420                         gboolean state;
421                         guint32 port;
422                         const gchar *path, *user, *pass;
423
424                         g_variant_get (parameters, "(b&su&s&s)", &state, &path, &port, &user, &pass);
425                         GST_DEBUG("app->pipeline=%p, enableRTSP state=%i path=%s port=%i user=%s pass=%s", app->pipeline, state, path, port, user, pass);
426
427                         if (state == TRUE && app->rtsp_server->state >= RTSP_STATE_DISABLED)
428                                 result = enable_rtsp_server(app, path, port, user, pass);
429                         else if (state == FALSE && app->rtsp_server->state >= RTSP_STATE_IDLE)
430                         {
431                                 result = disable_rtsp_server(app);
432                                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->hls_server->state == HLS_STATE_DISABLED)
433                                 {
434                                         destroy_pipeline(app);
435                                         create_source_pipeline(app);
436                                 }
437                         }
438                 }
439                 g_dbus_method_invocation_return_value (invocation,  g_variant_new ("(b)", result));
440         }
441         else if (g_strcmp0 (method_name, "enableHLS") == 0)
442         {
443                 gboolean result = FALSE;
444                 if (app->pipeline)
445                 {
446                         gboolean state;
447                         guint32 port;
448                         const gchar *user, *pass;
449
450                         g_variant_get (parameters, "(bu&s&s)", &state, &port, &user, &pass);
451                         GST_DEBUG("app->pipeline=%p, enableHLS state=%i port=%i user=%s pass=%s", app->pipeline, state, port, user, pass);
452
453                         if (state == TRUE && app->hls_server->state >= HLS_STATE_DISABLED)
454                                 result = enable_hls_server(app, port, user, pass);
455                         else if (state == FALSE && app->hls_server->state >= HLS_STATE_IDLE)
456                         {
457                                 result = disable_hls_server(app);
458                                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->rtsp_server->state == RTSP_STATE_DISABLED)
459                                 {
460                                         destroy_pipeline(app);
461                                         create_source_pipeline(app);
462                                 }
463                         }
464                 }
465                 g_dbus_method_invocation_return_value (invocation,  g_variant_new ("(b)", result));
466         }
467         else if (g_strcmp0 (method_name, "enableUpstream") == 0)
468         {
469                 gboolean result = FALSE;
470                 if (app->pipeline)
471                 {
472                         gboolean state;
473                         const gchar *upstream_host, *token;
474                         guint32 upstream_port;
475
476                         g_variant_get (parameters, "(b&su&s)", &state, &upstream_host, &upstream_port, &token);
477                         GST_DEBUG("app->pipeline=%p, enableUpstream state=%i host=%s port=%i token=%s (currently in state %s)", app->pipeline, state, upstream_host, upstream_port, token, app->tcp_upstream->state);
478
479                         if (state == TRUE && app->tcp_upstream->state == UPSTREAM_STATE_DISABLED)
480                                 result = enable_tcp_upstream(app, upstream_host, upstream_port, token);
481                         else if (state == FALSE && app->tcp_upstream->state >= UPSTREAM_STATE_CONNECTING)
482                         {
483                                 result = disable_tcp_upstream(app);
484                                 if (app->rtsp_server->state == RTSP_STATE_DISABLED)
485                                 {
486                                         destroy_pipeline(app);
487                                         create_source_pipeline(app);
488                                 }
489                         }
490                 }
491                 g_dbus_method_invocation_return_value (invocation,  g_variant_new ("(b)", result));
492         }
493         else if (g_strcmp0 (method_name, "setResolution") == 0)
494         {
495                 int width, height;
496                 g_variant_get (parameters, "(ii)", &width, &height);
497                 if (gst_set_resolution(app, width, height))
498                         g_dbus_method_invocation_return_value (invocation, NULL);
499                 else
500                         g_dbus_method_invocation_return_error (invocation, G_IO_ERROR, G_IO_ERROR_INVALID_DATA, "[RTSPserver] can't set resolution %dx%d", width, height);
501         }
502         // Default: No such method
503         else
504         {
505                 g_dbus_method_invocation_return_error (invocation, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, "[RTSPserver] Invalid method: '%s'", method_name);
506         } // if it's an unknown method
507 } // handle_method_call
508
509 static void on_bus_acquired (GDBusConnection *connection,
510                              const gchar     *name,
511                              gpointer        user_data)
512 {
513         static GDBusInterfaceVTable interface_vtable =
514         {
515                 handle_method_call,
516                 handle_get_property,
517                 handle_set_property
518         };
519
520         GError *error = NULL;
521         GST_DEBUG ("aquired dbus (\"%s\" @ %p)", name, connection);
522         g_dbus_connection_register_object (connection, object_name, introspection_data->interfaces[0], &interface_vtable, user_data, NULL, &error);
523 } // on_bus_acquired
524
525 static void on_name_acquired (GDBusConnection *connection,
526                               const gchar     *name,
527                               gpointer         user_data)
528 {
529         App *app = user_data;
530         app->dbus_connection = connection;
531         GST_DEBUG ("aquired dbus name (\"%s\")", name);
532         if (gst_element_set_state (app->pipeline, GST_STATE_READY) != GST_STATE_CHANGE_SUCCESS)
533                 GST_ERROR ("Failed to bring state of source pipeline to READY");
534 } // on_name_acquired
535
536 static void on_name_lost (GDBusConnection *connection,
537                           const gchar     *name,
538                           gpointer         user_data)
539         {
540         App *app = user_data;
541         app->dbus_connection = NULL;
542         GST_WARNING ("lost dbus name (\"%s\" @ %p)", name, connection);
543         //  g_main_loop_quit (app->loop);
544 } // on_name_lost
545
546 static gboolean message_cb (GstBus * bus, GstMessage * message, gpointer user_data)
547 {
548         App *app = user_data;
549
550 //      DREAMRTSPSERVER_LOCK (app);
551         GST_TRACE_OBJECT (app, "message %" GST_PTR_FORMAT "", message);
552         switch (GST_MESSAGE_TYPE (message)) {
553                 case GST_MESSAGE_STATE_CHANGED:
554                 {
555                         GstState old_state, new_state;
556                         gst_message_parse_state_changed(message, &old_state, &new_state, NULL);
557                         if (old_state == new_state)
558                                 break;
559
560                         if (GST_MESSAGE_SRC(message) == GST_OBJECT(app->pipeline))
561                         {
562                                 GST_DEBUG_OBJECT(app, "state transition %s -> %s", gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
563                                 send_signal (app, "sourceStateChanged", g_variant_new("(i)", (int) new_state));
564                         }
565                         break;
566                 }
567                 case GST_MESSAGE_ERROR:
568                 {
569                         GError *err = NULL;
570                         gchar *name, *debug = NULL;
571                         name = gst_object_get_path_string (message->src);
572                         gst_message_parse_error (message, &err, &debug);
573                         if (err->domain == GST_RESOURCE_ERROR)
574                         {
575                                 if (err->code == GST_RESOURCE_ERROR_READ)
576                                 {
577                                         GST_INFO ("element %s: %s", name, err->message);
578                                         send_signal (app, "encoderError", NULL);
579 //                                      DREAMRTSPSERVER_UNLOCK (app);
580                                         disable_tcp_upstream(app);
581                                         destroy_pipeline(app);
582                                 }
583                                 if (err->code == GST_RESOURCE_ERROR_WRITE)
584                                 {
585                                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_FAILED));
586                                         GST_INFO ("element %s: %s -> this means PEER DISCONNECTED", name, err->message);
587                                         GST_DEBUG ("Additional ERROR debug info: %s", debug);
588 //                                      DREAMRTSPSERVER_UNLOCK (app);
589                                         disable_tcp_upstream(app);
590                                         if (&app->rtsp_server->state == RTSP_STATE_DISABLED)
591                                         {
592                                                 destroy_pipeline(app);
593                                                 create_source_pipeline(app);
594                                         }
595                                 }
596                         }
597                         else
598                         {
599                                 GST_ERROR ("ERROR: from element %s: %s", name, err->message);
600                                 if (debug != NULL)
601                                         GST_ERROR ("Additional debug info: %s", debug);
602                                 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"dreamrtspserver-error");
603                         }
604                         g_error_free (err);
605                         g_free (debug);
606                         g_free (name);
607                         break;
608                 }
609                 case GST_MESSAGE_WARNING:
610                 {
611                         GError *err = NULL;
612                         gchar *name, *debug = NULL;
613                         name = gst_object_get_path_string (message->src);
614                         gst_message_parse_warning (message, &err, &debug);
615                         GST_WARNING ("WARNING: from element %s: %s", name, err->message);
616                         if (debug != NULL)
617                                 GST_WARNING ("Additional debug info: %s", debug);
618 #if 1
619                         if (err->domain == GST_STREAM_ERROR && err->code == GST_STREAM_ERROR_ENCODE)
620                         {
621                                 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"dreamrtspserver-encode-error");
622                                 g_main_loop_quit (app->loop);
623                                 return FALSE;
624                         }
625 #endif
626                         g_error_free (err);
627                         g_free (debug);
628                         g_free (name);
629                         break;
630                 }
631                 case GST_MESSAGE_EOS:
632                         g_print ("Got EOS\n");
633 //                      DREAMRTSPSERVER_UNLOCK (app);
634                         g_main_loop_quit (app->loop);
635                         return FALSE;
636                 default:
637                         break;
638         }
639 //      DREAMRTSPSERVER_UNLOCK (app);
640         return TRUE;
641 }
642
643 static void media_unprepare (GstRTSPMedia * media, gpointer user_data)
644 {
645         App *app = user_data;
646         DreamRTSPserver *r = app->rtsp_server;
647         GST_INFO("no more clients -> media unprepared!");
648
649 //      DREAMRTSPSERVER_LOCK (app);
650         if (media == r->es_media)
651         {
652                 r->es_media = NULL;
653                 r->es_aappsrc = r->es_vappsrc = NULL;
654         }
655         else if (media == r->ts_media)
656         {
657                 r->ts_media = NULL;
658                 r->ts_appsrc = r->ts_appsrc = NULL;
659         }
660         if (!r->es_media && !r->ts_media)
661         {
662                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->hls_server->state == HLS_STATE_DISABLED)
663                         halt_source_pipeline(app);
664                 if (r->state == RTSP_STATE_RUNNING)
665                 {
666                         GST_DEBUG ("set RTSP_STATE_IDLE");
667                         send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_IDLE));
668                         r->state = RTSP_STATE_IDLE;
669                 }
670         }
671 //      DREAMRTSPSERVER_UNLOCK (app);
672 }
673
674 static void client_closed (GstRTSPClient * client, gpointer user_data)
675 {
676         App *app = user_data;
677         app->rtsp_server->clients_list = g_list_remove(g_list_first (app->rtsp_server->clients_list), client);
678         gint no_clients = g_list_length(app->rtsp_server->clients_list);
679         GST_INFO("client_closed  (number of clients: %i)", no_clients);
680         send_signal (app, "rtspClientCountChanged", g_variant_new("(is)", no_clients, ""));
681 }
682
683 static void client_connected (GstRTSPServer * server, GstRTSPClient * client, gpointer user_data)
684 {
685         App *app = user_data;
686         app->rtsp_server->clients_list = g_list_append(app->rtsp_server->clients_list, client);
687         const gchar *ip = gst_rtsp_connection_get_ip (gst_rtsp_client_get_connection (client));
688         gint no_clients = g_list_length(app->rtsp_server->clients_list);
689         GST_INFO("client_connected %" GST_PTR_FORMAT " from %s  (number of clients: %i)", client, ip, no_clients);
690         g_signal_connect (client, "closed", (GCallback) client_closed, app);
691         send_signal (app, "rtspClientCountChanged", g_variant_new("(is)", no_clients, ip));
692 }
693
694 static void media_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media, gpointer user_data)
695 {
696         App *app = user_data;
697         DreamRTSPserver *r = app->rtsp_server;
698         DREAMRTSPSERVER_LOCK (app);
699
700         if (GST_DREAM_RTSP_MEDIA_FACTORY (factory) == r->es_factory)
701         {
702                 r->es_media = media;
703                 GstElement *element = gst_rtsp_media_get_element (media);
704                 r->es_aappsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), ES_AAPPSRC);
705                 r->es_vappsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), ES_VAPPSRC);
706                 gst_object_unref(element);
707                 g_signal_connect (media, "unprepared", (GCallback) media_unprepare, app);
708                 g_object_set (r->es_aappsrc, "format", GST_FORMAT_TIME, NULL);
709                 g_object_set (r->es_vappsrc, "format", GST_FORMAT_TIME, NULL);
710         }
711         else if (GST_DREAM_RTSP_MEDIA_FACTORY (factory) == r->ts_factory)
712         {
713                 r->ts_media = media;
714                 GstElement *element = gst_rtsp_media_get_element (media);
715                 r->ts_appsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), TS_APPSRC);
716                 r->ts_appsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), TS_APPSRC);
717                 gst_object_unref(element);
718                 g_signal_connect (media, "unprepared", (GCallback) media_unprepare, app);
719                 g_object_set (r->ts_appsrc, "format", GST_FORMAT_TIME, NULL);
720         }
721         r->rtsp_start_pts = r->rtsp_start_dts = GST_CLOCK_TIME_NONE;
722         r->state = RTSP_STATE_RUNNING;
723         send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_RUNNING));
724         GST_DEBUG ("set RTSP_STATE_RUNNING");
725         start_rtsp_pipeline(app);
726         DREAMRTSPSERVER_UNLOCK (app);
727 }
728
729 static void uri_parametrized (GstDreamRTSPMediaFactory * factory, gchar *parameters, gpointer user_data)
730 {
731         App *app = user_data;
732         GST_INFO_OBJECT (app, "parametrized uri query: '%s'", parameters);
733         app->rtsp_server->uri_parameters = g_strdup(parameters);
734         send_signal (app, "uriParametersChanged", g_variant_new("(s)", app->rtsp_server->uri_parameters));
735 }
736
737 static GstPadProbeReturn cancel_waiting_probe (GstPad * sinkpad, GstPadProbeInfo * info, gpointer user_data)
738 {
739         App *app = user_data;
740         DreamTCPupstream *t = app->tcp_upstream;
741         if (info->type & GST_PAD_PROBE_TYPE_BUFFER      && GST_IS_BUFFER     (GST_PAD_PROBE_INFO_BUFFER (info)) ||
742             info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST && gst_buffer_list_length(GST_PAD_PROBE_INFO_BUFFER_LIST (info)))
743         {
744                 QUEUE_DEBUG;
745                 GST_LOG_OBJECT (app, "cancel upstream_set_waiting timeout because data flow was restored! queue properties current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT "",
746                                   cur_bytes, cur_buf, GST_TIME_ARGS(cur_time));
747                 if (t->id_signal_waiting)
748                         g_source_remove (t->id_signal_waiting);
749                 t->id_signal_waiting = 0;
750                 if (t->id_signal_keepalive)
751                         g_source_remove (t->id_signal_keepalive);
752                 t->id_signal_keepalive = 0;
753                 if (t->id_signal_overrun == 0)
754                         t->id_signal_overrun = g_signal_connect (t->tstcpq, "overrun", G_CALLBACK (queue_overrun), app);
755                 t->id_resume = 0;
756                 return GST_PAD_PROBE_REMOVE;
757         }
758         else
759                 GST_WARNING_OBJECT (app, "probed unhandled % "GST_PTR_FORMAT ", dataflow not restored?", info->data);
760         return GST_PAD_PROBE_OK;
761 }
762
763 static GstPadProbeReturn bitrate_measure_probe (GstPad * sinkpad, GstPadProbeInfo * info, gpointer user_data)
764 {
765         App *app = user_data;
766         DreamTCPupstream *t = app->tcp_upstream;
767         GstClockTime now = gst_clock_get_time (app->clock);
768         GstBuffer *buffer;
769         guint idx = 0, num_buffers = 1;
770         do {
771                 if (info->type & GST_PAD_PROBE_TYPE_BUFFER)
772                 {
773                         buffer = GST_PAD_PROBE_INFO_BUFFER (info);
774                 }
775                 else if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)
776                 {
777                         GstBufferList *bufferlist = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
778                         num_buffers = gst_buffer_list_length (bufferlist);
779                         buffer = gst_buffer_list_get (bufferlist, idx);
780                 }
781                 if (GST_IS_BUFFER(buffer))
782                         t->bitrate_sum += gst_buffer_get_size (buffer);
783                 idx++;
784         } while (idx < num_buffers);
785
786         QUEUE_DEBUG;
787         GST_TRACE_OBJECT (app, "probetype=%i num_buffers=%i data=% "GST_PTR_FORMAT " size was=%zu bitrate_sum=%zu now=%" GST_TIME_FORMAT " avg at %" GST_TIME_FORMAT " queue properties current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT "",
788                         info->type, num_buffers, info->data, gst_buffer_get_size (buffer), t->bitrate_sum, GST_TIME_ARGS(now), GST_TIME_ARGS(t->measure_start+BITRATE_AVG_PERIOD), cur_bytes, cur_buf, GST_TIME_ARGS(cur_time));
789         if (now > t->measure_start+BITRATE_AVG_PERIOD)
790         {
791                 gint bitrate = t->bitrate_sum*8/GST_TIME_AS_MSECONDS(BITRATE_AVG_PERIOD);
792                 t->bitrate_avg ? (t->bitrate_avg = (t->bitrate_avg+bitrate)/2) : (t->bitrate_avg = bitrate);
793                 send_signal (app, "tcpBitrate", g_variant_new("(i)", bitrate));
794                 t->measure_start = now;
795                 t->bitrate_sum = 0;
796         }
797         return GST_PAD_PROBE_OK;
798 }
799
800 gboolean upstream_keep_alive (App *app)
801 {
802         GstBuffer *buf = gst_buffer_new_allocate (NULL, TS_PACK_SIZE, NULL);
803         gst_buffer_memset (buf, 0, 0x00, TS_PACK_SIZE);
804         GstPad * srcpad = gst_element_get_static_pad (app->tcp_upstream->tstcpq, "src");
805
806         GstState state;
807         gst_element_get_state (app->tcp_upstream->tcpsink, &state, NULL, 10*GST_SECOND);
808         GST_INFO_OBJECT(app, "tcpsink's state=%s", gst_element_state_get_name (state));
809         gst_element_get_state (app->tcp_upstream->tstcpq, &state, NULL, 10*GST_SECOND);
810         GST_INFO_OBJECT(app, "tstcpq's state=%s", gst_element_state_get_name (state));
811
812         if ( state == GST_STATE_PAUSED )
813         {
814                 GstStateChangeReturn sret = gst_element_set_state (app->tcp_upstream->tcpsink, GST_STATE_PLAYING);
815                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tcpsink, GST_STATE_PLAYING) = %i", sret);
816                 sret = gst_element_set_state (app->tcp_upstream->tstcpq, GST_STATE_PLAYING);
817                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tstcpq, GST_STATE_PLAYING) = %i", sret);
818                 GST_INFO ("injecting keepalive %" GST_PTR_FORMAT " on pad %s:%s", buf, GST_DEBUG_PAD_NAME (srcpad));
819                 gst_pad_push (srcpad, gst_buffer_ref(buf));
820                 sret = gst_element_set_state (app->tcp_upstream->tcpsink, GST_STATE_PAUSED);
821                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tcpsink, GST_STATE_PAUSED) = %i", sret);
822                 sret = gst_element_set_state (app->tcp_upstream->tstcpq, GST_STATE_PAUSED);
823                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tstcpq, GST_STATE_PAUSED) = %i", sret);
824         }
825
826         return G_SOURCE_REMOVE;
827 }
828
829 gboolean upstream_set_waiting (App *app)
830 {
831         DREAMRTSPSERVER_LOCK (app);
832         DreamTCPupstream *t = app->tcp_upstream;
833         t->overrun_counter = 0;
834         t->overrun_period = GST_CLOCK_TIME_NONE;
835         t->state = UPSTREAM_STATE_WAITING;
836         g_object_set (t->tcpsink, "max-lateness", G_GINT64_CONSTANT(1)*GST_SECOND, NULL);
837         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_WAITING));
838         g_signal_connect (t->tstcpq, "underrun", G_CALLBACK (queue_underrun), app);
839         GstPad *sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
840         if (t->id_resume)
841         {
842                 gst_pad_remove_probe (sinkpad, t->id_resume);
843                 t->id_resume = 0;
844         }
845         if (t->id_bitrate_measure)
846         {
847                 gst_pad_remove_probe (sinkpad, t->id_bitrate_measure);
848                 t->id_bitrate_measure = 0;
849         }
850         send_signal (app, "tcpBitrate", g_variant_new("(i)", 0));
851         gst_object_unref (sinkpad);
852         pause_source_pipeline(app);
853         t->id_signal_waiting = 0;
854         t->id_signal_keepalive = g_timeout_add_seconds (5, (GSourceFunc) upstream_keep_alive, app);
855         DREAMRTSPSERVER_UNLOCK (app);
856         return G_SOURCE_REMOVE;
857 }
858
859 static void queue_underrun (GstElement * queue, gpointer user_data)
860 {
861         App *app = user_data;
862         DreamTCPupstream *t = app->tcp_upstream;
863         QUEUE_DEBUG;
864         GST_DEBUG_OBJECT (app, "queue underrun! properties: current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT "", cur_bytes, cur_buf, GST_TIME_ARGS(cur_time));
865         if (queue == t->tstcpq && app->rtsp_server->state != RTSP_STATE_RUNNING)
866         {
867                 if (unpause_source_pipeline(app))
868                 {
869                         DREAMRTSPSERVER_LOCK (app);
870 //                      g_object_set (G_OBJECT (t->tstcpq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
871                         g_object_set (t->tcpsink, "max-lateness", G_GINT64_CONSTANT(-1), NULL);
872                         g_signal_handlers_disconnect_by_func (queue, G_CALLBACK (queue_underrun), app);
873                         t->id_signal_overrun = g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), app);
874                         t->state = UPSTREAM_STATE_TRANSMITTING;
875                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_TRANSMITTING));
876                         if (t->id_bitrate_measure == 0)
877                         {
878                                 GstPad *sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
879                                 t->id_bitrate_measure = gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BUFFER|GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) bitrate_measure_probe, app, NULL);
880                                 gst_object_unref (sinkpad);
881                         }
882                         t->measure_start = gst_clock_get_time (app->clock);
883                         t->bitrate_sum = t->bitrate_avg = 0;
884                         if (t->overrun_period == GST_CLOCK_TIME_NONE)
885                                 t->overrun_period = gst_clock_get_time (app->clock);
886                         DREAMRTSPSERVER_UNLOCK (app);
887                 }
888         }
889 }
890
891 static void queue_overrun (GstElement * queue, gpointer user_data)
892 {
893         App *app = user_data;
894         DreamTCPupstream *t = app->tcp_upstream;
895         DREAMRTSPSERVER_LOCK (app);
896         if (queue == t->tstcpq/* && app->rtsp_server->state != RTSP_STATE_IDLE*/) //!!!TODO
897         {
898                 QUEUE_DEBUG;
899                 GST_DEBUG_OBJECT (app, "% "GST_PTR_FORMAT " overrun! properties: current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT " rtsp_server->state=%i", queue, cur_bytes, cur_buf, GST_TIME_ARGS(cur_time), app->rtsp_server->state);
900                 GstClockTime now = gst_clock_get_time (app->clock);
901                 if (t->state == UPSTREAM_STATE_CONNECTING)
902                 {
903                         GST_DEBUG_OBJECT (queue, "initial queue overrun after connect");
904 //                      g_object_set (G_OBJECT (t->tstcpq), "leaky", 0, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, "min-threshold-buffers", 0, NULL);
905                         g_signal_handlers_disconnect_by_func(t->tstcpq, G_CALLBACK (queue_overrun), app);
906                         t->id_signal_overrun = 0;
907                         DREAMRTSPSERVER_UNLOCK (app);
908                         upstream_set_waiting (app);
909                         return;
910                 }
911                 else if (t->state == UPSTREAM_STATE_TRANSMITTING)
912                 {
913                         if (t->id_signal_waiting)
914                         {
915                                 g_signal_handlers_disconnect_by_func(t->tstcpq, G_CALLBACK (queue_overrun), app);
916                                 t->id_signal_overrun = 0;
917                                 GST_DEBUG_OBJECT (queue, "disconnect overrun callback and wait for timeout or for buffer flow!");
918                                 DREAMRTSPSERVER_UNLOCK (app);
919                                 return;
920                         }
921                         t->overrun_counter++;
922                         GST_DEBUG_OBJECT (queue, "queue overrun during transmit... %i (max %i) overruns within %" GST_TIME_FORMAT "", t->overrun_counter, MAX_OVERRUNS, GST_TIME_ARGS (now-t->overrun_period));
923                         if (now > t->overrun_period+OVERRUN_TIME)
924                         {
925                                 t->overrun_counter = 0;
926                                 t->overrun_period = now;
927                         }
928                         if (t->overrun_counter >= MAX_OVERRUNS)
929                         {
930                                 if (t->auto_bitrate)
931                                 {
932                                         t->state = UPSTREAM_STATE_ADJUSTING;
933                                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_OVERLOAD));
934                                         auto_adjust_bitrate (app);
935                                         t->overrun_period = now;
936                                 }
937                                 else
938                                 {
939                                         t->state = UPSTREAM_STATE_OVERLOAD;
940                                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_OVERLOAD));
941                                         GST_DEBUG_OBJECT (queue, "auto overload handling disabled, go into UPSTREAM_STATE_OVERLOAD");
942                                         if (t->id_signal_waiting)
943                                                 g_source_remove (t->id_signal_waiting);
944                                         t->id_signal_waiting = g_timeout_add_seconds (RESUME_DELAY, (GSourceFunc) upstream_resume_transmitting, app);
945                                 }
946                         }
947                         else
948                         {
949                                 GST_DEBUG_OBJECT (queue, "SET upstream_set_waiting timeout!");
950                                 GstPad *sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
951                                 t->id_resume = gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BUFFER|GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) cancel_waiting_probe, app, NULL);
952                                 gst_object_unref (sinkpad);
953                                 if (t->id_signal_waiting)
954                                         g_source_remove (t->id_signal_waiting);
955                                 t->id_signal_waiting = g_timeout_add_seconds (5, (GSourceFunc) upstream_set_waiting, app);
956                         }
957                 }
958                 else if (t->state == UPSTREAM_STATE_OVERLOAD)
959                 {
960                         t->overrun_counter++;
961                         if (t->id_signal_waiting)
962                                 g_source_remove (t->id_signal_waiting);
963                         t->id_signal_waiting = g_timeout_add_seconds (5, (GSourceFunc) upstream_resume_transmitting, app);
964                         GST_DEBUG_OBJECT (queue, "still in UPSTREAM_STATE_OVERLOAD overrun_counter=%i, reset resume transmit timeout!", t->overrun_counter);
965                 }
966                 else if (t->state == UPSTREAM_STATE_ADJUSTING)
967                 {
968                         if (now < t->overrun_period+BITRATE_AVG_PERIOD)
969                         {
970                                 GST_DEBUG_OBJECT (queue, "still in grace period, waiting for bitrate adjustment to take effect. %"G_GUINT64_FORMAT" ms remaining", GST_TIME_AS_MSECONDS(t->overrun_period+BITRATE_AVG_PERIOD-now));
971                         }
972                         else
973                         {
974                                 t->overrun_counter++;
975                                 if (t->overrun_counter < MAX_OVERRUNS)
976                                         GST_DEBUG_OBJECT (queue, "still waiting for bitrate adjustment to take effect. overrun_counter=%i", t->overrun_counter);
977                                 else
978                                 {
979                                         GST_DEBUG_OBJECT (queue, "max overruns %i hit again while auto adjusting. -> RE-ADJUST!", t->overrun_counter);
980                                         t->overrun_counter = 0;
981                                         auto_adjust_bitrate (app);
982                                         t->overrun_period = now;
983                                 }
984                         }
985                 }
986         }
987         DREAMRTSPSERVER_UNLOCK (app);
988 }
989
990 gboolean auto_adjust_bitrate(App *app)
991 {
992         DreamTCPupstream *t = app->tcp_upstream;
993         get_source_properties (app);
994         SourceProperties *p = &app->source_properties;
995         GST_DEBUG_OBJECT (app, "auto overload handling: reduce bitrate from audioBitrate=%i videoBitrate=%i to fit network bandwidth=%i kbit/s", p->audioBitrate, p->videoBitrate, t->bitrate_avg);
996         gint newAudioBitrate, newVideoBitrate;
997         if (p->audioBitrate > 96)
998                 p->audioBitrate = p->audioBitrate*0.8;
999         p->videoBitrate = (t->bitrate_avg-newAudioBitrate)*0.8;
1000         GST_INFO_OBJECT (app, "auto overload handling: newAudioBitrate=%i newVideoBitrate=%i newTotalBitrate~%i kbit/s", p->audioBitrate, p->videoBitrate, p->audioBitrate+p->videoBitrate);
1001         apply_source_properties(app);
1002         if (t->id_signal_waiting)
1003                 g_source_remove (t->id_signal_waiting);
1004         t->id_signal_waiting = g_timeout_add_seconds (RESUME_DELAY, (GSourceFunc) upstream_resume_transmitting, app);
1005         t->overrun_counter = 0;
1006 }
1007
1008 static GstFlowReturn handover_payload (GstElement * appsink, gpointer user_data)
1009 {
1010         App *app = user_data;
1011         DreamRTSPserver *r = app->rtsp_server;
1012
1013         GstAppSrc *appsrc = NULL;
1014         if ( appsink == r->vappsink )
1015                 appsrc = GST_APP_SRC(r->es_vappsrc);
1016         else if ( appsink == r->aappsink )
1017                 appsrc = GST_APP_SRC(r->es_aappsrc);
1018         else if ( appsink == r->tsappsink )
1019                 appsrc = GST_APP_SRC(r->ts_appsrc);
1020
1021         GstSample *sample = gst_app_sink_pull_sample (GST_APP_SINK (appsink));
1022         if (appsrc && g_list_length(r->clients_list) > 0) {
1023                 GstBuffer *buffer = gst_sample_get_buffer (sample);
1024                 GstCaps *caps = gst_sample_get_caps (sample);
1025
1026                 GST_LOG_OBJECT(appsink, "% "GST_PTR_FORMAT" @ %"GST_PTR_FORMAT"", buffer, appsrc);
1027                 if (r->rtsp_start_pts == GST_CLOCK_TIME_NONE) {
1028                         if (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
1029                         {
1030                                 GST_LOG("GST_BUFFER_FLAG_DELTA_UNIT dropping!");
1031                                 gst_sample_unref(sample);
1032                                 DREAMRTSPSERVER_UNLOCK (app);
1033                                 return GST_FLOW_OK;
1034                         }
1035                         else if (appsink == r->vappsink || appsink == r->tsappsink)
1036                         {
1037                                 DREAMRTSPSERVER_LOCK (app);
1038                                 r->rtsp_start_pts = GST_BUFFER_PTS (buffer);
1039                                 r->rtsp_start_dts = GST_BUFFER_DTS (buffer);
1040                                 GST_LOG_OBJECT(appsink, "frame is IFRAME! set rtsp_start_pts=%" GST_TIME_FORMAT " rtsp_start_dts=%" GST_TIME_FORMAT " @ %"GST_PTR_FORMAT"", GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), appsrc);
1041                                 DREAMRTSPSERVER_UNLOCK (app);
1042                         }
1043                 }
1044                 if (GST_BUFFER_PTS (buffer) < r->rtsp_start_pts)
1045                         GST_BUFFER_PTS (buffer) = 0;
1046                 else
1047                         GST_BUFFER_PTS (buffer) -= r->rtsp_start_pts;
1048                 GST_BUFFER_DTS (buffer) -= r->rtsp_start_dts;
1049                 //    GST_LOG("new PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT "", GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
1050
1051                 GstCaps *oldcaps;
1052
1053                 oldcaps = gst_app_src_get_caps (appsrc);
1054                 if (!oldcaps || !gst_caps_is_equal (oldcaps, caps))
1055                 {
1056                         GST_DEBUG("CAPS changed! %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT, oldcaps, caps);
1057                         gst_app_src_set_caps (appsrc, caps);
1058                 }
1059                 gst_app_src_push_buffer (appsrc, gst_buffer_ref(buffer));
1060         }
1061         else
1062         {
1063                 if ( gst_debug_category_get_threshold (dreamrtspserver_debug) >= GST_LEVEL_LOG)
1064                         GST_TRACE_OBJECT(appsink, "no rtsp clients, discard payload!");
1065 //              else
1066 //                      g_print (".");
1067         }
1068         gst_sample_unref (sample);
1069
1070         return GST_FLOW_OK;
1071 }
1072
1073 gboolean assert_state(App *app, GstElement *element, GstState state)
1074 {
1075         GstStateChangeReturn sret;
1076         GST_DEBUG_OBJECT(app, "setting %" GST_PTR_FORMAT"'s state to %s", element, gst_element_state_get_name (state));
1077         GstState pipeline_state, current_state;
1078         sret = gst_element_set_state (element, state);
1079         gchar *elementname = gst_element_get_name(element);
1080         gchar *dotfilename = g_strdup_printf ("assert_state_%s_to_%s_%i", elementname,gst_element_state_get_name (state), sret);
1081         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, dotfilename);
1082         g_free (elementname);
1083         switch (sret) {
1084                 case GST_STATE_CHANGE_SUCCESS:
1085                 {
1086                         GST_DEBUG_OBJECT(app, "GST_STATE_CHANGE_SUCCESS on setting %" GST_PTR_FORMAT"'s state to %s ", element, gst_element_state_get_name (state));
1087                         return TRUE;
1088                 }
1089                 case GST_STATE_CHANGE_FAILURE:
1090                 {
1091                         GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE when trying to change %" GST_PTR_FORMAT"'s state to %s", element, gst_element_state_get_name (state));
1092                         return FALSE;
1093                 }
1094                 case GST_STATE_CHANGE_ASYNC:
1095                 {
1096                         watchdog_ping (app);
1097                         gboolean fail = FALSE;
1098                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"", element);
1099                         GstClockTime timeout = /*(element == app->pipeline) ?*/ 4 * GST_SECOND/* : GST_MSECOND*/;
1100                         if (element == GST_ELEMENT(app->pipeline))
1101                         {
1102                                 gst_element_get_state (GST_ELEMENT(app->pipeline), &pipeline_state, NULL, timeout);
1103                                 GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC got pipeline_state=%s", gst_element_state_get_name (state));
1104                                 if (pipeline_state != state)
1105                                 {
1106                                         GValue item = G_VALUE_INIT;
1107                                         GstIterator* iter = gst_bin_iterate_elements(GST_BIN(app->pipeline));
1108                                         while (GST_ITERATOR_OK == gst_iterator_next(iter, (GValue*)&item))
1109                                         {
1110                                                 GstElement *elem = g_value_get_object(&item);
1111                                                 GstElementFactory *factory = gst_element_get_factory (elem);
1112                                                 gst_element_get_state (elem, &current_state, NULL, GST_MSECOND);
1113                                                 if (current_state != state && (element == app->pipeline || element == elem))
1114                                                 {
1115                                                         if (element == app->pipeline)
1116                                                         {
1117                                                                 if (gst_element_factory_list_is_type (factory, GST_ELEMENT_FACTORY_TYPE_SINK))
1118                                                                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"'s state=%s (this is a sink element, so don't worry...)", elem, gst_element_state_get_name (current_state));
1119                                                                 else
1120                                                                 {
1121                                                                         GST_WARNING_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"'s state=%s -> FAIL", elem, gst_element_state_get_name (current_state));
1122                                                                         fail = TRUE;
1123                                                                 }
1124                                                         }
1125                                                 }
1126                                                 else
1127                                                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"'s state=%s", elem, gst_element_state_get_name (current_state));
1128                                         }
1129                                         gst_iterator_free(iter);
1130                                         if (fail)
1131                                         {
1132                                                 GST_ERROR_OBJECT (app, "pipeline didn't complete GST_STATE_CHANGE_ASYNC to %s within %" GST_TIME_FORMAT ", currently in %s", gst_element_state_get_name (state), GST_TIME_ARGS(timeout), gst_element_state_get_name (pipeline_state));
1133                                                 return FALSE;
1134                                         }
1135                                 }
1136                         }
1137                         else
1138                         {
1139                                 gst_element_get_state (element, &current_state, NULL, timeout);
1140                                 if (current_state == state)
1141                                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC on setting %" GST_PTR_FORMAT"'s state to %s SUCCESSFUL!", element, gst_element_state_get_name (state));
1142                                 else
1143                                 {
1144                                         GST_WARNING_OBJECT(app, "GST_STATE_CHANGE_ASYNC on setting %" GST_PTR_FORMAT"'s state to %s failed! now in %s", element, gst_element_state_get_name (state), gst_element_state_get_name (current_state));
1145                                         return FALSE;
1146                                 }
1147                         }
1148                         return TRUE;
1149                 }
1150                 case GST_STATE_CHANGE_NO_PREROLL:
1151                         GST_WARNING_OBJECT(app, "GST_STATE_CHANGE_NO_PREROLL on setting %" GST_PTR_FORMAT"'s state to %s ", element, gst_element_state_get_name (state));
1152                         break;
1153                 default:
1154                         break;
1155         }
1156         return TRUE;
1157 }
1158
1159 void assert_tsmux(App *app)
1160 {
1161         if (app->tsmux)
1162                 return;
1163
1164         GST_DEBUG_OBJECT (app, "inserting tsmux");
1165
1166         app->tsmux = gst_element_factory_make ("mpegtsmux", NULL);
1167         gst_bin_add (GST_BIN (app->pipeline), app->tsmux);
1168
1169         GstPad *sinkpad, *srcpad;
1170         GstPadLinkReturn ret;
1171
1172         srcpad = gst_element_get_static_pad (app->aq, "src");
1173         sinkpad = gst_element_get_compatible_pad (app->tsmux, srcpad, NULL);
1174         ret = gst_pad_link (srcpad, sinkpad);
1175         if (ret != GST_PAD_LINK_OK)
1176                 g_error ("couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", srcpad, sinkpad);
1177         gst_object_unref (srcpad);
1178         gst_object_unref (sinkpad);
1179
1180         srcpad = gst_element_get_static_pad (app->vq, "src");
1181         sinkpad = gst_element_get_compatible_pad (app->tsmux, srcpad, NULL);
1182         ret = gst_pad_link (srcpad, sinkpad);
1183         if (ret != GST_PAD_LINK_OK)
1184                 g_error ("couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", srcpad, sinkpad);
1185         gst_object_unref (srcpad);
1186         gst_object_unref (sinkpad);
1187
1188         if (!gst_element_link (app->tsmux, app->tstee))
1189                 g_error ("couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", app->tsmux, app->tstee);
1190 }
1191
1192 gboolean create_source_pipeline(App *app)
1193 {
1194         GST_INFO_OBJECT(app, "create_source_pipeline");
1195         DREAMRTSPSERVER_LOCK (app);
1196         app->pipeline = gst_pipeline_new ("dreamrtspserver_source_pipeline");
1197
1198         GstBus *bus = gst_pipeline_get_bus (GST_PIPELINE (app->pipeline));
1199         gst_bus_add_signal_watch (bus);
1200         g_signal_connect (G_OBJECT (bus), "message", G_CALLBACK (message_cb), app);
1201         gst_object_unref (GST_OBJECT (bus));
1202
1203         app->asrc = gst_element_factory_make ("dreamaudiosource", "dreamaudiosource0");
1204         app->vsrc = gst_element_factory_make ("dreamvideosource", "dreamvideosource0");
1205
1206         app->aparse = gst_element_factory_make ("aacparse", NULL);
1207         app->vparse = gst_element_factory_make ("h264parse", NULL);
1208
1209         app->atee = gst_element_factory_make ("tee", "atee");
1210         app->vtee = gst_element_factory_make ("tee", "vtee");
1211         app->tstee = gst_element_factory_make ("tee", "tstee");
1212
1213         app->aq = gst_element_factory_make ("queue", "aqueue");
1214         app->vq = gst_element_factory_make ("queue", "vqueue");
1215
1216         app->tsmux = gst_element_factory_make ("mpegtsmux", NULL);
1217         app->tstee = gst_element_factory_make ("tee", "tstee");
1218
1219         if (!(app->asrc && app->vsrc && app->aparse && app->vparse && app->aq && app->vq && app->atee && app->vtee && app->tsmux && app->tstee))
1220         {
1221                 g_error ("Failed to create source pipeline element(s):%s%s%s%s%s%s%s%s%s", app->asrc?"":" dreamaudiosource", app->vsrc?"":" dreamvideosource", app->aparse?"":" aacparse",
1222                         app->vparse?"":" h264parse", app->aq?"":" aqueue", app->vq?"":" vqueue", app->atee?"":" atee", app->vtee?"":" vtee", app->tsmux?"":"  mpegtsmux");
1223         }
1224         gst_object_unref(app->tsmux);
1225         app->tsmux = NULL;
1226
1227         if (!(app->asrc && app->vsrc && app->aparse && app->vparse))
1228         {
1229                 g_error ("Failed to create source pipeline element(s):%s%s%s%s", app->asrc?"":" dreamaudiosource", app->vsrc?"":" dreamvideosource", app->aparse?"":" aacparse", app->vparse?"":" h264parse");
1230         }
1231
1232         GstElement *appsink, *appsrc, *vpay, *apay, *udpsrc;
1233         appsink = gst_element_factory_make ("appsink", NULL);
1234         appsrc = gst_element_factory_make ("appsrc", NULL);
1235         vpay = gst_element_factory_make ("rtph264pay", NULL);
1236         apay = gst_element_factory_make ("rtpmp4apay", NULL);
1237         udpsrc = gst_element_factory_make ("udpsrc", NULL);
1238
1239         if (!(appsink && appsrc && vpay && apay && udpsrc))
1240                 g_error ("Failed to create rtsp element(s):%s%s%s%s%s", appsink?"":" appsink", appsrc?"":" appsrc", vpay?"": "rtph264pay", apay?"":" rtpmp4apay", udpsrc?"":" udpsrc" );
1241         else
1242         {
1243                 gst_object_unref (appsink);
1244                 gst_object_unref (appsrc);
1245                 gst_object_unref (vpay);
1246                 gst_object_unref (apay);
1247                 gst_object_unref (udpsrc);
1248         }
1249
1250         gst_bin_add_many (GST_BIN (app->pipeline), app->asrc, app->aparse, app->atee, app->aq, NULL);
1251         gst_bin_add_many (GST_BIN (app->pipeline), app->vsrc, app->vparse, app->vtee, app->vq, NULL);
1252         gst_bin_add (GST_BIN (app->pipeline), app->tstee);
1253         gst_element_link_many (app->asrc, app->aparse, app->atee, NULL);
1254         gst_element_link_many (app->vsrc, app->vparse, app->vtee, NULL);
1255
1256         GstPad *teepad, *sinkpad;
1257         GstPadLinkReturn ret;
1258
1259         teepad = gst_element_get_request_pad (app->atee, "src_%u");
1260         sinkpad = gst_element_get_static_pad (app->aq, "sink");
1261         ret = gst_pad_link (teepad, sinkpad);
1262         if (ret != GST_PAD_LINK_OK)
1263
1264         if (ret != GST_PAD_LINK_OK)
1265         {
1266                 GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1267                 return FALSE;
1268         }
1269         gst_object_unref (teepad);
1270         gst_object_unref (sinkpad);
1271
1272         teepad = gst_element_get_request_pad (app->vtee, "src_%u");
1273         sinkpad = gst_element_get_static_pad (app->vq, "sink");
1274         ret = gst_pad_link (teepad, sinkpad);
1275
1276         if (ret != GST_PAD_LINK_OK)
1277         {
1278                 GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1279                 return FALSE;
1280         }
1281
1282         gst_object_unref (teepad);
1283         gst_object_unref (sinkpad);
1284
1285         app->clock = gst_system_clock_obtain();
1286         gst_pipeline_use_clock(GST_PIPELINE (app->pipeline), app->clock);
1287
1288         apply_source_properties(app);
1289
1290         g_signal_connect (app->asrc, "signal-lost", G_CALLBACK (encoder_signal_lost), app);
1291
1292         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"create_source_pipeline");
1293         DREAMRTSPSERVER_UNLOCK (app);
1294         return TRUE;
1295 }
1296
1297 static void encoder_signal_lost (GstElement *dreamaudiosource, gpointer user_data)
1298 {
1299         App *app = user_data;
1300         GST_INFO_OBJECT (dreamaudiosource, "lost encoder signal!");
1301 }
1302
1303 static GstPadProbeReturn inject_authorization (GstPad * sinkpad, GstPadProbeInfo * info, gpointer user_data)
1304 {
1305         App *app = user_data;
1306
1307         GstBuffer *token_buf = gst_buffer_new_wrapped (app->tcp_upstream->token, TOKEN_LEN);
1308         GstPad * srcpad = gst_element_get_static_pad (app->tcp_upstream->tstcpq, "src");
1309
1310         GST_INFO ("injecting authorization on pad %s:%s, created token_buf %" GST_PTR_FORMAT "", GST_DEBUG_PAD_NAME (sinkpad), token_buf);
1311         gst_pad_remove_probe (sinkpad, info->id);
1312         gst_pad_push (srcpad, gst_buffer_ref(token_buf));
1313
1314         return GST_PAD_PROBE_REMOVE;
1315 }
1316
1317 gboolean enable_tcp_upstream(App *app, const gchar *upstream_host, guint32 upstream_port, const gchar *token)
1318 {
1319         GST_DEBUG_OBJECT(app, "enable_tcp_upstream host=%s port=%i token=%s", upstream_host, upstream_port, token);
1320
1321         if (!app->pipeline)
1322         {
1323                 GST_ERROR_OBJECT (app, "failed to enable upstream because source pipeline is NULL!");
1324                 goto fail;
1325         }
1326
1327         DreamTCPupstream *t = app->tcp_upstream;
1328
1329         if (t->state == UPSTREAM_STATE_DISABLED)
1330         {
1331                 assert_tsmux (app);
1332                 DREAMRTSPSERVER_LOCK (app);
1333
1334                 t->id_signal_overrun = 0;
1335                 t->id_signal_waiting = 0;
1336                 t->id_signal_keepalive = 0;
1337                 t->id_bitrate_measure = 0;
1338                 t->id_resume = 0;
1339                 t->state = UPSTREAM_STATE_CONNECTING;
1340                 send_signal (app, "upstreamStateChanged", g_variant_new("(i)", t->state));
1341
1342                 t->tstcpq  = gst_element_factory_make ("queue", "tstcpqueue");
1343                 t->tcpsink = gst_element_factory_make ("tcpclientsink", NULL);
1344
1345                 if (!(t->tstcpq && t->tcpsink ))
1346                         g_error ("Failed to create tcp upstream element(s):%s%s", t->tstcpq?"":"  ts queue", t->tcpsink?"":"  tcpclientsink" );
1347
1348                 g_object_set (G_OBJECT (t->tstcpq), "leaky", 2, "max-size-buffers", 400, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(0), NULL);
1349
1350                 t->id_signal_overrun = g_signal_connect (t->tstcpq, "overrun", G_CALLBACK (queue_overrun), app);
1351                 GST_TRACE_OBJECT (app, "installed %" GST_PTR_FORMAT " overrun handler id=%lu", t->tstcpq, t->id_signal_overrun);
1352
1353                 g_object_set (t->tcpsink, "max-lateness", G_GINT64_CONSTANT(3)*GST_SECOND, NULL);
1354                 g_object_set (t->tcpsink, "blocksize", BLOCK_SIZE, NULL);
1355
1356                 g_object_set (t->tcpsink, "host", upstream_host, NULL);
1357                 g_object_set (t->tcpsink, "port", upstream_port, NULL);
1358                 gchar *check_host;
1359                 guint32 check_port;
1360                 g_object_get (t->tcpsink, "host", &check_host, NULL);
1361                 g_object_get (t->tcpsink, "port", &check_port, NULL);
1362                 if (g_strcmp0 (upstream_host, check_host))
1363                 {
1364                         g_free (check_host);
1365                         GST_ERROR_OBJECT (app, "couldn't set upstream_host %s", upstream_host);
1366                         goto fail;
1367                 }
1368                 if (upstream_port != check_port)
1369                 {
1370                         GST_ERROR_OBJECT (app, "couldn't set upstream_port %d", upstream_port);
1371                         goto fail;
1372                 }
1373                 g_free (check_host);
1374
1375                 GstStateChangeReturn sret = gst_element_set_state (t->tcpsink, GST_STATE_READY);
1376                 if (sret == GST_STATE_CHANGE_FAILURE)
1377                 {
1378                         GST_ERROR_OBJECT (app, "failed to set tcpsink to GST_STATE_READY. %s:%d probably refused connection", upstream_host, upstream_port);
1379                         gst_object_unref (t->tstcpq);
1380                         gst_object_unref (t->tcpsink);
1381                         t->state = UPSTREAM_STATE_DISABLED;
1382                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", t->state));
1383                         DREAMRTSPSERVER_UNLOCK (app);
1384                         return FALSE;
1385                 }
1386
1387                 gst_bin_add_many (GST_BIN(app->pipeline), t->tstcpq, t->tcpsink, NULL);
1388                 if (!gst_element_link (t->tstcpq, t->tcpsink)) {
1389                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", t->tstcpq, t->tcpsink);
1390                         goto fail;
1391                 }
1392
1393 //              if (!assert_state (app, t->tcpsink, GST_STATE_PLAYING) || !assert_state (app, t->tstcpq, GST_STATE_PLAYING))
1394 //                      goto fail;
1395
1396                 GstPadLinkReturn ret;
1397                 GstPad *srcpad, *sinkpad;
1398                 srcpad = gst_element_get_request_pad (app->tstee, "src_%u");
1399                 sinkpad = gst_element_get_static_pad (t->tstcpq, "sink");
1400                 ret = gst_pad_link (srcpad, sinkpad);
1401                 gst_object_unref (srcpad);
1402                 gst_object_unref (sinkpad);
1403                 if (ret != GST_PAD_LINK_OK)
1404                 {
1405                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", srcpad, sinkpad);
1406                         goto fail;
1407                 }
1408
1409                 if (strlen(token))
1410                 {
1411                         sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
1412                         strcpy(t->token, token);
1413                         gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BUFFER|GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) inject_authorization, app, NULL);
1414                         gst_object_unref (sinkpad);
1415                 }
1416                 else
1417                         GST_DEBUG_OBJECT (app, "no token specified!");
1418
1419                 if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
1420                 {
1421                         GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE for TCP upstream");
1422                         return FALSE;
1423                 }
1424                 GST_INFO_OBJECT(app, "enabled TCP upstream! upstreamState = UPSTREAM_STATE_CONNECTING");
1425                 DREAMRTSPSERVER_UNLOCK (app);
1426                 return TRUE;
1427         }
1428         else
1429                 GST_INFO_OBJECT (app, "tcp upstream already enabled! (upstreamState = %i)", t->state);
1430         return FALSE;
1431
1432 fail:
1433         DREAMRTSPSERVER_UNLOCK (app);
1434         disable_tcp_upstream(app);
1435         return FALSE;
1436 }
1437
1438 gboolean
1439 _delete_dir_recursively (GFile *directory, GError **error)
1440 {
1441         GFileEnumerator *children = NULL;
1442         GFileInfo *info;
1443         gboolean ret = FALSE;
1444         children = g_file_enumerate_children (directory, G_FILE_ATTRIBUTE_STANDARD_NAME "," G_FILE_ATTRIBUTE_STANDARD_TYPE, 0, NULL, error);
1445         if (children == NULL || error)
1446                 goto out;
1447         info = g_file_enumerator_next_file (children, NULL, error);
1448         while (info || error) {
1449                 GFile *child;
1450                 const char *name;
1451                 GFileType type;
1452                 if (error)
1453                         goto out;
1454                 name = g_file_info_get_name (info);
1455                 child = g_file_get_child (directory, name);
1456                 type = g_file_info_get_file_type (info);
1457                 GST_LOG ("delete %s", name);
1458                 if (type == G_FILE_TYPE_DIRECTORY)
1459                         ret = _delete_dir_recursively (child, error);
1460                 else if (type == G_FILE_TYPE_REGULAR)
1461                         ret = g_file_delete (child, NULL, error);
1462                 g_object_unref (info);
1463                 g_object_unref (child);
1464                 if (!ret)
1465                         goto out;
1466                 info = g_file_enumerator_next_file (children, NULL, error);
1467         }
1468         ret = TRUE;
1469         g_file_delete (directory, NULL, error);
1470
1471 out:
1472         if (children)
1473                 g_object_unref (children);
1474         return ret;
1475 }
1476
1477 gboolean hls_client_timeout (gpointer user_data)
1478 {
1479         App *app = user_data;
1480         if (app->hls_server)
1481         {
1482                 GST_INFO_OBJECT(app, "HLS clients stopped downloading, stopping hls pipeline!");
1483                 stop_hls_pipeline (app);
1484         }
1485         return FALSE;
1486 }
1487
1488 static void
1489 soup_do_get (SoupServer *server, SoupMessage *msg, const char *path, App *app)
1490 {
1491         char *slash;
1492         gchar *hlspath = NULL;
1493         guint status_code = SOUP_STATUS_NONE;
1494         struct stat st;
1495
1496         if (path)
1497         {
1498                 if (strlen(path) < 1)
1499                         status_code = SOUP_STATUS_BAD_REQUEST;
1500                 if (strlen(path) == 1)
1501                         status_code = SOUP_STATUS_MOVED_PERMANENTLY;
1502                 else
1503                         hlspath = g_strdup_printf ("%s%s", HLS_PATH, path);
1504         }
1505         if (app->hls_server->state == HLS_STATE_IDLE && g_strcmp0 (path+1, HLS_PLAYLIST_NAME) == 0)
1506         {
1507                 DREAMRTSPSERVER_LOCK (app);
1508                 GST_INFO_OBJECT (server, "client requested '%s' but we're idle... start pipeline!", path+1);
1509                 if (!start_hls_pipeline (app))
1510                         status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR;
1511                 else
1512                 {
1513                         GstState state;
1514                         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, HLS_FRAGMENT_DURATION);
1515                         g_usleep (G_USEC_PER_SEC * (HLS_FRAGMENT_DURATION+1));
1516                         app->hls_server->state = HLS_STATE_RUNNING;
1517                         send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_RUNNING));
1518                 }
1519                 DREAMRTSPSERVER_UNLOCK (app);
1520         }
1521         else if (status_code == SOUP_STATUS_NONE && stat (hlspath, &st) == -1) {
1522                 if (errno == EPERM)
1523                         status_code = SOUP_STATUS_FORBIDDEN;
1524                 else if (errno == ENOENT)
1525                         status_code = SOUP_STATUS_NOT_FOUND;
1526                 else
1527                         status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR;
1528         }
1529         else if (S_ISDIR (st.st_mode))
1530                 status_code = SOUP_STATUS_SERVICE_UNAVAILABLE;
1531
1532         if (status_code == SOUP_STATUS_MOVED_PERMANENTLY)
1533         {
1534                 GST_LOG_OBJECT (server, "client requested /, redirect to %s", HLS_PLAYLIST_NAME);
1535                 soup_message_set_redirect (msg, status_code, HLS_PLAYLIST_NAME);
1536                 return;
1537         }
1538         else if (status_code != SOUP_STATUS_NONE)
1539         {
1540                 GST_WARNING_OBJECT (server, "client requested '%s', error serving '%s', http status code %i", path, hlspath ? hlspath : "", status_code);
1541                 g_free (hlspath);
1542                 soup_message_set_status (msg, status_code);
1543                 return;
1544         }
1545
1546         GST_INFO_OBJECT (server, "client requests '%s', serving '%s'...", path, hlspath);
1547
1548         if (msg->method == SOUP_METHOD_GET) {
1549                 GMappedFile *mapping;
1550                 SoupBuffer *buffer;
1551
1552                 mapping = g_mapped_file_new (hlspath, FALSE, NULL);
1553                 if (!mapping) {
1554                         soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
1555                         return;
1556                 }
1557
1558                 if (g_strrstr (hlspath, ".ts"))
1559                         soup_message_headers_set_content_type (msg->response_headers, "video/MP2T", NULL);
1560                 else
1561                 {
1562                         GstState state;
1563                         gst_element_get_state (app->asrc, &state, NULL, GST_MSECOND);
1564                         if (state != GST_STATE_PLAYING)
1565                         {
1566                                 assert_tsmux (app);
1567                                 if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
1568                                 {
1569                                         soup_message_set_status (msg, SOUP_STATUS_BAD_GATEWAY);
1570                                         g_free (hlspath);
1571                                         return;
1572                                 }
1573                         }
1574                         soup_message_headers_set_content_type (msg->response_headers, "application/x-mpegURL", NULL);
1575                 }
1576                 if (app->hls_server->id_timeout)
1577                         g_source_remove (app->hls_server->id_timeout);
1578                 app->hls_server->id_timeout = g_timeout_add_seconds (5*HLS_FRAGMENT_DURATION, (GSourceFunc) hls_client_timeout, app);
1579
1580                 buffer = soup_buffer_new_with_owner (g_mapped_file_get_contents (mapping),
1581                                                      g_mapped_file_get_length (mapping),
1582                                                      mapping, (GDestroyNotify)g_mapped_file_unref);
1583                 soup_message_body_append_buffer (msg->response_body, buffer);
1584                 soup_buffer_free (buffer);
1585         }
1586         g_free (hlspath);
1587         soup_message_set_status (msg, SOUP_STATUS_OK);
1588 }
1589
1590 static gboolean
1591 soup_server_auth_callback (SoupAuthDomain *domain, SoupMessage *msg, const char *username, const char *password, gpointer user_data)
1592 {
1593         App *app = (App *) user_data;
1594         DreamHLSserver *h = app->hls_server;
1595         if (g_strcmp0(h->hls_user, username) == 0 && strcmp(h->hls_pass, password) == 0)
1596         {
1597                 GST_TRACE_OBJECT (app->hls_server, "authenticated request with credentials %s:%s", username, password);
1598                 return TRUE;
1599         }
1600         else
1601                 GST_WARNING_OBJECT (app->hls_server, "denied authentication request with credentials %s:%s", username, password); 
1602         return FALSE;
1603 }
1604
1605 static void
1606 soup_server_callback (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *context, gpointer data)
1607 {
1608         GST_TRACE_OBJECT (server, "%s %s HTTP/1.%d", msg->method, path, soup_message_get_http_version (msg));
1609         if (msg->method == SOUP_METHOD_GET)
1610                 soup_do_get (server, msg, path, (App *) data);
1611         else
1612                 soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
1613         GST_TRACE_OBJECT (server, "  -> %d %s", msg->status_code, msg->reason_phrase);
1614 }
1615
1616 static GstPadProbeReturn hls_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1617 {
1618         App *app = user_data;
1619         DreamHLSserver *h = app->hls_server;
1620
1621         GstElement *element = gst_pad_get_parent_element(pad);
1622
1623         GST_DEBUG_OBJECT (pad, "unlink... %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, h->hlssink);
1624
1625         GstPad *teepad;
1626         teepad = gst_pad_get_peer(pad);
1627         gst_pad_unlink (teepad, pad);
1628
1629         GstElement *tee = gst_pad_get_parent_element(teepad);
1630         gst_element_release_request_pad (tee, teepad);
1631         gst_object_unref (teepad);
1632         gst_object_unref (tee);
1633
1634         gst_element_unlink (element, h->hlssink);
1635
1636         GST_DEBUG_OBJECT (pad, "remove... %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, h->hlssink);
1637
1638         gst_bin_remove_many (GST_BIN (app->pipeline), element, h->hlssink, NULL);
1639
1640         GST_DEBUG_OBJECT (pad, "set state null %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, h->hlssink);
1641
1642         gst_element_set_state (h->hlssink, GST_STATE_NULL);
1643         gst_element_set_state (element, GST_STATE_NULL);
1644
1645         GST_DEBUG_OBJECT (pad, "unref.... %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, h->hlssink);
1646
1647         gst_object_unref (element);
1648         gst_object_unref (h->hlssink);
1649         h->queue = NULL;
1650         h->hlssink = NULL;
1651
1652         if (h->id_timeout)
1653                 g_source_remove (h->id_timeout);
1654
1655         if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && g_list_length (app->rtsp_server->clients_list) == 0)
1656                 halt_source_pipeline(app);
1657
1658         GST_INFO ("HLS server unlinked!");
1659
1660         return GST_PAD_PROBE_REMOVE;
1661 }
1662
1663 gboolean stop_hls_pipeline(App *app)
1664 {
1665         GST_INFO_OBJECT(app, "stop_hls_pipeline");
1666         DreamHLSserver *h = app->hls_server;
1667         if (h->state == HLS_STATE_RUNNING)
1668         {
1669                 DREAMRTSPSERVER_LOCK (app);
1670                 h->state = HLS_STATE_IDLE;
1671                 send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_IDLE));
1672                 gst_object_ref (h->queue);
1673                 gst_object_ref (h->hlssink);
1674                 GstPad *sinkpad;
1675                 sinkpad = gst_element_get_static_pad (h->queue, "sink");
1676                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, hls_pad_probe_unlink_cb, app, NULL);
1677                 gst_object_unref (sinkpad);
1678                 DREAMRTSPSERVER_UNLOCK (app);
1679                 GST_INFO("hls server pipeline stopped, set HLS_STATE_IDLE");
1680                 return TRUE;
1681         }
1682         else
1683                 GST_INFO("hls server wasn't in HLS_STATE_RUNNING... can't stop");
1684         return FALSE;
1685 }
1686
1687 gboolean disable_hls_server(App *app)
1688 {
1689         GST_INFO_OBJECT(app, "disable_hls_server");
1690         DreamHLSserver *h = app->hls_server;
1691         if (h->state == HLS_STATE_RUNNING)
1692                 stop_hls_pipeline (app);
1693         if (h->state == HLS_STATE_IDLE)
1694         {
1695                 DREAMRTSPSERVER_LOCK (app);
1696                 soup_server_quit (h->soupserver);
1697                 if (h->soupauthdomain)
1698                 {
1699                         g_object_unref (h->soupauthdomain);
1700                         g_free(h->hls_user);
1701                         g_free(h->hls_pass);
1702                 }
1703                 g_object_unref (h->soupserver);
1704                 GFile *tmp_dir_file = g_file_new_for_path (HLS_PATH);
1705                 _delete_dir_recursively (tmp_dir_file, NULL);
1706                 g_object_unref (tmp_dir_file);
1707                 h->state = HLS_STATE_DISABLED;
1708                 send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_DISABLED));
1709                 DREAMRTSPSERVER_UNLOCK (app);
1710                 GST_INFO("hls soupserver unref'ed, set HLS_STATE_DISABLED");
1711                 return TRUE;
1712         }
1713         else
1714                 GST_INFO("hls server in wrong state... can't disable");
1715         return FALSE;
1716 }
1717
1718 static GstPadProbeReturn _detect_keyframes_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1719 {
1720         App *app = user_data;
1721         GstBuffer *buffer;
1722         guint idx = 0, num_buffers = 1;
1723         do {
1724                 if (info->type & GST_PAD_PROBE_TYPE_BUFFER)
1725                 {
1726                         buffer = GST_PAD_PROBE_INFO_BUFFER (info);
1727                 }
1728                 else if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)
1729                 {
1730                         GstBufferList *bufferlist = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
1731                         num_buffers = gst_buffer_list_length (bufferlist);
1732                         buffer = gst_buffer_list_get (bufferlist, idx);
1733                 }
1734                 if (GST_IS_BUFFER(buffer) && !GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
1735                         GST_INFO_OBJECT (app, "KEYFRAME %" GST_PTR_FORMAT " detected @ %" GST_PTR_FORMAT " num_buffers=%i", buffer, pad, num_buffers);
1736                 idx++;
1737         } while (idx < num_buffers);
1738         return GST_PAD_PROBE_OK;
1739 }
1740
1741 gboolean enable_hls_server(App *app, guint port, const gchar *user, const gchar *pass)
1742 {
1743         GST_INFO_OBJECT(app, "enable_hls_server port=%i user=%s pass=%s", port, user, pass);
1744         if (!app->pipeline)
1745         {
1746                 GST_ERROR_OBJECT (app, "failed to enable hls server because source pipeline is NULL!");
1747                 return FALSE;
1748         }
1749
1750         DREAMRTSPSERVER_LOCK (app);
1751         DreamHLSserver *h = app->hls_server;
1752
1753         if (h->state == HLS_STATE_DISABLED)
1754         {
1755                 int r = mkdir (HLS_PATH, DEFFILEMODE);
1756                 if (r == -1 && errno != EEXIST)
1757                 {
1758                         g_error ("Failed to create HLS server directory '%s': %s (%i)", HLS_PATH, strerror(errno), errno);
1759                         goto fail;
1760                 }
1761
1762                 h->port = port;
1763                 h->soupserver = soup_server_new (SOUP_SERVER_PORT, h->port, SOUP_SERVER_SERVER_HEADER, "dreamhttplive", NULL);
1764                 soup_server_add_handler (h->soupserver, NULL, soup_server_callback, app, NULL);
1765                 soup_server_run_async (h->soupserver);
1766
1767                 gchar *credentials = g_strdup("");
1768                 if (strlen(user)) {
1769                         h->hls_user = g_strdup(user);
1770                         h->hls_pass = g_strdup(pass);
1771                         h->soupauthdomain = soup_auth_domain_basic_new (
1772                         SOUP_AUTH_DOMAIN_REALM, "Dreambox HLS Server",
1773                         SOUP_AUTH_DOMAIN_BASIC_AUTH_CALLBACK, soup_server_auth_callback,
1774                         SOUP_AUTH_DOMAIN_BASIC_AUTH_DATA, app,
1775                         SOUP_AUTH_DOMAIN_ADD_PATH, "",
1776                         NULL);
1777                         soup_server_add_auth_domain (h->soupserver, h->soupauthdomain);
1778                         credentials = g_strdup_printf("%s:%s@", user, pass);
1779                 }
1780                 else
1781                 {
1782                         h->hls_user = h->hls_pass = NULL;
1783                         h->soupauthdomain = NULL;
1784                 }
1785
1786                 GST_INFO_OBJECT (h->soupserver, "SOUP HLS server ready at http://%s127.0.0.1:%i/%s ...", credentials, soup_server_get_port (h->soupserver), HLS_PLAYLIST_NAME);
1787
1788                 h->state = HLS_STATE_IDLE;
1789                 send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_IDLE));
1790                 GST_DEBUG ("set HLS_STATE_IDLE");
1791                 g_free (credentials);
1792                 DREAMRTSPSERVER_UNLOCK (app);
1793                 return TRUE;
1794         }
1795         else
1796                 GST_INFO_OBJECT (app, "HLS server already enabled!");
1797         DREAMRTSPSERVER_UNLOCK (app);
1798         return FALSE;
1799
1800 fail:
1801         DREAMRTSPSERVER_UNLOCK (app);
1802         disable_hls_server(app);
1803         return FALSE;
1804
1805 }
1806
1807 gboolean start_hls_pipeline(App* app)
1808 {
1809         GST_DEBUG_OBJECT (app, "start_hls_pipeline");
1810
1811         DreamHLSserver *h = app->hls_server;
1812         if (h->state == HLS_STATE_DISABLED)
1813         {
1814                 GST_ERROR_OBJECT (app, "failed to start hls pipeline because hls server is not enabled!");
1815                 return FALSE;
1816         }
1817
1818         assert_tsmux (app);
1819
1820         h->queue = gst_element_factory_make ("queue", "hlsqueue");
1821         h->hlssink = gst_element_factory_make ("hlssink", "hlssink");
1822         if (!(h->hlssink && h->queue))
1823         {
1824                 g_error ("Failed to create HLS pipeline element(s):%s%s", h->hlssink?"":" hlssink", h->queue?"":" queue");
1825                 return FALSE;
1826         }
1827
1828         gchar *frag_location, *playlist_location;
1829         frag_location = g_strdup_printf ("%s/%s", HLS_PATH, HLS_FRAGMENT_NAME);
1830         playlist_location = g_strdup_printf ("%s/%s", HLS_PATH, HLS_PLAYLIST_NAME);
1831
1832         g_object_set (G_OBJECT (h->hlssink), "target-duration", HLS_FRAGMENT_DURATION, NULL);
1833         g_object_set (G_OBJECT (h->hlssink), "location", frag_location, NULL);
1834         g_object_set (G_OBJECT (h->hlssink), "playlist-location", playlist_location, NULL);
1835         g_object_set (G_OBJECT (h->queue), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
1836
1837         gst_bin_add_many (GST_BIN (app->pipeline), h->queue, h->hlssink,  NULL);
1838         gst_element_link (h->queue, h->hlssink);
1839
1840         if (!assert_state (app, h->hlssink, GST_STATE_READY) || !assert_state (app, h->queue, GST_STATE_PLAYING))
1841                 return FALSE;
1842
1843         GstPad *teepad, *sinkpad;
1844         GstPadLinkReturn ret;
1845         teepad = gst_element_get_request_pad (app->tstee, "src_%u");
1846         sinkpad = gst_element_get_static_pad (h->queue, "sink");
1847         ret = gst_pad_link (teepad, sinkpad);
1848         if (ret != GST_PAD_LINK_OK)
1849         {
1850                 GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1851                 return FALSE;
1852         }
1853         gst_object_unref (teepad);
1854         gst_object_unref (sinkpad);
1855
1856         if (app->tcp_upstream->state == UPSTREAM_STATE_WAITING)
1857                 unpause_source_pipeline(app);
1858
1859         GstStateChangeReturn sret = gst_element_set_state (h->hlssink, GST_STATE_PLAYING);
1860         GST_DEBUG_OBJECT(app, "explicitely bring hlssink to GST_STATE_PLAYING = %i", sret);
1861
1862         if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
1863         {
1864                 GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE for hls pipeline");
1865                 return FALSE;
1866         }
1867
1868         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"start_hls_server");
1869         return TRUE;
1870 }
1871
1872 DreamHLSserver *create_hls_server(App *app)
1873 {
1874         DreamHLSserver *h = malloc(sizeof(DreamHLSserver));
1875         send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_DISABLED));
1876         h->state = HLS_STATE_DISABLED;
1877         h->queue = NULL;
1878         h->hlssink = NULL;
1879         return h;
1880 }
1881
1882 DreamRTSPserver *create_rtsp_server(App *app)
1883 {
1884         DreamRTSPserver *r = malloc(sizeof(DreamRTSPserver));
1885         send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_DISABLED));
1886         r->state = RTSP_STATE_DISABLED;
1887         r->server = NULL;
1888         r->ts_factory = r->es_factory = NULL;
1889         r->ts_media = r->es_media = NULL;
1890         r->ts_appsrc = r->es_aappsrc = r->es_vappsrc = NULL;
1891         r->clients_list = NULL;
1892         return r;
1893 }
1894
1895 gboolean enable_rtsp_server(App *app, const gchar *path, guint32 port, const gchar *user, const gchar *pass)
1896 {
1897         GST_INFO_OBJECT(app, "enable_rtsp_server path=%s port=%i user=%s pass=%s", path, port, user, pass);
1898
1899         if (!app->pipeline)
1900         {
1901                 GST_ERROR_OBJECT (app, "failed to enable rtsp server because source pipeline is NULL!");
1902                 return FALSE;
1903         }
1904
1905         DREAMRTSPSERVER_LOCK (app);
1906         DreamRTSPserver *r = app->rtsp_server;
1907
1908         if (r->state == RTSP_STATE_DISABLED)
1909         {
1910                 r->artspq = gst_element_factory_make ("queue", "rtspaudioqueue");
1911                 r->vrtspq = gst_element_factory_make ("queue", "rtspvideoqueue");
1912                 r->aappsink = gst_element_factory_make ("appsink", AAPPSINK);
1913                 r->vappsink = gst_element_factory_make ("appsink", VAPPSINK);
1914
1915                 g_object_set (G_OBJECT (r->artspq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
1916                 g_object_set (G_OBJECT (r->vrtspq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
1917
1918                 g_object_set (G_OBJECT (r->aappsink), "emit-signals", TRUE, NULL);
1919                 g_object_set (G_OBJECT (r->aappsink), "enable-last-sample", FALSE, NULL);
1920                 g_signal_connect (r->aappsink, "new-sample", G_CALLBACK (handover_payload), app);
1921
1922                 g_object_set (G_OBJECT (r->vappsink), "emit-signals", TRUE, NULL);
1923                 g_object_set (G_OBJECT (r->vappsink), "enable-last-sample", FALSE, NULL);
1924                 g_signal_connect (r->vappsink, "new-sample", G_CALLBACK (handover_payload), app);
1925
1926                 r->tsrtspq = gst_element_factory_make ("queue", "tsrtspqueue");
1927                 r->tsappsink = gst_element_factory_make ("appsink", TSAPPSINK);
1928
1929                 g_object_set (G_OBJECT (r->tsrtspq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
1930
1931                 g_object_set (G_OBJECT (r->tsappsink), "emit-signals", TRUE, NULL);
1932                 g_object_set (G_OBJECT (r->tsappsink), "enable-last-sample", FALSE, NULL);
1933                 g_signal_connect (r->tsappsink, "new-sample", G_CALLBACK (handover_payload), app);
1934
1935                 gst_bin_add_many (GST_BIN (app->pipeline), r->artspq, r->vrtspq, r->aappsink, r->vappsink,  NULL);
1936                 gst_element_link (r->artspq, r->aappsink);
1937                 gst_element_link (r->vrtspq, r->vappsink);
1938
1939                 gst_bin_add_many (GST_BIN (app->pipeline), r->tsrtspq, r->tsappsink,  NULL);
1940                 gst_element_link (r->tsrtspq, r->tsappsink);
1941
1942                 GstState targetstate = GST_STATE_READY;
1943
1944                 if (!assert_state (app, r->tsappsink, targetstate) || !assert_state (app, r->aappsink, targetstate) || !assert_state (app, r->vappsink, targetstate))
1945                         goto fail;
1946
1947                 if (!assert_state (app, r->tsrtspq, targetstate) || !assert_state (app, r->artspq, targetstate) || !assert_state (app, r->vrtspq, targetstate))
1948                         goto fail;
1949
1950                 GstPad *teepad, *sinkpad;
1951                 GstPadLinkReturn ret;
1952                 teepad = gst_element_get_request_pad (app->atee, "src_%u");
1953                 sinkpad = gst_element_get_static_pad (r->artspq, "sink");
1954                 ret = gst_pad_link (teepad, sinkpad);
1955                 if (ret != GST_PAD_LINK_OK)
1956                 {
1957                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1958                         goto fail;
1959                 }
1960                 gst_object_unref (teepad);
1961                 gst_object_unref (sinkpad);
1962                 teepad = gst_element_get_request_pad (app->vtee, "src_%u");
1963                 sinkpad = gst_element_get_static_pad (r->vrtspq, "sink");
1964                 ret = gst_pad_link (teepad, sinkpad);
1965                 if (ret != GST_PAD_LINK_OK)
1966                 {
1967                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1968                         goto fail;
1969                 }
1970                 gst_object_unref (teepad);
1971                 gst_object_unref (sinkpad);
1972                 teepad = gst_element_get_request_pad (app->tstee, "src_%u");
1973                 sinkpad = gst_element_get_static_pad (r->tsrtspq, "sink");
1974                 ret = gst_pad_link (teepad, sinkpad);
1975                 if (ret != GST_PAD_LINK_OK)
1976                 {
1977                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1978                         goto fail;
1979                 }
1980                 gst_object_unref (teepad);
1981                 gst_object_unref (sinkpad);
1982
1983                 if (app->tcp_upstream->state != UPSTREAM_STATE_DISABLED || app->hls_server->state != HLS_STATE_DISABLED)
1984                         targetstate = GST_STATE_PLAYING;
1985
1986                 if (!assert_state (app, app->pipeline, targetstate))
1987                         goto fail;
1988
1989                 r->server = g_object_new (GST_TYPE_DREAM_RTSP_SERVER, NULL);
1990                 g_signal_connect (r->server, "client-connected", (GCallback) client_connected, app);
1991
1992                 r->es_factory = gst_dream_rtsp_media_factory_new ();
1993                 gst_rtsp_media_factory_set_launch (GST_RTSP_MEDIA_FACTORY (r->es_factory), "( appsrc name=" ES_VAPPSRC " ! h264parse ! rtph264pay name=pay0 pt=96   appsrc name=" ES_AAPPSRC " ! aacparse ! rtpmp4apay name=pay1 pt=97 )");
1994                 gst_rtsp_media_factory_set_shared (GST_RTSP_MEDIA_FACTORY (r->es_factory), TRUE);
1995
1996                 g_signal_connect (r->es_factory, "media-configure", (GCallback) media_configure, app);
1997
1998                 r->ts_factory = gst_dream_rtsp_media_factory_new ();
1999                 gst_rtsp_media_factory_set_launch (GST_RTSP_MEDIA_FACTORY (r->ts_factory), "( appsrc name=" TS_APPSRC " ! queue ! rtpmp2tpay name=pay0 pt=96 )");
2000                 gst_rtsp_media_factory_set_shared (GST_RTSP_MEDIA_FACTORY (r->ts_factory), TRUE);
2001
2002                 g_signal_connect (r->ts_factory, "media-configure", (GCallback) media_configure, app);
2003                 g_signal_connect (r->ts_factory, "uri-parametrized", (GCallback) uri_parametrized, app);
2004
2005                 DREAMRTSPSERVER_UNLOCK (app);
2006
2007                 gchar *credentials = g_strdup("");
2008                 if (strlen(user)) {
2009                         r->rtsp_user = g_strdup(user);
2010                         r->rtsp_pass = g_strdup(pass);
2011                         GstRTSPToken *token;
2012                         gchar *basic;
2013                         GstRTSPAuth *auth = gst_rtsp_auth_new ();
2014                         gst_rtsp_media_factory_add_role (GST_RTSP_MEDIA_FACTORY (r->es_factory), "user", GST_RTSP_PERM_MEDIA_FACTORY_ACCESS, G_TYPE_BOOLEAN, TRUE, GST_RTSP_PERM_MEDIA_FACTORY_CONSTRUCT, G_TYPE_BOOLEAN, TRUE, NULL);
2015                         gst_rtsp_media_factory_add_role (GST_RTSP_MEDIA_FACTORY (r->ts_factory), "user", GST_RTSP_PERM_MEDIA_FACTORY_ACCESS, G_TYPE_BOOLEAN, TRUE, GST_RTSP_PERM_MEDIA_FACTORY_CONSTRUCT, G_TYPE_BOOLEAN, TRUE, NULL);
2016                         token = gst_rtsp_token_new (GST_RTSP_TOKEN_MEDIA_FACTORY_ROLE, G_TYPE_STRING, "user", NULL);
2017                         basic = gst_rtsp_auth_make_basic (r->rtsp_user, r->rtsp_pass);
2018                         gst_rtsp_server_set_auth (GST_RTSP_SERVER(r->server), auth);
2019                         gst_rtsp_auth_add_basic (auth, basic, token);
2020                         g_free (basic);
2021                         gst_rtsp_token_unref (token);
2022                         credentials = g_strdup_printf("%s:%s@", user, pass);
2023                 }
2024                 else
2025                         r->rtsp_user = r->rtsp_pass = NULL;
2026
2027                 r->rtsp_port = g_strdup_printf("%i", port ? port : DEFAULT_RTSP_PORT);
2028
2029                 gst_rtsp_server_set_service (GST_RTSP_SERVER(r->server), r->rtsp_port);
2030
2031                 if (strlen(path))
2032                 {
2033                         r->rtsp_ts_path = g_strdup_printf ("%s%s", path[0]=='/' ? "" : "/", path);
2034                         r->rtsp_es_path = g_strdup_printf ("%s%s%s", path[0]=='/' ? "" : "/", path, RTSP_ES_PATH_SUFX);
2035                 }
2036                 else
2037                 {
2038                         r->rtsp_ts_path = g_strdup(DEFAULT_RTSP_PATH);
2039                         r->rtsp_es_path = g_strdup_printf ("%s%s", DEFAULT_RTSP_PATH, RTSP_ES_PATH_SUFX);
2040                 }
2041
2042                 r->mounts = gst_rtsp_server_get_mount_points (GST_RTSP_SERVER(r->server));
2043                 gst_rtsp_mount_points_add_factory (r->mounts, r->rtsp_ts_path, g_object_ref(GST_RTSP_MEDIA_FACTORY (r->ts_factory)));
2044                 gst_rtsp_mount_points_add_factory (r->mounts, r->rtsp_es_path, g_object_ref(GST_RTSP_MEDIA_FACTORY (r->es_factory)));
2045                 r->state = RTSP_STATE_IDLE;
2046                 send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_IDLE));
2047                 GST_DEBUG ("set RTSP_STATE_IDLE");
2048                 r->source_id = gst_rtsp_server_attach (GST_RTSP_SERVER(r->server), NULL);
2049                 r->uri_parameters = NULL;
2050                 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"enabled_rtsp_server");
2051                 g_print ("dreambox encoder stream ready at rtsp://%s127.0.0.1:%s%s\n", credentials, app->rtsp_server->rtsp_port, app->rtsp_server->rtsp_ts_path);
2052                 g_free (credentials);
2053                 return TRUE;
2054         }
2055         else
2056                 GST_INFO_OBJECT (app, "rtsp server already enabled!");
2057         DREAMRTSPSERVER_UNLOCK (app);
2058         return FALSE;
2059
2060 fail:
2061         DREAMRTSPSERVER_UNLOCK (app);
2062         disable_rtsp_server(app);
2063         return FALSE;
2064 }
2065
2066 gboolean start_rtsp_pipeline(App* app)
2067 {
2068         GST_DEBUG_OBJECT (app, "start_rtsp_pipeline");
2069
2070         DreamRTSPserver *r = app->rtsp_server;
2071         if (r->state == RTSP_STATE_DISABLED)
2072         {
2073                 GST_ERROR_OBJECT (app, "failed to start rtsp pipeline because rtsp server is not enabled!");
2074                 return FALSE;
2075         }
2076
2077         assert_tsmux (app);
2078         if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
2079         {
2080                 GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE for rtsp pipeline");
2081                 return FALSE;
2082         }
2083         GST_INFO_OBJECT(app, "start rtsp pipeline, pipeline going into PLAYING");
2084         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"start_rtsp_pipeline");
2085         GstState state;
2086         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 10*GST_SECOND);
2087         GST_INFO_OBJECT(app, "pipeline state=%s", gst_element_state_get_name (state));
2088         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"started_rtsp_pipeline");
2089         return TRUE;
2090 }
2091
2092 static GstPadProbeReturn tsmux_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2093 {
2094         App *app = user_data;
2095
2096         GstElement *element = gst_pad_get_parent_element(pad);
2097         GST_DEBUG_OBJECT (pad, "tsmux_pad_probe_unlink_cb % "GST_PTR_FORMAT, element);
2098
2099         GstElement *source = NULL;
2100         if (element == app->aq)
2101                 source = app->asrc;
2102         else if (element == app->vq)
2103                 source = app->vsrc;
2104         else if (element == app->tsmux)
2105         {
2106                 gst_element_unlink (app->tsmux, app->tstee);
2107                 gst_bin_remove (GST_BIN (app->pipeline), app->tsmux);
2108                 gst_element_set_state (app->tsmux, GST_STATE_NULL);
2109                 gst_object_unref (app->tsmux);
2110                 app->tsmux = NULL;
2111                 if (gst_element_set_state (app->pipeline, GST_STATE_READY) != GST_STATE_CHANGE_SUCCESS)
2112                         GST_WARNING_OBJECT (pad, "error bringing pipeline back to ready");
2113                 DREAMRTSPSERVER_UNLOCK (app);
2114                 GST_DEBUG_OBJECT (pad, "finished unlinking and removing sources and tsmux");
2115                 return GST_PAD_PROBE_REMOVE;
2116         }
2117
2118         if (gst_element_set_state (source, GST_STATE_NULL) != GST_STATE_CHANGE_SUCCESS)
2119         {
2120                 GST_ERROR_OBJECT (pad, "can't set % " GST_PTR_FORMAT "'s state to GST_STATE_NULL", source);
2121                 goto fail;
2122         }
2123
2124         GstPad *srcpad, *muxpad;
2125         srcpad = gst_element_get_static_pad (element, "src");
2126         muxpad = gst_pad_get_peer (srcpad);
2127         if (GST_IS_PAD (muxpad))
2128         {
2129                 GST_DEBUG_OBJECT (pad, "srcpad % " GST_PTR_FORMAT " muxpad % "GST_PTR_FORMAT" tsmux % "GST_PTR_FORMAT, srcpad, muxpad, app->tsmux);
2130                 gst_pad_unlink (srcpad, muxpad);
2131                 gst_element_release_request_pad (app->tsmux, muxpad);
2132                 gst_object_unref (muxpad);
2133         }
2134         else
2135                 GST_DEBUG_OBJECT (pad, "srcpad % " GST_PTR_FORMAT "'s peer was already unreffed");
2136         gst_object_unref (srcpad);
2137
2138         gst_element_set_state (element, GST_STATE_READY);
2139
2140         GstState state;
2141         gst_element_get_state (GST_ELEMENT(element), &state, NULL, 2*GST_SECOND);
2142
2143         if (state != GST_STATE_READY)
2144         {
2145                 GST_ERROR_OBJECT (app, "%" GST_PTR_FORMAT"'s state = %s (should be GST_STATE_READY)", element, gst_element_state_get_name (state));
2146                 goto fail;
2147         }
2148
2149         GstPad *sinkpad = NULL;
2150         GstElement *nextelem = NULL;
2151         if (element == app->aq)
2152         {
2153                 nextelem = app->vq;
2154                 if (GST_IS_ELEMENT(nextelem))
2155                         sinkpad = gst_element_get_static_pad (nextelem, "sink");
2156         }
2157         if (element == app->vq)
2158         {
2159                 nextelem = app->tsmux;
2160                 if (GST_IS_ELEMENT(nextelem))
2161                         sinkpad = gst_element_get_static_pad (nextelem, "src");
2162         }
2163         if (GST_IS_PAD(sinkpad) && GST_IS_ELEMENT(nextelem))
2164         {
2165                 GST_DEBUG_OBJECT (pad, "element % " GST_PTR_FORMAT " is now in GST_STATE_READY, installing idle probe on % "GST_PTR_FORMAT"", element, sinkpad);
2166                 gst_object_ref (nextelem);
2167                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, tsmux_pad_probe_unlink_cb, app, NULL);
2168                 gst_object_unref (sinkpad);
2169                 return GST_PAD_PROBE_REMOVE;
2170         }
2171
2172 fail:
2173         DREAMRTSPSERVER_UNLOCK (app);
2174         return GST_PAD_PROBE_REMOVE;
2175 }
2176
2177 gboolean halt_source_pipeline(App* app)
2178 {
2179         GST_INFO_OBJECT(app, "halt_source_pipeline...");
2180         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"halt_source_pipeline_pre");
2181         GstPad *sinkpad;
2182         gst_object_ref (app->aq);
2183         sinkpad = gst_element_get_static_pad (app->aq, "sink");
2184         gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, tsmux_pad_probe_unlink_cb, app, NULL);
2185         gst_object_unref (sinkpad);
2186         return TRUE;
2187 }
2188
2189 gboolean pause_source_pipeline(App* app)
2190 {
2191         if (app->rtsp_server->state <= RTSP_STATE_IDLE && app->hls_server->state == HLS_STATE_DISABLED)
2192         {
2193                 GST_INFO_OBJECT(app, "pause_source_pipeline... setting sources to GST_STATE_PAUSED rtsp_server->state=%i hls_server->state=%i", app->rtsp_server->state, app->hls_server->state);
2194                 if (gst_element_set_state (app->asrc, GST_STATE_PAUSED) != GST_STATE_CHANGE_NO_PREROLL || gst_element_set_state (app->vsrc, GST_STATE_PAUSED) != GST_STATE_CHANGE_NO_PREROLL)
2195                 {
2196                         GST_WARNING ("can't set pipeline to GST_STATE_PAUSED!");
2197                         return FALSE;
2198                 }
2199         }
2200         else
2201                 GST_DEBUG ("not pausing pipeline because rtsp_server->state=%i hls_server->state=%i", app->rtsp_server->state, app->hls_server->state);
2202         return TRUE;
2203 }
2204
2205 gboolean unpause_source_pipeline(App* app)
2206 {
2207         GST_INFO_OBJECT(app, "unpause_source_pipeline... setting sources to GST_STATE_PLAYING");
2208         if (gst_element_set_state (app->asrc, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE || gst_element_set_state (app->vsrc, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE)
2209         {
2210                 GST_WARNING("can't set sources to GST_STATE_PLAYING!");
2211                 return FALSE;
2212         }
2213         return TRUE;
2214 }
2215
2216 GstRTSPFilterResult remove_media_filter_func (GstRTSPSession * sess, GstRTSPSessionMedia * session_media, gpointer user_data)
2217 {
2218         App *app = user_data;
2219         GstRTSPFilterResult res = GST_RTSP_FILTER_REF;
2220         GstRTSPMedia *media;
2221         media = gst_rtsp_session_media_get_media (session_media);
2222 //      DREAMRTSPSERVER_LOCK (app);
2223         if (media == app->rtsp_server->es_media || media == app->rtsp_server->ts_media) {
2224                 GST_DEBUG_OBJECT (app, "matching RTSP media %p in filter, removing...", media);
2225                 res = GST_RTSP_FILTER_REMOVE;
2226         }
2227 //      DREAMRTSPSERVER_UNLOCK (app);
2228         return res;
2229 }
2230
2231 GstRTSPFilterResult remove_session_filter_func (GstRTSPClient *client, GstRTSPSession * sess, gpointer user_data)
2232 {
2233         App *app = user_data;
2234         GList *media_filter_res;
2235         GstRTSPFilterResult res = GST_RTSP_FILTER_REF;
2236         media_filter_res = gst_rtsp_session_filter (sess, remove_media_filter_func, app);
2237         if (g_list_length (media_filter_res) == 0) {
2238                 GST_DEBUG_OBJECT (app, "no more media for session %p, removing...", sess);
2239                 res = GST_RTSP_FILTER_REMOVE;
2240         }
2241         g_list_free (media_filter_res);
2242         return res;
2243 }
2244
2245 GstRTSPFilterResult remove_client_filter_func (GstRTSPServer *server, GstRTSPClient *client, gpointer user_data)
2246 {
2247         App *app = user_data;
2248         GList *session_filter_res;
2249         GstRTSPFilterResult res = GST_RTSP_FILTER_KEEP;
2250         int ret = g_signal_handlers_disconnect_by_func(client, (GCallback) client_closed, app);
2251         GST_INFO("client_filter_func %" GST_PTR_FORMAT "  (number of clients: %i). disconnected %i callback handlers", client, g_list_length(app->rtsp_server->clients_list), ret);
2252         session_filter_res = gst_rtsp_client_session_filter (client, remove_session_filter_func, app);
2253         if (g_list_length (session_filter_res) == 0) {
2254                 GST_DEBUG_OBJECT (app, "no more sessions for client %p, removing...", app);
2255                 res = GST_RTSP_FILTER_REMOVE;
2256         }
2257         g_list_free (session_filter_res);
2258         return res;
2259 }
2260
2261 static GstPadProbeReturn rtsp_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2262 {
2263         App *app = user_data;
2264         DreamRTSPserver *r = app->rtsp_server;
2265
2266         GstElement *element = gst_pad_get_parent_element(pad);
2267         GstElement *appsink;
2268         if (element == r->vrtspq)
2269                 appsink = r->vappsink;
2270         else if (element == r->artspq)
2271                 appsink = r->aappsink;
2272         else if (element == r->tsrtspq)
2273                 appsink = r->tsappsink;
2274
2275         GST_DEBUG_OBJECT (pad, "unlink... %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, appsink);
2276
2277         GstPad *teepad;
2278         teepad = gst_pad_get_peer(pad);
2279         gst_pad_unlink (teepad, pad);
2280
2281         GstElement *tee = gst_pad_get_parent_element(teepad);
2282         gst_element_release_request_pad (tee, teepad);
2283         gst_object_unref (teepad);
2284         gst_object_unref (tee);
2285
2286         gst_element_unlink (element, appsink);
2287
2288         GST_DEBUG_OBJECT (pad, "remove... %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, appsink);
2289
2290         gst_bin_remove_many (GST_BIN (app->pipeline), element, appsink, NULL);
2291
2292         GST_DEBUG_OBJECT (pad, "set state null %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, appsink);
2293
2294         gst_element_set_state (appsink, GST_STATE_NULL);
2295         gst_element_set_state (element, GST_STATE_NULL);
2296
2297         GST_DEBUG_OBJECT (pad, "unref.... %" GST_PTR_FORMAT " and % "GST_PTR_FORMAT, element, appsink);
2298
2299         gst_object_unref (element);
2300         gst_object_unref (appsink);
2301         element = NULL;
2302         appsink = NULL;
2303
2304         if (!r->tsappsink && !r->aappsink && !r->vappsink)
2305         {
2306                 GST_INFO("!r->tsappsink && !r->aappsink && !r->vappsink");
2307                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->hls_server->state == HLS_STATE_DISABLED)
2308                         halt_source_pipeline(app);
2309                 GST_INFO("local rtsp server disabled!");
2310         }
2311         return GST_PAD_PROBE_REMOVE;
2312 }
2313
2314 gboolean disable_rtsp_server(App *app)
2315 {
2316         DreamRTSPserver *r = app->rtsp_server;
2317         GST_DEBUG("disable_rtsp_server %p", r->server);
2318         if (r->state >= RTSP_STATE_IDLE)
2319         {
2320                 if (app->rtsp_server->es_media)
2321                         gst_rtsp_server_client_filter(GST_RTSP_SERVER(app->rtsp_server->server), (GstRTSPServerClientFilterFunc) remove_client_filter_func, app);
2322                 DREAMRTSPSERVER_LOCK (app);
2323                 gst_rtsp_mount_points_remove_factory (app->rtsp_server->mounts, app->rtsp_server->rtsp_es_path);
2324                 gst_rtsp_mount_points_remove_factory (app->rtsp_server->mounts, app->rtsp_server->rtsp_ts_path);
2325                 GSource *source = g_main_context_find_source_by_id (g_main_context_default (), r->source_id);
2326                 g_source_destroy(source);
2327 //              g_source_unref(source);
2328 //              GST_DEBUG("disable_rtsp_server source unreffed");
2329                 if (r->mounts)
2330                         g_object_unref(r->mounts);
2331                 if (r->server)
2332                         gst_object_unref(r->server);
2333                 g_free(r->rtsp_user);
2334                 g_free(r->rtsp_pass);
2335                 g_free(r->rtsp_port);
2336                 g_free(r->rtsp_ts_path);
2337                 g_free(r->rtsp_es_path);
2338                 g_free(r->uri_parameters);
2339                 send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_DISABLED));
2340                 r->state = RTSP_STATE_DISABLED;
2341
2342                 gst_object_ref (r->tsrtspq);
2343                 gst_object_ref (r->artspq);
2344                 gst_object_ref (r->vrtspq);
2345                 gst_object_ref (r->tsappsink);
2346                 gst_object_ref (r->aappsink);
2347                 gst_object_ref (r->vappsink);
2348
2349                 GstPad *sinkpad;
2350                 sinkpad = gst_element_get_static_pad (r->tsrtspq, "sink");
2351                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, rtsp_pad_probe_unlink_cb, app, NULL);
2352                 gst_object_unref (sinkpad);
2353                 sinkpad = gst_element_get_static_pad (r->artspq, "sink");
2354                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, rtsp_pad_probe_unlink_cb, app, NULL);
2355                 gst_object_unref (sinkpad);
2356                 sinkpad = gst_element_get_static_pad (r->vrtspq, "sink");
2357                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, rtsp_pad_probe_unlink_cb, app, NULL);
2358                 gst_object_unref (sinkpad);
2359
2360                 DREAMRTSPSERVER_UNLOCK (app);
2361                 GST_INFO("rtsp_server disabled! set RTSP_STATE_DISABLED");
2362                 return TRUE;
2363         }
2364         return FALSE;
2365 }
2366
2367 static GstPadProbeReturn upstream_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2368 {
2369         App *app = user_data;
2370         DreamTCPupstream *t = app->tcp_upstream;
2371         GstPadProbeReturn ret;
2372
2373         GstElement *element = gst_pad_get_parent_element(pad);
2374
2375         GST_DEBUG_OBJECT (pad, "upstream_pad_probe_unlink_cb %" GST_PTR_FORMAT, element);
2376
2377         if (GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_IDLE && element && element == t->tstcpq)
2378         {
2379                 GstPad *teepad;
2380                 teepad = gst_pad_get_peer(pad);
2381                 if (!teepad)
2382                 {
2383                         GST_ERROR_OBJECT (pad, "has no peer! tstcpq=%" GST_PTR_FORMAT", tcpsink=%" GST_PTR_FORMAT", tee=%" GST_PTR_FORMAT,t->tstcpq, t->tcpsink, app->tstee);
2384                         return GST_PAD_PROBE_REMOVE;
2385                 }
2386                 GST_DEBUG_OBJECT (pad, "GST_PAD_PROBE_TYPE_IDLE -> unlink and remove tcpsink");
2387                 gst_pad_unlink (teepad, pad);
2388
2389                 GstElement *tee = gst_pad_get_parent_element(teepad);
2390                 gst_element_release_request_pad (tee, teepad);
2391                 gst_object_unref (teepad);
2392                 gst_object_unref (tee);
2393
2394                 gst_object_ref (t->tcpsink);
2395                 gst_element_unlink (t->tstcpq, t->tcpsink);
2396                 gst_bin_remove_many (GST_BIN (app->pipeline), t->tstcpq, t->tcpsink, NULL);
2397
2398                 gst_element_set_state (t->tcpsink, GST_STATE_NULL);
2399                 gst_element_set_state (t->tstcpq, GST_STATE_NULL);
2400
2401                 gst_object_unref (t->tstcpq);
2402                 gst_object_unref (t->tcpsink);
2403                 t->tstcpq = NULL;
2404                 t->tcpsink = NULL;
2405
2406                 if (app->rtsp_server->state < RTSP_STATE_RUNNING && app->hls_server->state == HLS_STATE_DISABLED)
2407                         halt_source_pipeline(app);
2408                 GST_INFO("tcp_upstream disabled!");
2409                 t->state = UPSTREAM_STATE_DISABLED;
2410                 send_signal (app, "upstreamStateChanged", g_variant_new("(i)", t->state));
2411         }
2412         GST_DEBUG_OBJECT (pad, "upstream_pad_probe_unlink_cb returns GST_PAD_PROBE_REMOVE");
2413         return GST_PAD_PROBE_REMOVE;
2414 }
2415
2416 gboolean disable_tcp_upstream(App *app)
2417 {
2418         GstState state;
2419         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 3*GST_SECOND);
2420         GST_DEBUG("disable_tcp_upstream (current pipeline state=%s)", gst_element_state_get_name (state));
2421         DreamTCPupstream *t = app->tcp_upstream;
2422         if (t->state >= UPSTREAM_STATE_CONNECTING)
2423         {
2424                 GstPad *sinkpad;
2425                 if (t->id_bitrate_measure)
2426                 {
2427                         sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
2428                         gst_pad_remove_probe (sinkpad, t->id_bitrate_measure);
2429                         t->id_bitrate_measure = 0;
2430                         gst_object_unref (sinkpad);
2431                 }
2432                 gst_object_ref (t->tstcpq);
2433                 sinkpad = gst_element_get_static_pad (t->tstcpq, "sink");
2434                 gulong probe_id = gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, upstream_pad_probe_unlink_cb, app, NULL);
2435                 GST_DEBUG("added upstream_pad_probe_unlink_cb with probe_id = %lu on %" GST_PTR_FORMAT"", probe_id, sinkpad);
2436                 gst_object_unref (sinkpad);
2437                 return TRUE;
2438         }
2439         return FALSE;
2440 }
2441
2442 gboolean destroy_pipeline(App *app)
2443 {
2444         GST_DEBUG_OBJECT(app, "destroy_pipeline @%p", app->pipeline);
2445         if (app->pipeline)
2446         {
2447                 get_source_properties (app);
2448                 GstStateChangeReturn sret = gst_element_set_state (app->pipeline, GST_STATE_NULL);
2449                 if (sret == GST_STATE_CHANGE_ASYNC)
2450                 {
2451                         GstState state;
2452                         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 3*GST_SECOND);
2453                         if (state != GST_STATE_NULL)
2454                                 GST_INFO_OBJECT(app, "%" GST_PTR_FORMAT"'s state=%s", app->pipeline, gst_element_state_get_name (state));
2455                 }
2456                 gst_object_unref (app->pipeline);
2457                 gst_object_unref (app->clock);
2458                 GST_INFO_OBJECT(app, "source pipeline destroyed");
2459                 app->pipeline = NULL;
2460                 return TRUE;
2461         }
2462         else
2463                 GST_INFO_OBJECT(app, "don't destroy inexistant pipeline");
2464         return FALSE;
2465 }
2466
2467 gboolean watchdog_ping(gpointer user_data)
2468 {
2469         App *app = user_data;
2470         GST_TRACE_OBJECT(app, "sending watchdog ping!");
2471         if (app->dbus_connection)
2472                 g_dbus_connection_emit_signal (app->dbus_connection, NULL, object_name, service, "ping", NULL, NULL);
2473         return TRUE;
2474 }
2475
2476 gboolean quit_signal(gpointer loop)
2477 {
2478         GST_INFO_OBJECT(loop, "caught SIGINT");
2479         g_main_loop_quit((GMainLoop*)loop);
2480         return FALSE;
2481 }
2482
2483 gboolean get_dot_graph (gpointer user_data)
2484 {
2485         App *app = user_data;
2486         GST_INFO_OBJECT(app, "caught SIGUSR1, saving pipeline graph...");
2487         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (app->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "dreamrtspserver-sigusr");
2488         return TRUE;
2489 }
2490
2491 int main (int argc, char *argv[])
2492 {
2493         App app;
2494         guint owner_id;
2495
2496         gst_init (0, NULL);
2497
2498         GST_DEBUG_CATEGORY_INIT (dreamrtspserver_debug, "dreamrtspserver",
2499                         GST_DEBUG_BOLD | GST_DEBUG_FG_YELLOW | GST_DEBUG_BG_BLUE,
2500                         "Dreambox RTSP server daemon");
2501
2502         memset (&app, 0, sizeof(app));
2503         memset (&app.source_properties, 0, sizeof(SourceProperties));
2504         g_mutex_init (&app.rtsp_mutex);
2505
2506         introspection_data = g_dbus_node_info_new_for_xml (introspection_xml, NULL);
2507         app.dbus_connection = NULL;
2508
2509         owner_id = g_bus_own_name (G_BUS_TYPE_SYSTEM,
2510                                    service,
2511                             G_BUS_NAME_OWNER_FLAGS_NONE,
2512                             on_bus_acquired,
2513                             on_name_acquired,
2514                             on_name_lost,
2515                             &app,
2516                             NULL);
2517
2518         if (!create_source_pipeline(&app))
2519                 g_print ("Failed to create source pipeline!");
2520
2521 #if WATCHDOG_TIMEOUT > 0
2522         g_timeout_add_seconds (WATCHDOG_TIMEOUT, watchdog_ping, &app);
2523 #endif
2524
2525         app.tcp_upstream = malloc(sizeof(DreamTCPupstream));
2526         app.tcp_upstream->state = UPSTREAM_STATE_DISABLED;
2527         app.tcp_upstream->auto_bitrate = AUTO_BITRATE;
2528
2529         app.hls_server = create_hls_server(&app);
2530
2531         app.rtsp_server = create_rtsp_server(&app);
2532
2533         app.loop = g_main_loop_new (NULL, FALSE);
2534         g_unix_signal_add (SIGINT, quit_signal, app.loop);
2535         g_unix_signal_add (SIGUSR1, (GSourceFunc) get_dot_graph, &app);
2536
2537         g_main_loop_run (app.loop);
2538
2539         if (app.tcp_upstream->state > UPSTREAM_STATE_DISABLED)
2540                 disable_tcp_upstream(&app);
2541         if (app.rtsp_server->state >= RTSP_STATE_IDLE)
2542                 disable_rtsp_server(&app);
2543         if (app.rtsp_server->clients_list)
2544                 g_list_free (app.rtsp_server->clients_list);
2545
2546         if (app.hls_server->state >= HLS_STATE_IDLE)
2547                 disable_hls_server(&app);
2548
2549         free(app.hls_server);
2550         free(app.rtsp_server);
2551         free(app.tcp_upstream);
2552
2553         destroy_pipeline(&app);
2554
2555         g_main_loop_unref (app.loop);
2556
2557         g_mutex_clear (&app.rtsp_mutex);
2558
2559         g_bus_unown_name (owner_id);
2560         g_dbus_node_info_unref (introspection_data);
2561
2562         return 0;
2563 }