aokolnychyi commented on code in PR #2276:
URL: https://github.com/apache/iceberg/pull/2276#discussion_r1016116389


##########
api/src/main/java/org/apache/iceberg/util/StructProjection.java:
##########
@@ -171,6 +185,14 @@ public StructProjection wrap(StructLike newStruct) {
     return this;
   }
 
+  public StructProjection copy() {
+    return new StructProjection(this);
+  }
+
+  public StructType type() {

Review Comment:
   Is this still being used?



##########
api/src/main/java/org/apache/iceberg/util/StructProjection.java:
##########
@@ -90,6 +91,19 @@ public static StructProjection createAllowMissing(
   private final StructProjection[] nestedProjections;
   private StructLike struct;
 
+  private StructProjection(StructProjection other) {
+    this.type = other.type;
+    this.positionMap =

Review Comment:
   What if do the same optimization trick like in `StructLikeWrapper$copyFor` 
to avoid copying these arrays?
   
   We can have a constructor like this:
   ```
   private StructProjection(
       StructType type, int[] positionMap, StructProjection[] 
nestedProjections) {
     this.type = type;
     this.positionMap = positionMap;
     this.nestedProjections = nestedProjections;
   }
   ``` 
   
   And `copyFor` method like this:
   
   ```
   public StructProjection copyFor(StructLike newStruct) {
     return new StructProjection(type, positionMap, 
nestedProjections).wrap(newStruct);
   }
   ```
   
   This way, we share the arrays in all instances, which should be fine as we 
have a projection per spec and these arrays won't actually change.



##########
core/src/test/java/org/apache/iceberg/util/TestTableScanUtil.java:
##########
@@ -136,6 +141,129 @@ public void testTaskMerging() {
     Assert.assertEquals("Appropriate tasks should be merged", 3, 
mergedTasks.size());
   }
 
+  private static final Schema TEST_SCHEMA =
+      new Schema(
+          Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "c2", Types.StringType.get()),
+          Types.NestedField.optional(3, "c3", Types.StringType.get()),
+          Types.NestedField.optional(4, "c4", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC1 =
+      
PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c2").build();
+
+  private static final PartitionSpec SPEC2 =
+      
PartitionSpec.builderFor(TEST_SCHEMA).identity("c1").identity("c3").identity("c2").build();
+
+  private static final StructLike PARTITION1 = new TestStructLike(100, "a");
+  private static final StructLike PARTITION2 = new TestStructLike(200, "b");
+
+  @Test
+  public void testTaskGroupPlanningByPartition() {

Review Comment:
   Can we add a test that we throw an exception if the combining key includes 
columns not present in all partition specs? Like if we try to combine by all 
partition fields in `SPEC2`?
   
   I think that should be covered by the existing code as 
`StructProjection$create` will complain.



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -128,6 +137,66 @@ public static <T extends ScanTask> 
CloseableIterable<ScanTaskGroup<T>> planTaskG
         combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks)));
   }
 
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> 
planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative 
or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", 
lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", 
openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();
+    for (T task : tasks) {
+      if (task instanceof SplittableScanTask<?>) {
+        ((SplittableScanTask<? extends T>) 
task).split(splitSize).forEach(splitTasks::add);
+      } else {
+        splitTasks.add(task);
+      }
+    }
+
+    Function<T, Long> weightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+
+    StructLikeWrapper wrapper = 
StructLikeWrapper.forType(projectedPartitionType);
+    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+
+    // Group tasks by their partition values
+    ListMultimap<StructLikeWrapper, T> groupedTasks =
+        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+    for (T task : splitTasks) {
+      PartitionSpec spec = task.spec();
+      projectionsBySpec.computeIfAbsent(
+          spec.specId(),
+          specId -> StructProjection.create(spec.partitionType(), 
projectedPartitionType));
+      StructProjection projectedStruct = projectionsBySpec.get(spec.specId());
+      StructLikeWrapper wrapperCopy =
+          wrapper.copyFor(projectedStruct.copy().wrap(task.partition()));
+      groupedTasks.put(wrapperCopy, task);
+    }
+
+    // Now apply task combining within each partition
+    List<List<BaseScanTaskGroup<T>>> nestedTaskGroups =
+        groupedTasks.asMap().values().stream()
+            .map(

Review Comment:
   I'd probably use streams directly via `flatMap` and move the logic for 
packing into a separate method.
   
   ```
   return tasksByKey.values().stream()
       .flatMap(sameKeyTasks -> toTaskGroupStream(sameKeyTasks, splitSize, 
lookback, weightFunc))
       .collect(Collectors.toList());
   ``` 
   
   ```
   private static <T extends ScanTask> Stream<ScanTaskGroup<T>> 
toTaskGroupStream(
       Iterable<T> tasks, long splitSize, int lookback, Function<T, Long> 
weightFunc) {
     ...
   }
   ```



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -128,6 +137,66 @@ public static <T extends ScanTask> 
CloseableIterable<ScanTaskGroup<T>> planTaskG
         combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks)));
   }
 
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> 
planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative 
or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", 
lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", 
openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();
+    for (T task : tasks) {
+      if (task instanceof SplittableScanTask<?>) {
+        ((SplittableScanTask<? extends T>) 
task).split(splitSize).forEach(splitTasks::add);
+      } else {
+        splitTasks.add(task);
+      }
+    }
+
+    Function<T, Long> weightFunc =
+        task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
+
+    StructLikeWrapper wrapper = 
StructLikeWrapper.forType(projectedPartitionType);
+    Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
+
+    // Group tasks by their partition values
+    ListMultimap<StructLikeWrapper, T> groupedTasks =

Review Comment:
   Instead of dealing with `StructLikeWrapper` ourselves, what about using 
`StructLikeMap` instead?
   
   ```
   StructLikeMap<List<T>> tasksByKey = 
StructLikeMap.create(projectedPartitionType);
   ```
   
   We will have to call `computeIfAbsent` to init the list ourselves but that's 
seems easier than to explicitly manage the wrapper. We try to use 
`StructLikeMap` whenever possible.
   



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -128,6 +137,66 @@ public static <T extends ScanTask> 
CloseableIterable<ScanTaskGroup<T>> planTaskG
         combinedTasks -> new BaseScanTaskGroup<>(mergeTasks(combinedTasks)));
   }
 
+  @SuppressWarnings("unchecked")
+  public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> 
planTaskGroups(
+      List<T> tasks,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      Types.StructType projectedPartitionType) {
+
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative 
or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", 
lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", 
openFileCost);
+
+    List<T> splitTasks = Lists.newArrayList();

Review Comment:
   What about skipping this temporary list and simply checking if a task is 
splittable in the for loop below? All of this will be held in the driver memory 
and I am a bit paranoid.
   
   ```
   if (task instanceof SplittableScanTask<?>) {
     SplittableScanTask<? extends T> splittableTask = (SplittableScanTask<? 
extends T>) task;
     Iterables.addAll(sameKeyTasks, splittableTask.split(splitSize));
   } else {
     sameKeyTasks.add(task);
   }
   ```



##########
api/src/main/java/org/apache/iceberg/FileScanTask.java:
##########
@@ -21,7 +21,8 @@
 import java.util.List;
 
 /** A scan task over a range of bytes in a single data file. */
-public interface FileScanTask extends ContentScanTask<DataFile>, 
SplittableScanTask<FileScanTask> {
+public interface FileScanTask

Review Comment:
   What about implementing `PartitionScanTask` in `ContentScanTask`?
   That way, all its children will inherit the new functionality.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to