This is an automated email from the ASF dual-hosted git repository.
apoorvmittal10 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f8705000ee2 KAFKA-20610: Wire abstraction interfaces into share group
classes (2/N) (#22407)
f8705000ee2 is described below
commit f8705000ee2ee2e78027c6ca1d36f8fae8890d8a
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri May 29 13:56:16 2026 +0100
KAFKA-20610: Wire abstraction interfaces into share group classes (2/N)
(#22407)
This PR integrates the `LogReader` and `PartitionMetadataProvider`
abstractions (introduced in #22389) into the share fetch path, replacing
direct ReplicaManager dependencies in DelayedShareFetch, SharePartition,
SharePartitionManager, and ShareFetchUtils.
#### What Changed
Core classes now accept LogReader and PartitionMetadataProvider
abstractions instead of ReplicaManager. SharePartitionManager
instantiates the default ReplicaManager-backed implementations and wires
them through the dependency graph.
SharePartition uses a Consumer<DelayedShareFetchKey> for delayed
request notifications instead of calling ReplicaManager directly.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/DelayedShareFetch.java | 69 +++++--------------
.../ReplicaManagerPartitionMetadataProvider.java | 16 +++--
.../java/kafka/server/share/ShareFetchUtils.java | 79 ++--------------------
.../java/kafka/server/share/SharePartition.java | 36 +++++-----
.../kafka/server/share/SharePartitionManager.java | 57 +++++++++++-----
.../kafka/server/share/DelayedShareFetchTest.java | 8 +++
.../kafka/server/share/ShareFetchUtilsTest.java | 17 ++---
.../server/share/SharePartitionManagerTest.java | 12 +++-
.../kafka/server/share/SharePartitionTest.java | 44 +++++++-----
.../unit/kafka/server/ReplicaManagerTest.scala | 4 ++
.../server/share/PartitionMetadataProvider.java | 9 +--
11 files changed, 150 insertions(+), 201 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index af37723c421..a770da515e8 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -17,7 +17,6 @@
package kafka.server.share;
import kafka.cluster.Partition;
-import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import org.apache.kafka.common.TopicIdPartition;
@@ -27,11 +26,12 @@ import
org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey;
@@ -44,7 +44,6 @@ import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
-import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
@@ -71,12 +70,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-
-import scala.Tuple2;
-import scala.collection.Seq;
-import scala.jdk.javaapi.CollectionConverters;
-import scala.runtime.BoxedUnit;
import static kafka.server.share.PendingRemoteFetches.RemoteFetch;
@@ -91,6 +84,8 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetch shareFetch;
private final ReplicaManager replicaManager;
+ private final LogReader logReader;
+ private final PartitionMetadataProvider metadataProvider;
private final BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
private final PartitionMaxBytesStrategy partitionMaxBytesStrategy;
private final ShareGroupMetrics shareGroupMetrics;
@@ -130,6 +125,8 @@ public class DelayedShareFetch extends DelayedOperation {
public DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
+ LogReader logReader,
+ PartitionMetadataProvider metadataProvider,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
ShareGroupMetrics shareGroupMetrics,
@@ -138,6 +135,8 @@ public class DelayedShareFetch extends DelayedOperation {
) {
this(shareFetch,
replicaManager,
+ logReader,
+ metadataProvider,
exceptionHandler,
sharePartitions,
PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM),
@@ -166,6 +165,8 @@ public class DelayedShareFetch extends DelayedOperation {
DelayedShareFetch(
ShareFetch shareFetch,
ReplicaManager replicaManager,
+ LogReader logReader,
+ PartitionMetadataProvider metadataProvider,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
PartitionMaxBytesStrategy partitionMaxBytesStrategy,
@@ -178,6 +179,8 @@ public class DelayedShareFetch extends DelayedOperation {
super(shareFetch.fetchParams().maxWaitMs);
this.shareFetch = shareFetch;
this.replicaManager = replicaManager;
+ this.logReader = logReader;
+ this.metadataProvider = metadataProvider;
this.partitionsAcquired = new LinkedHashMap<>();
this.localPartitionsAlreadyFetched = new LinkedHashMap<>();
this.exceptionHandler = exceptionHandler;
@@ -292,7 +295,7 @@ public class DelayedShareFetch extends DelayedOperation {
shareFetch,
shareFetchPartitionDataList,
sharePartitions,
- replicaManager,
+ metadataProvider,
exceptionHandler
));
} catch (Exception e) {
@@ -516,55 +519,15 @@ public class DelayedShareFetch extends DelayedOperation {
}
private LogOffsetMetadata
endOffsetMetadataForTopicPartition(TopicIdPartition topicIdPartition) {
- Partition partition = ShareFetchUtils.partition(replicaManager,
topicIdPartition.topicPartition());
- LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
- // The FetchIsolation type that we use for share fetch is
FetchIsolation.HIGH_WATERMARK. In the future, we can
- // extend it to support other FetchIsolation types.
FetchIsolation isolationType = shareFetch.fetchParams().isolation;
- if (isolationType == FetchIsolation.LOG_END)
- return offsetSnapshot.logEndOffset();
- else if (isolationType == FetchIsolation.HIGH_WATERMARK)
- return offsetSnapshot.highWatermark();
- else
- return offsetSnapshot.lastStableOffset();
+ return metadataProvider.endOffsetMetadata(topicIdPartition,
isolationType);
}
private LinkedHashMap<TopicIdPartition, LogReadResult>
readFromLog(LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
// Filter if there already exists any erroneous topic partition.
Set<TopicIdPartition> partitionsToFetch =
shareFetch.filterErroneousTopicPartitions(topicPartitionFetchOffsets.keySet());
- if (partitionsToFetch.isEmpty()) {
- return new LinkedHashMap<>();
- }
-
- LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData>
topicPartitionData = new LinkedHashMap<>();
-
- topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) ->
topicPartitionData.put(topicIdPartition,
- new FetchRequest.PartitionData(
- topicIdPartition.topicId(),
- fetchOffset,
- 0,
- partitionMaxBytes.get(topicIdPartition),
- Optional.empty())
- ));
-
- Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult =
replicaManager.readFromLog(
- shareFetch.fetchParams(),
- CollectionConverters.asScala(
- partitionsToFetch.stream().map(topicIdPartition ->
- new Tuple2<>(topicIdPartition,
topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
- ),
- QuotaFactory.UNBOUNDED_QUOTA,
- true);
-
- LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new
LinkedHashMap<>();
- responseLogResult.foreach(tpLogResult -> {
- responseData.put(tpLogResult._1(), tpLogResult._2());
- return BoxedUnit.UNIT;
- });
-
- log.trace("Data successfully retrieved by replica manager: {}",
responseData);
- return responseData;
+ return logReader.read(shareFetch.fetchParams(), partitionsToFetch,
topicPartitionFetchOffsets, partitionMaxBytes);
}
private boolean
anyPartitionHasLogReadError(LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse) {
@@ -946,7 +909,7 @@ public class DelayedShareFetch extends DelayedOperation {
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int)
(acquiredRatio * 100));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
remoteFetchResponse = ShareFetchUtils.processFetchResponse(
- shareFetch, shareFetchPartitionDataList, sharePartitions,
replicaManager, exceptionHandler);
+ shareFetch, shareFetchPartitionDataList, sharePartitions,
metadataProvider, exceptionHandler);
shareFetch.maybeComplete(remoteFetchResponse);
log.trace("Remote share fetch request completed successfully,
response: {}", remoteFetchResponse);
} catch (InterruptedException | ExecutionException e) {
diff --git
a/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
b/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
index 5c115a3c342..4d1d03d2ab1 100644
---
a/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
+++
b/core/src/main/java/kafka/server/share/ReplicaManagerPartitionMetadataProvider.java
@@ -53,6 +53,7 @@ public class ReplicaManagerPartitionMetadataProvider
implements PartitionMetadat
@Override
public long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition,
int leaderEpoch) {
+ // Isolation level is only required when reading from the latest
offset hence use Option.empty() for now.
Optional<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(),
ListOffsetsRequest.EARLIEST_TIMESTAMP, scala.Option.empty(),
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
@@ -64,6 +65,7 @@ public class ReplicaManagerPartitionMetadataProvider
implements PartitionMetadat
@Override
public long offsetForLatestTimestamp(TopicIdPartition topicIdPartition,
int leaderEpoch) {
+ // Isolation level is set to READ_UNCOMMITTED, matching with that used
in share fetch requests.
Optional<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
topicIdPartition.topicPartition(),
ListOffsetsRequest.LATEST_TIMESTAMP, new
Some<>(IsolationLevel.READ_UNCOMMITTED),
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
@@ -85,20 +87,20 @@ public class ReplicaManagerPartitionMetadataProvider
implements PartitionMetadat
}
@Override
- public Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition
topicIdPartition, FetchIsolation isolation) {
- Partition partition = leaderPartition(topicIdPartition);
+ public LogOffsetMetadata endOffsetMetadata(TopicIdPartition
topicIdPartition, FetchIsolation isolation) {
+ Partition partition = partition(topicIdPartition);
LogOffsetSnapshot offsetSnapshot =
partition.fetchOffsetSnapshot(Optional.empty(), true);
if (isolation == FetchIsolation.LOG_END)
- return Optional.of(offsetSnapshot.logEndOffset());
+ return offsetSnapshot.logEndOffset();
else if (isolation == FetchIsolation.HIGH_WATERMARK)
- return Optional.of(offsetSnapshot.highWatermark());
+ return offsetSnapshot.highWatermark();
else
- return Optional.of(offsetSnapshot.lastStableOffset());
+ return offsetSnapshot.lastStableOffset();
}
@Override
public int leaderEpoch(TopicIdPartition topicIdPartition) {
- return leaderPartition(topicIdPartition).getLeaderEpoch();
+ return partition(topicIdPartition).getLeaderEpoch();
}
@Override
@@ -111,7 +113,7 @@ public class ReplicaManagerPartitionMetadataProvider
implements PartitionMetadat
replicaManager.removeListener(topicIdPartition.topicPartition(),
listener);
}
- private Partition leaderPartition(TopicIdPartition topicIdPartition) {
+ private Partition partition(TopicIdPartition topicIdPartition) {
Partition partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition: {}",
topicIdPartition.topicPartition());
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index ba3f03a4cca..461ca5df6b1 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -16,22 +16,14 @@
*/
package kafka.server.share;
-import kafka.cluster.Partition;
-import kafka.server.ReplicaManager;
-
-import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicIdPartition;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
-import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.Records;
-import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -46,12 +38,8 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.function.BiConsumer;
-import scala.Option;
-import scala.Some;
-
/**
* Utility class for post-processing of share fetch operations.
*/
@@ -66,7 +54,7 @@ public class ShareFetchUtils {
ShareFetch shareFetch,
List<ShareFetchPartitionData> shareFetchPartitionDataList,
LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions,
- ReplicaManager replicaManager,
+ PartitionMetadataProvider metadataProvider,
BiConsumer<SharePartitionKey, Throwable> exceptionHandler
) {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response =
new HashMap<>();
@@ -94,8 +82,8 @@ public class ShareFetchUtils {
// would be returned for other share partitions in the fetch
request.
if (fetchPartitionData.error.code() ==
Errors.OFFSET_OUT_OF_RANGE.code()) {
try {
-
sharePartition.updateCacheAndOffsets(offsetForEarliestTimestamp(topicIdPartition,
- replicaManager, sharePartition.leaderEpoch()));
+
sharePartition.updateCacheAndOffsets(metadataProvider.offsetForEarliestTimestamp(
+ topicIdPartition, sharePartition.leaderEpoch()));
} catch (Exception e) {
log.error("Error while fetching offset for earliest
timestamp for topicIdPartition: {}", topicIdPartition, e);
shareFetch.addErroneous(topicIdPartition, e);
@@ -138,65 +126,6 @@ public class ShareFetchUtils {
return response;
}
- /**
- * The method is used to get the offset for the earliest timestamp for the
topic-partition.
- *
- * @return The offset for the earliest timestamp.
- */
- static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager, int leaderEpoch) {
- // Isolation level is only required when reading from the latest
offset hence use Option.empty() for now.
- Optional<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
- topicIdPartition.topicPartition(),
ListOffsetsRequest.EARLIEST_TIMESTAMP, Option.empty(),
- Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
- if (timestampAndOffset.isEmpty()) {
- throw new OffsetNotAvailableException("Offset for earliest
timestamp not found for topic partition: " + topicIdPartition);
- }
- return timestampAndOffset.get().offset;
- }
-
- /**
- * The method is used to get the offset for the latest timestamp for the
topic-partition.
- *
- * @return The offset for the latest timestamp.
- */
- static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager, int leaderEpoch) {
- // Isolation level is set to READ_UNCOMMITTED, matching with that used
in share fetch requests
- Optional<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
- topicIdPartition.topicPartition(),
ListOffsetsRequest.LATEST_TIMESTAMP, new
Some<>(IsolationLevel.READ_UNCOMMITTED),
- Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
- if (timestampAndOffset.isEmpty()) {
- throw new OffsetNotAvailableException("Offset for latest timestamp
not found for topic partition: " + topicIdPartition);
- }
- return timestampAndOffset.get().offset;
- }
-
- /**
- * The method is used to get the offset for the given timestamp for the
topic-partition.
- *
- * @return The offset for the given timestamp.
- */
- static long offsetForTimestamp(TopicIdPartition topicIdPartition,
ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
- Optional<FileRecords.TimestampAndOffset> timestampAndOffset =
replicaManager.fetchOffsetForTimestamp(
- topicIdPartition.topicPartition(), timestampToSearch, new
Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch),
true).timestampAndOffsetOpt();
- if (timestampAndOffset.isEmpty()) {
- throw new OffsetNotAvailableException("Offset for timestamp " +
timestampToSearch + " not found for topic partition: " + topicIdPartition);
- }
- return timestampAndOffset.get().offset;
- }
-
- static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
- return partition(replicaManager, tp).getLeaderEpoch();
- }
-
- static Partition partition(ReplicaManager replicaManager, TopicPartition
tp) {
- Partition partition = replicaManager.getPartitionOrException(tp);
- if (!partition.isLeader()) {
- log.debug("The broker is not the leader for topic partition:
{}-{}", tp.topic(), tp.partition());
- throw new NotLeaderOrFollowerException();
- }
- return partition;
- }
-
/**
* Slice the fetch records based on the acquired records. The slicing is
done based on the first
* and last offset of the acquired records from the list. The slicing
doesn't consider individual
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index bfa209b4819..4232eaa4336 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -16,7 +16,6 @@
*/
package kafka.server.share;
-import kafka.server.ReplicaManager;
import kafka.server.share.SharePartitionManager.SharePartitionListener;
import org.apache.kafka.clients.consumer.AcknowledgeType;
@@ -41,6 +40,7 @@ import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQRecordParameter;
@@ -91,12 +91,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import java.util.function.Supplier;
-import static kafka.server.share.ShareFetchUtils.offsetForEarliestTimestamp;
-import static kafka.server.share.ShareFetchUtils.offsetForLatestTimestamp;
-import static kafka.server.share.ShareFetchUtils.offsetForTimestamp;
-
/**
* The SharePartition is used to track the state of a partition that is shared
between multiple
* consumers. The class maintains the state of the records that have been
fetched from the leader
@@ -263,10 +260,14 @@ public class SharePartition {
private final AcquisitionLockTimeoutHandler timeoutHandler;
/**
- * The replica manager is used to check to see if any delayed share fetch
request can be completed because of data
- * availability due to acquisition lock timeout.
+ * The metadata provider is used to resolve metadata for partition.
+ */
+ private final PartitionMetadataProvider metadataProvider;
+
+ /**
+ * The delayed request notifier is used to complete delayed share fetch
requests.
*/
- private final ReplicaManager replicaManager;
+ private final Consumer<DelayedShareFetchKey> delayedRequestNotifier;
/**
* The share partition start offset specifies the partition start offset
from which the records
@@ -349,14 +350,15 @@ public class SharePartition {
Timer timer,
Time time,
Persister persister,
- ReplicaManager replicaManager,
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier,
ShareGroupConfigProvider configProvider,
SharePartitionListener listener,
Supplier<Boolean> shareGroupDlqEnableSupplier,
ShareGroupDLQManager shareGroupDLQManager
) {
this(groupId, topicIdPartition, leaderEpoch,
defaultMaxInFlightRecords, defaultMaxDeliveryCount, defaultRecordLockDurationMs,
- timer, time, persister, replicaManager, configProvider,
SharePartitionState.EMPTY, listener,
+ timer, time, persister, metadataProvider, delayedRequestNotifier,
configProvider, SharePartitionState.EMPTY, listener,
new SharePartitionMetrics(groupId, topicIdPartition.topic(),
topicIdPartition.partition()), shareGroupDlqEnableSupplier,
shareGroupDLQManager);
}
@@ -373,7 +375,8 @@ public class SharePartition {
Timer timer,
Time time,
Persister persister,
- ReplicaManager replicaManager,
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier,
ShareGroupConfigProvider configProvider,
SharePartitionState sharePartitionState,
SharePartitionListener listener,
@@ -396,7 +399,8 @@ public class SharePartition {
this.loadStartTimeMs = time.hiResClockMs();
this.persister = persister;
this.partitionState = sharePartitionState;
- this.replicaManager = replicaManager;
+ this.metadataProvider = metadataProvider;
+ this.delayedRequestNotifier = delayedRequestNotifier;
this.configProvider = configProvider;
this.fetchOffsetMetadata = new OffsetMetadata();
this.delayedShareFetchKey = new DelayedShareFetchGroupKey(groupId,
topicIdPartition);
@@ -3121,7 +3125,7 @@ public class SharePartition {
private void maybeCompleteDelayedShareFetchRequest(boolean shouldComplete)
{
if (shouldComplete) {
-
replicaManager.completeDelayedShareFetchRequest(delayedShareFetchKey);
+ delayedRequestNotifier.accept(delayedShareFetchKey);
}
}
@@ -3133,12 +3137,12 @@ public class SharePartition {
ShareGroupAutoOffsetResetStrategy offsetResetStrategy =
configProvider.autoOffsetReset(groupId);
if (offsetResetStrategy.type() ==
ShareGroupAutoOffsetResetStrategy.StrategyType.LATEST) {
- return offsetForLatestTimestamp(topicIdPartition, replicaManager,
leaderEpoch);
+ return metadataProvider.offsetForLatestTimestamp(topicIdPartition,
leaderEpoch);
} else if (offsetResetStrategy.type() ==
ShareGroupAutoOffsetResetStrategy.StrategyType.EARLIEST) {
- return offsetForEarliestTimestamp(topicIdPartition,
replicaManager, leaderEpoch);
+ return
metadataProvider.offsetForEarliestTimestamp(topicIdPartition, leaderEpoch);
} else {
// offsetResetStrategy type is BY_DURATION
- return offsetForTimestamp(topicIdPartition, replicaManager,
offsetResetStrategy.timestamp(), leaderEpoch);
+ return metadataProvider.offsetForTimestamp(topicIdPartition,
offsetResetStrategy.timestamp(), leaderEpoch);
}
}
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java
b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 531f5278bc5..0472fce579d 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -35,6 +35,8 @@ import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.partition.PartitionListener;
import org.apache.kafka.server.share.CachedSharePartition;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.ShareGroupListener;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
@@ -95,6 +97,21 @@ public class SharePartitionManager implements AutoCloseable {
*/
private final ReplicaManager replicaManager;
+ /**
+ * The log reader is used to read records from the log.
+ */
+ private final LogReader logReader;
+
+ /**
+ * The metadata provider is used to resolve metadata for partition.
+ */
+ private final PartitionMetadataProvider metadataProvider;
+
+ /**
+ * The delayed request notifier is used to complete delayed share fetch
requests.
+ */
+ private final Consumer<DelayedShareFetchKey> delayedRequestNotifier;
+
/**
* The time instance is used to get the current time.
*/
@@ -247,6 +264,9 @@ public class SharePartitionManager implements AutoCloseable
{
ShareGroupDLQManager shareGroupDLQManager
) {
this.replicaManager = replicaManager;
+ this.logReader = new ReplicaManagerLogReader(replicaManager);
+ this.metadataProvider = new
ReplicaManagerPartitionMetadataProvider(replicaManager);
+ this.delayedRequestNotifier =
replicaManager::completeDelayedShareFetchRequest;
this.time = time;
this.cache = cache;
this.partitionCache = partitionCache;
@@ -598,7 +618,7 @@ public class SharePartitionManager implements AutoCloseable
{
Set<SharePartitionKey> sharePartitionKeys =
partitionCache.cachedSharePartitionKeys();
// Remove all share partitions from partition cache.
sharePartitionKeys.forEach(sharePartitionKey ->
- removeSharePartitionFromCache(sharePartitionKey,
partitionCache, replicaManager)
+ removeSharePartitionFromCache(sharePartitionKey,
partitionCache, metadataProvider, delayedRequestNotifier)
);
}
}
@@ -720,19 +740,19 @@ public class SharePartitionManager implements
AutoCloseable {
// Add the share fetch to the delayed share fetch purgatory to process
the fetch request.
// The request will be added irrespective of whether the share
partition is initialized or not.
// Once the share partition is initialized, the delayed share fetch
will be completed.
- addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager,
fencedSharePartitionHandler(), sharePartitions, shareGroupMetrics, time,
remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
+ addDelayedShareFetch(new DelayedShareFetch(shareFetch, replicaManager,
logReader, metadataProvider, fencedSharePartitionHandler(), sharePartitions,
shareGroupMetrics, time, remoteFetchMaxWaitMs), delayedShareFetchWatchKeys);
}
private SharePartition getOrCreateSharePartition(SharePartitionKey
sharePartitionKey) {
return partitionCache.computeIfAbsent(sharePartitionKey,
k -> {
- int leaderEpoch =
ShareFetchUtils.leaderEpoch(replicaManager,
sharePartitionKey.topicIdPartition().topicPartition());
+ int leaderEpoch =
metadataProvider.leaderEpoch(sharePartitionKey.topicIdPartition());
// Attach listener to Partition which shall invoke
partition change handlers.
// However, as there could be multiple share partitions
(per group name) for a single topic-partition,
// hence create separate listeners per share partition
which holds the share partition key
// to identify the respective share partition.
- SharePartitionListener listener = new
SharePartitionListener(sharePartitionKey, replicaManager, partitionCache);
-
replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(),
listener);
+ SharePartitionListener listener = new
SharePartitionListener(sharePartitionKey, metadataProvider,
delayedRequestNotifier, partitionCache);
+
metadataProvider.addPartitionListener(sharePartitionKey.topicIdPartition(),
listener);
return new SharePartition(
sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition(),
@@ -743,7 +763,8 @@ public class SharePartitionManager implements AutoCloseable
{
timer,
time,
persister,
- replicaManager,
+ metadataProvider,
+ delayedRequestNotifier,
configProvider,
listener,
shareGroupDlqEnableSupplier,
@@ -765,7 +786,7 @@ public class SharePartitionManager implements AutoCloseable
{
}
// Remove the partition from the cache as it's failed to initialize.
- removeSharePartitionFromCache(sharePartitionKey, partitionCache,
replicaManager);
+ removeSharePartitionFromCache(sharePartitionKey, partitionCache,
metadataProvider, delayedRequestNotifier);
// The partition initialization failed, so add the partition to the
erroneous partitions.
log.debug("Error initializing share partition with key {}",
sharePartitionKey, throwable);
shareFetch.addErroneous(sharePartitionKey.topicIdPartition(),
throwable);
@@ -785,7 +806,7 @@ public class SharePartitionManager implements AutoCloseable
{
// The share partition is fenced hence remove the partition
from map and let the client retry.
// But surface the error to the client so client might take
some action i.e. re-fetch
// the metadata and retry the fetch on new leader.
- removeSharePartitionFromCache(sharePartitionKey,
partitionCache, replicaManager);
+ removeSharePartitionFromCache(sharePartitionKey,
partitionCache, metadataProvider, delayedRequestNotifier);
}
};
}
@@ -804,13 +825,14 @@ public class SharePartitionManager implements
AutoCloseable {
private static void removeSharePartitionFromCache(
SharePartitionKey sharePartitionKey,
SharePartitionCache partitionCache,
- ReplicaManager replicaManager
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier
) {
SharePartition sharePartition =
partitionCache.remove(sharePartitionKey);
if (sharePartition != null) {
sharePartition.markFenced();
-
replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(),
sharePartition.listener());
- replicaManager.completeDelayedShareFetchRequest(new
DelayedShareFetchGroupKey(sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition()));
+
metadataProvider.removePartitionListener(sharePartitionKey.topicIdPartition(),
sharePartition.listener());
+ delayedRequestNotifier.accept(new
DelayedShareFetchGroupKey(sharePartitionKey.groupId(),
sharePartitionKey.topicIdPartition()));
}
}
@@ -840,16 +862,19 @@ public class SharePartitionManager implements
AutoCloseable {
static class SharePartitionListener implements PartitionListener {
private final SharePartitionKey sharePartitionKey;
- private final ReplicaManager replicaManager;
+ private final PartitionMetadataProvider metadataProvider;
+ private final Consumer<DelayedShareFetchKey> delayedRequestNotifier;
private final SharePartitionCache partitionCache;
SharePartitionListener(
SharePartitionKey sharePartitionKey,
- ReplicaManager replicaManager,
+ PartitionMetadataProvider metadataProvider,
+ Consumer<DelayedShareFetchKey> delayedRequestNotifier,
SharePartitionCache partitionCache
) {
this.sharePartitionKey = sharePartitionKey;
- this.replicaManager = replicaManager;
+ this.metadataProvider = metadataProvider;
+ this.delayedRequestNotifier = delayedRequestNotifier;
this.partitionCache = partitionCache;
}
@@ -880,7 +905,7 @@ public class SharePartitionManager implements AutoCloseable
{
topicPartition, sharePartitionKey);
return;
}
- removeSharePartitionFromCache(sharePartitionKey, partitionCache,
replicaManager);
+ removeSharePartitionFromCache(sharePartitionKey, partitionCache,
metadataProvider, delayedRequestNotifier);
}
}
@@ -905,7 +930,7 @@ public class SharePartitionManager implements AutoCloseable
{
if (topicIdPartitions != null) {
// Remove all share partitions from partition cache.
topicIdPartitions.forEach(topicIdPartition ->
- removeSharePartitionFromCache(new
SharePartitionKey(groupId, topicIdPartition), partitionCache, replicaManager)
+ removeSharePartitionFromCache(new
SharePartitionKey(groupId, topicIdPartition), partitionCache, metadataProvider,
delayedRequestNotifier)
);
}
}
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index e5d95ff38c0..ba50773b4ac 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -36,6 +36,8 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.quota.ReplicaQuota;
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
@@ -2420,6 +2422,8 @@ public class DelayedShareFetchTest {
static class DelayedShareFetchBuilder {
private ShareFetch shareFetch = mock(ShareFetch.class);
private ReplicaManager replicaManager = mock(ReplicaManager.class);
+ private LogReader logReader = mock(LogReader.class);
+ private PartitionMetadataProvider metadataProvider =
mock(PartitionMetadataProvider.class);
private BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mockExceptionHandler();
private LinkedHashMap<TopicIdPartition, SharePartition>
sharePartitions = mock(LinkedHashMap.class);
private PartitionMaxBytesStrategy partitionMaxBytesStrategy =
mock(PartitionMaxBytesStrategy.class);
@@ -2435,6 +2439,8 @@ public class DelayedShareFetchTest {
DelayedShareFetchBuilder withReplicaManager(ReplicaManager
replicaManager) {
this.replicaManager = replicaManager;
+ this.logReader = new ReplicaManagerLogReader(replicaManager);
+ this.metadataProvider = new
ReplicaManagerPartitionMetadataProvider(replicaManager);
return this;
}
@@ -2481,6 +2487,8 @@ public class DelayedShareFetchTest {
return new DelayedShareFetch(
shareFetch,
replicaManager,
+ logReader,
+ metadataProvider,
exceptionHandler,
sharePartitions,
partitionMaxBytesStrategy,
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index 08ce763b8ad..c71fc46ebb5 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.record.internal.RecordBatch;
import org.apache.kafka.common.record.internal.Records;
import org.apache.kafka.common.record.internal.SimpleRecord;
import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
@@ -143,7 +144,7 @@ public class ShareFetchUtilsTest {
OptionalInt.empty(), false))
);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
=
- ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions, mock(ReplicaManager.class), EXCEPTION_HANDLER);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions, mock(PartitionMetadataProvider.class), EXCEPTION_HANDLER);
assertEquals(2, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -192,7 +193,7 @@ public class ShareFetchUtilsTest {
OptionalInt.empty(), false))
);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
=
- ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions, mock(ReplicaManager.class), EXCEPTION_HANDLER);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions, mock(PartitionMetadataProvider.class), EXCEPTION_HANDLER);
assertEquals(2, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -255,7 +256,7 @@ public class ShareFetchUtilsTest {
OptionalInt.empty(), false))
);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
resultData1 =
- ShareFetchUtils.processFetchResponse(shareFetch, responseData1,
sharePartitions, replicaManager, EXCEPTION_HANDLER);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData1,
sharePartitions, new ReplicaManagerPartitionMetadataProvider(replicaManager),
EXCEPTION_HANDLER);
assertEquals(2, resultData1.size());
assertTrue(resultData1.containsKey(tp0));
@@ -285,7 +286,7 @@ public class ShareFetchUtilsTest {
OptionalInt.empty(), false))
);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
resultData2 =
- ShareFetchUtils.processFetchResponse(shareFetch, responseData2,
sharePartitions, replicaManager, EXCEPTION_HANDLER);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData2,
sharePartitions, new ReplicaManagerPartitionMetadataProvider(replicaManager),
EXCEPTION_HANDLER);
assertEquals(2, resultData2.size());
assertTrue(resultData2.containsKey(tp0));
@@ -334,7 +335,7 @@ public class ShareFetchUtilsTest {
OptionalInt.empty(), false)));
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
=
- ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions, replicaManager, EXCEPTION_HANDLER);
+ ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions, new ReplicaManagerPartitionMetadataProvider(replicaManager),
EXCEPTION_HANDLER);
assertEquals(1, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -349,7 +350,7 @@ public class ShareFetchUtilsTest {
records, Optional.empty(), OptionalLong.empty(),
Optional.empty(),
OptionalInt.empty(), false)));
- resultData = ShareFetchUtils.processFetchResponse(shareFetch,
responseData, sharePartitions, replicaManager, EXCEPTION_HANDLER);
+ resultData = ShareFetchUtils.processFetchResponse(shareFetch,
responseData, sharePartitions, new
ReplicaManagerPartitionMetadataProvider(replicaManager), EXCEPTION_HANDLER);
assertEquals(1, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -416,7 +417,7 @@ public class ShareFetchUtilsTest {
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
=
ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions,
- mock(ReplicaManager.class), EXCEPTION_HANDLER);
+ mock(PartitionMetadataProvider.class), EXCEPTION_HANDLER);
assertEquals(2, resultData.size());
assertTrue(resultData.containsKey(tp0));
@@ -465,7 +466,7 @@ public class ShareFetchUtilsTest {
BiConsumer<SharePartitionKey, Throwable> exceptionHandler =
mock(BiConsumer.class);
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> resultData
=
ShareFetchUtils.processFetchResponse(shareFetch, responseData,
sharePartitions,
- replicaManager, exceptionHandler);
+ new ReplicaManagerPartitionMetadataProvider(replicaManager),
exceptionHandler);
assertTrue(resultData.isEmpty());
Mockito.verify(shareFetch, times(1)).addErroneous(tp0, exception);
diff --git
a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index d1a0d98eb65..eb6965bda32 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -2788,7 +2788,9 @@ public class SharePartitionManagerTest {
SharePartitionCache partitionCache = new SharePartitionCache();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
- SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey,
+ new ReplicaManagerPartitionMetadataProvider(mockReplicaManager),
+ mockReplicaManager::completeDelayedShareFetchRequest,
partitionCache);
testSharePartitionListener(sharePartitionKey, partitionCache,
mockReplicaManager, partitionListener::onFailed);
}
@@ -2799,7 +2801,9 @@ public class SharePartitionManagerTest {
SharePartitionCache partitionCache = new SharePartitionCache();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
- SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey,
+ new ReplicaManagerPartitionMetadataProvider(mockReplicaManager),
+ mockReplicaManager::completeDelayedShareFetchRequest,
partitionCache);
testSharePartitionListener(sharePartitionKey, partitionCache,
mockReplicaManager, partitionListener::onDeleted);
}
@@ -2810,7 +2814,9 @@ public class SharePartitionManagerTest {
SharePartitionCache partitionCache = new SharePartitionCache();
ReplicaManager mockReplicaManager = mock(ReplicaManager.class);
- SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
+ SharePartitionListener partitionListener = new
SharePartitionListener(sharePartitionKey,
+ new ReplicaManagerPartitionMetadataProvider(mockReplicaManager),
+ mockReplicaManager::completeDelayedShareFetchRequest,
partitionCache);
testSharePartitionListener(sharePartitionKey, partitionCache,
mockReplicaManager, partitionListener::onBecomingFollower);
}
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index 61480cd73bb..570e09115ab 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -55,11 +55,13 @@ import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import
org.apache.kafka.coordinator.group.modern.share.ShareGroupConfigProvider;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.dlq.NoOpShareGroupDLQManager;
import org.apache.kafka.server.share.dlq.ShareGroupDLQManager;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.InFlightBatch;
import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
@@ -105,6 +107,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -242,7 +245,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -293,7 +296,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -354,7 +357,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
@@ -409,7 +412,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -456,7 +459,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -502,7 +505,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -556,7 +559,7 @@ public class SharePartitionTest {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withPersister(persister)
.withConfigProvider(new
ShareGroupConfigProvider(groupConfigManager))
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
@@ -861,7 +864,7 @@ public class SharePartitionTest {
Mockito.doReturn(new
OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).
when(replicaManager).fetchOffsetForTimestamp(Mockito.any(TopicPartition.class),
Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean());
- SharePartition sharePartition =
SharePartitionBuilder.builder().withReplicaManager(replicaManager).build();
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager)).build();
CompletableFuture<Void> result = sharePartition.maybeInitialize();
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());
@@ -2712,7 +2715,7 @@ public class SharePartitionTest {
.thenReturn(140L) // for subsequent lock acquire
.thenReturn(170L); // for subsequent lock release
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.withTime(time)
.withSharePartitionMetrics(sharePartitionMetrics)
.build();
@@ -2789,7 +2792,7 @@ public class SharePartitionTest {
public void testAcknowledgeSingleRecordBatch() {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -2824,7 +2827,8 @@ public class SharePartitionTest {
public void testAcknowledgeMultipleRecordBatch() {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
+
.withDelayedRequestNotifier(replicaManager::completeDelayedShareFetchRequest)
.withState(SharePartitionState.ACTIVE)
.build();
MemoryRecords records = memoryRecords(5, 10);
@@ -12158,7 +12162,7 @@ public class SharePartitionTest {
public void testInvalidAcknowledgeTypeInBatchAcknowledgement() {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -12178,7 +12182,7 @@ public class SharePartitionTest {
public void testInvalidAcknowledgeTypeInSubsetAcknowledgement() {
ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
SharePartition sharePartition = SharePartitionBuilder.builder()
- .withReplicaManager(replicaManager)
+ .withMetadataProvider(new
ReplicaManagerPartitionMetadataProvider(replicaManager))
.withState(SharePartitionState.ACTIVE)
.build();
@@ -13313,7 +13317,8 @@ public class SharePartitionTest {
private int defaultMaxInflightRecords = DEFAULT_MAX_IN_FLIGHT_RECORDS;
private Persister persister = new NoOpStatePersister();
- private ReplicaManager replicaManager =
Mockito.mock(ReplicaManager.class);
+ private PartitionMetadataProvider metadataProvider =
Mockito.mock(PartitionMetadataProvider.class);
+ private Consumer<DelayedShareFetchKey> delayedRequestNotifier =
Mockito.mock(Consumer.class);
private ShareGroupConfigProvider configProvider = new
ShareGroupConfigProvider(Mockito.mock(GroupConfigManager.class));
private SharePartitionState state = SharePartitionState.EMPTY;
private Time time = MOCK_TIME;
@@ -13341,8 +13346,13 @@ public class SharePartitionTest {
return this;
}
- private SharePartitionBuilder withReplicaManager(ReplicaManager
replicaManager) {
- this.replicaManager = replicaManager;
+ private SharePartitionBuilder
withMetadataProvider(PartitionMetadataProvider metadataProvider) {
+ this.metadataProvider = metadataProvider;
+ return this;
+ }
+
+ private SharePartitionBuilder
withDelayedRequestNotifier(Consumer<DelayedShareFetchKey>
delayedRequestNotifier) {
+ this.delayedRequestNotifier = delayedRequestNotifier;
return this;
}
@@ -13382,7 +13392,7 @@ public class SharePartitionTest {
public SharePartition build() {
return new SharePartition(GROUP_ID, TOPIC_ID_PARTITION, 0,
defaultMaxInflightRecords, defaultMaxDeliveryCount,
- defaultAcquisitionLockTimeoutMs, mockTimer, time, persister,
replicaManager, configProvider,
+ defaultAcquisitionLockTimeoutMs, mockTimer, time, persister,
metadataProvider, delayedRequestNotifier, configProvider,
state, Mockito.mock(SharePartitionListener.class),
sharePartitionMetrics, shareGroupDlqEnableSupplier,
shareGroupDLQManager);
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 0bf4edd59c2..6c2aa46c3d3 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -47,6 +47,8 @@ import
org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.replica.ReplicaView.DefaultReplicaView
import org.apache.kafka.common.replica.{ClientMetadata, PartitionView,
ReplicaSelector, ReplicaView}
import org.apache.kafka.common.requests.FetchRequest.PartitionData
+import org.apache.kafka.server.share.LogReader;
+import org.apache.kafka.server.share.PartitionMetadataProvider;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -5982,6 +5984,8 @@ class ReplicaManagerTest {
val delayedShareFetch = spy(new DelayedShareFetch(
shareFetch,
rm,
+ mock(classOf[LogReader]),
+ mock(classOf[PartitionMetadataProvider]),
mock(classOf[BiConsumer[SharePartitionKey, Throwable]]),
sharePartitions,
mock(classOf[ShareGroupMetrics]),
diff --git
a/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
b/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
index 0dce70d6359..ee1064d79b3 100644
---
a/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
+++
b/server/src/main/java/org/apache/kafka/server/share/PartitionMetadataProvider.java
@@ -21,8 +21,6 @@ import org.apache.kafka.server.partition.PartitionListener;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
-import java.util.Optional;
-
/**
* Abstraction for partition metadata operations.
*/
@@ -44,12 +42,11 @@ public interface PartitionMetadataProvider {
long offsetForTimestamp(TopicIdPartition topicIdPartition, long timestamp,
int leaderEpoch);
/**
- * Get the end offset metadata for minBytes estimation.
+ * Get the end offset metadata for partition.
*
- * @return The end offset metadata based on the given fetch isolation, or
- * {@link Optional#empty()} when no local partition metadata is
available.
+ * @return The end offset metadata based on the given fetch isolation.
*/
- Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition
topicIdPartition, FetchIsolation isolation);
+ LogOffsetMetadata endOffsetMetadata(TopicIdPartition topicIdPartition,
FetchIsolation isolation);
/**
* Get the leader epoch for a partition.