lianetm commented on code in PR #17516:
URL: https://github.com/apache/kafka/pull/17516#discussion_r1824955585
##########
core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala:
##########
@@ -124,12 +128,56 @@ class GroupAuthorizerIntegrationTest extends
BaseRequestTest {
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic,
"message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic),
produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
+ }
- val consumer = createConsumer(configsToRemove =
List(ConsumerConfig.GROUP_ID_CONFIG))
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol:
String): Unit = {
+ val topic = "topic"
+ val topicPartition = new TopicPartition(topic, 0)
+
+ createTopic(topic, listenerName = interBrokerListenerName)
+
+ val consumer = createConsumer()
+ consumer.assign(List(topicPartition).asJava)
+ val consumeException = assertThrows(classOf[TopicAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+ assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
+
+ assertThrows(classOf[GroupAuthorizationException],
+ () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
+
+ // TODO: use background-event-queue-size metric to check there is
background event
+ Thread.sleep(3000)
+
+ assertDoesNotThrow(new Executable {
+ override def execute(): Unit = consumer.unsubscribe()
+ })
Review Comment:
uhm I think we may be not getting the classic consumer behaviour right here.
I would expect this assertion passes for the classic consumer here only because
of how the test is written (never finds a coordinator), but that doesn't mean
that the classic consumer does not throw topic or group auth exceptions on
unsubscribe (which is what the changes in this PR are trying to achieve for the
new consumer)
1. I believe classic does not throw GroupAuthException here only because it
never finds a coordinator, so the unsubscribes perform no action on the group
(but if it had had a known coordinator, the unsubscribe would send a leave
group, if the classic checks the response I expect would fail with
GroupAuthException, to double check...)
2. I believe classic does not throw TopicAuthException here only because
unsubscribe does not poll the network client if it doesn't have a coordinator
to send the leave group to.
I could definitely be missing something, but we could validate my
expectations with an integration test here:
- consumer subscribes to a group successfully (has acls to READ + GROUP +
"group-name-from-config")
- looses the acls (using `removeAndVerifyAcls`)
- consumer.unsubscribe -> I would expect that this should indeed throw a
group authorization exception received in the response to the leave group, or
the topic auth exception propagated from metadata (honestly not sure about the
precedence, but both should be there)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]