lianetm commented on code in PR #16833:
URL: https://github.com/apache/kafka/pull/16833#discussion_r1711572849


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1003,11 +1003,13 @@ private void onFailure(final long currentTimeMs,
                 // Re-discover the coordinator and retry
                 coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs);
                 future.completeExceptionally(responseError.exception());
+            } else if (responseError.exception() instanceof RetriableException) {
+                // If fail with a retriable KafkaException, then retry
+                future.completeExceptionally(responseError.exception());

Review Comment:
   This may bubble up RetriableExceptions that were not propagated before 
(they used to be wrapped in a KafkaException). My expectation is that they will 
never reach the API level, because they are either retried internally or 
wrapped in a TimeoutException when time runs out 
([this](https://github.com/apache/kafka/blob/0a4a12fbc4df0ac4b141a869d3880b15b66a7759/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java#L538-L543)
 code). 
   
   That said, this is only my expectation, and it does not seem to be fully 
covered by the tests. We could consider extending the existing 
[testRetriable](https://github.com/apache/kafka/blob/0a4a12fbc4df0ac4b141a869d3880b15b66a7759/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java#L1202)
 to check that:
   1. the request is indeed retried -> 
[this](https://github.com/apache/kafka/blob/0a4a12fbc4df0ac4b141a869d3880b15b66a7759/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java#L1208)
 poll after the backoff should return 1 request.
   2. the request is not retried forever -> sleep for the API timeout and poll 
again. At this point the poll should not generate any request, because time has 
run out.
   3. the result propagated is a TimeoutException -> after sleeping for the 
API timeout and getting an empty poll, we should assert that the future 
completed exceptionally with a TimeoutException (not the retriable error).

   Does that make sense?
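
To make the three checks concrete, here is a rough, self-contained sketch of the sequence. It does not use the real `CommitRequestManager` or its test harness: `RetrySketch`, `poll`, `onRetriableFailure` and `failureOf` are hypothetical stand-ins, the clock is a plain `long`, and `java.util.concurrent.TimeoutException` stands in for Kafka's `org.apache.kafka.common.errors.TimeoutException`. The real test would presumably use the mock time and the helpers already in `CommitRequestManagerTest`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a retriable request; NOT the real CommitRequestManager.
class RetrySketch {
    final CompletableFuture<Void> future = new CompletableFuture<>();
    private final long retryBackoffMs;
    private final long deadlineMs;
    private long nextSendMs;
    private boolean inFlight = false;

    RetrySketch(long nowMs, long retryBackoffMs, long apiTimeoutMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.deadlineMs = nowMs + apiTimeoutMs;
        this.nextSendMs = nowMs;
    }

    // Returns how many requests this poll would send (0 or 1).
    int poll(long nowMs) {
        if (future.isDone() || inFlight) return 0;
        if (nowMs >= deadlineMs) {
            // Time ran out: surface a timeout instead of the retriable error.
            future.completeExceptionally(new TimeoutException("request timed out"));
            return 0;
        }
        if (nowMs < nextSendMs) return 0; // still backing off
        inFlight = true;
        return 1;
    }

    // Simulates a response carrying a retriable error: schedule a retry after the backoff.
    void onRetriableFailure(long nowMs) {
        inFlight = false;
        nextSendMs = nowMs + retryBackoffMs;
    }

    // Returns the exception the future failed with, or null if it has not failed.
    static Throwable failureOf(CompletableFuture<?> f) {
        try {
            f.getNow(null);
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    static void check(boolean condition, String message) {
        if (!condition) throw new AssertionError(message);
    }

    public static void main(String[] args) {
        long now = 0L;
        RetrySketch request = new RetrySketch(now, 100L, 1000L);
        check(request.poll(now) == 1, "initial request is sent");
        request.onRetriableFailure(now);

        now += 100L; // 1. after the backoff, the request is retried
        check(request.poll(now) == 1, "request is retried after backoff");
        request.onRetriableFailure(now);

        now += 1000L; // 2. after the API timeout, no more requests are generated
        check(request.poll(now) == 0, "no request once time has run out");

        // 3. the propagated result is a timeout, not the retriable error
        check(failureOf(request.future) instanceof TimeoutException, "fails with timeout");
    }
}
```

The point of the sketch is only the order of assertions: one request after the backoff, zero requests after the timeout, then a timeout on the future.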



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -688,7 +688,8 @@ public void testOffsetFetchRequestTimeoutRequests(final Errors error) {
     }
 
     private boolean isRetriableOnOffsetFetch(Errors error) {
-        return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE;
+        return error == Errors.NOT_COORDINATOR || error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.COORDINATOR_NOT_AVAILABLE
+                || error == Errors.UNKNOWN_TOPIC_OR_PARTITION || error == Errors.REQUEST_TIMED_OUT || error == Errors.UNSTABLE_OFFSET_COMMIT;

Review Comment:
   With this PR, we're really saying that everything that is `instanceof 
RetriableException` will be retried. So should we check that instead of 
specific errors? (Then it's probably simpler to just remove this function and 
do the `instanceof` check directly where it's used.) 
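
As a minimal illustration of the difference, the sketch below derives retriability from the exception type rather than from a hand-maintained list of error codes. The nested `ApiException`, `RetriableException` and `Errors` types are simplified stand-ins declared only so the example compiles on its own; the real ones live in `org.apache.kafka.common.errors` and `org.apache.kafka.common.protocol`, and the enum here lists just a few codes. The `isRetriable` helper exists only for the demo; in the test the `instanceof` check could be inlined.

```java
// Simplified stand-ins, NOT the real Kafka classes.
class RetriableCheckSketch {
    static class ApiException extends RuntimeException {
        ApiException(String message) { super(message); }
    }

    static class RetriableException extends ApiException {
        RetriableException(String message) { super(message); }
    }

    // A tiny slice of an error-code enum, each code carrying its exception.
    enum Errors {
        NOT_COORDINATOR(new RetriableException("not coordinator")),
        REQUEST_TIMED_OUT(new RetriableException("request timed out")),
        UNSTABLE_OFFSET_COMMIT(new RetriableException("unstable offset commit")),
        GROUP_AUTHORIZATION_FAILED(new ApiException("group authorization failed"));

        private final ApiException exception;

        Errors(ApiException exception) { this.exception = exception; }

        ApiException exception() { return exception; }
    }

    // Retriability follows from the exception type, so new retriable
    // errors are covered without touching a list of codes.
    static boolean isRetriable(Errors error) {
        return error.exception() instanceof RetriableException;
    }

    public static void main(String[] args) {
        for (Errors error : Errors.values()) {
            System.out.println(error + " retriable: " + isRetriable(error));
        }
    }
}
```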


