This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2700824abd0 KAFKA-20288 Remove orphaned groups from GroupConfigManager 
(#21758)
2700824abd0 is described below

commit 2700824abd0a93b0667f3b3e4ee6d9763ab0e2c5
Author: Chang-Yu Huang <[email protected]>
AuthorDate: Tue Mar 17 03:04:07 2026 -0400

    KAFKA-20288 Remove orphaned groups from GroupConfigManager (#21758)
    
    # Description
    `GroupConfigManager` stores the dynamic configs of groups. However, when
    all dynamic configs are removed the group id is not removed from the
    manager. This patch removes the orphan when the update group config is
    empty.
    
    # Test
    `testGroupIsRemovedWhenDynamicConfigsAreRemoved` checks whether the
    orphan group exists after updating using an empty config.
    
    Reviewers: Sean Quah <[email protected]>, David Jacot
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  9 ++--
 .../coordinator/group/GroupConfigManager.java      |  5 ++
 .../coordinator/group/GroupConfigManagerTest.java  | 15 ++++++
 .../kafka/server/config/DynamicConfigTest.java     | 59 ++++++++++++++++++++++
 4 files changed, 82 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 0a8e78c2157..790959ad8cc 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -3040,12 +3040,9 @@ public class SharePartition {
         if (partitionDataStartOffset != 
PartitionFactory.UNINITIALIZED_START_OFFSET) {
             return partitionDataStartOffset;
         }
-        ShareGroupAutoOffsetResetStrategy offsetResetStrategy;
-        if (groupConfigManager.groupConfig(groupId).isPresent()) {
-            offsetResetStrategy = 
groupConfigManager.groupConfig(groupId).get().shareAutoOffsetReset();
-        } else {
-            offsetResetStrategy = GroupConfig.defaultShareAutoOffsetReset();
-        }
+        ShareGroupAutoOffsetResetStrategy offsetResetStrategy = 
groupConfigManager.groupConfig(groupId)
+            .map(GroupConfig::shareAutoOffsetReset)
+            .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
 
         if (offsetResetStrategy.type() == 
ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
             return offsetForLatestTimestamp(topicIdPartition, replicaManager, 
leaderEpoch);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
index 05ebecfc99c..9374d768107 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java
@@ -64,6 +64,11 @@ public class GroupConfigManager implements AutoCloseable {
             throw new InvalidRequestException("Group name can't be empty.");
         }
 
+        if (newGroupConfig.isEmpty()) {
+            configMap.remove(groupId);
+            return;
+        }
+
         // Evaluate ensures configs respect broker-level bounds. For the Admin 
API path,
         // values are pre-validated so this is effectively a no-op. For the 
broker startup
         // path, configs from metadata may need evaluation if bounds have 
changed.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
index f258514f2a3..901ca0c3c1a 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java
@@ -115,6 +115,21 @@ public class GroupConfigManagerTest {
         assertEquals(49000, 
configManager.groupConfig(groupId).get().getInt(CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
     }
 
+    @Test
+    public void testGroupIsRemovedWhenDynamicConfigsAreRemoved() {
+        String groupId1 = "foo";
+        String groupId2 = "bar";
+        Properties props = new Properties();
+        props.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 50000);
+        configManager.updateGroupConfig(groupId1, props);
+        configManager.updateGroupConfig(groupId2, props);
+        assertTrue(configManager.groupIds().contains(groupId1));
+
+        configManager.updateGroupConfig(groupId1, new Properties());
+        assertFalse(configManager.groupIds().contains(groupId1));
+        assertTrue(configManager.groupIds().contains(groupId2));
+    }
+
     public static GroupConfigManager createConfigManager() {
         return createConfigManager(new HashMap<>());
     }
diff --git 
a/server/src/test/java/org/apache/kafka/server/config/DynamicConfigTest.java 
b/server/src/test/java/org/apache/kafka/server/config/DynamicConfigTest.java
new file mode 100644
index 00000000000..4521221af1d
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/config/DynamicConfigTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DynamicConfigTest {
+    @ClusterTest
+    public void testGroupIsRemovedWhenDynamicConfigsAreRemoved(ClusterInstance 
clusterInstance) throws ExecutionException, InterruptedException {
+        try (Admin admin = clusterInstance.admin()) {
+            var cr = new ConfigResource(ConfigResource.Type.GROUP, "gp");
+            assertEquals(List.of(), 
admin.listConfigResources(Set.of(ConfigResource.Type.GROUP), new 
ListConfigResourcesOptions()).all().get());
+
+            // add dynamic config
+            admin.incrementalAlterConfigs(Map.of(cr, List.of(new AlterConfigOp(
+                new ConfigEntry("consumer.session.timeout.ms", "45001"), 
AlterConfigOp.OpType.SET))))
+                .all()
+                .get();
+            TestUtils.waitForCondition(() -> 
!admin.listConfigResources(Set.of(ConfigResource.Type.GROUP), new 
ListConfigResourcesOptions()).all().get().isEmpty(),
+                "Should include a group with dynamic config");
+
+            // remove dynamic config
+            admin.incrementalAlterConfigs(Map.of(cr, List.of(new AlterConfigOp(
+                new ConfigEntry("consumer.session.timeout.ms", null), 
AlterConfigOp.OpType.DELETE))))
+                .all()
+                .get();
+            TestUtils.waitForCondition(() -> 
admin.listConfigResources(Set.of(ConfigResource.Type.GROUP), new 
ListConfigResourcesOptions()).all().get().isEmpty(),
+                "Should not include any group");
+        }
+    }
+}

Reply via email to