Yuan-Ru-Lin opened a new issue, #580:
URL: https://github.com/apache/arrow-julia/issues/580

   I found that `Arrow.Stream` does not work with non-seekable I/O, which 
should be supported for streaming. Here are some MWEs.
   
   ### Named pipes
   
   ```bash
   mkfifo /tmp/arrow_pipe
   
   # Producer
   julia -e '
   using Arrow
   open("/tmp/arrow_pipe", "w") do io
       Arrow.write(io, (i = collect(1:10),); file=false)  # streaming
   end
   ' &
   
   # Consumer
   julia -e '
   using Arrow
   open("/tmp/arrow_pipe", "r") do io
       for batch in Arrow.Stream(io)
           println(batch)
       end
   end
   '
   # Result: no output, no error
   
   rm /tmp/arrow_pipe
   ```
   
   ### Sockets
   
   ```julia
   using Arrow, Sockets
   
   server = listen(9999)
   t = @async begin
       conn = accept(server)
       Arrow.write(conn, (i = collect(1:10),); file=false)
       close(conn)
   end
   
   sock = connect(9999)
   
   # This block hangs indefinitely. Press Ctrl-C to proceed.
   for batch in Arrow.Stream(sock)
       println(batch)
   end
   
   wait(t)
   # ERROR: TaskFailedException
   #     nested task error: MethodError: no method matching position(::TCPSocket)
   ```
   
   ### Unix domain sockets
   
   ```julia
   using Arrow, Sockets
   
   server = listen("/tmp/arrow.sock")
   t = @async begin
       conn = accept(server)
       Arrow.write(conn, (i = collect(1:10),); file=false)
       close(conn)
   end
   
   sock = connect("/tmp/arrow.sock")
   
   # This block hangs indefinitely. Press Ctrl-C to proceed.
   for batch in Arrow.Stream(sock)
       println(batch)
   end
   
   wait(t)
   # ERROR: TaskFailedException
   #     nested task error: MethodError: no method matching position(::Base.PipeEndpoint)
   ```
   
   The demos above should be reproducible with Arrow v2.8.0.
   
   ## Diagnosis
   
   The first issue is caused by `tobytes(io::IOStream)` unconditionally using `Mmap.mmap(io)`. For named pipes, `filesize(io)` returns `0`, so `Mmap.mmap(io)` silently returns an empty `UInt8[]`.
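   
   As a rough illustration of one way around this, the sketch below memory-maps only regular files and drains anything else (such as a FIFO) with `read`. The name `bytes_from` is hypothetical and this is not Arrow.jl's actual `tobytes`; it also assumes a POSIX-like system where `stat(fd(io))` reports the file type.
   
   ```julia
   using Mmap
   
   # Hypothetical helper for illustration only; not part of Arrow.jl.
   function bytes_from(io::IOStream)
       # Regular files have a meaningful size, so memory-mapping is safe.
       # Pipes and other special files are read until EOF instead.
       if isfile(stat(fd(io)))
           return Mmap.mmap(io)
       else
           return read(io)
       end
   end
   ```
   
   Note that `read(io)` blocks until the writer closes the pipe, so this avoids the silent empty result but still buffers the whole stream before any batch is yielded.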
   
   The other two failures (TCP and Unix domain sockets) are caused by `Base.write(io, msg, ...)` calling `position(io)` to record block positions for the file format footer, even when writing the streaming format (`file=false`).
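   
   A minimal sketch of the kind of guard that would avoid this, assuming block offsets are only needed for the file-format footer. `NonSeekable` and `write_block!` are hypothetical names used for illustration; they are not Arrow.jl internals.
   
   ```julia
   # Stand-in for a socket or pipe: writable, but with no `position` method.
   struct NonSeekable <: IO
       buf::IOBuffer
   end
   Base.write(io::NonSeekable, b::UInt8) = write(io.buf, b)
   
   # Hypothetical helper: query the offset only when the file format needs it.
   function write_block!(offsets::Vector{Int}, io::IO, payload::Vector{UInt8}; file::Bool)
       # In streaming mode `position(io)` is never called,
       # so non-seekable streams do not raise a MethodError.
       file && push!(offsets, position(io))
       return write(io, payload)
   end
   
   # Streaming mode on a non-seekable stream: no offsets recorded.
   offsets = Int[]
   write_block!(offsets, NonSeekable(IOBuffer()), UInt8[1, 2, 3]; file=false)
   
   # File mode on a seekable buffer: offsets are recorded before each block.
   buf = IOBuffer()
   write_block!(offsets, buf, UInt8[1, 2]; file=true)
   write_block!(offsets, buf, UInt8[3]; file=true)
   ```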
   
   I will submit a PR soon.

