ShivsundarR commented on code in PR #19417:
URL: https://github.com/apache/kafka/pull/19417#discussion_r2044503952
##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##########
@@ -748,28 +713,33 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
// Acknowledging 2 out of the 3 records received via commitAsync.
ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+ ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
assertEquals(0L, firstRecord.offset());
assertEquals(1L, secondRecord.offset());
shareConsumer1.acknowledge(firstRecord);
shareConsumer1.acknowledge(secondRecord);
shareConsumer1.commitAsync();
- // The 3rd record should be re-presented to the consumer when it polls again.
- records = shareConsumer1.poll(Duration.ofMillis(5000));
- assertEquals(1, records.count());
- iterator = records.iterator();
- firstRecord = iterator.next();
- assertEquals(2L, firstRecord.offset());
+ producer.send(record4);
+ producer.flush();
+
+ // The next poll() should throw an IllegalStateException as there is still 1 unacknowledged record.
+ // In EXPLICIT acknowledgement mode, we are not allowed to have unacknowledged records from a batch.
+ assertThrows(IllegalStateException.class, () -> shareConsumer1.poll(Duration.ofMillis(5000)));
+
+ // Acknowledging the 3rd record
Review Comment:
Update on this: I experimented with a mock consumer application, and overall
the current setup looks good. When the user gets the exception, they can do
what they want:
- Close the consumer, in which case the unacked records are released.
- Re-attempt to acknowledge the remaining records. poll() will keep throwing
the exception until all the records are acknowledged. After that, further
consumption continues as expected.
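
For illustration only, the two recovery paths can be modelled with a small stand-in class. This is a sketch under stated assumptions, not the Kafka client: `MockShareConsumer`, its offset-only records, and `ExplicitAckDemo` are hypothetical names, and the model only mirrors the behaviour described in this comment (poll() throws while a delivered record is unacknowledged, and close() releases unacked records).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a share consumer in explicit acknowledgement mode (not the Kafka API). */
final class MockShareConsumer {
    private final Deque<Long> pending = new ArrayDeque<>();   // offsets not yet delivered
    private final Set<Long> unacked = new LinkedHashSet<>();  // delivered but not yet acknowledged
    private boolean closed;

    MockShareConsumer(List<Long> offsets) {
        pending.addAll(offsets);
    }

    /** Simulates a producer appending one more record. */
    void produce(long offset) {
        pending.add(offset);
    }

    /** Delivers everything pending as one batch; refuses while the previous batch has unacked records. */
    List<Long> poll() {
        if (closed) {
            throw new IllegalStateException("consumer is closed");
        }
        if (!unacked.isEmpty()) {
            throw new IllegalStateException("unacknowledged records remain: " + unacked);
        }
        List<Long> batch = new ArrayList<>(pending);
        pending.clear();
        unacked.addAll(batch);
        return batch;
    }

    /** Acknowledges one delivered offset. */
    void acknowledge(long offset) {
        if (!unacked.remove(offset)) {
            throw new IllegalStateException("offset is not awaiting acknowledgement: " + offset);
        }
    }

    /** Closes the consumer and returns the offsets that were released without acknowledgement. */
    List<Long> close() {
        List<Long> released = new ArrayList<>(unacked);
        unacked.clear();
        closed = true;
        return released;
    }
}

final class ExplicitAckDemo {
    public static void main(String[] args) {
        MockShareConsumer consumer = new MockShareConsumer(List.of(0L, 1L, 2L));
        List<Long> batch = consumer.poll();
        consumer.acknowledge(batch.get(0));
        consumer.acknowledge(batch.get(1));
        consumer.produce(3L);
        try {
            consumer.poll(); // throws: offset 2 is still unacknowledged
        } catch (IllegalStateException e) {
            // Second option from the list: acknowledge what is left, then carry on.
            consumer.acknowledge(batch.get(2));
        }
        System.out.println("next batch: " + consumer.poll());
        // First option from the list: closing releases whatever is still unacked.
        System.out.println("released on close: " + consumer.close());
    }
}
```

In a real application the catch block would decide between the two options; the mock makes the choice explicit so each path can be exercised in isolation.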