fixed profile settings
[dreamrtspserver.git] / src / dreamrtspserver.c
1 /*
2  * GStreamer dreamrtspserver
3  * Copyright 2015-2016 Andreas Frisch <fraxinas@opendreambox.org>
4  *
5  * This program is licensed under the Creative Commons
6  * Attribution-NonCommercial-ShareAlike 3.0 Unported
7  * License. To view a copy of this license, visit
8  * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
9  * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA.
10  *
11  * Alternatively, this program may be distributed and executed on
12  * hardware which is licensed by Dream Property GmbH.
13  *
14  * This program is NOT free software. It is open source, you are allowed
15  * to modify it (if you keep the license), but it may not be commercially
16  * distributed other than under the conditions noted above.
17  */
18
19 #include "dreamrtspserver.h"
20 #include "gstdreamrtsp.h"
21
/*
 * Expands to declarations of cur_bytes, cur_buf and cur_time in the
 * enclosing scope and fills them from the "current-level-bytes",
 * "current-level-buffers" and "current-level-time" properties of t->tstcpq.
 * A variable named `t` with a `tstcpq` member must be in scope where the
 * macro is expanded.
 */
#define QUEUE_DEBUG \
                guint cur_bytes; \
                guint cur_buf; \
                guint64 cur_time; \
                g_object_get (t->tstcpq, "current-level-bytes", &cur_bytes, NULL); \
                g_object_get (t->tstcpq, "current-level-buffers", &cur_buf, NULL); \
                g_object_get (t->tstcpq, "current-level-time", &cur_time, NULL);
28
29
30 static void send_signal (App *app, const gchar *signal_name, GVariant *parameters)
31 {
32         if (app->dbus_connection)
33         {
34                 GST_DEBUG ("sending signal name=%s parameters=%s", signal_name, parameters?g_variant_print (parameters, TRUE):"[not given]");
35                 g_dbus_connection_emit_signal (app->dbus_connection, NULL, object_name, service, signal_name, parameters, NULL);
36         }
37         else
38                 GST_DEBUG ("no dbus connection, can't send signal %s", signal_name);
39 }
40
41 static gboolean gst_set_inputmode(App *app, inputMode input_mode)
42 {
43         if (!app->pipeline)
44                 return FALSE;
45         if (!GST_IS_ELEMENT(app->asrc) ||
46             !GST_IS_ELEMENT(app->vsrc))
47                 return FALSE;
48
49         g_object_set (G_OBJECT (app->asrc), "input_mode", input_mode, NULL);
50         g_object_set (G_OBJECT (app->vsrc), "input_mode", input_mode, NULL);
51
52         inputMode ret1, ret2;
53         g_object_get (G_OBJECT (app->asrc), "input_mode", &ret1, NULL);
54         g_object_get (G_OBJECT (app->vsrc), "input_mode", &ret2, NULL);
55
56         if (input_mode != ret1 || input_mode != ret2)
57                 return FALSE;
58
59         GST_DEBUG("set input_mode %d", input_mode);
60         return TRUE;
61 }
62
63 static gboolean gst_set_framerate(App *app, int value)
64 {
65         GstCaps *oldcaps = NULL;
66         GstCaps *newcaps = NULL;
67         GstStructure *structure;
68         gboolean ret = FALSE;
69
70         if (!app->pipeline)
71                 return FALSE;
72         if (!GST_IS_ELEMENT(app->vsrc))
73                 return FALSE;
74
75         g_object_get (G_OBJECT (app->vsrc), "caps", &oldcaps, NULL);
76         if (!GST_IS_CAPS(oldcaps))
77                 return FALSE;
78
79         GST_DEBUG("set framerate %d fps... old caps %" GST_PTR_FORMAT, value, oldcaps);
80
81         newcaps = gst_caps_make_writable(oldcaps);
82         structure = gst_caps_steal_structure (newcaps, 0);
83         if (!structure)
84                 goto out;
85
86         if (value)
87                 gst_structure_set (structure, "framerate", GST_TYPE_FRACTION, value, 1, NULL);
88
89         gst_caps_append_structure (newcaps, structure);
90         GST_INFO("new caps %" GST_PTR_FORMAT, newcaps);
91         g_object_set (G_OBJECT (app->vsrc), "caps", newcaps, NULL);
92         ret = TRUE;
93
94 out:
95         if (GST_IS_CAPS(oldcaps))
96                 gst_caps_unref(oldcaps);
97         if (GST_IS_CAPS(newcaps))
98                 gst_caps_unref(newcaps);
99         return ret;
100 }
101
102 static gboolean gst_set_resolution(App *app, int width, int height)
103 {
104         GstCaps *oldcaps = NULL;
105         GstCaps *newcaps = NULL;
106         GstStructure *structure;
107         gboolean ret = FALSE;
108
109         if (!app->pipeline)
110                 return FALSE;
111         if (!GST_IS_ELEMENT(app->vsrc))
112                 return FALSE;
113
114         g_object_get (G_OBJECT (app->vsrc), "caps", &oldcaps, NULL);
115         if (!GST_IS_CAPS(oldcaps))
116                 return FALSE;
117
118         GST_DEBUG("set new resolution %ix%i... old caps %" GST_PTR_FORMAT, width, height, oldcaps);
119
120         newcaps = gst_caps_make_writable(oldcaps);
121         structure = gst_caps_steal_structure (newcaps, 0);
122         if (!structure)
123                 goto out;
124
125         if (width && height)
126         {
127                 gst_structure_set (structure, "width", G_TYPE_INT, width, NULL);
128                 gst_structure_set (structure, "height", G_TYPE_INT, height, NULL);
129         }
130         gst_caps_append_structure (newcaps, structure);
131         GST_INFO("new caps %" GST_PTR_FORMAT, newcaps);
132         g_object_set (G_OBJECT (app->vsrc), "caps", newcaps, NULL);
133         ret = TRUE;
134
135 out:
136         if (GST_IS_CAPS(oldcaps))
137                 gst_caps_unref(oldcaps);
138         if (GST_IS_CAPS(newcaps))
139                 gst_caps_unref(newcaps);
140         return ret;
141 }
142
143 static gboolean gst_set_profile(App *app, int value)
144 {
145         GstCaps *oldcaps = NULL;
146         GstCaps *newcaps = NULL;
147         GstStructure *structure;
148         gboolean ret = FALSE;
149
150         if (!app->pipeline)
151                 return FALSE;
152         if (!GST_IS_ELEMENT(app->vsrc))
153                 return FALSE;
154
155         g_object_get (G_OBJECT (app->vsrc), "caps", &oldcaps, NULL);
156         if (!GST_IS_CAPS(oldcaps))
157                 return FALSE;
158
159         GST_DEBUG("set profile %d... old caps %" GST_PTR_FORMAT, value, oldcaps);
160
161         newcaps = gst_caps_make_writable(oldcaps);
162         structure = gst_caps_steal_structure (newcaps, 0);
163         if (!structure)
164                 goto out;
165
166         if (value == 1)
167                 gst_structure_set (structure, "profile", G_TYPE_STRING, "high", NULL);
168         else
169                 gst_structure_set (structure, "profile", G_TYPE_STRING, "main", NULL);
170
171         gst_caps_append_structure (newcaps, structure);
172         GST_INFO("new caps %" GST_PTR_FORMAT, newcaps);
173         g_object_set (G_OBJECT (app->vsrc), "caps", newcaps, NULL);
174         ret = TRUE;
175
176 out:
177         if (GST_IS_CAPS(oldcaps))
178                 gst_caps_unref(oldcaps);
179         if (GST_IS_CAPS(newcaps))
180                 gst_caps_unref(newcaps);
181         return ret;
182 }
183
184 static gboolean gst_get_capsprop(App *app, GstElement *element, const gchar* prop_name, guint32 *value)
185 {
186         GstCaps *caps = NULL;
187         const GstStructure *structure;
188         gboolean ret = FALSE;
189
190         if (!app->pipeline)
191                 return FALSE;
192         if (!GST_IS_ELEMENT(element))
193                 return FALSE;
194
195         g_object_get (G_OBJECT (element), "caps", &caps, NULL);
196         if (!GST_IS_CAPS(caps))
197                 return FALSE;
198
199         if (gst_caps_is_empty(caps))
200                 goto out;
201
202         GST_DEBUG ("current caps %" GST_PTR_FORMAT, caps);
203
204         structure = gst_caps_get_structure (caps, 0);
205         if (!structure)
206                 goto out;
207
208         if (g_strcmp0 (prop_name, "framerate") == 0 && value)
209         {
210                 const GValue *framerate = gst_structure_get_value (structure, "framerate");
211                 if (GST_VALUE_HOLDS_FRACTION(framerate))
212                         *value = gst_value_get_fraction_numerator (framerate);
213                 else
214                         *value = 0;
215         }
216         else if (g_strcmp0 (prop_name, "profile") == 0 && value)
217         {
218                 const gchar* profile = gst_structure_get_string(structure, "profile");
219                 *value = 0;
220                 if (!profile)
221                         GST_WARNING("profile missing in caps! returned main profile");
222                 else if (g_strcmp0 (profile, "high") == 0)
223                         *value = 1;
224                 else if (g_strcmp0 (profile, "main") == 0)
225                         *value = 1;
226                 else
227                         GST_WARNING("unknown profile '%s' in caps! returned main profile", profile);
228         }
229         else if ((g_strcmp0 (prop_name, "width") == 0 || g_strcmp0 (prop_name, "height") == 0) && value)
230         {
231                 if (!gst_structure_get_uint(structure, prop_name, value))
232                         *value = 0;
233         }
234         else
235                 goto out;
236
237         GST_DEBUG ("%" GST_PTR_FORMAT"'s %s = %i", element, prop_name, *value);
238         ret = TRUE;
239 out:
240         if (GST_IS_CAPS(caps))
241                 gst_caps_unref(caps);
242         return ret;
243 }
244
245 static void get_source_properties (App *app)
246 {
247         SourceProperties *p = &app->source_properties;
248         if (GST_IS_ELEMENT(app->asrc))
249                 g_object_get (G_OBJECT (app->asrc), "bitrate", &p->audioBitrate, NULL);
250         if (GST_IS_ELEMENT(app->vsrc))
251         {
252                 g_object_get (G_OBJECT (app->vsrc), "bitrate", &p->videoBitrate, NULL);
253                 g_object_get (G_OBJECT (app->vsrc), "gop-length", &p->gopLength, NULL);
254                 g_object_get (G_OBJECT (app->vsrc), "gop-scene", &p->gopOnSceneChange, NULL);
255                 g_object_get (G_OBJECT (app->vsrc), "open-gop", &p->openGop, NULL);
256                 g_object_get (G_OBJECT (app->vsrc), "bframes", &p->bFrames, NULL);
257                 g_object_get (G_OBJECT (app->vsrc), "pframes", &p->pFrames, NULL);
258                 g_object_get (G_OBJECT (app->vsrc), "slices", &p->slices, NULL);
259                 g_object_get (G_OBJECT (app->vsrc), "level", &p->level, NULL);
260
261                 gst_get_capsprop(app, app->vsrc, "width", &p->width);
262                 gst_get_capsprop(app, app->vsrc, "height", &p->height);
263                 gst_get_capsprop(app, app->vsrc, "framerate", &p->framerate);
264                 gst_get_capsprop(app, app->vsrc, "profile", &p->profile);
265         }
266 }
267
268 static void apply_source_properties (App *app)
269 {
270         SourceProperties *p = &app->source_properties;
271         if (GST_IS_ELEMENT(app->asrc))
272         {
273                 if (p->audioBitrate)
274                         g_object_set (G_OBJECT (app->asrc), "bitrate", p->audioBitrate, NULL);
275         }
276         if (GST_IS_ELEMENT(app->vsrc))
277         {
278                 if (p->videoBitrate)
279                         g_object_set (G_OBJECT (app->vsrc), "bitrate", p->videoBitrate, NULL);
280
281                 g_object_set (G_OBJECT (app->vsrc), "gop-length", p->gopLength, NULL);
282                 g_object_set (G_OBJECT (app->vsrc), "gop-scene", p->gopOnSceneChange, NULL);
283                 g_object_set (G_OBJECT (app->vsrc), "open-gop", p->openGop, NULL);
284                 g_object_set (G_OBJECT (app->vsrc), "bframes", p->bFrames, NULL);
285                 g_object_set (G_OBJECT (app->vsrc), "pframes", p->pFrames, NULL);
286                 g_object_set (G_OBJECT (app->vsrc), "slices", p->slices, NULL);
287                 g_object_set (G_OBJECT (app->vsrc), "level", p->level, NULL);
288
289                 if (p->framerate)
290                         gst_set_framerate(app, p->framerate);
291                 if (p->width && p->height)
292                         gst_set_resolution(app,  p->width, p->height);
293                 gst_set_profile(app, p->profile);
294         }
295 }
296
297 static gboolean gst_set_int_property (App *app, GstElement *source, const gchar* key, gint32 value, gboolean zero_allowed)
298 {
299         if (!GST_IS_ELEMENT (source) || (!value && !zero_allowed))
300                 return FALSE;
301
302         g_object_set (G_OBJECT (source), key, value, NULL);
303
304         gint32 checkvalue = 0;
305
306         g_object_get (G_OBJECT (source), key, &checkvalue, NULL);
307
308         if (value != checkvalue)
309                 return FALSE;
310
311         get_source_properties(app);
312         return TRUE;
313 }
314
315 static gboolean gst_set_boolean_property (App *app, GstElement *source, const gchar* key, gboolean value)
316 {
317         if (!GST_IS_ELEMENT (source))
318                 return FALSE;
319
320         g_object_set (G_OBJECT (source), key, value, NULL);
321
322         gboolean checkvalue = FALSE;
323
324         g_object_get (G_OBJECT (source), key, &checkvalue, NULL);
325
326         if (value != checkvalue)
327                 return FALSE;
328
329         get_source_properties(app);
330         return TRUE;
331 }
332
333 static gboolean gst_set_bitrate (App *app, GstElement *source, gint32 value)
334 {
335         return gst_set_int_property(app, source, "bitrate", value, FALSE);
336 }
337
338 static gboolean gst_set_gop_length (App *app, gint32 value)
339 {
340         return gst_set_int_property(app, app->vsrc, "gop-length", value, TRUE);
341 }
342
343 static gboolean gst_set_gop_on_scene_change (App *app, gboolean value)
344 {
345         return gst_set_boolean_property(app, app->vsrc, "gop-scene", value);
346 }
347
348 static gboolean gst_set_open_gop (App *app, gboolean value)
349 {
350         return gst_set_boolean_property(app, app->vsrc, "open-gop", value);
351 }
352
353 static gboolean gst_set_bframes (App *app, gint32 value)
354 {
355         return gst_set_int_property(app, app->vsrc, "bframes", value, TRUE);
356 }
357
358 static gboolean gst_set_pframes (App *app, gint32 value)
359 {
360         return gst_set_int_property(app, app->vsrc, "pframes", value, TRUE);
361 }
362
363 static gboolean gst_set_slices (App *app, gint32 value)
364 {
365         return gst_set_int_property(app, app->vsrc, "slices", value, TRUE);
366 }
367
368 static gboolean gst_set_level (App *app, gint32 value)
369 {
370         return gst_set_int_property(app, app->vsrc, "level", value, TRUE);
371 }
372
373 gboolean upstream_resume_transmitting(App *app)
374 {
375         DreamTCPupstream *t = app->tcp_upstream;
376         GST_INFO_OBJECT (app, "resuming normal transmission...");
377         t->state = UPSTREAM_STATE_TRANSMITTING;
378         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_TRANSMITTING));
379         t->overrun_counter = 0;
380         t->overrun_period = GST_CLOCK_TIME_NONE;
381         t->id_signal_waiting = 0;
382         if (t->id_signal_keepalive)
383                 g_source_remove (t->id_signal_keepalive);
384         t->id_signal_keepalive = 0;
385         return G_SOURCE_REMOVE;
386 }
387
388 static GVariant *handle_get_property (GDBusConnection  *connection,
389                                       const gchar      *sender,
390                                       const gchar      *object_path,
391                                       const gchar      *interface_name,
392                                       const gchar      *property_name,
393                                       GError          **error,
394                                       gpointer          user_data)
395 {
396         App *app = user_data;
397
398         GST_DEBUG("dbus get property %s from %s", property_name, sender);
399
400         if (g_strcmp0 (property_name, "sourceState") == 0)
401         {
402                 if (app->pipeline)
403                 {
404                         GstState state = GST_STATE_VOID_PENDING;
405                         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 1*GST_USECOND);
406                         return g_variant_new_int32 ((int)state);
407                 }
408         }
409         else if (g_strcmp0 (property_name, "upstreamState") == 0)
410         {
411                 if (app->tcp_upstream)
412                         return g_variant_new_int32 (app->tcp_upstream->state);
413         }
414         else if (g_strcmp0 (property_name, "hlsState") == 0)
415         {
416                 if (app->hls_server)
417                         return g_variant_new_int32 (app->hls_server->state);
418         }
419         else if (g_strcmp0 (property_name, "inputMode") == 0)
420         {
421                 inputMode input_mode = -1;
422                 if (GST_IS_ELEMENT(app->asrc))
423                 {
424                         g_object_get (G_OBJECT (app->asrc), "input_mode", &input_mode, NULL);
425                         return g_variant_new_int32 (input_mode);
426                 }
427         }
428         else if (g_strcmp0 (property_name, "rtspClientCount") == 0)
429         {
430                 if (app->rtsp_server)
431                         return g_variant_new_int32 (g_list_length(app->rtsp_server->clients_list));
432         }
433         else if (g_strcmp0 (property_name, "uriParameters") == 0)
434         {
435                 if (app->rtsp_server)
436                         return g_variant_new_string (app->rtsp_server->uri_parameters);
437         }
438         else if (g_strcmp0 (property_name, "audioBitrate") == 0)
439         {
440                 gint rate = 0;
441                 if (GST_IS_ELEMENT(app->asrc))
442                 {
443                         g_object_get (G_OBJECT (app->asrc), "bitrate", &rate, NULL);
444                         return g_variant_new_int32 (rate);
445                 }
446         }
447         else if (g_strcmp0 (property_name, "videoBitrate") == 0)
448         {
449                 gint rate = 0;
450                 if (GST_IS_ELEMENT(app->vsrc))
451                 {
452                         g_object_get (G_OBJECT (app->vsrc), "bitrate", &rate, NULL);
453                         return g_variant_new_int32 (rate);
454                 }
455         }
456         else if (g_strcmp0 (property_name, "gopLength") == 0)
457         {
458                 gint length = 0;
459                 if (GST_IS_ELEMENT(app->vsrc))
460                 {
461                         g_object_get (G_OBJECT (app->vsrc), "gop-length", &length, NULL);
462                         return g_variant_new_int32 (length);
463                 }
464         }
465         else if (g_strcmp0 (property_name, "gopOnSceneChange") == 0)
466         {
467                 gboolean enabled = FALSE;
468                 if (GST_IS_ELEMENT(app->vsrc))
469                 {
470                         g_object_get (G_OBJECT (app->vsrc), "gop-scene", &enabled, NULL);
471                         return g_variant_new_boolean (enabled);
472                 }
473         }
474         else if (g_strcmp0 (property_name, "openGop") == 0)
475         {
476                 gboolean enabled = FALSE;
477                 if (GST_IS_ELEMENT(app->vsrc))
478                 {
479                         g_object_get (G_OBJECT (app->vsrc), "open-gop", &enabled, NULL);
480                         return g_variant_new_boolean (enabled);
481                 }
482         }
483         else if (g_strcmp0 (property_name, "bFrames") == 0)
484         {
485                 gint bframes = 0;
486                 if (GST_IS_ELEMENT(app->vsrc))
487                 {
488                         g_object_get (G_OBJECT (app->vsrc), "bframes", &bframes, NULL);
489                         return g_variant_new_int32 (bframes);
490                 }
491         }
492         else if (g_strcmp0 (property_name, "pFrames") == 0)
493         {
494                 gint pframes = 0;
495                 if (GST_IS_ELEMENT(app->vsrc))
496                 {
497                         g_object_get (G_OBJECT (app->vsrc), "pframes", &pframes, NULL);
498                         return g_variant_new_int32 (pframes);
499                 }
500         }
501         else if (g_strcmp0 (property_name, "slices") == 0)
502         {
503                 gint slices = 0;
504                 if (GST_IS_ELEMENT(app->vsrc))
505                 {
506                         g_object_get (G_OBJECT (app->vsrc), "slices", &slices, NULL);
507                         return g_variant_new_int32 (slices);
508                 }
509         }
510         else if (g_strcmp0 (property_name, "level") == 0)
511         {
512                 gint level = 0;
513                 if (GST_IS_ELEMENT(app->vsrc))
514                 {
515                         g_object_get (G_OBJECT (app->vsrc), "level", &level, NULL);
516                         return g_variant_new_int32 (level);
517                 }
518         }
519         else if (g_strcmp0 (property_name, "width") == 0 || g_strcmp0 (property_name, "height") == 0 || g_strcmp0 (property_name, "framerate") == 0 || g_strcmp0 (property_name, "profile") == 0)
520         {
521                 guint32 value;
522                 if (gst_get_capsprop(app, app->vsrc, property_name, &value))
523                         return g_variant_new_int32(value);
524                 GST_WARNING("can't handle_get_property name=%s", property_name);
525         }
526         else if (g_strcmp0 (property_name, "autoBitrate") == 0)
527         {
528                 if (app->tcp_upstream)
529                         return g_variant_new_boolean(app->tcp_upstream->auto_bitrate);
530         }
531         else if (g_strcmp0 (property_name, "path") == 0)
532         {
533                 if (app->rtsp_server)
534                         return g_variant_new_string (app->rtsp_server->rtsp_ts_path);
535         }
536         g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] Invalid property '%s'", property_name);
537         return NULL;
538 } // handle_get_property
539
540 static gboolean handle_set_property (GDBusConnection  *connection,
541                                      const gchar      *sender,
542                                      const gchar      *object_path,
543                                      const gchar      *interface_name,
544                                      const gchar      *property_name,
545                                      GVariant         *value,
546                                      GError          **error,
547                                      gpointer          user_data)
548 {
549         App *app = user_data;
550
551         gchar *valstr = g_variant_print (value, TRUE);
552         GST_DEBUG("dbus set property %s = %s from %s", property_name, valstr, sender);
553         g_free (valstr);
554
555         if (g_strcmp0 (property_name, "inputMode") == 0)
556         {
557                 inputMode input_mode = g_variant_get_int32 (value);
558                 if (input_mode >= INPUT_MODE_LIVE && input_mode <= INPUT_MODE_BACKGROUND )
559                 {
560                         if (gst_set_inputmode(app, input_mode))
561                                 return 1;
562                 }
563                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] can't set input_mode to %d", input_mode);
564                 return 0;
565         }
566         else if (g_strcmp0 (property_name, "audioBitrate") == 0)
567         {
568                 if (gst_set_bitrate (app, app->asrc, g_variant_get_int32 (value)))
569                         return 1;
570         }
571         else if (g_strcmp0 (property_name, "videoBitrate") == 0)
572         {
573                 if (gst_set_bitrate (app, app->vsrc, g_variant_get_int32 (value)))
574                         return 1;
575         }
576         else if (g_strcmp0 (property_name, "gopLength") == 0)
577         {
578                 if (gst_set_gop_length (app, g_variant_get_int32 (value)))
579                         return 1;
580         }
581         else if (g_strcmp0 (property_name, "gopOnSceneChange") == 0)
582         {
583                 if (gst_set_gop_on_scene_change (app, g_variant_get_boolean (value)))
584                         return 1;
585         }
586         else if (g_strcmp0 (property_name, "openGop") == 0)
587         {
588                 if (gst_set_open_gop (app, g_variant_get_boolean (value)))
589                         return 1;
590         }
591         else if (g_strcmp0 (property_name, "bFrames") == 0)
592         {
593                 if (gst_set_bframes (app, g_variant_get_int32 (value)))
594                         return 1;
595         }
596         else if (g_strcmp0 (property_name, "pFrames") == 0)
597         {
598                 if (gst_set_pframes (app, g_variant_get_int32 (value)))
599                         return 1;
600         }
601         else if (g_strcmp0 (property_name, "slices") == 0)
602         {
603                 if (gst_set_slices (app, g_variant_get_int32 (value)))
604                         return 1;
605         }
606         else if (g_strcmp0 (property_name, "level") == 0)
607         {
608                 if (gst_set_level (app, g_variant_get_int32 (value)))
609                         return 1;
610         }
611         else if (g_strcmp0 (property_name, "framerate") == 0)
612         {
613                 if (gst_set_framerate(app, g_variant_get_int32 (value)))
614                         return 1;
615                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] can't set property '%s' to %d", property_name, g_variant_get_int32 (value));
616                 return 0;
617         }
618         else if (g_strcmp0 (property_name, "profile") == 0)
619         {
620                 if (gst_set_profile(app, g_variant_get_int32 (value)))
621                         return 1;
622                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] can't set property '%s' to %d", property_name, g_variant_get_int32 (value));
623                 return 0;
624         }
625         else if (g_strcmp0 (property_name, "autoBitrate") == 0)
626         {
627                 if (app->tcp_upstream)
628                 {
629                         gboolean enable = g_variant_get_boolean(value);
630                         if (app->tcp_upstream->state == UPSTREAM_STATE_OVERLOAD)
631                                 upstream_resume_transmitting(app);
632                         app->tcp_upstream->auto_bitrate = enable;
633                         return 1;
634                 }
635         }
636         else
637         {
638                 g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] Invalid property: '%s'", property_name);
639                 return 0;
640         } // unknown property
641         g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, "[RTSPserver] Wrong state - can't set property: '%s'", property_name);
642         return 0;
643 } // handle_set_property
644
645 static void handle_method_call (GDBusConnection       *connection,
646                                 const gchar           *sender,
647                                 const gchar           *object_path,
648                                 const gchar           *interface_name,
649                                 const gchar           *method_name,
650                                 GVariant              *parameters,
651                                 GDBusMethodInvocation *invocation,
652                                 gpointer               user_data)
653 {
654         App *app = user_data;
655
656         gchar *paramstr = g_variant_print (parameters, TRUE);
657         GST_DEBUG("dbus handle method %s %s from %s", method_name, paramstr, sender);
658         g_free (paramstr);
659         if (g_strcmp0 (method_name, "enableRTSP") == 0)
660         {
661                 gboolean result = FALSE;
662                 if (app->pipeline)
663                 {
664                         gboolean state;
665                         guint32 port;
666                         const gchar *path, *user, *pass;
667
668                         g_variant_get (parameters, "(b&su&s&s)", &state, &path, &port, &user, &pass);
669                         GST_DEBUG("app->pipeline=%p, enableRTSP state=%i path=%s port=%i user=%s pass=%s", app->pipeline, state, path, port, user, pass);
670
671                         if (state == TRUE && app->rtsp_server->state >= RTSP_STATE_DISABLED)
672                                 result = enable_rtsp_server(app, path, port, user, pass);
673                         else if (state == FALSE && app->rtsp_server->state >= RTSP_STATE_IDLE)
674                         {
675                                 result = disable_rtsp_server(app);
676                                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->hls_server->state == HLS_STATE_DISABLED)
677                                 {
678                                         destroy_pipeline(app);
679                                         create_source_pipeline(app);
680                                 }
681                         }
682                 }
683                 g_dbus_method_invocation_return_value (invocation,  g_variant_new ("(b)", result));
684         }
685         else if (g_strcmp0 (method_name, "enableHLS") == 0)
686         {
687                 gboolean result = FALSE;
688                 if (app->pipeline)
689                 {
690                         gboolean state;
691                         guint32 port;
692                         const gchar *user, *pass;
693
694                         g_variant_get (parameters, "(bu&s&s)", &state, &port, &user, &pass);
695                         GST_DEBUG("app->pipeline=%p, enableHLS state=%i port=%i user=%s pass=%s", app->pipeline, state, port, user, pass);
696
697                         if (state == TRUE && app->hls_server->state >= HLS_STATE_DISABLED)
698                                 result = enable_hls_server(app, port, user, pass);
699                         else if (state == FALSE && app->hls_server->state >= HLS_STATE_IDLE)
700                         {
701                                 result = disable_hls_server(app);
702                                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->rtsp_server->state == RTSP_STATE_DISABLED)
703                                 {
704                                         destroy_pipeline(app);
705                                         create_source_pipeline(app);
706                                 }
707                         }
708                 }
709                 g_dbus_method_invocation_return_value (invocation,  g_variant_new ("(b)", result));
710         }
711         else if (g_strcmp0 (method_name, "enableUpstream") == 0)
712         {
713                 gboolean result = FALSE;
714                 if (app->pipeline)
715                 {
716                         gboolean state;
717                         const gchar *upstream_host, *token;
718                         guint32 upstream_port;
719
720                         g_variant_get (parameters, "(b&su&s)", &state, &upstream_host, &upstream_port, &token);
721                         GST_DEBUG("app->pipeline=%p, enableUpstream state=%i host=%s port=%i token=%s (currently in state %u)", app->pipeline, state, upstream_host, upstream_port, token, app->tcp_upstream->state);
722
723                         if (state == TRUE && app->tcp_upstream->state == UPSTREAM_STATE_DISABLED)
724                                 result = enable_tcp_upstream(app, upstream_host, upstream_port, token);
725                         else if (state == FALSE && app->tcp_upstream->state >= UPSTREAM_STATE_CONNECTING)
726                         {
727                                 result = disable_tcp_upstream(app);
728                                 if (app->rtsp_server->state == RTSP_STATE_DISABLED)
729                                 {
730                                         destroy_pipeline(app);
731                                         create_source_pipeline(app);
732                                 }
733                         }
734                 }
735                 g_dbus_method_invocation_return_value (invocation,  g_variant_new ("(b)", result));
736         }
737         else if (g_strcmp0 (method_name, "setResolution") == 0)
738         {
739                 int width, height;
740                 g_variant_get (parameters, "(ii)", &width, &height);
741                 if (gst_set_resolution(app, width, height))
742                         g_dbus_method_invocation_return_value (invocation, NULL);
743                 else
744                         g_dbus_method_invocation_return_error (invocation, G_IO_ERROR, G_IO_ERROR_INVALID_DATA, "[RTSPserver] can't set resolution %dx%d", width, height);
745         }
746         // Default: No such method
747         else
748         {
749                 g_dbus_method_invocation_return_error (invocation, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT, "[RTSPserver] Invalid method: '%s'", method_name);
750         } // if it's an unknown method
751 } // handle_method_call
752
753 static void on_bus_acquired (GDBusConnection *connection,
754                              const gchar     *name,
755                              gpointer        user_data)
756 {
757         static GDBusInterfaceVTable interface_vtable =
758         {
759                 handle_method_call,
760                 handle_get_property,
761                 handle_set_property,
762                 { 0, }
763         };
764
765         GError *error = NULL;
766         GST_DEBUG ("aquired dbus (\"%s\" @ %p)", name, connection);
767         g_dbus_connection_register_object (connection, object_name, introspection_data->interfaces[0], &interface_vtable, user_data, NULL, &error);
768 } // on_bus_acquired
769
770 static void on_name_acquired (GDBusConnection *connection,
771                               const gchar     *name,
772                               gpointer         user_data)
773 {
774         App *app = user_data;
775         app->dbus_connection = connection;
776         GST_DEBUG ("aquired dbus name (\"%s\")", name);
777         if (gst_element_set_state (app->pipeline, GST_STATE_READY) != GST_STATE_CHANGE_SUCCESS)
778                 GST_ERROR ("Failed to bring state of source pipeline to READY");
779 } // on_name_acquired
780
781 static void on_name_lost (GDBusConnection *connection,
782                           const gchar     *name,
783                           gpointer         user_data)
784         {
785         App *app = user_data;
786         app->dbus_connection = NULL;
787         GST_WARNING ("lost dbus name (\"%s\" @ %p)", name, connection);
788         //  g_main_loop_quit (app->loop);
789 } // on_name_lost
790
791 static gboolean message_cb (GstBus * bus, GstMessage * message, gpointer user_data)
792 {
793         App *app = user_data;
794
795 //      DREAMRTSPSERVER_LOCK (app);
796         GST_TRACE_OBJECT (app, "message %" GST_PTR_FORMAT "", message);
797         switch (GST_MESSAGE_TYPE (message)) {
798                 case GST_MESSAGE_STATE_CHANGED:
799                 {
800                         GstState old_state, new_state;
801                         gst_message_parse_state_changed(message, &old_state, &new_state, NULL);
802                         if (old_state == new_state)
803                                 break;
804
805                         if (GST_MESSAGE_SRC(message) == GST_OBJECT(app->pipeline))
806                         {
807                                 GST_DEBUG_OBJECT(app, "state transition %s -> %s", gst_element_state_get_name(old_state), gst_element_state_get_name(new_state));
808                                 send_signal (app, "sourceStateChanged", g_variant_new("(i)", (int) new_state));
809                         }
810                         break;
811                 }
812                 case GST_MESSAGE_ERROR:
813                 {
814                         GError *err = NULL;
815                         gchar *name, *debug = NULL;
816                         name = gst_object_get_path_string (message->src);
817                         gst_message_parse_error (message, &err, &debug);
818                         if (err->domain == GST_RESOURCE_ERROR)
819                         {
820                                 if (err->code == GST_RESOURCE_ERROR_READ)
821                                 {
822                                         GST_INFO ("element %s: %s", name, err->message);
823                                         send_signal (app, "encoderError", NULL);
824 //                                      DREAMRTSPSERVER_UNLOCK (app);
825                                         disable_tcp_upstream(app);
826                                         destroy_pipeline(app);
827                                 }
828                                 if (err->code == GST_RESOURCE_ERROR_WRITE)
829                                 {
830                                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_FAILED));
831                                         GST_INFO ("element %s: %s -> this means PEER DISCONNECTED", name, err->message);
832                                         GST_DEBUG ("Additional ERROR debug info: %s", debug);
833 //                                      DREAMRTSPSERVER_UNLOCK (app);
834                                         disable_tcp_upstream(app);
835                                         if (app->rtsp_server->state == RTSP_STATE_DISABLED)
836                                         {
837                                                 destroy_pipeline(app);
838                                                 create_source_pipeline(app);
839                                         }
840                                 }
841                         }
842                         else
843                         {
844                                 GST_ERROR ("ERROR: from element %s: %s", name, err->message);
845                                 if (debug != NULL)
846                                         GST_ERROR ("Additional debug info: %s", debug);
847                                 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"dreamrtspserver-error");
848                         }
849                         g_error_free (err);
850                         g_free (debug);
851                         g_free (name);
852                         break;
853                 }
854                 case GST_MESSAGE_WARNING:
855                 {
856                         GError *err = NULL;
857                         gchar *name, *debug = NULL;
858                         name = gst_object_get_path_string (message->src);
859                         gst_message_parse_warning (message, &err, &debug);
860                         GST_WARNING ("WARNING: from element %s: %s", name, err->message);
861                         if (debug != NULL)
862                                 GST_WARNING ("Additional debug info: %s", debug);
863 #if 1
864                         if (err->domain == GST_STREAM_ERROR && err->code == GST_STREAM_ERROR_ENCODE)
865                         {
866                                 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"dreamrtspserver-encode-error");
867                                 g_main_loop_quit (app->loop);
868                                 return FALSE;
869                         }
870 #endif
871                         g_error_free (err);
872                         g_free (debug);
873                         g_free (name);
874                         break;
875                 }
876                 case GST_MESSAGE_EOS:
877                         g_print ("Got EOS\n");
878 //                      DREAMRTSPSERVER_UNLOCK (app);
879                         g_main_loop_quit (app->loop);
880                         return FALSE;
881                 default:
882                         break;
883         }
884 //      DREAMRTSPSERVER_UNLOCK (app);
885         return TRUE;
886 }
887
888 static void media_unprepare (GstRTSPMedia * media, gpointer user_data)
889 {
890         App *app = user_data;
891         DreamRTSPserver *r = app->rtsp_server;
892         GST_INFO("no more clients -> media unprepared!");
893
894 //      DREAMRTSPSERVER_LOCK (app);
895         if (media == r->es_media)
896         {
897                 r->es_media = NULL;
898                 r->es_aappsrc = r->es_vappsrc = NULL;
899         }
900         else if (media == r->ts_media)
901         {
902                 r->ts_media = NULL;
903                 r->ts_appsrc = NULL;
904         }
905         if (!r->es_media && !r->ts_media)
906         {
907                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->hls_server->state == HLS_STATE_DISABLED)
908                         halt_source_pipeline(app);
909                 if (r->state == RTSP_STATE_RUNNING)
910                 {
911                         GST_DEBUG ("set RTSP_STATE_IDLE");
912                         send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_IDLE));
913                         r->state = RTSP_STATE_IDLE;
914                 }
915         }
916 //      DREAMRTSPSERVER_UNLOCK (app);
917 }
918
919 static void client_closed (GstRTSPClient * client, gpointer user_data)
920 {
921         App *app = user_data;
922         app->rtsp_server->clients_list = g_list_remove(g_list_first (app->rtsp_server->clients_list), client);
923         gint no_clients = g_list_length(app->rtsp_server->clients_list);
924         GST_INFO("client_closed  (number of clients: %i)", no_clients);
925         send_signal (app, "rtspClientCountChanged", g_variant_new("(is)", no_clients, ""));
926 }
927
928 static void client_connected (GstRTSPServer * server, GstRTSPClient * client, gpointer user_data)
929 {
930         App *app = user_data;
931         app->rtsp_server->clients_list = g_list_append(app->rtsp_server->clients_list, client);
932         const gchar *ip = gst_rtsp_connection_get_ip (gst_rtsp_client_get_connection (client));
933         gint no_clients = g_list_length(app->rtsp_server->clients_list);
934         GST_INFO("client_connected %" GST_PTR_FORMAT " from %s  (number of clients: %i)", client, ip, no_clients);
935         g_signal_connect (client, "closed", (GCallback) client_closed, app);
936         send_signal (app, "rtspClientCountChanged", g_variant_new("(is)", no_clients, ip));
937 }
938
939 static void media_configure (GstRTSPMediaFactory * factory, GstRTSPMedia * media, gpointer user_data)
940 {
941         App *app = user_data;
942         DreamRTSPserver *r = app->rtsp_server;
943         DREAMRTSPSERVER_LOCK (app);
944
945         if (GST_DREAM_RTSP_MEDIA_FACTORY (factory) == r->es_factory)
946         {
947                 r->es_media = media;
948                 GstElement *element = gst_rtsp_media_get_element (media);
949                 r->es_aappsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), ES_AAPPSRC);
950                 r->es_vappsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), ES_VAPPSRC);
951                 gst_object_unref(element);
952                 g_signal_connect (media, "unprepared", (GCallback) media_unprepare, app);
953                 g_object_set (r->es_aappsrc, "format", GST_FORMAT_TIME, NULL);
954                 g_object_set (r->es_vappsrc, "format", GST_FORMAT_TIME, NULL);
955         }
956         else if (GST_DREAM_RTSP_MEDIA_FACTORY (factory) == r->ts_factory)
957         {
958                 r->ts_media = media;
959                 GstElement *element = gst_rtsp_media_get_element (media);
960                 r->ts_appsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), TS_APPSRC);
961                 r->ts_appsrc = gst_bin_get_by_name_recurse_up (GST_BIN (element), TS_APPSRC);
962                 gst_object_unref(element);
963                 g_signal_connect (media, "unprepared", (GCallback) media_unprepare, app);
964                 g_object_set (r->ts_appsrc, "format", GST_FORMAT_TIME, NULL);
965         }
966         r->rtsp_start_pts = r->rtsp_start_dts = GST_CLOCK_TIME_NONE;
967         r->state = RTSP_STATE_RUNNING;
968         send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_RUNNING));
969         GST_DEBUG ("set RTSP_STATE_RUNNING");
970         start_rtsp_pipeline(app);
971         DREAMRTSPSERVER_UNLOCK (app);
972 }
973
974 static void uri_parametrized (GstDreamRTSPMediaFactory * factory, gchar *parameters, gpointer user_data)
975 {
976         App *app = user_data;
977         GST_INFO_OBJECT (app, "parametrized uri query: '%s'", parameters);
978         app->rtsp_server->uri_parameters = g_strdup(parameters);
979         send_signal (app, "uriParametersChanged", g_variant_new("(s)", app->rtsp_server->uri_parameters));
980 }
981
982 static GstPadProbeReturn cancel_waiting_probe (GstPad * sinkpad, GstPadProbeInfo * info, gpointer user_data)
983 {
984         App *app = user_data;
985         DreamTCPupstream *t = app->tcp_upstream;
986         if (((info->type & GST_PAD_PROBE_TYPE_BUFFER) && GST_IS_BUFFER(GST_PAD_PROBE_INFO_BUFFER(info))) ||
987              ((info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST) && gst_buffer_list_length(GST_PAD_PROBE_INFO_BUFFER_LIST(info))))
988         {
989                 QUEUE_DEBUG;
990                 GST_LOG_OBJECT (app, "cancel upstream_set_waiting timeout because data flow was restored! queue properties current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT "",
991                                   cur_bytes, cur_buf, GST_TIME_ARGS(cur_time));
992                 if (t->id_signal_waiting)
993                         g_source_remove (t->id_signal_waiting);
994                 t->id_signal_waiting = 0;
995                 if (t->id_signal_keepalive)
996                         g_source_remove (t->id_signal_keepalive);
997                 t->id_signal_keepalive = 0;
998                 if (t->id_signal_overrun == 0)
999                         t->id_signal_overrun = g_signal_connect (t->tstcpq, "overrun", G_CALLBACK (queue_overrun), app);
1000                 t->id_resume = 0;
1001                 return GST_PAD_PROBE_REMOVE;
1002         }
1003         else
1004                 GST_WARNING_OBJECT(app, "probed unhandled %" GST_PTR_FORMAT ", dataflow not restored?", info->data);
1005         return GST_PAD_PROBE_OK;
1006 }
1007
1008 static GstPadProbeReturn bitrate_measure_probe (GstPad * sinkpad, GstPadProbeInfo * info, gpointer user_data)
1009 {
1010         App *app = user_data;
1011         DreamTCPupstream *t = app->tcp_upstream;
1012         GstClockTime now = gst_clock_get_time (app->clock);
1013         GstBuffer *buffer = NULL;
1014         guint idx = 0, num_buffers = 1;
1015         do {
1016                 if (info->type & GST_PAD_PROBE_TYPE_BUFFER)
1017                 {
1018                         buffer = GST_PAD_PROBE_INFO_BUFFER (info);
1019                 }
1020                 else if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)
1021                 {
1022                         GstBufferList *bufferlist = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
1023                         num_buffers = gst_buffer_list_length (bufferlist);
1024                         buffer = gst_buffer_list_get (bufferlist, idx);
1025                 }
1026                 if (GST_IS_BUFFER(buffer))
1027                         t->bitrate_sum += gst_buffer_get_size (buffer);
1028                 idx++;
1029         } while (idx < num_buffers);
1030
1031         QUEUE_DEBUG;
1032         GST_TRACE_OBJECT(app, "probetype=%i num_buffers=%i data=%" GST_PTR_FORMAT " size was=%zu bitrate_sum=%zu now=%" GST_TIME_FORMAT " avg at %" GST_TIME_FORMAT " queue properties current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT "",
1033                         info->type, num_buffers, info->data, gst_buffer_get_size (buffer), t->bitrate_sum, GST_TIME_ARGS(now), GST_TIME_ARGS(t->measure_start+BITRATE_AVG_PERIOD), cur_bytes, cur_buf, GST_TIME_ARGS(cur_time));
1034         if (now > t->measure_start+BITRATE_AVG_PERIOD)
1035         {
1036                 gint bitrate = t->bitrate_sum*8/GST_TIME_AS_MSECONDS(BITRATE_AVG_PERIOD);
1037                 t->bitrate_avg ? (t->bitrate_avg = (t->bitrate_avg+bitrate)/2) : (t->bitrate_avg = bitrate);
1038                 send_signal (app, "tcpBitrate", g_variant_new("(i)", bitrate));
1039                 t->measure_start = now;
1040                 t->bitrate_sum = 0;
1041         }
1042         return GST_PAD_PROBE_OK;
1043 }
1044
1045 gboolean upstream_keep_alive (App *app)
1046 {
1047         GstBuffer *buf = gst_buffer_new_allocate (NULL, TS_PACK_SIZE, NULL);
1048         gst_buffer_memset (buf, 0, 0x00, TS_PACK_SIZE);
1049         GstPad * srcpad = gst_element_get_static_pad (app->tcp_upstream->tstcpq, "src");
1050
1051         GstState state;
1052         gst_element_get_state (app->tcp_upstream->tcpsink, &state, NULL, 10*GST_SECOND);
1053         GST_INFO_OBJECT(app, "tcpsink's state=%s", gst_element_state_get_name (state));
1054         gst_element_get_state (app->tcp_upstream->tstcpq, &state, NULL, 10*GST_SECOND);
1055         GST_INFO_OBJECT(app, "tstcpq's state=%s", gst_element_state_get_name (state));
1056
1057         if ( state == GST_STATE_PAUSED )
1058         {
1059                 GstStateChangeReturn sret = gst_element_set_state (app->tcp_upstream->tcpsink, GST_STATE_PLAYING);
1060                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tcpsink, GST_STATE_PLAYING) = %i", sret);
1061                 sret = gst_element_set_state (app->tcp_upstream->tstcpq, GST_STATE_PLAYING);
1062                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tstcpq, GST_STATE_PLAYING) = %i", sret);
1063                 GST_INFO ("injecting keepalive %" GST_PTR_FORMAT " on pad %s:%s", buf, GST_DEBUG_PAD_NAME (srcpad));
1064                 gst_pad_push (srcpad, gst_buffer_ref(buf));
1065                 sret = gst_element_set_state (app->tcp_upstream->tcpsink, GST_STATE_PAUSED);
1066                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tcpsink, GST_STATE_PAUSED) = %i", sret);
1067                 sret = gst_element_set_state (app->tcp_upstream->tstcpq, GST_STATE_PAUSED);
1068                 GST_DEBUG_OBJECT(app, "gst_element_set_state (tstcpq, GST_STATE_PAUSED) = %i", sret);
1069         }
1070
1071         return G_SOURCE_REMOVE;
1072 }
1073
1074 gboolean upstream_set_waiting (App *app)
1075 {
1076         DREAMRTSPSERVER_LOCK (app);
1077         DreamTCPupstream *t = app->tcp_upstream;
1078         t->overrun_counter = 0;
1079         t->overrun_period = GST_CLOCK_TIME_NONE;
1080         t->state = UPSTREAM_STATE_WAITING;
1081         g_object_set (t->tcpsink, "max-lateness", G_GINT64_CONSTANT(1)*GST_SECOND, NULL);
1082         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_WAITING));
1083         g_signal_connect (t->tstcpq, "underrun", G_CALLBACK (queue_underrun), app);
1084         GstPad *sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
1085         if (t->id_resume)
1086         {
1087                 gst_pad_remove_probe (sinkpad, t->id_resume);
1088                 t->id_resume = 0;
1089         }
1090         if (t->id_bitrate_measure)
1091         {
1092                 gst_pad_remove_probe (sinkpad, t->id_bitrate_measure);
1093                 t->id_bitrate_measure = 0;
1094         }
1095         send_signal (app, "tcpBitrate", g_variant_new("(i)", 0));
1096         gst_object_unref (sinkpad);
1097         pause_source_pipeline(app);
1098         t->id_signal_waiting = 0;
1099         t->id_signal_keepalive = g_timeout_add_seconds (5, (GSourceFunc) upstream_keep_alive, app);
1100         DREAMRTSPSERVER_UNLOCK (app);
1101         return G_SOURCE_REMOVE;
1102 }
1103
1104 static void queue_underrun (GstElement * queue, gpointer user_data)
1105 {
1106         App *app = user_data;
1107         DreamTCPupstream *t = app->tcp_upstream;
1108         QUEUE_DEBUG;
1109         GST_DEBUG_OBJECT (app, "queue underrun! properties: current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT "", cur_bytes, cur_buf, GST_TIME_ARGS(cur_time));
1110         if (queue == t->tstcpq && app->rtsp_server->state != RTSP_STATE_RUNNING)
1111         {
1112                 if (unpause_source_pipeline(app))
1113                 {
1114                         DREAMRTSPSERVER_LOCK (app);
1115 //                      g_object_set (G_OBJECT (t->tstcpq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
1116                         g_object_set (t->tcpsink, "max-lateness", G_GINT64_CONSTANT(-1), NULL);
1117                         g_signal_handlers_disconnect_by_func (queue, G_CALLBACK (queue_underrun), app);
1118                         t->id_signal_overrun = g_signal_connect (queue, "overrun", G_CALLBACK (queue_overrun), app);
1119                         t->state = UPSTREAM_STATE_TRANSMITTING;
1120                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_TRANSMITTING));
1121                         if (t->id_bitrate_measure == 0)
1122                         {
1123                                 GstPad *sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
1124                                 t->id_bitrate_measure = gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BUFFER|GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) bitrate_measure_probe, app, NULL);
1125                                 gst_object_unref (sinkpad);
1126                         }
1127                         t->measure_start = gst_clock_get_time (app->clock);
1128                         t->bitrate_sum = t->bitrate_avg = 0;
1129                         if (t->overrun_period == GST_CLOCK_TIME_NONE)
1130                                 t->overrun_period = gst_clock_get_time (app->clock);
1131                         DREAMRTSPSERVER_UNLOCK (app);
1132                 }
1133         }
1134 }
1135
1136 static void queue_overrun (GstElement * queue, gpointer user_data)
1137 {
1138         App *app = user_data;
1139         DreamTCPupstream *t = app->tcp_upstream;
1140         DREAMRTSPSERVER_LOCK (app);
1141         if (queue == t->tstcpq/* && app->rtsp_server->state != RTSP_STATE_IDLE*/) //!!!TODO
1142         {
1143                 QUEUE_DEBUG;
1144                 GST_DEBUG_OBJECT(app, "%" GST_PTR_FORMAT " overrun! properties: current-level-bytes=%d current-level-buffers=%d current-level-time=%" GST_TIME_FORMAT " rtsp_server->state=%i", queue, cur_bytes, cur_buf, GST_TIME_ARGS(cur_time), app->rtsp_server->state);
1145                 GstClockTime now = gst_clock_get_time (app->clock);
1146                 if (t->state == UPSTREAM_STATE_CONNECTING)
1147                 {
1148                         GST_DEBUG_OBJECT (queue, "initial queue overrun after connect");
1149 //                      g_object_set (G_OBJECT (t->tstcpq), "leaky", 0, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, "min-threshold-buffers", 0, NULL);
1150                         g_signal_handlers_disconnect_by_func(t->tstcpq, G_CALLBACK (queue_overrun), app);
1151                         t->id_signal_overrun = 0;
1152                         DREAMRTSPSERVER_UNLOCK (app);
1153                         upstream_set_waiting (app);
1154                         return;
1155                 }
1156                 else if (t->state == UPSTREAM_STATE_TRANSMITTING)
1157                 {
1158                         if (t->id_signal_waiting)
1159                         {
1160                                 g_signal_handlers_disconnect_by_func(t->tstcpq, G_CALLBACK (queue_overrun), app);
1161                                 t->id_signal_overrun = 0;
1162                                 GST_DEBUG_OBJECT (queue, "disconnect overrun callback and wait for timeout or for buffer flow!");
1163                                 DREAMRTSPSERVER_UNLOCK (app);
1164                                 return;
1165                         }
1166                         t->overrun_counter++;
1167                         GST_DEBUG_OBJECT (queue, "queue overrun during transmit... %i (max %i) overruns within %" GST_TIME_FORMAT "", t->overrun_counter, MAX_OVERRUNS, GST_TIME_ARGS (now-t->overrun_period));
1168                         if (now > t->overrun_period+OVERRUN_TIME)
1169                         {
1170                                 t->overrun_counter = 0;
1171                                 t->overrun_period = now;
1172                         }
1173                         if (t->overrun_counter >= MAX_OVERRUNS)
1174                         {
1175                                 if (t->auto_bitrate)
1176                                 {
1177                                         t->state = UPSTREAM_STATE_ADJUSTING;
1178                                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_OVERLOAD));
1179                                         auto_adjust_bitrate (app);
1180                                         t->overrun_period = now;
1181                                 }
1182                                 else
1183                                 {
1184                                         t->state = UPSTREAM_STATE_OVERLOAD;
1185                                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", UPSTREAM_STATE_OVERLOAD));
1186                                         GST_DEBUG_OBJECT (queue, "auto overload handling disabled, go into UPSTREAM_STATE_OVERLOAD");
1187                                         if (t->id_signal_waiting)
1188                                                 g_source_remove (t->id_signal_waiting);
1189                                         t->id_signal_waiting = g_timeout_add_seconds (RESUME_DELAY, (GSourceFunc) upstream_resume_transmitting, app);
1190                                 }
1191                         }
1192                         else
1193                         {
1194                                 GST_DEBUG_OBJECT (queue, "SET upstream_set_waiting timeout!");
1195                                 GstPad *sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
1196                                 t->id_resume = gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BUFFER|GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) cancel_waiting_probe, app, NULL);
1197                                 gst_object_unref (sinkpad);
1198                                 if (t->id_signal_waiting)
1199                                         g_source_remove (t->id_signal_waiting);
1200                                 t->id_signal_waiting = g_timeout_add_seconds (5, (GSourceFunc) upstream_set_waiting, app);
1201                         }
1202                 }
1203                 else if (t->state == UPSTREAM_STATE_OVERLOAD)
1204                 {
1205                         t->overrun_counter++;
1206                         if (t->id_signal_waiting)
1207                                 g_source_remove (t->id_signal_waiting);
1208                         t->id_signal_waiting = g_timeout_add_seconds (5, (GSourceFunc) upstream_resume_transmitting, app);
1209                         GST_DEBUG_OBJECT (queue, "still in UPSTREAM_STATE_OVERLOAD overrun_counter=%i, reset resume transmit timeout!", t->overrun_counter);
1210                 }
1211                 else if (t->state == UPSTREAM_STATE_ADJUSTING)
1212                 {
1213                         if (now < t->overrun_period+BITRATE_AVG_PERIOD)
1214                         {
1215                                 GST_DEBUG_OBJECT (queue, "still in grace period, waiting for bitrate adjustment to take effect. %"G_GUINT64_FORMAT" ms remaining", GST_TIME_AS_MSECONDS(t->overrun_period+BITRATE_AVG_PERIOD-now));
1216                         }
1217                         else
1218                         {
1219                                 t->overrun_counter++;
1220                                 if (t->overrun_counter < MAX_OVERRUNS)
1221                                         GST_DEBUG_OBJECT (queue, "still waiting for bitrate adjustment to take effect. overrun_counter=%i", t->overrun_counter);
1222                                 else
1223                                 {
1224                                         GST_DEBUG_OBJECT (queue, "max overruns %i hit again while auto adjusting. -> RE-ADJUST!", t->overrun_counter);
1225                                         t->overrun_counter = 0;
1226                                         auto_adjust_bitrate (app);
1227                                         t->overrun_period = now;
1228                                 }
1229                         }
1230                 }
1231         }
1232         DREAMRTSPSERVER_UNLOCK (app);
1233 }
1234
1235 static void auto_adjust_bitrate(App *app)
1236 {
1237         DreamTCPupstream *t = app->tcp_upstream;
1238         get_source_properties (app);
1239         SourceProperties *p = &app->source_properties;
1240         GST_DEBUG_OBJECT (app, "auto overload handling: reduce bitrate from audioBitrate=%i videoBitrate=%i to fit network bandwidth=%i kbit/s", p->audioBitrate, p->videoBitrate, t->bitrate_avg);
1241         if (p->audioBitrate > 96)
1242                 p->audioBitrate = p->audioBitrate*0.8;
1243         p->videoBitrate = (t->bitrate_avg - p->audioBitrate) * 0.8;
1244         GST_INFO_OBJECT (app, "auto overload handling: newAudioBitrate=%i newVideoBitrate=%i newTotalBitrate~%i kbit/s", p->audioBitrate, p->videoBitrate, p->audioBitrate+p->videoBitrate);
1245         apply_source_properties(app);
1246         if (t->id_signal_waiting)
1247                 g_source_remove (t->id_signal_waiting);
1248         t->id_signal_waiting = g_timeout_add_seconds (RESUME_DELAY, (GSourceFunc) upstream_resume_transmitting, app);
1249         t->overrun_counter = 0;
1250 }
1251
1252 static GstFlowReturn handover_payload (GstElement * appsink, gpointer user_data)
1253 {
1254         App *app = user_data;
1255         DreamRTSPserver *r = app->rtsp_server;
1256
1257         GstAppSrc *appsrc = NULL;
1258         if ( appsink == r->vappsink )
1259                 appsrc = GST_APP_SRC(r->es_vappsrc);
1260         else if ( appsink == r->aappsink )
1261                 appsrc = GST_APP_SRC(r->es_aappsrc);
1262         else if ( appsink == r->tsappsink )
1263                 appsrc = GST_APP_SRC(r->ts_appsrc);
1264
1265         GstSample *sample = gst_app_sink_pull_sample (GST_APP_SINK (appsink));
1266         if (appsrc && g_list_length(r->clients_list) > 0) {
1267                 GstBuffer *buffer = gst_sample_get_buffer (sample);
1268                 GstCaps *caps = gst_sample_get_caps (sample);
1269
1270                 GST_LOG_OBJECT(appsink, "%" GST_PTR_FORMAT" @ %" GST_PTR_FORMAT, buffer, appsrc);
1271                 if (r->rtsp_start_pts == GST_CLOCK_TIME_NONE) {
1272                         if (GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
1273                         {
1274                                 GST_LOG("GST_BUFFER_FLAG_DELTA_UNIT dropping!");
1275                                 gst_sample_unref(sample);
1276 //                              DREAMRTSPSERVER_UNLOCK (app);
1277                                 return GST_FLOW_OK;
1278                         }
1279                         else if (appsink == r->vappsink || appsink == r->tsappsink)
1280                         {
1281                                 DREAMRTSPSERVER_LOCK (app);
1282                                 r->rtsp_start_pts = GST_BUFFER_PTS (buffer);
1283                                 r->rtsp_start_dts = GST_BUFFER_DTS (buffer);
1284                                 GST_LOG_OBJECT(appsink, "frame is IFRAME! set rtsp_start_pts=%" GST_TIME_FORMAT " rtsp_start_dts=%" GST_TIME_FORMAT " @ %"GST_PTR_FORMAT"", GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)), appsrc);
1285                                 DREAMRTSPSERVER_UNLOCK (app);
1286                         }
1287                 }
1288                 if (GST_BUFFER_PTS (buffer) < r->rtsp_start_pts)
1289                         GST_BUFFER_PTS (buffer) = 0;
1290                 else
1291                         GST_BUFFER_PTS (buffer) -= r->rtsp_start_pts;
1292                 GST_BUFFER_DTS (buffer) -= r->rtsp_start_dts;
1293                 //    GST_LOG("new PTS %" GST_TIME_FORMAT " DTS %" GST_TIME_FORMAT "", GST_TIME_ARGS (GST_BUFFER_PTS (buffer)), GST_TIME_ARGS (GST_BUFFER_DTS (buffer)));
1294
1295                 GstCaps *oldcaps;
1296
1297                 oldcaps = gst_app_src_get_caps (appsrc);
1298                 if (!oldcaps || !gst_caps_is_equal (oldcaps, caps))
1299                 {
1300                         GST_DEBUG("CAPS changed! %" GST_PTR_FORMAT " to %" GST_PTR_FORMAT, oldcaps, caps);
1301                         gst_app_src_set_caps (appsrc, caps);
1302                 }
1303                 gst_app_src_push_buffer (appsrc, gst_buffer_ref(buffer));
1304         }
1305         else
1306         {
1307                 if ( gst_debug_category_get_threshold (dreamrtspserver_debug) >= GST_LEVEL_LOG)
1308                         GST_TRACE_OBJECT(appsink, "no rtsp clients, discard payload!");
1309 //              else
1310 //                      g_print (".");
1311         }
1312         gst_sample_unref (sample);
1313
1314         return GST_FLOW_OK;
1315 }
1316
1317 gboolean assert_state(App *app, GstElement *element, GstState state)
1318 {
1319         GstStateChangeReturn sret;
1320         GST_DEBUG_OBJECT(app, "setting %" GST_PTR_FORMAT"'s state to %s", element, gst_element_state_get_name (state));
1321         GstState pipeline_state, current_state;
1322         sret = gst_element_set_state (element, state);
1323         gchar *elementname = gst_element_get_name(element);
1324         gchar *dotfilename = g_strdup_printf ("assert_state_%s_to_%s_%i", elementname,gst_element_state_get_name (state), sret);
1325         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, dotfilename);
1326         g_free (elementname);
1327         switch (sret) {
1328                 case GST_STATE_CHANGE_SUCCESS:
1329                 {
1330                         GST_DEBUG_OBJECT(app, "GST_STATE_CHANGE_SUCCESS on setting %" GST_PTR_FORMAT"'s state to %s ", element, gst_element_state_get_name (state));
1331                         return TRUE;
1332                 }
1333                 case GST_STATE_CHANGE_FAILURE:
1334                 {
1335                         GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE when trying to change %" GST_PTR_FORMAT"'s state to %s", element, gst_element_state_get_name (state));
1336                         return FALSE;
1337                 }
1338                 case GST_STATE_CHANGE_ASYNC:
1339                 {
1340                         watchdog_ping (app);
1341                         gboolean fail = FALSE;
1342                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"", element);
1343                         GstClockTime timeout = /*(element == app->pipeline) ?*/ 4 * GST_SECOND/* : GST_MSECOND*/;
1344                         if (element == GST_ELEMENT(app->pipeline))
1345                         {
1346                                 gst_element_get_state (GST_ELEMENT(app->pipeline), &pipeline_state, NULL, timeout);
1347                                 GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC got pipeline_state=%s", gst_element_state_get_name (state));
1348                                 if (pipeline_state != state)
1349                                 {
1350                                         GValue item = G_VALUE_INIT;
1351                                         GstIterator* iter = gst_bin_iterate_elements(GST_BIN(app->pipeline));
1352                                         while (GST_ITERATOR_OK == gst_iterator_next(iter, (GValue*)&item))
1353                                         {
1354                                                 GstElement *elem = g_value_get_object(&item);
1355                                                 GstElementFactory *factory = gst_element_get_factory (elem);
1356                                                 gst_element_get_state (elem, &current_state, NULL, GST_MSECOND);
1357                                                 if (current_state != state && (element == app->pipeline || element == elem))
1358                                                 {
1359                                                         if (element == app->pipeline)
1360                                                         {
1361                                                                 if (gst_element_factory_list_is_type (factory, GST_ELEMENT_FACTORY_TYPE_SINK))
1362                                                                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"'s state=%s (this is a sink element, so don't worry...)", elem, gst_element_state_get_name (current_state));
1363                                                                 else
1364                                                                 {
1365                                                                         GST_WARNING_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"'s state=%s -> FAIL", elem, gst_element_state_get_name (current_state));
1366                                                                         fail = TRUE;
1367                                                                 }
1368                                                         }
1369                                                 }
1370                                                 else
1371                                                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC %" GST_PTR_FORMAT"'s state=%s", elem, gst_element_state_get_name (current_state));
1372                                         }
1373                                         gst_iterator_free(iter);
1374                                         if (fail)
1375                                         {
1376                                                 GST_ERROR_OBJECT (app, "pipeline didn't complete GST_STATE_CHANGE_ASYNC to %s within %" GST_TIME_FORMAT ", currently in %s", gst_element_state_get_name (state), GST_TIME_ARGS(timeout), gst_element_state_get_name (pipeline_state));
1377                                                 return FALSE;
1378                                         }
1379                                 }
1380                         }
1381                         else
1382                         {
1383                                 gst_element_get_state (element, &current_state, NULL, timeout);
1384                                 if (current_state == state)
1385                                         GST_LOG_OBJECT(app, "GST_STATE_CHANGE_ASYNC on setting %" GST_PTR_FORMAT"'s state to %s SUCCESSFUL!", element, gst_element_state_get_name (state));
1386                                 else
1387                                 {
1388                                         GST_WARNING_OBJECT(app, "GST_STATE_CHANGE_ASYNC on setting %" GST_PTR_FORMAT"'s state to %s failed! now in %s", element, gst_element_state_get_name (state), gst_element_state_get_name (current_state));
1389                                         return FALSE;
1390                                 }
1391                         }
1392                         return TRUE;
1393                 }
1394                 case GST_STATE_CHANGE_NO_PREROLL:
1395                         GST_WARNING_OBJECT(app, "GST_STATE_CHANGE_NO_PREROLL on setting %" GST_PTR_FORMAT"'s state to %s ", element, gst_element_state_get_name (state));
1396                         break;
1397                 default:
1398                         break;
1399         }
1400         return TRUE;
1401 }
1402
1403 void assert_tsmux(App *app)
1404 {
1405         if (app->tsmux)
1406                 return;
1407
1408         GST_DEBUG_OBJECT (app, "inserting tsmux");
1409
1410         app->tsmux = gst_element_factory_make ("mpegtsmux", NULL);
1411         gst_bin_add (GST_BIN (app->pipeline), app->tsmux);
1412
1413         GstPad *sinkpad, *srcpad;
1414         GstPadLinkReturn ret;
1415
1416         srcpad = gst_element_get_static_pad (app->aq, "src");
1417         sinkpad = gst_element_get_compatible_pad (app->tsmux, srcpad, NULL);
1418         ret = gst_pad_link (srcpad, sinkpad);
1419         if (ret != GST_PAD_LINK_OK)
1420                 g_error ("couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", srcpad, sinkpad);
1421         gst_object_unref (srcpad);
1422         gst_object_unref (sinkpad);
1423
1424         srcpad = gst_element_get_static_pad (app->vq, "src");
1425         sinkpad = gst_element_get_compatible_pad (app->tsmux, srcpad, NULL);
1426         ret = gst_pad_link (srcpad, sinkpad);
1427         if (ret != GST_PAD_LINK_OK)
1428                 g_error ("couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", srcpad, sinkpad);
1429         gst_object_unref (srcpad);
1430         gst_object_unref (sinkpad);
1431
1432         if (!gst_element_link (app->tsmux, app->tstee))
1433                 g_error ("couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", app->tsmux, app->tstee);
1434 }
1435
1436 gboolean create_source_pipeline(App *app)
1437 {
1438         GST_INFO_OBJECT(app, "create_source_pipeline");
1439         DREAMRTSPSERVER_LOCK (app);
1440         app->pipeline = gst_pipeline_new ("dreamrtspserver_source_pipeline");
1441
1442         GstBus *bus = gst_pipeline_get_bus (GST_PIPELINE (app->pipeline));
1443         gst_bus_add_signal_watch (bus);
1444         g_signal_connect (G_OBJECT (bus), "message", G_CALLBACK (message_cb), app);
1445         gst_object_unref (GST_OBJECT (bus));
1446
1447         app->asrc = gst_element_factory_make ("dreamaudiosource", "dreamaudiosource0");
1448         app->vsrc = gst_element_factory_make ("dreamvideosource", "dreamvideosource0");
1449
1450         app->aparse = gst_element_factory_make ("aacparse", NULL);
1451         app->vparse = gst_element_factory_make ("h264parse", NULL);
1452
1453         app->atee = gst_element_factory_make ("tee", "atee");
1454         app->vtee = gst_element_factory_make ("tee", "vtee");
1455         app->tstee = gst_element_factory_make ("tee", "tstee");
1456
1457         app->aq = gst_element_factory_make ("queue", "aqueue");
1458         app->vq = gst_element_factory_make ("queue", "vqueue");
1459
1460         app->tsmux = gst_element_factory_make ("mpegtsmux", NULL);
1461         app->tstee = gst_element_factory_make ("tee", "tstee");
1462
1463         if (!(app->asrc && app->vsrc && app->aparse && app->vparse && app->aq && app->vq && app->atee && app->vtee && app->tsmux && app->tstee))
1464         {
1465                 g_error ("Failed to create source pipeline element(s):%s%s%s%s%s%s%s%s%s", app->asrc?"":" dreamaudiosource", app->vsrc?"":" dreamvideosource", app->aparse?"":" aacparse",
1466                         app->vparse?"":" h264parse", app->aq?"":" aqueue", app->vq?"":" vqueue", app->atee?"":" atee", app->vtee?"":" vtee", app->tsmux?"":"  mpegtsmux");
1467         }
1468         gst_object_unref(app->tsmux);
1469         app->tsmux = NULL;
1470
1471         if (!(app->asrc && app->vsrc && app->aparse && app->vparse))
1472         {
1473                 g_error ("Failed to create source pipeline element(s):%s%s%s%s", app->asrc?"":" dreamaudiosource", app->vsrc?"":" dreamvideosource", app->aparse?"":" aacparse", app->vparse?"":" h264parse");
1474         }
1475
1476         GstElement *appsink, *appsrc, *vpay, *apay, *udpsrc;
1477         appsink = gst_element_factory_make ("appsink", NULL);
1478         appsrc = gst_element_factory_make ("appsrc", NULL);
1479         vpay = gst_element_factory_make ("rtph264pay", NULL);
1480         apay = gst_element_factory_make ("rtpmp4apay", NULL);
1481         udpsrc = gst_element_factory_make ("udpsrc", NULL);
1482
1483         if (!(appsink && appsrc && vpay && apay && udpsrc))
1484                 g_error ("Failed to create rtsp element(s):%s%s%s%s%s", appsink?"":" appsink", appsrc?"":" appsrc", vpay?"": "rtph264pay", apay?"":" rtpmp4apay", udpsrc?"":" udpsrc" );
1485         else
1486         {
1487                 gst_object_unref (appsink);
1488                 gst_object_unref (appsrc);
1489                 gst_object_unref (vpay);
1490                 gst_object_unref (apay);
1491                 gst_object_unref (udpsrc);
1492         }
1493
1494         gst_bin_add_many (GST_BIN (app->pipeline), app->asrc, app->aparse, app->atee, app->aq, NULL);
1495         gst_bin_add_many (GST_BIN (app->pipeline), app->vsrc, app->vparse, app->vtee, app->vq, NULL);
1496         gst_bin_add (GST_BIN (app->pipeline), app->tstee);
1497         gst_element_link_many (app->asrc, app->aparse, app->atee, NULL);
1498         gst_element_link_many (app->vsrc, app->vparse, app->vtee, NULL);
1499
1500         GstPad *teepad, *sinkpad;
1501         GstPadLinkReturn ret;
1502
1503         teepad = gst_element_get_request_pad (app->atee, "src_%u");
1504         sinkpad = gst_element_get_static_pad (app->aq, "sink");
1505         ret = gst_pad_link (teepad, sinkpad);
1506         if (ret != GST_PAD_LINK_OK)
1507
1508         if (ret != GST_PAD_LINK_OK)
1509         {
1510                 GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1511                 return FALSE;
1512         }
1513         gst_object_unref (teepad);
1514         gst_object_unref (sinkpad);
1515
1516         teepad = gst_element_get_request_pad (app->vtee, "src_%u");
1517         sinkpad = gst_element_get_static_pad (app->vq, "sink");
1518         ret = gst_pad_link (teepad, sinkpad);
1519
1520         if (ret != GST_PAD_LINK_OK)
1521         {
1522                 GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
1523                 return FALSE;
1524         }
1525
1526         gst_object_unref (teepad);
1527         gst_object_unref (sinkpad);
1528
1529         app->clock = gst_system_clock_obtain();
1530         gst_pipeline_use_clock(GST_PIPELINE (app->pipeline), app->clock);
1531
1532         apply_source_properties(app);
1533
1534         g_signal_connect (app->asrc, "signal-lost", G_CALLBACK (encoder_signal_lost), app);
1535
1536         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"create_source_pipeline");
1537         DREAMRTSPSERVER_UNLOCK (app);
1538         return TRUE;
1539 }
1540
1541 static void encoder_signal_lost (GstElement *dreamaudiosource, gpointer user_data)
1542 {
1543         GST_INFO_OBJECT (dreamaudiosource, "lost encoder signal!");
1544 }
1545
1546 static GstPadProbeReturn inject_authorization (GstPad * sinkpad, GstPadProbeInfo * info, gpointer user_data)
1547 {
1548         App *app = user_data;
1549
1550         GstBuffer *token_buf = gst_buffer_new_wrapped (app->tcp_upstream->token, TOKEN_LEN);
1551         GstPad * srcpad = gst_element_get_static_pad (app->tcp_upstream->tstcpq, "src");
1552
1553         GST_INFO ("injecting authorization on pad %s:%s, created token_buf %" GST_PTR_FORMAT "", GST_DEBUG_PAD_NAME (sinkpad), token_buf);
1554         gst_pad_remove_probe (sinkpad, info->id);
1555         gst_pad_push (srcpad, gst_buffer_ref(token_buf));
1556
1557         return GST_PAD_PROBE_REMOVE;
1558 }
1559
1560 gboolean enable_tcp_upstream(App *app, const gchar *upstream_host, guint32 upstream_port, const gchar *token)
1561 {
1562         GST_DEBUG_OBJECT(app, "enable_tcp_upstream host=%s port=%i token=%s", upstream_host, upstream_port, token);
1563
1564         if (!app->pipeline)
1565         {
1566                 GST_ERROR_OBJECT (app, "failed to enable upstream because source pipeline is NULL!");
1567                 goto fail;
1568         }
1569
1570         DreamTCPupstream *t = app->tcp_upstream;
1571
1572         if (t->state == UPSTREAM_STATE_DISABLED)
1573         {
1574                 assert_tsmux (app);
1575                 DREAMRTSPSERVER_LOCK (app);
1576
1577                 t->id_signal_overrun = 0;
1578                 t->id_signal_waiting = 0;
1579                 t->id_signal_keepalive = 0;
1580                 t->id_bitrate_measure = 0;
1581                 t->id_resume = 0;
1582                 t->state = UPSTREAM_STATE_CONNECTING;
1583                 send_signal (app, "upstreamStateChanged", g_variant_new("(i)", t->state));
1584
1585                 t->tstcpq  = gst_element_factory_make ("queue", "tstcpqueue");
1586                 t->tcpsink = gst_element_factory_make ("tcpclientsink", NULL);
1587
1588                 if (!(t->tstcpq && t->tcpsink ))
1589                         g_error ("Failed to create tcp upstream element(s):%s%s", t->tstcpq?"":"  ts queue", t->tcpsink?"":"  tcpclientsink" );
1590
1591                 g_object_set (G_OBJECT (t->tstcpq), "leaky", 2, "max-size-buffers", 400, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(0), NULL);
1592
1593                 t->id_signal_overrun = g_signal_connect (t->tstcpq, "overrun", G_CALLBACK (queue_overrun), app);
1594                 GST_TRACE_OBJECT(app, "installed %" GST_PTR_FORMAT " overrun handler id=%u", t->tstcpq, t->id_signal_overrun);
1595
1596                 g_object_set (t->tcpsink, "max-lateness", G_GINT64_CONSTANT(3)*GST_SECOND, NULL);
1597                 g_object_set (t->tcpsink, "blocksize", BLOCK_SIZE, NULL);
1598
1599                 g_object_set (t->tcpsink, "host", upstream_host, NULL);
1600                 g_object_set (t->tcpsink, "port", upstream_port, NULL);
1601                 gchar *check_host;
1602                 guint32 check_port;
1603                 g_object_get (t->tcpsink, "host", &check_host, NULL);
1604                 g_object_get (t->tcpsink, "port", &check_port, NULL);
1605                 if (g_strcmp0 (upstream_host, check_host))
1606                 {
1607                         g_free (check_host);
1608                         GST_ERROR_OBJECT (app, "couldn't set upstream_host %s", upstream_host);
1609                         goto fail;
1610                 }
1611                 if (upstream_port != check_port)
1612                 {
1613                         GST_ERROR_OBJECT (app, "couldn't set upstream_port %d", upstream_port);
1614                         goto fail;
1615                 }
1616                 g_free (check_host);
1617
1618                 GstStateChangeReturn sret = gst_element_set_state (t->tcpsink, GST_STATE_READY);
1619                 if (sret == GST_STATE_CHANGE_FAILURE)
1620                 {
1621                         GST_ERROR_OBJECT (app, "failed to set tcpsink to GST_STATE_READY. %s:%d probably refused connection", upstream_host, upstream_port);
1622                         gst_object_unref (t->tstcpq);
1623                         gst_object_unref (t->tcpsink);
1624                         t->state = UPSTREAM_STATE_DISABLED;
1625                         send_signal (app, "upstreamStateChanged", g_variant_new("(i)", t->state));
1626                         DREAMRTSPSERVER_UNLOCK (app);
1627                         return FALSE;
1628                 }
1629
1630                 gst_bin_add_many (GST_BIN(app->pipeline), t->tstcpq, t->tcpsink, NULL);
1631                 if (!gst_element_link (t->tstcpq, t->tcpsink)) {
1632                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", t->tstcpq, t->tcpsink);
1633                         goto fail;
1634                 }
1635
1636 //              if (!assert_state (app, t->tcpsink, GST_STATE_PLAYING) || !assert_state (app, t->tstcpq, GST_STATE_PLAYING))
1637 //                      goto fail;
1638
1639                 GstPadLinkReturn ret;
1640                 GstPad *srcpad, *sinkpad;
1641                 srcpad = gst_element_get_request_pad (app->tstee, "src_%u");
1642                 sinkpad = gst_element_get_static_pad (t->tstcpq, "sink");
1643                 ret = gst_pad_link (srcpad, sinkpad);
1644                 gst_object_unref (srcpad);
1645                 gst_object_unref (sinkpad);
1646                 if (ret != GST_PAD_LINK_OK)
1647                 {
1648                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", srcpad, sinkpad);
1649                         goto fail;
1650                 }
1651
1652                 if (strlen(token))
1653                 {
1654                         sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
1655                         strcpy(t->token, token);
1656                         gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BUFFER|GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) inject_authorization, app, NULL);
1657                         gst_object_unref (sinkpad);
1658                 }
1659                 else
1660                         GST_DEBUG_OBJECT (app, "no token specified!");
1661
1662                 if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
1663                 {
1664                         GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE for TCP upstream");
1665                         return FALSE;
1666                 }
1667                 GST_INFO_OBJECT(app, "enabled TCP upstream! upstreamState = UPSTREAM_STATE_CONNECTING");
1668                 DREAMRTSPSERVER_UNLOCK (app);
1669                 return TRUE;
1670         }
1671         else
1672                 GST_INFO_OBJECT (app, "tcp upstream already enabled! (upstreamState = %i)", t->state);
1673         return FALSE;
1674
1675 fail:
1676         DREAMRTSPSERVER_UNLOCK (app);
1677         disable_tcp_upstream(app);
1678         return FALSE;
1679 }
1680
1681 gboolean
1682 _delete_dir_recursively (GFile *directory, GError **error)
1683 {
1684         GFileEnumerator *children = NULL;
1685         GFileInfo *info;
1686         gboolean ret = FALSE;
1687         children = g_file_enumerate_children (directory, G_FILE_ATTRIBUTE_STANDARD_NAME "," G_FILE_ATTRIBUTE_STANDARD_TYPE, 0, NULL, error);
1688         if (children == NULL || error)
1689                 goto out;
1690         info = g_file_enumerator_next_file (children, NULL, error);
1691         while (info || error) {
1692                 GFile *child;
1693                 const char *name;
1694                 GFileType type;
1695                 if (error)
1696                         goto out;
1697                 name = g_file_info_get_name (info);
1698                 child = g_file_get_child (directory, name);
1699                 type = g_file_info_get_file_type (info);
1700                 GST_LOG ("delete %s", name);
1701                 if (type == G_FILE_TYPE_DIRECTORY)
1702                         ret = _delete_dir_recursively (child, error);
1703                 else if (type == G_FILE_TYPE_REGULAR)
1704                         ret = g_file_delete (child, NULL, error);
1705                 g_object_unref (info);
1706                 g_object_unref (child);
1707                 if (!ret)
1708                         goto out;
1709                 info = g_file_enumerator_next_file (children, NULL, error);
1710         }
1711         ret = TRUE;
1712         g_file_delete (directory, NULL, error);
1713
1714 out:
1715         if (children)
1716                 g_object_unref (children);
1717         return ret;
1718 }
1719
1720 gboolean hls_client_timeout (gpointer user_data)
1721 {
1722         App *app = user_data;
1723         if (app->hls_server)
1724         {
1725                 GST_INFO_OBJECT(app, "HLS clients stopped downloading, stopping hls pipeline!");
1726                 stop_hls_pipeline (app);
1727         }
1728         return FALSE;
1729 }
1730
1731 static void
1732 soup_do_get (SoupServer *server, SoupMessage *msg, const char *path, App *app)
1733 {
1734         gchar *hlspath = NULL;
1735         guint status_code = SOUP_STATUS_NONE;
1736         struct stat st;
1737
1738         if (path)
1739         {
1740                 if (strlen(path) < 1)
1741                         status_code = SOUP_STATUS_BAD_REQUEST;
1742                 if (strlen(path) == 1)
1743                         status_code = SOUP_STATUS_MOVED_PERMANENTLY;
1744                 else
1745                         hlspath = g_strdup_printf ("%s%s", HLS_PATH, path);
1746         }
1747         if (app->hls_server->state == HLS_STATE_IDLE && g_strcmp0 (path+1, HLS_PLAYLIST_NAME) == 0)
1748         {
1749                 DREAMRTSPSERVER_LOCK (app);
1750                 GST_INFO_OBJECT (server, "client requested '%s' but we're idle... start pipeline!", path+1);
1751                 if (!start_hls_pipeline (app))
1752                         status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR;
1753                 else
1754                 {
1755                         GstState state;
1756                         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, HLS_FRAGMENT_DURATION);
1757                         g_usleep (G_USEC_PER_SEC * (HLS_FRAGMENT_DURATION+1));
1758                         app->hls_server->state = HLS_STATE_RUNNING;
1759                         send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_RUNNING));
1760                 }
1761                 DREAMRTSPSERVER_UNLOCK (app);
1762         }
1763         else if (status_code == SOUP_STATUS_NONE && stat (hlspath, &st) == -1) {
1764                 if (errno == EPERM)
1765                         status_code = SOUP_STATUS_FORBIDDEN;
1766                 else if (errno == ENOENT)
1767                         status_code = SOUP_STATUS_NOT_FOUND;
1768                 else
1769                         status_code = SOUP_STATUS_INTERNAL_SERVER_ERROR;
1770         }
1771         else if (S_ISDIR (st.st_mode))
1772                 status_code = SOUP_STATUS_SERVICE_UNAVAILABLE;
1773
1774         if (status_code == SOUP_STATUS_MOVED_PERMANENTLY)
1775         {
1776                 GST_LOG_OBJECT (server, "client requested /, redirect to %s", HLS_PLAYLIST_NAME);
1777                 soup_message_set_redirect (msg, status_code, HLS_PLAYLIST_NAME);
1778                 return;
1779         }
1780         else if (status_code != SOUP_STATUS_NONE)
1781         {
1782                 GST_WARNING_OBJECT (server, "client requested '%s', error serving '%s', http status code %i", path, hlspath ? hlspath : "", status_code);
1783                 g_free (hlspath);
1784                 soup_message_set_status (msg, status_code);
1785                 return;
1786         }
1787
1788         GST_INFO_OBJECT (server, "client requests '%s', serving '%s'...", path, hlspath);
1789
1790         if (msg->method == SOUP_METHOD_GET) {
1791                 GMappedFile *mapping;
1792                 SoupBuffer *buffer;
1793
1794                 mapping = g_mapped_file_new (hlspath, FALSE, NULL);
1795                 if (!mapping) {
1796                         soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
1797                         return;
1798                 }
1799
1800                 if (g_strrstr (hlspath, ".ts"))
1801                         soup_message_headers_set_content_type (msg->response_headers, "video/MP2T", NULL);
1802                 else
1803                 {
1804                         GstState state;
1805                         gst_element_get_state (app->asrc, &state, NULL, GST_MSECOND);
1806                         if (state != GST_STATE_PLAYING)
1807                         {
1808                                 assert_tsmux (app);
1809                                 if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
1810                                 {
1811                                         soup_message_set_status (msg, SOUP_STATUS_BAD_GATEWAY);
1812                                         g_free (hlspath);
1813                                         return;
1814                                 }
1815                         }
1816                         soup_message_headers_set_content_type (msg->response_headers, "application/x-mpegURL", NULL);
1817                 }
1818                 if (app->hls_server->id_timeout)
1819                         g_source_remove (app->hls_server->id_timeout);
1820                 app->hls_server->id_timeout = g_timeout_add_seconds (5*HLS_FRAGMENT_DURATION, (GSourceFunc) hls_client_timeout, app);
1821
1822                 buffer = soup_buffer_new_with_owner (g_mapped_file_get_contents (mapping),
1823                                                      g_mapped_file_get_length (mapping),
1824                                                      mapping, (GDestroyNotify)g_mapped_file_unref);
1825                 soup_message_body_append_buffer (msg->response_body, buffer);
1826                 soup_buffer_free (buffer);
1827         }
1828         g_free (hlspath);
1829         soup_message_set_status (msg, SOUP_STATUS_OK);
1830 }
1831
1832 static gboolean
1833 soup_server_auth_callback (SoupAuthDomain *domain, SoupMessage *msg, const char *username, const char *password, gpointer user_data)
1834 {
1835         App *app = (App *) user_data;
1836         DreamHLSserver *h = app->hls_server;
1837         if (g_strcmp0(h->hls_user, username) == 0 && strcmp(h->hls_pass, password) == 0)
1838         {
1839                 GST_TRACE_OBJECT (app->hls_server, "authenticated request with credentials %s:%s", username, password);
1840                 return TRUE;
1841         }
1842         else
1843                 GST_WARNING_OBJECT (app->hls_server, "denied authentication request with credentials %s:%s", username, password); 
1844         return FALSE;
1845 }
1846
1847 static void
1848 soup_server_callback (SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *context, gpointer data)
1849 {
1850         GST_TRACE_OBJECT (server, "%s %s HTTP/1.%d", msg->method, path, soup_message_get_http_version (msg));
1851         if (msg->method == SOUP_METHOD_GET)
1852                 soup_do_get (server, msg, path, (App *) data);
1853         else
1854                 soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
1855         GST_TRACE_OBJECT (server, "  -> %d %s", msg->status_code, msg->reason_phrase);
1856 }
1857
1858 static GstPadProbeReturn hls_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1859 {
1860         App *app = user_data;
1861         DreamHLSserver *h = app->hls_server;
1862
1863         GstElement *element = gst_pad_get_parent_element(pad);
1864
1865         GST_DEBUG_OBJECT(pad, "unlink... %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, h->hlssink);
1866
1867         GstPad *teepad;
1868         teepad = gst_pad_get_peer(pad);
1869         gst_pad_unlink (teepad, pad);
1870
1871         GstElement *tee = gst_pad_get_parent_element(teepad);
1872         gst_element_release_request_pad (tee, teepad);
1873         gst_object_unref (teepad);
1874         gst_object_unref (tee);
1875
1876         gst_element_unlink (element, h->hlssink);
1877
1878         GST_DEBUG_OBJECT(pad, "remove... %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, h->hlssink);
1879
1880         gst_bin_remove_many (GST_BIN (app->pipeline), element, h->hlssink, NULL);
1881
1882         GST_DEBUG_OBJECT(pad, "set state null %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, h->hlssink);
1883
1884         gst_element_set_state (h->hlssink, GST_STATE_NULL);
1885         gst_element_set_state (element, GST_STATE_NULL);
1886
1887         GST_DEBUG_OBJECT(pad, "unref.... %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, h->hlssink);
1888
1889         gst_object_unref (element);
1890         gst_object_unref (h->hlssink);
1891         h->queue = NULL;
1892         h->hlssink = NULL;
1893
1894         if (h->id_timeout)
1895                 g_source_remove (h->id_timeout);
1896
1897         if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && g_list_length (app->rtsp_server->clients_list) == 0)
1898                 halt_source_pipeline(app);
1899
1900         GST_INFO ("HLS server unlinked!");
1901
1902         return GST_PAD_PROBE_REMOVE;
1903 }
1904
1905 gboolean stop_hls_pipeline(App *app)
1906 {
1907         GST_INFO_OBJECT(app, "stop_hls_pipeline");
1908         DreamHLSserver *h = app->hls_server;
1909         if (h->state == HLS_STATE_RUNNING)
1910         {
1911                 DREAMRTSPSERVER_LOCK (app);
1912                 h->state = HLS_STATE_IDLE;
1913                 send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_IDLE));
1914                 gst_object_ref (h->queue);
1915                 gst_object_ref (h->hlssink);
1916                 GstPad *sinkpad;
1917                 sinkpad = gst_element_get_static_pad (h->queue, "sink");
1918                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, hls_pad_probe_unlink_cb, app, NULL);
1919                 gst_object_unref (sinkpad);
1920                 DREAMRTSPSERVER_UNLOCK (app);
1921                 GST_INFO("hls server pipeline stopped, set HLS_STATE_IDLE");
1922                 return TRUE;
1923         }
1924         else
1925                 GST_INFO("hls server wasn't in HLS_STATE_RUNNING... can't stop");
1926         return FALSE;
1927 }
1928
1929 gboolean disable_hls_server(App *app)
1930 {
1931         GST_INFO_OBJECT(app, "disable_hls_server");
1932         DreamHLSserver *h = app->hls_server;
1933         if (h->state == HLS_STATE_RUNNING)
1934                 stop_hls_pipeline (app);
1935         if (h->state == HLS_STATE_IDLE)
1936         {
1937                 DREAMRTSPSERVER_LOCK (app);
1938                 soup_server_disconnect(h->soupserver);
1939                 if (h->soupauthdomain)
1940                 {
1941                         g_object_unref (h->soupauthdomain);
1942                         g_free(h->hls_user);
1943                         g_free(h->hls_pass);
1944                 }
1945                 g_object_unref (h->soupserver);
1946                 GFile *tmp_dir_file = g_file_new_for_path (HLS_PATH);
1947                 _delete_dir_recursively (tmp_dir_file, NULL);
1948                 g_object_unref (tmp_dir_file);
1949                 h->state = HLS_STATE_DISABLED;
1950                 send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_DISABLED));
1951                 DREAMRTSPSERVER_UNLOCK (app);
1952                 GST_INFO("hls soupserver unref'ed, set HLS_STATE_DISABLED");
1953                 return TRUE;
1954         }
1955         else
1956                 GST_INFO("hls server in wrong state... can't disable");
1957         return FALSE;
1958 }
1959
#if 0
/*
 * Debug-only pad probe (currently compiled out): logs every buffer that does
 * not carry GST_BUFFER_FLAG_DELTA_UNIT, i.e. every keyframe, for both single
 * buffers and buffer lists. Always returns GST_PAD_PROBE_OK.
 */
static GstPadProbeReturn _detect_keyframes_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
        App *app = user_data;
        /* start from NULL so a probe type that is neither BUFFER nor
         * BUFFER_LIST never leads to a read of an uninitialized pointer */
        GstBuffer *buffer = NULL;
        guint idx = 0, num_buffers = 1;
        do {
                if (info->type & GST_PAD_PROBE_TYPE_BUFFER)
                {
                        buffer = GST_PAD_PROBE_INFO_BUFFER (info);
                }
                else if (info->type & GST_PAD_PROBE_TYPE_BUFFER_LIST)
                {
                        GstBufferList *bufferlist = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
                        num_buffers = gst_buffer_list_length (bufferlist);
                        buffer = gst_buffer_list_get (bufferlist, idx);
                }
                if (GST_IS_BUFFER(buffer) && !GST_BUFFER_FLAG_IS_SET(buffer, GST_BUFFER_FLAG_DELTA_UNIT))
                        GST_INFO_OBJECT (app, "KEYFRAME %" GST_PTR_FORMAT " detected @ %" GST_PTR_FORMAT " num_buffers=%i", buffer, pad, num_buffers);
                idx++;
        } while (idx < num_buffers);
        return GST_PAD_PROBE_OK;
}
#endif
1984
1985 gboolean enable_hls_server(App *app, guint port, const gchar *user, const gchar *pass)
1986 {
1987         GST_INFO_OBJECT(app, "enable_hls_server port=%i user=%s pass=%s", port, user, pass);
1988         if (!app->pipeline)
1989         {
1990                 GST_ERROR_OBJECT (app, "failed to enable hls server because source pipeline is NULL!");
1991                 return FALSE;
1992         }
1993
1994         DREAMRTSPSERVER_LOCK (app);
1995         DreamHLSserver *h = app->hls_server;
1996
1997         if (h->state == HLS_STATE_DISABLED)
1998         {
1999                 int r = mkdir (HLS_PATH, DEFFILEMODE);
2000                 if (r == -1 && errno != EEXIST)
2001                 {
2002                         g_error ("Failed to create HLS server directory '%s': %s (%i)", HLS_PATH, strerror(errno), errno);
2003                         goto fail;
2004                 }
2005
2006                 h->port = port;
2007
2008 #if SOUP_CHECK_VERSION(2,48,0)
2009                 h->soupserver = soup_server_new (SOUP_SERVER_SERVER_HEADER, "dreamhttplive", NULL);
2010                 soup_server_listen_all(h->soupserver, port, 0, NULL);
2011 #else
2012                 h->soupserver = soup_server_new (SOUP_SERVER_PORT, h->port, SOUP_SERVER_SERVER_HEADER, "dreamhttplive", NULL);
2013                 soup_server_run_async (h->soupserver);
2014 #endif
2015                 soup_server_add_handler (h->soupserver, NULL, soup_server_callback, app, NULL);
2016
2017                 gchar *credentials = g_strdup("");
2018                 if (strlen(user)) {
2019                         h->hls_user = g_strdup(user);
2020                         h->hls_pass = g_strdup(pass);
2021                         h->soupauthdomain = soup_auth_domain_basic_new (
2022                         SOUP_AUTH_DOMAIN_REALM, "Dreambox HLS Server",
2023                         SOUP_AUTH_DOMAIN_BASIC_AUTH_CALLBACK, soup_server_auth_callback,
2024                         SOUP_AUTH_DOMAIN_BASIC_AUTH_DATA, app,
2025                         SOUP_AUTH_DOMAIN_ADD_PATH, "",
2026                         NULL);
2027                         soup_server_add_auth_domain (h->soupserver, h->soupauthdomain);
2028                         credentials = g_strdup_printf("%s:%s@", user, pass);
2029                 }
2030                 else
2031                 {
2032                         h->hls_user = h->hls_pass = NULL;
2033                         h->soupauthdomain = NULL;
2034                 }
2035
2036 #if SOUP_CHECK_VERSION(2,48,0)
2037                 GSList *uris = soup_server_get_uris(h->soupserver);
2038                 for (GSList *uri = uris; uri != NULL; uri = uri->next) {
2039                         char *str = soup_uri_to_string(uri->data, FALSE);
2040                         GST_INFO_OBJECT(h->soupserver, "SOUP HLS server ready at %s [/%s] (%s)", str, HLS_PLAYLIST_NAME, credentials);
2041                         g_free(str);
2042                         soup_uri_free(uri->data);
2043                 }
2044                 g_slist_free(uris);
2045 #else
2046                 GST_INFO_OBJECT (h->soupserver, "SOUP HLS server ready at http://%s127.0.0.1:%i/%s ...", credentials, soup_server_get_port (h->soupserver), HLS_PLAYLIST_NAME);
2047 #endif
2048
2049                 h->state = HLS_STATE_IDLE;
2050                 send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_IDLE));
2051                 GST_DEBUG ("set HLS_STATE_IDLE");
2052                 g_free (credentials);
2053                 DREAMRTSPSERVER_UNLOCK (app);
2054                 return TRUE;
2055         }
2056         else
2057                 GST_INFO_OBJECT (app, "HLS server already enabled!");
2058         DREAMRTSPSERVER_UNLOCK (app);
2059         return FALSE;
2060
2061 fail:
2062         DREAMRTSPSERVER_UNLOCK (app);
2063         disable_hls_server(app);
2064         return FALSE;
2065
2066 }
2067
2068 gboolean start_hls_pipeline(App* app)
2069 {
2070         GST_DEBUG_OBJECT (app, "start_hls_pipeline");
2071
2072         DreamHLSserver *h = app->hls_server;
2073         if (h->state == HLS_STATE_DISABLED)
2074         {
2075                 GST_ERROR_OBJECT (app, "failed to start hls pipeline because hls server is not enabled!");
2076                 return FALSE;
2077         }
2078
2079         assert_tsmux (app);
2080
2081         h->queue = gst_element_factory_make ("queue", "hlsqueue");
2082         h->hlssink = gst_element_factory_make ("hlssink", "hlssink");
2083         if (!(h->hlssink && h->queue))
2084         {
2085                 g_error ("Failed to create HLS pipeline element(s):%s%s", h->hlssink?"":" hlssink", h->queue?"":" queue");
2086                 return FALSE;
2087         }
2088
2089         gchar *frag_location, *playlist_location;
2090         frag_location = g_strdup_printf ("%s/%s", HLS_PATH, HLS_FRAGMENT_NAME);
2091         playlist_location = g_strdup_printf ("%s/%s", HLS_PATH, HLS_PLAYLIST_NAME);
2092
2093         g_object_set (G_OBJECT (h->hlssink), "target-duration", HLS_FRAGMENT_DURATION, NULL);
2094         g_object_set (G_OBJECT (h->hlssink), "location", frag_location, NULL);
2095         g_object_set (G_OBJECT (h->hlssink), "playlist-location", playlist_location, NULL);
2096         g_object_set (G_OBJECT (h->queue), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
2097
2098         gst_bin_add_many (GST_BIN (app->pipeline), h->queue, h->hlssink,  NULL);
2099         gst_element_link (h->queue, h->hlssink);
2100
2101         if (!assert_state (app, h->hlssink, GST_STATE_READY) || !assert_state (app, h->queue, GST_STATE_PLAYING))
2102                 return FALSE;
2103
2104         GstPad *teepad, *sinkpad;
2105         GstPadLinkReturn ret;
2106         teepad = gst_element_get_request_pad (app->tstee, "src_%u");
2107         sinkpad = gst_element_get_static_pad (h->queue, "sink");
2108         ret = gst_pad_link (teepad, sinkpad);
2109         if (ret != GST_PAD_LINK_OK)
2110         {
2111                 GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
2112                 return FALSE;
2113         }
2114         gst_object_unref (teepad);
2115         gst_object_unref (sinkpad);
2116
2117         if (app->tcp_upstream->state == UPSTREAM_STATE_WAITING)
2118                 unpause_source_pipeline(app);
2119
2120         GstStateChangeReturn sret = gst_element_set_state (h->hlssink, GST_STATE_PLAYING);
2121         GST_DEBUG_OBJECT(app, "explicitely bring hlssink to GST_STATE_PLAYING = %i", sret);
2122
2123         if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
2124         {
2125                 GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE for hls pipeline");
2126                 return FALSE;
2127         }
2128
2129         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"start_hls_server");
2130         return TRUE;
2131 }
2132
2133 DreamHLSserver *create_hls_server(App *app)
2134 {
2135         DreamHLSserver *h = malloc(sizeof(DreamHLSserver));
2136         send_signal (app, "hlsStateChanged", g_variant_new("(i)", HLS_STATE_DISABLED));
2137         h->state = HLS_STATE_DISABLED;
2138         h->queue = NULL;
2139         h->hlssink = NULL;
2140         return h;
2141 }
2142
2143 DreamRTSPserver *create_rtsp_server(App *app)
2144 {
2145         DreamRTSPserver *r = malloc(sizeof(DreamRTSPserver));
2146         send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_DISABLED));
2147         r->state = RTSP_STATE_DISABLED;
2148         r->server = NULL;
2149         r->ts_factory = r->es_factory = NULL;
2150         r->ts_media = r->es_media = NULL;
2151         r->ts_appsrc = r->es_aappsrc = r->es_vappsrc = NULL;
2152         r->clients_list = NULL;
2153         return r;
2154 }
2155
2156 gboolean enable_rtsp_server(App *app, const gchar *path, guint32 port, const gchar *user, const gchar *pass)
2157 {
2158         GST_INFO_OBJECT(app, "enable_rtsp_server path=%s port=%i user=%s pass=%s", path, port, user, pass);
2159
2160         if (!app->pipeline)
2161         {
2162                 GST_ERROR_OBJECT (app, "failed to enable rtsp server because source pipeline is NULL!");
2163                 return FALSE;
2164         }
2165
2166         DREAMRTSPSERVER_LOCK (app);
2167         DreamRTSPserver *r = app->rtsp_server;
2168
2169         if (r->state == RTSP_STATE_DISABLED)
2170         {
2171                 r->artspq = gst_element_factory_make ("queue", "rtspaudioqueue");
2172                 r->vrtspq = gst_element_factory_make ("queue", "rtspvideoqueue");
2173                 r->aappsink = gst_element_factory_make ("appsink", AAPPSINK);
2174                 r->vappsink = gst_element_factory_make ("appsink", VAPPSINK);
2175
2176                 g_object_set (G_OBJECT (r->artspq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
2177                 g_object_set (G_OBJECT (r->vrtspq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
2178
2179                 g_object_set (G_OBJECT (r->aappsink), "emit-signals", TRUE, NULL);
2180                 g_object_set (G_OBJECT (r->aappsink), "enable-last-sample", FALSE, NULL);
2181                 g_signal_connect (r->aappsink, "new-sample", G_CALLBACK (handover_payload), app);
2182
2183                 g_object_set (G_OBJECT (r->vappsink), "emit-signals", TRUE, NULL);
2184                 g_object_set (G_OBJECT (r->vappsink), "enable-last-sample", FALSE, NULL);
2185                 g_signal_connect (r->vappsink, "new-sample", G_CALLBACK (handover_payload), app);
2186
2187                 r->tsrtspq = gst_element_factory_make ("queue", "tsrtspqueue");
2188                 r->tsappsink = gst_element_factory_make ("appsink", TSAPPSINK);
2189
2190                 g_object_set (G_OBJECT (r->tsrtspq), "leaky", 2, "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", G_GINT64_CONSTANT(5)*GST_SECOND, NULL);
2191
2192                 g_object_set (G_OBJECT (r->tsappsink), "emit-signals", TRUE, NULL);
2193                 g_object_set (G_OBJECT (r->tsappsink), "enable-last-sample", FALSE, NULL);
2194                 g_signal_connect (r->tsappsink, "new-sample", G_CALLBACK (handover_payload), app);
2195
2196                 gst_bin_add_many (GST_BIN (app->pipeline), r->artspq, r->vrtspq, r->aappsink, r->vappsink,  NULL);
2197                 gst_element_link (r->artspq, r->aappsink);
2198                 gst_element_link (r->vrtspq, r->vappsink);
2199
2200                 gst_bin_add_many (GST_BIN (app->pipeline), r->tsrtspq, r->tsappsink,  NULL);
2201                 gst_element_link (r->tsrtspq, r->tsappsink);
2202
2203                 GstState targetstate = GST_STATE_READY;
2204
2205                 if (!assert_state (app, r->tsappsink, targetstate) || !assert_state (app, r->aappsink, targetstate) || !assert_state (app, r->vappsink, targetstate))
2206                         goto fail;
2207
2208                 if (!assert_state (app, r->tsrtspq, targetstate) || !assert_state (app, r->artspq, targetstate) || !assert_state (app, r->vrtspq, targetstate))
2209                         goto fail;
2210
2211                 GstPad *teepad, *sinkpad;
2212                 GstPadLinkReturn ret;
2213                 teepad = gst_element_get_request_pad (app->atee, "src_%u");
2214                 sinkpad = gst_element_get_static_pad (r->artspq, "sink");
2215                 ret = gst_pad_link (teepad, sinkpad);
2216                 if (ret != GST_PAD_LINK_OK)
2217                 {
2218                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
2219                         goto fail;
2220                 }
2221                 gst_object_unref (teepad);
2222                 gst_object_unref (sinkpad);
2223                 teepad = gst_element_get_request_pad (app->vtee, "src_%u");
2224                 sinkpad = gst_element_get_static_pad (r->vrtspq, "sink");
2225                 ret = gst_pad_link (teepad, sinkpad);
2226                 if (ret != GST_PAD_LINK_OK)
2227                 {
2228                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
2229                         goto fail;
2230                 }
2231                 gst_object_unref (teepad);
2232                 gst_object_unref (sinkpad);
2233                 teepad = gst_element_get_request_pad (app->tstee, "src_%u");
2234                 sinkpad = gst_element_get_static_pad (r->tsrtspq, "sink");
2235                 ret = gst_pad_link (teepad, sinkpad);
2236                 if (ret != GST_PAD_LINK_OK)
2237                 {
2238                         GST_ERROR_OBJECT (app, "couldn't link %" GST_PTR_FORMAT " ! %" GST_PTR_FORMAT "", teepad, sinkpad);
2239                         goto fail;
2240                 }
2241                 gst_object_unref (teepad);
2242                 gst_object_unref (sinkpad);
2243
2244                 if (app->tcp_upstream->state != UPSTREAM_STATE_DISABLED || app->hls_server->state != HLS_STATE_DISABLED)
2245                         targetstate = GST_STATE_PLAYING;
2246
2247                 if (!assert_state (app, app->pipeline, targetstate))
2248                         goto fail;
2249
2250                 r->server = g_object_new (GST_TYPE_DREAM_RTSP_SERVER, NULL);
2251                 g_signal_connect (r->server, "client-connected", (GCallback) client_connected, app);
2252
2253                 r->es_factory = gst_dream_rtsp_media_factory_new ();
2254                 gst_rtsp_media_factory_set_launch (GST_RTSP_MEDIA_FACTORY (r->es_factory), "( appsrc name=" ES_VAPPSRC " ! h264parse ! rtph264pay name=pay0 pt=96   appsrc name=" ES_AAPPSRC " ! aacparse ! rtpmp4apay name=pay1 pt=97 )");
2255                 gst_rtsp_media_factory_set_shared (GST_RTSP_MEDIA_FACTORY (r->es_factory), TRUE);
2256
2257                 g_signal_connect (r->es_factory, "media-configure", (GCallback) media_configure, app);
2258
2259                 r->ts_factory = gst_dream_rtsp_media_factory_new ();
2260                 gst_rtsp_media_factory_set_launch (GST_RTSP_MEDIA_FACTORY (r->ts_factory), "( appsrc name=" TS_APPSRC " ! queue ! rtpmp2tpay name=pay0 pt=96 )");
2261                 gst_rtsp_media_factory_set_shared (GST_RTSP_MEDIA_FACTORY (r->ts_factory), TRUE);
2262
2263                 g_signal_connect (r->ts_factory, "media-configure", (GCallback) media_configure, app);
2264                 g_signal_connect (r->ts_factory, "uri-parametrized", (GCallback) uri_parametrized, app);
2265
2266                 DREAMRTSPSERVER_UNLOCK (app);
2267
2268                 gchar *credentials = g_strdup("");
2269                 if (strlen(user)) {
2270                         r->rtsp_user = g_strdup(user);
2271                         r->rtsp_pass = g_strdup(pass);
2272                         GstRTSPToken *token;
2273                         gchar *basic;
2274                         GstRTSPAuth *auth = gst_rtsp_auth_new ();
2275                         gst_rtsp_media_factory_add_role (GST_RTSP_MEDIA_FACTORY (r->es_factory), "user", GST_RTSP_PERM_MEDIA_FACTORY_ACCESS, G_TYPE_BOOLEAN, TRUE, GST_RTSP_PERM_MEDIA_FACTORY_CONSTRUCT, G_TYPE_BOOLEAN, TRUE, NULL);
2276                         gst_rtsp_media_factory_add_role (GST_RTSP_MEDIA_FACTORY (r->ts_factory), "user", GST_RTSP_PERM_MEDIA_FACTORY_ACCESS, G_TYPE_BOOLEAN, TRUE, GST_RTSP_PERM_MEDIA_FACTORY_CONSTRUCT, G_TYPE_BOOLEAN, TRUE, NULL);
2277                         token = gst_rtsp_token_new (GST_RTSP_TOKEN_MEDIA_FACTORY_ROLE, G_TYPE_STRING, "user", NULL);
2278                         basic = gst_rtsp_auth_make_basic (r->rtsp_user, r->rtsp_pass);
2279                         gst_rtsp_server_set_auth (GST_RTSP_SERVER(r->server), auth);
2280                         gst_rtsp_auth_add_basic (auth, basic, token);
2281                         g_free (basic);
2282                         gst_rtsp_token_unref (token);
2283                         credentials = g_strdup_printf("%s:%s@", user, pass);
2284                 }
2285                 else
2286                         r->rtsp_user = r->rtsp_pass = NULL;
2287
2288                 r->rtsp_port = g_strdup_printf("%i", port ? port : DEFAULT_RTSP_PORT);
2289
2290                 gst_rtsp_server_set_service (GST_RTSP_SERVER(r->server), r->rtsp_port);
2291
2292                 if (strlen(path))
2293                 {
2294                         r->rtsp_ts_path = g_strdup_printf ("%s%s", path[0]=='/' ? "" : "/", path);
2295                         r->rtsp_es_path = g_strdup_printf ("%s%s%s", path[0]=='/' ? "" : "/", path, RTSP_ES_PATH_SUFX);
2296                 }
2297                 else
2298                 {
2299                         r->rtsp_ts_path = g_strdup(DEFAULT_RTSP_PATH);
2300                         r->rtsp_es_path = g_strdup_printf ("%s%s", DEFAULT_RTSP_PATH, RTSP_ES_PATH_SUFX);
2301                 }
2302
2303                 r->mounts = gst_rtsp_server_get_mount_points (GST_RTSP_SERVER(r->server));
2304                 gst_rtsp_mount_points_add_factory (r->mounts, r->rtsp_ts_path, g_object_ref(GST_RTSP_MEDIA_FACTORY (r->ts_factory)));
2305                 gst_rtsp_mount_points_add_factory (r->mounts, r->rtsp_es_path, g_object_ref(GST_RTSP_MEDIA_FACTORY (r->es_factory)));
2306                 r->state = RTSP_STATE_IDLE;
2307                 send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_IDLE));
2308                 GST_DEBUG ("set RTSP_STATE_IDLE");
2309                 r->source_id = gst_rtsp_server_attach (GST_RTSP_SERVER(r->server), NULL);
2310                 r->uri_parameters = NULL;
2311                 GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"enabled_rtsp_server");
2312                 g_print ("dreambox encoder stream ready at rtsp://%s127.0.0.1:%s%s\n", credentials, app->rtsp_server->rtsp_port, app->rtsp_server->rtsp_ts_path);
2313                 g_free (credentials);
2314                 return TRUE;
2315         }
2316         else
2317                 GST_INFO_OBJECT (app, "rtsp server already enabled!");
2318         DREAMRTSPSERVER_UNLOCK (app);
2319         return FALSE;
2320
2321 fail:
2322         DREAMRTSPSERVER_UNLOCK (app);
2323         disable_rtsp_server(app);
2324         return FALSE;
2325 }
2326
2327 gboolean start_rtsp_pipeline(App* app)
2328 {
2329         GST_DEBUG_OBJECT (app, "start_rtsp_pipeline");
2330
2331         DreamRTSPserver *r = app->rtsp_server;
2332         if (r->state == RTSP_STATE_DISABLED)
2333         {
2334                 GST_ERROR_OBJECT (app, "failed to start rtsp pipeline because rtsp server is not enabled!");
2335                 return FALSE;
2336         }
2337
2338         assert_tsmux (app);
2339         if (!assert_state (app, app->pipeline, GST_STATE_PLAYING))
2340         {
2341                 GST_ERROR_OBJECT (app, "GST_STATE_CHANGE_FAILURE for rtsp pipeline");
2342                 return FALSE;
2343         }
2344         GST_INFO_OBJECT(app, "start rtsp pipeline, pipeline going into PLAYING");
2345         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"start_rtsp_pipeline");
2346         GstState state;
2347         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 10*GST_SECOND);
2348         GST_INFO_OBJECT(app, "pipeline state=%s", gst_element_state_get_name (state));
2349         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"started_rtsp_pipeline");
2350         return TRUE;
2351 }
2352
2353 static GstPadProbeReturn tsmux_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2354 {
2355         App *app = user_data;
2356
2357         GstElement *element = gst_pad_get_parent_element(pad);
2358         GST_DEBUG_OBJECT(pad, "tsmux_pad_probe_unlink_cb %" GST_PTR_FORMAT, element);
2359
2360         GstElement *source = NULL;
2361         if (element == app->aq)
2362                 source = app->asrc;
2363         else if (element == app->vq)
2364                 source = app->vsrc;
2365         else if (element == app->tsmux)
2366         {
2367                 gst_element_unlink (app->tsmux, app->tstee);
2368                 gst_bin_remove (GST_BIN (app->pipeline), app->tsmux);
2369                 gst_element_set_state (app->tsmux, GST_STATE_NULL);
2370                 gst_object_unref (app->tsmux);
2371                 app->tsmux = NULL;
2372                 if (gst_element_set_state (app->pipeline, GST_STATE_READY) != GST_STATE_CHANGE_SUCCESS)
2373                         GST_WARNING_OBJECT (pad, "error bringing pipeline back to ready");
2374 //              DREAMRTSPSERVER_UNLOCK (app);
2375                 GST_DEBUG_OBJECT (pad, "finished unlinking and removing sources and tsmux");
2376                 return GST_PAD_PROBE_REMOVE;
2377         }
2378
2379         if (gst_element_set_state (source, GST_STATE_NULL) != GST_STATE_CHANGE_SUCCESS)
2380         {
2381                 GST_ERROR_OBJECT(pad, "can't set %" GST_PTR_FORMAT "'s state to GST_STATE_NULL", source);
2382                 goto fail;
2383         }
2384
2385         GstPad *srcpad, *muxpad;
2386         srcpad = gst_element_get_static_pad (element, "src");
2387         muxpad = gst_pad_get_peer (srcpad);
2388         if (GST_IS_PAD (muxpad))
2389         {
2390                 GST_DEBUG_OBJECT(pad, "srcpad %" GST_PTR_FORMAT " muxpad %" GST_PTR_FORMAT " tsmux %" GST_PTR_FORMAT, srcpad, muxpad, app->tsmux);
2391                 gst_pad_unlink (srcpad, muxpad);
2392                 gst_element_release_request_pad (app->tsmux, muxpad);
2393                 gst_object_unref (muxpad);
2394         }
2395         else
2396                 GST_DEBUG_OBJECT(pad, "srcpad %" GST_PTR_FORMAT "'s peer was already unreffed", srcpad);
2397         gst_object_unref (srcpad);
2398
2399         gst_element_set_state (element, GST_STATE_READY);
2400
2401         GstState state;
2402         gst_element_get_state (GST_ELEMENT(element), &state, NULL, 2*GST_SECOND);
2403
2404         if (state != GST_STATE_READY)
2405         {
2406                 GST_ERROR_OBJECT (app, "%" GST_PTR_FORMAT"'s state = %s (should be GST_STATE_READY)", element, gst_element_state_get_name (state));
2407                 goto fail;
2408         }
2409
2410         GstPad *sinkpad = NULL;
2411         GstElement *nextelem = NULL;
2412         if (element == app->aq)
2413         {
2414                 nextelem = app->vq;
2415                 if (GST_IS_ELEMENT(nextelem))
2416                         sinkpad = gst_element_get_static_pad (nextelem, "sink");
2417         }
2418         if (element == app->vq)
2419         {
2420                 nextelem = app->tsmux;
2421                 if (GST_IS_ELEMENT(nextelem))
2422                         sinkpad = gst_element_get_static_pad (nextelem, "src");
2423         }
2424         if (GST_IS_PAD(sinkpad) && GST_IS_ELEMENT(nextelem))
2425         {
2426                 GST_DEBUG_OBJECT(pad, "element %" GST_PTR_FORMAT " is now in GST_STATE_READY, installing idle probe on %" GST_PTR_FORMAT, element, sinkpad);
2427                 gst_object_ref (nextelem);
2428                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, tsmux_pad_probe_unlink_cb, app, NULL);
2429                 gst_object_unref (sinkpad);
2430                 return GST_PAD_PROBE_REMOVE;
2431         }
2432
2433 fail:
2434 //      DREAMRTSPSERVER_UNLOCK (app);
2435         return GST_PAD_PROBE_REMOVE;
2436 }
2437
2438 gboolean halt_source_pipeline(App* app)
2439 {
2440         GST_INFO_OBJECT(app, "halt_source_pipeline...");
2441         GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(app->pipeline),GST_DEBUG_GRAPH_SHOW_ALL,"halt_source_pipeline_pre");
2442         GstPad *sinkpad;
2443         gst_object_ref (app->aq);
2444         sinkpad = gst_element_get_static_pad (app->aq, "sink");
2445         gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, tsmux_pad_probe_unlink_cb, app, NULL);
2446         gst_object_unref (sinkpad);
2447         return TRUE;
2448 }
2449
2450 gboolean pause_source_pipeline(App* app)
2451 {
2452         if (app->rtsp_server->state <= RTSP_STATE_IDLE && app->hls_server->state == HLS_STATE_DISABLED)
2453         {
2454                 GST_INFO_OBJECT(app, "pause_source_pipeline... setting sources to GST_STATE_PAUSED rtsp_server->state=%i hls_server->state=%i", app->rtsp_server->state, app->hls_server->state);
2455                 if (gst_element_set_state (app->asrc, GST_STATE_PAUSED) != GST_STATE_CHANGE_NO_PREROLL || gst_element_set_state (app->vsrc, GST_STATE_PAUSED) != GST_STATE_CHANGE_NO_PREROLL)
2456                 {
2457                         GST_WARNING ("can't set pipeline to GST_STATE_PAUSED!");
2458                         return FALSE;
2459                 }
2460         }
2461         else
2462                 GST_DEBUG ("not pausing pipeline because rtsp_server->state=%i hls_server->state=%i", app->rtsp_server->state, app->hls_server->state);
2463         return TRUE;
2464 }
2465
2466 gboolean unpause_source_pipeline(App* app)
2467 {
2468         GST_INFO_OBJECT(app, "unpause_source_pipeline... setting sources to GST_STATE_PLAYING");
2469         if (gst_element_set_state (app->asrc, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE || gst_element_set_state (app->vsrc, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE)
2470         {
2471                 GST_WARNING("can't set sources to GST_STATE_PLAYING!");
2472                 return FALSE;
2473         }
2474         return TRUE;
2475 }
2476
2477 GstRTSPFilterResult remove_media_filter_func (GstRTSPSession * sess, GstRTSPSessionMedia * session_media, gpointer user_data)
2478 {
2479         App *app = user_data;
2480         GstRTSPFilterResult res = GST_RTSP_FILTER_REF;
2481         GstRTSPMedia *media;
2482         media = gst_rtsp_session_media_get_media (session_media);
2483 //      DREAMRTSPSERVER_LOCK (app);
2484         if (media == app->rtsp_server->es_media || media == app->rtsp_server->ts_media) {
2485                 GST_DEBUG_OBJECT (app, "matching RTSP media %p in filter, removing...", media);
2486                 res = GST_RTSP_FILTER_REMOVE;
2487         }
2488 //      DREAMRTSPSERVER_UNLOCK (app);
2489         return res;
2490 }
2491
2492 GstRTSPFilterResult remove_session_filter_func (GstRTSPClient *client, GstRTSPSession * sess, gpointer user_data)
2493 {
2494         App *app = user_data;
2495         GList *media_filter_res;
2496         GstRTSPFilterResult res = GST_RTSP_FILTER_REF;
2497         media_filter_res = gst_rtsp_session_filter (sess, remove_media_filter_func, app);
2498         if (g_list_length (media_filter_res) == 0) {
2499                 GST_DEBUG_OBJECT (app, "no more media for session %p, removing...", sess);
2500                 res = GST_RTSP_FILTER_REMOVE;
2501         }
2502         g_list_free (media_filter_res);
2503         return res;
2504 }
2505
2506 GstRTSPFilterResult remove_client_filter_func (GstRTSPServer *server, GstRTSPClient *client, gpointer user_data)
2507 {
2508         App *app = user_data;
2509         GList *session_filter_res;
2510         GstRTSPFilterResult res = GST_RTSP_FILTER_KEEP;
2511         int ret = g_signal_handlers_disconnect_by_func(client, (GCallback) client_closed, app);
2512         GST_INFO("client_filter_func %" GST_PTR_FORMAT "  (number of clients: %i). disconnected %i callback handlers", client, g_list_length(app->rtsp_server->clients_list), ret);
2513         session_filter_res = gst_rtsp_client_session_filter (client, remove_session_filter_func, app);
2514         if (g_list_length (session_filter_res) == 0) {
2515                 GST_DEBUG_OBJECT (app, "no more sessions for client %p, removing...", app);
2516                 res = GST_RTSP_FILTER_REMOVE;
2517         }
2518         g_list_free (session_filter_res);
2519         return res;
2520 }
2521
2522 static GstPadProbeReturn rtsp_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2523 {
2524         App *app = user_data;
2525         DreamRTSPserver *r = app->rtsp_server;
2526
2527         GstElement *element = gst_pad_get_parent_element(pad);
2528         GstElement *appsink = NULL;
2529         if (element == r->vrtspq)
2530                 appsink = r->vappsink;
2531         else if (element == r->artspq)
2532                 appsink = r->aappsink;
2533         else if (element == r->tsrtspq)
2534                 appsink = r->tsappsink;
2535
2536         GST_DEBUG_OBJECT(pad, "unlink... %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, appsink);
2537
2538         GstPad *teepad;
2539         teepad = gst_pad_get_peer(pad);
2540         gst_pad_unlink (teepad, pad);
2541
2542         GstElement *tee = gst_pad_get_parent_element(teepad);
2543         gst_element_release_request_pad (tee, teepad);
2544         gst_object_unref (teepad);
2545         gst_object_unref (tee);
2546
2547         gst_element_unlink (element, appsink);
2548
2549         GST_DEBUG_OBJECT(pad, "remove... %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, appsink);
2550
2551         gst_bin_remove_many (GST_BIN (app->pipeline), element, appsink, NULL);
2552
2553         GST_DEBUG_OBJECT(pad, "set state null %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, appsink);
2554
2555         gst_element_set_state (appsink, GST_STATE_NULL);
2556         gst_element_set_state (element, GST_STATE_NULL);
2557
2558         GST_DEBUG_OBJECT(pad, "unref.... %" GST_PTR_FORMAT " and %" GST_PTR_FORMAT, element, appsink);
2559
2560         gst_object_unref (element);
2561         gst_object_unref (appsink);
2562         element = NULL;
2563         appsink = NULL;
2564
2565         if (!r->tsappsink && !r->aappsink && !r->vappsink)
2566         {
2567                 GST_INFO("!r->tsappsink && !r->aappsink && !r->vappsink");
2568                 if (app->tcp_upstream->state == UPSTREAM_STATE_DISABLED && app->hls_server->state == HLS_STATE_DISABLED)
2569                         halt_source_pipeline(app);
2570                 GST_INFO("local rtsp server disabled!");
2571         }
2572         return GST_PAD_PROBE_REMOVE;
2573 }
2574
2575 gboolean disable_rtsp_server(App *app)
2576 {
2577         DreamRTSPserver *r = app->rtsp_server;
2578         GST_DEBUG("disable_rtsp_server %p", r->server);
2579         if (r->state >= RTSP_STATE_IDLE)
2580         {
2581                 if (app->rtsp_server->es_media)
2582                         gst_rtsp_server_client_filter(GST_RTSP_SERVER(app->rtsp_server->server), (GstRTSPServerClientFilterFunc) remove_client_filter_func, app);
2583                 DREAMRTSPSERVER_LOCK (app);
2584                 gst_rtsp_mount_points_remove_factory (app->rtsp_server->mounts, app->rtsp_server->rtsp_es_path);
2585                 gst_rtsp_mount_points_remove_factory (app->rtsp_server->mounts, app->rtsp_server->rtsp_ts_path);
2586                 GSource *source = g_main_context_find_source_by_id (g_main_context_default (), r->source_id);
2587                 g_source_destroy(source);
2588 //              g_source_unref(source);
2589 //              GST_DEBUG("disable_rtsp_server source unreffed");
2590                 if (r->mounts)
2591                         g_object_unref(r->mounts);
2592                 if (r->server)
2593                         gst_object_unref(r->server);
2594                 g_free(r->rtsp_user);
2595                 g_free(r->rtsp_pass);
2596                 g_free(r->rtsp_port);
2597                 g_free(r->rtsp_ts_path);
2598                 g_free(r->rtsp_es_path);
2599                 g_free(r->uri_parameters);
2600                 send_signal (app, "rtspStateChanged", g_variant_new("(i)", RTSP_STATE_DISABLED));
2601                 r->state = RTSP_STATE_DISABLED;
2602
2603                 gst_object_ref (r->tsrtspq);
2604                 gst_object_ref (r->artspq);
2605                 gst_object_ref (r->vrtspq);
2606                 gst_object_ref (r->tsappsink);
2607                 gst_object_ref (r->aappsink);
2608                 gst_object_ref (r->vappsink);
2609
2610                 GstPad *sinkpad;
2611                 sinkpad = gst_element_get_static_pad (r->tsrtspq, "sink");
2612                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, rtsp_pad_probe_unlink_cb, app, NULL);
2613                 gst_object_unref (sinkpad);
2614                 sinkpad = gst_element_get_static_pad (r->artspq, "sink");
2615                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, rtsp_pad_probe_unlink_cb, app, NULL);
2616                 gst_object_unref (sinkpad);
2617                 sinkpad = gst_element_get_static_pad (r->vrtspq, "sink");
2618                 gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, rtsp_pad_probe_unlink_cb, app, NULL);
2619                 gst_object_unref (sinkpad);
2620
2621                 DREAMRTSPSERVER_UNLOCK (app);
2622                 GST_INFO("rtsp_server disabled! set RTSP_STATE_DISABLED");
2623                 return TRUE;
2624         }
2625         return FALSE;
2626 }
2627
2628 static GstPadProbeReturn upstream_pad_probe_unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2629 {
2630         App *app = user_data;
2631         DreamTCPupstream *t = app->tcp_upstream;
2632
2633         GstElement *element = gst_pad_get_parent_element(pad);
2634
2635         GST_DEBUG_OBJECT (pad, "upstream_pad_probe_unlink_cb %" GST_PTR_FORMAT, element);
2636
2637         if ((GST_PAD_PROBE_INFO_TYPE(info) & GST_PAD_PROBE_TYPE_IDLE) && element && element == t->tstcpq)
2638         {
2639                 GstPad *teepad;
2640                 teepad = gst_pad_get_peer(pad);
2641                 if (!teepad)
2642                 {
2643                         GST_ERROR_OBJECT (pad, "has no peer! tstcpq=%" GST_PTR_FORMAT", tcpsink=%" GST_PTR_FORMAT", tee=%" GST_PTR_FORMAT,t->tstcpq, t->tcpsink, app->tstee);
2644                         return GST_PAD_PROBE_REMOVE;
2645                 }
2646                 GST_DEBUG_OBJECT (pad, "GST_PAD_PROBE_TYPE_IDLE -> unlink and remove tcpsink");
2647                 gst_pad_unlink (teepad, pad);
2648
2649                 GstElement *tee = gst_pad_get_parent_element(teepad);
2650                 gst_element_release_request_pad (tee, teepad);
2651                 gst_object_unref (teepad);
2652                 gst_object_unref (tee);
2653
2654                 gst_object_ref (t->tcpsink);
2655                 gst_element_unlink (t->tstcpq, t->tcpsink);
2656                 gst_bin_remove_many (GST_BIN (app->pipeline), t->tstcpq, t->tcpsink, NULL);
2657
2658                 gst_element_set_state (t->tcpsink, GST_STATE_NULL);
2659                 gst_element_set_state (t->tstcpq, GST_STATE_NULL);
2660
2661                 gst_object_unref (t->tstcpq);
2662                 gst_object_unref (t->tcpsink);
2663                 t->tstcpq = NULL;
2664                 t->tcpsink = NULL;
2665
2666                 if (app->rtsp_server->state < RTSP_STATE_RUNNING && app->hls_server->state == HLS_STATE_DISABLED)
2667                         halt_source_pipeline(app);
2668                 GST_INFO("tcp_upstream disabled!");
2669                 t->state = UPSTREAM_STATE_DISABLED;
2670                 send_signal (app, "upstreamStateChanged", g_variant_new("(i)", t->state));
2671         }
2672         GST_DEBUG_OBJECT (pad, "upstream_pad_probe_unlink_cb returns GST_PAD_PROBE_REMOVE");
2673         return GST_PAD_PROBE_REMOVE;
2674 }
2675
2676 gboolean disable_tcp_upstream(App *app)
2677 {
2678         GstState state;
2679         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 3*GST_SECOND);
2680         GST_DEBUG("disable_tcp_upstream (current pipeline state=%s)", gst_element_state_get_name (state));
2681         DreamTCPupstream *t = app->tcp_upstream;
2682         if (t->state >= UPSTREAM_STATE_CONNECTING)
2683         {
2684                 GstPad *sinkpad;
2685                 if (t->id_bitrate_measure)
2686                 {
2687                         sinkpad = gst_element_get_static_pad (t->tcpsink, "sink");
2688                         gst_pad_remove_probe (sinkpad, t->id_bitrate_measure);
2689                         t->id_bitrate_measure = 0;
2690                         gst_object_unref (sinkpad);
2691                 }
2692                 gst_object_ref (t->tstcpq);
2693                 sinkpad = gst_element_get_static_pad (t->tstcpq, "sink");
2694                 gulong probe_id = gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_IDLE, upstream_pad_probe_unlink_cb, app, NULL);
2695                 GST_DEBUG("added upstream_pad_probe_unlink_cb with probe_id = %lu on %" GST_PTR_FORMAT"", probe_id, sinkpad);
2696                 gst_object_unref (sinkpad);
2697                 return TRUE;
2698         }
2699         return FALSE;
2700 }
2701
2702 gboolean destroy_pipeline(App *app)
2703 {
2704         GST_DEBUG_OBJECT(app, "destroy_pipeline @%p", app->pipeline);
2705         if (app->pipeline)
2706         {
2707                 get_source_properties (app);
2708                 GstStateChangeReturn sret = gst_element_set_state (app->pipeline, GST_STATE_NULL);
2709                 if (sret == GST_STATE_CHANGE_ASYNC)
2710                 {
2711                         GstState state;
2712                         gst_element_get_state (GST_ELEMENT(app->pipeline), &state, NULL, 3*GST_SECOND);
2713                         if (state != GST_STATE_NULL)
2714                                 GST_INFO_OBJECT(app, "%" GST_PTR_FORMAT"'s state=%s", app->pipeline, gst_element_state_get_name (state));
2715                 }
2716                 gst_object_unref (app->pipeline);
2717                 gst_object_unref (app->clock);
2718                 GST_INFO_OBJECT(app, "source pipeline destroyed");
2719                 app->pipeline = NULL;
2720                 return TRUE;
2721         }
2722         else
2723                 GST_INFO_OBJECT(app, "don't destroy inexistant pipeline");
2724         return FALSE;
2725 }
2726
2727 gboolean watchdog_ping(gpointer user_data)
2728 {
2729         App *app = user_data;
2730         GST_TRACE_OBJECT(app, "sending watchdog ping!");
2731         if (app->dbus_connection)
2732                 g_dbus_connection_emit_signal (app->dbus_connection, NULL, object_name, service, "ping", NULL, NULL);
2733         return TRUE;
2734 }
2735
2736 gboolean quit_signal(gpointer loop)
2737 {
2738         GST_INFO_OBJECT(loop, "caught SIGINT");
2739         g_main_loop_quit((GMainLoop*)loop);
2740         return FALSE;
2741 }
2742
2743 gboolean get_dot_graph (gpointer user_data)
2744 {
2745         App *app = user_data;
2746         GST_INFO_OBJECT(app, "caught SIGUSR1, saving pipeline graph...");
2747         GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (app->pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "dreamrtspserver-sigusr");
2748         return TRUE;
2749 }
2750
2751 int main (int argc, char *argv[])
2752 {
2753         App app;
2754         guint owner_id;
2755
2756         gst_init (0, NULL);
2757
2758         GST_DEBUG_CATEGORY_INIT (dreamrtspserver_debug, "dreamrtspserver",
2759                         GST_DEBUG_BOLD | GST_DEBUG_FG_YELLOW | GST_DEBUG_BG_BLUE,
2760                         "Dreambox RTSP server daemon");
2761
2762         memset (&app, 0, sizeof(app));
2763         memset (&app.source_properties, 0, sizeof(SourceProperties));
2764         app.source_properties.gopLength = 0; //auto
2765         app.source_properties.gopOnSceneChange = FALSE;
2766         app.source_properties.openGop = FALSE;
2767         app.source_properties.bFrames = 2; //default
2768         app.source_properties.pFrames = 1; //default
2769         app.source_properties.profile = 0; //main
2770         g_mutex_init (&app.rtsp_mutex);
2771
2772         introspection_data = g_dbus_node_info_new_for_xml (introspection_xml, NULL);
2773         app.dbus_connection = NULL;
2774
2775         owner_id = g_bus_own_name (G_BUS_TYPE_SYSTEM,
2776                                    service,
2777                             G_BUS_NAME_OWNER_FLAGS_NONE,
2778                             on_bus_acquired,
2779                             on_name_acquired,
2780                             on_name_lost,
2781                             &app,
2782                             NULL);
2783
2784         if (!create_source_pipeline(&app))
2785                 g_print ("Failed to create source pipeline!");
2786
2787 #if WATCHDOG_TIMEOUT > 0
2788         g_timeout_add_seconds (WATCHDOG_TIMEOUT, watchdog_ping, &app);
2789 #endif
2790
2791         app.tcp_upstream = malloc(sizeof(DreamTCPupstream));
2792         app.tcp_upstream->state = UPSTREAM_STATE_DISABLED;
2793         app.tcp_upstream->auto_bitrate = AUTO_BITRATE;
2794
2795         app.hls_server = create_hls_server(&app);
2796
2797         app.rtsp_server = create_rtsp_server(&app);
2798
2799         app.loop = g_main_loop_new (NULL, FALSE);
2800         g_unix_signal_add (SIGINT, quit_signal, app.loop);
2801         g_unix_signal_add (SIGUSR1, (GSourceFunc) get_dot_graph, &app);
2802
2803         g_main_loop_run (app.loop);
2804
2805         if (app.tcp_upstream->state > UPSTREAM_STATE_DISABLED)
2806                 disable_tcp_upstream(&app);
2807         if (app.rtsp_server->state >= RTSP_STATE_IDLE)
2808                 disable_rtsp_server(&app);
2809         if (app.rtsp_server->clients_list)
2810                 g_list_free (app.rtsp_server->clients_list);
2811
2812         if (app.hls_server->state >= HLS_STATE_IDLE)
2813                 disable_hls_server(&app);
2814
2815         free(app.hls_server);
2816         free(app.rtsp_server);
2817         free(app.tcp_upstream);
2818
2819         destroy_pipeline(&app);
2820
2821         g_main_loop_unref (app.loop);
2822
2823         g_mutex_clear (&app.rtsp_mutex);
2824
2825         g_bus_unown_name (owner_id);
2826         g_dbus_node_info_unref (introspection_data);
2827
2828         return 0;
2829 }