lianetm commented on code in PR #18050:
URL: https://github.com/apache/kafka/pull/18050#discussion_r1907490954
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java:
##########
@@ -191,28 +187,15 @@ public void testBackoffAfterRetriableFailure() {
}
@Test
- public void testPropagateAndBackoffAfterFatalError() {
+ public void testBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager =
setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager,
Errors.GROUP_AUTHORIZATION_FAILED);
- verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
- if (!(backgroundEvent instanceof ErrorEvent))
- return false;
-
- RuntimeException exception = ((ErrorEvent)
backgroundEvent).error();
-
- if (!(exception instanceof GroupAuthorizationException))
- return false;
-
- GroupAuthorizationException groupAuthException =
(GroupAuthorizationException) exception;
- return groupAuthException.groupId().equals(GROUP_ID);
- }));
-
time.sleep(RETRY_BACKOFF_MS - 1);
- assertEquals(Collections.emptyList(),
coordinatorManager.poll(time.milliseconds()).unsentRequests);
Review Comment:
I think we should keep this assertion: it checks that no request is
generated while the backoff has not yet expired.
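To illustrate what that assertion guards, here is a minimal, self-contained
sketch. `BackoffSketch`, `onFailedAttempt` and `canSendRequest` are
hypothetical names for illustration only, not the actual request state used
by the CoordinatorRequestManager. The idea is simply that no request may go
out until the full backoff has elapsed since the last failure.

```java
// Hypothetical stand-in for a request state with a fixed retry backoff.
// This is NOT the real Kafka class, only an illustration of the check.
final class BackoffSketch {
    private final long retryBackoffMs;
    private long lastFailureMs = -1; // -1 means no failure recorded yet

    BackoffSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // Record the time of a failed attempt; the backoff starts from here.
    void onFailedAttempt(long nowMs) {
        lastFailureMs = nowMs;
    }

    // A request may be sent only if no failure was recorded,
    // or if the backoff has fully elapsed since the last failure.
    boolean canSendRequest(long nowMs) {
        return lastFailureMs < 0 || nowMs - lastFailureMs >= retryBackoffMs;
    }

    public static void main(String[] args) {
        BackoffSketch state = new BackoffSketch(100);
        state.onFailedAttempt(0);
        System.out.println(state.canSendRequest(99));  // false: backoff not expired
        System.out.println(state.canSendRequest(100)); // true: backoff expired
    }
}
```

The removed `assertEquals(Collections.emptyList(), ...)` line plays the role of
the first check in `main`: one millisecond before the backoff expires, polling
should produce nothing.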
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManagerTest.java:
##########
@@ -191,28 +187,15 @@ public void testBackoffAfterRetriableFailure() {
}
@Test
- public void testPropagateAndBackoffAfterFatalError() {
+ public void testBackoffAfterFatalError() {
CoordinatorRequestManager coordinatorManager =
setupCoordinatorManager(GROUP_ID);
expectFindCoordinatorRequest(coordinatorManager,
Errors.GROUP_AUTHORIZATION_FAILED);
- verify(backgroundEventHandler).add(argThat(backgroundEvent -> {
- if (!(backgroundEvent instanceof ErrorEvent))
- return false;
-
- RuntimeException exception = ((ErrorEvent)
backgroundEvent).error();
-
- if (!(exception instanceof GroupAuthorizationException))
- return false;
-
- GroupAuthorizationException groupAuthException =
(GroupAuthorizationException) exception;
- return groupAuthException.groupId().equals(GROUP_ID);
- }));
-
time.sleep(RETRY_BACKOFF_MS - 1);
- assertEquals(Collections.emptyList(),
coordinatorManager.poll(time.milliseconds()).unsentRequests);
time.sleep(1);
assertEquals(1,
coordinatorManager.poll(time.milliseconds()).unsentRequests.size());
+ assertEquals(0,
coordinatorManager.poll(time.milliseconds()).unsentRequests.size());
Review Comment:
If we restore the assertion mentioned in the comment above, it seems we
don't need to add this one, right?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -244,4 +246,12 @@ private void onResponse(
public Optional<Node> coordinator() {
return Optional.ofNullable(this.coordinator);
}
+
+ public void clearFatalError() {
Review Comment:
Should this be private? We only call it from within this same component now.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java:
##########
@@ -186,6 +187,7 @@ private void onSuccessfulResponse(
coordinator.port());
log.info("Discovered group coordinator {}", coordinator);
coordinatorRequestState.onSuccessfulAttempt(currentTimeMs);
+ clearFatalError();
Review Comment:
Thinking more about this call, I wonder if it may not be enough here and
should instead sit higher up in the call stack of the response handling. For
example, with a fatal error followed by a retriable error, we would not clear
the fatal error, right? If we want to ensure we always clear/reset the fatal
error, maybe clearing it should be the first thing we do when we receive a
response:
https://github.com/apache/kafka/blob/624dd458099fa93b3fa1e1715b58bbc6d8689857/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorRequestManager.java#L116
From there on, it will be set again if the response carries another fatal
error, or left clear otherwise (successful response or non-fatal error).
Makes sense?
Also, could we add a unit test to cover this, checking how the
CoordinatorRequestManager clears/resets the fatal error when receiving a
successful response, a retriable error, or a different fatal error?
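A rough sketch of the ordering I have in mind, using a simplified,
hypothetical class rather than the real CoordinatorRequestManager (the
`onResponse(Throwable, boolean)` signature is an assumption for illustration
and does not match the actual method):

```java
import java.util.Optional;

// Hypothetical, simplified model of the fatal-error bookkeeping.
// Not the real CoordinatorRequestManager.
final class FatalErrorResetSketch {
    private Throwable fatalError; // null when no fatal error is recorded

    // Called for every response; 'error' is null on success,
    // 'fatal' says whether a non-null error is fatal or retriable.
    void onResponse(Throwable error, boolean fatal) {
        // Reset first, so a stale fatal error never outlives a newer response.
        clearFatalError();
        if (error != null && fatal) {
            fatalError = error; // set again only if this response is fatal
        }
        // Successful responses and retriable errors leave it cleared.
    }

    private void clearFatalError() {
        fatalError = null;
    }

    Optional<Throwable> fatalError() {
        return Optional.ofNullable(fatalError);
    }

    public static void main(String[] args) {
        FatalErrorResetSketch mgr = new FatalErrorResetSketch();
        mgr.onResponse(new IllegalStateException("fatal"), true);
        System.out.println(mgr.fatalError().isPresent()); // true
        mgr.onResponse(new RuntimeException("retriable"), false);
        System.out.println(mgr.fatalError().isPresent()); // false
    }
}
```

With the reset placed first, the three cases for the unit test (success,
retriable error, different fatal error) all go through the same path, and
`clearFatalError()` can stay private.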
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -1498,6 +1498,23 @@ public void testSignalClose() {
OffsetCommitRequestData data = (OffsetCommitRequestData)
res.unsentRequests.get(0).requestBuilder().build().data();
assertEquals("topic", data.topics().get(0).name());
}
+
+ @Test
+ public void testPollWithFatalErrorShouldFailingAllUnsentRequest() {
Review Comment:
```suggestion
public void testPollWithFatalErrorShouldFailAllUnsentRequests() {
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1246,6 +1248,16 @@ private List<NetworkClientDelegate.UnsentRequest>
drainPendingCommits() {
clearAll();
return res;
}
+
+ private void maybeFailCoordinatorFatalError() {
+ Optional<Throwable> fatalError =
coordinatorRequestManager.fatalError();
+ if (fatalError.isPresent()) {
Review Comment:
```suggestion
coordinatorRequestManager.fatalError().ifPresent(throwable -> {
```
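For context on the suggestion, a small self-contained sketch of the
`Optional.ifPresent` shape. `IfPresentSketch` and `failAllIfFatal` are
hypothetical names, and the list of futures only stands in for the pending
requests; this is not the actual CommitRequestManager code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of replacing isPresent()/get() with ifPresent().
final class IfPresentSketch {

    // Fails and drops every pending future if a fatal error is present.
    // Returns how many futures were failed.
    static int failAllIfFatal(Optional<Throwable> fatalError,
                              List<CompletableFuture<Void>> pending) {
        int before = pending.size();
        fatalError.ifPresent(throwable -> {
            pending.forEach(f -> f.completeExceptionally(throwable));
            pending.clear();
        });
        return before - pending.size();
    }

    public static void main(String[] args) {
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        pending.add(new CompletableFuture<>());
        pending.add(new CompletableFuture<>());
        int failed = failAllIfFatal(Optional.of(new RuntimeException("fatal")), pending);
        System.out.println(failed); // 2
    }
}
```

The lambda form avoids the separate `isPresent()` check and `get()` call, and
does nothing when the `Optional` is empty.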