laskoviymishka commented on code in PR #982:
URL: https://github.com/apache/iceberg-go/pull/982#discussion_r3202392990


##########
table/rebuild_manifest_test.go:
##########
@@ -0,0 +1,476 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+// newMetadataWithLastSeqNum builds a v2 Metadata whose last-sequence-number
+// equals lastSeqNum by grafting a synthetic snapshot at that sequence number.
+// Used by tests that need to control freshMeta.LastSequenceNumber().
+func newMetadataWithLastSeqNum(t *testing.T, lastSeqNum int64) Metadata {
+       t.Helper()
+       schema := iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+       )
+       meta, err := NewMetadata(schema, iceberg.UnpartitionedSpec, 
UnsortedSortOrder, "file:///tmp/seqtest",
+               iceberg.Properties{PropertyFormatVersion: "2"})
+       require.NoError(t, err)
+       builder, err := MetadataBuilderFromBase(meta, "")
+       require.NoError(t, err)
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: lastSeqNum,
+               TimestampMs:    meta.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+       }
+       require.NoError(t, builder.AddSnapshot(&snap))
+       require.NoError(t, builder.SetSnapshotRef(MainBranch, 1000, BranchRef))
+       out, err := builder.Build()
+       require.NoError(t, err)
+
+       return out
+}
+
+// newV3MetadataWithNextRowID builds a v3 Metadata whose NextRowID() returns
+// nextRowID by adding a synthetic snapshot that consumes that many rows.
+// Used by tests that need to control freshMeta.NextRowID() for v3 row lineage.
+func newV3MetadataWithNextRowID(t *testing.T, nextRowID int64) Metadata {
+       t.Helper()
+       spec := iceberg.NewPartitionSpec()
+       txn, _ := createTestTransactionWithMemIO(t, spec)
+       txn.meta.formatVersion = 3
+
+       firstRowID := int64(0)
+       addedRows := nextRowID
+       snap := Snapshot{
+               SnapshotID:     1000,
+               SequenceNumber: 1,
+               TimestampMs:    txn.meta.base.LastUpdatedMillis() + 1,
+               Summary:        &Summary{Operation: OpAppend},
+               FirstRowID:     &firstRowID,
+               AddedRows:      &addedRows,
+       }
+       require.NoError(t, txn.meta.AddSnapshot(&snap))
+       require.NoError(t, txn.meta.SetSnapshotRef(MainBranch, 1000, BranchRef))
+
+       meta, err := txn.meta.Build()
+       require.NoError(t, err)
+       require.Equal(t, nextRowID, meta.NextRowID())
+
+       return meta
+}
+
+// ptr returns a pointer to v, used in test helper expressions.
+// NOTE: ptr is also declared in pos_delete_partitioned_fanout_writer_test.go;
+// both files share the same package so only one declaration is needed.
+// This comment documents the shared usage — do not re-add a declaration here.
+
+// rebuildUpdate constructs an addSnapshotUpdate whose rebuildManifestList
+// closure simply records the freshParent it received and returns a new
+// snapshot whose ManifestList is the given newManifestList value.
+func rebuildUpdate(snap *Snapshot, newManifestList string, gotParent 
**Snapshot) *addSnapshotUpdate {
+       return &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, 
freshParent *Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       *gotParent = freshParent
+                       rebuilt := *snap
+                       rebuilt.ManifestList = newManifestList
+
+                       return &rebuilt, nil
+               },
+       }
+}
+
+// TestRebuildSnapshotUpdates_CallsClosureWithFreshParent verifies that
+// rebuildSnapshotUpdates invokes the rebuild closure and passes the fresh
+// branch head as freshParent.
+func TestRebuildSnapshotUpdates_CallsClosureWithFreshParent(t *testing.T) {
+       const oldManifest = "s3://bucket/old-manifest-list.avro"
+       const newManifest = "s3://bucket/new-manifest-list.avro"
+
+       // Build a fresh metadata that has a different snapshot than the one
+       // embedded in the update, so the parent-hasn't-changed guard is
+       // bypassed and the closure must be called.
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parentID := int64(7) // original snap's parent — does NOT match 
freshHead (42)
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &parentID,
+               ManifestList:     oldManifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       var receivedParent *Snapshot
+       upd := rebuildUpdate(snap, newManifest, &receivedParent)
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       require.Len(t, rebuilt, 1)
+       require.Len(t, orphaned, 1, "old manifest list must be recorded as 
orphaned")
+
+       // The closure should have been called with the branch's current head.
+       require.NotNil(t, receivedParent)
+       assert.Equal(t, freshHead, receivedParent.SnapshotID)
+
+       // The rebuilt update must carry the new manifest list.
+       addUpd, ok := rebuilt[0].(*addSnapshotUpdate)
+       require.True(t, ok)
+       assert.Equal(t, newManifest, addUpd.Snapshot.ManifestList)
+
+       // The superseded manifest list becomes an orphan.
+       assert.Equal(t, oldManifest, orphaned[0])
+}
+
+// TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged verifies that
+// rebuildSnapshotUpdates skips the rebuild when the update's snapshot
+// already has the fresh branch head as its parent (no-op retry).
+func TestRebuildSnapshotUpdates_SkipsWhenParentUnchanged(t *testing.T) {
+       const manifest = "s3://bucket/manifest-list.avro"
+
+       freshHead := int64(42)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       // Parent already equals the fresh head — rebuild must be skipped.
+       snap := &Snapshot{
+               SnapshotID:       99,
+               ParentSnapshotID: &freshHead, // same as fresh head
+               ManifestList:     manifest,
+               Summary:          &Summary{Operation: OpAppend},
+       }
+
+       called := false
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       called = true
+
+                       return snap, nil
+               },
+       }
+
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{upd},
+               freshMeta,
+               MainBranch,
+               iceio.LocalFS{},
+               1,
+       )
+       require.NoError(t, err)
+       assert.False(t, called, "rebuild closure must not be called when parent 
is already up-to-date")
+       assert.Empty(t, orphaned, "no orphans when rebuild is skipped")
+       assert.Same(t, upd, rebuilt[0].(*addSnapshotUpdate), "original update 
must pass through unchanged")
+}
+
+// TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates verifies that
+// updates without a rebuildManifestList closure are returned unmodified.
+func TestRebuildSnapshotUpdates_PassesThroughNonRebuildUpdates(t *testing.T) {
+       plainUpd := NewAddSnapshotUpdate(&Snapshot{
+               SnapshotID:   1,
+               ManifestList: "s3://bucket/no-rebuild.avro",
+               Summary:      &Summary{Operation: OpAppend},
+       })
+
+       // freshMeta may be nil — the plain update must not be touched.
+       rebuilt, orphaned, err := rebuildSnapshotUpdates(
+               t.Context(),
+               []Update{plainUpd},
+               nil,
+               MainBranch,
+               iceio.LocalFS{},
+               0,
+       )
+       require.NoError(t, err)
+       assert.Empty(t, orphaned)
+       assert.Same(t, plainUpd, rebuilt[0].(*addSnapshotUpdate))
+}
+
+// TestRebuildSnapshotUpdates_PropagatesClosureError verifies that an error
+// returned by the rebuild closure surfaces as the function's return error.
+func TestRebuildSnapshotUpdates_PropagatesClosureError(t *testing.T) {
+       freshHead := int64(5)
+       freshMeta := newConflictTestMetadata(t, &freshHead)
+
+       parent := int64(1)
+       snap := &Snapshot{
+               SnapshotID:       10,
+               ParentSnapshotID: &parent,
+               ManifestList:     "s3://bucket/old.avro",
+               Summary:          &Summary{Operation: OpAppend},
+       }
+       wantErr := errors.New("simulated S3 write failure")
+       upd := &addSnapshotUpdate{
+               baseUpdate: baseUpdate{ActionName: UpdateAddSnapshot},
+               Snapshot:   snap,
+               rebuildManifestList: func(_ context.Context, _ Metadata, _ 
*Snapshot, _ iceio.WriteFileIO, _ int) (*Snapshot, error) {
+                       return nil, wantErr
+               },
+       }
+
+       _, _, err := rebuildSnapshotUpdates(t.Context(), []Update{upd}, 
freshMeta, MainBranch, iceio.LocalFS{}, 1)
+       assert.ErrorIs(t, err, wantErr)
+}
+
+// ---------------------------------------------------------------------------
+// Fix 3 — newSeq derived from freshMeta.LastSequenceNumber()
+// ---------------------------------------------------------------------------
+
+// TestRebuildFn_SeqNumDerivedFromFreshMeta verifies that the rebuilt snapshot's
+// SequenceNumber equals freshMeta.LastSequenceNumber()+1, NOT
+// freshParent.SequenceNumber+1. A concurrent writer on a different branch can
+// advance the table-wide last-sequence-number without advancing this branch's
+// parent, so using freshParent.SequenceNumber+1 would violate the spec invariant.
+func TestRebuildFn_SeqNumDerivedFromFreshMeta(t *testing.T) {
+       spec := iceberg.NewPartitionSpec()
+       txn, wfs := createTestTransactionWithMemIO(t, spec)
+
+       sp := newFastAppendFilesProducer(OpAppend, txn, wfs, nil, nil)
+       sp.appendDataFile(newTestDataFile(t, spec, 
"mem://default/table-location/data/f.parquet", nil))
+
+       updates, _, err := sp.commit(context.Background())
+       require.NoError(t, err)
+       addSnap := updates[0].(*addSnapshotUpdate)
+       require.NotNil(t, addSnap.rebuildManifestList, "rebuildManifestList 
closure must be set")
+
+       // Simulate a concurrent writer on another branch that bumped the global
+       // last-sequence-number to 99 without advancing this branch's parent.
+       // old code: newSeq = capturedSnapshot.SequenceNumber (stale, ≤ 99 — 
spec violation)
+       // new code: newSeq = freshMeta.LastSequenceNumber() + 1 = 100
+       freshMeta := newMetadataWithLastSeqNum(t, 99)
+       require.Equal(t, int64(99), freshMeta.LastSequenceNumber())
+
+       rebuilt, err := addSnap.rebuildManifestList(context.Background(), 
freshMeta, nil, wfs, 1)

Review Comment:
   [Important] The headline scenario the fix targets — peer-branch advancement 
of `last-sequence-number` while this branch's parent stays put — is not 
actually exercised. The test passes `freshParent=nil`, so the buggy 
`freshParent != nil` branch (`newSeq = freshParent.SequenceNumber + 1`) never 
runs. An intermediate regression that reverts only the non-nil branch back to 
`freshParent.SequenceNumber + 1` would not be caught here, and 
`TestManifestListInheritedAfterConflict` doesn't catch it either because in 
that fixture both arithmetic paths happen to yield the same value. 
To strengthen the test: graft a peer-branch snapshot onto `freshMeta` with a 
higher `SequenceNumber` while keeping main's `freshParent.SequenceNumber` lower, 
then call the closure with both `freshMeta` and that lower `freshParent`, and 
assert `rebuilt.SequenceNumber == freshMeta.LastSequenceNumber()+1` — not 
`freshParent.SequenceNumber+1`. Alternatively, add 
`require.Equal(t, freshMeta.LastSequenceNumber()+1, rebuilt.SequenceNumber)` 
to the existing `TestRebuildFn_SummaryRebasedAgainstFreshParent` (which already 
passes both `freshParent` and `freshMeta`) — a one-line strengthening that 
closes the gap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to