This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2177b0e44d9 [fix][fn] Fix orphan exclusive producer on creation
timeout in WorkerUtils.createExclusiveProducerWithRetry (#25942)
2177b0e44d9 is described below
commit 2177b0e44d96e87a698c40d47ebb763a9523f2ee
Author: Lari Hotari <[email protected]>
AuthorDate: Fri Jun 5 16:35:22 2026 +0300
[fix][fn] Fix orphan exclusive producer on creation timeout in
WorkerUtils.createExclusiveProducerWithRetry (#25942)
---
.../pulsar/functions/worker/WorkerUtils.java | 11 +++-
.../pulsar/functions/worker/WorkerUtilsTest.java | 69 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 3 deletions(-)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
index aa824829063..605840b28cd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java
@@ -30,8 +30,8 @@ import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +59,7 @@ import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.proto.MetricsData;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
@@ -413,13 +414,17 @@ public final class WorkerUtils {
int tries = 0;
do {
try {
- return client.newProducer().topic(topic)
+ CompletableFuture<Producer<byte[]>> producerFuture = client.newProducer().topic(topic)
.accessMode(ProducerAccessMode.Exclusive)
.enableBatching(false)
.blockIfQueueFull(true)
.compressionType(CompressionType.LZ4)
.producerName(producerName)
- .createAsync().get(10, TimeUnit.SECONDS);
+ .createAsync();
+ return FutureUtil.getAndCleanupOnInterrupt(producerFuture, Producer::closeAsync);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
} catch (Exception e) {
log.info().attr("topic", topic).exception(e)
.log("Encountered exception while creating exclusive producer");
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
index 3abca76c602..2729e3f6884 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/WorkerUtilsTest.java
@@ -18,26 +18,33 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import lombok.Cleanup;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.CompressionType;
@@ -110,6 +117,68 @@ public class WorkerUtilsTest {
}
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCreateExclusiveProducerWithRetryClosesProducerOnInterrupt() throws Exception {
+ Producer<byte[]> producer = mock(Producer.class);
+ when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
+
+ // producer creation stays pending until the test completes it explicitly
+ CompletableFuture<Producer<byte[]>> producerFuture = new CompletableFuture<>();
+ CountDownLatch createAsyncCalled = new CountDownLatch(1);
+
+ ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
+ when(builder.topic(anyString())).thenReturn(builder);
+ when(builder.producerName(anyString())).thenReturn(builder);
+ when(builder.enableBatching(anyBoolean())).thenReturn(builder);
+ when(builder.blockIfQueueFull(anyBoolean())).thenReturn(builder);
+ when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
+ when(builder.accessMode(any())).thenReturn(builder);
+ when(builder.createAsync()).thenAnswer(invocation -> {
+ createAsyncCalled.countDown();
+ return producerFuture;
+ });
+
+ PulsarClient pulsarClient = mock(PulsarClient.class);
+ when(pulsarClient.newProducer()).thenReturn(builder);
+
+ AtomicReference<Throwable> thrown = new AtomicReference<>();
+ AtomicBoolean interruptStatusPreserved = new AtomicBoolean();
+ @Cleanup("interrupt")
+ Thread caller = new Thread(() -> {
+ try {
+ WorkerUtils.createExclusiveProducerWithRetry(pulsarClient, "test-topic", "test-producer",
+ () -> true, 0);
+ } catch (Throwable t) {
+ thrown.set(t);
+ interruptStatusPreserved.set(Thread.currentThread().isInterrupted());
+ }
+ });
+ caller.setDaemon(true);
+ caller.start();
+ assertTrue(createAsyncCalled.await(10, TimeUnit.SECONDS));
+
+ // interrupt the caller while it is waiting for the producer to be created
+ caller.interrupt();
+ caller.join(TimeUnit.SECONDS.toMillis(10));
+ assertThat(caller.isAlive())
+ .as("Interrupt should abort the retry loop instead of retrying")
+ .isFalse();
+
+ assertThat(thrown.get())
+ .isInstanceOf(RuntimeException.class)
+ .hasCauseInstanceOf(InterruptedException.class);
+ assertThat(interruptStatusPreserved)
+ .as("Interrupt status should be restored")
+ .isTrue();
+
+ // when the pending creation completes after the interrupt, the producer must be closed so that
+ // the exclusive producer doesn't leak
+ verify(producer, never()).closeAsync();
+ producerFuture.complete(producer);
+ verify(producer, times(1)).closeAsync();
+ }
+
@Test
public void testDLogConfiguration() throws URISyntaxException, IOException {
// The config yml is seeded with a fake bookie config.