kirktrue commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1568001935
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition,
OffsetAndMetadata> offsets, Duration
}
}
+ private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer
timer, boolean disableWakeup) {
Review Comment:
nit: consider changing `disableWakeup` to `enableWakeup`. Double-negatives
add nonzero cognitive overhead.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1388,6 +1393,31 @@ public void commitSync(Map<TopicPartition,
OffsetAndMetadata> offsets, Duration
}
}
+ private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer
timer, boolean disableWakeup) {
+ if (lastPendingAsyncCommit == null) {
+ return;
+ }
+
+ try {
+ final CompletableFuture<Void> futureToAwait = new
CompletableFuture<>();
+ // We don't want the wake-up trigger to complete our pending async
commit future,
+ // so create new future here. Any errors in the pending async
commit will be handled
+ // by the async commit future / the commit callback - here, we
just want to wait for it to complete.
+ lastPendingAsyncCommit.whenComplete((v, t) ->
futureToAwait.complete(null));
+ if (!disableWakeup) {
+ wakeupTrigger.setActiveTask(futureToAwait);
+ }
+ ConsumerUtils.getResult(futureToAwait, timer);
Review Comment:
Is it true that the underlying `lastPendingAsyncCommit` `Future` could
already be completed by this point, right?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
}
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+ consumer.commitAsync();
Review Comment:
Can this be replaced with a call to
`testSyncCommitTimesoutAfterIncompleteAsyncCommit()` like the other tests? I
glanced back and forth a couple of times and didn't see too much difference:
```suggestion
final TopicPartition tp = new TopicPartition("foo", 0);
testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
}
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+ consumer.commitAsync();
+
+ // Commit async is not completed yet, so commit sync should wait for
it to complete (time out)
+ assertThrows(TimeoutException.class, () ->
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+ // Complete async commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
Review Comment:
This use of JUnit is just about over my head...
For my own understanding, at which line in this test does the
`AsyncCommitEvent` get created and enqueued? I would assume at line 634, right?
It looks like you're able to add the `ArgumentCaptor` _after_ the object
pointed at by the argument was created. Is that correct? 🤔
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1005,6 +1083,43 @@ public void testNoWakeupInCloseCommit() {
assertFalse(capturedEvent.get().future().isCompletedExceptionally());
}
+ @Test
+ public void testCloseAwaitPendingAsyncCommitIncomplete() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+
+ consumer.commitAsync();
+ Exception e = assertThrows(KafkaException.class, () ->
consumer.close(Duration.ofMillis(10)));
+ assertInstanceOf(TimeoutException.class, e.getCause());
Review Comment:
So this is where it may make sense to have special case to close and clear
out the `consumer` instance, per the previous comment.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
}
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+ consumer.commitAsync();
+
+ // Commit async is not completed yet, so commit sync should wait for
it to complete (time out)
+ assertThrows(TimeoutException.class, () ->
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+ // Complete async commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().complete(null);
+
+ // Commit async is completed, so commit sync completes immediately
(since offsets are empty)
+ assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(),
Duration.ofMillis(100)));
+ }
+
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets()
{
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+ // Complete async commit event and sync commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().complete(null);
+
+ // Commit async is completed, so commit sync completes immediately
(since offsets are empty)
Review Comment:
This comment legit confused me for a solid minute because it incorrectly
states the offsets are empty when they're not 😆
```suggestion
// Commit async is completed, so commit sync does not need to wait
before committing its offsets
```
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
}
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+ consumer.commitAsync();
+
+ // Commit async is not completed yet, so commit sync should wait for
it to complete (time out)
+ assertThrows(TimeoutException.class, () ->
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+ // Complete async commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().complete(null);
+
+ // Commit async is completed, so commit sync completes immediately
(since offsets are empty)
+ assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(),
Duration.ofMillis(100)));
+ }
+
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets()
{
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+ // Complete async commit event and sync commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().complete(null);
+
+ // Commit async is completed, so commit sync completes immediately
(since offsets are empty)
+ assertDoesNotThrow(() ->
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)),
Duration.ofMillis(100)));
+ }
+
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+ // Complete exceptionally async commit event and sync commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().completeExceptionally(new KafkaException("Test
exception"));
+
+ // Commit async is completed exceptionally, but this will be handled
by commit callback - commit sync should not fail.
+ assertDoesNotThrow(() ->
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)),
Duration.ofMillis(100)));
+ }
+
+ private void
testSyncCommitTimesoutAfterIncompleteAsyncCommit(TopicPartition tp) {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
Review Comment:
Is the `seek()` here to force the partition out of the `INITIALIZED` state
in `SubscriptionState`?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -158,7 +158,11 @@ public class AsyncKafkaConsumerTest {
public void resetAll() {
backgroundEventQueue.clear();
if (consumer != null) {
- consumer.close(Duration.ZERO);
+ try {
+ consumer.close(Duration.ZERO);
+ } catch (Exception e) {
+ assertInstanceOf(KafkaException.class, e);
+ }
Review Comment:
Is this `try`/`catch` needed because the test leaves the `consumer` in a bad
state?
If so, the approach I took was to check for that state in the test itself. I
faced a similar issue with tests that (intentionally) fenced consumers, so I
closed the consumer at the end of the test method like so:
```java
// Close the consumer here as we know it will cause a
FencedInstanceIdException to be thrown.
// If we get an error *other* than the FencedInstanceIdException, we'll fail
the test.
try {
consumer.close();
fail("Fenced consumer should have thrown a {} on close",
FencedInstanceIdException.class.getSimpleName());
} catch (KafkaException e) {
assertNotNull(e.getCause());
assertInstanceOf(FencedInstanceIdException.class, e.getCause());
} finally {
consumer = null;
}
```
I considered adding a similar catch-all approach in `resetAll()`, but was
concerned that it might unnecessarily mask issues.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
}
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+ consumer.commitAsync();
+
+ // Commit async is not completed yet, so commit sync should wait for
it to complete (time out)
+ assertThrows(TimeoutException.class, () ->
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
Review Comment:
Very good test case: committing empty offsets _still_ waits. 👍
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -793,9 +795,9 @@ public void commitAsync(Map<TopicPartition,
OffsetAndMetadata> offsets, OffsetCo
}
private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
- maybeThrowFencedInstanceException();
- maybeInvokeCommitCallbacks();
maybeThrowInvalidGroupIdException();
+ maybeThrowFencedInstanceException();
+ offsetCommitCallbackInvoker.executeCallbacks();
Review Comment:
I had thought to ask before, is this reordering needed for correctness? I
assume the order of checking for the _fenced_ case and _invalid group ID_ cases
don't matter. But I assume that we wanted to make sure to perform those checks
before executing the callbacks?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -616,6 +620,80 @@ public void
testCommitSyncTriggersFencedExceptionFromCommitAsync() {
assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
}
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
+ time = new MockTime(1);
+ consumer = newConsumer();
+
+ // Commit async (incomplete)
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ consumer.assign(Collections.singleton(tp));
+ consumer.seek(tp, 20);
+ consumer.commitAsync();
+
+ // Commit async is not completed yet, so commit sync should wait for
it to complete (time out)
+ assertThrows(TimeoutException.class, () ->
consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100)));
+
+ // Complete async commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().complete(null);
+
+ // Commit async is completed, so commit sync completes immediately
(since offsets are empty)
+ assertDoesNotThrow(() -> consumer.commitSync(Collections.emptyMap(),
Duration.ofMillis(100)));
+ }
+
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets()
{
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+ // Complete async commit event and sync commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().complete(null);
+
+ // Commit async is completed, so commit sync completes immediately
(since offsets are empty)
+ assertDoesNotThrow(() ->
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20)),
Duration.ofMillis(100)));
+ }
+
+ @Test
+ public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
+ final TopicPartition tp = new TopicPartition("foo", 0);
+ testSyncCommitTimesoutAfterIncompleteAsyncCommit(tp);
+
+ // Complete exceptionally async commit event and sync commit event
+ final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
+ verify(applicationEventHandler).add(commitEventCaptor.capture());
+ final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
+ commitEvent.future().completeExceptionally(new KafkaException("Test
exception"));
+
+ // Commit async is completed exceptionally, but this will be handled
by commit callback - commit sync should not fail.
Review Comment:
Cool! I was wondering about this case, but (of course) you've got it covered
😄
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1952,10 +1988,6 @@ private void maybeThrowFencedInstanceException() {
}
}
- private void maybeInvokeCommitCallbacks() {
- offsetCommitCallbackInvoker.executeCallbacks();
- }
-
Review Comment:
I do not insist 😄 I was just curious and I agree that it doesn't odd
anything in this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]