wombatu-kun commented on code in PR #16730:
URL: https://github.com/apache/iceberg/pull/16730#discussion_r3385207092


##########
core/src/main/java/org/apache/iceberg/ManifestFiles.java:
##########
@@ -602,4 +612,54 @@ static long cacheMaxContentLength(FileIO io) {
         CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH,
         CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT);
   }
+
+  /**
+   * Writes the given files into manifests in parallel, splitting them into the given number of
+   * groups and submitting each group to the provided executor.
+   *
+   * @param files content files to write
+   * @param parallelism number of parallel groups; the caller decides this based on its own
+   *     parallelism and minimum-group-size policy
+   * @param writePool executor used to run group writes concurrently
+   * @param writeFunc function that writes a single group and returns the resulting manifests
+   * @return manifests in input-group order
+   */
+  static <F> List<ManifestFile> writeParallel(
+      Collection<F> files,
+      int parallelism,
+      ExecutorService writePool,
+      Function<List<F>, List<ManifestFile>> writeFunc) {
+    List<List<F>> groups = partition(files, parallelism);
+
+    // Pair each group with its index so results can be reassembled in input order.
+    List<Pair<Integer, List<F>>> groupsWithIndex = Lists.newArrayList();
+    for (int i = 0; i < groups.size(); i++) {
+      groupsWithIndex.add(Pair.of(i, groups.get(i)));
+    }
+
+    AtomicReferenceArray<List<ManifestFile>> results = new AtomicReferenceArray<>(groups.size());
+
+    Tasks.foreach(groupsWithIndex)
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(writePool)
+        .run(
+            indexedGroup -> {
+              int index = indexedGroup.first();
+              List<F> group = indexedGroup.second();
+              results.set(index, writeFunc.apply(group));
+            });
+
+    ImmutableList.Builder<ManifestFile> builder = ImmutableList.builder();
+    for (int i = 0; i < results.length(); i++) {
+      builder.addAll(results.get(i));
+    }
+    return builder.build();
+  }
+
+  private static <T> List<List<T>> partition(Collection<T> collection, int groupCount) {

Review Comment:
   This `partition(collection, groupCount)` takes a group COUNT but delegates 
to Guava `Lists.partition(list, groupSize)` (line 663), whose second argument 
is a group SIZE: same name, opposite unit. As a result, 
`partition(files, parallelism)` at the call sites can be misread as size-based 
chunking. Consider a count-explicit name like `splitIntoGroups` (or the prior 
`divide`) to avoid colliding with `Lists.partition`.
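
   To make the unit mismatch concrete, here is a minimal sketch in plain Java. 
It is not the PR's code: `GroupSplitSketch` and `chunkBySize` are hypothetical 
names, `chunkBySize` only mimics the size-based contract of Guava's 
`Lists.partition` (copying sublists instead of returning views), and the 
ceiling-division strategy in `splitIntoGroups` is an assumption about how a 
count-based helper might delegate to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of ManifestFiles.
final class GroupSplitSketch {
  private GroupSplitSketch() {}

  // Size-based chunking: each chunk holds at most `size` items
  // (the same unit as the second argument of Guava's Lists.partition).
  static <T> List<List<T>> chunkBySize(List<T> items, int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    List<List<T>> chunks = new ArrayList<>();
    for (int start = 0; start < items.size(); start += size) {
      int end = Math.min(start + size, items.size());
      chunks.add(new ArrayList<>(items.subList(start, end)));
    }
    return chunks;
  }

  // Count-based splitting: produces at most `groupCount` groups by
  // converting the count into a per-group size (assumed strategy).
  static <T> List<List<T>> splitIntoGroups(List<T> items, int groupCount) {
    if (groupCount <= 0) {
      throw new IllegalArgumentException("groupCount must be positive");
    }
    if (items.isEmpty()) {
      return new ArrayList<>();
    }
    int groupSize = (items.size() + groupCount - 1) / groupCount; // ceiling division
    return chunkBySize(items, groupSize);
  }
}
```

   With 10 items and an argument of 2, `chunkBySize` yields 5 chunks of 2 
items, while `splitIntoGroups` yields 2 groups of 5 items. A reader who assumes 
the wrong unit at a call site would be off by that much.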



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

