This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2f047bedb78 KAFKA-20340 Remove GroupConfigManager dependency from
SharePartition (#21861)
2f047bedb78 is described below
commit 2f047bedb78c5bd05701b2f0f33b11605aeac6d3
Author: majialong <[email protected]>
AuthorDate: Thu Mar 26 03:04:31 2026 +0800
KAFKA-20340 Remove GroupConfigManager dependency from SharePartition
(#21861)
`SharePartition` held both a `GroupConfigManager` and a
`ShareGroupConfigProvider` reference, but `ShareGroupConfigProvider`
already wraps `GroupConfigManager`.
This PR removes the redundant `GroupConfigManager` dependency so that
`SharePartition` only uses `ShareGroupConfigProvider` for dynamic group
configuration lookups, as suggested in
[#21627](https://github.com/apache/kafka/pull/21627#discussion_r2944775843).
Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 20 ++-----
.../kafka/server/share/SharePartitionManager.java | 20 +++----
.../src/main/scala/kafka/server/BrokerServer.scala | 3 +-
.../server/share/SharePartitionManagerTest.java | 3 +-
.../kafka/server/share/SharePartitionTest.java | 69 +++++++++++-----------
.../modern/share/ShareGroupConfigProvider.java | 14 +++++
.../modern/share/ShareGroupConfigProviderTest.java | 21 +++++++
7 files changed, 88 insertions(+), 62 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 790959ad8cc..28afd716960 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -39,8 +39,6 @@ import
org.apache.kafka.common.record.internal.ControlRecordType;
import org.apache.kafka.common.record.internal.Record;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.coordinator.group.GroupConfig;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -208,11 +206,6 @@ public class SharePartition {
*/
private final int defaultMaxDeliveryCount;
- /**
- * The group config manager is used to retrieve the values for dynamic
group configurations
- */
- private final GroupConfigManager groupConfigManager;
-
/**
* The provider used to retrieve share group dynamic configuration values.
*/
@@ -343,11 +336,11 @@ public class SharePartition {
Time time,
Persister persister,
ReplicaManager replicaManager,
- GroupConfigManager groupConfigManager,
+ ShareGroupConfigProvider configProvider,
SharePartitionListener listener
) {
this(groupId, topicIdPartition, leaderEpoch,
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
- timer, time, persister, replicaManager, groupConfigManager,
SharePartitionState.EMPTY, listener,
+ timer, time, persister, replicaManager, configProvider,
SharePartitionState.EMPTY, listener,
new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()));
}
@@ -364,7 +357,7 @@ public class SharePartition {
Time time,
Persister persister,
ReplicaManager replicaManager,
- GroupConfigManager groupConfigManager,
+ ShareGroupConfigProvider configProvider,
SharePartitionState sharePartitionState,
SharePartitionListener listener,
SharePartitionMetrics sharePartitionMetrics
@@ -385,8 +378,7 @@ public class SharePartition {
this.persister = persister;
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
- this.groupConfigManager = groupConfigManager;
- this.configProvider = new ShareGroupConfigProvider(groupConfigManager);
+ this.configProvider = configProvider;
this.fetchOffsetMetadata = new OffsetMetadata();
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId,
topicIdPartition);
this.listener = listener;
@@ -3040,9 +3032,7 @@ public class SharePartition {
if (partitionDataStartOffset !=
PartitionFactory.UNINITIALIZED_START_OFFSET) {
return partitionDataStartOffset;
}
- ShareGroupAutoOffsetResetStrategy offsetResetStrategy =
groupConfigManager.groupConfig(groupId)
- .map(GroupConfig::shareAutoOffsetReset)
- .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
+ ShareGroupAutoOffsetResetStrategy offsetResetStrategy =
configProvider.autoOffsetReset(groupId);
if (offsetResetStrategy.type() ==
ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
return offsetForLatestTimestamp(topicIdPartition, replicaManager,
leaderEpoch);
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 22cdb5d61fe..27da4cdafa6 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
+import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.partition.PartitionListener;
import org.apache.kafka.server.share.CachedSharePartition;
@@ -104,9 +104,9 @@ public class SharePartitionManager implements AutoCloseable
{
private final ShareSessionCache cache;
/**
- * The group config manager is used to retrieve the values for dynamic
group configurations
+ * The provider used to retrieve share group dynamic configuration values.
*/
- private final GroupConfigManager groupConfigManager;
+ private final ShareGroupConfigProvider configProvider;
/**
* The default record lock duration is the time in milliseconds that a
record lock is held for.
@@ -157,7 +157,7 @@ public class SharePartitionManager implements AutoCloseable
{
int maxInFlightRecords,
long remoteFetchMaxWaitMs,
Persister persister,
- GroupConfigManager groupConfigManager,
+ ShareGroupConfigProvider configProvider,
BrokerTopicStats brokerTopicStats
) {
this(replicaManager,
@@ -169,7 +169,7 @@ public class SharePartitionManager implements AutoCloseable
{
maxInFlightRecords,
remoteFetchMaxWaitMs,
persister,
- groupConfigManager,
+ configProvider,
new ShareGroupMetrics(time),
brokerTopicStats
);
@@ -185,7 +185,7 @@ public class SharePartitionManager implements AutoCloseable
{
int maxInFlightRecords,
long remoteFetchMaxWaitMs,
Persister persister,
- GroupConfigManager groupConfigManager,
+ ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats
) {
@@ -200,7 +200,7 @@ public class SharePartitionManager implements AutoCloseable
{
maxInFlightRecords,
remoteFetchMaxWaitMs,
persister,
- groupConfigManager,
+ configProvider,
shareGroupMetrics,
brokerTopicStats
);
@@ -218,7 +218,7 @@ public class SharePartitionManager implements AutoCloseable
{
int maxInFlightRecords,
long remoteFetchMaxWaitMs,
Persister persister,
- GroupConfigManager groupConfigManager,
+ ShareGroupConfigProvider configProvider,
ShareGroupMetrics shareGroupMetrics,
BrokerTopicStats brokerTopicStats
) {
@@ -232,7 +232,7 @@ public class SharePartitionManager implements AutoCloseable
{
this.maxInFlightRecords = maxInFlightRecords;
this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
this.persister = persister;
- this.groupConfigManager = groupConfigManager;
+ this.configProvider = configProvider;
this.shareGroupMetrics = shareGroupMetrics;
this.brokerTopicStats = brokerTopicStats;
this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
@@ -718,7 +718,7 @@ public class SharePartitionManager implements AutoCloseable
{
time,
persister,
replicaManager,
- groupConfigManager,
+ configProvider,
listener
);
});
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7aef286e10..4c20d1b78c4 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.{ClusterResource,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl,
CoordinatorRecord}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics,
GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{GroupConfigManager,
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider
import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics,
ShareCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
@@ -456,7 +457,7 @@ class BrokerServer(
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
persister,
- groupConfigManager,
+ new ShareGroupConfigProvider(groupConfigManager),
brokerTopicStats
)
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 635c10c6424..9039d8369ef 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -3287,7 +3288,7 @@ public class SharePartitionManagerTest {
MAX_IN_FLIGHT_MESSAGES,
REMOTE_FETCH_MAX_WAIT_MS,
persister,
- mock(GroupConfigManager.class),
+ new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
shareGroupMetrics,
brokerTopicStats
);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index a5040ea98af..285de56397f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -228,7 +229,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.build();
@@ -279,7 +280,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.build();
@@ -340,7 +341,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
@@ -395,7 +396,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.build();
@@ -442,7 +443,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.build();
@@ -488,7 +489,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.build();
@@ -542,7 +543,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withReplicaManager(replicaManager)
.build();
@@ -7153,7 +7154,7 @@ public class SharePartitionTest {
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withGroupConfigManager(groupConfigManager).build();
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager)).build();
AcquisitionLockTimerTask timerTask =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@@ -7175,7 +7176,7 @@ public class SharePartitionTest {
.thenReturn(expectedDurationMs2);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withGroupConfigManager(groupConfigManager).build();
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager)).build();
AcquisitionLockTimerTask timerTask1 =
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@@ -10389,7 +10390,7 @@ public class SharePartitionTest {
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withPersister(persister)
- .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+ .withConfigProvider(configProviderWithRenewDisabled())
.build();
List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition,
memoryRecords(0, 1), 1);
@@ -10423,7 +10424,7 @@ public class SharePartitionTest {
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withMaxDeliveryCount(2)
.withPersister(persister)
- .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+ .withConfigProvider(configProviderWithRenewDisabled())
.build();
List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition,
memoryRecords(0, 2), 2);
@@ -12328,7 +12329,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxDeliveryCount(5)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12343,7 +12344,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxDeliveryCount(5)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12359,7 +12360,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxDeliveryCount(10)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12398,7 +12399,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxDeliveryCount(2)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12436,7 +12437,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightRecords(2000)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12451,7 +12452,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightRecords(2000)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12465,7 +12466,7 @@ public class SharePartitionTest {
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12495,7 +12496,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightRecords(10)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12525,7 +12526,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withMaxInflightRecords(50)
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12560,7 +12561,7 @@ public class SharePartitionTest {
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12579,7 +12580,7 @@ public class SharePartitionTest {
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withGroupConfigManager(groupConfigManager)
+ .withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12601,15 +12602,13 @@ public class SharePartitionTest {
assertFalse(sharePartition.cachedState().isEmpty());
}
- private static GroupConfigManager groupConfigManagerWithRenewDisabled() {
- GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
- GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-
Mockito.when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
-
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(ACQUISITION_LOCK_TIMEOUT_MS);
-
Mockito.when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(DEFAULT_MAX_IN_FLIGHT_RECORDS);
-
Mockito.when(groupConfig.shareDeliveryCountLimit()).thenReturn(DEFAULT_MAX_DELIVERY_COUNT);
- return groupConfigManager;
+ private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
+ ShareGroupConfigProvider configProvider =
Mockito.mock(ShareGroupConfigProvider.class);
+
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
+ Mockito.when(configProvider.recordLockDurationMsOrDefault(GROUP_ID,
ACQUISITION_LOCK_TIMEOUT_MS)).thenReturn(ACQUISITION_LOCK_TIMEOUT_MS);
+ Mockito.when(configProvider.partitionMaxRecordLocksOrDefault(GROUP_ID,
DEFAULT_MAX_IN_FLIGHT_RECORDS)).thenReturn(DEFAULT_MAX_IN_FLIGHT_RECORDS);
+ Mockito.when(configProvider.deliveryCountLimitOrDefault(GROUP_ID,
DEFAULT_MAX_DELIVERY_COUNT)).thenReturn(DEFAULT_MAX_DELIVERY_COUNT);
+ return configProvider;
}
private static class SharePartitionBuilder {
@@ -12620,7 +12619,7 @@ public class SharePartitionTest {
private Persister persister = new NoOpStatePersister();
private ReplicaManager replicaManager =
Mockito.mock(ReplicaManager.class);
- private GroupConfigManager groupConfigManager =
Mockito.mock(GroupConfigManager.class);
+ private ShareGroupConfigProvider configProvider = new
ShareGroupConfigProvider(Mockito.mock(GroupConfigManager.class));
private SharePartitionState state = SharePartitionState.EMPTY;
private Time time = MOCK_TIME;
private SharePartitionMetrics sharePartitionMetrics =
Mockito.mock(SharePartitionMetrics.class);
@@ -12650,8 +12649,8 @@ public class SharePartitionTest {
return this;
}
- private SharePartitionBuilder
withGroupConfigManager(GroupConfigManager groupConfigManager) {
- this.groupConfigManager = groupConfigManager;
+ private SharePartitionBuilder
withConfigProvider(ShareGroupConfigProvider configProvider) {
+ this.configProvider = configProvider;
return this;
}
@@ -12676,7 +12675,7 @@ public class SharePartitionTest {
public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0,
defaultMaxInflightRecords, defaultMaxDeliveryCount,
- defaultAcquisitionLockTimeoutMs, mockTimer, time,
persister, replicaManager, groupConfigManager,
+ defaultAcquisitionLockTimeoutMs, mockTimer, time,
persister, replicaManager, configProvider,
state, Mockito.mock(SharePartitionListener.class),
sharePartitionMetrics);
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
index f8d2bd5dc70..447750fb611 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
/**
* A provider that retrieves share group dynamic configuration values,
@@ -84,4 +85,17 @@ public class ShareGroupConfigProvider {
.map(GroupConfig::shareRenewAcknowledgeEnable)
.orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT);
}
+
+ /**
+ * The method is used to get the auto offset reset strategy for the group.
If the group config
+ * is present, then the value from the group config is used. Otherwise,
the default value is used.
+ *
+ * @param groupId The group id for which the auto offset reset strategy is
to be fetched.
+ * @return The auto offset reset strategy for the group.
+ */
+ public ShareGroupAutoOffsetResetStrategy autoOffsetReset(String groupId) {
+ return manager.groupConfig(groupId)
+ .map(GroupConfig::shareAutoOffsetReset)
+ .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
index d9f85ee9e26..1eec4abc0f3 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.junit.jupiter.api.Test;
@@ -111,4 +112,24 @@ public class ShareGroupConfigProviderTest {
assertTrue(provider.isRenewAcknowledgeEnabled("test-group"));
}
+
+ @Test
+ void testAutoOffsetResetWithGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+ GroupConfig groupConfig = mock(GroupConfig.class);
+
when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(ShareGroupAutoOffsetResetStrategy.EARLIEST,
provider.autoOffsetReset("test-group"));
+ }
+
+ @Test
+ void testAutoOffsetResetWithoutGroupConfig() {
+ GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+ provider = new ShareGroupConfigProvider(groupConfigManager);
+
+ assertEquals(GroupConfig.defaultShareAutoOffsetReset(),
provider.autoOffsetReset("test-group"));
+ }
}