This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 54cf2ebc9be MINOR: Initialize fetchPartitionStatus as a Map type to reduce collection conversions (#20768)
54cf2ebc9be is described below

commit 54cf2ebc9bef51a5597a407c361504f2c8e9c929
Author: Lan Ding <[email protected]>
AuthorDate: Sun Nov 2 22:27:41 2025 +0800

    MINOR: Initialize fetchPartitionStatus as a Map type to reduce collection conversions (#20768)
    
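    See https://github.com/apache/kafka/pull/19876#discussion_r2461128452

    Initialize `fetchPartitionStatus` as a `java.util.LinkedHashMap` instead of a
    `Seq` of pairs. It can then be handed directly to the remote-fetch path and to
    `DelayedFetch`, which removes the unnecessary Scala/Java collection conversions
    (`toSeq`, `toMap.asJava`, `asJava`). The linked map keeps the partitions in
    their original fetch order.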
    
    Reviewers: Ismael Juma <[email protected]>, Kamal Chandraprakash
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../src/main/scala/kafka/server/DelayedFetch.scala | 132 ++++++++++-----------
 .../main/scala/kafka/server/ReplicaManager.scala   |  17 +--
 .../kafka/server/DelayedFetchTest.scala            |  15 ++-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  10 +-
 4 files changed, 95 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala 
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index ebdc0000440..0c9a561db57 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, 
FetchPartitionData}
 import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, 
LogOffsetMetadata}
 
+import java.util
 import scala.collection._
 import scala.jdk.CollectionConverters._
 
@@ -39,7 +40,7 @@ import scala.jdk.CollectionConverters._
  */
 class DelayedFetch(
   params: FetchParams,
-  fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+  fetchPartitionStatus: util.LinkedHashMap[TopicIdPartition, 
FetchPartitionStatus],
   replicaManager: ReplicaManager,
   quota: ReplicaQuota,
   responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
@@ -66,79 +67,78 @@ class DelayedFetch(
    */
   override def tryComplete(): Boolean = {
     var accumulatedSize = 0
-    fetchPartitionStatus.foreach {
-      case (topicIdPartition, fetchStatus) =>
-        val fetchOffset = fetchStatus.startOffsetMetadata
-        val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
-        try {
-          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
-            val partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
-            val offsetSnapshot = 
partition.fetchOffsetSnapshot(fetchLeaderEpoch, params.fetchOnlyLeader)
-
-            val endOffset = params.isolation match {
-              case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
-              case FetchIsolation.HIGH_WATERMARK => 
offsetSnapshot.highWatermark
-              case FetchIsolation.TXN_COMMITTED => 
offsetSnapshot.lastStableOffset
-            }
+    fetchPartitionStatus.forEach { (topicIdPartition, fetchStatus) =>
+      val fetchOffset = fetchStatus.startOffsetMetadata
+      val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
+      try {
+        if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+          val partition = 
replicaManager.getPartitionOrException(topicIdPartition.topicPartition)
+          val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, 
params.fetchOnlyLeader)
+
+          val endOffset = params.isolation match {
+            case FetchIsolation.LOG_END => offsetSnapshot.logEndOffset
+            case FetchIsolation.HIGH_WATERMARK => offsetSnapshot.highWatermark
+            case FetchIsolation.TXN_COMMITTED => 
offsetSnapshot.lastStableOffset
+          }
 
-            // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
-            // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
-            // which would incorrectly be seen as an instance of Case F.
-            if (fetchOffset.messageOffset > endOffset.messageOffset) {
-              // Case F, this can happen when the new fetch operation is on a 
truncated leader
-              debug(s"Satisfying fetch $this since it is fetching later 
segments of partition $topicIdPartition.")
-              return forceComplete()
-            } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
-              if (fetchOffset.onOlderSegment(endOffset)) {
-                // Case F, this can happen when the fetch operation is falling 
behind the current segment
-                // or the partition has just rolled a new segment
-                debug(s"Satisfying fetch $this immediately since it is 
fetching older segments.")
-                // We will not force complete the fetch request if a replica 
should be throttled.
-                if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
-                  return forceComplete()
-              } else if (fetchOffset.onSameSegment(endOffset)) {
-                // we take the partition fetch size as upper bound when 
accumulating the bytes (skip if a throttled partition)
-                val bytesAvailable = 
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
-                if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
-                  accumulatedSize += bytesAvailable
-              }
+          // Go directly to the check for Case G if the message offsets are 
the same. If the log segment
+          // has just rolled, then the high watermark offset will remain the 
same but be on the old segment,
+          // which would incorrectly be seen as an instance of Case F.
+          if (fetchOffset.messageOffset > endOffset.messageOffset) {
+            // Case F, this can happen when the new fetch operation is on a 
truncated leader
+            debug(s"Satisfying fetch $this since it is fetching later segments 
of partition $topicIdPartition.")
+            return forceComplete()
+          } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
+            if (fetchOffset.onOlderSegment(endOffset)) {
+              // Case F, this can happen when the fetch operation is falling 
behind the current segment
+              // or the partition has just rolled a new segment
+              debug(s"Satisfying fetch $this immediately since it is fetching 
older segments.")
+              // We will not force complete the fetch request if a replica 
should be throttled.
+              if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
+                return forceComplete()
+            } else if (fetchOffset.onSameSegment(endOffset)) {
+              // we take the partition fetch size as upper bound when 
accumulating the bytes (skip if a throttled partition)
+              val bytesAvailable = 
math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
+              if (!params.isFromFollower || 
!replicaManager.shouldLeaderThrottle(quota, partition, params.replicaId))
+                accumulatedSize += bytesAvailable
             }
+          }
 
-            // Case H: If truncation has caused diverging epoch while this 
request was in purgatory, return to trigger truncation
-            fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
-              val epochEndOffset = 
partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, 
fetchOnlyFromLeader = false)
-              if (epochEndOffset.errorCode != Errors.NONE.code()
-                  || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
-                  || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
-                debug(s"Could not obtain last offset for leader epoch for 
partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
-                return forceComplete()
-              } else if (epochEndOffset.leaderEpoch < fetchEpoch || 
epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
-                debug(s"Satisfying fetch $this since it has diverging epoch 
requiring truncation for partition " +
-                  s"$topicIdPartition epochEndOffset=$epochEndOffset 
fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
-                return forceComplete()
-              }
+          // Case H: If truncation has caused diverging epoch while this 
request was in purgatory, return to trigger truncation
+          fetchStatus.fetchInfo.lastFetchedEpoch.ifPresent { fetchEpoch =>
+            val epochEndOffset = 
partition.lastOffsetForLeaderEpoch(fetchLeaderEpoch, fetchEpoch, 
fetchOnlyFromLeader = false)
+            if (epochEndOffset.errorCode != Errors.NONE.code()
+              || epochEndOffset.endOffset == UNDEFINED_EPOCH_OFFSET
+              || epochEndOffset.leaderEpoch == UNDEFINED_EPOCH) {
+              debug(s"Could not obtain last offset for leader epoch for 
partition $topicIdPartition, epochEndOffset=$epochEndOffset.")
+              return forceComplete()
+            } else if (epochEndOffset.leaderEpoch < fetchEpoch || 
epochEndOffset.endOffset < fetchStatus.fetchInfo.fetchOffset) {
+              debug(s"Satisfying fetch $this since it has diverging epoch 
requiring truncation for partition " +
+                s"$topicIdPartition epochEndOffset=$epochEndOffset 
fetchEpoch=$fetchEpoch fetchOffset=${fetchStatus.fetchInfo.fetchOffset}.")
+              return forceComplete()
             }
           }
-        } catch {
-          case _: NotLeaderOrFollowerException =>  // Case A or Case B
-            debug(s"Broker is no longer the leader or follower of 
$topicIdPartition, satisfy $this immediately")
-            return forceComplete()
-          case _: UnknownTopicOrPartitionException => // Case C
-            debug(s"Broker no longer knows of partition $topicIdPartition, 
satisfy $this immediately")
-            return forceComplete()
-          case _: KafkaStorageException => // Case D
-            debug(s"Partition $topicIdPartition is in an offline log 
directory, satisfy $this immediately")
-            return forceComplete()
-          case _: FencedLeaderEpochException => // Case E
-            debug(s"Broker is the leader of partition $topicIdPartition, but 
the requested epoch " +
-              s"$fetchLeaderEpoch is fenced by the latest leader epoch, 
satisfy $this immediately")
-            return forceComplete()
         }
+      } catch {
+        case _: NotLeaderOrFollowerException => // Case A or Case B
+          debug(s"Broker is no longer the leader or follower of 
$topicIdPartition, satisfy $this immediately")
+          return forceComplete()
+        case _: UnknownTopicOrPartitionException => // Case C
+          debug(s"Broker no longer knows of partition $topicIdPartition, 
satisfy $this immediately")
+          return forceComplete()
+        case _: KafkaStorageException => // Case D
+          debug(s"Partition $topicIdPartition is in an offline log directory, 
satisfy $this immediately")
+          return forceComplete()
+        case _: FencedLeaderEpochException => // Case E
+          debug(s"Broker is the leader of partition $topicIdPartition, but the 
requested epoch " +
+            s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy 
$this immediately")
+          return forceComplete()
+      }
     }
 
     // Case G
     if (accumulatedSize >= params.minBytes)
-       forceComplete()
+      forceComplete()
     else
       false
   }
@@ -154,9 +154,9 @@ class DelayedFetch(
    * Upon completion, read whatever data is available and pass to the complete 
callback
    */
   override def onComplete(): Unit = {
-    val fetchInfos = fetchPartitionStatus.map { case (tp, status) =>
+    val fetchInfos = fetchPartitionStatus.asScala.iterator.map { case (tp, 
status) =>
       tp -> status.fetchInfo
-    }
+    }.toBuffer
 
     val logReadResults = replicaManager.readFromLog(
       params,
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index a6d0cf26b03..1b3566ea3da 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1628,7 +1628,7 @@ class ReplicaManager(val config: KafkaConfig,
                                    params: FetchParams,
                                    responseCallback: Seq[(TopicIdPartition, 
FetchPartitionData)] => Unit,
                                    logReadResults: 
util.LinkedHashMap[TopicIdPartition, LogReadResult],
-                                   fetchPartitionStatus: 
Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
+                                   fetchPartitionStatus: 
util.LinkedHashMap[TopicIdPartition, FetchPartitionStatus]): Unit = {
     val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
     val remoteFetchResults = new util.HashMap[TopicIdPartition, 
CompletableFuture[RemoteLogReadResult]]
 
@@ -1643,7 +1643,7 @@ class ReplicaManager(val config: KafkaConfig,
                                              remoteFetchResults,
                                              remoteFetchInfos,
                                              remoteFetchMaxWaitMs,
-                                             fetchPartitionStatus.toMap.asJava,
+                                             fetchPartitionStatus,
                                              params,
                                              logReadResults,
                                              tp => getPartitionOrException(tp),
@@ -1710,17 +1710,17 @@ class ReplicaManager(val config: KafkaConfig,
       responseCallback(fetchPartitionData)
     } else {
       // construct the fetch results from the read results
-      val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, 
FetchPartitionStatus)]
+      val fetchPartitionStatus = new util.LinkedHashMap[TopicIdPartition, 
FetchPartitionStatus]
       fetchInfos.foreach { case (topicIdPartition, partitionData) =>
         val logReadResult = logReadResultMap.get(topicIdPartition)
         if (logReadResult != null) {
           val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
-          fetchPartitionStatus += (topicIdPartition -> new 
FetchPartitionStatus(logOffsetMetadata, partitionData))
+          fetchPartitionStatus.put(topicIdPartition, new 
FetchPartitionStatus(logOffsetMetadata, partitionData))
         }
       }
 
       if (!remoteFetchInfos.isEmpty) {
-        processRemoteFetches(remoteFetchInfos, params, responseCallback, 
logReadResultMap, fetchPartitionStatus.toSeq)
+        processRemoteFetches(remoteFetchInfos, params, responseCallback, 
logReadResultMap, fetchPartitionStatus)
       } else {
         // If there is not enough data to respond and there is no remote data, 
we will let the fetch request
         // wait for new data.
@@ -1733,12 +1733,15 @@ class ReplicaManager(val config: KafkaConfig,
         )
 
         // create a list of (topic, partition) pairs to use as keys for this 
delayed fetch operation
-        val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new 
TopicPartitionOperationKey(tp) }.toList
+        val delayedFetchKeys = fetchPartitionStatus.keySet()
+          .stream()
+          .map(new TopicPartitionOperationKey(_))
+          .toList()
 
         // try to complete the request immediately, otherwise put it into the 
purgatory;
         // this is because while the delayed fetch operation is being created, 
new requests
         // may arrive and hence make this operation completable.
-        delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys.asJava)
+        delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, 
delayedFetchKeys)
       }
     }
   }
diff --git 
a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala 
b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index fa3b8465d65..5a3e5e7959b 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -34,6 +34,8 @@ import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers.{any, anyInt}
 import org.mockito.Mockito.{mock, when}
 
+import java.util
+
 class DelayedFetchTest {
   private val maxBytes = 1024
   private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
@@ -59,7 +61,7 @@ class DelayedFetchTest {
 
     val delayedFetch = new DelayedFetch(
       params = fetchParams,
-      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, 
fetchStatus),
       replicaManager = replicaManager,
       quota = replicaQuota,
       responseCallback = callback
@@ -105,7 +107,7 @@ class DelayedFetchTest {
 
     val delayedFetch = new DelayedFetch(
       params = fetchParams,
-      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, 
fetchStatus),
       replicaManager = replicaManager,
       quota = replicaQuota,
       responseCallback = callback
@@ -145,7 +147,7 @@ class DelayedFetchTest {
 
     val delayedFetch = new DelayedFetch(
       params = fetchParams,
-      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, 
fetchStatus),
       replicaManager = replicaManager,
       quota = replicaQuota,
       responseCallback = callback
@@ -196,7 +198,7 @@ class DelayedFetchTest {
 
     val delayedFetch = new DelayedFetch(
       params = fetchParams,
-      fetchPartitionStatus = Seq(topicIdPartition -> fetchStatus),
+      fetchPartitionStatus = createFetchPartitionStatusMap(topicIdPartition, 
fetchStatus),
       replicaManager = replicaManager,
       quota = replicaQuota,
       responseCallback = callback
@@ -267,4 +269,9 @@ class DelayedFetchTest {
       error)
   }
 
+  private def createFetchPartitionStatusMap(tpId: TopicIdPartition, status: 
FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition, 
FetchPartitionStatus] = {
+    val statusMap = new util.LinkedHashMap[TopicIdPartition, 
FetchPartitionStatus]
+    statusMap.put(tpId, status)
+    statusMap
+  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 307afad4f5f..93340bfb3b2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -40,6 +40,7 @@ import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, 
anyLong}
 import org.mockito.Mockito.{mock, when}
 import org.mockito.{AdditionalMatchers, ArgumentMatchers}
 
+import java.util
 import scala.jdk.CollectionConverters._
 
 class ReplicaManagerQuotasTest {
@@ -186,7 +187,7 @@ class ReplicaManagerQuotasTest {
 
       new DelayedFetch(
         params = fetchParams,
-        fetchPartitionStatus = Seq(tp -> fetchPartitionStatus),
+        fetchPartitionStatus = createFetchPartitionStatusMap(tp, 
fetchPartitionStatus),
         replicaManager = replicaManager,
         quota = null,
         responseCallback = null
@@ -237,7 +238,7 @@ class ReplicaManagerQuotasTest {
 
       new DelayedFetch(
         params = fetchParams,
-        fetchPartitionStatus = Seq(tidp -> fetchPartitionStatus),
+        fetchPartitionStatus = createFetchPartitionStatusMap(tidp, 
fetchPartitionStatus),
         replicaManager = replicaManager,
         quota = null,
         responseCallback = null
@@ -341,4 +342,9 @@ class ReplicaManagerQuotasTest {
     quota
   }
 
+  private def createFetchPartitionStatusMap(tpId: TopicIdPartition, status: 
FetchPartitionStatus): util.LinkedHashMap[TopicIdPartition, 
FetchPartitionStatus] = {
+    val statusMap = new util.LinkedHashMap[TopicIdPartition, 
FetchPartitionStatus]
+    statusMap.put(tpId, status)
+    statusMap
+  }
 }

Reply via email to