This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 20d975a6802 MINOR: Replace Map conversions with MetricsUtils.getTags
for metric tags (#21948)
20d975a6802 is described below
commit 20d975a6802293fdb7f009769119899d5b4c6eda
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Apr 8 02:28:29 2026 +0800
MINOR: Replace Map conversions with MetricsUtils.getTags for metric tags
(#21948)
Replace HashMap with LinkedHashMap for metrics tag maps so that JMX
MBean names always reflect a deterministic, insertion-order tag
sequence. Also migrate tag construction in the Scala core module
(Map(...).asJava) and in the Java purgatory classes and their test
(Map.of(...)) to the existing MetricsUtils.getTags(...) helper for
consistency.
Reviewers: Chia-Ping Tsai <[email protected]>, Ken Huang
<[email protected]>, Chia-Yi Chiu <[email protected]>
---
core/src/main/scala/kafka/cluster/Partition.scala | 5 +++--
core/src/main/scala/kafka/network/RequestChannel.scala | 6 ++++--
core/src/main/scala/kafka/network/SocketServer.scala | 13 ++++++++-----
.../main/scala/kafka/server/AbstractFetcherManager.scala | 4 ++--
.../src/main/scala/kafka/server/AbstractFetcherThread.scala | 10 +++-------
core/src/main/scala/kafka/server/BrokerBlockingSender.scala | 5 ++---
core/src/main/scala/kafka/server/DelayedFetch.scala | 5 +++--
.../org/apache/kafka/server/purgatory/DelayedProduce.java | 5 +++--
.../kafka/server/purgatory/DelayedRemoteListOffsets.java | 5 +++--
.../server/purgatory/DelayedRemoteListOffsetsTest.java | 3 ++-
10 files changed, 33 insertions(+), 28 deletions(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala
b/core/src/main/scala/kafka/cluster/Partition.scala
index ccb6ddac4de..f303b0fede7 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -41,6 +41,7 @@ import org.apache.kafka.server.common.{RequestLocal,
TransactionVersion}
import org.apache.kafka.server.log.remote.TopicPartitionLog
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.storage.internals.log.{AppendOrigin,
AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogManager,
LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo,
LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog,
VerificationGuard}
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.partition.{AlterPartitionListener,
AssignmentState, CommittedPartitionState, OngoingReassignmentState,
PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange,
PendingShrinkIsr, SimpleAssignmentState}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords,
DelayedOperationPurgatory, DelayedProduce, TopicPartitionOperationKey}
@@ -131,7 +132,7 @@ object Partition {
}
def removeMetrics(topicPartition: TopicPartition): Unit = {
- val tags = Map("topic" -> topicPartition.topic, "partition" ->
topicPartition.partition.toString).asJava
+ val tags = MetricsUtils.getTags("topic", topicPartition.topic,
"partition", topicPartition.partition.toString)
metricsGroup.removeMetric("UnderReplicated", tags)
metricsGroup.removeMetric("UnderMinIsr", tags)
metricsGroup.removeMetric("InSyncReplicasCount", tags)
@@ -215,7 +216,7 @@ class Partition(val topicPartition: TopicPartition,
this.logIdent = s"[Partition $topicPartition broker=$localBrokerId] "
- private val tags = Map("topic" -> topic, "partition" ->
partitionId.toString).asJava
+ private val tags = MetricsUtils.getTags("topic", topic, "partition",
partitionId.toString)
metricsGroup.newGauge("UnderReplicated", () => if (isUnderReplicated) 1 else
0, tags)
metricsGroup.newGauge("InSyncReplicasCount", () => if (isLeader)
partitionState.isr.size else 0, tags)
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 25155145b5a..d3e99cbba43 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.server.common.RequestLocal
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.network.{RequestConvertToJson, Session}
import org.apache.kafka.server.config.AbstractKafkaConfig
@@ -367,12 +368,13 @@ class RequestChannel(val queueSize: Int,
warn(s"Unexpected processor with processorId ${processor.id}")
metricsGroup.newGauge(ResponseQueueSizeMetric, () =>
processor.responseQueueSize,
- Map(ProcessorMetricTag -> processor.id.toString).asJava)
+ MetricsUtils.getTags(ProcessorMetricTag, processor.id.toString))
}
def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
- metricsGroup.removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag
-> processorId.toString).asJava)
+ metricsGroup.removeMetric(ResponseQueueSizeMetric,
+ MetricsUtils.getTags(ProcessorMetricTag, processorId.toString))
}
/** Send a request to be handled, potentially blocking until there is room
in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala
b/core/src/main/scala/kafka/network/SocketServer.scala
index 590be14e2dd..471dfb3242e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -46,6 +46,7 @@ import org.apache.kafka.network.{ConnectionQuotaEntity,
ConnectionThrottledExcep
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
import org.apache.kafka.server.config.QuotaConfig
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.ConnectionDisconnectListener
import org.apache.kafka.server.quota.QuotaUtils
@@ -496,7 +497,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private val backwardCompatibilityMetricGroup = new
KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName =
backwardCompatibilityMetricGroup.metricName(
"AcceptorBlockedPercent",
- Map(ListenerMetricTag -> endPoint.listener).asJava)
+ MetricsUtils.getTags(ListenerMetricTag, endPoint.listener))
private val blockedPercentMeter =
backwardCompatibilityMetricGroup.newMeter(blockedPercentMeterMetricName,"blocked
time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new
mutable.PriorityQueue[DelayedCloseSocket]()
@@ -839,7 +840,7 @@ private[kafka] class Processor(
},
// for compatibility, only add a networkProcessor tag to the Yammer
Metrics alias (the equivalent Selector metric
// also includes the listener name)
- Map(NetworkProcessorMetricTag -> id.toString).asJava
+ MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString)
)
private val expiredConnectionsKilledCount = new CumulativeSum()
@@ -1195,7 +1196,8 @@ private[kafka] class Processor(
close(channel.id)
}
selector.close()
- metricsGroup.removeMetric(IdlePercentMetricName,
Map(NetworkProcessorMetricTag -> id.toString).asJava)
+ metricsGroup.removeMetric(IdlePercentMetricName,
+ MetricsUtils.getTags(NetworkProcessorMetricTag, id.toString))
}
// 'protected` to allow override for testing
@@ -1266,7 +1268,8 @@ private[kafka] class Processor(
Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
}
} finally {
- metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" ->
id.toString).asJava)
+ metricsGroup.removeMetric("IdlePercent",
+ MetricsUtils.getTags("networkProcessor", id.toString))
metrics.removeMetric(expiredConnectionsKilledCountMetricName)
}
}
@@ -1686,7 +1689,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time,
metrics: Metrics) extend
val metricName =
metrics.metricName(s"${throttlePrefix}connection-accept-throttle-time",
JSocketServer.METRICS_GROUP,
"Tracking average throttle-time, out of non-zero throttle times, per
listener",
- Map(ListenerMetricTag -> listener.value).asJava)
+ MetricsUtils.getTags(ListenerMetricTag, listener.value))
sensor.add(metricName, new Avg)
sensor
}
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 958c92ac185..e371da9021c 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,12 +20,12 @@ package kafka.server
import kafka.utils.Logging
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.PartitionFetchState
import scala.collection.{Map, Set, mutable}
-import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
abstract class AbstractFetcherManager[T <: AbstractFetcherThread](val name:
String, clientId: String, numFetchers: Int)
@@ -43,7 +43,7 @@ abstract class AbstractFetcherManager[T <:
AbstractFetcherThread](val name: Stri
val failedPartitions = new FailedPartitions
this.logIdent = "[" + name + "] "
- private val tags = Map("clientId" -> clientId).asJava
+ private val tags = MetricsUtils.getTags("clientId", clientId)
metricsGroup.newGauge("MaxLag", () => {
// current max lag across all fetchers/topics/partitions
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 107e12a695f..8e50042f348 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.{ClientIdAndBroker,
InvalidRecordException, Topic
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.server.{LeaderEndPoint, PartitionFetchState,
ReplicaState, ResultWithPartitions}
import
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.{LockUtils, ShutdownableThread}
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -930,10 +931,7 @@ class FetcherLagMetrics(metricId: ClientIdTopicPartition) {
private val metricsGroup = new KafkaMetricsGroup(metricsPackage,
metricsClassName)
private[this] val lagVal = new AtomicLong(-1L)
- private[this] val tags = Map(
- "clientId" -> metricId.clientId,
- "topic" -> metricId.topicPartition.topic,
- "partition" -> metricId.topicPartition.partition.toString).asJava
+ private[this] val tags = MetricsUtils.getTags("clientId", metricId.clientId,
"topic", metricId.topicPartition.topic, "partition",
metricId.topicPartition.partition.toString)
metricsGroup.newGauge(FetcherMetrics.ConsumerLag, () => lagVal.get, tags)
@@ -971,9 +969,7 @@ class FetcherStats(metricId: ClientIdAndBroker) {
private val metricsClassName = "FetcherStats"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage,
metricsClassName)
- val tags: util.Map[String, String] = Map("clientId" -> metricId.clientId,
- "brokerHost" -> metricId.brokerHost,
- "brokerPort" -> metricId.brokerPort.toString).asJava
+ val tags: util.Map[String, String] = MetricsUtils.getTags("clientId",
metricId.clientId, "brokerHost", metricId.brokerHost, "brokerPort",
metricId.brokerPort.toString)
val requestRate: Meter =
metricsGroup.newMeter(FetcherMetrics.RequestsPerSec, "requests",
TimeUnit.SECONDS, tags)
diff --git a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
index c8405be6c6e..b4de360d532 100644
--- a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
+++ b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
@@ -26,10 +26,9 @@ import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.clients.{ApiVersions, ClientResponse,
ManualMetadataUpdater, NetworkClient}
import org.apache.kafka.common.{Node, Reconfigurable}
import org.apache.kafka.common.requests.AbstractRequest.Builder
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.network.BrokerEndPoint
-import scala.jdk.CollectionConverters._
-
trait BlockingSend {
def brokerEndPoint(): BrokerEndPoint
@@ -74,7 +73,7 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
metrics,
time,
"replica-fetcher",
- Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" ->
fetcherId.toString).asJava,
+ MetricsUtils.getTags("broker-id", sourceBroker.id.toString,
"fetcher-id", fetcherId.toString),
false,
channelBuilder,
logContext
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala
b/core/src/main/scala/kafka/server/DelayedFetch.scala
index f73acd19d98..88e43ec9a22 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH,
UNDEFINED_EPOCH_OFFSET}
+import org.apache.kafka.common.metrics.internals.MetricsUtils
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.quota.ReplicaQuota
@@ -183,7 +184,7 @@ object DelayedFetchMetrics {
private val metricsClassName = "DelayedFetchMetrics"
private val metricsGroup = new KafkaMetricsGroup(metricsPackage,
metricsClassName)
private val FetcherTypeKey = "fetcherType"
- val followerExpiredRequestMeter: Meter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
Map(FetcherTypeKey -> "follower").asJava)
- val consumerExpiredRequestMeter: Meter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
Map(FetcherTypeKey -> "consumer").asJava)
+ val followerExpiredRequestMeter: Meter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
MetricsUtils.getTags(FetcherTypeKey, "follower"))
+ val consumerExpiredRequestMeter: Meter =
metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
MetricsUtils.getTags(FetcherTypeKey, "consumer"))
}
diff --git
a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
index 711c45cff73..b7ce44686d1 100644
--- a/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
+++ b/server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java
@@ -18,6 +18,7 @@ package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
@@ -200,14 +201,14 @@ public class DelayedProduce extends DelayedOperation {
key -> METRICS_GROUP.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
- Map.of("topic", key.topic(), "partition",
String.valueOf(key.partition())))
+ MetricsUtils.getTags("topic", key.topic(),
"partition", String.valueOf(key.partition())))
).mark();
}
public static void removePartitionMetrics(TopicPartition partition) {
if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
METRICS_GROUP.removeMetric("ExpiresPerSec",
- Map.of("topic", partition.topic(),
+ MetricsUtils.getTags("topic", partition.topic(),
"partition",
String.valueOf(partition.partition())));
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
index e992a8d2550..9980c810179 100644
---
a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
+++
b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsets.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.purgatory;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -186,13 +187,13 @@ public class DelayedRemoteListOffsets extends
DelayedOperation {
PARTITION_EXPIRATION_METERS.computeIfAbsent(partition, tp ->
METRICS_GROUP.newMeter("ExpiresPerSec",
"requests",
TimeUnit.SECONDS,
- Map.of("topic", tp.topic(), "partition",
String.valueOf(tp.partition())))).mark();
+ MetricsUtils.getTags("topic", tp.topic(), "partition",
String.valueOf(tp.partition())))).mark();
}
public static void removePartitionMetrics(TopicPartition partition) {
if (PARTITION_EXPIRATION_METERS.remove(partition) != null) {
METRICS_GROUP.removeMetric("ExpiresPerSec",
- Map.of("topic", partition.topic(), "partition",
String.valueOf(partition.partition())));
+ MetricsUtils.getTags("topic", partition.topic(),
"partition", String.valueOf(partition.partition())));
}
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
index 952a47e59c5..edcddb64aa2 100644
---
a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteListOffsetsTest.java
@@ -21,6 +21,7 @@ import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.ListOffsetsResponseData;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.requests.ListOffsetsResponse;
@@ -277,7 +278,7 @@ public class DelayedRemoteListOffsetsTest {
DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.computeIfAbsent(partition,
tp ->
DelayedRemoteListOffsets.METRICS_GROUP.newMeter("ExpiresPerSec",
"requests", TimeUnit.SECONDS,
- Map.of("topic", tp.topic(), "partition",
String.valueOf(tp.partition()))));
+ MetricsUtils.getTags("topic", tp.topic(), "partition",
String.valueOf(tp.partition()))));
// Verify the partition metric exists in the map
assertTrue(DelayedRemoteListOffsets.PARTITION_EXPIRATION_METERS.containsKey(partition),