aokolnychyi commented on code in PR #2276: URL: https://github.com/apache/iceberg/pull/2276#discussion_r1018413283
########## api/src/main/java/org/apache/iceberg/FileScanTask.java: ########## @@ -48,4 +48,9 @@ default boolean isFileScanTask() { default FileScanTask asFileScanTask() { return this; } + + @Override Review Comment: nit: Can we remove this now as the implementation is inherited from `ContentScanTask`? ########## api/src/main/java/org/apache/iceberg/util/StructProjection.java: ########## @@ -171,6 +178,10 @@ public StructProjection wrap(StructLike newStruct) { return this; } + public StructProjection copy() { Review Comment: This should be similar to `StructLikeWrapper$copyFor`. ``` public StructProjection copyFor(StructLike newStruct) { return new StructProjection(type, positionMap, nestedProjections).wrap(newStruct); } ``` ########## core/src/main/java/org/apache/iceberg/util/TableScanUtil.java: ########## @@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG Function<T, Long> weightFunc = task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc); + } + + @SuppressWarnings("unchecked") + public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups( + List<T> tasks, + long splitSize, + int lookback, + long openFileCost, + Types.StructType projectedPartitionType) { + + Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize); + Preconditions.checkArgument( + lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback); + Preconditions.checkArgument( + openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost); + + List<T> splitTasks = Lists.newArrayList(); + for (T task : tasks) { + if (task instanceof SplittableScanTask<?>) { + ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add); + } else { + splitTasks.add(task); + } + } + + Function<T, Long> weightFunc = + task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + + Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap(); + + // Group tasks by their partition values + StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType); + + for (T task : splitTasks) { + PartitionSpec spec = task.spec(); + projectionsBySpec.computeIfAbsent( + spec.specId(), + specId -> StructProjection.create(spec.partitionType(), projectedPartitionType)); + StructProjection projectedStruct = projectionsBySpec.get(spec.specId()).copy(); Review Comment: Apart from having extra list entries for temp split tasks, we also call the projection code on each split separately. That means more projections are created here too. If we have a file with 4 row groups, we will call the projection code on each split. To be safe, I'd iterate over non-split tasks and call `split` after we derive `StructProjection` for the parent task.
########## core/src/main/java/org/apache/iceberg/util/TableScanUtil.java: ########## @@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG Function<T, Long> weightFunc = task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc); + } + + @SuppressWarnings("unchecked") + public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups( + List<T> tasks, + long splitSize, + int lookback, + long openFileCost, + Types.StructType projectedPartitionType) { + + Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize); + Preconditions.checkArgument( + lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback); + Preconditions.checkArgument( + openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost); + + List<T> splitTasks = Lists.newArrayList(); + for (T task : tasks) { + if (task instanceof SplittableScanTask<?>) { + ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add); + } else { + splitTasks.add(task); + } + } + + Function<T, Long> weightFunc = + task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + + Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap(); + + // Group tasks by their partition values + StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType); + + for (T task : splitTasks) { + PartitionSpec spec = task.spec(); + projectionsBySpec.computeIfAbsent( + spec.specId(), + specId -> StructProjection.create(spec.partitionType(), projectedPartitionType)); + StructProjection projectedStruct = projectionsBySpec.get(spec.specId()).copy(); + groupedTasks + .computeIfAbsent(projectedStruct.copy().wrap(task.partition()), k -> Lists.newArrayList()) Review Comment: Do we call `copy` twice? Once we add `copyFor`, can it be simply like this? 
``` StructProjection projection = projectionsBySpec.computeIfAbsent( spec.specId(), specId -> StructProjection.create(spec.partitionType(), projectedPartitionType)); StructProjection projectedStruct = projection.copyFor(task.partition()); ``` ########## core/src/main/java/org/apache/iceberg/util/TableScanUtil.java: ########## @@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG Function<T, Long> weightFunc = task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc); + } + + @SuppressWarnings("unchecked") + public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups( + List<T> tasks, + long splitSize, + int lookback, + long openFileCost, + Types.StructType projectedPartitionType) { + + Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize); + Preconditions.checkArgument( + lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback); + Preconditions.checkArgument( + openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost); + + List<T> splitTasks = Lists.newArrayList(); + for (T task : tasks) { + if (task instanceof SplittableScanTask<?>) { + ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add); + } else { + splitTasks.add(task); + } + } + + Function<T, Long> weightFunc = + task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + + Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap(); + + // Group tasks by their partition values + StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType); + + for (T task : splitTasks) { + PartitionSpec spec = task.spec(); + projectionsBySpec.computeIfAbsent( + spec.specId(), + specId -> StructProjection.create(spec.partitionType(), projectedPartitionType)); + StructProjection projectedStruct = projectionsBySpec.get(spec.specId()).copy(); + groupedTasks + .computeIfAbsent(projectedStruct.copy().wrap(task.partition()), k -> Lists.newArrayList()) + .add(task); + } + + // Now apply task combining within each partition + return groupedTasks.values().stream() + .flatMap( + ts -> + StreamSupport.stream( + toTaskGroupIterable(ts, splitSize, lookback, weightFunc).spliterator(), false)) + .collect(Collectors.toList()); + } + + private static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> toTaskGroupIterable( Review Comment: I understand we want to reuse this method in both calls. I am afraid this will lead to a resource leak as the existing call passes a closable iterable, which wouldn't be properly closed now. Also, if we return a closable iterable, it is strange to collect it into a list without closing. I suggested `toTaskGroupStream` to avoid weird Spotless formatting for the closure. If you feel it is easier to just have a closure, let's drop the extra method. Up to you. ########## api/src/main/java/org/apache/iceberg/util/StructProjection.java: ########## @@ -90,6 +90,13 @@ public static StructProjection createAllowMissing( private final StructProjection[] nestedProjections; private StructLike struct; + private StructProjection(StructProjection other) { Review Comment: I don't think this is correct.
If we have a copy constructor, the arrays must be copied too. However, I don't think we need a copy constructor. We need to do exactly the trick we did in `StructLikeWrapper$copyFor`. I feel we need a constructor that would accept specific variables like in [this](https://github.com/apache/iceberg/pull/2276#discussion_r1016115987) comment. ########## core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java: ########## @@ -136,6 +142,141 @@ public void testTaskMerging() { Assert.assertEquals("Appropriate tasks should be merged", 3, mergedTasks.size()); } + private static final Schema TEST_SCHEMA = + new Schema( + Types.NestedField.optional(1, "c1", Types.IntegerType.get()), + Types.NestedField.optional(2, "c2", Types.StringType.get()), + Types.NestedField.optional(3, "c3", Types.StringType.get()), + Types.NestedField.optional(4, "c4", Types.StringType.get())); + + private static final PartitionSpec SPEC1 = + PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c2").build(); + + private static final PartitionSpec SPEC2 = + PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c3").identity("c2").build(); + + private static final StructLike PARTITION1 = new TestStructLike(100, "a"); + private static final StructLike PARTITION2 = new TestStructLike(200, "b"); + + @Test + public void testTaskGroupPlanningByPartition() { + // When all files belong to the same partition, we should combine them together as long as the + // total file size is <= split size + List<PartitionScanTask> tasks = + ImmutableList.of( + taskWithPartition(SPEC1, PARTITION1, 64), + taskWithPartition(SPEC1, PARTITION1, 128), + taskWithPartition(SPEC1, PARTITION1, 64), + taskWithPartition(SPEC1, PARTITION1, 128)); + + int count = 0; + for (ScanTaskGroup<PartitionScanTask> task : + TableScanUtil.planTaskGroups(tasks, 512, 10, 4, SPEC1.partitionType())) { + Assert.assertEquals(4, task.filesCount()); + Assert.assertEquals(64 + 128 + 64 + 128, task.sizeBytes()); + count += 1; + } 
+ Assert.assertEquals(1, count); + + // We have 2 files from partition 1 and 2 files from partition 2, so they should be combined + // separately + tasks = + ImmutableList.of( + taskWithPartition(SPEC1, PARTITION1, 64), + taskWithPartition(SPEC1, PARTITION1, 128), + taskWithPartition(SPEC1, PARTITION2, 64), + taskWithPartition(SPEC1, PARTITION2, 128)); + + count = 0; + for (ScanTaskGroup<PartitionScanTask> task : + TableScanUtil.planTaskGroups(tasks, 512, 10, 4, SPEC1.partitionType())) { + Assert.assertEquals(2, task.filesCount()); + Assert.assertEquals(64 + 128, task.sizeBytes()); + count += 1; + } + Assert.assertEquals(2, count); + + // Similar to the case above, but now files have different partition specs + tasks = + ImmutableList.of( + taskWithPartition(SPEC1, PARTITION1, 64), + taskWithPartition(SPEC2, PARTITION1, 128), + taskWithPartition(SPEC1, PARTITION2, 64), + taskWithPartition(SPEC2, PARTITION2, 128)); + + count = 0; + for (ScanTaskGroup<PartitionScanTask> task : + TableScanUtil.planTaskGroups(tasks, 512, 10, 4, SPEC1.partitionType())) { + Assert.assertEquals(2, task.filesCount()); + Assert.assertEquals(64 + 128, task.sizeBytes()); + count += 1; + } + Assert.assertEquals(2, count); + + // Combining within partitions should also respect split size. In this case, the split size + // is equal to the file size, so each partition will have 2 tasks. + tasks = + ImmutableList.of( + taskWithPartition(SPEC1, PARTITION1, 128), + taskWithPartition(SPEC2, PARTITION1, 128), + taskWithPartition(SPEC1, PARTITION2, 128), + taskWithPartition(SPEC2, PARTITION2, 128)); + + count = 0; + for (ScanTaskGroup<PartitionScanTask> task : + TableScanUtil.planTaskGroups(tasks, 128, 10, 4, SPEC1.partitionType())) { + Assert.assertEquals(1, task.filesCount()); + Assert.assertEquals(128, task.sizeBytes()); + count += 1; + } + Assert.assertEquals(4, count); + + // The following should throw exception since `SPEC2` is not an intersection of partition specs + // across all tasks. 
+ List<PartitionScanTask> tasks2 = + ImmutableList.of( + taskWithPartition(SPEC1, PARTITION1, 128), taskWithPartition(SPEC2, PARTITION2, 128)); + + AssertHelpers.assertThrows( Review Comment: Thanks for adding this! ########## core/src/main/java/org/apache/iceberg/util/TableScanUtil.java: ########## @@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG Function<T, Long> weightFunc = task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc); + } + + @SuppressWarnings("unchecked") + public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups( + List<T> tasks, + long splitSize, + int lookback, + long openFileCost, + Types.StructType projectedPartitionType) { + + Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize); + Preconditions.checkArgument( + lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback); + Preconditions.checkArgument( + openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost); + + List<T> splitTasks = Lists.newArrayList(); + for (T task : tasks) { + if (task instanceof SplittableScanTask<?>) { + ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add); + } else { + splitTasks.add(task); + } + } + + Function<T, Long> weightFunc = + task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost); + + Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap(); + + // Group tasks by their partition values + StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType); + + for (T task : splitTasks) { + PartitionSpec spec = task.spec(); + projectionsBySpec.computeIfAbsent( Review Comment: nit: Just assign the result of `computeIfAbsent` to a var instead of calling `get` one more time? ``` StructProjection projection = projectionsBySpec.computeIfAbsent(...) 
``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org