amogh-jahagirdar commented on code in PR #13400:
URL: https://github.com/apache/iceberg/pull/13400#discussion_r2582830626
##########
core/src/main/java/org/apache/iceberg/rest/ScanTasksIterable.java:
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.rest.requests.FetchScanTasksRequest;
+import org.apache.iceberg.rest.responses.FetchScanTasksResponse;
+import org.apache.iceberg.util.ThreadPools;
+
+class ScanTasksIterable implements CloseableIterable<FileScanTask> {
+
+  private static final int DEFAULT_TASK_QUEUE_CAPACITY = 1000;
+  private static final long QUEUE_POLL_TIMEOUT_MS = 100;
+  private static final int WORKER_POOL_SIZE = Math.max(1, ThreadPools.WORKER_THREAD_POOL_SIZE / 4);
+  private final BlockingQueue<FileScanTask> taskQueue;
+  private final ConcurrentLinkedQueue<String> planTasks;
+  private final AtomicInteger activeWorkers = new AtomicInteger(0);
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
+  private final ExecutorService executorService;
+  private final RESTClient client;
+  private final ResourcePaths resourcePaths;
+  private final TableIdentifier tableIdentifier;
+  private final Map<String, String> headers;
+  private final ParserContext parserContext;
+  private final Supplier<Boolean> cancellationCallback;
+
+  ScanTasksIterable(
+      List<String> initialPlanTasks,
+      List<FileScanTask> initialFileScanTasks,
+      RESTClient client,
+      ResourcePaths resourcePaths,
+      TableIdentifier tableIdentifier,
+      Map<String, String> headers,
+      ExecutorService executorService,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Supplier<Boolean> cancellationCallback) {
+
+    this.taskQueue = new LinkedBlockingQueue<>(DEFAULT_TASK_QUEUE_CAPACITY);
+    this.planTasks = new ConcurrentLinkedQueue<>();
+
+    this.client = client;
+    this.resourcePaths = resourcePaths;
+    this.tableIdentifier = tableIdentifier;
+    this.headers = headers;
+    this.executorService = executorService;
+    this.cancellationCallback = cancellationCallback;
+    this.parserContext =
+        ParserContext.builder()
+            .add("specsById", specsById)
+            .add("caseSensitive", caseSensitive)
+            .build();
+
+    if (initialFileScanTasks != null && !initialFileScanTasks.isEmpty()) {
+      for (FileScanTask task : initialFileScanTasks) {
+        try {
+          taskQueue.put(task);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while adding initial tasks", e);
+        }
+      }
+    }
+
+    if (initialPlanTasks != null && !initialPlanTasks.isEmpty()) {
+      planTasks.addAll(initialPlanTasks);
+    }
+
+    submitFixedWorkers();
+  }
+
+  private void submitFixedWorkers() {
+    if (planTasks.isEmpty()) {
+      return;
+    }
+
+    int numWorkers = Math.min(WORKER_POOL_SIZE, planTasks.size());
+
+    for (int i = 0; i < numWorkers; i++) {
+      executorService.execute(new PlanTaskWorker());
+    }
+  }
+
+  @Override
+  public CloseableIterator<FileScanTask> iterator() {
+    return new ScanTasksIterator();
+  }
+
+  @Override
+  public void close() throws IOException {}
+
+  private class PlanTaskWorker implements Runnable {
+
+    @Override
+    public void run() {
+      activeWorkers.incrementAndGet();
+
+      try {
+        while (true) {
+          if (shutdown.get()) {
+            return;
+          }
+
+          String planTask = planTasks.poll();
+          if (planTask == null) {
+            return;
+          }
+
+          processPlanTask(planTask);
+        }
+
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        throw new RuntimeException("Worker failed processing planTask", e);
+      } finally {
+        int remaining = activeWorkers.decrementAndGet();
+
+        if (remaining == 0 && !planTasks.isEmpty() && !shutdown.get()) {
+          executorService.execute(new PlanTaskWorker());
+        }
+      }
+    }
+
+    private void processPlanTask(String planTask) throws InterruptedException {
+      FetchScanTasksResponse response = fetchScanTasks(planTask);
+
+      if (response.fileScanTasks() != null) {
+        for (FileScanTask task : response.fileScanTasks()) {
+          if (shutdown.get()) {
+            return;
+          }
+          taskQueue.put(task);
+        }
+      }
+
+      if (response.planTasks() != null && !response.planTasks().isEmpty()) {
+        planTasks.addAll(response.planTasks());
+      }
+    }
+
+    private FetchScanTasksResponse fetchScanTasks(String planTask) {
+      FetchScanTasksRequest request = new FetchScanTasksRequest(planTask);
+
+      return client.post(
+          resourcePaths.fetchScanTasks(tableIdentifier),
+          request,
+          FetchScanTasksResponse.class,
+          headers,
+          ErrorHandlers.defaultErrorHandler(),
+          stringStringMap -> {},
+          parserContext);
+    }
+  }
+
+  private class ScanTasksIterator implements CloseableIterator<FileScanTask> {
+    private FileScanTask nextTask = null;
+
+    @Override
+    public boolean hasNext() {
+      if (nextTask != null) {
+        return true;
+      }
+
+      while (true) {
+        if (isDone()) {
+          return false;
+        }
+
+        try {
+          nextTask = taskQueue.poll(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+          if (nextTask != null) {
+            return true;
+          }
+
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public FileScanTask next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more scan tasks available");
+      }
+      FileScanTask result = nextTask;
+      nextTask = null;
+      return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+      shutdown.set(true);
+
+      if (cancellationCallback != null) {
+        try {
+          @SuppressWarnings("unused")
+          Boolean ignored = cancellationCallback.get();
+        } catch (Exception e) {
+          // Ignore cancellation failures
+        }
+      }
+
+      taskQueue.clear();
+      planTasks.clear();

Review Comment:
   Minor: If there are still elements in the queue or in the planTasks list, do we want to log how many at the time of close? In the case of a limit expressed by a client, the size of the remainder of these structures effectively represents how much "extra" task fetching was done.
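   Something along these lines in `ScanTasksIterator#close()` is what I have in mind -- just a sketch; it assumes an SLF4J `LOG` field on `ScanTasksIterable` (this class doesn't declare one yet), and the counts are best-effort since workers may still be draining:

   ```java
   // Assumes: private static final Logger LOG = LoggerFactory.getLogger(ScanTasksIterable.class);
   // Snapshot the leftover counts before clearing, so close() records how much
   // "extra" fetching was done when a consumer (e.g. one applying a limit) stops early.
   int unconsumedFileTasks = taskQueue.size();
   int unfetchedPlanTasks = planTasks.size();
   if (unconsumedFileTasks > 0 || unfetchedPlanTasks > 0) {
     LOG.debug(
         "Closing scan task iterator with {} unconsumed file scan tasks and {} unfetched plan tasks",
         unconsumedFileTasks,
         unfetchedPlanTasks);
   }

   taskQueue.clear();
   planTasks.clear();
   ```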
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java:
##########
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.apache.iceberg.catalog.CatalogTests.FILE_A;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_A_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_A_EQUALITY_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_B;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_B_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_B_EQUALITY_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_C;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_C_EQUALITY_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.SCHEMA;
+import static org.apache.iceberg.catalog.CatalogTests.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+public class TestRESTScanPlanning {
+  protected static final Namespace NS = Namespace.of("ns");
+
+  @TempDir protected Path temp;
+
+  private final RESTCatalogTestInfrastructure infrastructure = new RESTCatalogTestInfrastructure();
+  private InMemoryCatalog backendCatalog;
+  private RESTCatalogAdapter adapterForRESTServer;
+
+  // Scan-planning-specific fields
+  private RESTCatalog restCatalogWithScanPlanning;
+
+  @BeforeEach
+  public void setupCatalogs() throws Exception {
+    infrastructure.before(temp);
+    this.backendCatalog = infrastructure.backendCatalog();
+    this.adapterForRESTServer = infrastructure.adapter();
+
+    // Initialize catalog with scan planning enabled
+    this.restCatalogWithScanPlanning =
+        initCatalog(
+            "prod-with-scan-planning",
+            java.util.Map.of(RESTCatalogProperties.REST_SCAN_PLANNING_ENABLED, "true"));
+  }
+
+  @AfterEach
+  public void teardownCatalogs() throws Exception {
+    if (restCatalogWithScanPlanning != null) {
+      restCatalogWithScanPlanning.close();
+    }
+    infrastructure.after();
+  }
+
+  // ==================== Helper Methods ====================
+
+  private RESTCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
+    return infrastructure.initCatalog(catalogName, additionalProperties);
+  }
+
+  private boolean requiresNamespaceCreate() {
+    return true;
+  }
+
+  private void setParserContext(org.apache.iceberg.Table table) {
+    infrastructure.setParserContext(table);
+  }
+
+  private RESTCatalog scanPlanningCatalog() {
+    return restCatalogWithScanPlanning;
+  }
+
+  private void configurePlanningBehavior(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> configurator) {
+    TestPlanningBehavior.Builder builder = TestPlanningBehavior.builder();
+    adapterForRESTServer.setPlanningBehavior(configurator.apply(builder).build());
+  }
+
+  private Table createTableWithScanPlanning(RESTCatalog catalog, String tableName) {
+    return createTableWithScanPlanning(catalog, TableIdentifier.of(NS, tableName));
+  }
+
+  private Table createTableWithScanPlanning(RESTCatalog catalog, TableIdentifier identifier) {
+    if (requiresNamespaceCreate()) {
+      catalog.createNamespace(identifier.namespace());
+    }
+
+    return catalog.buildTable(identifier, SCHEMA).withPartitionSpec(SPEC).create();
+  }
+
+  private RESTTable restTableFor(RESTCatalog catalog, String tableName) {
+    Table table = createTableWithScanPlanning(catalog, tableName);
+    table.newAppend().appendFile(FILE_A).commit();
+    assertThat(table).isInstanceOf(RESTTable.class);
+    return (RESTTable) table;
+  }
+
+  private RESTTableScan restTableScanFor(Table table) {
+    assertThat(table).isInstanceOf(RESTTable.class);
+    RESTTable restTable = (RESTTable) table;
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    return (RESTTableScan) scan;
+  }
+
+  // ==================== Test Planning Behavior ====================
+
+  /** Enum for parameterized tests to test both synchronous and asynchronous planning modes. */
+  enum PlanningMode
+      implements Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> {
+    SYNCHRONOUS(TestPlanningBehavior.Builder::synchronous),
+    ASYNCHRONOUS(TestPlanningBehavior.Builder::asynchronous);
+
+    private final Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> configurer;
+
+    PlanningMode(Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> configurer) {
+      this.configurer = configurer;
+    }
+
+    @Override
+    public TestPlanningBehavior.Builder apply(TestPlanningBehavior.Builder builder) {
+      return this.configurer.apply(builder);
+    }
+  }
+
+  private static class TestPlanningBehavior implements RESTCatalogAdapter.PlanningBehavior {
+    private final boolean asyncPlanning;
+    private final int tasksPerPage;
+
+    private TestPlanningBehavior(boolean asyncPlanning, int tasksPerPage) {
+      this.asyncPlanning = asyncPlanning;
+      this.tasksPerPage = tasksPerPage;
+    }
+
+    static Builder builder() {
+      return new Builder();
+    }
+
+    @Override
+    public boolean shouldPlanTableScanAsync(Scan<?, FileScanTask, ?> scan) {
+      return asyncPlanning;
+    }
+
+    @Override
+    public int numberFileScanTasksPerPlanTask() {
+      return tasksPerPage;
+    }
+
+    protected static class Builder {
+      private boolean asyncPlanning;
+      private int tasksPerPage;
+
+      Builder asyncPlanning(boolean async) {
+        asyncPlanning = async;
+        return this;
+      }
+
+      Builder tasksPerPage(int tasks) {
+        tasksPerPage = tasks;
+        return this;
+      }
+
+      // Convenience methods for common test scenarios
+      Builder synchronous() {
+        return asyncPlanning(false).tasksPerPage(100);
+      }
+
+      Builder synchronousWithPagination() {
+        return asyncPlanning(false).tasksPerPage(1);
+      }
+
+      Builder asynchronous() {
+        return asyncPlanning(true).tasksPerPage(100);
+      }
+
+      TestPlanningBehavior build() {
+        return new TestPlanningBehavior(asyncPlanning, tasksPerPage);
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  public void scanPlanningWithAllTasksInSingleResponse() throws IOException {
+    Table table = restTableFor(scanPlanningCatalog(), "all_tasks_table");
+    setParserContext(table);
+
+    // Verify actual data file is returned with correct count
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+      assertThat(tasks.get(0).file().location()).isEqualTo(FILE_A.location());
+      assertThat(tasks.get(0).deletes()).isEmpty(); // 0 delete files
+    }
+  }
+
+  @Test
+  public void nestedPlanTaskPagination() throws IOException {
+    // Configure: synchronous planning with very small pages (creates nested plan task structure)
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+
+    Table table = restTableFor(scanPlanningCatalog(), "nested_plan_task_table");
+    // add one more file for proper pagination
+    table.newFastAppend().appendFile(FILE_B).commit();
+    setParserContext(table);
+
+    // Verify actual data files are returned via nested plan task fetching with correct count
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+      assertThat(tasks).hasSize(2);
+      assertThat(tasks)
+          .anySatisfy(task -> assertThat(task.file().location()).isEqualTo(FILE_A.location()));
+      assertThat(tasks)
+          .anySatisfy(task -> assertThat(task.file().location()).isEqualTo(FILE_B.location()));
+      assertThat(tasks.get(0).deletes()).isEmpty();
+      assertThat(tasks.get(1).deletes()).isEmpty();
+    }
+  }
+
+  @Test
+  public void cancelPlanMethodAvailability() {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronousWithPagination);
+    RESTTable table = restTableFor(scanPlanningCatalog(), "cancel_method_table");
+    RESTTableScan restTableScan = restTableScanFor(table);
+
+    // Test that cancelPlan method is available and callable
+    // When no plan is active, it should return false
+    assertThat(restTableScan.cancelPlan()).isFalse();
+
+    // Verify the method exists and doesn't throw exceptions when called multiple times
+    assertThat(restTableScan.cancelPlan()).isFalse();
+  }
+
+  @Test
+  public void iterableCloseTriggersCancel() throws IOException {
+    configurePlanningBehavior(TestPlanningBehavior.Builder::asynchronous);
+    RESTTable restTable = restTableFor(scanPlanningCatalog(), "iterable_close_test");
+    setParserContext(restTable);
+
+    TableScan scan = restTable.newScan();
+    assertThat(scan).isInstanceOf(RESTTableScan.class);
+    RESTTableScan restTableScan = (RESTTableScan) scan;
+
+    // Get the iterable
+    CloseableIterable<FileScanTask> iterable = restTableScan.planFiles();
+
+    // call cancelPlan before closing the iterable
+    boolean cancelled = restTableScan.cancelPlan();
+    assertThat(cancelled).isTrue();
+
+    // Verify we can close the iterable without exceptions
+    // This tests that cancellation callbacks are properly wired through
+    iterable.close();
+  }
+
+  @ParameterizedTest
+  @EnumSource(MetadataTableType.class)
+  public void metadataTablesWithRemotePlanning(MetadataTableType type) {
+    assumeThat(type)
+        .as("POSITION_DELETES table does not implement newScan() method")
+        .isNotEqualTo(MetadataTableType.POSITION_DELETES);
+
+    configurePlanningBehavior(TestPlanningBehavior.Builder::synchronous);
+    RESTTable table = restTableFor(scanPlanningCatalog(), "metadata_tables_test");
+    table.newAppend().appendFile(FILE_B).commit();
+    table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_B_EQUALITY_DELETES).commit();
+    setParserContext(table);
+    Table metadataTableInstance = MetadataTableUtils.createMetadataTableInstance(table, type);
+    assertThat(metadataTableInstance).isNotNull();
+
+    TableScan metadataTableScan = metadataTableInstance.newScan();
+    CloseableIterable<FileScanTask> metadataTableIterable = metadataTableScan.planFiles();
+    List<FileScanTask> tasks = Lists.newArrayList(metadataTableIterable);
+    assertThat(tasks).isNotEmpty();
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithEmptyTable(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = createTableWithScanPlanning(scanPlanningCatalog(), "empty_table_test");
+    setParserContext(table);
+
+    // Execute scan planning on empty table
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify no tasks are returned for empty table
+      assertThat(tasks).isEmpty();
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  @Disabled("Pruning files based on columns is not yet supported in REST scan planning")
+  void remoteScanPlanningWithNonExistentColumn(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "non-existent_column");
+    setParserContext(table);
+
+    try (CloseableIterable<FileScanTask> iterable =
+        table.newScan().select("non-existent-column").planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+      assertThat(tasks).isEmpty();
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void incrementalScan(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "incremental_scan");
+    setParserContext(table);
+
+    // Add second file to the table
+    table.newAppend().appendFile(FILE_B).commit();
+    long startSnapshotId = table.currentSnapshot().snapshotId();
+    // Add third file to the table
+    table.newAppend().appendFile(FILE_C).commit();
+    long endSnapshotId = table.currentSnapshot().snapshotId();
+    try (CloseableIterable<FileScanTask> iterable =
+        table
+            .newIncrementalAppendScan()
+            .fromSnapshotInclusive(startSnapshotId)
+            .toSnapshot(endSnapshotId)
+            .planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+      assertThat(tasks).hasSize(2); // FILE_B and FILE_C
+      assertThat(tasks)
+          .extracting(task -> task.file().location())
+          .contains(FILE_C.location(), FILE_B.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithPositionDeletes(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "position_deletes_test");
+    setParserContext(table);
+
+    // Add position deletes that correspond to FILE_A (which was added in table creation)
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
+
+    // Ensure we have a RESTTable with server-side planning enabled
+    assertThat(table).isInstanceOf(RESTTable.class);
+
+    // Execute scan planning - should handle position deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify we get tasks back (specific count depends on implementation)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and delete file associations
+      FileScanTask taskWithDeletes =
+          assertThat(tasks)
+              .filteredOn(task -> !task.deletes().isEmpty())
+              .first()
+              .as("Expected at least one task with delete files")
+              .actual();
+
+      assertThat(taskWithDeletes.file().location()).isEqualTo(FILE_A.location());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: FILE_A_DELETES
+      assertThat(taskWithDeletes.deletes().get(0).location()).isEqualTo(FILE_A_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithEqualityDeletes(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "equality_deletes_test");
+    setParserContext(table);
+
+    // Add equality deletes that correspond to FILE_A
+    table.newRowDelta().addDeletes(FILE_A_EQUALITY_DELETES).commit();
+
+    // Execute scan planning - should handle equality deletes correctly
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify the task count and file paths
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify specific task content and equality delete file associations
+      FileScanTask taskWithDeletes =
+          assertThat(tasks)
+              .filteredOn(task -> !task.deletes().isEmpty())
+              .first()
+              .as("Expected at least one task with delete files")
+              .actual();
+
+      assertThat(taskWithDeletes.file().location()).isEqualTo(FILE_A.location());
+      assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: FILE_A_EQUALITY_DELETES
+      assertThat(taskWithDeletes.deletes().get(0).location())
+          .isEqualTo(FILE_A_EQUALITY_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithMixedDeletes(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "mixed_deletes_test");
+    setParserContext(table);
+
+    // Add both position and equality deletes in separate commits
+    table.newRowDelta().addDeletes(FILE_A_DELETES).commit(); // Position deletes for FILE_A
+    table
+        .newRowDelta()
+        .addDeletes(FILE_B_EQUALITY_DELETES)
+        .commit(); // Equality deletes for different partition
+
+    // Execute scan planning - should handle mixed delete types correctly
+    try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
+      List<FileScanTask> tasks = Lists.newArrayList(iterable);
+
+      // Verify task count: FILE_A only (FILE_B_EQUALITY_DELETES is in different partition)
+      assertThat(tasks).hasSize(1); // 1 data file: FILE_A
+
+      // Verify FILE_A with position deletes (FILE_B_EQUALITY_DELETES not associated since no
+      // FILE_B)
+      FileScanTask fileATask =
+          assertThat(tasks)
+              .filteredOn(task -> task.file().location().equals(FILE_A.location()))
+              .first()
+              .as("Expected FILE_A in scan tasks")
+              .actual();
+
+      assertThat(fileATask.deletes())
+          .hasSize(1); // 1 delete file: FILE_A_DELETES (FILE_B_EQUALITY_DELETES not matched)
+      assertThat(fileATask.deletes().get(0).location()).isEqualTo(FILE_A_DELETES.location());
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void remoteScanPlanningWithMultipleDeleteFiles(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
+      throws IOException {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(scanPlanningCatalog(), "multiple_deletes_test");
+    setParserContext(table);
+
+    // Add FILE_B and FILE_C to the table (FILE_A is already added during table creation)
+    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+
+    // Add multiple delete files corresponding to FILE_A, FILE_B, FILE_C
+    table
+        .newRowDelta()
+        .addDeletes(FILE_A_DELETES) // Position delete for FILE_A
+        .addDeletes(FILE_B_DELETES) // Position delete for FILE_B
+        .addDeletes(FILE_C_EQUALITY_DELETES) // Equality delete for FILE_C

Review Comment:
   Minor: Could we go through some of these comments in the test and clean up the ones that are a bit redundant/obvious given the naming? It creates a bit more noise than needed.
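   For example, from the hunks above (illustrative):

   ```java
   // The assertion already carries this information; the trailing comments
   // just restate the constants being used:
   assertThat(tasks).hasSize(1); // 1 data file: FILE_A
   assertThat(taskWithDeletes.deletes()).hasSize(1); // 1 delete file: FILE_A_DELETES

   // reads just as well as:
   assertThat(tasks).hasSize(1);
   assertThat(taskWithDeletes.deletes()).hasSize(1);
   ```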
##########
core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java:
##########
@@ -0,0 +1,871 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.apache.iceberg.catalog.CatalogTests.FILE_A;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_A_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_A_EQUALITY_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_B;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_B_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_B_EQUALITY_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_C;
+import static org.apache.iceberg.catalog.CatalogTests.FILE_C_EQUALITY_DELETES;
+import static org.apache.iceberg.catalog.CatalogTests.SCHEMA;
+import static org.apache.iceberg.catalog.CatalogTests.SPEC;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Scan;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.Mockito;
+
+public class TestRESTScanPlanning {

Review Comment:
   Sorry if I missed this, do we have a test where files are produced under different partition specs?
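   Something like the following is the shape I'd expect -- a sketch only, not tested: `FILE_B_NEW_SPEC` is a hypothetical `DataFile` constant built against the evolved spec, and the helpers are the ones defined in this class. Mostly I want coverage that the `specsById` wiring into `ScanTasksIterable`'s `ParserContext` resolves the right spec per file.

   ```java
   @ParameterizedTest
   @EnumSource(PlanningMode.class)
   void remoteScanPlanningWithMultiplePartitionSpecs(
       Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> planMode)
       throws IOException {
     configurePlanningBehavior(planMode);
     // restTableFor(...) appends FILE_A under the original SPEC (spec id 0)
     Table table = restTableFor(scanPlanningCatalog(), "multiple_specs_test");

     // evolve the partition spec so the next file is written under a new spec id
     table.updateSpec().addField("data").commit();
     // hypothetical data file written against the evolved spec
     table.newAppend().appendFile(FILE_B_NEW_SPEC).commit();
     setParserContext(table);

     try (CloseableIterable<FileScanTask> iterable = table.newScan().planFiles()) {
       List<FileScanTask> tasks = Lists.newArrayList(iterable);
       assertThat(tasks).hasSize(2);
       // each task should deserialize against the spec its file was written with
       assertThat(tasks)
           .extracting(task -> task.spec().specId())
           .containsExactlyInAnyOrder(0, 1);
     }
   }
   ```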
##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java:
##########
@@ -594,7 +594,11 @@ default boolean shouldPlanTableScanAsync(TableScan tableScan) {
   }
 
   protected PlanningBehavior planningBehavior() {
-    return new PlanningBehavior() {};
+    return this.planningBehavior == null ? new PlanningBehavior() {} : planningBehavior;
+  }
+
+  protected void setPlanningBehavior(PlanningBehavior behavior) {
+    this.planningBehavior = behavior;

Review Comment:
   I think this is fine for now, we can address it in a follow-up. Not a blocker, just a nit of mine.

##########
core/src/test/java/org/apache/iceberg/rest/RESTCatalogTestInfrastructure.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.SessionCatalog;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.rest.responses.ErrorResponse;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.gzip.GzipHandler;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.mockito.Mockito;
+
+public class RESTCatalogTestInfrastructure {
+  protected static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
+
+  protected RESTCatalog restCatalog;
+  protected InMemoryCatalog backendCatalog;
+  protected Server httpServer;
+  protected RESTCatalogAdapter adapterForRESTServer;
+  protected ParserContext parserContext;
+
+  public void before(Path temp) throws Exception {
+    File warehouse = temp.toFile();
+
+    this.backendCatalog = new InMemoryCatalog();
+    this.backendCatalog.initialize(
+        "in-memory",
+        ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getAbsolutePath()));
+
+    HTTPHeaders catalogHeaders =
+        HTTPHeaders.of(
+            Map.of(
+                "Authorization",
+                "Bearer client-credentials-token:sub=catalog",
+                "test-header",
+                "test-value"));
+    HTTPHeaders contextHeaders =
+        HTTPHeaders.of(
+            Map.of(
+                "Authorization",
+                "Bearer client-credentials-token:sub=user",
+                "test-header",
+                "test-value"));
+
+    adapterForRESTServer =
+        Mockito.spy(
+            new RESTCatalogAdapter(backendCatalog) {
+              @Override
+              public <T extends RESTResponse> T execute(
+                  HTTPRequest request,
+                  Class<T> responseType,
+                  Consumer<ErrorResponse> errorHandler,
+                  Consumer<Map<String, String>> responseHeaders) {
+                // this doesn't use a Mockito spy because this is used for catalog tests, which have
+                // different method calls
+                if (!ResourcePaths.tokens().equals(request.path())) {

Review Comment:
   Yeah, this is something I observed too; I feel like we're doing quite a bit more setup than needed for each of these cases.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
