AaronReboot opened a new issue, #794:
URL: https://github.com/apache/arrow-go/issues/794

# pqarrow.FileWriter.Close() leaks per-column-chunk buffers when the underlying io.Writer fails mid-flush

### Describe the bug, including details regarding any error messages, version, and platform.

When `pqarrow.FileWriter.Close()` runs against an `io.Writer` that has broken, it returns the first column-chunk close error. It also leaves the allocator-tracked per-chunk state of every subsequent column stranded. The leak occurs only on the error path: the same code with a working sink ends with `CheckedAllocator.CurrentAlloc() == 0`. `Close()` is the writer's only chance to release that state, because there is no separate `Release()` API.

The root cause is in `parquet/file/row_group_writer.go::(*rowGroupWriter).Close` ([v18.5.2 lines 232-254](https://github.com/apache/arrow-go/blob/v18.5.2/parquet/file/row_group_writer.go#L232-L254)):

```go
for _, wr := range rg.columnWriters {
	if wr != nil {
		if err := wr.Close(); err != nil {
			return err // ← strands every column past this one
		}
		rg.bytesWritten += wr.TotalBytesWritten()
		rg.compressedBytesWritten += wr.TotalCompressedBytes()
	}
}
```

When `wr.Close()` errors on column N, `Close()` is never called on columns N+1..end. Their `currentEncoder`, and in the buffered-row-group path their `bufferedPageWriter.inMemSink`, are therefore never released. The `bufferedPageWriter.Close` body that *does* release the in-memory sink ([page_writer.go:485](https://github.com/apache/arrow-go/blob/v18.5.2/parquet/file/page_writer.go#L485)) is only reached through the per-column `pager.Close` call inside `columnWriter.Close`. Skipping a column's `Close()` therefore also skips releasing its accumulated page bytes.

**Versions**:

- `github.com/apache/arrow-go/v18` v18.5.2
- Go 1.26.2
- Verified on darwin/arm64 with the reproducer below. Originally observed on linux/amd64 in production with a different failing writer (a cloud storage authentication failure on a long-lived multi-GiB write).

### Reproducer

`go.mod` pins `github.com/apache/arrow-go/v18 v18.5.2`.
The reproducer uses a two-column schema and a `WriteBuffered` loop. Its `io.Writer` returns `io.ErrShortWrite` after accepting `remaining` bytes:

```go
package main

import (
	"fmt"
	"io"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
)

// failAfter returns io.ErrShortWrite once it has accepted `remaining`
// bytes. It models any sink that breaks mid-flush (token expiry, broken
// pipe, HTTP 412 precondition failure, ctx cancellation).
type failAfter struct{ remaining int }

func (w *failAfter) Write(p []byte) (int, error) {
	if w.remaining <= 0 {
		return 0, io.ErrShortWrite
	}
	if len(p) <= w.remaining {
		w.remaining -= len(p)
		return len(p), nil
	}
	n := w.remaining
	w.remaining = 0
	return n, io.ErrShortWrite
}

func main() {
	alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "s", Type: arrow.BinaryTypes.String, Nullable: false},
		{Name: "i", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
	}, nil)
	props := parquet.NewWriterProperties(
		parquet.WithAllocator(alloc),
		parquet.WithMaxRowGroupLength(20_000),
	)
	// The sink breaks after accepting 512 KiB.
	fw, err := pqarrow.NewFileWriter(
		schema, &failAfter{remaining: 512 << 10}, props, pqarrow.DefaultWriterProps())
	if err != nil {
		panic(err)
	}

	for b := range 32 {
		bld := array.NewRecordBuilder(alloc, schema)
		for r := range 25_000 {
			bld.Field(0).(*array.StringBuilder).Append(fmt.Sprintf("v-%d-%d", b, r%50))
			bld.Field(1).(*array.Int32Builder).Append(int32(b*25_000 + r))
		}
		rec := bld.NewRecordBatch()
		werr := fw.WriteBuffered(rec)
		rec.Release()
		bld.Release()
		if werr != nil {
			break
		}
	}

	fmt.Printf("Close error: %v\n", fw.Close())
	fmt.Printf("Leaked: %d bytes\n", alloc.CurrentAlloc())
	alloc.AssertSize(printT{}, 0)
}

// printT provides the Helper/Errorf methods AssertSize calls, printing
// each leak report instead of failing a test.
type printT struct{}

func (printT) Helper() {}
func (printT) Errorf(f string, args ...any) { fmt.Printf(f+"\n", args...) }
```

Run with:

```sh
mkdir pqarrow-leak-repro && cd pqarrow-leak-repro
go mod init pqarrow-leak-repro
go get github.com/apache/arrow-go/v18@v18.5.2
# (paste main.go above)
ARROW_CHECKED_MAX_RETAINED_FRAMES=20 go run .
```

`ARROW_CHECKED_MAX_RETAINED_FRAMES=20` is optional but strongly recommended. Without it, the retained `AssertSize` frame for each leak is just `(*Buffer).ResizeNoShrink`, which doesn't reveal where the unreleased state came from. With it, the call site is plain to see.

Replacing `failAfter` with `io.Discard` prints `Leaked: 0 bytes`. The code is unchanged and the sink works, so nothing leaks.

### Expected behavior

`CurrentAlloc() == 0` after `FileWriter.Close()`, regardless of whether the underlying writer succeeded or failed. An error from `Close()` should not strand allocator-tracked buffers.

### Actual behavior

```
Close error: short write
Leaked: 4718592 bytes
LEAK of 131072 bytes FROM
	github.com/apache/arrow-go/v18/arrow/memory.(*Buffer).ResizeNoShrink+4f
		arrow/memory/buffer.go:143
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).expandBuffer+2c
		parquet/internal/encoding/encoder.go:164
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).addIndex
		parquet/internal/encoding/encoder.go:226
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).Put+9c
		parquet/internal/encoding/encoder.go:301
	github.com/apache/arrow-go/v18/parquet/internal/encoding.(*typedDictEncoder[...]).Put
		parquet/internal/encoding/typed_encoder.go:145
	github.com/apache/arrow-go/v18/parquet/file.(*Int32ColumnChunkWriter).writeValues+ab
		parquet/file/column_writer_types.gen.go:183
	... (writeBatch / pqarrow / WriteBuffered)
[36 identical leak sites; total 36 × 131072 = 4,718,592 bytes]
```

There are 36 leaks, one per row group whose `rowGroupWriter.Close` errored on column 0's `Close()` and never reached column 1's `Close()`. Each of those row groups left column 1's `dictEncoder.idxBuffer` unreleased.
In production, the same root cause has also surfaced as `BufferWriter` leaks in `bufferedPageWriter.inMemSink`. These are rooted at `serializedPageWriter.WriteDataPage` ← `bufferedPageWriter.WriteDataPage` ← `columnWriter.WriteDataPage`. They fire when the unclosed column had triggered `FallbackToPlain` mid-write and accumulated post-fallback page bytes in its `inMemSink`. The reproducer above shows the dict-encoder surface; both surfaces share the same row-group early-return root cause.

### Conditions

The leak fires when, within a single row group's close:

1. Rows are written with `pqarrow.FileWriter.WriteBuffered`. This is the buffered-row-group path, in which all columns of a row group are buffered in memory and flushed together when the row group closes.
2. The schema has multiple columns, and the failing column's `Close` is not the last in iteration order.
3. The underlying `io.Writer` errors while the failing column's `bufferedPageWriter.Close` writes to the sink. This makes `rowGroupWriter.Close` return before it reaches the subsequent columns.

Defaults are otherwise sufficient. The only `WriterProperties` knob the reproducer sets is `MaxRowGroupLength(20_000)`, which increases the number of row groups closed during the run. The bug fires equally with the default `MaxRowGroupLength` (`parquet.DefaultMaxRowGroupLen`, 64 Mi rows) on a larger payload.
### Suggested fix

**Primary: in `rowGroupWriter.Close`, close every column unconditionally, capture the first error, and return it after the loop.**

```go
func (rg *rowGroupWriter) Close() error {
	if !rg.closed {
		rg.closed = true
		if err := rg.checkRowsWritten(); err != nil {
			return err
		}

		var firstErr error
		for _, wr := range rg.columnWriters {
			if wr == nil {
				continue
			}
			if err := wr.Close(); err != nil {
				if firstErr == nil {
					firstErr = err
				}
				continue // keep draining the rest
			}
			rg.bytesWritten += wr.TotalBytesWritten()
			rg.compressedBytesWritten += wr.TotalCompressedBytes()
		}
		rg.columnWriters = nil
		if firstErr != nil {
			return firstErr
		}

		rg.metadata.SetNumRows(rg.nrows)
		rg.metadata.Finish(rg.bytesWritten, rg.ordinal)
	}
	return nil
}
```

This is sufficient to close both leak surfaces in the buffered-row-group path. Every column's `Close()` is called, and it in turn calls `pager.Close()` → `bufferedPageWriter.Close`. That function's own `defer buf.Release()` releases the in-memory sink bytes. Separately, each column's cleanup `defer`, registered during its `Close`, releases `currentEncoder`.

**Secondary: in `columnWriter.Close`, register the cleanup `defer` before any fallible call** ([v18.5.2 lines 579-621](https://github.com/apache/arrow-go/blob/v18.5.2/parquet/file/column_writer.go#L579-L621)).

```go
func (w *columnWriter) Close() (err error) {
	if !w.closed {
		w.closed = true
		// Register cleanup before any fallible call so it runs on every return path.
		defer func() {
			w.defLevelSink.Reset(0)
			w.repLevelSink.Reset(0)
			if w.bitsBuffer != nil {
				w.bitsBuffer.Release()
				w.bitsBuffer = nil
			}
			if w.currentEncoder != nil {
				w.currentEncoder.Release()
				w.currentEncoder = nil
			}
		}()

		if w.hasDict && !w.fallbackToNonDict {
			if err = w.WriteDictionaryPage(); err != nil {
				return err
			}
		}
		if err = w.FlushBufferedDataPages(); err != nil {
			return err
		}

		// ... unchanged ...
	}
	return err
}
```

Currently the cleanup `defer` is registered after `WriteDictionaryPage` and `FlushBufferedDataPages`, both of which can return errors.
In the buffered-row-group path, `WriteDictionaryPage` and `FlushBufferedDataPages` only mutate in-memory state and don't fail, so this change contributes nothing to the reproducer above. The `SerialRowGroupWriter` path is different: each `WriteDataPage` writes straight to the sink, so the same defer-after-fallible-call pattern would strand `currentEncoder` directly. The nil check on `w.currentEncoder` becomes load-bearing, because the `defer` can now fire on paths where `currentEncoder` was never set.

### Component(s)

Parquet
