unref buffers on flushing = TRUE in readfunc (slomo's suggestion)
[gst-plugin-dreamsource.git] / src / gstdreamaudiosource.c
1 /*
2  * GStreamer dreamaudiosource
3  * Copyright 2014-2015 Andreas Frisch <fraxinas@opendreambox.org>
4  *
5  * This program is licensed under the Creative Commons
6  * Attribution-NonCommercial-ShareAlike 3.0 Unported
7  * License. To view a copy of this license, visit
8  * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
9  * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA.
10  *
11  * Alternatively, this program may be distributed and executed on
12  * hardware which is licensed by Dream Property GmbH.
13  *
14  * This program is NOT free software. It is open source, you are allowed
15  * to modify it (if you keep the license), but it may not be commercially
16  * distributed other than under the conditions noted above.
17  */
18
19 #ifdef HAVE_CONFIG_H
20 #include <config.h>
21 #endif
22
23 #include <gst/gst.h>
24 #include "gstdreamaudiosource.h"
25
26 GST_DEBUG_CATEGORY_STATIC (dreamaudiosource_debug);
27 #define GST_CAT_DEFAULT dreamaudiosource_debug
28
29 GType gst_dreamaudiosource_input_mode_get_type (void)
30 {
31         static volatile gsize input_mode_type = 0;
32         static const GEnumValue input_mode[] = {
33                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE, "GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE", "live"},
34                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_HDMI_IN, "GST_DREAMAUDIOSOURCE_INPUT_MODE_HDMI_IN", "hdmi_in"},
35                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_BACKGROUND, "GST_DREAMAUDIOSOURCE_INPUT_MODE_BACKGROUND", "background"},
36                 {0, NULL, NULL},
37         };
38
39         if (g_once_init_enter (&input_mode_type)) {
40                 GType tmp = g_enum_register_static ("GstDreamAudioSourceInputMode", input_mode);
41                 g_once_init_leave (&input_mode_type, tmp);
42         }
43         return (GType) input_mode_type;
44 }
45
/* Indices into gst_dreamaudiosource_signals[]; LAST_SIGNAL is the array size. */
enum
{
        SIGNAL_GET_BASE_PTS,
        LAST_SIGNAL
};
/* GObject property ids. ARG_0 is an unused placeholder so that the real
 * property ids start at 1. */
enum
{
        ARG_0,
        ARG_BITRATE,
        ARG_INPUT_MODE
};

/* Signal ids returned by g_signal_new(), filled in by class_init. */
static guint gst_dreamaudiosource_signals[LAST_SIGNAL] = { 0 };

/* Default of the "bitrate" property (documented there as kbit/sec). */
#define DEFAULT_BITRATE     128
/* Default of the "input-mode" property. */
#define DEFAULT_INPUT_MODE  GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE
/* Initial value of self->buffer_size: the number of queued frames at which
 * the read thread starts dropping the oldest ones. */
#define DEFAULT_BUFFER_SIZE 16
63
/* Template of the always-present "src" pad; it advertises audio/mpeg
 * (mpegversion 4) in ADTS stream format. */
static GstStaticPadTemplate srctemplate =
    GST_STATIC_PAD_TEMPLATE ("src",
        GST_PAD_SRC,
        GST_PAD_ALWAYS,
        GST_STATIC_CAPS ("audio/mpeg, "
        "mpegversion = 4,"
        "stream-format = (string) adts")
    );

/* Registers GstDreamAudioSource as a subclass of GstPushSrc and makes
 * `parent_class` usable under the name the rest of this file expects. */
#define gst_dreamaudiosource_parent_class parent_class
G_DEFINE_TYPE (GstDreamAudioSource, gst_dreamaudiosource, GST_TYPE_PUSH_SRC);
75
/* GstBaseSrc / GstPushSrc / GObject virtual methods installed in class_init */
static GstCaps *gst_dreamaudiosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter);
static gboolean gst_dreamaudiosource_start (GstBaseSrc * bsrc);
static gboolean gst_dreamaudiosource_stop (GstBaseSrc * bsrc);
static gboolean gst_dreamaudiosource_unlock (GstBaseSrc * bsrc);
static gboolean gst_dreamaudiosource_unlock_stop (GstBaseSrc * bsrc);
static void gst_dreamaudiosource_dispose (GObject * gobject);
static GstFlowReturn gst_dreamaudiosource_create (GstPushSrc * psrc, GstBuffer ** outbuf);

/* GObject property accessors */
static void gst_dreamaudiosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec);
static void gst_dreamaudiosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec);

/* GstElement state handling and the default "get-base-pts" signal handler */
static GstStateChangeReturn gst_dreamaudiosource_change_state (GstElement * element, GstStateChange transition);
static gint64 gst_dreamaudiosource_get_base_pts (GstDreamAudioSource *self);

/* Open / close the encoder device and its associated resources */
static gboolean gst_dreamaudiosource_encoder_init (GstDreamAudioSource * self);
static void gst_dreamaudiosource_encoder_release (GstDreamAudioSource * self);

/* Body of the thread that reads frames from the encoder into the frame queue */
static void gst_dreamaudiosource_read_thread_func (GstDreamAudioSource * self);

/* Only compiled in when the element is built to provide its own clock */
#ifdef PROVIDE_CLOCK
static GstClock *gst_dreamaudiosource_provide_clock (GstElement * elem);
// static GstClockTime gst_dreamaudiosource_get_encoder_time_ (GstClock * clock, GstBaseSrc * bsrc);
#endif
99
100 static void
101 gst_dreamaudiosource_class_init (GstDreamAudioSourceClass * klass)
102 {
103         GObjectClass *gobject_class;
104         GstElementClass *gstelement_class;
105         GstBaseSrcClass *gstbasesrc_class;
106         GstPushSrcClass *gstpush_src_class;
107
108         gobject_class = (GObjectClass *) klass;
109         gstelement_class = (GstElementClass *) klass;
110         gstbasesrc_class = (GstBaseSrcClass *) klass;
111         gstpush_src_class = (GstPushSrcClass *) klass;
112
113         gobject_class->set_property = gst_dreamaudiosource_set_property;
114         gobject_class->get_property = gst_dreamaudiosource_get_property;
115         gobject_class->dispose = gst_dreamaudiosource_dispose;
116
117         gst_element_class_add_pad_template (gstelement_class,
118                                             gst_static_pad_template_get (&srctemplate));
119
120         gst_element_class_set_static_metadata (gstelement_class,
121             "Dream Audio source", "Source/Audio",
122             "Provide an audio elementary stream from Dreambox encoder device",
123             "Andreas Frisch <fraxinas@opendreambox.org>");
124
125         gstelement_class->change_state = gst_dreamaudiosource_change_state;
126
127         gstbasesrc_class->get_caps = gst_dreamaudiosource_getcaps;
128         gstbasesrc_class->start = gst_dreamaudiosource_start;
129         gstbasesrc_class->stop = gst_dreamaudiosource_stop;
130         gstbasesrc_class->unlock = gst_dreamaudiosource_unlock;
131         gstbasesrc_class->unlock_stop = gst_dreamaudiosource_unlock_stop;
132
133         gstpush_src_class->create = gst_dreamaudiosource_create;
134
135 #ifdef PROVIDE_CLOCK
136         gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_dreamaudiosource_provide_clock);
137 //      g_type_class_ref (GST_TYPE_SYSTEM_CLOCK);
138 #endif
139
140         g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BITRATE,
141           g_param_spec_int ("bitrate", "Bitrate (kb/s)",
142             "Bitrate in kbit/sec", 16, 320, DEFAULT_BITRATE,
143             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144
145         g_object_class_install_property (gobject_class, ARG_INPUT_MODE,
146           g_param_spec_enum ("input-mode", "Input Mode",
147             "Select the input source of the audio stream",
148             GST_TYPE_DREAMAUDIOSOURCE_INPUT_MODE, DEFAULT_INPUT_MODE,
149             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
150
151         gst_dreamaudiosource_signals[SIGNAL_GET_BASE_PTS] =
152                 g_signal_new ("get-base-pts",
153                 G_TYPE_FROM_CLASS (klass),
154                 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
155                 G_STRUCT_OFFSET (GstDreamAudioSourceClass, get_base_pts),
156                 NULL, NULL, gst_dreamsource_marshal_INT64__VOID, G_TYPE_INT64, 0);
157
158         klass->get_base_pts = gst_dreamaudiosource_get_base_pts;
159
160 }
161
162 static gint64
163 gst_dreamaudiosource_get_base_pts (GstDreamAudioSource *self)
164 {
165         GST_DEBUG_OBJECT (self, "gst_dreamaudiosource_get_base_pts %" GST_TIME_FORMAT"", GST_TIME_ARGS (self->base_pts) );
166         return self->base_pts;
167 }
168
169 static void gst_dreamaudiosource_set_bitrate (GstDreamAudioSource * self, uint32_t bitrate)
170 {
171         if (!self->encoder || !self->encoder->fd)
172                 return;
173         uint32_t abr = bitrate*1000;
174         int ret = ioctl(self->encoder->fd, AENC_SET_BITRATE, &abr);
175         if (ret != 0)
176         {
177                 GST_WARNING_OBJECT (self, "can't set audio bitrate to %i bytes/s!", abr);
178                 return;
179         }
180         GST_INFO_OBJECT (self, "set audio bitrate to %i kBytes/s", bitrate);
181         self->audio_info.bitrate = abr;
182 }
183
184 void gst_dreamaudiosource_set_input_mode (GstDreamAudioSource *self, GstDreamAudioSourceInputMode mode)
185 {
186         g_return_if_fail (GST_IS_DREAMAUDIOSOURCE (self));
187         GEnumValue *val = g_enum_get_value (G_ENUM_CLASS (g_type_class_ref (GST_TYPE_DREAMAUDIOSOURCE_INPUT_MODE)), mode);
188         if (!val)
189         {
190                 GST_ERROR_OBJECT (self, "no such input_mode %i!", mode);
191                 goto out;
192         }
193         const gchar *value_nick = val->value_nick;
194         GST_DEBUG_OBJECT (self, "setting input_mode to %s (%i)...", value_nick, mode);
195         g_mutex_lock (&self->mutex);
196         if (!self->encoder || !self->encoder->fd)
197         {
198                 GST_ERROR_OBJECT (self, "can't set input mode because encoder device not opened!");
199                 goto out;
200         }
201         int int_mode = mode;
202         int ret = ioctl(self->encoder->fd, AENC_SET_SOURCE, &int_mode);
203         if (ret != 0)
204         {
205                 GST_WARNING_OBJECT (self, "can't set input mode to %s (%i) error: %s", value_nick, mode, strerror(errno));
206                 goto out;
207         }
208         GST_INFO_OBJECT (self, "successfully set input mode to %s (%i)", value_nick, mode);
209         self->input_mode = mode;
210 out:
211         g_mutex_unlock (&self->mutex);
212         return;
213 }
214
215 GstDreamAudioSourceInputMode gst_dreamaudiosource_get_input_mode (GstDreamAudioSource *self)
216 {
217         GstDreamAudioSourceInputMode result;
218         g_return_val_if_fail (GST_IS_DREAMAUDIOSOURCE (self), -1);
219         GST_OBJECT_LOCK (self);
220         result =self->input_mode;
221         GST_OBJECT_UNLOCK (self);
222         return result;
223 }
224
225 gboolean
226 gst_dreamaudiosource_plugin_init (GstPlugin *plugin)
227 {
228         GST_DEBUG_CATEGORY_INIT (dreamaudiosource_debug, "dreamaudiosource", 0, "dreamaudiosource");
229         return gst_element_register (plugin, "dreamaudiosource", GST_RANK_PRIMARY, GST_TYPE_DREAMAUDIOSOURCE);
230 }
231
232 static void
233 gst_dreamaudiosource_init (GstDreamAudioSource * self)
234 {
235         self->encoder = NULL;
236         self->descriptors_available = 0;
237         self->input_mode = DEFAULT_INPUT_MODE;
238
239         self->buffer_size = DEFAULT_BUFFER_SIZE;
240         g_queue_init (&self->current_frames);
241         self->readthread = NULL;
242
243         g_mutex_init (&self->mutex);
244         READ_SOCKET (self) = -1;
245         WRITE_SOCKET (self) = -1;
246
247         gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
248         gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
249
250         self->encoder = NULL;
251         self->encoder_clock = NULL;
252
253 #ifdef dump
254         self->dumpfd = open("/media/hdd/movie/dreamaudiosource.dump", O_WRONLY | O_CREAT | O_TRUNC);
255         GST_DEBUG_OBJECT (self, "dumpfd = %i (%s)", self->dumpfd, (self->dumpfd > 0) ? "OK" : strerror(errno));
256 #endif
257 }
258
259 static gboolean gst_dreamaudiosource_encoder_init (GstDreamAudioSource * self)
260 {
261         GST_LOG_OBJECT (self, "initializating encoder...");
262         self->encoder = malloc(sizeof(EncoderInfo));
263
264         if (!self->encoder) {
265                 GST_ERROR_OBJECT (self,"out of space");
266                 return FALSE;
267         }
268
269         char fn_buf[32];
270         sprintf(fn_buf, "/dev/aenc%d", 0);
271         self->encoder->fd = open(fn_buf, O_RDWR | O_SYNC);
272         if (self->encoder->fd <= 0) {
273                 GST_ERROR_OBJECT (self,"cannot open device %s (%s)", fn_buf, strerror(errno));
274                 free(self->encoder);
275                 self->encoder = NULL;
276                 return FALSE;
277         }
278
279         self->encoder->buffer = malloc(ABUFSIZE);
280         if (!self->encoder->buffer) {
281                 GST_ERROR_OBJECT(self,"cannot alloc buffer");
282                 return FALSE;
283         }
284
285         self->encoder->cdb = (unsigned char *)mmap (0, AMMAPSIZE, PROT_READ, MAP_PRIVATE, self->encoder->fd, 0);
286
287         if (!self->encoder->cdb || self->encoder->cdb == MAP_FAILED) {
288                 GST_ERROR_OBJECT(self,"cannot alloc buffer: %s (%d)", strerror(errno));
289                 self->encoder->cdb = NULL;
290                 return FALSE;
291         }
292
293         int control_sock[2];
294         if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
295         {
296                 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, (NULL), GST_ERROR_SYSTEM);
297                 return GST_STATE_CHANGE_FAILURE;
298         }
299         READ_SOCKET (self) = control_sock[0];
300         WRITE_SOCKET (self) = control_sock[1];
301         fcntl (READ_SOCKET (self), F_SETFL, O_NONBLOCK);
302         fcntl (WRITE_SOCKET (self), F_SETFL, O_NONBLOCK);
303
304 #ifdef PROVIDE_CLOCK
305         self->encoder_clock = gst_dreamsource_clock_new ("GstDreamAudioSourceClock", self->encoder->fd);
306         GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
307 #endif
308
309         GST_LOG_OBJECT (self, "encoder %s successfully initialized", fn_buf);
310         return TRUE;
311 }
312
313 static void gst_dreamaudiosource_encoder_release (GstDreamAudioSource * self)
314 {
315         GST_LOG_OBJECT (self, "releasing encoder...");
316         if (self->encoder) {
317                 if (self->encoder->buffer)
318                         free(self->encoder->buffer);
319                 if (self->encoder->cdb)
320                         munmap(self->encoder->cdb, AMMAPSIZE);
321                 if (self->encoder->fd)
322                         close(self->encoder->fd);
323                 free(self->encoder);
324         }
325         self->encoder = NULL;
326         close (READ_SOCKET (self));
327         close (WRITE_SOCKET (self));
328         READ_SOCKET (self) = -1;
329         WRITE_SOCKET (self) = -1;
330         if (self->encoder_clock) {
331                 gst_object_unref (self->encoder_clock);
332                 self->encoder_clock = NULL;
333         }
334 }
335
336 static void
337 gst_dreamaudiosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec)
338 {
339         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (object);
340
341         switch (prop_id) {
342                 case ARG_BITRATE:
343                         gst_dreamaudiosource_set_bitrate(self, g_value_get_int (value));
344                         break;
345                 case ARG_INPUT_MODE:
346                              gst_dreamaudiosource_set_input_mode (self, g_value_get_enum (value));
347                         break;
348                 default:
349                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
350                         break;
351         }
352 }
353
354 static void
355 gst_dreamaudiosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec)
356 {
357         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (object);
358
359         switch (prop_id) {
360                 case ARG_BITRATE:
361                         g_value_set_int (value, self->audio_info.bitrate/1000);
362                         break;
363                 case ARG_INPUT_MODE:
364                         g_value_set_enum (value, gst_dreamaudiosource_get_input_mode (self));
365                         break;
366                 default:
367                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
368                         break;
369         }
370 }
371
372 static GstCaps *
373 gst_dreamaudiosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
374 {
375         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
376         GstPadTemplate *pad_template;
377         GstCaps *caps;
378
379         pad_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS(self), "src");
380         g_return_val_if_fail (pad_template != NULL, NULL);
381
382         if (self->encoder == NULL) {
383                 GST_DEBUG_OBJECT (self, "encoder not opened -> use template caps");
384                 caps = gst_pad_template_get_caps (pad_template);
385         }
386         else
387         {
388                 caps = gst_caps_make_writable(gst_pad_template_get_caps (pad_template));
389         }
390
391         GST_DEBUG_OBJECT (self, "return caps %" GST_PTR_FORMAT, caps);
392         return caps;
393 }
394
395 static gboolean gst_dreamaudiosource_unlock (GstBaseSrc * bsrc)
396 {
397         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
398         GST_DEBUG_OBJECT (self, "stop creating buffers");
399         g_mutex_lock (&self->mutex);
400         self->flushing = TRUE;
401         GST_DEBUG_OBJECT (self, "set flushing TRUE");
402         g_cond_signal (&self->cond);
403         g_mutex_unlock (&self->mutex);
404         return TRUE;
405 }
406
407 static gboolean gst_dreamaudiosource_unlock_stop (GstBaseSrc * bsrc)
408 {
409         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
410         GST_DEBUG_OBJECT (self, "start creating buffers...");
411         g_mutex_lock (&self->mutex);
412         self->flushing = FALSE;
413         g_queue_foreach (&self->current_frames, (GFunc) gst_buffer_unref, NULL);
414         g_queue_clear (&self->current_frames);
415         g_mutex_unlock (&self->mutex);
416         return TRUE;
417 }
418
419 static void gst_dreamaudiosource_read_thread_func (GstDreamAudioSource * self)
420 {
421         EncoderInfo *enc = self->encoder;
422         GstBuffer *readbuf;
423
424         if (!enc) {
425                 GST_WARNING_OBJECT (self, "encoder device not opened!");
426                 return;
427         }
428
429         GST_DEBUG_OBJECT (self, "enter read thread");
430
431         GstMessage *message;
432         GValue val = { 0 };
433
434         message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
435         g_value_init (&val, GST_TYPE_G_THREAD);
436         g_value_set_boxed (&val, self->readthread);
437         gst_message_set_stream_status_object (message, &val);
438         g_value_unset (&val);
439         GST_DEBUG_OBJECT (self, "posting ENTER stream status");
440         gst_element_post_message (GST_ELEMENT_CAST (self), message);
441
442         while (TRUE) {
443                 readbuf = NULL;
444                 {
445                         struct pollfd rfd[2];
446                         int timeout, nfds;
447
448                         rfd[0].fd = READ_SOCKET (self);
449                         rfd[0].events = POLLIN | POLLERR | POLLHUP | POLLPRI;
450
451                         if (self->descriptors_available == 0)
452                         {
453                                 rfd[1].fd = enc->fd;
454                                 rfd[1].events = POLLIN;
455                                 self->descriptors_count = 0;
456                                 timeout = 200;
457                                 nfds = 2;
458                         }
459                         else
460                         {
461                                 rfd[1].revents = 0;
462                                 nfds = 1;
463                                 timeout = 0;
464                         }
465
466                         int ret = poll(rfd, nfds, timeout);
467
468                         if (G_UNLIKELY (ret == -1))
469                         {
470                                 GST_ERROR_OBJECT (self, "SELECT ERROR!");
471                                 goto stop_running;
472                         }
473                         else if ( ret == 0 && self->descriptors_available == 0 )
474                         {
475                                 g_mutex_lock (&self->mutex);
476                                 gst_clock_get_time(self->encoder_clock);
477                                 if (self->flushing)
478                                 {
479                                         GST_DEBUG_OBJECT (self, "FLUSHING!");
480                                         g_cond_signal (&self->cond);
481                                         continue;
482                                 }
483                                 g_mutex_unlock (&self->mutex);
484                                 GST_DEBUG_OBJECT (self, "SELECT TIMEOUT");
485                                 //!!! TODO generate valid dummy payload
486                                 readbuf = gst_buffer_new();
487                         }
488                         else if ( rfd[0].revents )
489                         {
490                                 char command;
491                                 READ_COMMAND (self, command, ret);
492                                 if (command == CONTROL_STOP)
493                                 {
494                                         GST_DEBUG_OBJECT (self, "CONTROL_STOP!");
495                                         goto stop_running;
496                                 }
497                         }
498                         else if ( G_LIKELY(rfd[1].revents & POLLIN) )
499                         {
500                                 int rlen = read(enc->fd, enc->buffer, ABUFSIZE);
501                                 if (rlen <= 0 || rlen % ABDSIZE ) {
502                                         if ( errno == 512 )
503                                                 goto stop_running;
504                                         GST_WARNING_OBJECT (self, "read error %s (%i)", strerror(errno), errno);
505                                         goto stop_running;
506                                 }
507                                 self->descriptors_available = rlen / ABDSIZE;
508                                 GST_LOG_OBJECT (self, "encoder buffer was empty, %d descriptors available", self->descriptors_available);
509                         }
510                 }
511
512                 while (self->descriptors_count < self->descriptors_available)
513                 {
514                         off_t offset = self->descriptors_count * ABDSIZE;
515                         AudioBufferDescriptor *desc = (AudioBufferDescriptor*)(&enc->buffer[offset]);
516
517                         uint32_t f = desc->stCommon.uiFlags;
518
519                         GST_LOG_OBJECT (self, "descriptors_count=%d, descriptors_available=%d\tuiOffset=%d, uiLength=%d", self->descriptors_count, self->descriptors_available, desc->stCommon.uiOffset, desc->stCommon.uiLength);
520
521                         if (G_UNLIKELY (f & CDB_FLAG_METADATA))
522                         {
523                                 GST_LOG_OBJECT (self, "CDB_FLAG_METADATA... skip outdated packet");
524                                 self->descriptors_count = self->descriptors_available;
525                                 continue;
526                         }
527
528                         readbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, enc->cdb, AMMAPSIZE, desc->stCommon.uiOffset, desc->stCommon.uiLength, self, NULL);
529
530                         GstClockTime buffer_pts = GST_CLOCK_TIME_NONE;
531
532                         // uiDTS since kernel driver booted
533                         if (f & CDB_FLAG_PTS_VALID)
534                         {
535                                 buffer_pts = MPEGTIME_TO_GSTTIME(desc->stCommon.uiPTS);
536                                 GST_LOG_OBJECT (self, "f & CDB_FLAG_PTS_VALID && encoder's uiPTS=%" GST_TIME_FORMAT"", GST_TIME_ARGS(buffer_pts));
537
538                                 g_mutex_lock (&self->mutex);
539                                 if (G_UNLIKELY (self->base_pts == GST_CLOCK_TIME_NONE))
540                                 {
541                                         if (self->dreamvideosrc)
542                                         {
543                                                 guint64 videosource_base_pts;
544                                                 g_signal_emit_by_name(self->dreamvideosrc, "get-base-pts", &videosource_base_pts);
545                                                 if (videosource_base_pts != GST_CLOCK_TIME_NONE)
546                                                 {
547                                                         GST_DEBUG_OBJECT (self, "use DREAMVIDEOSOURCE's base_pts=%" GST_TIME_FORMAT "", GST_TIME_ARGS (videosource_base_pts) );
548                                                         self->base_pts = videosource_base_pts;
549                                                 }
550                                         }
551                                         if (self->base_pts == GST_CLOCK_TIME_NONE)
552                                         {
553                                                 self->base_pts = buffer_pts;
554                                                 GST_DEBUG_OBJECT (self, "use mpeg stream pts as base_pts=%" GST_TIME_FORMAT" (%lld)", GST_TIME_ARGS (self->base_pts), desc->stCommon.uiPTS);
555                                         }
556                                 }
557                                 g_mutex_unlock (&self->mutex);
558
559                                 if (self->base_pts != GST_CLOCK_TIME_NONE && buffer_pts >= self->base_pts )
560                                 {
561                                         buffer_pts -= self->base_pts;
562                                         GST_BUFFER_PTS(readbuf) = buffer_pts;
563                                         GST_BUFFER_DTS(readbuf) = buffer_pts;
564                                         GST_LOG_OBJECT (self, "corrected pts+dts=%" GST_TIME_FORMAT "", GST_TIME_ARGS (GST_BUFFER_PTS(readbuf)));
565                                 }
566                         }
567
568                         if (G_UNLIKELY (self->base_pts == GST_CLOCK_TIME_NONE))
569                         {
570                                 GST_DEBUG_OBJECT (self, "self->base_pts == GST_CLOCK_TIME_NONE! skip this frame");
571                                 self->descriptors_count++;
572                                 break;
573                         }
574
575 #ifdef dump
576                         int wret = write(self->dumpfd, (unsigned char*)(enc->cdb + desc->stCommon.uiOffset), desc->stCommon.uiLength);
577                         GST_LOG_OBJECT (self, "read %i dumped %i total %" G_GSIZE_FORMAT " ", desc->stCommon.uiLength, wret, gst_buffer_get_size (*outbuf) );
578 #endif
579                         self->descriptors_count++;
580                         break;
581                 }
582
583                 if (self->descriptors_count == self->descriptors_available)
584                 {
585                         GST_LOG_OBJECT (self, "self->descriptors_count == self->descriptors_available -> release %i consumed descriptors", self->descriptors_count);
586                         /* release consumed descs */
587                         if (write(enc->fd, &self->descriptors_count, sizeof(self->descriptors_count)) != sizeof(self->descriptors_count)) {
588                                 GST_WARNING_OBJECT (self, "release consumed descs write error!");
589                                 goto stop_running;
590                         }
591                         self->descriptors_available = 0;
592                 }
593
594                 if (readbuf)
595                 {
596                         g_mutex_lock (&self->mutex);
597                         if (!self->flushing)
598                         {
599                                 while (g_queue_get_length (&self->current_frames) >= self->buffer_size)
600                                 {
601                                         GstBuffer * oldbuf = g_queue_pop_head (&self->current_frames);
602                                         GST_WARNING_OBJECT (self, "dropping %" GST_PTR_FORMAT " because of queue overflow! buffers count=%i", oldbuf, g_queue_get_length (&self->current_frames));
603                                         gst_buffer_unref(oldbuf);
604                                 }
605                                 g_queue_push_tail (&self->current_frames, readbuf);
606                         }
607                         else
608                                 gst_buffer_unref(readbuf);
609                         g_cond_signal (&self->cond);
610                         GST_INFO_OBJECT (self, "read %" GST_PTR_FORMAT " to queue", readbuf );
611                         g_mutex_unlock (&self->mutex);
612                 }
613         }
614
615         g_assert_not_reached ();
616         return;
617
618         stop_running:
619         {
620                 g_mutex_unlock (&self->mutex);
621                 g_cond_signal (&self->cond);
622                 GST_DEBUG ("stop running, exit thread");
623                 message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
624                 g_value_init (&val, GST_TYPE_G_THREAD);
625                 g_value_set_boxed (&val, self->readthread);
626                 gst_message_set_stream_status_object (message, &val);
627                 g_value_unset (&val);
628                 GST_DEBUG_OBJECT (self, "posting LEAVE stream status");
629                 gst_element_post_message (GST_ELEMENT_CAST (self), message);
630                 return;
631         }
632 }
633
634 static GstFlowReturn
635 gst_dreamaudiosource_create (GstPushSrc * psrc, GstBuffer ** outbuf)
636 {
637         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (psrc);
638
639         GST_LOG_OBJECT (self, "new buffer requested");
640
641         g_mutex_lock (&self->mutex);
642         while (g_queue_is_empty (&self->current_frames) && !self->flushing)
643         {
644                 g_cond_wait (&self->cond, &self->mutex);
645                 GST_INFO_OBJECT (self, "waiting for buffer from encoder");
646         }
647
648         *outbuf = g_queue_pop_head (&self->current_frames);
649         g_mutex_unlock (&self->mutex);
650
651         if (*outbuf)
652         {
653                 GST_INFO_OBJECT (self, "pushing %" GST_PTR_FORMAT "", *outbuf );
654                 return GST_FLOW_OK;
655         }
656         GST_INFO_OBJECT (self, "FLUSHING");
657         return GST_FLOW_FLUSHING;
658 }
659
660 static GstStateChangeReturn gst_dreamaudiosource_change_state (GstElement * element, GstStateChange transition)
661 {
662         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (element);
663         GstStateChangeReturn sret = GST_STATE_CHANGE_SUCCESS;
664         int ret;
665
666         switch (transition) {
667                 case GST_STATE_CHANGE_NULL_TO_READY:
668                 {
669                         if (!gst_dreamaudiosource_encoder_init (self))
670                                 return GST_STATE_CHANGE_FAILURE;
671                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
672                         break;
673                 }
674                 case GST_STATE_CHANGE_READY_TO_PAUSED:
675                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
676 #ifdef PROVIDE_CLOCK
677                         gst_element_post_message (element, gst_message_new_clock_provide (GST_OBJECT_CAST (element), self->encoder_clock, TRUE));
678 #endif
679                         self->flushing = TRUE;
680                         self->readthread = g_thread_try_new ("dreamaudiosrc-read", (GThreadFunc) gst_dreamaudiosource_read_thread_func, self, NULL);
681                         GST_DEBUG_OBJECT (self, "started readthread @%p", self->readthread );
682                         break;
683                 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
684                         g_mutex_lock (&self->mutex);
685                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
686                         self->base_pts = GST_CLOCK_TIME_NONE;
687                         ret = ioctl(self->encoder->fd, AENC_START);
688                         if ( ret != 0 )
689                                 goto fail;
690                         self->descriptors_available = 0;
691                         CLEAR_COMMAND (self);
692                         GST_INFO_OBJECT (self, "started encoder!");
693                         g_mutex_unlock (&self->mutex);
694                         break;
695                 default:
696                         break;
697         }
698
699         if (GST_ELEMENT_CLASS (parent_class)->change_state)
700                 sret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
701
702         switch (transition) {
703                 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
704                         g_mutex_lock (&self->mutex);
705                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED self->descriptors_count=%i self->descriptors_available=%i", self->descriptors_count, self->descriptors_available);
706                         while (self->descriptors_count < self->descriptors_available)
707                         {
708                                 GST_LOG_OBJECT (self, "flushing self->descriptors_count=%i");
709                                 self->descriptors_count++;
710                         }
711                         if (self->descriptors_count)
712                                 write(self->encoder->fd, &self->descriptors_count, sizeof(self->descriptors_count));
713                         ret = ioctl(self->encoder->fd, AENC_STOP);
714                         if ( ret != 0 )
715                                 goto fail;
716 #ifdef PROVIDE_CLOCK
717                         gst_clock_set_master (self->encoder_clock, NULL);
718 #endif
719                         GST_INFO_OBJECT (self, "stopped encoder!");
720                         g_mutex_unlock (&self->mutex);
721                         break;
722                 case GST_STATE_CHANGE_PAUSED_TO_READY:
723                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_PAUSED_TO_READY");
724 #ifdef PROVIDE_CLOCK
725                         gst_element_post_message (element, gst_message_new_clock_lost (GST_OBJECT_CAST (element), self->encoder_clock));
726 #endif
727                         GST_DEBUG_OBJECT (self, "stopping readthread @%p...", self->readthread);
728                         SEND_COMMAND (self, CONTROL_STOP);
729                         g_thread_join (self->readthread);
730                         break;
731                 case GST_STATE_CHANGE_READY_TO_NULL:
732                         gst_dreamaudiosource_encoder_release (self);
733                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_READY_TO_NULL, close control sockets");
734                         break;
735                 default:
736                         break;
737         }
738
739         return sret;
740 fail:
741         GST_ERROR_OBJECT(self,"can't perform encoder ioctl!");
742         g_mutex_unlock (&self->mutex);
743         return GST_STATE_CHANGE_FAILURE;
744 }
745
746 static gboolean
747 gst_dreamaudiosource_start (GstBaseSrc * bsrc)
748 {
749         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
750         self->dreamvideosrc = gst_bin_get_by_name_recurse_up(GST_BIN(GST_ELEMENT_PARENT(self)), "dreamvideosource0");
751         GST_DEBUG_OBJECT (self, "started. reference to dreamvideosource=%" GST_PTR_FORMAT "", self->dreamvideosrc);
752         return TRUE;
753 }
754
755 static gboolean
756 gst_dreamaudiosource_stop (GstBaseSrc * bsrc)
757 {
758         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
759         if (self->dreamvideosrc)
760                 gst_object_unref(self->dreamvideosrc);
761         GST_DEBUG_OBJECT (self, "stop");
762         return TRUE;
763 }
764
765 static void
766 gst_dreamaudiosource_dispose (GObject * gobject)
767 {
768         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (gobject);
769 #ifdef dump
770         close(self->dumpfd);
771 #endif
772         g_mutex_clear (&self->mutex);
773         g_cond_clear (&self->cond);
774         GST_DEBUG_OBJECT (self, "disposed");
775         G_OBJECT_CLASS (parent_class)->dispose (gobject);
776 }
777
778 #ifdef PROVIDE_CLOCK
779 static GstClock *gst_dreamaudiosource_provide_clock (GstElement * element)
780 {
781         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (element);
782
783         if (!self->encoder || self->encoder->fd < 0)
784         {
785                 GST_DEBUG_OBJECT (self, "encoder device not started, can't provide clock!");
786                 return NULL;
787         }
788
789         return GST_CLOCK_CAST (gst_object_ref (self->encoder_clock));
790 }
791
792 #endif
793