laskoviymishka commented on code in PR #721:
URL: https://github.com/apache/iceberg-go/pull/721#discussion_r2814108204


##########
manifest.go:
##########
@@ -1136,7 +1136,7 @@ func (w *ManifestWriter) Close() error {
        return w.writer.Close()
 }
 
-func (w *ManifestWriter) ToManifestFile(location string, length int64) 
(ManifestFile, error) {
+func (w *ManifestWriter) ToManifestFile(location string, length int64, content 
ManifestContent) (ManifestFile, error) {

Review Comment:
   I think table/internal.DataFileStatistics.ToDataFile signature changes are 
acceptable (it's internal so who care?), but ManifestWriter.ToManifestFile is 
externally visible and now requires a new content arg. 
   
   This is backward incompatible change so 2 questions:
   
   1. Is it really really needed?
   2. If it's needed - we need to state it in changelog.



##########
table/arrow_scanner.go:
##########
@@ -189,6 +192,54 @@ func processPositionalDeletes(ctx context.Context, deletes 
set[int64]) recProces
        }
 }
 
+// enrichRecordsWithPosDeleteFields enriches a RecordBatch with the columns 
declared in the PositionalDeleteArrowSchema
+// so that during the pipeline filtering stages that sheds filtered out 
records, we still have a way to
+// preserve the original position of those records.
+func enrichRecordsWithPosDeleteFields(ctx context.Context, filePath 
iceberg.DataFile) recProcessFn {

Review Comment:
   Please add focused unit tests for enrichRecordsWithPosDeleteFields:
   
   - monotonically increasing pos across multiple input batches
   - exact appended schema/column ordering
   - empty batch behavior
   - failure mode if required fields are missing from 
PositionalDeleteArrowSchema.



##########
table/pos_delete_partitioned_fanout_writer.go:
##########
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "iter"
+       "path"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/iceberg-go"
+       "golang.org/x/sync/errgroup"
+)
+
+// positionDeletePartitionedFanoutWriter distributes Arrow position delete 
records across multiple partitions based on
+// a partition specification, writing data to separate delete files for each 
partition using
+// a fanout pattern with configurable parallelism.
+type positionDeletePartitionedFanoutWriter struct {

Review Comment:
   Could we add unit tests for positionDeletePartitionedFanoutWriter covering:
   
   - mixed file_path values in one batch (assert/handle explicitly)
   - missing partitionDataByFilePath entry (error instead of silent nil)
   - empty batch behavior and cancellation path.
   
   Also, consider avoiding defer record.Release() inside the loop to prevent 
delayed releases in long runs.



##########
table/arrow_utils.go:
##########
@@ -1358,13 +1361,81 @@ func recordsToDataFiles(ctx context.Context, 
rootLocation string, meta *Metadata
                        }
                }
 
-               return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
-       } else {
-               partitionWriter := newPartitionedFanoutWriter(*currentSpec, 
meta.CurrentSchema(), args.itr)
-               rollingDataWriters := NewWriterFactory(rootLocation, args, 
meta, taskSchema, targetFileSize)
-               partitionWriter.writers = &rollingDataWriters
-               workers := config.EnvConfig.MaxWorkers
+               return cw.writeFiles(ctx, rootLocation, args.fs, meta, 
meta.props, nil, tasks)
+       }
+
+       partitionWriter := newPartitionedFanoutWriter(*currentSpec, cw, 
meta.CurrentSchema(), args.itr)
+       partitionWriter.writers = newWriterFactory(rootLocation, args, meta, 
taskSchema, targetFileSize)
+       workers := config.EnvConfig.MaxWorkers
+
+       return partitionWriter.Write(ctx, workers)
+}
+
+func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation 
string, meta *MetadataBuilder, partitionDataPerFile map[string]map[int]any, 
args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
+       if args.counter == nil {
+               args.counter = internal.Counter(0)
+       }
+
+       defer func() {
+               if r := recover(); r != nil {
+                       var err error
+                       switch e := r.(type) {
+                       case string:
+                               err = fmt.Errorf("error encountered during 
position delete file writing %s", e)
+                       case error:
+                               err = fmt.Errorf("error encountered during 
position delete file writing: %w", e)
+                       }
+                       ret = func(yield func(iceberg.DataFile, error) bool) {
+                               yield(nil, err)
+                       }
+               }
+       }()
 
-               return partitionWriter.Write(ctx, workers)
+       if args.writeUUID == nil {
+               u := uuid.Must(uuid.NewRandom())
+               args.writeUUID = &u
        }
+
+       targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
+               WriteTargetFileSizeBytesDefault))
+
+       currentSpec, err := meta.CurrentSpec()
+       if err != nil || currentSpec == nil {
+               panic(fmt.Errorf("%w: cannot write files without a current 
spec", err))

Review Comment:
   Can we avoid panic/recover for expected failures here?
   CurrentSpec() and schema conversion errors are regular error cases and 
returning an error iterator directly would be clearer/safer than panicking. 
Also, the recover block only handles string and error; any other panic type 
could produce yield(nil, nil), which masks failure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to