This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 20d975a6802 MINOR: Replace Map conversions with MetricsUtils.getTags 
for metric tags (#21948)
20d975a6802 is described below

commit 20d975a6802293fdb7f009769119899d5b4c6eda
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Apr 8 02:28:29 2026 +0800

    MINOR: Replace Map conversions with MetricsUtils.getTags for metric tags 
(#21948)
    
    Replace HashMap with LinkedHashMap for metrics tag maps so that JMX
    MBean names always reflect a deterministic,  insertion-order tag
    sequence. Also migrate Scala core module tag  construction to use the
    existing MetricsUtils.getTags(...) helper for  consistency.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang
     <[email protected]>, Chia-Yi Chiu <[email protected]>
---
 core/src/main/scala/kafka/cluster/Partition.scala           |  5 +++--
 core/src/main/scala/kafka/network/RequestChannel.scala      |  6 ++++--
 core/src/main/scala/kafka/network/SocketServer.scala        | 13 ++++++++-----
 .../main/scala/kafka/server/AbstractFetcherManager.scala    |  4 ++--
 .../src/main/scala/kafka/server/AbstractFetcherThread.scala | 10 +++-------
 core/src/main/scala/kafka/server/BrokerBlockingSender.scala |  5 ++---
 core/src/main/scala/kafka/server/DelayedFetch.scala         |  5 +++--
 .../org/apache/kafka/server/purgatory/DelayedProduce.java   |  5 +++--
 .../kafka/server/purgatory/DelayedRemoteListOffsets.java    |  5 +++--
 .../server/purgatory/DelayedRemoteListOffsetsTest.java      |  3 ++-
 10 files changed, 33 insertions(+), 28 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index ccb6ddac4de..f303b0fede7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,6 +41,7 @@ import org.apache.kafka.server.common.{RequestLocal, 
TransactionVersion}
 import org.apache.kafka.server.log.remote.TopicPartitionLog
 import org.apache.kafka.server.log.remote.storage.RemoteLogManager
 import org.apache.kafka.storage.internals.log.{AppendOrigin, 
AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogManager, 
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, 
LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, 
VerificationGuard}
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.partition.{AlterPartitionListener, 
AssignmentState, CommittedPartitionState, OngoingReassignmentState, 
PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, 
PendingShrinkIsr, SimpleAssignmentState}
 import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
@@ -131,7 +132,7 @@ object Partition {
   }
 
   def removeMetrics(topicPartition: TopicPartition): Unit = {
-    val tags = Map("topic" -> topicPartition.topic, "partition" -> 
topicPartition.partition.toString).asJava
+    val tags = MetricsUtils.getTags("topic", topicPartition.topic, 
"partition", topicPartition.partition.toString)
     metricsGroup.removeMetric("UnderReplicated", tags)
     metricsGroup.removeMetric("UnderMinIsr", tags)
     metricsGroup.removeMetric("InSyncReplicasCount", tags)
@@ -215,7 +216,7 @@ class Partition(val topicPartition: TopicPartition,
 
   this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
 
-  private val tags = Map("topic" -> topic, "partition" -> 
partitionId.toString).asJava
+  private val tags = MetricsUtils.getTags("topic", topic, "partition", 
partitionId.toString)
 
   metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else 
0, tags)
   metricsGroup.newGauge("InSyncReplicasCount", () => if (isLeader) 
partitionState.isr.size else 0, tags)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 25155145b5a..d3e99cbba43 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.network.{RequestConvertToJson, Session}
 import org.apache.kafka.server.config.AbstractKafkaConfig
@@ -367,12 +368,13 @@ class RequestChannel(val queueSize: Int,
       warn(s"Unexpected processor with processorId ${processor.id}")
 
     metricsGroup.newGauge(ResponseQueueSizeMetric, () => 
processor.responseQueueSize,
-      Map(ProcessorMetricTag -> processor.id.toString).asJava)
+      MetricsUtils.getTags(ProcessorMetricTag, processor.id.toString))
   }
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
-    metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag 
-> processorId.toString).asJava)
+    metricsGroup.removeMetric(ResponseQueueSizeMetric,
+      MetricsUtils.getTags(ProcessorMetricTag, processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room 
in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 590be14e2dd..471dfb3242e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -46,6 +46,7 @@ import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledExcep
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
 import org.apache.kafka.server.config.QuotaConfig
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.ConnectionDisconnectListener
 import org.apache.kafka.server.quota.QuotaUtils
@@ -496,7 +497,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private val backwardCompatibilityMetricGroup = new 
KafkaMetricsGroup("kafka.network", "Acceptor")
   private val blockedPercentMeterMetricName = 
backwardCompatibilityMetricGroup.metricName(
     "AcceptorBlockedPercent",
-    Map(ListenerMetricTag -> endPoint.listener).asJava)
+    MetricsUtils.getTags(ListenerMetricTag, endPoint.listener))
   private val blockedPercentMeter = 
backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked
 time", TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new 
mutable.PriorityQueue[DelayedCloseSocket]()
@@ -839,7 +840,7 @@ private[kafka] class Processor(
   },
     // for compatibility, only add a networkProcessor tag to the Yammer 
Metrics alias (the equivalent Selector metric
     // also includes the listener name)
-    Map(NetworkProcessorMetricTag -> id.toString).asJava
+    MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString)
   )
 
   private val expiredConnectionsKilledCount = new CumulativeSum()
@@ -1195,7 +1196,8 @@ private[kafka] class Processor(
       close(channel.id)
     }
     selector.close()
-    metricsGroup.removeMetric(IdlePercentMetricName, 
Map(NetworkProcessorMetricTag -> id.toString).asJava)
+    metricsGroup.removeMetric(IdlePercentMetricName,
+      MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString))
   }
 
   // 'protected` to allow override for testing
@@ -1266,7 +1268,8 @@ private[kafka] class Processor(
         Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
       }
     } finally {
-      metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" -> 
id.toString).asJava)
+      metricsGroup.removeMetric("IdlePercent",
+        MetricsUtils.getTags("networkProcessor", id.toString))
       metrics.removeMetric(expiredConnectionsKilledCountMetricName)
     }
   }
@@ -1686,7 +1689,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
       val metricName = 
metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
         JSocketServer.METRICS_GROUP,
         "Tracking average throttle-time, out of non-zero throttle times, per 
listener",
-        Map(ListenerMetricTag -> listener.value).asJava)
+        MetricsUtils.getTags(ListenerMetricTag, listener.value))
       sensor.add(metricName, new Avg)
       sensor
     }
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 958c92ac185..e371da9021c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,12 +20,12 @@ package kafka.server
 import kafka.utils.Logging
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
 import org.apache.kafka.server.PartitionFetchState
 
 import scala.collection.{Map, Set, mutable}
-import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
 
 abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name: 
String, clientId: String, numFetchers: Int)
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   val failedPartitions = new FailedPartitions
   this.logIdent = "[" + name + "] "
 
-  private val tags = Map("clientId" -> clientId).asJava
+  private val tags = MetricsUtils.getTags("clientId", clientId)
 
   metricsGroup.newGauge("MaxLag", () => {
     // current max lag across all fetchers/topics/partitions
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 107e12a695f..8e50042f348 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.{ClientIdAndBroker, 
InvalidRecordException, Topic
 import org.apache.kafka.server.common.OffsetAndEpoch
 import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState, 
ReplicaState, ResultWithPartitions}
 import 
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.{LockUtils, ShutdownableThread}
 import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -930,10 +931,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
   private val metricsGroup = new KafkaMetricsGroup(metricsPackage, 
metricsClassName)
 
   private[this] val lagVal = new AtomicLong(-1L)
-  private[this] val tags = Map(
-    "clientId" -> metricId.clientId,
-    "topic" -> metricId.topicPartition.topic,
-    "partition" -> metricId.topicPartition.partition.toString).asJava
+  private[this] val tags = MetricsUtils.getTags("clientId", metricId.clientId, 
"topic", metricId.topicPartition.topic, "partition", 
metricId.topicPartition.partition.toString)
 
   metricsGroup.newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
 
@@ -971,9 +969,7 @@ class FetcherStats(metricId: ClientIdAndBroker) {
   private val metricsClassName = "FetcherStats"
   private val metricsGroup = new KafkaMetricsGroup(metricsPackage, 
metricsClassName)
 
-  val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
-    "brokerHost" -> metricId.brokerHost,
-    "brokerPort" -> metricId.brokerPort.toString).asJava
+  val tags: util.Map[String, String] = MetricsUtils.getTags("clientId", 
metricId.clientId, "brokerHost", metricId.brokerHost, "brokerPort", 
metricId.brokerPort.toString)
 
   val requestRate: Meter = 
metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests", 
TimeUnit.SECONDS, tags)
 
diff --git a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala 
b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
index c8405be6c6e..b4de360d532 100644
--- a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
+++ b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
@@ -26,10 +26,9 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.clients.{ApiVersions, ClientResponse, 
ManualMetadataUpdater, NetworkClient}
 import org.apache.kafka.common.{Node, Reconfigurable}
 import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.network.BrokerEndPoint
 
-import scala.jdk.CollectionConverters._
-
 trait BlockingSend {
 
   def brokerEndPoint(): BrokerEndPoint
@@ -74,7 +73,7 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
       metrics,
       time,
       "replica-fetcher",
-      Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> 
fetcherId.toString).asJava,
+      MetricsUtils.getTags("broker-id", sourceBroker.id.toString, 
"fetcher-id", fetcherId.toString),
       false,
       channelBuilder,
       logContext
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala 
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index f73acd19d98..88e43ec9a22 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.protocol.Errors
 import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
 UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.common.metrics.internals.MetricsUtils
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.purgatory.DelayedOperation
 import org.apache.kafka.server.quota.ReplicaQuota
@@ -183,7 +184,7 @@ object DelayedFetchMetrics {
   private val metricsClassName = "DelayedFetchMetrics"
   private val metricsGroup = new KafkaMetricsGroup(metricsPackage, 
metricsClassName)
   private val FetcherTypeKey = "fetcherType"
-  val followerExpiredRequestMeter: Meter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, 
Map(FetcherTypeKey -> "follower").asJava)
-  val consumerExpiredRequestMeter: Meter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, 
Map(FetcherTypeKey -> "consumer").asJava)
+  val followerExpiredRequestMeter: Meter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, 
MetricsUtils.getTags(FetcherTypeKey, "follower"))
+  val consumerExpiredRequestMeter: Meter = 
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, 
MetricsUtils.getTags(FetcherTypeKey, "consumer"))
 }
 
diff --git 
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java 
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
index 711c45cff73..b7ce44686d1 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.purgatory;
 
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
@@ -200,14 +201,14 @@ public class DelayedProduce extends DelayedOperation {
                 key -> METRICS_GROUP.newMeter("ExpiresPerSec",
                         "requests",
                         TimeUnit.SECONDS,
-                        Map.of("topic", key.topic(), "partition", 
String.valueOf(key.partition())))
+                        MetricsUtils.getTags("topic", key.topic(), 
"partition", String.valueOf(key.partition())))
         ).mark();
     }
 
     public static void removePartitionMetrics(TopicPartition partition) {
         if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
             METRICS_GROUP.removeMetric("ExpiresPerSec",
-                    Map.of("topic", partition.topic(),
+                    MetricsUtils.getTags("topic", partition.topic(),
                             "partition", 
String.valueOf(partition.partition())));
         }
     }
diff --git 
a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
 
b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
index e992a8d2550..9980c810179 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.purgatory;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -186,13 +187,13 @@ public class DelayedRemoteListOffsets extends 
DelayedOperation {
         PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp -> 
METRICS_GROUP.newMeter("ExpiresPerSec",
                 "requests",
                 TimeUnit.SECONDS,
-                Map.of("topic", tp.topic(), "partition", 
String.valueOf(tp.partition())))).mark();
+                MetricsUtils.getTags("topic", tp.topic(), "partition", 
String.valueOf(tp.partition())))).mark();
     }
 
     public static void removePartitionMetrics(TopicPartition partition) {
         if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
             METRICS_GROUP.removeMetric("ExpiresPerSec",
-                    Map.of("topic", partition.topic(), "partition", 
String.valueOf(partition.partition())));
+                    MetricsUtils.getTags("topic", partition.topic(), 
"partition", String.valueOf(partition.partition())));
         }
     }
 }
diff --git 
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
 
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
index 952a47e59c5..edcddb64aa2 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -21,6 +21,7 @@ import kafka.utils.TestUtils;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.internal.FileRecords;
 import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -277,7 +278,7 @@ public class DelayedRemoteListOffsetsTest {
         
DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, 
tp ->
                 
DelayedRemoteListOffsets.METRICS_GROUP.newMeter("ExpiresPerSec",
                         "requests", TimeUnit.SECONDS,
-                        Map.of("topic", tp.topic(), "partition", 
String.valueOf(tp.partition()))));
+                        MetricsUtils.getTags("topic", tp.topic(), "partition", 
String.valueOf(tp.partition()))));
 
         // Verify the partition metric exists in the map
         
assertTrue(DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.containsKey(partition),

Reply via email to