merlimat commented on code in PR #25559: URL: https://github.com/apache/pulsar/pull/25559#discussion_r3139484108
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.scalable; + +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import lombok.Getter; +import org.apache.pulsar.broker.service.TransportCnx; + +/** + * In-memory handle for a consumer registered with the controller leader. + * + * <p>Session identity is the stable {@code consumerName} chosen by the client. + * This class wraps the <em>durable</em> portion (persisted via + * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) with the + * <em>transient</em> keep-alive state: the current transport connection, whether the + * consumer is currently connected, and — when disconnected — a grace-period timer that + * evicts the session if the consumer does not reconnect in time. + * + * <p>The keep-alive fields ({@code connected}, {@code consumerId}, {@code cnx}, + * {@code graceTimer}) are <em>not</em> persisted; they live on the controller leader only + * and are reset to a "just disconnected" state when a new leader takes over. + * + * <p>Equality and hash are based on {@code consumerName} alone so that a reconnection + * with a new protocol-level {@code consumerId} resolves to the same session. + */ +public class ConsumerSession { + + @Getter + private final String consumerName; + + /** Current protocol-level consumer ID. Changes on each reconnect. */ + @Getter + private volatile long consumerId; + + /** Current transport connection. Null when disconnected. */ + @Getter + private volatile TransportCnx cnx; + + /** Whether the consumer is currently connected. */ + @Getter + private volatile boolean connected; + + /** Scheduled eviction task, non-null only while disconnected and within grace period. */ + @Getter + private volatile ScheduledFuture<?> graceTimer; + + public ConsumerSession(String consumerName, long consumerId, TransportCnx cnx) { + this.consumerName = consumerName; + this.consumerId = consumerId; + this.cnx = cnx; + this.connected = cnx != null; + } + + /** + * Create a session for a consumer whose registration was loaded from the metadata store + * on controller leader failover. Starts in "disconnected" state; a grace timer should be + * attached immediately so the consumer is evicted if it does not reconnect in time. + */ + public static ConsumerSession restored(String consumerName) { + return new ConsumerSession(consumerName, -1L, null); + } + + /** + * Attach a new transport connection to this session (reconnect path). Cancels any + * active grace timer and marks the session connected. + */ + public synchronized void attach(long consumerId, TransportCnx cnx) { + this.consumerId = consumerId; + this.cnx = cnx; + this.connected = true; + cancelGraceTimer(); + } + + /** + * Mark the session as disconnected. The caller is responsible for scheduling the + * grace-period eviction task via {@link #setGraceTimer(ScheduledFuture)}. + */ + public synchronized void markDisconnected() { + this.connected = false; + this.cnx = null; + } + + public synchronized void setGraceTimer(ScheduledFuture<?> timer) { + cancelGraceTimer(); + this.graceTimer = timer; + } + + public synchronized void cancelGraceTimer() { + if (graceTimer != null) { + graceTimer.cancel(false); + graceTimer = null; + } + } Review Comment: Good point. Fixing it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
