This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 67ec614b8be [fix][meta] PIP-454: fix migration retry after failure,
preparation timeout and mid-migration store startup (#25989)
67ec614b8be is described below
commit 67ec614b8bef7db50bcfab61e3f05ef00a2cfd98
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 10 05:32:23 2026 -0700
[fix][meta] PIP-454: fix migration retry after failure, preparation timeout
and mid-migration store startup (#25989)
---
.../broker/admin/impl/MetadataMigrationBase.java | 20 +++-
.../coordination/impl/MigrationCoordinator.java | 54 +++++++++--
.../pulsar/metadata/impl/DualMetadataStore.java | 18 ++++
.../pulsar/metadata/DualMetadataStoreTest.java | 63 +++++++++++++
.../pulsar/metadata/MigrationCoordinatorTest.java | 101 +++++++++++++++++++++
5 files changed, 246 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
index fd4aaa3eb1f..12238200848 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
@@ -86,11 +86,29 @@ public class MetadataMigrationBase extends AdminResource {
try {
// Check if metadata store is wrapped with DualMetadataStore
- if (!(pulsar().getLocalMetadataStore() instanceof
DualMetadataStore)) {
+ if (!(pulsar().getLocalMetadataStore() instanceof
DualMetadataStore dualStore)) {
throw new RestException(Response.Status.BAD_REQUEST, "Metadata
store is not configured for migration. "
+ "Please ensure you're using a supported source
metadata store (e.g., ZooKeeper).");
}
+ // Reject the request if a migration is already in progress or was
completed. The migration
+ // flag is always kept in the source store, so read it from there:
after a completed
+ // migration the dual store would route the read to the target
store.
+ var existingFlag =
dualStore.getSourceStore().get(MigrationState.MIGRATION_FLAG_PATH).get();
+ if (existingFlag.isPresent()) {
+ MigrationState currentState =
ObjectMapperFactory.getMapper().reader()
+ .readValue(existingFlag.get().getValue(),
MigrationState.class);
+ switch (currentState.getPhase()) {
+ case PREPARATION, COPYING -> throw new
RestException(Response.Status.CONFLICT,
+ "Migration is already in progress (phase: " +
currentState.getPhase() + ")");
+ case COMPLETED -> throw new
RestException(Response.Status.CONFLICT,
+ "Migration has already been completed");
+ default -> {
+ // NOT_STARTED or FAILED: ok to start (or retry) the
migration
+ }
+ }
+ }
+
// Create coordinator
MigrationCoordinator coordinator = new
MigrationCoordinator(pulsar().getLocalMetadataStore(), targetUrl);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
index f3e1ecb668b..93dd95a9d87 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.metadata.coordination.impl;
+import com.google.common.annotations.VisibleForTesting;
import io.oxia.client.api.AsyncOxiaClient;
import io.oxia.client.api.options.defs.OptionOverrideModificationsCount;
import io.oxia.client.api.options.defs.OptionOverrideVersionId;
@@ -38,6 +39,7 @@ import org.apache.pulsar.common.migration.MigrationPhase;
import org.apache.pulsar.common.migration.MigrationState;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -53,12 +55,21 @@ public class MigrationCoordinator {
private final String targetUrl;
private final AsyncOxiaClient oxiaClient;
private final MetadataCache<MigrationState> migrationStateCache;
+ private final Duration preparationTimeout;
private static final int MAX_PENDING_OPS = 1000;
+ private static final Duration DEFAULT_PREPARATION_TIMEOUT =
Duration.ofSeconds(60);
public MigrationCoordinator(MetadataStore sourceStore, String targetUrl)
throws MetadataStoreException {
+ this(sourceStore, targetUrl, DEFAULT_PREPARATION_TIMEOUT);
+ }
+
+ @VisibleForTesting
+ public MigrationCoordinator(MetadataStore sourceStore, String targetUrl,
Duration preparationTimeout)
+ throws MetadataStoreException {
this.sourceStore = sourceStore;
this.targetUrl = targetUrl;
+ this.preparationTimeout = preparationTimeout;
this.migrationStateCache =
sourceStore.getMetadataCache(MigrationState.class);
if (!targetUrl.startsWith("oxia://")) {
@@ -78,10 +89,11 @@ public class MigrationCoordinator {
log.info().attr("source",
sourceStore.getClass().getSimpleName()).log("Source (current)");
log.info().attr("target", targetUrl).log("Target");
- try {
- // 1. Create migration flag
- setInitialMigrationPhase();
+ // 1. Create migration flag. If another migration is already in
progress, this fails without
+ // affecting the existing migration state.
+ setInitialMigrationPhase();
+ try {
// 2. Wait for participants to prepare
waitForPreparation();
@@ -102,10 +114,30 @@ public class MigrationCoordinator {
private void setInitialMigrationPhase() throws MetadataStoreException {
try {
+ Optional<GetResult> existing =
sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).get();
+ Optional<Long> expectedVersion;
+ if (existing.isEmpty()) {
+ // Create-only, to guard against concurrent migration starts
+ expectedVersion = Optional.of(-1L);
+ } else {
+ MigrationState currentState =
ObjectMapperFactory.getMapper().reader()
+ .readValue(existing.get().getValue(),
MigrationState.class);
+ expectedVersion = switch (currentState.getPhase()) {
+ // A leftover flag from a failed (or never started)
migration can be replaced. The
+ // expected version guards against concurrent migration
starts.
+ case NOT_STARTED, FAILED ->
Optional.of(existing.get().getStat().getVersion());
+ case PREPARATION, COPYING -> throw new
MetadataStoreException(
+ "Migration is already in progress (phase: " +
currentState.getPhase() + ")");
+ case COMPLETED -> throw new
MetadataStoreException("Migration has already been completed");
+ };
+ }
+
sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
ObjectMapperFactory.getMapper().writer()
.writeValueAsBytes(new
MigrationState(MigrationPhase.PREPARATION, targetUrl)),
- Optional.of(-1L)).get();
+ expectedVersion).get();
+ } catch (MetadataStoreException e) {
+ throw e;
} catch (Exception e) {
throw new MetadataStoreException(e);
}
@@ -123,24 +155,28 @@ public class MigrationCoordinator {
private void waitForPreparation() throws Exception {
log.info("Waiting for all participants to prepare...");
- long startTime = System.currentTimeMillis();
+ long deadline = System.currentTimeMillis() +
preparationTimeout.toMillis();
Backoff backoff = Backoff.builder()
.initialDelay(Duration.ofMillis(100))
.mandatoryStop(Duration.ofSeconds(60))
.maxBackoff(Duration.ofSeconds(60)).build();
- while (System.currentTimeMillis() - startTime < 60_000) {
+ while (true) {
List<String> pending =
sourceStore.getChildren(MigrationState.PARTICIPANTS_PATH).get();
if (pending.isEmpty()) {
log.info("All migration participants ready");
return;
}
+ if (System.currentTimeMillis() >= deadline) {
+ log.error().attr("pendingParticipants", pending)
+ .log("Failed to wait for all participants to prepare");
+ throw new MetadataStoreException(
+ "Timed out waiting for migration participants to
prepare: " + pending);
+ }
+
log.info().attr("pending", pending).log("Waiting for participants
to prepare");
Thread.sleep(backoff.next().toMillis());
}
-
- log.error().attr("pendingParticipants",
sourceStore.getChildren(MigrationState.PARTICIPANTS_PATH).get())
- .log("Failed to wait for all participants to prepare");
}
private void copyPersistentData() throws Exception {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 16daffc81db..4fcde76505c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -112,6 +112,15 @@ public class DualMetadataStore implements MetadataStoreExtended {
// Watch for migration events
watchForMigrationEvents();
+
+ if (migrationState.getPhase() == MigrationPhase.PREPARATION
+ || migrationState.getPhase() == MigrationPhase.COPYING) {
+ // A migration was already in progress when this store was
created, so the PREPARATION
+ // notification was missed. Run the preparation handler now, so
that the target store gets
+ // initialized and the participant registered above acknowledges
instead of stalling the
+ // migration coordinator.
+ executor.execute(this::handleMigrationStart);
+ }
}
private void readCurrentState() throws MetadataStoreException {
@@ -213,6 +222,15 @@ public class DualMetadataStore implements MetadataStoreExtended {
private void handleMigrationComplete() {
log.info("=== Metadata Migration Complete ===");
+ try {
+ // The target store might not have been initialized yet, if this
store was created while
+ // the migration was already in progress
+ initializeTargetStore(migrationState.getTargetUrl());
+ } catch (MetadataStoreException e) {
+ log.error().exception(e).log("Failed to initialize target store on
migration completion");
+ return;
+ }
+
caches.forEach(DualMetadataCache::handleSwitchToTargetStore);
listeners.forEach(targetStore::registerListener);
sessionListeners.forEach(targetStore::registerSessionListener);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
index ab5374a1841..578d419c81e 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.DualMetadataStore;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -450,4 +451,66 @@ public class DualMetadataStoreTest extends BaseMetadataStoreTest {
// Exists should check target in COMPLETED phase
assertTrue(dualStore.exists(targetPath).join());
}
+
+ @Test
+ public void testStoreCreatedDuringMigration() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStore rawStore = new ZKMetadataStore(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build(), false);
+
+ String targetUrl = "memory:" + UUID.randomUUID();
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+ MetadataStoreConfig.builder().build());
+
+ // The migration is already in PREPARATION phase before the store gets
created
+ MigrationState preparationState = new
MigrationState(MigrationPhase.PREPARATION, targetUrl);
+ rawStore.put(MigrationState.MIGRATION_FLAG_PATH,
+
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+ Optional.empty()).join();
+
+ @Cleanup
+ DualMetadataStore dualStore = new DualMetadataStore(
+ new ZKMetadataStore(zks.getConnectionString(),
MetadataStoreConfig.builder().build(), true),
+ MetadataStoreConfig.builder().build());
+
+ // Writes must be blocked while the migration is in progress
+ String path = prefix + "/test-key";
+ byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+ try {
+ dualStore.put(path, data, Optional.empty()).join();
+ fail("Should have thrown IllegalStateException");
+ } catch (CompletionException e) {
+ assertTrue(e.getCause() instanceof IllegalStateException);
+ }
+
+ // Even though it missed the PREPARATION notification, the store must
acknowledge the
+ // preparation (delete its participant node), so that the coordinator
is not stalled
+ Awaitility.await().untilAsserted(() ->
+
assertTrue(rawStore.getChildren(MigrationState.PARTICIPANTS_PATH).join().isEmpty()));
+
+ // Complete the migration
+ MigrationState completedState = new
MigrationState(MigrationPhase.COMPLETED, targetUrl);
+ rawStore.put(MigrationState.MIGRATION_FLAG_PATH,
+
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(completedState),
+ Optional.empty()).join();
+
+ // Once the store has processed the completion, writes must succeed
and land in the target store
+ Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+ dualStore.put(path, data, Optional.empty()).join();
+ assertTrue(targetStore.get(path).join().isPresent());
+ });
+
+ Optional<GetResult> targetResult = targetStore.get(path).join();
+ assertTrue(targetResult.isPresent());
+ assertEquals(new String(targetResult.get().getValue(),
StandardCharsets.UTF_8), "test-data");
+
+ // Reads now come from the target store as well
+ assertTrue(dualStore.get(path).join().isPresent());
+
+ // The key must not have been written to the source store
+ assertFalse(rawStore.get(path).join().isPresent());
+ }
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
index 6774d166b77..2a6b99bdcca 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
@@ -21,8 +21,11 @@ package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.EnumSet;
+import java.util.List;
import java.util.Optional;
import lombok.Cleanup;
import lombok.CustomLog;
@@ -32,10 +35,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -298,4 +303,100 @@ public class MigrationCoordinatorTest extends BaseMetadataStoreTest {
|| state.getPhase() == MigrationPhase.COPYING
|| state.getPhase() == MigrationPhase.COMPLETED);
}
+
+ @Test
+ public void testRetryAfterFailedMigration() throws Exception {
+ String prefix = newKey();
+
+ @Cleanup
+ MetadataStoreExtended sourceStore =
+ (MetadataStoreExtended)
MetadataStoreFactory.create(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build());
+
+ // Raw store handle for writing/reading the migration flag
deterministically, bypassing the
+ // DualMetadataStore phase-based routing
+ @Cleanup
+ MetadataStore rawStore = new ZKMetadataStore(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build(), false);
+
+ String targetUrl = getOxiaServerConnectString();
+
+ @Cleanup
+ MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
MetadataStoreConfig.builder().build());
+
+ String key1 = prefix + "/persistent/key1";
+ sourceStore.put(key1, "value1".getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
+
+ // Simulate a previously failed migration attempt
+ rawStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer()
+ .writeValueAsBytes(new
MigrationState(MigrationPhase.FAILED, targetUrl)),
+ Optional.empty()).join();
+
+ // Re-running the migration must overwrite the leftover FAILED flag
and complete
+ MigrationCoordinator coordinator = new
MigrationCoordinator(sourceStore, targetUrl);
+ coordinator.startMigration();
+
+ Optional<GetResult> result =
rawStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+ // Verify the data was copied to the target store
+ Optional<GetResult> target1 = targetStore.get(key1).join();
+ assertTrue(target1.isPresent());
+ assertEquals(new String(target1.get().getValue(),
StandardCharsets.UTF_8), "value1");
+ }
+
+ @Test
+ public void testStartRejectedWhileMigrationInProgress() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore = new
ZKMetadataStore(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build(), false);
+
+ String targetUrl = getOxiaServerConnectString();
+
+ for (MigrationPhase phase : List.of(MigrationPhase.PREPARATION,
MigrationPhase.COPYING,
+ MigrationPhase.COMPLETED)) {
+ sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+ ObjectMapperFactory.getMapper().writer()
+ .writeValueAsBytes(new MigrationState(phase,
targetUrl)),
+ Optional.empty()).join();
+
+ MigrationCoordinator coordinator = new
MigrationCoordinator(sourceStore, targetUrl);
+ expectThrows(MetadataStoreException.class,
coordinator::startMigration);
+
+ // The existing migration flag must be left untouched (in
particular, not marked FAILED)
+ Optional<GetResult> result =
sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), phase);
+ }
+ }
+
+ @Test
+ public void testPreparationTimeoutFailsMigration() throws Exception {
+ @Cleanup
+ MetadataStore sourceStore = new
ZKMetadataStore(zks.getConnectionString(),
+ MetadataStoreConfig.builder().build(), false);
+
+ // Simulate a participant that never acknowledges the preparation
+ sourceStore.put(MigrationState.PARTICIPANTS_PATH + "/id-0000000001",
new byte[0],
+ Optional.empty()).join();
+
+ String targetUrl = getOxiaServerConnectString();
+
+ MigrationCoordinator coordinator =
+ new MigrationCoordinator(sourceStore, targetUrl,
Duration.ofSeconds(2));
+ expectThrows(MetadataStoreException.class,
coordinator::startMigration);
+
+ // The migration must have transitioned to FAILED
+ Optional<GetResult> result =
sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+ assertTrue(result.isPresent());
+ MigrationState state = ObjectMapperFactory.getMapper().reader()
+ .readValue(result.get().getValue(), MigrationState.class);
+ assertEquals(state.getPhase(), MigrationPhase.FAILED);
+ }
}