This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7ff975d268f MINOR: Improve shutdown logging for share consumer system 
tests (#21895)
7ff975d268f is described below

commit 7ff975d268f23a201e08cf8095d29ceac83467a5
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 30 16:48:46 2026 +0100

    MINOR: Improve shutdown logging for share consumer system tests (#21895)
    
    This PR aligns the shutdown logging for share consumer system tests to
    match the regular consumer tests (see
    https://github.com/apache/kafka/pull/21586). The aim is to help with
    debugging flaky system tests in the future.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../services/verifiable_share_consumer.py          |  2 ++
 .../org/apache/kafka/tools/VerifiableConsumer.java |  8 +++---
 .../kafka/tools/VerifiableShareConsumer.java       | 31 ++++++++++++++++++++++
 3 files changed, 38 insertions(+), 3 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_share_consumer.py 
b/tests/kafkatest/services/verifiable_share_consumer.py
index eca79cb4106..65f86966237 100644
--- a/tests/kafkatest/services/verifiable_share_consumer.py
+++ b/tests/kafkatest/services/verifiable_share_consumer.py
@@ -183,6 +183,8 @@ class VerifiableShareConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Bac
                         self._update_global_consumed(event)
                     elif name == "record_data" and self.on_record_consumed:
                         self.on_record_consumed(event, node)
+                    elif name == "shutdown_requested":
+                        self.logger.debug("Shutdown has been requested")
                     else:
                         self.logger.debug("%s: ignoring unknown event: %s" % 
(str(node.account), event))
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 25c82c713b1..98b1ac00e2a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -75,15 +75,17 @@ import static 
net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  *
  * <ul>
  * <li>partitions_revoked: outputs the partitions revoked through {@link 
ConsumerRebalanceListener#onPartitionsRevoked(Collection)}.
- *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}</li>
+ *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}.</li>
  * <li>partitions_assigned: outputs the partitions assigned through {@link 
ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
  *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.PartitionsAssigned}.</li>
  * <li>records_consumed: contains a summary of records consumed in a single 
call to {@link KafkaConsumer#poll(Duration)}.
  *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed}.</li>
  * <li>record_data: contains the key, value, and offset of an individual 
consumed record (only included if verbose
  *     output is enabled). See {@link 
org.apache.kafka.tools.VerifiableConsumer.RecordData}.</li>
- * <li>offsets_committed: The result of every offset commit (only included if 
auto-commit is not enabled).
- *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}</li>
+ * <li>offsets_committed: the result of every offset commit (only included if 
auto-commit is not enabled).
+ *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}.</li>
+ * <li>shutdown_requested: emitted as consumer shutdown is requested.
+ *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.ShutdownRequested}.</li>
  * <li>shutdown_complete: emitted after the consumer returns from {@link 
KafkaConsumer#close()}.
  *     See {@link 
org.apache.kafka.tools.VerifiableConsumer.ShutdownComplete}.</li>
  * </ul>
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
index c2da3ed34d3..5b897cfb643 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
@@ -76,6 +76,28 @@ import java.util.concurrent.TimeUnit;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
+/**
+ * Command line share consumer designed for system testing. It outputs 
consumer events to STDOUT as JSON
+ * formatted objects. The "name" field in each JSON event identifies the event 
type. The following
+ * events are currently supported:
+ *
+ * <ul>
+ * <li>startup_complete: emitted when the share consumer has started up.
+ *     See {@link VerifiableShareConsumer.StartupComplete}.</li>
+ * <li>offset_reset_strategy_set: emitted when the offset reset strategy for 
the share group has been set.
+ *     See {@link VerifiableShareConsumer.OffsetResetStrategySet}.</li>
+ * <li>records_consumed: contains a summary of records consumed in a single 
call to {@link KafkaShareConsumer#poll(Duration)}.
+ *     See {@link VerifiableShareConsumer.RecordsConsumed}.</li>
+ * <li>offsets_acknowledged: contains the result of acknowledging offsets.
+ *     See {@link VerifiableShareConsumer.OffsetsAcknowledged}.</li>
+ * <li>record_data: contains the key, value, and offset of an individual 
consumed record (only included if verbose
+ *     output is enabled). See {@link 
VerifiableShareConsumer.RecordData}.</li>
+ * <li>shutdown_requested: emitted as share consumer shutdown is requested.
+ *     See {@link VerifiableShareConsumer.ShutdownRequested}.</li>
+ * <li>shutdown_complete: emitted after the share consumer returns from {@link 
KafkaShareConsumer#close()}.
+ *     See {@link VerifiableShareConsumer.ShutdownComplete}.</li>
+ * </ul>
+ */
 public class VerifiableShareConsumer implements Closeable, 
AcknowledgementCommitCallback {
 
     private static final Logger log = 
LoggerFactory.getLogger(VerifiableShareConsumer.class);
@@ -197,6 +219,14 @@ public class VerifiableShareConsumer implements Closeable, 
AcknowledgementCommit
         }
     }
 
+    private static class ShutdownRequested extends ShareConsumerEvent {
+
+        @Override
+        public String name() {
+            return "shutdown_requested";
+        }
+    }
+
     protected static class ShutdownComplete extends ShareConsumerEvent {
 
         @Override
@@ -455,6 +485,7 @@ public class VerifiableShareConsumer implements Closeable, 
AcknowledgementCommit
     public void close() {
         boolean interrupted = false;
         try {
+            printJson(new ShutdownRequested());
             consumer.wakeup();
             while (true) {
                 try {

Reply via email to