lhotari commented on code in PR #25559:
URL: https://github.com/apache/pulsar/pull/25559#discussion_r3138884874


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+
+/**
+ * Manages segment-to-consumer assignments within a single subscription of a scalable topic.
+ *
+ * <p>Consumer sessions are persisted in the metadata store (as
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) and tracked in-memory
+ * as {@link ConsumerSession} objects. The distinction is important: the session <em>itself</em>
+ * is durable (survives TCP disconnects, client restarts, and controller leader failovers),
+ * but the keep-alive tracking (connected / grace-period timer) is in-memory only.
+ *
+ * <p>When a consumer's connection drops, the coordinator does <em>not</em> immediately evict
+ * it. Instead it marks the session disconnected and starts a grace-period timer. If the
+ * consumer reconnects (with the same {@code consumerName}) before the timer fires, its
+ * existing assignment is restored with no rebalance. If the timer fires, the persisted
+ * registration is deleted and a rebalance is triggered.
+ *
+ * <p>On controller leader failover, the new leader reloads persisted registrations via
+ * {@link #restoreConsumers(Collection)}, which installs them in a "just disconnected" state
+ * with fresh grace-period timers — giving every consumer the full window to reconnect to the
+ * new leader regardless of how long they had been disconnected under the old one.
+ */
+public class SubscriptionCoordinator {
+
+    private static final Logger LOG = 
Logger.get(SubscriptionCoordinator.class);
+    private final Logger log;
+
+    // TODO: make configurable via broker config (e.g. 
scalableTopicConsumerSessionTimeoutSeconds)
+    private static final Duration DEFAULT_GRACE_PERIOD = 
Duration.ofSeconds(60);
+
+    @Getter
+    private final String subscriptionName;
+    private final TopicName topicName;
+    private final ScalableTopicResources resources;
+    private final ScheduledExecutorService scheduler;
+    private final Duration gracePeriod;
+
+    /** Keyed by consumerName — the stable session identity. */
+    private final Map<String, ConsumerSession> sessions = new 
ConcurrentHashMap<>();
+    private Map<Long, ConsumerSession> segmentAssignments = new 
LinkedHashMap<>();
+    private SegmentLayout currentLayout;
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler) {
+        this(subscriptionName, topicName, initialLayout, resources, scheduler, 
DEFAULT_GRACE_PERIOD);
+    }
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler,
+                                   Duration gracePeriod) {
+        this.subscriptionName = subscriptionName;
+        this.topicName = topicName;
+        this.currentLayout = initialLayout;
+        this.resources = resources;
+        this.scheduler = scheduler;
+        this.gracePeriod = gracePeriod;
+        this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
+    }
+
+    // --- Register / unregister / reconnect ---
+
+    /**
+     * Register a consumer — either a fresh registration or a reconnect of an existing
+     * session. If the {@code consumerName} already has a persisted session, its assignment
+     * is preserved and the new connection is attached; otherwise the registration is
+     * persisted and a rebalance is triggered.
+     *
+     * @return assignment map for all consumers (unchanged on reconnect, recomputed on fresh register)
+     */
+    public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> registerConsumer(
+            String consumerName, long consumerId, TransportCnx cnx) {
+        ConsumerSession existing = sessions.get(consumerName);
+        if (existing != null) {
+            // Reconnect: attach the new connection, cancel any grace timer, 
and push the
+            // current assignment without rebalancing other consumers.
+            existing.attach(consumerId, cnx);
+            Map<ConsumerSession, ConsumerAssignment> current =
+                    computeAssignment(currentLayout, sessions.values());
+            ConsumerAssignment assignment = current.get(existing);
+            if (assignment != null) {
+                existing.sendAssignmentUpdate(assignment);
+            }
+            return CompletableFuture.completedFuture(current);
+        }
+
+        // Fresh registration — persist first, then install in-memory and 
rebalance.
+        ConsumerSession session = new ConsumerSession(consumerName, 
consumerId, cnx);

Review Comment:
   I added a separate comment in ConsumerSession about encapsulating the timer 
in the ConsumerSession class. It would be possible to pass the timer scheduler 
and the expiration callback when the instance is constructed.
   
   I'm not exactly sure if it makes sense. Maybe not. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to