This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7ff975d268f MINOR: Improve shutdown logging for share consumer system
tests (#21895)
7ff975d268f is described below
commit 7ff975d268f23a201e08cf8095d29ceac83467a5
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 30 16:48:46 2026 +0100
MINOR: Improve shutdown logging for share consumer system tests (#21895)
This PR aligns the shutdown logging for share consumer system tests to
match the regular consumer tests (see
https://github.com/apache/kafka/pull/21586). The aim is to help with
debugging flaky system tests in the future.
Reviewers: Bill Bejeck <[email protected]>
---
.../services/verifiable_share_consumer.py | 2 ++
.../org/apache/kafka/tools/VerifiableConsumer.java | 8 +++---
.../kafka/tools/VerifiableShareConsumer.java | 31 ++++++++++++++++++++++
3 files changed, 38 insertions(+), 3 deletions(-)
diff --git a/tests/kafkatest/services/verifiable_share_consumer.py
b/tests/kafkatest/services/verifiable_share_consumer.py
index eca79cb4106..65f86966237 100644
--- a/tests/kafkatest/services/verifiable_share_consumer.py
+++ b/tests/kafkatest/services/verifiable_share_consumer.py
@@ -183,6 +183,8 @@ class VerifiableShareConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Bac
self._update_global_consumed(event)
elif name == "record_data" and self.on_record_consumed:
self.on_record_consumed(event, node)
+ elif name == "shutdown_requested":
+ self.logger.debug("Shutdown has been requested")
else:
self.logger.debug("%s: ignoring unknown event: %s" %
(str(node.account), event))
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 25c82c713b1..98b1ac00e2a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -75,15 +75,17 @@ import static
net.sourceforge.argparse4j.impl.Arguments.storeTrue;
*
* <ul>
* <li>partitions_revoked: outputs the partitions revoked through {@link
ConsumerRebalanceListener#onPartitionsRevoked(Collection)}.
- * See {@link
org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}</li>
+ * See {@link
org.apache.kafka.tools.VerifiableConsumer.PartitionsRevoked}.</li>
* <li>partitions_assigned: outputs the partitions assigned through {@link
ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
* See {@link
org.apache.kafka.tools.VerifiableConsumer.PartitionsAssigned}.</li>
* <li>records_consumed: contains a summary of records consumed in a single
call to {@link KafkaConsumer#poll(Duration)}.
* See {@link
org.apache.kafka.tools.VerifiableConsumer.RecordsConsumed}.</li>
* <li>record_data: contains the key, value, and offset of an individual
consumed record (only included if verbose
* output is enabled). See {@link
org.apache.kafka.tools.VerifiableConsumer.RecordData}.</li>
- * <li>offsets_committed: The result of every offset commit (only included if
auto-commit is not enabled).
- * See {@link
org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}</li>
+ * <li>offsets_committed: the result of every offset commit (only included if
auto-commit is not enabled).
+ * See {@link
org.apache.kafka.tools.VerifiableConsumer.OffsetsCommitted}.</li>
+ * <li>shutdown_requested: emitted as consumer shutdown is requested.
+ * See {@link
org.apache.kafka.tools.VerifiableConsumer.ShutdownRequested}.</li>
* <li>shutdown_complete: emitted after the consumer returns from {@link
KafkaConsumer#close()}.
* See {@link
org.apache.kafka.tools.VerifiableConsumer.ShutdownComplete}.</li>
* </ul>
diff --git
a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
index c2da3ed34d3..5b897cfb643 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
@@ -76,6 +76,28 @@ import java.util.concurrent.TimeUnit;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+/**
+ * Command line share consumer designed for system testing. It outputs
consumer events to STDOUT as JSON
+ * formatted objects. The "name" field in each JSON event identifies the event
type. The following
+ * events are currently supported:
+ *
+ * <ul>
+ * <li>startup_complete: emitted when the share consumer has started up.
+ * See {@link VerifiableShareConsumer.StartupComplete}.</li>
+ * <li>offset_reset_strategy_set: emitted when the offset reset strategy for
the share group has been set.
+ * See {@link VerifiableShareConsumer.OffsetResetStrategySet}.</li>
+ * <li>records_consumed: contains a summary of records consumed in a single
call to {@link KafkaShareConsumer#poll(Duration)}.
+ * See {@link VerifiableShareConsumer.RecordsConsumed}.</li>
+ * <li>offsets_acknowledged: contains the result of acknowledging offsets.
+ * See {@link VerifiableShareConsumer.OffsetsAcknowledged}.</li>
+ * <li>record_data: contains the key, value, and offset of an individual
consumed record (only included if verbose
+ * * output is enabled). See {@link
VerifiableShareConsumer.RecordData}.</li>
+ * <li>shutdown_requested: emitted as share consumer shutdown is requested.
+ * See {@link VerifiableShareConsumer.ShutdownRequested}.</li>
+ * <li>shutdown_complete: emitted after the share consumer returns from {@link
KafkaShareConsumer#close()}.
+ * See {@link VerifiableShareConsumer.ShutdownComplete}.</li>
+ * </ul>
+ */
public class VerifiableShareConsumer implements Closeable,
AcknowledgementCommitCallback {
private static final Logger log =
LoggerFactory.getLogger(VerifiableShareConsumer.class);
@@ -197,6 +219,14 @@ public class VerifiableShareConsumer implements Closeable,
AcknowledgementCommit
}
}
+ private static class ShutdownRequested extends ShareConsumerEvent {
+
+ @Override
+ public String name() {
+ return "shutdown_requested";
+ }
+ }
+
protected static class ShutdownComplete extends ShareConsumerEvent {
@Override
@@ -455,6 +485,7 @@ public class VerifiableShareConsumer implements Closeable,
AcknowledgementCommit
public void close() {
boolean interrupted = false;
try {
+ printJson(new ShutdownRequested());
consumer.wakeup();
while (true) {
try {