implement encoder clock provider, implement independent encoder read thread that...
[gst-plugin-dreamsource.git] / src / gstdreamvideosource.c
1 /*
2  * GStreamer dreamvideosource
3  * Copyright 2014-2015 Andreas Frisch <fraxinas@opendreambox.org>
4  *
5  * This program is licensed under the Creative Commons
6  * Attribution-NonCommercial-ShareAlike 3.0 Unported
7  * License. To view a copy of this license, visit
8  * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
9  * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA.
10  *
11  * Alternatively, this program may be distributed and executed on
12  * hardware which is licensed by Dream Property GmbH.
13  *
14  * This program is NOT free software. It is open source, you are allowed
15  * to modify it (if you keep the license), but it may not be commercially
16  * distributed other than under the conditions noted above.
17  */
18
19 #ifdef HAVE_CONFIG_H
20 #include <config.h>
21 #endif
22
23 #include <gst/gst.h>
24 #include "gstdreamvideosource.h"
25
26 GST_DEBUG_CATEGORY_STATIC (dreamvideosource_debug);
27 #define GST_CAT_DEFAULT dreamvideosource_debug
28
29 GType gst_dreamvideosource_input_mode_get_type (void)
30 {
31         static volatile gsize input_mode_type = 0;
32         static const GEnumValue input_mode[] = {
33                 {GST_DREAMVIDEOSOURCE_INPUT_MODE_LIVE, "GST_DREAMVIDEOSOURCE_INPUT_MODE_LIVE", "live"},
34                 {GST_DREAMVIDEOSOURCE_INPUT_MODE_HDMI_IN, "GST_DREAMVIDEOSOURCE_INPUT_MODE_HDMI_IN", "hdmi_in"},
35                 {GST_DREAMVIDEOSOURCE_INPUT_MODE_BACKGROUND, "GST_DREAMVIDEOSOURCE_INPUT_MODE_BACKGROUND", "background"},
36                 {0, NULL, NULL},
37         };
38
39         if (g_once_init_enter (&input_mode_type)) {
40                 GType tmp = g_enum_register_static ("GstDreamVideoSourceInputMode", input_mode);
41                 g_once_init_leave (&input_mode_type, tmp);
42         }
43         return (GType) input_mode_type;
44 }
45
/* Indices into gst_dreamvideosource_signals[]; LAST_SIGNAL sizes that table. */
enum
{
        SIGNAL_GET_BASE_PTS,
        LAST_SIGNAL
};

/* Property ids handed to g_object_class_install_property() in class_init. */
enum
{
        ARG_0,
        ARG_CAPS,
        ARG_BITRATE,
        ARG_INPUT_MODE
};

/* Signal ids returned by g_signal_new(), filled in by class_init. */
static guint gst_dreamvideosource_signals[LAST_SIGNAL] = { 0 };

/* Default of the "bitrate" property (documented there as kbit/sec). */
#define DEFAULT_BITRATE     2048
/* Preferred framerate / width / height used when fixating caps. */
#define DEFAULT_FRAMERATE   25
#define DEFAULT_WIDTH       1280
#define DEFAULT_HEIGHT      720
/* Default of the "input-mode" property. */
#define DEFAULT_INPUT_MODE  GST_DREAMVIDEOSOURCE_INPUT_MODE_LIVE
/* Initial value of self->buffer_size (unit not visible here — TODO confirm). */
#define DEFAULT_BUFFER_SIZE 16
68
/*
 * Always-present source pad template: H.264 byte-stream, main profile,
 * restricted to the listed widths, heights, framerates and pixel aspect
 * ratios.
 */
static GstStaticPadTemplate srctemplate =
    GST_STATIC_PAD_TEMPLATE ("src",
        GST_PAD_SRC,
        GST_PAD_ALWAYS,
        GST_STATIC_CAPS ("video/x-h264, "
        "width = { 720, 1280, 1920 }, "
        "height = { 576, 720, 1080 }, "
        "framerate = { 25/1, 30/1, 50/1, 60/1 }, "
        "pixel-aspect-ratio = { 5/4, 16/9 }, "
        "stream-format = (string) byte-stream, "
        "profile = (string) main")
    );

/* Let the rest of the file refer to the parent class simply as parent_class. */
#define gst_dreamvideosource_parent_class parent_class
/* Registers GstDreamVideoSource as a subclass of GstPushSrc. */
G_DEFINE_TYPE (GstDreamVideoSource, gst_dreamvideosource, GST_TYPE_PUSH_SRC);
84
/* GstBaseSrc caps negotiation handlers (installed in class_init). */
static GstCaps *gst_dreamvideosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter);
static gboolean gst_dreamvideosource_setcaps (GstBaseSrc * bsrc, GstCaps * caps);
static GstCaps *gst_dreamvideosource_fixate (GstBaseSrc * bsrc, GstCaps * caps);

/* GstBaseSrc / GstPushSrc streaming handlers and GObject dispose. */
static gboolean gst_dreamvideosource_start (GstBaseSrc * bsrc);
static gboolean gst_dreamvideosource_stop (GstBaseSrc * bsrc);
static gboolean gst_dreamvideosource_unlock (GstBaseSrc * bsrc);
static gboolean gst_dreamvideosource_unlock_stop (GstBaseSrc * bsrc);
static void gst_dreamvideosource_dispose (GObject * gobject);
static GstFlowReturn gst_dreamvideosource_create (GstPushSrc * psrc, GstBuffer ** outbuf);

/* GObject property accessors. */
static void gst_dreamvideosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_dreamvideosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec);

/* GstElement state handler and the class handler of the "get-base-pts" signal. */
static GstStateChangeReturn gst_dreamvideosource_change_state (GstElement * element, GstStateChange transition);
static gint64 gst_dreamvideosource_get_base_pts (GstDreamVideoSource *self);

/* Body of the encoder read thread. */
static void gst_dreamvideosource_read_thread_func (GstDreamVideoSource * self);

/* Clock provider, only built when PROVIDE_CLOCK is defined. */
#ifdef PROVIDE_CLOCK
static GstClock *gst_dreamvideosource_provide_clock (GstElement * elem);
// static GstClockTime gst_dreamvideosource_get_encoder_time_ (GstClock * clock, GstBaseSrc * bsrc);
#endif
108
109 static void
110 gst_dreamvideosource_class_init (GstDreamVideoSourceClass * klass)
111 {
112         GObjectClass *gobject_class;
113         GstElementClass *gstelement_class;
114         GstBaseSrcClass *gstbsrc_class;
115         GstPushSrcClass *gstpush_src_class;
116
117         gobject_class = (GObjectClass *) klass;
118         gstelement_class = (GstElementClass *) klass;
119         gstbsrc_class = (GstBaseSrcClass *) klass;
120         gstpush_src_class = (GstPushSrcClass *) klass;
121
122         gobject_class->set_property = gst_dreamvideosource_set_property;
123         gobject_class->get_property = gst_dreamvideosource_get_property;
124         gobject_class->dispose = gst_dreamvideosource_dispose;
125
126         gst_element_class_add_pad_template (gstelement_class,
127                                             gst_static_pad_template_get (&srctemplate));
128
129         gst_element_class_set_static_metadata (gstelement_class,
130             "Dream Video source", "Source/Video",
131             "Provide an h.264 video elementary stream from Dreambox encoder device",
132             "Andreas Frisch <fraxinas@opendreambox.org>");
133
134         gstelement_class->change_state = gst_dreamvideosource_change_state;
135
136         gstbsrc_class->get_caps = gst_dreamvideosource_getcaps;
137         gstbsrc_class->set_caps = gst_dreamvideosource_setcaps;
138         gstbsrc_class->fixate = gst_dreamvideosource_fixate;
139         gstbsrc_class->start = gst_dreamvideosource_start;
140         gstbsrc_class->stop = gst_dreamvideosource_stop;
141         gstbsrc_class->unlock = gst_dreamvideosource_unlock;
142         gstbsrc_class->unlock_stop = gst_dreamvideosource_unlock_stop;
143
144         gstpush_src_class->create = gst_dreamvideosource_create;
145
146 #ifdef PROVIDE_CLOCK
147         gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_dreamvideosource_provide_clock);
148 //      g_type_class_ref (GST_TYPE_SYSTEM_CLOCK);
149 #endif
150
151         g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BITRATE,
152           g_param_spec_int ("bitrate", "Bitrate (kb/s)",
153             "Bitrate in kbit/sec", 16, 200000, DEFAULT_BITRATE,
154             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155
156         g_object_class_install_property (gobject_class, ARG_CAPS,
157           g_param_spec_boxed ("caps", "Caps",
158             "The caps for the source stream", GST_TYPE_CAPS,
159             G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
160
161         g_object_class_install_property (gobject_class, ARG_INPUT_MODE,
162           g_param_spec_enum ("input-mode", "Input Mode",
163             "Select the input source of the video stream",
164             GST_TYPE_DREAMVIDEOSOURCE_INPUT_MODE, DEFAULT_INPUT_MODE,
165             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
166
167         gst_dreamvideosource_signals[SIGNAL_GET_BASE_PTS] =
168                 g_signal_new ("get-base-pts",
169                 G_TYPE_FROM_CLASS (klass),
170                 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
171                 G_STRUCT_OFFSET (GstDreamVideoSourceClass, get_base_pts),
172                 NULL, NULL, gst_dreamsource_marshal_INT64__VOID, G_TYPE_INT64, 0);
173
174         klass->get_base_pts = gst_dreamvideosource_get_base_pts;
175 }
176
177 static gint64
178 gst_dreamvideosource_get_base_pts (GstDreamVideoSource *self)
179 {
180         GST_DEBUG_OBJECT (self, "gst_dreamvideosource_get_base_pts %" GST_TIME_FORMAT"", GST_TIME_ARGS (self->base_pts) );
181         return self->base_pts;
182 }
183
184 static void gst_dreamvideosource_set_bitrate (GstDreamVideoSource * self, uint32_t bitrate)
185 {
186         if (!self->encoder || !self->encoder->fd)
187                 return;
188         g_mutex_lock (&self->mutex);
189         uint32_t vbr = bitrate*1000;
190         int ret = ioctl(self->encoder->fd, VENC_SET_BITRATE, &vbr);
191         if (ret != 0)
192         {
193                 GST_WARNING_OBJECT (self, "can't set video bitrate to %i bytes/s!", vbr);
194                 g_mutex_unlock (&self->mutex);
195                 return;
196         }
197         GST_INFO_OBJECT (self, "set video bitrate to %i kBytes/s", bitrate);
198         self->video_info.bitrate = vbr;
199         g_mutex_unlock (&self->mutex);
200 }
201
202 static gboolean gst_dreamvideosource_set_format (GstDreamVideoSource * self, VideoFormatInfo * info)
203 {
204         if (!self->encoder || !self->encoder->fd)
205         {
206                 GST_ERROR_OBJECT (self, "can't set format because encoder device not opened!");
207                 return FALSE;
208         }
209
210         g_mutex_lock (&self->mutex);
211
212         if (info->fps_n > 0)
213         {
214                 int venc_fps = 0;
215                 switch (info->fps_n) {
216                         case 25:
217                                 venc_fps = rate_25;
218                                 break;
219                         case 30:
220                                 venc_fps = rate_30;
221                                 break;
222                         case 50:
223                                 venc_fps = rate_50;
224                                 break;
225                         case 60:
226                                 venc_fps = rate_60;
227                                 break;
228                         default:
229                                 GST_ERROR_OBJECT (self, "invalid framerate %d/%d", info->fps_n, info->fps_d);
230                                 goto fail;
231                 }
232                 if (!ioctl(self->encoder->fd, VENC_SET_FRAMERATE, &venc_fps))
233                         GST_INFO_OBJECT (self, "set framerate to %d/%d -> ioctrl(%d, VENC_SET_FRAMERATE, &%d)", info->fps_n, info->fps_d, self->encoder->fd, venc_fps);
234                 else
235                 {
236                         GST_WARNING_OBJECT (self, "can't set framerate to %d/%d -> ioctrl(%d, VENC_SET_FRAMERATE, &%d)", info->fps_n, info->fps_d, self->encoder->fd, venc_fps);
237                         goto fail;
238                 }
239         }
240
241         if (info->width && info->height)
242         {
243                 int venc_size = 0;
244                 if ( info->width == 720 && info->height == 576 )
245                         venc_size = fmt_720x576;
246                 else if ( info->width == 1280 && info->height == 720)
247                         venc_size = fmt_1280x720;
248                 else if ( info->width == 1920 && info->height == 1080)
249                         venc_size = fmt_1920x1080;
250                 else
251                 {
252                         GST_ERROR_OBJECT (self, "invalid resolution %dx%d", info->width, info->height);
253                         goto fail;
254                 }
255                 if (!ioctl(self->encoder->fd, VENC_SET_RESOLUTION, &venc_size))
256                         GST_INFO_OBJECT (self, "set resolution to %dx%d -> ioctrl(%d, VENC_SET_RESOLUTION, &%d)", info->width, info->height, self->encoder->fd, venc_size);
257                 else
258                 {
259                         GST_WARNING_OBJECT (self, "can't set resolution to %dx%d -> ioctrl(%d, VENC_SET_RESOLUTION, &%d)", info->width, info->height, self->encoder->fd, venc_size);
260                         goto fail;
261                 }
262         }
263
264         self->video_info = *info;
265         g_mutex_unlock (&self->mutex);
266         return TRUE;
267
268 fail:
269         g_mutex_unlock (&self->mutex);
270         return FALSE;
271 }
272
273 void gst_dreamvideosource_set_input_mode (GstDreamVideoSource *self, GstDreamVideoSourceInputMode mode)
274 {
275         g_return_if_fail (GST_IS_DREAMVIDEOSOURCE (self));
276         GEnumValue *val = g_enum_get_value (G_ENUM_CLASS (g_type_class_ref (GST_TYPE_DREAMVIDEOSOURCE_INPUT_MODE)), mode);
277         if (!val)
278         {
279                 GST_ERROR_OBJECT (self, "no such input_mode %i!", mode);
280                 goto out;
281         }
282         const gchar *value_nick = val->value_nick;
283         GST_DEBUG_OBJECT (self, "setting input_mode to %s (%i)...", value_nick, mode);
284
285         g_mutex_lock (&self->mutex);
286         if (!self->encoder || !self->encoder->fd)
287         {
288                 GST_ERROR_OBJECT (self, "can't set input mode because encoder device not opened!");
289                 goto out;
290         }
291         int int_mode = mode;
292         int ret = ioctl(self->encoder->fd, VENC_SET_SOURCE, &int_mode);
293         if (ret != 0)
294         {
295                 GST_WARNING_OBJECT (self, "can't set input mode to %s (%i) error: %s", value_nick, mode, strerror(errno));
296                 goto out;
297         }
298         GST_INFO_OBJECT (self, "successfully set input mode to %s (%i)", value_nick, mode);
299         self->input_mode = mode;
300 out:
301         g_mutex_unlock (&self->mutex);
302         return;
303 }
304
305 GstDreamVideoSourceInputMode gst_dreamvideosource_get_input_mode (GstDreamVideoSource *self)
306 {
307         GstDreamVideoSourceInputMode result;
308         g_return_val_if_fail (GST_IS_DREAMVIDEOSOURCE (self), -1);
309         g_mutex_lock (&self->mutex);
310         result =self->input_mode;
311         g_mutex_unlock (&self->mutex);
312         return result;
313 }
314
315 gboolean
316 gst_dreamvideosource_plugin_init (GstPlugin *plugin)
317 {
318         GST_DEBUG_CATEGORY_INIT (dreamvideosource_debug, "dreamvideosource", 0, "dreamvideosource");
319         return gst_element_register (plugin, "dreamvideosource", GST_RANK_PRIMARY, GST_TYPE_DREAMVIDEOSOURCE);
320 }
321
322 static void
323 gst_dreamvideosource_init (GstDreamVideoSource * self)
324 {
325         GstPadTemplate *pad_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS(self), "src");
326         self->current_caps = gst_pad_template_get_caps (pad_template);
327
328         self->encoder = NULL;
329         self->descriptors_available = 0;
330         self->input_mode = DEFAULT_INPUT_MODE;
331
332         self->buffer_size = DEFAULT_BUFFER_SIZE;
333         g_queue_init (&self->current_frames);
334         self->readthread = NULL;
335
336         g_mutex_init (&self->mutex);
337         g_cond_init (&self->cond);
338         READ_SOCKET (self) = -1;
339         WRITE_SOCKET (self) = -1;
340
341         gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
342         gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
343
344         self->encoder = malloc(sizeof(EncoderInfo));
345
346         if(!self->encoder) {
347                 GST_ERROR_OBJECT(self,"out of space");
348                 return;
349         }
350
351         char fn_buf[32];
352         sprintf(fn_buf, "/dev/venc%d", 0);
353         self->encoder->fd = open(fn_buf, O_RDWR | O_SYNC);
354         if(self->encoder->fd <= 0) {
355                 GST_ERROR_OBJECT(self,"cannot open device %s (%s)", fn_buf, strerror(errno));
356                 free(self->encoder);
357                 self->encoder = NULL;
358                 return;
359         }
360
361         self->encoder->buffer = malloc(VBUFSIZE);
362         if(!self->encoder->buffer) {
363                 GST_ERROR_OBJECT(self,"cannot alloc buffer");
364                 return;
365         }
366
367         self->encoder->cdb = (unsigned char *)mmap(0, VMMAPSIZE, PROT_READ, MAP_PRIVATE, self->encoder->fd, 0);
368
369         if(!self->encoder->cdb) {
370                 GST_ERROR_OBJECT(self,"cannot mmap cdb");
371                 return;
372         }
373
374 #ifdef PROVIDE_CLOCK
375         self->encoder_clock = gst_dreamsource_clock_new ("GstDreamVideoSinkClock", self->encoder->fd);
376         GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
377 #endif
378
379 #ifdef dump
380         self->dumpfd = open("/media/hdd/movie/dreamvideosource.dump", O_WRONLY | O_CREAT | O_TRUNC);
381         GST_DEBUG_OBJECT (self, "dumpfd = %i (%s)", self->dumpfd, (self->dumpfd > 0) ? "OK" : strerror(errno));
382 #endif
383 }
384
385 static void
386 gst_dreamvideosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec)
387 {
388         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (object);
389
390         switch (prop_id) {
391                 case ARG_CAPS:
392                 {
393                         GstCaps *caps = gst_caps_copy (gst_value_get_caps (value));
394                         gst_dreamvideosource_setcaps(GST_BASE_SRC(object), caps);
395                         gst_caps_unref (caps);
396                         break;
397                 }
398                 case ARG_BITRATE:
399                         gst_dreamvideosource_set_bitrate(self, g_value_get_int (value));
400                         break;
401                 case ARG_INPUT_MODE:
402                         gst_dreamvideosource_set_input_mode (self, g_value_get_enum (value));
403                         break;
404                 default:
405                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
406                         break;
407         }
408 }
409
410 static void
411 gst_dreamvideosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec)
412 {
413         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (object);
414
415         switch (prop_id) {
416                 case ARG_CAPS:
417                         g_value_take_boxed (value, gst_dreamvideosource_getcaps (GST_BASE_SRC(object), GST_CAPS_ANY));
418                         break;
419                 case ARG_BITRATE:
420                         g_value_set_int (value, self->video_info.bitrate/1000);
421                         break;
422                 case ARG_INPUT_MODE:
423                         g_value_set_enum (value, gst_dreamvideosource_get_input_mode (self));
424                         break;
425                 default:
426                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
427                         break;
428         }
429 }
430
431 static GstCaps *
432 gst_dreamvideosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
433 {
434         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
435         GstCaps *caps = gst_caps_copy(self->current_caps);
436
437         GST_LOG_OBJECT (self, "gst_dreamvideosource_getcaps %" GST_PTR_FORMAT " filter %" GST_PTR_FORMAT, caps, filter);
438
439         if (filter) {
440                 GstCaps *intersection;
441                 intersection = gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
442                 gst_caps_unref (caps);
443                 caps = intersection;
444         }
445
446         GST_DEBUG_OBJECT (self, "return caps %" GST_PTR_FORMAT, caps);
447         return caps;
448 }
449
450 static gboolean
451 gst_dreamvideosource_setcaps (GstBaseSrc * bsrc, GstCaps * caps)
452 {
453         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
454         GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (bsrc);
455         GstCaps *current_caps;
456         const GstStructure *structure;
457         VideoFormatInfo info;
458         gboolean ret;
459         int width, height;
460         const GValue *framerate, *par;
461         structure = gst_caps_get_structure (caps, 0);
462
463         current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (bsrc));
464         if (current_caps && gst_caps_is_equal (current_caps, caps)) {
465                 GST_DEBUG_OBJECT (self, "New caps equal to old ones: %" GST_PTR_FORMAT, caps);
466                 ret = TRUE;
467         } else {
468                 GstState state;
469                 gst_element_get_state (GST_ELEMENT(self), &state, NULL, 1*GST_MSECOND);
470                 if (state == GST_STATE_PLAYING)
471                 {
472                         GST_WARNING_OBJECT (self, "can't change caps while in PLAYING state %" GST_PTR_FORMAT, caps);
473                         return TRUE;
474                 }
475                 else if (gst_structure_has_name (structure, "video/x-h264"))
476                 {
477                         memset (&info, 0, sizeof(VideoFormatInfo));
478                         ret = gst_structure_get_int (structure, "width", &info.width);
479                         ret &= gst_structure_get_int (structure, "height", &info.height);
480                         framerate = gst_structure_get_value (structure, "framerate");
481                         if (GST_VALUE_HOLDS_FRACTION(framerate)) {
482                                 info.fps_n = gst_value_get_fraction_numerator (framerate);
483                                 info.fps_d = gst_value_get_fraction_denominator (framerate);
484                         }
485                         GST_INFO_OBJECT (self, "set caps %" GST_PTR_FORMAT, caps);
486                         gst_caps_replace (&self->current_caps, caps);
487
488                         if (gst_dreamvideosource_set_format(self, &info) && gst_caps_is_fixed(caps))
489                                 ret = gst_pad_push_event (bsrc->srcpad, gst_event_new_caps (caps));
490                 }
491                 else {
492                         GST_WARNING_OBJECT (self, "unsupported caps: %" GST_PTR_FORMAT, caps);
493                         ret = FALSE;
494                 }
495         }
496         if (current_caps)
497                 gst_caps_unref (current_caps);
498         return ret;
499 }
500
501 static GstCaps *
502 gst_dreamvideosource_fixate (GstBaseSrc * bsrc, GstCaps * caps)
503 {
504         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
505         GstStructure *structure;
506
507         caps = gst_caps_make_writable (caps);
508         structure = gst_caps_get_structure (caps, 0);
509
510         gst_structure_fixate_field_nearest_int (structure, "width", DEFAULT_WIDTH);
511         gst_structure_fixate_field_nearest_int (structure, "height", DEFAULT_HEIGHT);
512         gst_structure_fixate_field_nearest_fraction (structure, "framerate", DEFAULT_FRAMERATE, 1);
513         gst_structure_fixate_field_nearest_fraction (structure, "pixel-aspect-ratio", DEFAULT_WIDTH, DEFAULT_HEIGHT);
514
515         caps = GST_BASE_SRC_CLASS (parent_class)->fixate (bsrc, caps);
516         GST_DEBUG_OBJECT (bsrc, "fixate caps: %" GST_PTR_FORMAT, caps);
517         return caps;
518 }
519
520 static gboolean gst_dreamvideosource_unlock (GstBaseSrc * bsrc)
521 {
522         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
523         GST_DEBUG_OBJECT (self, "stop creating buffers");
524         g_mutex_lock (&self->mutex);
525         self->flushing = TRUE;
526         GST_DEBUG_OBJECT (self, "set flushing TRUE");
527         g_cond_signal (&self->cond);
528         g_mutex_unlock (&self->mutex);
529         return TRUE;
530 }
531
532 static gboolean gst_dreamvideosource_unlock_stop (GstBaseSrc * bsrc)
533 {
534         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
535         GST_DEBUG_OBJECT (self, "start creating buffers...");
536         g_mutex_lock (&self->mutex);
537         self->flushing = FALSE;
538         g_queue_foreach (&self->current_frames, (GFunc) gst_buffer_unref, NULL);
539         g_queue_clear (&self->current_frames);
540         g_mutex_unlock (&self->mutex);
541         return TRUE;
542 }
543
544 static void gst_dreamvideosource_read_thread_func (GstDreamVideoSource * self)
545 {
546         EncoderInfo *enc = self->encoder;
547         GstBuffer *readbuf;
548
549         if (!enc) {
550                 GST_WARNING_OBJECT (self, "encoder device not opened!");
551                 return;
552         }
553
554         GST_DEBUG_OBJECT (self, "enter read thread");
555
556         GstMessage *message;
557         GValue val = { 0 };
558
559         message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
560         g_value_init (&val, GST_TYPE_G_THREAD);
561         g_value_set_boxed (&val, self->readthread);
562         gst_message_set_stream_status_object (message, &val);
563         g_value_unset (&val);
564         GST_DEBUG_OBJECT (self, "posting ENTER stream status");
565         gst_element_post_message (GST_ELEMENT_CAST (self), message);
566
567         while (TRUE) {
568                 readbuf = NULL;
569                 {
570                         struct pollfd rfd[2];
571                         int timeout, nfds;
572
573                         rfd[0].fd = READ_SOCKET (self);
574                         rfd[0].events = POLLIN | POLLERR | POLLHUP | POLLPRI;
575
576                         if (self->descriptors_available == 0)
577                         {
578                                 rfd[1].fd = enc->fd;
579                                 rfd[1].events = POLLIN;
580                                 self->descriptors_count = 0;
581                                 timeout = 200;
582                                 nfds = 2;
583                         }
584                         else
585                         {
586                                 rfd[1].revents = 0;
587                                 nfds = 1;
588                                 timeout = 0;
589                         }
590
591                         int ret = poll(rfd, nfds, timeout);
592
593                         if (G_UNLIKELY (ret == -1))
594                         {
595                                 GST_ERROR_OBJECT (self, "SELECT ERROR!");
596                                 goto stop_running;
597                         }
598                         else if ( ret == 0 && self->descriptors_available == 0 )
599                         {
600                                 g_mutex_lock (&self->mutex);
601                                 if (self->flushing)
602                                 {
603                                         GST_DEBUG_OBJECT (self, "FLUSHING!");
604                                         g_cond_signal (&self->cond);
605                                         continue;
606                                 }
607                                 g_mutex_unlock (&self->mutex);
608                                 GST_DEBUG_OBJECT (self, "SELECT TIMEOUT");
609                                 //!!! TODO generate valid dummy payload
610                                 readbuf = gst_buffer_new();
611                         }
612                         else if ( rfd[0].revents )
613                         {
614                                 char command;
615                                 READ_COMMAND (self, command, ret);
616                                 if (command == CONTROL_STOP)
617                                 {
618                                         GST_DEBUG_OBJECT (self, "CONTROL_STOP!");
619                                         goto stop_running;
620                                 }
621                         }
622                         else if ( G_LIKELY(rfd[1].revents & POLLIN) )
623                         {
624                                 int rlen = read(enc->fd, enc->buffer, VBUFSIZE);
625                                 if (rlen <= 0 || rlen % VBDSIZE ) {
626                                         if ( errno == 512 )
627                                                 goto stop_running;
628                                         GST_WARNING_OBJECT (self, "read error %s (%i)", strerror(errno), errno);
629                                         goto stop_running;
630                                 }
631                                 self->descriptors_available = rlen / VBDSIZE;
632                                 GST_LOG_OBJECT (self, "encoder buffer was empty, %d descriptors available", self->descriptors_available);
633                         }
634                 }
635
636                 while (self->descriptors_count < self->descriptors_available)
637                 {
638                         off_t offset = self->descriptors_count * VBDSIZE;
639                         VideoBufferDescriptor *desc = (VideoBufferDescriptor*)(&enc->buffer[offset]);
640
641                         uint32_t f = desc->stCommon.uiFlags;
642
643                         GST_LOG_OBJECT (self, "descriptors_count=%d, descriptors_available=%d\tuiOffset=%d, uiLength=%d", self->descriptors_count, self->descriptors_available, desc->stCommon.uiOffset, desc->stCommon.uiLength);
644
645                         if (G_UNLIKELY (f & CDB_FLAG_METADATA))
646                         {
647                                 GST_LOG_OBJECT (self, "CDB_FLAG_METADATA... skip outdated packet");
648                                 self->descriptors_count = self->descriptors_available;
649                                 continue;
650                         }
651
652                         if (f & VBD_FLAG_DTS_VALID && desc->uiDTS)
653                         {
654                                 g_mutex_lock (&self->mutex);
655                                 if (G_UNLIKELY (self->base_pts == GST_CLOCK_TIME_NONE))
656                                 {
657                                         if (self->dreamaudiosrc)
658                                         {
659                                                 guint64 audiosource_base_pts;
660                                                 g_signal_emit_by_name(self->dreamaudiosrc, "get-base-pts", &audiosource_base_pts);
661                                                 if (audiosource_base_pts != GST_CLOCK_TIME_NONE)
662                                                 {
663                                                         GST_DEBUG_OBJECT (self, "use DREAMAUDIOSOURCE's base_pts=%" GST_TIME_FORMAT "", GST_TIME_ARGS (audiosource_base_pts) );
664                                                         self->base_pts = audiosource_base_pts;
665                                                 }
666                                         }
667                                         if (self->base_pts == GST_CLOCK_TIME_NONE)
668                                         {
669                                                 self->base_pts = MPEGTIME_TO_GSTTIME(desc->uiDTS);
670                                                 GST_DEBUG_OBJECT (self, "use mpeg stream pts as base_pts=%" GST_TIME_FORMAT" (%lld)", GST_TIME_ARGS (self->base_pts), desc->uiDTS);
671                                         }
672                                 }
673                                 g_mutex_unlock (&self->mutex);
674                         }
675
676                         if (G_UNLIKELY (self->base_pts == GST_CLOCK_TIME_NONE))
677                         {
678                                 GST_DEBUG_OBJECT (self, "self->base_pts == GST_CLOCK_TIME_NONE! skip this frame");
679                                 self->descriptors_count++;
680                                 break;
681                         }
682
683                         readbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, enc->cdb, VMMAPSIZE, desc->stCommon.uiOffset, desc->stCommon.uiLength, self, NULL);
684
685                         GstClockTime buffer_time = GST_CLOCK_TIME_NONE;
686
687
688                         if (self->video_info.fps_d)
689                                 GST_BUFFER_DURATION(readbuf) = gst_util_uint64_scale (GST_SECOND, self->video_info.fps_d, self->video_info.fps_n);
690
691                         if (f & CDB_FLAG_PTS_VALID)
692                         {
693                                 GstClock *clock = gst_element_get_clock (GST_ELEMENT(self));
694                                 if (clock)
695                                 {
696                                         GstClockTime clock_time, base_time;
697                                         clock_time = gst_clock_get_time (clock);
698                                         base_time = gst_element_get_base_time (GST_ELEMENT(self));
699 GST_ERROR_OBJECT (self, "\n%" GST_TIME_FORMAT "=mpeg pts\n%" GST_TIME_FORMAT "=dreamsource_clock_time\n%" GST_TIME_FORMAT "=base_time\n%" GST_TIME_FORMAT "=base_pts",
700 GST_TIME_ARGS (MPEGTIME_TO_GSTTIME(desc->stCommon.uiPTS)), GST_TIME_ARGS (clock_time), GST_TIME_ARGS (base_time), GST_TIME_ARGS(self->base_pts) );
701
702                                         if (clock_time && clock_time > base_time)
703                                         {
704                                                 buffer_time = clock_time - base_time;
705                                                 gst_object_unref (clock);
706         //                                      buffer_time -= GST_BUFFER_DURATION(readbuf);
707                                                 GST_BUFFER_PTS(readbuf) = buffer_time;
708                                                 GST_BUFFER_DTS(readbuf) = buffer_time;
709
710                                         }
711                                         else
712                                                 buffer_time = GST_CLOCK_TIME_NONE;
713                                 }
714                         }
715
716
717
718                         if (/*(!clock || buffer_time == GST_CLOCK_TIME_NONE) && */f & CDB_FLAG_PTS_VALID)
719                         {
720                                 buffer_time = MPEGTIME_TO_GSTTIME(desc->stCommon.uiPTS);
721                                 GST_INFO_OBJECT (self, "pts/dts by mpeg pts =%" GST_TIME_FORMAT " (result %" GST_TIME_FORMAT")", GST_TIME_ARGS (buffer_time), GST_TIME_ARGS (buffer_time-self->base_pts));
722                                 if (self->base_pts != GST_CLOCK_TIME_NONE && buffer_time >= self->base_pts )
723                                 {
724                                         buffer_time -= self->base_pts/* + GST_BUFFER_DURATION(readbuf)*/;
725                                         GST_BUFFER_PTS(readbuf) = buffer_time;
726                                         GST_BUFFER_DTS(readbuf) = buffer_time;
727                                         GST_INFO_OBJECT (self, "currected pts =%" GST_TIME_FORMAT "", GST_TIME_ARGS (GST_BUFFER_PTS(readbuf)));
728                                 }
729                         }
730                         
731 #ifdef dump
732                         int wret = write(self->dumpfd, (unsigned char*)(enc->cdb + desc->stCommon.uiOffset), desc->stCommon.uiLength);
733                         GST_LOG_OBJECT (self, "read %i dumped %i total %" G_GSIZE_FORMAT " ", desc->stCommon.uiLength, wret, gst_buffer_get_size (*outbuf) );
734 #endif
735                         self->descriptors_count++;
736                         break;
737                 }
738
739                 if (self->descriptors_count == self->descriptors_available)
740                 {
741                         GST_LOG_OBJECT (self, "self->descriptors_count == self->descriptors_available -> release %i consumed descriptors", self->descriptors_count);
742                         /* release consumed descs */
743                         if (write(enc->fd, &self->descriptors_count, sizeof(self->descriptors_count)) != sizeof(self->descriptors_count)) {
744                                 GST_WARNING_OBJECT (self, "release consumed descs write error!");
745                                 goto stop_running;
746                         }
747                         self->descriptors_available = 0;
748                 }
749
750                 if (readbuf)
751                 {
752                         g_mutex_lock (&self->mutex);
753                         if (!self->flushing)
754                         {
755                                 while (g_queue_get_length (&self->current_frames) >= self->buffer_size)
756                                 {
757                                         GstBuffer * oldbuf = g_queue_pop_head (&self->current_frames);
758                                         GST_WARNING_OBJECT (self, "dropping %" GST_PTR_FORMAT " because of queue overflow! buffers count=%i", oldbuf, g_queue_get_length (&self->current_frames));
759                                         gst_buffer_unref(oldbuf);
760                                 }
761                                 g_queue_push_tail (&self->current_frames, readbuf);
762                         }
763                         g_cond_signal (&self->cond);
764                         GST_INFO_OBJECT (self, "read %" GST_PTR_FORMAT " to queue", readbuf );
765                         g_mutex_unlock (&self->mutex);
766                 }
767         }
768
769         g_assert_not_reached ();
770         return;
771
772         stop_running:
773         {
774                 g_mutex_unlock (&self->mutex);
775                 g_cond_signal (&self->cond);
776                 GST_DEBUG ("stop running, exit thread");
777                 message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
778                 g_value_init (&val, GST_TYPE_G_THREAD);
779                 g_value_set_boxed (&val, self->readthread);
780                 gst_message_set_stream_status_object (message, &val);
781                 g_value_unset (&val);
782                 GST_DEBUG_OBJECT (self, "posting LEAVE stream status");
783                 gst_element_post_message (GST_ELEMENT_CAST (self), message);
784                 return;
785         }
786 }
787
788 static GstFlowReturn
789 gst_dreamvideosource_create (GstPushSrc * psrc, GstBuffer ** outbuf)
790 {
791         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (psrc);
792
793         GST_LOG_OBJECT (self, "new buffer requested");
794
795         g_mutex_lock (&self->mutex);
796         while (g_queue_is_empty (&self->current_frames) && !self->flushing)
797         {
798                 g_cond_wait (&self->cond, &self->mutex);
799                 GST_INFO_OBJECT (self, "waiting for buffer from encoder");
800         }
801
802         *outbuf = g_queue_pop_head (&self->current_frames);
803         g_mutex_unlock (&self->mutex);
804
805         if (*outbuf)
806         {
807                 GST_INFO_OBJECT (self, "pushing %" GST_PTR_FORMAT "", *outbuf );
808                 return GST_FLOW_OK;
809         }
810         GST_INFO_OBJECT (self, "FLUSHING");
811         return GST_FLOW_FLUSHING;
812
813 }
814
815 static GstStateChangeReturn gst_dreamvideosource_change_state (GstElement * element, GstStateChange transition)
816 {
817         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (element);
818         GstStateChangeReturn sret = GST_STATE_CHANGE_SUCCESS;
819         int ret;
820
821         switch (transition) {
822                 case GST_STATE_CHANGE_NULL_TO_READY:
823                 {
824                         int control_sock[2];
825                         if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
826                         {
827                                 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, (NULL), GST_ERROR_SYSTEM);
828                                 return GST_STATE_CHANGE_FAILURE;
829                         }
830                         READ_SOCKET (self) = control_sock[0];
831                         WRITE_SOCKET (self) = control_sock[1];
832                         fcntl (READ_SOCKET (self), F_SETFL, O_NONBLOCK);
833                         fcntl (WRITE_SOCKET (self), F_SETFL, O_NONBLOCK);
834                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
835                         break;
836                 }
837                 case GST_STATE_CHANGE_READY_TO_PAUSED:
838                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
839 #ifdef PROVIDE_CLOCK
840                         GstMessage* msg;
841                         msg = gst_message_new_clock_provide (GST_OBJECT_CAST (element), self->encoder_clock, TRUE);
842                         GST_INFO_OBJECT (self, "clock: %" GST_PTR_FORMAT " %" GST_PTR_FORMAT " typename=%s", self->encoder_clock, msg, GST_MESSAGE_TYPE_NAME(msg));
843                         gst_element_post_message (element, msg);
844 #endif
845                         self->flushing = TRUE;
846                         self->readthread = g_thread_try_new ("dreamvideosrc-read", (GThreadFunc) gst_dreamvideosource_read_thread_func, self, NULL);
847                         GST_DEBUG_OBJECT (self, "started readthread @%p", self->readthread );
848                         break;
849                 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
850                         g_mutex_lock (&self->mutex);
851                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
852                         self->base_pts = GST_CLOCK_TIME_NONE;
853                         ret = ioctl(self->encoder->fd, VENC_START);
854                         if ( ret != 0 )
855                                 goto fail;
856                         self->descriptors_available = 0;
857                         CLEAR_COMMAND (self);
858                         GST_INFO_OBJECT (self, "started encoder!");
859                         g_mutex_unlock (&self->mutex);
860                         break;
861                 default:
862                         break;
863         }
864
865         if (GST_ELEMENT_CLASS (parent_class)->change_state)
866                 sret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
867
868         switch (transition) {
869                 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
870                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
871                         g_mutex_lock (&self->mutex);
872                         self->flushing = TRUE;
873                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED self->descriptors_count=%i self->descriptors_available=%i", self->descriptors_count, self->descriptors_available);
874                         while (self->descriptors_count < self->descriptors_available)
875                         {
876                                 GST_LOG_OBJECT (self, "flushing self->descriptors_count=%i");
877                                 self->descriptors_count++;
878                         }
879                         if (self->descriptors_count)
880                                 write(self->encoder->fd, &self->descriptors_count, sizeof(self->descriptors_count));
881                         ret = ioctl(self->encoder->fd, VENC_STOP);
882                         if ( ret != 0 )
883                                 goto fail;
884 #ifdef PROVIDE_CLOCK
885                         gst_clock_set_master (self->encoder_clock, NULL);
886 #endif
887                         GST_INFO_OBJECT (self, "stopped encoder!");
888                         g_mutex_unlock (&self->mutex);
889                         break;
890                 case GST_STATE_CHANGE_PAUSED_TO_READY:
891                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_PAUSED_TO_READY");
892 #ifdef PROVIDE_CLOCK
893                         gst_element_post_message (element, gst_message_new_clock_lost (GST_OBJECT_CAST (element), self->encoder_clock));
894 #endif
895                         GST_DEBUG_OBJECT (self, "stopping readthread @%p...", self->readthread);
896                         SEND_COMMAND (self, CONTROL_STOP);
897                         g_thread_join (self->readthread);
898                         break;
899                 case GST_STATE_CHANGE_READY_TO_NULL:
900                         close (READ_SOCKET (self));
901                         close (WRITE_SOCKET (self));
902                         READ_SOCKET (self) = -1;
903                         WRITE_SOCKET (self) = -1;
904                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_READY_TO_NULL, close control sockets");
905                         break;
906                 default:
907                         break;
908         }
909
910         return sret;
911 fail:
912         GST_ERROR_OBJECT(self,"can't perform encoder ioctl!");
913         g_mutex_unlock (&self->mutex);
914         return GST_STATE_CHANGE_FAILURE;
915 }
916
917 static gboolean
918 gst_dreamvideosource_start (GstBaseSrc * bsrc)
919 {
920         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
921         self->dreamaudiosrc = gst_bin_get_by_name_recurse_up(GST_BIN(GST_ELEMENT_PARENT(self)), "dreamaudiosource0");
922         GST_DEBUG_OBJECT (self, "started. reference to dreamaudiosource=%" GST_PTR_FORMAT"", self->dreamaudiosrc);
923         return TRUE;
924 }
925
926 static gboolean
927 gst_dreamvideosource_stop (GstBaseSrc * bsrc)
928 {
929         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (bsrc);
930         if (self->dreamaudiosrc)
931                 gst_object_unref(self->dreamaudiosrc);
932         GST_DEBUG_OBJECT (self, "stop");
933         return TRUE;
934 }
935
936 static void
937 gst_dreamvideosource_dispose (GObject * gobject)
938 {
939         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (gobject);
940         if (self->encoder) {
941                 if (self->encoder->buffer)
942                         free(self->encoder->buffer);
943                 if (self->encoder->cdb)
944                         munmap(self->encoder->cdb, VMMAPSIZE);
945                 if (self->encoder->fd)
946                         close(self->encoder->fd);
947                 free(self->encoder);
948         }
949 #ifdef PROVIDE_CLOCK
950         if (self->encoder_clock) {
951                 gst_object_unref (self->encoder_clock);
952                 self->encoder_clock = NULL;
953         }
954 #endif
955 #ifdef dump
956         close(self->dumpfd);
957 #endif
958         if (self->current_caps)
959                 gst_caps_unref(self->current_caps);
960         g_mutex_clear (&self->mutex);
961         g_cond_clear (&self->cond);
962         GST_DEBUG_OBJECT (self, "disposed");
963         G_OBJECT_CLASS (parent_class)->dispose (gobject);
964 }
965
966 #ifdef PROVIDE_CLOCK
967 static GstClock *gst_dreamvideosource_provide_clock (GstElement * element)
968 {
969         GstDreamVideoSource *self = GST_DREAMVIDEOSOURCE (element);
970
971         if (!self->encoder || self->encoder->fd < 0)
972         {
973                 GST_DEBUG_OBJECT (self, "encoder device not started, can't provide clock!");
974                 return NULL;
975         }
976
977         return GST_CLOCK_CAST (gst_object_ref (self->encoder_clock));
978 }
979 #endif