zeroshade commented on code in PR #524: URL: https://github.com/apache/iceberg-go/pull/524#discussion_r2283001490
########## table/partitioned_fanout_writer.go: ########## @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "errors" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/compute" + "github.com/apache/arrow-go/v18/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/iceberg-go" + "golang.org/x/sync/errgroup" +) + +// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on +// a partition specification, writing data to separate files for each partition using +// a fanout pattern with configurable parallelism. +type PartitionedFanoutWriter struct { + partitionSpec iceberg.PartitionSpec + schema *iceberg.Schema + itr iter.Seq2[arrow.Record, error] + writers *WriterFactory +} + +// PartitionInfo holds the row indices and partition values for a specific partition, +// used during the fanout process to group rows by their partition key. 
+type PartitionInfo struct { + rows []int64 + partitionValues map[int]any +} + +// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified +// partition specification, schema, and record iterator. +func NewPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.Record, error]) *PartitionedFanoutWriter { Review Comment: I don't think we should export these (or the other writers). We should either move these to a new package under `internal` or otherwise ensure we don't export them ########## table/partitioned_fanout_writer.go: ########## @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package table + +import ( + "context" + "errors" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/compute" + "github.com/apache/arrow-go/v18/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/iceberg-go" + "golang.org/x/sync/errgroup" +) + +// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on +// a partition specification, writing data to separate files for each partition using +// a fanout pattern with configurable parallelism. +type PartitionedFanoutWriter struct { + partitionSpec iceberg.PartitionSpec + schema *iceberg.Schema + itr iter.Seq2[arrow.Record, error] + writers *WriterFactory +} + +// PartitionInfo holds the row indices and partition values for a specific partition, +// used during the fanout process to group rows by their partition key. +type PartitionInfo struct { + rows []int64 + partitionValues map[int]any +} + +// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified +// partition specification, schema, and record iterator. +func NewPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.Record, error]) *PartitionedFanoutWriter { + return &PartitionedFanoutWriter{ + partitionSpec: partitionSpec, + schema: schema, + itr: itr, + } +} + +func (p *PartitionedFanoutWriter) partitionPath(data partitionRecord) string { + return p.partitionSpec.PartitionToPath(data, p.schema) +} + +// Write writes the Arrow records to the specified location using a fanout pattern with +// the specified number of workers. The returned iterator yields the data files written +// by the fanout process. 
+func (p *PartitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] { + inputRecordsCh := make(chan arrow.Record, workers) + outputDataFilesCh := make(chan iceberg.DataFile, workers) + + fanoutWorkers, ctx := errgroup.WithContext(ctx) + p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh) + + for range workers { + fanoutWorkers.Go(func() error { + return p.fanout(ctx, inputRecordsCh, outputDataFilesCh) + }) + } + + return p.yieldDataFiles(ctx, fanoutWorkers, outputDataFilesCh) +} + +func (p *PartitionedFanoutWriter) startRecordFeeder(ctx context.Context, fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.Record) { + fanoutWorkers.Go(func() error { + defer close(inputRecordsCh) + + for record, err := range p.itr { + if err != nil { + return err + } + + select { + case <-ctx.Done(): + record.Release() + + return context.Cause(ctx) + case inputRecordsCh <- record: + } + } + + return nil + }) +} + +func (p *PartitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.Record, dataFilesChannel chan<- iceberg.DataFile) error { + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + + case record, ok := <-inputRecordsCh: + if !ok { + return nil + } + defer record.Release() + + partitionMap, err := p.getPartitionMap(record) + if err != nil { + return err + } + + for partition, val := range partitionMap { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + + partitionRecord, err := partitionBatchByKey(ctx)(record, val.rows) + if err != nil { + return err + } + + rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(partition, val.partitionValues) + if err != nil { + return err + } + + err = rollingDataWriter.Add(ctx, partitionRecord, dataFilesChannel) + if err != nil { + return err + } + } + } + } +} + +func (p *PartitionedFanoutWriter) yieldDataFiles(ctx context.Context, fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) 
iter.Seq2[iceberg.DataFile, error] { + var err error + go func() { + defer close(outputDataFilesCh) + err = fanoutWorkers.Wait() + err = errors.Join(err, p.writers.closeAll(ctx, outputDataFilesCh)) + }() + + return func(yield func(iceberg.DataFile, error) bool) { + defer func() { + for range outputDataFilesCh { + } + }() + + for f := range outputDataFilesCh { + if !yield(f, err) { + return + } + } + + if err != nil { + yield(nil, err) + } + } +} + +func (p *PartitionedFanoutWriter) getPartitionMap(record arrow.Record) (map[string]PartitionInfo, error) { + partitionMap := make(map[string]PartitionInfo) + partitionFields := p.partitionSpec.PartitionType(p.schema).FieldList + partitionRec := make(partitionRecord, len(partitionFields)) + + partitionColumns := make([]arrow.Array, len(partitionFields)) + partitionFieldsInfo := make([]struct { + sourceField *iceberg.PartitionField + fieldID int + }, len(partitionFields)) + + for i := range partitionFields { + sourceField := p.partitionSpec.Field(i) + colName, _ := p.schema.FindColumnName(sourceField.SourceID) + colIdx := record.Schema().FieldIndices(colName)[0] + partitionColumns[i] = record.Column(colIdx) + partitionFieldsInfo[i] = struct { + sourceField *iceberg.PartitionField + fieldID int + }{&sourceField, sourceField.FieldID} + } + + for row := range record.NumRows() { + partitionValues := make(map[int]any) + for i := range partitionFields { + col := partitionColumns[i] + if !col.IsNull(int(row)) { + sourceField := partitionFieldsInfo[i].sourceField + val, err := getArrowValueAsIcebergLiteral(col, int(row)) + if err != nil { + return nil, fmt.Errorf("failed to get arrow values as iceberg literal: %w", err) + } + + transformedLiteral := sourceField.Transform.Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val: val}) + if transformedLiteral.Valid { + partitionRec[i] = transformedLiteral.Val.Any() + partitionValues[sourceField.FieldID] = transformedLiteral.Val.Any() + } else { + partitionRec[i], 
partitionValues[sourceField.FieldID] = nil, nil + } + } else { + partitionRec[i], partitionValues[partitionFieldsInfo[i].fieldID] = nil, nil + } + } + partitionKey := p.partitionPath(partitionRec) + partVal := partitionMap[partitionKey] + partVal.rows = append(partitionMap[partitionKey].rows, row) + partVal.partitionValues = partitionValues + partitionMap[partitionKey] = partVal + } + + return partitionMap, nil +} + +type partitionBatchFn func(arrow.Record, []int64) (arrow.Record, error) + +func partitionBatchByKey(ctx context.Context) partitionBatchFn { + mem := compute.GetAllocator(ctx) + + return func(record arrow.Record, rowIndices []int64) (arrow.Record, error) { + bldr := array.NewInt64Builder(mem) + defer bldr.Release() + + bldr.AppendValues(rowIndices, nil) + rowIndicesArr := bldr.NewInt64Array() + defer rowIndicesArr.Release() + + partitionedRecord, err := compute.Take( + ctx, + *compute.DefaultTakeOptions(), + compute.NewDatumWithoutOwning(record), + compute.NewDatumWithoutOwning(rowIndicesArr), + ) + if err != nil { + return nil, err + } + + return partitionedRecord.(*compute.RecordDatum).Value, nil + } +} + +func getArrowValueAsIcebergLiteral(column arrow.Array, row int) (iceberg.Literal, error) { + if column.IsNull(row) { + return nil, nil + } + + switch arr := column.(type) { + case *array.Date32: + + return iceberg.NewLiteral(iceberg.Date(arr.Value(row))), nil + case *array.Time64: + + return iceberg.NewLiteral(iceberg.Time(arr.Value(row))), nil + case *array.Timestamp: + + return iceberg.NewLiteral(iceberg.Timestamp(arr.Value(row))), nil + case *array.Decimal32: + val := arr.Value(row) + dec := iceberg.Decimal{ + Val: decimal128.FromU64(uint64(val)), + Scale: int(arr.DataType().(*arrow.Decimal32Type).Scale), + } + + return iceberg.NewLiteral(dec), nil + case *array.Decimal64: + val := arr.Value(row) + dec := iceberg.Decimal{ + Val: decimal128.FromU64(uint64(val)), + Scale: int(arr.DataType().(*arrow.Decimal64Type).Scale), + } + + return 
iceberg.NewLiteral(dec), nil + case *array.Decimal128: + val := arr.Value(row) + dec := iceberg.Decimal{ + Val: val, + Scale: int(arr.DataType().(*arrow.Decimal128Type).Scale), + } + + return iceberg.NewLiteral(dec), nil + case *extensions.UUIDArray: + + return iceberg.NewLiteral(arr.Value(row)), nil + default: + val := column.GetOneForMarshal(row) + switch v := val.(type) { Review Comment: do we need to handle upcasts of `int8`/`int16`/`uint8`/`uint16` etc? ########## table/partitioned_fanout_writer.go: ########## @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "errors" + "fmt" + "iter" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/compute" + "github.com/apache/arrow-go/v18/arrow/decimal128" + "github.com/apache/arrow-go/v18/arrow/extensions" + "github.com/apache/iceberg-go" + "golang.org/x/sync/errgroup" +) + +// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on +// a partition specification, writing data to separate files for each partition using +// a fanout pattern with configurable parallelism. 
+type PartitionedFanoutWriter struct { + partitionSpec iceberg.PartitionSpec + schema *iceberg.Schema + itr iter.Seq2[arrow.Record, error] + writers *WriterFactory +} + +// PartitionInfo holds the row indices and partition values for a specific partition, +// used during the fanout process to group rows by their partition key. +type PartitionInfo struct { + rows []int64 + partitionValues map[int]any +} + +// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified +// partition specification, schema, and record iterator. +func NewPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.Record, error]) *PartitionedFanoutWriter { + return &PartitionedFanoutWriter{ + partitionSpec: partitionSpec, + schema: schema, + itr: itr, + } +} + +func (p *PartitionedFanoutWriter) partitionPath(data partitionRecord) string { + return p.partitionSpec.PartitionToPath(data, p.schema) +} + +// Write writes the Arrow records to the specified location using a fanout pattern with +// the specified number of workers. The returned iterator yields the data files written +// by the fanout process. 
+func (p *PartitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] { + inputRecordsCh := make(chan arrow.Record, workers) + outputDataFilesCh := make(chan iceberg.DataFile, workers) + + fanoutWorkers, ctx := errgroup.WithContext(ctx) + p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh) + + for range workers { + fanoutWorkers.Go(func() error { + return p.fanout(ctx, inputRecordsCh, outputDataFilesCh) + }) + } + + return p.yieldDataFiles(ctx, fanoutWorkers, outputDataFilesCh) +} + +func (p *PartitionedFanoutWriter) startRecordFeeder(ctx context.Context, fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.Record) { + fanoutWorkers.Go(func() error { + defer close(inputRecordsCh) + + for record, err := range p.itr { + if err != nil { + return err + } + + select { + case <-ctx.Done(): + record.Release() + + return context.Cause(ctx) + case inputRecordsCh <- record: + } + } + + return nil + }) +} + +func (p *PartitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.Record, dataFilesChannel chan<- iceberg.DataFile) error { + for { + select { + case <-ctx.Done(): + return context.Cause(ctx) + + case record, ok := <-inputRecordsCh: + if !ok { + return nil + } + defer record.Release() + + partitionMap, err := p.getPartitionMap(record) + if err != nil { + return err + } + + for partition, val := range partitionMap { + select { + case <-ctx.Done(): + return context.Cause(ctx) + default: + } + + partitionRecord, err := partitionBatchByKey(ctx)(record, val.rows) + if err != nil { + return err + } + + rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(partition, val.partitionValues) + if err != nil { + return err + } + + err = rollingDataWriter.Add(ctx, partitionRecord, dataFilesChannel) + if err != nil { + return err + } + } + } + } +} + +func (p *PartitionedFanoutWriter) yieldDataFiles(ctx context.Context, fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) 
iter.Seq2[iceberg.DataFile, error] { + var err error + go func() { + defer close(outputDataFilesCh) + err = fanoutWorkers.Wait() + err = errors.Join(err, p.writers.closeAll(ctx, outputDataFilesCh)) + }() + + return func(yield func(iceberg.DataFile, error) bool) { + defer func() { + for range outputDataFilesCh { + } + }() + + for f := range outputDataFilesCh { + if !yield(f, err) { + return + } + } + + if err != nil { + yield(nil, err) + } + } +} + +func (p *PartitionedFanoutWriter) getPartitionMap(record arrow.Record) (map[string]PartitionInfo, error) { + partitionMap := make(map[string]PartitionInfo) + partitionFields := p.partitionSpec.PartitionType(p.schema).FieldList + partitionRec := make(partitionRecord, len(partitionFields)) + + partitionColumns := make([]arrow.Array, len(partitionFields)) + partitionFieldsInfo := make([]struct { + sourceField *iceberg.PartitionField + fieldID int + }, len(partitionFields)) + + for i := range partitionFields { + sourceField := p.partitionSpec.Field(i) + colName, _ := p.schema.FindColumnName(sourceField.SourceID) + colIdx := record.Schema().FieldIndices(colName)[0] + partitionColumns[i] = record.Column(colIdx) + partitionFieldsInfo[i] = struct { + sourceField *iceberg.PartitionField + fieldID int + }{&sourceField, sourceField.FieldID} + } + + for row := range record.NumRows() { + partitionValues := make(map[int]any) + for i := range partitionFields { + col := partitionColumns[i] + if !col.IsNull(int(row)) { + sourceField := partitionFieldsInfo[i].sourceField + val, err := getArrowValueAsIcebergLiteral(col, int(row)) + if err != nil { + return nil, fmt.Errorf("failed to get arrow values as iceberg literal: %w", err) + } + + transformedLiteral := sourceField.Transform.Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val: val}) + if transformedLiteral.Valid { + partitionRec[i] = transformedLiteral.Val.Any() + partitionValues[sourceField.FieldID] = transformedLiteral.Val.Any() + } else { + partitionRec[i], 
partitionValues[sourceField.FieldID] = nil, nil + } + } else { + partitionRec[i], partitionValues[partitionFieldsInfo[i].fieldID] = nil, nil + } + } Review Comment: this is good enough for now I think. But I think we're gonna need to implement arrow compute functions for the transforms and improve this eventually for performance. ########## table/rolling_data_writer.go: ########## @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + "net/url" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/iceberg-go" +) + +// WriterFactory manages the creation and lifecycle of RollingDataWriter instances +// for different partitions, providing shared configuration and coordination +// across all writers in a partitioned write operation. +type WriterFactory struct { + rootLocation string + args recordWritingArgs + meta *MetadataBuilder + taskSchema *iceberg.Schema + targetFileSize int64 + writers sync.Map + nextCount func() (int, bool) + stopCount func() + mu sync.Mutex +} + +// NewWriterFactory creates a new WriterFactory with the specified configuration +// for managing rolling data writers across partitions. 
+func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory { + return WriterFactory{ + rootLocation: rootLocation, + args: args, + meta: meta, + taskSchema: taskSchema, + targetFileSize: targetFileSize, + } +} + +// RollingDataWriter accumulates Arrow records for a specific partition and flushes +// them to data files when the target file size is reached, implementing a rolling +// file strategy to manage file sizes. +type RollingDataWriter struct { + partitionKey string + data []arrow.Record + currentSize int64 + factory *WriterFactory + mu sync.Mutex + partitionValues map[int]any +} + +// NewRollingDataWriter creates a new RollingDataWriter for the specified partition +// with the given partition values. +func (w *WriterFactory) NewRollingDataWriter(partition string, partitionValues map[int]any) *RollingDataWriter { + return &RollingDataWriter{ + partitionKey: partition, + data: make([]arrow.Record, 0), + currentSize: 0, + factory: w, + partitionValues: partitionValues, + } +} + +func (w *WriterFactory) getOrCreateRollingDataWriter(partition string, partitionValues map[int]any) (*RollingDataWriter, error) { Review Comment: If we can generate an `iter.Seq2[arrow.Record, error]` for each partition, we might be able to just use `recordsToDataFiles` for each partition instead of duplicating the logic? ########## table/rolling_data_writer.go: ########## @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + "net/url" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/iceberg-go" +) + +// WriterFactory manages the creation and lifecycle of RollingDataWriter instances +// for different partitions, providing shared configuration and coordination +// across all writers in a partitioned write operation. +type WriterFactory struct { + rootLocation string + args recordWritingArgs + meta *MetadataBuilder + taskSchema *iceberg.Schema + targetFileSize int64 + writers sync.Map + nextCount func() (int, bool) + stopCount func() + mu sync.Mutex +} + +// NewWriterFactory creates a new WriterFactory with the specified configuration +// for managing rolling data writers across partitions. +func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory { + return WriterFactory{ + rootLocation: rootLocation, + args: args, + meta: meta, + taskSchema: taskSchema, + targetFileSize: targetFileSize, + } +} + +// RollingDataWriter accumulates Arrow records for a specific partition and flushes +// them to data files when the target file size is reached, implementing a rolling +// file strategy to manage file sizes. +type RollingDataWriter struct { + partitionKey string + data []arrow.Record + currentSize int64 + factory *WriterFactory + mu sync.Mutex + partitionValues map[int]any +} + +// NewRollingDataWriter creates a new RollingDataWriter for the specified partition +// with the given partition values. 
+func (w *WriterFactory) NewRollingDataWriter(partition string, partitionValues map[int]any) *RollingDataWriter { + return &RollingDataWriter{ + partitionKey: partition, + data: make([]arrow.Record, 0), + currentSize: 0, + factory: w, + partitionValues: partitionValues, + } +} + +func (w *WriterFactory) getOrCreateRollingDataWriter(partition string, partitionValues map[int]any) (*RollingDataWriter, error) { + rollingDataWriter, _ := w.writers.LoadOrStore(partition, w.NewRollingDataWriter(partition, partitionValues)) + writer, ok := rollingDataWriter.(*RollingDataWriter) + if !ok { + return nil, fmt.Errorf("failed to create rolling data writer: %s", partition) + } + + return writer, nil +} + +// Add appends a record to the writer's buffer and flushes to a data file if the +// target file size is reached. +func (r *RollingDataWriter) Add(ctx context.Context, record arrow.Record, outputDataFilesCh chan<- iceberg.DataFile) error { + r.mu.Lock() + defer r.mu.Unlock() + + recordSize := recordNBytes(record) + record.Retain() + + if r.currentSize > 0 && r.currentSize+recordSize > r.factory.targetFileSize { + if err := r.flushToDataFile(ctx, outputDataFilesCh); err != nil { + return err + } + } + + r.data = append(r.data, record) + r.currentSize += recordSize + + if r.currentSize > r.factory.targetFileSize { + if err := r.flushToDataFile(ctx, outputDataFilesCh); err != nil { + return err + } + } + + return nil +} + +func (r *RollingDataWriter) flushToDataFile(ctx context.Context, outputDataFilesCh chan<- iceberg.DataFile) error { + if len(r.data) == 0 { + return nil + } + + task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) { + r.factory.mu.Lock() + cnt, _ := r.factory.nextCount() + r.factory.mu.Unlock() Review Comment: Rather than using a mutex for this, just use an atomic counter (e.g. `atomic.Int64`) inside the count function. ########## table/rolling_data_writer.go: ########## @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor
license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + "net/url" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/iceberg-go" +) + +// WriterFactory manages the creation and lifecycle of RollingDataWriter instances +// for different partitions, providing shared configuration and coordination +// across all writers in a partitioned write operation. +type WriterFactory struct { + rootLocation string + args recordWritingArgs + meta *MetadataBuilder + taskSchema *iceberg.Schema + targetFileSize int64 + writers sync.Map + nextCount func() (int, bool) + stopCount func() + mu sync.Mutex +} + +// NewWriterFactory creates a new WriterFactory with the specified configuration +// for managing rolling data writers across partitions. 
+func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory { + return WriterFactory{ + rootLocation: rootLocation, + args: args, + meta: meta, + taskSchema: taskSchema, + targetFileSize: targetFileSize, + } +} + +// RollingDataWriter accumulates Arrow records for a specific partition and flushes +// them to data files when the target file size is reached, implementing a rolling +// file strategy to manage file sizes. +type RollingDataWriter struct { + partitionKey string + data []arrow.Record + currentSize int64 + factory *WriterFactory + mu sync.Mutex + partitionValues map[int]any +} + +// NewRollingDataWriter creates a new RollingDataWriter for the specified partition +// with the given partition values. +func (w *WriterFactory) NewRollingDataWriter(partition string, partitionValues map[int]any) *RollingDataWriter { + return &RollingDataWriter{ + partitionKey: partition, + data: make([]arrow.Record, 0), + currentSize: 0, + factory: w, + partitionValues: partitionValues, + } +} + +func (w *WriterFactory) getOrCreateRollingDataWriter(partition string, partitionValues map[int]any) (*RollingDataWriter, error) { + rollingDataWriter, _ := w.writers.LoadOrStore(partition, w.NewRollingDataWriter(partition, partitionValues)) + writer, ok := rollingDataWriter.(*RollingDataWriter) + if !ok { + return nil, fmt.Errorf("failed to create rolling data writer: %s", partition) + } + + return writer, nil +} + +// Add appends a record to the writer's buffer and flushes to a data file if the +// target file size is reached. 
+func (r *RollingDataWriter) Add(ctx context.Context, record arrow.Record, outputDataFilesCh chan<- iceberg.DataFile) error { + r.mu.Lock() + defer r.mu.Unlock() + + recordSize := recordNBytes(record) + record.Retain() + + if r.currentSize > 0 && r.currentSize+recordSize > r.factory.targetFileSize { + if err := r.flushToDataFile(ctx, outputDataFilesCh); err != nil { + return err + } + } Review Comment: `recordSize` is not an accurate proxy for what the file size will be after it is written, so we shouldn't use that to figure out if we've hit `targetFileSize` or not. We'll always be smaller than the target in that case. ########## table/rolling_data_writer.go: ########## @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + "net/url" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/iceberg-go" +) + +// WriterFactory manages the creation and lifecycle of RollingDataWriter instances +// for different partitions, providing shared configuration and coordination +// across all writers in a partitioned write operation. 
+type WriterFactory struct { + rootLocation string + args recordWritingArgs + meta *MetadataBuilder + taskSchema *iceberg.Schema + targetFileSize int64 + writers sync.Map + nextCount func() (int, bool) + stopCount func() + mu sync.Mutex +} + +// NewWriterFactory creates a new WriterFactory with the specified configuration +// for managing rolling data writers across partitions. +func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory { + return WriterFactory{ + rootLocation: rootLocation, + args: args, + meta: meta, + taskSchema: taskSchema, + targetFileSize: targetFileSize, + } +} + +// RollingDataWriter accumulates Arrow records for a specific partition and flushes +// them to data files when the target file size is reached, implementing a rolling +// file strategy to manage file sizes. +type RollingDataWriter struct { + partitionKey string + data []arrow.Record Review Comment: rather than keeping a slice of records, we should instead start a parquet file. Keeping track of the `currentSize` based on the size of the records isn't a good way to gauge the final file size because Parquet is compressed and encoded. Instead we should create a writer interface for the file formats that also reports the current written file size (since the Parquet writer interfaces support reporting the current written bytes) ########## table/rolling_data_writer.go: ########## @@ -0,0 +1,199 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "iter" + "net/url" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/iceberg-go" +) + +// WriterFactory manages the creation and lifecycle of RollingDataWriter instances +// for different partitions, providing shared configuration and coordination +// across all writers in a partitioned write operation. +type WriterFactory struct { Review Comment: Same comment as above: we shouldn't export these. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
