zeroshade commented on code in PR #759:
URL: https://github.com/apache/iceberg-go/pull/759#discussion_r2884579131
##########
table/internal/parquet_files.go:
##########
@@ -243,46 +244,105 @@ func (parquetFormat) GetWriteProperties(props
iceberg.Properties) any {
parquet.WithCompressionLevel(compressionLevel))
}
-func (p parquetFormat) WriteDataFile(ctx context.Context, fs
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches
[]arrow.RecordBatch) (_ iceberg.DataFile, err error) {
+func (p parquetFormat) WriteDataFile(ctx context.Context, fs
iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches
[]arrow.RecordBatch) (iceberg.DataFile, error) {
+ w, err := p.NewFileWriter(ctx, fs, partitionValues, info,
batches[0].Schema())
+ if err != nil {
+ return nil, err
+ }
+
+ for _, batch := range batches {
+ if err := w.Write(batch); err != nil {
+ w.Close()
+
+ return nil, err
+ }
+ }
+
+ return w.Close()
+}
+
+// ParquetFileWriter is an incremental single-file writer with open/write/close
+// lifecycle. It writes Arrow record batches to a Parquet file and tracks bytes
+// written for rolling file decisions.
+type ParquetFileWriter struct {
+ pqWriter *pqarrow.FileWriter
+ counter *internal.CountingWriter
+ fileCloser io.Closer
+ format parquetFormat
+ info WriteFileInfo
+ partition map[int]any
+ colMapping map[string]int
+}
+
+// NewFileWriter creates a ParquetFileWriter that writes batches to a single
+// Parquet file. Call Write to append batches, BytesWritten to check actual
+// compressed file size, and Close to finalize and get the resulting DataFile.
+func (p parquetFormat) NewFileWriter(ctx context.Context, fs iceio.WriteFileIO,
+ partitionValues map[int]any, info WriteFileInfo, arrowSchema
*arrow.Schema,
+) (FileWriter, error) {
fw, err := fs.Create(info.FileName)
if err != nil {
return nil, err
}
- defer internal.CheckedClose(fw, &err)
+ colMapping, err := p.PathToIDMapping(info.FileSchema)
+ if err != nil {
+ fw.Close()
- cntWriter := internal.CountingWriter{W: fw}
+ return nil, err
+ }
+
+ counter := &internal.CountingWriter{W: fw}
mem := compute.GetAllocator(ctx)
writerProps :=
parquet.NewWriterProperties(info.WriteProps.([]parquet.WriterProperty)...)
arrProps :=
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem),
pqarrow.WithStoreSchema())
- writer, err := pqarrow.NewFileWriter(batches[0].Schema(), &cntWriter,
writerProps, arrProps)
+ writer, err := pqarrow.NewFileWriter(arrowSchema, counter, writerProps,
arrProps)
if err != nil {
+ fw.Close()
+
return nil, err
}
- for _, batch := range batches {
- if err := writer.WriteBuffered(batch); err != nil {
- return nil, err
- }
- }
+ return &ParquetFileWriter{
+ pqWriter: writer,
+ counter: counter,
+ fileCloser: fw,
+ format: p,
+ info: info,
+ partition: partitionValues,
+ colMapping: colMapping,
+ }, nil
+}
- if err := writer.Close(); err != nil {
- return nil, err
- }
+// Write appends a record batch to the Parquet file.
+func (w *ParquetFileWriter) Write(batch arrow.RecordBatch) error {
+ return w.pqWriter.WriteBuffered(batch)
+}
- filemeta, err := writer.FileMetadata()
- if err != nil {
+// BytesWritten returns flushed bytes plus compressed bytes buffered in the
+// current row group — matching the size estimate used by iceberg-java and
+// iceberg-rust to make rolling decisions.
+func (w *ParquetFileWriter) BytesWritten() int64 {
+ return w.counter.Count + w.pqWriter.RowGroupTotalCompressedBytes()
Review Comment:
`TotalCompressedBytes` would include what has already been flushed + what is
still in memory. The important bits that it won't count are footers.
`RowGroupTotalBytesWritten` only counts what has been flushed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]