implement readthread state to gain better control over running and paused behaviour
[gst-plugin-dreamsource.git] / src / gstdreamaudiosource.c
1 /*
2  * GStreamer dreamaudiosource
3  * Copyright 2014-2015 Andreas Frisch <fraxinas@opendreambox.org>
4  *
5  * This program is licensed under the Creative Commons
6  * Attribution-NonCommercial-ShareAlike 3.0 Unported
7  * License. To view a copy of this license, visit
8  * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
9  * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA.
10  *
11  * Alternatively, this program may be distributed and executed on
12  * hardware which is licensed by Dream Property GmbH.
13  *
14  * This program is NOT free software. It is open source, you are allowed
15  * to modify it (if you keep the license), but it may not be commercially
16  * distributed other than under the conditions noted above.
17  */
18
19 #ifdef HAVE_CONFIG_H
20 #include <config.h>
21 #endif
22
23 #include <gst/gst.h>
24 #include "gstdreamaudiosource.h"
25
26 GST_DEBUG_CATEGORY_STATIC (dreamaudiosource_debug);
27 #define GST_CAT_DEFAULT dreamaudiosource_debug
28
29 GType gst_dreamaudiosource_input_mode_get_type (void)
30 {
31         static volatile gsize input_mode_type = 0;
32         static const GEnumValue input_mode[] = {
33                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE, "GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE", "live"},
34                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_HDMI_IN, "GST_DREAMAUDIOSOURCE_INPUT_MODE_HDMI_IN", "hdmi_in"},
35                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_BACKGROUND, "GST_DREAMAUDIOSOURCE_INPUT_MODE_BACKGROUND", "background"},
36                 {0, NULL, NULL},
37         };
38
39         if (g_once_init_enter (&input_mode_type)) {
40                 GType tmp = g_enum_register_static ("GstDreamAudioSourceInputMode", input_mode);
41                 g_once_init_leave (&input_mode_type, tmp);
42         }
43         return (GType) input_mode_type;
44 }
45
/* Indices into gst_dreamaudiosource_signals[]; LAST_SIGNAL is the array size. */
enum
{
        SIGNAL_GET_DTS_OFFSET,
        LAST_SIGNAL
};
/* GObject property ids used by set_property/get_property. */
enum
{
        ARG_0,
        ARG_BITRATE,
        ARG_INPUT_MODE
};

/* Signal ids returned by g_signal_new() in class_init. */
static guint gst_dreamaudiosource_signals[LAST_SIGNAL] = { 0 };

/* Default of the "bitrate" property (documented there as kbit/sec). */
#define DEFAULT_BITRATE     128
/* Default of the "input-mode" property. */
#define DEFAULT_INPUT_MODE  GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE
/* Initial value of self->buffer_size set in the instance initialiser. */
#define DEFAULT_BUFFER_SIZE 16
63
/* Always-present "src" pad advertising audio/mpeg, mpegversion 4 in ADTS
 * stream format. */
static GstStaticPadTemplate srctemplate =
    GST_STATIC_PAD_TEMPLATE ("src",
        GST_PAD_SRC,
        GST_PAD_ALWAYS,
        GST_STATIC_CAPS ("audio/mpeg, "
        "mpegversion = 4,"
        "stream-format = (string) adts")
    );

/* Make the parent-class pointer generated by G_DEFINE_TYPE available under
 * the short name "parent_class", then define the type as a GstPushSrc
 * subclass. */
#define gst_dreamaudiosource_parent_class parent_class
G_DEFINE_TYPE (GstDreamAudioSource, gst_dreamaudiosource, GST_TYPE_PUSH_SRC);
75
/* GstBaseSrc / GstPushSrc / GObject virtual method implementations
 * (installed in class_init). */
static GstCaps *gst_dreamaudiosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter);
static gboolean gst_dreamaudiosource_unlock (GstBaseSrc * bsrc);
static gboolean gst_dreamaudiosource_unlock_stop (GstBaseSrc * bsrc);
static void gst_dreamaudiosource_dispose (GObject * gobject);
static GstFlowReturn gst_dreamaudiosource_create (GstPushSrc * psrc, GstBuffer ** outbuf);

/* GObject property accessors. */
static void gst_dreamaudiosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_dreamaudiosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec);

/* GstElement state handling and the default "get-dts-offset" signal handler. */
static GstStateChangeReturn gst_dreamaudiosource_change_state (GstElement * element, GstStateChange transition);
static gint64 gst_dreamaudiosource_get_dts_offset (GstDreamAudioSource *self);

/* Open / close the encoder device and its associated resources. */
static gboolean gst_dreamaudiosource_encoder_init (GstDreamAudioSource * self);
static void gst_dreamaudiosource_encoder_release (GstDreamAudioSource * self);

/* Body of the thread that reads from the encoder device. */
static void gst_dreamaudiosource_read_thread_func (GstDreamAudioSource * self);

/* Only compiled when the element is built to provide its own clock. */
#ifdef PROVIDE_CLOCK
static GstClock *gst_dreamaudiosource_provide_clock (GstElement * elem);
// static GstClockTime gst_dreamaudiosource_get_encoder_time_ (GstClock * clock, GstBaseSrc * bsrc);
#endif
97
98 static void
99 gst_dreamaudiosource_class_init (GstDreamAudioSourceClass * klass)
100 {
101         GObjectClass *gobject_class;
102         GstElementClass *gstelement_class;
103         GstBaseSrcClass *gstbasesrc_class;
104         GstPushSrcClass *gstpush_src_class;
105
106         gobject_class = (GObjectClass *) klass;
107         gstelement_class = (GstElementClass *) klass;
108         gstbasesrc_class = (GstBaseSrcClass *) klass;
109         gstpush_src_class = (GstPushSrcClass *) klass;
110
111         gobject_class->set_property = gst_dreamaudiosource_set_property;
112         gobject_class->get_property = gst_dreamaudiosource_get_property;
113         gobject_class->dispose = gst_dreamaudiosource_dispose;
114
115         gst_element_class_add_pad_template (gstelement_class,
116                                             gst_static_pad_template_get (&srctemplate));
117
118         gst_element_class_set_static_metadata (gstelement_class,
119             "Dream Audio source", "Source/Audio",
120             "Provide an audio elementary stream from Dreambox encoder device",
121             "Andreas Frisch <fraxinas@opendreambox.org>");
122
123         gstelement_class->change_state = gst_dreamaudiosource_change_state;
124
125         gstbasesrc_class->get_caps = gst_dreamaudiosource_getcaps;
126         gstbasesrc_class->unlock = gst_dreamaudiosource_unlock;
127         gstbasesrc_class->unlock_stop = gst_dreamaudiosource_unlock_stop;
128
129         gstpush_src_class->create = gst_dreamaudiosource_create;
130
131 #ifdef PROVIDE_CLOCK
132         gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_dreamaudiosource_provide_clock);
133 //      g_type_class_ref (GST_TYPE_SYSTEM_CLOCK);
134 #endif
135
136         g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BITRATE,
137           g_param_spec_int ("bitrate", "Bitrate (kb/s)",
138             "Bitrate in kbit/sec", 16, 320, DEFAULT_BITRATE,
139             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
140
141         g_object_class_install_property (gobject_class, ARG_INPUT_MODE,
142           g_param_spec_enum ("input-mode", "Input Mode",
143             "Select the input source of the audio stream",
144             GST_TYPE_DREAMAUDIOSOURCE_INPUT_MODE, DEFAULT_INPUT_MODE,
145             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
146
147         gst_dreamaudiosource_signals[SIGNAL_GET_DTS_OFFSET] =
148                 g_signal_new ("get-dts-offset",
149                 G_TYPE_FROM_CLASS (klass),
150                 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
151                 G_STRUCT_OFFSET (GstDreamAudioSourceClass, get_dts_offset),
152                 NULL, NULL, gst_dreamsource_marshal_INT64__VOID, G_TYPE_INT64, 0);
153
154         klass->get_dts_offset = gst_dreamaudiosource_get_dts_offset;
155 }
156
157 static gint64
158 gst_dreamaudiosource_get_dts_offset (GstDreamAudioSource *self)
159 {
160         GST_DEBUG_OBJECT (self, "gst_dreamaudiosource_get_dts_offset %" GST_TIME_FORMAT"", GST_TIME_ARGS (self->dts_offset) );
161         return self->dts_offset;
162 }
163
164 static void gst_dreamaudiosource_set_bitrate (GstDreamAudioSource * self, uint32_t bitrate)
165 {
166         g_mutex_lock (&self->mutex);
167         uint32_t abr = bitrate*1000;
168         if (!self->encoder || !self->encoder->fd)
169         {
170                 self->audio_info.bitrate = bitrate;
171                 g_mutex_unlock (&self->mutex);
172                 return;
173         }
174
175         int ret = ioctl(self->encoder->fd, AENC_SET_BITRATE, &abr);
176         if (ret != 0)
177         {
178                 GST_WARNING_OBJECT (self, "can't set audio bitrate to %i bytes/s!", abr);
179                 g_mutex_unlock (&self->mutex);
180                 return;
181         }
182         GST_INFO_OBJECT (self, "set audio bitrate to %i kBytes/s", bitrate);
183         self->audio_info.bitrate = bitrate;
184         g_mutex_unlock (&self->mutex);
185 }
186
187 void gst_dreamaudiosource_set_input_mode (GstDreamAudioSource *self, GstDreamAudioSourceInputMode mode)
188 {
189         g_return_if_fail (GST_IS_DREAMAUDIOSOURCE (self));
190         GEnumValue *val = g_enum_get_value (G_ENUM_CLASS (g_type_class_ref (GST_TYPE_DREAMAUDIOSOURCE_INPUT_MODE)), mode);
191         if (!val)
192         {
193                 GST_ERROR_OBJECT (self, "no such input_mode %i!", mode);
194                 goto out;
195         }
196         const gchar *value_nick = val->value_nick;
197
198         g_mutex_lock (&self->mutex);
199         if (!self->encoder || !self->encoder->fd)
200         {
201                 self->input_mode = mode;
202                 goto out;
203         }
204         int int_mode = mode;
205         int ret = ioctl(self->encoder->fd, AENC_SET_SOURCE, &int_mode);
206         if (ret != 0)
207         {
208                 GST_WARNING_OBJECT (self, "can't set input mode to %s (%i) error: %s", value_nick, mode, strerror(errno));
209                 goto out;
210         }
211         GST_INFO_OBJECT (self, "successfully set input mode to %s (%i)", value_nick, mode);
212         self->input_mode = mode;
213 out:
214         g_mutex_unlock (&self->mutex);
215         return;
216 }
217
218 GstDreamAudioSourceInputMode gst_dreamaudiosource_get_input_mode (GstDreamAudioSource *self)
219 {
220         GstDreamAudioSourceInputMode result;
221         g_return_val_if_fail (GST_IS_DREAMAUDIOSOURCE (self), -1);
222         GST_OBJECT_LOCK (self);
223         result =self->input_mode;
224         GST_OBJECT_UNLOCK (self);
225         return result;
226 }
227
228 gboolean
229 gst_dreamaudiosource_plugin_init (GstPlugin *plugin)
230 {
231         GST_DEBUG_CATEGORY_INIT (dreamaudiosource_debug, "dreamaudiosource", 0, "dreamaudiosource");
232         return gst_element_register (plugin, "dreamaudiosource", GST_RANK_PRIMARY, GST_TYPE_DREAMAUDIOSOURCE);
233 }
234
235 static void
236 gst_dreamaudiosource_init (GstDreamAudioSource * self)
237 {
238         self->encoder = NULL;
239         self->descriptors_available = 0;
240         self->input_mode = DEFAULT_INPUT_MODE;
241
242         self->buffer_size = DEFAULT_BUFFER_SIZE;
243         g_queue_init (&self->current_frames);
244         self->readthread = NULL;
245
246         g_mutex_init (&self->mutex);
247         g_cond_init (&self->cond);
248         READ_SOCKET (self) = -1;
249         WRITE_SOCKET (self) = -1;
250
251         gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
252         gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
253
254         self->encoder = NULL;
255         self->encoder_clock = NULL;
256
257 #ifdef dump
258         self->dumpfd = open("/media/hdd/movie/dreamaudiosource.dump", O_WRONLY | O_CREAT | O_TRUNC);
259         GST_DEBUG_OBJECT (self, "dumpfd = %i (%s)", self->dumpfd, (self->dumpfd > 0) ? "OK" : strerror(errno));
260 #endif
261 }
262
263 static gboolean gst_dreamaudiosource_encoder_init (GstDreamAudioSource * self)
264 {
265         GST_LOG_OBJECT (self, "initializating encoder...");
266         self->encoder = malloc(sizeof(EncoderInfo));
267
268         if (!self->encoder) {
269                 GST_ERROR_OBJECT (self,"out of space");
270                 return FALSE;
271         }
272
273         char fn_buf[32];
274         sprintf(fn_buf, "/dev/aenc%d", 0);
275         self->encoder->fd = open(fn_buf, O_RDWR | O_SYNC);
276         if (self->encoder->fd <= 0) {
277                 GST_ERROR_OBJECT (self,"cannot open device %s (%s)", fn_buf, strerror(errno));
278                 free(self->encoder);
279                 self->encoder = NULL;
280                 return FALSE;
281         }
282
283         self->encoder->buffer = malloc(ABUFSIZE);
284         if (!self->encoder->buffer) {
285                 GST_ERROR_OBJECT(self,"cannot alloc buffer");
286                 return FALSE;
287         }
288
289         self->encoder->cdb = (unsigned char *)mmap (0, AMMAPSIZE, PROT_READ, MAP_PRIVATE, self->encoder->fd, 0);
290
291         if (!self->encoder->cdb || self->encoder->cdb == MAP_FAILED) {
292                 GST_ERROR_OBJECT(self,"cannot alloc buffer: %s (%d)", strerror(errno));
293                 self->encoder->cdb = NULL;
294                 return FALSE;
295         }
296
297         int control_sock[2];
298         if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
299         {
300                 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, (NULL), GST_ERROR_SYSTEM);
301                 return GST_STATE_CHANGE_FAILURE;
302         }
303         READ_SOCKET (self) = control_sock[0];
304         WRITE_SOCKET (self) = control_sock[1];
305         fcntl (READ_SOCKET (self), F_SETFL, O_NONBLOCK);
306         fcntl (WRITE_SOCKET (self), F_SETFL, O_NONBLOCK);
307
308         self->memtrack_list = NULL;
309         self->encoder->used_range_min = UINT32_MAX;
310         self->encoder->used_range_max = 0;
311
312         gst_dreamaudiosource_set_bitrate (self, self->audio_info.bitrate);
313         gst_dreamaudiosource_set_input_mode (self, self->input_mode);
314
315 #ifdef PROVIDE_CLOCK
316         self->encoder_clock = gst_dreamsource_clock_new ("GstDreamAudioSourceClock", self->encoder->fd);
317         GST_DEBUG_OBJECT (self, "self->encoder_clock = %" GST_PTR_FORMAT, self->encoder_clock);
318         GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
319 #endif
320
321         GST_LOG_OBJECT (self, "encoder %s successfully initialized", fn_buf);
322         return TRUE;
323 }
324
325 static void gst_dreamaudiosource_encoder_release (GstDreamAudioSource * self)
326 {
327         GST_LOG_OBJECT (self, "releasing encoder...");
328         if (self->encoder) {
329                 if (self->encoder->buffer)
330                         free(self->encoder->buffer);
331                 if (self->encoder->cdb)
332                         munmap(self->encoder->cdb, AMMAPSIZE);
333                 if (self->encoder->fd)
334                         close(self->encoder->fd);
335                 free(self->encoder);
336         }
337         g_list_free(self->memtrack_list);
338         self->encoder = NULL;
339         close (READ_SOCKET (self));
340         close (WRITE_SOCKET (self));
341         READ_SOCKET (self) = -1;
342         WRITE_SOCKET (self) = -1;
343         if (self->encoder_clock) {
344                 gst_object_unref (self->encoder_clock);
345                 self->encoder_clock = NULL;
346         }
347 }
348
349 static void
350 gst_dreamaudiosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec)
351 {
352         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (object);
353
354         switch (prop_id) {
355                 case ARG_BITRATE:
356                         gst_dreamaudiosource_set_bitrate(self, g_value_get_int (value));
357                         break;
358                 case ARG_INPUT_MODE:
359                              gst_dreamaudiosource_set_input_mode (self, g_value_get_enum (value));
360                         break;
361                 default:
362                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
363                         break;
364         }
365 }
366
367 static void
368 gst_dreamaudiosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec)
369 {
370         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (object);
371
372         switch (prop_id) {
373                 case ARG_BITRATE:
374                         g_value_set_int (value, self->audio_info.bitrate/1000);
375                         break;
376                 case ARG_INPUT_MODE:
377                         g_value_set_enum (value, gst_dreamaudiosource_get_input_mode (self));
378                         break;
379                 default:
380                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
381                         break;
382         }
383 }
384
385 static GstCaps *
386 gst_dreamaudiosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
387 {
388         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
389         GstPadTemplate *pad_template;
390         GstCaps *caps;
391
392         pad_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS(self), "src");
393         g_return_val_if_fail (pad_template != NULL, NULL);
394
395         if (self->encoder == NULL) {
396                 GST_DEBUG_OBJECT (self, "encoder not opened -> use template caps");
397                 caps = gst_pad_template_get_caps (pad_template);
398         }
399         else
400         {
401                 caps = gst_caps_make_writable(gst_pad_template_get_caps (pad_template));
402         }
403
404         GST_DEBUG_OBJECT (self, "return caps %" GST_PTR_FORMAT, caps);
405         return caps;
406 }
407
408 static gboolean gst_dreamaudiosource_unlock (GstBaseSrc * bsrc)
409 {
410         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
411         GST_DEBUG_OBJECT (self, "stop creating buffers");
412         g_mutex_lock (&self->mutex);
413         self->flushing = TRUE;
414         GST_DEBUG_OBJECT (self, "set flushing TRUE");
415         g_cond_signal (&self->cond);
416         g_mutex_unlock (&self->mutex);
417         return TRUE;
418 }
419
420 static gboolean gst_dreamaudiosource_unlock_stop (GstBaseSrc * bsrc)
421 {
422         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
423         GST_DEBUG_OBJECT (self, "start creating buffers...");
424         g_mutex_lock (&self->mutex);
425         self->flushing = FALSE;
426         g_queue_foreach (&self->current_frames, (GFunc) gst_buffer_unref, NULL);
427         g_queue_clear (&self->current_frames);
428         g_mutex_unlock (&self->mutex);
429         return TRUE;
430 }
431
432 static void gst_dreamaudiosource_free_buffer (struct _buffer_memorytracker * memtrack)
433 {
434         GstDreamAudioSource * self = memtrack->self;
435         GST_OBJECT_LOCK(self);
436         GST_TRACE_OBJECT (self, "freeing %" GST_PTR_FORMAT " uiOffset=%i uiLength=%i used_range_min=%i", memtrack->buffer, memtrack->uiOffset, memtrack->uiLength, self->encoder->used_range_min);
437         GList *list = g_list_first (self->memtrack_list);
438         guint abs_minimum = UINT32_MAX;
439         guint abs_maximum = 0;
440         struct _buffer_memorytracker * mt;
441         int count = 0;
442         self->memtrack_list = g_list_remove(list, memtrack);
443         free(memtrack);
444         list = g_list_first (self->memtrack_list);
445         while (list) {
446                 mt = list->data;
447                 if (abs_minimum > 0 && mt->uiOffset < abs_minimum)
448                         abs_minimum = mt->uiOffset;
449                 if (mt->uiOffset+mt->uiLength > abs_maximum)
450                         abs_maximum = mt->uiOffset+mt->uiLength;
451                 count++;
452                 list = g_list_next (list);
453         }
454         GST_TRACE_OBJECT (self, "new abs_minimum=%i, abs_maximum=%i", abs_minimum, abs_maximum);
455         self->encoder->used_range_min = abs_minimum;
456         self->encoder->used_range_max = abs_maximum;
457         GST_OBJECT_UNLOCK(self);
458 }
459
460 static void gst_dreamaudiosource_read_thread_func (GstDreamAudioSource * self)
461 {
462         EncoderInfo *enc = self->encoder;
463         GstDreamSourceReadthreadState state = READTHREADSTATE_NONE;
464         GstBuffer *readbuf;
465
466         if (!enc) {
467                 GST_WARNING_OBJECT (self, "encoder device not opened!");
468                 return;
469         }
470
471         GST_DEBUG_OBJECT (self, "enter read thread");
472
473         GstMessage *message;
474         GValue val = { 0 };
475
476         message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
477         g_value_init (&val, GST_TYPE_G_THREAD);
478         g_value_set_boxed (&val, self->readthread);
479         gst_message_set_stream_status_object (message, &val);
480         g_value_unset (&val);
481         GST_DEBUG_OBJECT (self, "posting ENTER stream status");
482         gst_element_post_message (GST_ELEMENT_CAST (self), message);
483         GstClockTime clock_time, base_time;
484         gboolean discont = TRUE;
485
486         while (TRUE) {
487                 readbuf = NULL;
488                 {
489                         if (state == READTHREADSTATE_STOP)
490                                 goto stop_running;
491
492                         struct pollfd rfd[2];
493                         int timeout, nfds;
494
495                         rfd[0].fd = READ_SOCKET (self);
496                         rfd[0].events = POLLIN | POLLERR | POLLHUP | POLLPRI;
497                         rfd[1].revents = 0;
498                         rfd[1].events = POLLIN;
499                         nfds = 1;
500                         timeout = 0;
501
502                         if (state <= READTRREADSTATE_PAUSED)
503                                 timeout = 200;
504                         else if (state == READTRREADSTATE_RUNNING && self->descriptors_available == 0)
505                         {
506                                 rfd[1].fd = enc->fd;
507                                 self->descriptors_count = 0;
508                                 timeout = 200;
509                                 nfds = 2;
510                         }
511
512                         int ret = poll(rfd, nfds, timeout);
513
514                         if (G_UNLIKELY (ret == -1))
515                         {
516                                 GST_ERROR_OBJECT (self, "SELECT ERROR!");
517                                 goto stop_running;
518                         }
519                         else if ( ret == 0 && self->descriptors_available == 0 )
520                         {
521                                 g_mutex_lock (&self->mutex);
522                                 gst_clock_get_internal_time(self->encoder_clock);
523                                 if (self->flushing)
524                                 {
525                                         GST_DEBUG_OBJECT (self, "FLUSHING!");
526                                         g_cond_signal (&self->cond);
527                                         g_mutex_unlock (&self->mutex);
528                                         continue;
529                                 }
530                                 g_mutex_unlock (&self->mutex);
531                                 GST_DEBUG_OBJECT (self, "SELECT TIMEOUT");
532                                 //!!! TODO generate valid dummy payload
533                                 discont = TRUE;
534                                 if (self->dts_offset != GST_CLOCK_TIME_NONE)
535                                         readbuf = gst_buffer_new();
536                         }
537                         else if ( rfd[0].revents )
538                         {
539                                 char command;
540                                 READ_COMMAND (self, command, ret);
541                                 switch (command) {
542                                         case CONTROL_STOP:
543                                                 GST_DEBUG_OBJECT (self, "CONTROL_STOP!");
544                                                 state = READTHREADSTATE_STOP;
545                                                 break;
546                                         case CONTROL_PAUSE:
547                                                 GST_DEBUG_OBJECT (self, "CONTROL_PAUSE!");
548                                                 state = READTRREADSTATE_PAUSED;
549                                                 break;
550                                         case CONTROL_RUN:
551                                                 GST_DEBUG_OBJECT (self, "CONTROL_RUN");
552                                                 state = READTRREADSTATE_RUNNING;
553                                                 break;
554                                         default:
555                                                 GST_ERROR_OBJECT (self, "illegal control socket command %c received!", command);
556                                 }
557                                 continue;
558                         }
559                         else if ( G_LIKELY(rfd[1].revents & POLLIN) )
560                         {
561                                 clock_time = gst_clock_get_internal_time (self->encoder_clock);
562                                 base_time = gst_element_get_base_time(GST_ELEMENT(self));
563                                 int rlen = read(enc->fd, enc->buffer, ABUFSIZE);
564                                 if (rlen <= 0 || rlen % ABDSIZE ) {
565                                         if ( errno == 512 )
566                                                 goto stop_running;
567                                         GST_WARNING_OBJECT (self, "read error %s (%i)", strerror(errno), errno);
568                                         goto stop_running;
569                                 }
570                                 self->descriptors_available = rlen / ABDSIZE;
571                                 GST_LOG_OBJECT (self, "encoder buffer was empty, %d descriptors available", self->descriptors_available);
572                         }
573                 }
574
575                 while (self->descriptors_count < self->descriptors_available)
576                 {
577                         GstClockTime encoder_pts = GST_CLOCK_TIME_NONE;
578                         GstClockTime result_pts = GST_CLOCK_TIME_NONE;
579
580                         off_t offset = self->descriptors_count * ABDSIZE;
581                         AudioBufferDescriptor *desc = (AudioBufferDescriptor*)(&enc->buffer[offset]);
582
583                         uint32_t f = desc->stCommon.uiFlags;
584
585                         if (G_UNLIKELY (f & CDB_FLAG_METADATA))
586                         {
587                                 GST_LOG_OBJECT (self, "CDB_FLAG_METADATA... skip outdated packet");
588                                 self->descriptors_count = self->descriptors_available;
589                                 continue;
590                         }
591
592                         GST_LOG_OBJECT (self, "descriptors_count=%d, descriptors_available=%d\tuiOffset=%d, uiLength=%d", self->descriptors_count, self->descriptors_available, desc->stCommon.uiOffset, desc->stCommon.uiLength);
593
594                         if (self->encoder->used_range_min == UINT32_MAX)
595                                 self->encoder->used_range_min = desc->stCommon.uiOffset;
596
597                         if (self->encoder->used_range_max == 0)
598                                 self->encoder->used_range_max = desc->stCommon.uiOffset+desc->stCommon.uiLength;
599
600 //                      if (desc->stCommon.uiOffset < self->encoder->used_range_max && desc->stCommon.uiOffset+desc->stCommon.uiLength > self->encoder->used_range_min)
601 //                      {
602 //                              GST_WARNING_OBJECT (self, "encoder overwrites buffer memory that is still in use! uiOffset=%i uiLength=%i used_range_min=%i used_range_max=%i", desc->stCommon.uiOffset, desc->stCommon.uiLength, self->encoder->used_range_min, self->encoder->used_range_max);
603 //                              self->descriptors_count++;
604 //                              readbuf = gst_buffer_new();
605 //                              continue;
606 //                      }
607
608                         // uiDTS since kernel driver booted
609                         if (f & CDB_FLAG_PTS_VALID)
610                         {
611                                 encoder_pts = MPEGTIME_TO_GSTTIME(desc->stCommon.uiPTS);
612                                 GST_LOG_OBJECT (self, "f & CDB_FLAG_PTS_VALID && encoder's uiPTS=%" GST_TIME_FORMAT"", GST_TIME_ARGS(encoder_pts));
613
614                                 g_mutex_lock (&self->mutex);
615                                 if (G_UNLIKELY (self->dts_offset == GST_CLOCK_TIME_NONE))
616                                 {
617                                         if (self->dreamvideosrc)
618                                         {
619                                                 guint64 videosource_dts_offset;
620                                                 g_signal_emit_by_name(self->dreamvideosrc, "get-dts-offset", &videosource_dts_offset);
621                                                 if (videosource_dts_offset != GST_CLOCK_TIME_NONE)
622                                                 {
623                                                         GST_DEBUG_OBJECT (self, "use DREAMVIDEOSOURCE's dts_offset=%" GST_TIME_FORMAT "", GST_TIME_ARGS (videosource_dts_offset) );
624                                                         self->dts_offset = videosource_dts_offset;
625                                                 }
626                                         }
627                                         if (self->dts_offset == GST_CLOCK_TIME_NONE)
628                                         {
629                                                 self->dts_offset = encoder_pts;
630                                                 GST_DEBUG_OBJECT (self, "use mpeg stream pts as dts_offset=%" GST_TIME_FORMAT" (%lld)", GST_TIME_ARGS (self->dts_offset), desc->stCommon.uiPTS);
631                                         }
632                                 }
633                                 g_mutex_unlock (&self->mutex);
634                         }
635
636                         if (G_UNLIKELY (self->dts_offset == GST_CLOCK_TIME_NONE))
637                         {
638                                 GST_DEBUG_OBJECT (self, "dts_offset is still unknown, skipping frame...");
639                                 self->descriptors_count++;
640                                 break;
641                         }
642
643                         if (encoder_pts != GST_CLOCK_TIME_NONE)
644                         {
645                                 GstClockTime pts_clock_time = encoder_pts - self->dts_offset;
646                                 GstClockTime internal, external;
647                                 GstClockTime rate_n, rate_d;
648                                 GstClockTimeDiff diff;
649
650                                 gst_clock_get_calibration (self->encoder_clock, &internal, &external, &rate_n, &rate_d);
651
652                                 if (internal > pts_clock_time) {
653                                         diff = internal - pts_clock_time;
654                                         diff = gst_util_uint64_scale (diff, rate_n, rate_d);
655                                         pts_clock_time = external - diff;
656                                 } else {
657                                         diff = pts_clock_time - internal;
658                                         diff = gst_util_uint64_scale (diff, rate_n, rate_d);
659                                         pts_clock_time = external + diff;
660                                 }
661
662                                 result_pts = pts_clock_time - base_time;
663
664 #define extra_timestamp_debug
665 #ifdef extra_timestamp_debug
666                                 GstClockTime my_int_time = gst_clock_get_internal_time(self->encoder_clock);
667                                 GstClockTime pipeline_int_time = GST_CLOCK_TIME_NONE;
668                                 GstClock *elemclk = gst_element_get_clock (GST_ELEMENT (self));
669                                 if (elemclk)
670                                 {
671                                         pipeline_int_time = gst_clock_get_internal_time(elemclk);
672                                         gst_object_unref (elemclk);
673                                 }
674
675                                 GST_LOG_OBJECT (self, "post-calibration\n"
676                                 "%" GST_TIME_FORMAT "=base_time       %" GST_TIME_FORMAT "=clock_time\n"
677                                 "%" GST_TIME_FORMAT "=encoder_pts     %" GST_TIME_FORMAT "=pts_clock_time     %" GST_TIME_FORMAT "=result_pts\n"
678                                 "%" GST_TIME_FORMAT "=internal        %" GST_TIME_FORMAT "=external           %" GST_TIME_FORMAT "=diff\n"
679                                 "%" GST_TIME_FORMAT "=rate_n          %" GST_TIME_FORMAT "=rate_d\n"
680                                 "%" GST_TIME_FORMAT "=my_int_time     %" GST_TIME_FORMAT "=pipeline_int_time\n"
681                                 ,
682                                 GST_TIME_ARGS (base_time), GST_TIME_ARGS (clock_time),
683                                 GST_TIME_ARGS (encoder_pts), GST_TIME_ARGS (pts_clock_time), GST_TIME_ARGS (result_pts),
684                                 GST_TIME_ARGS (internal), GST_TIME_ARGS (external), GST_TIME_ARGS (diff),
685                                 GST_TIME_ARGS (rate_n), GST_TIME_ARGS (rate_d),
686                                 GST_TIME_ARGS (my_int_time), GST_TIME_ARGS (pipeline_int_time)
687                                 );
688 #endif
689                         }
690
691                         struct _buffer_memorytracker * memtrack = malloc(sizeof(struct _buffer_memorytracker));
692                         if (G_UNLIKELY (!memtrack))
693                         {
694                                 GST_ERROR_OBJECT (self, "can't allocate buffer_memorytracker");
695                                 goto stop_running;
696                         }
697
698                         GST_OBJECT_LOCK (self);
699                         readbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, enc->cdb, AMMAPSIZE, desc->stCommon.uiOffset, desc->stCommon.uiLength, memtrack, (GDestroyNotify) gst_dreamaudiosource_free_buffer);
700                         memtrack->self = self;
701                         memtrack->buffer = readbuf;
702                         memtrack->uiOffset = desc->stCommon.uiOffset;
703                         memtrack->uiLength = desc->stCommon.uiLength;
704                         self->memtrack_list = g_list_append(self->memtrack_list, memtrack);
705                         GST_OBJECT_UNLOCK (self);
706
707                         if (result_pts != GST_CLOCK_TIME_NONE)
708                                 GST_BUFFER_DTS(readbuf) = result_pts;
709 #ifdef dump
710                         int wret = write(self->dumpfd, (unsigned char*)(enc->cdb + desc->stCommon.uiOffset), desc->stCommon.uiLength);
711                         GST_LOG_OBJECT (self, "read %i dumped %i total %" G_GSIZE_FORMAT " ", desc->stCommon.uiLength, wret, gst_buffer_get_size (*outbuf) );
712 #endif
713                         self->descriptors_count++;
714                         break;
715                 }
716
717                 if (self->descriptors_count == self->descriptors_available)
718                 {
719                         GST_LOG_OBJECT (self, "self->descriptors_count == self->descriptors_available -> release %i consumed descriptors", self->descriptors_count);
720                         /* release consumed descs */
721                         if (write(enc->fd, &self->descriptors_count, sizeof(self->descriptors_count)) != sizeof(self->descriptors_count)) {
722                                 GST_WARNING_OBJECT (self, "release consumed descs write error!");
723                                 goto stop_running;
724                         }
725                         self->descriptors_available = 0;
726                 }
727
728                 if (readbuf)
729                 {
730                         g_mutex_lock (&self->mutex);
731                         if (!self->flushing)
732                         {
733                                 while (g_queue_get_length (&self->current_frames) >= self->buffer_size)
734                                 {
735                                         GstBuffer * oldbuf = g_queue_pop_head (&self->current_frames);
736                                         GST_WARNING_OBJECT (self, "dropping %" GST_PTR_FORMAT " because of queue overflow! buffers count=%i", oldbuf, g_queue_get_length (&self->current_frames));
737                                         gst_buffer_unref(oldbuf);
738                                         GST_BUFFER_FLAG_SET ((GstBuffer *) g_queue_peek_head (&self->current_frames), GST_BUFFER_FLAG_DISCONT);
739                                 }
740                                 if (discont)
741                                 {
742                                         GST_BUFFER_FLAG_SET (readbuf, GST_BUFFER_FLAG_DISCONT);
743                                         discont = FALSE;
744                                 }
745                                 g_queue_push_tail (&self->current_frames, readbuf);
746                                 GST_INFO_OBJECT (self, "read %" GST_PTR_FORMAT " to queue", readbuf );
747                         }
748                         else
749                                 gst_buffer_unref(readbuf);
750                         g_cond_signal (&self->cond);
751                         g_mutex_unlock (&self->mutex);
752                 }
753         }
754
755         g_assert_not_reached ();
756         return;
757
758         stop_running:
759         {
760                 g_mutex_unlock (&self->mutex);
761                 g_cond_signal (&self->cond);
762                 GST_DEBUG ("stop running, exit thread");
763                 message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
764                 g_value_init (&val, GST_TYPE_G_THREAD);
765                 g_value_set_boxed (&val, self->readthread);
766                 gst_message_set_stream_status_object (message, &val);
767                 g_value_unset (&val);
768                 GST_DEBUG_OBJECT (self, "posting LEAVE stream status");
769                 gst_element_post_message (GST_ELEMENT_CAST (self), message);
770                 return;
771         }
772 }
773
774 static GstFlowReturn
775 gst_dreamaudiosource_create (GstPushSrc * psrc, GstBuffer ** outbuf)
776 {
777         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (psrc);
778
779         GST_LOG_OBJECT (self, "new buffer requested");
780
781         g_mutex_lock (&self->mutex);
782         while (g_queue_is_empty (&self->current_frames) && !self->flushing)
783         {
784                 g_cond_wait (&self->cond, &self->mutex);
785                 GST_INFO_OBJECT (self, "waiting for buffer from encoder");
786         }
787
788         *outbuf = g_queue_pop_head (&self->current_frames);
789         g_mutex_unlock (&self->mutex);
790
791         if (*outbuf)
792         {
793                 GST_INFO_OBJECT (self, "pushing %" GST_PTR_FORMAT "", *outbuf );
794                 return GST_FLOW_OK;
795         }
796         GST_INFO_OBJECT (self, "FLUSHING");
797         return GST_FLOW_FLUSHING;
798 }
799
800 static GstStateChangeReturn gst_dreamaudiosource_change_state (GstElement * element, GstStateChange transition)
801 {
802         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (element);
803         GstStateChangeReturn sret = GST_STATE_CHANGE_SUCCESS;
804         int ret;
805
806         switch (transition) {
807                 case GST_STATE_CHANGE_NULL_TO_READY:
808                 {
809                         if (!gst_dreamaudiosource_encoder_init (self))
810                                 return GST_STATE_CHANGE_FAILURE;
811                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
812                         break;
813                 }
814                 case GST_STATE_CHANGE_READY_TO_PAUSED:
815                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
816                         self->dreamvideosrc = gst_bin_get_by_name_recurse_up(GST_BIN(GST_ELEMENT_PARENT(self)), "dreamvideosource0");
817
818                         self->dts_offset = GST_CLOCK_TIME_NONE;
819 #ifdef PROVIDE_CLOCK
820                         gst_element_post_message (element, gst_message_new_clock_provide (GST_OBJECT_CAST (element), self->encoder_clock, TRUE));
821 #endif
822                         self->flushing = TRUE;
823                         self->readthread = g_thread_try_new ("dreamaudiosrc-read", (GThreadFunc) gst_dreamaudiosource_read_thread_func, self, NULL);
824                         GST_DEBUG_OBJECT (self, "started readthread @%p", self->readthread );
825                         break;
826                 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
827                         g_mutex_lock (&self->mutex);
828                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
829                         SEND_COMMAND (self, CONTROL_RUN);
830                         GstClock *pipeline_clock = gst_element_get_clock (GST_ELEMENT (self));
831                         if (pipeline_clock)
832                         {
833                                 if (pipeline_clock != self->encoder_clock)
834                                 {
835                                         gst_clock_set_master (self->encoder_clock, pipeline_clock);
836                                         GST_DEBUG_OBJECT (self, "slaved %" GST_PTR_FORMAT "to pipeline_clock %" GST_PTR_FORMAT "", self->encoder_clock, pipeline_clock);
837                                 }
838                                 else
839                                         GST_DEBUG_OBJECT (self, "encoder_clock is master clock");
840                         }
841                                 else
842                                         GST_WARNING_OBJECT (self, "no pipeline clock!");
843                         ret = ioctl(self->encoder->fd, AENC_START);
844                         if ( ret != 0 )
845                                 goto fail;
846                         self->descriptors_available = 0;
847                         CLEAR_COMMAND (self);
848                         GST_INFO_OBJECT (self, "started encoder!");
849                         g_mutex_unlock (&self->mutex);
850                         break;
851                 default:
852                         break;
853         }
854
855         if (GST_ELEMENT_CLASS (parent_class)->change_state)
856                 sret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
857
858         switch (transition) {
859                 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
860                         g_mutex_lock (&self->mutex);
861                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED self->descriptors_count=%i self->descriptors_available=%i", self->descriptors_count, self->descriptors_available);
862                         SEND_COMMAND (self, CONTROL_PAUSE);
863                         while (self->descriptors_count < self->descriptors_available)
864                         {
865                                 GST_LOG_OBJECT (self, "flushing self->descriptors_count=%i");
866                                 self->descriptors_count++;
867                         }
868                         if (self->descriptors_count)
869                                 write(self->encoder->fd, &self->descriptors_count, sizeof(self->descriptors_count));
870                         ret = ioctl(self->encoder->fd, AENC_STOP);
871                         if ( ret != 0 )
872                                 goto fail;
873 #ifdef PROVIDE_CLOCK
874                         gst_clock_set_master (self->encoder_clock, NULL);
875 #endif
876                         GST_INFO_OBJECT (self, "stopped encoder!");
877                         g_mutex_unlock (&self->mutex);
878                         break;
879                 case GST_STATE_CHANGE_PAUSED_TO_READY:
880                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_PAUSED_TO_READY");
881 #ifdef PROVIDE_CLOCK
882                         gst_element_post_message (element, gst_message_new_clock_lost (GST_OBJECT_CAST (element), self->encoder_clock));
883 #endif
884                         GST_DEBUG_OBJECT (self, "stopping readthread @%p...", self->readthread);
885                         SEND_COMMAND (self, CONTROL_STOP);
886                         g_thread_join (self->readthread);
887                         if (self->dreamvideosrc)
888                                 gst_object_unref(self->dreamvideosrc);
889                         self->dreamvideosrc = NULL;
890                         break;
891                 case GST_STATE_CHANGE_READY_TO_NULL:
892                         gst_dreamaudiosource_encoder_release (self);
893                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_READY_TO_NULL");
894                         break;
895                 default:
896                         break;
897         }
898
899         return sret;
900 fail:
901         GST_ERROR_OBJECT(self,"can't perform encoder ioctl!");
902         g_mutex_unlock (&self->mutex);
903         return GST_STATE_CHANGE_FAILURE;
904 }
905
906 static void
907 gst_dreamaudiosource_dispose (GObject * gobject)
908 {
909         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (gobject);
910 #ifdef dump
911         close(self->dumpfd);
912 #endif
913         g_mutex_clear (&self->mutex);
914         g_cond_clear (&self->cond);
915         GST_DEBUG_OBJECT (self, "disposed");
916         G_OBJECT_CLASS (parent_class)->dispose (gobject);
917 }
918
#ifdef PROVIDE_CLOCK
/*
 * GstElement::provide_clock implementation (only built with PROVIDE_CLOCK).
 *
 * Returns a new reference to the encoder clock, or NULL when there is no
 * encoder or its file descriptor is negative.
 */
static GstClock *gst_dreamaudiosource_provide_clock (GstElement * element)
{
        GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (element);
        gboolean encoder_open = self->encoder && self->encoder->fd >= 0;

        if (encoder_open)
                return GST_CLOCK_CAST (gst_object_ref (self->encoder_clock));

        GST_DEBUG_OBJECT (self, "encoder device not started, can't provide clock!");
        return NULL;
}

#endif
934