aokolnychyi commented on code in PR #7731:
URL: https://github.com/apache/iceberg/pull/7731#discussion_r1217434804


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -251,4 +322,126 @@ private static void validatePlanningArguments(long splitSize, int lookback, long
     Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
     Preconditions.checkArgument(openFileCost >= 0, "File open cost must be >= 0: %s", openFileCost);
   }
+
+  private static <T extends ScanTask, G extends ScanTaskGroup<T>>
+      CloseableIterable<G> planTasksInternal(
+          CloseableIterable<T> splitFiles,
+          long splitSize,
+          int lookback,
+          Function<T, Long> weightFunc,
+          Function<List<T>, G> groupFunc) {
+
+    return CloseableIterable.transform(
+        CloseableIterable.combine(
+            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
+            splitFiles),
+        groupFunc);
+  }
+
+  private static class AdaptiveSplitPlanningIterable<T extends ScanTask, G extends ScanTaskGroup<T>>
+      extends CloseableGroup implements CloseableIterable<G> {
+    private final CloseableIterable<T> files;
+    private final int parallelism;
+    private final long splitSize;
+    private final int lookback;
+    private final Function<T, Long> weightFunc;
+    private final BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc;
+    private final Function<List<T>, G> groupFunc;
+
+    private Long targetSize = null;
+
+    private AdaptiveSplitPlanningIterable(
+        CloseableIterable<T> files,
+        int parallelism,
+        long splitSize,
+        int lookback,
+        Function<T, Long> weightFunc,
+        BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc,
+        Function<List<T>, G> groupFunc) {
+      this.files = files;
+      this.parallelism = parallelism;
+      this.splitSize = splitSize;
+      this.lookback = lookback;
+      this.weightFunc = weightFunc;
+      this.splitFunc = splitFunc;
+      this.groupFunc = groupFunc;
+    }
+
+    @Override
+    public CloseableIterator<G> iterator() {
+      if (targetSize != null) {
+        // target size is already known so plan with the static target size
+        CloseableIterable<T> splitTasks = splitFunc.apply(files, targetSize);
+        CloseableIterator<G> iter =
+            planTasksInternal(splitTasks, targetSize, lookback, weightFunc, groupFunc).iterator();
+        addCloseable(iter);
+        return iter;
+      }
+
+      boolean shouldClose = true;
+      CloseableIterator<T> tasksIter = files.iterator();
+      try {
+        // load tasks until the iterator is exhausted or until the total weight is enough to get the
+        // parallelism at the split size passed in.
+        LinkedList<T> readAheadTasks = Lists.newLinkedList();
+        long readToSize = parallelism * splitSize;
+        long totalSize = 0L;
+
+        while (tasksIter.hasNext()) {
+          T task = tasksIter.next();
+          readAheadTasks.addLast(task);
+          totalSize += weightFunc.apply(task);
+
+          if (totalSize > readToSize) {
+            break;
+          }
+        }
+
+        // if total size was reached, then the requested split size is used. otherwise, the iterator
+        // was exhausted and the split size will be adjusted to target parallelism with a reasonable
+        // minimum.
+        this.targetSize = Math.max(MIN_SPLIT_SIZE, Math.min(totalSize / parallelism, splitSize));

Review Comment:
   Can we move this logic into a separate utility method? I think we will use it directly in Spark, where we already have a fully loaded list of tasks, so there is no point in constructing a `CloseableIterable` and complicating the logic in that case.
   
   I think this approach would perform well for cases where the provided split size prevents us from utilizing all cores in the cluster. However, I'd also target the following two use cases (I am still thinking about the second one):
   
   - The size of the data to be scanned is large and the default split size produces too many tasks. That's the original issue the PR with a SQL property was trying to solve. Keeping all of that extra task info around can easily kill the driver, not to mention the overhead of launching each task in Spark (easily a second).
   - I think our goal is to produce a number of splits that is a multiple of the number of slots in the cluster. Suppose we produce 1100 equally balanced tasks for 1000 cores. Spark will run the first 1000 tasks in parallel (assume each task takes the same amount of time) and only then schedule the remaining 100. That second wave takes as long as the first one, even though most of the slots in the cluster sit idle. Instead of blindly following the configured split size, it would be much better to increase the split size by about 10% and process all of the data in one pass. That can improve read performance by up to 50% (rough math in the sketch after this list).
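   
   Back-of-the-envelope math for the second case, using made-up numbers (the values below are only for illustration):
   
   ```java
   // illustration only: 1100 equally sized splits scheduled on 1000 slots
   int slots = 1000;                             // task slots in the cluster
   int splits = 1100;                            // splits produced at the configured split size
   long splitSize = 128L * 1024 * 1024;          // assumed configured split size (128 MB)
   
   // following the configured split size blindly: 2 waves, the second one 90% idle
   int waves = (splits + slots - 1) / slots;     // = 2
   
   // sizing splits so their count is a multiple of the slots: ~10% larger splits, 1 wave
   long adjustedSplitSize = (long) Math.ceil((double) splits * splitSize / slots);  // ≈ 141 MB
   
   // with equal task durations, 1 wave of 1.1x-sized tasks vs 2 full-length waves:
   // 1.1 / 2.0 ≈ 0.55, i.e. up to ~45-50% less wall-clock time for the scan
   ```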


