lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1549895965
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -159,6 +160,224 @@ class PlaintextConsumerTest extends BaseConsumerTest {
consumeAndVerifyRecords(consumer = consumer, numRecords = 1,
startingOffset = 0, startingTimestamp = startingTimestamp)
}
+ /**
+ * Verifies that pattern subscription performs as expected.
+ * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak'
or 'tblab1'.
+ * It is expected that the consumer is subscribed to all partitions of
'topic' and
+ * 'tblablac' after the subscription when metadata is refreshed.
+ * When a new topic 'tsomec' is added afterwards, it is expected that upon
the next
+ * metadata refresh the consumer becomes subscribed to this new topic and
all partitions
+ * of that topic are assigned to it.
+ */
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testPatternSubscription(quorum: String, groupProtocol: String): Unit = {
+ val numRecords = 10000
+ val producer = createProducer()
+ sendRecords(producer, numRecords, tp)
+
+ val topic1 = "tblablac" // matches subscribed pattern
+ createTopic(topic1, 2, brokerCount)
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
+
+ val topic2 = "tblablak" // does not match subscribed pattern
+ createTopic(topic2, 2, brokerCount)
+ sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0))
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
+
+ val topic3 = "tblab1" // does not match subscribed pattern
+ createTopic(topic3, 2, brokerCount)
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ val pattern = Pattern.compile("t.*c")
+ consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+
+ var assignment = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+ awaitAssignment(consumer, assignment)
+
+ val topic4 = "tsomec" // matches subscribed pattern
+ createTopic(topic4, 2, brokerCount)
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
+
+ assignment ++= Set(
+ new TopicPartition(topic4, 0),
+ new TopicPartition(topic4, 1))
+ awaitAssignment(consumer, assignment)
+
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+ }
+
+ /**
+ * Verifies that a second call to pattern subscription succeeds and performs
as expected.
+ * The initial subscription is to a pattern that matches two topics 'topic'
and 'foo'.
+ * The second subscription is to a pattern that matches 'foo' and a new
topic 'bar'.
+ * It is expected that the consumer is subscribed to all partitions of
'topic' and 'foo' after
+ * the first subscription, and to all partitions of 'foo' and 'bar' after
the second.
+ * The metadata refresh interval is intentionally increased to a large
enough value to guarantee
+ * that it is the subscription call that triggers a metadata refresh, and
not the timeout.
+ */
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testSubsequentPatternSubscription(quorum: String, groupProtocol:
String): Unit = {
+ this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG,
"30000")
+ val consumer = createConsumer()
+
+ val numRecords = 10000
+ val producer = createProducer()
+ sendRecords(producer, numRecords = numRecords, tp)
+
+ // the first topic ('topic') matches first subscription pattern only
+
+ val fooTopic = "foo" // matches both subscription patterns
+ createTopic(fooTopic, 1, brokerCount)
+ sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
+
+ assertEquals(0, consumer.assignment().size)
+
+ val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match
this
+ consumer.subscribe(pattern1, new TestConsumerReassignmentListener)
+
+ var assignment = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(fooTopic, 0))
+ awaitAssignment(consumer, assignment)
+
+ val barTopic = "bar" // matches the next subscription pattern
+ createTopic(barTopic, 1, brokerCount)
+ sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0))
+
+ val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
+ consumer.subscribe(pattern2, new TestConsumerReassignmentListener)
+ assignment --= Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1))
+ assignment ++= Set(
+ new TopicPartition(barTopic, 0))
+ awaitAssignment(consumer, assignment)
+
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+ }
+
+ /**
+ * Verifies that pattern unsubscription performs as expected.
+ * The pattern matches the topics 'topic' and 'tblablac'.
+ * It is expected that the consumer is subscribed to all partitions of
'topic' and
+ * 'tblablac' after the subscription when metadata is refreshed.
+ * When consumer unsubscribes from all its subscriptions, it is expected
that its
+ * assignments are cleared right away.
+ */
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testPatternUnsubscription(quorum: String, groupProtocol: String): Unit =
{
+ val numRecords = 10000
+ val producer = createProducer()
+ sendRecords(producer, numRecords, tp)
+
+ val topic1 = "tblablac" // matches the subscription pattern
+ createTopic(topic1, 2, brokerCount)
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+ sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ consumer.subscribe(Pattern.compile("t.*c"), new
TestConsumerReassignmentListener)
+ val assignment = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+ awaitAssignment(consumer, assignment)
+
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCommitMetadata(quorum: String, groupProtocol: String): Unit = {
+ val consumer = createConsumer()
+ consumer.assign(List(tp).asJava)
+
+ // sync commit
+ val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
+ consumer.commitSync(Map((tp, syncMetadata)).asJava)
+ assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+ // async commit
+ val asyncMetadata = new OffsetAndMetadata(10, "bar")
+ sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
+ assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+ // handle null metadata
+ val nullMetadata = new OffsetAndMetadata(5, null)
+ consumer.commitSync(Map(tp -> nullMetadata).asJava)
+ assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testAsyncCommit(quorum: String, groupProtocol: String): Unit = {
+ val consumer = createConsumer()
+ consumer.assign(List(tp).asJava)
+
+ val callback = new CountConsumerCommitCallback
+ val count = 5
+
+ for (i <- 1 to count)
+ consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava,
callback)
+
+ TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count ||
callback.lastError.isDefined,
+ "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+
+ assertEquals(None, callback.lastError)
+ assertEquals(count, callback.successCount)
+ assertEquals(new OffsetAndMetadata(count),
consumer.committed(Set(tp).asJava).get(tp))
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testExpandingTopicSubscriptions(quorum: String, groupProtocol: String):
Unit = {
+ val otherTopic = "other"
+ val initialAssignment = Set(new TopicPartition(topic, 0), new
TopicPartition(topic, 1))
+ val consumer = createConsumer()
+ consumer.subscribe(List(topic).asJava)
+ awaitAssignment(consumer, initialAssignment)
+
+ createTopic(otherTopic, 2, brokerCount)
+ val expandedAssignment = initialAssignment ++ Set(new
TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+ consumer.subscribe(List(topic, otherTopic).asJava)
+ awaitAssignment(consumer, expandedAssignment)
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testShrinkingTopicSubscriptions(quorum: String, groupProtocol: String):
Unit = {
+ val otherTopic = "other"
+ createTopic(otherTopic, 2, brokerCount)
+ val initialAssignment = Set(new TopicPartition(topic, 0), new
TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new
TopicPartition(otherTopic, 1))
+ val consumer = createConsumer()
+ consumer.subscribe(List(topic, otherTopic).asJava)
+ awaitAssignment(consumer, initialAssignment)
+
+ val shrunkenAssignment = Set(new TopicPartition(topic, 0), new
TopicPartition(topic, 1))
+ consumer.subscribe(List(topic).asJava)
+ awaitAssignment(consumer, shrunkenAssignment)
+ }
+
Review Comment:
We shouldn't re-add all these subscription tests here. They were moved to a
new file, `PlaintextConsumerSubscriptionTest`. I expect this PR should now have
no changes in `PlaintextConsumerTest`, and should only remove the TODOs for these
tests in `PlaintextConsumerSubscriptionTest`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]