This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2f047bedb78 KAFKA-20340 Remove GroupConfigManager dependency from 
SharePartition (#21861)
2f047bedb78 is described below

commit 2f047bedb78c5bd05701b2f0f33b11605aeac6d3
Author: majialong <[email protected]>
AuthorDate: Thu Mar 26 03:04:31 2026 +0800

    KAFKA-20340 Remove GroupConfigManager dependency from SharePartition 
(#21861)
    
    `SharePartition` held both a `GroupConfigManager` and a
    `ShareGroupConfigProvider` reference, but `ShareGroupConfigProvider`
    already wraps `GroupConfigManager`.
    
    This PR removes the redundant `GroupConfigManager` dependency so that
    `SharePartition` only uses `ShareGroupConfigProvider` for dynamic group
    configuration lookups, as suggested in
    
    [#21627.](https://github.com/apache/kafka/pull/21627#discussion_r2944775843)
    
    Reviewers: Sean Quah <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 20 ++-----
 .../kafka/server/share/SharePartitionManager.java  | 20 +++----
 .../src/main/scala/kafka/server/BrokerServer.scala |  3 +-
 .../server/share/SharePartitionManagerTest.java    |  3 +-
 .../kafka/server/share/SharePartitionTest.java     | 69 +++++++++++-----------
 .../modern/share/ShareGroupConfigProvider.java     | 14 +++++
 .../modern/share/ShareGroupConfigProviderTest.java | 21 +++++++
 7 files changed, 88 insertions(+), 62 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 790959ad8cc..28afd716960 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -39,8 +39,6 @@ import 
org.apache.kafka.common.record.internal.ControlRecordType;
 import org.apache.kafka.common.record.internal.Record;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.coordinator.group.GroupConfig;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -208,11 +206,6 @@ public class SharePartition {
      */
     private final int defaultMaxDeliveryCount;
 
-    /**
-     * The group config manager is used to retrieve the values for dynamic 
group configurations
-     */
-    private final GroupConfigManager groupConfigManager;
-
     /**
      * The provider used to retrieve share group dynamic configuration values.
      */
@@ -343,11 +336,11 @@ public class SharePartition {
         Time time,
         Persister persister,
         ReplicaManager replicaManager,
-        GroupConfigManager groupConfigManager,
+        ShareGroupConfigProvider configProvider,
         SharePartitionListener listener
     ) {
         this(groupId, topicIdPartition, leaderEpoch, 
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
-            timer, time, persister, replicaManager, groupConfigManager, 
SharePartitionState.EMPTY, listener,
+            timer, time, persister, replicaManager, configProvider, 
SharePartitionState.EMPTY, listener,
             new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()));
     }
 
@@ -364,7 +357,7 @@ public class SharePartition {
         Time time,
         Persister persister,
         ReplicaManager replicaManager,
-        GroupConfigManager groupConfigManager,
+        ShareGroupConfigProvider configProvider,
         SharePartitionState sharePartitionState,
         SharePartitionListener listener,
         SharePartitionMetrics sharePartitionMetrics
@@ -385,8 +378,7 @@ public class SharePartition {
         this.persister = persister;
         this.partitionState = sharePartitionState;
         this.replicaManager = replicaManager;
-        this.groupConfigManager = groupConfigManager;
-        this.configProvider = new ShareGroupConfigProvider(groupConfigManager);
+        this.configProvider = configProvider;
         this.fetchOffsetMetadata = new OffsetMetadata();
         this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, 
topicIdPartition);
         this.listener = listener;
@@ -3040,9 +3032,7 @@ public class SharePartition {
         if (partitionDataStartOffset != 
PartitionFactory.UNINITIALIZED_START_OFFSET) {
             return partitionDataStartOffset;
         }
-        ShareGroupAutoOffsetResetStrategy offsetResetStrategy = 
groupConfigManager.groupConfig(groupId)
-            .map(GroupConfig::shareAutoOffsetReset)
-            .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
+        ShareGroupAutoOffsetResetStrategy offsetResetStrategy = 
configProvider.autoOffsetReset(groupId);
 
         if (offsetResetStrategy.type() == 
ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
             return offsetForLatestTimestamp(topicIdPartition, replicaManager, 
leaderEpoch);
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 22cdb5d61fe..27da4cdafa6 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -31,7 +31,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ShareRequestMetadata;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.coordinator.group.GroupConfigManager;
+import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.partition.PartitionListener;
 import org.apache.kafka.server.share.CachedSharePartition;
@@ -104,9 +104,9 @@ public class SharePartitionManager implements AutoCloseable 
{
     private final ShareSessionCache cache;
 
     /**
-     * The group config manager is used to retrieve the values for dynamic 
group configurations
+     * The provider used to retrieve share group dynamic configuration values.
      */
-    private final GroupConfigManager groupConfigManager;
+    private final ShareGroupConfigProvider configProvider;
 
     /**
      * The default record lock duration is the time in milliseconds that a 
record lock is held for.
@@ -157,7 +157,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         int maxInFlightRecords,
         long remoteFetchMaxWaitMs,
         Persister persister,
-        GroupConfigManager groupConfigManager,
+        ShareGroupConfigProvider configProvider,
         BrokerTopicStats brokerTopicStats
     ) {
         this(replicaManager,
@@ -169,7 +169,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             maxInFlightRecords,
             remoteFetchMaxWaitMs,
             persister,
-            groupConfigManager,
+            configProvider,
             new ShareGroupMetrics(time),
             brokerTopicStats
         );
@@ -185,7 +185,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         int maxInFlightRecords,
         long remoteFetchMaxWaitMs,
         Persister persister,
-        GroupConfigManager groupConfigManager,
+        ShareGroupConfigProvider configProvider,
         ShareGroupMetrics shareGroupMetrics,
         BrokerTopicStats brokerTopicStats
     ) {
@@ -200,7 +200,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             maxInFlightRecords,
             remoteFetchMaxWaitMs,
             persister,
-            groupConfigManager,
+            configProvider,
             shareGroupMetrics,
             brokerTopicStats
         );
@@ -218,7 +218,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             int maxInFlightRecords,
             long remoteFetchMaxWaitMs,
             Persister persister,
-            GroupConfigManager groupConfigManager,
+            ShareGroupConfigProvider configProvider,
             ShareGroupMetrics shareGroupMetrics,
             BrokerTopicStats brokerTopicStats
     ) {
@@ -232,7 +232,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         this.maxInFlightRecords = maxInFlightRecords;
         this.remoteFetchMaxWaitMs = remoteFetchMaxWaitMs;
         this.persister = persister;
-        this.groupConfigManager = groupConfigManager;
+        this.configProvider = configProvider;
         this.shareGroupMetrics = shareGroupMetrics;
         this.brokerTopicStats = brokerTopicStats;
         this.cache.registerShareGroupListener(new ShareGroupListenerImpl());
@@ -718,7 +718,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                             time,
                             persister,
                             replicaManager,
-                            groupConfigManager,
+                            configProvider,
                             listener
                     );
                 });
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index f7aef286e10..4c20d1b78c4 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.{ClusterResource, 
TopicPartition, Uuid}
 import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoaderImpl, 
CoordinatorRecord}
 import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, 
GroupCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.group.{GroupConfigManager, 
GroupCoordinator, GroupCoordinatorRecordSerde, GroupCoordinatorService}
+import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider
 import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, 
ShareCoordinatorRuntimeMetrics}
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorRecordSerde, ShareCoordinatorService}
 import org.apache.kafka.coordinator.transaction.ProducerIdManager
@@ -456,7 +457,7 @@ class BrokerServer(
         config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
         config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong,
         persister,
-        groupConfigManager,
+        new ShareGroupConfigProvider(groupConfigManager),
         brokerTopicStats
       )
 
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 635c10c6424..9039d8369ef 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -51,6 +51,7 @@ import org.apache.kafka.common.requests.ShareRequestMetadata;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
+import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
@@ -3287,7 +3288,7 @@ public class SharePartitionManagerTest {
                 MAX_IN_FLIGHT_MESSAGES,
                 REMOTE_FETCH_MAX_WAIT_MS,
                 persister,
-                mock(GroupConfigManager.class),
+                new ShareGroupConfigProvider(mock(GroupConfigManager.class)),
                 shareGroupMetrics,
                 brokerTopicStats
             );
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index a5040ea98af..285de56397f 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
@@ -228,7 +229,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .build();
 
@@ -279,7 +280,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .build();
 
@@ -340,7 +341,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .withSharePartitionMetrics(sharePartitionMetrics)
             .build();
@@ -395,7 +396,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .build();
 
@@ -442,7 +443,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .build();
 
@@ -488,7 +489,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .build();
 
@@ -542,7 +543,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withReplicaManager(replicaManager)
             .build();
 
@@ -7153,7 +7154,7 @@ public class SharePartitionTest {
         
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withGroupConfigManager(groupConfigManager).build();
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager)).build();
 
         AcquisitionLockTimerTask timerTask = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
@@ -7175,7 +7176,7 @@ public class SharePartitionTest {
             .thenReturn(expectedDurationMs2);
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withGroupConfigManager(groupConfigManager).build();
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager)).build();
 
         AcquisitionLockTimerTask timerTask1 = 
sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
 
@@ -10389,7 +10390,7 @@ public class SharePartitionTest {
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withMaxDeliveryCount(2)
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+            .withConfigProvider(configProviderWithRenewDisabled())
             .build();
 
         List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, 
memoryRecords(0, 1), 1);
@@ -10423,7 +10424,7 @@ public class SharePartitionTest {
             .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
             .withMaxDeliveryCount(2)
             .withPersister(persister)
-            .withGroupConfigManager(groupConfigManagerWithRenewDisabled())
+            .withConfigProvider(configProviderWithRenewDisabled())
             .build();
 
         List<AcquiredRecords> records = fetchAcquiredRecords(sharePartition, 
memoryRecords(0, 2), 2);
@@ -12328,7 +12329,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxDeliveryCount(5)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12343,7 +12344,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxDeliveryCount(5)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12359,7 +12360,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxDeliveryCount(10)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12398,7 +12399,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxDeliveryCount(2)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12436,7 +12437,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxInflightRecords(2000)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12451,7 +12452,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxInflightRecords(2000)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12465,7 +12466,7 @@ public class SharePartitionTest {
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12495,7 +12496,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxInflightRecords(10)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12525,7 +12526,7 @@ public class SharePartitionTest {
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withMaxInflightRecords(50)
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12560,7 +12561,7 @@ public class SharePartitionTest {
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12579,7 +12580,7 @@ public class SharePartitionTest {
         
when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.empty());
 
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withGroupConfigManager(groupConfigManager)
+            .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12601,15 +12602,13 @@ public class SharePartitionTest {
         assertFalse(sharePartition.cachedState().isEmpty());
     }
 
-    private static GroupConfigManager groupConfigManagerWithRenewDisabled() {
-        GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
-        GroupConfig groupConfig = Mockito.mock(GroupConfig.class);
-        
Mockito.when(groupConfigManager.groupConfig(GROUP_ID)).thenReturn(Optional.of(groupConfig));
-        
Mockito.when(groupConfig.shareRenewAcknowledgeEnable()).thenReturn(false);
-        
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(ACQUISITION_LOCK_TIMEOUT_MS);
-        
Mockito.when(groupConfig.sharePartitionMaxRecordLocks()).thenReturn(DEFAULT_MAX_IN_FLIGHT_RECORDS);
-        
Mockito.when(groupConfig.shareDeliveryCountLimit()).thenReturn(DEFAULT_MAX_DELIVERY_COUNT);
-        return groupConfigManager;
+    private static ShareGroupConfigProvider configProviderWithRenewDisabled() {
+        ShareGroupConfigProvider configProvider = 
Mockito.mock(ShareGroupConfigProvider.class);
+        
Mockito.when(configProvider.isRenewAcknowledgeEnabled(GROUP_ID)).thenReturn(false);
+        Mockito.when(configProvider.recordLockDurationMsOrDefault(GROUP_ID, 
ACQUISITION_LOCK_TIMEOUT_MS)).thenReturn(ACQUISITION_LOCK_TIMEOUT_MS);
+        Mockito.when(configProvider.partitionMaxRecordLocksOrDefault(GROUP_ID, 
DEFAULT_MAX_IN_FLIGHT_RECORDS)).thenReturn(DEFAULT_MAX_IN_FLIGHT_RECORDS);
+        Mockito.when(configProvider.deliveryCountLimitOrDefault(GROUP_ID, 
DEFAULT_MAX_DELIVERY_COUNT)).thenReturn(DEFAULT_MAX_DELIVERY_COUNT);
+        return configProvider;
     }
 
     private static class SharePartitionBuilder {
@@ -12620,7 +12619,7 @@ public class SharePartitionTest {
 
         private Persister persister = new NoOpStatePersister();
         private ReplicaManager replicaManager = 
Mockito.mock(ReplicaManager.class);
-        private GroupConfigManager groupConfigManager = 
Mockito.mock(GroupConfigManager.class);
+        private ShareGroupConfigProvider configProvider = new 
ShareGroupConfigProvider(Mockito.mock(GroupConfigManager.class));
         private SharePartitionState state = SharePartitionState.EMPTY;
         private Time time = MOCK_TIME;
         private SharePartitionMetrics sharePartitionMetrics = 
Mockito.mock(SharePartitionMetrics.class);
@@ -12650,8 +12649,8 @@ public class SharePartitionTest {
             return this;
         }
 
-        private SharePartitionBuilder 
withGroupConfigManager(GroupConfigManager groupConfigManager) {
-            this.groupConfigManager = groupConfigManager;
+        private SharePartitionBuilder 
withConfigProvider(ShareGroupConfigProvider configProvider) {
+            this.configProvider = configProvider;
             return this;
         }
 
@@ -12676,7 +12675,7 @@ public class SharePartitionTest {
 
         public SharePartition build() {
             return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, 
defaultMaxInflightRecords, defaultMaxDeliveryCount,
-                    defaultAcquisitionLockTimeoutMs, mockTimer, time, 
persister, replicaManager, groupConfigManager,
+                    defaultAcquisitionLockTimeoutMs, mockTimer, time, 
persister, replicaManager, configProvider,
                     state, Mockito.mock(SharePartitionListener.class), 
sharePartitionMetrics);
         }
     }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
index f8d2bd5dc70..447750fb611 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProvider.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.modern.share;
 
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 
 /**
  * A provider that retrieves share group dynamic configuration values,
@@ -84,4 +85,17 @@ public class ShareGroupConfigProvider {
             .map(GroupConfig::shareRenewAcknowledgeEnable)
             .orElse(GroupConfig.SHARE_RENEW_ACKNOWLEDGE_ENABLE_DEFAULT);
     }
+
+    /**
+     * The method is used to get the auto offset reset strategy for the group. 
If the group config
+     * is present, then the value from the group config is used. Otherwise, 
the default value is used.
+     *
+     * @param groupId The group id for which the auto offset reset strategy is 
to be fetched.
+     * @return The auto offset reset strategy for the group.
+     */
+    public ShareGroupAutoOffsetResetStrategy autoOffsetReset(String groupId) {
+        return manager.groupConfig(groupId)
+            .map(GroupConfig::shareAutoOffsetReset)
+            .orElseGet(GroupConfig::defaultShareAutoOffsetReset);
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
index d9f85ee9e26..1eec4abc0f3 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupConfigProviderTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group.modern.share;
 
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 
 import org.junit.jupiter.api.Test;
 
@@ -111,4 +112,24 @@ public class ShareGroupConfigProviderTest {
 
         assertTrue(provider.isRenewAcknowledgeEnabled("test-group"));
     }
+
+    @Test
+    void testAutoOffsetResetWithGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        GroupConfig groupConfig = mock(GroupConfig.class);
+        
when(groupConfig.shareAutoOffsetReset()).thenReturn(ShareGroupAutoOffsetResetStrategy.EARLIEST);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.of(groupConfig));
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(ShareGroupAutoOffsetResetStrategy.EARLIEST, 
provider.autoOffsetReset("test-group"));
+    }
+
+    @Test
+    void testAutoOffsetResetWithoutGroupConfig() {
+        GroupConfigManager groupConfigManager = mock(GroupConfigManager.class);
+        
when(groupConfigManager.groupConfig("test-group")).thenReturn(Optional.empty());
+        provider = new ShareGroupConfigProvider(groupConfigManager);
+
+        assertEquals(GroupConfig.defaultShareAutoOffsetReset(), 
provider.autoOffsetReset("test-group"));
+    }
 }

Reply via email to