lhotari commented on code in PR #25559: URL: https://github.com/apache/pulsar/pull/25559#discussion_r3138825767
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.resources.ScalableTopicMetadata; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentInfo; +import org.apache.pulsar.common.scalable.SegmentTopicName; +import org.apache.pulsar.metadata.api.coordination.CoordinationService; +import org.apache.pulsar.metadata.api.coordination.LeaderElection; +import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; + +/** + * Central service managing all scalable topics on this broker. + * + * <p>Lifecycle is tied to {@link BrokerService}. 
This service handles: + * <ul> + * <li>Creating and deleting scalable topics</li> + * <li>Managing {@link ScalableTopicController} instances for topics this broker coordinates</li> + * <li>Admin operations: split/merge</li> + * </ul> + */ +@Slf4j +public class ScalableTopicService { + + private final BrokerService brokerService; + private final ScalableTopicResources resources; + private final CoordinationService coordinationService; + + /** Active controllers for topics this broker coordinates. */ + private final ConcurrentHashMap<String, ScalableTopicController> controllers = new ConcurrentHashMap<>(); + + public ScalableTopicService(BrokerService brokerService, + ScalableTopicResources resources, + CoordinationService coordinationService) { + this.brokerService = brokerService; + this.resources = resources; + this.coordinationService = coordinationService; + } + + // --- Lifecycle --- + + public void start() { + log.info("ScalableTopicService started"); + } + + public void close() { + log.info("Closing ScalableTopicService, releasing {} controllers", controllers.size()); + controllers.values().forEach(controller -> { + try { + controller.close().join(); + } catch (Exception e) { + log.warn("Error closing controller for topic {}", controller.getTopicName(), e); + } + }); + controllers.clear(); + } + + // --- Controller management --- + + /** + * Get or create a controller for a scalable topic. The controller will attempt + * leader election; only the leader actively coordinates consumers. 
+ */ + public CompletableFuture<ScalableTopicController> getOrCreateController(TopicName topic) { + String key = topic.toString(); + ScalableTopicController existing = controllers.get(key); + if (existing != null) { + return CompletableFuture.completedFuture(existing); + } + + String lockPath = resources.controllerLockPath(topic); + LeaderElection<String> election = coordinationService.getLeaderElection( + String.class, lockPath, state -> onLeaderStateChange(topic, state)); + + ScalableTopicController controller = new ScalableTopicController( + topic, resources, brokerService, election); + controllers.put(key, controller); + + return controller.initialize() + .thenApply(__ -> controller) + .exceptionally(ex -> { + controllers.remove(key); + throw new RuntimeException("Failed to initialize controller for " + topic, ex); + }); + } + + /** + * Release the controller for a topic (e.g., on topic unload). + */ + public CompletableFuture<Void> releaseController(TopicName topic) { + ScalableTopicController controller = controllers.remove(topic.toString()); + if (controller != null) { + return controller.close(); + } + return CompletableFuture.completedFuture(null); + } + + // --- Admin operations --- + + /** + * Create a new scalable topic with the given number of initial segments. 
+ */ + public CompletableFuture<Void> createScalableTopic(TopicName topic, int numInitialSegments) { + return createScalableTopic(topic, numInitialSegments, Map.of()); + } + + public CompletableFuture<Void> createScalableTopic(TopicName topic, int numInitialSegments, + Map<String, String> properties) { + if (topic.getDomain() != TopicDomain.topic) { + return CompletableFuture.failedFuture( + new IllegalArgumentException("Expected topic domain, got: " + topic.getDomain())); + } + if (numInitialSegments < 1) { + return CompletableFuture.failedFuture( + new IllegalArgumentException("numInitialSegments must be >= 1")); + } + + ScalableTopicMetadata metadata = ScalableTopicController.createInitialMetadata( + numInitialSegments, properties); + + return resources.createScalableTopicAsync(topic, metadata) + .thenCompose(__ -> { + // Create underlying persistent topics for each initial segment + CompletableFuture<?>[] segmentFutures = metadata.getSegments().values().stream() + .map(segment -> createUnderlyingSegmentTopic(topic, segment)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(segmentFutures); + }); + } + + /** + * Delete a scalable topic and all its segment topics. 
+ */ + public CompletableFuture<Void> deleteScalableTopic(TopicName topic) { + return releaseController(topic) + .thenCompose(__ -> resources.getScalableTopicMetadataAsync(topic)) + .thenCompose(optMd -> { + if (optMd.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + ScalableTopicMetadata metadata = optMd.get(); + // Delete all underlying segment topics + CompletableFuture<?>[] deleteFutures = metadata.getSegments().values().stream() + .map(segment -> deleteUnderlyingSegmentTopic(topic, segment)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(deleteFutures); + }) + .thenCompose(__ -> resources.deleteScalableTopicAsync(topic)); + } + + // --- Internal helpers --- + + private void onLeaderStateChange(TopicName topic, LeaderElectionState state) { + log.info("Leader state change for scalable topic {}: {}", topic, state); + if (state == LeaderElectionState.NoLeader) { + // Try to re-elect + ScalableTopicController controller = controllers.get(topic.toString()); + if (controller != null) { + controller.initialize().exceptionally(ex -> { + log.warn("Failed to re-elect for topic {}", topic, ex); + return null; + }); + } Review Comment: One question came to mind here (not necessary to handle in this PR at all): The concern I have about this is a potential thundering herd issue in the current LeaderElectionImpl code where there's handling for BadVersionException: https://github.com/apache/pulsar/blob/9855b18ce96f05c49961cbc390ea53337658ca1a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java#L238-L240 There's a comment "There was a conflict between 2 participants trying to become leaders at same time. Retry to fetch info on new leader." Does this problem exist? This might apply to Zookeeper since the code in LeaderElectionImpl doesn't use Zookeeper's `sync` which might be required in this case to prevent stale state from being returned. 
Does Oxia have similar visibility issues as Zookeeper (which requires using `sync` in certain cases)? ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import lombok.Getter; +import org.apache.pulsar.broker.service.TransportCnx; + +/** + * In-memory handle for a consumer registered with the controller leader. + * + * <p>Session identity is the stable {@code consumerName} chosen by the client. + * This class wraps the <em>durable</em> portion (persisted via + * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) with the + * <em>transient</em> keep-alive state: the current transport connection, whether the + * consumer is currently connected, and — when disconnected — a grace-period timer that + * evicts the session if the consumer does not reconnect in time. 
+ * + * <p>The keep-alive fields ({@code connected}, {@code consumerId}, {@code cnx}, + * {@code graceTimer}) are <em>not</em> persisted; they live on the controller leader only + * and are reset to a "just disconnected" state when a new leader takes over. + * + * <p>Equality and hash are based on {@code consumerName} alone so that a reconnection + * with a new protocol-level {@code consumerId} resolves to the same session. + */ +public class ConsumerSession { + + @Getter + private final String consumerName; + + /** Current protocol-level consumer ID. Changes on each reconnect. */ + @Getter + private volatile long consumerId; + + /** Current transport connection. Null when disconnected. */ + @Getter + private volatile TransportCnx cnx; + + /** Whether the consumer is currently connected. */ + @Getter + private volatile boolean connected; + + /** Scheduled eviction task, non-null only while disconnected and within grace period. */ + @Getter + private volatile ScheduledFuture<?> graceTimer; + + public ConsumerSession(String consumerName, long consumerId, TransportCnx cnx) { + this.consumerName = consumerName; + this.consumerId = consumerId; + this.cnx = cnx; + this.connected = cnx != null; + } + + /** + * Create a session for a consumer whose registration was loaded from the metadata store + * on controller leader failover. Starts in "disconnected" state; a grace timer should be + * attached immediately so the consumer is evicted if it does not reconnect in time. + */ + public static ConsumerSession restored(String consumerName) { + return new ConsumerSession(consumerName, -1L, null); + } + + /** + * Attach a new transport connection to this session (reconnect path). Cancels any + * active grace timer and marks the session connected. + */ + public synchronized void attach(long consumerId, TransportCnx cnx) { + this.consumerId = consumerId; + this.cnx = cnx; + this.connected = true; + cancelGraceTimer(); + } + + /** + * Mark the session as disconnected. 
The caller is responsible for scheduling the + * grace-period eviction task via {@link #setGraceTimer(ScheduledFuture)}. + */ + public synchronized void markDisconnected() { + this.connected = false; + this.cnx = null; + } + + public synchronized void setGraceTimer(ScheduledFuture<?> timer) { + cancelGraceTimer(); + this.graceTimer = timer; + } + + public synchronized void cancelGraceTimer() { + if (graceTimer != null) { + graceTimer.cancel(false); + graceTimer = null; + } + } Review Comment: It seems that it would be cleaner to encapsulate the grace timer implementation details into this class. For example, when creating the session, the SubscriptionCoordinator could provide a callback function which gets called when the grace period expires. ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java: ########## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service.scalable; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.TransportCnx; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentInfo; +import org.apache.pulsar.common.scalable.SegmentTopicName; + +/** + * Manages segment-to-consumer assignments within a single subscription of a scalable topic. + * + * <p>Consumer sessions are persisted in the metadata store (as + * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) and tracked in-memory + * as {@link ConsumerSession} objects. The distinction is important: the session <em>itself</em> + * is durable (survives TCP disconnects, client restarts, and controller leader failovers), + * but the keep-alive tracking (connected / grace-period timer) is in-memory only. + * + * <p>When a consumer's connection drops, the coordinator does <em>not</em> immediately evict + * it. Instead it marks the session disconnected and starts a grace-period timer. If the + * consumer reconnects (with the same {@code consumerName}) before the timer fires, its + * existing assignment is restored with no rebalance. If the timer fires, the persisted + * registration is deleted and a rebalance is triggered. 
+ * + * <p>On controller leader failover, the new leader reloads persisted registrations via + * {@link #restoreConsumers(Collection)}, which installs them in a "just disconnected" state + * with fresh grace-period timers — giving every consumer the full window to reconnect to the + * new leader regardless of how long they had been disconnected under the old one. + */ +public class SubscriptionCoordinator { + + private static final Logger LOG = Logger.get(SubscriptionCoordinator.class); + private final Logger log; + + // TODO: make configurable via broker config (e.g. scalableTopicConsumerSessionTimeoutSeconds) + private static final Duration DEFAULT_GRACE_PERIOD = Duration.ofSeconds(60); + + @Getter + private final String subscriptionName; + private final TopicName topicName; + private final ScalableTopicResources resources; + private final ScheduledExecutorService scheduler; + private final Duration gracePeriod; + + /** Keyed by consumerName — the stable session identity. */ + private final Map<String, ConsumerSession> sessions = new ConcurrentHashMap<>(); + private Map<Long, ConsumerSession> segmentAssignments = new LinkedHashMap<>(); + private SegmentLayout currentLayout; + + public SubscriptionCoordinator(String subscriptionName, + TopicName topicName, + SegmentLayout initialLayout, + ScalableTopicResources resources, + ScheduledExecutorService scheduler) { + this(subscriptionName, topicName, initialLayout, resources, scheduler, DEFAULT_GRACE_PERIOD); + } + + public SubscriptionCoordinator(String subscriptionName, + TopicName topicName, + SegmentLayout initialLayout, + ScalableTopicResources resources, + ScheduledExecutorService scheduler, + Duration gracePeriod) { + this.subscriptionName = subscriptionName; + this.topicName = topicName; + this.currentLayout = initialLayout; + this.resources = resources; + this.scheduler = scheduler; + this.gracePeriod = gracePeriod; + this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build(); + } + + // --- Register / unregister / reconnect --- + + /** + * Register a consumer — either a fresh registration or a reconnect of an existing + * session. If the {@code consumerName} already has a persisted session, its assignment + * is preserved and the new connection is attached; otherwise the registration is + * persisted and a rebalance is triggered. + * + * @return assignment map for all consumers (unchanged on reconnect, recomputed on fresh register) + */ + public synchronized CompletableFuture<Map<ConsumerSession, ConsumerAssignment>> registerConsumer( + String consumerName, long consumerId, TransportCnx cnx) { + ConsumerSession existing = sessions.get(consumerName); + if (existing != null) { + // Reconnect: attach the new connection, cancel any grace timer, and push the + // current assignment without rebalancing other consumers. + existing.attach(consumerId, cnx); + Map<ConsumerSession, ConsumerAssignment> current = + computeAssignment(currentLayout, sessions.values()); + ConsumerAssignment assignment = current.get(existing); + if (assignment != null) { + existing.sendAssignmentUpdate(assignment); + } + return CompletableFuture.completedFuture(current); + } + + // Fresh registration — persist first, then install in-memory and rebalance. + ConsumerSession session = new ConsumerSession(consumerName, consumerId, cnx); + return resources.registerConsumerAsync(topicName, subscriptionName, consumerName) + .thenApply(__ -> { + synchronized (this) { + sessions.put(consumerName, session); + return rebalanceAndNotify(); + } + }); + } + + /** + * Explicit unregister (consumer asked to leave the subscription). Cancels any pending + * grace timer, deletes the persisted registration, and rebalances. 
+ */ + public synchronized CompletableFuture<Map<ConsumerSession, ConsumerAssignment>> unregisterConsumer( + String consumerName) { + ConsumerSession removed = sessions.remove(consumerName); + if (removed == null) { + return CompletableFuture.completedFuture(snapshotAssignments()); + } + removed.cancelGraceTimer(); + return resources.unregisterConsumerAsync(topicName, subscriptionName, consumerName) + .thenApply(__ -> { + synchronized (this) { + if (sessions.isEmpty()) { + segmentAssignments.clear(); + return Map.<ConsumerSession, ConsumerAssignment>of(); + } + return rebalanceAndNotify(); + } + }); + } + + /** + * Called when a consumer's transport connection drops (not an explicit unregister). + * Marks the session disconnected and schedules an eviction task after the grace period. + * If the consumer reconnects with the same name before the timer fires, the timer is + * cancelled and no rebalance happens. + */ + public synchronized void onConsumerDisconnect(String consumerName) { + ConsumerSession session = sessions.get(consumerName); + if (session == null || !session.isConnected()) { + return; + } + session.markDisconnected(); + log.info().attr("consumer", consumerName) + .attr("gracePeriodSeconds", gracePeriod.toSeconds()) + .log("Consumer disconnected; starting grace period"); + session.setGraceTimer(scheduler.schedule( + () -> evictExpiredConsumer(consumerName), + gracePeriod.toMillis(), TimeUnit.MILLISECONDS)); + } Review Comment: if grace timer gets encapsulated, this could be `session.startGraceTimer()` ########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.resources.ScalableTopicMetadata; +import org.apache.pulsar.broker.resources.ScalableTopicResources; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.scalable.SegmentInfo; +import org.apache.pulsar.common.scalable.SegmentTopicName; +import org.apache.pulsar.metadata.api.coordination.CoordinationService; +import org.apache.pulsar.metadata.api.coordination.LeaderElection; +import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; + +/** + * Central service managing all scalable topics on this broker. + * + * <p>Lifecycle is tied to {@link BrokerService}. 
This service handles: + * <ul> + * <li>Creating and deleting scalable topics</li> + * <li>Managing {@link ScalableTopicController} instances for topics this broker coordinates</li> + * <li>Admin operations: split/merge</li> + * </ul> + */ +@Slf4j +public class ScalableTopicService { + + private final BrokerService brokerService; + private final ScalableTopicResources resources; + private final CoordinationService coordinationService; + + /** Active controllers for topics this broker coordinates. */ + private final ConcurrentHashMap<String, ScalableTopicController> controllers = new ConcurrentHashMap<>(); + + public ScalableTopicService(BrokerService brokerService, + ScalableTopicResources resources, + CoordinationService coordinationService) { + this.brokerService = brokerService; + this.resources = resources; + this.coordinationService = coordinationService; + } + + // --- Lifecycle --- + + public void start() { + log.info("ScalableTopicService started"); + } + + public void close() { + log.info("Closing ScalableTopicService, releasing {} controllers", controllers.size()); + controllers.values().forEach(controller -> { + try { + controller.close().join(); + } catch (Exception e) { + log.warn("Error closing controller for topic {}", controller.getTopicName(), e); + } + }); + controllers.clear(); + } + Review Comment: Later on we might want to perform this asynchronously in parallel with a configured concurrency level to speed up broker shutdown in phases which don't conflict with each other. I think synchronous close is fine for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
