laskoviymishka commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3289810518


##########
table/transaction.go:
##########
@@ -1488,13 +1501,23 @@ func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, fs
                                        localDelete = append(localDelete, df)
                                } else {
                                        localRewrite = append(localRewrite, df)
+                                       // Capture the file's sequence number from the
+                                       // manifest entry so the rewrite path can synthesize
+                                       // _last_updated_sequence_number for source rows
+                                       // that have a null value (or no column) in the file.
+                                       if fseq := entry.FileSequenceNum(); fseq != nil {

Review Comment:
   I think this is reading the wrong manifest field. Spec §Row Lineage says 
when `_last_updated_sequence_number` is null on read, it gets assigned the 
manifest entry's `sequence_number` (field id 3, the data sequence number) — not 
`file_sequence_number` (field id 4, the snapshot that added the file). For 
back-dated files where they diverge, a Java or iceberg-rust reader would 
compute a different value for the same row.
   
   I'd switch both this site and the matching one in `scanner.go` to 
`entry.SequenceNum()`. `SequenceNum()` returns `int64` not `*int64`, so the nil 
guard collapses — drop the conditional or guard on `>= 0`. While we're here, 
I'd rename `FileScanTask.DataSequenceNumber` if that's where this plumbs 
through, so the field name matches what it actually holds.
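   
   A minimal sketch of the read-time rule described above, using a simplified 
stand-in for the manifest entry. The `manifestEntry` struct and the 
`effectiveLastUpdatedSeq` helper are hypothetical names for illustration, not 
iceberg-go API; the values 4 and 9 are made up to show a file whose data 
sequence number and file sequence number diverge.

```go
package main

import "fmt"

// manifestEntry is a hypothetical, simplified stand-in for a manifest entry.
type manifestEntry struct {
	sequenceNum     int64  // field id 3: data sequence number
	fileSequenceNum *int64 // field id 4: sequence number of the snapshot that added the file
}

// effectiveLastUpdatedSeq returns the stored per-row value when it is present
// and otherwise falls back to the entry's data sequence number.
func effectiveLastUpdatedSeq(stored *int64, e manifestEntry) int64 {
	if stored != nil {
		return *stored
	}
	return e.sequenceNum
}

func main() {
	fileSeq := int64(9)
	// Assumed example: the file's rows belong to data sequence 4, but the
	// file itself was added by a later snapshot with sequence 9.
	e := manifestEntry{sequenceNum: 4, fileSequenceNum: &fileSeq}

	explicit := int64(2)
	fmt.Println(effectiveLastUpdatedSeq(nil, e))       // 4 (not 9)
	fmt.Println(effectiveLastUpdatedSeq(&explicit, e)) // 2
}
```

   Falling back to `fileSequenceNum` here would report 9 for the null row, 
which is the divergence other readers would not reproduce.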



##########
table/rewrite_data_files.go:
##########
@@ -289,6 +289,20 @@ func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTas
                scanOpts = append(scanOpts, WithMaxConcurrency(cfg.scanConcurrency))
        }
 
+       // Preserve row lineage only when every source file in the group carries
+       // it. A mixed group (some files with FirstRowID, some without — e.g.
+       // legacy files on a v3 table) would otherwise produce one output where
+       // post-lineage rows have explicit _row_id values and pre-lineage rows
+       // have nulls, which violates the per-file uniqueness/coverage
+       // invariant the v3 spec requires. Splitting mixed groups into separate
+       // outputs is a larger refactor and is left as a follow-up; for now we
+       // degrade gracefully (the rewrite still succeeds, but lineage is not
+       // preserved for the surviving rows).
+       preserveLineage := tbl.metadata.Version() >= 3 && allTasksHaveRowLineage(group.Tasks)

Review Comment:
   Mixed-lineage groups silently fall through to the non-lineage path. 
Compaction succeeds, the output is valid, but `_row_id` is gone — no log, no 
metric, no flag in `CompactionGroupResult`. For tables migrating to v3, every 
compaction pass over a group that contains any legacy file will quietly drop 
lineage on the surviving rows. That's the common case during migration, not the 
edge case.
   
   I'd at minimum add a `slog.Warn` matching the v3 pos-delete pattern, and 
ideally a `RowLineagePreserved bool` on `CompactionGroupResult` so callers can 
observe it.
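   
   Roughly what I have in mind, as a sketch only. The `task` and `groupResult` 
types and the `decideLineage` helper are hypothetical stand-ins, and the body 
of `allTasksHaveRowLineage` is my assumption about what the real helper 
checks; only `log/slog` is real API here.

```go
package main

import "log/slog"

// task is a hypothetical stand-in for a scan task in a compaction group.
type task struct{ firstRowID *int64 }

// groupResult is a hypothetical stand-in for CompactionGroupResult.
type groupResult struct {
	RowLineagePreserved bool
}

// allTasksHaveRowLineage reports whether every task carries a first row ID.
func allTasksHaveRowLineage(tasks []task) bool {
	for _, t := range tasks {
		if t.firstRowID == nil {
			return false
		}
	}
	return true
}

// decideLineage makes the degrade path observable: it warns when a v3 group
// cannot preserve lineage and records the outcome on the result.
func decideLineage(formatVersion int, groupID string, tasks []task) groupResult {
	preserve := formatVersion >= 3 && allTasksHaveRowLineage(tasks)
	if formatVersion >= 3 && !preserve {
		slog.Warn("compaction group mixes files with and without row lineage; _row_id will not be preserved",
			"group", groupID, "tasks", len(tasks))
	}
	return groupResult{RowLineagePreserved: preserve}
}

func main() {
	id := int64(0)
	mixed := []task{{firstRowID: &id}, {firstRowID: nil}}
	res := decideLineage(3, "g1", mixed)
	slog.Info("compaction finished", "row_lineage_preserved", res.RowLineagePreserved)
}
```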



##########
table/scanner_internal_test.go:
##########
@@ -238,98 +236,71 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t 
*testing.T) {
        assert.ErrorContains(t, err, "id 999")
 }
 
-// TestProjectionV3PreLineageFile verifies that Projection() succeeds and returns
-// _row_id and _last_updated_sequence_number as nullable (all-null-capable) fields when
-// the table is v3 with next-row-id set but the data file predates row lineage (those
-// columns are absent from the schema).
-func TestProjectionV3PreLineageFile(t *testing.T) {
-       schema := iceberg.NewSchema(
-               1,
-               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
-               iceberg.NestedField{ID: 2, Name: "payload", Type: 
iceberg.PrimitiveTypes.String, Required: false},
-       )
+// TestSynthesizeRowLineageColumns verifies that _row_id and _last_updated_sequence_number
+// are filled from task constants when those columns are present and null.
+func TestSynthesizeRowLineageColumns(t *testing.T) {

Review Comment:
   The test only constructs all-null inputs, so it doesn't actually prove the 
property the feature depends on. If `synthesizeRowLineageColumns` had a bug 
where it overwrote pre-existing non-null values, this test would still pass — 
and so would `TestCoWRewritePreservesRowID`, because fresh appends produce 
all-null source columns too.
   
   The case that matters is the mixed one: some rows have explicit `_row_id` / 
`_last_updated_sequence_number` values from a prior write, the rest are null. 
That only arises after multiple rewrites of files that already carry explicit 
values, which is exactly the scenario the spec's null-coalescing rule exists 
for.
   
   I'd add a fixture that builds a record with three rows — one explicit 
non-null, one null, one explicit non-null with a different value — and asserts 
the non-null ones survive untouched while the null one picks up `firstRowID + 
position`. Same shape for `_last_updated_sequence_number`.
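   
   The shape of the fixture, sketched on plain slices rather than Arrow 
arrays. `synthesizeRowIDs` is a hypothetical simplification of the real 
function (nil pointers model null cells), and `firstRowID = 100` is an 
arbitrary value chosen so that synthesized and explicit IDs cannot be 
confused.

```go
package main

import "fmt"

// synthesizeRowIDs fills nil entries with firstRowID + position and leaves
// non-nil entries untouched.
func synthesizeRowIDs(col []*int64, firstRowID int64) []int64 {
	out := make([]int64, len(col))
	for pos, v := range col {
		if v != nil {
			out[pos] = *v // explicit value from a prior write survives
			continue
		}
		out[pos] = firstRowID + int64(pos)
	}
	return out
}

func ptr(v int64) *int64 { return &v }

func main() {
	// Three rows: explicit, null, explicit with a different value.
	col := []*int64{ptr(7), nil, ptr(42)}
	fmt.Println(synthesizeRowIDs(col, 100)) // [7 101 42]
}
```

   An implementation that overwrote every row would produce `[100 101 102]` 
and fail this assertion, which the all-null input cannot detect.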



##########
metadata_columns.go:
##########
@@ -59,3 +61,38 @@ func LastUpdatedSequenceNumber() NestedField {
 func IsMetadataColumn(fieldID int) bool {
        return fieldID == RowIDFieldID || fieldID == 
LastUpdatedSequenceNumberFieldID
 }
+
+// SchemaWithRowLineage returns a new schema with the row-lineage metadata columns
+// (_row_id, _last_updated_sequence_number) appended to the given schema's fields.
+// Used when reading source files during a CoW rewrite or compaction so that row
+// identity and per-row update sequence are preserved in the output.
+//
+// Idempotent: if a row-lineage column is already present (by reserved field ID),
+// it is not appended again. The returned schema always allocates a fresh field
+// slice so it cannot alias the input schema's backing array.
+func SchemaWithRowLineage(s *Schema) *Schema {

Review Comment:
   A few things on the new helper.
   
   The comment a few lines up reads as if the second reserved ID is literally 
108. Should be "Integer.MAX_VALUE - 107 and Integer.MAX_VALUE - 108" — 
`2147483540` and `2147483539` respectively.
   
   More substantively, `SchemaWithRowLineage` lives in the top-level `iceberg` 
package, which makes it part of the v1 public API surface. Java keeps the 
equivalent logic internal. I'd consider an unexported `schemaWithRowLineage` in 
`table`, or a method on `*Schema`, so we're not locked into the helper's exact 
shape if v4 lineage semantics shift. Same for `WithPreserveRowLineage(schema)` 
— making callers construct the schema feels backwards; could it take no arg and 
derive from the table internally?
   
   Also worth a nil guard on `s` at the top here — it's exported, a nil 
receiver will panic on `s.Fields()`.
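   
   A sketch of the guard and the reserved-ID arithmetic, on hypothetical 
`field` / `schema` stand-ins rather than the real `iceberg` types. Returning 
nil for a nil input is one possible choice, assumed here for illustration; 
returning an error would be equally reasonable.

```go
package main

import (
	"fmt"
	"math"
)

// Reserved field IDs, written the way the doc comment should read.
const (
	rowIDFieldID          = math.MaxInt32 - 107
	lastUpdatedSeqFieldID = math.MaxInt32 - 108
)

// field and schema are hypothetical stand-ins for NestedField and Schema.
type field struct {
	ID   int
	Name string
}

type schema struct{ fields []field }

func hasField(fields []field, id int) bool {
	for _, f := range fields {
		if f.ID == id {
			return true
		}
	}
	return false
}

// withRowLineage appends the lineage columns if missing. It tolerates a nil
// schema and always copies the field slice so the input is never aliased.
func withRowLineage(s *schema) *schema {
	if s == nil {
		return nil // assumed behaviour for the sketch
	}
	out := make([]field, 0, len(s.fields)+2)
	out = append(out, s.fields...)
	lineage := []field{
		{ID: rowIDFieldID, Name: "_row_id"},
		{ID: lastUpdatedSeqFieldID, Name: "_last_updated_sequence_number"},
	}
	for _, lf := range lineage {
		if !hasField(out, lf.ID) {
			out = append(out, lf)
		}
	}
	return &schema{fields: out}
}

func main() {
	s := &schema{fields: []field{{ID: 1, Name: "id"}}}
	once := withRowLineage(s)
	twice := withRowLineage(once)
	fmt.Println(rowIDFieldID, lastUpdatedSeqFieldID)                             // 2147483540 2147483539
	fmt.Println(len(once.fields), len(twice.fields), withRowLineage(nil) == nil) // 3 3 true
}
```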



##########
table/transaction.go:
##########
@@ -1530,14 +1565,50 @@ func (t *Transaction) rewriteFilesWithFilter(ctx 
context.Context, fs io.IO, upda
        return nil
 }
 
-// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data.
+// fileSeqNum is the source file's file_sequence_number from its manifest entry; required to synthesize
+// _last_updated_sequence_number for rows whose value is null in the source file.
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, fileSeqNum *int64, filter iceberg.BooleanExpression, postFilter func(arrow.RecordBatch) (arrow.RecordBatch, error), caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {

Review Comment:
   Nine positional parameters, and this PR adds two of them (`fileSeqNum`, 
`postFilter`), with `postFilter` sitting right next to `filter`. Call sites 
are already hard to read, and every new argument makes a misplaced value 
easier to miss in review. I'd pull these into a `rewriteSingleFileArgs` struct.
   
   Separate but related: `BindExpr` runs unconditionally in 
`rewriteSingleFile`, but when `preserveRowLineage == true` the bound filter is 
unused (scanFilter becomes `AlwaysTrue{}`) and the same expression is bound 
again inside `prepareBatchFilter`. With `preserveRowLineage == false`, 
`prepareBatchFilter` runs `BindExpr` + `ConvertExpr` but `postFilter` is never 
called. One of those two binds is always wasted.
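   
   A sketch of both ideas together, with strings standing in for 
expressions. Everything here is hypothetical (`rewriteSingleFileArgs` fields, 
`bind`, `planFilters`), and it assumes both consumers can share one bound 
filter, which may not hold if they bind against different schemas.

```go
package main

import "fmt"

// rewriteSingleFileArgs is a hypothetical args struct; named fields replace
// the positional parameter list.
type rewriteSingleFileArgs struct {
	path               string
	dataSeqNum         int64
	filter             string
	preserveRowLineage bool
	caseSensitive      bool
	concurrency        int
}

type boundFilter string

// bind stands in for BindExpr and counts how often it runs.
func bind(expr string, calls *int) boundFilter {
	*calls++
	return boundFilter("bound(" + expr + ")")
}

// planFilters binds exactly once and routes the result to whichever side
// actually consumes it: the post-scan batch filter when lineage is
// preserved, the scan filter otherwise.
func planFilters(a rewriteSingleFileArgs, calls *int) (scanFilter, postFilter boundFilter) {
	b := bind(a.filter, calls)
	if a.preserveRowLineage {
		return "AlwaysTrue", b
	}
	return b, ""
}

func main() {
	calls := 0
	args := rewriteSingleFileArgs{path: "data/f1.parquet", filter: "id = 2", preserveRowLineage: true}
	scan, post := planFilters(args, &calls)
	fmt.Println(scan, post, calls) // AlwaysTrue bound(id = 2) 1
}
```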



##########
table/scanner.go:
##########
@@ -261,26 +262,96 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
-       if slices.Contains(scan.selectedFields, "*") {
-               return curSchema, nil
+       if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+               return nil, fmt.Errorf("%w: row lineage requires format version 
%d, table is v%d",
+                       ErrInvalidOperation, minFormatVersionRowLineage, 
curVersion)
        }
 
-       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
-       schemaMeta := metaFieldsFromSchema(curSchema)
-       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
-       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+       var schema *iceberg.Schema
+       if slices.Contains(scan.selectedFields, "*") {
+               schema = curSchema
+       } else {
+               // Intercept row-lineage metadata column names (_row_id,
+               // _last_updated_sequence_number) before calling Select: they 
are
+               // reserved and never appear in the user schema's fields, so
+               // Select would fail with "could not find column" on v3 tables
+               // where they are otherwise legal to project. The scanner reads
+               // them from file metadata (or synthesizes them) at scan time;
+               // here we just need to ensure they survive into the projection.
+               //
+               // On v1/v2 tables, Select is left to fail naturally — those
+               // versions don't have row lineage, so requesting these columns
+               // is an error.
+               userFields, lineageFields := 
splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+               if len(lineageFields) > 0 && curVersion < 
minFormatVersionRowLineage {

Review Comment:
   The rejection here works only because `Select` happens to fail when 
`_row_id` isn't in the v1/v2 schema — the user sees `ErrInvalidSchema` rather 
than a clear "row lineage requires v3". And it's fragile: if a future v2 schema 
ever carries a column literally named `_row_id` for unrelated reasons, this 
short-circuits in the wrong direction.
   
   I'd return an explicit error here for the `curVersion < 
minFormatVersionRowLineage && len(lineageFields) > 0` case, naming the format 
version requirement. Same logic, but the contract is in the code rather than 
emergent from `Select`'s error path.
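   
   Something along these lines, as a sketch. `errInvalidOperation` and 
`checkLineageProjection` are hypothetical stand-ins; the real code would wrap 
`ErrInvalidOperation` and use the existing `minFormatVersionRowLineage` 
constant.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidOperation stands in for the package's ErrInvalidOperation.
var errInvalidOperation = errors.New("invalid operation")

const minFormatVersionRowLineage = 3

// checkLineageProjection rejects lineage columns on pre-v3 tables with an
// error that names the format version requirement.
func checkLineageProjection(curVersion int, lineageFields []string) error {
	if len(lineageFields) > 0 && curVersion < minFormatVersionRowLineage {
		return fmt.Errorf("%w: projecting %v requires format version %d, table is v%d",
			errInvalidOperation, lineageFields, minFormatVersionRowLineage, curVersion)
	}
	return nil
}

func main() {
	err := checkLineageProjection(2, []string{"_row_id"})
	fmt.Println(errors.Is(err, errInvalidOperation))            // true
	fmt.Println(checkLineageProjection(3, []string{"_row_id"})) // <nil>
}
```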



##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) 
(*Table, error) {
 func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
        panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
 }
+
+// prepareBatchFilter binds the given Iceberg filter against schema and 
converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, 
schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) 
(arrow.RecordBatch, error), error) {
+       if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+               return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+                       return rec, nil
+               }, nil
+       }
+
+       bound, err := iceberg.BindExpr(schema, filter, caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("prepareBatchFilter: bind expression: 
%w", err)
+       }
+
+       if bound == nil {
+               return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+                       return rec, nil
+               }, nil
+       }
+
+       if bound.Equals(iceberg.AlwaysFalse{}) {
+               // Return a record with the same schema and zero rows. 
NewSlice(0, 0)
+               // preserves the per-field arrays so downstream code that calls
+               // rec.Column(i) does not panic on the empty result.
+               return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+                       defer rec.Release()
+
+                       return rec.NewSlice(0, 0), nil
+               }, nil
+       }
+
+       extSet, substraitFilter, err := substrait.ConvertExpr(schema, bound, 
caseSensitive)
+       if err != nil {
+               return nil, fmt.Errorf("prepareBatchFilter: convert expression: 
%w", err)
+       }
+
+       ctx = exprs.WithExtensionIDSet(ctx, 
exprs.NewExtensionSetDefault(*extSet))

Review Comment:
   The closure captures `ctx` at construction time with the extension-ID set 
baked in, but per-batch allocator and deadline values from the call-site ctx 
are then lost. If `filterRecords` resolves the allocator from ctx, intermediate 
arrays end up on the construction-time allocator and 
`memory.NewCheckedAllocator` accounting goes sideways under test.
   
   I'd take the call-site ctx as a parameter on the returned closure, derive 
the extension-set context inside, and let allocator/deadline propagate.
   
   While we're in here: the positional-binding assumption (lineage fields at 
indices N..N+1 in the record batch, filter bound against indices 0..N-1) is 
correct but undocumented. Worth a one-line comment so a future change to 
lineage column placement doesn't silently break filter evaluation.
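   
   A sketch of the closure shape I mean, using only `context` from the 
standard library. The key types, `ctxWithAllocator`, and `prepareFilter` are 
hypothetical; the allocator is modelled as a string in the context purely to 
show that call-site values reach the per-batch work.

```go
package main

import (
	"context"
	"fmt"
)

type extSetKey struct{}
type allocKey struct{}

// batchFilter takes the call-site ctx on every invocation.
type batchFilter func(ctx context.Context, rows []int64) (kept []int64, allocator string, err error)

// ctxWithAllocator is a hypothetical helper that tags a context with an
// allocator name.
func ctxWithAllocator(name string) context.Context {
	return context.WithValue(context.Background(), allocKey{}, name)
}

// prepareFilter does the one-time setup and captures only the extension set,
// not a context.
func prepareFilter(extSet string, keep func(int64) bool) batchFilter {
	return func(ctx context.Context, rows []int64) ([]int64, string, error) {
		if err := ctx.Err(); err != nil {
			return nil, "", err // call-site cancellation and deadlines are honoured
		}
		// Derive the extension-set context from the call-site ctx.
		ctx = context.WithValue(ctx, extSetKey{}, extSet)
		allocator, _ := ctx.Value(allocKey{}).(string)
		var kept []int64
		for _, v := range rows {
			if keep(v) {
				kept = append(kept, v)
			}
		}
		return kept, allocator, nil
	}
}

func main() {
	filter := prepareFilter("default-extensions", func(v int64) bool { return v != 2 })
	kept, allocator, err := filter(ctxWithAllocator("checked"), []int64{1, 2, 3})
	fmt.Println(kept, allocator, err) // [1 3] checked <nil>
}
```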



##########
table/scanner.go:
##########
@@ -261,26 +262,96 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
-       if slices.Contains(scan.selectedFields, "*") {
-               return curSchema, nil
+       if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+               return nil, fmt.Errorf("%w: row lineage requires format version 
%d, table is v%d",
+                       ErrInvalidOperation, minFormatVersionRowLineage, 
curVersion)
        }
 
-       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
-       schemaMeta := metaFieldsFromSchema(curSchema)
-       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
-       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+       var schema *iceberg.Schema
+       if slices.Contains(scan.selectedFields, "*") {
+               schema = curSchema
+       } else {
+               // Intercept row-lineage metadata column names (_row_id,
+               // _last_updated_sequence_number) before calling Select: they 
are
+               // reserved and never appear in the user schema's fields, so
+               // Select would fail with "could not find column" on v3 tables
+               // where they are otherwise legal to project. The scanner reads
+               // them from file metadata (or synthesizes them) at scan time;
+               // here we just need to ensure they survive into the projection.
+               //
+               // On v1/v2 tables, Select is left to fail naturally — those
+               // versions don't have row lineage, so requesting these columns
+               // is an error.
+               userFields, lineageFields := 
splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+               if len(lineageFields) > 0 && curVersion < 
minFormatVersionRowLineage {
+                       userFields = scan.selectedFields
+                       lineageFields = nil
+               }
 
-               // synthesis path
-               removedMetaSlice, missingMetaFields := 
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
-               sch, err := curSchema.Select(scan.caseSensitive, 
removedMetaSlice...)
+               var err error
+               schema, err = curSchema.Select(scan.caseSensitive, 
userFields...)
                if err != nil {
                        return nil, err
                }
+               if len(lineageFields) > 0 {
+                       schema = appendMissingLineageFields(schema, 
lineageFields)
+               }
+       }
+
+       if scan.includeRowLineage {

Review Comment:
   There are now two code paths that select lineage columns into the 
projection: the `includeRowLineage` flag path (calls `SchemaWithRowLineage`) 
and the explicit-field-name path (calls `appendMissingLineageFields`). Both run 
when the user supplied lineage field names *and* set `WithRowLineage`, 
producing a redundant pass.
   
   More concerning, they have subtly different error behavior — one rejects on 
v1/v2 with a clear error, the other relies on `Select` to fail. Anyone adding a 
third lineage column down the road has to update both paths and both test sets.
   
   I'd canonicalize on one path: detect lineage fields in `selectedFields` up 
front, set `scan.includeRowLineage = true` if any are present, drop them from 
`selectedFields`, and from there only the `includeRowLineage` branch runs. wdyt?
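   
   A sketch of the up-front normalization. `isLineageColumn` and 
`normalizeSelection` are hypothetical names; the version check would then 
live only in the `includeRowLineage` branch.

```go
package main

import (
	"fmt"
	"strings"
)

var lineageColumns = []string{"_row_id", "_last_updated_sequence_number"}

// isLineageColumn reports whether name is a reserved row-lineage column.
func isLineageColumn(name string, caseSensitive bool) bool {
	for _, c := range lineageColumns {
		if name == c || (!caseSensitive && strings.EqualFold(name, c)) {
			return true
		}
	}
	return false
}

// normalizeSelection strips lineage column names from the selection and
// folds them into the includeRowLineage flag, so only one code path has to
// handle lineage afterwards.
func normalizeSelection(selected []string, includeRowLineage, caseSensitive bool) ([]string, bool) {
	user := make([]string, 0, len(selected))
	for _, name := range selected {
		if isLineageColumn(name, caseSensitive) {
			includeRowLineage = true
			continue
		}
		user = append(user, name)
	}
	return user, includeRowLineage
}

func main() {
	user, include := normalizeSelection([]string{"id", "_row_id"}, false, true)
	fmt.Println(user, include) // [id] true
}
```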



##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) 
(*Table, error) {
 func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
        panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
 }
+
+// prepareBatchFilter binds the given Iceberg filter against schema and 
converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, 
schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) 
(arrow.RecordBatch, error), error) {

Review Comment:
   The `AlwaysFalse` fast-path does `defer rec.Release()`; the `AlwaysTrue` / 
bound-nil paths return `rec` unchanged with no Retain. So depending on which 
fast-path triggers, the caller either does or doesn't own a reference to the 
returned record. That's two contracts for one return type, and the next caller 
to add a `Release()` will double-free on the AlwaysFalse path.
   
   I'd have every fast-path do a `rec.Retain()` so the caller always owns 
exactly one reference on the result, mirroring the bound-filter path's 
semantics.
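   
   One consistent contract, sketched with a toy refcounted type. The `record` 
type is hypothetical and only mimics Retain / Release / NewSlice; the 
contract assumed here is that the filter borrows its input and always returns 
a result carrying one reference the caller must release.

```go
package main

import "fmt"

// record is a toy refcounted stand-in for an Arrow record batch.
type record struct {
	refs int
	rows int
}

func (r *record) Retain()  { r.refs++ }
func (r *record) Release() { r.refs-- }

// NewSlice returns a new record with its own single reference.
func (r *record) NewSlice(i, j int) *record { return &record{refs: 1, rows: j - i} }

// passThrough is the AlwaysTrue / bound-nil fast path: it retains so the
// returned record carries a reference owned by the caller.
func passThrough(rec *record) *record {
	rec.Retain()
	return rec
}

// alwaysFalse returns an empty slice and leaves the input's reference alone.
func alwaysFalse(rec *record) *record {
	return rec.NewSlice(0, 0)
}

// run applies the same caller-side pattern to any filter: release the input,
// then release the output. Both counts should end at zero.
func run(filter func(*record) *record) (inputRefs, outputRefs int) {
	in := &record{refs: 1, rows: 3}
	out := filter(in)
	in.Release()
	out.Release()
	return in.refs, out.refs
}

func main() {
	fmt.Println(run(passThrough)) // 0 0
	fmt.Println(run(alwaysFalse)) // 0 0
}
```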



##########
table/row_lineage_rewrite_test.go:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func newV3RowLineageTestTable(t *testing.T) *table.Table {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, 
table.UnsortedSortOrder, location,
+               iceberg.Properties{table.PropertyFormatVersion: "3"})
+       require.NoError(t, err)
+
+       metaLoc := location + "/metadata/v1.metadata.json"
+       fsF := func(context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil }
+       cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: 
fsF}
+
+       return table.New(table.Identifier{"db", "row_lineage_test"}, meta, 
metaLoc, fsF, cat)
+}
+
+// TestCoWRewritePreservesRowID verifies that a copy-on-write overwrite with a
+// row filter preserves the original _row_id and _last_updated_sequence_number
+// values in the rewritten file. Surviving rows must keep both values from the
+// pre-rewrite snapshot — the rewrite is "physically rewritten", not "logically
+// updated", per the v3 spec.
+func TestCoWRewritePreservesRowID(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       // Append 3 rows: id=1,2,3
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       initialData, err := array.TableFromJSON(mem, arrowSchema, []string{
+               `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 3, 
"data": "c"}]`,
+       })
+       require.NoError(t, err)
+       defer initialData.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(initialData, -1), nil)
+       require.NoError(t, err)
+
+       // Verify the append created a valid v3 snapshot with row lineage.
+       snap := tbl.CurrentSnapshot()
+       require.NotNil(t, snap)
+       require.NotNil(t, snap.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, snap.AddedRows, "v3 snapshot must have added-rows")
+       assert.Equal(t, int64(0), *snap.FirstRowID)
+       assert.Equal(t, int64(3), *snap.AddedRows)
+
+       // Capture the snapshot's sequence number so we can assert preservation
+       // after the rewrite. After the append, every row's effective
+       // _last_updated_sequence_number should be this value.
+       createSeq := snap.SequenceNumber
+
+       // Scan with row lineage to see synthesized _row_id values before the 
rewrite.
+       lineageScan := tbl.Scan(table.WithRowLineage())
+       schema, itr, err := lineageScan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       rowIDIdx := -1
+       for i, f := range schema.Fields() {
+               if f.Name == iceberg.RowIDColumnName {
+                       rowIDIdx = i
+
+                       break
+               }
+       }
+       require.GreaterOrEqual(t, rowIDIdx, 0, "_row_id should be in scan 
projection")
+
+       var originalRowIDs []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               col := rec.Column(rowIDIdx).(*array.Int64)
+               for i := 0; i < col.Len(); i++ {
+                       originalRowIDs = append(originalRowIDs, col.Value(i))
+               }
+               rec.Release()
+       }
+       require.Equal(t, []int64{0, 1, 2}, originalRowIDs, "initial _row_id 
should be 0,1,2")
+
+       // CoW overwrite: delete the row where id=2, preserving id=1 and id=3.
+       filter := iceberg.EqualTo(iceberg.Reference("id"), int64(2))
+       tbl, err = tbl.Delete(ctx, filter, nil)
+       require.NoError(t, err)
+
+       snap = tbl.CurrentSnapshot()
+       require.NotNil(t, snap)
+       require.Greater(t, snap.SequenceNumber, createSeq,
+               "sanity: rewrite snapshot must have a higher sequence number 
than the create snapshot")
+
+       // Scan the result with row lineage. The surviving rows should preserve 
their
+       // original _row_id values: 0 and 2. Their 
_last_updated_sequence_number must
+       // also still report the create snapshot's seq, NOT the rewrite 
snapshot's
+       // seq — the rewrite is physical only, not a logical update.
+       lineageScan = tbl.Scan(table.WithRowLineage())
+       _, itr, err = lineageScan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       var afterRowIDs []int64
+       var afterIDs []int64
+       var afterSeq []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               idIdx := rec.Schema().FieldIndices("id")
+               require.NotEmpty(t, idIdx)
+               rowIDIndices := 
rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+               require.NotEmpty(t, rowIDIndices)
+               seqIndices := 
rec.Schema().FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+               require.NotEmpty(t, seqIndices, "_last_updated_sequence_number 
must be in projection")
+
+               idCol := rec.Column(idIdx[0]).(*array.Int64)
+               rowIDCol := rec.Column(rowIDIndices[0]).(*array.Int64)
+               seqCol := rec.Column(seqIndices[0]).(*array.Int64)
+               for i := 0; i < int(rec.NumRows()); i++ {
+                       afterIDs = append(afterIDs, idCol.Value(i))
+                       afterRowIDs = append(afterRowIDs, rowIDCol.Value(i))
+                       require.False(t, seqCol.IsNull(i),
+                               "row %d must have a non-null 
_last_updated_sequence_number after CoW rewrite", i)
+                       afterSeq = append(afterSeq, seqCol.Value(i))
+               }
+               rec.Release()
+       }
+
+       assert.Equal(t, []int64{1, 3}, afterIDs, "remaining rows should be 
id=1,3")
+       assert.Equal(t, []int64{0, 2}, afterRowIDs,
+               "_row_id must be preserved through CoW rewrite: row with id=1 
keeps _row_id=0, row with id=3 keeps _row_id=2")
+       assert.Equal(t, []int64{createSeq, createSeq}, afterSeq,
+               "_last_updated_sequence_number must report the original 
creation snapshot's sequence number, not the rewrite's")
+}
+
+// TestCoWRewriteRowIDNextRowIDAccounting verifies that row-id accounting 
remains
+// correct after a CoW rewrite. The overcounting (where next-row-id advances by
+// the full manifest row count including preserved survivors) is intentional 
and
+// matches Java's ManifestListWriter.V3Writer behavior.
+func TestCoWRewriteRowIDNextRowIDAccounting(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       data, err := array.TableFromJSON(mem, arrowSchema, []string{
+               `[{"id": 10, "data": "x"}, {"id": 20, "data": "y"}, {"id": 30, 
"data": "z"}]`,
+       })
+       require.NoError(t, err)
+       defer data.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+       require.NoError(t, err)
+
+       // next-row-id should be 3 after appending 3 rows.
+       assert.Equal(t, int64(3), tbl.Metadata().NextRowID())
+
+       // Delete one row via CoW.
+       filter := iceberg.EqualTo(iceberg.Reference("id"), int64(20))
+       tbl, err = tbl.Delete(ctx, filter, nil)
+       require.NoError(t, err)
+
+       // next-row-id advances by 3 (original) + 2 (rewritten survivors) = 5,
+       // even though the surviving rows preserve their old IDs. This "wastes"
+       // ID space but doesn't violate uniqueness — actual row IDs come from
+       // the explicit Parquet column, not the global counter.
+       assert.Equal(t, int64(5), tbl.Metadata().NextRowID(),
+               "next-row-id should advance by original (3) + rewritten (2) = 
5")

Review Comment:
   The comment is arithmetically misleading: the advance is `2` (the new 
manifest's added rows), and `5 = 3 + 2`, where `3` is the `next-row-id` value 
before the rewrite, not an addend contributed by this commit. Worth rewriting 
the comment to match the actual computation, and linking to the 
`snapshot_producers.go` site that mirrors 
`ManifestListWriter.V3Writer.prepare()` in Java.
   
   Separately, I'd double-check `snapshot.AddedRows` here — Java's `added-rows` 
summary field is the actual added row count, not `existing + added`. If we're 
stamping the Java-equivalent of `existing + added` into the snapshot summary, 
downstream tools reading that field will see inflated numbers.
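   
   The computation the test comment should describe, as a tiny sketch. 
`advanceNextRowID` is a hypothetical helper; it assumes `next-row-id` 
advances by the row count of each manifest newly written in the commit.

```go
package main

import "fmt"

// advanceNextRowID adds the row count of each newly written manifest to the
// current next-row-id.
func advanceNextRowID(nextRowID int64, newManifestRowCounts ...int64) int64 {
	for _, n := range newManifestRowCounts {
		nextRowID += n
	}
	return nextRowID
}

func main() {
	afterAppend := advanceNextRowID(0, 3)            // append of 3 rows
	afterRewrite := advanceNextRowID(afterAppend, 2) // rewrite adds a manifest with 2 survivors
	fmt.Println(afterAppend, afterRewrite)           // 3 5
}
```

   The rewrite commit contributes 2; the 3 is carried over from the append.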



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
