rdblue commented on code in PR #7731:
URL: https://github.com/apache/iceberg/pull/7731#discussion_r1220272233
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -251,4 +322,126 @@ private static void validatePlanningArguments(long splitSize, int lookback, long
Preconditions.checkArgument(lookback > 0, "Split planning lookback must be > 0: %s", lookback);
Preconditions.checkArgument(openFileCost >= 0, "File open cost must be >= 0: %s", openFileCost);
}
+
+ private static <T extends ScanTask, G extends ScanTaskGroup<T>>
+ CloseableIterable<G> planTasksInternal(
+ CloseableIterable<T> splitFiles,
+ long splitSize,
+ int lookback,
+ Function<T, Long> weightFunc,
+ Function<List<T>, G> groupFunc) {
+
+ return CloseableIterable.transform(
+ CloseableIterable.combine(
+ new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, weightFunc, true),
+ splitFiles),
+ groupFunc);
+ }
+
+ private static class AdaptiveSplitPlanningIterable<T extends ScanTask, G extends ScanTaskGroup<T>>
+ extends CloseableGroup implements CloseableIterable<G> {
+ private final CloseableIterable<T> files;
+ private final int parallelism;
+ private final long splitSize;
+ private final int lookback;
+ private final Function<T, Long> weightFunc;
+ private final BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc;
+ private final Function<List<T>, G> groupFunc;
+
+ private Long targetSize = null;
+
+ private AdaptiveSplitPlanningIterable(
+ CloseableIterable<T> files,
+ int parallelism,
+ long splitSize,
+ int lookback,
+ Function<T, Long> weightFunc,
+ BiFunction<CloseableIterable<T>, Long, CloseableIterable<T>> splitFunc,
+ Function<List<T>, G> groupFunc) {
+ this.files = files;
+ this.parallelism = parallelism;
+ this.splitSize = splitSize;
+ this.lookback = lookback;
+ this.weightFunc = weightFunc;
+ this.splitFunc = splitFunc;
+ this.groupFunc = groupFunc;
+ }
+
+ @Override
+ public CloseableIterator<G> iterator() {
+ if (targetSize != null) {
+ // target size is already known so plan with the static target size
+ CloseableIterable<T> splitTasks = splitFunc.apply(files, targetSize);
+ CloseableIterator<G> iter =
+ planTasksInternal(splitTasks, targetSize, lookback, weightFunc, groupFunc).iterator();
+ addCloseable(iter);
+ return iter;
+ }
+
+ boolean shouldClose = true;
+ CloseableIterator<T> tasksIter = files.iterator();
+ try {
+ // load tasks until the iterator is exhausted or until the total weight is enough to get the
+ // parallelism at the split size passed in.
+ LinkedList<T> readAheadTasks = Lists.newLinkedList();
+ long readToSize = parallelism * splitSize;
+ long totalSize = 0L;
+
+ while (tasksIter.hasNext()) {
+ T task = tasksIter.next();
+ readAheadTasks.addLast(task);
+ totalSize += weightFunc.apply(task);
+
+ if (totalSize > readToSize) {
+ break;
+ }
+ }
+
+ // if total size was reached, then the requested split size is used. otherwise, the iterator
+ // was exhausted and the split size will be adjusted to target parallelism with a reasonable
+ // minimum.
+ this.targetSize = Math.max(MIN_SPLIT_SIZE, Math.min(totalSize / parallelism, splitSize));
Review Comment:
> Suppose we produce 1100 tasks for 1000 cores that are all equally
> balanced. Spark will run first 1000 tasks in parallel (let's assume each task
> would take the same amount of time) and only after that schedule the remaining
> 100 tasks. The remaining 100 tasks would take the same amount of time as the
> first 1000 as most of the slots in the cluster will be idle. Instead of blindly
> following the configured split size, it would be much better to increase the
> split size by 10% and process all data in one pass.

Would it be better? I've typically recommended targeting 2-4x as many splits
as there are cores. That way, if actual task runtimes differ, you can still
balance the workload. For example, if you're reading 100MB splits where some
produce 250MB of shuffle data and some produce 2.5MB, you want more splits so
that executors that become idle can pick up the remaining work.
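
To make the two positions concrete, here is a rough sketch (not part of the PR) that compares them with a toy scheduler. It assumes greedy scheduling, where each task runs on whichever core frees up first. That is a simplification of what Spark actually does, and the class and method names (`SplitWaveSketch`, `makespan`, `uniform`) are hypothetical. Durations are in arbitrary time units.

```java
import java.util.Arrays;
import java.util.PriorityQueue;

class SplitWaveSketch {
  // Greedy list scheduling (an assumed model, not Spark's real scheduler):
  // each task, in order, starts on the core that becomes free first.
  // Returns the time at which the last task finishes.
  static long makespan(long[] taskDurations, int cores) {
    PriorityQueue<Long> freeAt = new PriorityQueue<>();
    for (int i = 0; i < cores; i++) {
      freeAt.add(0L);
    }
    long finish = 0L;
    for (long duration : taskDurations) {
      long end = freeAt.poll() + duration;
      freeAt.add(end);
      finish = Math.max(finish, end);
    }
    return finish;
  }

  // Builds `count` tasks that all take `duration` time units.
  static long[] uniform(int count, long duration) {
    long[] tasks = new long[count];
    Arrays.fill(tasks, duration);
    return tasks;
  }

  public static void main(String[] args) {
    // Equal tasks: 1100 tasks of 10 units need a second wave on 1000 cores.
    System.out.println(makespan(uniform(1100, 10), 1000)); // 20
    // Splits that are 10% larger fit in one wave.
    System.out.println(makespan(uniform(1000, 11), 1000)); // 11

    // Skewed tasks on 4 cores, same total work (16 units) in both cases.
    long[] oneSplitPerCore = {10, 2, 2, 2};
    long[] twoSplitsPerCore = {5, 5, 1, 1, 1, 1, 1, 1};
    System.out.println(makespan(oneSplitPerCore, 4)); // 10
    System.out.println(makespan(twoSplitsPerCore, 4)); // 5
  }
}
```

With perfectly equal tasks, the single-wave plan wins (11 vs. 20 units). Once runtimes are skewed, the finer-grained plan finishes in half the time of the one-split-per-core plan, because idle cores can pick up the remaining small tasks.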
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.