This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32e49cee129 KAFKA-20610: Move abstraction instantiation to BrokerServer for share fetch (3/N) (#22414)
32e49cee129 is described below

commit 32e49cee129241fbab48c4754262c9a3f801bb94
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri May 29 18:34:01 2026 +0100

    KAFKA-20610: Move abstraction instantiation to BrokerServer for share fetch (3/N) (#22414)
    
    Moves the instantiation of LogReader, PartitionMetadataProvider, and
    Consumer<DelayedShareFetchKey> from SharePartitionManager to
    BrokerServer.
    
    #### What Changed:
    - BrokerServer now creates the ReplicaManagerLogReader, the
    ReplicaManagerPartitionMetadataProvider, and the delayed request
    notifier, and passes them to the SharePartitionManager constructor
    - The SharePartitionManagerTest builder now creates and injects the
    same dependencies
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/server/share/SharePartitionManager.java  | 28 +++++++++++++++++-----
 .../src/main/scala/kafka/server/BrokerServer.scala |  6 ++++-
 .../server/share/SharePartitionManagerTest.java    |  3 +++
 3 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 0472fce579d..a8eff11ac90 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -177,8 +177,12 @@ public class SharePartitionManager implements 
AutoCloseable {
      */
     private final ShareGroupDLQManager shareGroupDLQManager;
 
+    @SuppressWarnings("ParameterNumber")
     public SharePartitionManager(
         ReplicaManager replicaManager,
+        LogReader logReader,
+        PartitionMetadataProvider metadataProvider,
+        Consumer<DelayedShareFetchKey> delayedRequestNotifier,
         Time time,
         ShareSessionCache cache,
         int defaultRecordLockDurationMs,
@@ -192,6 +196,9 @@ public class SharePartitionManager implements AutoCloseable 
{
         ShareGroupDLQManager shareGroupDLQManager
     ) {
         this(replicaManager,
+            logReader,
+            metadataProvider,
+            delayedRequestNotifier,
             time,
             cache,
             new SharePartitionCache(),
@@ -211,6 +218,9 @@ public class SharePartitionManager implements AutoCloseable 
{
     @SuppressWarnings("ParameterNumber")
     private SharePartitionManager(
         ReplicaManager replicaManager,
+        LogReader logReader,
+        PartitionMetadataProvider metadataProvider,
+        Consumer<DelayedShareFetchKey> delayedRequestNotifier,
         Time time,
         ShareSessionCache cache,
         SharePartitionCache partitionCache,
@@ -226,6 +236,9 @@ public class SharePartitionManager implements AutoCloseable 
{
         ShareGroupDLQManager shareGroupDLQManager
     ) {
         this(replicaManager,
+            logReader,
+            metadataProvider,
+            delayedRequestNotifier,
             time,
             cache,
             partitionCache,
@@ -248,6 +261,9 @@ public class SharePartitionManager implements AutoCloseable 
{
     @SuppressWarnings("ParameterNumber")
     SharePartitionManager(
             ReplicaManager replicaManager,
+            LogReader logReader,
+            PartitionMetadataProvider metadataProvider,
+            Consumer<DelayedShareFetchKey> delayedRequestNotifier,
             Time time,
             ShareSessionCache cache,
             SharePartitionCache partitionCache,
@@ -264,9 +280,9 @@ public class SharePartitionManager implements AutoCloseable 
{
             ShareGroupDLQManager shareGroupDLQManager
     ) {
         this.replicaManager = replicaManager;
-        this.logReader = new ReplicaManagerLogReader(replicaManager);
-        this.metadataProvider = new 
ReplicaManagerPartitionMetadataProvider(replicaManager);
-        this.delayedRequestNotifier = 
replicaManager::completeDelayedShareFetchRequest;
+        this.logReader = logReader;
+        this.metadataProvider = metadataProvider;
+        this.delayedRequestNotifier = delayedRequestNotifier;
         this.time = time;
         this.cache = cache;
         this.partitionCache = partitionCache;
@@ -369,7 +385,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                 // If we have an acknowledgement completed for a 
topic-partition, then we should check if
                 // there is a pending share fetch request for the 
topic-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new 
DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), 
topicIdPartition.partition());
-                
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+                delayedRequestNotifier.accept(delayedShareFetchKey);
 
                 futures.put(topicIdPartition, future);
             } else {
@@ -437,7 +453,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                 // If we have a release acquired request completed for a 
topic-partition, then we should check if
                 // there is a pending share fetch request for the 
topic-partition and complete it.
                 DelayedShareFetchKey delayedShareFetchKey = new 
DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), 
topicIdPartition.partition());
-                
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+                delayedRequestNotifier.accept(delayedShareFetchKey);
 
                 futuresMap.put(topicIdPartition, future);
             }
@@ -718,7 +734,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                 // for the share partition.
                 if (!initialized) {
                     
shareGroupMetrics.partitionLoadTime(sharePartition.loadStartTimeMs());
-                    
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+                    delayedRequestNotifier.accept(delayedShareFetchKey);
                 }
             });
             sharePartitions.put(topicIdPartition, sharePartition);
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 04a1ee7672c..63a0472ddf0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -23,7 +23,7 @@ import kafka.log.LogManager
 import kafka.network.SocketServer
 import kafka.raft.KafkaRaftManager
 import kafka.server.metadata._
-import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl, 
SharePartitionManager}
+import kafka.server.share.{ReplicaManagerPartitionMetadataProvider, 
ReplicaManagerLogReader, ShareCoordinatorMetadataCacheHelperImpl, 
SharePartitionManager}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.internals.Plugin
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -53,6 +53,7 @@ import 
org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
 import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, 
RemoteLogManagerConfig}
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
KafkaYammerMetrics}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
+import org.apache.kafka.server.share.fetch.DelayedShareFetchKey
 import org.apache.kafka.server.share.persister.{DefaultStatePersister, 
NoOpStatePersister, Persister}
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper, 
Timer}
@@ -460,6 +461,9 @@ class BrokerServer(
 
       sharePartitionManager = new SharePartitionManager(
         replicaManager,
+        new ReplicaManagerLogReader(replicaManager),
+        new ReplicaManagerPartitionMetadataProvider(replicaManager),
+        (key: DelayedShareFetchKey) => 
replicaManager.completeDelayedShareFetchRequest(key),
         time,
         shareFetchSessionCache,
         config.shareGroupConfig.shareGroupRecordLockDurationMs,
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index eb6965bda32..3bde6678921 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -3310,6 +3310,9 @@ public class SharePartitionManagerTest {
 
         public SharePartitionManager build() {
             return new SharePartitionManager(replicaManager,
+                new ReplicaManagerLogReader(replicaManager),
+                new ReplicaManagerPartitionMetadataProvider(replicaManager),
+                replicaManager::completeDelayedShareFetchRequest,
                 time,
                 cache,
                 partitionCache,

Reply via email to