This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32e49cee129 KAFKA-20610: Move abstraction instantiation to
BrokerServer for share fetch (3/N) (#22414)
32e49cee129 is described below
commit 32e49cee129241fbab48c4754262c9a3f801bb94
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri May 29 18:34:01 2026 +0100
KAFKA-20610: Move abstraction instantiation to BrokerServer for share fetch
(3/N) (#22414)
Moves the instantiation of LogReader, PartitionMetadataProvider, and
Consumer<DelayedShareFetchKey> from SharePartitionManager to
BrokerServer.
#### What Changed:
- BrokerServer now creates the ReplicaManagerLogReader, the
ReplicaManagerPartitionMetadataProvider, and the delayed request notifier,
and passes them to the SharePartitionManager constructor
- The test builder now creates and injects the same dependencies
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/server/share/SharePartitionManager.java | 28 +++++++++++++++++-----
.../src/main/scala/kafka/server/BrokerServer.scala | 6 ++++-
.../server/share/SharePartitionManagerTest.java | 3 +++
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 0472fce579d..a8eff11ac90 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -177,8 +177,12 @@ public class SharePartitionManager implements
AutoCloseable {
*/
private final ShareGroupDLQManager shareGroupDLQManager;
+ @SuppressWarnings("ParameterNumber")
public SharePartitionManager(
ReplicaManager replicaManager,
+ LogReader logReader,
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier,
Time time,
ShareSessionCache cache,
int defaultRecordLockDurationMs,
@@ -192,6 +196,9 @@ public class SharePartitionManager implements AutoCloseable
{
ShareGroupDLQManager shareGroupDLQManager
) {
this(replicaManager,
+ logReader,
+ metadataProvider,
+ delayedRequestNotifier,
time,
cache,
new SharePartitionCache(),
@@ -211,6 +218,9 @@ public class SharePartitionManager implements AutoCloseable
{
@SuppressWarnings("ParameterNumber")
private SharePartitionManager(
ReplicaManager replicaManager,
+ LogReader logReader,
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier,
Time time,
ShareSessionCache cache,
SharePartitionCache partitionCache,
@@ -226,6 +236,9 @@ public class SharePartitionManager implements AutoCloseable
{
ShareGroupDLQManager shareGroupDLQManager
) {
this(replicaManager,
+ logReader,
+ metadataProvider,
+ delayedRequestNotifier,
time,
cache,
partitionCache,
@@ -248,6 +261,9 @@ public class SharePartitionManager implements AutoCloseable
{
@SuppressWarnings("ParameterNumber")
SharePartitionManager(
ReplicaManager replicaManager,
+ LogReader logReader,
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier,
Time time,
ShareSessionCache cache,
SharePartitionCache partitionCache,
@@ -264,9 +280,9 @@ public class SharePartitionManager implements AutoCloseable
{
ShareGroupDLQManager shareGroupDLQManager
) {
this.replicaManager = replicaManager;
- this.logReader = new ReplicaManagerLogReader(replicaManager);
- this.metadataProvider = new
ReplicaManagerPartitionMetadataProvider(replicaManager);
- this.delayedRequestNotifier =
replicaManager::completeDelayedShareFetchRequest;
+ this.logReader = logReader;
+ this.metadataProvider = metadataProvider;
+ this.delayedRequestNotifier = delayedRequestNotifier;
this.time = time;
this.cache = cache;
this.partitionCache = partitionCache;
@@ -369,7 +385,7 @@ public class SharePartitionManager implements AutoCloseable
{
// If we have an acknowledgement completed for a
topic-partition, then we should check if
// there is a pending share fetch request for the
topic-partition and complete it.
DelayedShareFetchKey delayedShareFetchKey = new
DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(),
topicIdPartition.partition());
-
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+ delayedRequestNotifier.accept(delayedShareFetchKey);
futures.put(topicIdPartition, future);
} else {
@@ -437,7 +453,7 @@ public class SharePartitionManager implements AutoCloseable
{
// If we have a release acquired request completed for a
topic-partition, then we should check if
// there is a pending share fetch request for the
topic-partition and complete it.
DelayedShareFetchKey delayedShareFetchKey = new
DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(),
topicIdPartition.partition());
-
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+ delayedRequestNotifier.accept(delayedShareFetchKey);
futuresMap.put(topicIdPartition, future);
}
@@ -718,7 +734,7 @@ public class SharePartitionManager implements AutoCloseable
{
// for the share partition.
if (!initialized) {
shareGroupMetrics.partitionLoadTime(sharePartition.loadStartTimeMs());
-
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+ delayedRequestNotifier.accept(delayedShareFetchKey);
}
});
sharePartitions.put(topicIdPartition, sharePartition);
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 04a1ee7672c..63a0472ddf0 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -23,7 +23,7 @@ import kafka.log.LogManager
import kafka.network.SocketServer
import kafka.raft.KafkaRaftManager
import kafka.server.metadata._
-import kafka.server.share.{ShareCoordinatorMetadataCacheHelperImpl,
SharePartitionManager}
+import kafka.server.share.{ReplicaManagerPartitionMetadataProvider,
ReplicaManagerLogReader, ShareCoordinatorMetadataCacheHelperImpl,
SharePartitionManager}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -53,6 +53,7 @@ import
org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager,
RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
+import org.apache.kafka.server.share.fetch.DelayedShareFetchKey
import org.apache.kafka.server.share.persister.{DefaultStatePersister,
NoOpStatePersister, Persister}
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper,
Timer}
@@ -460,6 +461,9 @@ class BrokerServer(
sharePartitionManager = new SharePartitionManager(
replicaManager,
+ new ReplicaManagerLogReader(replicaManager),
+ new ReplicaManagerPartitionMetadataProvider(replicaManager),
+ (key: DelayedShareFetchKey) =>
replicaManager.completeDelayedShareFetchRequest(key),
time,
shareFetchSessionCache,
config.shareGroupConfig.shareGroupRecordLockDurationMs,
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index eb6965bda32..3bde6678921 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -3310,6 +3310,9 @@ public class SharePartitionManagerTest {
public SharePartitionManager build() {
return new SharePartitionManager(replicaManager,
+ new ReplicaManagerLogReader(replicaManager),
+ new ReplicaManagerPartitionMetadataProvider(replicaManager),
+ replicaManager::completeDelayedShareFetchRequest,
time,
cache,
partitionCache,