singhpk234 commented on code in PR #14480:
URL: https://github.com/apache/iceberg/pull/14480#discussion_r2485388022
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -429,6 +562,68 @@ public <T extends RESTResponse> T handleRequest(
return null;
}
+ /**
+ * Do all the planning upfront but batch the file scan tasks across plan
tasks. Plan Tasks have a
+ * key like <plan ID - table UUID - plan task sequence> The current
implementation simply uses
+ * plan tasks as a pagination mechanism to control response sizes.
+ *
+ * @param tableScan
+ * @param planId
+ */
+ private void planFilesFor(TableScan tableScan, String planId) {
+ Iterable<List<FileScanTask>> taskGroupings =
+ Iterables.partition(
+ tableScan.planFiles(),
planningBehavior.numberFileScanTasksPerPlanTask());
Review Comment:
is it possible to always force the `planFIles()` to have 1 FS task per file
? mostly coming from a file split offset FS task where a file can be part of
multiple FS tasks, having 1 FS per file (we need to handle it in rest spec too
the split scan task :) )
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -294,6 +320,113 @@ public <T extends RESTResponse> T handleRequest(
return castResponse(responseType, response);
}
+ case PLAN_TABLE_SCAN:
+ {
+ TableIdentifier ident = tableIdentFromPathVars(vars);
+ PlanTableScanRequest request =
castRequest(PlanTableScanRequest.class, body);
+ Table table = catalog.loadTable(ident);
+ TableScan tableScan = table.newScan();
+
+ if (request.snapshotId() != null) {
+ tableScan = tableScan.useSnapshot(request.snapshotId());
+ }
+ if (request.select() != null) {
+ tableScan = tableScan.select(request.select());
+ }
+ if (request.filter() != null) {
+ tableScan = tableScan.filter(request.filter());
+ }
+ if (request.statsFields() != null) {
+ tableScan = tableScan.includeColumnStats(request.statsFields());
+ }
+
+ tableScan = tableScan.caseSensitive(request.caseSensitive());
+
+ if (planningBehavior.shouldPlanTableScanAsync(tableScan)) {
+ String asyncPlanId = UUID.randomUUID().toString();
+ asyncPlanFiles(tableScan, asyncPlanId);
+ return castResponse(
+ responseType,
+ PlanTableScanResponse.builder()
+ .withPlanId(asyncPlanId)
+ .withPlanStatus(PlanStatus.SUBMITTED)
+ .withSpecsById(table.specs())
+ .build());
+ }
+
+ String planId = UUID.randomUUID().toString();
+ planFilesFor(tableScan, planId);
+ Pair<List<FileScanTask>, String> tasksAndPlan =
initialScanTasksForPlan(planId);
+ return castResponse(
+ responseType,
+ PlanTableScanResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withPlanTasks(nextPlanTasks(tasksAndPlan.second()))
+ .withFileScanTasks(tasksAndPlan.first())
+ .withDeleteFiles(
+ tasksAndPlan.first().stream()
+ .flatMap(t -> t.deletes().stream())
+ .distinct()
+ .collect(Collectors.toList()))
+ .withSpecsById(table.specs())
+ .build());
+ }
+
+ case FETCH_PLANNING_RESULT:
+ {
+ TableIdentifier ident = tableIdentFromPathVars(vars);
+ Table table = catalog.loadTable(ident);
+ String planId = planIDFromPathVars(vars);
+ Pair<List<FileScanTask>, String> tasksAndPlan =
initialScanTasksForPlan(planId);
+ return castResponse(
+ responseType,
+ FetchPlanningResultResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withDeleteFiles(
+ tasksAndPlan.first().stream()
+ .flatMap(t -> t.deletes().stream())
+ .distinct()
+ .collect(Collectors.toList()))
+ .withFileScanTasks(tasksAndPlan.first())
+ .withPlanTasks(nextPlanTasks(tasksAndPlan.second()))
+ .withSpecsById(table.specs())
+ .build());
+ }
+
+ case FETCH_SCAN_TASKS:
+ {
+ TableIdentifier ident = tableIdentFromPathVars(vars);
+ Table table = catalog.loadTable(ident);
+ FetchScanTasksRequest request =
castRequest(FetchScanTasksRequest.class, body);
+ String planTask = request.planTask();
+ List<FileScanTask> fileScanTasks =
planTaskToFileScanTasks.get(planTask);
+ if (fileScanTasks == null) {
+ throw new NoSuchPlanTaskException("Could not find tasks for plan
task %s", planTask);
+ }
+
+ // Simple implementation, only have at most 1 "next" plan task to
simulate pagination
+ return castResponse(
+ responseType,
+ FetchScanTasksResponse.builder()
+ .withFileScanTasks(fileScanTasks)
+ .withPlanTasks(nextPlanTasks(planTask))
+ .withSpecsById(table.specs())
Review Comment:
I just realized, this is not accordance to the spec, we don't return specs
by id, whats happening presently is (Apologies, I missed this in the Req /
Response model implementation)
This response `FetchScanTasksResponse` toJson skips serializing specs by id
and when de-serializing to FetchScanTasksResponse this is skipped again even
though we know what is
there are 2 approaches to fix this :
1. remove spec-by-id from the base class
2. populate the spec-by-id in the deserailzed response too
Approach 1 seems cleaner but we might have to have a rev-api entry, which is
fine since no java based rest catalog supports this yet.
Approach 2 is also fine but it might give an impression we are not according
to the spec.
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -429,6 +562,68 @@ public <T extends RESTResponse> T handleRequest(
return null;
}
+ /**
+ * Do all the planning upfront but batch the file scan tasks across plan
tasks. Plan Tasks have a
+ * key like <plan ID - table UUID - plan task sequence> The current
implementation simply uses
+ * plan tasks as a pagination mechanism to control response sizes.
+ *
+ * @param tableScan
+ * @param planId
+ */
+ private void planFilesFor(TableScan tableScan, String planId) {
+ Iterable<List<FileScanTask>> taskGroupings =
+ Iterables.partition(
+ tableScan.planFiles(),
planningBehavior.numberFileScanTasksPerPlanTask());
+ int planTaskSequence = 0;
+ String prevPlanTask = null;
+ for (List<FileScanTask> taskGrouping : taskGroupings) {
+ String planTaskKey =
+ String.format("%s-%s-%s", planId, tableScan.table().uuid(),
planTaskSequence++);
+ planTaskToFileScanTasks.put(planTaskKey, taskGrouping);
+ if (prevPlanTask != null) {
+ planTaskToNext.put(prevPlanTask, planTaskKey);
+ }
+
+ prevPlanTask = planTaskKey;
+ }
+ }
+
+ private void asyncPlanFiles(TableScan scan, String asyncPlanId) {
+ asyncPlanningPool.submit(
+ () -> {
+ planFilesFor(scan, asyncPlanId);
+ });
+ }
+
+ // The initial set of file scan tasks is going to have a sentinel plan task
which ends in
+ // 0. Directly return this set of file scan tasks as the initial set, along
with
+ // any next plan task if applicable
+ private Pair<List<FileScanTask>, String> initialScanTasksForPlan(String
planId) {
+ Set<Map.Entry<String, List<FileScanTask>>> initialPlanTaskAndFileScanTasks
=
+ planTaskToFileScanTasks.entrySet().stream()
+ .filter(
+ planTask -> planTask.getKey().contains(planId) &&
planTask.getKey().endsWith("0"))
Review Comment:
should we split at the last `-` and see if its just 0, mostly coming from
case where we have like `10` tasks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]