This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.3 by this push:
     new fd7b5ece823 KAFKA-20339: Remove assignment offloading configs for 4.3 (#21926)
fd7b5ece823 is described below

commit fd7b5ece823f6c0b8b1383f5d5c6d00c5424a7c0
Author: Sean Quah <[email protected]>
AuthorDate: Tue Apr 7 14:31:10 2026 +0100

    KAFKA-20339: Remove assignment offloading configs for 4.3 (#21926)
    
    The KIP-1263 assignment offloading configs aren't fully supported yet
    in Apache Kafka 4.3. Remove them to hide them from users.
    
    Reviewers: David Jacot <[email protected]>, majialong
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/server/DynamicBrokerConfigTest.scala     | 15 -----------
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  5 +---
 .../kafka/coordinator/group/GroupConfig.java       | 30 +++-------------------
 .../coordinator/group/GroupCoordinatorConfig.java  | 15 +++--------
 .../kafka/coordinator/group/GroupConfigTest.java   | 16 ++----------
 .../group/GroupCoordinatorConfigTest.java          |  6 -----
 .../group/GroupMetadataManagerTest.java            | 18 -------------
 7 files changed, 10 insertions(+), 95 deletions(-)

diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 00a74855ec4..c2fb3f935e3 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1183,46 +1183,31 @@ class DynamicBrokerConfigTest {
   def testDynamicGroupCoordinatorConfig(): Unit = {
     
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG))
     
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
-    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
     
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
-    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
     
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
-    
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
 
     val origProps = TestUtils.createBrokerConfig(0, port = 8181)
     origProps.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 
"2097152")
     
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 "500")
-    
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 "false")
     
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"250")
-    
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 "false")
     
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 "125")
-    
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 "false")
     val config = KafkaConfig(origProps)
     config.dynamicConfig.initialize(None)
     assertEquals(2 * 1024 * 1024, 
config.groupCoordinatorConfig.cachedBufferMaxBytes())
     assertEquals(500, 
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
-    assertEquals(false, 
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
     assertEquals(250, 
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
-    assertEquals(false, 
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
     assertEquals(125, 
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
-    assertEquals(false, 
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
 
     val props = new Properties()
     props.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "4194304")
     
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"1000")
-    
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"true")
     
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"500")
-    
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"true")
     
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"250")
-    
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"true")
     config.dynamicConfig.updateDefaultConfig(props)
     assertEquals(4 * 1024 * 1024, 
config.groupCoordinatorConfig.cachedBufferMaxBytes())
     assertEquals(1000, 
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
-    assertEquals(true, 
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
     assertEquals(500, 
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
-    assertEquals(true, 
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
     assertEquals(250, 
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
-    assertEquals(true, 
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b3b4a147f3f..d7cbcc52a2d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, 
Resource, ResourcePattern,
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, 
KafkaPrincipalSerde, SecurityProtocol}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
 import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, 
ProducerIdAndEpoch, SecurityUtils, Utils}
-import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, 
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
 CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, 
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG, S [...]
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, 
GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -359,7 +359,6 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
-    cgConfigs.put(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
@@ -369,13 +368,11 @@ class KafkaApisTest extends Logging {
     cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG, 
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
     cgConfigs.put(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT.toString)
     cgConfigs.put(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
-    cgConfigs.put(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
     cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
     cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
-    cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
     cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
 
     when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 0a6841ec324..396ddcaba39 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -176,11 +176,6 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(0),
             MEDIUM,
             GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            BOOLEAN,
-            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
-            MEDIUM,
-            GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(SHARE_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -234,11 +229,6 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(0),
             MEDIUM,
             GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            BOOLEAN,
-            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
-            MEDIUM,
-            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
             INT,
             GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -269,11 +259,6 @@ public final class GroupConfig extends AbstractConfig {
             atLeast(0),
             MEDIUM,
             GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            BOOLEAN,
-            
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
-            MEDIUM,
-            GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
             INT,
             
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
@@ -290,7 +275,6 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
         Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
         Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
-        Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
 
         // Share group configs
         Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
@@ -302,7 +286,6 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()),
         Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()),
         Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
-        Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
 
         // Streams group configs
         Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
@@ -310,7 +293,6 @@ public final class GroupConfig extends AbstractConfig {
         Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
         Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
         Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
-        Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
         Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
     );
 
@@ -332,9 +314,7 @@ public final class GroupConfig extends AbstractConfig {
         this.consumerAssignmentIntervalMs = 
props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
             Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
             Optional.empty();
-        this.consumerAssignorOffloadEnable = 
props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
-            Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
-            Optional.empty();
+        this.consumerAssignorOffloadEnable = Optional.empty();
         this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
         this.shareHeartbeatIntervalMs = 
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.shareRecordLockDurationMs = 
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
@@ -347,9 +327,7 @@ public final class GroupConfig extends AbstractConfig {
         this.shareAssignmentIntervalMs = 
props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
             Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
             Optional.empty();
-        this.shareAssignorOffloadEnable = 
props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
-            Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
-            Optional.empty();
+        this.shareAssignorOffloadEnable = Optional.empty();
         this.streamsSessionTimeoutMs = 
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
         this.streamsHeartbeatIntervalMs = 
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
         this.streamsNumStandbyReplicas = 
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
@@ -360,9 +338,7 @@ public final class GroupConfig extends AbstractConfig {
         this.streamsAssignmentIntervalMs = 
props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
             Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
             Optional.empty();
-        this.streamsAssignorOffloadEnable = 
props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
-            Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
-            Optional.empty();
+        this.streamsAssignorOffloadEnable = Optional.empty();
         this.streamsTaskOffsetIntervalMs = 
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
         this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
         this.shareRenewAcknowledgeEnable = 
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1da3ca21412..5000678d925 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -50,7 +50,6 @@ import static 
org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
 import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
 import static org.apache.kafka.common.config.ConfigDef.Type.INT;
 import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
 import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -386,11 +385,8 @@ public class GroupCoordinatorConfig {
     public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
         CACHED_BUFFER_MAX_BYTES_CONFIG,
         CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
-        CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
         SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
-        SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-        STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
-        STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG
+        STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG
     );
     
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -434,7 +430,6 @@ public class GroupCoordinatorConfig {
         .define(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         // Interval config used for testing purposes.
         .defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT, 
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM, 
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
 
@@ -450,7 +445,6 @@ public class GroupCoordinatorConfig {
         .define(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT, 
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW, 
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
 
         // Streams group configs
@@ -467,7 +461,6 @@ public class GroupCoordinatorConfig {
         .define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM, 
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
-        .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM, 
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
         .define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
         .define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, 
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
 
@@ -1137,7 +1130,7 @@ public class GroupCoordinatorConfig {
      * Whether to offload consumer group assignment to a group coordinator 
background thread.
      */
     public boolean consumerGroupAssignorOffloadEnable() {
-        return 
config.getBoolean(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+        return CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
     }
 
     /**
@@ -1228,7 +1221,7 @@ public class GroupCoordinatorConfig {
      * Whether to offload share group assignment to a group coordinator 
background thread.
      */
     public boolean shareGroupAssignorOffloadEnable() {
-        return 
config.getBoolean(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+        return SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
     }
 
     /**
@@ -1333,7 +1326,7 @@ public class GroupCoordinatorConfig {
      * Whether to offload streams group assignment to a group coordinator 
background thread.
      */
     public boolean streamsGroupAssignorOffloadEnable() {
-        return 
config.getBoolean(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+        return STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index d1468b1eba1..d7208fb8e5c 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -89,8 +89,6 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
-            } else if 
(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
-                assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
             } else if 
(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
@@ -109,8 +107,6 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_boolean", "1");
             } else if 
(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
-            } else if 
(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
-                assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else if 
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -121,8 +117,6 @@ public class GroupConfigTest {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
             } else if 
(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
-            } else if 
(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
-                assertPropertyInvalid(name, "not_a_boolean");
             } else if 
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
                 assertPropertyInvalid(name, "not_a_number", "1.0");
             } else {
@@ -347,7 +341,6 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
         defaultValue.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"5000");
-        defaultValue.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
         defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
"2000");
@@ -356,13 +349,11 @@ public class GroupConfigTest {
         defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
         defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG, 
"read_uncommitted");
         defaultValue.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"2500");
-        defaultValue.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, 
"10");
         defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, 
"2000");
         defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
         
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
         defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, 
"1250");
-        defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
"false");
         defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, 
"30000");
         defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, 
"true");
 
@@ -373,7 +364,6 @@ public class GroupConfigTest {
         assertEquals(10, 
config.getInt(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(20, 
config.getInt(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(5000, 
config.getInt(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG));
-        assertEquals(false, 
config.getBoolean(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
@@ -382,13 +372,11 @@ public class GroupConfigTest {
         assertEquals("latest", 
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
         assertEquals("read_uncommitted", 
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
         assertEquals(2500, 
config.getInt(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG));
-        assertEquals(false, 
config.getBoolean(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(10, 
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
         assertEquals(2000, 
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
         assertEquals(1, 
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
         assertEquals(3000, 
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
         assertEquals(1250, 
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
-        assertEquals(false, 
config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
         assertEquals(30000, 
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
         assertEquals(true, 
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
     }
@@ -418,8 +406,8 @@ public class GroupConfigTest {
     }
 
     @Test
-    public void testAssignorOffloadEnableAbsentWhenNotConfigured() {
-        // When the offload enable config is absent, the group-level value is 
empty.
+    public void testAssignorOffloadEnableAbsent() {
+        // In 4.3, the group-level offload enable configs are always treated 
as absent.
         Properties props = new Properties();
         GroupConfig config = GroupConfig.fromProps(Map.of(), props);
         assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index a1eff74238f..dd56d1ed994 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -202,17 +202,14 @@ public class GroupCoordinatorConfigTest {
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
 500);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
 400);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
 600);
-        
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 false);
         
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
 15 * 60 * 1000);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
250);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
 150);
         
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
 350);
-        
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, 
false);
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
 5000);
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, 
125);
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
 25);
         
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
 225);
-        
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
 false);
         configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 2 * 
1024 * 1024);
 
         GroupCoordinatorConfig config = createConfig(configs);
@@ -246,17 +243,14 @@ public class GroupCoordinatorConfigTest {
         assertEquals(500, config.consumerGroupAssignmentIntervalMs());
         assertEquals(400, config.consumerGroupMinAssignmentIntervalMs());
         assertEquals(600, config.consumerGroupMaxAssignmentIntervalMs());
-        assertEquals(false, config.consumerGroupAssignorOffloadEnable());
         assertEquals(15 * 60 * 1000, 
config.consumerGroupRegexRefreshIntervalMs());
         assertEquals(250, config.shareGroupAssignmentIntervalMs());
         assertEquals(150, config.shareGroupMinAssignmentIntervalMs());
         assertEquals(350, config.shareGroupMaxAssignmentIntervalMs());
-        assertEquals(false, config.shareGroupAssignorOffloadEnable());
         assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
         assertEquals(125, config.streamsGroupAssignmentIntervalMs());
         assertEquals(25, config.streamsGroupMinAssignmentIntervalMs());
         assertEquals(225, config.streamsGroupMaxAssignmentIntervalMs());
-        assertEquals(false, config.streamsGroupAssignorOffloadEnable());
         assertEquals(2 * 1024 * 1024, config.cachedBufferMaxBytes());
     }
 
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ab1807a3cbc..306128837ff 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21182,36 +21182,18 @@ public class GroupMetadataManagerTest {
             GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
             2000, 1500, 1000, 500
         );
-        testDynamicBrokerAndGroupConfig(
-            GroupMetadataManager::consumerGroupAssignorOffloadEnable,
-            
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            true, false, true, false
-        );
         testDynamicBrokerAndGroupConfig(
             GroupMetadataManager::shareGroupAssignmentIntervalMs,
             GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
             GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
             2000, 1500, 1000, 500
         );
-        testDynamicBrokerAndGroupConfig(
-            GroupMetadataManager::shareGroupAssignorOffloadEnable,
-            GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            true, false, true, false
-        );
         testDynamicBrokerAndGroupConfig(
             GroupMetadataManager::streamsGroupAssignmentIntervalMs,
             GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
             GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
             2000, 1500, 1000, 500
         );
-        testDynamicBrokerAndGroupConfig(
-            GroupMetadataManager::streamsGroupAssignorOffloadEnable,
-            
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
-            true, false, true, false
-        );
     }
 
     private <V> void testDynamicBrokerAndGroupConfig(

Reply via email to