laskoviymishka commented on code in PR #1099:
URL: https://github.com/apache/iceberg-go/pull/1099#discussion_r3289810518
##########
table/transaction.go:
##########
@@ -1488,13 +1501,23 @@ func (t *Transaction) classifyFilesForFilteredDeletions(ctx context.Context, fs
localDelete = append(localDelete, df)
} else {
localRewrite = append(localRewrite, df)
+ // Capture the file's sequence number from the
+ // manifest entry so the rewrite path can synthesize
+ // _last_updated_sequence_number for source rows
+ // that have a null value (or no column) in the file.
+ if fseq := entry.FileSequenceNum(); fseq != nil {
Review Comment:
I think this is reading the wrong manifest field. Spec §Row Lineage says
when `_last_updated_sequence_number` is null on read, it gets assigned the
manifest entry's `sequence_number` (field id 3, the data sequence number) — not
`file_sequence_number` (field id 4, the snapshot that added the file). For
back-dated files where they diverge, a Java or iceberg-rust reader would
compute a different value for the same row.
I'd switch both this site and the matching one in `scanner.go` to
`entry.SequenceNum()`. `SequenceNum()` returns `int64` not `*int64`, so the nil
guard collapses — drop the conditional or guard on `>= 0`. While we're here,
I'd rename `FileScanTask.DataSequenceNumber` if that's where this plumbs
through, so the field name matches what it actually holds.
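A minimal sketch of the coalescing rule described above. It uses a hypothetical `manifestEntry` stand-in whose `SequenceNum()` returns the data sequence number (field id 3) as `int64` and whose `FileSequenceNum()` returns field id 4 as `*int64`. The `lastUpdatedSeq` helper, and the choice to treat a negative sequence number as "not yet assigned" (one reading of the `>= 0` guard), are assumptions for illustration, not iceberg-go's API.

```go
package main

import "fmt"

// manifestEntry is a hypothetical stand-in for a manifest entry, reduced to
// the two sequence-number fields discussed in the review.
type manifestEntry struct {
	dataSeq int64  // field id 3: sequence_number (the data sequence number)
	fileSeq *int64 // field id 4: file_sequence_number
}

func (e manifestEntry) SequenceNum() int64      { return e.dataSeq }
func (e manifestEntry) FileSequenceNum() *int64 { return e.fileSeq }

// lastUpdatedSeq returns the value a reader reports for a row's
// _last_updated_sequence_number. An explicit value stored in the file wins;
// a null inherits the entry's data sequence number, never FileSequenceNum().
// ok is false when the data sequence number is negative, which this sketch
// assumes means "not yet assigned".
func lastUpdatedSeq(stored *int64, e manifestEntry) (seq int64, ok bool) {
	if stored != nil {
		return *stored, true
	}
	if s := e.SequenceNum(); s >= 0 {
		return s, true
	}
	return 0, false
}

func main() {
	// A back-dated file: data sequence number 3, but added by the snapshot
	// with sequence number 7, so the two fields diverge.
	added := int64(7)
	entry := manifestEntry{dataSeq: 3, fileSeq: &added}

	explicit := int64(2)
	fmt.Println(lastUpdatedSeq(nil, entry))       // inherits 3, not 7
	fmt.Println(lastUpdatedSeq(&explicit, entry)) // explicit value survives
}
```

Because `SequenceNum()` is not a pointer, the only guard left is the sign check; the nil branch moves to the per-row stored value instead.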
##########
table/rewrite_data_files.go:
##########
@@ -289,6 +289,20 @@ func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTas
scanOpts = append(scanOpts,
WitMaxConcurrency(cfg.scanConcurrency))
}
+ // Preserve row lineage only when every source file in the group carries
+ // it. A mixed group (some files with FirstRowID, some without — e.g.
+ // legacy files on a v3 table) would otherwise produce one output where
+ // post-lineage rows have explicit _row_id values and pre-lineage rows
+ // have nulls, which violates the per-file uniqueness/coverage
+ // invariant the v3 spec requires. Splitting mixed groups into separate
+ // outputs is a larger refactor and is left as a follow-up; for now we
+ // degrade gracefully (the rewrite still succeeds, but lineage is not
+ // preserved for the surviving rows).
+ preserveLineage := tbl.metadata.Version() >= 3 && allTasksHaveRowLineage(group.Tasks)
Review Comment:
Mixed-lineage groups silently fall through to the non-lineage path.
Compaction succeeds, the output is valid, but `_row_id` is gone — no log, no
metric, no flag in `CompactionGroupResult`. For tables migrating to v3, every
compaction pass over a group that contains any legacy file will quietly drop
lineage on the surviving rows. That's the common case during migration, not the
edge case.
I'd at minimum add a `slog.Warn` matching the v3 pos-delete pattern, and
ideally a `RowLineagePreserved bool` on `CompactionGroupResult` so callers can
observe it.
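One way to make the fallback observable, sketched with hypothetical `compactionTask` and `CompactionGroupResult` stand-ins (only the `RowLineagePreserved` field name comes from the suggestion above). The sketch folds the `allTasksHaveRowLineage` check into a count so the warning can report how many pre-lineage files forced the downgrade. The log message and attribute keys are illustrative, and `log/slog` needs Go 1.21 or later.

```go
package main

import (
	"fmt"
	"log/slog"
)

// compactionTask is a hypothetical stand-in for a task in a compaction
// group. FirstRowID is nil for files written before row lineage was enabled.
type compactionTask struct {
	FilePath   string
	FirstRowID *int64
}

// CompactionGroupResult carries only the proposed RowLineagePreserved flag.
type CompactionGroupResult struct {
	RowLineagePreserved bool
}

// countPreLineage returns how many tasks lack a FirstRowID.
func countPreLineage(tasks []compactionTask) int {
	n := 0
	for _, t := range tasks {
		if t.FirstRowID == nil {
			n++
		}
	}
	return n
}

// decideLineage mirrors the preserveLineage check from the diff but makes the
// fallback visible: it logs a warning and records the outcome on the result.
func decideLineage(formatVersion int, tasks []compactionTask, res *CompactionGroupResult) bool {
	legacy := countPreLineage(tasks)
	preserve := formatVersion >= 3 && legacy == 0
	if formatVersion >= 3 && legacy > 0 {
		slog.Warn("compaction group mixes lineage and pre-lineage files; row lineage not preserved",
			"files", len(tasks), "pre_lineage_files", legacy)
	}
	res.RowLineagePreserved = preserve
	return preserve
}

func main() {
	first := int64(0)
	tasks := []compactionTask{
		{FilePath: "data/with-lineage.parquet", FirstRowID: &first},
		{FilePath: "data/legacy.parquet"},
	}
	var res CompactionGroupResult
	decideLineage(3, tasks, &res)
	fmt.Println(res.RowLineagePreserved)
}
```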
##########
table/scanner_internal_test.go:
##########
@@ -238,98 +236,71 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t *testing.T) {
assert.ErrorContains(t, err, "id 999")
}
-// TestProjectionV3PreLineageFile verifies that Projection() succeeds and returns
-// _row_id and _last_updated_sequence_number as nullable (all-null-capable) fields when
-// the table is v3 with next-row-id set but the data file predates row lineage (those
-// columns are absent from the schema).
-func TestProjectionV3PreLineageFile(t *testing.T) {
- schema := iceberg.NewSchema(
- 1,
- iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
- iceberg.NestedField{ID: 2, Name: "payload", Type: iceberg.PrimitiveTypes.String, Required: false},
- )
+// TestSynthesizeRowLineageColumns verifies that _row_id and _last_updated_sequence_number
+// are filled from task constants when those columns are present and null.
+func TestSynthesizeRowLineageColumns(t *testing.T) {
Review Comment:
The test only constructs all-null inputs, so it doesn't actually prove the
property the feature depends on. If `synthesizeRowLineageColumns` had a bug
where it overwrote pre-existing non-null values, this test would still pass —
and so would `TestCoWRewritePreservesRowID`, because fresh appends produce
all-null source columns too.
The case that matters is the mixed one: some rows have explicit `_row_id` /
`_last_updated_sequence_number` values from a prior write, the rest are null.
That only arises after multiple rewrites of files that already carry explicit
values, which is exactly the scenario the spec's null-coalescing rule exists
for.
I'd add a fixture that builds a record with three rows — one explicit
non-null, one null, one explicit non-null with a different value — and asserts
the non-null ones survive untouched while the null one picks up `firstRowID +
position`. Same shape for `_last_updated_sequence_number`.
##########
metadata_columns.go:
##########
@@ -59,3 +61,38 @@ func LastUpdatedSequenceNumber() NestedField {
func IsMetadataColumn(fieldID int) bool {
return fieldID == RowIDFieldID || fieldID == LastUpdatedSequenceNumberFieldID
}
+
+// SchemaWithRowLineage returns a new schema with the row-lineage metadata columns
+// (_row_id, _last_updated_sequence_number) appended to the given schema's fields.
+// Used when reading source files during a CoW rewrite or compaction so that row
+// identity and per-row update sequence are preserved in the output.
+//
+// Idempotent: if a row-lineage column is already present (by reserved field ID),
+// it is not appended again. The returned schema always allocates a fresh field
+// slice so it cannot alias the input schema's backing array.
+func SchemaWithRowLineage(s *Schema) *Schema {
Review Comment:
Two things on the new helper.
The comment a few lines up reads as if the second reserved ID is literally
108. Should be "Integer.MAX_VALUE - 107 and Integer.MAX_VALUE - 108" —
`2147483540` and `2147483539` respectively.
More substantively, `SchemaWithRowLineage` lives in the top-level `iceberg`
package, which makes it part of the v1 public API surface. Java keeps the
equivalent logic internal. I'd consider an unexported `schemaWithRowLineage` in
`table`, or a method on `*Schema`, so we're not locked into the helper's exact
shape if v4 lineage semantics shift. Same for `WithPreserveRowLineage(schema)`
— making callers construct the schema feels backwards; could it take no arg and
derive from the table internally?
Also worth a nil guard on `s` at the top here — it's exported, a nil
receiver will panic on `s.Fields()`.
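A sketch of the unexported, nil-safe variant, using hypothetical `field` and `schema` stand-ins rather than `iceberg.NestedField` and `*iceberg.Schema`. The reserved IDs are written as `math.MaxInt32 - 107` and `math.MaxInt32 - 108`, which evaluate to the `2147483540` and `2147483539` quoted above; the idempotence check and the fresh backing slice follow the doc comment in the diff.

```go
package main

import (
	"fmt"
	"math"
)

// Reserved field IDs: Integer.MAX_VALUE - 107 and Integer.MAX_VALUE - 108.
const (
	rowIDFieldID                     = math.MaxInt32 - 107 // 2147483540
	lastUpdatedSequenceNumberFieldID = math.MaxInt32 - 108 // 2147483539
)

// field and schema are hypothetical stand-ins reduced to what the helper needs.
type field struct {
	ID       int
	Name     string
	Required bool
}

type schema struct{ fields []field }

// schemaWithRowLineage returns a copy of s with the lineage columns appended
// when they are not already present. A nil schema yields nil instead of a panic.
func schemaWithRowLineage(s *schema) *schema {
	if s == nil {
		return nil
	}
	have := make(map[int]bool, len(s.fields))
	for _, f := range s.fields {
		have[f.ID] = true
	}
	// Fresh slice so the result never aliases the input's backing array.
	out := make([]field, len(s.fields), len(s.fields)+2)
	copy(out, s.fields)
	if !have[rowIDFieldID] {
		out = append(out, field{ID: rowIDFieldID, Name: "_row_id"})
	}
	if !have[lastUpdatedSequenceNumberFieldID] {
		out = append(out, field{ID: lastUpdatedSequenceNumberFieldID, Name: "_last_updated_sequence_number"})
	}
	return &schema{fields: out}
}

func main() {
	base := &schema{fields: []field{{ID: 1, Name: "id", Required: true}}}
	once := schemaWithRowLineage(base)
	twice := schemaWithRowLineage(once)
	fmt.Println(len(base.fields), len(once.fields), len(twice.fields), schemaWithRowLineage(nil) == nil)
}
```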
##########
table/transaction.go:
##########
@@ -1530,14 +1565,50 @@ func (t *Transaction) rewriteFilesWithFilter(ctx context.Context, fs io.IO, upda
return nil
}
-// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data
-func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, filter iceberg.BooleanExpression, caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
+// rewriteSingleFile reads a single data file, applies the filter, and writes new files with filtered data.
+// fileSeqNum is the source file's file_sequence_number from its manifest entry; required to synthesize
+// _last_updated_sequence_number for rows whose value is null in the source file.
+func (t *Transaction) rewriteSingleFile(ctx context.Context, fs io.IO, originalFile iceberg.DataFile, fileSeqNum *int64, filter iceberg.BooleanExpression, postFilter func(arrow.RecordBatch) (arrow.RecordBatch, error), caseSensitive bool, commitUUID uuid.UUID, concurrency int) ([]iceberg.DataFile, error) {
Review Comment:
Nine positional parameters is a lot to keep straight at every call site, and
with the `filter` expression and the `postFilter` closure now side by side it's
easy to lose track of which one does what. I'd pull these into a
`rewriteSingleFileArgs` struct so every argument is named where it's passed.
Separate but related: `BindExpr` runs unconditionally in
`rewriteSingleFile`, but when `preserveRowLineage == true` the bound filter is
unused (scanFilter becomes `AlwaysTrue{}`) and the same expression is bound
again inside `prepareBatchFilter`. With `preserveRowLineage == false`,
`prepareBatchFilter` runs `BindExpr` + `ConvertExpr` but `postFilter` is never
called. One of those two binds is always wasted.
##########
table/scanner.go:
##########
@@ -261,26 +262,96 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
}
}
- if slices.Contains(scan.selectedFields, "*") {
- return curSchema, nil
+ if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+ return nil, fmt.Errorf("%w: row lineage requires format version %d, table is v%d",
+ ErrInvalidOperation, minFormatVersionRowLineage, curVersion)
}
- selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, caseSensitive)
- schemaMeta := metaFieldsFromSchema(curSchema)
- synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
- if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+ var schema *iceberg.Schema
+ if slices.Contains(scan.selectedFields, "*") {
+ schema = curSchema
+ } else {
+ // Intercept row-lineage metadata column names (_row_id,
+ // _last_updated_sequence_number) before calling Select: they are
+ // reserved and never appear in the user schema's fields, so
+ // Select would fail with "could not find column" on v3 tables
+ // where they are otherwise legal to project. The scanner reads
+ // them from file metadata (or synthesizes them) at scan time;
+ // here we just need to ensure they survive into the projection.
+ //
+ // On v1/v2 tables, Select is left to fail naturally — those
+ // versions don't have row lineage, so requesting these columns
+ // is an error.
+ userFields, lineageFields := splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+ if len(lineageFields) > 0 && curVersion < minFormatVersionRowLineage {
Review Comment:
The rejection here works only because `Select` happens to fail when
`_row_id` isn't in the v1/v2 schema — the user sees `ErrInvalidSchema` rather
than a clear "row lineage requires v3". And it's fragile: if a future v2 schema
ever carries a column literally named `_row_id` for unrelated reasons, this
short-circuits in the wrong direction.
I'd return an explicit error here for the `curVersion <
minFormatVersionRowLineage && len(lineageFields) > 0` case, naming the format
version requirement. Same logic, but the contract is in the code rather than
emergent from `Select`'s error path.
##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) (*Table, error) {
func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
}
+
+// prepareBatchFilter binds the given Iceberg filter against schema and converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) (arrow.RecordBatch, error), error) {
+ if filter == nil || filter.Equals(iceberg.AlwaysTrue{}) {
+ return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+ return rec, nil
+ }, nil
+ }
+
+ bound, err := iceberg.BindExpr(schema, filter, caseSensitive)
+ if err != nil {
+ return nil, fmt.Errorf("prepareBatchFilter: bind expression: %w", err)
+ }
+
+ if bound == nil {
+ return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+ return rec, nil
+ }, nil
+ }
+
+ if bound.Equals(iceberg.AlwaysFalse{}) {
+ // Return a record with the same schema and zero rows. NewSlice(0, 0)
+ // preserves the per-field arrays so downstream code that calls
+ // rec.Column(i) does not panic on the empty result.
+ return func(rec arrow.RecordBatch) (arrow.RecordBatch, error) {
+ defer rec.Release()
+
+ return rec.NewSlice(0, 0), nil
+ }, nil
+ }
+
+ extSet, substraitFilter, err := substrait.ConvertExpr(schema, bound, caseSensitive)
+ if err != nil {
+ return nil, fmt.Errorf("prepareBatchFilter: convert expression: %w", err)
+ }
+
+ ctx = exprs.WithExtensionIDSet(ctx, exprs.NewExtensionSetDefault(*extSet))
Review Comment:
The closure captures `ctx` at construction time with the extension-ID set
baked in, but per-batch allocator and deadline values from the call-site ctx
are then lost. If `filterRecords` resolves the allocator from ctx, intermediate
arrays end up on the construction-time allocator and
`memory.NewCheckedAllocator` accounting goes sideways under test.
I'd take the call-site ctx as a parameter on the returned closure, derive
the extension-set context inside, and let allocator/deadline propagate.
While we're in here: the positional-binding assumption (lineage fields at
indices N..N+1 in the record batch, filter bound against indices 0..N-1) is
correct but undocumented. Worth a one-line comment so a future change to
lineage column placement doesn't silently break filter evaluation.
##########
table/scanner.go:
##########
@@ -261,26 +262,96 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
}
}
- if slices.Contains(scan.selectedFields, "*") {
- return curSchema, nil
+ if scan.includeRowLineage && curVersion < minFormatVersionRowLineage {
+ return nil, fmt.Errorf("%w: row lineage requires format version %d, table is v%d",
+ ErrInvalidOperation, minFormatVersionRowLineage, curVersion)
}
- selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields, caseSensitive)
- schemaMeta := metaFieldsFromSchema(curSchema)
- synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
- if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+ var schema *iceberg.Schema
+ if slices.Contains(scan.selectedFields, "*") {
+ schema = curSchema
+ } else {
+ // Intercept row-lineage metadata column names (_row_id,
+ // _last_updated_sequence_number) before calling Select: they are
+ // reserved and never appear in the user schema's fields, so
+ // Select would fail with "could not find column" on v3 tables
+ // where they are otherwise legal to project. The scanner reads
+ // them from file metadata (or synthesizes them) at scan time;
+ // here we just need to ensure they survive into the projection.
+ //
+ // On v1/v2 tables, Select is left to fail naturally — those
+ // versions don't have row lineage, so requesting these columns
+ // is an error.
+ userFields, lineageFields := splitLineageMetadataFields(scan.selectedFields, scan.caseSensitive)
+ if len(lineageFields) > 0 && curVersion < minFormatVersionRowLineage {
+ userFields = scan.selectedFields
+ lineageFields = nil
+ }
- // synthesis path
- removedMetaSlice, missingMetaFields := removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
- sch, err := curSchema.Select(scan.caseSensitive, removedMetaSlice...)
+ var err error
+ schema, err = curSchema.Select(scan.caseSensitive, userFields...)
if err != nil {
return nil, err
}
+ if len(lineageFields) > 0 {
+ schema = appendMissingLineageFields(schema, lineageFields)
+ }
+ }
+
+ if scan.includeRowLineage {
Review Comment:
There are now two code paths that select lineage columns into the
projection: the `includeRowLineage` flag path (calls `SchemaWithRowLineage`)
and the explicit-field-name path (calls `appendMissingLineageFields`). Both run
when the user supplied lineage field names *and* set `WithRowLineage`,
producing a redundant pass.
More concerning, they have subtly different error behavior — one rejects on
v1/v2 with a clear error, the other relies on `Select` to fail. Anyone adding a
third lineage column down the road has to update both paths and both test sets.
I'd canonicalize on one path: detect lineage fields in `selectedFields` up
front, set `scan.includeRowLineage = true` if any are present, drop them from
`selectedFields`, and from there only the `includeRowLineage` branch runs. wdyt?
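A sketch of the normalization step proposed above, on a hypothetical `scanOptions` struct that carries only the three fields involved. Lineage names are pulled out of `selectedFields` once and folded into `includeRowLineage`, so the projection code only needs the flag path.

```go
package main

import (
	"fmt"
	"strings"
)

// scanOptions is a hypothetical reduction of the Scan fields involved.
type scanOptions struct {
	selectedFields    []string
	caseSensitive     bool
	includeRowLineage bool
}

func isLineageColumn(name string, caseSensitive bool) bool {
	for _, lc := range []string{"_row_id", "_last_updated_sequence_number"} {
		if name == lc || (!caseSensitive && strings.EqualFold(name, lc)) {
			return true
		}
	}
	return false
}

// normalizeLineageSelection folds explicitly selected lineage columns into
// the includeRowLineage flag and removes them from selectedFields.
func normalizeLineageSelection(s *scanOptions) {
	kept := make([]string, 0, len(s.selectedFields))
	for _, name := range s.selectedFields {
		if isLineageColumn(name, s.caseSensitive) {
			s.includeRowLineage = true
			continue
		}
		kept = append(kept, name)
	}
	s.selectedFields = kept
}

func main() {
	s := scanOptions{selectedFields: []string{"id", "_row_id"}, caseSensitive: true}
	normalizeLineageSelection(&s)
	fmt.Println(s.selectedFields, s.includeRowLineage)
}
```

Two behaviors to decide on: folding into the flag projects both lineage columns even when only `_row_id` was requested, and a selection made up solely of lineage names leaves `selectedFields` empty, which the `Select` call may then need to handle.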
##########
table/transaction.go:
##########
@@ -1798,3 +1905,53 @@ func (s *StagedTable) Refresh(ctx context.Context) (*Table, error) {
func (s *StagedTable) Scan(opts ...ScanOption) *Scan {
panic(fmt.Errorf("%w: cannot scan a staged table", ErrInvalidOperation))
}
+
+// prepareBatchFilter binds the given Iceberg filter against schema and converts
+// it to substrait once, returning a per-batch filter function that can be
+// reused across every record batch. The setup work (BindExpr, ConvertExpr) is
+// independent of the batch and is the most expensive part of filter-eval, so
+// hoisting it out of the iterator loop is a measurable win on rewrites that
+// produce many batches.
+//
+// The returned function takes ownership of the input batch (it releases it on
+// the AlwaysFalse fast-path) and returns a possibly-new batch the caller is
+// responsible for releasing.
+func prepareBatchFilter(ctx context.Context, filter iceberg.BooleanExpression, schema *iceberg.Schema, caseSensitive bool) (func(arrow.RecordBatch) (arrow.RecordBatch, error), error) {
Review Comment:
The `AlwaysFalse` fast-path does `defer rec.Release()`; the `AlwaysTrue` /
bound-nil paths return `rec` unchanged with no Retain. So depending on which
fast-path triggers, the caller either does or doesn't own a reference to the
returned record. That's two contracts for one return type, and the next caller
to add a `Release()` will double-free on the AlwaysFalse path.
I'd have every fast-path do a `rec.Retain()` so the caller always owns
exactly one reference on the result, mirroring the bound-filter path's
semantics.
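A sketch of one uniform contract, using a hypothetical ref-counted `record` in place of `arrow.RecordBatch`. Here the caller keeps its own reference to the input, and every path hands back exactly one new reference. The pass-through path calls `Retain()`, and the drop-all path returns a fresh zero-row record (standing in for `rec.NewSlice(0, 0)`). This differs from the current doc comment, which says the function takes ownership of the input; either convention works as long as every branch follows the same one.

```go
package main

import "fmt"

// record is a hypothetical ref-counted stand-in for arrow.RecordBatch.
type record struct {
	rows int
	refs *int
}

func newRecord(rows int) record {
	n := 1
	return record{rows: rows, refs: &n}
}

func (r record) Retain() { *r.refs++ }

func (r record) Release() {
	if *r.refs <= 0 {
		panic("release of a record with no live references")
	}
	*r.refs--
}

type filterKind int

const (
	passThrough filterKind = iota // AlwaysTrue or bound == nil
	dropAll                       // AlwaysFalse
)

// applyFilter uses one contract on every path: the caller keeps its own
// reference to rec, and the result always carries one new reference that
// the caller must release.
func applyFilter(kind filterKind, rec record) record {
	switch kind {
	case passThrough:
		rec.Retain() // same record, extra reference handed to the caller
		return rec
	default:
		return newRecord(0) // zero-row result with its own reference
	}
}

func main() {
	for _, kind := range []filterKind{passThrough, dropAll} {
		in := newRecord(3)
		out := applyFilter(kind, in)
		// Cleanup is identical no matter which path ran.
		out.Release()
		in.Release()
		fmt.Println(kind, *in.refs, *out.refs)
	}
}
```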
##########
table/row_lineage_rewrite_test.go:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func newV3RowLineageTestTable(t *testing.T) *table.Table {
+ t.Helper()
+
+ location := filepath.ToSlash(t.TempDir())
+ schema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "data", Type: iceberg.PrimitiveTypes.String, Required: false},
+ )
+ meta, err := table.NewMetadata(schema, iceberg.UnpartitionedSpec, table.UnsortedSortOrder, location,
+ iceberg.Properties{table.PropertyFormatVersion: "3"})
+ require.NoError(t, err)
+
+ metaLoc := location + "/metadata/v1.metadata.json"
+ fsF := func(context.Context) (iceio.IO, error) { return iceio.LocalFS{}, nil }
+ cat := &concurrentTestCatalog{metadata: meta, location: metaLoc, fsF: fsF}
+
+ return table.New(table.Identifier{"db", "row_lineage_test"}, meta, metaLoc, fsF, cat)
+}
+
+// TestCoWRewritePreservesRowID verifies that a copy-on-write overwrite with a
+// row filter preserves the original _row_id and _last_updated_sequence_number
+// values in the rewritten file. Surviving rows must keep both values from the
+// pre-rewrite snapshot — the rewrite is "physically rewritten", not "logically
+// updated", per the v3 spec.
+func TestCoWRewritePreservesRowID(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ // Append 3 rows: id=1,2,3
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ initialData, err := array.TableFromJSON(mem, arrowSchema, []string{
+ `[{"id": 1, "data": "a"}, {"id": 2, "data": "b"}, {"id": 3, "data": "c"}]`,
+ })
+ require.NoError(t, err)
+ defer initialData.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(initialData, -1), nil)
+ require.NoError(t, err)
+
+ // Verify the append created a valid v3 snapshot with row lineage.
+ snap := tbl.CurrentSnapshot()
+ require.NotNil(t, snap)
+ require.NotNil(t, snap.FirstRowID, "v3 snapshot must have first-row-id")
+ require.NotNil(t, snap.AddedRows, "v3 snapshot must have added-rows")
+ assert.Equal(t, int64(0), *snap.FirstRowID)
+ assert.Equal(t, int64(3), *snap.AddedRows)
+
+ // Capture the snapshot's sequence number so we can assert preservation
+ // after the rewrite. After the append, every row's effective
+ // _last_updated_sequence_number should be this value.
+ createSeq := snap.SequenceNumber
+
+ // Scan with row lineage to see synthesized _row_id values before the rewrite.
+ lineageScan := tbl.Scan(table.WithRowLineage())
+ schema, itr, err := lineageScan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ rowIDIdx := -1
+ for i, f := range schema.Fields() {
+ if f.Name == iceberg.RowIDColumnName {
+ rowIDIdx = i
+
+ break
+ }
+ }
+ require.GreaterOrEqual(t, rowIDIdx, 0, "_row_id should be in scan projection")
+
+ var originalRowIDs []int64
+ for rec, err := range itr {
+ require.NoError(t, err)
+ col := rec.Column(rowIDIdx).(*array.Int64)
+ for i := 0; i < col.Len(); i++ {
+ originalRowIDs = append(originalRowIDs, col.Value(i))
+ }
+ rec.Release()
+ }
+ require.Equal(t, []int64{0, 1, 2}, originalRowIDs, "initial _row_id should be 0,1,2")
+
+ // CoW overwrite: delete the row where id=2, preserving id=1 and id=3.
+ filter := iceberg.EqualTo(iceberg.Reference("id"), int64(2))
+ tbl, err = tbl.Delete(ctx, filter, nil)
+ require.NoError(t, err)
+
+ snap = tbl.CurrentSnapshot()
+ require.NotNil(t, snap)
+ require.Greater(t, snap.SequenceNumber, createSeq,
+ "sanity: rewrite snapshot must have a higher sequence number than the create snapshot")
+
+ // Scan the result with row lineage. The surviving rows should preserve their
+ // original _row_id values: 0 and 2. Their _last_updated_sequence_number must
+ // also still report the create snapshot's seq, NOT the rewrite snapshot's
+ // seq — the rewrite is physical only, not a logical update.
+ lineageScan = tbl.Scan(table.WithRowLineage())
+ _, itr, err = lineageScan.ToArrowRecords(ctx)
+ require.NoError(t, err)
+
+ var afterRowIDs []int64
+ var afterIDs []int64
+ var afterSeq []int64
+ for rec, err := range itr {
+ require.NoError(t, err)
+ idIdx := rec.Schema().FieldIndices("id")
+ require.NotEmpty(t, idIdx)
+ rowIDIndices := rec.Schema().FieldIndices(iceberg.RowIDColumnName)
+ require.NotEmpty(t, rowIDIndices)
+ seqIndices := rec.Schema().FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ require.NotEmpty(t, seqIndices, "_last_updated_sequence_number must be in projection")
+
+ idCol := rec.Column(idIdx[0]).(*array.Int64)
+ rowIDCol := rec.Column(rowIDIndices[0]).(*array.Int64)
+ seqCol := rec.Column(seqIndices[0]).(*array.Int64)
+ for i := 0; i < int(rec.NumRows()); i++ {
+ afterIDs = append(afterIDs, idCol.Value(i))
+ afterRowIDs = append(afterRowIDs, rowIDCol.Value(i))
+ require.False(t, seqCol.IsNull(i),
+ "row %d must have a non-null _last_updated_sequence_number after CoW rewrite", i)
+ afterSeq = append(afterSeq, seqCol.Value(i))
+ }
+ rec.Release()
+ }
+
+ assert.Equal(t, []int64{1, 3}, afterIDs, "remaining rows should be id=1,3")
+ assert.Equal(t, []int64{0, 2}, afterRowIDs,
+ "_row_id must be preserved through CoW rewrite: row with id=1 keeps _row_id=0, row with id=3 keeps _row_id=2")
+ assert.Equal(t, []int64{createSeq, createSeq}, afterSeq,
+ "_last_updated_sequence_number must report the original creation snapshot's sequence number, not the rewrite's")
+}
+
+// TestCoWRewriteRowIDNextRowIDAccounting verifies that row-id accounting remains
+// correct after a CoW rewrite. The overcounting (where next-row-id advances by
+// the full manifest row count including preserved survivors) is intentional and
+// matches Java's ManifestListWriter.V3Writer behavior.
+func TestCoWRewriteRowIDNextRowIDAccounting(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+
+ tbl := newV3RowLineageTestTable(t)
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+ {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+
+ data, err := array.TableFromJSON(mem, arrowSchema, []string{
+ `[{"id": 10, "data": "x"}, {"id": 20, "data": "y"}, {"id": 30, "data": "z"}]`,
+ })
+ require.NoError(t, err)
+ defer data.Release()
+
+ tbl, err = tbl.Append(ctx, array.NewTableReader(data, -1), nil)
+ require.NoError(t, err)
+
+ // next-row-id should be 3 after appending 3 rows.
+ assert.Equal(t, int64(3), tbl.Metadata().NextRowID())
+
+ // Delete one row via CoW.
+ filter := iceberg.EqualTo(iceberg.Reference("id"), int64(20))
+ tbl, err = tbl.Delete(ctx, filter, nil)
+ require.NoError(t, err)
+
+ // next-row-id advances by 3 (original) + 2 (rewritten survivors) = 5,
+ // even though the surviving rows preserve their old IDs. This "wastes"
+ // ID space but doesn't violate uniqueness — actual row IDs come from
+ // the explicit Parquet column, not the global counter.
+ assert.Equal(t, int64(5), tbl.Metadata().NextRowID(),
+ "next-row-id should advance by original (3) + rewritten (2) = 5")
Review Comment:
The comment is arithmetically misleading — the advance is `2` (the new
manifest's added rows), and `5 = firstRowID(3) + 2`. The "3" is the starting
value, not an addend. Worth rewriting the comment to match the actual
computation, and linking to the `snapshot_producers.go` site that mirrors
`ManifestListWriter.V3Writer.prepare()` in Java.
Separately, I'd double-check `snapshot.AddedRows` here — Java's `added-rows`
summary field is the actual added row count, not `existing + added`. If we're
stamping the Java-equivalent of `existing + added` into the snapshot summary,
downstream tools reading that field will see inflated numbers.
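The corrected arithmetic as a small model: a hypothetical `assignRowIDs` gives each new data manifest the current next-row-id as its first row ID, then advances by that manifest's added rows. It counts only added rows, matching the comment's reading of this case, and does not try to reproduce how Java treats existing rows.

```go
package main

import "fmt"

// manifest is a hypothetical summary of a new data manifest in a commit.
type manifest struct {
	addedRows int64
}

// assignRowIDs hands each new manifest the current next-row-id as its
// first_row_id and advances next-row-id by that manifest's added rows.
func assignRowIDs(nextRowID int64, newManifests []manifest) (firstRowIDs []int64, next int64) {
	next = nextRowID
	for _, m := range newManifests {
		firstRowIDs = append(firstRowIDs, next)
		next += m.addedRows
	}
	return firstRowIDs, next
}

func main() {
	// After the append, next-row-id = 3. The CoW delete writes one new data
	// manifest whose file holds the 2 surviving rows.
	first, next := assignRowIDs(3, []manifest{{addedRows: 2}})
	fmt.Println(first, next) // [3] 5
}
```

So the advance is 2, and 5 is the starting value 3 plus that advance, not a sum of two advances.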
--
This is an automated message from the Apache Git Service.