FrankYang0529 commented on code in PR #16426:
URL: https://github.com/apache/kafka/pull/16426#discussion_r1666154846


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -284,26 +284,34 @@ class BrokerTopicMetrics(name: Option[String], remoteStorageEnabled: Boolean = f
   }
 
   case class GaugeWrapper(metricType: String) {
-    @volatile private var gaugeObject: Gauge[Long] = _
-    final private val gaugeLock = new Object
-    final val aggregatedMetric = new AggregatedMetric()
+    private final val created = new AtomicBoolean(false)
+    // The map to store:
+    //   - per-partition value for topic-level metrics. The key will be the partition number
+    //   - per-topic value for broker-level metrics. The key will be the topic name
+    private val metricValues = new ConcurrentHashMap[String, Long]()
+
+    def setValue(key: String, value: Long): Unit = {
+      newGaugeIfNeed()
+      metricValues.put(key, value)
+    }
 
-    def gauge(): Gauge[Long] = gaugeLock synchronized {
-      if (gaugeObject == null) {
-        gaugeObject = metricsGroup.newGauge(metricType, () => aggregatedMetric.value(), tags)
-      }
-      return gaugeObject
+    def removeKey(key: String): Unit = {
+      newGaugeIfNeed()
+      metricValues.remove(key)
     }
 
-    def close(): Unit = gaugeLock synchronized {
-      if (gaugeObject != null) {
-        metricsGroup.removeMetric(metricType, tags)
-        aggregatedMetric.close()
-        gaugeObject = null
-      }
+    def close(): Unit = if (created.compareAndSet(true, false)) {

Review Comment:
   Updated it. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to