zeroshade commented on code in PR #524:
URL: https://github.com/apache/iceberg-go/pull/524#discussion_r2283001490


##########
table/partitioned_fanout_writer.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "iter"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/decimal128"
+       "github.com/apache/arrow-go/v18/arrow/extensions"
+       "github.com/apache/iceberg-go"
+       "golang.org/x/sync/errgroup"
+)
+
+// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on
+// a partition specification, writing data to separate files for each partition using
+// a fanout pattern with configurable parallelism.
+type PartitionedFanoutWriter struct {
+       partitionSpec iceberg.PartitionSpec
+       schema        *iceberg.Schema
+       itr           iter.Seq2[arrow.Record, error]
+       writers       *WriterFactory
+}
+
+// PartitionInfo holds the row indices and partition values for a specific partition,
+// used during the fanout process to group rows by their partition key.
+type PartitionInfo struct {
+       rows            []int64
+       partitionValues map[int]any
+}
+
+// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified
+// partition specification, schema, and record iterator.
+func NewPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.Record, error]) *PartitionedFanoutWriter {

Review Comment:
   I don't think we should export these (or the other writers). We should 
either move these to a new package under `internal` or otherwise ensure we 
don't export them
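
   Something along these lines, purely illustrative (the package name and layout below are made up, not a prescription):

   ```go
   // internal/fanout/writer.go — hypothetical location so the type never becomes public API.
   package fanout

   import (
       "iter"

       "github.com/apache/arrow-go/v18/arrow"
       "github.com/apache/iceberg-go"
   )

   // Writer carries the same fields as PartitionedFanoutWriter, but living under
   // internal/ it is only importable from within the iceberg-go module itself.
   type Writer struct {
       Spec    iceberg.PartitionSpec
       Schema  *iceberg.Schema
       Records iter.Seq2[arrow.Record, error]
   }
   ```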



##########
table/partitioned_fanout_writer.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "iter"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/decimal128"
+       "github.com/apache/arrow-go/v18/arrow/extensions"
+       "github.com/apache/iceberg-go"
+       "golang.org/x/sync/errgroup"
+)
+
+// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on
+// a partition specification, writing data to separate files for each partition using
+// a fanout pattern with configurable parallelism.
+type PartitionedFanoutWriter struct {
+       partitionSpec iceberg.PartitionSpec
+       schema        *iceberg.Schema
+       itr           iter.Seq2[arrow.Record, error]
+       writers       *WriterFactory
+}
+
+// PartitionInfo holds the row indices and partition values for a specific partition,
+// used during the fanout process to group rows by their partition key.
+type PartitionInfo struct {
+       rows            []int64
+       partitionValues map[int]any
+}
+
+// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified
+// partition specification, schema, and record iterator.
+func NewPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.Record, error]) *PartitionedFanoutWriter {
+       return &PartitionedFanoutWriter{
+               partitionSpec: partitionSpec,
+               schema:        schema,
+               itr:           itr,
+       }
+}
+
+func (p *PartitionedFanoutWriter) partitionPath(data partitionRecord) string {
+       return p.partitionSpec.PartitionToPath(data, p.schema)
+}
+
+// Write writes the Arrow records to the specified location using a fanout pattern with
+// the specified number of workers. The returned iterator yields the data files written
+// by the fanout process.
+func (p *PartitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] {
+       inputRecordsCh := make(chan arrow.Record, workers)
+       outputDataFilesCh := make(chan iceberg.DataFile, workers)
+
+       fanoutWorkers, ctx := errgroup.WithContext(ctx)
+       p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh)
+
+       for range workers {
+               fanoutWorkers.Go(func() error {
+                       return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
+               })
+       }
+
+       return p.yieldDataFiles(ctx, fanoutWorkers, outputDataFilesCh)
+}
+
+func (p *PartitionedFanoutWriter) startRecordFeeder(ctx context.Context, fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.Record) {
+       fanoutWorkers.Go(func() error {
+               defer close(inputRecordsCh)
+
+               for record, err := range p.itr {
+                       if err != nil {
+                               return err
+                       }
+
+                       select {
+                       case <-ctx.Done():
+                               record.Release()
+
+                               return context.Cause(ctx)
+                       case inputRecordsCh <- record:
+                       }
+               }
+
+               return nil
+       })
+}
+
+func (p *PartitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.Record, dataFilesChannel chan<- iceberg.DataFile) error {
+       for {
+               select {
+               case <-ctx.Done():
+                       return context.Cause(ctx)
+
+               case record, ok := <-inputRecordsCh:
+                       if !ok {
+                               return nil
+                       }
+                       defer record.Release()
+
+                       partitionMap, err := p.getPartitionMap(record)
+                       if err != nil {
+                               return err
+                       }
+
+                       for partition, val := range partitionMap {
+                               select {
+                               case <-ctx.Done():
+                                       return context.Cause(ctx)
+                               default:
+                               }
+
+                               partitionRecord, err := partitionBatchByKey(ctx)(record, val.rows)
+                               if err != nil {
+                                       return err
+                               }
+
+                               rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(partition, val.partitionValues)
+                               if err != nil {
+                                       return err
+                               }
+
+                               err = rollingDataWriter.Add(ctx, partitionRecord, dataFilesChannel)
+                               if err != nil {
+                                       return err
+                               }
+                       }
+               }
+       }
+}
+
+func (p *PartitionedFanoutWriter) yieldDataFiles(ctx context.Context, fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] {
+       var err error
+       go func() {
+               defer close(outputDataFilesCh)
+               err = fanoutWorkers.Wait()
+               err = errors.Join(err, p.writers.closeAll(ctx, outputDataFilesCh))
+       }()
+
+       return func(yield func(iceberg.DataFile, error) bool) {
+               defer func() {
+                       for range outputDataFilesCh {
+                       }
+               }()
+
+               for f := range outputDataFilesCh {
+                       if !yield(f, err) {
+                               return
+                       }
+               }
+
+               if err != nil {
+                       yield(nil, err)
+               }
+       }
+}
+
+func (p *PartitionedFanoutWriter) getPartitionMap(record arrow.Record) (map[string]PartitionInfo, error) {
+       partitionMap := make(map[string]PartitionInfo)
+       partitionFields := p.partitionSpec.PartitionType(p.schema).FieldList
+       partitionRec := make(partitionRecord, len(partitionFields))
+
+       partitionColumns := make([]arrow.Array, len(partitionFields))
+       partitionFieldsInfo := make([]struct {
+               sourceField *iceberg.PartitionField
+               fieldID     int
+       }, len(partitionFields))
+
+       for i := range partitionFields {
+               sourceField := p.partitionSpec.Field(i)
+               colName, _ := p.schema.FindColumnName(sourceField.SourceID)
+               colIdx := record.Schema().FieldIndices(colName)[0]
+               partitionColumns[i] = record.Column(colIdx)
+               partitionFieldsInfo[i] = struct {
+                       sourceField *iceberg.PartitionField
+                       fieldID     int
+               }{&sourceField, sourceField.FieldID}
+       }
+
+       for row := range record.NumRows() {
+               partitionValues := make(map[int]any)
+               for i := range partitionFields {
+                       col := partitionColumns[i]
+                       if !col.IsNull(int(row)) {
+                               sourceField := partitionFieldsInfo[i].sourceField
+                               val, err := getArrowValueAsIcebergLiteral(col, int(row))
+                               if err != nil {
+                                       return nil, fmt.Errorf("failed to get arrow values as iceberg literal: %w", err)
+                               }
+
+                               transformedLiteral := sourceField.Transform.Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val: val})
+                               if transformedLiteral.Valid {
+                                       partitionRec[i] = transformedLiteral.Val.Any()
+                                       partitionValues[sourceField.FieldID] = transformedLiteral.Val.Any()
+                               } else {
+                                       partitionRec[i], partitionValues[sourceField.FieldID] = nil, nil
+                               }
+                       } else {
+                               partitionRec[i], partitionValues[partitionFieldsInfo[i].fieldID] = nil, nil
+                       }
+               }
+               partitionKey := p.partitionPath(partitionRec)
+               partVal := partitionMap[partitionKey]
+               partVal.rows = append(partitionMap[partitionKey].rows, row)
+               partVal.partitionValues = partitionValues
+               partitionMap[partitionKey] = partVal
+       }
+
+       return partitionMap, nil
+}
+
+type partitionBatchFn func(arrow.Record, []int64) (arrow.Record, error)
+
+func partitionBatchByKey(ctx context.Context) partitionBatchFn {
+       mem := compute.GetAllocator(ctx)
+
+       return func(record arrow.Record, rowIndices []int64) (arrow.Record, error) {
+               bldr := array.NewInt64Builder(mem)
+               defer bldr.Release()
+
+               bldr.AppendValues(rowIndices, nil)
+               rowIndicesArr := bldr.NewInt64Array()
+               defer rowIndicesArr.Release()
+
+               partitionedRecord, err := compute.Take(
+                       ctx,
+                       *compute.DefaultTakeOptions(),
+                       compute.NewDatumWithoutOwning(record),
+                       compute.NewDatumWithoutOwning(rowIndicesArr),
+               )
+               if err != nil {
+                       return nil, err
+               }
+
+               return partitionedRecord.(*compute.RecordDatum).Value, nil
+       }
+}
+
+func getArrowValueAsIcebergLiteral(column arrow.Array, row int) (iceberg.Literal, error) {
+       if column.IsNull(row) {
+               return nil, nil
+       }
+
+       switch arr := column.(type) {
+       case *array.Date32:
+
+               return iceberg.NewLiteral(iceberg.Date(arr.Value(row))), nil
+       case *array.Time64:
+
+               return iceberg.NewLiteral(iceberg.Time(arr.Value(row))), nil
+       case *array.Timestamp:
+
+               return iceberg.NewLiteral(iceberg.Timestamp(arr.Value(row))), nil
+       case *array.Decimal32:
+               val := arr.Value(row)
+               dec := iceberg.Decimal{
+                       Val:   decimal128.FromU64(uint64(val)),
+                       Scale: int(arr.DataType().(*arrow.Decimal32Type).Scale),
+               }
+
+               return iceberg.NewLiteral(dec), nil
+       case *array.Decimal64:
+               val := arr.Value(row)
+               dec := iceberg.Decimal{
+                       Val:   decimal128.FromU64(uint64(val)),
+                       Scale: int(arr.DataType().(*arrow.Decimal64Type).Scale),
+               }
+
+               return iceberg.NewLiteral(dec), nil
+       case *array.Decimal128:
+               val := arr.Value(row)
+               dec := iceberg.Decimal{
+                       Val:   val,
+                       Scale: int(arr.DataType().(*arrow.Decimal128Type).Scale),
+               }
+
+               return iceberg.NewLiteral(dec), nil
+       case *extensions.UUIDArray:
+
+               return iceberg.NewLiteral(arr.Value(row)), nil
+       default:
+               val := column.GetOneForMarshal(row)
+               switch v := val.(type) {

Review Comment:
   do we need to handle upcasts of `int8`/`int16`/`uint8`/`uint16` etc?
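
   Something like this could cover them explicitly before falling back to `GetOneForMarshal` (sketch only — the helper name and placement are made up):

   ```go
   package table // sketch

   import (
       "github.com/apache/arrow-go/v18/arrow"
       "github.com/apache/arrow-go/v18/arrow/array"
       "github.com/apache/iceberg-go"
   )

   // smallIntToLiteral upcasts the narrow Arrow integer types to the int32/int64
   // literal types Iceberg actually models. Returns ok=false for other types.
   func smallIntToLiteral(col arrow.Array, row int) (iceberg.Literal, bool) {
       switch arr := col.(type) {
       case *array.Int8:
           return iceberg.NewLiteral(int32(arr.Value(row))), true
       case *array.Int16:
           return iceberg.NewLiteral(int32(arr.Value(row))), true
       case *array.Uint8:
           return iceberg.NewLiteral(int32(arr.Value(row))), true
       case *array.Uint16:
           return iceberg.NewLiteral(int32(arr.Value(row))), true
       case *array.Uint32:
           // uint32 can exceed int32, so widen to Iceberg's long type.
           return iceberg.NewLiteral(int64(arr.Value(row))), true
       }

       return nil, false
   }
   ```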



##########
table/partitioned_fanout_writer.go:
##########
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "iter"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/decimal128"
+       "github.com/apache/arrow-go/v18/arrow/extensions"
+       "github.com/apache/iceberg-go"
+       "golang.org/x/sync/errgroup"
+)
+
+// PartitionedFanoutWriter distributes Arrow records across multiple partitions based on
+// a partition specification, writing data to separate files for each partition using
+// a fanout pattern with configurable parallelism.
+type PartitionedFanoutWriter struct {
+       partitionSpec iceberg.PartitionSpec
+       schema        *iceberg.Schema
+       itr           iter.Seq2[arrow.Record, error]
+       writers       *WriterFactory
+}
+
+// PartitionInfo holds the row indices and partition values for a specific partition,
+// used during the fanout process to group rows by their partition key.
+type PartitionInfo struct {
+       rows            []int64
+       partitionValues map[int]any
+}
+
+// NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified
+// partition specification, schema, and record iterator.
+func NewPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.Record, error]) *PartitionedFanoutWriter {
+       return &PartitionedFanoutWriter{
+               partitionSpec: partitionSpec,
+               schema:        schema,
+               itr:           itr,
+       }
+}
+
+func (p *PartitionedFanoutWriter) partitionPath(data partitionRecord) string {
+       return p.partitionSpec.PartitionToPath(data, p.schema)
+}
+
+// Write writes the Arrow records to the specified location using a fanout pattern with
+// the specified number of workers. The returned iterator yields the data files written
+// by the fanout process.
+func (p *PartitionedFanoutWriter) Write(ctx context.Context, workers int) iter.Seq2[iceberg.DataFile, error] {
+       inputRecordsCh := make(chan arrow.Record, workers)
+       outputDataFilesCh := make(chan iceberg.DataFile, workers)
+
+       fanoutWorkers, ctx := errgroup.WithContext(ctx)
+       p.startRecordFeeder(ctx, fanoutWorkers, inputRecordsCh)
+
+       for range workers {
+               fanoutWorkers.Go(func() error {
+                       return p.fanout(ctx, inputRecordsCh, outputDataFilesCh)
+               })
+       }
+
+       return p.yieldDataFiles(ctx, fanoutWorkers, outputDataFilesCh)
+}
+
+func (p *PartitionedFanoutWriter) startRecordFeeder(ctx context.Context, fanoutWorkers *errgroup.Group, inputRecordsCh chan<- arrow.Record) {
+       fanoutWorkers.Go(func() error {
+               defer close(inputRecordsCh)
+
+               for record, err := range p.itr {
+                       if err != nil {
+                               return err
+                       }
+
+                       select {
+                       case <-ctx.Done():
+                               record.Release()
+
+                               return context.Cause(ctx)
+                       case inputRecordsCh <- record:
+                       }
+               }
+
+               return nil
+       })
+}
+
+func (p *PartitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-chan arrow.Record, dataFilesChannel chan<- iceberg.DataFile) error {
+       for {
+               select {
+               case <-ctx.Done():
+                       return context.Cause(ctx)
+
+               case record, ok := <-inputRecordsCh:
+                       if !ok {
+                               return nil
+                       }
+                       defer record.Release()
+
+                       partitionMap, err := p.getPartitionMap(record)
+                       if err != nil {
+                               return err
+                       }
+
+                       for partition, val := range partitionMap {
+                               select {
+                               case <-ctx.Done():
+                                       return context.Cause(ctx)
+                               default:
+                               }
+
+                               partitionRecord, err := partitionBatchByKey(ctx)(record, val.rows)
+                               if err != nil {
+                                       return err
+                               }
+
+                               rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(partition, val.partitionValues)
+                               if err != nil {
+                                       return err
+                               }
+
+                               err = rollingDataWriter.Add(ctx, partitionRecord, dataFilesChannel)
+                               if err != nil {
+                                       return err
+                               }
+                       }
+               }
+       }
+}
+
+func (p *PartitionedFanoutWriter) yieldDataFiles(ctx context.Context, fanoutWorkers *errgroup.Group, outputDataFilesCh chan iceberg.DataFile) iter.Seq2[iceberg.DataFile, error] {
+       var err error
+       go func() {
+               defer close(outputDataFilesCh)
+               err = fanoutWorkers.Wait()
+               err = errors.Join(err, p.writers.closeAll(ctx, outputDataFilesCh))
+       }()
+
+       return func(yield func(iceberg.DataFile, error) bool) {
+               defer func() {
+                       for range outputDataFilesCh {
+                       }
+               }()
+
+               for f := range outputDataFilesCh {
+                       if !yield(f, err) {
+                               return
+                       }
+               }
+
+               if err != nil {
+                       yield(nil, err)
+               }
+       }
+}
+
+func (p *PartitionedFanoutWriter) getPartitionMap(record arrow.Record) (map[string]PartitionInfo, error) {
+       partitionMap := make(map[string]PartitionInfo)
+       partitionFields := p.partitionSpec.PartitionType(p.schema).FieldList
+       partitionRec := make(partitionRecord, len(partitionFields))
+
+       partitionColumns := make([]arrow.Array, len(partitionFields))
+       partitionFieldsInfo := make([]struct {
+               sourceField *iceberg.PartitionField
+               fieldID     int
+       }, len(partitionFields))
+
+       for i := range partitionFields {
+               sourceField := p.partitionSpec.Field(i)
+               colName, _ := p.schema.FindColumnName(sourceField.SourceID)
+               colIdx := record.Schema().FieldIndices(colName)[0]
+               partitionColumns[i] = record.Column(colIdx)
+               partitionFieldsInfo[i] = struct {
+                       sourceField *iceberg.PartitionField
+                       fieldID     int
+               }{&sourceField, sourceField.FieldID}
+       }
+
+       for row := range record.NumRows() {
+               partitionValues := make(map[int]any)
+               for i := range partitionFields {
+                       col := partitionColumns[i]
+                       if !col.IsNull(int(row)) {
+                               sourceField := partitionFieldsInfo[i].sourceField
+                               val, err := getArrowValueAsIcebergLiteral(col, int(row))
+                               if err != nil {
+                                       return nil, fmt.Errorf("failed to get arrow values as iceberg literal: %w", err)
+                               }
+
+                               transformedLiteral := sourceField.Transform.Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val: val})
+                               if transformedLiteral.Valid {
+                                       partitionRec[i] = transformedLiteral.Val.Any()
+                                       partitionValues[sourceField.FieldID] = transformedLiteral.Val.Any()
+                               } else {
+                                       partitionRec[i], partitionValues[sourceField.FieldID] = nil, nil
+                               }
+                       } else {
+                               partitionRec[i], partitionValues[partitionFieldsInfo[i].fieldID] = nil, nil
+                       }
+               }

Review Comment:
   this is good enough for now I think. But I think we're gonna need to 
implement arrow compute functions for the transforms and improve this 
eventually for performance.
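
   E.g. something in this direction — nothing below exists yet, it's only a sketch of what a column-at-a-time transform could look like, using the year transform over a Date32 column:

   ```go
   package table // sketch

   import (
       "github.com/apache/arrow-go/v18/arrow/array"
       "github.com/apache/arrow-go/v18/arrow/memory"
   )

   // yearsFromDate32 produces, per row, the number of years since 1970 — the value
   // Iceberg's year transform yields for date columns — for a whole column at once,
   // instead of building one iceberg.Literal per row.
   func yearsFromDate32(mem memory.Allocator, col *array.Date32) *array.Int32 {
       bldr := array.NewInt32Builder(mem)
       defer bldr.Release()

       for i := 0; i < col.Len(); i++ {
           if col.IsNull(i) {
               bldr.AppendNull()
               continue
           }
           // Date32 is days since the Unix epoch; ToTime converts it to a time.Time.
           bldr.Append(int32(col.Value(i).ToTime().Year() - 1970))
       }

       return bldr.NewInt32Array()
   }
   ```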



##########
table/rolling_data_writer.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "net/url"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// for different partitions, providing shared configuration and coordination
+// across all writers in a partitioned write operation.
+type WriterFactory struct {
+       rootLocation   string
+       args           recordWritingArgs
+       meta           *MetadataBuilder
+       taskSchema     *iceberg.Schema
+       targetFileSize int64
+       writers        sync.Map
+       nextCount      func() (int, bool)
+       stopCount      func()
+       mu             sync.Mutex
+}
+
+// NewWriterFactory creates a new WriterFactory with the specified configuration
+// for managing rolling data writers across partitions.
+func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory {
+       return WriterFactory{
+               rootLocation:   rootLocation,
+               args:           args,
+               meta:           meta,
+               taskSchema:     taskSchema,
+               targetFileSize: targetFileSize,
+       }
+}
+
+// RollingDataWriter accumulates Arrow records for a specific partition and flushes
+// them to data files when the target file size is reached, implementing a rolling
+// file strategy to manage file sizes.
+type RollingDataWriter struct {
+       partitionKey    string
+       data            []arrow.Record
+       currentSize     int64
+       factory         *WriterFactory
+       mu              sync.Mutex
+       partitionValues map[int]any
+}
+
+// NewRollingDataWriter creates a new RollingDataWriter for the specified partition
+// with the given partition values.
+func (w *WriterFactory) NewRollingDataWriter(partition string, partitionValues map[int]any) *RollingDataWriter {
+       return &RollingDataWriter{
+               partitionKey:    partition,
+               data:            make([]arrow.Record, 0),
+               currentSize:     0,
+               factory:         w,
+               partitionValues: partitionValues,
+       }
+}
+
+func (w *WriterFactory) getOrCreateRollingDataWriter(partition string, partitionValues map[int]any) (*RollingDataWriter, error) {

Review Comment:
   If we can generate an `iter.Seq2[arrow.Record, error]` for each partition, 
we might be able to just use `recordsToDataFiles` for each partition instead of 
duplicating the logic?
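
   i.e. something like this adapter (sketch; I haven't checked how `recordsToDataFiles` is parameterized, so treat the wiring as hand-wavy):

   ```go
   package table // sketch

   import (
       "iter"

       "github.com/apache/arrow-go/v18/arrow"
   )

   // partitionRecords adapts one partition's channel of records into the
   // iter.Seq2[arrow.Record, error] shape the existing write path consumes.
   func partitionRecords(ch <-chan arrow.Record) iter.Seq2[arrow.Record, error] {
       return func(yield func(arrow.Record, error) bool) {
           for rec := range ch {
               if !yield(rec, nil) {
                   return
               }
           }
       }
   }
   ```

   Then each fanout worker could feed its partition's records through that adapter into the existing unpartitioned path, rather than re-implementing the rolling/size bookkeeping here.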



##########
table/rolling_data_writer.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "net/url"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// for different partitions, providing shared configuration and coordination
+// across all writers in a partitioned write operation.
+type WriterFactory struct {
+       rootLocation   string
+       args           recordWritingArgs
+       meta           *MetadataBuilder
+       taskSchema     *iceberg.Schema
+       targetFileSize int64
+       writers        sync.Map
+       nextCount      func() (int, bool)
+       stopCount      func()
+       mu             sync.Mutex
+}
+
+// NewWriterFactory creates a new WriterFactory with the specified configuration
+// for managing rolling data writers across partitions.
+func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory {
+       return WriterFactory{
+               rootLocation:   rootLocation,
+               args:           args,
+               meta:           meta,
+               taskSchema:     taskSchema,
+               targetFileSize: targetFileSize,
+       }
+}
+
+// RollingDataWriter accumulates Arrow records for a specific partition and flushes
+// them to data files when the target file size is reached, implementing a rolling
+// file strategy to manage file sizes.
+type RollingDataWriter struct {
+       partitionKey    string
+       data            []arrow.Record
+       currentSize     int64
+       factory         *WriterFactory
+       mu              sync.Mutex
+       partitionValues map[int]any
+}
+
+// NewRollingDataWriter creates a new RollingDataWriter for the specified partition
+// with the given partition values.
+func (w *WriterFactory) NewRollingDataWriter(partition string, partitionValues map[int]any) *RollingDataWriter {
+       return &RollingDataWriter{
+               partitionKey:    partition,
+               data:            make([]arrow.Record, 0),
+               currentSize:     0,
+               factory:         w,
+               partitionValues: partitionValues,
+       }
+}
+
+func (w *WriterFactory) getOrCreateRollingDataWriter(partition string, partitionValues map[int]any) (*RollingDataWriter, error) {
+       rollingDataWriter, _ := w.writers.LoadOrStore(partition, w.NewRollingDataWriter(partition, partitionValues))
+       writer, ok := rollingDataWriter.(*RollingDataWriter)
+       if !ok {
+               return nil, fmt.Errorf("failed to create rolling data writer: %s", partition)
+       }
+
+       return writer, nil
+}
+
+// Add appends a record to the writer's buffer and flushes to a data file if the
+// target file size is reached.
+func (r *RollingDataWriter) Add(ctx context.Context, record arrow.Record, outputDataFilesCh chan<- iceberg.DataFile) error {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       recordSize := recordNBytes(record)
+       record.Retain()
+
+       if r.currentSize > 0 && r.currentSize+recordSize > r.factory.targetFileSize {
+               if err := r.flushToDataFile(ctx, outputDataFilesCh); err != nil {
+                       return err
+               }
+       }
+
+       r.data = append(r.data, record)
+       r.currentSize += recordSize
+
+       if r.currentSize > r.factory.targetFileSize {
+               if err := r.flushToDataFile(ctx, outputDataFilesCh); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+func (r *RollingDataWriter) flushToDataFile(ctx context.Context, outputDataFilesCh chan<- iceberg.DataFile) error {
+       if len(r.data) == 0 {
+               return nil
+       }
+
+       task := iter.Seq[WriteTask](func(yield func(WriteTask) bool) {
+               r.factory.mu.Lock()
+               cnt, _ := r.factory.nextCount()
+               r.factory.mu.Unlock()

Review Comment:
   rather than using a mutex for this, just use an atomic.Int inside the count 
function
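
   For example (trimmed to just the counter; field and method names are illustrative):

   ```go
   package table // sketch

   import "sync/atomic"

   type WriterFactory struct {
       fileCount atomic.Int64
       // ...existing fields...
   }

   // nextCount hands out unique, increasing file counts without any locking:
   // atomic.Int64.Add returns the incremented value, so no two callers can
   // observe the same count.
   func (w *WriterFactory) nextCount() int64 {
       return w.fileCount.Add(1)
   }
   ```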



##########
table/rolling_data_writer.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "net/url"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// for different partitions, providing shared configuration and coordination
+// across all writers in a partitioned write operation.
+type WriterFactory struct {
+       rootLocation   string
+       args           recordWritingArgs
+       meta           *MetadataBuilder
+       taskSchema     *iceberg.Schema
+       targetFileSize int64
+       writers        sync.Map
+       nextCount      func() (int, bool)
+       stopCount      func()
+       mu             sync.Mutex
+}
+
+// NewWriterFactory creates a new WriterFactory with the specified configuration
+// for managing rolling data writers across partitions.
+func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory {
+       return WriterFactory{
+               rootLocation:   rootLocation,
+               args:           args,
+               meta:           meta,
+               taskSchema:     taskSchema,
+               targetFileSize: targetFileSize,
+       }
+}
+
+// RollingDataWriter accumulates Arrow records for a specific partition and flushes
+// them to data files when the target file size is reached, implementing a rolling
+// file strategy to manage file sizes.
+type RollingDataWriter struct {
+       partitionKey    string
+       data            []arrow.Record
+       currentSize     int64
+       factory         *WriterFactory
+       mu              sync.Mutex
+       partitionValues map[int]any
+}
+
+// NewRollingDataWriter creates a new RollingDataWriter for the specified partition
+// with the given partition values.
+func (w *WriterFactory) NewRollingDataWriter(partition string, partitionValues map[int]any) *RollingDataWriter {
+       return &RollingDataWriter{
+               partitionKey:    partition,
+               data:            make([]arrow.Record, 0),
+               currentSize:     0,
+               factory:         w,
+               partitionValues: partitionValues,
+       }
+}
+
+func (w *WriterFactory) getOrCreateRollingDataWriter(partition string, partitionValues map[int]any) (*RollingDataWriter, error) {
+       rollingDataWriter, _ := w.writers.LoadOrStore(partition, w.NewRollingDataWriter(partition, partitionValues))
+       writer, ok := rollingDataWriter.(*RollingDataWriter)
+       if !ok {
+               return nil, fmt.Errorf("failed to create rolling data writer: %s", partition)
+       }
+
+       return writer, nil
+}
+
+// Add appends a record to the writer's buffer and flushes to a data file if the
+// target file size is reached.
+func (r *RollingDataWriter) Add(ctx context.Context, record arrow.Record, outputDataFilesCh chan<- iceberg.DataFile) error {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       recordSize := recordNBytes(record)
+       record.Retain()
+
+       if r.currentSize > 0 && r.currentSize+recordSize > r.factory.targetFileSize {
+               if err := r.flushToDataFile(ctx, outputDataFilesCh); err != nil {
+                       return err
+               }
+       }

Review Comment:
   `recordSize` is not an accurate proxy for what the file size will be after 
it is written, so we shouldn't use that to figure out if we've hit 
`targetFileSize` or not. We'll always be smaller than the target in that case.



##########
table/rolling_data_writer.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "net/url"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// for different partitions, providing shared configuration and coordination
+// across all writers in a partitioned write operation.
+type WriterFactory struct {
+       rootLocation   string
+       args           recordWritingArgs
+       meta           *MetadataBuilder
+       taskSchema     *iceberg.Schema
+       targetFileSize int64
+       writers        sync.Map
+       nextCount      func() (int, bool)
+       stopCount      func()
+       mu             sync.Mutex
+}
+
+// NewWriterFactory creates a new WriterFactory with the specified configuration
+// for managing rolling data writers across partitions.
+func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) WriterFactory {
+       return WriterFactory{
+               rootLocation:   rootLocation,
+               args:           args,
+               meta:           meta,
+               taskSchema:     taskSchema,
+               targetFileSize: targetFileSize,
+       }
+}
+
+// RollingDataWriter accumulates Arrow records for a specific partition and flushes
+// them to data files when the target file size is reached, implementing a rolling
+// file strategy to manage file sizes.
+type RollingDataWriter struct {
+       partitionKey    string
+       data            []arrow.Record

Review Comment:
   rather than keeping a slice of records, we should instead start a parquet 
file.
   
   Keeping track of the `currentSize` based on the size of the records isn't a 
good way to gauge the final file size because Parquet is compressed and 
encoded. Instead we should create a writer interface for the file formats that 
also reports the current written file size (since the Parquet writer interfaces 
support reporting the current written bytes)
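
   Roughly what I have in mind (sketch only — the interface and names below are illustrative, not final):

   ```go
   package table // sketch

   import (
       "io"
       "sync/atomic"

       "github.com/apache/arrow-go/v18/arrow"
   )

   // fileFormatWriter is the shape of interface the rolling writer could target:
   // it appends record batches to an open file and reports how many bytes have
   // actually been written so far.
   type fileFormatWriter interface {
       Write(rec arrow.Record) error
       BytesWritten() int64
       Close() error
   }

   // countingWriter is the simplest way to provide BytesWritten for any format:
   // wrap the output stream and count whatever the format writer flushes through it.
   type countingWriter struct {
       w io.Writer
       n atomic.Int64
   }

   func (c *countingWriter) Write(p []byte) (int, error) {
       n, err := c.w.Write(p)
       c.n.Add(int64(n))

       return n, err
   }

   func (c *countingWriter) BytesWritten() int64 { return c.n.Load() }
   ```

   The rolling writer would then compare `BytesWritten()` against `targetFileSize` after each write and roll to a new file once it crosses the threshold, instead of summing in-memory record sizes.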



##########
table/rolling_data_writer.go:
##########
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "net/url"
+       "sync"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+)
+
+// WriterFactory manages the creation and lifecycle of RollingDataWriter instances
+// for different partitions, providing shared configuration and coordination
+// across all writers in a partitioned write operation.
+type WriterFactory struct {

Review Comment:
   same comment as above. We shouldn't export these



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

