laskoviymishka commented on code in PR #1033: URL: https://github.com/apache/iceberg-go/pull/1033#discussion_r3218278004
########## table/rewrite_files.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "maps" + + "github.com/apache/iceberg-go" +) + +// RewriteFiles is the snapshot-operation builder for rewrite +// (compaction) commits. It is the snapshot-level sibling of [RowDelta] +// and mirrors Java's org.apache.iceberg.RewriteFiles interface +// (returned by Table.newRewrite() in Java). +// +// Compared to a raw [Transaction.ReplaceFiles] call, the builder +// owns the rewrite-specific isolation contract internally: +// +// - The overwrite producer's default isolation validator is suppressed +// (concurrent appends into rewritten partitions are allowed; this +// is the defining behavior of a rewrite). +// - A rewrite-specific conflict validator is registered so concurrent +// pos/eq-delete files targeting any rewritten data file are +// rejected pre-flight at [Transaction.Commit] time. The pos-delete +// branch only fires when the concurrent writer populated the +// manifest's referenced_data_file column (field id 143). 
That +// column is V2-optional and V3-required for deletion-vector +// deletes; V2 pos-delete writers commonly leave it empty, in +// which case only the conservative eq-delete-during-rewrite rule +// fires. +// +// Distributed compaction coordinators construct one [RewriteFiles] on +// the leader transaction, feed worker outputs in via [RewriteFiles.Apply], +// and commit one snapshot. In-process callers can use +// [Transaction.RewriteDataFiles] which drives this builder internally. +// +// The builder follows the same fail-fast pattern as +// [view.MetadataBuilder]: a method that hits an invalid input stages +// the error and short-circuits all subsequent calls until +// [RewriteFiles.Commit] drains it. The builder is single-use; once +// Commit has been called, a second call returns an error regardless +// of whether the first call succeeded. +// +// Adding new delete files (e.g., rewriting position deletes into +// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile] +// rejects pos/eq-delete inputs at insertion time. Add the support to +// the underlying [Transaction.ReplaceFiles] before lifting that +// restriction. +type RewriteFiles struct { + txn *Transaction + dataFilesToDelete []iceberg.DataFile + dataFilesToAdd []iceberg.DataFile + deleteFilesToRemove []iceberg.DataFile + snapshotProps iceberg.Properties + err error + committed bool +} + +// NewRewrite returns a [RewriteFiles] builder bound to this transaction. +// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is +// cloned and the clone is added to the rewrite snapshot's summary; +// pass nil for none. +// +// Usage: +// +// rewrite := tx.NewRewrite(nil) +// rewrite.DeleteFile(oldDataFile) +// rewrite.AddDataFile(newDataFile) +// if err := rewrite.Commit(ctx); err != nil { ... 
} +// committed, err := tx.Commit(ctx) +func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) *RewriteFiles { + return &RewriteFiles{txn: t, snapshotProps: maps.Clone(snapshotProps)} +} + +// DeleteFile marks a file for removal in this rewrite. Routes by +// content type: data files are queued as data-file replacements; +// pos/eq-delete files are queued for delete-file removal alongside +// the data rewrite (typical when a delete is fully applied to data +// files being rewritten and is therefore safe to expunge). +// +// Any other content type stages an error that is returned from the +// next [RewriteFiles.Commit] call. +func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + switch df.ContentType() { + case iceberg.EntryContentData: + r.dataFilesToDelete = append(r.dataFilesToDelete, df) + case iceberg.EntryContentPosDeletes, iceberg.EntryContentEqDeletes: + r.deleteFilesToRemove = append(r.deleteFilesToRemove, df) + default: + r.err = fmt.Errorf("%w: DeleteFile got unsupported content type %s (%s)", + ErrInvalidOperation, df.ContentType(), df.FilePath()) + } + + return r +} + +// AddDataFile queues a new data file. Adding delete files is not yet +// supported by the underlying snapshot machinery; a pos/eq-delete here +// stages an error that is returned from the next [RewriteFiles.Commit] +// call. The error names the offending file path so callers driving the +// builder via [RewriteFiles.Apply] can identify it without tracking +// queue order. 
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + if df.ContentType() != iceberg.EntryContentData { + r.err = fmt.Errorf("%w: AddDataFile only supports data files; got content type %s (%s)", + ErrInvalidOperation, df.ContentType(), df.FilePath()) + + return r + } + r.dataFilesToAdd = append(r.dataFilesToAdd, df) + + return r +} + +// Apply is a bulk shortcut that routes a worker's outputs onto this +// builder: every entry in deletes and safeDeletes is queued via +// [RewriteFiles.DeleteFile] (which routes data vs. delete files by +// content type), and every entry in adds via [RewriteFiles.AddDataFile]. +// +// safeDeletes is the position-delete files referenced by tasks in +// the rewrite group whose target data file is being rewritten — they +// are safe to expunge in the rewrite snapshot. [CollectSafePositionDeletes] +// computes this set; [ExecuteCompactionGroup] populates +// [CompactionGroupResult.SafePosDeletes] from it. +// +// The typical distributed-coordinator pattern is one [RewriteFiles] +// builder + one Apply call per worker result + one Commit: +// +// rewrite := leaderTxn.NewRewrite(snapshotProps) +// for _, gr := range workerResults { +// rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes) +// } +// if err := rewrite.Commit(ctx); err != nil { ... } +func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + for _, df := range deletes { + r.DeleteFile(df) + } + for _, df := range adds { + r.AddDataFile(df) + } + for _, df := range safeDeletes { + r.DeleteFile(df) + } + + return r +} + +// Commit stages the rewrite snapshot on the underlying transaction. +// The catalog commit happens once, later, at [Transaction.Commit] time. 
+// +// Commit is single-shot: any second call returns an error regardless +// of whether the first call succeeded, and neither re-stages the +// rewrite nor re-registers the conflict validator. Returns an error +// if any file passed to [RewriteFiles.AddDataFile] or +// [RewriteFiles.DeleteFile] had an unsupported content type, if the +// builder has no file changes, or if the underlying +// [Transaction.ReplaceFiles] call fails. +func (r *RewriteFiles) Commit(ctx context.Context) error { + if r.committed { + return fmt.Errorf("%w: RewriteFiles.Commit already called on this builder", ErrInvalidOperation) + } + r.committed = true + + if r.err != nil { + return r.err + } + if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) == 0 && len(r.deleteFilesToRemove) == 0 { + return fmt.Errorf("%w: rewrite must have at least one file change", ErrInvalidOperation) Review Comment: Empty-deletes-with-adds silently downgrades to `OpAppend`. When `dataFilesToDelete == 0 && dataFilesToAdd > 0 && deleteFilesToRemove == 0`, the guard at L196 passes, the call reaches `ReplaceFiles`, which delegates to `ReplaceDataFilesWithDataFiles` (no deletes to remove), which delegates to `AddDataFiles` (no files to delete). `AddDataFiles` is an `OpAppend` producer and never consults `cfg.rewriteSemantics`, so the snapshot is tagged `append` and no rewrite validator is registered — even though `withRewriteSemantics()` is set on the option chain. A distributed coordinator that has a group with only new files (e.g., a worker that wrote replacement files but the delete side is empty because nothing was actually replaced) gets a silently wrong snapshot op and no isolation against concurrent writers. 
Worth rejecting explicitly: ```go if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) > 0 { return fmt.Errorf("%w: rewrite must delete at least one data file when adding data files", ErrInvalidOperation) } ``` (A pure delete-files expunge with no data movement — `deleteFilesToRemove > 0` only — is still a legitimate `OpReplace` and would pass. Note that this guard is intentionally stricter than the failing case described above: it also rejects adds paired with delete-file removals but no data-file deletes. That matches Java's `BaseRewriteFiles` precondition "Data files to add must be empty because there's no data file to be rewritten".) ########## table/rewrite_files.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "maps" + + "github.com/apache/iceberg-go" +) + +// RewriteFiles is the snapshot-operation builder for rewrite +// (compaction) commits. It is the snapshot-level sibling of [RowDelta] +// and mirrors Java's org.apache.iceberg.RewriteFiles interface +// (returned by Table.newRewrite() in Java). +// +// Compared to a raw [Transaction.ReplaceFiles] call, the builder +// owns the rewrite-specific isolation contract internally: +// +// - The overwrite producer's default isolation validator is suppressed +// (concurrent appends into rewritten partitions are allowed; this +// is the defining behavior of a rewrite).
+// - A rewrite-specific conflict validator is registered so concurrent +// pos/eq-delete files targeting any rewritten data file are +// rejected pre-flight at [Transaction.Commit] time. The pos-delete +// branch only fires when the concurrent writer populated the +// manifest's referenced_data_file column (field id 143). That +// column is V2-optional and V3-required for deletion-vector +// deletes; V2 pos-delete writers commonly leave it empty, in +// which case only the conservative eq-delete-during-rewrite rule +// fires. +// +// Distributed compaction coordinators construct one [RewriteFiles] on +// the leader transaction, feed worker outputs in via [RewriteFiles.Apply], +// and commit one snapshot. In-process callers can use +// [Transaction.RewriteDataFiles] which drives this builder internally. +// +// The builder follows the same fail-fast pattern as +// [view.MetadataBuilder]: a method that hits an invalid input stages +// the error and short-circuits all subsequent calls until +// [RewriteFiles.Commit] drains it. The builder is single-use; once +// Commit has been called, a second call returns an error regardless +// of whether the first call succeeded. +// +// Adding new delete files (e.g., rewriting position deletes into +// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile] +// rejects pos/eq-delete inputs at insertion time. Add the support to +// the underlying [Transaction.ReplaceFiles] before lifting that +// restriction. +type RewriteFiles struct { + txn *Transaction + dataFilesToDelete []iceberg.DataFile + dataFilesToAdd []iceberg.DataFile + deleteFilesToRemove []iceberg.DataFile + snapshotProps iceberg.Properties + err error + committed bool +} + +// NewRewrite returns a [RewriteFiles] builder bound to this transaction. +// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is +// cloned and the clone is added to the rewrite snapshot's summary; +// pass nil for none. 
+// +// Usage: +// +// rewrite := tx.NewRewrite(nil) +// rewrite.DeleteFile(oldDataFile) +// rewrite.AddDataFile(newDataFile) +// if err := rewrite.Commit(ctx); err != nil { ... } +// committed, err := tx.Commit(ctx) +func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) *RewriteFiles { + return &RewriteFiles{txn: t, snapshotProps: maps.Clone(snapshotProps)} +} + +// DeleteFile marks a file for removal in this rewrite. Routes by +// content type: data files are queued as data-file replacements; +// pos/eq-delete files are queued for delete-file removal alongside +// the data rewrite (typical when a delete is fully applied to data +// files being rewritten and is therefore safe to expunge). +// +// Any other content type stages an error that is returned from the +// next [RewriteFiles.Commit] call. +func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + switch df.ContentType() { + case iceberg.EntryContentData: + r.dataFilesToDelete = append(r.dataFilesToDelete, df) + case iceberg.EntryContentPosDeletes, iceberg.EntryContentEqDeletes: + r.deleteFilesToRemove = append(r.deleteFilesToRemove, df) + default: + r.err = fmt.Errorf("%w: DeleteFile got unsupported content type %s (%s)", + ErrInvalidOperation, df.ContentType(), df.FilePath()) + } + + return r +} + +// AddDataFile queues a new data file. Adding delete files is not yet +// supported by the underlying snapshot machinery; a pos/eq-delete here +// stages an error that is returned from the next [RewriteFiles.Commit] +// call. The error names the offending file path so callers driving the +// builder via [RewriteFiles.Apply] can identify it without tracking +// queue order. 
+func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + if df.ContentType() != iceberg.EntryContentData { + r.err = fmt.Errorf("%w: AddDataFile only supports data files; got content type %s (%s)", + ErrInvalidOperation, df.ContentType(), df.FilePath()) + + return r + } + r.dataFilesToAdd = append(r.dataFilesToAdd, df) + + return r +} + +// Apply is a bulk shortcut that routes a worker's outputs onto this +// builder: every entry in deletes and safeDeletes is queued via +// [RewriteFiles.DeleteFile] (which routes data vs. delete files by +// content type), and every entry in adds via [RewriteFiles.AddDataFile]. +// +// safeDeletes is the position-delete files referenced by tasks in +// the rewrite group whose target data file is being rewritten — they +// are safe to expunge in the rewrite snapshot. [CollectSafePositionDeletes] +// computes this set; [ExecuteCompactionGroup] populates +// [CompactionGroupResult.SafePosDeletes] from it. +// +// The typical distributed-coordinator pattern is one [RewriteFiles] +// builder + one Apply call per worker result + one Commit: +// +// rewrite := leaderTxn.NewRewrite(snapshotProps) +// for _, gr := range workerResults { +// rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes) +// } +// if err := rewrite.Commit(ctx); err != nil { ... } +func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + for _, df := range deletes { + r.DeleteFile(df) + } + for _, df := range adds { + r.AddDataFile(df) + } + for _, df := range safeDeletes { + r.DeleteFile(df) + } + + return r +} + +// Commit stages the rewrite snapshot on the underlying transaction. +// The catalog commit happens once, later, at [Transaction.Commit] time. 
+// +// Commit is single-shot: any second call returns an error regardless +// of whether the first call succeeded, and neither re-stages the +// rewrite nor re-registers the conflict validator. Returns an error +// if any file passed to [RewriteFiles.AddDataFile] or +// [RewriteFiles.DeleteFile] had an unsupported content type, if the +// builder has no file changes, or if the underlying +// [Transaction.ReplaceFiles] call fails. +func (r *RewriteFiles) Commit(ctx context.Context) error { + if r.committed { + return fmt.Errorf("%w: RewriteFiles.Commit already called on this builder", ErrInvalidOperation) + } + r.committed = true + + if r.err != nil { + return r.err + } + if len(r.dataFilesToDelete) == 0 && len(r.dataFilesToAdd) == 0 && len(r.deleteFilesToRemove) == 0 { + return fmt.Errorf("%w: rewrite must have at least one file change", ErrInvalidOperation) + } + + if err := r.txn.ReplaceFiles(ctx, r.dataFilesToDelete, r.dataFilesToAdd, r.deleteFilesToRemove, r.snapshotProps, withRewriteSemantics()); err != nil { + return err + } + + if len(r.dataFilesToDelete) > 0 { + rewritten := make([]string, 0, len(r.dataFilesToDelete)) + for _, df := range r.dataFilesToDelete { + rewritten = append(rewritten, df.FilePath()) + } + r.txn.validators = append(r.txn.validators, rewriteValidator(rewritten)) Review Comment: `r.txn.validators = append(...)` mutates a `Transaction` field without holding `t.mx`. `Transaction.Commit` takes `t.mx` and then reads `t.validators` under the lock. The doc-comment on `RewriteFiles` (L46-49) explicitly endorses the fanout pattern of multiple builders feeding the same transaction — that is exactly the surface where two goroutines could race on this append. 
`RowDelta.Commit` has the same convention break, so fixing both at once via a tiny helper would be cleanest: ```go // in transaction.go func (t *Transaction) addValidator(v conflictValidatorFunc) { t.mx.Lock() defer t.mx.Unlock() t.validators = append(t.validators, v) } ``` Then `RewriteFiles.Commit` and `RowDelta.Commit` both use `r.txn.addValidator(...)`. `go test -race` would catch the existing race today under any concurrent driver. ########## table/rewrite_data_files.go: ########## @@ -242,19 +379,24 @@ func rewriteValidator(rewrittenPaths []string) conflictValidatorFunc { } } -// collectSafePositionDeletes returns position delete files from the given -// tasks that are safe to remove during compaction. +// CollectSafePositionDeletes returns position delete files from the +// given tasks that are safe to remove during compaction. // -// A position delete file is safe to remove when it was matched to a data -// file (via scan planning) and that data file is being rewritten in this -// compaction group. Since ReadTasks applies the deletes during reading, -// the new output files will not contain the deleted rows. +// A position delete file is safe to remove when it was matched to a +// data file (via scan planning) and that data file is being rewritten +// in this compaction group. Since ReadTasks applies the deletes during +// reading, the new output files will not contain the deleted rows. // // Only position deletes (EntryContentPosDeletes) are considered. // Equality deletes are decided by [compaction.DecideDeadEqualityDeletes] // (which needs partition-wide visibility, not just the task scope). // Deletion vectors will be handled when DV read support lands. -func collectSafePositionDeletes(tasks []FileScanTask) []iceberg.DataFile { +// +// [ExecuteCompactionGroup] calls this internally to populate +// [CompactionGroupResult.SafePosDeletes]. 
It is kept exported for +// custom workers that want the spec-shaped predicate without taking +// the rest of [ExecuteCompactionGroup]'s read+write pipeline. +func CollectSafePositionDeletes(tasks []FileScanTask) []iceberg.DataFile { Review Comment: Predicate caveat: this collects every pos-delete file matched to *any* task in the group, but it doesn't check that *every* data file the pos-delete references is in the rewrite set. Within a partition the planner produces multiple groups via bin-packing (`compaction.go` `packer.PackEnd`) and skips files via `MinInputFiles`. So a pos-delete file that targets data files in groups 1 and 2 (or in group 1 + a skipped file) will get marked safe by group 1's pass. After group 1's commit, the still-live data files lose their deletes. A second-order consequence now that this is public API: if a pos-delete spans two groups, both groups' `SafePosDeletes` will contain it. When the coordinator calls `Apply` twice on the same `RewriteFiles`, `deleteFilesToRemove` ends up with duplicate paths and `ReplaceFiles` rejects with "delete file paths must be unique." That at least makes the bug self-reporting in the multi-Apply case. When only one group's `SafePosDeletes` contains the file (e.g., group 1 + a skipped file), the rows it deleted silently reappear in the still-live data files. This is pre-existing behavior of `collectSafePositionDeletes`, not a regression. But since the function is now public, it is worth at minimum a doc note pinning the contract ("caller must verify every referenced data file is in the rewrite set"). Ideally there would also be a follow-up that takes a `rewrittenPaths` set, or that moves the computation to the leader after worker outputs are aggregated. ########## table/rewrite_files.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "maps" + + "github.com/apache/iceberg-go" +) + +// RewriteFiles is the snapshot-operation builder for rewrite +// (compaction) commits. It is the snapshot-level sibling of [RowDelta] +// and mirrors Java's org.apache.iceberg.RewriteFiles interface +// (returned by Table.newRewrite() in Java). +// +// Compared to a raw [Transaction.ReplaceFiles] call, the builder +// owns the rewrite-specific isolation contract internally: +// +// - The overwrite producer's default isolation validator is suppressed +// (concurrent appends into rewritten partitions are allowed; this +// is the defining behavior of a rewrite). +// - A rewrite-specific conflict validator is registered so concurrent +// pos/eq-delete files targeting any rewritten data file are +// rejected pre-flight at [Transaction.Commit] time. The pos-delete +// branch only fires when the concurrent writer populated the +// manifest's referenced_data_file column (field id 143). That +// column is V2-optional and V3-required for deletion-vector +// deletes; V2 pos-delete writers commonly leave it empty, in +// which case only the conservative eq-delete-during-rewrite rule +// fires. +// +// Distributed compaction coordinators construct one [RewriteFiles] on +// the leader transaction, feed worker outputs in via [RewriteFiles.Apply], +// and commit one snapshot. 
In-process callers can use +// [Transaction.RewriteDataFiles] which drives this builder internally. +// +// The builder follows the same fail-fast pattern as +// [view.MetadataBuilder]: a method that hits an invalid input stages +// the error and short-circuits all subsequent calls until +// [RewriteFiles.Commit] drains it. The builder is single-use; once +// Commit has been called, a second call returns an error regardless +// of whether the first call succeeded. +// +// Adding new delete files (e.g., rewriting position deletes into +// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile] +// rejects pos/eq-delete inputs at insertion time. Add the support to +// the underlying [Transaction.ReplaceFiles] before lifting that +// restriction. +type RewriteFiles struct { + txn *Transaction + dataFilesToDelete []iceberg.DataFile + dataFilesToAdd []iceberg.DataFile + deleteFilesToRemove []iceberg.DataFile + snapshotProps iceberg.Properties + err error + committed bool +} + +// NewRewrite returns a [RewriteFiles] builder bound to this transaction. +// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is +// cloned and the clone is added to the rewrite snapshot's summary; +// pass nil for none. +// +// Usage: +// +// rewrite := tx.NewRewrite(nil) +// rewrite.DeleteFile(oldDataFile) +// rewrite.AddDataFile(newDataFile) +// if err := rewrite.Commit(ctx); err != nil { ... } +// committed, err := tx.Commit(ctx) +func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) *RewriteFiles { + return &RewriteFiles{txn: t, snapshotProps: maps.Clone(snapshotProps)} +} + +// DeleteFile marks a file for removal in this rewrite. Routes by +// content type: data files are queued as data-file replacements; +// pos/eq-delete files are queued for delete-file removal alongside +// the data rewrite (typical when a delete is fully applied to data +// files being rewritten and is therefore safe to expunge). 
+// +// Any other content type stages an error that is returned from the +// next [RewriteFiles.Commit] call. +func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + switch df.ContentType() { + case iceberg.EntryContentData: + r.dataFilesToDelete = append(r.dataFilesToDelete, df) + case iceberg.EntryContentPosDeletes, iceberg.EntryContentEqDeletes: + r.deleteFilesToRemove = append(r.deleteFilesToRemove, df) + default: + r.err = fmt.Errorf("%w: DeleteFile got unsupported content type %s (%s)", + ErrInvalidOperation, df.ContentType(), df.FilePath()) + } + + return r +} + +// AddDataFile queues a new data file. Adding delete files is not yet +// supported by the underlying snapshot machinery; a pos/eq-delete here +// stages an error that is returned from the next [RewriteFiles.Commit] +// call. The error names the offending file path so callers driving the +// builder via [RewriteFiles.Apply] can identify it without tracking +// queue order. +func (r *RewriteFiles) AddDataFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + if df.ContentType() != iceberg.EntryContentData { + r.err = fmt.Errorf("%w: AddDataFile only supports data files; got content type %s (%s)", + ErrInvalidOperation, df.ContentType(), df.FilePath()) + + return r + } + r.dataFilesToAdd = append(r.dataFilesToAdd, df) + + return r +} + +// Apply is a bulk shortcut that routes a worker's outputs onto this +// builder: every entry in deletes and safeDeletes is queued via +// [RewriteFiles.DeleteFile] (which routes data vs. delete files by +// content type), and every entry in adds via [RewriteFiles.AddDataFile]. +// +// safeDeletes is the position-delete files referenced by tasks in +// the rewrite group whose target data file is being rewritten — they +// are safe to expunge in the rewrite snapshot. 
[CollectSafePositionDeletes] +// computes this set; [ExecuteCompactionGroup] populates +// [CompactionGroupResult.SafePosDeletes] from it. +// +// The typical distributed-coordinator pattern is one [RewriteFiles] +// builder + one Apply call per worker result + one Commit: +// +// rewrite := leaderTxn.NewRewrite(snapshotProps) +// for _, gr := range workerResults { +// rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes) +// } +// if err := rewrite.Commit(ctx); err != nil { ... } +func (r *RewriteFiles) Apply(deletes, adds, safeDeletes []iceberg.DataFile) *RewriteFiles { Review Comment: The three-slice shape is a structural footgun for distributed coordinators. `deletes`, `adds`, and `safeDeletes` are all `[]iceberg.DataFile`, so the compiler cannot catch a transposed argument. Content-type dispatch only partly helps. `deletes` and `safeDeletes` both route through `DeleteFile()`, so swapping slots 1 and 3 happens to produce the same queued state today, though it would break silently if the two slots ever diverge. Passing delete files in slot 2 is rejected by `AddDataFile`. But swapping slots 1 and 2, which both carry data files, is not caught by any content-type check. The documented coordinator pattern (L153-158) hands `gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes` in exactly the same slot order. Three positional same-typed slices that happen to map 1:1 to three same-typed struct fields are the kind of glue that breaks silently under refactor. A non-breaking option: add a typed sibling that takes the result directly: ```go func (r *RewriteFiles) ApplyResult(gr CompactionGroupResult) *RewriteFiles { return r.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes) } ``` At minimum, the param could be renamed `safePosDeletes` to match the source field name. The godoc could also warn that the slots are not interchangeable: in particular, slot 1 (old data files) and slot 2 (new data files) both hold data files, so a swap between them passes every content-type check. ########## table/rewrite_files.go: ########## @@ -0,0 +1,213 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package table + +import ( + "context" + "fmt" + "maps" + + "github.com/apache/iceberg-go" +) + +// RewriteFiles is the snapshot-operation builder for rewrite +// (compaction) commits. It is the snapshot-level sibling of [RowDelta] +// and mirrors Java's org.apache.iceberg.RewriteFiles interface +// (returned by Table.newRewrite() in Java). +// +// Compared to a raw [Transaction.ReplaceFiles] call, the builder +// owns the rewrite-specific isolation contract internally: +// +// - The overwrite producer's default isolation validator is suppressed +// (concurrent appends into rewritten partitions are allowed; this +// is the defining behavior of a rewrite). +// - A rewrite-specific conflict validator is registered so concurrent +// pos/eq-delete files targeting any rewritten data file are +// rejected pre-flight at [Transaction.Commit] time. The pos-delete +// branch only fires when the concurrent writer populated the +// manifest's referenced_data_file column (field id 143). That +// column is V2-optional and V3-required for deletion-vector +// deletes; V2 pos-delete writers commonly leave it empty, in +// which case only the conservative eq-delete-during-rewrite rule +// fires. +// +// Distributed compaction coordinators construct one [RewriteFiles] on +// the leader transaction, feed worker outputs in via [RewriteFiles.Apply], +// and commit one snapshot. 
In-process callers can use +// [Transaction.RewriteDataFiles] which drives this builder internally. +// +// The builder follows the same fail-fast pattern as +// [view.MetadataBuilder]: a method that hits an invalid input stages +// the error and short-circuits all subsequent calls until +// [RewriteFiles.Commit] drains it. The builder is single-use; once +// Commit has been called, a second call returns an error regardless +// of whether the first call succeeded. +// +// Adding new delete files (e.g., rewriting position deletes into +// deletion vectors) is not yet supported; [RewriteFiles.AddDataFile] +// rejects pos/eq-delete inputs at insertion time. Add the support to +// the underlying [Transaction.ReplaceFiles] before lifting that +// restriction. +type RewriteFiles struct { + txn *Transaction + dataFilesToDelete []iceberg.DataFile + dataFilesToAdd []iceberg.DataFile + deleteFilesToRemove []iceberg.DataFile + snapshotProps iceberg.Properties + err error + committed bool +} + +// NewRewrite returns a [RewriteFiles] builder bound to this transaction. +// Mirrors Java's org.apache.iceberg.Table#newRewrite. snapshotProps is +// cloned and the clone is added to the rewrite snapshot's summary; +// pass nil for none. +// +// Usage: +// +// rewrite := tx.NewRewrite(nil) +// rewrite.DeleteFile(oldDataFile) +// rewrite.AddDataFile(newDataFile) +// if err := rewrite.Commit(ctx); err != nil { ... } +// committed, err := tx.Commit(ctx) +func (t *Transaction) NewRewrite(snapshotProps iceberg.Properties) *RewriteFiles { + return &RewriteFiles{txn: t, snapshotProps: maps.Clone(snapshotProps)} +} + +// DeleteFile marks a file for removal in this rewrite. Routes by +// content type: data files are queued as data-file replacements; +// pos/eq-delete files are queued for delete-file removal alongside +// the data rewrite (typical when a delete is fully applied to data +// files being rewritten and is therefore safe to expunge). 
+// +// Any other content type stages an error that is returned from the +// next [RewriteFiles.Commit] call. +func (r *RewriteFiles) DeleteFile(df iceberg.DataFile) *RewriteFiles { + if r.err != nil { + return r + } + + switch df.ContentType() { Review Comment: Nit: `df.ContentType()` is called without a nil-check. `iceberg.DataFile` is an interface, so a nil argument panics with a nil-pointer-dereference instead of returning a clean `ErrInvalidOperation`. `ReplaceFiles` at transaction.go:822/834 guards nil explicitly. Distributed callers deserialize data files from a wire format, so a wire-decode bug producing a nil entry would crash the coordinator rather than fail cleanly. Same applies at `AddDataFile` L129. Easy fix: ```go if df == nil { r.err = fmt.Errorf("%w: nil data file", ErrInvalidOperation) return r } ``` ########## table/rewrite_data_files_test.go: ########## @@ -243,6 +243,122 @@ func TestRewriteDataFiles_EmptyPlan(t *testing.T) { assert.Equal(t, int64(0), result.BytesBefore) } +// TestExecuteCompactionGroup_TargetFileSizeForwarded verifies that +// WithCompactionTargetFileSize reaches the underlying WriteRecords +// call: a tiny target size on a multi-row group must force the +// writer to emit more than one output file. 
+func TestExecuteCompactionGroup_TargetFileSizeForwarded(t *testing.T) { + tbl := newRewriteTestTable(t) + + arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false) + require.NoError(t, err) + + for i := range 5 { + dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i) + writeParquetFile(t, dataPath, arrowSc, + fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1)) + tx := tbl.NewTransaction() + require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false)) + tbl, err = tx.Commit(t.Context()) + require.NoError(t, err) + } + + tasks, err := tbl.Scan().PlanFiles(t.Context()) + require.NoError(t, err) + + plan, err := defaultTestCompactionCfg.PlanCompaction(tasks) + require.NoError(t, err) + require.NotEmpty(t, plan.Groups) + + groups := toTaskGroups(plan.Groups) + require.Len(t, groups, 1, "test assumes a single group; tighten plan if this changes") + + withTiny, err := table.ExecuteCompactionGroup(t.Context(), tbl, groups[0], + table.WithCompactionTargetFileSize(1)) + require.NoError(t, err) + assert.Greater(t, len(withTiny.NewDataFiles), 1, + "WithCompactionTargetFileSize(1) must force the writer to roll over per row") + + withDefault, err := table.ExecuteCompactionGroup(t.Context(), tbl, groups[0]) + require.NoError(t, err) + assert.Len(t, withDefault.NewDataFiles, 1, + "without the option, the same group consolidates into a single file") +} + +// TestExecuteCompactionGroup_ScanConcurrencyForwarded is a smoke test +// confirming WithCompactionScanConcurrency is wired through without +// breaking the read path. We can't easily observe scan parallelism from +// the result, so the assertion is correctness equivalence with the +// default. 
+func TestExecuteCompactionGroup_ScanConcurrencyForwarded(t *testing.T) { + tbl := newRewriteTestTable(t) + + arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false) + require.NoError(t, err) + + for i := range 3 { + dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i) + writeParquetFile(t, dataPath, arrowSc, + fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1)) + tx := tbl.NewTransaction() + require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false)) + tbl, err = tx.Commit(t.Context()) + require.NoError(t, err) + } + + tasks, err := tbl.Scan().PlanFiles(t.Context()) + require.NoError(t, err) + + plan, err := defaultTestCompactionCfg.PlanCompaction(tasks) + require.NoError(t, err) + require.NotEmpty(t, plan.Groups) + + groups := toTaskGroups(plan.Groups) + + got, err := table.ExecuteCompactionGroup(t.Context(), tbl, groups[0], + table.WithCompactionScanConcurrency(1)) + require.NoError(t, err) + assert.NotEmpty(t, got.NewDataFiles) + assert.NotEmpty(t, got.OldDataFiles) Review Comment: This test is effectively tautological — its only assertions are `NotEmpty(NewDataFiles)` and `NotEmpty(OldDataFiles)`, both of which would pass if `WithCompactionScanConcurrency` were a complete no-op. The test docstring acknowledges "we can't easily observe scan parallelism from the result," but in its current shape a regression that silently drops the option would not be caught. At minimum, run a second `ExecuteCompactionGroup` without the option and assert the two results have the same `len(NewDataFiles)` / `len(OldDataFiles)` — so a regression that breaks the scan path when concurrency is set produces divergent output. ########## table/rewrite_data_files.go: ########## @@ -142,91 +219,151 @@ func (t *Transaction) RewriteDataFiles(ctx context.Context, groups []CompactionT continue } - // Read with deletes applied. 
- arrowSchema, records, err := scan.ReadTasks(ctx, group.Tasks) + gr, err := ExecuteCompactionGroup(ctx, t.tbl, group, opts.GroupOptions...) if err != nil { - return result, fmt.Errorf("read tasks for compaction group %q: %w", group.PartitionKey, err) + return result, err } - // Each compaction group is single-partition by construction, so the - // read stream is trivially clustered and we can use the clustered writer. - var newFiles []iceberg.DataFile - for df, err := range WriteRecords(ctx, t.tbl, arrowSchema, records, WithClusteredWrite()) { - if err != nil { - return result, fmt.Errorf("write compacted files for group %q: %w", group.PartitionKey, err) - } - newFiles = append(newFiles, df) + if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 { + continue } - // Collect old data files. - oldDataFiles := make([]iceberg.DataFile, 0, len(group.Tasks)) - for _, task := range group.Tasks { - oldDataFiles = append(oldDataFiles, task.File) + rewrite.Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes) + accumulateGroupMetrics(result, gr) + } + + if result.RewrittenGroups == 0 { + return result, nil + } + + for _, df := range opts.ExtraDeleteFilesToRemove { + rewrite.DeleteFile(df) + result.RemovedEqualityDeleteFiles++ + } + + if err := rewrite.Commit(ctx); err != nil { + return result, fmt.Errorf("commit compaction: %w", err) + } + + return result, nil +} + +// ExecuteCompactionGroup reads a compaction group's tasks (with +// deletes applied), writes consolidated output files via +// [WriteRecords], and computes the position-delete files safe to +// expunge in the rewrite snapshot. It does not commit — the caller +// hands the result to a coordinator that uses [Transaction.NewRewrite] +// + [RewriteFiles.Apply] + [RewriteFiles.Commit] to stage the +// atomic commit. +// +// Empty groups return a zero [CompactionGroupResult] without doing +// any I/O. 
+// +// In-process callers should prefer [Transaction.RewriteDataFiles], +// which drives this and the commit step in one call. +// +// Tunables are exposed via [CompactionGroupOption]. The clustered +// write path is always used (a compaction group is single-partition +// by construction so its read stream is trivially clustered). +func ExecuteCompactionGroup(ctx context.Context, tbl *Table, group CompactionTaskGroup, opts ...CompactionGroupOption) (CompactionGroupResult, error) { + if len(group.Tasks) == 0 { + return CompactionGroupResult{PartitionKey: group.PartitionKey}, nil + } + + cfg := compactionGroupConfig{} + for _, opt := range opts { + opt(&cfg) + } + + var scanOpts []ScanOption + if cfg.scanConcurrency > 0 { + scanOpts = append(scanOpts, WitMaxConcurrency(cfg.scanConcurrency)) + } + + arrowSchema, records, err := tbl.Scan(scanOpts...).ReadTasks(ctx, group.Tasks) + if err != nil { + return CompactionGroupResult{}, fmt.Errorf("read tasks for compaction group %q: %w", group.PartitionKey, err) + } + + // Each compaction group is single-partition by construction, so the + // read stream is trivially clustered and we can use the clustered writer. + writeOpts := []WriteRecordOption{WithClusteredWrite()} + if cfg.targetFileSize > 0 { + writeOpts = append(writeOpts, WithTargetFileSize(cfg.targetFileSize)) + } + + var ( + newFiles []iceberg.DataFile + bytesAfter int64 + ) + for df, err := range WriteRecords(ctx, tbl, arrowSchema, records, writeOpts...) { + if err != nil { + return CompactionGroupResult{}, fmt.Errorf("write compacted files for group %q: %w", group.PartitionKey, err) } + newFiles = append(newFiles, df) + bytesAfter += df.FileSizeBytes() + } + + oldFiles := make([]iceberg.DataFile, 0, len(group.Tasks)) + for _, task := range group.Tasks { + oldFiles = append(oldFiles, task.File) + } - // Collect position delete files safe to remove. 
- safeDeletes := collectSafePositionDeletes(group.Tasks) + return CompactionGroupResult{ + PartitionKey: group.PartitionKey, + OldDataFiles: oldFiles, + NewDataFiles: newFiles, + SafePosDeletes: CollectSafePositionDeletes(group.Tasks), + BytesBefore: group.TotalSizeBytes, + BytesAfter: bytesAfter, + }, nil +} - // Update result metrics. - var bytesAfter int64 - for _, df := range newFiles { - bytesAfter += df.FileSizeBytes() +// rewriteDataFilesPartial stages each group as its own rewrite +// snapshot via a fresh [RewriteFiles] builder. Each builder commits +// independently inside the loop, so a mid-loop write failure leaves +// already-staged groups in the transaction and the catalog still +// receives them at [Transaction.Commit] time. +func (t *Transaction) rewriteDataFilesPartial(ctx context.Context, groups []CompactionTaskGroup, opts RewriteDataFilesOptions) (*RewriteResult, error) { + result := &RewriteResult{} + + for _, group := range groups { + if err := ctx.Err(); err != nil { + return result, err } - result.RewrittenGroups++ - result.AddedDataFiles += len(newFiles) - result.RemovedDataFiles += len(oldDataFiles) - result.RemovedPositionDeleteFiles += len(safeDeletes) - result.BytesBefore += group.TotalSizeBytes - result.BytesAfter += bytesAfter - - // Always accumulate across groups; partial-progress mode also - // stages each group via ReplaceFiles so work survives a - // mid-loop write failure, but the final catalog commit is - // always one atomic doCommit at Transaction.Commit() time. - allOldData = append(allOldData, oldDataFiles...) - allNewData = append(allNewData, newFiles...) - allOldDeletes = append(allOldDeletes, safeDeletes...) 
- - if opts.PartialProgress { - if err := t.ReplaceFiles(ctx, oldDataFiles, newFiles, safeDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil { - return result, fmt.Errorf("commit compaction group %q: %w", group.PartitionKey, err) - } + if len(group.Tasks) == 0 { + continue } - } - // Register the rewrite-specific conflict validator covering every - // rewritten data file across every group. The validator list is - // drained at Transaction.Commit() → doCommit. Runs alongside the - // overwrite producer's suppressed validator (via - // withRewriteSemantics) so concurrent pos/eq-deletes targeting a - // rewritten file are caught pre-flight. - if len(allOldData) > 0 { - rewritten := make([]string, 0, len(allOldData)) - for _, f := range allOldData { - rewritten = append(rewritten, f.FilePath()) + gr, err := ExecuteCompactionGroup(ctx, t.tbl, group, opts.GroupOptions...) + if err != nil { + return result, err } - t.validators = append(t.validators, rewriteValidator(rewritten)) - } - if !opts.PartialProgress { - // Caller-supplied dead eq-deletes (typically from - // [compaction.CollectDeadEqualityDeletes]). The caller is - // responsible for computing these against the same snapshot - // this transaction is staged on. - if len(opts.ExtraDeleteFilesToRemove) > 0 { - allOldDeletes = append(allOldDeletes, opts.ExtraDeleteFilesToRemove...) - result.RemovedEqualityDeleteFiles += len(opts.ExtraDeleteFilesToRemove) + if len(gr.OldDataFiles) == 0 && len(gr.NewDataFiles) == 0 { + continue } - if err := t.ReplaceFiles(ctx, allOldData, allNewData, allOldDeletes, opts.SnapshotProps, withRewriteSemantics()); err != nil { - return result, fmt.Errorf("commit compaction: %w", err) + if err := t.NewRewrite(opts.SnapshotProps).Apply(gr.OldDataFiles, gr.NewDataFiles, gr.SafePosDeletes).Commit(ctx); err != nil { Review Comment: Partial-progress accumulates one rewrite validator per group on the transaction. 
After N groups, the transaction has N validators, and at `Transaction.Commit` time (refresh-replay) each independently walks the same concurrent-snapshot set looking for deletes against its own path subset — O(N · |concurrent_snapshots|) walks where the conflict surface is really the union and one walk would do. Not a correctness issue, but partial-progress is the path callers reach for when groups are many (the docstring frames it that way), so the perf cliff scales with the very dimension this mode is for. Cheap fix is to coalesce: collect all rewritten paths across groups and register one `rewriteValidator(allRewritten)` at the end of `rewriteDataFilesPartial`, skipping the per-group validator append in this path. ########## table/rewrite_data_files.go: ########## @@ -70,34 +71,112 @@ type CompactionTaskGroup struct { TotalSizeBytes int64 } +// CompactionGroupResult is the per-group output of a compaction +// worker: the new files written, the old files being replaced, and +// the position delete files safe to expunge in the rewrite snapshot. +// +// A distributed coordinator aggregates results from N workers and +// applies them to a [RewriteFiles] builder via [RewriteFiles.Apply] +// to commit a single atomic snapshot. Each field is plain data +// ([]iceberg.DataFile values plus scalars) — callers serialize the +// contained DataFiles across process boundaries themselves; the +// typical pattern is to have the worker write a manifest containing +// the new files and ship the manifest path to the coordinator, which +// re-reads it. +type CompactionGroupResult struct { + // PartitionKey mirrors [CompactionTaskGroup.PartitionKey] for + // display/logging on the coordinator. + PartitionKey string + + // OldDataFiles are the data files this group replaces. + OldDataFiles []iceberg.DataFile + + // NewDataFiles are the consolidated outputs the worker wrote. 
+ NewDataFiles []iceberg.DataFile + + // SafePosDeletes are position-delete files referenced by tasks in + // this group whose target data file is being rewritten, computed + // via [CollectSafePositionDeletes]. They are safe to expunge in + // the rewrite snapshot. + SafePosDeletes []iceberg.DataFile + + // BytesBefore is [CompactionTaskGroup.TotalSizeBytes] passed + // through, recorded so the coordinator can roll up metrics + // without re-reading the plan. + BytesBefore int64 + + // BytesAfter is the sum of [iceberg.DataFile.FileSizeBytes] across + // NewDataFiles. + BytesAfter int64 +} + // RewriteDataFilesOptions bundles the per-rewrite knobs for -// Transaction.RewriteDataFiles. +// [Transaction.RewriteDataFiles]. type RewriteDataFilesOptions struct { - // PartialProgress, when true, stages each group via ReplaceFiles - // inside the loop so work survives a mid-loop write failure. When - // false (the default), all groups are committed in a single atomic - // snapshot. + // PartialProgress, when true, stages each group as its own + // rewrite snapshot inside the loop so a mid-loop write failure + // leaves the already-completed groups staged on this transaction + // (the in-memory transaction can be discarded by group rather + // than wholesale). When false (the default), every group lands in + // a single atomic rewrite snapshot. // - // In both modes the final catalog commit happens once at - // Transaction.Commit() time. True per-group durability (matching - // Java's behavior) requires committing separate transactions per - // group, which is left to the caller. + // In both modes the catalog commit happens once at + // [Transaction.Commit] time, so a process crash mid-loop loses + // every staged group regardless of this flag. Callers who need + // true per-group catalog durability (matching Java's behavior) + // should drive [Transaction.NewRewrite] themselves and commit a + // fresh transaction per group. 
PartialProgress bool // SnapshotProps are added to the rewrite snapshot's summary. + // In partial-progress mode the same properties land on every + // per-group snapshot rather than being summed or split. SnapshotProps iceberg.Properties // ExtraDeleteFilesToRemove are delete files (typically equality // deletes that are dead after the rewrite) that the caller wants - // expunged in the same snapshot as the rewrite. The executor - // passes them through to ReplaceFiles unchanged. Honored only - // when PartialProgress is false. + // expunged in the same snapshot as the rewrite. Honored only when + // PartialProgress is false. // // Use [compaction.CollectDeadEqualityDeletes] to compute this list // from the current snapshot. Position delete files that are fully // applied are removed automatically and do NOT need to be passed // in here. ExtraDeleteFilesToRemove []iceberg.DataFile + + // GroupOptions are forwarded to every [ExecuteCompactionGroup] + // call to tune the per-group read+write pipeline (target file + // size, scan concurrency). See the With* helpers returning + // [CompactionGroupOption]. + GroupOptions []CompactionGroupOption +} + +// CompactionGroupOption configures a single [ExecuteCompactionGroup] +// call. Use the With* helpers to construct values. +type CompactionGroupOption func(*compactionGroupConfig) + +type compactionGroupConfig struct { + targetFileSize int64 + scanConcurrency int +} + +// WithCompactionTargetFileSize sets the size target for output files +// written by [ExecuteCompactionGroup]. Forwarded to [WriteRecords] as +// [WithTargetFileSize]. Zero (the default) means inherit the table's +// `write.target-file-size-bytes` property. +func WithCompactionTargetFileSize(size int64) CompactionGroupOption { + return func(c *compactionGroupConfig) { + c.targetFileSize = size + } +} + +// WithCompactionScanConcurrency sets the scan concurrency used when +// reading the group's tasks. Forwarded to [Table.Scan] as +// [WitMaxConcurrency]. 
Zero (the default) means runtime.GOMAXPROCS. Review Comment: Nit: this godoc link enshrines the pre-existing `WitMaxConcurrency` typo (missing `h`) in a second exported symbol's documentation, making the eventual rename a touch noisier. Either leave a TODO referencing the typo or wait for the rename PR to land before adding this link. Not blocking. ########## table/rewrite_data_files.go: ########## @@ -70,34 +71,112 @@ type CompactionTaskGroup struct { TotalSizeBytes int64 } +// CompactionGroupResult is the per-group output of a compaction +// worker: the new files written, the old files being replaced, and +// the position delete files safe to expunge in the rewrite snapshot. +// +// A distributed coordinator aggregates results from N workers and +// applies them to a [RewriteFiles] builder via [RewriteFiles.Apply] +// to commit a single atomic snapshot. Each field is plain data +// ([]iceberg.DataFile values plus scalars) — callers serialize the +// contained DataFiles across process boundaries themselves; the +// typical pattern is to have the worker write a manifest containing +// the new files and ship the manifest path to the coordinator, which +// re-reads it. +type CompactionGroupResult struct { + // PartitionKey mirrors [CompactionTaskGroup.PartitionKey] for + // display/logging on the coordinator. + PartitionKey string + + // OldDataFiles are the data files this group replaces. + OldDataFiles []iceberg.DataFile + + // NewDataFiles are the consolidated outputs the worker wrote. + NewDataFiles []iceberg.DataFile + + // SafePosDeletes are position-delete files referenced by tasks in + // this group whose target data file is being rewritten, computed + // via [CollectSafePositionDeletes]. They are safe to expunge in + // the rewrite snapshot. + SafePosDeletes []iceberg.DataFile + + // BytesBefore is [CompactionTaskGroup.TotalSizeBytes] passed + // through, recorded so the coordinator can roll up metrics + // without re-reading the plan. 
+ BytesBefore int64 + + // BytesAfter is the sum of [iceberg.DataFile.FileSizeBytes] across + // NewDataFiles. + BytesAfter int64 +} + // RewriteDataFilesOptions bundles the per-rewrite knobs for -// Transaction.RewriteDataFiles. +// [Transaction.RewriteDataFiles]. type RewriteDataFilesOptions struct { - // PartialProgress, when true, stages each group via ReplaceFiles - // inside the loop so work survives a mid-loop write failure. When - // false (the default), all groups are committed in a single atomic - // snapshot. + // PartialProgress, when true, stages each group as its own + // rewrite snapshot inside the loop so a mid-loop write failure + // leaves the already-completed groups staged on this transaction + // (the in-memory transaction can be discarded by group rather + // than wholesale). When false (the default), every group lands in + // a single atomic rewrite snapshot. // - // In both modes the final catalog commit happens once at - // Transaction.Commit() time. True per-group durability (matching - // Java's behavior) requires committing separate transactions per - // group, which is left to the caller. + // In both modes the catalog commit happens once at + // [Transaction.Commit] time, so a process crash mid-loop loses + // every staged group regardless of this flag. Callers who need + // true per-group catalog durability (matching Java's behavior) + // should drive [Transaction.NewRewrite] themselves and commit a + // fresh transaction per group. PartialProgress bool // SnapshotProps are added to the rewrite snapshot's summary. + // In partial-progress mode the same properties land on every + // per-group snapshot rather than being summed or split. SnapshotProps iceberg.Properties // ExtraDeleteFilesToRemove are delete files (typically equality // deletes that are dead after the rewrite) that the caller wants - // expunged in the same snapshot as the rewrite. The executor - // passes them through to ReplaceFiles unchanged. 
Honored only - // when PartialProgress is false. + // expunged in the same snapshot as the rewrite. Honored only when + // PartialProgress is false. // // Use [compaction.CollectDeadEqualityDeletes] to compute this list // from the current snapshot. Position delete files that are fully // applied are removed automatically and do NOT need to be passed // in here. ExtraDeleteFilesToRemove []iceberg.DataFile + + // GroupOptions are forwarded to every [ExecuteCompactionGroup] + // call to tune the per-group read+write pipeline (target file + // size, scan concurrency). See the With* helpers returning + // [CompactionGroupOption]. + GroupOptions []CompactionGroupOption +} + +// CompactionGroupOption configures a single [ExecuteCompactionGroup] +// call. Use the With* helpers to construct values. +type CompactionGroupOption func(*compactionGroupConfig) + +type compactionGroupConfig struct { + targetFileSize int64 + scanConcurrency int +} + +// WithCompactionTargetFileSize sets the size target for output files +// written by [ExecuteCompactionGroup]. Forwarded to [WriteRecords] as +// [WithTargetFileSize]. Zero (the default) means inherit the table's +// `write.target-file-size-bytes` property. +func WithCompactionTargetFileSize(size int64) CompactionGroupOption { Review Comment: Negative values are silently ignored. The consumer at L291 checks `cfg.targetFileSize > 0` before applying it, so passing `-1` (or any negative) sets the field, fails the `> 0` gate, and silently falls through to the table-property default — but the doc on L163-166 only mentions zero as the inherit sentinel. A coordinator computing `targetFileSize = baseSize - overhead` could produce a negative under certain inputs and silently get the default instead of an error. `WitMaxConcurrency` (table.go:692) already handles this — it returns `noopOption` for `n <= 0`. 
Mirror that: ```go func WithCompactionTargetFileSize(size int64) CompactionGroupOption { if size <= 0 { return func(*compactionGroupConfig) {} } return func(c *compactionGroupConfig) { c.targetFileSize = size } } ``` ########## table/rewrite_data_files_test.go: ########## @@ -243,6 +243,122 @@ func TestRewriteDataFiles_EmptyPlan(t *testing.T) { assert.Equal(t, int64(0), result.BytesBefore) } +// TestExecuteCompactionGroup_TargetFileSizeForwarded verifies that +// WithCompactionTargetFileSize reaches the underlying WriteRecords +// call: a tiny target size on a multi-row group must force the +// writer to emit more than one output file. +func TestExecuteCompactionGroup_TargetFileSizeForwarded(t *testing.T) { + tbl := newRewriteTestTable(t) + + arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false) + require.NoError(t, err) + + for i := range 5 { + dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i) + writeParquetFile(t, dataPath, arrowSc, + fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1)) + tx := tbl.NewTransaction() + require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false)) + tbl, err = tx.Commit(t.Context()) + require.NoError(t, err) + } + + tasks, err := tbl.Scan().PlanFiles(t.Context()) + require.NoError(t, err) + + plan, err := defaultTestCompactionCfg.PlanCompaction(tasks) + require.NoError(t, err) + require.NotEmpty(t, plan.Groups) + + groups := toTaskGroups(plan.Groups) + require.Len(t, groups, 1, "test assumes a single group; tighten plan if this changes") + + withTiny, err := table.ExecuteCompactionGroup(t.Context(), tbl, groups[0], + table.WithCompactionTargetFileSize(1)) + require.NoError(t, err) + assert.Greater(t, len(withTiny.NewDataFiles), 1, + "WithCompactionTargetFileSize(1) must force the writer to roll over per row") + + withDefault, err := table.ExecuteCompactionGroup(t.Context(), tbl, groups[0]) + require.NoError(t, err) + assert.Len(t, withDefault.NewDataFiles, 1, + "without the 
option, the same group consolidates into a single file") +} + +// TestExecuteCompactionGroup_ScanConcurrencyForwarded is a smoke test +// confirming WithCompactionScanConcurrency is wired through without +// breaking the read path. We can't easily observe scan parallelism from +// the result, so the assertion is correctness equivalence with the +// default. +func TestExecuteCompactionGroup_ScanConcurrencyForwarded(t *testing.T) { + tbl := newRewriteTestTable(t) + + arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false) + require.NoError(t, err) + + for i := range 3 { + dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i) + writeParquetFile(t, dataPath, arrowSc, + fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1)) + tx := tbl.NewTransaction() + require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false)) + tbl, err = tx.Commit(t.Context()) + require.NoError(t, err) + } + + tasks, err := tbl.Scan().PlanFiles(t.Context()) + require.NoError(t, err) + + plan, err := defaultTestCompactionCfg.PlanCompaction(tasks) + require.NoError(t, err) + require.NotEmpty(t, plan.Groups) + + groups := toTaskGroups(plan.Groups) + + got, err := table.ExecuteCompactionGroup(t.Context(), tbl, groups[0], + table.WithCompactionScanConcurrency(1)) + require.NoError(t, err) + assert.NotEmpty(t, got.NewDataFiles) + assert.NotEmpty(t, got.OldDataFiles) +} + +// TestRewriteDataFiles_GroupOptionsForwarded verifies that +// RewriteDataFilesOptions.GroupOptions are piped through to every +// ExecuteCompactionGroup call. 
+func TestRewriteDataFiles_GroupOptionsForwarded(t *testing.T) { + tbl := newRewriteTestTable(t) + + arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false) + require.NoError(t, err) + + for i := range 5 { + dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i) + writeParquetFile(t, dataPath, arrowSc, + fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1)) + tx := tbl.NewTransaction() + require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false)) + tbl, err = tx.Commit(t.Context()) + require.NoError(t, err) + } + + tasks, err := tbl.Scan().PlanFiles(t.Context()) + require.NoError(t, err) + + plan, err := defaultTestCompactionCfg.PlanCompaction(tasks) + require.NoError(t, err) + + tx := tbl.NewTransaction() + result, err := tx.RewriteDataFiles(t.Context(), toTaskGroups(plan.Groups), table.RewriteDataFilesOptions{ + GroupOptions: []table.CompactionGroupOption{ + table.WithCompactionTargetFileSize(1), + }, + }) + require.NoError(t, err) Review Comment: Test doesn't commit the transaction. `result.AddedDataFiles > 1` is an in-process counter that's incremented before any `ReplaceFiles` is staged, so the option-forwarding claim is correct — but a regression that breaks `ReplaceFiles` for tiny-target-size rewrites would slip through. Adding `committed, err := tx.Commit(t.Context()); require.NoError(t, err)` plus a row-count assertion against the committed snapshot would harden this against future regressions without much extra setup. ########## table/rewrite_data_files_test.go: ########## @@ -243,6 +243,122 @@ func TestRewriteDataFiles_EmptyPlan(t *testing.T) { assert.Equal(t, int64(0), result.BytesBefore) } +// TestExecuteCompactionGroup_TargetFileSizeForwarded verifies that +// WithCompactionTargetFileSize reaches the underlying WriteRecords +// call: a tiny target size on a multi-row group must force the +// writer to emit more than one output file. 
+func TestExecuteCompactionGroup_TargetFileSizeForwarded(t *testing.T) { + tbl := newRewriteTestTable(t) + + arrowSc, err := table.SchemaToArrowSchema(tbl.Schema(), nil, false, false) + require.NoError(t, err) + + for i := range 5 { + dataPath := tbl.Location() + fmt.Sprintf("/data/file-%d.parquet", i) + writeParquetFile(t, dataPath, arrowSc, + fmt.Sprintf(`[{"id": %d, "data": "row-%d"}]`, i+1, i+1)) + tx := tbl.NewTransaction() + require.NoError(t, tx.AddFiles(t.Context(), []string{dataPath}, nil, false)) + tbl, err = tx.Commit(t.Context()) + require.NoError(t, err) + } + + tasks, err := tbl.Scan().PlanFiles(t.Context()) + require.NoError(t, err) + + plan, err := defaultTestCompactionCfg.PlanCompaction(tasks) + require.NoError(t, err) + require.NotEmpty(t, plan.Groups) + + groups := toTaskGroups(plan.Groups) + require.Len(t, groups, 1, "test assumes a single group; tighten plan if this changes") Review Comment: Brittle: this test exists to verify `WithCompactionTargetFileSize` is forwarded, but it couples that assertion to the planner producing exactly one group. If `MinInputFiles`, bin-packing parameters, or default file sizes ever shift, this fails for an unrelated reason. `TestRewriteDataFiles_PartialRewritePreservesEqDelete` (L583-589) constructs the group manually, which decouples option-forwarding from planner behavior — same pattern would work here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
