kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559907295
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -157,8 +157,10 @@ public class AsyncKafkaConsumerTest {
@AfterEach
public void resetAll() {
backgroundEventQueue.clear();
- if (consumer != null) {
+ try {
consumer.close(Duration.ZERO);
+ } catch (Exception e) {
+ // ignore
Review Comment:
I'm a little leery about swallowing the exception here. Can we validate the
exception type is something we expect? e.g.:
```suggestion
} catch (Exception e) {
assertInstanceOf(KafkaException.class, e);
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,37 @@ public void commitSync(Map<TopicPartition,
OffsetAndMetadata> offsets, Duration
}
}
+ private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer
timer, boolean disableWakeup) {
+ if (lastPendingAsyncCommit == null) {
+ return;
+ }
+
+ try {
+ CompletableFuture<Void> futureToAwait;
+ if (!disableWakeup) {
+ // We don't want the wake-up trigger to complete our pending
async commit future,
+ // so create new future here.
+ futureToAwait = new CompletableFuture<>();
+ lastPendingAsyncCommit.whenComplete((v, t) -> {
+ if (t != null) {
+ futureToAwait.completeExceptionally(t);
+ } else {
+ futureToAwait.complete(v);
+ }
+ });
+ wakeupTrigger.setActiveTask(futureToAwait);
+ } else {
+ futureToAwait = lastPendingAsyncCommit;
+ }
+ ConsumerUtils.getResult(futureToAwait, timer);
+ lastPendingAsyncCommit = null;
+ } finally {
+ if (!disableWakeup) wakeupTrigger.clearTask();
+ timer.update();
+ }
Review Comment:
Do we want to clear out the `lastPendingAsyncCommit` in the `finally` block:
```suggestion
ConsumerUtils.getResult(futureToAwait, timer);
} finally {
lastPendingAsyncCommit = null;
if (!disableWakeup) wakeupTrigger.clearTask();
timer.update();
}
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
}
}
- private void maybeInvokeCommitCallbacks() {
- offsetCommitCallbackInvoker.executeCallbacks();
- }
-
Review Comment:
Any reason we don't want to keep this method abstraction?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]