This is an automated email from the ASF dual-hosted git repository.

apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f8705000ee2 KAFKA-20610: Wire abstraction interfaces into share group 
classes (2/N) (#22407)
f8705000ee2 is described below

commit f8705000ee2ee2e78027c6ca1d36f8fae8890d8a
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri May 29 13:56:16 2026 +0100

    KAFKA-20610: Wire abstraction interfaces into share group classes (2/N) 
(#22407)
    
    This PR integrates the `LogReader` and `PartitionMetadataProvider`
    abstractions (introduced in #22389) into the share fetch path, replacing
    direct ReplicaManager dependencies in DelayedShareFetch, SharePartition,
    SharePartitionManager, and ShareFetchUtils.
    
    #### What Changed
    
      Core classes now accept LogReader and PartitionMetadataProvider
    abstractions instead of ReplicaManager. SharePartitionManager
    instantiates the default ReplicaManager-backed implementations and wires
    them through the dependency graph.
    
      SharePartition uses a Consumer<DelayedShareFetchKey> for delayed
    request notifications instead of calling ReplicaManager directly.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/kafka/server/share/DelayedShareFetch.java | 69 +++++--------------
 .../ReplicaManagerPartitionMetadataProvider.java   | 16 +++--
 .../java/kafka/server/share/ShareFetchUtils.java   | 79 ++--------------------
 .../java/kafka/server/share/SharePartition.java    | 36 +++++-----
 .../kafka/server/share/SharePartitionManager.java  | 57 +++++++++++-----
 .../kafka/server/share/DelayedShareFetchTest.java  |  8 +++
 .../kafka/server/share/ShareFetchUtilsTest.java    | 17 ++---
 .../server/share/SharePartitionManagerTest.java    | 12 +++-
 .../kafka/server/share/SharePartitionTest.java     | 44 +++++++-----
 .../unit/kafka/server/ReplicaManagerTest.scala     |  4 ++
 .../server/share/PartitionMetadataProvider.java    |  9 +--
 11 files changed, 150 insertions(+), 201 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java 
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index af37723c421..a770da515e8 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -17,7 +17,6 @@
 package kafka.server.share;
 
 import kafka.cluster.Partition;
-import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 
 import org.apache.kafka.common.TopicIdPartition;
@@ -27,11 +26,12 @@ import 
org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
@@ -44,7 +44,6 @@ import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.timer.TimerTask;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
-import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
 import org.apache.kafka.storage.internals.log.LogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
@@ -71,12 +70,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-import scala.collection.Seq;
-import scala.jdk.javaapi.CollectionConverters;
-import scala.runtime.BoxedUnit;
 
 import static kafka.server.share.PendingRemoteFetches.RemoteFetch;
 
@@ -91,6 +84,8 @@ public class DelayedShareFetch extends DelayedOperation {
 
     private final ShareFetch shareFetch;
     private final ReplicaManager replicaManager;
+    private final LogReader logReader;
+    private final PartitionMetadataProvider metadataProvider;
     private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
     private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
     private final ShareGroupMetrics shareGroupMetrics;
@@ -130,6 +125,8 @@ public class DelayedShareFetch extends DelayedOperation {
     public DelayedShareFetch(
             ShareFetch shareFetch,
             ReplicaManager replicaManager,
+            LogReader logReader,
+            PartitionMetadataProvider metadataProvider,
             BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
             LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
             ShareGroupMetrics shareGroupMetrics,
@@ -138,6 +135,8 @@ public class DelayedShareFetch extends DelayedOperation {
     ) {
         this(shareFetch,
             replicaManager,
+            logReader,
+            metadataProvider,
             exceptionHandler,
             sharePartitions,
             
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
@@ -166,6 +165,8 @@ public class DelayedShareFetch extends DelayedOperation {
     DelayedShareFetch(
         ShareFetch shareFetch,
         ReplicaManager replicaManager,
+        LogReader logReader,
+        PartitionMetadataProvider metadataProvider,
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
         LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
         PartitionMaxBytesStrategy partitionMaxBytesStrategy,
@@ -178,6 +179,8 @@ public class DelayedShareFetch extends DelayedOperation {
         super(shareFetch.fetchParams().maxWaitMs);
         this.shareFetch = shareFetch;
         this.replicaManager = replicaManager;
+        this.logReader = logReader;
+        this.metadataProvider = metadataProvider;
         this.partitionsAcquired = new LinkedHashMap<>();
         this.localPartitionsAlreadyFetched = new LinkedHashMap<>();
         this.exceptionHandler = exceptionHandler;
@@ -292,7 +295,7 @@ public class DelayedShareFetch extends DelayedOperation {
                 shareFetch,
                 shareFetchPartitionDataList,
                 sharePartitions,
-                replicaManager,
+                metadataProvider,
                 exceptionHandler
             ));
         } catch (Exception e) {
@@ -516,55 +519,15 @@ public class DelayedShareFetch extends DelayedOperation {
     }
 
     private LogOffsetMetadata 
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
-        Partition partition = ShareFetchUtils.partition(replicaManager, 
topicIdPartition.topicPartition());
-        LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
-        // The FetchIsolation type that we use for share fetch is 
FetchIsolation.HIGH_WATERMARK. In the future, we can
-        // extend it to support other FetchIsolation types.
         FetchIsolation isolationType = shareFetch.fetchParams().isolation;
-        if (isolationType == FetchIsolation.LOG_END)
-            return offsetSnapshot.logEndOffset();
-        else if (isolationType == FetchIsolation.HIGH_WATERMARK)
-            return offsetSnapshot.highWatermark();
-        else
-            return offsetSnapshot.lastStableOffset();
+        return metadataProvider.endOffsetMetadata(topicIdPartition, 
isolationType);
     }
 
     private LinkedHashMap<TopicIdPartition, LogReadResult> 
readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
                                                                        
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
         // Filter if there already exists any erroneous topic partition.
         Set<TopicIdPartition> partitionsToFetch = 
shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
-        if (partitionsToFetch.isEmpty()) {
-            return new LinkedHashMap<>();
-        }
-
-        LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> 
topicPartitionData = new LinkedHashMap<>();
-
-        topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) -> 
topicPartitionData.put(topicIdPartition,
-            new FetchRequest.PartitionData(
-                topicIdPartition.topicId(),
-                fetchOffset,
-                0,
-                partitionMaxBytes.get(topicIdPartition),
-                Optional.empty())
-        ));
-
-        Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = 
replicaManager.readFromLog(
-            shareFetch.fetchParams(),
-            CollectionConverters.asScala(
-                partitionsToFetch.stream().map(topicIdPartition ->
-                    new Tuple2<>(topicIdPartition, 
topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
-            ),
-            QuotaFactory.UNBOUNDED_QUOTA,
-            true);
-
-        LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new 
LinkedHashMap<>();
-        responseLogResult.foreach(tpLogResult -> {
-            responseData.put(tpLogResult._1(), tpLogResult._2());
-            return BoxedUnit.UNIT;
-        });
-
-        log.trace("Data successfully retrieved by replica manager: {}", 
responseData);
-        return responseData;
+        return logReader.read(shareFetch.fetchParams(), partitionsToFetch, 
topicPartitionFetchOffsets, partitionMaxBytes);
     }
 
     private boolean 
anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult> 
replicaManagerReadResponse) {
@@ -946,7 +909,7 @@ public class DelayedShareFetch extends DelayedOperation {
                 
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) 
(acquiredRatio * 100));
 
             Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
remoteFetchResponse = ShareFetchUtils.processFetchResponse(
-                shareFetch, shareFetchPartitionDataList, sharePartitions, 
replicaManager, exceptionHandler);
+                shareFetch, shareFetchPartitionDataList, sharePartitions, 
metadataProvider, exceptionHandler);
             shareFetch.maybeComplete(remoteFetchResponse);
             log.trace("Remote share fetch request completed successfully, 
response: {}", remoteFetchResponse);
         } catch (InterruptedException | ExecutionException e) {
diff --git 
a/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
 
b/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
index 5c115a3c342..4d1d03d2ab1 100644
--- 
a/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
+++ 
b/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
@@ -53,6 +53,7 @@ public class ReplicaManagerPartitionMetadataProvider 
implements PartitionMetadat
 
     @Override
     public long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, 
int leaderEpoch) {
+        // Isolation level is only required when reading from the latest 
offset hence use Option.empty() for now.
         Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
             topicIdPartition.topicPartition(), 
ListOffsetsRequest.EARLIEST_TIMESTAMP, scala.Option.empty(),
             Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
@@ -64,6 +65,7 @@ public class ReplicaManagerPartitionMetadataProvider 
implements PartitionMetadat
 
     @Override
     public long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
int leaderEpoch) {
+        // Isolation level is set to READ_UNCOMMITTED, matching with that used 
in share fetch requests.
         Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
             topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
             Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
@@ -85,20 +87,20 @@ public class ReplicaManagerPartitionMetadataProvider 
implements PartitionMetadat
     }
 
     @Override
-    public Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition 
topicIdPartition, FetchIsolation isolation) {
-        Partition partition = leaderPartition(topicIdPartition);
+    public LogOffsetMetadata endOffsetMetadata(TopicIdPartition 
topicIdPartition, FetchIsolation isolation) {
+        Partition partition = partition(topicIdPartition);
         LogOffsetSnapshot offsetSnapshot = 
partition.fetchOffsetSnapshot(Optional.empty(), true);
         if (isolation == FetchIsolation.LOG_END)
-            return Optional.of(offsetSnapshot.logEndOffset());
+            return offsetSnapshot.logEndOffset();
         else if (isolation == FetchIsolation.HIGH_WATERMARK)
-            return Optional.of(offsetSnapshot.highWatermark());
+            return offsetSnapshot.highWatermark();
         else
-            return Optional.of(offsetSnapshot.lastStableOffset());
+            return offsetSnapshot.lastStableOffset();
     }
 
     @Override
     public int leaderEpoch(TopicIdPartition topicIdPartition) {
-        return leaderPartition(topicIdPartition).getLeaderEpoch();
+        return partition(topicIdPartition).getLeaderEpoch();
     }
 
     @Override
@@ -111,7 +113,7 @@ public class ReplicaManagerPartitionMetadataProvider 
implements PartitionMetadat
         replicaManager.removeListener(topicIdPartition.topicPartition(), 
listener);
     }
 
-    private Partition leaderPartition(TopicIdPartition topicIdPartition) {
+    private Partition partition(TopicIdPartition topicIdPartition) {
         Partition partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
         if (!partition.isLeader()) {
             log.debug("The broker is not the leader for topic partition: {}", 
topicIdPartition.topicPartition());
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index ba3f03a4cca..461ca5df6b1 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -16,22 +16,14 @@
  */
 package kafka.server.share;
 
-import kafka.cluster.Partition;
-import kafka.server.ReplicaManager;
-
-import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
-import org.apache.kafka.common.errors.OffsetNotAvailableException;
 import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.Records;
-import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -46,12 +38,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.function.BiConsumer;
 
-import scala.Option;
-import scala.Some;
-
 /**
  * Utility class for post-processing of share fetch operations.
  */
@@ -66,7 +54,7 @@ public class ShareFetchUtils {
             ShareFetch shareFetch,
             List<ShareFetchPartitionData> shareFetchPartitionDataList,
             LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
-            ReplicaManager replicaManager,
+            PartitionMetadataProvider metadataProvider,
             BiConsumer<SharePartitionKey, Throwable> exceptionHandler
     ) {
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = 
new HashMap<>();
@@ -94,8 +82,8 @@ public class ShareFetchUtils {
                 // would be returned for other share partitions in the fetch 
request.
                 if (fetchPartitionData.error.code() == 
Errors.OFFSET_OUT_OF_RANGE.code()) {
                     try {
-                        
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition,
-                            replicaManager, sharePartition.leaderEpoch()));
+                        
sharePartition.updateCacheAndOffsets(metadataProvider.offsetForEarliestTimestamp(
+                            topicIdPartition, sharePartition.leaderEpoch()));
                     } catch (Exception e) {
                         log.error("Error while fetching offset for earliest 
timestamp for topicIdPartition: {}", topicIdPartition, e);
                         shareFetch.addErroneous(topicIdPartition, e);
@@ -138,65 +126,6 @@ public class ShareFetchUtils {
         return response;
     }
 
-    /**
-     * The method is used to get the offset for the earliest timestamp for the 
topic-partition.
-     *
-     * @return The offset for the earliest timestamp.
-     */
-    static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager, int leaderEpoch) {
-        // Isolation level is only required when reading from the latest 
offset hence use Option.empty() for now.
-        Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
-                topicIdPartition.topicPartition(), 
ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
-                Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
-        if (timestampAndOffset.isEmpty()) {
-            throw new OffsetNotAvailableException("Offset for earliest 
timestamp not found for topic partition: " + topicIdPartition);
-        }
-        return timestampAndOffset.get().offset;
-    }
-
-    /**
-     * The method is used to get the offset for the latest timestamp for the 
topic-partition.
-     *
-     * @return The offset for the latest timestamp.
-     */
-    static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager, int leaderEpoch) {
-        // Isolation level is set to READ_UNCOMMITTED, matching with that used 
in share fetch requests
-        Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
-            topicIdPartition.topicPartition(), 
ListOffsetsRequest.LATEST_TIMESTAMP, new 
Some<>(IsolationLevel.READ_UNCOMMITTED),
-            Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
-        if (timestampAndOffset.isEmpty()) {
-            throw new OffsetNotAvailableException("Offset for latest timestamp 
not found for topic partition: " + topicIdPartition);
-        }
-        return timestampAndOffset.get().offset;
-    }
-
-    /**
-     * The method is used to get the offset for the given timestamp for the 
topic-partition.
-     *
-     * @return The offset for the given timestamp.
-     */
-    static long offsetForTimestamp(TopicIdPartition topicIdPartition, 
ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
-        Optional<FileRecords.TimestampAndOffset> timestampAndOffset = 
replicaManager.fetchOffsetForTimestamp(
-            topicIdPartition.topicPartition(), timestampToSearch, new 
Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), 
true).timestampAndOffsetOpt();
-        if (timestampAndOffset.isEmpty()) {
-            throw new OffsetNotAvailableException("Offset for timestamp " + 
timestampToSearch + " not found for topic partition: " + topicIdPartition);
-        }
-        return timestampAndOffset.get().offset;
-    }
-
-    static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
-        return partition(replicaManager, tp).getLeaderEpoch();
-    }
-
-    static Partition partition(ReplicaManager replicaManager, TopicPartition 
tp) {
-        Partition partition = replicaManager.getPartitionOrException(tp);
-        if (!partition.isLeader()) {
-            log.debug("The broker is not the leader for topic partition: 
{}-{}", tp.topic(), tp.partition());
-            throw new NotLeaderOrFollowerException();
-        }
-        return partition;
-    }
-
     /**
      * Slice the fetch records based on the acquired records. The slicing is 
done based on the first
      * and last offset of the acquired records from the list. The slicing 
doesn't consider individual
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index bfa209b4819..4232eaa4336 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -16,7 +16,6 @@
  */
 package kafka.server.share;
 
-import kafka.server.ReplicaManager;
 import kafka.server.share.SharePartitionManager.SharePartitionListener;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -41,6 +40,7 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
@@ -91,12 +91,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
-import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
-import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
-
 /**
  * The SharePartition is used to track the state of a partition that is shared 
between multiple
  * consumers. The class maintains the state of the records that have been 
fetched from the leader
@@ -263,10 +260,14 @@ public class SharePartition {
     private final AcquisitionLockTimeoutHandler timeoutHandler;
 
     /**
-     * The replica manager is used to check to see if any delayed share fetch 
request can be completed because of data
-     * availability due to acquisition lock timeout.
+     * The metadata provider is used to resolve metadata for partition.
+     */
+    private final PartitionMetadataProvider metadataProvider;
+
+    /**
+     * The delayed request notifier is used to complete delayed share fetch 
requests.
      */
-    private final ReplicaManager replicaManager;
+    private final Consumer<DelayedShareFetchKey> delayedRequestNotifier;
 
     /**
      * The share partition start offset specifies the partition start offset 
from which the records
@@ -349,14 +350,15 @@ public class SharePartition {
         Timer timer,
         Time time,
         Persister persister,
-        ReplicaManager replicaManager,
+        PartitionMetadataProvider metadataProvider,
+        Consumer<DelayedShareFetchKey> delayedRequestNotifier,
         ShareGroupConfigProvider configProvider,
         SharePartitionListener listener,
         Supplier<Boolean> shareGroupDlqEnableSupplier,
         ShareGroupDLQManager shareGroupDLQManager
     ) {
         this(groupId, topicIdPartition, leaderEpoch, 
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
-            timer, time, persister, replicaManager, configProvider, 
SharePartitionState.EMPTY, listener,
+            timer, time, persister, metadataProvider, delayedRequestNotifier, 
configProvider, SharePartitionState.EMPTY, listener,
             new SharePartitionMetrics(groupId, topicIdPartition.topic(), 
topicIdPartition.partition()), shareGroupDlqEnableSupplier,
             shareGroupDLQManager);
     }
@@ -373,7 +375,8 @@ public class SharePartition {
         Timer timer,
         Time time,
         Persister persister,
-        ReplicaManager replicaManager,
+        PartitionMetadataProvider metadataProvider,
+        Consumer<DelayedShareFetchKey> delayedRequestNotifier,
         ShareGroupConfigProvider configProvider,
         SharePartitionState sharePartitionState,
         SharePartitionListener listener,
@@ -396,7 +399,8 @@ public class SharePartition {
         this.loadStartTimeMs = time.hiResClockMs();
         this.persister = persister;
         this.partitionState = sharePartitionState;
-        this.replicaManager = replicaManager;
+        this.metadataProvider = metadataProvider;
+        this.delayedRequestNotifier = delayedRequestNotifier;
         this.configProvider = configProvider;
         this.fetchOffsetMetadata = new OffsetMetadata();
         this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId, 
topicIdPartition);
@@ -3121,7 +3125,7 @@ public class SharePartition {
 
     private void maybeCompleteDelayedShareFetchRequest(boolean shouldComplete) 
{
         if (shouldComplete) {
-            
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+            delayedRequestNotifier.accept(delayedShareFetchKey);
         }
     }
 
@@ -3133,12 +3137,12 @@ public class SharePartition {
         ShareGroupAutoOffsetResetStrategy offsetResetStrategy = 
configProvider.autoOffsetReset(groupId);
 
         if (offsetResetStrategy.type() == 
ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
-            return offsetForLatestTimestamp(topicIdPartition, replicaManager, 
leaderEpoch);
+            return metadataProvider.offsetForLatestTimestamp(topicIdPartition, 
leaderEpoch);
         } else if (offsetResetStrategy.type() == 
ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
-            return offsetForEarliestTimestamp(topicIdPartition, 
replicaManager, leaderEpoch);
+            return 
metadataProvider.offsetForEarliestTimestamp(topicIdPartition, leaderEpoch);
         } else {
             // offsetResetStrategy type is BY_DURATION
-            return offsetForTimestamp(topicIdPartition, replicaManager, 
offsetResetStrategy.timestamp(), leaderEpoch);
+            return metadataProvider.offsetForTimestamp(topicIdPartition, 
offsetResetStrategy.timestamp(), leaderEpoch);
         }
     }
 
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java 
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 531f5278bc5..0472fce579d 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -35,6 +35,8 @@ import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
 import org.apache.kafka.server.common.ShareVersion;
 import org.apache.kafka.server.partition.PartitionListener;
 import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.ShareGroupListener;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -95,6 +97,21 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final ReplicaManager replicaManager;
 
+    /**
+     * The log reader is used to read records from the log.
+     */
+    private final LogReader logReader;
+
+    /**
+     * The metadata provider is used to resolve metadata for partition.
+     */
+    private final PartitionMetadataProvider metadataProvider;
+
+    /**
+     * The delayed request notifier is used to complete delayed share fetch 
requests.
+     */
+    private final Consumer<DelayedShareFetchKey> delayedRequestNotifier;
+
     /**
      * The time instance is used to get the current time.
      */
@@ -247,6 +264,9 @@ public class SharePartitionManager implements AutoCloseable 
{
             ShareGroupDLQManager shareGroupDLQManager
     ) {
         this.replicaManager = replicaManager;
+        this.logReader = new ReplicaManagerLogReader(replicaManager);
+        this.metadataProvider = new 
ReplicaManagerPartitionMetadataProvider(replicaManager);
+        this.delayedRequestNotifier = 
replicaManager::completeDelayedShareFetchRequest;
         this.time = time;
         this.cache = cache;
         this.partitionCache = partitionCache;
@@ -598,7 +618,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             Set<SharePartitionKey> sharePartitionKeys = 
partitionCache.cachedSharePartitionKeys();
             // Remove all share partitions from partition cache.
             sharePartitionKeys.forEach(sharePartitionKey ->
-                removeSharePartitionFromCache(sharePartitionKey, 
partitionCache, replicaManager)
+                removeSharePartitionFromCache(sharePartitionKey, 
partitionCache, metadataProvider, delayedRequestNotifier)
             );
         }
     }
@@ -720,19 +740,19 @@ public class SharePartitionManager implements 
AutoCloseable {
         // Add the share fetch to the delayed share fetch purgatory to process 
the fetch request.
         // The request will be added irrespective of whether the share 
partition is initialized or not.
         // Once the share partition is initialized, the delayed share fetch 
will be completed.
-        addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, 
fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time, 
remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
+        addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager, 
logReader, metadataProvider, fencedSharePartitionHandler(), sharePartitions, 
shareGroupMetrics, time, remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
     }
 
     private SharePartition getOrCreateSharePartition(SharePartitionKey 
sharePartitionKey) {
         return partitionCache.computeIfAbsent(sharePartitionKey,
                 k -> {
-                    int leaderEpoch = 
ShareFetchUtils.leaderEpoch(replicaManager, 
sharePartitionKey.topicIdPartition().topicPartition());
+                    int leaderEpoch = 
metadataProvider.leaderEpoch(sharePartitionKey.topicIdPartition());
                     // Attach listener to Partition which shall invoke 
partition change handlers.
                     // However, as there could be multiple share partitions 
(per group name) for a single topic-partition,
                     // hence create separate listeners per share partition 
which holds the share partition key
                     // to identify the respective share partition.
-                    SharePartitionListener listener = new 
SharePartitionListener(sharePartitionKey, replicaManager, partitionCache);
-                    
replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(),
 listener);
+                    SharePartitionListener listener = new 
SharePartitionListener(sharePartitionKey, metadataProvider, 
delayedRequestNotifier, partitionCache);
+                    
metadataProvider.addPartitionListener(sharePartitionKey.topicIdPartition(), 
listener);
                     return new SharePartition(
                             sharePartitionKey.groupId(),
                             sharePartitionKey.topicIdPartition(),
@@ -743,7 +763,8 @@ public class SharePartitionManager implements AutoCloseable 
{
                             timer,
                             time,
                             persister,
-                            replicaManager,
+                            metadataProvider,
+                            delayedRequestNotifier,
                             configProvider,
                             listener,
                             shareGroupDlqEnableSupplier,
@@ -765,7 +786,7 @@ public class SharePartitionManager implements AutoCloseable 
{
         }
 
         // Remove the partition from the cache as it's failed to initialize.
-        removeSharePartitionFromCache(sharePartitionKey, partitionCache, 
replicaManager);
+        removeSharePartitionFromCache(sharePartitionKey, partitionCache, 
metadataProvider, delayedRequestNotifier);
         // The partition initialization failed, so add the partition to the 
erroneous partitions.
         log.debug("Error initializing share partition with key {}", 
sharePartitionKey, throwable);
         shareFetch.addErroneous(sharePartitionKey.topicIdPartition(), 
throwable);
@@ -785,7 +806,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                 // The share partition is fenced hence remove the partition 
from map and let the client retry.
                 // But surface the error to the client so client might take 
some action i.e. re-fetch
                 // the metadata and retry the fetch on new leader.
-                removeSharePartitionFromCache(sharePartitionKey, 
partitionCache, replicaManager);
+                removeSharePartitionFromCache(sharePartitionKey, 
partitionCache, metadataProvider, delayedRequestNotifier);
             }
         };
     }
@@ -804,13 +825,14 @@ public class SharePartitionManager implements 
AutoCloseable {
     private static void removeSharePartitionFromCache(
         SharePartitionKey sharePartitionKey,
         SharePartitionCache partitionCache,
-        ReplicaManager replicaManager
+        PartitionMetadataProvider metadataProvider,
+        Consumer<DelayedShareFetchKey> delayedRequestNotifier
     ) {
         SharePartition sharePartition = 
partitionCache.remove(sharePartitionKey);
         if (sharePartition != null) {
             sharePartition.markFenced();
-            
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(),
 sharePartition.listener());
-            replicaManager.completeDelayedShareFetchRequest(new 
DelayedShareFetchGroupKey(sharePartitionKey.groupId(), 
sharePartitionKey.topicIdPartition()));
+            
metadataProvider.removePartitionListener(sharePartitionKey.topicIdPartition(), 
sharePartition.listener());
+            delayedRequestNotifier.accept(new 
DelayedShareFetchGroupKey(sharePartitionKey.groupId(), 
sharePartitionKey.topicIdPartition()));
         }
     }
 
@@ -840,16 +862,19 @@ public class SharePartitionManager implements 
AutoCloseable {
     static class SharePartitionListener implements PartitionListener {
 
         private final SharePartitionKey sharePartitionKey;
-        private final ReplicaManager replicaManager;
+        private final PartitionMetadataProvider metadataProvider;
+        private final Consumer<DelayedShareFetchKey> delayedRequestNotifier;
         private final SharePartitionCache partitionCache;
 
         SharePartitionListener(
             SharePartitionKey sharePartitionKey,
-            ReplicaManager replicaManager,
+            PartitionMetadataProvider metadataProvider,
+            Consumer<DelayedShareFetchKey> delayedRequestNotifier,
             SharePartitionCache partitionCache
         ) {
             this.sharePartitionKey = sharePartitionKey;
-            this.replicaManager = replicaManager;
+            this.metadataProvider = metadataProvider;
+            this.delayedRequestNotifier = delayedRequestNotifier;
             this.partitionCache = partitionCache;
         }
 
@@ -880,7 +905,7 @@ public class SharePartitionManager implements AutoCloseable 
{
                     topicPartition, sharePartitionKey);
                 return;
             }
-            removeSharePartitionFromCache(sharePartitionKey, partitionCache, 
replicaManager);
+            removeSharePartitionFromCache(sharePartitionKey, partitionCache, 
metadataProvider, delayedRequestNotifier);
         }
     }
 
@@ -905,7 +930,7 @@ public class SharePartitionManager implements AutoCloseable 
{
             if (topicIdPartitions != null) {
                 // Remove all share partitions from partition cache.
                 topicIdPartitions.forEach(topicIdPartition ->
-                    removeSharePartitionFromCache(new 
SharePartitionKey(groupId, topicIdPartition), partitionCache, replicaManager)
+                    removeSharePartitionFromCache(new 
SharePartitionKey(groupId, topicIdPartition), partitionCache, metadataProvider, 
delayedRequestNotifier)
                 );
             }
         }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java 
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index e5d95ff38c0..ba50773b4ac 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -36,6 +36,8 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManager;
 import org.apache.kafka.server.purgatory.DelayedOperationKey;
 import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
 import org.apache.kafka.server.quota.ReplicaQuota;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
 import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
@@ -2420,6 +2422,8 @@ public class DelayedShareFetchTest {
     static class DelayedShareFetchBuilder {
         private ShareFetch shareFetch = mock(ShareFetch.class);
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
+        private LogReader logReader = mock(LogReader.class);
+        private PartitionMetadataProvider metadataProvider = 
mock(PartitionMetadataProvider.class);
         private BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mockExceptionHandler();
         private LinkedHashMap<TopicIdPartition, SharePartition> 
sharePartitions = mock(LinkedHashMap.class);
         private PartitionMaxBytesStrategy partitionMaxBytesStrategy = 
mock(PartitionMaxBytesStrategy.class);
@@ -2435,6 +2439,8 @@ public class DelayedShareFetchTest {
 
         DelayedShareFetchBuilder withReplicaManager(ReplicaManager 
replicaManager) {
             this.replicaManager = replicaManager;
+            this.logReader = new ReplicaManagerLogReader(replicaManager);
+            this.metadataProvider = new 
ReplicaManagerPartitionMetadataProvider(replicaManager);
             return this;
         }
 
@@ -2481,6 +2487,8 @@ public class DelayedShareFetchTest {
             return new DelayedShareFetch(
                 shareFetch,
                 replicaManager,
+                logReader,
+                metadataProvider,
                 exceptionHandler,
                 sharePartitions,
                 partitionMaxBytesStrategy,
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 08ce763b8ad..c71fc46ebb5 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.record.internal.RecordBatch;
 import org.apache.kafka.common.record.internal.Records;
 import org.apache.kafka.common.record.internal.SimpleRecord;
 import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.SharePartitionKey;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
 import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -143,7 +144,7 @@ public class ShareFetchUtilsTest {
                 OptionalInt.empty(), false))
         );
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
-                ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, mock(ReplicaManager.class), EXCEPTION_HANDLER);
+                ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, mock(PartitionMetadataProvider.class), EXCEPTION_HANDLER);
 
         assertEquals(2, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -192,7 +193,7 @@ public class ShareFetchUtilsTest {
                 OptionalInt.empty(), false))
         );
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
-            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, mock(ReplicaManager.class), EXCEPTION_HANDLER);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, mock(PartitionMetadataProvider.class), EXCEPTION_HANDLER);
 
         assertEquals(2, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -255,7 +256,7 @@ public class ShareFetchUtilsTest {
                 OptionalInt.empty(), false))
         );
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData1 =
-            ShareFetchUtils.processFetchResponse(shareFetch, responseData1, 
sharePartitions, replicaManager, EXCEPTION_HANDLER);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData1, 
sharePartitions, new ReplicaManagerPartitionMetadataProvider(replicaManager), 
EXCEPTION_HANDLER);
 
         assertEquals(2, resultData1.size());
         assertTrue(resultData1.containsKey(tp0));
@@ -285,7 +286,7 @@ public class ShareFetchUtilsTest {
                 OptionalInt.empty(), false))
         );
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> 
resultData2 =
-            ShareFetchUtils.processFetchResponse(shareFetch, responseData2, 
sharePartitions, replicaManager, EXCEPTION_HANDLER);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData2, 
sharePartitions, new ReplicaManagerPartitionMetadataProvider(replicaManager), 
EXCEPTION_HANDLER);
 
         assertEquals(2, resultData2.size());
         assertTrue(resultData2.containsKey(tp0));
@@ -334,7 +335,7 @@ public class ShareFetchUtilsTest {
                 OptionalInt.empty(), false)));
 
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
-            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, replicaManager, EXCEPTION_HANDLER);
+            ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions, new ReplicaManagerPartitionMetadataProvider(replicaManager), 
EXCEPTION_HANDLER);
 
         assertEquals(1, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -349,7 +350,7 @@ public class ShareFetchUtilsTest {
                 records, Optional.empty(), OptionalLong.empty(), 
Optional.empty(),
                 OptionalInt.empty(), false)));
 
-        resultData = ShareFetchUtils.processFetchResponse(shareFetch, 
responseData, sharePartitions, replicaManager, EXCEPTION_HANDLER);
+        resultData = ShareFetchUtils.processFetchResponse(shareFetch, 
responseData, sharePartitions, new 
ReplicaManagerPartitionMetadataProvider(replicaManager), EXCEPTION_HANDLER);
 
         assertEquals(1, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -416,7 +417,7 @@ public class ShareFetchUtilsTest {
 
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
             ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions,
-                mock(ReplicaManager.class), EXCEPTION_HANDLER);
+                mock(PartitionMetadataProvider.class), EXCEPTION_HANDLER);
 
         assertEquals(2, resultData.size());
         assertTrue(resultData.containsKey(tp0));
@@ -465,7 +466,7 @@ public class ShareFetchUtilsTest {
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler = 
mock(BiConsumer.class);
         Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData 
=
             ShareFetchUtils.processFetchResponse(shareFetch, responseData, 
sharePartitions,
-                replicaManager, exceptionHandler);
+                new ReplicaManagerPartitionMetadataProvider(replicaManager), 
exceptionHandler);
 
         assertTrue(resultData.isEmpty());
         Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
diff --git 
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d1a0d98eb65..eb6965bda32 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -2788,7 +2788,9 @@ public class SharePartitionManagerTest {
         SharePartitionCache partitionCache = new SharePartitionCache();
         ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
 
-        SharePartitionListener partitionListener = new 
SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
+        SharePartitionListener partitionListener = new 
SharePartitionListener(sharePartitionKey,
+            new ReplicaManagerPartitionMetadataProvider(mockReplicaManager),
+            mockReplicaManager::completeDelayedShareFetchRequest, 
partitionCache);
         testSharePartitionListener(sharePartitionKey, partitionCache, 
mockReplicaManager, partitionListener::onFailed);
     }
 
@@ -2799,7 +2801,9 @@ public class SharePartitionManagerTest {
         SharePartitionCache partitionCache = new SharePartitionCache();
         ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
 
-        SharePartitionListener partitionListener = new 
SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
+        SharePartitionListener partitionListener = new 
SharePartitionListener(sharePartitionKey,
+            new ReplicaManagerPartitionMetadataProvider(mockReplicaManager),
+            mockReplicaManager::completeDelayedShareFetchRequest, 
partitionCache);
         testSharePartitionListener(sharePartitionKey, partitionCache, 
mockReplicaManager, partitionListener::onDeleted);
     }
 
@@ -2810,7 +2814,9 @@ public class SharePartitionManagerTest {
         SharePartitionCache partitionCache = new SharePartitionCache();
         ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
 
-        SharePartitionListener partitionListener = new 
SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
+        SharePartitionListener partitionListener = new 
SharePartitionListener(sharePartitionKey,
+            new ReplicaManagerPartitionMetadataProvider(mockReplicaManager),
+            mockReplicaManager::completeDelayedShareFetchRequest, 
partitionCache);
         testSharePartitionListener(sharePartitionKey, partitionCache, 
mockReplicaManager, partitionListener::onBecomingFollower);
     }
 
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 61480cd73bb..570e09115ab 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -55,11 +55,13 @@ import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupConfigManager;
 import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import 
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
 import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
 import org.apache.kafka.server.share.fetch.InFlightBatch;
 import org.apache.kafka.server.share.fetch.InFlightState;
 import org.apache.kafka.server.share.fetch.RecordState;
@@ -105,6 +107,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
@@ -242,7 +245,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .build();
 
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -293,7 +296,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .build();
 
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -354,7 +357,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .withSharePartitionMetrics(sharePartitionMetrics)
             .build();
 
@@ -409,7 +412,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .build();
 
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -456,7 +459,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .build();
 
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -502,7 +505,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .build();
 
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -556,7 +559,7 @@ public class SharePartitionTest {
         SharePartition sharePartition = SharePartitionBuilder.builder()
             .withPersister(persister)
             .withConfigProvider(new 
ShareGroupConfigProvider(groupConfigManager))
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .build();
 
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -861,7 +864,7 @@ public class SharePartitionTest {
         Mockito.doReturn(new 
OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
             
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class), 
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
 
-        SharePartition sharePartition = 
SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager)).build();
         CompletableFuture<Void> result = sharePartition.maybeInitialize();
         assertTrue(result.isDone());
         assertFalse(result.isCompletedExceptionally());
@@ -2712,7 +2715,7 @@ public class SharePartitionTest {
             .thenReturn(140L) // for subsequent lock acquire
             .thenReturn(170L); // for subsequent lock release
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .withTime(time)
             .withSharePartitionMetrics(sharePartitionMetrics)
             .build();
@@ -2789,7 +2792,7 @@ public class SharePartitionTest {
     public void testAcknowledgeSingleRecordBatch() {
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -2824,7 +2827,8 @@ public class SharePartitionTest {
     public void testAcknowledgeMultipleRecordBatch() {
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
+            
.withDelayedRequestNotifier(replicaManager::completeDelayedShareFetchRequest)
             .withState(SharePartitionState.ACTIVE)
             .build();
         MemoryRecords records = memoryRecords(5, 10);
@@ -12158,7 +12162,7 @@ public class SharePartitionTest {
     public void testInvalidAcknowledgeTypeInBatchAcknowledgement() {
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -12178,7 +12182,7 @@ public class SharePartitionTest {
     public void testInvalidAcknowledgeTypeInSubsetAcknowledgement() {
         ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
         SharePartition sharePartition = SharePartitionBuilder.builder()
-            .withReplicaManager(replicaManager)
+            .withMetadataProvider(new 
ReplicaManagerPartitionMetadataProvider(replicaManager))
             .withState(SharePartitionState.ACTIVE)
             .build();
 
@@ -13313,7 +13317,8 @@ public class SharePartitionTest {
         private int defaultMaxInflightRecords = DEFAULT_MAX_IN_FLIGHT_RECORDS;
 
         private Persister persister = new NoOpStatePersister();
-        private ReplicaManager replicaManager = 
Mockito.mock(ReplicaManager.class);
+        private PartitionMetadataProvider metadataProvider = 
Mockito.mock(PartitionMetadataProvider.class);
+        private Consumer<DelayedShareFetchKey> delayedRequestNotifier = 
Mockito.mock(Consumer.class);
         private ShareGroupConfigProvider configProvider = new 
ShareGroupConfigProvider(Mockito.mock(GroupConfigManager.class));
         private SharePartitionState state = SharePartitionState.EMPTY;
         private Time time = MOCK_TIME;
@@ -13341,8 +13346,13 @@ public class SharePartitionTest {
             return this;
         }
 
-        private SharePartitionBuilder withReplicaManager(ReplicaManager 
replicaManager) {
-            this.replicaManager = replicaManager;
+        private SharePartitionBuilder 
withMetadataProvider(PartitionMetadataProvider metadataProvider) {
+            this.metadataProvider = metadataProvider;
+            return this;
+        }
+
+        private SharePartitionBuilder 
withDelayedRequestNotifier(Consumer<DelayedShareFetchKey> 
delayedRequestNotifier) {
+            this.delayedRequestNotifier = delayedRequestNotifier;
             return this;
         }
 
@@ -13382,7 +13392,7 @@ public class SharePartitionTest {
 
         public SharePartition build() {
             return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0, 
defaultMaxInflightRecords, defaultMaxDeliveryCount,
-                defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, 
replicaManager, configProvider,
+                defaultAcquisitionLockTimeoutMs, mockTimer, time, persister, 
metadataProvider, delayedRequestNotifier, configProvider,
                 state, Mockito.mock(SharePartitionListener.class), 
sharePartitionMetrics, shareGroupDlqEnableSupplier,
                 shareGroupDLQManager);
         }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0bf4edd59c2..6c2aa46c3d3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -47,6 +47,8 @@ import 
org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
 import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
 import org.apache.kafka.common.replica.{ClientMetadata, PartitionView, 
ReplicaSelector, ReplicaView}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -5982,6 +5984,8 @@ class ReplicaManagerTest {
       val delayedShareFetch = spy(new DelayedShareFetch(
         shareFetch,
         rm,
+        mock(classOf[LogReader]),
+        mock(classOf[PartitionMetadataProvider]),
         mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
         sharePartitions,
         mock(classOf[ShareGroupMetrics]),
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
 
b/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
index 0dce70d6359..ee1064d79b3 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
@@ -21,8 +21,6 @@ import org.apache.kafka.server.partition.PartitionListener;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 
-import java.util.Optional;
-
 /**
  * Abstraction for partition metadata operations.
  */
@@ -44,12 +42,11 @@ public interface PartitionMetadataProvider {
     long offsetForTimestamp(TopicIdPartition topicIdPartition, long timestamp, 
int leaderEpoch);
 
     /**
-     * Get the end offset metadata for minBytes estimation.
+     * Get the end offset metadata for partition.
      *
-     * @return The end offset metadata based on the given fetch isolation, or
-     *         {@link Optional#empty()} when no local partition metadata is 
available.
+     * @return The end offset metadata based on the given fetch isolation.
      */
-    Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition 
topicIdPartition, FetchIsolation isolation);
+    LogOffsetMetadata endOffsetMetadata(TopicIdPartition topicIdPartition, 
FetchIsolation isolation);
 
     /**
      * Get the leader epoch for a partition.

Reply via email to