singhpk234 commented on code in PR #14480:
URL: https://github.com/apache/iceberg/pull/14480#discussion_r2485388022


##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -429,6 +562,68 @@ public <T extends RESTResponse> T handleRequest(
     return null;
   }
 
+  /**
+   * Do all the planning upfront but batch the file scan tasks across plan tasks. Plan tasks have a
+   * key like <plan ID - table UUID - plan task sequence>. The current implementation simply uses
+   * plan tasks as a pagination mechanism to control response sizes.
+   *
+   * @param tableScan the configured scan to plan files for
+   * @param planId the ID under which the resulting plan tasks are registered
+   */
+  private void planFilesFor(TableScan tableScan, String planId) {
+    Iterable<List<FileScanTask>> taskGroupings =
+        Iterables.partition(
+            tableScan.planFiles(), planningBehavior.numberFileScanTasksPerPlanTask());

Review Comment:
   Is it possible to always force `planFiles()` to produce one FileScanTask per file? I'm mostly coming from the split-offset case, where a single file can be part of multiple FileScanTasks; having one FileScanTask per file would avoid that (we'd need to handle the split scan task in the REST spec too :) )
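
   A rough sketch of the check I have in mind (the helper name `tasksByFile` is hypothetical, not part of this PR): index the planned tasks by data file location, so any file that maps to more than one task is one that was split:

   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.TableScan;
   import org.apache.iceberg.io.CloseableIterable;

   // Hypothetical helper: group planned tasks by data file location. A list
   // with more than one entry means the file was split across FileScanTasks
   // (e.g. by split offsets) and would need the split-scan-task handling in
   // the REST spec mentioned above.
   static Map<String, List<FileScanTask>> tasksByFile(TableScan scan) {
     Map<String, List<FileScanTask>> byFile = new LinkedHashMap<>();
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       for (FileScanTask task : tasks) {
         byFile.computeIfAbsent(task.file().location(), k -> new ArrayList<>()).add(task);
       }
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
     return byFile;
   }
   ```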



##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -294,6 +320,113 @@ public <T extends RESTResponse> T handleRequest(
           return castResponse(responseType, response);
         }
 
+      case PLAN_TABLE_SCAN:
+        {
+          TableIdentifier ident = tableIdentFromPathVars(vars);
+          PlanTableScanRequest request = castRequest(PlanTableScanRequest.class, body);
+          Table table = catalog.loadTable(ident);
+          TableScan tableScan = table.newScan();
+
+          if (request.snapshotId() != null) {
+            tableScan = tableScan.useSnapshot(request.snapshotId());
+          }
+          if (request.select() != null) {
+            tableScan = tableScan.select(request.select());
+          }
+          if (request.filter() != null) {
+            tableScan = tableScan.filter(request.filter());
+          }
+          if (request.statsFields() != null) {
+            tableScan = tableScan.includeColumnStats(request.statsFields());
+          }
+
+          tableScan = tableScan.caseSensitive(request.caseSensitive());
+
+          if (planningBehavior.shouldPlanTableScanAsync(tableScan)) {
+            String asyncPlanId = UUID.randomUUID().toString();
+            asyncPlanFiles(tableScan, asyncPlanId);
+            return castResponse(
+                responseType,
+                PlanTableScanResponse.builder()
+                    .withPlanId(asyncPlanId)
+                    .withPlanStatus(PlanStatus.SUBMITTED)
+                    .withSpecsById(table.specs())
+                    .build());
+          }
+
+          String planId = UUID.randomUUID().toString();
+          planFilesFor(tableScan, planId);
+          Pair<List<FileScanTask>, String> tasksAndPlan = initialScanTasksForPlan(planId);
+          return castResponse(
+              responseType,
+              PlanTableScanResponse.builder()
+                  .withPlanStatus(PlanStatus.COMPLETED)
+                  .withPlanTasks(nextPlanTasks(tasksAndPlan.second()))
+                  .withFileScanTasks(tasksAndPlan.first())
+                  .withDeleteFiles(
+                      tasksAndPlan.first().stream()
+                          .flatMap(t -> t.deletes().stream())
+                          .distinct()
+                          .collect(Collectors.toList()))
+                  .withSpecsById(table.specs())
+                  .build());
+        }
+
+      case FETCH_PLANNING_RESULT:
+        {
+          TableIdentifier ident = tableIdentFromPathVars(vars);
+          Table table = catalog.loadTable(ident);
+          String planId = planIDFromPathVars(vars);
+          Pair<List<FileScanTask>, String> tasksAndPlan = initialScanTasksForPlan(planId);
+          return castResponse(
+              responseType,
+              FetchPlanningResultResponse.builder()
+                  .withPlanStatus(PlanStatus.COMPLETED)
+                  .withDeleteFiles(
+                      tasksAndPlan.first().stream()
+                          .flatMap(t -> t.deletes().stream())
+                          .distinct()
+                          .collect(Collectors.toList()))
+                  .withFileScanTasks(tasksAndPlan.first())
+                  .withPlanTasks(nextPlanTasks(tasksAndPlan.second()))
+                  .withSpecsById(table.specs())
+                  .build());
+        }
+
+      case FETCH_SCAN_TASKS:
+        {
+          TableIdentifier ident = tableIdentFromPathVars(vars);
+          Table table = catalog.loadTable(ident);
+          FetchScanTasksRequest request = castRequest(FetchScanTasksRequest.class, body);
+          String planTask = request.planTask();
+          List<FileScanTask> fileScanTasks = planTaskToFileScanTasks.get(planTask);
+          if (fileScanTasks == null) {
+            throw new NoSuchPlanTaskException("Could not find tasks for plan task %s", planTask);
+          }
+
+          // Simple implementation, only have at most 1 "next" plan task to simulate pagination
+          return castResponse(
+              responseType,
+              FetchScanTasksResponse.builder()
+                  .withFileScanTasks(fileScanTasks)
+                  .withPlanTasks(nextPlanTasks(planTask))
+                  .withSpecsById(table.specs())

Review Comment:
   I just realized this is not in accordance with the spec: we don't return specs-by-id. (Apologies, I missed this in the Req / Response model implementation.)
   
   What happens presently is that the `FetchScanTasksResponse` toJson skips serializing specs-by-id, and when deserializing to `FetchScanTasksResponse` it is skipped again, even though we know what it is.
   
   There are 2 approaches to fix this:
   1. Remove specs-by-id from the base class.
   2. Populate specs-by-id in the deserialized response too.
   
   Approach 1 seems cleaner, but we might have to add a revapi entry, which is fine since no Java-based REST catalog supports this yet.
   Approach 2 is also fine, but it might give the impression that we are not conforming to the spec.
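
   For reference, a minimal sketch of what approach 2 could look like on the receiving side (the import path and the accessors `fileScanTasks()`, `deleteFiles()`, and `planTasks()` are assumptions here; the builder methods are the ones this PR already uses):

   ```java
   import org.apache.iceberg.Table;
   import org.apache.iceberg.rest.responses.FetchScanTasksResponse;

   // Sketch of approach 2: the wire format keeps omitting specs-by-id, and the
   // client re-attaches the specs from the Table it has already loaded, right
   // after deserialization.
   static FetchScanTasksResponse withSpecs(FetchScanTasksResponse deserialized, Table table) {
     return FetchScanTasksResponse.builder()
         .withFileScanTasks(deserialized.fileScanTasks())
         .withDeleteFiles(deserialized.deleteFiles())
         .withPlanTasks(deserialized.planTasks())
         .withSpecsById(table.specs()) // populated locally, never serialized
         .build();
   }
   ```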



##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -429,6 +562,68 @@ public <T extends RESTResponse> T handleRequest(
     return null;
   }
 
+  /**
+   * Do all the planning upfront but batch the file scan tasks across plan tasks. Plan tasks have a
+   * key like <plan ID - table UUID - plan task sequence>. The current implementation simply uses
+   * plan tasks as a pagination mechanism to control response sizes.
+   *
+   * @param tableScan the configured scan to plan files for
+   * @param planId the ID under which the resulting plan tasks are registered
+   */
+  private void planFilesFor(TableScan tableScan, String planId) {
+    Iterable<List<FileScanTask>> taskGroupings =
+        Iterables.partition(
+            tableScan.planFiles(), planningBehavior.numberFileScanTasksPerPlanTask());
+    int planTaskSequence = 0;
+    String prevPlanTask = null;
+    for (List<FileScanTask> taskGrouping : taskGroupings) {
+      String planTaskKey =
+          String.format("%s-%s-%s", planId, tableScan.table().uuid(), planTaskSequence++);
+      planTaskToFileScanTasks.put(planTaskKey, taskGrouping);
+      if (prevPlanTask != null) {
+        planTaskToNext.put(prevPlanTask, planTaskKey);
+      }
+
+      prevPlanTask = planTaskKey;
+    }
+  }
+
+  private void asyncPlanFiles(TableScan scan, String asyncPlanId) {
+    asyncPlanningPool.submit(
+        () -> {
+          planFilesFor(scan, asyncPlanId);
+        });
+  }
+
+  // The initial set of file scan tasks is going to have a sentinel plan task which ends in
+  // 0. Directly return this set of file scan tasks as the initial set, along with
+  // any next plan task if applicable
+  private Pair<List<FileScanTask>, String> initialScanTasksForPlan(String planId) {
+    Set<Map.Entry<String, List<FileScanTask>>> initialPlanTaskAndFileScanTasks =
+        planTaskToFileScanTasks.entrySet().stream()
+            .filter(
+                planTask -> planTask.getKey().contains(planId) && planTask.getKey().endsWith("0"))

Review Comment:
   Should we split at the last `-` and check whether the final segment is exactly `0`? `endsWith("0")` would also match once we have, say, `10` tasks.
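
   Something like this (hypothetical helper, not in the PR) is what I mean:

   ```java
   // Parse the segment after the last '-' instead of using endsWith("0"),
   // which would also match plan task sequences 10, 20, 30, ...
   static boolean isInitialPlanTask(String planTaskKey) {
     int lastDash = planTaskKey.lastIndexOf('-');
     return lastDash >= 0 && "0".equals(planTaskKey.substring(lastDash + 1));
   }
   ```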



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

