kuoche1712003 commented on code in PR #19914:
URL: https://github.com/apache/kafka/pull/19914#discussion_r2266029745
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -178,6 +178,11 @@ public CommitRequestManager(
*/
@Override
public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
+ if (coordinatorRequestManager.coordinator().isEmpty() && closing) {
+ handleClosingWithoutCoordinator();
+ return EMPTY;
+ }
+
// poll when the coordinator node is known and fatal error is not present
if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
Review Comment:
Perhaps it could be written like this for better readability:
```suggestion
pendingRequests.maybeFailOnCoordinatorFatalError();
if (closing && pendingRequests.hasUnsentRequests()) {
    CommitFailedException exception = new CommitFailedException(
        "Failed to commit offsets: Coordinator unknown and consumer is closing");
    pendingRequests.drainPendingCommits()
        .forEach(request -> request.future().completeExceptionally(exception));
}
```
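The pattern in the suggestion (drain whatever is still queued and fail each future once the consumer is closing with no coordinator) can be sketched in isolation. This is a minimal sketch, not the PR's code: `PendingCommitsSketch`, `enqueue`, `drain`, and `failIfClosingWithoutCoordinator` are hypothetical names, and `IllegalStateException` stands in for `CommitFailedException` so the example has no Kafka dependency.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the manager's pending-request bookkeeping;
// none of these names are Kafka's actual internals.
public class PendingCommitsSketch {
    private final Queue<CompletableFuture<Void>> unsent = new ArrayDeque<>();

    // Queue a commit and hand back the future the caller waits on.
    public CompletableFuture<Void> enqueue() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        unsent.add(future);
        return future;
    }

    public boolean hasUnsentRequests() {
        return !unsent.isEmpty();
    }

    // Remove and return every queued future so none is completed twice.
    public List<CompletableFuture<Void>> drain() {
        List<CompletableFuture<Void>> drained = new ArrayList<>(unsent);
        unsent.clear();
        return drained;
    }

    // Called from a poll-like loop: with no coordinator and a close in
    // progress, fail everything still queued instead of leaving it pending.
    public void failIfClosingWithoutCoordinator(boolean coordinatorKnown, boolean closing) {
        if (!coordinatorKnown && closing && hasUnsentRequests()) {
            // Assumption: IllegalStateException replaces CommitFailedException here.
            RuntimeException error = new IllegalStateException(
                "Failed to commit offsets: Coordinator unknown and consumer is closing");
            drain().forEach(future -> future.completeExceptionally(error));
        }
    }
}
```

Draining before completing means a later poll finds nothing left to fail, so each future is completed exactly once.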
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1491,6 +1492,31 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
}
+ @Test
+ public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+ CommitRequestManager commitRequestManager = create(true, 100);
+ when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(new TopicPartition("topic", 1),
+ new OffsetAndMetadata(0));
+
+ var commitFuture = commitRequestManager.commitAsync(offsets);
+
+ commitRequestManager.signalClose();
+ when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ when(coordinatorRequestManager.fatalError())
+ .thenReturn(Optional.of(new GroupAuthorizationException("Fatal error")));
+
+ assertEquals(NetworkClientDelegate.PollResult.EMPTY, commitRequestManager.poll(200));
+
+ assertTrue(commitFuture.isCompletedExceptionally());
+
+ ExecutionException exception = assertThrows(ExecutionException.class, commitFuture::get);
+
+ assertInstanceOf(GroupAuthorizationException.class, exception.getCause());
+ assertEquals("Fatal error", exception.getCause().getMessage());
Review Comment:
You might consider using `TestUtils.assertFutureThrows` here instead of unwrapping the `ExecutionException` by hand.
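To illustrate what such a helper saves, here is a minimal, self-contained sketch of the idea. `FutureAssertionsSketch.assertFutureThrowsSketch` is a hypothetical helper written for this example; the real `TestUtils.assertFutureThrows` may differ in parameter order and overloads, so check its signature in the Kafka version being built against.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper showing the idea; Kafka's own TestUtils method may
// differ in name, parameter order, and overloads.
public class FutureAssertionsSketch {

    // Waits for the future, asserts that it failed with the expected cause
    // type, and returns the cause so the caller can inspect its message.
    public static <T extends Throwable> T assertFutureThrowsSketch(
            Class<T> expectedCause, Future<?> future) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expectedCause.isInstance(cause)) {
                return expectedCause.cast(cause);
            }
            throw new AssertionError("Unexpected cause: " + cause, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        throw new AssertionError(
            "Expected the future to fail with " + expectedCause.getName());
    }
}
```

With a helper of this shape, the `assertThrows`, `assertInstanceOf`, and message assertions in the test could collapse into one call followed by a single check on the returned exception's message.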
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1491,6 +1492,31 @@ private static void assertEmptyPendingRequests(CommitRequestManager commitReques
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
}
+ @Test
+ public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+ CommitRequestManager commitRequestManager = create(true, 100);
+ when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
Review Comment:
Is this stubbing necessary?