This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1eda1661b04 MINOR : Added close() in ShareConsumeBenchWorker::stop() 
to handle interrupts cleanly. (#21742)
1eda1661b04 is described below

commit 1eda1661b04e79ce396af7c92c16f0b650a765d4
Author: Shivsundar R <[email protected]>
AuthorDate: Fri Mar 13 08:22:46 2026 -0400

    MINOR : Added close() in ShareConsumeBenchWorker::stop() to handle 
interrupts cleanly. (#21742)
    
    *What*
    The `ShareConsumeBenchWorker` did not close its share consumers cleanly
    when there was an external interrupt, which could result in leaked
    connections/resources.
    This PR handles it similarly to `ConsumeBenchWorker`: the consumers are
    closed from the main thread (which would not have the interrupt flag
    set), so all consumers are closed before shutting down.
    
    Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
     <[email protected]>
---
 .../kafka/trogdor/workload/ShareConsumeBenchWorker.java       | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
index dad654ec552..ddf064cd68d 100644
--- 
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
+++ 
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
@@ -47,8 +47,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -65,6 +67,7 @@ public class ShareConsumeBenchWorker implements TaskWorker {
     private final String id;
     private final ShareConsumeBenchSpec spec;
     private final AtomicBoolean running = new AtomicBoolean(false);
+    private final Queue<ThreadSafeShareConsumer> consumers = new 
ConcurrentLinkedQueue<>();
     private ScheduledExecutorService executor;
     private WorkerStatusTracker workerStatus;
     private StatusUpdater statusUpdater;
@@ -114,7 +117,9 @@ public class ShareConsumeBenchWorker implements TaskWorker {
             Set<String> topics = new HashSet<>(spec.expandTopicNames());
 
             for (int i = 0; i < consumerCount; i++) {
-                tasks.add(new ConsumeMessages(consumer(shareGroup, 
clientId(i)), spec.recordProcessor(), topics));
+                ThreadSafeShareConsumer consumer = consumer(shareGroup, 
clientId(i));
+                consumers.add(consumer);
+                tasks.add(new ConsumeMessages(consumer, 
spec.recordProcessor(), topics));
             }
 
             return tasks;
@@ -425,6 +430,10 @@ public class ShareConsumeBenchWorker implements TaskWorker 
{
         doneFuture.complete("");
         executor.shutdownNow();
         executor.awaitTermination(1, TimeUnit.DAYS);
+        for (ThreadSafeShareConsumer consumer : consumers) {
+            consumer.close();
+        }
+        this.consumers.clear();
         this.executor = null;
         this.statusUpdater = null;
         this.statusUpdaterFuture = null;

Reply via email to