lianetm commented on code in PR #16833:
URL: https://github.com/apache/kafka/pull/16833#discussion_r1711572849
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1003,11 +1003,13 @@ private void onFailure(final long currentTimeMs,
// Re-discover the coordinator and retry
coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs);
future.completeExceptionally(responseError.exception());
+ } else if (responseError.exception() instanceof RetriableException) {
+ // If fail with a retriable KafkaException, then retry
+ future.completeExceptionally(responseError.exception());
Review Comment:
This may bubble up `RetriableException`s that were not propagated before
(they used to be wrapped in a `KafkaException`). I expect that they will
never reach the API level, because they are either retried internally or
wrapped in a `TimeoutException` when time runs out (see
[this code](https://github.com/apache/kafka/blob/0a4a12fbc4df0ac4b141a869d3880b15b66a7759/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L538-L543)).
That said, this is only my expectation, and it does not seem to be fully
covered by the tests. We could consider extending the existing
[testRetriable](https://github.com/apache/kafka/blob/0a4a12fbc4df0ac4b141a869d3880b15b66a7759/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java#L1202)
to check that:
1. the request is indeed retried ->
[this](https://github.com/apache/kafka/blob/0a4a12fbc4df0ac4b141a869d3880b15b66a7759/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java#L1208)
poll after the backoff should return 1 request.
2. the request is not retried forever -> sleep for the API timeout and poll
again. At this point the poll should not generate any request, because time
has run out.
3. the propagated result is a `TimeoutException` -> after sleeping for the
API timeout and getting an empty poll, we should assert that the future
completed exceptionally with a `TimeoutException` (not the retriable error).
Makes sense?
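
To make the three checks concrete, here is a minimal, self-contained sketch of the behavior I have in mind. It is a simplified model, not the real `CommitRequestManager`: `PendingRequest`, `poll(long)`, `onRetriableFailure(long)` and the timeout/backoff values are all hypothetical names and numbers, and `java.util.concurrent.TimeoutException` stands in for Kafka's own `TimeoutException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Simplified model of a retriable request with a deadline.
// All names here are hypothetical stand-ins, not the CommitRequestManager API.
public class RetryExpirySketch {

    static class PendingRequest {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        private final long expirationMs;
        private final long backoffMs;
        private long nextAttemptMs;
        private boolean inFlight = false;

        PendingRequest(long nowMs, long apiTimeoutMs, long backoffMs) {
            this.expirationMs = nowMs + apiTimeoutMs;
            this.backoffMs = backoffMs;
            this.nextAttemptMs = nowMs;
        }

        // Returns how many requests should be sent at nowMs (0 or 1).
        int poll(long nowMs) {
            if (future.isDone()) {
                return 0;
            }
            if (nowMs >= expirationMs) {
                // Time ran out: surface a timeout instead of the retriable error.
                future.completeExceptionally(new TimeoutException("request expired"));
                return 0;
            }
            if (inFlight || nowMs < nextAttemptMs) {
                return 0;
            }
            inFlight = true;
            return 1;
        }

        // Called when the in-flight attempt fails with a retriable error.
        void onRetriableFailure(long nowMs) {
            inFlight = false;
            nextAttemptMs = nowMs + backoffMs;
        }
    }

    public static void main(String[] args) {
        long apiTimeoutMs = 1_000L;
        long backoffMs = 100L;
        PendingRequest request = new PendingRequest(0L, apiTimeoutMs, backoffMs);

        System.out.println("first poll: " + request.poll(0L));
        request.onRetriableFailure(0L);

        // 1. After the backoff, the request is retried.
        System.out.println("poll after backoff: " + request.poll(backoffMs));
        request.onRetriableFailure(backoffMs);

        // 2. After the API timeout, no further request is generated.
        System.out.println("poll after timeout: " + request.poll(apiTimeoutMs));

        // 3. The future fails with a timeout, not the retriable error.
        Throwable failure = request.future.handle((value, error) -> error).join();
        System.out.println("timed out: " + (failure instanceof TimeoutException));
    }
}
```

In the real test the same three assertions would go through the existing test helpers and mock time rather than a hand-rolled clock.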
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -688,7 +688,8 @@ public void testOffsetFetchRequestTimeoutRequests(final Errors error) {
}
private boolean isRetriableOnOffsetFetch(Errors error) {
- return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE;
+ return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE
+ || error == Errors.UNKNOWN_TOPIC_OR_PARTITION || error == Errors.REQUEST_TIMED_OUT || error == Errors.UNSTABLE_OFFSET_COMMIT;
Review Comment:
With this PR, we're really saying that everything that is `instanceof
RetriableException` will be retried. So should we check that instead of
specific errors? (It would then probably be simpler to remove this function
and do the `instanceof` check directly where it's used.)
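
As a rough illustration of the type-based check, here is a self-contained sketch. `ErrorCode`, `ApiError` and `RetriableError` are hypothetical stand-ins for Kafka's `Errors`, `ApiException` and `RetriableException`, defined only so the example compiles on its own; in the test itself the condition would simply be `error.exception() instanceof RetriableException`.

```java
// Hypothetical stand-ins for Kafka's Errors enum and exception hierarchy,
// defined here only so the sketch compiles without the Kafka client jar.
public class RetriableCheckSketch {

    static class ApiError extends RuntimeException {
        ApiError(String message) {
            super(message);
        }
    }

    static class RetriableError extends ApiError {
        RetriableError(String message) {
            super(message);
        }
    }

    enum ErrorCode {
        NOT_COORDINATOR(new RetriableError("not coordinator")),
        REQUEST_TIMED_OUT(new RetriableError("request timed out")),
        GROUP_AUTHORIZATION_FAILED(new ApiError("group authorization failed"));

        private final ApiError exception;

        ErrorCode(ApiError exception) {
            this.exception = exception;
        }

        ApiError exception() {
            return exception;
        }
    }

    // Retriability is decided by the exception type, not by a hand-kept list of codes.
    static boolean isRetriable(ErrorCode error) {
        return error.exception() instanceof RetriableError;
    }

    public static void main(String[] args) {
        for (ErrorCode error : ErrorCode.values()) {
            System.out.println(error + " retriable: " + isRetriable(error));
        }
    }
}
```

The upside of this shape is that a newly added retriable error is picked up automatically, with no list to keep in sync.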