This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1260980f11b [fix] Client-v5: Retry producer send on AlreadyClosed across segment seal (#25610)
1260980f11b is described below
commit 1260980f11b49d95ec10474f520efcf2b390e7e6
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 29 04:11:13 2026 -0700
[fix] Client-v5: Retry producer send on AlreadyClosed across segment seal (#25610)
---
.../client/api/v5/V5ProducerSplitRetryTest.java | 113 +++++++++++++++++++++
.../client/impl/v5/ScalableTopicProducer.java | 23 +++--
2 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSplitRetryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSplitRetryTest.java
new file mode 100644
index 00000000000..fb576c0590b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSplitRetryTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 producer's transparent retry across a layout transition.
+ *
+ * <p>The split admin call is synchronous server-side, but the V5 client's DAG watch is
+ * async — the producer's view of the segment layout doesn't update until the watch
+ * delivers the new revision. A send issued in this window targets the now-sealed parent
+ * segment and the v4 layer surfaces either {@code TopicTerminated} (if the broker
+ * replies to the still-open producer) or {@code AlreadyClosed} (if the v4 producer
+ * noticed the seal first). In either case the V5 producer must drop the stale
+ * per-segment producer, wait briefly for the layout watch to catch up, and re-route to
+ * an active child — without surfacing the failure to the application.
+ *
+ * <p>This is the no-wait counterpart to {@link V5SegmentSplitTest}, which also exercises
+ * a split mid-flow but with an explicit Awaitility wait before the post-split sends.
+ */
+public class V5ProducerSplitRetryTest extends V5ClientBaseTest {
+
+    /**
+     * Sends a batch, splits the single active segment, then immediately sends a second
+     * batch without waiting for the client's layout watch. Every send must succeed, and the
+     * consumer must receive exactly the set of values that was sent, with no duplicates.
+     */
+    @Test
+    public void testSendImmediatelyAfterSplitTransparentlyRetries() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("split-retry-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Pre-split batch: lands on the only initial segment, opens the per-segment v4
+        // producer that will become stale on split.
+        int firstBatch = 20;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < firstBatch; i++) {
+            String v = "before-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Find and split the active segment. Count every active segment instead of stopping
+        // at the first match, so the "exactly one" precondition is actually verified
+        // (newScalableTopic(1) is expected to create a single segment).
+        long activeSegmentId = -1;
+        int activeSegments = 0;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeSegmentId = seg.getSegmentId();
+                activeSegments++;
+            }
+        }
+        assertEquals(activeSegments, 1, "expected exactly one active segment before split");
+
+        admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+        // Critical: do NOT wait for the V5 client's DAG watch to converge. The very next
+        // send will target the now-sealed parent's per-segment v4 producer, which by now
+        // has been closed by the broker — the V5 producer must observe AlreadyClosed (or
+        // TopicTerminated), invalidate the stale entry, give the watch a moment to
+        // deliver the new layout, and re-route to a child. From the application side,
+        // every send must succeed.
+        int secondBatch = 20;
+        for (int i = 0; i < secondBatch; i++) {
+            String v = "after-" + i;
+            producer.newMessage().key("k-after-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain everything via the consumer. Set.add returns false for a value that is
+        // already present, so a duplicate delivery fails here and names the offending
+        // value, rather than showing up only as a mismatch in the final set comparison.
+        Set<String> received = new HashSet<>();
+        int total = firstBatch + secondBatch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i + " (received so far: " + received.size() + ")");
+            assertTrue(received.add(msg.value()), "duplicate delivery of " + msg.value());
+            consumer.acknowledge(msg.id());
+        }
+
+        assertEquals(received.size(), total, "expected " + total + " distinct messages");
+        assertEquals(received, sent, "received set must equal sent set across the split");
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 8331b4d2fa9..01fe8b0d3d1 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -160,12 +160,17 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters)
.send();
return new MessageIdV5(v4MsgId, segmentId);
- } catch
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException e)
{
- // Segment was terminated (split/merge) — remove stale
producer and retry
- // with the updated layout (which should arrive via
DagWatchClient)
+ } catch
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException
+ |
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException e) {
+ // The segment was sealed (split/merge). We may observe this
either as
+ // TopicTerminated (broker reply to a still-open producer) or
AlreadyClosed
+ // (the v4 producer noticed first and shut itself down).
Either way, drop
+ // the stale per-segment producer and retry — the DAG watch
will deliver
+ // the new layout shortly, and routeMessage on the next
attempt will land
+ // on an active child.
log.info().attr("segmentId", segmentId)
.attr("attempt", attempt + 1)
- .log("Segment terminated, waiting for layout update");
+ .log("Segment sealed, waiting for layout update");
segmentProducers.remove(segmentId);
try {
Thread.sleep(100L * (attempt + 1));
@@ -221,10 +226,14 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
.exceptionallyCompose(ex -> {
Throwable cause = ex instanceof
java.util.concurrent.CompletionException
? ex.getCause() : ex;
- if (cause instanceof
org.apache.pulsar.client.api.PulsarClientException
- .TopicTerminatedException && attempt < 3) {
+ boolean segmentSealed = cause
+ instanceof
org.apache.pulsar.client.api.PulsarClientException
+ .TopicTerminatedException
+ || cause instanceof
org.apache.pulsar.client.api.PulsarClientException
+ .AlreadyClosedException;
+ if (segmentSealed && attempt < 3) {
log.info().attr("segmentId", segmentId)
- .attr("attempt", attempt + 1).log("Segment
terminated, retrying");
+ .attr("attempt", attempt + 1).log("Segment
sealed, retrying");
segmentProducers.remove(segmentId);
return CompletableFuture.supplyAsync(() -> null,
CompletableFuture.delayedExecutor(