This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2f2a3fdbace68b99cecfd8e78a7d55fac7406b2c Author: Bolin Lin <[email protected]> AuthorDate: Thu Jan 8 05:19:18 2026 -0500 MINOR: use known number of records in testComplexShareConsumer to prevent timeout issue (#21235) In extreme situations, the existing throttling mechanism in the share consumer limits the consumer to processing only a single record at a time, which can intermittently cause `testComplexShareConsumer` to time out. This change produces a fixed, known number of records (2000) instead of producing for a fixed duration, which makes the test reliable. Reviewers: Andrew Schofield <[email protected]> --- .../org/apache/kafka/clients/consumer/ShareConsumerTest.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 32b05351e98..b67202e29fb 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -2318,15 +2318,16 @@ public class ShareConsumerTest { ClientState prodState = new ClientState(); - // Produce messages until we want. + // Produce a fixed number of messages for deterministic testing. + int targetRecordCount = 2000; service.execute(() -> { try (Producer<byte[], byte[]> producer = createProducer()) { - while (!prodState.done().get()) { + do { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(multiTp.topic(), multiTp.partition(), null, "key".getBytes(), "value".getBytes()); producer.send(record); producer.flush(); - prodState.count().incrementAndGet(); - } + } while (prodState.count().incrementAndGet() < targetRecordCount); + prodState.done().set(true); } }); @@ -2345,9 +2346,6 @@ public class ShareConsumerTest { TimeUnit.MILLISECONDS ); - // Let the complex consumer read the messages.
- service.schedule(() -> prodState.done().set(true), 5L, TimeUnit.SECONDS); - // All messages which can be read are read, some would be redelivered (roughly 2 times the records produced). TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!"); int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 2 * 0.95); // 2 times with margin of error (5%).
