This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a6b1847720b MINOR: Remove temporary internal config group.share.enable 
(#21708)
a6b1847720b is described below

commit a6b1847720bcb5c16487d2a7865fd0e82c826a51
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Mon Mar 16 00:28:48 2026 +0530

    MINOR: Remove temporary internal config group.share.enable (#21708)
    
    ### Summary
    Removes the internal configuration group.share.enable that was
    previously used to enable share groups (KIP-932) in integration and
    system tests. Share groups are now exclusively controlled by the
    `share.version` feature flag, which is the proper enablement mechanism
    for production use.
    
    ### Motivation
    The `group.share.enable` config was an internal, test-only configuration
    that provided a secondary way to enable share groups alongside the
    proper share.version feature flag. This dual-enablement mechanism was
    confusing, error-prone, not production-ready and temporary.
    With share groups now stable and properly gated by the `share.version`
    feature flag, this internal config is no longer needed.
    
    ### Changes
    #### Core Configuration
    - `ShareGroupConfig.java`: Removed `SHARE_GROUP_ENABLE_CONFIG`, field,
    and getter method
    - `KafkaApis.scala`: Simplified `isShareGroupProtocolEnabled(`) to only
    check `shareVersion().supportsShareGroups`
    - `SharePartitionManager.java`: Removed `isEnabledFromConfig` parameter
    from `onShareVersionToggle()`
    - `BrokerMetadataPublisher.scala`: Updated to pass only `ShareVersion`
    to partition manager
    - `ShareCoordinatorService.java`: Removed supplier pattern for config,
    simplified `isShareGroupsEnabled()` to only check `ShareVersion` feature
    - `BrokerServer.scala`: Removed config supplier from
    ShareCoordinatorService builder
    
    #### Tests Updated
    - `ShareGroupConfigTest.java`: Removed config assertions and helper
    parameter
    - `GroupConfigTest.java`: Removed config constant
    - `ShareConsumerTest.java:` Removed `group.share.enable` cluster
    property
    - `SharePartitionManagerTest.java`: Updated method calls and removed
    obsolete test
    - `ShareCoordinatorServiceTest.java`: Updated to mock `MetadataImage`
    with proper ShareVersion feature flag
    - `BrokerMetadataPublisherTest.scala`: Updated method signature
    verification
    - Python system tests: Removed config constant and usage
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  |   1 -
 .../kafka/server/share/SharePartitionManager.java  |   5 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 -
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../server/metadata/BrokerMetadataPublisher.scala  |   2 +-
 .../server/share/SharePartitionManagerTest.java    |  22 +---
 .../metadata/BrokerMetadataPublisherTest.scala     |   2 +-
 .../group/modern/share/ShareGroupConfig.java       |  14 --
 .../kafka/coordinator/group/GroupConfigTest.java   |   3 +-
 .../group/modern/share/ShareGroupConfigTest.java   |   5 -
 .../coordinator/share/ShareCoordinatorService.java |  20 +--
 .../share/ShareCoordinatorServiceTest.java         | 141 ++++++++-------------
 tests/kafkatest/services/kafka/config_property.py  |   1 -
 tests/kafkatest/services/kafka/kafka.py            |   3 -
 14 files changed, 67 insertions(+), 155 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 837db109d94..325a3c3ca8d 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2365,7 +2365,6 @@ public class ShareConsumerTest {
         brokers = 1,
         serverProperties = {
             @ClusterConfigProperty(key = "auto.create.topics.enable", value = 
"false"),
-            @ClusterConfigProperty(key = "group.share.enable", value = "true"),
             @ClusterConfigProperty(key = 
"group.share.max.partition.max.record.locks", value = "10000"),
             @ClusterConfigProperty(key = 
"group.share.partition.max.record.locks", value = "10000"),
             @ClusterConfigProperty(key = 
"group.share.record.lock.duration.ms", value = "15000"),
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 1f37eadf11b..22cdb5d61fe 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -563,12 +563,11 @@ public class SharePartitionManager implements 
AutoCloseable {
     /**
      * The handler for share version feature metadata changes.
      * @param shareVersion the new share version feature
-     * @param isEnabledFromConfig whether the share version feature is enabled 
from config
      */
-    public void onShareVersionToggle(ShareVersion shareVersion, boolean 
isEnabledFromConfig) {
+    public void onShareVersionToggle(ShareVersion shareVersion) {
         // Clear the cache and remove all share partitions from the cache if 
the share version does
         // not support share groups.
-        if (!shareVersion.supportsShareGroups() && !isEnabledFromConfig) {
+        if (!shareVersion.supportsShareGroups()) {
             cache.removeAllSessions();
             Set<SharePartitionKey> sharePartitionKeys = 
partitionCache.cachedSharePartitionKeys();
             // Remove all share partitions from partition cache.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8493321641f..f7aef286e10 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -717,7 +717,6 @@ class BrokerServer(
       .withWriter(writer)
       .withCoordinatorRuntimeMetrics(new 
ShareCoordinatorRuntimeMetrics(metrics))
       .withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
-      .withShareGroupEnabledConfigSupplier(() => 
config.shareGroupConfig.isShareGroupEnabled)
       .build()
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 76a45415acc..555469599b3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4224,7 +4224,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def isShareGroupProtocolEnabled: Boolean = {
-    config.shareGroupConfig.isShareGroupEnabled || 
shareVersion().supportsShareGroups
+    shareVersion().supportsShareGroups
   }
 
   /**
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 8672efcf9c5..d42664c0263 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -233,7 +233,7 @@ class BrokerMetadataPublisher(
             finalizedShareVersion = newFinalizedShareVersion
             val shareVersion: ShareVersion = 
ShareVersion.fromFeatureLevel(finalizedShareVersion)
             info(s"Feature share.version has been updated to version 
$finalizedShareVersion")
-            sharePartitionManager.onShareVersionToggle(shareVersion, 
config.shareGroupConfig.isShareGroupEnabled)
+            sharePartitionManager.onShareVersionToggle(shareVersion)
           }
         } catch {
           case t: Throwable => 
metadataPublishingFaultHandler.handleFault("Error updating share partition 
manager " +
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d177ac6c9a9..134f4524cdd 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -2923,7 +2923,7 @@ public class SharePartitionManagerTest {
             .withPartitionCache(partitionCache)
             .build();
         assertEquals(4, partitionCache.size());
-        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
+        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
         // Because we are toggling to a share version which does not support 
share groups, the cache inside share partitions must be cleared.
         assertEquals(0, partitionCache.size());
         //Check if all share partitions have been fenced.
@@ -2933,24 +2933,6 @@ public class SharePartitionManagerTest {
         Mockito.verify(sp3).markFenced();
     }
 
-    @Test
-    public void testOnShareVersionToggleWhenEnabledFromConfig() {
-        SharePartition sp0 = mock(SharePartition.class);
-        // Mock the share partitions corresponding to the topic partitions.
-        SharePartitionCache partitionCache = new SharePartitionCache();
-        partitionCache.put(
-            new SharePartitionKey("grp", new 
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))), sp0
-        );
-        sharePartitionManager = SharePartitionManagerBuilder.builder()
-            .withPartitionCache(partitionCache)
-            .build();
-        assertEquals(1, partitionCache.size());
-        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, true);
-        // Though share version is toggled to off, but it's enabled from 
config, hence the cache should not be cleared.
-        assertEquals(1, partitionCache.size());
-        Mockito.verify(sp0, times(0)).markFenced();
-    }
-
     @Test
     public void testShareGroupListener() {
         String groupId = "grp";
@@ -3023,7 +3005,7 @@ public class SharePartitionManagerTest {
         assertEquals(1, partitionCache.size());
 
         // Clean up share session and partition cache.
-        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
+        sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
         assertEquals(0, cache.size());
         assertEquals(0, partitionCache.size());
 
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index f271d238d06..ba7445de0cb 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -356,6 +356,6 @@ class BrokerMetadataPublisherTest {
     )
 
     // SharePartitionManager is receiving the latest changes.
-    verify(sharePartitionManager).onShareVersionToggle(any(), any())
+    verify(sharePartitionManager).onShareVersionToggle(any())
   }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 5c993f420b2..cc72a8b48e3 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -27,18 +27,12 @@ import java.util.Map;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
 
 public class ShareGroupConfig {
     /** Share Group Configurations **/
 
-    // Internal configuration used by integration and system tests.
-    public static final String SHARE_GROUP_ENABLE_CONFIG = 
"group.share.enable";
-    public static final boolean SHARE_GROUP_ENABLE_DEFAULT = false;
-    public static final String SHARE_GROUP_ENABLE_DOC = "Enable share groups 
on the broker.";
-
     public static final String SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG = 
"group.share.partition.max.record.locks";
     public static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT = 
2000;
     public static final String SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC = 
"Share-group record lock limit per share-partition.";
@@ -89,7 +83,6 @@ public class ShareGroupConfig {
         "the <code>org.apache.kafka.server.share.Persister</code> interface.";
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN, 
SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
             .define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM, 
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
             .define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM, 
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
             .define(SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, INT, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 5), MEDIUM, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DOC)
@@ -103,7 +96,6 @@ public class ShareGroupConfig {
             .define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT, 
SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM, 
SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
             .defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM, 
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
 
-    private final boolean isShareGroupEnabled;
     private final int shareGroupPartitionMaxRecordLocks;
     private final int shareGroupMaxPartitionMaxRecordLocks;
     private final int shareGroupMinPartitionMaxRecordLocks;
@@ -120,8 +112,6 @@ public class ShareGroupConfig {
 
     public ShareGroupConfig(AbstractConfig config) {
         this.config = config;
-        // The proper way to enable share groups is to use the share.version 
feature with v1 or later.
-        isShareGroupEnabled = 
config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG);
         shareGroupPartitionMaxRecordLocks = 
config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
         shareGroupMaxPartitionMaxRecordLocks = 
config.getInt(SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG);
         shareGroupMinPartitionMaxRecordLocks = 
config.getInt(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG);
@@ -148,10 +138,6 @@ public class ShareGroupConfig {
     }
 
     /** Share group configuration **/
-    public boolean isShareGroupEnabled() {
-        return isShareGroupEnabled;
-    }
-
     public int shareGroupPartitionMaxRecordLocks() {
         return shareGroupPartitionMaxRecordLocks;
     }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 2315d4496ce..bd7bec0e98f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -62,7 +62,6 @@ public class GroupConfigTest {
     private static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS = 1000L;
     private static final int OFFSETS_RETENTION_MINUTES = 24 * 60;
 
-    private static final boolean SHARE_GROUP_ENABLE = true;
     private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
     private static final int SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS = 100;
     private static final int SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS = 
10000;
@@ -512,7 +511,7 @@ public class GroupConfigTest {
     }
 
     private ShareGroupConfig createShareGroupConfig() {
-        return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE, 
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS,
+        return 
ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS,
                 SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS, 
SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS,
                 SHARE_GROUP_DELIVERY_COUNT_LIMIT, 
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT,
                 SHARE_GROUP_RECORD_LOCK_DURATION_MS, 
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS, 
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 50ceedfda81..4f70e3909aa 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -26,14 +26,12 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareGroupConfigTest {
 
     @Test
     public void testConfigs() {
         Map<String, Object> configs = new HashMap<>();
-        configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
         
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
200);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
100);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
10000);
@@ -48,7 +46,6 @@ public class ShareGroupConfigTest {
 
         ShareGroupConfig config = createConfig(configs);
 
-        assertTrue(config.isShareGroupEnabled());
         assertEquals(200, config.shareGroupPartitionMaxRecordLocks());
         assertEquals(5, config.shareGroupDeliveryCountLimit());
         assertEquals(30000, config.shareGroupRecordLockDurationMs());
@@ -137,7 +134,6 @@ public class ShareGroupConfigTest {
     }
 
     public static ShareGroupConfig createShareGroupConfig(
-        boolean shareGroupEnable,
         int shareGroupPartitionMaxRecordLocks,
         int shareGroupMinPartitionMaxRecordLocks,
         int shareGroupMaxPartitionMaxRecordLocks,
@@ -149,7 +145,6 @@ public class ShareGroupConfigTest {
         int shareGroupMaxRecordLockDurationMs
     ) {
         Map<String, Object> configs = new HashMap<>();
-        configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, 
shareGroupEnable);
         
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupPartitionMaxRecordLocks);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupMinPartitionMaxRecordLocks);
         
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
shareGroupMaxPartitionMaxRecordLocks);
diff --git 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 2f591415574..d5e11424486 100644
--- 
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ 
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -78,7 +78,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntSupplier;
-import java.util.function.Supplier;
 
 import static 
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
 
@@ -94,7 +93,6 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     private final Timer timer;
     private final PartitionWriter writer;
     private final Map<TopicPartition, Long> lastPrunedOffsets;
-    private final Supplier<Boolean> shareGroupConfigEnabledSupplier;
     private volatile boolean shouldRunPeriodicJob;
 
     public static class Builder {
@@ -104,7 +102,6 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         private CoordinatorLoader<CoordinatorRecord> loader;
         private Time time;
         private Timer timer;
-        private Supplier<Boolean> shareGroupConfigEnabledSupplier;
         private ShareCoordinatorMetrics coordinatorMetrics;
         private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
 
@@ -143,11 +140,6 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
             return this;
         }
 
-        public Builder withShareGroupEnabledConfigSupplier(Supplier<Boolean> 
shareGroupConfigEnabledSupplier) {
-            this.shareGroupConfigEnabledSupplier = 
shareGroupConfigEnabledSupplier;
-            return this;
-        }
-
         public ShareCoordinatorService build() {
             if (config == null) {
                 throw new IllegalArgumentException("Config must be set.");
@@ -170,9 +162,6 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
             if (coordinatorRuntimeMetrics == null) {
                 throw new IllegalArgumentException("Coordinator runtime 
metrics must be set.");
             }
-            if (shareGroupConfigEnabledSupplier == null) {
-                throw new IllegalArgumentException("Share group enabled config 
enabled supplier must be set.");
-            }
 
             String logPrefix = String.format("ShareCoordinator id=%d", nodeId);
             LogContext logContext = new LogContext(String.format("[%s] ", 
logPrefix));
@@ -216,8 +205,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
                 coordinatorMetrics,
                 time,
                 timer,
-                writer,
-                shareGroupConfigEnabledSupplier
+                writer
             );
         }
     }
@@ -229,8 +217,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         ShareCoordinatorMetrics shareCoordinatorMetrics,
         Time time,
         Timer timer,
-        PartitionWriter writer,
-        Supplier<Boolean> shareGroupConfigEnabledSupplier
+        PartitionWriter writer
     ) {
         this.log = logContext.logger(ShareCoordinatorService.class);
         this.config = config;
@@ -240,7 +227,6 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
         this.timer = timer;
         this.writer = writer;
         this.lastPrunedOffsets = new ConcurrentHashMap<>();
-        this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier;
     }
 
     @Override
@@ -1130,7 +1116,7 @@ public class ShareCoordinatorService implements 
ShareCoordinator {
     }
 
     private boolean isShareGroupsEnabled(MetadataImage image) {
-        return shareGroupConfigEnabledSupplier.get() || 
ShareVersion.fromFeatureLevel(
+        return ShareVersion.fromFeatureLevel(
             
image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME, 
(short) 0)
         ).supportsShareGroups();
     }
diff --git 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 6c22e91bcf1..cf5c51590a5 100644
--- 
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ 
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -97,6 +97,13 @@ class ShareCoordinatorServiceTest {
         return runtime;
     }
 
+    private MetadataImage mockMetadataImageWithShareGroupsEnabled() {
+        MetadataImage image = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
+        
when(image.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort()))
+            .thenReturn((short) 1);
+        return image;
+    }
+
     @Test
     public void testStartupShutdown() throws Exception {
         CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
@@ -107,8 +114,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             new MockTimer(),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -131,8 +137,7 @@ class ShareCoordinatorServiceTest {
             coordinatorMetrics,
             time,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -243,8 +248,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -345,8 +349,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -429,8 +432,7 @@ class ShareCoordinatorServiceTest {
             coordinatorMetrics,
             time,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -511,8 +513,7 @@ class ShareCoordinatorServiceTest {
             coordinatorMetrics,
             time,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -593,8 +594,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -641,8 +641,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -689,8 +688,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -737,8 +735,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -785,8 +782,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -833,8 +829,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -915,8 +910,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -979,8 +973,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -1043,8 +1036,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -1105,8 +1097,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -1166,8 +1157,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -1218,8 +1208,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -1262,8 +1251,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -1306,8 +1294,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -1349,8 +1336,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -1391,8 +1377,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         service.startup(() -> 1);
@@ -1421,8 +1406,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(),
             Time.SYSTEM,
             mock(Timer.class),
-            mock(PartitionWriter.class),
-            () -> true
+            mock(PartitionWriter.class)
         );
 
         String groupId = "group1";
@@ -1479,12 +1463,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1569,12 +1552,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 2);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1627,12 +1609,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1677,12 +1658,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1725,12 +1705,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
 
         verify(runtime, times(0))
             .scheduleWriteOperation(
@@ -1786,12 +1765,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1858,12 +1836,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("write-state-record-prune"),
@@ -1909,8 +1886,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         when(runtime.scheduleWriteAllOperation(
@@ -1919,7 +1895,7 @@ class ShareCoordinatorServiceTest {
         )).thenReturn(List.of(CompletableFuture.completedFuture(null)));
 
         service.startup(() -> 1);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteOperation(
                 eq("snapshot-cold-partitions"),
@@ -1968,12 +1944,11 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 2);
-        service.onMetadataUpdate(mock(MetadataDelta.class), 
mock(MetadataImage.class));
+        service.onMetadataUpdate(mock(MetadataDelta.class), 
mockMetadataImageWithShareGroupsEnabled());
         verify(runtime, times(0))
             .scheduleWriteAllOperation(
                 eq("snapshot-cold-partitions"),
@@ -2012,8 +1987,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> false // So that the feature config is used.
+            writer
         ));
 
         // Prune job.
@@ -2108,8 +2082,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         List<String> propNames = List.of(
@@ -2141,8 +2114,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 3);
@@ -2191,8 +2163,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 3);
@@ -2227,8 +2198,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 3);
@@ -2237,6 +2207,8 @@ class ShareCoordinatorServiceTest {
         MetadataDelta delta = mock(MetadataDelta.class);
         MetadataImage image = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
         when(delta.topicsDelta()).thenReturn(null);
+        
when(image.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
 anyShort()))
+            .thenReturn((short) 1);
 
         assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
 
@@ -2262,8 +2234,7 @@ class ShareCoordinatorServiceTest {
             new ShareCoordinatorMetrics(metrics),
             time,
             timer,
-            writer,
-            () -> true
+            writer
         ));
 
         service.startup(() -> 3);
diff --git a/tests/kafkatest/services/kafka/config_property.py 
b/tests/kafkatest/services/kafka/config_property.py
index 329df370d19..702120e1625 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -78,7 +78,6 @@ CONSUMER_GROUP_MIGRATION_POLICY = 
"group.consumer.migration.policy"
 
 SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR 
="share.coordinator.state.topic.replication.factor"
 SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR = "share.coordinator.state.topic.min.isr"
-SHARE_GROUP_ENABLE = "group.share.enable"
 
 UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable"
 UNSTABLE_FEATURE_VERSIONS_ENABLE = "unstable.feature.versions.enable"
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 04d3457f3b7..cb4f0af990d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -781,9 +781,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         for prop in self.per_node_server_prop_overrides.get(self.idx(node), 
[]):
             override_configs[prop[0]] = prop[1]
 
-        if self.use_share_groups is not None and self.use_share_groups is True:
-            override_configs[config_property.SHARE_GROUP_ENABLE] = 
str(self.use_share_groups)
-
         if self.use_streams_groups is True:
             override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] = 
str(True)
             override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE] 
= str(True)


Reply via email to