This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6b1847720b MINOR: Remove temporary internal config group.share.enable
(#21708)
a6b1847720b is described below
commit a6b1847720bcb5c16487d2a7865fd0e82c826a51
Author: Sanskar Jhajharia <[email protected]>
AuthorDate: Mon Mar 16 00:28:48 2026 +0530
MINOR: Remove temporary internal config group.share.enable (#21708)
### Summary
Removes the internal configuration `group.share.enable` that was
previously used to enable share groups (KIP-932) in integration and
system tests. Share groups are now exclusively controlled by the
`share.version` feature flag, which is the proper enablement mechanism
for production use.
### Motivation
The `group.share.enable` config was an internal, test-only configuration
that provided a secondary way to enable share groups alongside the
proper `share.version` feature flag. This dual-enablement mechanism was
confusing, error-prone, not production-ready and temporary.
With share groups now stable and properly gated by the `share.version`
feature flag, this internal config is no longer needed.
### Changes
#### Core Configuration
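- `ShareGroupConfig.java`: Removed the `SHARE_GROUP_ENABLE_CONFIG`,
`SHARE_GROUP_ENABLE_DEFAULT` and `SHARE_GROUP_ENABLE_DOC` constants, the
backing field, and the `isShareGroupEnabled()` getter method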
- `KafkaApis.scala`: Simplified `isShareGroupProtocolEnabled` to only
check `shareVersion().supportsShareGroups`
- `SharePartitionManager.java`: Removed `isEnabledFromConfig` parameter
from `onShareVersionToggle()`
- `BrokerMetadataPublisher.scala`: Updated to pass only `ShareVersion`
to partition manager
- `ShareCoordinatorService.java`: Removed supplier pattern for config,
simplified `isShareGroupsEnabled()` to only check `ShareVersion` feature
- `BrokerServer.scala`: Removed config supplier from
ShareCoordinatorService builder
#### Tests Updated
- `ShareGroupConfigTest.java`: Removed config assertions and helper
parameter
- `GroupConfigTest.java`: Removed config constant
- `ShareConsumerTest.java`: Removed `group.share.enable` cluster
property
- `SharePartitionManagerTest.java`: Updated method calls and removed
obsolete test
- `ShareCoordinatorServiceTest.java`: Updated to mock `MetadataImage`
with proper ShareVersion feature flag
- `BrokerMetadataPublisherTest.scala`: Updated method signature
verification
- Python system tests: Removed config constant and usage
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 1 -
.../kafka/server/share/SharePartitionManager.java | 5 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 1 -
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../server/metadata/BrokerMetadataPublisher.scala | 2 +-
.../server/share/SharePartitionManagerTest.java | 22 +---
.../metadata/BrokerMetadataPublisherTest.scala | 2 +-
.../group/modern/share/ShareGroupConfig.java | 14 --
.../kafka/coordinator/group/GroupConfigTest.java | 3 +-
.../group/modern/share/ShareGroupConfigTest.java | 5 -
.../coordinator/share/ShareCoordinatorService.java | 20 +--
.../share/ShareCoordinatorServiceTest.java | 141 ++++++++-------------
tests/kafkatest/services/kafka/config_property.py | 1 -
tests/kafkatest/services/kafka/kafka.py | 3 -
14 files changed, 67 insertions(+), 155 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 837db109d94..325a3c3ca8d 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -2365,7 +2365,6 @@ public class ShareConsumerTest {
brokers = 1,
serverProperties = {
@ClusterConfigProperty(key = "auto.create.topics.enable", value =
"false"),
- @ClusterConfigProperty(key = "group.share.enable", value = "true"),
@ClusterConfigProperty(key =
"group.share.max.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key =
"group.share.partition.max.record.locks", value = "10000"),
@ClusterConfigProperty(key =
"group.share.record.lock.duration.ms", value = "15000"),
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 1f37eadf11b..22cdb5d61fe 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -563,12 +563,11 @@ public class SharePartitionManager implements
AutoCloseable {
/**
* The handler for share version feature metadata changes.
* @param shareVersion the new share version feature
- * @param isEnabledFromConfig whether the share version feature is enabled
from config
*/
- public void onShareVersionToggle(ShareVersion shareVersion, boolean
isEnabledFromConfig) {
+ public void onShareVersionToggle(ShareVersion shareVersion) {
// Clear the cache and remove all share partitions from the cache if
the share version does
// not support share groups.
- if (!shareVersion.supportsShareGroups() && !isEnabledFromConfig) {
+ if (!shareVersion.supportsShareGroups()) {
cache.removeAllSessions();
Set<SharePartitionKey> sharePartitionKeys =
partitionCache.cachedSharePartitionKeys();
// Remove all share partitions from partition cache.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8493321641f..f7aef286e10 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -717,7 +717,6 @@ class BrokerServer(
.withWriter(writer)
.withCoordinatorRuntimeMetrics(new
ShareCoordinatorRuntimeMetrics(metrics))
.withCoordinatorMetrics(new ShareCoordinatorMetrics(metrics))
- .withShareGroupEnabledConfigSupplier(() =>
config.shareGroupConfig.isShareGroupEnabled)
.build()
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 76a45415acc..555469599b3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -4224,7 +4224,7 @@ class KafkaApis(val requestChannel: RequestChannel,
}
private def isShareGroupProtocolEnabled: Boolean = {
- config.shareGroupConfig.isShareGroupEnabled ||
shareVersion().supportsShareGroups
+ shareVersion().supportsShareGroups
}
/**
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 8672efcf9c5..d42664c0263 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -233,7 +233,7 @@ class BrokerMetadataPublisher(
finalizedShareVersion = newFinalizedShareVersion
val shareVersion: ShareVersion =
ShareVersion.fromFeatureLevel(finalizedShareVersion)
info(s"Feature share.version has been updated to version
$finalizedShareVersion")
- sharePartitionManager.onShareVersionToggle(shareVersion,
config.shareGroupConfig.isShareGroupEnabled)
+ sharePartitionManager.onShareVersionToggle(shareVersion)
}
} catch {
case t: Throwable =>
metadataPublishingFaultHandler.handleFault("Error updating share partition
manager " +
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d177ac6c9a9..134f4524cdd 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -2923,7 +2923,7 @@ public class SharePartitionManagerTest {
.withPartitionCache(partitionCache)
.build();
assertEquals(4, partitionCache.size());
- sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
+ sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
// Because we are toggling to a share version which does not support
share groups, the cache inside share partitions must be cleared.
assertEquals(0, partitionCache.size());
//Check if all share partitions have been fenced.
@@ -2933,24 +2933,6 @@ public class SharePartitionManagerTest {
Mockito.verify(sp3).markFenced();
}
- @Test
- public void testOnShareVersionToggleWhenEnabledFromConfig() {
- SharePartition sp0 = mock(SharePartition.class);
- // Mock the share partitions corresponding to the topic partitions.
- SharePartitionCache partitionCache = new SharePartitionCache();
- partitionCache.put(
- new SharePartitionKey("grp", new
TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))), sp0
- );
- sharePartitionManager = SharePartitionManagerBuilder.builder()
- .withPartitionCache(partitionCache)
- .build();
- assertEquals(1, partitionCache.size());
- sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, true);
- // Though share version is toggled to off, but it's enabled from
config, hence the cache should not be cleared.
- assertEquals(1, partitionCache.size());
- Mockito.verify(sp0, times(0)).markFenced();
- }
-
@Test
public void testShareGroupListener() {
String groupId = "grp";
@@ -3023,7 +3005,7 @@ public class SharePartitionManagerTest {
assertEquals(1, partitionCache.size());
// Clean up share session and partition cache.
- sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
+ sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0);
assertEquals(0, cache.size());
assertEquals(0, partitionCache.size());
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index f271d238d06..ba7445de0cb 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -356,6 +356,6 @@ class BrokerMetadataPublisherTest {
)
// SharePartitionManager is receiving the latest changes.
- verify(sharePartitionManager).onShareVersionToggle(any(), any())
+ verify(sharePartitionManager).onShareVersionToggle(any())
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
index 5c993f420b2..cc72a8b48e3 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfig.java
@@ -27,18 +27,12 @@ import java.util.Map;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
-import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class ShareGroupConfig {
/** Share Group Configurations **/
- // Internal configuration used by integration and system tests.
- public static final String SHARE_GROUP_ENABLE_CONFIG =
"group.share.enable";
- public static final boolean SHARE_GROUP_ENABLE_DEFAULT = false;
- public static final String SHARE_GROUP_ENABLE_DOC = "Enable share groups
on the broker.";
-
public static final String SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG =
"group.share.partition.max.record.locks";
public static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DEFAULT =
2000;
public static final String SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_DOC =
"Share-group record lock limit per share-partition.";
@@ -89,7 +83,6 @@ public class ShareGroupConfig {
"the <code>org.apache.kafka.server.share.Persister</code> interface.";
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- .defineInternal(SHARE_GROUP_ENABLE_CONFIG, BOOLEAN,
SHARE_GROUP_ENABLE_DEFAULT, null, MEDIUM, SHARE_GROUP_ENABLE_DOC)
.define(SHARE_GROUP_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 10), MEDIUM,
SHARE_GROUP_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DEFAULT, between(5, 25), MEDIUM,
SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_DOC)
.define(SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG, INT,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DEFAULT, between(2, 5), MEDIUM,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_DOC)
@@ -103,7 +96,6 @@ public class ShareGroupConfig {
.define(SHARE_GROUP_MAX_SHARE_SESSIONS_CONFIG, INT,
SHARE_GROUP_MAX_SHARE_SESSIONS_DEFAULT, atLeast(1), MEDIUM,
SHARE_GROUP_MAX_SHARE_SESSIONS_DOC)
.defineInternal(SHARE_GROUP_PERSISTER_CLASS_NAME_CONFIG, STRING,
SHARE_GROUP_PERSISTER_CLASS_NAME_DEFAULT, null, MEDIUM,
SHARE_GROUP_PERSISTER_CLASS_NAME_DOC);
- private final boolean isShareGroupEnabled;
private final int shareGroupPartitionMaxRecordLocks;
private final int shareGroupMaxPartitionMaxRecordLocks;
private final int shareGroupMinPartitionMaxRecordLocks;
@@ -120,8 +112,6 @@ public class ShareGroupConfig {
public ShareGroupConfig(AbstractConfig config) {
this.config = config;
- // The proper way to enable share groups is to use the share.version
feature with v1 or later.
- isShareGroupEnabled =
config.getBoolean(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG);
shareGroupPartitionMaxRecordLocks =
config.getInt(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG);
shareGroupMaxPartitionMaxRecordLocks =
config.getInt(SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG);
shareGroupMinPartitionMaxRecordLocks =
config.getInt(SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG);
@@ -148,10 +138,6 @@ public class ShareGroupConfig {
}
/** Share group configuration **/
- public boolean isShareGroupEnabled() {
- return isShareGroupEnabled;
- }
-
public int shareGroupPartitionMaxRecordLocks() {
return shareGroupPartitionMaxRecordLocks;
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 2315d4496ce..bd7bec0e98f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -62,7 +62,6 @@ public class GroupConfigTest {
private static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS = 1000L;
private static final int OFFSETS_RETENTION_MINUTES = 24 * 60;
- private static final boolean SHARE_GROUP_ENABLE = true;
private static final int SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS = 200;
private static final int SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS = 100;
private static final int SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS =
10000;
@@ -512,7 +511,7 @@ public class GroupConfigTest {
}
private ShareGroupConfig createShareGroupConfig() {
- return ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_ENABLE,
SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS,
+ return
ShareGroupConfigTest.createShareGroupConfig(SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS,
SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS,
SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS,
SHARE_GROUP_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT, SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT,
SHARE_GROUP_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS,
SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS);
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
index 50ceedfda81..4f70e3909aa 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigTest.java
@@ -26,14 +26,12 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareGroupConfigTest {
@Test
public void testConfigs() {
Map<String, Object> configs = new HashMap<>();
- configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG, true);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
200);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG,
100);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG,
10000);
@@ -48,7 +46,6 @@ public class ShareGroupConfigTest {
ShareGroupConfig config = createConfig(configs);
- assertTrue(config.isShareGroupEnabled());
assertEquals(200, config.shareGroupPartitionMaxRecordLocks());
assertEquals(5, config.shareGroupDeliveryCountLimit());
assertEquals(30000, config.shareGroupRecordLockDurationMs());
@@ -137,7 +134,6 @@ public class ShareGroupConfigTest {
}
public static ShareGroupConfig createShareGroupConfig(
- boolean shareGroupEnable,
int shareGroupPartitionMaxRecordLocks,
int shareGroupMinPartitionMaxRecordLocks,
int shareGroupMaxPartitionMaxRecordLocks,
@@ -149,7 +145,6 @@ public class ShareGroupConfigTest {
int shareGroupMaxRecordLockDurationMs
) {
Map<String, Object> configs = new HashMap<>();
- configs.put(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG,
shareGroupEnable);
configs.put(ShareGroupConfig.SHARE_GROUP_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupPartitionMaxRecordLocks);
configs.put(ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupMinPartitionMaxRecordLocks);
configs.put(ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupMaxPartitionMaxRecordLocks);
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index 2f591415574..d5e11424486 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -78,7 +78,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntSupplier;
-import java.util.function.Supplier;
import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
@@ -94,7 +93,6 @@ public class ShareCoordinatorService implements
ShareCoordinator {
private final Timer timer;
private final PartitionWriter writer;
private final Map<TopicPartition, Long> lastPrunedOffsets;
- private final Supplier<Boolean> shareGroupConfigEnabledSupplier;
private volatile boolean shouldRunPeriodicJob;
public static class Builder {
@@ -104,7 +102,6 @@ public class ShareCoordinatorService implements
ShareCoordinator {
private CoordinatorLoader<CoordinatorRecord> loader;
private Time time;
private Timer timer;
- private Supplier<Boolean> shareGroupConfigEnabledSupplier;
private ShareCoordinatorMetrics coordinatorMetrics;
private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
@@ -143,11 +140,6 @@ public class ShareCoordinatorService implements
ShareCoordinator {
return this;
}
- public Builder withShareGroupEnabledConfigSupplier(Supplier<Boolean>
shareGroupConfigEnabledSupplier) {
- this.shareGroupConfigEnabledSupplier =
shareGroupConfigEnabledSupplier;
- return this;
- }
-
public ShareCoordinatorService build() {
if (config == null) {
throw new IllegalArgumentException("Config must be set.");
@@ -170,9 +162,6 @@ public class ShareCoordinatorService implements
ShareCoordinator {
if (coordinatorRuntimeMetrics == null) {
throw new IllegalArgumentException("Coordinator runtime
metrics must be set.");
}
- if (shareGroupConfigEnabledSupplier == null) {
- throw new IllegalArgumentException("Share group enabled config
enabled supplier must be set.");
- }
String logPrefix = String.format("ShareCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ",
logPrefix));
@@ -216,8 +205,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
coordinatorMetrics,
time,
timer,
- writer,
- shareGroupConfigEnabledSupplier
+ writer
);
}
}
@@ -229,8 +217,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
ShareCoordinatorMetrics shareCoordinatorMetrics,
Time time,
Timer timer,
- PartitionWriter writer,
- Supplier<Boolean> shareGroupConfigEnabledSupplier
+ PartitionWriter writer
) {
this.log = logContext.logger(ShareCoordinatorService.class);
this.config = config;
@@ -240,7 +227,6 @@ public class ShareCoordinatorService implements
ShareCoordinator {
this.timer = timer;
this.writer = writer;
this.lastPrunedOffsets = new ConcurrentHashMap<>();
- this.shareGroupConfigEnabledSupplier = shareGroupConfigEnabledSupplier;
}
@Override
@@ -1130,7 +1116,7 @@ public class ShareCoordinatorService implements
ShareCoordinator {
}
private boolean isShareGroupsEnabled(MetadataImage image) {
- return shareGroupConfigEnabledSupplier.get() ||
ShareVersion.fromFeatureLevel(
+ return ShareVersion.fromFeatureLevel(
image.features().finalizedVersions().getOrDefault(ShareVersion.FEATURE_NAME,
(short) 0)
).supportsShareGroups();
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index 6c22e91bcf1..cf5c51590a5 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -97,6 +97,13 @@ class ShareCoordinatorServiceTest {
return runtime;
}
+ private MetadataImage mockMetadataImageWithShareGroupsEnabled() {
+ MetadataImage image = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
+
when(image.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort()))
+ .thenReturn((short) 1);
+ return image;
+ }
+
@Test
public void testStartupShutdown() throws Exception {
CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
@@ -107,8 +114,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
new MockTimer(),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -131,8 +137,7 @@ class ShareCoordinatorServiceTest {
coordinatorMetrics,
time,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -243,8 +248,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -345,8 +349,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -429,8 +432,7 @@ class ShareCoordinatorServiceTest {
coordinatorMetrics,
time,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -511,8 +513,7 @@ class ShareCoordinatorServiceTest {
coordinatorMetrics,
time,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -593,8 +594,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -641,8 +641,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -689,8 +688,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -737,8 +735,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -785,8 +782,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -833,8 +829,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -915,8 +910,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -979,8 +973,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -1043,8 +1036,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -1105,8 +1097,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -1166,8 +1157,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -1218,8 +1208,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -1262,8 +1251,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -1306,8 +1294,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -1349,8 +1336,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -1391,8 +1377,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
service.startup(() -> 1);
@@ -1421,8 +1406,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(),
Time.SYSTEM,
mock(Timer.class),
- mock(PartitionWriter.class),
- () -> true
+ mock(PartitionWriter.class)
);
String groupId = "group1";
@@ -1479,12 +1463,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1569,12 +1552,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 2);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1627,12 +1609,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1677,12 +1658,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1725,12 +1705,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
@@ -1786,12 +1765,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1858,12 +1836,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("write-state-record-prune"),
@@ -1909,8 +1886,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
when(runtime.scheduleWriteAllOperation(
@@ -1919,7 +1895,7 @@ class ShareCoordinatorServiceTest {
)).thenReturn(List.of(CompletableFuture.completedFuture(null)));
service.startup(() -> 1);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteOperation(
eq("snapshot-cold-partitions"),
@@ -1968,12 +1944,11 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 2);
- service.onMetadataUpdate(mock(MetadataDelta.class),
mock(MetadataImage.class));
+ service.onMetadataUpdate(mock(MetadataDelta.class),
mockMetadataImageWithShareGroupsEnabled());
verify(runtime, times(0))
.scheduleWriteAllOperation(
eq("snapshot-cold-partitions"),
@@ -2012,8 +1987,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> false // So that the feature config is used.
+ writer
));
// Prune job.
@@ -2108,8 +2082,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
List<String> propNames = List.of(
@@ -2141,8 +2114,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 3);
@@ -2191,8 +2163,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 3);
@@ -2227,8 +2198,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 3);
@@ -2237,6 +2207,8 @@ class ShareCoordinatorServiceTest {
MetadataDelta delta = mock(MetadataDelta.class);
MetadataImage image = mock(MetadataImage.class, RETURNS_DEEP_STUBS);
when(delta.topicsDelta()).thenReturn(null);
+
when(image.features().finalizedVersions().getOrDefault(eq(ShareVersion.FEATURE_NAME),
anyShort()))
+ .thenReturn((short) 1);
assertDoesNotThrow(() -> service.onMetadataUpdate(delta, image));
@@ -2262,8 +2234,7 @@ class ShareCoordinatorServiceTest {
new ShareCoordinatorMetrics(metrics),
time,
timer,
- writer,
- () -> true
+ writer
));
service.startup(() -> 3);
diff --git a/tests/kafkatest/services/kafka/config_property.py
b/tests/kafkatest/services/kafka/config_property.py
index 329df370d19..702120e1625 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -78,7 +78,6 @@ CONSUMER_GROUP_MIGRATION_POLICY =
"group.consumer.migration.policy"
SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR
="share.coordinator.state.topic.replication.factor"
SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR = "share.coordinator.state.topic.min.isr"
-SHARE_GROUP_ENABLE = "group.share.enable"
UNSTABLE_API_VERSIONS_ENABLE = "unstable.api.versions.enable"
UNSTABLE_FEATURE_VERSIONS_ENABLE = "unstable.feature.versions.enable"
diff --git a/tests/kafkatest/services/kafka/kafka.py
b/tests/kafkatest/services/kafka/kafka.py
index 04d3457f3b7..cb4f0af990d 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -781,9 +781,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin,
Service):
for prop in self.per_node_server_prop_overrides.get(self.idx(node),
[]):
override_configs[prop[0]] = prop[1]
- if self.use_share_groups is not None and self.use_share_groups is True:
- override_configs[config_property.SHARE_GROUP_ENABLE] =
str(self.use_share_groups)
-
if self.use_streams_groups is True:
override_configs[config_property.UNSTABLE_API_VERSIONS_ENABLE] =
str(True)
override_configs[config_property.UNSTABLE_FEATURE_VERSIONS_ENABLE]
= str(True)