laskoviymishka commented on code in PR #982:
URL: https://github.com/apache/iceberg-go/pull/982#discussion_r3200076769


##########
table/snapshot_producers.go:
##########
@@ -903,9 +943,103 @@ func (sp *snapshotProducer) commit(ctx context.Context) 
(_ []Update, _ []Require
                })
        }
 
+       // Build the manifest-list rebuild closure. It is called by doCommit
+       // on each OCC retry to regenerate the manifest list so it correctly
+       // inherits all data files committed by concurrent writers since the
+       // original snapshot was built.
+       formatVersion := sp.txn.meta.formatVersion
+       snapshotID := sp.snapshotID
+       commitUUID := sp.commitUuid
+       capturedSnapshot := snapshot // copy the value so the closure is 
self-contained
+       processManifestsFn := func(m []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+               return sp.processManifests(m)
+       }
+
+       rebuildFn := func(_ context.Context, freshParent *Snapshot, fio 
iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) {
+               // Load inherited manifests from the fresh parent.
+               var inherited []iceberg.ManifestFile
+               if freshParent != nil {
+                       inherited, retErr = freshParent.Manifests(fio)
+                       if retErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
load parent manifests: %w", retErr)
+                       }
+               }
+
+               // Combine own manifests with inherited ones, applying any
+               // producer-specific processing (no-op for fast/merge-append).
+               combined, procErr := 
processManifestsFn(slices.Concat(ownManifests, inherited))
+               if procErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: process 
manifests: %w", procErr)
+               }
+
+               // Derive the sequence number. When there is a fresh parent, 
use its
+               // sequence number + 1 so the rebuilt snapshot is strictly 
greater than
+               // any committed peer. When there is no fresh parent (first 
snapshot in
+               // the table or unknown parent), preserve the original sequence 
number
+               // from the initial build.
+               var newSeq int64
+               if freshParent != nil && formatVersion >= 2 {
+                       newSeq = freshParent.SequenceNumber + 1
+               } else {
+                       newSeq = capturedSnapshot.SequenceNumber
+               }
+
+               // Write the rebuilt manifest list to a path unique to this 
retry
+               // attempt. Each retry uses a different attempt counter in the 
filename
+               // (snap-{id}-{attempt}-{uuid}.avro) so that S3 
conditional-write
+               // semantics (if-none-match) do not reject the overwrite. 
Orphaned files
+               // from superseded retry attempts are removed by doCommit after 
the
+               // commit succeeds.
+               fname := newManifestListFileName(snapshotID, attempt, 
commitUUID)
+               manifestListPath := locProvider.NewMetadataLocation(fname)
+
+               out, createErr := fio.Create(manifestListPath)
+               if createErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: create 
file: %w", createErr)
+               }
+               defer internal.CheckedClose(out, &retErr)
+
+               var parentID *int64
+               if freshParent != nil {
+                       id := freshParent.SnapshotID
+                       parentID = &id
+               }
+
+               firstRowID := int64(0)

Review Comment:
   [Blocking] V3 row lineage goes stale on retry. The rebuild closure
hard-codes `firstRowID := int64(0)`, while `capturedSnapshot.FirstRowID` keeps
the value that `sp.txn.meta.NextRowID()` produced at attempt 0. On a v3 retry,
the manifest-list container metadata therefore declares `first-row-id=0` while
the snapshot record advertises a different value, and
`validateAndUpdateRowLineage` will compare a stale `*FirstRowID` against the
catalog's advanced `nextRowID`. I suggest threading `freshMeta.NextRowID()`
into the closure in place of the hard-coded `0`, reassigning
`rebuilt.FirstRowID`, and recomputing `addedRows` from `writer.NextRowID()`
after `AddManifests`. This is a surgical change inside this closure, with no
producer-architecture impact.



##########
table/snapshot_producers.go:
##########
@@ -903,9 +943,103 @@ func (sp *snapshotProducer) commit(ctx context.Context) 
(_ []Update, _ []Require
                })
        }
 
+       // Build the manifest-list rebuild closure. It is called by doCommit
+       // on each OCC retry to regenerate the manifest list so it correctly
+       // inherits all data files committed by concurrent writers since the
+       // original snapshot was built.
+       formatVersion := sp.txn.meta.formatVersion
+       snapshotID := sp.snapshotID
+       commitUUID := sp.commitUuid
+       capturedSnapshot := snapshot // copy the value so the closure is 
self-contained
+       processManifestsFn := func(m []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+               return sp.processManifests(m)
+       }
+
+       rebuildFn := func(_ context.Context, freshParent *Snapshot, fio 
iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) {
+               // Load inherited manifests from the fresh parent.
+               var inherited []iceberg.ManifestFile
+               if freshParent != nil {
+                       inherited, retErr = freshParent.Manifests(fio)
+                       if retErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
load parent manifests: %w", retErr)
+                       }
+               }
+
+               // Combine own manifests with inherited ones, applying any
+               // producer-specific processing (no-op for fast/merge-append).
+               combined, procErr := 
processManifestsFn(slices.Concat(ownManifests, inherited))
+               if procErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: process 
manifests: %w", procErr)
+               }
+
+               // Derive the sequence number. When there is a fresh parent, 
use its
+               // sequence number + 1 so the rebuilt snapshot is strictly 
greater than
+               // any committed peer. When there is no fresh parent (first 
snapshot in
+               // the table or unknown parent), preserve the original sequence 
number
+               // from the initial build.
+               var newSeq int64
+               if freshParent != nil && formatVersion >= 2 {
+                       newSeq = freshParent.SequenceNumber + 1
+               } else {
+                       newSeq = capturedSnapshot.SequenceNumber
+               }
+
+               // Write the rebuilt manifest list to a path unique to this 
retry
+               // attempt. Each retry uses a different attempt counter in the 
filename
+               // (snap-{id}-{attempt}-{uuid}.avro) so that S3 
conditional-write
+               // semantics (if-none-match) do not reject the overwrite. 
Orphaned files
+               // from superseded retry attempts are removed by doCommit after 
the
+               // commit succeeds.
+               fname := newManifestListFileName(snapshotID, attempt, 
commitUUID)
+               manifestListPath := locProvider.NewMetadataLocation(fname)
+
+               out, createErr := fio.Create(manifestListPath)
+               if createErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: create 
file: %w", createErr)
+               }
+               defer internal.CheckedClose(out, &retErr)
+
+               var parentID *int64
+               if freshParent != nil {
+                       id := freshParent.SnapshotID
+                       parentID = &id
+               }
+
+               firstRowID := int64(0)
+               if formatVersion == 3 {
+                       writer, wrErr := iceberg.NewManifestListWriterV3(out, 
snapshotID, newSeq, firstRowID, parentID)
+                       if wrErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
create v3 writer: %w", wrErr)
+                       }
+                       defer internal.CheckedClose(writer, &retErr)
+                       if addErr := writer.AddManifests(combined); addErr != 
nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
add manifests: %w", addErr)
+                       }
+               } else {
+                       if wErr := iceberg.WriteManifestList(formatVersion, 
out, snapshotID, parentID, &newSeq, firstRowID, combined); wErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
write: %w", wErr)
+                       }
+               }
+
+               rebuilt := capturedSnapshot

Review Comment:
   [Blocking] The snapshot summary captured at attempt 0 is reused on retry.
`updateSnapshotSummaries` rebased `total-records` / `total-data-files` /
`total-files-size` against the *stale* parent's summary at build time; once a
peer commits, the rebuilt snapshot publishes regressed totals to every consumer
that reads the summary (Iceberg metrics, BI tools, expire-snapshots
accounting). The fix fits naturally inside this closure: keep the per-attempt
added/removed counts (already in `summaryProps`/`sp.snapshotProps`) and
recompute the totals via `updateSnapshotSummaries(..., freshParent.Summary)`
before assigning to `rebuilt.Summary`. This doesn't require touching the
producer architecture.



##########
table/table.go:
##########
@@ -483,6 +490,19 @@ func (t Table) doCommit(ctx context.Context, updates 
[]Update, reqs []Requiremen
                return nil, err
        }
 
+       // Delete manifest-list files that were written during failed retry
+       // attempts and have now been superseded by the committed rebuild.
+       // These are orphaned objects that will never be referenced again.
+       if len(orphanedManifests) > 0 {

Review Comment:
   [Important] Orphan cleanup runs only on success. Non-`ErrCommitFailed`
errors (`ErrCommitDiverged` from validators, unknown-state 5xx responses)
return early at lines 485 / 490, before reaching this cleanup. Files written by
previous rebuild attempts then leak permanently — they are never referenced by
any committed snapshot. A `defer` that runs this loop, guarded by a "this exit
is cleanable" flag, would handle most cases without much surgery. The only path
where the orphan list should *not* be removed is the unknown-state 5xx, where
one of the orphans may actually be the manifest list of the snapshot the
catalog accepted but didn't acknowledge — worth distinguishing explicitly.



##########
table/table.go:
##########
@@ -428,6 +421,20 @@ func (t Table) doCommit(ctx context.Context, updates 
[]Update, reqs []Requiremen
                        }
                        current = fresh.metadata
                        reqs = rewriteRefSnapshotRequirements(reqs, co.branch, 
current)
+
+                       // Rebuild snapshot manifest lists to inherit all files 
committed
+                       // by concurrent writers since the snapshot was 
originally built.
+                       // Without this, the new snapshot's manifest list would 
only
+                       // contain its own files and callers scanning the 
current snapshot
+                       // would miss every concurrent writer's data.
+                       if wfs, ok := fs.(icebergio.WriteFileIO); ok {

Review Comment:
   [Important] Silent `WriteFileIO` fallback reintroduces the original bug. 
When `fs` doesn't implement `icebergio.WriteFileIO` the rebuild silently no-ops 
and the original stale-parent manifest list is sent to the catalog — exactly 
the failure mode this PR is fixing. The same cast is repeated for orphan 
cleanup at line 497, with the same silent-skip behavior. Suggest hoisting one 
cast to the top of `doCommit` and either erroring out or logging at error level 
when it fails. Every real commit-path FS implements `WriteFileIO`; a silent 
fallback here is the worst possible failure mode.



##########
table/snapshot_producers.go:
##########
@@ -815,12 +815,52 @@ func (sp *snapshotProducer) summary(props 
iceberg.Properties) (Summary, error) {
        }, previousSummary)
 }
 
+// computeOwnManifests returns the subset of allManifests that were written
+// by this producer (i.e. not inherited from the parent snapshot). These are
+// preserved across OCC retry attempts when the manifest list is rebuilt
+// against a fresh parent.
+func (sp *snapshotProducer) computeOwnManifests(allManifests 
[]iceberg.ManifestFile) []iceberg.ManifestFile {
+       if sp.parentSnapshotID <= 0 {
+               // No parent means all manifests are new — nothing to exclude.
+               return allManifests
+       }
+
+       parent, err := sp.txn.meta.SnapshotByID(sp.parentSnapshotID)
+       if err != nil || parent == nil {

Review Comment:
   [Blocking] All three error returns (`SnapshotByID` lookup at line 829, 
`parent.Manifests` read at 833, missing parent for `parentSnapshotID > 0`) 
collapse to `return allManifests` — "treat every manifest as own". On a 
transient parent-manifest-list read failure the rebuild then concatenates 
`ownManifests = newManifests` with `freshParent.Manifests(fio)`, so every 
inherited manifest appears twice and downstream readers count rows twice for 
those entries. Propagate the error instead: change the signature to 
`(ownManifests []iceberg.ManifestFile, err error)` and bubble up through 
`commit`. The `parentSnapshotID <= 0` early return (line 823) is fine to keep 
as the all-new fallback.



##########
table/snapshot_producers.go:
##########
@@ -903,9 +943,103 @@ func (sp *snapshotProducer) commit(ctx context.Context) 
(_ []Update, _ []Require
                })
        }
 
+       // Build the manifest-list rebuild closure. It is called by doCommit
+       // on each OCC retry to regenerate the manifest list so it correctly
+       // inherits all data files committed by concurrent writers since the
+       // original snapshot was built.
+       formatVersion := sp.txn.meta.formatVersion
+       snapshotID := sp.snapshotID
+       commitUUID := sp.commitUuid
+       capturedSnapshot := snapshot // copy the value so the closure is 
self-contained
+       processManifestsFn := func(m []iceberg.ManifestFile) 
([]iceberg.ManifestFile, error) {
+               return sp.processManifests(m)
+       }
+
+       rebuildFn := func(_ context.Context, freshParent *Snapshot, fio 
iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) {
+               // Load inherited manifests from the fresh parent.
+               var inherited []iceberg.ManifestFile
+               if freshParent != nil {
+                       inherited, retErr = freshParent.Manifests(fio)
+                       if retErr != nil {
+                               return nil, fmt.Errorf("rebuild manifest list: 
load parent manifests: %w", retErr)
+                       }
+               }
+
+               // Combine own manifests with inherited ones, applying any
+               // producer-specific processing (no-op for fast/merge-append).
+               combined, procErr := 
processManifestsFn(slices.Concat(ownManifests, inherited))
+               if procErr != nil {
+                       return nil, fmt.Errorf("rebuild manifest list: process 
manifests: %w", procErr)
+               }
+
+               // Derive the sequence number. When there is a fresh parent, 
use its
+               // sequence number + 1 so the rebuilt snapshot is strictly 
greater than
+               // any committed peer. When there is no fresh parent (first 
snapshot in
+               // the table or unknown parent), preserve the original sequence 
number
+               // from the initial build.
+               var newSeq int64
+               if freshParent != nil && formatVersion >= 2 {
+                       newSeq = freshParent.SequenceNumber + 1

Review Comment:
   [Blocking] `freshParent.SequenceNumber + 1` violates the spec invariant
(a new snapshot's sequence number must be greater than the table's
`last-sequence-number`) when a peer writes to a *different* branch and bumps
the table-wide `last-sequence-number` without advancing this branch's parent.
`MetadataBuilder.AddSnapshot` rejects `SequenceNumber <= lastSequenceNumber`,
and other clients reject the metadata on read. The `else` branch (taken when
`freshParent == nil`, and also when `formatVersion < 2`) reuses
`capturedSnapshot.SequenceNumber`, which for `formatVersion >= 2` has the same
problem against freshly loaded metadata. Fix: derive the value from the fresh
metadata — plumb `freshMeta` (or a precomputed
`freshMeta.LastSequenceNumber()`) into the closure and use
`freshMeta.LastSequenceNumber() + 1` whenever `formatVersion >= 2`, whether or
not `freshParent` is nil.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to