dajac commented on code in PR #16859:
URL: https://github.com/apache/kafka/pull/16859#discussion_r1714940299
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -925,6 +925,66 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft+kip848"))
+ def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
+ client = createAdminClient
+ val group = "describe-alter-configs-group"
+ val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+
+ // Alter group configs
+ var groupAlterConfigs = Seq(
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""),
AlterConfigOp.OpType.DELETE)
Review Comment:
I suppose that we expect this one to be a no-op. Am I correct?
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -528,6 +529,56 @@ class KafkaApisTest extends Logging {
verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
}
+ @Test
+ def testDescribeConfigsConsumerGroup(): Unit = {
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val operation = AclOperation.DESCRIBE_CONFIGS
+ val resourceType = ResourceType.GROUP
+ val consumerGroupId = "consumer_group_1"
+ val requestHeader =
+ new RequestHeader(ApiKeys.DESCRIBE_CONFIGS,
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, consumerGroupId,
PatternType.LITERAL),
+ 1, true, true)
+ )
+
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(expectedActions.asJava)))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+
+ val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
+ val cgConfigs = new Properties()
+ cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+ cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+ when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
+
+ metadataCache = mock(classOf[ZkMetadataCache])
+ val describeConfigsRequest = new DescribeConfigsRequest.Builder(new
DescribeConfigsRequestData()
+ .setIncludeSynonyms(true)
+ .setResources(List(new
DescribeConfigsRequestData.DescribeConfigsResource()
+ .setResourceName(consumerGroupId)
+ .setResourceType(ConfigResource.Type.GROUP.id)).asJava))
+ .build(requestHeader.apiVersion)
+ val request = buildRequest(describeConfigsRequest,
+ requestHeader = Option(requestHeader))
+
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+
+ createKafkaApis(authorizer = Some(authorizer), configRepository =
configRepository)
+ .handleDescribeConfigsRequest(request)
+
+ val response = verifyNoThrottling[DescribeConfigsResponse](request)
+ // Verify that authorize is only called once
+ verify(authorizer, times(1)).authorize(any(), any())
+ val results = response.data().results()
+ assertEquals(1, results.size())
+ val describeConfigsResult: DescribeConfigsResult = results.get(0)
Review Comment:
nit: Could we omit the explicit type annotation here?
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -528,6 +529,56 @@ class KafkaApisTest extends Logging {
verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
}
+ @Test
+ def testDescribeConfigsConsumerGroup(): Unit = {
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val operation = AclOperation.DESCRIBE_CONFIGS
+ val resourceType = ResourceType.GROUP
+ val consumerGroupId = "consumer_group_1"
+ val requestHeader =
+ new RequestHeader(ApiKeys.DESCRIBE_CONFIGS,
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, consumerGroupId,
PatternType.LITERAL),
+ 1, true, true)
+ )
+
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(expectedActions.asJava)))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+
+ val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
+ val cgConfigs = new Properties()
+ cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+ cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+ when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
+
+ metadataCache = mock(classOf[ZkMetadataCache])
Review Comment:
Is this `metadataCache` mock needed?
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2087,6 +2087,33 @@ public void testDescribeClientMetricsConfigs() throws
Exception {
}
}
+ @Test
+ public void testDescribeConsumerGroupConfigs() throws Exception {
+ ConfigResource resource = new
ConfigResource(ConfigResource.Type.GROUP, "group1");
+ ConfigResource resource1 = new
ConfigResource(ConfigResource.Type.GROUP, "group2");
Review Comment:
nit: Would it make sense to name them `resourceGroup1` and `resourceGroup2`? It
is a bit misleading that `resource1` maps to `group2`.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -925,6 +925,66 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft+kip848"))
+ def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
+ client = createAdminClient
+ val group = "describe-alter-configs-group"
+ val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+
+ // Alter group configs
+ var groupAlterConfigs = Seq(
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""),
AlterConfigOp.OpType.DELETE)
+ ).asJavaCollection
+
+ var alterResult = client.incrementalAlterConfigs(Map(
+ groupResource -> groupAlterConfigs
+ ).asJava)
+
+ assertEquals(Set(groupResource).asJava, alterResult.values.keySet)
+ alterResult.all.get
Review Comment:
I would put a timeout on this `get` call. Could we also verify the results?
##########
core/src/main/scala/kafka/server/ConfigHelper.scala:
##########
@@ -149,6 +153,19 @@ class ConfigHelper(metadataCache: MetadataCache, config:
KafkaConfig, configRepo
.setConfigs(configEntries.asJava)
}
+ case ConfigResource.Type.GROUP =>
+ val group = resource.resourceName
+ if (group == null || group.isEmpty) {
+ throw new InvalidRequestException("Group name must not be empty")
+ } else {
+ val groupProps = configRepository.groupConfig(group)
+ val groupConfig =
GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap,
groupProps)
Review Comment:
I am not sure about this part. My understanding is that it will return the
default configs if the group does not have any dynamic configs set. If I
remember correctly, it does not work like this for topics. In my mind, it would
be better to only return the dynamic configs. It is confusing otherwise. What
do you think?
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -925,6 +925,66 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
checkValidAlterConfigs(client, this, topicResource1, topicResource2)
}
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft+kip848"))
+ def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
+ client = createAdminClient
+ val group = "describe-alter-configs-group"
+ val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+
+ // Alter group configs
+ var groupAlterConfigs = Seq(
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""),
AlterConfigOp.OpType.DELETE)
+ ).asJavaCollection
+
+ var alterResult = client.incrementalAlterConfigs(Map(
+ groupResource -> groupAlterConfigs
+ ).asJava)
+
+ assertEquals(Set(groupResource).asJava, alterResult.values.keySet)
+ alterResult.all.get
+
+ ensureConsistentKRaftMetadata()
+
+ // Describe group config, verify that group config was updated correctly
+ var describeResult = client.describeConfigs(Seq(groupResource).asJava)
+ var configs = describeResult.all.get
Review Comment:
I would also put a timeout on this `get` call.
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -528,6 +529,56 @@ class KafkaApisTest extends Logging {
verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
}
+ @Test
+ def testDescribeConfigsConsumerGroup(): Unit = {
+ val authorizer: Authorizer = mock(classOf[Authorizer])
+ val operation = AclOperation.DESCRIBE_CONFIGS
+ val resourceType = ResourceType.GROUP
+ val consumerGroupId = "consumer_group_1"
+ val requestHeader =
+ new RequestHeader(ApiKeys.DESCRIBE_CONFIGS,
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+ val expectedActions = Seq(
+ new Action(operation, new ResourcePattern(resourceType, consumerGroupId,
PatternType.LITERAL),
+ 1, true, true)
+ )
+
+ when(authorizer.authorize(any[RequestContext],
ArgumentMatchers.eq(expectedActions.asJava)))
+ .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+
+ val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
+ val cgConfigs = new Properties()
+ cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+ cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+ when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
+
+ metadataCache = mock(classOf[ZkMetadataCache])
+ val describeConfigsRequest = new DescribeConfigsRequest.Builder(new
DescribeConfigsRequestData()
+ .setIncludeSynonyms(true)
+ .setResources(List(new
DescribeConfigsRequestData.DescribeConfigsResource()
+ .setResourceName(consumerGroupId)
+ .setResourceType(ConfigResource.Type.GROUP.id)).asJava))
+ .build(requestHeader.apiVersion)
+ val request = buildRequest(describeConfigsRequest,
+ requestHeader = Option(requestHeader))
+
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+ any[Long])).thenReturn(0)
+
+ createKafkaApis(authorizer = Some(authorizer), configRepository =
configRepository)
+ .handleDescribeConfigsRequest(request)
+
+ val response = verifyNoThrottling[DescribeConfigsResponse](request)
+ // Verify that authorize is only called once
+ verify(authorizer, times(1)).authorize(any(), any())
+ val results = response.data().results()
Review Comment:
nit: One small remark. We usually don't use `()` for getters. You could use
`response.data.results`. There are a few other cases like this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]