zeroshade commented on code in PR #1113:
URL: https://github.com/apache/iceberg-go/pull/1113#discussion_r3290594596


##########
table/arrow_utils.go:
##########
@@ -1646,10 +1647,19 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
                }
        }
 
-       // V3 and later prefer deletion vectors over Parquet position-delete files;
-       // warn so users migrate when DV-write support lands. The check is `>= 3`
-       // rather than `== 3` so the warning carries forward to v4+ without churn.
-       // See apache/iceberg#12048.
+       deleteFormat := meta.props.Get(WriteDeleteFormatKey, "")
+       if deleteFormat == "" {
+               if latestMetadata.Version() >= 3 {
+                       deleteFormat = WriteDeleteFormatDV
+               } else {
+                       deleteFormat = WriteDeleteFormatPosition
+               }
+       }
+
+       if deleteFormat == WriteDeleteFormatDV {
+               return positionDeleteRecordsToDataFilesDV(ctx, rootLocation, args)
+       }

Review Comment:
   it looks like we're losing track of the partitions here since 
dv_writer.go:131 doesn't seem to push the partition data into the data files, 
despite us having the `partitionContextByFile` here. We probably need to push 
the partition context through the new `positionDeleteRecordsToDataFilesDV` 
function and all the way into the DVWriter so that we get the correct partition 
spec ID and partition structs into the dv manifests.
   
   You should be able to create a `TestDVWritePathPartitioned` test for this 
and assert that every returned DataFile has the proper SpecID and Partition 
fields.
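
   A minimal sketch of the propagation being asked for, using only the 
standard library. `partitionContext`, `dvFile` and `buildDVFiles` are 
hypothetical stand-ins invented for illustration; they are not iceberg-go types 
or functions, and the real DVWriter may be structured quite differently.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionContext and dvFile are hypothetical stand-ins, not iceberg-go types.
type partitionContext struct {
	specID    int
	partition map[string]any
}

type dvFile struct {
	referencedDataFile string
	specID             int
	partition          map[string]any
	count              int
}

// buildDVFiles emits one dvFile per referenced data file and copies that
// file's partition context onto it. A missing context is reported as an
// error rather than silently producing an unpartitioned entry.
func buildDVFiles(positions map[string][]int64, ctxByFile map[string]partitionContext) ([]dvFile, error) {
	paths := make([]string, 0, len(positions))
	for p := range positions {
		paths = append(paths, p)
	}
	sort.Strings(paths) // deterministic output order

	out := make([]dvFile, 0, len(paths))
	for _, p := range paths {
		pc, ok := ctxByFile[p]
		if !ok {
			return nil, fmt.Errorf("no partition context for %s", p)
		}
		out = append(out, dvFile{
			referencedDataFile: p,
			specID:             pc.specID,
			partition:          pc.partition,
			count:              len(positions[p]),
		})
	}

	return out, nil
}

func main() {
	positions := map[string][]int64{"a.parquet": {1, 3}, "b.parquet": {0}}
	ctxByFile := map[string]partitionContext{
		"a.parquet": {specID: 1, partition: map[string]any{"day": "2024-01-01"}},
		"b.parquet": {specID: 1, partition: map[string]any{"day": "2024-01-02"}},
	}

	files, err := buildDVFiles(positions, ctxByFile)
	if err != nil {
		panic(err)
	}
	for _, f := range files {
		fmt.Println(f.referencedDataFile, f.specID, f.partition["day"], f.count)
	}
}
```

   A partitioned test would then loop over the returned files and check the 
spec ID and partition values on each one, along the lines of the loop in `main`.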
   



##########
table/properties.go:
##########
@@ -76,6 +76,11 @@ const (
        WriteDeleteModeKey     = "write.delete.mode"
        WriteDeleteModeDefault = WriteModeCopyOnWrite
 
+       WriteDeleteFormatKey = "write.delete.format"
+
+       WriteDeleteFormatPosition = "position"
+       WriteDeleteFormatDV       = "dv"

Review Comment:
   the Java library already has a near-identical key with different semantics: 
`write.delete.format.default` refers to the *file format* and should be 
`parquet`, `avro` or `orc`. Let's not reuse an existing key name and give it 
different semantics.
   
   The Java library hardcodes Puffin and DVs for v3 instead of letting users 
change it with a property. We should probably follow the same behavior instead 
of introducing a new property.
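
   A minimal sketch of that version-only selection, assuming no table property 
is consulted at all. The helper name `deleteFileKind` and its string results 
are hypothetical and only mirror the `"position"` / `"dv"` values from the diff.

```go
package main

import "fmt"

// deleteFileKind is a hypothetical helper: it picks the delete representation
// from the table format version alone, with no user-facing property.
func deleteFileKind(formatVersion int) string {
	if formatVersion >= 3 {
		return "dv" // deletion vectors stored in Puffin files
	}

	return "position" // Parquet position-delete files
}

func main() {
	for _, v := range []int{1, 2, 3, 4} {
		fmt.Printf("v%d -> %s\n", v, deleteFileKind(v))
	}
}
```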



##########
table/arrow_utils.go:
##########
@@ -1704,3 +1714,43 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
 
        return partitionWriter.Write(ctx, workers)
 }
+
+func positionDeleteRecordsToDataFilesDV(ctx context.Context, rootLocation string, args recordWritingArgs) iter.Seq2[iceberg.DataFile, error] {
+       return func(yield func(iceberg.DataFile, error) bool) {
+               writer := dv.NewDVWriter(args.fs)
+
+               for batch, err := range args.itr {
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+
+                       filePaths := batch.Column(0).(*array.String)
+                       positions := batch.Column(1).(*array.Int64)
+
+                       for i := range batch.NumRows() {
+                               writer.Add(filePaths.Value(int(i)), []int64{positions.Value(int(i))})
+                       }
+               }
+
+               u := uuid.Must(uuid.NewRandom())
+               if args.writeUUID != nil {
+                       u = *args.writeUUID
+               }

Review Comment:
   we have almost this same block up at line 1668 (and that version is 
better), can we avoid duplicating it? Or at least restructure this so the 
`u := uuid.Must....` call only runs when `args.writeUUID` is nil?
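
   One way to share the logic is a small helper that only generates a value 
when the caller did not supply one. This is a sketch under assumptions: 
`valueOr` is a hypothetical name, and a plain string stands in for the UUID so 
the example needs nothing beyond the standard library.

```go
package main

import "fmt"

// valueOr returns *p when p is non-nil; otherwise it calls gen.
// gen runs only in the fallback case, so nothing is generated and discarded.
func valueOr[T any](p *T, gen func() T) T {
	if p != nil {
		return *p
	}

	return gen()
}

func main() {
	calls := 0
	gen := func() string { calls++; return "generated" }

	fixed := "caller-supplied"
	fmt.Println(valueOr(&fixed, gen), calls)

	var unset *string
	fmt.Println(valueOr(unset, gen), calls)
}
```

   Both call sites could then use the same helper with the UUID generator as 
`gen`, which removes the duplicated block.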



##########
table/pos_delete_v3_warn_test.go:
##########
@@ -123,20 +123,20 @@ func runPositionDeleteWrite(t *testing.T, mb *MetadataBuilder, partitions map[st
 }
 
 // TestPositionDeleteV3Warning verifies the writer emits a single deduped
-// slog.Warn naming the table when position-deletes are written on a v3 table,
-// and stays silent on v2 where Parquet position-deletes are still canonical.
+// slog.Warn naming the table when position-deletes are written on a v3 table
+// with write.delete.format explicitly set to "position", and stays silent on
+// v2 where Parquet position-deletes are still canonical.
 //
-// The warning fires once at writer entry, before partition fanout, so a
-// partitioned write logs once total — not once per partition. The partitioned
-// subtest locks that contract: the issue specifically called out "deduped",
-// and a future change that moved the warning into per-partition writers would
-// silently regress this property without it.
+// On v3 with the default (or explicit "dv") format, the DV writer is used
+// instead and no warning is emitted.
 func TestPositionDeleteV3Warning(t *testing.T) {
        emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}
 
-       t.Run("v3 unpartitioned warns once with table location", func(t *testing.T) {
+       t.Run("v3 with explicit position format warns once with table location", func(t *testing.T) {
+               mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+               mb.props = iceberg.Properties{WriteDeleteFormatKey: WriteDeleteFormatPosition}

Review Comment:
   use `mb.SetProperties` instead of using the internal field here.



##########
table/arrow_utils.go:
##########
@@ -1704,3 +1714,43 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
 
        return partitionWriter.Write(ctx, workers)
 }
+
+func positionDeleteRecordsToDataFilesDV(ctx context.Context, rootLocation string, args recordWritingArgs) iter.Seq2[iceberg.DataFile, error] {
+       return func(yield func(iceberg.DataFile, error) bool) {
+               writer := dv.NewDVWriter(args.fs)
+
+               for batch, err := range args.itr {
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+
+                       filePaths := batch.Column(0).(*array.String)
+                       positions := batch.Column(1).(*array.Int64)
+
+                       for i := range batch.NumRows() {
+                               writer.Add(filePaths.Value(int(i)), []int64{positions.Value(int(i))})
+                       }
+               }
+
+               u := uuid.Must(uuid.NewRandom())
+               if args.writeUUID != nil {
+                       u = *args.writeUUID
+               }
+               location := rootLocation + "/data/" + fmt.Sprintf("dv-%s.puffin", u)

Review Comment:
   should we follow the Java lib convention of `00000-0-<UUID>-deletes.puffin` 
instead?
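
   A sketch of a name builder for that shape. Only the pattern 
`00000-0-<UUID>-deletes.puffin` comes from the comment above; reading the first 
two fields as a zero-padded partition ID and a task ID is an assumption of this 
example, and `dvFileName` is a hypothetical helper.

```go
package main

import "fmt"

// dvFileName builds a name shaped like `00000-0-<UUID>-deletes.puffin`.
// Treating the leading fields as a zero-padded partition ID and a task ID
// is an assumption of this sketch.
func dvFileName(partitionID, taskID int, id string) string {
	return fmt.Sprintf("%05d-%d-%s-deletes.puffin", partitionID, taskID, id)
}

func main() {
	fmt.Println(dvFileName(0, 0, "3f1c9a52-7e0b-4c2d-9a61-5b8d2e4f6a10"))
}
```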



##########
table/arrow_utils.go:
##########
@@ -1704,3 +1714,43 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
 
        return partitionWriter.Write(ctx, workers)
 }
+
+func positionDeleteRecordsToDataFilesDV(ctx context.Context, rootLocation string, args recordWritingArgs) iter.Seq2[iceberg.DataFile, error] {
+       return func(yield func(iceberg.DataFile, error) bool) {
+               writer := dv.NewDVWriter(args.fs)
+
+               for batch, err := range args.itr {
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+
+                       filePaths := batch.Column(0).(*array.String)
+                       positions := batch.Column(1).(*array.Int64)
+
+                       for i := range batch.NumRows() {
+                               writer.Add(filePaths.Value(int(i)), []int64{positions.Value(int(i))})
+                       }
+               }
+
+               u := uuid.Must(uuid.NewRandom())
+               if args.writeUUID != nil {
+                       u = *args.writeUUID
+               }
+               location := rootLocation + "/data/" + fmt.Sprintf("dv-%s.puffin", u)
+
+               dataFiles, err := writer.Flush(ctx, location)
+               if err != nil {
+                       yield(nil, err)
+
+                       return
+               }

Review Comment:
   move this early return above line 1737 and also add an 
`if len(dataFiles) == 0 { return }` early return. That way we avoid the UUID 
generation and location string handling if we're going to return early.
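
   One reading of this suggestion, sketched with standard-library types only: 
check whether anything was buffered before building the location, so the UUID 
and path are produced only when a file will actually be written. `flushIfAny` 
and its parameters are hypothetical and do not reflect the DVWriter API.

```go
package main

import "fmt"

// flushIfAny is a hypothetical stand-in for the tail of the DV write path.
// It returns before calling newLocation when nothing was buffered, so the
// location string is only built when there is something to write.
func flushIfAny(buffered map[string][]int64, newLocation func() string) (string, bool) {
	if len(buffered) == 0 {
		return "", false
	}

	return newLocation(), true
}

func main() {
	built := 0
	newLocation := func() string {
		built++

		return "mem://test/data/example-deletes.puffin"
	}

	_, wrote := flushIfAny(nil, newLocation)
	fmt.Println(wrote, built)

	loc, wrote := flushIfAny(map[string][]int64{"a.parquet": {1}}, newLocation)
	fmt.Println(loc, wrote, built)
}
```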



##########
table/dv_write_path_test.go:
##########
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table/dv"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestDVWritePathProducesReadableOutput(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 3)
+       fs := iceio.NewMemFS()
+
+       batch := mustLoadRecordBatchFromJSON(PositionalDeleteArrowSchema, `[
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 1},
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 3},
+               {"file_path": "s3://bucket/data/file-001.parquet", "pos": 5},
+               {"file_path": "s3://bucket/data/file-002.parquet", "pos": 0},
+               {"file_path": "s3://bucket/data/file-002.parquet", "pos": 10}
+       ]`)
+       itr := func(yield func(arrow.RecordBatch, error) bool) {
+               batch.Retain()
+               yield(batch, nil)
+       }
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), "mem://test", mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: itr,
+                       fs:  fs,
+               })
+
+       var dataFiles []iceberg.DataFile
+       for df, err := range seq {
+               require.NoError(t, err)
+               dataFiles = append(dataFiles, df)
+       }
+
+       require.Len(t, dataFiles, 2)
+
+       df1 := dataFiles[0]
+       assert.Equal(t, iceberg.PuffinFile, df1.FileFormat())
+       assert.Equal(t, int64(3), df1.Count())
+       assert.Equal(t, "s3://bucket/data/file-001.parquet", *df1.ReferencedDataFile())
+       assert.NotNil(t, df1.ContentOffset())
+       assert.NotNil(t, df1.ContentSizeInBytes())
+
+       df2 := dataFiles[1]
+       assert.Equal(t, iceberg.PuffinFile, df2.FileFormat())
+       assert.Equal(t, int64(2), df2.Count())
+       assert.Equal(t, "s3://bucket/data/file-002.parquet", *df2.ReferencedDataFile())
+
+       bm1, err := dv.ReadDV(fs, df1)
+       require.NoError(t, err)
+       assert.Equal(t, int64(3), bm1.Cardinality())
+       assert.True(t, bm1.Contains(1))
+       assert.True(t, bm1.Contains(3))
+       assert.True(t, bm1.Contains(5))
+       assert.False(t, bm1.Contains(2))
+
+       bm2, err := dv.ReadDV(fs, df2)
+       require.NoError(t, err)
+       assert.Equal(t, int64(2), bm2.Cardinality())
+       assert.True(t, bm2.Contains(0))
+       assert.True(t, bm2.Contains(10))
+}
+
+func TestDVWritePathV2FallsBackToPosition(t *testing.T) {
+       mb := newPositionDeleteUnpartitionedMetadata(t, 2)
+
+       emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}
+
+       seq := positionDeleteRecordsToDataFiles(context.Background(), t.TempDir(), mb, nil,
+               recordWritingArgs{
+                       sc:  PositionalDeleteArrowSchema,
+                       itr: emptyItr,
+                       fs:  iceio.LocalFS{},
+               })

Review Comment:
   if `len(w.entries) == 0` in the `positionDeleteRecordsToDataFiles` path we 
return early from Flush, so with the empty iterator this isn't actually testing 
anything. Likewise, the position delete path produces no files and no errors 
for the empty case. We need to feed a non-empty iterator and then assert on the 
FileFormat of the first returned DataFile.



##########
table/arrow_utils.go:
##########
@@ -1646,10 +1647,19 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
                }
        }
 
-       // V3 and later prefer deletion vectors over Parquet position-delete files;
-       // warn so users migrate when DV-write support lands. The check is `>= 3`
-       // rather than `== 3` so the warning carries forward to v4+ without churn.
-       // See apache/iceberg#12048.

Review Comment:
   we probably shouldn't drop this comment


