aokolnychyi commented on code in PR #2276:
URL: https://github.com/apache/iceberg/pull/2276#discussion_r998692787


##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -129,6 +129,34 @@ default ThisT select(String... columns) {
    */
   ThisT planWith(ExecutorService executorService);
 
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link

Review Comment:
   nit: Shouldn't reference `TableScan` anymore as it can be any type of 
`Scan`. Let's just write `a new scan`.



##########
api/src/main/java/org/apache/iceberg/util/StructProjection.java:
##########
@@ -90,6 +103,13 @@ public static StructProjection createAllowMissing(
   private final StructProjection[] nestedProjections;
   private StructLike struct;
 
+  private StructProjection(StructProjection other) {
+    this.type = other.type;
+    this.positionMap = other.positionMap;

Review Comment:
   We are simply assigning the same array references. Will that cause problems 
if arrays get mutated?
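   
   To illustrate the concern, here is a minimal sketch, not Iceberg code. `PositionHolder` is a hypothetical stand-in for a class that keeps an `int[]`. It only contrasts a copy that assigns the array reference with one that clones the array. Whether this matters for `StructProjection` depends on whether its arrays are ever mutated after construction, which this hunk doesn't show.

```java
// Hypothetical stand-in, NOT Iceberg's StructProjection: it only holds an int[] position map.
final class PositionHolder {
  private final int[] positionMap;

  PositionHolder(int[] positionMap) {
    this.positionMap = positionMap;
  }

  // Shallow copy: the new instance shares the same array reference as the original.
  static PositionHolder shallowCopy(PositionHolder other) {
    return new PositionHolder(other.positionMap);
  }

  // Defensive copy: the new instance gets its own array.
  static PositionHolder deepCopy(PositionHolder other) {
    return new PositionHolder(other.positionMap.clone());
  }

  void setPosition(int index, int value) {
    positionMap[index] = value;
  }

  int position(int index) {
    return positionMap[index];
  }
}

final class AliasingDemo {
  public static void main(String[] args) {
    PositionHolder original = new PositionHolder(new int[] {0, 1, 2});
    PositionHolder shallow = PositionHolder.shallowCopy(original);
    PositionHolder deep = PositionHolder.deepCopy(original);

    // Mutate through the original; only the shallow copy observes the change.
    original.setPosition(0, 42);
    System.out.println("shallow sees: " + shallow.position(0));
    System.out.println("deep sees: " + deep.position(0));
  }
}
```

   If the arrays are effectively immutable after construction, sharing them is fine and avoids extra allocations per copy.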



##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -129,6 +129,34 @@ default ThisT select(String... columns) {
    */
   ThisT planWith(ExecutorService executorService);
 
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link
+   * #planTasks()}, the scan should preserve partition boundary specified by 
the provided partition
+   * column names. In other words, the scan will not attempt to combine tasks 
whose input files have
+   * different partition data w.r.t `columns`.
+   *
+   * @param columns the partition column names to preserve boundary when 
planning tasks
+   * @return a table scan preserving partition boundary when planning tasks
+   * @throws IllegalArgumentException if any of the input columns is not a 
partition column, or if
+   *     the table is un-partitioned.
+   */
+  ThisT preservePartitions(Collection<String> columns);
+
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link
+   * #planTasks()}, the scan should preserve partition boundary specified by 
the provided partition

Review Comment:
   If we were to guess, what attributes will Spark pass us during 
storage-partitioned joins in the future? Will it pass data attributes used in 
the join condition or actual partition columns? Seems like it would be join 
attributes. If so, should we accept a list of attributes instead of actual 
partition names?
   
   Suppose I have a table partitioned by `day(ts), bucket(id)`. What's the 
input to this method?



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -87,11 +105,44 @@ public static CloseableIterable<CombinedScanTask> 
planTasks(
                     + 
file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
                 (1 + file.deletes().size()) * openFileCost);
 
-    return CloseableIterable.transform(
-        CloseableIterable.combine(
-            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, 
weightFunc, true),
-            splitFiles),
-        BaseCombinedScanTask::new);
+    if (preservedPartitionIds != null && !preservedPartitionIds.isEmpty()) {
+      Preconditions.checkArgument(
+          spec != null, "spec can't be null when " + "preservedPartitionIds is 
not null");
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), preservedPartitionIds);
+      Types.StructType projectedPartitionType = projectedStruct.type();
+      ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+          Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+      splitFiles.forEach(
+          f -> {
+            StructLikeWrapper wrapper =
+                StructLikeWrapper.forType(projectedPartitionType)
+                    .set(projectedStruct.copy().wrap(f.file().partition()));
+            groupedFiles.put(wrapper, f);
+          });
+
+      List<Iterable<BaseCombinedScanTask>> groupedTasks =
+          groupedFiles.asMap().values().stream()
+              .map(
+                  t ->
+                      Iterables.transform(
+                          new BinPacking.PackingIterable<>(
+                              CloseableIterable.withNoopClose(t),
+                              splitSize,
+                              lookback,
+                              weightFunc,
+                              true),
+                          BaseCombinedScanTask::new))
+              .collect(Collectors.toList());
+      return CloseableIterable.combine(Iterables.concat(groupedTasks), 
splitFiles);
+    } else {
+      return CloseableIterable.transform(
+          CloseableIterable.combine(
+              new BinPacking.PackingIterable<>(splitFiles, splitSize, 
lookback, weightFunc, true),
+              splitFiles),
+          BaseCombinedScanTask::new);
+    }
   }
 
   @SuppressWarnings("unchecked")

Review Comment:
   We will need to cover `planTaskGroups` as well. It is a new version of the API 
that we plan to switch to. It is already being used by incremental scans.



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -87,11 +105,44 @@ public static CloseableIterable<CombinedScanTask> 
planTasks(
                     + 
file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
                 (1 + file.deletes().size()) * openFileCost);
 
-    return CloseableIterable.transform(
-        CloseableIterable.combine(
-            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, 
weightFunc, true),
-            splitFiles),
-        BaseCombinedScanTask::new);
+    if (preservedPartitionIds != null && !preservedPartitionIds.isEmpty()) {
+      Preconditions.checkArgument(
+          spec != null, "spec can't be null when " + "preservedPartitionIds is 
not null");
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), preservedPartitionIds);
+      Types.StructType projectedPartitionType = projectedStruct.type();
+      ListMultimap<StructLikeWrapper, FileScanTask> groupedFiles =
+          Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+
+      splitFiles.forEach(
+          f -> {
+            StructLikeWrapper wrapper =
+                StructLikeWrapper.forType(projectedPartitionType)
+                    .set(projectedStruct.copy().wrap(f.file().partition()));
+            groupedFiles.put(wrapper, f);
+          });
+
+      List<Iterable<BaseCombinedScanTask>> groupedTasks =
+          groupedFiles.asMap().values().stream()
+              .map(
+                  t ->
+                      Iterables.transform(
+                          new BinPacking.PackingIterable<>(
+                              CloseableIterable.withNoopClose(t),

Review Comment:
   Since we hold these entries in memory, shall we also sort them by partition 
before combining?
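   
   A rough sketch of what I mean, assuming the partition keys are comparable. `Task` and its string partition key are hypothetical stand-ins for `FileScanTask` and the projected partition struct. A `TreeMap` keeps the groups ordered by key, so they would be combined in a deterministic, sorted order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class SortedGroupingSketch {
  // Hypothetical stand-in for a file scan task: a partition key plus a file name.
  static final class Task {
    final String partition;
    final String file;

    Task(String partition, String file) {
      this.partition = partition;
      this.file = file;
    }

    String partition() {
      return partition;
    }
  }

  // Groups tasks by partition key; a TreeMap iterates its groups in sorted key order.
  static Map<String, List<Task>> groupSorted(List<Task> tasks) {
    return tasks.stream()
        .collect(Collectors.groupingBy(Task::partition, TreeMap::new, Collectors.toList()));
  }

  // Returns the partition keys in the order their groups would be combined.
  static List<String> groupOrder(List<Task> tasks) {
    return new ArrayList<>(groupSorted(tasks).keySet());
  }

  public static void main(String[] args) {
    List<Task> tasks =
        Arrays.asList(
            new Task("2022-10-03", "c.parquet"),
            new Task("2022-10-01", "a.parquet"),
            new Task("2022-10-03", "d.parquet"),
            new Task("2022-10-02", "b.parquet"));
    System.out.println(groupOrder(tasks));
  }
}
```

   In the actual change the keys are partition structs rather than strings, so a suitable comparator for the projected partition type would be needed.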



##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -129,6 +129,34 @@ default ThisT select(String... columns) {
    */
   ThisT planWith(ExecutorService executorService);
 
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link
+   * #planTasks()}, the scan should preserve partition boundary specified by 
the provided partition
+   * column names. In other words, the scan will not attempt to combine tasks 
whose input files have
+   * different partition data w.r.t `columns`.
+   *
+   * @param columns the partition column names to preserve boundary when 
planning tasks
+   * @return a table scan preserving partition boundary when planning tasks
+   * @throws IllegalArgumentException if any of the input columns is not a 
partition column, or if
+   *     the table is un-partitioned.
+   */
+  ThisT preservePartitions(Collection<String> columns);
+
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link
+   * #planTasks()}, the scan should preserve partition boundary specified by 
the provided partition
+   * column names. In other words, the scan will not attempt to combine tasks 
whose input files have
+   * different partition data w.r.t `columns`.
+   *
+   * @param columns the partition column names to preserve boundary when 
planning tasks
+   * @return a table scan preserving partition boundary when planning tasks
+   * @throws IllegalArgumentException if any of the input columns is not a 
partition column, or if
+   *     the table is un-partitioned.
+   */
+  default ThisT preservePartitions(String... columns) {

Review Comment:
   Hm, I'd play around with the method name as `preservePartitions` may be 
unclear without reading the doc.
   Let me think.



##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -129,6 +129,34 @@ default ThisT select(String... columns) {
    */
   ThisT planWith(ExecutorService executorService);
 
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link
+   * #planTasks()}, the scan should preserve partition boundary specified by 
the provided partition
+   * column names. In other words, the scan will not attempt to combine tasks 
whose input files have
+   * different partition data w.r.t `columns`.
+   *
+   * @param columns the partition column names to preserve boundary when 
planning tasks
+   * @return a table scan preserving partition boundary when planning tasks
+   * @throws IllegalArgumentException if any of the input columns is not a 
partition column, or if
+   *     the table is un-partitioned.
+   */
+  ThisT preservePartitions(Collection<String> columns);

Review Comment:
   I'd probably add a default implementation that throws an exception to avoid breaking custom implementations.
   
   ```
   throw new UnsupportedOperationException(
       this.getClass().getName() + " does not implement preservePartitions");
   ```
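   
   For context, a minimal sketch of how that could look as a default method. `SketchScan` and `LegacyScan` are hypothetical names used only for illustration; the real `Scan` interface has many more methods. The point is that an implementation written before the new method existed still compiles and only fails if the new method is actually called.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical, trimmed-down stand-in for the Scan interface.
interface SketchScan<ThisT> {
  ThisT select(Collection<String> columns);

  // New method with a default body, so existing implementations keep compiling.
  default ThisT preservePartitions(Collection<String> columns) {
    throw new UnsupportedOperationException(
        this.getClass().getName() + " does not implement preservePartitions");
  }
}

// Hypothetical custom implementation that predates preservePartitions.
final class LegacyScan implements SketchScan<LegacyScan> {
  @Override
  public LegacyScan select(Collection<String> columns) {
    return this;
  }
}

final class DefaultMethodDemo {
  public static void main(String[] args) {
    LegacyScan scan = new LegacyScan();
    try {
      scan.preservePartitions(Collections.singletonList("ts_day"));
    } catch (UnsupportedOperationException e) {
      // The message names the concrete class that lacks an implementation.
      System.out.println(e.getMessage());
    }
  }
}
```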



##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -160,6 +161,31 @@ public ThisT planWith(ExecutorService executorService) {
     return newRefinedScan(ops, table, schema, 
context.planWith(executorService));
   }
 
+  @Override
+  public ThisT preservePartitions(Collection<String> columns) {
+    if (table.spec().isUnpartitioned()) {

Review Comment:
   What if we have multiple partition specs in the table? Shall we use 
`Partitioning.partitionType()`?
   Let me think.



##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -87,11 +105,44 @@ public static CloseableIterable<CombinedScanTask> 
planTasks(
                     + 
file.deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum(),
                 (1 + file.deletes().size()) * openFileCost);
 
-    return CloseableIterable.transform(
-        CloseableIterable.combine(
-            new BinPacking.PackingIterable<>(splitFiles, splitSize, lookback, 
weightFunc, true),
-            splitFiles),
-        BaseCombinedScanTask::new);
+    if (preservedPartitionIds != null && !preservedPartitionIds.isEmpty()) {
+      Preconditions.checkArgument(
+          spec != null, "spec can't be null when " + "preservedPartitionIds is 
not null");
+      StructProjection projectedStruct =
+          StructProjection.create(spec.partitionType(), preservedPartitionIds);

Review Comment:
   We should take into account that there may be multiple specs. Also, we need 
to test both v1 and v2 tables as they have different spec evolution rules.



##########
api/src/main/java/org/apache/iceberg/Scan.java:
##########
@@ -129,6 +129,34 @@ default ThisT select(String... columns) {
    */
   ThisT planWith(ExecutorService executorService);
 
+  /**
+   * Create a new {@link TableScan} which dictates that when plan tasks via 
the {@link
+   * #planTasks()}, the scan should preserve partition boundary specified by 
the provided partition
+   * column names. In other words, the scan will not attempt to combine tasks 
whose input files have
+   * different partition data w.r.t `columns`.
+   *
+   * @param columns the partition column names to preserve boundary when 
planning tasks
+   * @return a table scan preserving partition boundary when planning tasks
+   * @throws IllegalArgumentException if any of the input columns is not a 
partition column, or if
+   *     the table is un-partitioned.
+   */
+  ThisT preservePartitions(Collection<String> columns);

Review Comment:
   Adding a default implementation that throws would also allow us to avoid changes in `revapi.yml`, I guess.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

