alexandre-normand commented on code in PR #721:
URL: https://github.com/apache/iceberg-go/pull/721#discussion_r2835329248


##########
table/arrow_utils.go:
##########
@@ -1358,13 +1361,81 @@ func recordsToDataFiles(ctx context.Context, 
rootLocation string, meta *Metadata
                        }
                }
 
-               return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
-       } else {
-               partitionWriter := newPartitionedFanoutWriter(*currentSpec, 
meta.CurrentSchema(), args.itr)
-               rollingDataWriters := NewWriterFactory(rootLocation, args, 
meta, taskSchema, targetFileSize)
-               partitionWriter.writers = &rollingDataWriters
-               workers := config.EnvConfig.MaxWorkers
+               return cw.writeFiles(ctx, rootLocation, args.fs, meta, 
meta.props, nil, tasks)
+       }
+
+       partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, 
meta.CurrentSchema(), args.itr)
+       partitionWriter.writers = newWriterFactory(rootLocation, args, meta, 
taskSchema, targetFileSize)
+       workers := config.EnvConfig.MaxWorkers
+
+       return partitionWriter.Write(ctx, workers)
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation 
string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any, 
args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
+       if args.counter == nil {
+               args.counter = internal.Counter(0)
+       }
+
+       defer func() {
+               if r := recover(); r != nil {
+                       var err error
+                       switch e := r.(type) {
+                       case string:
+                               err = fmt.Errorf("error encountered during 
position delete file writing %s", e)
+                       case error:
+                               err = fmt.Errorf("error encountered during 
position delete file writing: %w", e)
+                       }
+                       ret = func(yield func(iceberg.DataFile, error) bool) {
+                               yield(nil, err)
+                       }
+               }
+       }()
 
-               return partitionWriter.Write(ctx, workers)
+       if args.writeUUID == nil {
+               u := uuid.Must(uuid.NewRandom())
+               args.writeUUID = &u
        }
+
+       targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+               WriteTargetFileSizeBytesDefault))
+
+       currentSpec, err := meta.CurrentSpec()
+       if err != nil || currentSpec == nil {
+               panic(fmt.Errorf("%w: cannot write files without a current 
spec", err))

Review Comment:
   Done in 42755749bc82c055102dd743b2d33d9790cbf5c7. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to