This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6698146cee KAFKA-20368 Use LinkedHashMap for deterministic metric
tags order in RLMExpirationTask (#21931)
f6698146cee is described below
commit f6698146ceeb690be76b0ad4f7d4577670f8884d
Author: Chia-Yi Chiu <[email protected]>
AuthorDate: Fri Apr 3 23:22:58 2026 +0900
KAFKA-20368 Use LinkedHashMap for deterministic metric tags order in
RLMExpirationTask (#21931)
Summary: build the
[metricTags](https://github.com/apache/kafka/blob/5a2d79a1ffb31b0ebb8c8b13c4ac6e5a2af7cbce/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java#L1145)
map in RLMExpirationTask with `MetricsUtils.getTags` instead of a `HashMap`,
so the metric tags are always emitted in a deterministic order (`topic`, then
`partition`).
Reviewers: TaiJuWu <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../server/log/remote/storage/RemoteLogManager.java | 7 ++++---
.../server/log/remote/storage/RemoteLogManagerTest.java | 16 ++++++++++------
2 files changed, 14 insertions(+), 9 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index cb2764f142b..c65cf5b1fe4 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.internals.MetricsUtils;
import org.apache.kafka.common.record.internal.FileRecords;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.Record;
@@ -1142,7 +1143,7 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
private final Logger logger;
private volatile boolean isAllSegmentsValid = false;
private volatile boolean metricsRegistered = false;
- private final Map<String, String> metricTags = new HashMap<>();
+ private final Map<String, String> metricTags;
private final AtomicInteger retentionSizeInPercentValue = new
AtomicInteger(0);
private final AtomicInteger localRetentionSizeInPercentValue = new
AtomicInteger(0);
@@ -1157,8 +1158,8 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
public RLMExpirationTask(TopicIdPartition topicIdPartition) {
super(topicIdPartition);
this.logger = getLogContext().logger(RLMExpirationTask.class);
- metricTags.put("topic", topicIdPartition.topic());
- metricTags.put("partition",
Integer.toString(topicIdPartition.partition()));
+ this.metricTags = MetricsUtils.getTags("topic",
topicIdPartition.topic(),
+ "partition",
Integer.toString(topicIdPartition.partition()));
}
// Visible for testing
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 234bb0f06f4..b1c5090625e 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -2274,8 +2274,8 @@ public class RemoteLogManagerTest {
// Register metrics to expose them via JMX
expirationTask.registerMetrics();
- String retentionMetricName = "name=RetentionSizeInPercent,partition="
+ leaderTopicIdPartition.partition() + ",topic=" +
leaderTopicIdPartition.topic();
- String localRetentionMetricName =
"name=LocalRetentionSizeInPercent,partition=" +
leaderTopicIdPartition.partition() + ",topic=" + leaderTopicIdPartition.topic();
+ String retentionMetricName = "name=RetentionSizeInPercent,topic=" +
leaderTopicIdPartition.topic() + ",partition=" +
leaderTopicIdPartition.partition();
+ String localRetentionMetricName =
"name=LocalRetentionSizeInPercent,topic=" + leaderTopicIdPartition.topic() +
",partition=" + leaderTopicIdPartition.partition();
// Test case 1: Testing RetentionSizeInPercent metric (standard
retention scenario)
// retentionSize = 12288, onlyLocalLogSegmentsSize = 100,
localLogSegmentsSize = 100
@@ -2339,8 +2339,8 @@ public class RemoteLogManagerTest {
// Register metrics to expose them via JMX
expirationTask.registerMetrics();
- String retentionMetricName = "name=RetentionSizeInPercent,partition="
+ leaderTopicIdPartition.partition() + ",topic=" +
leaderTopicIdPartition.topic();
- String localRetentionMetricName =
"name=LocalRetentionSizeInPercent,partition=" +
leaderTopicIdPartition.partition() + ",topic=" + leaderTopicIdPartition.topic();
+ String retentionMetricName = "name=RetentionSizeInPercent,topic=" +
leaderTopicIdPartition.topic() + ",partition=" +
leaderTopicIdPartition.partition();
+ String localRetentionMetricName =
"name=LocalRetentionSizeInPercent,topic=" + leaderTopicIdPartition.topic() +
",partition=" + leaderTopicIdPartition.partition();
// RetentionSizeInPercent = ((100 + 10240) * 100) / 12288 = 84%
// LocalRetentionSizeInPercent = (100 * 100) / 6144 = 1%
@@ -2356,6 +2356,10 @@ public class RemoteLogManagerTest {
// Verify metrics are reset to 0 on cancellation (check via accessor
since JMX metrics are deregistered)
assertEquals(0, expirationTask.retentionSizeInPercent());
assertEquals(0, expirationTask.localRetentionSizeInPercent());
+
+ // Verify JMX metrics are actually deregistered from the Yammer
registry after cancellation
+ assertThrows(NoSuchElementException.class, () ->
yammerMetricValue(retentionMetricName));
+ assertThrows(NoSuchElementException.class, () ->
yammerMetricValue(localRetentionMetricName));
}
@Test
@@ -2379,8 +2383,8 @@ public class RemoteLogManagerTest {
// Register metrics to expose them via JMX
expirationTask.registerMetrics();
- String retentionMetricName = "name=RetentionSizeInPercent,partition="
+ leaderTopicIdPartition.partition() + ",topic=" +
leaderTopicIdPartition.topic();
- String localRetentionMetricName =
"name=LocalRetentionSizeInPercent,partition=" +
leaderTopicIdPartition.partition() + ",topic=" + leaderTopicIdPartition.topic();
+ String retentionMetricName = "name=RetentionSizeInPercent,topic=" +
leaderTopicIdPartition.topic() + ",partition=" +
leaderTopicIdPartition.partition();
+ String localRetentionMetricName =
"name=LocalRetentionSizeInPercent,topic=" + leaderTopicIdPartition.topic() +
",partition=" + leaderTopicIdPartition.partition();
expirationTask.buildRetentionSizeData(0, 100, 100, 1000, epochEntries,
0, Long.MAX_VALUE);