adnanhemani commented on code in PR #1585:
URL: https://github.com/apache/polaris/pull/1585#discussion_r2122661176
##########
service/common/src/main/java/org/apache/polaris/service/task/ManifestFileCleanupTaskHandler.java:
##########
@@ -82,18 +82,20 @@ private boolean cleanUpManifestFile(
return true;
}
- ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
fileIO);
- List<CompletableFuture<Void>> dataFileDeletes =
- StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(dataFiles.iterator(),
Spliterator.IMMUTABLE),
- false)
- .map(file -> tryDelete(tableId, fileIO, manifestFile.path(),
file.location(), null, 1))
- .toList();
- LOGGER.debug(
- "Scheduled {} data files to be deleted from manifest {}",
- dataFileDeletes.size(),
- manifestFile.path());
try {
+ ManifestReader<DataFile> dataFiles = ManifestFiles.read(manifestFile,
fileIO);
Review Comment:
What's the reason behind this change?
##########
extension/persistence/relational-jdbc/src/main/java/org/apache/polaris/extension/persistence/relational/jdbc/JdbcMetaStoreManagerFactory.java:
##########
@@ -172,6 +172,11 @@ public Map<String, BaseResult>
purgeRealms(Iterable<String> realms) {
return Map.copyOf(results);
}
+ @Override
+ public Map<String, PolarisMetaStoreManager> getMetaStoreManagerMap() {
Review Comment:
To make this a bit more defensively coded, I might recommend making this
into an iterator of Map.Entry objects, given that this is a public method and we
wouldn't want any code path to be able to modify this mapping?
##########
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskRecoveryManagerTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.quarkus.test.junit.QuarkusTest;
+import jakarta.annotation.Nonnull;
+import jakarta.inject.Inject;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.AsyncTaskType;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisTaskConstants;
+import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.dao.entity.EntitiesResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.task.TableCleanupTaskHandler;
+import org.apache.polaris.service.task.TaskExecutorImpl;
+import org.apache.polaris.service.task.TaskFileIOSupplier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.threeten.extra.MutableClock;
+
+@QuarkusTest
+public class TaskRecoveryManagerTest {
+ @Inject private MetaStoreManagerFactory metaStoreManagerFactory;
+ protected final MutableClock timeSource = MutableClock.of(Instant.now(),
ZoneOffset.UTC);
+ private final RealmContext realmContext = () -> "realmName";
+
+ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
+ return new TaskFileIOSupplier(
+ new FileIOFactory() {
+ @Override
+ public FileIO loadFileIO(
+ @Nonnull CallContext callContext,
+ @Nonnull String ioImplClassName,
+ @Nonnull Map<String, String> properties,
+ @Nonnull TableIdentifier identifier,
+ @Nonnull Set<String> tableLocations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
+ return fileIO;
+ }
+ });
+ }
+
+ private void addTaskLocation(TaskEntity task) {
+ Map<String, String> internalPropertiesAsMap = new
HashMap<>(task.getInternalPropertiesAsMap());
Review Comment:
`addInternalProperty`
##########
service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -193,6 +198,9 @@ private Stream<TaskEntity> getManifestTaskStream(
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableEntity.getTableIdentifier(),
TaskUtils.encodeManifestFile(mf)))
+ .withLastAttemptExecutorId(executorId)
+ .withAttemptCount(1)
Review Comment:
How can we assume this?
##########
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TaskRecoveryManagerTest.java:
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.task;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import io.quarkus.test.junit.QuarkusTest;
+import jakarta.annotation.Nonnull;
+import jakarta.inject.Inject;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.StatisticsFile;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.AsyncTaskType;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisTaskConstants;
+import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.dao.entity.EntitiesResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.task.TableCleanupTaskHandler;
+import org.apache.polaris.service.task.TaskExecutorImpl;
+import org.apache.polaris.service.task.TaskFileIOSupplier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.threeten.extra.MutableClock;
+
+@QuarkusTest
+public class TaskRecoveryManagerTest {
+ @Inject private MetaStoreManagerFactory metaStoreManagerFactory;
+ protected final MutableClock timeSource = MutableClock.of(Instant.now(),
ZoneOffset.UTC);
+ private final RealmContext realmContext = () -> "realmName";
+
+ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
+ return new TaskFileIOSupplier(
+ new FileIOFactory() {
+ @Override
+ public FileIO loadFileIO(
+ @Nonnull CallContext callContext,
+ @Nonnull String ioImplClassName,
+ @Nonnull Map<String, String> properties,
+ @Nonnull TableIdentifier identifier,
+ @Nonnull Set<String> tableLocations,
+ @Nonnull Set<PolarisStorageActions> storageActions,
+ @Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
+ return fileIO;
+ }
+ });
+ }
+
+ private void addTaskLocation(TaskEntity task) {
+ Map<String, String> internalPropertiesAsMap = new
HashMap<>(task.getInternalPropertiesAsMap());
+ internalPropertiesAsMap.put(PolarisTaskConstants.STORAGE_LOCATION,
"file:///tmp/");
+ ((PolarisBaseEntity)
task).setInternalPropertiesAsMap(internalPropertiesAsMap);
+ }
+
+ @Test
+ void testTaskRecovery() throws IOException {
+ // Step 1: Initialize mock table metadata, snapshot, and statistics file
to simulate a realistic
+ // Iceberg table
+ PolarisCallContext polarisCallContext =
+ new PolarisCallContext(
+
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get(),
+ new PolarisDefaultDiagServiceImpl(),
+ new PolarisConfigurationStore() {},
+ timeSource);
+ try (CallContext callCtx = CallContext.of(realmContext,
polarisCallContext)) {
+ CallContext.setCurrentContext(callCtx);
+ Map<String, AtomicInteger> retryCounter = new HashMap<>();
+ FileIO fileIO =
+ new InMemoryFileIO() {
+ @Override
+ public void close() {
+ // no-op
+ }
+ };
+ TestServices testServices =
TestServices.builder().realmContext(realmContext).build();
+ TaskExecutorImpl taskExecutor =
+ new TaskExecutorImpl(
+ Runnable::run,
+ metaStoreManagerFactory,
+ buildTaskFileIOSupplier(fileIO),
+ testServices.polarisEventListener()) {
+ @Override
+ public void addTaskHandlerContext(long taskEntityId, CallContext
callContext) {
+ int attempts =
+ retryCounter
+ .computeIfAbsent(String.valueOf(taskEntityId), k -> new
AtomicInteger(0))
+ .incrementAndGet();
+ if (attempts == 1) {
+ // no-op for first attempt to mock failure
+ } else {
+ super.addTaskHandlerContext(taskEntityId, callContext);
+ }
+ }
+ };
+ taskExecutor.init();
+
+ TableCleanupTaskHandler tableCleanupTaskHandler =
+ new TableCleanupTaskHandler(
+ taskExecutor,
+ metaStoreManagerFactory,
+ buildTaskFileIOSupplier(new InMemoryFileIO())) {};
+
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(Namespace.of("db1", "schema1"), "table1");
+ long snapshotId = 100L;
+ ManifestFile manifestFile =
+ TaskTestUtils.manifestFile(
+ fileIO, "manifest1.avro", snapshotId, "dataFile1.parquet",
"dataFile2.parquet");
+ TestSnapshot snapshot =
+ TaskTestUtils.newSnapshot(fileIO, "manifestList.avro", 1,
snapshotId, 99L, manifestFile);
+ String metadataFile = "v1-49494949.metadata.json";
+ StatisticsFile statisticsFile =
+ TaskTestUtils.writeStatsFile(
+ snapshot.snapshotId(),
+ snapshot.sequenceNumber(),
+ "/metadata/" + UUID.randomUUID() + ".stats",
+ fileIO);
+ TaskTestUtils.writeTableMetadata(fileIO, metadataFile,
List.of(statisticsFile), snapshot);
+
+ // Step 2: Execute the initial cleanup task, where two child cleanup
tasks are generated and
+ // executed the first time
+ TaskEntity task =
+ new TaskEntity.Builder()
+ .setName("cleanup_" + tableIdentifier)
+ .withTaskType(AsyncTaskType.ENTITY_CLEANUP_SCHEDULER)
+ .withData(
+ new IcebergTableLikeEntity.Builder(tableIdentifier,
metadataFile)
+ .setName("table1")
+ .setCatalogId(1)
+ .setCreateTimestamp(100)
+ .build())
+ .build();
+ addTaskLocation(task);
+
Assertions.assertThatPredicate(tableCleanupTaskHandler::canHandleTask).accepts(task);
+ tableCleanupTaskHandler.handleTask(task, callCtx);
+
+ // Step 3: Verify that the generated child tasks were registered,
ATTEMPT_COUNT = 2
+ timeSource.add(Duration.ofMinutes(10));
Review Comment:
I, personally, found this very hard to follow - even with the comments. I
would highly recommend making the comments much more verbose here to allow the
full flow of logic (what is happening with which task and why) to be
communicated to a reader who may not be an expert at this particular type of
task or tasks in general.
##########
service/common/src/main/java/org/apache/polaris/service/task/TableCleanupTaskHandler.java:
##########
@@ -235,6 +247,9 @@ private Stream<TaskEntity> getMetadataTaskStream(
.withData(
new BatchFileCleanupTaskHandler.BatchFileCleanupTask(
tableEntity.getTableIdentifier(), metadataBatch))
+ .withLastAttemptExecutorId(executorId)
+ .withAttemptCount(1)
Review Comment:
Ditto as above.
##########
quarkus/service/src/test/java/org/apache/polaris/service/quarkus/task/TableCleanupTaskHandlerTest.java:
##########
@@ -152,6 +156,7 @@ public void testTableCleanup() throws IOException {
handler.handleTask(task, callContext);
+ timeSource.add(Duration.ofMinutes(10));
Review Comment:
Can you explain this further - I'm not sure why the tests need this 10m
jump? Is it so that tasks are "recovered" by the Quarkus Scheduled method?
##########
service/common/src/main/java/org/apache/polaris/service/task/TaskRecoveryManager.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.task;
+
+import jakarta.annotation.Nonnull;
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.Map;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.TaskEntity;
+import org.apache.polaris.core.persistence.BasePersistence;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.dao.entity.BaseResult;
+import org.apache.polaris.core.persistence.dao.entity.EntitiesResult;
+import org.apache.polaris.core.persistence.pagination.PageToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TaskRecoveryManager {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TaskRecoveryManager.class);
+
+ public static void recoverPendingTasks(
+ MetaStoreManagerFactory metaStoreManagerFactory,
+ String executorId,
+ TaskExecutor taskExecutor) {
+ recoverPendingTasks(
+ metaStoreManagerFactory,
+ executorId,
+ taskExecutor,
+ new PolarisConfigurationStore() {},
+ Clock.system(ZoneId.systemDefault()));
+ }
+
+ public static void recoverPendingTasks(
+ @Nonnull MetaStoreManagerFactory metaStoreManagerFactory,
+ @Nonnull String executorId,
+ @Nonnull TaskExecutor taskExecutor,
+ @Nonnull PolarisConfigurationStore configurationStore,
+ @Nonnull Clock clock) {
+ for (Map.Entry<String, PolarisMetaStoreManager> entry :
+ metaStoreManagerFactory.getMetaStoreManagerMap().entrySet()) {
+ RealmContext realmContext = entry::getKey;
+ PolarisMetaStoreManager metaStoreManager = entry.getValue();
+ BasePersistence metastore =
+
metaStoreManagerFactory.getOrCreateSessionSupplier(realmContext).get();
+      // Construct a PolarisCallContext since the original one has been lost
+ PolarisCallContext polarisCallContext =
+ new PolarisCallContext(
+ metastore, new PolarisDefaultDiagServiceImpl(),
configurationStore, clock);
+ EntitiesResult entitiesResult =
Review Comment:
I'm not sure I'm understanding the logic here: we are asking for 20 tasks
here - but what if there are more than 20 tasks that need recovery?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]