mjsax commented on code in PR #17781:
URL: https://github.com/apache/kafka/pull/17781#discussion_r1851151511
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -662,17 +662,19 @@ void setGroupAssignmentSnapshot(final Set<TopicPartition>
partitions) {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricChange(metric));
+ } else {
+ log.debug("Metric {} already registered consumer metric",
metric.metricName());
}
}
@Override
public void unregisterMetricFromSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricRemoval(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricRemoval(metric));
+ } else {
+ log.debug("Metric {} is a standard consumer metric, won't
unregister", metric.metricName());
Review Comment:
```suggestion
log.debug("Skipping unregistration for metric {}. Existing
consumer metrics cannot be removed.", metric.metricName());
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -662,17 +662,19 @@ void setGroupAssignmentSnapshot(final Set<TopicPartition>
partitions) {
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
- if (clientTelemetryReporter.isPresent()) {
- ClientTelemetryReporter reporter = clientTelemetryReporter.get();
- reporter.metricChange(metric);
+ if (!metrics().containsKey(metric.metricName())) {
+ clientTelemetryReporter.ifPresent(reporter ->
reporter.metricChange(metric));
+ } else {
+ log.debug("Metric {} already registered consumer metric",
metric.metricName());
Review Comment:
```suggestion
log.debug("Skipping registration for metric {}. Existing
consumer metrics cannot be overwritten.", metric.metricName());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]