lhotari commented on code in PR #25559:
URL: https://github.com/apache/pulsar/pull/25559#discussion_r3138825767


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+
+/**
+ * Central service managing all scalable topics on this broker.
+ *
+ * <p>Lifecycle is tied to {@link BrokerService}. This service handles:
+ * <ul>
+ *   <li>Creating and deleting scalable topics</li>
+ *   <li>Managing {@link ScalableTopicController} instances for topics this 
broker coordinates</li>
+ *   <li>Admin operations: split/merge</li>
+ * </ul>
+ */
+@Slf4j
+public class ScalableTopicService {
+
+    private final BrokerService brokerService;
+    private final ScalableTopicResources resources;
+    private final CoordinationService coordinationService;
+
+    /** Active controllers for topics this broker coordinates. */
+    private final ConcurrentHashMap<String, ScalableTopicController> 
controllers = new ConcurrentHashMap<>();
+
+    public ScalableTopicService(BrokerService brokerService,
+                                ScalableTopicResources resources,
+                                CoordinationService coordinationService) {
+        this.brokerService = brokerService;
+        this.resources = resources;
+        this.coordinationService = coordinationService;
+    }
+
+    // --- Lifecycle ---
+
+    public void start() {
+        log.info("ScalableTopicService started");
+    }
+
+    public void close() {
+        log.info("Closing ScalableTopicService, releasing {} controllers", 
controllers.size());
+        controllers.values().forEach(controller -> {
+            try {
+                controller.close().join();
+            } catch (Exception e) {
+                log.warn("Error closing controller for topic {}", 
controller.getTopicName(), e);
+            }
+        });
+        controllers.clear();
+    }
+
+    // --- Controller management ---
+
+    /**
+     * Get or create a controller for a scalable topic. The controller will 
attempt
+     * leader election; only the leader actively coordinates consumers.
+     */
+    public CompletableFuture<ScalableTopicController> 
getOrCreateController(TopicName topic) {
+        String key = topic.toString();
+        ScalableTopicController existing = controllers.get(key);
+        if (existing != null) {
+            return CompletableFuture.completedFuture(existing);
+        }
+
+        String lockPath = resources.controllerLockPath(topic);
+        LeaderElection<String> election = 
coordinationService.getLeaderElection(
+                String.class, lockPath, state -> onLeaderStateChange(topic, 
state));
+
+        ScalableTopicController controller = new ScalableTopicController(
+                topic, resources, brokerService, election);
+        controllers.put(key, controller);
+
+        return controller.initialize()
+                .thenApply(__ -> controller)
+                .exceptionally(ex -> {
+                    controllers.remove(key);
+                    throw new RuntimeException("Failed to initialize 
controller for " + topic, ex);
+                });
+    }
+
+    /**
+     * Release the controller for a topic (e.g., on topic unload).
+     */
+    public CompletableFuture<Void> releaseController(TopicName topic) {
+        ScalableTopicController controller = 
controllers.remove(topic.toString());
+        if (controller != null) {
+            return controller.close();
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    // --- Admin operations ---
+
+    /**
+     * Create a new scalable topic with the given number of initial segments.
+     */
+    public CompletableFuture<Void> createScalableTopic(TopicName topic, int 
numInitialSegments) {
+        return createScalableTopic(topic, numInitialSegments, Map.of());
+    }
+
+    public CompletableFuture<Void> createScalableTopic(TopicName topic, int 
numInitialSegments,
+                                                        Map<String, String> 
properties) {
+        if (topic.getDomain() != TopicDomain.topic) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("Expected topic domain, got: 
" + topic.getDomain()));
+        }
+        if (numInitialSegments < 1) {
+            return CompletableFuture.failedFuture(
+                    new IllegalArgumentException("numInitialSegments must be 
>= 1"));
+        }
+
+        ScalableTopicMetadata metadata = 
ScalableTopicController.createInitialMetadata(
+                numInitialSegments, properties);
+
+        return resources.createScalableTopicAsync(topic, metadata)
+                .thenCompose(__ -> {
+                    // Create underlying persistent topics for each initial 
segment
+                    CompletableFuture<?>[] segmentFutures = 
metadata.getSegments().values().stream()
+                            .map(segment -> 
createUnderlyingSegmentTopic(topic, segment))
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(segmentFutures);
+                });
+    }
+
+    /**
+     * Delete a scalable topic and all its segment topics.
+     */
+    public CompletableFuture<Void> deleteScalableTopic(TopicName topic) {
+        return releaseController(topic)
+                .thenCompose(__ -> 
resources.getScalableTopicMetadataAsync(topic))
+                .thenCompose(optMd -> {
+                    if (optMd.isEmpty()) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    ScalableTopicMetadata metadata = optMd.get();
+                    // Delete all underlying segment topics
+                    CompletableFuture<?>[] deleteFutures = 
metadata.getSegments().values().stream()
+                            .map(segment -> 
deleteUnderlyingSegmentTopic(topic, segment))
+                            .toArray(CompletableFuture[]::new);
+                    return CompletableFuture.allOf(deleteFutures);
+                })
+                .thenCompose(__ -> resources.deleteScalableTopicAsync(topic));
+    }
+
+    // --- Internal helpers ---
+
+    private void onLeaderStateChange(TopicName topic, LeaderElectionState 
state) {
+        log.info("Leader state change for scalable topic {}: {}", topic, 
state);
+        if (state == LeaderElectionState.NoLeader) {
+            // Try to re-elect
+            ScalableTopicController controller = 
controllers.get(topic.toString());
+            if (controller != null) {
+                controller.initialize().exceptionally(ex -> {
+                    log.warn("Failed to re-elect for topic {}", topic, ex);
+                    return null;
+                });
+            }

Review Comment:
   One question came to mind here (not necessary to handle in this PR at all):
   
   The concern I have about this is a potential thundering herd issue in the 
current LeaderElectionImpl code where there's handling for BadVersionException: 
https://github.com/apache/pulsar/blob/9855b18ce96f05c49961cbc390ea53337658ca1a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java#L238-L240
   There's a comment "There was a conflict between 2 participants trying to 
become leaders at same time. Retry to fetch info on new leader."
   
   Does this problem exist? This might apply to ZooKeeper, since the code in 
LeaderElectionImpl doesn't use ZooKeeper's `sync`, which might be required in 
this case to prevent stale state from being returned. Does Oxia have similar 
visibility issues to ZooKeeper (which requires using `sync` in certain cases)?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ConsumerSession.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Objects;
+import java.util.concurrent.ScheduledFuture;
+import lombok.Getter;
+import org.apache.pulsar.broker.service.TransportCnx;
+
+/**
+ * In-memory handle for a consumer registered with the controller leader.
+ *
+ * <p>Session identity is the stable {@code consumerName} chosen by the client.
+ * This class wraps the <em>durable</em> portion (persisted via
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) with the
+ * <em>transient</em> keep-alive state: the current transport connection, 
whether the
+ * consumer is currently connected, and — when disconnected — a grace-period 
timer that
+ * evicts the session if the consumer does not reconnect in time.
+ *
+ * <p>The keep-alive fields ({@code connected}, {@code consumerId}, {@code 
cnx},
+ * {@code graceTimer}) are <em>not</em> persisted; they live on the controller 
leader only
+ * and are reset to a "just disconnected" state when a new leader takes over.
+ *
+ * <p>Equality and hash are based on {@code consumerName} alone so that a 
reconnection
+ * with a new protocol-level {@code consumerId} resolves to the same session.
+ */
+public class ConsumerSession {
+
+    @Getter
+    private final String consumerName;
+
+    /** Current protocol-level consumer ID. Changes on each reconnect. */
+    @Getter
+    private volatile long consumerId;
+
+    /** Current transport connection. Null when disconnected. */
+    @Getter
+    private volatile TransportCnx cnx;
+
+    /** Whether the consumer is currently connected. */
+    @Getter
+    private volatile boolean connected;
+
+    /** Scheduled eviction task, non-null only while disconnected and within 
grace period. */
+    @Getter
+    private volatile ScheduledFuture<?> graceTimer;
+
+    public ConsumerSession(String consumerName, long consumerId, TransportCnx 
cnx) {
+        this.consumerName = consumerName;
+        this.consumerId = consumerId;
+        this.cnx = cnx;
+        this.connected = cnx != null;
+    }
+
+    /**
+     * Create a session for a consumer whose registration was loaded from the 
metadata store
+     * on controller leader failover. Starts in "disconnected" state; a grace 
timer should be
+     * attached immediately so the consumer is evicted if it does not 
reconnect in time.
+     */
+    public static ConsumerSession restored(String consumerName) {
+        return new ConsumerSession(consumerName, -1L, null);
+    }
+
+    /**
+     * Attach a new transport connection to this session (reconnect path). 
Cancels any
+     * active grace timer and marks the session connected.
+     */
+    public synchronized void attach(long consumerId, TransportCnx cnx) {
+        this.consumerId = consumerId;
+        this.cnx = cnx;
+        this.connected = true;
+        cancelGraceTimer();
+    }
+
+    /**
+     * Mark the session as disconnected. The caller is responsible for 
scheduling the
+     * grace-period eviction task via {@link #setGraceTimer(ScheduledFuture)}.
+     */
+    public synchronized void markDisconnected() {
+        this.connected = false;
+        this.cnx = null;
+    }
+
+    public synchronized void setGraceTimer(ScheduledFuture<?> timer) {
+        cancelGraceTimer();
+        this.graceTimer = timer;
+    }
+
+    public synchronized void cancelGraceTimer() {
+        if (graceTimer != null) {
+            graceTimer.cancel(false);
+            graceTimer = null;
+        }
+    }

Review Comment:
   It seems that it would be cleaner to encapsulate the grace timer 
implementation details into this class. For example, when creating the session, 
the SubscriptionCoordinator could provide a callback function which gets called 
when the grace period expires.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/SubscriptionCoordinator.java:
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import io.github.merlimat.slog.Logger;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.TransportCnx;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+
+/**
+ * Manages segment-to-consumer assignments within a single subscription of a 
scalable topic.
+ *
+ * <p>Consumer sessions are persisted in the metadata store (as
+ * {@link org.apache.pulsar.broker.resources.ConsumerRegistration}) and 
tracked in-memory
+ * as {@link ConsumerSession} objects. The distinction is important: the 
session <em>itself</em>
+ * is durable (survives TCP disconnects, client restarts, and controller 
leader failovers),
+ * but the keep-alive tracking (connected / grace-period timer) is in-memory 
only.
+ *
+ * <p>When a consumer's connection drops, the coordinator does <em>not</em> 
immediately evict
+ * it. Instead it marks the session disconnected and starts a grace-period 
timer. If the
+ * consumer reconnects (with the same {@code consumerName}) before the timer 
fires, its
+ * existing assignment is restored with no rebalance. If the timer fires, the 
persisted
+ * registration is deleted and a rebalance is triggered.
+ *
+ * <p>On controller leader failover, the new leader reloads persisted 
registrations via
+ * {@link #restoreConsumers(Collection)}, which installs them in a "just 
disconnected" state
+ * with fresh grace-period timers — giving every consumer the full window to 
reconnect to the
+ * new leader regardless of how long they had been disconnected under the old 
one.
+ */
+public class SubscriptionCoordinator {
+
+    private static final Logger LOG = 
Logger.get(SubscriptionCoordinator.class);
+    private final Logger log;
+
+    // TODO: make configurable via broker config (e.g. 
scalableTopicConsumerSessionTimeoutSeconds)
+    private static final Duration DEFAULT_GRACE_PERIOD = 
Duration.ofSeconds(60);
+
+    @Getter
+    private final String subscriptionName;
+    private final TopicName topicName;
+    private final ScalableTopicResources resources;
+    private final ScheduledExecutorService scheduler;
+    private final Duration gracePeriod;
+
+    /** Keyed by consumerName — the stable session identity. */
+    private final Map<String, ConsumerSession> sessions = new 
ConcurrentHashMap<>();
+    private Map<Long, ConsumerSession> segmentAssignments = new 
LinkedHashMap<>();
+    private SegmentLayout currentLayout;
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler) {
+        this(subscriptionName, topicName, initialLayout, resources, scheduler, 
DEFAULT_GRACE_PERIOD);
+    }
+
+    public SubscriptionCoordinator(String subscriptionName,
+                                   TopicName topicName,
+                                   SegmentLayout initialLayout,
+                                   ScalableTopicResources resources,
+                                   ScheduledExecutorService scheduler,
+                                   Duration gracePeriod) {
+        this.subscriptionName = subscriptionName;
+        this.topicName = topicName;
+        this.currentLayout = initialLayout;
+        this.resources = resources;
+        this.scheduler = scheduler;
+        this.gracePeriod = gracePeriod;
+        this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
+    }
+
+    // --- Register / unregister / reconnect ---
+
+    /**
+     * Register a consumer — either a fresh registration or a reconnect of an 
existing
+     * session. If the {@code consumerName} already has a persisted session, 
its assignment
+     * is preserved and the new connection is attached; otherwise the 
registration is
+     * persisted and a rebalance is triggered.
+     *
+     * @return assignment map for all consumers (unchanged on reconnect, 
recomputed on fresh register)
+     */
+    public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> registerConsumer(
+            String consumerName, long consumerId, TransportCnx cnx) {
+        ConsumerSession existing = sessions.get(consumerName);
+        if (existing != null) {
+            // Reconnect: attach the new connection, cancel any grace timer, 
and push the
+            // current assignment without rebalancing other consumers.
+            existing.attach(consumerId, cnx);
+            Map<ConsumerSession, ConsumerAssignment> current =
+                    computeAssignment(currentLayout, sessions.values());
+            ConsumerAssignment assignment = current.get(existing);
+            if (assignment != null) {
+                existing.sendAssignmentUpdate(assignment);
+            }
+            return CompletableFuture.completedFuture(current);
+        }
+
+        // Fresh registration — persist first, then install in-memory and 
rebalance.
+        ConsumerSession session = new ConsumerSession(consumerName, 
consumerId, cnx);
+        return resources.registerConsumerAsync(topicName, subscriptionName, 
consumerName)
+                .thenApply(__ -> {
+                    synchronized (this) {
+                        sessions.put(consumerName, session);
+                        return rebalanceAndNotify();
+                    }
+                });
+    }
+
+    /**
+     * Explicit unregister (consumer asked to leave the subscription). Cancels 
any pending
+     * grace timer, deletes the persisted registration, and rebalances.
+     */
+    public synchronized CompletableFuture<Map<ConsumerSession, 
ConsumerAssignment>> unregisterConsumer(
+            String consumerName) {
+        ConsumerSession removed = sessions.remove(consumerName);
+        if (removed == null) {
+            return CompletableFuture.completedFuture(snapshotAssignments());
+        }
+        removed.cancelGraceTimer();
+        return resources.unregisterConsumerAsync(topicName, subscriptionName, 
consumerName)
+                .thenApply(__ -> {
+                    synchronized (this) {
+                        if (sessions.isEmpty()) {
+                            segmentAssignments.clear();
+                            return Map.<ConsumerSession, 
ConsumerAssignment>of();
+                        }
+                        return rebalanceAndNotify();
+                    }
+                });
+    }
+
+    /**
+     * Called when a consumer's transport connection drops (not an explicit 
unregister).
+     * Marks the session disconnected and schedules an eviction task after the 
grace period.
+     * If the consumer reconnects with the same name before the timer fires, 
the timer is
+     * cancelled and no rebalance happens.
+     */
+    public synchronized void onConsumerDisconnect(String consumerName) {
+        ConsumerSession session = sessions.get(consumerName);
+        if (session == null || !session.isConnected()) {
+            return;
+        }
+        session.markDisconnected();
+        log.info().attr("consumer", consumerName)
+                .attr("gracePeriodSeconds", gracePeriod.toSeconds())
+                .log("Consumer disconnected; starting grace period");
+        session.setGraceTimer(scheduler.schedule(
+                () -> evictExpiredConsumer(consumerName),
+                gracePeriod.toMillis(), TimeUnit.MILLISECONDS));
+    }

Review Comment:
   If the grace timer gets encapsulated, this could be `session.startGraceTimer()`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.scalable;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.resources.ScalableTopicMetadata;
+import org.apache.pulsar.broker.resources.ScalableTopicResources;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.scalable.SegmentInfo;
+import org.apache.pulsar.common.scalable.SegmentTopicName;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
+
+/**
+ * Central service managing all scalable topics on this broker.
+ *
+ * <p>Lifecycle is tied to {@link BrokerService}. This service handles:
+ * <ul>
+ *   <li>Creating and deleting scalable topics</li>
+ *   <li>Managing {@link ScalableTopicController} instances for topics this 
broker coordinates</li>
+ *   <li>Admin operations: split/merge</li>
+ * </ul>
+ */
+@Slf4j
+public class ScalableTopicService {
+
+    private final BrokerService brokerService;
+    private final ScalableTopicResources resources;
+    private final CoordinationService coordinationService;
+
+    /** Active controllers for topics this broker coordinates. */
+    private final ConcurrentHashMap<String, ScalableTopicController> 
controllers = new ConcurrentHashMap<>();
+
+    public ScalableTopicService(BrokerService brokerService,
+                                ScalableTopicResources resources,
+                                CoordinationService coordinationService) {
+        this.brokerService = brokerService;
+        this.resources = resources;
+        this.coordinationService = coordinationService;
+    }
+
+    // --- Lifecycle ---
+
+    public void start() {
+        log.info("ScalableTopicService started");
+    }
+
+    public void close() {
+        log.info("Closing ScalableTopicService, releasing {} controllers", 
controllers.size());
+        controllers.values().forEach(controller -> {
+            try {
+                controller.close().join();
+            } catch (Exception e) {
+                log.warn("Error closing controller for topic {}", 
controller.getTopicName(), e);
+            }
+        });
+        controllers.clear();
+    }
+

Review Comment:
   Later on, we might want to perform this asynchronously and in parallel, with a 
configured concurrency level, to speed up broker shutdown in phases that don't 
conflict with each other. I think synchronous close is fine for now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to