AaronReboot opened a new issue, #795:
URL: https://github.com/apache/arrow-go/issues/795

   # pqarrow.FileWriter.Close() leaks per-column-chunk buffers when the underlying io.Writer fails mid-flush
   
   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   When `pqarrow.FileWriter.Close()` runs against an `io.Writer` that has started failing, it returns the first column-chunk close error and strands the per-chunk, allocator-tracked state of every subsequent column. The leak occurs only on the error path: the same code with a working sink ends with `CheckedAllocator.CurrentAlloc() == 0`. Because there is no separate `Release()` API, `Close()` is the writer's only chance to release that state.
   
   The root cause is in 
`parquet/file/row_group_writer.go::(*rowGroupWriter).Close` ([v18.5.2 lines 
232-254](https://github.com/apache/arrow-go/blob/v18.5.2/parquet/file/row_group_writer.go#L232-L254)):
   
   ```go
   for _, wr := range rg.columnWriters {
       if wr != nil {
           if err := wr.Close(); err != nil {
               return err   // ← strands every column past this one
           }
           rg.bytesWritten += wr.TotalBytesWritten()
           rg.compressedBytesWritten += wr.TotalCompressedBytes()
       }
   }
   ```
   
   When `wr.Close()` errors on column N, columns N+1 through the last never have `Close()` called, so their `currentEncoder` and (in the buffered-row-group path) their `bufferedPageWriter.inMemSink` are never released. The `bufferedPageWriter.Close` body that *does* release the in-memory sink ([page_writer.go:485](https://github.com/apache/arrow-go/blob/v18.5.2/parquet/file/page_writer.go#L485)) is only reached via the per-column `pager.Close` call inside `columnWriter.Close`, so skipping a column's `Close()` also skips releasing its accumulated page bytes.
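
   The control flow above can be reduced to a small, self-contained model. The sketch below is not arrow-go code: `chunk`, `closeEarlyReturn`, and `leakedAfter` are hypothetical stand-ins. `chunk.Close` releases its state before reporting the sink error (assumed to behave like `bufferedPageWriter.Close` with its deferred release), and the loop returns on the first error, as the v18.5.2 `rowGroupWriter.Close` does. It counts how many chunks are left unreleased when the sink breaks at a given column.

   ```go
   package main

   import (
       "errors"
       "fmt"
   )

   // errShortWrite stands in for the sink error (io.ErrShortWrite in the reproducer).
   var errShortWrite = errors.New("short write")

   // chunk is a hypothetical stand-in for one column's allocator-tracked state.
   // It is not an arrow-go type.
   type chunk struct {
       live *int  // shared count of chunks that have not been released
       err  error // non-nil once the sink is broken
   }

   // Close releases the chunk's state first, then reports any sink error.
   func (c *chunk) Close() error {
       *c.live--
       return c.err
   }

   // closeEarlyReturn mirrors the v18.5.2 loop: it stops at the first error.
   func closeEarlyReturn(cs []*chunk) error {
       for _, c := range cs {
           if err := c.Close(); err != nil {
               return err
           }
       }
       return nil
   }

   // leakedAfter builds n chunks whose sink breaks at index failAt (every write
   // from there on fails), closes them with closeEarlyReturn, and returns how
   // many chunks were never released.
   func leakedAfter(n, failAt int) int {
       live := 0
       cs := make([]*chunk, n)
       for i := range cs {
           live++
           cs[i] = &chunk{live: &live}
           if i >= failAt {
               cs[i].err = errShortWrite
           }
       }
       _ = closeEarlyReturn(cs)
       return live
   }

   func main() {
       fmt.Println(leakedAfter(2, 0)) // sink fails on column 0: column 1 is stranded
       fmt.Println(leakedAfter(2, 1)) // failing column is last: nothing stranded
       fmt.Println(leakedAfter(2, 2)) // working sink: nothing stranded
   }
   ```

   In this model only the columns after the failing one are stranded, so a failure on the last column leaves nothing behind, which matches condition 2 under Conditions.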
   
   **Versions**:
   
   - `github.com/apache/arrow-go/v18` v18.5.2
   - Go 1.26.2
   - Verified on darwin/arm64 with the reproducer below. Originally observed in production on linux/amd64 with a different failing writer (a cloud-storage authentication failure during a long-lived, multi-GiB write).
   
   ### Reproducer
   
   `go.mod` pins `github.com/apache/arrow-go/v18 v18.5.2`. The reproducer uses a two-column schema, a `WriteBuffered` loop, and an `io.Writer` that returns `io.ErrShortWrite` once it has accepted `remaining` bytes (512 KiB here):
   
   ```go
   package main
   
   import (
        "fmt"
        "io"
   
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/parquet"
        "github.com/apache/arrow-go/v18/parquet/pqarrow"
   )
   
   // failAfter returns io.ErrShortWrite once it has accepted `remaining`
   // bytes — models any sink that breaks mid-flush (token expiry, broken
   // pipe, HTTP 412 precondition failure, ctx cancellation).
   type failAfter struct{ remaining int }
   
   func (w *failAfter) Write(p []byte) (int, error) {
        if w.remaining <= 0 {
                return 0, io.ErrShortWrite
        }
        if len(p) <= w.remaining {
                w.remaining -= len(p)
                return len(p), nil
        }
        n := w.remaining
        w.remaining = 0
        return n, io.ErrShortWrite
   }
   
   func main() {
        alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
        schema := arrow.NewSchema([]arrow.Field{
                {Name: "s", Type: arrow.BinaryTypes.String, Nullable: false},
                {Name: "i", Type: arrow.PrimitiveTypes.Int32, Nullable: false},
        }, nil)
   
        props := parquet.NewWriterProperties(
                parquet.WithAllocator(alloc),
                parquet.WithMaxRowGroupLength(20_000),
        )
         fw, err := pqarrow.NewFileWriter(
                 schema, &failAfter{remaining: 512 << 10}, props, pqarrow.DefaultWriterProps())
        if err != nil {
                panic(err)
        }
   
        for b := range 32 {
                bld := array.NewRecordBuilder(alloc, schema)
                for r := range 25_000 {
                         bld.Field(0).(*array.StringBuilder).Append(fmt.Sprintf("v-%d-%d", b, r%50))
                         bld.Field(1).(*array.Int32Builder).Append(int32(b*25_000 + r))
                }
                rec := bld.NewRecordBatch()
                werr := fw.WriteBuffered(rec)
                rec.Release()
                bld.Release()
                if werr != nil {
                        break
                }
        }
   
        fmt.Printf("Close error: %v\n", fw.Close())
        fmt.Printf("Leaked: %d bytes\n", alloc.CurrentAlloc())
        alloc.AssertSize(printT{}, 0)
   }
   
   type printT struct{}
   
   func (printT) Helper()                      {}
   func (printT) Errorf(f string, args ...any) { fmt.Printf(f+"\n", args...) }
   ```
   
   Run with:
   
   ```sh
   mkdir pqarrow-leak-repro && cd pqarrow-leak-repro
   go mod init pqarrow-leak-repro
   go get github.com/apache/arrow-go/v18@v18.5.2
   # (paste main.go above)
   ARROW_CHECKED_MAX_RETAINED_FRAMES=20 go run .
   ```
   
   `ARROW_CHECKED_MAX_RETAINED_FRAMES=20` is optional but strongly recommended. Without it, the retained `AssertSize` frame for each leak is just `(*Buffer).ResizeNoShrink`, which doesn't reveal where the un-released state came from; with it, the call chain back to the allocating site is visible.
   
   Replacing `failAfter` with `io.Discard` prints `Leaked: 0`: same code, working sink, no leak.
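
   The sink's behavior can also be checked in isolation, without arrow-go. This sketch copies `failAfter` unchanged from the reproducer and adds a hypothetical helper, `writeUntilError`, that writes fixed-size chunks until the first error. `failAfter` honors the `io.Writer` contract that a short write (`n < len(p)`) must come with a non-nil error, so the caller sees exactly 512 KiB accepted before `io.ErrShortWrite`, while `io.Discard` accepts everything.

   ```go
   package main

   import (
       "errors"
       "fmt"
       "io"
   )

   // failAfter is copied unchanged from the reproducer.
   type failAfter struct{ remaining int }

   func (w *failAfter) Write(p []byte) (int, error) {
       if w.remaining <= 0 {
           return 0, io.ErrShortWrite
       }
       if len(p) <= w.remaining {
           w.remaining -= len(p)
           return len(p), nil
       }
       n := w.remaining
       w.remaining = 0
       return n, io.ErrShortWrite
   }

   // writeUntilError is a hypothetical helper: it writes up to `attempts`
   // chunks of `size` bytes and returns the total accepted and the first error.
   func writeUntilError(w io.Writer, size, attempts int) (int, error) {
       buf := make([]byte, size)
       total := 0
       for i := 0; i < attempts; i++ {
           n, err := w.Write(buf)
           total += n
           if err != nil {
               return total, err
           }
       }
       return total, nil
   }

   func main() {
       n, err := writeUntilError(&failAfter{remaining: 512 << 10}, 100<<10, 10)
       fmt.Println(n, errors.Is(err, io.ErrShortWrite)) // 524288 true

       n, err = writeUntilError(io.Discard, 100<<10, 10)
       fmt.Println(n, err) // 1024000 <nil>
   }
   ```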
   
   ### Expected behavior
   
   `CurrentAlloc() == 0` after `FileWriter.Close()` regardless of whether the 
underlying writer succeeded or failed. An error from `Close()` should not 
strand allocator-tracked buffers.
   
   ### Actual behavior
   
   ```
   Close error: short write
   Leaked: 4718592 bytes
   LEAK of 131072 bytes FROM
        github.com/apache/arrow-go/v18/arrow/memory.(*Buffer).ResizeNoShrink+4f
                arrow/memory/buffer.go:143
         github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).expandBuffer+2c
                 parquet/internal/encoding/encoder.go:164
         github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).addIndex
                 parquet/internal/encoding/encoder.go:226
         github.com/apache/arrow-go/v18/parquet/internal/encoding.(*dictEncoder).Put+9c
                 parquet/internal/encoding/encoder.go:301
         github.com/apache/arrow-go/v18/parquet/internal/encoding.(*typedDictEncoder[...]).Put
                 parquet/internal/encoding/typed_encoder.go:145
         github.com/apache/arrow-go/v18/parquet/file.(*Int32ColumnChunkWriter).writeValues+ab
                 parquet/file/column_writer_types.gen.go:183
        ... (writeBatch / pqarrow / WriteBuffered)
   [36 identical leak sites; total 36 × 131072 = 4 718 592 bytes]
   ```
   
   The 36 leaks correspond to row groups whose `rowGroupWriter.Close` started, errored on column 0's `Close()`, and never reached column 1's `Close()`, leaving column 1's `dictEncoder.idxBuffer` un-released. In production, the same root cause has also surfaced as `bufferedPageWriter.inMemSink` `BufferWriter` leaks (rooted at `serializedPageWriter.WriteDataPage` ← `bufferedPageWriter.WriteDataPage` ← `columnWriter.WriteDataPage`). Those fire when the un-closed column had triggered `FallbackToPlain` mid-write and accumulated post-fallback page bytes in its `inMemSink`. The reproducer above shows the dict-encoder surface; both surfaces share the same early return in `rowGroupWriter.Close`.
   
   ### Conditions
   
   The leak fires when, in one row group's close:
   
   1. The file is written through `pqarrow.FileWriter.WriteBuffered` (the buffered-row-group path, in which all columns of a row group are buffered in memory and flushed to the sink together when the row group closes).
   2. The schema has multiple columns and the failing column's `Close` is not 
the last in iteration order.
   3. The underlying `io.Writer` errors during the failing column's 
`bufferedPageWriter.Close` write to the sink — early enough that 
`rowGroupWriter.Close` returns before reaching subsequent columns.
   
   Defaults are otherwise sufficient. The only `WriterProperties` option the reproducer sets is `WithMaxRowGroupLength(20_000)`, which amplifies the number of row groups closed during the run; the bug fires equally with the default `MaxRowGroupLength` (`parquet.DefaultMaxRowGroupLen`) on a larger payload.
   
   ### Suggested fix
   
   **Primary — `rowGroupWriter.Close`: drain every column unconditionally, 
capture the first error, return it after the loop.**
   
   ```go
   func (rg *rowGroupWriter) Close() error {
       if !rg.closed {
           rg.closed = true
           if err := rg.checkRowsWritten(); err != nil {
               return err
           }
   
           var firstErr error
           for _, wr := range rg.columnWriters {
               if wr == nil {
                   continue
               }
               if err := wr.Close(); err != nil {
                   if firstErr == nil {
                       firstErr = err
                   }
                   continue                       // keep draining the rest
               }
               rg.bytesWritten += wr.TotalBytesWritten()
               rg.compressedBytesWritten += wr.TotalCompressedBytes()
           }
           rg.columnWriters = nil
           if firstErr != nil {
               return firstErr
           }
           rg.metadata.SetNumRows(rg.nrows)
           rg.metadata.Finish(rg.bytesWritten, rg.ordinal)
       }
       return nil
   }
   ```
   
   This is sufficient to close both leak surfaces in the buffered-row-group 
path: every column gets `Close()` called, which calls `pager.Close()` → 
`bufferedPageWriter.Close`, whose own `defer buf.Release()` then releases the 
in-memory sink bytes; and the column's own defer (registered during its 
`Close`) releases `currentEncoder`.
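
   The proposal above returns only the first error. If callers would also benefit from the later columns' errors, one possible variant (not part of the proposal) is to collect every error and combine them with `errors.Join` (Go 1.20+). The sketch below shows that draining pattern on hypothetical stand-in types (`closer`, `closeAll`) rather than arrow-go's real column writers: every non-nil writer is closed exactly once, and the joined error still satisfies `errors.Is` for each underlying error.

   ```go
   package main

   import (
       "errors"
       "fmt"
   )

   // closer is a hypothetical stand-in for a column writer: Close always
   // releases its state, then returns whatever error the sink reported.
   type closer struct {
       live *int
       err  error
   }

   func (c *closer) Close() error {
       *c.live--
       return c.err
   }

   // closeAll drains every non-nil closer, like the proposed loop, but joins
   // all errors instead of keeping only the first.
   func closeAll(cs []*closer) error {
       var errs []error
       for _, c := range cs {
           if c == nil {
               continue
           }
           if err := c.Close(); err != nil {
               errs = append(errs, err)
           }
       }
       return errors.Join(errs...) // nil if errs is empty
   }

   func main() {
       live := 3
       errA := errors.New("column 0: short write")
       errB := errors.New("column 3: short write")
       cs := []*closer{{&live, errA}, nil, {&live, nil}, {&live, errB}}

       err := closeAll(cs)
       fmt.Println(live)                                       // 0: every closer released
       fmt.Println(errors.Is(err, errA), errors.Is(err, errB)) // true true
   }
   ```

   `errors.Join` returns nil when no error was collected, so the success path is unchanged, and a caller checking `errors.Is(err, io.ErrShortWrite)` would still match if any column hit a short write.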
   
   **Secondary — `columnWriter.Close`: register the cleanup `defer` before any 
fallible call** ([v18.5.2 lines 
579-621](https://github.com/apache/arrow-go/blob/v18.5.2/parquet/file/column_writer.go#L579-L621)).
   
   ```go
   func (w *columnWriter) Close() (err error) {
       if !w.closed {
           w.closed = true
           defer func() {
               w.defLevelSink.Reset(0)
               w.repLevelSink.Reset(0)
               if w.bitsBuffer != nil {
                   w.bitsBuffer.Release()
                   w.bitsBuffer = nil
               }
               if w.currentEncoder != nil {
                   w.currentEncoder.Release()
                   w.currentEncoder = nil
               }
           }()
           if w.hasDict && !w.fallbackToNonDict {
               if err = w.WriteDictionaryPage(); err != nil {
                   return err
               }
           }
           if err = w.FlushBufferedDataPages(); err != nil {
               return err
           }
           // ... unchanged ...
       }
       return err
   }
   ```
   
   Currently the cleanup `defer` is registered after `WriteDictionaryPage` and 
`FlushBufferedDataPages`, both of which can return errors. In the 
buffered-row-group path those calls only mutate in-memory state and don't fail, 
so this contributes nothing to the reproducer above — but in the 
`SerialRowGroupWriter` path (where each `WriteDataPage` writes straight to the 
sink), the same defer-after-fallible-call pattern would strand `currentEncoder` 
directly. The nil-check on `w.currentEncoder` becomes load-bearing because the 
defer can now fire on paths where `currentEncoder` was never set.
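
   The ordering problem is independent of Parquet, so it can be shown with a minimal model. In the sketch below, `colWriter`, `encoder`, and both close variants are hypothetical simplifications of `columnWriter.Close`: `closeDeferLate` registers cleanup after the fallible step (the ordering described above for v18.5.2), and `closeDeferEarly` registers it first, as the secondary fix proposes. The nil check in `release` plays the same role as the one the fix relies on.

   ```go
   package main

   import (
       "errors"
       "fmt"
   )

   // encoder is a hypothetical stand-in for columnWriter.currentEncoder.
   type encoder struct{ live *int }

   // Release marks the encoder's buffers as freed.
   func (e *encoder) Release() { *e.live-- }

   // colWriter is a hypothetical, heavily simplified column writer.
   type colWriter struct {
       enc      *encoder
       flushErr error // what the fallible flush step returns
       closed   bool
   }

   // flush models WriteDictionaryPage / FlushBufferedDataPages: it may fail.
   func (w *colWriter) flush() error { return w.flushErr }

   // release tolerates a nil encoder, because an early-registered defer can
   // run on paths where no encoder was ever created.
   func (w *colWriter) release() {
       if w.enc != nil {
           w.enc.Release()
           w.enc = nil
       }
   }

   // closeDeferLate registers cleanup only after the fallible step.
   func (w *colWriter) closeDeferLate() error {
       if w.closed {
           return nil
       }
       w.closed = true
       if err := w.flush(); err != nil {
           return err // cleanup was never registered: the encoder is stranded
       }
       defer w.release()
       return nil
   }

   // closeDeferEarly registers cleanup before any fallible step.
   func (w *colWriter) closeDeferEarly() error {
       if w.closed {
           return nil
       }
       w.closed = true
       defer w.release()
       return w.flush()
   }

   // leaked closes a fresh writer with closeFn and reports how many encoders
   // remain unreleased.
   func leaked(closeFn func(*colWriter) error, flushErr error) int {
       live := 1
       w := &colWriter{enc: &encoder{live: &live}, flushErr: flushErr}
       _ = closeFn(w)
       return live
   }

   func main() {
       boom := errors.New("short write")
       fmt.Println(leaked((*colWriter).closeDeferLate, boom))  // 1: encoder stranded
       fmt.Println(leaked((*colWriter).closeDeferEarly, boom)) // 0
       fmt.Println(leaked((*colWriter).closeDeferLate, nil))   // 0
   }
   ```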
   
   ### Component(s)
   
   Parquet
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.