aokolnychyi commented on code in PR #2276:
URL: https://github.com/apache/iceberg/pull/2276#discussion_r1014217373


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+      List<T> splitFiles,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      StructProjection projection) {

Review Comment:
   Why not pass a `StructType` instead? I believe all we need is the type of 
the key that tasks must share in order to be combined. When we consume this 
from Spark, we will compute the intersection of all partition specs in 
`Partitioning` as a `StructType`, similar to what we already do in 
`Partitioning.partitionType` (which computes a union of all partition types).
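
To make the intersection idea concrete, here is a minimal sketch. It assumes a partition type can be modeled as an ordered map of field ID to field name; `GroupingKeySketch` and `commonFields` are hypothetical names for illustration, not Iceberg APIs, and the real code would operate on `Types.StructType`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch: a "partition type" is modeled as an ordered map of
// field ID -> field name. These are NOT Iceberg classes.
class GroupingKeySketch {

  // Keeps only the fields (matched by ID) that appear in every partition type.
  static Map<Integer, String> commonFields(List<Map<Integer, String>> partitionTypes) {
    Map<Integer, String> common = new LinkedHashMap<>();
    if (partitionTypes.isEmpty()) {
      return common;
    }

    // Start from the first type and drop any field missing from a later one.
    common.putAll(partitionTypes.get(0));
    for (Map<Integer, String> type : partitionTypes.subList(1, partitionTypes.size())) {
      common.keySet().retainAll(type.keySet());
    }

    return common;
  }
}
```

Tasks written under different specs could then be grouped only by the fields that survive the intersection, which is why the method needs just the key type rather than a ready-made projection.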



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(

Review Comment:
   If we accept a list, what about returning a list as well? We access tasks by 
index in `SparkBatch`.



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+      List<T> splitFiles,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      StructProjection projection) {
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    // Check the size of delete file as well to avoid unbalanced bin-packing
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length()
+                    + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
+                (1 + file.deletes().size()) * openFileCost);
+
+    ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+    for (T task : splitFiles) {
+      PartitionSpec spec = task.spec();
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), projection.type());

Review Comment:
   I am afraid creating a projection for each task would be expensive. What 
about having a map of projections that would be lazily initialized? That way, 
we will only create one projection per spec.
   
   ```
   Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();
   
   ...
   
   projectionsBySpec.computeIfAbsent(
       spec.specId(), specId -> StructProjection.create(spec.partitionType(), ...));
   ```
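
A self-contained sketch of the same caching pattern, with a stand-in factory instead of `StructProjection.create`. `ProjectionCacheSketch`, `forSpec`, and the `created` counter are hypothetical names used only to show that the factory runs once per spec ID rather than once per task.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntFunction;

// Stand-in sketch of a lazily filled per-spec cache; P plays the role of
// the projection type. None of this is Iceberg API.
class ProjectionCacheSketch<P> {
  private final Map<Integer, P> projectionsBySpec = new HashMap<>();
  private final IntFunction<P> factory;
  private int created = 0;

  ProjectionCacheSketch(IntFunction<P> factory) {
    this.factory = factory;
  }

  // Builds the value on the first request for a spec ID and reuses it afterwards.
  P forSpec(int specId) {
    return projectionsBySpec.computeIfAbsent(
        specId,
        id -> {
          created++;
          return factory.apply(id);
        });
  }

  // Number of times the (assumed expensive) factory has actually run.
  int created() {
    return created;
  }
}
```

With five tasks spread over two specs, the factory in this sketch runs twice instead of five times.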
   



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(

Review Comment:
   I would consider adding `PartitionScanTask` and using it as the type bound 
for this method. That way, we can make this logic generic and support 
arbitrary scan tasks that carry partition info.
   
   ```
   public interface PartitionScanTask extends ScanTask {
     PartitionSpec spec();
     StructLike partition();
   }
   ```
   
   ```
   public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(...) {}
   ```
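
A rough, self-contained illustration of why a bound like this helps. It uses stand-in types rather than the proposed interfaces: `PartitionedTask`, `SimpleTask`, and `groupByPartition` are hypothetical, and the partition is modeled as a plain `List<Object>` instead of `StructLike`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in types only; none of these are the real Iceberg interfaces.
class PartitionGroupingSketch {

  // Hypothetical minimal bound: anything that can report its partition values.
  interface PartitionedTask {
    List<Object> partition();
  }

  // Hypothetical concrete task, used only to exercise the generic method.
  static class SimpleTask implements PartitionedTask {
    final String name;
    private final List<Object> partition;

    SimpleTask(String name, Object... partitionValues) {
      this.name = name;
      this.partition = Arrays.asList(partitionValues);
    }

    @Override
    public List<Object> partition() {
      return partition;
    }
  }

  // Groups tasks by partition while preserving the caller's task type T.
  static <T extends PartitionedTask> List<List<T>> groupByPartition(List<T> tasks) {
    Map<List<Object>, List<T>> groups = new LinkedHashMap<>();
    for (T task : tasks) {
      groups.computeIfAbsent(task.partition(), key -> new ArrayList<>()).add(task);
    }
    return new ArrayList<>(groups.values());
  }
}
```

Because the method only depends on the bound, any task type that exposes partition info can reuse the grouping logic, and callers get groups of their own task type back.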
   
   @rdblue and other reviewers, thoughts on this?



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+      List<T> splitFiles,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      StructProjection projection) {
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    // Check the size of delete file as well to avoid unbalanced bin-packing
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length()
+                    + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
+                (1 + file.deletes().size()) * openFileCost);
+
+    ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+    for (T task : splitFiles) {
+      PartitionSpec spec = task.spec();
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), projection.type());
+      Types.StructType projectedPartitionType = projectedStruct.type();
+      StructLikeWrapper wrapper =
+          StructLikeWrapper.forType(projectedPartitionType)

Review Comment:
   This call is very expensive because `forType` invokes this constructor, 
which builds a new comparator and hash function for the type every time:
   
   ```
   private StructLikeWrapper(Types.StructType type) {
     this(Comparators.forType(type), JavaHash.forType(type));
   }
   ```
   
   Instead, we should call `forType` only once, outside the loop, and then use 
`copyFor` for each task.
   
   ```
   StructLikeWrapper wrapper = StructLikeWrapper.forType(...);
   
   for (T task : splitFiles) {
     StructProjection projection = ...
     projection.wrap(task.file().partition());
     groupedFiles.put(wrapper.copyFor(projection.copy()), task);
   }
   ```
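
The same build-once, copy-per-item shape in a self-contained form. `KeyWrapperSketch` is a hypothetical stand-in, not `StructLikeWrapper`: its `forWidth` plays the role of the expensive `forType`, and the `helpersBuilt` counter exists only to show how often that expensive step runs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Stand-in for a wrapper whose type-derived helpers are costly to build.
// This is NOT StructLikeWrapper; it only mirrors the forType/copyFor shape.
class KeyWrapperSketch {
  static int helpersBuilt = 0; // counts how often the expensive step runs

  private final int width; // stands in for the comparator and hash derived from the type
  private final Object[] values;

  private KeyWrapperSketch(int width, Object[] values) {
    this.width = width;
    this.values = values;
  }

  // Expensive path: pretend the helpers are derived from the type here.
  static KeyWrapperSketch forWidth(int width) {
    helpersBuilt++;
    return new KeyWrapperSketch(width, null);
  }

  // Cheap path: reuse the already-built helpers and only swap the wrapped values.
  KeyWrapperSketch copyFor(Object... newValues) {
    if (newValues.length != width) {
      throw new IllegalArgumentException("Expected " + width + " values");
    }
    return new KeyWrapperSketch(width, newValues.clone());
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof KeyWrapperSketch
        && Arrays.equals(values, ((KeyWrapperSketch) other).values);
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(values);
  }

  // Counts rows per key using a single template wrapper.
  static Map<KeyWrapperSketch, Integer> countByKey(Object[][] rows, int width) {
    KeyWrapperSketch template = forWidth(width); // built once, outside the loop
    Map<KeyWrapperSketch, Integer> counts = new HashMap<>();
    for (Object[] row : rows) {
      counts.merge(template.copyFor(row), 1, Integer::sum);
    }
    return counts;
  }
}
```

In this sketch the expensive step runs once per call to `countByKey`, regardless of how many rows are grouped.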



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

