dajac commented on code in PR #16859:
URL: https://github.com/apache/kafka/pull/16859#discussion_r1714940299


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -925,6 +925,66 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     checkValidAlterConfigs(client, this, topicResource1, topicResource2)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
+    client = createAdminClient
+    val group = "describe-alter-configs-group"
+    val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+
+    // Alter group configs
+    var groupAlterConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), 
AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""), 
AlterConfigOp.OpType.DELETE)

Review Comment:
   I suppose that we expect this `DELETE` operation to be a no-op. Am I correct?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -528,6 +529,56 @@ class KafkaApisTest extends Logging {
     verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
   }
 
+  @Test
+  def testDescribeConfigsConsumerGroup(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val operation = AclOperation.DESCRIBE_CONFIGS
+    val resourceType = ResourceType.GROUP
+    val consumerGroupId = "consumer_group_1"
+    val requestHeader =
+      new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, 
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, consumerGroupId, 
PatternType.LITERAL),
+        1, true, true)
+    )
+
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(expectedActions.asJava)))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+
+    val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
+    val cgConfigs = new Properties()
+    cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+    cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+    when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
+
+    metadataCache = mock(classOf[ZkMetadataCache])
+    val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
+      .setIncludeSynonyms(true)
+      .setResources(List(new 
DescribeConfigsRequestData.DescribeConfigsResource()
+        .setResourceName(consumerGroupId)
+        .setResourceType(ConfigResource.Type.GROUP.id)).asJava))
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(describeConfigsRequest,
+      requestHeader = Option(requestHeader))
+    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+
+    createKafkaApis(authorizer = Some(authorizer), configRepository = 
configRepository)
+      .handleDescribeConfigsRequest(request)
+
+    val response = verifyNoThrottling[DescribeConfigsResponse](request)
+    // Verify that authorize is only called once
+    verify(authorizer, times(1)).authorize(any(), any())
+    val results = response.data().results()
+    assertEquals(1, results.size())
+    val describeConfigsResult: DescribeConfigsResult = results.get(0)

Review Comment:
   nit: Could we drop the explicit type annotation here?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -528,6 +529,56 @@ class KafkaApisTest extends Logging {
     verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
   }
 
+  @Test
+  def testDescribeConfigsConsumerGroup(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val operation = AclOperation.DESCRIBE_CONFIGS
+    val resourceType = ResourceType.GROUP
+    val consumerGroupId = "consumer_group_1"
+    val requestHeader =
+      new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, 
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, consumerGroupId, 
PatternType.LITERAL),
+        1, true, true)
+    )
+
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(expectedActions.asJava)))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+
+    val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
+    val cgConfigs = new Properties()
+    cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+    cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+    when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
+
+    metadataCache = mock(classOf[ZkMetadataCache])

Review Comment:
   Is this `metadataCache` mock needed?



##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -2087,6 +2087,33 @@ public void testDescribeClientMetricsConfigs() throws 
Exception {
         }
     }
 
+    @Test
+    public void testDescribeConsumerGroupConfigs() throws Exception {
+        ConfigResource resource = new 
ConfigResource(ConfigResource.Type.GROUP, "group1");
+        ConfigResource resource1 = new 
ConfigResource(ConfigResource.Type.GROUP, "group2");

Review Comment:
   nit: Would it make sense to name them resourceGroup1 and resourceGroup2? It 
is a bit misleading that resource1 maps to group2.



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -925,6 +925,66 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     checkValidAlterConfigs(client, this, topicResource1, topicResource2)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
+    client = createAdminClient
+    val group = "describe-alter-configs-group"
+    val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+
+    // Alter group configs
+    var groupAlterConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), 
AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""), 
AlterConfigOp.OpType.DELETE)
+    ).asJavaCollection
+
+    var alterResult = client.incrementalAlterConfigs(Map(
+      groupResource -> groupAlterConfigs
+    ).asJava)
+
+    assertEquals(Set(groupResource).asJava, alterResult.values.keySet)
+    alterResult.all.get

Review Comment:
   I would put a timeout here. Could we also verify the results?



##########
core/src/main/scala/kafka/server/ConfigHelper.scala:
##########
@@ -149,6 +153,19 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
                 .setConfigs(configEntries.asJava)
             }
 
+          case ConfigResource.Type.GROUP =>
+            val group = resource.resourceName
+            if (group == null || group.isEmpty) {
+              throw new InvalidRequestException("Group name must not be empty")
+            } else {
+              val groupProps = configRepository.groupConfig(group)
+              val groupConfig = 
GroupConfig.fromProps(config.groupCoordinatorConfig.extractGroupConfigMap, 
groupProps)

Review Comment:
   I am not sure about this part. My understanding is that it will return the 
default configs if the group does not have any dynamic configs set. If I 
remember correctly, it does not work like this for topics. In my mind, it would 
be better to only return the dynamic configs. It is confusing otherwise. What 
do you think?



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -925,6 +925,66 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     checkValidAlterConfigs(client, this, topicResource1, topicResource2)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip848"))
+  def testIncrementalAlterAndDescribeGroupConfigs(quorum: String): Unit = {
+    client = createAdminClient
+    val group = "describe-alter-configs-group"
+    val groupResource = new ConfigResource(ConfigResource.Type.GROUP, group)
+
+    // Alter group configs
+    var groupAlterConfigs = Seq(
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "50000"), 
AlterConfigOp.OpType.SET),
+      new AlterConfigOp(new 
ConfigEntry(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, ""), 
AlterConfigOp.OpType.DELETE)
+    ).asJavaCollection
+
+    var alterResult = client.incrementalAlterConfigs(Map(
+      groupResource -> groupAlterConfigs
+    ).asJava)
+
+    assertEquals(Set(groupResource).asJava, alterResult.values.keySet)
+    alterResult.all.get
+
+    ensureConsistentKRaftMetadata()
+
+    // Describe group config, verify that group config was updated correctly
+    var describeResult = client.describeConfigs(Seq(groupResource).asJava)
+    var configs = describeResult.all.get

Review Comment:
   I would also put a timeout here.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -528,6 +529,56 @@ class KafkaApisTest extends Logging {
     verify(adminManager).incrementalAlterConfigs(any(), anyBoolean())
   }
 
+  @Test
+  def testDescribeConfigsConsumerGroup(): Unit = {
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+    val operation = AclOperation.DESCRIBE_CONFIGS
+    val resourceType = ResourceType.GROUP
+    val consumerGroupId = "consumer_group_1"
+    val requestHeader =
+      new RequestHeader(ApiKeys.DESCRIBE_CONFIGS, 
ApiKeys.DESCRIBE_CONFIGS.latestVersion, clientId, 0)
+    val expectedActions = Seq(
+      new Action(operation, new ResourcePattern(resourceType, consumerGroupId, 
PatternType.LITERAL),
+        1, true, true)
+    )
+
+    when(authorizer.authorize(any[RequestContext], 
ArgumentMatchers.eq(expectedActions.asJava)))
+      .thenReturn(Seq(AuthorizationResult.ALLOWED).asJava)
+
+    val configRepository: ConfigRepository = mock(classOf[ConfigRepository])
+    val cgConfigs = new Properties()
+    cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
+    cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
+    when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
+
+    metadataCache = mock(classOf[ZkMetadataCache])
+    val describeConfigsRequest = new DescribeConfigsRequest.Builder(new 
DescribeConfigsRequestData()
+      .setIncludeSynonyms(true)
+      .setResources(List(new 
DescribeConfigsRequestData.DescribeConfigsResource()
+        .setResourceName(consumerGroupId)
+        .setResourceType(ConfigResource.Type.GROUP.id)).asJava))
+      .build(requestHeader.apiVersion)
+    val request = buildRequest(describeConfigsRequest,
+      requestHeader = Option(requestHeader))
+    
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
+      any[Long])).thenReturn(0)
+
+    createKafkaApis(authorizer = Some(authorizer), configRepository = 
configRepository)
+      .handleDescribeConfigsRequest(request)
+
+    val response = verifyNoThrottling[DescribeConfigsResponse](request)
+    // Verify that authorize is only called once
+    verify(authorizer, times(1)).authorize(any(), any())
+    val results = response.data().results()

Review Comment:
   nit: One small remark. We usually don't use `()` for getters. You could use 
`response.data.results`. There are a few other cases like this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to