stevenzwu commented on code in PR #16108:
URL: https://github.com/apache/iceberg/pull/16108#discussion_r3276124068


##########
api/src/main/java/org/apache/iceberg/SnapshotUpdate.java:
##########
@@ -60,6 +60,21 @@ public interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> {
    */
   ThisT scanManifestsWith(ExecutorService executorService);
 
+  /**
+   * Use a particular executor to write manifests during commit. The default worker pool will be
+   * used by default.
+   *
+   * <p>Implementations may require a specific executor type (such as {@link
+   * java.util.concurrent.ThreadPoolExecutor}) to derive parallelism for manifest writes.
+   *
+   * @param executorService the provided executor
+   * @return this for method chaining
+   */
+  default ThisT commitManifestsWith(ExecutorService executorService) {

Review Comment:
   Two parts: a context note on the dual-method shape, then a recommendation on 
this specific method.
   
   ### Dual-method shape is fine — it matches existing convention
   
   A separate read-side `scanManifestsWith` plus a write-side companion is consistent with `ExpireSnapshots`, which has had [`planWith(ExecutorService)`](https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/ExpireSnapshots.java) for read-side planning and `executeDeleteWith(ExecutorService)` for delete-side IO as separate methods since 2020. Same pattern, same reason: two distinct parallelizable phases, one executor per phase. Callers who want a single pool can pass the same `ExecutorService` to both methods.
   
   For history (this PR closes a long-standing gap rather than introducing a 
new shape): [#4147](https://github.com/apache/iceberg/pull/4147) (Feb 2022) 
added `scanManifestsWith` scoped to *reads* because manifest writes weren't 
parallelized at the time. Writes got parallelized later in 
[#11086](https://github.com/apache/iceberg/pull/11086), which hardcoded 
`ThreadPools.getWorkerPool()` instead of plumbing the existing executor hook 
through. The original Flink classloader/lifecycle motivation 
([#3776](https://github.com/apache/iceberg/issues/3776)) applies just as 
strongly to writes, so plumbing a caller-provided pool here is a natural 
extension.
   
   ### Two design suggestions on this method specifically
   
   **1. Rename `commitManifestsWith` → `writeManifestsWith`.**
   
   The natural counterpart of `scanManifestsWith` is `writeManifestsWith`, not 
`commitManifestsWith`. `scan` and `write` are verbs describing the operation on 
manifests; `commit` describes a phase. Three concrete reasons to prefer the 
verb pairing:
   
   - **"Commit" is overloaded in Iceberg.** `Transaction.commitTransaction()`, 
`PendingUpdate.commit()`, the catalog RPC at the end of 
`SnapshotProducer.commit()`, the commit retry budget — all reuse the word for 
different things. A reader landing on `commitManifestsWith(executor)` cold can 
reasonably read it as "use this executor to perform the commit," which is 
wrong: the catalog commit is a single RPC with nothing to parallelize.
   - **The pool literally writes.** `commitPool()` is invoked exactly once, 
inside the `writeManifests` method, around the `writeFunc.apply(group)` calls 
that produce manifest files. The internal naming (`commitPool` field driving 
`writeManifests`) already has a small inconsistency that the rename resolves.
   - Iceberg style: "method names should describe the specific behavior, not 
just be generic" — same instinct as `selectInIdOrder` over `selectOrdered`.
   
   **2. Take parallelism as an explicit second arg: 
`writeManifestsWith(ExecutorService executor, int parallelism)`.**
   
   The current 1-arg shape conflates two concerns:
   - *Lifecycle/threading*: where the `Runnable`s execute.
   - *Layout determinism*: how many manifest groups get produced (currently 
inferred from `getMaximumPoolSize()`).
   
   Coupling them forces the `instanceof ThreadPoolExecutor` precondition, which:
   - **Rejects `Executors.newSingleThreadExecutor()`**: it returns a delegating wrapper (such as `FinalizableDelegatedExecutorService`), not a raw `ThreadPoolExecutor`. That is asymmetric with `Executors.newFixedThreadPool(1)`, which passes the check even though both run tasks on a single thread.
   - **Forecloses `ForkJoinPool` and virtual-thread executors permanently**: neither is a `ThreadPoolExecutor`. That is a real cost now that JDK 21+ offers `Executors.newVirtualThreadPerTaskExecutor()` for I/O-bound work like manifest writes.
   - The recent `getMaximumPoolSize() < Integer.MAX_VALUE` guard plugs the `newCachedThreadPool` hole (that pool's maximum size is `Integer.MAX_VALUE`) but doesn't address the structural issue.
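   A minimal JDK-only sketch of how the coupled precondition classifies common executors, assuming the check is `instanceof ThreadPoolExecutor` plus the `getMaximumPoolSize() < Integer.MAX_VALUE` guard. `PoolTypeCheck` and `acceptedByTypeCheck` are hypothetical names, not the PR's code. Virtual-thread executors are left out so the sketch compiles on pre-21 JDKs; like `ForkJoinPool`, `newVirtualThreadPerTaskExecutor()` does not return a `ThreadPoolExecutor`.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ForkJoinPool;
   import java.util.concurrent.ThreadPoolExecutor;

   public class PoolTypeCheck {
     // Hypothetical model of the 1-arg precondition: parallelism must be derivable
     // from the pool, so only bounded ThreadPoolExecutor instances are accepted.
     static boolean acceptedByTypeCheck(ExecutorService executor) {
       return executor instanceof ThreadPoolExecutor
           && ((ThreadPoolExecutor) executor).getMaximumPoolSize() < Integer.MAX_VALUE;
     }

     public static void main(String[] args) {
       ExecutorService fixedOne = Executors.newFixedThreadPool(1);
       ExecutorService single = Executors.newSingleThreadExecutor();
       ExecutorService cached = Executors.newCachedThreadPool();
       ExecutorService forkJoin = new ForkJoinPool(4);
       try {
         System.out.println("newFixedThreadPool(1): " + acceptedByTypeCheck(fixedOne)); // true
         System.out.println("newSingleThreadExecutor(): " + acceptedByTypeCheck(single)); // false: wrapper type
         System.out.println("newCachedThreadPool(): " + acceptedByTypeCheck(cached)); // false: unbounded
         System.out.println("ForkJoinPool(4): " + acceptedByTypeCheck(forkJoin)); // false: not a ThreadPoolExecutor
       } finally {
         fixedOne.shutdown();
         single.shutdown();
         cached.shutdown();
         forkJoin.shutdown();
       }
     }
   }
   ```

   Two executors that both run on one thread get opposite answers, which is the asymmetry described in the first bullet.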
   
   Decoupling fixes all three:
   - Any `ExecutorService` works — `ForkJoinPool`, virtual threads, 
single-thread, cached pool, fixed pool.
   - Layout determinism is *stated*, not *inferred* — the caller knows the 
manifest count.
   - The strict-type precondition reduces to 
`Preconditions.checkArgument(parallelism > 0)`.
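   A minimal sketch of the decoupled shape, assuming a hypothetical `writeManifests(files, executor, parallelism)` helper in which "writing a manifest" is simulated by formatting a group of file names. The round-robin grouping is chosen for illustration and is not Iceberg's actual `writeManifests` logic; the plain `IllegalArgumentException` stands in for Guava's `Preconditions.checkArgument`. It shows that the layout comes only from `parallelism`, so a single-thread executor and a `ForkJoinPool` produce identical manifests.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ForkJoinPool;
   import java.util.concurrent.Future;

   public class DecoupledManifestWriter {
     // Hypothetical helper: parallelism decides the layout, the executor only runs tasks.
     static List<String> writeManifests(List<String> files, ExecutorService executor, int parallelism)
         throws InterruptedException, ExecutionException {
       // Stand-in for Preconditions.checkArgument(parallelism > 0).
       if (parallelism <= 0) {
         throw new IllegalArgumentException("Invalid parallelism: " + parallelism);
       }
       // Layout: at most `parallelism` groups, assigned round-robin (illustrative only).
       int groupCount = Math.min(parallelism, files.size());
       List<List<String>> groups = new ArrayList<>();
       for (int i = 0; i < groupCount; i++) {
         groups.add(new ArrayList<>());
       }
       for (int i = 0; i < files.size(); i++) {
         groups.get(i % groupCount).add(files.get(i));
       }
       // Threading: any ExecutorService can run the per-group "writes".
       List<Future<String>> futures = new ArrayList<>();
       for (List<String> group : groups) {
         futures.add(executor.submit(() -> "manifest" + group));
       }
       // Collect in submission order so the result does not depend on scheduling.
       List<String> manifests = new ArrayList<>();
       for (Future<String> future : futures) {
         manifests.add(future.get());
       }
       return manifests;
     }

     public static void main(String[] args) throws Exception {
       List<String> files = Arrays.asList("a.parquet", "b.parquet", "c.parquet", "d.parquet", "e.parquet");
       ExecutorService single = Executors.newSingleThreadExecutor();
       ExecutorService forkJoin = new ForkJoinPool(8);
       try {
         List<String> viaSingle = writeManifests(files, single, 2);
         List<String> viaForkJoin = writeManifests(files, forkJoin, 2);
         System.out.println(viaSingle.size()); // 2
         System.out.println(viaSingle.equals(viaForkJoin)); // true
       } finally {
         single.shutdown();
         forkJoin.shutdown();
       }
     }
   }
   ```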
   
   ### On the "match Iceberg convention" pull for the second arg
   
   Every existing executor method (`scanManifestsWith`, `Scan.planWith`, 
`ExpireSnapshots.planWith`, `executeDeleteWith`, `executeWith`) is single-arg 
with no explicit parallelism. But those methods all share a property 
`writeManifestsWith` doesn't: **the pool is a pure performance knob**. Pool 
size affects how fast, not what gets produced. The write-side pool is unique in that its size determines the manifest count, which makes it a correctness and durable-output concern. Different semantics justify a different shape.
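   To make the "what gets produced" distinction concrete, here is a hypothetical model of the 1-arg shape in which the group count is read from `getMaximumPoolSize()` and capped by the number of files. The cap and the `InferredLayout` names are assumptions for illustration, not the PR's exact logic. The same 100 files become a different number of manifests depending only on which fixed-size pool the caller passed.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.ThreadPoolExecutor;

   public class InferredLayout {
     // Hypothetical model of the 1-arg shape: the group count is read off the pool
     // and capped by the number of files to write.
     static int inferredManifestCount(int fileCount, ExecutorService executor) {
       if (!(executor instanceof ThreadPoolExecutor)) {
         throw new IllegalArgumentException("Expected a ThreadPoolExecutor: " + executor);
       }
       int parallelism = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
       return Math.min(parallelism, fileCount);
     }

     public static void main(String[] args) {
       ExecutorService small = Executors.newFixedThreadPool(4);
       ExecutorService large = Executors.newFixedThreadPool(16);
       try {
         // Same 100 files; only the caller's choice of pool differs.
         System.out.println(inferredManifestCount(100, small)); // 4
         System.out.println(inferredManifestCount(100, large)); // 16
       } finally {
         small.shutdown();
         large.shutdown();
       }
     }
   }
   ```

   For a read-side pool such as the one passed to `scanManifestsWith`, swapping 4 threads for 16 would only change throughput; here it changes the committed metadata.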
   
   If staying aligned with convention is preferred, the 
[`TableMigrationUtil`](https://github.com/apache/iceberg/blob/main/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java)
 precedent of overload pairs `(ExecutorService)` and `(ExecutorService, int 
parallelism)` is available. But since this method hasn't shipped yet, landing 
the two-arg version directly is cleaner — no overload split, no future 
migration story.
   
   ### Net recommendation
   
   ```java
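   /**
    * Use a particular executor to write manifests. {@code parallelism} sets the number of
    * manifest groups, independent of the executor's type or size.
    */
   default ThisT writeManifestsWith(ExecutorService executor, int parallelism) {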
     throw new UnsupportedOperationException(
         this.getClass().getName() + " does not support writeManifestsWith");
   }
   ```
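   A self-contained sketch of how this default behaves, assuming hypothetical stand-ins that do not exist in Iceberg: `ManifestWriteUpdate` mirrors the proposed default method, `LegacyUpdate` does not override it, and `ParallelUpdate` opts in and validates only `parallelism` (a plain check in place of Guava's `Preconditions.checkArgument`).

   ```java
   import java.util.Objects;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class WriteManifestsWithSketch {
     // Hypothetical stand-in for SnapshotUpdate<ThisT>, reduced to the proposed method.
     interface ManifestWriteUpdate<ThisT> {
       default ThisT writeManifestsWith(ExecutorService executor, int parallelism) {
         throw new UnsupportedOperationException(
             this.getClass().getName() + " does not support writeManifestsWith");
       }
     }

     // Has not opted in, so it inherits the throwing default.
     static class LegacyUpdate implements ManifestWriteUpdate<LegacyUpdate> {}

     // Opts in: accepts any ExecutorService and validates only the explicit parallelism.
     static class ParallelUpdate implements ManifestWriteUpdate<ParallelUpdate> {
       private ExecutorService writeExecutor;
       private int writeParallelism;

       @Override
       public ParallelUpdate writeManifestsWith(ExecutorService executor, int parallelism) {
         // Validate everything before mutating state.
         Objects.requireNonNull(executor, "Invalid executor: null");
         if (parallelism <= 0) {
           throw new IllegalArgumentException("Invalid parallelism: " + parallelism);
         }
         this.writeExecutor = executor;
         this.writeParallelism = parallelism;
         return this; // method chaining, as with other SnapshotUpdate methods
       }

       ExecutorService writeExecutor() {
         return writeExecutor;
       }

       int writeParallelism() {
         return writeParallelism;
       }
     }

     public static void main(String[] args) {
       ExecutorService pool = Executors.newFixedThreadPool(2);
       try {
         ParallelUpdate update = new ParallelUpdate().writeManifestsWith(pool, 8);
         System.out.println(update.writeParallelism()); // 8
         try {
           new LegacyUpdate().writeManifestsWith(pool, 8);
         } catch (UnsupportedOperationException e) {
           System.out.println(e.getMessage()); // names the LegacyUpdate class
         }
       } finally {
         pool.shutdown();
       }
     }
   }
   ```

   Implementations that have not been updated fail loudly instead of silently ignoring the caller's executor, while updated ones never need to inspect the executor's concrete type.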
   



-- 
This is an automated message from the Apache Git Service.