aokolnychyi commented on code in PR #2276:
URL: https://github.com/apache/iceberg/pull/2276#discussion_r1018413283


##########
api/src/main/java/org/apache/iceberg/FileScanTask.java:
##########
@@ -48,4 +48,9 @@ default boolean isFileScanTask() {
   default FileScanTask asFileScanTask() {
     return this;
   }
+
+  @Override

Review Comment:
   nit: Can we remove this now as the implementation is inherited from `ContentScanTask`?
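
For context, a default method declared on a parent interface is inherited by sub-interfaces automatically, so a redeclared stub adds nothing. A minimal standalone sketch (the `Parent`/`Child`/`Impl` names are illustrative, not Iceberg's):

```java
// Illustrative only: mirrors the ContentScanTask/FileScanTask relationship
// with hypothetical Parent/Child interfaces.
interface Parent {
  // Default implementation lives on the parent interface.
  default String describe() {
    return "parent default";
  }
}

// Child inherits describe() without redeclaring it; an @Override stub that
// only repeats the inherited behavior can simply be deleted.
interface Child extends Parent {}

class Impl implements Child {}

public class DefaultInheritanceDemo {
  public static String run() {
    return new Impl().describe();
  }
}
```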



##########
api/src/main/java/org/apache/iceberg/util/StructProjection.java:
##########
@@ -171,6 +178,10 @@ public StructProjection wrap(StructLike newStruct) {
     return this;
   }
 
+  public StructProjection copy() {

Review Comment:
   This should be similar to `StructLikeWrapper$copyFor`.
   
   ```
   public StructProjection copyFor(StructLike newStruct) {
  return new StructProjection(type, positionMap, nestedProjections).wrap(newStruct);
   }
   ```



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
+    return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();
+    for (T task : tasks) {
+      if (task instanceof SplittableScanTask<?>) {
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add);
+      } else {
+        splitTasks.add(task);
+      }
+    }
+
+    Function<T, Long> weightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+
+    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+
+    // Group tasks by their partition values
+    StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType);
+
+    for (T task : splitTasks) {
+      PartitionSpec spec = task.spec();
+      projectionsBySpec.computeIfAbsent(
+          spec.specId(),
+          specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
+      StructProjection projectedStruct = projectionsBySpec.get(spec.specId()).copy();

Review Comment:
   Apart from having extra list entries for the temporary split tasks, we also call the projection code on each split separately, which means more projections are created here too. If we have a file with 4 row groups, we will call the projection code on each split.
   
   To be safe, I'd iterate over the non-split tasks and call `split` only after we derive the `StructProjection` for the parent task.
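
The suggested ordering can be sketched standalone (hypothetical `keyFor`/`split` stand-ins, not Iceberg's API): derive the grouping key once per parent task, then split, so splits never trigger extra projections. A counter makes the cost visible:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupThenSplitDemo {
  static int keyComputations = 0;

  // Stand-in for deriving a StructProjection-based grouping key; counted so we
  // can observe how often it runs.
  static String keyFor(String parentTask) {
    keyComputations++;
    return parentTask.substring(0, 1); // "partition" = first character
  }

  // Stand-in for SplittableScanTask#split: each parent yields n splits.
  static List<String> split(String parentTask, int n) {
    List<String> splits = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      splits.add(parentTask + "#" + i);
    }
    return splits;
  }

  // Derive the key on the parent first, then split: one key computation per
  // parent task, regardless of how many splits it produces.
  public static Map<String, List<String>> group(List<String> parents, int splitsPerTask) {
    Map<String, List<String>> grouped = new HashMap<>();
    for (String parent : parents) {
      String key = keyFor(parent);
      grouped.computeIfAbsent(key, k -> new ArrayList<>()).addAll(split(parent, splitsPerTask));
    }
    return grouped;
  }
}
```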



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
+    return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();
+    for (T task : tasks) {
+      if (task instanceof SplittableScanTask<?>) {
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add);
+      } else {
+        splitTasks.add(task);
+      }
+    }
+
+    Function<T, Long> weightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+
+    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+
+    // Group tasks by their partition values
+    StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType);
+
+    for (T task : splitTasks) {
+      PartitionSpec spec = task.spec();
+      projectionsBySpec.computeIfAbsent(
+          spec.specId(),
+          specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
+      StructProjection projectedStruct = projectionsBySpec.get(spec.specId()).copy();
+      groupedTasks
+          .computeIfAbsent(projectedStruct.copy().wrap(task.partition()), k -> Lists.newArrayList())

Review Comment:
   Do we call `copy` twice? Once we add `copyFor`, can it simply be like this?
   
   ```
   StructProjection projection =
       projectionsBySpec.computeIfAbsent(
           spec.specId(),
           specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
   StructProjection projectedStruct = projection.copyFor(task.partition());
   ```



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
+    return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();
+    for (T task : tasks) {
+      if (task instanceof SplittableScanTask<?>) {
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add);
+      } else {
+        splitTasks.add(task);
+      }
+    }
+
+    Function<T, Long> weightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+
+    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+
+    // Group tasks by their partition values
+    StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType);
+
+    for (T task : splitTasks) {
+      PartitionSpec spec = task.spec();
+      projectionsBySpec.computeIfAbsent(
+          spec.specId(),
+          specId -> StructProjection.create(spec.partitionType(), projectedPartitionType));
+      StructProjection projectedStruct = projectionsBySpec.get(spec.specId()).copy();
+      groupedTasks
+          .computeIfAbsent(projectedStruct.copy().wrap(task.partition()), k -> Lists.newArrayList())
+          .add(task);
+    }
+
+    // Now apply task combining within each partition
+    return groupedTasks.values().stream()
+        .flatMap(
+            ts ->
+                StreamSupport.stream(
+                    toTaskGroupIterable(ts, splitSize, lookback, weightFunc).spliterator(), false))
+        .collect(Collectors.toList());
+  }
+
+  private static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> toTaskGroupIterable(

Review Comment:
   I understand we want to reuse this method in both calls. I am afraid this will lead to a resource leak, as the existing call passes a closeable iterable, which wouldn't be properly closed now. Also, if we return a closeable iterable, it is strange to collect it into a list without closing it.
   
   I suggested `toTaskGroupStream` to avoid weird Spotless formatting for the closure. If you feel it is easier to just have a closure, let's drop the extra method. Up to you.
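
The leak concern can be illustrated standalone (hypothetical `TrackedIterable`, not an Iceberg class): materializing a closeable iterable into a list never invokes `close`, whereas try-with-resources does:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CloseableLeakDemo {
  // Minimal closeable iterable that records whether close() was called,
  // standing in for Iceberg's CloseableIterable.
  static class TrackedIterable implements Iterable<Integer>, AutoCloseable {
    boolean closed = false;

    @Override
    public Iterator<Integer> iterator() {
      return List.of(1, 2, 3).iterator();
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  // Collecting into a list without try-with-resources: iteration completes,
  // but close() is never called, so any underlying resource leaks.
  public static boolean collectWithoutClosing(TrackedIterable iterable) {
    List<Integer> result = new ArrayList<>();
    for (Integer i : iterable) {
      result.add(i);
    }
    return iterable.closed;
  }

  // The same materialization inside try-with-resources releases the resource.
  public static boolean collectAndClose(TrackedIterable iterable) {
    try (TrackedIterable it = iterable) {
      List<Integer> result = new ArrayList<>();
      for (Integer i : it) {
        result.add(i);
      }
    }
    return iterable.closed;
  }
}
```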
   



##########
api/src/main/java/org/apache/iceberg/util/StructProjection.java:
##########
@@ -90,6 +90,13 @@ public static StructProjection createAllowMissing(
   private final StructProjection[] nestedProjections;
   private StructLike struct;
 
+  private StructProjection(StructProjection other) {

Review Comment:
   I don't think this is correct. If we have a copy constructor, the arrays must be copied too. However, I don't think we need a copy constructor; we need to do exactly the trick we did in `StructLikeWrapper$copyFor`.
   
   I feel we need a constructor that would accept specific variables, like in [this](https://github.com/apache/iceberg/pull/2276#discussion_r1016115987) comment.
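
The `copyFor` trick can be sketched standalone (hypothetical `Projection` class, not the real `StructProjection`): a private constructor takes the immutable fields directly, so copies share the precomputed mapping while each holds its own wrapped value, avoiding both a shallow copy constructor and re-deriving the mapping:

```java
public class CopyForDemo {
  static class Projection {
    private final int[] positionMap; // immutable after construction; safe to share
    private Object wrapped; // mutable per-copy state

    static Projection create(int[] positionMap) {
      return new Projection(positionMap);
    }

    // Constructor accepts the specific fields, mirroring the
    // StructLikeWrapper#copyFor pattern: no array copying, no recomputation.
    private Projection(int[] positionMap) {
      this.positionMap = positionMap;
    }

    Projection wrap(Object value) {
      this.wrapped = value;
      return this;
    }

    // copyFor returns a fresh instance sharing the immutable mapping but
    // wrapping a new value, leaving the original untouched.
    Projection copyFor(Object value) {
      return new Projection(positionMap).wrap(value);
    }

    Object wrapped() {
      return wrapped;
    }
  }
}
```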



##########
core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java:
##########
@@ -136,6 +142,141 @@ public void testTaskMerging() {
    Assert.assertEquals("Appropriate tasks should be merged", 3, mergedTasks.size());
  }
 
+  private static final Schema TEST_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "c2", Types.StringType.get()),
+          Types.NestedField.optional(3, "c3", Types.StringType.get()),
+          Types.NestedField.optional(4, "c4", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC1 =
+      PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c2").build();
+
+  private static final PartitionSpec SPEC2 =
+      PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c3").identity("c2").build();
+
+  private static final StructLike PARTITION1 = new TestStructLike(100, "a");
+  private static final StructLike PARTITION2 = new TestStructLike(200, "b");
+
+  @Test
+  public void testTaskGroupPlanningByPartition() {
+    // When all files belong to the same partition, we should combine them together as long as the
+    // total file size is <= split size
+    List<PartitionScanTask> tasks =
+        ImmutableList.of(
+            taskWithPartition(SPEC1, PARTITION1, 64),
+            taskWithPartition(SPEC1, PARTITION1, 128),
+            taskWithPartition(SPEC1, PARTITION1, 64),
+            taskWithPartition(SPEC1, PARTITION1, 128));
+
+    int count = 0;
+    for (ScanTaskGroup<PartitionScanTask> task :
+        TableScanUtil.planTaskGroups(tasks, 512, 10, 4, SPEC1.partitionType())) {
+      Assert.assertEquals(4, task.filesCount());
+      Assert.assertEquals(64 + 128 + 64 + 128, task.sizeBytes());
+      count += 1;
+    }
+    Assert.assertEquals(1, count);
+
+    // We have 2 files from partition 1 and 2 files from partition 2, so they should be combined
+    // separately
+    tasks =
+        ImmutableList.of(
+            taskWithPartition(SPEC1, PARTITION1, 64),
+            taskWithPartition(SPEC1, PARTITION1, 128),
+            taskWithPartition(SPEC1, PARTITION2, 64),
+            taskWithPartition(SPEC1, PARTITION2, 128));
+
+    count = 0;
+    for (ScanTaskGroup<PartitionScanTask> task :
+        TableScanUtil.planTaskGroups(tasks, 512, 10, 4, SPEC1.partitionType())) {
+      Assert.assertEquals(2, task.filesCount());
+      Assert.assertEquals(64 + 128, task.sizeBytes());
+      count += 1;
+    }
+    Assert.assertEquals(2, count);
+
+    // Similar to the case above, but now files have different partition specs
+    tasks =
+        ImmutableList.of(
+            taskWithPartition(SPEC1, PARTITION1, 64),
+            taskWithPartition(SPEC2, PARTITION1, 128),
+            taskWithPartition(SPEC1, PARTITION2, 64),
+            taskWithPartition(SPEC2, PARTITION2, 128));
+
+    count = 0;
+    for (ScanTaskGroup<PartitionScanTask> task :
+        TableScanUtil.planTaskGroups(tasks, 512, 10, 4, SPEC1.partitionType())) {
+      Assert.assertEquals(2, task.filesCount());
+      Assert.assertEquals(64 + 128, task.sizeBytes());
+      count += 1;
+    }
+    Assert.assertEquals(2, count);
+
+    // Combining within partitions should also respect split size. In this case, the split size
+    // is equal to the file size, so each partition will have 2 tasks.
+    tasks =
+        ImmutableList.of(
+            taskWithPartition(SPEC1, PARTITION1, 128),
+            taskWithPartition(SPEC2, PARTITION1, 128),
+            taskWithPartition(SPEC1, PARTITION2, 128),
+            taskWithPartition(SPEC2, PARTITION2, 128));
+
+    count = 0;
+    for (ScanTaskGroup<PartitionScanTask> task :
+        TableScanUtil.planTaskGroups(tasks, 128, 10, 4, SPEC1.partitionType())) {
+      Assert.assertEquals(1, task.filesCount());
+      Assert.assertEquals(128, task.sizeBytes());
+      count += 1;
+    }
+    Assert.assertEquals(4, count);
+
+    // The following should throw exception since `SPEC2` is not an intersection of partition specs
+    // across all tasks.
+    List<PartitionScanTask> tasks2 =
+        ImmutableList.of(
+            taskWithPartition(SPEC1, PARTITION1, 128), taskWithPartition(SPEC2, PARTITION2, 128));
+
+    AssertHelpers.assertThrows(

Review Comment:
   Thanks for adding this!



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -121,10 +128,66 @@ public static <T extends ScanTask> CloseableIterable<ScanTaskGroup<T>> planTaskG
     Function<T, Long> weightFunc =
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
+    return toTaskGroupIterable(splitTasks, splitSize, lookback, weightFunc);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();
+    for (T task : tasks) {
+      if (task instanceof SplittableScanTask<?>) {
+        ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(splitTasks::add);
+      } else {
+        splitTasks.add(task);
+      }
+    }
+
+    Function<T, Long> weightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+
+    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+
+    // Group tasks by their partition values
+    StructLikeMap<List<T>> groupedTasks = StructLikeMap.create(projectedPartitionType);
+
+    for (T task : splitTasks) {
+      PartitionSpec spec = task.spec();
+      projectionsBySpec.computeIfAbsent(

Review Comment:
   nit: Just assign the result of `computeIfAbsent` to a var instead of calling `get` one more time?
   
   ```
   StructProjection projection =
       projectionsBySpec.computeIfAbsent(...)
   ```
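
For reference, `Map#computeIfAbsent` returns the mapped value (the existing one, or the one just computed), so the follow-up `get` is redundant; a minimal standalone check (hypothetical `lookupLength` helper):

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentDemo {
  // computeIfAbsent returns the existing value, or the newly computed one,
  // so the result can be assigned directly without a second map lookup.
  public static int lookupLength(Map<String, Integer> cache, String key) {
    return cache.computeIfAbsent(key, k -> k.length());
  }
}
```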



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

