This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1260980f11b [fix] Client-v5: Retry producer send on AlreadyClosed across segment seal (#25610)
1260980f11b is described below

commit 1260980f11b49d95ec10474f520efcf2b390e7e6
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 29 04:11:13 2026 -0700

    [fix] Client-v5: Retry producer send on AlreadyClosed across segment seal (#25610)
---
 .../client/api/v5/V5ProducerSplitRetryTest.java    | 113 +++++++++++++++++++++
 .../client/impl/v5/ScalableTopicProducer.java      |  23 +++--
 2 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSplitRetryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSplitRetryTest.java
new file mode 100644
index 00000000000..fb576c0590b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSplitRetryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 producer's transparent retry across a layout transition.
+ *
+ * <p>The split admin call is synchronous server-side, but the V5 client's DAG watch is
+ * async — the producer's view of the segment layout doesn't update until the watch
+ * delivers the new revision. A send issued in this window targets the now-sealed parent
+ * segment and the v4 layer surfaces either {@code TopicTerminated} (if the broker
+ * replies to the still-open producer) or {@code AlreadyClosed} (if the v4 producer
+ * noticed the seal first). In either case the V5 producer must drop the stale
+ * per-segment producer, wait briefly for the layout watch to catch up, and re-route to
+ * an active child — without surfacing the failure to the application.
+ *
+ * <p>This is the no-wait counterpart to {@link V5SegmentSplitTest}, which also exercises
+ * a split mid-flow but with an explicit Awaitility wait before the post-split sends.
+ */
+public class V5ProducerSplitRetryTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSendImmediatelyAfterSplitTransparentlyRetries() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("split-retry-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Pre-split batch: lands on the only initial segment, opens the per-segment v4
+        // producer that will become stale on split.
+        int firstBatch = 20;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < firstBatch; i++) {
+            String v = "before-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Find and split the active segment.
+        long activeSegmentId = -1;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeSegmentId = seg.getSegmentId();
+                break;
+            }
+        }
+        assertTrue(activeSegmentId >= 0, "expected exactly one active segment 
before split");
+
+        admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+        // Critical: do NOT wait for the V5 client's DAG watch to converge. The very next
+        // send will target the now-sealed parent's per-segment v4 producer, which by now
+        // has been closed by the broker — the V5 producer must observe AlreadyClosed (or
+        // TopicTerminated), invalidate the stale entry, give the watch a moment to
+        // deliver the new layout, and re-route to a child. From the application side,
+        // every send must succeed.
+        int secondBatch = 20;
+        for (int i = 0; i < secondBatch; i++) {
+            String v = "after-" + i;
+            producer.newMessage().key("k-after-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain everything via the consumer.
+        Set<String> received = new HashSet<>();
+        int total = firstBatch + secondBatch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i + " (received so far: " 
+ received.size() + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+
+        assertEquals(received.size(), total, "expected " + total + " distinct 
messages");
+        assertEquals(received, sent, "received set must equal sent set across 
the split");
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 8331b4d2fa9..01fe8b0d3d1 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -160,12 +160,17 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
                         eventTime, sequenceId, deliverAfter, deliverAt, 
replicationClusters)
                         .send();
                 return new MessageIdV5(v4MsgId, segmentId);
-            } catch 
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException e) 
{
-                // Segment was terminated (split/merge) — remove stale producer and retry
-                // with the updated layout (which should arrive via DagWatchClient)
+            } catch 
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException
+                     | 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException e) {
+                // The segment was sealed (split/merge). We may observe this either as
+                // TopicTerminated (broker reply to a still-open producer) or AlreadyClosed
+                // (the v4 producer noticed first and shut itself down). Either way, drop
+                // the stale per-segment producer and retry — the DAG watch will deliver
+                // the new layout shortly, and routeMessage on the next attempt will land
+                // on an active child.
                 log.info().attr("segmentId", segmentId)
                         .attr("attempt", attempt + 1)
-                        .log("Segment terminated, waiting for layout update");
+                        .log("Segment sealed, waiting for layout update");
                 segmentProducers.remove(segmentId);
                 try {
                     Thread.sleep(100L * (attempt + 1));
@@ -221,10 +226,14 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
                 .exceptionallyCompose(ex -> {
                     Throwable cause = ex instanceof 
java.util.concurrent.CompletionException
                             ? ex.getCause() : ex;
-                    if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
-                            .TopicTerminatedException && attempt < 3) {
+                    boolean segmentSealed = cause
+                            instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                                    .TopicTerminatedException
+                            || cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                                    .AlreadyClosedException;
+                    if (segmentSealed && attempt < 3) {
                         log.info().attr("segmentId", segmentId)
-                                .attr("attempt", attempt + 1).log("Segment 
terminated, retrying");
+                                .attr("attempt", attempt + 1).log("Segment 
sealed, retrying");
                         segmentProducers.remove(segmentId);
                         return CompletableFuture.supplyAsync(() -> null,
                                 CompletableFuture.delayedExecutor(

Reply via email to