This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1eda1661b04 MINOR : Added close() in ShareConsumeBenchWorker::stop()
to handle interrupts cleanly. (#21742)
1eda1661b04 is described below
commit 1eda1661b04e79ce396af7c92c16f0b650a765d4
Author: Shivsundar R <[email protected]>
AuthorDate: Fri Mar 13 08:22:46 2026 -0400
MINOR : Added close() in ShareConsumeBenchWorker::stop() to handle
interrupts cleanly. (#21742)
*What*
The `ShareConsumeBenchWorker` did not handle closing of share-consumers
cleanly if there was an external interrupt which could result in leaked
connections/resources.
PR handles it similar to `ConsumeBenchWorker` where we will close from
the main thread(which would not have the interrupt flag on) and close
all consumers before shutting down.
Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield
<[email protected]>
---
.../kafka/trogdor/workload/ShareConsumeBenchWorker.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
index dad654ec552..ddf064cd68d 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ShareConsumeBenchWorker.java
@@ -47,8 +47,10 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -65,6 +67,7 @@ public class ShareConsumeBenchWorker implements TaskWorker {
private final String id;
private final ShareConsumeBenchSpec spec;
private final AtomicBoolean running = new AtomicBoolean(false);
+ private final Queue<ThreadSafeShareConsumer> consumers = new
ConcurrentLinkedQueue<>();
private ScheduledExecutorService executor;
private WorkerStatusTracker workerStatus;
private StatusUpdater statusUpdater;
@@ -114,7 +117,9 @@ public class ShareConsumeBenchWorker implements TaskWorker {
Set<String> topics = new HashSet<>(spec.expandTopicNames());
for (int i = 0; i < consumerCount; i++) {
- tasks.add(new ConsumeMessages(consumer(shareGroup,
clientId(i)), spec.recordProcessor(), topics));
+ ThreadSafeShareConsumer consumer = consumer(shareGroup,
clientId(i));
+ consumers.add(consumer);
+ tasks.add(new ConsumeMessages(consumer,
spec.recordProcessor(), topics));
}
return tasks;
@@ -425,6 +430,10 @@ public class ShareConsumeBenchWorker implements TaskWorker
{
doneFuture.complete("");
executor.shutdownNow();
executor.awaitTermination(1, TimeUnit.DAYS);
+ for (ThreadSafeShareConsumer consumer : consumers) {
+ consumer.close();
+ }
+ this.consumers.clear();
this.executor = null;
this.statusUpdater = null;
this.statusUpdaterFuture = null;