merlimat commented on code in PR #25559:
URL: https://github.com/apache/pulsar/pull/25559#discussion_r3139484108


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Objects;
+import java.util.concurrent.ScheduledFuture;
+import lombok.Getter;
+import org.apache.pulsar.broker.service.TransportCnx;
+
+/**
+ * In-memory handle for a consumer registered with the controller leader.
+ *
+ * <p>Session identity is the stable {@code consumerName} chosen by the client.
+ * This class wraps the <em>durable</em> portion (persisted via
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) with the
+ * <em>transient</em> keep-alive state: the current transport connection, whether the
+ * consumer is currently connected, and — when disconnected — a grace-period timer that
+ * evicts the session if the consumer does not reconnect in time.
+ *
+ * <p>The keep-alive fields ({@code connected}, {@code consumerId}, {@code cnx},
+ * {@code graceTimer}) are <em>not</em> persisted; they live on the controller leader only
+ * and are reset to a "just disconnected" state when a new leader takes over.
+ *
+ * <p>Equality and hash are based on {@code consumerName} alone so that a reconnection
+ * with a new protocol-level {@code consumerId} resolves to the same session.
+ */
+public class ConsumerSession {
+
+    @Getter
+    private final String consumerName;
+
+    /** Current protocol-level consumer ID. Changes on each reconnect. */
+    @Getter
+    private volatile long consumerId;
+
+    /** Current transport connection. Null when disconnected. */
+    @Getter
+    private volatile TransportCnx cnx;
+
+    /** Whether the consumer is currently connected. */
+    @Getter
+    private volatile boolean connected;
+
+    /** Scheduled eviction task, non-null only while disconnected and within grace period. */
+    @Getter
+    private volatile ScheduledFuture<?> graceTimer;
+
+    public ConsumerSession(String consumerName, long consumerId, TransportCnx 
cnx) {
+        this.consumerName = consumerName;
+        this.consumerId = consumerId;
+        this.cnx = cnx;
+        this.connected = cnx != null;
+    }
+
+    /**
+     * Build a session for a consumer whose registration was loaded from the metadata
+     * store on controller leader failover. The returned session is in the "disconnected"
+     * state (consumer ID {@code -1}, no transport connection); a grace timer should be
+     * attached immediately so the consumer is evicted if it does not reconnect in time.
+     *
+     * @param consumerName name identifying the restored session
+     * @return a new, disconnected session for {@code consumerName}
+     */
+    public static ConsumerSession restored(String consumerName) {
+        // No live connection is known yet, so use the placeholder ID and a null connection.
+        final long noConsumerId = -1L;
+        final TransportCnx noConnection = null;
+        return new ConsumerSession(consumerName, noConsumerId, noConnection);
+    }
+
+    /**
+     * Reconnect path: bind a new transport connection to this session, mark the session
+     * connected, and cancel any active grace timer.
+     *
+     * @param consumerId protocol-level consumer ID used by the new connection
+     * @param cnx        transport connection the consumer is now attached to
+     */
+    public synchronized void attach(long consumerId, TransportCnx cnx) {
+        // Record the new connection details first, then flip the connected flag.
+        this.consumerId = consumerId;
+        this.cnx = cnx;
+        connected = true;
+        // The consumer is back, so a pending eviction must not fire.
+        cancelGraceTimer();
+    }
+
+    /**
+     * Flag this session as disconnected and drop its transport connection reference.
+     * No timer is scheduled here: the caller is responsible for scheduling the
+     * grace-period eviction task via {@link #setGraceTimer(ScheduledFuture)}.
+     */
+    public synchronized void markDisconnected() {
+        connected = false;
+        cnx = null;
+    }
+
+    /**
+     * Install the grace-period eviction task for this session, cancelling any timer
+     * that was previously installed.
+     *
+     * @param timer the scheduled eviction task to track
+     */
+    public synchronized void setGraceTimer(ScheduledFuture<?> timer) {
+        // Cancel the previous timer (if any) before replacing the reference.
+        cancelGraceTimer();
+        graceTimer = timer;
+    }
+
+    /**
+     * Cancel and clear the current grace timer, if one is set. The task is cancelled
+     * with {@code cancel(false)}. Does nothing when no timer is set.
+     */
+    public synchronized void cancelGraceTimer() {
+        final ScheduledFuture<?> pending = graceTimer;
+        if (pending == null) {
+            return;
+        }
+        pending.cancel(false);
+        graceTimer = null;
+    }

Review Comment:
   Good point. Fixing it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to