aokolnychyi commented on code in PR #2276:
URL: https://github.com/apache/iceberg/pull/2276#discussion_r1014217373
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########

@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }

+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+      List<T> splitFiles,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      StructProjection projection) {

Review Comment:
   Why not have it as `StructType`? I believe we just need the type of the key we can't combine by. When we consume this from Spark, we will compute an intersection of all partition specs in `Partitioning` as `StructType`, similar to what we already do in `Partitioning$partitionType` (computes a union of all partition types).

##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########

@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }

+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(

Review Comment:
   If we accept a list, what about returning a list as well? We access tasks by index in `SparkBatch`.
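To make the "return a list" point concrete, here is a plain-Java sketch. The types (`GroupingDemo`, `Task`) are invented for illustration and are not Iceberg API: grouping tasks and returning a `List` of groups lets a caller like `SparkBatch` address a group by index, which a bare `Iterable` cannot do without materializing it first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupingDemo {
    // Minimal stand-in for a scan task: a name plus a partition key.
    record Task(String name, String partitionKey) {}

    // Group tasks by partition key and return a List, so callers can
    // address a group by index instead of iterating an Iterable.
    static List<List<Task>> planTaskGroups(List<Task> tasks) {
        Map<String, List<Task>> byKey = new LinkedHashMap<>();
        for (Task task : tasks) {
            byKey.computeIfAbsent(task.partitionKey(), k -> new ArrayList<>()).add(task);
        }
        return new ArrayList<>(byKey.values());
    }
}
```

A caller can then write `planTaskGroups(tasks).get(partitionIndex)` directly, mirroring the by-index access described for `SparkBatch`.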
##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########

@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }

+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+      List<T> splitFiles,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      StructProjection projection) {
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    // Check the size of delete file as well to avoid unbalanced bin-packing
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length()
+                    + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
+                (1 + file.deletes().size()) * openFileCost);
+
+    ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+    for (T task : splitFiles) {
+      PartitionSpec spec = task.spec();
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), projection.type());

Review Comment:
   I am afraid creating a projection for each task would be expensive. What about having a map of projections that would be lazily initialized? That way, we will only create one projection per spec.

   ```
   Map<Integer, StructProjection> projectionsBySpec = Maps.newHashMap();

   ...

   projectionsBySpec.computeIfAbsent(
       spec.specId(),
       specId -> StructProjection.create(spec.partitionType(), ...));
   ```

##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########

@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }

+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(

Review Comment:
   I would consider adding `PartitionScanTask` and using it as a boundary in this method. That way, we can make this logic generic and support arbitrary scan tasks with partition info.

   ```
   public interface PartitionScanTask extends ScanTask {
     PartitionSpec spec();
     StructLike partition();
   }
   ```

   ```
   public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroups(...) {}
   ```

   @rdblue and other reviewers, thoughts on this?

##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########

@@ -71,6 +78,57 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }

+  public static <T extends FileScanTask> Iterable<CombinedScanTask> planTasks(
+      List<T> splitFiles,
+      long splitSize,
+      int lookback,
+      long openFileCost,
+      StructProjection projection) {
+    Preconditions.checkArgument(splitSize > 0, "Invalid split size (negative or 0): %s", splitSize);
+    Preconditions.checkArgument(
+        lookback > 0, "Invalid split planning lookback (negative or 0): %s", lookback);
+    Preconditions.checkArgument(
+        openFileCost >= 0, "Invalid file open cost (negative): %s", openFileCost);
+
+    // Check the size of delete file as well to avoid unbalanced bin-packing
+    Function<FileScanTask, Long> weightFunc =
+        file ->
+            Math.max(
+                file.length()
+                    + file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
+                (1 + file.deletes().size()) * openFileCost);
+
+    ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+        Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+    for (T task : splitFiles) {
+      PartitionSpec spec = task.spec();
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), projection.type());
+      Types.StructType projectedPartitionType = projectedStruct.type();
+      StructLikeWrapper wrapper =
+          StructLikeWrapper.forType(projectedPartitionType)

Review Comment:
   This invocation is very expensive as `forType` would call this constructor:

   ```
   private StructLikeWrapper(Types.StructType type) {
     this(Comparators.forType(type), JavaHash.forType(type));
   }
   ```

   Instead, we should try to call `forType` only once and then use `copyFor`.

   ```
   StructLikeWrapper wrapper = StructLikeWrapper.forType(...);

   for (T task : splitFiles) {
     StructProjection projection = ...
     projection.wrap(task.file().partition());
     groupedFiles.put(wrapper.copyFor(projection.copy()), task);
   }
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
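Taken together, the two performance suggestions in this review (at most one projection per spec via `computeIfAbsent`, and one `forType` call reused via `copyFor`) can be sketched in plain Java. Everything below is an invented, simplified stand-in: `Projection`, `Wrapper`, and `Task` are not Iceberg classes. The point is only that the expensive setup runs once per distinct spec id / partition type rather than once per task.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingSketch {
    // Counters to show how often the "expensive" constructors actually run.
    static int projectionsBuilt = 0;
    static int typeAnalyses = 0;

    // Stand-in for StructProjection: expensive to create, cheap to reuse.
    record Projection(int specId) {
        Projection {
            projectionsBuilt++;
        }
    }

    // Stand-in for StructLikeWrapper: forType() does the costly type analysis
    // once; copyFor() shares that analysis and only swaps the wrapped value.
    record Wrapper(String typeInfo, Object value) {
        static Wrapper forType(String type) {
            typeAnalyses++;  // models Comparators.forType + JavaHash.forType
            return new Wrapper("analyzed:" + type, null);
        }

        Wrapper copyFor(Object newValue) {
            return new Wrapper(typeInfo, newValue);  // no re-analysis
        }
    }

    record Task(int specId, String partition) {}

    // Build at most one projection per spec id, call forType() exactly once,
    // and produce a cheap wrapper copy per task as the grouping key.
    static Map<Integer, Projection> planGroups(List<Task> tasks, List<Wrapper> keysOut) {
        Map<Integer, Projection> projectionsBySpec = new HashMap<>();
        Wrapper wrapper = Wrapper.forType("partition-type");  // once, not per task
        for (Task task : tasks) {
            projectionsBySpec.computeIfAbsent(task.specId(), Projection::new);
            keysOut.add(wrapper.copyFor(task.partition()));   // cheap copy per task
        }
        return projectionsBySpec;
    }
}
```

With three tasks spanning two specs, the counters confirm two projection constructions and a single type analysis, which is the cost profile the review comments are asking for.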