mzzz-zzm commented on code in PR #982:
URL: https://github.com/apache/iceberg-go/pull/982#discussion_r3202156649
##########
table/snapshot_producers.go:
##########
@@ -903,9 +943,103 @@ func (sp *snapshotProducer) commit(ctx context.Context) (_ []Update, _ []Require
})
}
+ // Build the manifest-list rebuild closure. It is called by doCommit
+ // on each OCC retry to regenerate the manifest list so it correctly
+ // inherits all data files committed by concurrent writers since the
+ // original snapshot was built.
+ formatVersion := sp.txn.meta.formatVersion
+ snapshotID := sp.snapshotID
+ commitUUID := sp.commitUuid
+ capturedSnapshot := snapshot // copy the value so the closure is self-contained
+ processManifestsFn := func(m []iceberg.ManifestFile) ([]iceberg.ManifestFile, error) {
+ return sp.processManifests(m)
+ }
+
+ rebuildFn := func(_ context.Context, freshParent *Snapshot, fio iceio.WriteFileIO, attempt int) (_ *Snapshot, retErr error) {
+ // Load inherited manifests from the fresh parent.
+ var inherited []iceberg.ManifestFile
+ if freshParent != nil {
+ inherited, retErr = freshParent.Manifests(fio)
+ if retErr != nil {
+ return nil, fmt.Errorf("rebuild manifest list: load parent manifests: %w", retErr)
+ }
+ }
+
+ // Combine own manifests with inherited ones, applying any
+ // producer-specific processing (no-op for fast/merge-append).
+ combined, procErr := processManifestsFn(slices.Concat(ownManifests, inherited))
+ if procErr != nil {
+ return nil, fmt.Errorf("rebuild manifest list: process manifests: %w", procErr)
+ }
+
+ // Derive the sequence number. When there is a fresh parent, use its
+ // sequence number + 1 so the rebuilt snapshot is strictly greater than
+ // any committed peer. When there is no fresh parent (first snapshot in
+ // the table or unknown parent), preserve the original sequence number
+ // from the initial build.
+ var newSeq int64
+ if freshParent != nil && formatVersion >= 2 {
+ newSeq = freshParent.SequenceNumber + 1
+ } else {
+ newSeq = capturedSnapshot.SequenceNumber
+ }
+
+ // Write the rebuilt manifest list to a path unique to this retry
+ // attempt. Each retry uses a different attempt counter in the filename
+ // (snap-{id}-{attempt}-{uuid}.avro) so that S3 conditional-write
+ // semantics (if-none-match) do not reject the overwrite. Orphaned files
+ // from superseded retry attempts are removed by doCommit after the
+ // commit succeeds.
+ fname := newManifestListFileName(snapshotID, attempt, commitUUID)
+ manifestListPath := locProvider.NewMetadataLocation(fname)
+
+ out, createErr := fio.Create(manifestListPath)
+ if createErr != nil {
+ return nil, fmt.Errorf("rebuild manifest list: create file: %w", createErr)
+ }
+ defer internal.CheckedClose(out, &retErr)
+
+ var parentID *int64
+ if freshParent != nil {
+ id := freshParent.SnapshotID
+ parentID = &id
+ }
+
+ firstRowID := int64(0)
+ if formatVersion == 3 {
+ writer, wrErr := iceberg.NewManifestListWriterV3(out, snapshotID, newSeq, firstRowID, parentID)
+ if wrErr != nil {
+ return nil, fmt.Errorf("rebuild manifest list: create v3 writer: %w", wrErr)
+ }
+ defer internal.CheckedClose(writer, &retErr)
+ if addErr := writer.AddManifests(combined); addErr != nil {
+ return nil, fmt.Errorf("rebuild manifest list: add manifests: %w", addErr)
+ }
+ } else {
+ if wErr := iceberg.WriteManifestList(formatVersion, out, snapshotID, parentID, &newSeq, firstRowID, combined); wErr != nil {
+ return nil, fmt.Errorf("rebuild manifest list: write: %w", wErr)
+ }
+ }
+
+ rebuilt := capturedSnapshot
Review Comment:
Inside `rebuildFn`, the rebuilt summary is now recomputed against the fresh
parent instead of being copied from `capturedSnapshot`. `deltaSummary` is derived
from `capturedSnapshot.Summary` minus the fresh parent's counts. It represents
only the files this producer added or removed, which is invariant across retries:
```go
if freshParent != nil && freshParent.Summary != nil && capturedSnapshot.Summary != nil {
	deltaSummary := computeDeltaSummary(capturedSnapshot.Summary, freshParent.Summary)
	if s, sumErr := updateSnapshotSummaries(deltaSummary, freshParent.Summary.Properties); sumErr == nil {
		rebuilt.Summary = s
	}
}
```
When `freshParent == nil` (first snapshot on the table), `rebuilt.Summary`
stays as `capturedSnapshot.Summary` — there is no prior state to rebase
against, so the attempt-0 summary is correct.
Tests added:
- `TestRebuildFn_SummaryRebasedAgainstFreshParent`: the concurrent writer adds
  5 files / 500 records / 5000 bytes and this producer adds 1 / 1 / 1 → asserts
  rebuilt `total-data-files == 6`, `total-records == 501`,
  `total-files-size == 5001`.
- `TestRebuildFn_SummaryFreshParentNilKeepsCapturedSummary`: no parent to rebase
  against → rebuilt summary equals captured summary.
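
To make the arithmetic in the first test concrete, here is a minimal standalone sketch of the rebase step. It is not the PR's `computeDeltaSummary` or `updateSnapshotSummaries`: `rebaseTotals` and `getInt` are hypothetical helpers, and the sketch assumes the producer's own contribution is carried in the standard `added-*` / `deleted-*` / `removed-*` summary properties, with summaries modeled as plain `map[string]string`.

```go
package main

import (
	"fmt"
	"strconv"
)

// getInt reads an integer property, treating a missing or malformed value as 0.
// Hypothetical helper for this sketch only.
func getInt(props map[string]string, key string) int64 {
	v, err := strconv.ParseInt(props[key], 10, 64)
	if err != nil {
		return 0
	}
	return v
}

// rebaseTotals is a hypothetical illustration of rebasing a summary: it copies
// the producer's own properties and recomputes each total-* value as
// (fresh parent total) + (own added) - (own removed).
// With no fresh parent, the producer's summary is returned unchanged.
func rebaseTotals(own, freshParent map[string]string) map[string]string {
	out := make(map[string]string, len(own)+3)
	for k, v := range own {
		out[k] = v
	}
	if freshParent == nil {
		return out // nothing to rebase against
	}
	fields := []struct{ total, added, removed string }{
		{"total-data-files", "added-data-files", "deleted-data-files"},
		{"total-records", "added-records", "deleted-records"},
		{"total-files-size", "added-files-size", "removed-files-size"},
	}
	for _, f := range fields {
		total := getInt(freshParent, f.total) + getInt(own, f.added) - getInt(own, f.removed)
		out[f.total] = strconv.FormatInt(total, 10)
	}
	return out
}

func main() {
	// This producer added 1 file / 1 record / 1 byte.
	own := map[string]string{
		"added-data-files": "1",
		"added-records":    "1",
		"added-files-size": "1",
	}
	// A concurrent writer already committed 5 files / 500 records / 5000 bytes.
	fresh := map[string]string{
		"total-data-files": "5",
		"total-records":    "500",
		"total-files-size": "5000",
	}
	got := rebaseTotals(own, fresh)
	fmt.Println(got["total-data-files"], got["total-records"], got["total-files-size"])
	// prints "6 501 5001"
}
```

Because only the producer's own added and removed counts feed the calculation, the result depends on the fresh parent alone, so each retry can recompute it without reference to the attempt-0 totals.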
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.