lucasbru commented on code in PR #15000:
URL: https://github.com/apache/kafka/pull/15000#discussion_r1450646214
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -760,14 +765,19 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
// waiting for a response.
CompletableFuture<Void> future = commit(offsets, false, Optional.empty());
future.whenComplete((r, t) -> {
+
+ if (t == null) {
+ offsetCommitCallbackInvoker.submitCommitInterceptors(offsets);
+ }
+
if (callback == null) {
if (t != null) {
log.error("Offset commit with offsets {} failed", offsets, t);
}
return;
}
- invoker.submit(new OffsetCommitCallbackTask(callback, offsets, (Exception) t));
+ offsetCommitCallbackInvoker.submitUserCallback(callback, offsets, (Exception) t);
Review Comment:
Refactored so that `OffsetCommitCallbackTask` can be a private class inside the invoker.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1902,65 +1913,14 @@ private void maybeThrowFencedInstanceException() {
}
private void maybeInvokeCommitCallbacks() {
- if (callbacks() > 0) {
- invoker.executeCallbacks();
+ if (offsetCommitCallbackInvoker.executeCallbacks()) {
+ isFenced = true;
}
}
- // Visible for testing
- int callbacks() {
- return invoker.callbackQueue.size();
- }
-
// Visible for testing
SubscriptionState subscriptions() {
return subscriptions;
}
- /**
- * Utility class that helps the application thread to invoke user registered {@link OffsetCommitCallback}. This is
- * achieved by having the background thread register a {@link OffsetCommitCallbackTask} to the invoker upon the
- * future completion, and execute the callbacks when user polls/commits/closes the consumer.
- */
- private class OffsetCommitCallbackInvoker {
Review Comment:
Moved to a separate file since it's going to be shared across threads.
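
For readers without the new file at hand, here is a minimal sketch of what an invoker with this shape could look like. It is an illustration under stated assumptions, not the class from this PR: `CallbackInvokerSketch`, `CommitCallback`, `FencedException`, and the `Map<String, Long>` offset type are hypothetical stand-ins, and only the method names `submitUserCallback` and `executeCallbacks` (and the boolean "fenced" result) are taken from the diff above. The interceptor path (`submitCommitInterceptors`) is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only: names and types are simplified stand-ins, not the Kafka classes.
final class CallbackInvokerSketch {

    // Hypothetical stand-in for OffsetCommitCallback (offsets reduced to partition name -> offset).
    interface CommitCallback {
        void onComplete(Map<String, Long> offsets, Exception error);
    }

    // Hypothetical exception type marking a fenced consumer instance.
    static final class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    // The task type stays private: callers only see submitUserCallback and executeCallbacks.
    private static final class CallbackTask {
        final CommitCallback callback;
        final Map<String, Long> offsets;
        final Exception error;

        CallbackTask(CommitCallback callback, Map<String, Long> offsets, Exception error) {
            this.callback = callback;
            this.offsets = offsets;
            this.error = error;
        }
    }

    // Thread-safe queue: any thread may enqueue, the application thread drains.
    private final BlockingQueue<CallbackTask> queue = new LinkedBlockingQueue<>();

    // Called when a commit future completes; only enqueues, never runs user code.
    void submitUserCallback(CommitCallback callback, Map<String, Long> offsets, Exception error) {
        queue.add(new CallbackTask(callback, offsets, error));
    }

    // Runs all queued callbacks on the calling thread.
    // Returns true if any task carried a fenced error (an assumption about what the boolean means).
    boolean executeCallbacks() {
        boolean fenced = false;
        CallbackTask task;
        while ((task = queue.poll()) != null) {
            if (task.error instanceof FencedException) {
                fenced = true;
            }
            task.callback.onComplete(task.offsets, task.error);
        }
        return fenced;
    }

    public static void main(String[] args) {
        CallbackInvokerSketch invoker = new CallbackInvokerSketch();
        List<String> seen = new ArrayList<>();
        invoker.submitUserCallback(
                (offsets, error) -> seen.add("committed " + offsets),
                Map.of("topic-0", 42L),
                null);
        boolean fenced = invoker.executeCallbacks();
        System.out.println(seen + " fenced=" + fenced);
    }
}
```

The point of the split is that enqueueing and executing can happen on different threads: the queue is the only shared state, and user callbacks always run on whichever thread calls `executeCallbacks()`.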