implement readthread for dreamaudiosource
[gst-plugin-dreamsource.git] / src / gstdreamaudiosource.c
1 /*
2  * GStreamer dreamaudiosource
3  * Copyright 2014-2015 Andreas Frisch <fraxinas@opendreambox.org>
4  *
5  * This program is licensed under the Creative Commons
6  * Attribution-NonCommercial-ShareAlike 3.0 Unported
7  * License. To view a copy of this license, visit
8  * http://creativecommons.org/licenses/by-nc-sa/3.0/ or send a letter to
9  * Creative Commons,559 Nathan Abbott Way,Stanford,California 94305,USA.
10  *
11  * Alternatively, this program may be distributed and executed on
12  * hardware which is licensed by Dream Property GmbH.
13  *
14  * This program is NOT free software. It is open source, you are allowed
15  * to modify it (if you keep the license), but it may not be commercially
16  * distributed other than under the conditions noted above.
17  */
18
19 #ifdef HAVE_CONFIG_H
20 #include <config.h>
21 #endif
22
23 #include <gst/gst.h>
24 #include "gstdreamaudiosource.h"
25
26 GST_DEBUG_CATEGORY_STATIC (dreamaudiosource_debug);
27 #define GST_CAT_DEFAULT dreamaudiosource_debug
28
29 GType gst_dreamaudiosource_input_mode_get_type (void)
30 {
31         static volatile gsize input_mode_type = 0;
32         static const GEnumValue input_mode[] = {
33                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE, "GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE", "live"},
34                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_HDMI_IN, "GST_DREAMAUDIOSOURCE_INPUT_MODE_HDMI_IN", "hdmi_in"},
35                 {GST_DREAMAUDIOSOURCE_INPUT_MODE_BACKGROUND, "GST_DREAMAUDIOSOURCE_INPUT_MODE_BACKGROUND", "background"},
36                 {0, NULL, NULL},
37         };
38
39         if (g_once_init_enter (&input_mode_type)) {
40                 GType tmp = g_enum_register_static ("GstDreamAudioSourceInputMode", input_mode);
41                 g_once_init_leave (&input_mode_type, tmp);
42         }
43         return (GType) input_mode_type;
44 }
45
46 enum
47 {
48         SIGNAL_GET_BASE_PTS,
49         LAST_SIGNAL
50 };
51 enum
52 {
53         ARG_0,
54         ARG_BITRATE,
55         ARG_INPUT_MODE
56 };
57
58 static guint gst_dreamaudiosource_signals[LAST_SIGNAL] = { 0 };
59
60 #define DEFAULT_BITRATE     128
61 #define DEFAULT_INPUT_MODE  GST_DREAMAUDIOSOURCE_INPUT_MODE_LIVE
62 #define DEFAULT_BUFFER_SIZE 16
63
64 static GstStaticPadTemplate srctemplate =
65     GST_STATIC_PAD_TEMPLATE ("src",
66         GST_PAD_SRC,
67         GST_PAD_ALWAYS,
68         GST_STATIC_CAPS ("audio/mpeg, "
69         "mpegversion = 4,"
70         "stream-format = (string) adts")
71     );
72
73 #define gst_dreamaudiosource_parent_class parent_class
74 G_DEFINE_TYPE (GstDreamAudioSource, gst_dreamaudiosource, GST_TYPE_PUSH_SRC);
75
76 static GstCaps *gst_dreamaudiosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter);
77 static gboolean gst_dreamaudiosource_start (GstBaseSrc * bsrc);
78 static gboolean gst_dreamaudiosource_stop (GstBaseSrc * bsrc);
79 static gboolean gst_dreamaudiosource_unlock (GstBaseSrc * bsrc);
80 static gboolean gst_dreamaudiosource_unlock_stop (GstBaseSrc * bsrc);
81 static void gst_dreamaudiosource_dispose (GObject * gobject);
82 static GstFlowReturn gst_dreamaudiosource_create (GstPushSrc * psrc, GstBuffer ** outbuf);
83
84 static void gst_dreamaudiosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec);
85 static void gst_dreamaudiosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec);
86
87 static GstStateChangeReturn gst_dreamaudiosource_change_state (GstElement * element, GstStateChange transition);
88 static gint64 gst_dreamaudiosource_get_base_pts (GstDreamAudioSource *self);
89
90 static void gst_dreamaudiosource_read_thread_func (GstDreamAudioSource * self);
91
92 #ifdef PROVIDE_CLOCK
93 static GstClock *gst_dreamaudiosource_provide_clock (GstElement * elem);
94 // static GstClockTime gst_dreamaudiosource_get_encoder_time_ (GstClock * clock, GstBaseSrc * bsrc);
95 #endif
96
97 static void
98 gst_dreamaudiosource_class_init (GstDreamAudioSourceClass * klass)
99 {
100         GObjectClass *gobject_class;
101         GstElementClass *gstelement_class;
102         GstBaseSrcClass *gstbasesrc_class;
103         GstPushSrcClass *gstpush_src_class;
104
105         gobject_class = (GObjectClass *) klass;
106         gstelement_class = (GstElementClass *) klass;
107         gstbasesrc_class = (GstBaseSrcClass *) klass;
108         gstpush_src_class = (GstPushSrcClass *) klass;
109
110         gobject_class->set_property = gst_dreamaudiosource_set_property;
111         gobject_class->get_property = gst_dreamaudiosource_get_property;
112         gobject_class->dispose = gst_dreamaudiosource_dispose;
113
114         gst_element_class_add_pad_template (gstelement_class,
115                                             gst_static_pad_template_get (&srctemplate));
116
117         gst_element_class_set_static_metadata (gstelement_class,
118             "Dream Audio source", "Source/Audio",
119             "Provide an audio elementary stream from Dreambox encoder device",
120             "Andreas Frisch <fraxinas@opendreambox.org>");
121
122         gstelement_class->change_state = gst_dreamaudiosource_change_state;
123
124         gstbasesrc_class->get_caps = gst_dreamaudiosource_getcaps;
125         gstbasesrc_class->start = gst_dreamaudiosource_start;
126         gstbasesrc_class->stop = gst_dreamaudiosource_stop;
127         gstbasesrc_class->unlock = gst_dreamaudiosource_unlock;
128         gstbasesrc_class->unlock_stop = gst_dreamaudiosource_unlock_stop;
129
130         gstpush_src_class->create = gst_dreamaudiosource_create;
131
132 #ifdef PROVIDE_CLOCK
133         gstelement_class->provide_clock = GST_DEBUG_FUNCPTR (gst_dreamaudiosource_provide_clock);
134 //      g_type_class_ref (GST_TYPE_SYSTEM_CLOCK);
135 #endif
136
137         g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_BITRATE,
138           g_param_spec_int ("bitrate", "Bitrate (kb/s)",
139             "Bitrate in kbit/sec", 16, 320, DEFAULT_BITRATE,
140             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
141
142         g_object_class_install_property (gobject_class, ARG_INPUT_MODE,
143           g_param_spec_enum ("input-mode", "Input Mode",
144             "Select the input source of the audio stream",
145             GST_TYPE_DREAMAUDIOSOURCE_INPUT_MODE, DEFAULT_INPUT_MODE,
146             G_PARAM_CONSTRUCT | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
147
148         gst_dreamaudiosource_signals[SIGNAL_GET_BASE_PTS] =
149                 g_signal_new ("get-base-pts",
150                 G_TYPE_FROM_CLASS (klass),
151                 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
152                 G_STRUCT_OFFSET (GstDreamAudioSourceClass, get_base_pts),
153                 NULL, NULL, gst_dreamsource_marshal_INT64__VOID, G_TYPE_INT64, 0);
154
155         klass->get_base_pts = gst_dreamaudiosource_get_base_pts;
156
157 }
158
159 static gint64
160 gst_dreamaudiosource_get_base_pts (GstDreamAudioSource *self)
161 {
162         GST_DEBUG_OBJECT (self, "gst_dreamaudiosource_get_base_pts %" GST_TIME_FORMAT"", GST_TIME_ARGS (self->base_pts) );
163         return self->base_pts;
164 }
165
166 static void gst_dreamaudiosource_set_bitrate (GstDreamAudioSource * self, uint32_t bitrate)
167 {
168         if (!self->encoder || !self->encoder->fd)
169                 return;
170         uint32_t abr = bitrate*1000;
171         int ret = ioctl(self->encoder->fd, AENC_SET_BITRATE, &abr);
172         if (ret != 0)
173         {
174                 GST_WARNING_OBJECT (self, "can't set audio bitrate to %i bytes/s!", abr);
175                 return;
176         }
177         GST_INFO_OBJECT (self, "set audio bitrate to %i kBytes/s", bitrate);
178         self->audio_info.bitrate = abr;
179 }
180
181 void gst_dreamaudiosource_set_input_mode (GstDreamAudioSource *self, GstDreamAudioSourceInputMode mode)
182 {
183         g_return_if_fail (GST_IS_DREAMAUDIOSOURCE (self));
184         GEnumValue *val = g_enum_get_value (G_ENUM_CLASS (g_type_class_ref (GST_TYPE_DREAMAUDIOSOURCE_INPUT_MODE)), mode);
185         if (!val)
186         {
187                 GST_ERROR_OBJECT (self, "no such input_mode %i!", mode);
188                 goto out;
189         }
190         const gchar *value_nick = val->value_nick;
191         GST_DEBUG_OBJECT (self, "setting input_mode to %s (%i)...", value_nick, mode);
192         g_mutex_lock (&self->mutex);
193         if (!self->encoder || !self->encoder->fd)
194         {
195                 GST_ERROR_OBJECT (self, "can't set input mode because encoder device not opened!");
196                 goto out;
197         }
198         int int_mode = mode;
199         int ret = ioctl(self->encoder->fd, AENC_SET_SOURCE, &int_mode);
200         if (ret != 0)
201         {
202                 GST_WARNING_OBJECT (self, "can't set input mode to %s (%i) error: %s", value_nick, mode, strerror(errno));
203                 goto out;
204         }
205         GST_INFO_OBJECT (self, "successfully set input mode to %s (%i)", value_nick, mode);
206         self->input_mode = mode;
207 out:
208         g_mutex_unlock (&self->mutex);
209         return;
210 }
211
212 GstDreamAudioSourceInputMode gst_dreamaudiosource_get_input_mode (GstDreamAudioSource *self)
213 {
214         GstDreamAudioSourceInputMode result;
215         g_return_val_if_fail (GST_IS_DREAMAUDIOSOURCE (self), -1);
216         GST_OBJECT_LOCK (self);
217         result =self->input_mode;
218         GST_OBJECT_UNLOCK (self);
219         return result;
220 }
221
222 gboolean
223 gst_dreamaudiosource_plugin_init (GstPlugin *plugin)
224 {
225         GST_DEBUG_CATEGORY_INIT (dreamaudiosource_debug, "dreamaudiosource", 0, "dreamaudiosource");
226         return gst_element_register (plugin, "dreamaudiosource", GST_RANK_PRIMARY, GST_TYPE_DREAMAUDIOSOURCE);
227 }
228
229 static void
230 gst_dreamaudiosource_init (GstDreamAudioSource * self)
231 {
232         self->encoder = NULL;
233         self->descriptors_available = 0;
234         self->input_mode = DEFAULT_INPUT_MODE;
235
236         self->buffer_size = DEFAULT_BUFFER_SIZE;
237         g_queue_init (&self->current_frames);
238         self->readthread = NULL;
239
240         g_mutex_init (&self->mutex);
241         READ_SOCKET (self) = -1;
242         WRITE_SOCKET (self) = -1;
243
244         gst_base_src_set_format (GST_BASE_SRC (self), GST_FORMAT_TIME);
245         gst_base_src_set_live (GST_BASE_SRC (self), TRUE);
246
247         self->encoder = malloc(sizeof(EncoderInfo));
248
249         if(!self->encoder) {
250                 GST_ERROR_OBJECT(self,"out of space");
251                 return;
252         }
253
254         char fn_buf[32];
255         sprintf(fn_buf, "/dev/aenc%d", 0);
256         self->encoder->fd = open(fn_buf, O_RDWR | O_SYNC);
257         if(self->encoder->fd <= 0) {
258                 GST_ERROR_OBJECT(self,"cannot open device %s (%s)", fn_buf, strerror(errno));
259                 free(self->encoder);
260                 self->encoder = NULL;
261                 return;
262         }
263
264         int flags = fcntl(self->encoder->fd, F_GETFL, 0);
265         fcntl(self->encoder->fd, F_SETFL, flags | O_NONBLOCK);
266
267         self->encoder->buffer = malloc(ABUFSIZE);
268         if(!self->encoder->buffer) {
269                 GST_ERROR_OBJECT(self,"cannot alloc buffer");
270                 return;
271         }
272
273         self->encoder->cdb = (unsigned char *)mmap(0, AMMAPSIZE, PROT_READ, MAP_PRIVATE, self->encoder->fd, 0);
274         if(!self->encoder->cdb || self->encoder->cdb == MAP_FAILED) {
275                 GST_ERROR_OBJECT(self,"cannot mmap cdb: %s (%d)", strerror(errno));
276                 self->encoder->cdb = NULL;
277                 return;
278         }
279
280 #ifdef PROVIDE_CLOCK
281         self->encoder_clock = gst_dreamsource_clock_new ("GstDreamAudioSourceClock", self->encoder->fd);
282         GST_OBJECT_FLAG_SET (self, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
283 #endif
284
285 #ifdef dump
286         self->dumpfd = open("/media/hdd/movie/dreamaudiosource.dump", O_WRONLY | O_CREAT | O_TRUNC);
287         GST_DEBUG_OBJECT (self, "dumpfd = %i (%s)", self->dumpfd, (self->dumpfd > 0) ? "OK" : strerror(errno));
288 #endif
289 }
290
291 static void
292 gst_dreamaudiosource_set_property (GObject * object, guint prop_id, const GValue * value, GParamSpec * pspec)
293 {
294         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (object);
295
296         switch (prop_id) {
297                 case ARG_BITRATE:
298                         gst_dreamaudiosource_set_bitrate(self, g_value_get_int (value));
299                         break;
300                 case ARG_INPUT_MODE:
301                              gst_dreamaudiosource_set_input_mode (self, g_value_get_enum (value));
302                         break;
303                 default:
304                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
305                         break;
306         }
307 }
308
309 static void
310 gst_dreamaudiosource_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec)
311 {
312         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (object);
313
314         switch (prop_id) {
315                 case ARG_BITRATE:
316                         g_value_set_int (value, self->audio_info.bitrate/1000);
317                         break;
318                 case ARG_INPUT_MODE:
319                         g_value_set_enum (value, gst_dreamaudiosource_get_input_mode (self));
320                         break;
321                 default:
322                         G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
323                         break;
324         }
325 }
326
327 static GstCaps *
328 gst_dreamaudiosource_getcaps (GstBaseSrc * bsrc, GstCaps * filter)
329 {
330         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
331         GstPadTemplate *pad_template;
332         GstCaps *caps;
333
334         pad_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS(self), "src");
335         g_return_val_if_fail (pad_template != NULL, NULL);
336
337         if (self->encoder == NULL) {
338                 GST_DEBUG_OBJECT (self, "encoder not opened -> use template caps");
339                 caps = gst_pad_template_get_caps (pad_template);
340         }
341         else
342         {
343                 caps = gst_caps_make_writable(gst_pad_template_get_caps (pad_template));
344         }
345
346         GST_DEBUG_OBJECT (self, "return caps %" GST_PTR_FORMAT, caps);
347         return caps;
348 }
349
350 static gboolean gst_dreamaudiosource_unlock (GstBaseSrc * bsrc)
351 {
352         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
353         GST_DEBUG_OBJECT (self, "stop creating buffers");
354         g_mutex_lock (&self->mutex);
355         self->flushing = TRUE;
356         GST_DEBUG_OBJECT (self, "set flushing TRUE");
357         g_cond_signal (&self->cond);
358         g_mutex_unlock (&self->mutex);
359         return TRUE;
360 }
361
362 static gboolean gst_dreamaudiosource_unlock_stop (GstBaseSrc * bsrc)
363 {
364         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
365         GST_DEBUG_OBJECT (self, "start creating buffers...");
366         g_mutex_lock (&self->mutex);
367         self->flushing = FALSE;
368         g_queue_foreach (&self->current_frames, (GFunc) gst_buffer_unref, NULL);
369         g_queue_clear (&self->current_frames);
370         g_mutex_unlock (&self->mutex);
371         return TRUE;
372 }
373
374 static void gst_dreamaudiosource_read_thread_func (GstDreamAudioSource * self)
375 {
376         EncoderInfo *enc = self->encoder;
377         GstBuffer *readbuf;
378
379         if (!enc) {
380                 GST_WARNING_OBJECT (self, "encoder device not opened!");
381                 return;
382         }
383
384         GST_DEBUG_OBJECT (self, "enter read thread");
385
386         GstMessage *message;
387         GValue val = { 0 };
388
389         message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
390         g_value_init (&val, GST_TYPE_G_THREAD);
391         g_value_set_boxed (&val, self->readthread);
392         gst_message_set_stream_status_object (message, &val);
393         g_value_unset (&val);
394         GST_DEBUG_OBJECT (self, "posting ENTER stream status");
395         gst_element_post_message (GST_ELEMENT_CAST (self), message);
396
397         while (TRUE) {
398                 readbuf = NULL;
399                 {
400                         struct pollfd rfd[2];
401                         int timeout, nfds;
402
403                         rfd[0].fd = READ_SOCKET (self);
404                         rfd[0].events = POLLIN | POLLERR | POLLHUP | POLLPRI;
405
406                         if (self->descriptors_available == 0)
407                         {
408                                 rfd[1].fd = enc->fd;
409                                 rfd[1].events = POLLIN;
410                                 self->descriptors_count = 0;
411                                 timeout = 200;
412                                 nfds = 2;
413                         }
414                         else
415                         {
416                                 rfd[1].revents = 0;
417                                 nfds = 1;
418                                 timeout = 0;
419                         }
420
421                         int ret = poll(rfd, nfds, timeout);
422
423                         if (G_UNLIKELY (ret == -1))
424                         {
425                                 GST_ERROR_OBJECT (self, "SELECT ERROR!");
426                                 goto stop_running;
427                         }
428                         else if ( ret == 0 && self->descriptors_available == 0 )
429                         {
430                                 g_mutex_lock (&self->mutex);
431                                 gst_clock_get_time(self->encoder_clock);
432                                 if (self->flushing)
433                                 {
434                                         GST_DEBUG_OBJECT (self, "FLUSHING!");
435                                         g_cond_signal (&self->cond);
436                                         continue;
437                                 }
438                                 g_mutex_unlock (&self->mutex);
439                                 GST_DEBUG_OBJECT (self, "SELECT TIMEOUT");
440                                 //!!! TODO generate valid dummy payload
441                                 readbuf = gst_buffer_new();
442                         }
443                         else if ( rfd[0].revents )
444                         {
445                                 char command;
446                                 READ_COMMAND (self, command, ret);
447                                 if (command == CONTROL_STOP)
448                                 {
449                                         GST_DEBUG_OBJECT (self, "CONTROL_STOP!");
450                                         goto stop_running;
451                                 }
452                         }
453                         else if ( G_LIKELY(rfd[1].revents & POLLIN) )
454                         {
455                                 int rlen = read(enc->fd, enc->buffer, ABUFSIZE);
456                                 if (rlen <= 0 || rlen % ABDSIZE ) {
457                                         if ( errno == 512 )
458                                                 goto stop_running;
459                                         GST_WARNING_OBJECT (self, "read error %s (%i)", strerror(errno), errno);
460                                         goto stop_running;
461                                 }
462                                 self->descriptors_available = rlen / ABDSIZE;
463                                 GST_LOG_OBJECT (self, "encoder buffer was empty, %d descriptors available", self->descriptors_available);
464                         }
465                 }
466
467                 while (self->descriptors_count < self->descriptors_available)
468                 {
469                         off_t offset = self->descriptors_count * ABDSIZE;
470                         AudioBufferDescriptor *desc = (AudioBufferDescriptor*)(&enc->buffer[offset]);
471
472                         uint32_t f = desc->stCommon.uiFlags;
473
474                         GST_LOG_OBJECT (self, "descriptors_count=%d, descriptors_available=%d\tuiOffset=%d, uiLength=%d", self->descriptors_count, self->descriptors_available, desc->stCommon.uiOffset, desc->stCommon.uiLength);
475
476                         if (G_UNLIKELY (f & CDB_FLAG_METADATA))
477                         {
478                                 GST_LOG_OBJECT (self, "CDB_FLAG_METADATA... skip outdated packet");
479                                 self->descriptors_count = self->descriptors_available;
480                                 continue;
481                         }
482
483                         readbuf = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, enc->cdb, AMMAPSIZE, desc->stCommon.uiOffset, desc->stCommon.uiLength, self, NULL);
484
485                         GstClockTime buffer_pts = GST_CLOCK_TIME_NONE;
486
487                         // uiDTS since kernel driver booted
488                         if (f & CDB_FLAG_PTS_VALID)
489                         {
490                                 buffer_pts = MPEGTIME_TO_GSTTIME(desc->stCommon.uiPTS);
491                                 GST_LOG_OBJECT (self, "f & CDB_FLAG_PTS_VALID && encoder's uiPTS=%" GST_TIME_FORMAT"", GST_TIME_ARGS(buffer_pts));
492
493                                 g_mutex_lock (&self->mutex);
494                                 if (G_UNLIKELY (self->base_pts == GST_CLOCK_TIME_NONE))
495                                 {
496                                         if (self->dreamvideosrc)
497                                         {
498                                                 guint64 videosource_base_pts;
499                                                 g_signal_emit_by_name(self->dreamvideosrc, "get-base-pts", &videosource_base_pts);
500                                                 if (videosource_base_pts != GST_CLOCK_TIME_NONE)
501                                                 {
502                                                         GST_DEBUG_OBJECT (self, "use DREAMVIDEOSOURCE's base_pts=%" GST_TIME_FORMAT "", GST_TIME_ARGS (videosource_base_pts) );
503                                                         self->base_pts = videosource_base_pts;
504                                                 }
505                                         }
506                                         if (self->base_pts == GST_CLOCK_TIME_NONE)
507                                         {
508                                                 self->base_pts = buffer_pts;
509                                                 GST_DEBUG_OBJECT (self, "use mpeg stream pts as base_pts=%" GST_TIME_FORMAT" (%lld)", GST_TIME_ARGS (self->base_pts), desc->stCommon.uiPTS);
510                                         }
511                                 }
512                                 g_mutex_unlock (&self->mutex);
513
514                                 if (self->base_pts != GST_CLOCK_TIME_NONE && buffer_pts >= self->base_pts )
515                                 {
516                                         buffer_pts -= self->base_pts;
517                                         GST_BUFFER_PTS(readbuf) = buffer_pts;
518                                         GST_BUFFER_DTS(readbuf) = buffer_pts;
519                                         GST_LOG_OBJECT (self, "corrected pts+dts=%" GST_TIME_FORMAT "", GST_TIME_ARGS (GST_BUFFER_PTS(readbuf)));
520                                 }
521                         }
522
523                         if (G_UNLIKELY (self->base_pts == GST_CLOCK_TIME_NONE))
524                         {
525                                 GST_DEBUG_OBJECT (self, "self->base_pts == GST_CLOCK_TIME_NONE! skip this frame");
526                                 self->descriptors_count++;
527                                 break;
528                         }
529
530 #ifdef dump
531                         int wret = write(self->dumpfd, (unsigned char*)(enc->cdb + desc->stCommon.uiOffset), desc->stCommon.uiLength);
532                         GST_LOG_OBJECT (self, "read %i dumped %i total %" G_GSIZE_FORMAT " ", desc->stCommon.uiLength, wret, gst_buffer_get_size (*outbuf) );
533 #endif
534                         self->descriptors_count++;
535                         break;
536                 }
537
538                 if (self->descriptors_count == self->descriptors_available)
539                 {
540                         GST_LOG_OBJECT (self, "self->descriptors_count == self->descriptors_available -> release %i consumed descriptors", self->descriptors_count);
541                         /* release consumed descs */
542                         if (write(enc->fd, &self->descriptors_count, sizeof(self->descriptors_count)) != sizeof(self->descriptors_count)) {
543                                 GST_WARNING_OBJECT (self, "release consumed descs write error!");
544                                 goto stop_running;
545                         }
546                         self->descriptors_available = 0;
547                 }
548
549                 if (readbuf)
550                 {
551                         g_mutex_lock (&self->mutex);
552                         if (!self->flushing)
553                         {
554                                 while (g_queue_get_length (&self->current_frames) >= self->buffer_size)
555                                 {
556                                         GstBuffer * oldbuf = g_queue_pop_head (&self->current_frames);
557                                         GST_WARNING_OBJECT (self, "dropping %" GST_PTR_FORMAT " because of queue overflow! buffers count=%i", oldbuf, g_queue_get_length (&self->current_frames));
558                                         gst_buffer_unref(oldbuf);
559                                 }
560                                 g_queue_push_tail (&self->current_frames, readbuf);
561                         }
562                         g_cond_signal (&self->cond);
563                         GST_INFO_OBJECT (self, "read %" GST_PTR_FORMAT " to queue", readbuf );
564                         g_mutex_unlock (&self->mutex);
565                 }
566         }
567
568         g_assert_not_reached ();
569         return;
570
571         stop_running:
572         {
573                 g_mutex_unlock (&self->mutex);
574                 g_cond_signal (&self->cond);
575                 GST_DEBUG ("stop running, exit thread");
576                 message = gst_message_new_stream_status (GST_OBJECT_CAST (self), GST_STREAM_STATUS_TYPE_ENTER, GST_ELEMENT_CAST (GST_OBJECT_PARENT(self)));
577                 g_value_init (&val, GST_TYPE_G_THREAD);
578                 g_value_set_boxed (&val, self->readthread);
579                 gst_message_set_stream_status_object (message, &val);
580                 g_value_unset (&val);
581                 GST_DEBUG_OBJECT (self, "posting LEAVE stream status");
582                 gst_element_post_message (GST_ELEMENT_CAST (self), message);
583                 return;
584         }
585 }
586
587 static GstFlowReturn
588 gst_dreamaudiosource_create (GstPushSrc * psrc, GstBuffer ** outbuf)
589 {
590         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (psrc);
591
592         GST_LOG_OBJECT (self, "new buffer requested");
593
594         g_mutex_lock (&self->mutex);
595         while (g_queue_is_empty (&self->current_frames) && !self->flushing)
596         {
597                 g_cond_wait (&self->cond, &self->mutex);
598                 GST_INFO_OBJECT (self, "waiting for buffer from encoder");
599         }
600
601         *outbuf = g_queue_pop_head (&self->current_frames);
602         g_mutex_unlock (&self->mutex);
603
604         if (*outbuf)
605         {
606                 GST_INFO_OBJECT (self, "pushing %" GST_PTR_FORMAT "", *outbuf );
607                 return GST_FLOW_OK;
608         }
609         GST_INFO_OBJECT (self, "FLUSHING");
610         return GST_FLOW_FLUSHING;
611 }
612
613 static GstStateChangeReturn gst_dreamaudiosource_change_state (GstElement * element, GstStateChange transition)
614 {
615         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (element);
616         GstStateChangeReturn sret = GST_STATE_CHANGE_SUCCESS;
617         int ret;
618
619         switch (transition) {
620                 case GST_STATE_CHANGE_NULL_TO_READY:
621                 {
622                         if (!(self->encoder && self->encoder->cdb))
623                                 return GST_STATE_CHANGE_FAILURE;
624                         int control_sock[2];
625                         if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
626                         {
627                                 GST_ELEMENT_ERROR (self, RESOURCE, OPEN_READ_WRITE, (NULL), GST_ERROR_SYSTEM);
628                                 return GST_STATE_CHANGE_FAILURE;
629                         }
630                         READ_SOCKET (self) = control_sock[0];
631                         WRITE_SOCKET (self) = control_sock[1];
632                         fcntl (READ_SOCKET (self), F_SETFL, O_NONBLOCK);
633                         fcntl (WRITE_SOCKET (self), F_SETFL, O_NONBLOCK);
634                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_NULL_TO_READY");
635                         break;
636                 }
637                 case GST_STATE_CHANGE_READY_TO_PAUSED:
638                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_READY_TO_PAUSED");
639 #ifdef PROVIDE_CLOCK
640                         gst_element_post_message (element, gst_message_new_clock_provide (GST_OBJECT_CAST (element), self->encoder_clock, TRUE));
641 #endif
642                         self->flushing = TRUE;
643                         self->readthread = g_thread_try_new ("dreamaudiosrc-read", (GThreadFunc) gst_dreamaudiosource_read_thread_func, self, NULL);
644                         GST_DEBUG_OBJECT (self, "started readthread @%p", self->readthread );
645                         break;
646                 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
647                         g_mutex_lock (&self->mutex);
648                         GST_LOG_OBJECT (self, "GST_STATE_CHANGE_PAUSED_TO_PLAYING");
649                         self->base_pts = GST_CLOCK_TIME_NONE;
650                         ret = ioctl(self->encoder->fd, AENC_START);
651                         if ( ret != 0 )
652                                 goto fail;
653                         self->descriptors_available = 0;
654                         CLEAR_COMMAND (self);
655                         GST_INFO_OBJECT (self, "started encoder!");
656                         g_mutex_unlock (&self->mutex);
657                         break;
658                 default:
659                         break;
660         }
661
662         if (GST_ELEMENT_CLASS (parent_class)->change_state)
663                 sret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
664
665         switch (transition) {
666                 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
667                         g_mutex_lock (&self->mutex);
668                         GST_DEBUG_OBJECT (self, "GST_STATE_CHANGE_PLAYING_TO_PAUSED self->descriptors_count=%i self->descriptors_available=%i", self->descriptors_count, self->descriptors_available);
669                         while (self->descriptors_count < self->descriptors_available)
670                         {
671                                 GST_LOG_OBJECT (self, "flushing self->descriptors_count=%i");
672                                 self->descriptors_count++;
673                         }
674                         if (self->descriptors_count)
675                                 write(self->encoder->fd, &self->descriptors_count, sizeof(self->descriptors_count));
676                         ret = ioctl(self->encoder->fd, AENC_STOP);
677                         if ( ret != 0 )
678                                 goto fail;
679 #ifdef PROVIDE_CLOCK
680                         gst_clock_set_master (self->encoder_clock, NULL);
681 #endif
682                         GST_INFO_OBJECT (self, "stopped encoder!");
683                         g_mutex_unlock (&self->mutex);
684                         break;
685                 case GST_STATE_CHANGE_PAUSED_TO_READY:
686                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_PAUSED_TO_READY");
687 #ifdef PROVIDE_CLOCK
688                         gst_element_post_message (element, gst_message_new_clock_lost (GST_OBJECT_CAST (element), self->encoder_clock));
689 #endif
690                         GST_DEBUG_OBJECT (self, "stopping readthread @%p...", self->readthread);
691                         SEND_COMMAND (self, CONTROL_STOP);
692                         g_thread_join (self->readthread);
693                         break;
694                 case GST_STATE_CHANGE_READY_TO_NULL:
695                         close (READ_SOCKET (self));
696                         close (WRITE_SOCKET (self));
697                         READ_SOCKET (self) = -1;
698                         WRITE_SOCKET (self) = -1;
699                         GST_DEBUG_OBJECT (self,"GST_STATE_CHANGE_READY_TO_NULL, close control sockets");
700                         break;
701                 default:
702                         break;
703         }
704
705         return sret;
706 fail:
707         GST_ERROR_OBJECT(self,"can't perform encoder ioctl!");
708         g_mutex_unlock (&self->mutex);
709         return GST_STATE_CHANGE_FAILURE;
710 }
711
712 static gboolean
713 gst_dreamaudiosource_start (GstBaseSrc * bsrc)
714 {
715         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
716         self->dreamvideosrc = gst_bin_get_by_name_recurse_up(GST_BIN(GST_ELEMENT_PARENT(self)), "dreamvideosource0");
717         GST_DEBUG_OBJECT (self, "started. reference to dreamvideosource=%" GST_PTR_FORMAT "", self->dreamvideosrc);
718         return TRUE;
719 }
720
721 static gboolean
722 gst_dreamaudiosource_stop (GstBaseSrc * bsrc)
723 {
724         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (bsrc);
725         if (self->dreamvideosrc)
726                 gst_object_unref(self->dreamvideosrc);
727         GST_DEBUG_OBJECT (self, "stop");
728         return TRUE;
729 }
730
731 static void
732 gst_dreamaudiosource_dispose (GObject * gobject)
733 {
734         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (gobject);
735         if (self->encoder) {
736                 if (self->encoder->buffer)
737                         free(self->encoder->buffer);
738                 if (self->encoder->cdb)
739                         munmap(self->encoder->cdb, AMMAPSIZE);
740                 if (self->encoder->fd)
741                         close(self->encoder->fd);
742                 free(self->encoder);
743         }
744 #ifdef PROVIDE_CLOCK
745         if (self->encoder_clock) {
746                 gst_object_unref (self->encoder_clock);
747                 self->encoder_clock = NULL;
748         }
749 #endif
750 #ifdef dump
751         close(self->dumpfd);
752 #endif
753         g_mutex_clear (&self->mutex);
754         g_cond_clear (&self->cond);
755         GST_DEBUG_OBJECT (self, "disposed");
756         G_OBJECT_CLASS (parent_class)->dispose (gobject);
757 }
758
759 #ifdef PROVIDE_CLOCK
760 static GstClock *gst_dreamaudiosource_provide_clock (GstElement * element)
761 {
762         GstDreamAudioSource *self = GST_DREAMAUDIOSOURCE (element);
763
764         if (!self->encoder || self->encoder->fd < 0)
765         {
766                 GST_DEBUG_OBJECT (self, "encoder device not started, can't provide clock!");
767                 return NULL;
768         }
769
770         return GST_CLOCK_CAST (gst_object_ref (self->encoder_clock));
771 }
772
773 #endif
774