This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ab4aaaa3f6f1e5f7a506d86b6abbb7d76cc311bb Author: Lari Hotari <[email protected]> AuthorDate: Fri Jun 12 08:40:43 2026 +0300 [fix][meta] Keep the leader value in the election cycle and make leader reads authoritative (#26000) (cherry picked from commit c1f2a2b260728f00f0428db38ea5c57339163828) --- .../pulsar/broker/admin/impl/BrokersBase.java | 7 +- .../pulsar/broker/admin/impl/NamespacesBase.java | 71 ++++----- .../broker/loadbalance/LeaderElectionService.java | 10 ++ .../pulsar/broker/namespace/NamespaceService.java | 131 ++++++++++------ .../loadbalance/LeaderElectionServiceTest.java | 4 + .../metadata/api/coordination/LeaderElection.java | 23 ++- .../coordination/impl/LeaderElectionImpl.java | 172 +++++++++++++++------ .../replication/AutoRecoveryMainTest.java | 10 +- .../apache/pulsar/metadata/LeaderElectionTest.java | 155 +++++++++++++++++++ .../coordination/impl/LeaderElectionImplTest.java | 28 ++++ 10 files changed, 464 insertions(+), 147 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 47b78bda806..d165335719c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -128,8 +128,11 @@ public class BrokersBase extends AdminResource { public void getLeaderBroker(@Suspended final AsyncResponse asyncResponse) { validateBothSuperuserAndBrokerOperation(pulsar().getConfig().getClusterName(), pulsar().getBrokerId(), BrokerOperation.GET_LEADER_BROKER) - .thenAccept(__ -> { - LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader() + // The authoritative read: waits for an in-progress leader election to settle + // instead of returning 404 while a re-election is still in flight. + .thenCompose(__ -> pulsar().getLeaderElectionService().readCurrentLeader()) + .thenAccept(leader -> { + LeaderBroker leaderBroker = leader .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Couldn't find leader broker")); BrokerInfo brokerInfo = BrokerInfo.builder() .serviceUrl(leaderBroker.getServiceUrl()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 7b5a4fbb005..044dbdaeb68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -58,7 +58,6 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; -import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; @@ -1352,42 +1351,44 @@ public abstract class NamespacesBase extends AdminResource { if (this.isLeaderBroker()) { return CompletableFuture.completedFuture(null); } - Optional<LeaderBroker> currentLeaderOpt = pulsar().getLeaderElectionService().getCurrentLeader(); - if (currentLeaderOpt.isEmpty()) { - String errorStr = "The current leader is empty."; - log.error(errorStr); - return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr)); - } - LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get(); - String leaderBrokerId = leaderBroker.getBrokerId(); - return pulsar().getNamespaceService() - .createLookupResult(leaderBrokerId, false, null) - .thenCompose(lookupResult -> { - String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls() - : lookupResult.getLookupData().getHttpUrl(); - if (redirectUrl == null) { - log.error("Redirected broker's service url is not configured"); - return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, - "Redirected broker's service url is not configured.")); - } + // The authoritative read: waits for an in-progress leader election to settle instead of + // failing the request while a re-election is still in flight. + return pulsar().getLeaderElectionService().readCurrentLeader().thenCompose(currentLeaderOpt -> { + if (currentLeaderOpt.isEmpty()) { + String errorStr = "The current leader is empty."; + log.error(errorStr); + return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, errorStr)); + } + String leaderBrokerId = currentLeaderOpt.get().getBrokerId(); + return pulsar().getNamespaceService() + .createLookupResult(leaderBrokerId, false, null) + .thenCompose(lookupResult -> { + String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls() + : lookupResult.getLookupData().getHttpUrl(); + if (redirectUrl == null) { + log.error("Redirected broker's service url is not configured"); + return FutureUtil.failedFuture(new RestException(Response.Status.PRECONDITION_FAILED, + "Redirected broker's service url is not configured.")); + } - try { - URL url = new URL(redirectUrl); - URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost()) - .port(url.getPort()) - .replaceQueryParam("authoritative", - false).build(); - // Redirect - if (log.isDebugEnabled()) { - log.debug("Redirecting the request call to leader - {}", redirect); + try { + URL url = new URL(redirectUrl); + URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost()) + .port(url.getPort()) + .replaceQueryParam("authoritative", + false).build(); + // Redirect + if (log.isDebugEnabled()) { + log.debug("Redirecting the request call to leader - {}", redirect); + } + return FutureUtil.failedFuture(( + new WebApplicationException(Response.temporaryRedirect(redirect).build()))); + } catch (MalformedURLException exception) { + log.error("The redirect url is malformed - {}", redirectUrl); + return FutureUtil.failedFuture(new RestException(exception)); } - return FutureUtil.failedFuture(( - new WebApplicationException(Response.temporaryRedirect(redirect).build()))); - } catch (MalformedURLException exception) { - log.error("The redirect url is malformed - {}", redirectUrl); - return FutureUtil.failedFuture(new RestException(exception)); - } - }); + }); + }); } public CompletableFuture<Void> setNamespaceBundleAffinityAsync(String bundleRange, String destinationBroker) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java index 2e53b54e98f..21f67bd6b36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java @@ -56,10 +56,20 @@ public class LeaderElectionService implements AutoCloseable { leaderElection.close(); } + /** + * Authoritative read of the current leader: if a leader election is in progress, the returned + * future completes once it settles (bounded by the default metadata operation timeout). Use + * this whenever a decision is made based on who the leader is. + */ public CompletableFuture<Optional<LeaderBroker>> readCurrentLeader() { return leaderElection.getLeaderValue(); } + /** + * Non-blocking snapshot of the current leader; empty while a re-election is settling even + * though a leader may technically exist. Only suitable for best-effort uses such as logging — + * decision-making callers must use {@link #readCurrentLeader()}. + */ public Optional<LeaderBroker> getCurrentLeader() { return leaderElection.getLeaderValueIfPresent(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 44988d8fd5c..8a14b16f373 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -61,7 +61,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.loadbalance.ResourceUnit; @@ -561,7 +560,6 @@ public class NamespaceService implements AutoCloseable { private void searchForCandidateBroker(NamespaceBundle bundle, CompletableFuture<Optional<LookupResult>> lookupFuture, LookupOptions options) { - String candidateBroker; LeaderElectionService les = pulsar.getLeaderElectionService(); if (les == null) { LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", bundle); @@ -570,67 +568,98 @@ public class NamespaceService implements AutoCloseable { return; } - boolean authoritativeRedirect = les.isLeader(); + selectCandidateBroker(bundle, options, les) + .thenAcceptAsync(selection -> { + if (selection.isEmpty()) { + LOG.warn("Load manager didn't return any available broker. " + + "Returning empty result to lookup. NamespaceBundle[{}]", + bundle); + lookupFuture.complete(Optional.empty()); + return; + } + acquireOwnershipOrRedirect(bundle, options, selection.get(), lookupFuture); + }, pulsar.getExecutor()) + .exceptionally(e -> { + LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e); + lookupFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e)); + return null; + }); + } - try { - // check if this is Heartbeat or SLAMonitor namespace - candidateBroker = getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> - CompletableFuture.completedFuture(isBrokerActive(cb))) - .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); + /** The broker selected for a bundle assignment, and whether the redirect to it is authoritative. */ + private record CandidateBrokerSelection(String candidateBroker, boolean authoritativeRedirect) { } - if (candidateBroker == null) { - Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader(); + private CompletableFuture<Optional<CandidateBrokerSelection>> selectCandidateBroker( + NamespaceBundle bundle, LookupOptions options, LeaderElectionService les) { + boolean authoritativeRedirect = les.isLeader(); - if (options.isAuthoritative()) { - // leader broker already assigned the current broker as owner - candidateBroker = pulsar.getBrokerId(); - } else { + // check if this is Heartbeat or SLAMonitor namespace + return getHeartbeatOrSLAMonitorBrokerId(bundle, cb -> + CompletableFuture.completedFuture(isBrokerActive(cb))) + .thenComposeAsync(heartbeatOrSlaBroker -> { + if (heartbeatOrSlaBroker != null) { + return completedSelection(heartbeatOrSlaBroker, authoritativeRedirect); + } + if (options.isAuthoritative()) { + // leader broker already assigned the current broker as owner + return completedSelection(pulsar.getBrokerId(), authoritativeRedirect); + } LoadManager loadManager = this.loadManager.get(); - boolean makeLoadManagerDecisionOnThisBroker = !loadManager.isCentralized() || les.isLeader(); - if (!makeLoadManagerDecisionOnThisBroker) { - // If leader is not active, fallback to pick the least loaded from current broker loadmanager + if (!loadManager.isCentralized() || les.isLeader()) { + return selectLeastLoadedBroker(bundle); + } + // The load manager decision belongs to the leader: read the leader + // authoritatively (waits for an in-progress election to settle) instead of + // acting on a possibly-empty snapshot during a leadership handoff. + return les.readCurrentLeader().thenComposeAsync(currentLeader -> { boolean leaderBrokerActive = currentLeader.isPresent() && isBrokerActive(currentLeader.get().getBrokerId()); - if (!leaderBrokerActive) { - makeLoadManagerDecisionOnThisBroker = true; - if (currentLeader.isEmpty()) { - LOG.warn( - "The information about the current leader broker wasn't available. " - + "Handling load manager decisions in a decentralized way. " - + "NamespaceBundle[{}]", - bundle); - } else { - LOG.warn( - "The current leader broker {} isn't active. " - + "Handling load manager decisions in a decentralized way. " - + "NamespaceBundle[{}]", - currentLeader.get(), bundle); - } + if (leaderBrokerActive) { + // forward to leader broker to make assignment + return completedSelection(currentLeader.get().getBrokerId(), authoritativeRedirect); } - } - if (makeLoadManagerDecisionOnThisBroker) { - Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle); - if (availableBroker.isEmpty()) { - LOG.warn("Load manager didn't return any available broker. " - + "Returning empty result to lookup. NamespaceBundle[{}]", + // If leader is not active, fallback to pick the least loaded from current broker loadmanager + if (currentLeader.isEmpty()) { + LOG.warn( + "The information about the current leader broker wasn't available. " + + "Handling load manager decisions in a decentralized way. " + + "NamespaceBundle[{}]", bundle); - lookupFuture.complete(Optional.empty()); - return; + } else { + LOG.warn( + "The current leader broker {} isn't active. " + + "Handling load manager decisions in a decentralized way. " + + "NamespaceBundle[{}]", + currentLeader.get(), bundle); } - candidateBroker = availableBroker.get(); - authoritativeRedirect = true; - } else { - // forward to leader broker to make assignment - candidateBroker = currentLeader.get().getBrokerId(); - } - } - } + return selectLeastLoadedBroker(bundle); + }, pulsar.getExecutor()); + }, pulsar.getExecutor()); + } + + private static CompletableFuture<Optional<CandidateBrokerSelection>> completedSelection( + String candidateBroker, boolean authoritativeRedirect) { + return CompletableFuture.completedFuture( + Optional.of(new CandidateBrokerSelection(candidateBroker, authoritativeRedirect))); + } + + // The decentralized decision is authoritative: this broker picked the owner itself. + private CompletableFuture<Optional<CandidateBrokerSelection>> selectLeastLoadedBroker(NamespaceBundle bundle) { + Optional<String> availableBroker; + try { + availableBroker = getLeastLoadedFromLoadManager(bundle); } catch (Exception e) { - LOG.warn("Error when searching for candidate broker to acquire {}: {}", bundle, e.getMessage(), e); - lookupFuture.completeExceptionally(e); - return; + return CompletableFuture.failedFuture(e); } + return CompletableFuture.completedFuture( + availableBroker.map(broker -> new CandidateBrokerSelection(broker, true))); + } + private void acquireOwnershipOrRedirect(NamespaceBundle bundle, LookupOptions options, + CandidateBrokerSelection selection, + CompletableFuture<Optional<LookupResult>> lookupFuture) { + final String candidateBroker = selection.candidateBroker(); + final boolean authoritativeRedirect = selection.authoritativeRedirect(); try { Objects.requireNonNull(candidateBroker); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java index 3014c185cc8..a54a492144b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.loadbalance; import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs; import com.google.common.collect.Sets; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -113,6 +114,9 @@ public class LeaderElectionServiceTest { leaderBrokerReference.get() != null); Mockito.when(leaderElectionService.getCurrentLeader()) .thenAnswer(invocation -> Optional.ofNullable(leaderBrokerReference.get())); + Mockito.when(leaderElectionService.readCurrentLeader()) + .thenAnswer(invocation -> + CompletableFuture.completedFuture(Optional.ofNullable(leaderBrokerReference.get()))); leaderElectionServiceReference.set(leaderElectionService); // broker, webService and leaderElectionService is started, but elect not ready; diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java index 016b7d061d0..4b34815c558 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/coordination/LeaderElection.java @@ -45,19 +45,30 @@ public interface LeaderElection<T> extends AutoCloseable { LeaderElectionState getState(); /** - * Get the value set by the elected leader, or empty if there's currently no leader. + * Get the value set by the elected leader. + * <p> + * This is the authoritative read: if a leader election is currently in progress (e.g. the + * previous leader's node was just deleted and the participants are re-electing), the returned + * future completes once the election has settled, with the newly determined leader value. The + * future completes exceptionally with a {@link java.util.concurrent.TimeoutException} if the + * election does not complete within the default metadata operation timeout. + * <p> + * An instance that never participated in the election (no {@link #elect(Object)} call) reads + * the leader value directly from the metadata store. A closed instance does not wait: it + * reports an empty leader if it held the leadership when closed, or its last known view + * otherwise. * * @return a future that will track the completion of the operation */ CompletableFuture<Optional<T>> getLeaderValue(); /** - * Get the value set by the elected leader, or empty if there's currently no leader. + * Get a non-blocking snapshot of the value set by the elected leader, or empty if no leader is + * known right now. * <p> - * The call is non blocking and in certain cases can return <code>Optional.empty()</code> even though a leader is - * technically elected. - * - * @return a future that will track the completion of the operation + * The snapshot can return <code>Optional.empty()</code> even though a leader is technically + * elected (for example while a re-election is still settling). Callers that need the + * authoritative leader must use {@link #getLeaderValue()} instead. */ Optional<T> getLeaderValueIfPresent(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java index ab35eb7040c..954bcfaf45a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java @@ -20,20 +20,18 @@ package org.apache.pulsar.metadata.coordination.impl; import com.fasterxml.jackson.databind.type.TypeFactory; import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.GetResult; -import org.apache.pulsar.metadata.api.MetadataCache; -import org.apache.pulsar.metadata.api.MetadataCacheConfig; import org.apache.pulsar.metadata.api.MetadataSerde; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.MetadataStoreException.AlreadyClosedException; @@ -52,14 +50,19 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { private final String path; private final MetadataSerde<T> serde; private final MetadataStoreExtended store; - private final MetadataCache<T> cache; private final Consumer<LeaderElectionState> stateChangesListener; - private final ScheduledFuture<?> updateCachedValueFuture; private LeaderElectionState leaderElectionState; private Optional<Long> version = Optional.empty(); private Optional<T> proposedValue; + // The leader value as known by the election cycle (the leader can only change through an + // election cycle). Pending while no leader is known — election in progress or the leader node + // deleted — and completed with the leader value once the election settles. Readers of + // getLeaderValue() wait on it (bounded by leaderElectionCompletionTimeoutSeconds); + // getLeaderValueIfPresent() takes a non-blocking snapshot of it. + private CompletableFuture<Optional<T>> currentLeaderFuture = new CompletableFuture<>(); + private final ScheduledExecutorService executor; private final FutureUtil.Sequencer<Void> sequencer; @@ -71,16 +74,21 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { private static final int LEADER_ELECTION_RETRY_DELAY_SECONDS = 5; + // Upper bound for getLeaderValue() waiting on an election that never settles, aligned with the + // default metadata-store operation timeout (the broker's metadataStoreOperationTimeoutSeconds). + private volatile int leaderElectionCompletionTimeoutSeconds = 30; + + @VisibleForTesting + void setLeaderElectionCompletionTimeoutSeconds(int leaderElectionCompletionTimeoutSeconds) { + this.leaderElectionCompletionTimeoutSeconds = leaderElectionCompletionTimeoutSeconds; + } + LeaderElectionImpl(MetadataStoreExtended store, Class<T> clazz, String path, Consumer<LeaderElectionState> stateChangesListener, ScheduledExecutorService executor) { this.path = path; this.serde = new JSONMetadataSerdeSimpleType<>(TypeFactory.defaultInstance().constructSimpleType(clazz, null)); this.store = store; - MetadataCacheConfig metadataCacheConfig = MetadataCacheConfig.builder() - .expireAfterWriteMillis(-1L) - .build(); - this.cache = store.getMetadataCache(clazz, metadataCacheConfig); this.leaderElectionState = LeaderElectionState.NoLeader; this.internalState = InternalState.Init; this.stateChangesListener = stateChangesListener; @@ -88,13 +96,38 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { this.sequencer = FutureUtil.Sequencer.create(); store.registerListener(this::handlePathNotification); store.registerSessionListener(this::handleSessionNotification); - updateCachedValueFuture = executor.scheduleWithFixedDelay(this::getLeaderValue, - metadataCacheConfig.getRefreshAfterWriteMillis() / 2, - metadataCacheConfig.getRefreshAfterWriteMillis(), TimeUnit.MILLISECONDS); + } + + /** + * Record the leader value determined by the election cycle, waking up any getLeaderValue() + * callers waiting for the election to settle. + */ + private synchronized void leaderKnown(Optional<T> leaderValue) { + if (currentLeaderFuture.isDone()) { + currentLeaderFuture = CompletableFuture.completedFuture(leaderValue); + } else { + currentLeaderFuture.complete(leaderValue); + } + } + + /** + * Mark the leader as unknown (the leader node was deleted) so getLeaderValue() callers wait for + * the next election cycle to settle instead of observing a stale value. + */ + private synchronized void leaderUnknown() { + if (currentLeaderFuture.isDone()) { + currentLeaderFuture = new CompletableFuture<>(); + } } @Override public synchronized CompletableFuture<LeaderElectionState> elect(T proposedValue) { + if (internalState == InternalState.Closed) { + // Reopened after close() (e.g. the broker's LeaderElectionService is close()d and then + // start()ed again): reset so a fresh election cycle runs and readers wait for it. + leaderElectionState = LeaderElectionState.NoLeader; + currentLeaderFuture = new CompletableFuture<>(); + } if (leaderElectionState != LeaderElectionState.NoLeader) { return CompletableFuture.completedFuture(leaderElectionState); } @@ -112,12 +145,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { } else { return tryToBecomeLeader(); } - }).thenCompose(leaderElectionState -> { - // make sure that the cache contains the current leader - // so that getLeaderValueIfPresent works on all brokers - cache.refresh(path); - return cache.get(path) - .thenApply(__ -> leaderElectionState); }); } @@ -137,6 +164,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { log.info("Keeping the existing value {} for {} as it's from the same session stat={}", existingValue, path, res.getStat()); // The value is still valid because it was created in the same session + leaderKnown(Optional.of(existingValue)); changeState(LeaderElectionState.Leading); return CompletableFuture.completedFuture(LeaderElectionState.Leading); } else { @@ -158,6 +186,7 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { } // If the existing value is different, it means there's already another leader + leaderKnown(Optional.of(existingValue)); changeState(LeaderElectionState.Following); return CompletableFuture.completedFuture(LeaderElectionState.Following); } @@ -188,35 +217,18 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { .thenAccept(stat -> { synchronized (LeaderElectionImpl.this) { if (internalState == InternalState.ElectionInProgress) { - // Do a get() in order to force a notification later, if the z-node disappears - cache.get(path) - .thenRun(() -> { - synchronized (LeaderElectionImpl.this) { - log.info("Acquired leadership on {} with {}", path, value); - internalState = InternalState.LeaderIsPresent; - if (leaderElectionState != LeaderElectionState.Leading) { - leaderElectionState = LeaderElectionState.Leading; - try { - stateChangesListener.accept(leaderElectionState); - } catch (Throwable t) { - log.warn("Exception in state change listener", t); - } - } - result.complete(leaderElectionState); - } - }).exceptionally(ex -> { - // We fail to do the get(), so clean up the leader election fail the whole - // operation - log.warn("Failed to get the current state after acquiring leadership on {}. " - + " Conditionally deleting current entry.", path, ex); - store.delete(path, Optional.of(stat.getVersion())) - .thenRun(() -> result.completeExceptionally(ex)) - .exceptionally(ex2 -> { - result.completeExceptionally(ex2); - return null; - }); - return null; - }); + log.info("Acquired leadership on {} with {}", path, value); + internalState = InternalState.LeaderIsPresent; + leaderKnown(Optional.of(value)); + if (leaderElectionState != LeaderElectionState.Leading) { + leaderElectionState = LeaderElectionState.Leading; + try { + stateChangesListener.accept(leaderElectionState); + } catch (Throwable t) { + log.warn("Exception in state change listener", t); + } + } + result.complete(leaderElectionState); } else { log.info("Leadership on {} with value {} was lost. " + "Conditionally deleting entry with stat={}.", path, value, stat); @@ -254,7 +266,6 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { @Override public void close() throws Exception { - updateCachedValueFuture.cancel(true); try { asyncClose().join(); } catch (CompletionException e) { @@ -269,6 +280,12 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { } internalState = InternalState.Closed; + // A closed election reports "no leader" rather than waiting or failing: callers like the + // extensible load manager's handleNoChannelOwnerError() key off the resulting + // "no channel owner" condition to restart the election. + if (!currentLeaderFuture.isDone()) { + currentLeaderFuture.complete(Optional.empty()); + } if (leaderElectionState != LeaderElectionState.Leading) { return CompletableFuture.completedFuture(null); @@ -278,6 +295,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { .thenAccept(__ -> { synchronized (LeaderElectionImpl.this) { leaderElectionState = LeaderElectionState.NoLeader; + // The deleted leader node was ours and a closed instance no longer + // observes elections; don't keep reporting ourselves as leader. + currentLeaderFuture = CompletableFuture.completedFuture(Optional.empty()); } } ); @@ -290,12 +310,61 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { @Override public CompletableFuture<Optional<T>> getLeaderValue() { - return cache.get(path); + CompletableFuture<Optional<T>> future; + synchronized (this) { + if (internalState == InternalState.Init) { + // This instance never participated in the election (a pure observer, e.g. + // BookKeeper's MetadataDrivers helpers querying the current auditor): there is no + // local election cycle to wait for, so the store content is the authoritative + // answer. + return readLeaderValueFromStore(); + } + future = currentLeaderFuture; + } + if (future.isDone()) { + // Hand out a derived future so callers cannot complete the internal one. + return future.thenApply(value -> value); + } + int timeoutSeconds = leaderElectionCompletionTimeoutSeconds; + return FutureUtil.addTimeoutHandling(whenLeaderKnown(future), + Duration.ofSeconds(timeoutSeconds), executor, + () -> FutureUtil.createTimeoutException( + "Leader election on path " + path + " did not complete within " + + timeoutSeconds + " seconds", + LeaderElectionImpl.class, "getLeaderValue()")); + } + + private CompletableFuture<Optional<T>> readLeaderValueFromStore() { + return store.get(path).thenApply(optRes -> optRes.map(res -> { + try { + return serde.deserialize(path, res.getValue(), res.getStat()); + } catch (Throwable t) { + throw new CompletionException(t); + } + })); + } + + // Track the internal future without exposing it: completing/cancelling the returned future + // (e.g. by the timeout handling) must not complete the election's own future. + private CompletableFuture<Optional<T>> whenLeaderKnown(CompletableFuture<Optional<T>> future) { + CompletableFuture<Optional<T>> result = new CompletableFuture<>(); + future.whenComplete((value, ex) -> { + if (ex != null) { + result.completeExceptionally(ex); + } else { + result.complete(value); + } + }); + return result; } @Override public Optional<T> getLeaderValueIfPresent() { - return cache.getIfCached(path); + CompletableFuture<Optional<T>> future; + synchronized (this) { + future = currentLeaderFuture; + } + return future.isDone() && !future.isCompletedExceptionally() ? future.join() : Optional.empty(); } private void handleSessionNotification(SessionEvent event) { @@ -333,6 +402,9 @@ class LeaderElectionImpl<T> implements LeaderElection<T> { } leaderElectionState = LeaderElectionState.NoLeader; + // The leader is unknown until the re-election below settles; getLeaderValue() + // callers wait for it instead of observing the stale value. + leaderUnknown(); if (proposedValue.isPresent()) { elect() diff --git a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java index 146e4f0dd58..47d5e9f1f79 100644 --- a/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java +++ b/pulsar-metadata/src/test/java/org/apache/bookkeeper/replication/AutoRecoveryMainTest.java @@ -143,12 +143,16 @@ public class AutoRecoveryMainTest extends BookKeeperClusterTestCase { } BookieId currentAuditor = main1.auditorElector.getCurrentAuditor(); assertNotNull(currentAuditor); - Auditor auditor1 = main1.auditorElector.getAuditor(); assertEquals("Current Auditor should be AR1", currentAuditor, BookieImpl.getBookieId(confByIndex(0))); + // getCurrentAuditor() can resolve as soon as the election settles, before the elector + // thread has constructed the Auditor instance — re-read getAuditor() on every poll instead + // of capturing a possibly-null reference once. Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> { - assertNotNull(auditor1); - assertTrue("Auditor of AR1 should be running", auditor1.isRunning()); + Auditor a1 = main1.auditorElector.getAuditor(); + assertNotNull(a1); + assertTrue("Auditor of AR1 should be running", a1.isRunning()); }); + Auditor auditor1 = main1.auditorElector.getAuditor(); /* diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java index 4b48f3c20b0..cb269537383 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LeaderElectionTest.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.metadata; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import java.util.EnumSet; +import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -35,6 +38,7 @@ import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; +import org.awaitility.Awaitility; import org.testng.annotations.Test; public class LeaderElectionTest extends BaseMetadataStoreTest { @@ -284,4 +288,155 @@ public class LeaderElectionTest extends BaseMetadataStoreTest { assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); assertEqualsAndRetry(() -> le.getLeaderValueIfPresent(), Optional.of("test-1"), Optional.empty()); } + + @Test(dataProvider = "impl", timeOut = 30000) + public void readsDoNotObserveEmptyLeaderDuringReElection(String provider, Supplier<String> urlSupplier) + throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + + @Cleanup + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + + // Externally delete the leader node: the instance re-elects itself. An authoritative read + // issued during the churn either returns the last settled value or waits for the + // re-election to settle — it never observes an empty leader. + store.delete(path, Optional.empty()).join(); + assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); + + Awaitility.await().untilAsserted(() -> { + assertEquals(le.getState(), LeaderElectionState.Leading); + assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1")); + }); + } + + @Test(dataProvider = "zkImpls", timeOut = 30000) + public void followerReadsResolveToTheNewLeaderAfterHandoff(String provider, Supplier<String> urlSupplier) + throws Exception { + @Cleanup + MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().build()); + @Cleanup + MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs1 = new CoordinationServiceImpl(store1); + @Cleanup + CoordinationService cs2 = new CoordinationServiceImpl(store2); + + @Cleanup + LeaderElection<String> le1 = cs1.getLeaderElection(String.class, path, __ -> { + }); + @Cleanup + LeaderElection<String> le2 = cs2.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le1.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(le2.elect("test-2").join(), LeaderElectionState.Following); + assertEquals(le2.getLeaderValue().join(), Optional.of("test-1")); + + // The leader hands off: le2 re-elects itself. Authoritative reads during the handoff + // return one of the settled leader values and converge to the new leader, but never + // observe an empty leader. + le1.close(); + List<Optional<String>> observed = new CopyOnWriteArrayList<>(); + Awaitility.await().untilAsserted(() -> { + Optional<String> leader = le2.getLeaderValue().join(); + observed.add(leader); + assertEquals(leader, Optional.of("test-2")); + }); + assertThat(observed) + .as("authoritative reads during the leadership handoff") + .doesNotContain(Optional.empty()); + } + + @Test(dataProvider = "impl", timeOut = 30000) + public void closedLeaderReportsEmptyLeader(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); + + // Closing the leader releases the leadership; reads on the closed instance must report an + // empty leader without waiting (recovery paths key off the "no leader" condition). + le.close(); + assertEquals(le.getLeaderValue().join(), Optional.empty()); + assertEquals(le.getLeaderValueIfPresent(), Optional.empty()); + } + + @Test(dataProvider = "impl", timeOut = 30000) + public void electAfterCloseRunsANewElection(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + + @Cleanup + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + le.close(); + + // Re-electing on a closed instance reopens it (the broker's LeaderElectionService is + // close()d and start()ed again to force a leadership change). + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(le.getLeaderValue().join(), Optional.of("test-1")); + assertEquals(le.getLeaderValueIfPresent(), Optional.of("test-1")); + } + + @Test(dataProvider = "impl", timeOut = 30000) + public void observerReadsLeaderValueFromStore(String provider, Supplier<String> urlSupplier) throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(), + MetadataStoreConfig.builder().fsyncEnable(false).build()); + + String path = newKey(); + + @Cleanup + CoordinationService cs = new CoordinationServiceImpl(store); + @Cleanup + CoordinationService observerCs = new CoordinationServiceImpl(store); + + @Cleanup + LeaderElection<String> le = cs.getLeaderElection(String.class, path, __ -> { + }); + // The observer never calls elect(): there is no local election cycle to wait for, so the + // authoritative read goes directly to the metadata store, while the snapshot stays empty. + @Cleanup + LeaderElection<String> observer = observerCs.getLeaderElection(String.class, path, __ -> { + }); + + assertEquals(observer.getLeaderValue().join(), Optional.empty()); + + assertEquals(le.elect("test-1").join(), LeaderElectionState.Leading); + assertEquals(observer.getLeaderValue().join(), Optional.of("test-1")); + assertEquals(observer.getLeaderValueIfPresent(), Optional.empty()); + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java index 09c9d71c41a..4827c4a197a 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImplTest.java @@ -18,7 +18,14 @@ */ package org.apache.pulsar.metadata.coordination.impl; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; import lombok.Cleanup; import org.apache.pulsar.metadata.BaseMetadataStoreTest; @@ -62,4 +69,25 @@ public class LeaderElectionImplTest extends BaseMetadataStoreTest { }); blockFuture.join(); } + + @Test(timeOut = 20000) + public void getLeaderValueTimesOutWhenElectionNeverCompletes() { + MetadataStoreExtended store = mock(MetadataStoreExtended.class); + // The store never answers, so the election never settles. + when(store.get(anyString())).thenReturn(new CompletableFuture<>()); + + @Cleanup("shutdownNow") + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + LeaderElectionImpl<String> le = new LeaderElectionImpl<>(store, String.class, + "/getLeaderValueTimesOutWhenElectionNeverCompletes", __ -> { + }, executor); + le.setLeaderElectionCompletionTimeoutSeconds(1); + + le.elect("test-1"); + + assertThatThrownBy(() -> le.getLeaderValue().join()) + .hasCauseInstanceOf(TimeoutException.class) + .cause() + .hasMessageContaining("did not complete within"); + } }
