On 2016-11-28 05:01, Ahmed S. Darwish wrote:
Current pacat code reads whatever is available from STDIN and writes
it directly to the playback stream. A minimal buffer is created
for each read operation; no further reads are then allowed unless
the earlier read buffer has been fully consumed by a stream write.

While quite simple, this model breaks upon the new requirements of
writing only frame-aligned data to the stream (commits #1 and #2).
The kernel read syscall can return a length much smaller than the
frame-aligned size requested, leading to invalid unaligned writes.

Sorry, but I don't understand why you need a ringbuffer to resolve this?

And now that I think of it, I'm not sure that pa_xmalloc (used to allocate the ringbuffer's memory) is guaranteed to return aligned memory either; it just happens to be that way in practice.

Wouldn't it be better if we had something like:

 1) Call pa_stream_begin_write to get a buffer.
 2) If we have half a frame from the previous iteration at 4), put that half frame first in the buffer.
 3) Call read / pa_read to read data from a file into the rest of the buffer.
 4) If the last frame read is not complete, store that half frame in a local variable.
 5) Call pa_stream_write with the number of complete frames.
 6) Repeat from 1).



This can easily be reproduced by choosing a starved STDIN backend:

  pacat /dev/random    pa_stream_write() failed: EINVAL
  echo 1234 | pacat    pa_stream_write() failed: EINVAL

So guard against incomplete kernel reads by using a ringbuffer.
Meanwhile leave the simple recording mode buffering as is: it just
writes to plain STDOUT without any special needs.

CommitReference #1: 22827a5e1e62
CommitReference #2: 150ace90f380
BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=98475
BugLink: https://bugs.freedesktop.org/show_bug.cgi?id=77595
Signed-off-by: Ahmed S. Darwish <[email protected]>
---
 src/utils/pacat.c | 89 +++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 57 insertions(+), 32 deletions(-)

diff --git a/src/utils/pacat.c b/src/utils/pacat.c
index 68362ec..39c002e 100644
--- a/src/utils/pacat.c
+++ b/src/utils/pacat.c
@@ -38,10 +38,12 @@
 #include <pulse/pulseaudio.h>
 #include <pulse/rtclock.h>

+#include <pulsecore/atomic.h>
 #include <pulsecore/core-util.h>
 #include <pulsecore/i18n.h>
 #include <pulsecore/log.h>
 #include <pulsecore/macro.h>
+#include <pulsecore/ringbuffer.h>
 #include <pulsecore/sndfile-util.h>
 #include <pulsecore/sample-util.h>

@@ -56,6 +58,11 @@ static pa_context *context = NULL;
 static pa_stream *stream = NULL;
 static pa_mainloop_api *mainloop_api = NULL;

+/* Ring buffer for playback mode */
+static pa_ringbuffer ringbuffer = { 0, };
+static pa_atomic_t rb_count = PA_ATOMIC_INIT(0);
+
+/* Flat buffer for recording mode */
 static void *buffer = NULL;
 static size_t buffer_length = 0, buffer_index = 0;

@@ -155,29 +162,35 @@ static void start_drain(void) {

 /* Write some data to the stream */
 static void do_stream_write(size_t length) {
-    size_t l;
-    pa_assert(length);
+    int rb_freelen;
+    void *buf;
+    size_t w;

-    if (!buffer || !buffer_length)
+    if (!ringbuffer.memory)
         return;

-    l = length;
-    if (l > buffer_length)
-        l = buffer_length;
+    while (length > 0) {

-    if (pa_stream_write(stream, (uint8_t*) buffer + buffer_index, l, NULL, 0, 
PA_SEEK_RELATIVE) < 0) {
-        pa_log(_("pa_stream_write() failed: %s"), 
pa_strerror(pa_context_errno(context)));
-        quit(1);
-        return;
-    }
+        buf = pa_ringbuffer_peek(&ringbuffer, &rb_freelen);

-    buffer_length -= l;
-    buffer_index += l;
+        /* Frame-align our audio packets or the server will happily ignore
+         * them upon arrival. Also only write as much data as requested: do
+         * not overflow the stream by emptying the whole ringbuffer. */
+        w = PA_MIN(pa_frame_align(length, &sample_spec),
+                   pa_frame_align(rb_freelen, &sample_spec));
+        if (!w)
+            break;

-    if (!buffer_length) {
-        pa_xfree(buffer);
-        buffer = NULL;
-        buffer_index = buffer_length = 0;
+        if (pa_stream_write(stream, buf, w, NULL, 0, PA_SEEK_RELATIVE) < 0) {
+            pa_log(_("pa_stream_write() failed: %s"), 
pa_strerror(pa_context_errno(context)));
+            quit(1);
+            return;
+        }
+
+        pa_ringbuffer_drop(&ringbuffer, w);
+
+        pa_assert(w <= length);
+        length -= w;
     }
 }

@@ -192,9 +205,6 @@ static void stream_write_callback(pa_stream *s, size_t 
length, void *userdata) {
         if (stdio_event)
             mainloop_api->io_enable(stdio_event, PA_IO_EVENT_INPUT);

-        if (!buffer)
-            return;
-
         do_stream_write(length);

     } else {
@@ -540,25 +550,42 @@ fail:

 /* New data on STDIN **/
 static void stdin_callback(pa_mainloop_api*a, pa_io_event *e, int fd, 
pa_io_event_flags_t f, void *userdata) {
-    size_t l, w = 0;
-    ssize_t r;
-    bool stream_not_ready;
+    int rb_freelen;
+    void *buf;
+    size_t writable, r;

     pa_assert(a == mainloop_api);
     pa_assert(e);
     pa_assert(stdio_event == e);

-    stream_not_ready = !stream || pa_stream_get_state(stream) != 
PA_STREAM_READY ||
-                        !(l = w = pa_stream_writable_size(stream));
+    /* Stream not ready? */
+    if (!stream || pa_stream_get_state(stream) != PA_STREAM_READY ||
+        !(writable = pa_stream_writable_size(stream))) {

-    if (buffer || stream_not_ready) {
         mainloop_api->io_enable(stdio_event, PA_IO_EVENT_NULL);
         return;
     }

-    buffer = pa_xmalloc(l);
+    /* Ring APIs forces us to use integers: ensure safe casting */
+    writable = PA_MIN(writable, (size_t)INT_MAX);
+
+    if (!ringbuffer.memory) {
+        size_t rb_size = PA_MAX(writable, 4096 * pa_frame_size(&sample_spec));
+        ringbuffer.memory = pa_xmalloc(rb_size);
+        ringbuffer.capacity = rb_size;
+        ringbuffer.count = &rb_count;
+    }
+
+    buf = pa_ringbuffer_begin_write(&ringbuffer, &rb_freelen);
+
+    if (!rb_freelen) {
+        do_stream_write(writable);
+        return;
+    }

-    if ((r = pa_read(fd, buffer, l, userdata)) <= 0) {
+    /* Avoid underruns due to a too-long read(): read only what the
+     * stream requested and don't try to fill the whole ringbuffer. */
+    if ((r = pa_read(fd, buf, PA_MIN((int) writable, rb_freelen), userdata)) 
<= 0) {
         if (r == 0) {
             if (verbose)
                 pa_log(_("Got EOF."));
@@ -575,11 +602,9 @@ static void stdin_callback(pa_mainloop_api*a, pa_io_event 
*e, int fd, pa_io_even
         return;
     }

-    buffer_length = (uint32_t) r;
-    buffer_index = 0;
+    pa_ringbuffer_end_write(&ringbuffer, r);

-    if (w)
-        do_stream_write(w);
+    do_stream_write(writable);
 }

 /* Some data may be written to STDOUT */

_______________________________________________
pulseaudio-discuss mailing list
[email protected]
https://lists.freedesktop.org/mailman/listinfo/pulseaudio-discuss

Reply via email to