laskoviymishka commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3277166972


##########
metadata_columns.go:
##########
@@ -59,3 +59,12 @@ func LastUpdatedSequenceNumber() NestedField {
 func IsMetadataColumn(fieldID int) bool {
        return fieldID == RowIDFieldID || fieldID == 
LastUpdatedSequenceNumberFieldID
 }
+
+// SchemaWithRowID returns a new schema that appends the _row_id metadata 
column
+// to the given schema's fields. Used when reading source files during a CoW 
rewrite
+// so that row identity is preserved in the output.
+func SchemaWithRowID(s *Schema) *Schema {
+       fields := append(s.Fields(), RowID())

Review Comment:
   Two issues here that bite in practice.
   
   First, if the input schema already has a field with ID 1 (e.g. a user column 
named `id`), `NewSchemaWithIdentifiers` will reject the duplicate and we panic 
at runtime. We can't assume user schemas avoid 1, 2, 3.
   
   Second, `append(schema.Fields(), ...)` aliases the original schema's backing 
array when there's spare cap. A later mutation of either schema can corrupt the 
other. I'd `slices.Clone(schema.Fields())` first, and either skip-or-error when 
one of the row-lineage IDs collides instead of panicking. wdyt?



##########
table/rewrite_data_files.go:
##########
@@ -289,6 +289,11 @@ func ExecuteCompactionGroup(ctx context.Context, tbl 
*Table, group CompactionTas
                scanOpts = append(scanOpts, 
WitMaxConcurrency(cfg.scanConcurrency))
        }
 
+       preserveLineage := tbl.metadata.Version() >= 3 && 
hasRowLineage(group.Tasks)

Review Comment:
   Mixing row-lineage and non-row-lineage source files in a single rewrite 
group is going to produce a file where some rows have `_row_id` / 
`_last_updated_sequence_number` and others don't, which violates the per-file 
invariant. `hasRowLineage` returning true if *any* task has lineage means the 
lineage path runs for the whole group regardless of the mix.
   
   I'd partition the input files by file-level `has_row_lineage` and rewrite 
each partition separately, or skip non-lineage files when the table has row 
lineage enabled. Silently merging them feels like a correctness bug waiting to 
surface.



##########
table/transaction.go:
##########
@@ -1532,6 +1532,26 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
                Length: originalFile.FileSizeBytes(),
        }
 
+       // Preserve row lineage for v3 tables: include _row_id in the scan
+       // projection so it is synthesized (or read from existing explicit 
values)
+       // and written into the output file. _last_updated_sequence_number is 
left
+       // out — it will be null in the output and inherited from the new file's
+       // data_sequence_number at read time, which satisfies the spec 
requirement

Review Comment:
   The spec says `last-updated-sequence-number` on a data file is the sequence 
number of the snapshot in which the row was last updated. Leaving it null on 
rewrite and inheriting from the new file's `data_sequence_number` conflates 
"physically rewritten" with "logically updated" and breaks downstream readers 
that use this for CDC / incremental scans.
   
   For a pure compaction (no row changes), we should preserve the original 
per-row `_last_updated_sequence_number` from the source files. Otherwise every 
compaction looks like a full table update to consumers.



##########
table/scanner.go:
##########
@@ -261,26 +260,22 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
                }
        }
 
+       var schema *iceberg.Schema
        if slices.Contains(scan.selectedFields, "*") {
-               return curSchema, nil
-       }
-
-       selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, 
caseSensitive)
-       schemaMeta := metaFieldsFromSchema(curSchema)
-       synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
-       if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
-
-               // synthesis path
-               removedMetaSlice, missingMetaFields := 
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
-               sch, err := curSchema.Select(scan.caseSensitive, 
removedMetaSlice...)
+               schema = curSchema
+       } else {
+               var err error
+               schema, err = curSchema.Select(scan.caseSensitive, 
scan.selectedFields...)

Review Comment:
   This loses the metadata-column escape hatch that the old synthesis path 
supported. `Select("_row_id")` (or `Select("_last_updated_sequence_number")`) 
now goes straight to `curSchema.Select` and fails because the user schema 
doesn't contain those names — the only way to get those columns is 
`WithRowLineage()`, which is a separate API.
   
   If that's intentional, worth a CHANGELOG note. If not, we should intercept 
the known metadata column names here before the schema lookup, the way the 
removed `synthesizeMeta`/`removeMetadataFromSelectedFields` did.



##########
table/write_records.go:
##########
@@ -94,6 +95,16 @@ func WithClusteredWrite() WriteRecordOption {
        }
 }
 
+// WithPreserveRowLineage sets the output file schema to include _row_id so 
that
+// row identity is preserved through rewrites and compactions on v3 tables. The
+// input records must already contain the _row_id column (e.g. from a scan that
+// projected lineage columns).
+func WithPreserveRowLineage(schema *iceberg.Schema) WriteRecordOption {

Review Comment:
   `WithPreserveRowLineage` is accepted on any table — there's no check that 
the table actually has v3 row lineage enabled, or that the input records 
contain `_row_id`. A caller who sets it on a v2 table or feeds records without 
the lineage column gets silently-wrong output rather than an error.
   
   I'd validate at option-application time (or inside `WriteRecords` once we 
have the `tbl`): if the table format is < 3, error. If the input Arrow schema 
is missing `_row_id`, error.



##########
table/table.go:
##########
@@ -816,6 +816,14 @@ func WithOptions(opts iceberg.Properties) ScanOption {
        }
 }
 
+// WithRowLineage adds _row_id to the scan projection so that row identity is

Review Comment:
   The doc says this is "ignored" on non-v3 tables, but the implementation sets 
`scan.includeRowLineage = true` unconditionally and `SchemaWithRowID` then runs 
in `Projection()` regardless of format version. On a v1/v2 table that ends up 
projecting a metadata column the table doesn't support.
   
   Either the doc is wrong or the guard is missing. I'd add an explicit 
format-version check (either error or true no-op) and make the doc match the 
behavior.



##########
table/transaction.go:
##########
@@ -1788,3 +1843,40 @@ func (s *StagedTable) Refresh(ctx context.Context) 
(*Table, error) {
 func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
        panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
 }
+
+// filterRecordBatch applies an Iceberg filter to a record batch, returning
+// only matching rows. The filter is provided unbound and is bound against the
+// given schema internally. Used for post-synthesis row filtering in CoW 
rewrites
+// where row lineage must be synthesized before filtering.
+func filterRecordBatch(ctx context.Context, rec arrow.RecordBatch, filter 
iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) 
(arrow.RecordBatch, error) {

Review Comment:
   Two things in `filterRecordBatch`:
   
   This is called once per batch from the iterator loop, but `BindExpr` and 
`substrait.ConvertExpr` depend only on the (schema, filter) pair, not the 
batch. We're paying that cost N times for no reason — hoist the bind + convert 
out and pass them in, or memoize on first call.
   
   Also, the `AlwaysFalse` branch returns `array.NewRecordBatch(emptySchema, 
nil, 0)` with no columns at all — downstream code that does `rec.Column(i)` 
will panic. The empty record should have the right schema *and* empty arrays 
for each field.



##########
table/rewrite_data_files.go:
##########
@@ -328,6 +336,18 @@ func ExecuteCompactionGroup(ctx context.Context, tbl 
*Table, group CompactionTas
        }, nil
 }
 
+// hasRowLineage returns true if any task in the group has a non-nil 
FirstRowID,

Review Comment:
   The comment says "indicating the table has v3 row lineage data to preserve", 
but the body checks per-task `FirstRowID` — a per-file property, not a table 
property. A v3 table can legitimately have a mix of tasks where some have 
`FirstRowID` and some don't (legacy files), so "any task has it" doesn't really 
mean "the table has lineage".
   
   I'd either rename to `anyTaskHasRowLineage` (and fix the comment to match), 
or — better — gate on `tbl.metadata.Version() >= 3` at the call site and 
rewrite this to mean "all tasks have lineage" so we don't trip the 
mixed-lineage bug above.



##########
table/table.go:
##########
@@ -816,6 +816,14 @@ func WithOptions(opts iceberg.Properties) ScanOption {
        }
 }
 
+// WithRowLineage adds _row_id to the scan projection so that row identity is
+// preserved through rewrites. Only meaningful for v3 tables; ignored 
otherwise.
+func WithRowLineage() ScanOption {

Review Comment:
   The name `WithRowLineage` suggests it controls the whole row-lineage 
feature, but it only adds `_row_id` to the projection — 
`_last_updated_sequence_number` is governed elsewhere. That's confusing for 
callers who'd expect one switch to mean one thing.
   
   Could we either rename to scope it (e.g. `WithRowID`) or extend it to also 
project `_last_updated_sequence_number`? wdyt?



##########
table/transaction.go:
##########
@@ -1542,11 +1562,19 @@ func (t *Transaction) rewriteSingleFile(ctx 
context.Context, fs io.IO, originalF
                return nil, fmt.Errorf("failed to build metadata: %w", err)
        }
 
+       // When preserving row lineage, omit the row filter from the scanner so 
that
+       // _row_id is synthesized using the true file-level row positions. The 
filter
+       // is applied below in the record iterator.
+       scanFilter := iceberg.BooleanExpression(iceberg.AlwaysTrue{})

Review Comment:
   Passing `AlwaysTrue` here disables both manifest-level and file-level 
pushdown for the rewrite scan when row lineage is on. For a selective delete on 
a wide partition that's a lot of unnecessary IO — we read every file in scope 
just to filter post-synthesis.
   
   Worst case this is unavoidable because `_row_id` synthesis needs original 
positions, but the comment treats it as obviously fine. At minimum I'd leave a 
TODO calling out the cost, ideally we'd still apply file-level (not row-level) 
skipping based on `boundFilter`'s residuals.



##########
table/row_lineage_rewrite_test.go:
##########
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func newV3RowLineageTestTable(t *testing.T) *table.Table {
+       t.Helper()
+
+       location := filepath.ToSlash(t.TempDir())
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, 
table.UnsortedSortOrder, location,
+               iceberg.Properties{table.PropertyFormatVersion: "3"})
+       require.NoError(t, err)
+
+       metaLoc := location + "/metadata/v1.metadata.json"
+       fsF := func(context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil }
+       cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: 
fsF}
+
+       return table.New(table.Identifier{"db", "row_lineage_test"}, meta, 
metaLoc, fsF, cat)
+}
+
+// TestCoWRewritePreservesRowID verifies that a copy-on-write overwrite with a
+// row filter preserves the original _row_id values in the rewritten file. The
+// _last_updated_sequence_number should be null (inherited from 
data_sequence_number).
+func TestCoWRewritePreservesRowID(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       // Append 3 rows: id=1,2,3
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       initialData, err := array.TableFromJSON(mem, arrowSchema, []string{
+               `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 3, 
"data": "c"}]`,
+       })
+       require.NoError(t, err)
+       defer initialData.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(initialData, -1), nil)
+       require.NoError(t, err)
+
+       // Verify the append created a valid v3 snapshot with row lineage.
+       snap := tbl.CurrentSnapshot()
+       require.NotNil(t, snap)
+       require.NotNil(t, snap.FirstRowID, "v3 snapshot must have first-row-id")
+       require.NotNil(t, snap.AddedRows, "v3 snapshot must have added-rows")
+       assert.Equal(t, int64(0), *snap.FirstRowID)
+       assert.Equal(t, int64(3), *snap.AddedRows)
+
+       // Scan with row lineage to see synthesized _row_id values before the 
rewrite.
+       lineageScan := tbl.Scan(table.WithRowLineage())
+       schema, itr, err := lineageScan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       rowIDIdx := -1
+       for i, f := range schema.Fields() {
+               if f.Name == iceberg.RowIDColumnName {
+                       rowIDIdx = i
+
+                       break
+               }
+       }
+       require.GreaterOrEqual(t, rowIDIdx, 0, "_row_id should be in scan 
projection")
+
+       var originalRowIDs []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               col := rec.Column(rowIDIdx).(*array.Int64)
+               for i := 0; i < col.Len(); i++ {
+                       originalRowIDs = append(originalRowIDs, col.Value(i))
+               }
+               rec.Release()
+       }
+       require.Equal(t, []int64{0, 1, 2}, originalRowIDs, "initial _row_id 
should be 0,1,2")
+
+       // CoW overwrite: delete the row where id=2, preserving id=1 and id=3.
+       filter := iceberg.EqualTo(iceberg.Reference("id"), int64(2))
+       tbl, err = tbl.Delete(ctx, filter, nil)
+       require.NoError(t, err)
+
+       snap = tbl.CurrentSnapshot()
+       require.NotNil(t, snap)
+
+       // Scan the result with row lineage. The surviving rows should preserve 
their
+       // original _row_id values: 0 and 2.
+       lineageScan = tbl.Scan(table.WithRowLineage())
+       _, itr, err = lineageScan.ToArrowRecords(ctx)
+       require.NoError(t, err)
+
+       var afterRowIDs []int64
+       var afterIDs []int64
+       for rec, err := range itr {
+               require.NoError(t, err)
+               idIdx := rec.Schema().FieldIndices("id")
+               require.NotEmpty(t, idIdx)
+               rowIDIndices := 
rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+               require.NotEmpty(t, rowIDIndices)
+
+               idCol := rec.Column(idIdx[0]).(*array.Int64)
+               rowIDCol := rec.Column(rowIDIndices[0]).(*array.Int64)
+               for i := 0; i < int(rec.NumRows()); i++ {
+                       afterIDs = append(afterIDs, idCol.Value(i))
+                       afterRowIDs = append(afterRowIDs, rowIDCol.Value(i))
+               }
+               rec.Release()
+       }
+
+       assert.Equal(t, []int64{1, 3}, afterIDs, "remaining rows should be 
id=1,3")
+       assert.Equal(t, []int64{0, 2}, afterRowIDs,
+               "_row_id must be preserved through CoW rewrite: row with id=1 
keeps _row_id=0, row with id=3 keeps _row_id=2")
+}
+
+// TestCoWRewriteRowIDNextRowIDAccounting verifies that row-id accounting 
remains
+// correct after a CoW rewrite. The overcounting (where next-row-id advances by
+// the full manifest row count including preserved survivors) is intentional 
and
+// matches Java's ManifestListWriter.V4Writer behavior.
+func TestCoWRewriteRowIDNextRowIDAccounting(t *testing.T) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+
+       tbl := newV3RowLineageTestTable(t)
+
+       arrowSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+
+       data, err := array.TableFromJSON(mem, arrowSchema, []string{
+               `[{"id": 10, "data": "x"}, {"id": 20, "data": "y"}, {"id": 30, 
"data": "z"}]`,
+       })
+       require.NoError(t, err)
+       defer data.Release()
+
+       tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+       require.NoError(t, err)
+
+       // next-row-id should be 3 after appending 3 rows.
+       assert.Equal(t, int64(3), tbl.Metadata().NextRowID())
+
+       // Delete one row via CoW.
+       filter := iceberg.EqualTo(iceberg.Reference("id"), int64(20))
+       tbl, err = tbl.Delete(ctx, filter, nil)
+       require.NoError(t, err)
+
+       // next-row-id advances by the rewritten manifest's total row count (2 
surviving
+       // rows), even though those rows preserve old IDs. This "wastes" ID 
space but
+       // doesn't violate uniqueness — actual row IDs come from the explicit 
Parquet
+       // column, not the global counter.
+       nextRowID := tbl.Metadata().NextRowID()
+       assert.GreaterOrEqual(t, nextRowID, int64(5),

Review Comment:
   `GreaterOrEqual(..., int64(5))` makes the assertion vacuous in the loose 
direction — it passes even if next-row-id leapt to some huge value because of a 
bug elsewhere. And the comment above says the expected accounting is "original 
+ rewritten = 3 + 2 = 5", so we actually know the expected value.
   
   I'd assert `Equal(t, int64(5), nextRowID)` (or whatever the exact 
post-rewrite expectation is). As written, this test would still pass if 
next-row-id accounting drifted in either direction.



##########
table/snapshot_producers.go:
##########
@@ -917,6 +917,11 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ 
[]Update, _ []Require
                        return nil, nil, err
                }
                if writer.NextRowID() != nil {
+                       // addedRows counts ALL rows in new manifests (existing 
+ added), even
+                       // for rewrites where survivors preserve old _row_id 
values. This
+                       // "wastes" ID space but doesn't violate uniqueness: 
actual row IDs come
+                       // from the explicit Parquet column, not the global 
counter. Java's
+                       // ManifestListWriter.V4Writer uses the same accounting.

Review Comment:
   `ManifestListWriter.V4Writer` looks like a typo — Java's manifest-list 
writer for row lineage is `V3Writer` (the v3 format-version writer); there is 
no V4 manifest. Unless there's a v4 draft I'm missing, the comment should read 
`V3Writer`.
   
   Misleading provenance comments are extra costly because the next reader 
trying to verify parity will hunt for a class that doesn't exist.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to