stevenzwu commented on code in PR #6304: URL: https://github.com/apache/iceberg/pull/6304#discussion_r1035310165
########## core/src/main/java/org/apache/iceberg/util/TableScanUtil.java: ########## @@ -151,38 +153,68 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup Function<T, Long> weightFunc = task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); - Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap(); + Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap(); - // Group tasks by their partition values - StructLikeMap<List<T>> tasksByPartition = StructLikeMap.create(projectedPartitionType); + // group tasks by grouping keys derived from their partition tuples + StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType); for (T task : tasks) { PartitionSpec spec = task.spec(); - StructProjection projectedStruct = - projectionsBySpec.computeIfAbsent( + StructProjection groupingKeyProjection = + groupingKeyProjectionsBySpec.computeIfAbsent( spec.specId(), - specId -> StructProjection.create(spec.partitionType(), projectedPartitionType)); - List<T> taskList = - tasksByPartition.computeIfAbsent( - projectedStruct.copyFor(task.partition()), k -> Lists.newArrayList()); + specId -> StructProjection.create(spec.partitionType(), groupingKeyType)); + List<T> groupingKeyTasks = + tasksByGroupingKey.computeIfAbsent( + projectGroupingKey(groupingKeyProjection, groupingKeyType, task), + groupingKey -> Lists.newArrayList()); if (task instanceof SplittableScanTask<?>) { - ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(taskList::add); + ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add); } else { - taskList.add(task); + groupingKeyTasks.add(task); } } - // Now apply task combining within each partition - return FluentIterable.from(tasksByPartition.values()) - .transformAndConcat(ts -> toTaskGroupIterable(ts, splitSize, lookback, weightFunc)) - .toList(); + List<ScanTaskGroup<T>> taskGroups = Lists.newArrayList(); + + for (Map.Entry<StructLike, List<T>> entry : tasksByGroupingKey.entrySet()) { + StructLike groupingKey = entry.getKey(); + List<T> groupingKeyTasks = entry.getValue(); + Iterables.addAll( + taskGroups, + toTaskGroupIterable(groupingKey, groupingKeyTasks, splitSize, lookback, weightFunc)); + } + + return taskGroups; + } + + private static StructLike projectGroupingKey( + StructProjection groupingKeyProjection, + Types.StructType groupingKeyType, + PartitionScanTask task) { + + PartitionData groupingKey = new PartitionData(groupingKeyType); + + groupingKeyProjection.wrap(task.partition()); Review Comment: I was saying calling the wrap outside this method, e.g. right before line 168. `groupingKeyProjection` is read only inside this method. `groupingKeyProjection` is fully constructed before this method. Then we don't need to pass in `task` or `partition` into this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org