lucasbru commented on code in PR #15613:
URL: https://github.com/apache/kafka/pull/15613#discussion_r1559430603
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -229,7 +229,11 @@ private GroupRebalanceConfig
buildRebalanceConfig(Optional<String> groupInstance
@AfterEach
public void teardown() {
this.metrics.close();
- this.coordinator.close(time.timer(0));
+ try {
+ this.coordinator.close(time.timer(0));
Review Comment:
Correct. It was just less likely before.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1360,6 +1362,9 @@ public void commitSync(Map<TopicPartition,
OffsetAndMetadata> offsets, Duration
Timer requestTimer = time.timer(timeout.toMillis());
SyncCommitEvent syncCommitEvent = new SyncCommitEvent(offsets,
requestTimer);
CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
+
+ awaitPendingAsyncCommits(requestTimer, false);
Review Comment:
Done
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol:
String): Unit = {
Review Comment:
Done
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol:
String): Unit = {
+ // This is testing the contract that asynchronous offset commit are
completed before the consumer
+ // is closed.
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.close()
+ assertEquals(2, cb.successCount);
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String,
groupProtocol: String): Unit = {
+ // This is testing the contract that asynchronous offset commits sent
previously with the
+ // `commitAsync` are guaranteed to have their callbacks invoked prior to
completion of
+ // `commitSync` (given that it does not time out).
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+ assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+ assertEquals(1, cb.successCount);
+
+ // Enforce looking up the coordinator
+ consumer.committed(Set(tp, tp2).asJava)
+
+ // Try with coordinator known
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(2L))).asJava, cb)
+ consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(2L))).asJava)
+ assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+ assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+ assertEquals(2, cb.successCount);
+
+ // Try with empty sync commit
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(3L))).asJava, cb)
+ consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+ assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset)
+ assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+ assertEquals(3, cb.successCount);
Review Comment:
Done
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol:
String): Unit = {
+ // This is testing the contract that asynchronous offset commit are
completed before the consumer
+ // is closed.
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.close()
+ assertEquals(2, cb.successCount);
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String,
groupProtocol: String): Unit = {
+ // This is testing the contract that asynchronous offset commits sent
previously with the
+ // `commitAsync` are guaranteed to have their callbacks invoked prior to
completion of
+ // `commitSync` (given that it does not time out).
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+ assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+ assertEquals(1, cb.successCount);
+
+ // Enforce looking up the coordinator
+ consumer.committed(Set(tp, tp2).asJava)
Review Comment:
True, it's not required. I added this to unconfuse people (because the fact
that the `assertEquals` already has this effect is subtle). But instead I
confused you, so let me remove it ;)
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol:
String): Unit = {
+ // This is testing the contract that asynchronous offset commit are
completed before the consumer
+ // is closed.
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.close()
+ assertEquals(2, cb.successCount);
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedBeforeCommitSyncReturns(quorum: String,
groupProtocol: String): Unit = {
+ // This is testing the contract that asynchronous offset commits sent
previously with the
+ // `commitAsync` are guaranteed to have their callbacks invoked prior to
completion of
+ // `commitSync` (given that it does not time out).
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitSync(Map.empty[TopicPartition, OffsetAndMetadata].asJava)
+ assertEquals(1, consumer.committed(Set(tp).asJava).get(tp).offset)
+ assertEquals(1, cb.successCount);
+
+ // Enforce looking up the coordinator
+ consumer.committed(Set(tp, tp2).asJava)
+
+ // Try with coordinator known
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(2L))).asJava, cb)
+ consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(2L))).asJava)
+ assertEquals(2, consumer.committed(Set(tp).asJava).get(tp).offset)
+ assertEquals(2, consumer.committed(Set(tp2).asJava).get(tp2).offset)
+ assertEquals(2, cb.successCount);
Review Comment:
Done
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1164,7 +1176,8 @@ public void maybeAutoCommitOffsetsAsync(long now) {
}
private boolean invokePendingAsyncCommits(Timer timer) {
- if (inFlightAsyncCommits.get() == 0) {
+ if (pendingAsyncCommits.get() == 0 && inFlightAsyncCommits.get() == 0)
{
Review Comment:
100%
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol:
String): Unit = {
+ // This is testing the contract that asynchronous offset commit are
completed before the consumer
+ // is closed.
+ val producer = createProducer()
+ sendRecords(producer, numRecords = 3, tp)
+ sendRecords(producer, numRecords = 3, tp2)
+
+ val consumer = createConsumer()
+ consumer.assign(List(tp, tp2).asJava)
+
+ // Try without looking up the coordinator first
+ val cb = new CountConsumerCommitCallback
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new
OffsetAndMetadata(1L))).asJava, cb)
+ consumer.close()
+ assertEquals(2, cb.successCount);
Review Comment:
Done
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -984,6 +984,8 @@ public void close(final Timer timer) {
}
} finally {
super.close(timer);
+ // Super-class close may wait for more commit callbacks to
complete.
+ invokeCompletedOffsetCommitCallbacks();
Review Comment:
That timer will be expired, right? So when we wait inside finally, we will
not block again in `awaitPendingRequests` in the super-class.
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -654,6 +654,64 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitAsyncCompletedConsumerCloses(quorum: String, groupProtocol:
String): Unit = {
+ // This is testing the contract that asynchronous offset commit are
completed before the consumer
+ // is closed.
Review Comment:
Agreed!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]