ryanschneider opened a new issue, #451:
URL: https://github.com/apache/arrow-go/issues/451
### Describe the bug, including details regarding any error messages, version, and platform.
I'm using `flight.StreamChunksFromReader` in my `DoGetStatement` handler to serve chunks from a Parquet file, more or less like so (error checks removed, and how I get the `ReaderAtSeeker` abstracted away):
```golang
// Error checks omitted for brevity.
func (s *flightServer) DoGetStatement(ctx context.Context, cmd flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, error) {
	r := getParquetFile(...) // returns a ReaderAtSeeker
	pqr, err := file.NewParquetReader(r,
		file.WithReadProps(&parquet.ReaderProperties{
			BufferedStreamEnabled: true,
		}))
	arr, err := pqarrow.NewFileReader(
		pqr,
		pqarrow.ArrowReadProperties{
			Parallel:  false,
			BatchSize: batchSize,
		},
		memory.DefaultAllocator,
	)
	schema, err := arr.Schema()
	rr, err := arr.GetRecordReader(ctx, nil, nil)
	ch := make(chan flight.StreamChunk)
	// With the workaround below, this call is replaced by the local streamChunksFromReader.
	go flight.StreamChunksFromReader(rr, ch)
	return schema, ch, nil
}
```
But this consistently returns `EOF` to the client _after_ sending all the records: the reader returned by `client.DoGet()` fails on `rdr.Read()` with `rpc error: code = Unknown desc = EOF` after reading the last chunk, instead of returning `io.EOF` as it should.
I tracked this down to the trailing error check in
`flight.StreamChunksFromReader`:
https://github.com/apache/arrow-go/blob/main/arrow/flight/record_batch_reader.go#L235-L238
It seems like the `RecordReader` returned by `pqarrow.FileReader.GetRecordReader` sets its internal error state to `io.EOF` after the last record is read. I was able to work around this using a slightly modified local `streamChunksFromReader` implementation:
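```golang
import (
	"errors"
	"fmt"
	"io"
	"log/slog"

	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/flight"
)

func streamChunksFromReader(rdr array.RecordReader, ch chan<- flight.StreamChunk) {
	defer close(ch)
	// Turn a panic during reading into an error chunk instead of crashing.
	defer func() {
		if err := recover(); err != nil {
			slog.Warn("panic while reading", slog.Any("err", err))
			ch <- flight.StreamChunk{Err: fmt.Errorf("panic while reading: %v", err)}
		}
	}()
	defer rdr.Release()

	for rdr.Next() {
		rec := rdr.Record()
		rec.Retain() // keep the record alive past the next call to Next()
		slog.Debug("record received", slog.Any("rows", rec.NumRows()), slog.Any("cols", rec.NumCols()))
		ch <- flight.StreamChunk{Data: rec}
		// Change 1: an error reported while Next() is still true is always forwarded.
		if err := rdr.Err(); err != nil {
			slog.Warn("error while reading", slog.Any("err", err))
			ch <- flight.StreamChunk{Err: err}
		}
	}
	// Change 2: once Next() is false, io.EOF only means "no more records".
	if err := rdr.Err(); err != nil && !errors.Is(err, io.EOF) {
		slog.Warn("error after reading", slog.Any("err", err))
		ch <- flight.StreamChunk{Err: err}
	}
}
```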
The changes are two-fold:
- If `rdr.Err()` returns a non-nil error while `.Next()` is still returning true, always report it.
- Once `.Next()` returns false, only report non-EOF errors.
The added logging and other minor changes were just for pinpointing the issue. I'm not sure whether this is a bug in `pqarrow` or in `flight.StreamChunksFromReader`.
### Component(s)
Other, Parquet