stevenzwu commented on code in PR #16108:
URL: https://github.com/apache/iceberg/pull/16108#discussion_r3276124068
##########
api/src/main/java/org/apache/iceberg/SnapshotUpdate.java:
##########
@@ -60,6 +60,21 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
*/
ThisT scanManifestsWith(ExecutorService executorService);
+ /**
+ * Use a particular executor to write manifests during commit. The default worker pool will be
+ * used by default.
+ *
+ * <p>Implementations may require a specific executor type (such as {@link
+ * java.util.concurrent.ThreadPoolExecutor}) to derive parallelism for manifest writes.
+ *
+ * @param executorService the provided executor
+ * @return this for method chaining
+ */
+ default ThisT commitManifestsWith(ExecutorService executorService) {
Review Comment:
Two parts: a context note on the dual-method shape, then a recommendation on
this specific method.
### Dual-method shape is fine — it matches existing convention
Separate `scanManifestsWith` (read-side) + a write-side companion is
consistent with `ExpireSnapshots`, which has had
[`planWith(ExecutorService)`](https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java)
for read-side planning and `executeDeleteWith(ExecutorService)` for
delete-side IO as separate methods since 2020 — same pattern, same reason: two
distinct parallelizable phases, one executor per phase. Callers who want a
single pool can pass the same `ExecutorService` to both methods.
For history (this PR closes a long-standing gap rather than introducing a
new shape): [#4147](https://github.com/apache/iceberg/pull/4147) (Feb 2022)
added `scanManifestsWith` scoped to *reads* because manifest writes weren't
parallelized at the time. Writes got parallelized later in
[#11086](https://github.com/apache/iceberg/pull/11086), which hardcoded
`ThreadPools.getWorkerPool()` instead of plumbing the existing executor hook
through. The original Flink classloader/lifecycle motivation
([#3776](https://github.com/apache/iceberg/issues/3776)) applies just as
strongly to writes, so plumbing a caller-provided pool here is a natural
extension.
### Two design suggestions on this method specifically
**1. Rename `commitManifestsWith` → `writeManifestsWith`.**
The natural counterpart of `scanManifestsWith` is `writeManifestsWith`, not
`commitManifestsWith`. `scan` and `write` are verbs describing the operation on
manifests; `commit` describes a phase. Three concrete reasons to prefer the
verb pairing:
- **"Commit" is overloaded in Iceberg.** `Transaction.commitTransaction()`,
`PendingUpdate.commit()`, the catalog RPC at the end of
`SnapshotProducer.commit()`, the commit retry budget — all reuse the word for
different things. A reader landing on `commitManifestsWith(executor)` cold can
reasonably read it as "use this executor to perform the commit," which is
wrong: the catalog commit is a single RPC with nothing to parallelize.
- **The pool literally writes.** `commitPool()` is invoked exactly once,
inside the `writeManifests` method, around the `writeFunc.apply(group)` calls
that produce manifest files. The internal naming (`commitPool` field driving
`writeManifests`) already has a small inconsistency that the rename resolves.
- Iceberg style: "method names should describe the specific behavior, not
just be generic" — same instinct as `selectInIdOrder` over `selectOrdered`.
**2. Take parallelism as an explicit second arg:
`writeManifestsWith(ExecutorService executor, int parallelism)`.**
The current 1-arg shape conflates two concerns:
- *Lifecycle/threading*: where the `Runnable`s execute.
- *Layout determinism*: how many manifest groups get produced (currently
inferred from `getMaximumPoolSize()`).
Coupling them forces the `instanceof ThreadPoolExecutor` precondition, which:
- **Rejects `Executors.newSingleThreadExecutor()`**: it returns a delegating
wrapper (`FinalizableDelegatedExecutorService` on older JDKs), not a raw
`ThreadPoolExecutor`. This is asymmetric with `Executors.newFixedThreadPool(1)`,
which works.
- **Forecloses `ForkJoinPool` and virtual-thread executors permanently** — a
real cost as JDK 21+ pushes `Executors.newVirtualThreadPerTaskExecutor()` for
I/O-bound work like manifest writes.
- The recent `getMaximumPoolSize() < Integer.MAX_VALUE` guard plugs the
`newCachedThreadPool` hole but doesn't address the structural issue.
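
To make the rejection cases concrete, here is a minimal sketch using only JDK classes. `ExecutorTypeCheck` and `passesStrictCheck` are hypothetical names, and the check only approximates the precondition as described in this comment (a `ThreadPoolExecutor` with a bounded maximum pool size); the PR's actual check may differ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class; not part of Iceberg.
class ExecutorTypeCheck {

  // Approximation of the strict precondition discussed above (assumption):
  // the executor must be a ThreadPoolExecutor with a bounded maximum pool size.
  static boolean passesStrictCheck(ExecutorService executor) {
    return executor instanceof ThreadPoolExecutor
        && ((ThreadPoolExecutor) executor).getMaximumPoolSize() < Integer.MAX_VALUE;
  }

  public static void main(String[] args) {
    ExecutorService single = Executors.newSingleThreadExecutor();
    ExecutorService fixedOne = Executors.newFixedThreadPool(1);
    ExecutorService cached = Executors.newCachedThreadPool();
    ExecutorService forkJoin = new ForkJoinPool(2);
    try {
      // The single-thread executor is a delegating wrapper, so it fails the type test.
      System.out.println("newSingleThreadExecutor: " + passesStrictCheck(single));
      // A fixed pool of size 1 is a plain ThreadPoolExecutor, so it passes.
      System.out.println("newFixedThreadPool(1):   " + passesStrictCheck(fixedOne));
      // The cached pool is a ThreadPoolExecutor, but its maximum size is Integer.MAX_VALUE.
      System.out.println("newCachedThreadPool:     " + passesStrictCheck(cached));
      // ForkJoinPool does not extend ThreadPoolExecutor at all.
      System.out.println("ForkJoinPool:            " + passesStrictCheck(forkJoin));
    } finally {
      single.shutdown();
      fixedOne.shutdown();
      cached.shutdown();
      forkJoin.shutdown();
    }
  }
}
```

Only the fixed pool passes, even though the single-thread executor is functionally equivalent to it for this purpose.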
Decoupling fixes all three:
- Any `ExecutorService` works — `ForkJoinPool`, virtual threads,
single-thread, cached pool, fixed pool.
- Layout determinism is *stated*, not *inferred* — the caller knows the
manifest count.
- The strict-type precondition reduces to
`Preconditions.checkArgument(parallelism > 0)`.
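
As a rough illustration of the decoupled shape, the sketch below groups files by an explicit `parallelism` and runs the per-group work on whatever `ExecutorService` is passed in. Everything here is hypothetical: `ExplicitParallelismSketch`, `group`, and `writeManifests` are stand-ins rather than Iceberg's implementation, the contiguous near-equal grouping is an assumption, and a plain `IllegalArgumentException` replaces `Preconditions.checkArgument` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical stand-in for the write path; names are illustrative, not Iceberg's.
class ExplicitParallelismSketch {

  // Splits files into min(parallelism, files.size()) contiguous groups of near-equal size.
  static <T> List<List<T>> group(List<T> files, int parallelism) {
    if (parallelism <= 0) {
      throw new IllegalArgumentException("Invalid parallelism: " + parallelism);
    }
    List<List<T>> groups = new ArrayList<>();
    int groupCount = Math.min(parallelism, files.size());
    if (groupCount == 0) {
      return groups;
    }
    int base = files.size() / groupCount;
    int remainder = files.size() % groupCount;
    int start = 0;
    for (int i = 0; i < groupCount; i++) {
      int size = base + (i < remainder ? 1 : 0); // the first `remainder` groups get one extra
      groups.add(new ArrayList<>(files.subList(start, start + size)));
      start += size;
    }
    return groups;
  }

  // Runs one task per group on the given executor and returns results in group order.
  // The group count comes from `parallelism` only; the executor type is never inspected.
  static List<String> writeManifests(
      List<String> files, ExecutorService executor, int parallelism) {
    List<Future<String>> futures = new ArrayList<>();
    for (List<String> g : group(files, parallelism)) {
      Callable<String> task = () -> "manifest" + g; // placeholder for a real manifest write
      futures.add(executor.submit(task));
    }
    List<String> manifests = new ArrayList<>();
    try {
      for (Future<String> future : futures) {
        manifests.add(future.get());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while writing manifests", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Manifest write failed", e.getCause());
    }
    return manifests;
  }

  public static void main(String[] args) {
    List<String> files = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      files.add("file-" + i);
    }
    ExecutorService single = Executors.newSingleThreadExecutor();
    ExecutorService forkJoin = new ForkJoinPool(8);
    try {
      System.out.println(writeManifests(files, single, 4));
      System.out.println(writeManifests(files, forkJoin, 4));
    } finally {
      single.shutdown();
      forkJoin.shutdown();
    }
  }
}
```

With ten files and `parallelism = 4`, both executors yield the same four groups, so the output layout depends only on the stated parallelism and not on the pool that happens to run the tasks.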
### On the pull to match Iceberg convention for the second arg
Every existing executor method (`scanManifestsWith`, `Scan.planWith`,
`ExpireSnapshots.planWith`, `executeDeleteWith`, `executeWith`) is single-arg
with no explicit parallelism. But those methods all share a property
`writeManifestsWith` doesn't: **the pool is a pure performance knob**. Pool
size affects how fast the work runs, not what gets produced. The write-side pool is
unique in that pool size determines manifest count, which is a correctness /
durable-output concern. Different semantics justify a different shape.
If staying aligned with convention is preferred, the
[`TableMigrationUtil`](https://github.com/apache/iceberg/blob/main/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java)
precedent of overload pairs `(ExecutorService)` and `(ExecutorService, int
parallelism)` is available. But since this method hasn't shipped yet, landing
the two-arg version directly is cleaner — no overload split, no future
migration story.
### Net recommendation
```java
default ThisT writeManifestsWith(ExecutorService executor, int parallelism) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not support writeManifestsWith");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]