kirktrue commented on code in PR #15640:
URL: https://github.com/apache/kafka/pull/15640#discussion_r1602382027
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -294,6 +299,46 @@ void testEnsureEventsAreCompleted() {
assertTrue(applicationEventsQueue.isEmpty());
}
+ @Test
+ void testReaperExpiresExpiredEvents() {
+ long event1TimeoutMs = 100;
+ long event2TimeoutMs = 200;
+ SyncCommitEvent event1 = new SyncCommitEvent(new HashMap<>(),
calculateDeadlineMs(time, event1TimeoutMs));
+ SyncCommitEvent event2 = new SyncCommitEvent(new HashMap<>(),
calculateDeadlineMs(time, event2TimeoutMs));
+ applicationEventsQueue.add(event1);
+ applicationEventsQueue.add(event2);
+ consumerNetworkThread.runOnce();
+
+ // Make sure both events have been moved from the event queue to the reaper's "tracked" list.
+ assertFalse(applicationEventsQueue.contains(event1));
+ assertFalse(applicationEventsQueue.contains(event2));
+ assertTrue(applicationEventReaper.contains(event1));
+ assertTrue(applicationEventReaper.contains(event2));
+ assertEquals(2, applicationEventReaper.size());
+
+ // Sleep long enough for the first event to have expired.
+ time.sleep(event1TimeoutMs + 1);
+
+ consumerNetworkThread.runOnce();
+
+ // Validate that the first event was expired, but the second continues to be tracked.
+ assertTrue(event1.future().isCompletedExceptionally());
+ assertThrows(TimeoutException.class, () ->
ConsumerUtils.getResult(event1.future()));
+ assertFalse(applicationEventReaper.contains(event1));
+
+ assertTrue(applicationEventReaper.contains(event2));
+ assertFalse(event2.future().isDone());
+ assertEquals(1, applicationEventReaper.size());
+
+ // The cleanup will trigger the reaper's handling of any events that are still being tracked.
+ consumerNetworkThread.cleanup();
Review Comment:
Yes, please take a look (PTAL).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]