aokolnychyi commented on code in PR #6924:
URL: https://github.com/apache/iceberg/pull/6924#discussion_r1128931126


##########
core/src/main/java/org/apache/iceberg/util/TableScanUtil.java:
##########
@@ -79,6 +79,23 @@ public static CloseableIterable<FileScanTask> splitFiles(
     return CloseableIterable.combine(splitTasks, tasks);
   }
 
+  public static <T extends ScanTask> CloseableIterable<T> splitScanTasks(

Review Comment:
   Instead of doing this, we should offer another method with the following 
signature.
   
   ```
   public static <T extends ScanTask> List<ScanTaskGroup<T>> planTaskGroups(
       List<T> tasks,
       long splitSize,
       int lookback,
       long openFileCost) {
     ...
   }
   ```
   
   This will work in the scan for staged tasks as well as for 
`SparkPartitioningAwareScan`. Under the hood, it will simply delegate to the 
existing `planTaskGroups` but will wrap list into `CloseableIterable`.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/AbstractFileRewriteCoordinator.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractFileRewriteCoordinator<F extends ContentFile<F>> 
{

Review Comment:
   A lot has changed since we added `FileScanTaskSetManager`. I think that was 
even me? Back then we did not have any other scan task types and our Spark 
logic wasn't that flexible in terms of supporting those scan task types. That's 
no longer true.
   
   Instead of offering a file rewrite coordinator for each file type and 
adapting the existing logic, I'd deprecate existing classes and create a new 
more flexible path. We will eventually drop the old path.
   
   I'd start by adding a more generic read option.
   
   ```
   public static final String SCAN_TASK_SET_ID = "scan-task-set-id";
   ```
   
   We would then create a new task set manager with signature like below (just 
an idea, can be different).
   
   ```
   public class ScanTaskSetManager {
   
     public void stageTasks(Table table, String setId, List<? extends ScanTask> 
tasks) {
       ...
     }
   
     @SuppressWarnings("unchecked")
     public <T extends ScanTask> List<T> fetchTasks(Table table, String setId) {
       ...
     }
   
     @SuppressWarnings("unchecked")
     public <T extends ScanTask> List<T> removeTasks(Table table, String setID) 
{
       ...
     }
   }
   ```
   
   Then `SparkStagedScanBuilder` and `SparkStagedScan` classes that would 
support arbitrary scan tasks. The scan implementation will be very 
straightforward.
   
   ```
   @Override
   protected List<ScanTaskGroup<ScanTask>> taskGroups() {
     if (taskGroups == null) {
       ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
       List<ScanTask> tasks = taskSetManager.fetchTasks(table(), taskSetId);
       ValidationException.check(
           tasks != null,
           "Task set manager has no tasks for table %s with task set ID %s",
           table().name(),
           taskSetId);
   
       this.taskGroups = TableScanUtil.planTaskGroups(tasks, splitSize, 
splitLookback, openFileCost);
     }
   
     return taskGroups;
   }
   ```
   
   Then new path can be used for data and delete file compaction.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/AbstractFileRewriteCoordinator.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractFileRewriteCoordinator<F extends ContentFile<F>> 
{

Review Comment:
   This may seem like more work but it will actually make this PR smaller and 
it is very close to what you did initially.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to