amogh-jahagirdar commented on code in PR #13400:
URL: https://github.com/apache/iceberg/pull/13400#discussion_r2389078577
##########
core/src/main/java/org/apache/iceberg/rest/RESTFileScanTaskParser.java:
##########
@@ -86,8 +86,9 @@ public static FileScanTask fromJson(
DeleteFile[] deleteFiles = null;
if (jsonNode.has(DELETE_FILE_REFERENCES)) {
List<Integer> indices = JsonUtil.getIntegerList(DELETE_FILE_REFERENCES,
jsonNode);
+ // TODO: revisit this check when delete files are not required
Preconditions.checkArgument(
- Collections.max(indices) < allDeleteFiles.size(),
+ indices.isEmpty() || Collections.max(indices) <
allDeleteFiles.size(),
Review Comment:
I think this change is right; after all, it's possible that the delete file
references are defined but empty for a given task.
##########
core/src/main/java/org/apache/iceberg/RESTTableScan.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.ParserContext;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
+
+public class RESTTableScan extends DataTableScan {
+ private final RESTClient client;
+ private final String path;
+ private final Supplier<Map<String, String>> headers;
+ private final TableOperations operations;
+ private final Table table;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+ private final ParserContext parserContext;
+
+ // Track the current plan ID for cancellation
+ private volatile String currentPlanId = null;
+
+ // TODO revisit if this property should be configurable
+ private static final int FETCH_PLANNING_SLEEP_DURATION_MS = 1000;
+ private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000L;
+
+ RESTTableScan(
+ Table table,
+ Schema schema,
+ TableScanContext context,
+ RESTClient client,
+ String path,
+ Supplier<Map<String, String>> headers,
+ TableOperations operations,
+ TableIdentifier tableIdentifier,
+ ResourcePaths resourcePaths) {
+ super(table, schema, context);
+ this.table = table;
+ this.client = client;
+ this.headers = headers;
+ this.path = path;
+ this.operations = operations;
+ this.tableIdentifier = tableIdentifier;
+ this.resourcePaths = resourcePaths;
+ this.parserContext =
+ ParserContext.builder()
+ .add("specsById", table.specs())
+ .add("caseSensitive", context().caseSensitive())
+ .build();
+ }
+
+ @Override
+ protected TableScan newRefinedScan(
+ Table refinedTable, Schema refinedSchema, TableScanContext
refinedContext) {
+ return new RESTTableScan(
+ refinedTable,
+ refinedSchema,
+ refinedContext,
+ client,
+ path,
+ headers,
+ operations,
+ tableIdentifier,
+ resourcePaths);
+ }
+
+ @Override
+ public CloseableIterable<FileScanTask> planFiles() {
+ Long startSnapshotId = context().fromSnapshotId();
+ Long endSnapshotId = context().toSnapshotId();
+ Long snapshotId = snapshotId();
+ List<String> selectedColumns =
+
schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
+
+ List<String> statsFields = null;
+ if (columnsToKeepStats() != null) {
+ statsFields =
+ columnsToKeepStats().stream()
+ .map(columnId -> schema().findColumnName(columnId))
+ .collect(Collectors.toList());
+ }
+
+ PlanTableScanRequest.Builder planTableScanRequestBuilder =
+ new PlanTableScanRequest.Builder()
+ .withSelect(selectedColumns)
+ .withFilter(filter())
+ .withCaseSensitive(isCaseSensitive())
+ .withStatsFields(statsFields);
+
+ if (startSnapshotId != null && endSnapshotId != null) {
+ planTableScanRequestBuilder
+ .withStartSnapshotId(startSnapshotId)
+ .withEndSnapshotId(endSnapshotId)
+ .withUseSnapshotSchema(true);
+
+ } else if (snapshotId != null) {
+ boolean useSnapShotSchema = snapshotId !=
table.currentSnapshot().snapshotId();
+ planTableScanRequestBuilder
+ .withSnapshotId(snapshotId)
+ .withUseSnapshotSchema(useSnapShotSchema);
+
+ } else {
+
planTableScanRequestBuilder.withSnapshotId(table().currentSnapshot().snapshotId());
+ }
+
+ return planTableScan(planTableScanRequestBuilder.build());
+ }
+
+ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest
planTableScanRequest) {
+
+ PlanTableScanResponse response =
+ client.post(
+ resourcePaths.planTableScan(tableIdentifier),
+ planTableScanRequest,
+ PlanTableScanResponse.class,
+ headers.get(),
+ ErrorHandlers.defaultErrorHandler(),
+ stringStringMap -> {},
+ parserContext);
+
+ PlanStatus planStatus = response.planStatus();
+ switch (planStatus) {
+ case COMPLETED:
+ return getScanTasksIterable(response.planTasks(),
response.fileScanTasks());
+ case SUBMITTED:
+ return fetchPlanningResult(response.planId());
+ case FAILED:
+ throw new IllegalStateException(
+ "Received \"failed\" status from service when planning a table
scan");
+ case CANCELLED:
+ throw new IllegalStateException(
+ "Received \"cancelled\" status from service when planning a table
scan");
+ default:
+ throw new RuntimeException(
+ String.format("Invalid planStatus during planTableScan: %s",
planStatus));
+ }
+ }
+
+ private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
+ // Set the current plan ID for potential cancellation
+ currentPlanId = planId;
+
+ try {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime <= MAX_WAIT_TIME_MS) {
+ FetchPlanningResultResponse response =
+ client.get(
+ resourcePaths.fetchPlanningResult(tableIdentifier, planId),
+ Map.of(),
+ FetchPlanningResultResponse.class,
+ headers.get(),
+ ErrorHandlers.defaultErrorHandler(),
+ parserContext);
+
+ PlanStatus planStatus = response.planStatus();
+ switch (planStatus) {
+ case COMPLETED:
+ return getScanTasksIterable(response.planTasks(),
response.fileScanTasks());
+ case SUBMITTED:
+ try {
+ // TODO: if we want to add some jitter here to avoid thundering
herd.
+ Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // Attempt to cancel the plan before exiting
+ cancelPlan();
+ throw new RuntimeException("Interrupted while fetching plan
status", e);
+ }
+ break;
+ case FAILED:
+ throw new IllegalStateException(
+ "Received \"failed\" status from service when fetching a table
scan");
+ case CANCELLED:
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Received \"cancelled\" status from service when fetching
a table scan, planId: %s is invalid",
+ planId));
+ default:
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT, "Invalid planStatus during
fetchPlanningResult: %s", planStatus));
+ }
+ }
+ // If we reach here, we've exceeded the max wait time
+ currentPlanId = null; // Clear on timeout
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Exceeded max wait time of %d ms when fetching planning result",
+ MAX_WAIT_TIME_MS));
+ } catch (Exception e) {
+ // Clear the plan ID on any exception (except successful completion)
+ currentPlanId = null;
+ throw e;
+ }
+ }
+
+ public CloseableIterable<FileScanTask> getScanTasksIterable(
+ List<String> planTasks, List<FileScanTask> fileScanTasks) {
+ List<ScanTasksIterable> iterableOfScanTaskIterables = Lists.newArrayList();
+ if (fileScanTasks != null) {
+ // add this to the list for below if planTasks will also be present
+ ScanTasksIterable scanTasksIterable =
+ new ScanTasksIterable(
+ fileScanTasks,
+ client,
+ resourcePaths,
+ tableIdentifier,
+ headers,
+ planExecutor(),
+ table.specs(),
+ isCaseSensitive(),
+ this::cancelPlan);
+ iterableOfScanTaskIterables.add(scanTasksIterable);
+ }
+ if (planTasks != null) {
+ // Use parallel iterable since planTasks are present
+ for (String planTask : planTasks) {
+ ScanTasksIterable iterable =
+ new ScanTasksIterable(
+ planTask,
+ client,
+ resourcePaths,
+ tableIdentifier,
+ headers,
+ planExecutor(),
+ table.specs(),
+ isCaseSensitive(),
+ this::cancelPlan);
+ iterableOfScanTaskIterables.add(iterable);
+ }
+ return new ParallelIterable<>(iterableOfScanTaskIterables,
planExecutor());
+ }
+ // use a single scanTasks iterable since no need to parallelize since no
planTasks
+ return new ScanTasksIterable(
+ fileScanTasks,
+ client,
+ resourcePaths,
+ tableIdentifier,
+ headers,
+ planExecutor(),
+ table.specs(),
+ isCaseSensitive(),
+ this::cancelPlan);
+ }
+
+ @VisibleForTesting
+ @SuppressWarnings("checkstyle:RegexpMultiline")
Review Comment:
What is this `@SuppressWarnings("checkstyle:RegexpMultiline")` for?
##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -460,6 +468,11 @@ public Table loadTable(SessionContext context,
TableIdentifier identifier) {
trackFileIO(ops);
+ RESTTable restTable = getRemoteScanPlanningTable(ops, finalIdentifier,
tableClient);
Review Comment:
How about `restTableFor`, or just `restTable`? We limit the use of `get`
prefixes in method names in this project.
##########
core/src/main/java/org/apache/iceberg/rest/ResourcePaths.java:
##########
@@ -133,4 +133,50 @@ public String view(TableIdentifier ident) {
public String renameView() {
return SLASH.join("v1", prefix, "views", "rename");
}
+
+ public String planTableScan(TableIdentifier ident) {
+ return SLASH.join(
+ "v1",
+ prefix,
+ "namespaces",
+ RESTUtil.encodeNamespace(ident.namespace()),
+ "tables",
+ RESTUtil.encodeString(ident.name()),
+ "plan");
+ }
+
+ public String fetchPlanningResult(TableIdentifier ident, String planId) {
+ return SLASH.join(
+ "v1",
+ prefix,
+ "namespaces",
+ RESTUtil.encodeNamespace(ident.namespace()),
+ "tables",
+ RESTUtil.encodeString(ident.name()),
+ "plan",
+ planId);
+ }
+
+ public String cancelPlan(TableIdentifier ident, String planId) {
+ return SLASH.join(
+ "v1",
+ prefix,
+ "namespaces",
+ RESTUtil.encodeNamespace(ident.namespace()),
+ "tables",
+ RESTUtil.encodeString(ident.name()),
+ "plan",
+ planId);
+ }
Review Comment:
These are the same paths, no? Just different verbs, so a single path method could serve both.
##########
core/src/main/java/org/apache/iceberg/ScanTasksIterable.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.ParserContext;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.requests.FetchScanTasksRequest;
+import org.apache.iceberg.rest.responses.FetchScanTasksResponse;
+import org.apache.iceberg.util.ParallelIterable;
+
+public class ScanTasksIterable implements CloseableIterable<FileScanTask> {
+ private final RESTClient client;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+ private final Supplier<Map<String, String>> headers;
+ // parallelizing on this where a planTask produces a list of file scan
tasks, as
+ // well more planTasks.
+ private final String planTask;
+ private final ArrayDeque<FileScanTask> fileScanTasks;
+ private final ExecutorService executorService;
+ private final Map<Integer, PartitionSpec> specsById;
+ private final boolean caseSensitive;
+ private final Supplier<Boolean> cancellationCallback;
+
+ public ScanTasksIterable(
+ String planTask,
+ RESTClient client,
+ ResourcePaths resourcePaths,
+ TableIdentifier tableIdentifier,
+ Supplier<Map<String, String>> headers,
+ ExecutorService executorService,
+ Map<Integer, PartitionSpec> specsById,
+ boolean caseSensitive,
+ Supplier<Boolean> cancellationCallback) {
+ this.planTask = planTask;
+ this.fileScanTasks = null;
+ this.client = client;
+ this.resourcePaths = resourcePaths;
+ this.tableIdentifier = tableIdentifier;
+ this.headers = headers;
+ this.executorService = executorService;
+ this.specsById = specsById;
+ this.caseSensitive = caseSensitive;
+ this.cancellationCallback = cancellationCallback;
+ }
+
+ public ScanTasksIterable(
+ List<FileScanTask> fileScanTasks,
+ RESTClient client,
+ ResourcePaths resourcePaths,
+ TableIdentifier tableIdentifier,
+ Supplier<Map<String, String>> headers,
+ ExecutorService executorService,
+ Map<Integer, PartitionSpec> specsById,
+ boolean caseSensitive,
+ Supplier<Boolean> cancellationCallback) {
+ this.planTask = null;
+ this.fileScanTasks = new ArrayDeque<>(fileScanTasks);
+ this.client = client;
+ this.resourcePaths = resourcePaths;
+ this.tableIdentifier = tableIdentifier;
+ this.headers = headers;
+ this.executorService = executorService;
+ this.specsById = specsById;
+ this.caseSensitive = caseSensitive;
+ this.cancellationCallback = cancellationCallback;
+ }
+
+ @Override
+ public CloseableIterator<FileScanTask> iterator() {
+ return new ScanTasksIterator(
+ planTask,
+ fileScanTasks,
+ client,
+ resourcePaths,
+ tableIdentifier,
+ headers,
+ executorService,
+ specsById,
+ caseSensitive,
+ cancellationCallback);
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ private static class ScanTasksIterator implements
CloseableIterator<FileScanTask> {
+ private final RESTClient client;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+ private final Supplier<Map<String, String>> headers;
+ private String planTask;
+ private final ArrayDeque<FileScanTask> fileScanTasks;
+ private final ExecutorService executorService;
+ private final Map<Integer, PartitionSpec> specsById;
+ private final boolean caseSensitive;
+ private final Supplier<Boolean> cancellationCallback;
+
+ ScanTasksIterator(
+ String planTask,
+ ArrayDeque<FileScanTask> fileScanTasks,
+ RESTClient client,
+ ResourcePaths resourcePaths,
+ TableIdentifier tableIdentifier,
+ Supplier<Map<String, String>> headers,
+ ExecutorService executorService,
+ Map<Integer, PartitionSpec> specsById,
+ boolean caseSensitive,
+ Supplier<Boolean> cancellationCallback) {
+ this.client = client;
+ this.resourcePaths = resourcePaths;
+ this.tableIdentifier = tableIdentifier;
+ this.headers = headers;
+ this.planTask = planTask;
+ this.fileScanTasks = fileScanTasks != null ? fileScanTasks : new
ArrayDeque<>();
+ this.executorService = executorService;
+ this.specsById = specsById;
+ this.caseSensitive = caseSensitive;
+ this.cancellationCallback = cancellationCallback;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!fileScanTasks.isEmpty()) {
+ // Have file scan tasks so continue to consume
+ return true;
+ }
+ // Out of file scan tasks, so need to now fetch more from each planTask
+ // Service can send back more planTasks which acts as pagination
+ if (planTask != null) {
+ fetchScanTasks(planTask);
+ planTask = null;
+ // Make another hasNext() call, as more fileScanTasks have been fetched
+ return hasNext();
+ }
+ // we have no file scan tasks left to consume
+ // so means we are finished
+ return false;
+ }
+
+ @Override
+ public FileScanTask next() {
+ return fileScanTasks.removeFirst();
+ }
+
+ private void fetchScanTasks(String withPlanTask) {
+ FetchScanTasksRequest fetchScanTasksRequest = new
FetchScanTasksRequest(withPlanTask);
+ // we need injectable values here
Review Comment:
Nit: remove these comments
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3026,4 +3079,316 @@ private static List<HTTPRequest>
allRequests(RESTCatalogAdapter adapter) {
verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(),
any());
return captor.getAllValues();
}
+
+ @Test
+ public void testCancelPlanWithNoActivePlan() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Cast to RESTTable to access scan functionality
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ // Create a new scan and cast to RESTTableScan
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Calling cancel with no active plan should return false
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanEndpointSupport() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Verify that a RESTTable was created (indicating server-side planning is
enabled)
+ assertThat(table).isInstanceOf(RESTTable.class);
+
+ // Create scan from RESTTable
+ TableScan scan = table.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Test that cancelPlan method is available and returns false when no plan
is active
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanMethodAvailability() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
Review Comment:
This probably repeats enough across the tests that it's worth putting in a
helper, `restTableScanFor(Table table)`, which asserts the table type and the
scan type and returns the cast `RESTTableScan`.
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3026,4 +3079,316 @@ private static List<HTTPRequest>
allRequests(RESTCatalogAdapter adapter) {
verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(),
any());
return captor.getAllValues();
}
+
+ @Test
+ public void testCancelPlanWithNoActivePlan() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Cast to RESTTable to access scan functionality
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ // Create a new scan and cast to RESTTableScan
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Calling cancel with no active plan should return false
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanEndpointSupport() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Verify that a RESTTable was created (indicating server-side planning is
enabled)
+ assertThat(table).isInstanceOf(RESTTable.class);
+
+ // Create scan from RESTTable
+ TableScan scan = table.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Test that cancelPlan method is available and returns false when no plan
is active
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanMethodAvailability() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Test that cancelPlan method is available and callable
+ // When no plan is active, it should return false
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+
+ // Verify the method exists and doesn't throw exceptions when called
multiple times
+ boolean cancelled2 = restTableScan.cancelPlan();
+ assertThat(cancelled2).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanEndpointPath() {
+ TableIdentifier tableId = TableIdentifier.of("test_namespace",
"test_table");
+ String planId = "plan-abc-123";
+ ResourcePaths paths = new ResourcePaths("test-prefix");
+
+ // Test that the cancel plan path is generated correctly
+ String cancelPath = paths.cancelPlan(tableId, planId);
+
+ assertThat(cancelPath)
+
.isEqualTo("v1/test-prefix/namespaces/test_namespace/tables/test_table/plan/plan-abc-123");
+
+ // Test with different identifiers
+ TableIdentifier complexId = TableIdentifier.of(Namespace.of("db",
"schema"), "my_table");
+ String complexPath = paths.cancelPlan(complexId, "plan-xyz-789");
+
+ assertThat(complexPath).contains("/plan/plan-xyz-789");
+ assertThat(complexPath).contains("db%1Fschema"); // URL encoded namespace
separator
+ }
+
+ @Test
+ public void testIteratorCloseTriggersCancel() throws IOException {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ boolean cancelled = isCancelled((RESTTableScan) scan);
+ assertThat(cancelled).isFalse(); // No active plan to cancel
+ }
+
+ private static boolean isCancelled(RESTTableScan scan) throws IOException {
+
+ // Get the iterable and iterator
+ CloseableIterable<FileScanTask> iterable = scan.planFiles();
+ CloseableIterator<FileScanTask> iterator = iterable.iterator();
+
+ // Verify we can close the iterator without exceptions
+ // The cancellation callback will be called (though no active plan exists)
+ iterator.close();
+
+ // Verify we can still call cancelPlan on the scan
+ return scan.cancelPlan();
+ }
+
+ @Test
+ public void testIterableCloseTriggersCancel() throws IOException {
+ Table table = createRESTTableAndInsertData(TABLE_COMPLETED_WITH_PLAN_TASK);
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Get the iterable
+ CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+ // Verify we can close the iterable without exceptions
+ // This tests that cancellation callbacks are properly wired through
+ iterable.close();
+
+ // Verify the scan is still functional
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse(); // No active plan to cancel
+ }
+
+ @Test
+ public void testRESTScanPlanningWithPositionDeletes() throws IOException {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_FILE_SCAN_TASK);
+
+ // Add position deletes that correspond to FILE_A (which was added in
+ // createRESTTableAndInsertData)
+ table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+
+ // Execute scan planning - should handle position deletes correctly
+ CloseableIterable<FileScanTask> iterable = scan.planFiles();
+ List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+ // Verify we get tasks back (specific count depends on implementation)
+ assertThat(tasks).isNotEmpty();
+
+ // Verify that delete files are properly handled in the scan tasks
+ boolean hasTasksWithDeletes = tasks.stream().anyMatch(task ->
!task.deletes().isEmpty());
+ assertThat(hasTasksWithDeletes).isTrue();
+ }
+
+ @Test
+ public void testRESTScanPlanningWithEqualityDeletes() throws IOException {
Review Comment:
Same for these tests: I feel like they could be stronger by asserting on the
contents of the actual files rather than simply checking for the existence of
delete files.
##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -460,6 +468,11 @@ public Table loadTable(SessionContext context,
TableIdentifier identifier) {
trackFileIO(ops);
+ RESTTable restTable = getRemoteScanPlanningTable(ops, finalIdentifier,
tableClient);
+ if (restTable != null) {
+ return restTable;
+ }
Review Comment:
Apologies if I missed it, but could we add a test to make sure metadata
tables still work as expected with remote planning? We can always refactor
later, but I do want to make sure there's no incorrect behavior when
server-side planning is supported and a client does a metadata table scan.
##########
core/src/main/java/org/apache/iceberg/RESTTableScan.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.ParserContext;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
+
+public class RESTTableScan extends DataTableScan {
+ private final RESTClient client;
+ private final String path;
+ private final Supplier<Map<String, String>> headers;
+ private final TableOperations operations;
+ private final Table table;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+ private final ParserContext parserContext;
+
+ // Track the current plan ID for cancellation
+ private volatile String currentPlanId = null;
+
+ // TODO revisit if this property should be configurable
+ private static final int FETCH_PLANNING_SLEEP_DURATION_MS = 1000;
+ private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000L;
+
+ RESTTableScan(
+ Table table,
+ Schema schema,
+ TableScanContext context,
+ RESTClient client,
+ String path,
+ Supplier<Map<String, String>> headers,
+ TableOperations operations,
+ TableIdentifier tableIdentifier,
+ ResourcePaths resourcePaths) {
+ super(table, schema, context);
+ this.table = table;
+ this.client = client;
+ this.headers = headers;
+ this.path = path;
+ this.operations = operations;
+ this.tableIdentifier = tableIdentifier;
+ this.resourcePaths = resourcePaths;
+ this.parserContext =
+ ParserContext.builder()
+ .add("specsById", table.specs())
+ .add("caseSensitive", context().caseSensitive())
+ .build();
+ }
+
+ @Override
+ protected TableScan newRefinedScan(
+ Table refinedTable, Schema refinedSchema, TableScanContext
refinedContext) {
+ return new RESTTableScan(
+ refinedTable,
+ refinedSchema,
+ refinedContext,
+ client,
+ path,
+ headers,
+ operations,
+ tableIdentifier,
+ resourcePaths);
+ }
+
+ @Override
+ public CloseableIterable<FileScanTask> planFiles() {
+ Long startSnapshotId = context().fromSnapshotId();
+ Long endSnapshotId = context().toSnapshotId();
+ Long snapshotId = snapshotId();
+ List<String> selectedColumns =
+
schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
+
+ List<String> statsFields = null;
+ if (columnsToKeepStats() != null) {
+ statsFields =
+ columnsToKeepStats().stream()
+ .map(columnId -> schema().findColumnName(columnId))
+ .collect(Collectors.toList());
+ }
+
+ PlanTableScanRequest.Builder planTableScanRequestBuilder =
+ new PlanTableScanRequest.Builder()
+ .withSelect(selectedColumns)
+ .withFilter(filter())
+ .withCaseSensitive(isCaseSensitive())
+ .withStatsFields(statsFields);
+
+ if (startSnapshotId != null && endSnapshotId != null) {
+ planTableScanRequestBuilder
+ .withStartSnapshotId(startSnapshotId)
+ .withEndSnapshotId(endSnapshotId)
+ .withUseSnapshotSchema(true);
+
+ } else if (snapshotId != null) {
+ boolean useSnapShotSchema = snapshotId !=
table.currentSnapshot().snapshotId();
+ planTableScanRequestBuilder
+ .withSnapshotId(snapshotId)
+ .withUseSnapshotSchema(useSnapShotSchema);
+
+ } else {
+
planTableScanRequestBuilder.withSnapshotId(table().currentSnapshot().snapshotId());
+ }
+
+ return planTableScan(planTableScanRequestBuilder.build());
+ }
+
+ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest
planTableScanRequest) {
+
+ PlanTableScanResponse response =
+ client.post(
+ resourcePaths.planTableScan(tableIdentifier),
+ planTableScanRequest,
+ PlanTableScanResponse.class,
+ headers.get(),
+ ErrorHandlers.defaultErrorHandler(),
+ stringStringMap -> {},
+ parserContext);
+
+ PlanStatus planStatus = response.planStatus();
+ switch (planStatus) {
+ case COMPLETED:
+ return getScanTasksIterable(response.planTasks(),
response.fileScanTasks());
+ case SUBMITTED:
+ return fetchPlanningResult(response.planId());
+ case FAILED:
+ throw new IllegalStateException(
+ "Received \"failed\" status from service when planning a table
scan");
+ case CANCELLED:
+ throw new IllegalStateException(
+ "Received \"cancelled\" status from service when planning a table
scan");
+ default:
+ throw new RuntimeException(
+ String.format("Invalid planStatus during planTableScan: %s",
planStatus));
+ }
+ }
+
+ private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
+ // Set the current plan ID for potential cancellation
+ currentPlanId = planId;
+
+ try {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime <= MAX_WAIT_TIME_MS) {
+ FetchPlanningResultResponse response =
+ client.get(
+ resourcePaths.fetchPlanningResult(tableIdentifier, planId),
+ Map.of(),
+ FetchPlanningResultResponse.class,
+ headers.get(),
+ ErrorHandlers.defaultErrorHandler(),
+ parserContext);
+
+ PlanStatus planStatus = response.planStatus();
+ switch (planStatus) {
+ case COMPLETED:
+ return getScanTasksIterable(response.planTasks(),
response.fileScanTasks());
+ case SUBMITTED:
+ try {
+ // TODO: if we want to add some jitter here to avoid thundering
herd.
+ Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // Attempt to cancel the plan before exiting
+ cancelPlan();
+ throw new RuntimeException("Interrupted while fetching plan
status", e);
+ }
+ break;
+ case FAILED:
+ throw new IllegalStateException(
+ "Received \"failed\" status from service when fetching a table
scan");
+ case CANCELLED:
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Received \"cancelled\" status from service when fetching
a table scan, planId: %s is invalid",
+ planId));
+ default:
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT, "Invalid planStatus during
fetchPlanningResult: %s", planStatus));
+ }
+ }
+ // If we reach here, we've exceeded the max wait time
+ currentPlanId = null; // Clear on timeout
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Exceeded max wait time of %d ms when fetching planning result",
+ MAX_WAIT_TIME_MS));
+ } catch (Exception e) {
+ // Clear the plan ID on any exception (except successful completion)
+ currentPlanId = null;
+ throw e;
+ }
+ }
+
+ public CloseableIterable<FileScanTask> getScanTasksIterable(
Review Comment:
Does this need to be public?
##########
core/src/main/java/org/apache/iceberg/ScanTasksIterable.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.ParserContext;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.requests.FetchScanTasksRequest;
+import org.apache.iceberg.rest.responses.FetchScanTasksResponse;
+import org.apache.iceberg.util.ParallelIterable;
+
+public class ScanTasksIterable implements CloseableIterable<FileScanTask> {
Review Comment:
Does this need to be public? If I'm not mistaken, it's only used by
RESTTableScan, which is in the same package.
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -434,6 +469,169 @@ public <T extends RESTResponse> T handleRequest(
return castResponse(responseType, response);
}
+ case PLAN_TABLE_SCAN:
+ {
+ TableIdentifier ident = tableIdentFromPathVars(vars);
+ PlanTableScanRequest request =
castRequest(PlanTableScanRequest.class, body);
+ Table table = catalog.loadTable(ident);
+ TableScan tableScan = table.newScan();
+
+ if (request.snapshotId() != null) {
+ tableScan.useSnapshot(request.snapshotId());
+ }
+ if (request.select() != null) {
+ tableScan.select(request.select());
+ }
+ if (request.filter() != null) {
+ tableScan.filter(request.filter());
+ }
+ if (request.statsFields() != null) {
+ tableScan.includeColumnStats(request.statsFields());
+ }
+ tableScan.caseSensitive(request.caseSensitive());
+
+ List<FileScanTask> fileScanTasks = Lists.newArrayList();
+ CloseableIterable<FileScanTask> returnedTasks =
tableScan.planFiles();
+ returnedTasks.forEach(task -> fileScanTasks.add(task));
+
+ if (ident.equals(TABLE_COMPLETED_WITH_FILE_SCAN_TASK)) {
Review Comment:
I'm not really a fan of hardcoded table identifiers that trigger special
behavior in `RESTCatalogAdapter`, especially since we know people use this
class as a building block for simple catalogs.
I see how it's convenient for writing tests, but I think we should use a
different mechanism, since this change exists only for the tests we're adding.
Would it be possible for `RESTCatalogAdapter` to do planning with depth only
when there are more than some N (a hardcoded integer) tasks? The tests could
then set up tables with a number of files based on N, run planning, and
assert the expected response contents that way. This is also more realistic,
because the behavior is driven by the actual size of the plan (although for
tests we can keep N small).
It's a bit more complicated, but I'd prefer that over having hardcoded table
names here.
##########
core/src/main/java/org/apache/iceberg/RESTTable.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+
+public class RESTTable extends BaseTable {
+ private final RESTClient client;
+ private final String path;
+ private final Supplier<Map<String, String>> headers;
+ private final MetricsReporter reporter;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+
/**
 * Creates a table that keeps the REST client, request headers, resource paths and identifier it
 * needs to talk to the REST service, in addition to the usual {@link BaseTable} state.
 *
 * @param ops table operations passed through to {@link BaseTable}
 * @param name table name passed through to {@link BaseTable}
 * @param reporter metrics reporter, passed to {@link BaseTable} and retained locally
 * @param client REST client used for service calls
 * @param path table resource path
 * @param headers supplier of request headers
 * @param tableIdentifier identifier of this table in the catalog
 * @param resourcePaths builder for REST endpoint paths
 */
public RESTTable(
    TableOperations ops,
    String name,
    MetricsReporter reporter,
    RESTClient client,
    String path,
    Supplier<Map<String, String>> headers,
    TableIdentifier tableIdentifier,
    ResourcePaths resourcePaths) {
  super(ops, name, reporter);
  // Connection details for the REST service.
  this.client = client;
  this.path = path;
  this.headers = headers;
  this.resourcePaths = resourcePaths;
  // Table identity and reporting.
  this.tableIdentifier = tableIdentifier;
  this.reporter = reporter;
}
+
+ @Override
+ public TableScan newScan() {
+ // TODO when looking at ImmutableTableScanContext how do we ensure
Review Comment:
I don't quite understand this comment. Don't things just work by the client
setting the desired snapshot, which then gets set in the context internally?
What's special about the REST case?
##########
core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java:
##########
@@ -460,6 +468,11 @@ public Table loadTable(SessionContext context,
TableIdentifier identifier) {
trackFileIO(ops);
+ RESTTable restTable = getRemoteScanPlanningTable(ops, finalIdentifier,
tableClient);
+ if (restTable != null) {
+ return restTable;
+ }
Review Comment:
I think we should double-check that metadata tables behave as expected with
this change. As currently implemented, the RESTTable is never passed through
to MetadataTableUtils.createMetadataTableInstance. For security use cases,
nobody will typically have access to metadata tables, but this should still
be implemented in a way that supports metadata queries when remote planning
is enabled.
I understand we'll want to delegate server-side planning for any metadata
table queries. Is the right change to fully delegate to RESTTable to handle
this, or to let metadata table instances wrap RESTTable as well, so that
their queries are satisfied through the remote planning RESTTable offers?
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -3026,4 +3079,316 @@ private static List<HTTPRequest>
allRequests(RESTCatalogAdapter adapter) {
verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(),
any());
return captor.getAllValues();
}
+
+ @Test
+ public void testCancelPlanWithNoActivePlan() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Cast to RESTTable to access scan functionality
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ // Create a new scan and cast to RESTTableScan
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Calling cancel with no active plan should return false
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanEndpointSupport() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Verify that a RESTTable was created (indicating server-side planning is
enabled)
+ assertThat(table).isInstanceOf(RESTTable.class);
+
+ // Create scan from RESTTable
+ TableScan scan = table.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Test that cancelPlan method is available and returns false when no plan
is active
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanMethodAvailability() {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Test that cancelPlan method is available and callable
+ // When no plan is active, it should return false
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse();
+
+ // Verify the method exists and doesn't throw exceptions when called
multiple times
+ boolean cancelled2 = restTableScan.cancelPlan();
+ assertThat(cancelled2).isFalse();
+ }
+
+ @Test
+ public void testCancelPlanEndpointPath() {
+ TableIdentifier tableId = TableIdentifier.of("test_namespace",
"test_table");
+ String planId = "plan-abc-123";
+ ResourcePaths paths = new ResourcePaths("test-prefix");
+
+ // Test that the cancel plan path is generated correctly
+ String cancelPath = paths.cancelPlan(tableId, planId);
+
+ assertThat(cancelPath)
+
.isEqualTo("v1/test-prefix/namespaces/test_namespace/tables/test_table/plan/plan-abc-123");
+
+ // Test with different identifiers
+ TableIdentifier complexId = TableIdentifier.of(Namespace.of("db",
"schema"), "my_table");
+ String complexPath = paths.cancelPlan(complexId, "plan-xyz-789");
+
+ assertThat(complexPath).contains("/plan/plan-xyz-789");
+ assertThat(complexPath).contains("db%1Fschema"); // URL encoded namespace
separator
+ }
+
+ @Test
+ public void testIteratorCloseTriggersCancel() throws IOException {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_NESTED_PLAN_TASK);
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ boolean cancelled = isCancelled((RESTTableScan) scan);
+ assertThat(cancelled).isFalse(); // No active plan to cancel
+ }
+
+ private static boolean isCancelled(RESTTableScan scan) throws IOException {
+
+ // Get the iterable and iterator
+ CloseableIterable<FileScanTask> iterable = scan.planFiles();
+ CloseableIterator<FileScanTask> iterator = iterable.iterator();
+
+ // Verify we can close the iterator without exceptions
+ // The cancellation callback will be called (though no active plan exists)
+ iterator.close();
+
+ // Verify we can still call cancelPlan on the scan
+ return scan.cancelPlan();
+ }
+
+ @Test
+ public void testIterableCloseTriggersCancel() throws IOException {
+ Table table = createRESTTableAndInsertData(TABLE_COMPLETED_WITH_PLAN_TASK);
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+ RESTTableScan restTableScan = (RESTTableScan) scan;
+
+ // Get the iterable
+ CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+ // Verify we can close the iterable without exceptions
+ // This tests that cancellation callbacks are properly wired through
+ iterable.close();
+
+ // Verify the scan is still functional
+ boolean cancelled = restTableScan.cancelPlan();
+ assertThat(cancelled).isFalse(); // No active plan to cancel
+ }
+
+ @Test
+ public void testRESTScanPlanningWithPositionDeletes() throws IOException {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_FILE_SCAN_TASK);
+
+ // Add position deletes that correspond to FILE_A (which was added in
+ // createRESTTableAndInsertData)
+ table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+
+ // Execute scan planning - should handle position deletes correctly
+ CloseableIterable<FileScanTask> iterable = scan.planFiles();
+ List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+ // Verify we get tasks back (specific count depends on implementation)
+ assertThat(tasks).isNotEmpty();
+
+ // Verify that delete files are properly handled in the scan tasks
+ boolean hasTasksWithDeletes = tasks.stream().anyMatch(task ->
!task.deletes().isEmpty());
+ assertThat(hasTasksWithDeletes).isTrue();
+ }
+
+ @Test
+ public void testRESTScanPlanningWithEqualityDeletes() throws IOException {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_FILE_SCAN_TASK);
+
+ // Add equality deletes that correspond to FILE_A
+ table.newRowDelta().addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+
+ // Execute scan planning - should handle equality deletes correctly
+ CloseableIterable<FileScanTask> iterable = scan.planFiles();
+ List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+ // Verify we get tasks back
+ assertThat(tasks).isNotEmpty();
+
+ // Verify that equality delete files are properly handled
+ boolean hasTasksWithDeletes = tasks.stream().anyMatch(task ->
!task.deletes().isEmpty());
+ assertThat(hasTasksWithDeletes).isTrue();
+ }
+
+ @Test
+ public void testRESTScanPlanningWithMixedDeletes() throws IOException {
+ Table table =
createRESTTableAndInsertData(TABLE_COMPLETED_WITH_FILE_SCAN_TASK);
+
+ // Add both position and equality deletes in separate commits
+ table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); // Position
deletes for FILE_A
+ table
+ .newRowDelta()
+ .addDeletes(FILE_B_EQUALITY_DELETES)
+ .commit(); // Equality deletes for different partition
+
+ // Ensure we have a RESTTable with server-side planning enabled
+ assertThat(table).isInstanceOf(RESTTable.class);
+ RESTTable restTable = (RESTTable) table;
+
+ TableScan scan = restTable.newScan();
+ assertThat(scan).isInstanceOf(RESTTableScan.class);
+
+ // Execute scan planning - should handle mixed delete types correctly
+ CloseableIterable<FileScanTask> iterable = scan.planFiles();
+ List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+ // Verify we get tasks back
+ assertThat(tasks).isNotEmpty();
+
+ // Verify scan planning succeeds with mixed delete types
+ boolean hasTasksWithDeletes = tasks.stream().anyMatch(task ->
!task.deletes().isEmpty());
+ assertThat(hasTasksWithDeletes).isTrue();
Review Comment:
Test hardening: we should assert that the task contents are exactly what we
expect, i.e. the specific data and delete files.
##########
core/src/main/java/org/apache/iceberg/RESTTable.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.metrics.MetricsReporter;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+
+public class RESTTable extends BaseTable {
+ private final RESTClient client;
+ private final String path;
+ private final Supplier<Map<String, String>> headers;
+ private final MetricsReporter reporter;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+
/**
 * Creates a table that keeps the REST client, request headers, resource paths and identifier it
 * needs to talk to the REST service, in addition to the usual {@link BaseTable} state.
 *
 * @param ops table operations passed through to {@link BaseTable}
 * @param name table name passed through to {@link BaseTable}
 * @param reporter metrics reporter, passed to {@link BaseTable} and retained locally
 * @param client REST client used for service calls
 * @param path table resource path
 * @param headers supplier of request headers
 * @param tableIdentifier identifier of this table in the catalog
 * @param resourcePaths builder for REST endpoint paths
 */
public RESTTable(
    TableOperations ops,
    String name,
    MetricsReporter reporter,
    RESTClient client,
    String path,
    Supplier<Map<String, String>> headers,
    TableIdentifier tableIdentifier,
    ResourcePaths resourcePaths) {
  super(ops, name, reporter);
  // Connection details for the REST service.
  this.client = client;
  this.path = path;
  this.headers = headers;
  this.resourcePaths = resourcePaths;
  // Table identity and reporting.
  this.tableIdentifier = tableIdentifier;
  this.reporter = reporter;
}
+
+ @Override
+ public TableScan newScan() {
+ // TODO when looking at ImmutableTableScanContext how do we ensure
Review Comment:
Speaking of this, could we add some tests around time travel?
##########
core/src/main/java/org/apache/iceberg/RESTTableScan.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.ParserContext;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.ParallelIterable;
+
+public class RESTTableScan extends DataTableScan {
+ private final RESTClient client;
+ private final String path;
+ private final Supplier<Map<String, String>> headers;
+ private final TableOperations operations;
+ private final Table table;
+ private final ResourcePaths resourcePaths;
+ private final TableIdentifier tableIdentifier;
+ private final ParserContext parserContext;
+
+ // Track the current plan ID for cancellation
+ private volatile String currentPlanId = null;
+
+ // TODO revisit if this property should be configurable
+ private static final int FETCH_PLANNING_SLEEP_DURATION_MS = 1000;
+ private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000L;
+
/**
 * Creates a scan that plans its files through the REST service.
 *
 * @param table the table being scanned
 * @param schema the projected schema
 * @param context the scan context passed to {@link DataTableScan}
 * @param client REST client used for planning requests
 * @param path table resource path
 * @param headers supplier of request headers
 * @param operations table operations for the scanned table
 * @param tableIdentifier identifier of the scanned table
 * @param resourcePaths builder for REST endpoint paths
 */
RESTTableScan(
    Table table,
    Schema schema,
    TableScanContext context,
    RESTClient client,
    String path,
    Supplier<Map<String, String>> headers,
    TableOperations operations,
    TableIdentifier tableIdentifier,
    ResourcePaths resourcePaths) {
  super(table, schema, context);
  this.table = table;
  this.operations = operations;
  this.tableIdentifier = tableIdentifier;
  this.client = client;
  this.path = path;
  this.headers = headers;
  this.resourcePaths = resourcePaths;

  // Context handed to the response parsers: partition specs by id and the scan's case sensitivity.
  this.parserContext =
      ParserContext.builder()
          .add("specsById", table.specs())
          .add("caseSensitive", context().caseSensitive())
          .build();
}
+
+ @Override
+ protected TableScan newRefinedScan(
+ Table refinedTable, Schema refinedSchema, TableScanContext
refinedContext) {
+ return new RESTTableScan(
+ refinedTable,
+ refinedSchema,
+ refinedContext,
+ client,
+ path,
+ headers,
+ operations,
+ tableIdentifier,
+ resourcePaths);
+ }
+
+ @Override
+ public CloseableIterable<FileScanTask> planFiles() {
+ Long startSnapshotId = context().fromSnapshotId();
+ Long endSnapshotId = context().toSnapshotId();
+ Long snapshotId = snapshotId();
+ List<String> selectedColumns =
+
schema().columns().stream().map(Types.NestedField::name).collect(Collectors.toList());
+
+ List<String> statsFields = null;
+ if (columnsToKeepStats() != null) {
+ statsFields =
+ columnsToKeepStats().stream()
+ .map(columnId -> schema().findColumnName(columnId))
+ .collect(Collectors.toList());
+ }
+
+ PlanTableScanRequest.Builder planTableScanRequestBuilder =
+ new PlanTableScanRequest.Builder()
+ .withSelect(selectedColumns)
+ .withFilter(filter())
+ .withCaseSensitive(isCaseSensitive())
+ .withStatsFields(statsFields);
+
+ if (startSnapshotId != null && endSnapshotId != null) {
+ planTableScanRequestBuilder
+ .withStartSnapshotId(startSnapshotId)
+ .withEndSnapshotId(endSnapshotId)
+ .withUseSnapshotSchema(true);
+
+ } else if (snapshotId != null) {
+ boolean useSnapShotSchema = snapshotId !=
table.currentSnapshot().snapshotId();
+ planTableScanRequestBuilder
+ .withSnapshotId(snapshotId)
+ .withUseSnapshotSchema(useSnapShotSchema);
+
+ } else {
+
planTableScanRequestBuilder.withSnapshotId(table().currentSnapshot().snapshotId());
+ }
+
+ return planTableScan(planTableScanRequestBuilder.build());
+ }
+
+ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest
planTableScanRequest) {
+
+ PlanTableScanResponse response =
+ client.post(
+ resourcePaths.planTableScan(tableIdentifier),
+ planTableScanRequest,
+ PlanTableScanResponse.class,
+ headers.get(),
+ ErrorHandlers.defaultErrorHandler(),
+ stringStringMap -> {},
+ parserContext);
+
+ PlanStatus planStatus = response.planStatus();
+ switch (planStatus) {
+ case COMPLETED:
+ return getScanTasksIterable(response.planTasks(),
response.fileScanTasks());
+ case SUBMITTED:
+ return fetchPlanningResult(response.planId());
+ case FAILED:
+ throw new IllegalStateException(
+ "Received \"failed\" status from service when planning a table
scan");
+ case CANCELLED:
+ throw new IllegalStateException(
+ "Received \"cancelled\" status from service when planning a table
scan");
+ default:
+ throw new RuntimeException(
+ String.format("Invalid planStatus during planTableScan: %s",
planStatus));
+ }
+ }
+
+ private CloseableIterable<FileScanTask> fetchPlanningResult(String planId) {
+ // Set the current plan ID for potential cancellation
+ currentPlanId = planId;
+
+ try {
+ long startTime = System.currentTimeMillis();
+ while (System.currentTimeMillis() - startTime <= MAX_WAIT_TIME_MS) {
+ FetchPlanningResultResponse response =
+ client.get(
+ resourcePaths.fetchPlanningResult(tableIdentifier, planId),
+ Map.of(),
+ FetchPlanningResultResponse.class,
+ headers.get(),
+ ErrorHandlers.defaultErrorHandler(),
+ parserContext);
+
+ PlanStatus planStatus = response.planStatus();
+ switch (planStatus) {
+ case COMPLETED:
+ return getScanTasksIterable(response.planTasks(),
response.fileScanTasks());
+ case SUBMITTED:
+ try {
+ // TODO: if we want to add some jitter here to avoid thundering
herd.
+ Thread.sleep(FETCH_PLANNING_SLEEP_DURATION_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // Attempt to cancel the plan before exiting
+ cancelPlan();
+ throw new RuntimeException("Interrupted while fetching plan
status", e);
+ }
+ break;
+ case FAILED:
+ throw new IllegalStateException(
+ "Received \"failed\" status from service when fetching a table
scan");
+ case CANCELLED:
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Received \"cancelled\" status from service when fetching
a table scan, planId: %s is invalid",
+ planId));
+ default:
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT, "Invalid planStatus during
fetchPlanningResult: %s", planStatus));
+ }
+ }
+ // If we reach here, we've exceeded the max wait time
+ currentPlanId = null; // Clear on timeout
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Exceeded max wait time of %d ms when fetching planning result",
+ MAX_WAIT_TIME_MS));
+ } catch (Exception e) {
+ // Clear the plan ID on any exception (except successful completion)
+ currentPlanId = null;
+ throw e;
+ }
+ }
+
+ public CloseableIterable<FileScanTask> getScanTasksIterable(
+ List<String> planTasks, List<FileScanTask> fileScanTasks) {
+ List<ScanTasksIterable> iterableOfScanTaskIterables = Lists.newArrayList();
+ if (fileScanTasks != null) {
+ // add this to the list for below if planTasks will also be present
+ ScanTasksIterable scanTasksIterable =
+ new ScanTasksIterable(
+ fileScanTasks,
+ client,
+ resourcePaths,
+ tableIdentifier,
+ headers,
+ planExecutor(),
+ table.specs(),
+ isCaseSensitive(),
+ this::cancelPlan);
Review Comment:
I don't quite follow: why do we need to pass in a callback for cancellation?
We're already passing the client into the iterable, so couldn't we just cancel
the plan inside the iterable's close()?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]