This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 54cf2ebc9be MINOR: Initialize fetchPartitionStatus as a Map type to
reduce collection conversions (#20768)
54cf2ebc9be is described below
commit 54cf2ebc9bef51a5597a407c361504f2c8e9c929
Author: Lan Ding <[email protected]>
AuthorDate: Sun Nov 2 22:27:41 2025 +0800
MINOR: Initialize fetchPartitionStatus as a Map type to reduce collection
conversions (#20768)
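See https://github.com/apache/kafka/pull/19876#discussion_r2461128452
Initialize `fetchPartitionStatus` as a `java.util.LinkedHashMap` (previously a
`Seq` of pairs) so that callers no longer need intermediate conversions such
as `.toSeq` and `.toMap.asJava`, reducing unnecessary collection conversions.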
Reviewers: Ismael Juma <[email protected]>, Kamal Chandraprakash
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../src/main/scala/kafka/server/DelayedFetch.scala | 132 ++++++++++-----------
.../main/scala/kafka/server/ReplicaManager.scala | 17 +--
.../kafka/server/DelayedFetchTest.scala | 15 ++-
.../kafka/server/ReplicaManagerQuotasTest.scala | 10 +-
4 files changed, 95 insertions(+), 79 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index ebdc0000440..0c9a561db57 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams,
FetchPartitionData}
import org.apache.kafka.storage.internals.log.{FetchPartitionStatus,
LogOffsetMetadata}
+import java.util
import scala.collection._
import scala.jdk.CollectionConverters._
@@ -39,7 +40,7 @@ import scala.jdk.CollectionConverters._
*/
class DelayedFetch(
params: FetchParams,
- fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+ fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition,
FetchPartitionStatus],
replicaManager: ReplicaManager,
quota: ReplicaQuota,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
@@ -66,79 +67,78 @@ class DelayedFetch(
*/
override def tryComplete(): Boolean = {
var accumulatedSize = 0
- fetchPartitionStatus.foreach {
- case (topicIdPartition, fetchStatus) =>
- val fetchOffset = fetchStatus.startOffsetMetadata
- val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
- try {
- if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
- val partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
- val offsetSnapshot =
partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
-
- val endOffset = params.isolation match {
- case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
- case FetchIsolation.HIGH_WATERMARK =>
offsetSnapshot.highWatermark
- case FetchIsolation.TXN_COMMITTED =>
offsetSnapshot.lastStableOffset
- }
+ fetchPartitionStatus.forEach { (topicIdPartition, fetchStatus) =>
+ val fetchOffset = fetchStatus.startOffsetMetadata
+ val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
+ try {
+ if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+ val partition =
replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
+ val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch,
params.fetchOnlyLeader)
+
+ val endOffset = params.isolation match {
+ case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
+ case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
+ case FetchIsolation.TXN_COMMITTED =>
offsetSnapshot.lastStableOffset
+ }
- // Go directly to the check for Case G if the message offsets are
the same. If the log segment
- // has just rolled, then the high watermark offset will remain the
same but be on the old segment,
- // which would incorrectly be seen as an instance of Case F.
- if (fetchOffset.messageOffset > endOffset.messageOffset) {
- // Case F, this can happen when the new fetch operation is on a
truncated leader
- debug(s"Satisfying fetch $this since it is fetching later
segments of partition $topicIdPartition.")
- return forceComplete()
- } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
- if (fetchOffset.onOlderSegment(endOffset)) {
- // Case F, this can happen when the fetch operation is falling
behind the current segment
- // or the partition has just rolled a new segment
- debug(s"Satisfying fetch $this immediately since it is
fetching older segments.")
- // We will not force complete the fetch request if a replica
should be throttled.
- if (!params.isFromFollower ||
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
- return forceComplete()
- } else if (fetchOffset.onSameSegment(endOffset)) {
- // we take the partition fetch size as upper bound when
accumulating the bytes (skip if a throttled partition)
- val bytesAvailable =
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
- if (!params.isFromFollower ||
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
- accumulatedSize += bytesAvailable
- }
+ // Go directly to the check for Case G if the message offsets are
the same. If the log segment
+ // has just rolled, then the high watermark offset will remain the
same but be on the old segment,
+ // which would incorrectly be seen as an instance of Case F.
+ if (fetchOffset.messageOffset > endOffset.messageOffset) {
+ // Case F, this can happen when the new fetch operation is on a
truncated leader
+ debug(s"Satisfying fetch $this since it is fetching later segments
of partition $topicIdPartition.")
+ return forceComplete()
+ } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+ if (fetchOffset.onOlderSegment(endOffset)) {
+ // Case F, this can happen when the fetch operation is falling
behind the current segment
+ // or the partition has just rolled a new segment
+ debug(s"Satisfying fetch $this immediately since it is fetching
older segments.")
+ // We will not force complete the fetch request if a replica
should be throttled.
+ if (!params.isFromFollower ||
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
+ return forceComplete()
+ } else if (fetchOffset.onSameSegment(endOffset)) {
+ // we take the partition fetch size as upper bound when
accumulating the bytes (skip if a throttled partition)
+ val bytesAvailable =
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
+ if (!params.isFromFollower ||
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
+ accumulatedSize += bytesAvailable
}
+ }
- // Case H: If truncation has caused diverging epoch while this
request was in purgatory, return to trigger truncation
- fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
- val epochEndOffset =
partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch,
fetchOnlyFromLeader = false)
- if (epochEndOffset.errorCode != Errors.NONE.code()
- || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
- || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
- debug(s"Could not obtain last offset for leader epoch for
partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
- return forceComplete()
- } else if (epochEndOffset.leaderEpoch < fetchEpoch ||
epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
- debug(s"Satisfying fetch $this since it has diverging epoch
requiring truncation for partition " +
- s"$topicIdPartition epochEndOffset=$epochEndOffset
fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
- return forceComplete()
- }
+ // Case H: If truncation has caused diverging epoch while this
request was in purgatory, return to trigger truncation
+ fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
+ val epochEndOffset =
partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch,
fetchOnlyFromLeader = false)
+ if (epochEndOffset.errorCode != Errors.NONE.code()
+ || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
+ || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
+ debug(s"Could not obtain last offset for leader epoch for
partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
+ return forceComplete()
+ } else if (epochEndOffset.leaderEpoch < fetchEpoch ||
epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
+ debug(s"Satisfying fetch $this since it has diverging epoch
requiring truncation for partition " +
+ s"$topicIdPartition epochEndOffset=$epochEndOffset
fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
+ return forceComplete()
}
}
- } catch {
- case _: NotLeaderOrFollowerException => // Case A or Case B
- debug(s"Broker is no longer the leader or follower of
$topicIdPartition, satisfy $this immediately")
- return forceComplete()
- case _: UnknownTopicOrPartitionException => // Case C
- debug(s"Broker no longer knows of partition $topicIdPartition,
satisfy $this immediately")
- return forceComplete()
- case _: KafkaStorageException => // Case D
- debug(s"Partition $topicIdPartition is in an offline log
directory, satisfy $this immediately")
- return forceComplete()
- case _: FencedLeaderEpochException => // Case E
- debug(s"Broker is the leader of partition $topicIdPartition, but
the requested epoch " +
- s"$fetchLeaderEpoch is fenced by the latest leader epoch,
satisfy $this immediately")
- return forceComplete()
}
+ } catch {
+ case _: NotLeaderOrFollowerException => // Case A or Case B
+ debug(s"Broker is no longer the leader or follower of
$topicIdPartition, satisfy $this immediately")
+ return forceComplete()
+ case _: UnknownTopicOrPartitionException => // Case C
+ debug(s"Broker no longer knows of partition $topicIdPartition,
satisfy $this immediately")
+ return forceComplete()
+ case _: KafkaStorageException => // Case D
+ debug(s"Partition $topicIdPartition is in an offline log directory,
satisfy $this immediately")
+ return forceComplete()
+ case _: FencedLeaderEpochException => // Case E
+ debug(s"Broker is the leader of partition $topicIdPartition, but the
requested epoch " +
+ s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy
$this immediately")
+ return forceComplete()
+ }
}
// Case G
if (accumulatedSize >= params.minBytes)
- forceComplete()
+ forceComplete()
else
false
}
@@ -154,9 +154,9 @@ class DelayedFetch(
* Upon completion, read whatever data is available and pass to the complete
callback
*/
override def onComplete(): Unit = {
- val fetchInfos = fetchPartitionStatus.map { case (tp, status) =>
+ val fetchInfos = fetchPartitionStatus.asScala.iterator.map { case (tp,
status) =>
tp -> status.fetchInfo
- }
+ }.toBuffer
val logReadResults = replicaManager.readFromLog(
params,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a6d0cf26b03..1b3566ea3da 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1628,7 +1628,7 @@ class ReplicaManager(val config: KafkaConfig,
params: FetchParams,
responseCallback: Seq[(TopicIdPartition,
FetchPartitionData)] => Unit,
logReadResults:
util.LinkedHashMap[TopicIdPartition, LogReadResult],
- fetchPartitionStatus:
Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
+ fetchPartitionStatus:
util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition,
CompletableFuture[RemoteLogReadResult]]
@@ -1643,7 +1643,7 @@ class ReplicaManager(val config: KafkaConfig,
remoteFetchResults,
remoteFetchInfos,
remoteFetchMaxWaitMs,
- fetchPartitionStatus.toMap.asJava,
+ fetchPartitionStatus,
params,
logReadResults,
tp => getPartitionOrException(tp),
@@ -1710,17 +1710,17 @@ class ReplicaManager(val config: KafkaConfig,
responseCallback(fetchPartitionData)
} else {
// construct the fetch results from the read results
- val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition,
FetchPartitionStatus)]
+ val fetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition,
FetchPartitionStatus]
fetchInfos.foreach { case (topicIdPartition, partitionData) =>
val logReadResult = logReadResultMap.get(topicIdPartition)
if (logReadResult != null) {
val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
- fetchPartitionStatus += (topicIdPartition -> new
FetchPartitionStatus(logOffsetMetadata, partitionData))
+ fetchPartitionStatus.put(topicIdPartition, new
FetchPartitionStatus(logOffsetMetadata, partitionData))
}
}
if (!remoteFetchInfos.isEmpty) {
- processRemoteFetches(remoteFetchInfos, params, responseCallback,
logReadResultMap, fetchPartitionStatus.toSeq)
+ processRemoteFetches(remoteFetchInfos, params, responseCallback,
logReadResultMap, fetchPartitionStatus)
} else {
// If there is not enough data to respond and there is no remote data,
we will let the fetch request
// wait for new data.
@@ -1733,12 +1733,15 @@ class ReplicaManager(val config: KafkaConfig,
)
// create a list of (topic, partition) pairs to use as keys for this
delayed fetch operation
- val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new
TopicPartitionOperationKey(tp) }.toList
+ val delayedFetchKeys = fetchPartitionStatus.keySet()
+ .stream()
+ .map(new TopicPartitionOperationKey(_))
+ .toList()
// try to complete the request immediately, otherwise put it into the
purgatory;
// this is because while the delayed fetch operation is being created,
new requests
// may arrive and hence make this operation completable.
- delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch,
delayedFetchKeys.asJava)
+ delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch,
delayedFetchKeys)
}
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index fa3b8465d65..5a3e5e7959b 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -34,6 +34,8 @@ import org.junit.jupiter.params.provider.ValueSource
import org.mockito.ArgumentMatchers.{any, anyInt}
import org.mockito.Mockito.{mock, when}
+import java.util
+
class DelayedFetchTest {
private val maxBytes = 1024
private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
@@ -59,7 +61,7 @@ class DelayedFetchTest {
val delayedFetch = new DelayedFetch(
params = fetchParams,
- fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+ fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition,
fetchStatus),
replicaManager = replicaManager,
quota = replicaQuota,
responseCallback = callback
@@ -105,7 +107,7 @@ class DelayedFetchTest {
val delayedFetch = new DelayedFetch(
params = fetchParams,
- fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+ fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition,
fetchStatus),
replicaManager = replicaManager,
quota = replicaQuota,
responseCallback = callback
@@ -145,7 +147,7 @@ class DelayedFetchTest {
val delayedFetch = new DelayedFetch(
params = fetchParams,
- fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+ fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition,
fetchStatus),
replicaManager = replicaManager,
quota = replicaQuota,
responseCallback = callback
@@ -196,7 +198,7 @@ class DelayedFetchTest {
val delayedFetch = new DelayedFetch(
params = fetchParams,
- fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+ fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition,
fetchStatus),
replicaManager = replicaManager,
quota = replicaQuota,
responseCallback = callback
@@ -267,4 +269,9 @@ class DelayedFetchTest {
error)
}
+ private def createFetchPartitionStatusMap(tpId: TopicIdPartition, status:
FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition,
FetchPartitionStatus] = {
+ val statusMap = new util.LinkedHashMap[TopicIdPartition,
FetchPartitionStatus]
+ statusMap.put(tpId, status)
+ statusMap
+ }
}
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 307afad4f5f..93340bfb3b2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -40,6 +40,7 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt,
anyLong}
import org.mockito.Mockito.{mock, when}
import org.mockito.{AdditionalMatchers, ArgumentMatchers}
+import java.util
import scala.jdk.CollectionConverters._
class ReplicaManagerQuotasTest {
@@ -186,7 +187,7 @@ class ReplicaManagerQuotasTest {
new DelayedFetch(
params = fetchParams,
- fetchPartitionStatus = Seq(tp -> fetchPartitionStatus),
+ fetchPartitionStatus = createFetchPartitionStatusMap(tp,
fetchPartitionStatus),
replicaManager = replicaManager,
quota = null,
responseCallback = null
@@ -237,7 +238,7 @@ class ReplicaManagerQuotasTest {
new DelayedFetch(
params = fetchParams,
- fetchPartitionStatus = Seq(tidp -> fetchPartitionStatus),
+ fetchPartitionStatus = createFetchPartitionStatusMap(tidp,
fetchPartitionStatus),
replicaManager = replicaManager,
quota = null,
responseCallback = null
@@ -341,4 +342,9 @@ class ReplicaManagerQuotasTest {
quota
}
+ private def createFetchPartitionStatusMap(tpId: TopicIdPartition, status:
FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition,
FetchPartitionStatus] = {
+ val statusMap = new util.LinkedHashMap[TopicIdPartition,
FetchPartitionStatus]
+ statusMap.put(tpId, status)
+ statusMap
+ }
}