This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new fd7b5ece823 KAFKA-20339: Remove assignment offloading configs for 4.3
(#21926)
fd7b5ece823 is described below
commit fd7b5ece823f6c0b8b1383f5d5c6d00c5424a7c0
Author: Sean Quah <[email protected]>
AuthorDate: Tue Apr 7 14:31:10 2026 +0100
KAFKA-20339: Remove assignment offloading configs for 4.3 (#21926)
The KIP-1263 assignment offloading configs aren't fully supported yet
in Apache Kafka 4.3. Remove them to hide them from users.
Reviewers: David Jacot <[email protected]>, majialong
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../kafka/server/DynamicBrokerConfigTest.scala | 15 -----------
.../scala/unit/kafka/server/KafkaApisTest.scala | 5 +---
.../kafka/coordinator/group/GroupConfig.java | 30 +++-------------------
.../coordinator/group/GroupCoordinatorConfig.java | 15 +++--------
.../kafka/coordinator/group/GroupConfigTest.java | 16 ++----------
.../group/GroupCoordinatorConfigTest.java | 6 -----
.../group/GroupMetadataManagerTest.java | 18 -------------
7 files changed, 10 insertions(+), 95 deletions(-)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 00a74855ec4..c2fb3f935e3 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1183,46 +1183,31 @@ class DynamicBrokerConfigTest {
def testDynamicGroupCoordinatorConfig(): Unit = {
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG))
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
- assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
- assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG))
- assertTrue(GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.contains(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG))
val origProps = TestUtils.createBrokerConfig(0, port = 8181)
origProps.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG,
"2097152")
origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500")
- origProps.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false")
origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"250")
- origProps.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false")
origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"125")
- origProps.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false")
val config = KafkaConfig(origProps)
config.dynamicConfig.initialize(None)
assertEquals(2 * 1024 * 1024,
config.groupCoordinatorConfig.cachedBufferMaxBytes())
assertEquals(500,
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
- assertEquals(false,
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
assertEquals(250,
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
- assertEquals(false,
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
assertEquals(125,
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
- assertEquals(false,
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
val props = new Properties()
props.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, "4194304")
props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"1000")
- props.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true")
props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500")
- props.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true")
props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
"250")
- props.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "true")
config.dynamicConfig.updateDefaultConfig(props)
assertEquals(4 * 1024 * 1024,
config.groupCoordinatorConfig.cachedBufferMaxBytes())
assertEquals(1000,
config.groupCoordinatorConfig.consumerGroupAssignmentIntervalMs())
- assertEquals(true,
config.groupCoordinatorConfig.consumerGroupAssignorOffloadEnable())
assertEquals(500,
config.groupCoordinatorConfig.shareGroupAssignmentIntervalMs())
- assertEquals(true,
config.groupCoordinatorConfig.shareGroupAssignorOffloadEnable())
assertEquals(250,
config.groupCoordinatorConfig.streamsGroupAssignmentIntervalMs())
- assertEquals(true,
config.groupCoordinatorConfig.streamsGroupAssignorOffloadEnable())
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b3b4a147f3f..d7cbcc52a2d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType,
Resource, ResourcePattern,
import org.apache.kafka.common.security.auth.{KafkaPrincipal,
KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection,
ProducerIdAndEpoch, SecurityUtils, Utils}
-import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CO [...]
+import
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
SHARE_SESSION_TIMEOUT_MS_CONFIG, S [...]
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager,
GroupCoordinator, GroupCoordinatorConfig}
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -359,7 +359,6 @@ class KafkaApisTest extends Logging {
cgConfigs.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
- cgConfigs.put(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(SHARE_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT.toString)
@@ -369,13 +368,11 @@ class KafkaApisTest extends Logging {
cgConfigs.put(SHARE_ISOLATION_LEVEL_CONFIG,
GroupConfig.SHARE_ISOLATION_LEVEL_DEFAULT)
cgConfigs.put(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT.toString)
cgConfigs.put(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
- cgConfigs.put(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
cgConfigs.put(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG, "1000")
- cgConfigs.put(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, "false");
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 0a6841ec324..396ddcaba39 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -176,11 +176,6 @@ public final class GroupConfig extends AbstractConfig {
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- BOOLEAN,
- GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
- MEDIUM,
- GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(SHARE_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -234,11 +229,6 @@ public final class GroupConfig extends AbstractConfig {
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- BOOLEAN,
- GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
- MEDIUM,
- GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT,
@@ -269,11 +259,6 @@ public final class GroupConfig extends AbstractConfig {
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- BOOLEAN,
- GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT,
- MEDIUM,
- GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
@@ -290,7 +275,6 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
Map.entry(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG)),
Map.entry(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
- Map.entry(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
// Share group configs
Map.entry(SHARE_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
@@ -302,7 +286,6 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(SHARE_ISOLATION_LEVEL_CONFIG, Optional.empty()),
Map.entry(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, Optional.empty()),
Map.entry(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
- Map.entry(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
// Streams group configs
Map.entry(STREAMS_SESSION_TIMEOUT_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)),
@@ -310,7 +293,6 @@ public final class GroupConfig extends AbstractConfig {
Map.entry(STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG)),
Map.entry(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG)),
Map.entry(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG)),
- Map.entry(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)),
Map.entry(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
Optional.of(GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG))
);
@@ -332,9 +314,7 @@ public final class GroupConfig extends AbstractConfig {
this.consumerAssignmentIntervalMs =
props.containsKey(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
Optional.of(getInt(CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
Optional.empty();
- this.consumerAssignorOffloadEnable =
props.containsKey(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
- Optional.of(getBoolean(CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
- Optional.empty();
+ this.consumerAssignorOffloadEnable = Optional.empty();
this.shareSessionTimeoutMs = getInt(SHARE_SESSION_TIMEOUT_MS_CONFIG);
this.shareHeartbeatIntervalMs =
getInt(SHARE_HEARTBEAT_INTERVAL_MS_CONFIG);
this.shareRecordLockDurationMs =
getInt(SHARE_RECORD_LOCK_DURATION_MS_CONFIG);
@@ -347,9 +327,7 @@ public final class GroupConfig extends AbstractConfig {
this.shareAssignmentIntervalMs =
props.containsKey(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
Optional.of(getInt(SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
Optional.empty();
- this.shareAssignorOffloadEnable =
props.containsKey(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
- Optional.of(getBoolean(SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
- Optional.empty();
+ this.shareAssignorOffloadEnable = Optional.empty();
this.streamsSessionTimeoutMs =
getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs =
getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas =
getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
@@ -360,9 +338,7 @@ public final class GroupConfig extends AbstractConfig {
this.streamsAssignmentIntervalMs =
props.containsKey(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG) ?
Optional.of(getInt(STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG)) :
Optional.empty();
- this.streamsAssignorOffloadEnable =
props.containsKey(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG) ?
- Optional.of(getBoolean(STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG)) :
- Optional.empty();
+ this.streamsAssignorOffloadEnable = Optional.empty();
this.streamsTaskOffsetIntervalMs =
getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
this.shareRenewAcknowledgeEnable =
getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index 1da3ca21412..5000678d925 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -50,7 +50,6 @@ import static
org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.LIST;
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
@@ -386,11 +385,8 @@ public class GroupCoordinatorConfig {
public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CACHED_BUFFER_MAX_BYTES_CONFIG,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
- CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
- SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
- STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG
+ STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG
);
public static final ConfigDef CONFIG_DEF = new ConfigDef()
@@ -434,7 +430,6 @@ public class GroupCoordinatorConfig {
.define(CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
.define(CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
// Interval config used for testing purposes.
.defineInternal(CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, INT,
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DEFAULT, atLeast(10 * 1000), MEDIUM,
CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_DOC)
@@ -450,7 +445,6 @@ public class GroupCoordinatorConfig {
.define(SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
.define(SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.defineInternal(SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG, INT,
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT, atLeast(1), LOW,
SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DOC)
// Streams group configs
@@ -467,7 +461,6 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DEFAULT, atLeast(0), MEDIUM,
STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, BOOLEAN,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT, MEDIUM,
STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DOC)
.define(STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG, INT,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM,
STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_DOC);
@@ -1137,7 +1130,7 @@ public class GroupCoordinatorConfig {
* Whether to offload consumer group assignment to a group coordinator
background thread.
*/
public boolean consumerGroupAssignorOffloadEnable() {
- return
config.getBoolean(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ return CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
}
/**
@@ -1228,7 +1221,7 @@ public class GroupCoordinatorConfig {
* Whether to offload share group assignment to a group coordinator
background thread.
*/
public boolean shareGroupAssignorOffloadEnable() {
- return
config.getBoolean(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ return SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
}
/**
@@ -1333,7 +1326,7 @@ public class GroupCoordinatorConfig {
* Whether to offload streams group assignment to a group coordinator
background thread.
*/
public boolean streamsGroupAssignorOffloadEnable() {
- return
config.getBoolean(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG);
+ return STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_DEFAULT;
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index d1468b1eba1..d7208fb8e5c 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -89,8 +89,6 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if
(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
- } else if
(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
- assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-0.1", "1.2");
} else if
(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
@@ -109,8 +107,6 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_boolean", "1");
} else if
(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
- } else if
(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
- assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else if
(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG.equals(name)) {
@@ -121,8 +117,6 @@ public class GroupConfigTest {
assertPropertyInvalid(name, "not_a_number", "-1", "1.0");
} else if
(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "-1", "1.2");
- } else if
(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG.equals(name)) {
- assertPropertyInvalid(name, "not_a_boolean");
} else if
(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG.equals(name)) {
assertPropertyInvalid(name, "not_a_number", "1.0");
} else {
@@ -347,7 +341,6 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
defaultValue.put(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
"5000");
- defaultValue.put(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "10");
defaultValue.put(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
"2000");
@@ -356,13 +349,11 @@ public class GroupConfigTest {
defaultValue.put(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "latest");
defaultValue.put(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG,
"read_uncommitted");
defaultValue.put(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
"2500");
- defaultValue.put(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"10");
defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"2000");
defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1");
defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000");
defaultValue.put(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
"1250");
- defaultValue.put(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
"false");
defaultValue.put(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
"30000");
defaultValue.put(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG,
"true");
@@ -373,7 +364,6 @@ public class GroupConfigTest {
assertEquals(10,
config.getInt(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(20,
config.getInt(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(5000,
config.getInt(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG));
- assertEquals(false,
config.getBoolean(GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG));
@@ -382,13 +372,11 @@ public class GroupConfigTest {
assertEquals("latest",
config.getString(GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG));
assertEquals("read_uncommitted",
config.getString(GroupConfig.SHARE_ISOLATION_LEVEL_CONFIG));
assertEquals(2500,
config.getInt(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG));
- assertEquals(false,
config.getBoolean(GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(10,
config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG));
assertEquals(2000,
config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG));
assertEquals(1,
config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG));
assertEquals(3000,
config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG));
assertEquals(1250,
config.getInt(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG));
- assertEquals(false,
config.getBoolean(GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG));
assertEquals(30000,
config.getInt(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG));
assertEquals(true,
config.getBoolean(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG));
}
@@ -418,8 +406,8 @@ public class GroupConfigTest {
}
@Test
- public void testAssignorOffloadEnableAbsentWhenNotConfigured() {
- // When the offload enable config is absent, the group-level value is
empty.
+ public void testAssignorOffloadEnableAbsent() {
+ // In 4.3, the group-level offload enable configs are always treated
as absent.
Properties props = new Properties();
GroupConfig config = GroupConfig.fromProps(Map.of(), props);
assertEquals(Optional.empty(), config.consumerAssignorOffloadEnable());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
index a1eff74238f..dd56d1ed994 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java
@@ -202,17 +202,14 @@ public class GroupCoordinatorConfigTest {
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
500);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
400);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
600);
- configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, false);
configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG,
15 * 60 * 1000);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
250);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
150);
configs.put(GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
350);
- configs.put(GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, false);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
5000);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
125);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
25);
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG,
225);
- configs.put(GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG, false);
configs.put(GroupCoordinatorConfig.CACHED_BUFFER_MAX_BYTES_CONFIG, 2 *
1024 * 1024);
GroupCoordinatorConfig config = createConfig(configs);
@@ -246,17 +243,14 @@ public class GroupCoordinatorConfigTest {
assertEquals(500, config.consumerGroupAssignmentIntervalMs());
assertEquals(400, config.consumerGroupMinAssignmentIntervalMs());
assertEquals(600, config.consumerGroupMaxAssignmentIntervalMs());
- assertEquals(false, config.consumerGroupAssignorOffloadEnable());
assertEquals(15 * 60 * 1000,
config.consumerGroupRegexRefreshIntervalMs());
assertEquals(250, config.shareGroupAssignmentIntervalMs());
assertEquals(150, config.shareGroupMinAssignmentIntervalMs());
assertEquals(350, config.shareGroupMaxAssignmentIntervalMs());
- assertEquals(false, config.shareGroupAssignorOffloadEnable());
assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs());
assertEquals(125, config.streamsGroupAssignmentIntervalMs());
assertEquals(25, config.streamsGroupMinAssignmentIntervalMs());
assertEquals(225, config.streamsGroupMaxAssignmentIntervalMs());
- assertEquals(false, config.streamsGroupAssignorOffloadEnable());
assertEquals(2 * 1024 * 1024, config.cachedBufferMaxBytes());
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index ab1807a3cbc..306128837ff 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21182,36 +21182,18 @@ public class GroupMetadataManagerTest {
GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
2000, 1500, 1000, 500
);
- testDynamicBrokerAndGroupConfig(
- GroupMetadataManager::consumerGroupAssignorOffloadEnable,
- GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- GroupConfig.CONSUMER_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- true, false, true, false
- );
testDynamicBrokerAndGroupConfig(
GroupMetadataManager::shareGroupAssignmentIntervalMs,
GroupCoordinatorConfig.SHARE_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
2000, 1500, 1000, 500
);
- testDynamicBrokerAndGroupConfig(
- GroupMetadataManager::shareGroupAssignorOffloadEnable,
- GroupCoordinatorConfig.SHARE_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- GroupConfig.SHARE_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- true, false, true, false
- );
testDynamicBrokerAndGroupConfig(
GroupMetadataManager::streamsGroupAssignmentIntervalMs,
GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG,
GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
2000, 1500, 1000, 500
);
- testDynamicBrokerAndGroupConfig(
- GroupMetadataManager::streamsGroupAssignorOffloadEnable,
- GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- GroupConfig.STREAMS_ASSIGNOR_OFFLOAD_ENABLE_CONFIG,
- true, false, true, false
- );
}
private <V> void testDynamicBrokerAndGroupConfig(