This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f1d79046c9 [feat][monitor] Add custom topic metric labels to 
OpenTelemetry metrics attributes (#25306)
0f1d79046c9 is described below

commit 0f1d79046c9fcfd612d0711c16a932245f30a192
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Mar 30 14:21:35 2026 +0800

    [feat][monitor] Add custom topic metric labels to OpenTelemetry metrics 
attributes (#25306)
---
 .../broker/service/PersistentTopicAttributes.java  | 133 +++++++++++-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../broker/stats/OpenTelemetryTopicStats.java      |  98 +++++----
 .../stats/OpenTelemetryCustomLabelsTest.java       | 235 +++++++++++++++++++++
 4 files changed, 424 insertions(+), 44 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
index 51f5bdb354d..926314686db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PersistentTopicAttributes.java
@@ -18,14 +18,66 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static 
org.apache.pulsar.broker.service.AbstractTopic.getCustomMetricLabelsMap;
 import io.opentelemetry.api.common.Attributes;
+import java.util.Map;
 import lombok.Getter;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
 
 @Getter
 public class PersistentTopicAttributes extends TopicAttributes {
 
+    @Getter
+    public static class MetricAttributes {
+        private final Attributes customAttributes;
+        private final Attributes resolvedCommonAttributes;
+        private final Attributes timeBasedQuotaAttributes;
+        private final Attributes sizeBasedQuotaAttributes;
+        private final Attributes compactionSuccessAttributes;
+        private final Attributes compactionFailureAttributes;
+        private final Attributes transactionActiveAttributes;
+        private final Attributes transactionCommittedAttributes;
+        private final Attributes transactionAbortedAttributes;
+        private final Attributes 
transactionBufferClientCommitSucceededAttributes;
+        private final Attributes transactionBufferClientCommitFailedAttributes;
+        private final Attributes 
transactionBufferClientAbortSucceededAttributes;
+        private final Attributes transactionBufferClientAbortFailedAttributes;
+
+        private MetricAttributes(Attributes customAttributes,
+                                 Attributes resolvedCommonAttributes,
+                                 Attributes timeBasedQuotaAttributes,
+                                 Attributes sizeBasedQuotaAttributes,
+                                 Attributes compactionSuccessAttributes,
+                                 Attributes compactionFailureAttributes,
+                                 Attributes transactionActiveAttributes,
+                                 Attributes transactionCommittedAttributes,
+                                 Attributes transactionAbortedAttributes,
+                                 Attributes 
transactionBufferClientCommitSucceededAttributes,
+                                 Attributes 
transactionBufferClientCommitFailedAttributes,
+                                 Attributes 
transactionBufferClientAbortSucceededAttributes,
+                                 Attributes 
transactionBufferClientAbortFailedAttributes) {
+            this.customAttributes = customAttributes;
+            this.resolvedCommonAttributes = resolvedCommonAttributes;
+            this.timeBasedQuotaAttributes = timeBasedQuotaAttributes;
+            this.sizeBasedQuotaAttributes = sizeBasedQuotaAttributes;
+            this.compactionSuccessAttributes = compactionSuccessAttributes;
+            this.compactionFailureAttributes = compactionFailureAttributes;
+            this.transactionActiveAttributes = transactionActiveAttributes;
+            this.transactionCommittedAttributes = 
transactionCommittedAttributes;
+            this.transactionAbortedAttributes = transactionAbortedAttributes;
+            this.transactionBufferClientCommitSucceededAttributes = 
transactionBufferClientCommitSucceededAttributes;
+            this.transactionBufferClientCommitFailedAttributes = 
transactionBufferClientCommitFailedAttributes;
+            this.transactionBufferClientAbortSucceededAttributes = 
transactionBufferClientAbortSucceededAttributes;
+            this.transactionBufferClientAbortFailedAttributes = 
transactionBufferClientAbortFailedAttributes;
+        }
+    }
+
+    private final TopicName topicName;
+    private final PulsarService pulsar;
+
+    // Pre-built attributes with specific labels (without custom metric labels)
     private final Attributes timeBasedQuotaAttributes;
     private final Attributes sizeBasedQuotaAttributes;
 
@@ -41,8 +93,10 @@ public class PersistentTopicAttributes extends 
TopicAttributes {
     private final Attributes transactionBufferClientAbortSucceededAttributes;
     private final Attributes transactionBufferClientAbortFailedAttributes;
 
-    public PersistentTopicAttributes(TopicName topicName) {
+    public PersistentTopicAttributes(TopicName topicName, PulsarService 
pulsar) {
         super(topicName);
+        this.topicName = topicName;
+        this.pulsar = pulsar;
 
         timeBasedQuotaAttributes = Attributes.builder()
                 .putAll(commonAttributes)
@@ -100,4 +154,81 @@ public class PersistentTopicAttributes extends 
TopicAttributes {
                 
.putAll(OpenTelemetryAttributes.CompactionStatus.FAILURE.attributes)
                 .build();
     }
+
+    /**
+     * Converts a custom metric label key to OpenTelemetry format.
+     * The conversion follows OpenTelemetry semantic conventions by converting 
to lowercase.
+     *
+     * <p>Examples:
+     * <ul>
+     *   <li>{@code custom_label} → {@code custom_label}</li>
+     *   <li>{@code MY_LABEL} → {@code my_label}</li>
+     * </ul>
+     *
+     * @param originalKey the original key of the custom metric label
+     * @return the OpenTelemetry-compliant attribute key
+     * @see <a 
href="https://opentelemetry.io/docs/specs/semconv/general/naming/";>OpenTelemetry
 Naming Conventions</a>
+     */
+    public static String toOpenTelemetryAttributeKey(String originalKey) {
+        return originalKey.toLowerCase();
+    }
+
+    /**
+     * Get custom attributes (metric labels) for this topic.
+     * This method dynamically fetches custom labels to ensure they are always 
up-to-date.
+     * Only supported for persistent topics.
+     *
+     * @return attributes containing only custom metric labels, or empty 
attributes if not available
+     */
+    public Attributes getCustomAttributes() {
+        if (pulsar == null || 
!pulsar.getConfiguration().isExposeCustomTopicMetricLabelsEnabled()) {
+            return Attributes.empty();
+        }
+        Map<String, String> customLabels = getCustomMetricLabelsMap(pulsar, 
topicName);
+        if (customLabels.isEmpty()) {
+            return Attributes.empty();
+        }
+        var builder = Attributes.builder();
+        customLabels.forEach((key, value) -> 
builder.put(toOpenTelemetryAttributeKey(key), value));
+        return builder.build();
+    }
+
+    /**
+     * Build attributes by merging custom attributes into the given base 
attributes.
+     *
+     * @param baseAttributes the base attributes to merge custom attributes 
into
+     * @param customAttributes the custom attributes to merge
+     * @return attributes with custom attributes merged
+     */
+    public Attributes buildAttributesWithCustomLabels(Attributes 
baseAttributes, Attributes customAttributes) {
+        if (customAttributes.isEmpty()) {
+            return baseAttributes;
+        }
+        return Attributes.builder()
+                .putAll(baseAttributes)
+                .putAll(customAttributes)
+                .build();
+    }
+
+    /**
+     * Resolve all topic metric attributes for the current scrape cycle.
+     * The returned snapshot keeps the dynamic custom labels explicit while 
avoiding repeated merges.
+     */
+    public MetricAttributes resolveMetricAttributes() {
+        var customAttributes = getCustomAttributes();
+        return new MetricAttributes(
+                customAttributes,
+                buildAttributesWithCustomLabels(commonAttributes, 
customAttributes),
+                buildAttributesWithCustomLabels(timeBasedQuotaAttributes, 
customAttributes),
+                buildAttributesWithCustomLabels(sizeBasedQuotaAttributes, 
customAttributes),
+                buildAttributesWithCustomLabels(compactionSuccessAttributes, 
customAttributes),
+                buildAttributesWithCustomLabels(compactionFailureAttributes, 
customAttributes),
+                buildAttributesWithCustomLabels(transactionActiveAttributes, 
customAttributes),
+                
buildAttributesWithCustomLabels(transactionCommittedAttributes, 
customAttributes),
+                buildAttributesWithCustomLabels(transactionAbortedAttributes, 
customAttributes),
+                
buildAttributesWithCustomLabels(transactionBufferClientCommitSucceededAttributes,
 customAttributes),
+                
buildAttributesWithCustomLabels(transactionBufferClientCommitFailedAttributes, 
customAttributes),
+                
buildAttributesWithCustomLabels(transactionBufferClientAbortSucceededAttributes,
 customAttributes),
+                
buildAttributesWithCustomLabels(transactionBufferClientAbortFailedAttributes, 
customAttributes));
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 386e49e49de..88d511217c4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4834,7 +4834,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
             return persistentTopicAttributes;
         }
         return PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER.updateAndGet(this,
-                old -> old != null ? old : new 
PersistentTopicAttributes(TopicName.get(topic)));
+                old -> old != null ? old : new 
PersistentTopicAttributes(TopicName.get(topic), brokerService.pulsar()));
     }
 
     /**
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
index 0274cb7a7d4..92cb0d03efc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/OpenTelemetryTopicStats.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.PersistentTopicAttributes;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -393,10 +394,16 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
     }
 
     private void recordMetricsForTopic(Topic topic) {
-        var topicAttributes = topic.getTopicAttributes();
-        var attributes = topicAttributes.getCommonAttributes();
+        PersistentTopic persistentTopic = null;
+        PersistentTopicAttributes.MetricAttributes persistentMetricAttributes 
= null;
 
         if (topic instanceof AbstractTopic abstractTopic) {
+            var attributes = topic.getTopicAttributes().getCommonAttributes();
+            if (topic instanceof PersistentTopic pt) {
+                persistentTopic = pt;
+                persistentMetricAttributes = 
pt.getTopicAttributes().resolveMetricAttributes();
+                attributes = 
persistentMetricAttributes.getResolvedCommonAttributes();
+            }
             
subscriptionCounter.record(abstractTopic.getSubscriptions().size(), attributes);
             producerCounter.record(abstractTopic.getProducers().size(), 
attributes);
             consumerCounter.record(abstractTopic.getNumberOfConsumers(), 
attributes);
@@ -411,10 +418,11 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
             // Omitted: consumerMsgAckCounter
         }
 
-        if (topic instanceof PersistentTopic persistentTopic) {
+        if (persistentTopic != null) {
+            final var attributes = 
persistentMetricAttributes.getResolvedCommonAttributes();
+            final var persistentAttributes = persistentMetricAttributes;
+            final var currentPersistentTopic = persistentTopic;
             var persistentTopicMetrics = 
persistentTopic.getPersistentTopicMetrics();
-
-            var persistentTopicAttributes = 
persistentTopic.getTopicAttributes();
             var managedLedger = persistentTopic.getManagedLedger();
             var managedLedgerStats = 
persistentTopic.getManagedLedger().getStats();
             storageCounter.record(managedLedgerStats.getStoredMessagesSize(), 
attributes);
@@ -425,62 +433,68 @@ public class OpenTelemetryTopicStats implements 
AutoCloseable {
             
storageOutCounter.record(managedLedgerStats.getAddEntrySucceedTotal(), 
attributes);
 
             backlogQuotaLimitSize.record(
-                    
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
-                    attributes);
+                
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(),
+                attributes);
             backlogQuotaLimitTime.record(
-                    
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(),
-                    attributes);
-            
backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(),
 attributes);
+                
topic.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(),
+                attributes);
+            
backlogQuotaAge.record(topic.getBestEffortOldestUnacknowledgedMessageAgeSeconds(),
+                attributes);
             var backlogQuotaMetrics = 
persistentTopicMetrics.getBacklogQuotaMetrics();
             
backlogEvictionCounter.record(backlogQuotaMetrics.getSizeBasedBacklogQuotaExceededEvictionCount(),
-                    persistentTopicAttributes.getSizeBasedQuotaAttributes());
+                persistentAttributes.getSizeBasedQuotaAttributes());
             
backlogEvictionCounter.record(backlogQuotaMetrics.getTimeBasedBacklogQuotaExceededEvictionCount(),
-                    persistentTopicAttributes.getTimeBasedQuotaAttributes());
+                persistentAttributes.getTimeBasedQuotaAttributes());
 
             var txnBuffer = persistentTopic.getTransactionBuffer();
             transactionCounter.record(txnBuffer.getOngoingTxnCount(),
-                    
persistentTopicAttributes.getTransactionActiveAttributes());
+                persistentAttributes.getTransactionActiveAttributes());
             transactionCounter.record(txnBuffer.getCommittedTxnCount(),
-                    
persistentTopicAttributes.getTransactionCommittedAttributes());
+                persistentAttributes.getTransactionCommittedAttributes());
             transactionCounter.record(txnBuffer.getAbortedTxnCount(),
-                    
persistentTopicAttributes.getTransactionAbortedAttributes());
+                persistentAttributes.getTransactionAbortedAttributes());
 
             var txnBufferClientMetrics = 
persistentTopicMetrics.getTransactionBufferClientMetrics();
             
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitSucceededCount().sum(),
-                    
persistentTopicAttributes.getTransactionBufferClientCommitSucceededAttributes());
+                
persistentAttributes.getTransactionBufferClientCommitSucceededAttributes());
             
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getCommitFailedCount().sum(),
-                    
persistentTopicAttributes.getTransactionBufferClientCommitFailedAttributes());
+                
persistentAttributes.getTransactionBufferClientCommitFailedAttributes());
             
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortSucceededCount().sum(),
-                    
persistentTopicAttributes.getTransactionBufferClientAbortSucceededAttributes());
+                
persistentAttributes.getTransactionBufferClientAbortSucceededAttributes());
             
transactionBufferClientOperationCounter.record(txnBufferClientMetrics.getAbortFailedCount().sum(),
-                    
persistentTopicAttributes.getTransactionBufferClientAbortFailedAttributes());
+                
persistentAttributes.getTransactionBufferClientAbortFailedAttributes());
 
             Optional.ofNullable(pulsar.getNullableCompactor())
-                    .map(Compactor::getStats)
-                    .flatMap(compactorMXBean -> 
compactorMXBean.getCompactionRecordForTopic(topic.getName()))
-                    .ifPresent(compactionRecord -> {
-                        
compactionRemovedCounter.record(compactionRecord.getCompactionRemovedEventCount(),
 attributes);
-                        
compactionOperationCounter.record(compactionRecord.getCompactionSucceedCount(),
-                                
persistentTopicAttributes.getCompactionSuccessAttributes());
-                        
compactionOperationCounter.record(compactionRecord.getCompactionFailedCount(),
-                                
persistentTopicAttributes.getCompactionFailureAttributes());
-                        
compactionDurationSeconds.record(MetricsUtil.convertToSeconds(
-                            
compactionRecord.getCompactionDurationTimeInMills(), TimeUnit.MILLISECONDS), 
attributes);
-                        
compactionBytesInCounter.record(compactionRecord.getCompactionReadBytes(), 
attributes);
-                        
compactionBytesOutCounter.record(compactionRecord.getCompactionWriteBytes(), 
attributes);
-
-                        
persistentTopic.getCompactedTopicContext().map(CompactedTopicContext::getLedger)
-                                .ifPresent(ledger -> {
-                                    
compactionEntriesCounter.record(ledger.getLastAddConfirmed() + 1, attributes);
-                                    
compactionBytesCounter.record(ledger.getLength(), attributes);
-                                });
-                    });
+                .map(Compactor::getStats)
+                .flatMap(compactorMXBean -> 
compactorMXBean.getCompactionRecordForTopic(topic.getName()))
+                .ifPresent(compactionRecord -> {
+                    
compactionRemovedCounter.record(compactionRecord.getCompactionRemovedEventCount(),
+                        attributes);
+                    
compactionOperationCounter.record(compactionRecord.getCompactionSucceedCount(),
+                        persistentAttributes.getCompactionSuccessAttributes());
+                    
compactionOperationCounter.record(compactionRecord.getCompactionFailedCount(),
+                        persistentAttributes.getCompactionFailureAttributes());
+                    
compactionDurationSeconds.record(MetricsUtil.convertToSeconds(
+                            
compactionRecord.getCompactionDurationTimeInMills(), TimeUnit.MILLISECONDS),
+                        attributes);
+                    
compactionBytesInCounter.record(compactionRecord.getCompactionReadBytes(),
+                        attributes);
+                    
compactionBytesOutCounter.record(compactionRecord.getCompactionWriteBytes(),
+                        attributes);
+
+                    
currentPersistentTopic.getCompactedTopicContext().map(CompactedTopicContext::getLedger)
+                        .ifPresent(ledger -> {
+                            
compactionEntriesCounter.record(ledger.getLastAddConfirmed() + 1,
+                                attributes);
+                            compactionBytesCounter.record(ledger.getLength(), 
attributes);
+                        });
+                });
 
             var delayedMessages = topic.getSubscriptions().values().stream()
-                    .map(Subscription::getDispatcher)
-                    .filter(Objects::nonNull)
-                    .mapToLong(Dispatcher::getNumberOfDelayedMessages)
-                    .sum();
+                .map(Subscription::getDispatcher)
+                .filter(Objects::nonNull)
+                .mapToLong(Dispatcher::getNumberOfDelayedMessages)
+                .sum();
             delayedSubscriptionCounter.record(delayedMessages, attributes);
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryCustomLabelsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryCustomLabelsTest.java
new file mode 100644
index 00000000000..153336c1b2d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryCustomLabelsTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class OpenTelemetryCustomLabelsTest extends BrokerTestBase {
+
+    private static final Set<String> ALLOWED_CUSTOM_METRIC_LABEL_KEYS = 
Set.of("SLA_TIER", "APP_OWNER");
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
+    @Override
+    protected ServiceConfiguration getDefaultConf() {
+        ServiceConfiguration conf = super.getDefaultConf();
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        conf.setExposeCustomTopicMetricLabelsEnabled(true);
+        
conf.setAllowedTopicPropertyKeysForMetrics(ALLOWED_CUSTOM_METRIC_LABEL_KEYS);
+        conf.setBrokerShutdownTimeoutMs(5000L);
+        return conf;
+    }
+
+    @Test(timeOut = 30_000)
+    public void testCustomLabelsInOpenTelemetryMetrics() throws Exception {
+        var topic1 = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testCustomLabels1");
+        var topic2 = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testCustomLabels2");
+
+        admin.topics().createNonPartitionedTopic(topic1);
+        admin.topics().createPartitionedTopic(topic2, 2);
+
+        @Cleanup
+        Producer<byte[]> p1 = 
pulsarClient.newProducer().topic(topic1).create();
+        @Cleanup
+        Producer<byte[]> p2 = 
pulsarClient.newProducer().topic(topic2).create();
+
+        @Cleanup
+        Consumer<byte[]> c1 = pulsarClient.newConsumer()
+            .topic(topic1)
+            .subscriptionName("test")
+            .subscribe();
+        @Cleanup
+        Consumer<byte[]> c2 = pulsarClient.newConsumer()
+            .topic(topic2)
+            .subscriptionName("test")
+            .subscribe();
+
+        // Produce and consume messages
+        for (int i = 0; i < 5; i++) {
+            p1.send(("message-" + i).getBytes());
+            p2.send(("message-" + i).getBytes());
+        }
+        for (int i = 0; i < 5; i++) {
+            c1.acknowledge(c1.receive());
+            c2.acknowledge(c2.receive());
+        }
+
+        // Set custom metric labels for topic1
+        Map<String, String> labels1 = new HashMap<>();
+        labels1.put("SLA_TIER", "gold");
+        labels1.put("APP_OWNER", "team-a");
+        admin.topics().updateProperties(topic1, labels1);
+
+        // Set custom metric labels for topic2
+        Map<String, String> labels2 = new HashMap<>();
+        labels2.put("SLA_TIER", "platinum");
+        labels2.put("APP_OWNER", "team-b");
+        admin.topics().updateProperties(topic2, labels2);
+
+        // Wait for labels to be set
+        Awaitility.await().untilAsserted(() -> {
+            var retrievedLabels1 = admin.topics().getProperties(topic1);
+            assertThat(retrievedLabels1.get("SLA_TIER")).isEqualTo("gold");
+            assertThat(retrievedLabels1.get("APP_OWNER")).isEqualTo("team-a");
+
+            var retrievedLabels2 = admin.topics().getProperties(topic2);
+            assertThat(retrievedLabels2.get("SLA_TIER")).isEqualTo("platinum");
+            assertThat(retrievedLabels2.get("APP_OWNER")).isEqualTo("team-b");
+        });
+
+        // Collect metrics and verify custom labels are present
+        var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+        // Build expected attributes for topic1 with custom labels
+        var attributesTopic1 = Attributes.builder()
+            .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+            .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+            .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+            .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic1)
+            .put("sla_tier", "gold")
+            .put("app_owner", "team-a")
+            .build();
+
+        // Build expected attributes for topic2 with custom labels
+        var attributesTopic2 = Attributes.builder()
+            .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+            .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+            .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+            .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic2)
+            .put("sla_tier", "platinum")
+            .put("app_owner", "team-b")
+            .build();
+
+        // Verify shared topic metrics contain custom labels for topic1
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.MESSAGE_IN_COUNTER, attributesTopic1, 5);
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.PRODUCER_COUNTER, attributesTopic1, 1);
+
+        // Verify persistent-topic metrics contain custom labels for topic1
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.STORAGE_COUNTER, attributesTopic1,
+            actual -> assertThat(actual).isPositive());
+        assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.STORAGE_OUT_COUNTER, attributesTopic1, 5);
+
+        // Verify metrics contain custom labels for topic2 (partitioned topic)
+        // For partitioned topics, metrics are reported per partition
+        boolean foundTopic2Metrics = metrics.stream()
+            .filter(metric -> 
metric.getName().equals(OpenTelemetryTopicStats.STORAGE_COUNTER))
+            .flatMap(metric -> metric.getLongSumData().getPoints().stream())
+            .anyMatch(point -> {
+                var attrs = point.getAttributes();
+                return 
attrs.get(OpenTelemetryAttributes.PULSAR_TOPIC).equals(topic2)
+                    && 
"platinum".equals(attrs.get(io.opentelemetry.api.common.AttributeKey
+                    .stringKey("sla_tier")))
+                    && 
"team-b".equals(attrs.get(io.opentelemetry.api.common.AttributeKey
+                    .stringKey("app_owner")));
+            });
+        assertThat(foundTopic2Metrics).isTrue();
+    }
+
+    @Test(timeOut = 30_000)
+    public void testCustomLabelsDisabled() throws Exception {
+        // Create a new test with custom labels disabled
+        var topic = 
BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testCustomLabelsDisabled");
+        admin.topics().createNonPartitionedTopic(topic);
+
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).create();
+        producer.send("test".getBytes());
+
+        // Set custom metric labels
+        Map<String, String> labels = new HashMap<>();
+        labels.put("SLA_TIER", "gold");
+        admin.topics().updateProperties(topic, labels);
+
+        // Wait for labels to be set
+        Awaitility.await().untilAsserted(() -> {
+            var retrievedLabels = admin.topics().getProperties(topic);
+            assertThat(retrievedLabels.get("SLA_TIER")).isEqualTo("gold");
+        });
+
+        // Temporarily disable custom labels
+        
pulsar.getConfiguration().setExposeCustomTopicMetricLabelsEnabled(false);
+
+        try {
+            // Collect metrics
+            var metrics = 
pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+            // Build expected attributes without custom labels
+            var attributesWithoutCustomLabels = Attributes.builder()
+                .put(OpenTelemetryAttributes.PULSAR_DOMAIN, "persistent")
+                .put(OpenTelemetryAttributes.PULSAR_TENANT, "prop")
+                .put(OpenTelemetryAttributes.PULSAR_NAMESPACE, "prop/ns-abc")
+                .put(OpenTelemetryAttributes.PULSAR_TOPIC, topic)
+                .build();
+
+            // Verify shared topic metrics do NOT contain custom labels
+            assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.MESSAGE_IN_COUNTER, attributesWithoutCustomLabels,
+                1);
+
+            // Verify persistent-topic metrics do NOT contain custom labels
+            assertMetricLongSumValue(metrics, 
OpenTelemetryTopicStats.STORAGE_COUNTER, attributesWithoutCustomLabels,
+                actual -> assertThat(actual).isPositive());
+
+            // Verify no metrics contain the custom label
+            boolean foundCustomLabel = metrics.stream()
+                .filter(metric -> 
metric.getName().equals(OpenTelemetryTopicStats.STORAGE_COUNTER))
+                .flatMap(metric -> 
metric.getLongSumData().getPoints().stream())
+                .anyMatch(point -> point.getAttributes()
+                    
.get(io.opentelemetry.api.common.AttributeKey.stringKey("sla_tier")) != null);
+            assertThat(foundCustomLabel).isFalse();
+        } finally {
+            // Re-enable custom labels for other tests
+            
pulsar.getConfiguration().setExposeCustomTopicMetricLabelsEnabled(true);
+        }
+    }
+}

Reply via email to