chia7712 commented on code in PR #16426:
URL: https://github.com/apache/kafka/pull/16426#discussion_r1665891312
##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String],
remoteStorageEnabled: Boolean = f
}
case class GaugeWrapper(metricType: String) {
- @volatile private var gaugeObject: Gauge[Long] = _
- final private val gaugeLock = new Object
- final val aggregatedMetric = new AggregatedMetric()
+ private final val created = new AtomicBoolean(false)
+ // The map to store:
+ // - per-partition value for topic-level metrics. The key will be the partition number
+ // - per-topic value for broker-level metrics. The key will be the topic name
+ private val metricValues = new ConcurrentHashMap[String, Long]()
+
+ def setValue(key: String, value: Long): Unit = {
+ newGaugeIfNeed()
+ metricValues.put(key, value)
+ }
- def gauge(): Gauge[Long] = gaugeLock synchronized {
- if (gaugeObject == null) {
- gaugeObject = metricsGroup.newGauge(metricType, () =>
aggregatedMetric.value(), tags)
- }
- return gaugeObject
+ def removeKey(key: String): Unit = {
+ newGaugeIfNeed()
+ metricValues.remove(key)
}
- def close(): Unit = gaugeLock synchronized {
- if (gaugeObject != null) {
- metricsGroup.removeMetric(metricType, tags)
- aggregatedMetric.close()
- gaugeObject = null
- }
+ def close(): Unit = if (created.compareAndSet(true, false)) {
Review Comment:
Sorry, I just noticed that `MeterWrapper` uses a synchronized lock. We should
use the same synchronization mechanism here :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]