frankvicky commented on code in PR #16833:
URL: https://github.com/apache/kafka/pull/16833#discussion_r1722677197
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -988,26 +989,29 @@ private void onFailure(final long currentTimeMs,
final Errors responseError) {
log.debug("Offset fetch failed: {}", responseError.message());
onFailedAttempt(currentTimeMs);
+ ApiException exception = responseError.exception();
if (responseError == COORDINATOR_LOAD_IN_PROGRESS) {
- future.completeExceptionally(responseError.exception());
+ future.completeExceptionally(exception);
} else if (responseError == Errors.UNKNOWN_MEMBER_ID) {
log.error("OffsetFetch failed with {} because the member is
not part of the group" +
" anymore.", responseError);
- future.completeExceptionally(responseError.exception());
+ future.completeExceptionally(exception);
} else if (responseError == Errors.STALE_MEMBER_EPOCH) {
log.error("OffsetFetch failed with {} and the consumer is not
part " +
"of the group anymore (it probably left the group, got
fenced" +
" or failed). The request cannot be retried and will
fail.", responseError);
- future.completeExceptionally(responseError.exception());
+ future.completeExceptionally(exception);
} else if (responseError == Errors.NOT_COORDINATOR ||
responseError == Errors.COORDINATOR_NOT_AVAILABLE) {
// Re-discover the coordinator and retry
coordinatorRequestManager.markCoordinatorUnknown("error
response " + responseError.name(), currentTimeMs);
- future.completeExceptionally(responseError.exception());
+ future.completeExceptionally(exception);
+ } else if (exception instanceof RetriableException && !(exception
instanceof TimeoutException)) {
Review Comment:
Hi @lianetm,
I’ve reviewed the flow of the request processing, and I agree that a request
receiving a `REQUEST_TIMED_OUT` should definitely be retried.
Since we’ve extended the `testRetriable` and ensured that no unexpected
exceptions bubble up, I believe we can safely allow requests with a
`RetriableException` to be retried. If a request does time out,
`fetchOffsetsWithRetries` will handle it appropriately by expiring the request.
I will proceed with the update.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]