This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 67ec614b8be [fix][meta] PIP-454: fix migration retry after failure, preparation timeout and mid-migration store startup (#25989)
67ec614b8be is described below

commit 67ec614b8bef7db50bcfab61e3f05ef00a2cfd98
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 10 05:32:23 2026 -0700

    [fix][meta] PIP-454: fix migration retry after failure, preparation timeout and mid-migration store startup (#25989)
---
 .../broker/admin/impl/MetadataMigrationBase.java   |  20 +++-
 .../coordination/impl/MigrationCoordinator.java    |  54 +++++++++--
 .../pulsar/metadata/impl/DualMetadataStore.java    |  18 ++++
 .../pulsar/metadata/DualMetadataStoreTest.java     |  63 +++++++++++++
 .../pulsar/metadata/MigrationCoordinatorTest.java  | 101 +++++++++++++++++++++
 5 files changed, 246 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
index fd4aaa3eb1f..12238200848 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/MetadataMigrationBase.java
@@ -86,11 +86,29 @@ public class MetadataMigrationBase extends AdminResource {
 
         try {
             // Check if metadata store is wrapped with DualMetadataStore
-            if (!(pulsar().getLocalMetadataStore() instanceof 
DualMetadataStore)) {
+            if (!(pulsar().getLocalMetadataStore() instanceof 
DualMetadataStore dualStore)) {
                 throw new RestException(Response.Status.BAD_REQUEST, "Metadata 
store is not configured for migration. "
                         + "Please ensure you're using a supported source 
metadata store (e.g., ZooKeeper).");
             }
 
+            // Reject the request if a migration is already in progress or was 
completed. The migration
+            // flag is always kept in the source store, so read it from there: 
after a completed
+            // migration the dual store would route the read to the target 
store.
+            var existingFlag = 
dualStore.getSourceStore().get(MigrationState.MIGRATION_FLAG_PATH).get();
+            if (existingFlag.isPresent()) {
+                MigrationState currentState = 
ObjectMapperFactory.getMapper().reader()
+                        .readValue(existingFlag.get().getValue(), 
MigrationState.class);
+                switch (currentState.getPhase()) {
+                    case PREPARATION, COPYING -> throw new 
RestException(Response.Status.CONFLICT,
+                            "Migration is already in progress (phase: " + 
currentState.getPhase() + ")");
+                    case COMPLETED -> throw new 
RestException(Response.Status.CONFLICT,
+                            "Migration has already been completed");
+                    default -> {
+                        // NOT_STARTED or FAILED: ok to start (or retry) the 
migration
+                    }
+                }
+            }
+
             // Create coordinator
             MigrationCoordinator coordinator = new 
MigrationCoordinator(pulsar().getLocalMetadataStore(), targetUrl);
 
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
index f3e1ecb668b..93dd95a9d87 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/MigrationCoordinator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.metadata.coordination.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.oxia.client.api.AsyncOxiaClient;
 import io.oxia.client.api.options.defs.OptionOverrideModificationsCount;
 import io.oxia.client.api.options.defs.OptionOverrideVersionId;
@@ -38,6 +39,7 @@ import org.apache.pulsar.common.migration.MigrationPhase;
 import org.apache.pulsar.common.migration.MigrationState;
 import org.apache.pulsar.common.util.Backoff;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
@@ -53,12 +55,21 @@ public class MigrationCoordinator {
     private final String targetUrl;
     private final AsyncOxiaClient oxiaClient;
     private final MetadataCache<MigrationState> migrationStateCache;
+    private final Duration preparationTimeout;
 
     private static final int MAX_PENDING_OPS = 1000;
+    private static final Duration DEFAULT_PREPARATION_TIMEOUT = 
Duration.ofSeconds(60);
 
     public MigrationCoordinator(MetadataStore sourceStore, String targetUrl) 
throws MetadataStoreException {
+        this(sourceStore, targetUrl, DEFAULT_PREPARATION_TIMEOUT);
+    }
+
+    @VisibleForTesting
+    public MigrationCoordinator(MetadataStore sourceStore, String targetUrl, 
Duration preparationTimeout)
+            throws MetadataStoreException {
         this.sourceStore = sourceStore;
         this.targetUrl = targetUrl;
+        this.preparationTimeout = preparationTimeout;
         this.migrationStateCache = 
sourceStore.getMetadataCache(MigrationState.class);
 
         if (!targetUrl.startsWith("oxia://")) {
@@ -78,10 +89,11 @@ public class MigrationCoordinator {
         log.info().attr("source", 
sourceStore.getClass().getSimpleName()).log("Source (current)");
         log.info().attr("target", targetUrl).log("Target");
 
-        try {
-            // 1. Create migration flag
-            setInitialMigrationPhase();
+        // 1. Create migration flag. If another migration is already in 
progress, this fails without
+        // affecting the existing migration state.
+        setInitialMigrationPhase();
 
+        try {
             // 2. Wait for participants to prepare
             waitForPreparation();
 
@@ -102,10 +114,30 @@ public class MigrationCoordinator {
 
     private void setInitialMigrationPhase() throws MetadataStoreException {
         try {
+            Optional<GetResult> existing = 
sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).get();
+            Optional<Long> expectedVersion;
+            if (existing.isEmpty()) {
+                // Create-only, to guard against concurrent migration starts
+                expectedVersion = Optional.of(-1L);
+            } else {
+                MigrationState currentState = 
ObjectMapperFactory.getMapper().reader()
+                        .readValue(existing.get().getValue(), 
MigrationState.class);
+                expectedVersion = switch (currentState.getPhase()) {
+                    // A leftover flag from a failed (or never started) 
migration can be replaced. The
+                    // expected version guards against concurrent migration 
starts.
+                    case NOT_STARTED, FAILED -> 
Optional.of(existing.get().getStat().getVersion());
+                    case PREPARATION, COPYING -> throw new 
MetadataStoreException(
+                            "Migration is already in progress (phase: " + 
currentState.getPhase() + ")");
+                    case COMPLETED -> throw new 
MetadataStoreException("Migration has already been completed");
+                };
+            }
+
             sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
                     ObjectMapperFactory.getMapper().writer()
                             .writeValueAsBytes(new 
MigrationState(MigrationPhase.PREPARATION, targetUrl)),
-                    Optional.of(-1L)).get();
+                    expectedVersion).get();
+        } catch (MetadataStoreException e) {
+            throw e;
         } catch (Exception e) {
             throw new MetadataStoreException(e);
         }
@@ -123,24 +155,28 @@ public class MigrationCoordinator {
     private void waitForPreparation() throws Exception {
         log.info("Waiting for all participants to prepare...");
 
-        long startTime = System.currentTimeMillis();
+        long deadline = System.currentTimeMillis() + 
preparationTimeout.toMillis();
         Backoff backoff = Backoff.builder()
                 .initialDelay(Duration.ofMillis(100))
                 .mandatoryStop(Duration.ofSeconds(60))
                 .maxBackoff(Duration.ofSeconds(60)).build();
-        while (System.currentTimeMillis() - startTime < 60_000) {
+        while (true) {
             List<String> pending = 
sourceStore.getChildren(MigrationState.PARTICIPANTS_PATH).get();
             if (pending.isEmpty()) {
                 log.info("All migration participants ready");
                 return;
             }
 
+            if (System.currentTimeMillis() >= deadline) {
+                log.error().attr("pendingParticipants", pending)
+                        .log("Failed to wait for all participants to prepare");
+                throw new MetadataStoreException(
+                        "Timed out waiting for migration participants to 
prepare: " + pending);
+            }
+
             log.info().attr("pending", pending).log("Waiting for participants 
to prepare");
             Thread.sleep(backoff.next().toMillis());
         }
-
-        log.error().attr("pendingParticipants", 
sourceStore.getChildren(MigrationState.PARTICIPANTS_PATH).get())
-                .log("Failed to wait for all participants to prepare");
     }
 
     private void copyPersistentData() throws Exception {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
index 16daffc81db..4fcde76505c 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/DualMetadataStore.java
@@ -112,6 +112,15 @@ public class DualMetadataStore implements MetadataStoreExtended {
 
         // Watch for migration events
         watchForMigrationEvents();
+
+        if (migrationState.getPhase() == MigrationPhase.PREPARATION
+                || migrationState.getPhase() == MigrationPhase.COPYING) {
+            // A migration was already in progress when this store was 
created, so the PREPARATION
+            // notification was missed. Run the preparation handler now, so 
that the target store gets
+            // initialized and the participant registered above acknowledges 
instead of stalling the
+            // migration coordinator.
+            executor.execute(this::handleMigrationStart);
+        }
     }
 
     private void readCurrentState() throws MetadataStoreException {
@@ -213,6 +222,15 @@ public class DualMetadataStore implements MetadataStoreExtended {
     private void handleMigrationComplete() {
         log.info("=== Metadata Migration Complete ===");
 
+        try {
+            // The target store might not have been initialized yet, if this 
store was created while
+            // the migration was already in progress
+            initializeTargetStore(migrationState.getTargetUrl());
+        } catch (MetadataStoreException e) {
+            log.error().exception(e).log("Failed to initialize target store on 
migration completion");
+            return;
+        }
+
         caches.forEach(DualMetadataCache::handleSwitchToTargetStore);
         listeners.forEach(targetStore::registerListener);
         sessionListeners.forEach(targetStore::registerSessionListener);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
index ab5374a1841..578d419c81e 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/DualMetadataStoreTest.java
@@ -44,6 +44,7 @@ import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.DualMetadataStore;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
+import org.awaitility.Awaitility;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -450,4 +451,66 @@ public class DualMetadataStoreTest extends BaseMetadataStoreTest {
         // Exists should check target in COMPLETED phase
         assertTrue(dualStore.exists(targetPath).join());
     }
+
+    @Test
+    public void testStoreCreatedDuringMigration() throws Exception {
+        String prefix = newKey();
+
+        @Cleanup
+        MetadataStore rawStore = new ZKMetadataStore(zks.getConnectionString(),
+                MetadataStoreConfig.builder().build(), false);
+
+        String targetUrl = "memory:" + UUID.randomUUID();
+        @Cleanup
+        MetadataStore targetStore = MetadataStoreFactory.create(targetUrl,
+                MetadataStoreConfig.builder().build());
+
+        // The migration is already in PREPARATION phase before the store gets 
created
+        MigrationState preparationState = new 
MigrationState(MigrationPhase.PREPARATION, targetUrl);
+        rawStore.put(MigrationState.MIGRATION_FLAG_PATH,
+                
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(preparationState),
+                Optional.empty()).join();
+
+        @Cleanup
+        DualMetadataStore dualStore = new DualMetadataStore(
+                new ZKMetadataStore(zks.getConnectionString(), 
MetadataStoreConfig.builder().build(), true),
+                MetadataStoreConfig.builder().build());
+
+        // Writes must be blocked while the migration is in progress
+        String path = prefix + "/test-key";
+        byte[] data = "test-data".getBytes(StandardCharsets.UTF_8);
+        try {
+            dualStore.put(path, data, Optional.empty()).join();
+            fail("Should have thrown IllegalStateException");
+        } catch (CompletionException e) {
+            assertTrue(e.getCause() instanceof IllegalStateException);
+        }
+
+        // Even though it missed the PREPARATION notification, the store must 
acknowledge the
+        // preparation (delete its participant node), so that the coordinator 
is not stalled
+        Awaitility.await().untilAsserted(() ->
+                
assertTrue(rawStore.getChildren(MigrationState.PARTICIPANTS_PATH).join().isEmpty()));
+
+        // Complete the migration
+        MigrationState completedState = new 
MigrationState(MigrationPhase.COMPLETED, targetUrl);
+        rawStore.put(MigrationState.MIGRATION_FLAG_PATH,
+                
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(completedState),
+                Optional.empty()).join();
+
+        // Once the store has processed the completion, writes must succeed 
and land in the target store
+        Awaitility.await().ignoreExceptions().untilAsserted(() -> {
+            dualStore.put(path, data, Optional.empty()).join();
+            assertTrue(targetStore.get(path).join().isPresent());
+        });
+
+        Optional<GetResult> targetResult = targetStore.get(path).join();
+        assertTrue(targetResult.isPresent());
+        assertEquals(new String(targetResult.get().getValue(), 
StandardCharsets.UTF_8), "test-data");
+
+        // Reads now come from the target store as well
+        assertTrue(dualStore.get(path).join().isPresent());
+
+        // The key must not have been written to the source store
+        assertFalse(rawStore.get(path).join().isPresent());
+    }
 }
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
index 6774d166b77..2a6b99bdcca 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MigrationCoordinatorTest.java
@@ -21,8 +21,11 @@ package org.apache.pulsar.metadata;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Optional;
 import lombok.Cleanup;
 import lombok.CustomLog;
@@ -32,10 +35,12 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.coordination.impl.MigrationCoordinator;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -298,4 +303,100 @@ public class MigrationCoordinatorTest extends BaseMetadataStoreTest {
                 || state.getPhase() == MigrationPhase.COPYING
                 || state.getPhase() == MigrationPhase.COMPLETED);
     }
+
+    @Test
+    public void testRetryAfterFailedMigration() throws Exception {
+        String prefix = newKey();
+
+        @Cleanup
+        MetadataStoreExtended sourceStore =
+                (MetadataStoreExtended) 
MetadataStoreFactory.create(zks.getConnectionString(),
+                        MetadataStoreConfig.builder().build());
+
+        // Raw store handle for writing/reading the migration flag 
deterministically, bypassing the
+        // DualMetadataStore phase-based routing
+        @Cleanup
+        MetadataStore rawStore = new ZKMetadataStore(zks.getConnectionString(),
+                MetadataStoreConfig.builder().build(), false);
+
+        String targetUrl = getOxiaServerConnectString();
+
+        @Cleanup
+        MetadataStore targetStore = MetadataStoreFactory.create(targetUrl, 
MetadataStoreConfig.builder().build());
+
+        String key1 = prefix + "/persistent/key1";
+        sourceStore.put(key1, "value1".getBytes(StandardCharsets.UTF_8), 
Optional.empty()).join();
+
+        // Simulate a previously failed migration attempt
+        rawStore.put(MigrationState.MIGRATION_FLAG_PATH,
+                ObjectMapperFactory.getMapper().writer()
+                        .writeValueAsBytes(new 
MigrationState(MigrationPhase.FAILED, targetUrl)),
+                Optional.empty()).join();
+
+        // Re-running the migration must overwrite the leftover FAILED flag 
and complete
+        MigrationCoordinator coordinator = new 
MigrationCoordinator(sourceStore, targetUrl);
+        coordinator.startMigration();
+
+        Optional<GetResult> result = 
rawStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+        assertTrue(result.isPresent());
+        MigrationState state = ObjectMapperFactory.getMapper().reader()
+                .readValue(result.get().getValue(), MigrationState.class);
+        assertEquals(state.getPhase(), MigrationPhase.COMPLETED);
+
+        // Verify the data was copied to the target store
+        Optional<GetResult> target1 = targetStore.get(key1).join();
+        assertTrue(target1.isPresent());
+        assertEquals(new String(target1.get().getValue(), 
StandardCharsets.UTF_8), "value1");
+    }
+
+    @Test
+    public void testStartRejectedWhileMigrationInProgress() throws Exception {
+        @Cleanup
+        MetadataStore sourceStore = new 
ZKMetadataStore(zks.getConnectionString(),
+                MetadataStoreConfig.builder().build(), false);
+
+        String targetUrl = getOxiaServerConnectString();
+
+        for (MigrationPhase phase : List.of(MigrationPhase.PREPARATION, 
MigrationPhase.COPYING,
+                MigrationPhase.COMPLETED)) {
+            sourceStore.put(MigrationState.MIGRATION_FLAG_PATH,
+                    ObjectMapperFactory.getMapper().writer()
+                            .writeValueAsBytes(new MigrationState(phase, 
targetUrl)),
+                    Optional.empty()).join();
+
+            MigrationCoordinator coordinator = new 
MigrationCoordinator(sourceStore, targetUrl);
+            expectThrows(MetadataStoreException.class, 
coordinator::startMigration);
+
+            // The existing migration flag must be left untouched (in 
particular, not marked FAILED)
+            Optional<GetResult> result = 
sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+            assertTrue(result.isPresent());
+            MigrationState state = ObjectMapperFactory.getMapper().reader()
+                    .readValue(result.get().getValue(), MigrationState.class);
+            assertEquals(state.getPhase(), phase);
+        }
+    }
+
+    @Test
+    public void testPreparationTimeoutFailsMigration() throws Exception {
+        @Cleanup
+        MetadataStore sourceStore = new 
ZKMetadataStore(zks.getConnectionString(),
+                MetadataStoreConfig.builder().build(), false);
+
+        // Simulate a participant that never acknowledges the preparation
+        sourceStore.put(MigrationState.PARTICIPANTS_PATH + "/id-0000000001", 
new byte[0],
+                Optional.empty()).join();
+
+        String targetUrl = getOxiaServerConnectString();
+
+        MigrationCoordinator coordinator =
+                new MigrationCoordinator(sourceStore, targetUrl, 
Duration.ofSeconds(2));
+        expectThrows(MetadataStoreException.class, 
coordinator::startMigration);
+
+        // The migration must have transitioned to FAILED
+        Optional<GetResult> result = 
sourceStore.get(MigrationState.MIGRATION_FLAG_PATH).join();
+        assertTrue(result.isPresent());
+        MigrationState state = ObjectMapperFactory.getMapper().reader()
+                .readValue(result.get().getValue(), MigrationState.class);
+        assertEquals(state.getPhase(), MigrationPhase.FAILED);
+    }
 }

Reply via email to