This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9a28bd23ad1 KAFKA-19542: Consumer.close() does not remove all added
sensors from Metrics (#21038)
9a28bd23ad1 is described below
commit 9a28bd23ad10454ba911b600d5d21e8cc023006f
Author: Kirk True <[email protected]>
AuthorDate: Thu Mar 19 11:48:31 2026 -0700
KAFKA-19542: Consumer.close() does not remove all added sensors from
Metrics (#21038)
The intention of the `Consumer` API is that after `close()`, `metrics()`
should return an empty map of metrics. The lifecycle management of the
various metrics managers in the consumer is inconsistent. Some managers
remove all the metrics that were created, some registries remove some of
them, and some don't make any effort to remove the metrics at all.
This change introduces `AbstractConsumerMetricsManager` as a shared base
class for consumer metrics managers, consolidating metric registration
and cleanup logic. It also adds unit tests to ensure that the different
metrics managers perform the cleanup step.
Reviewers: Chia-Ping Tsai <[email protected]>, TengYao Chi
<[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 38 ++++---
.../consumer/internals/AsyncKafkaConsumer.java | 18 +++-
.../consumer/internals/ClassicKafkaConsumer.java | 8 +-
.../consumer/internals/ConsumerCoordinator.java | 16 ++-
.../consumer/internals/FetchMetricsManager.java | 14 ++-
.../consumer/internals/ShareConsumerImpl.java | 8 +-
.../internals/ShareFetchMetricsManager.java | 28 ++----
...er.java => AbstractConsumerMetricsManager.java} | 28 +++---
.../internals/metrics/AsyncConsumerMetrics.java | 27 ++---
.../metrics/ConsumerRebalanceMetricsManager.java | 26 ++---
.../internals/metrics/HeartbeatMetricsManager.java | 7 +-
.../internals/metrics/KafkaConsumerMetrics.java | 18 ++--
.../metrics/KafkaShareConsumerMetrics.java | 16 ++-
.../consumer/internals/metrics/MetricsLedger.java | 111 +++++++++++++++++++++
.../metrics/OffsetCommitMetricsManager.java | 7 +-
.../metrics/RebalanceCallbackMetricsManager.java | 7 +-
.../internals/metrics/RebalanceMetricsManager.java | 8 +-
.../internals/{ => metrics}/SensorBuilder.java | 24 ++---
.../metrics/ShareRebalanceMetricsManager.java | 10 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 52 ++++++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 3 +
.../consumer/internals/ShareConsumerImplTest.java | 55 +++++++++-
.../AbstractConsumerMetricsManagerTest.java} | 31 +++---
.../metrics/AsyncConsumerMetricsTest.java | 7 +-
.../ConsumerRebalanceMetricsManagerTest.java | 8 +-
.../metrics/HeartbeatMetricsManagerTest.java | 7 +-
.../metrics/OffsetCommitMetricsManagerTest.java | 7 +-
.../RebalanceCallbackMetricsManagerTest.java | 7 +-
.../metrics/ShareRebalanceMetricsManagerTest.java | 7 +-
29 files changed, 442 insertions(+), 161 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index e14509da5f1..42809ba7477 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.CloseOptions;
+import
org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager;
+import org.apache.kafka.clients.consumer.internals.metrics.MetricsLedger;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -1330,14 +1332,6 @@ public abstract class AbstractCoordinator implements
Closeable {
}
}
- protected final Meter createMeter(Metrics metrics, String groupName,
String baseName, String descriptiveName) {
- return new Meter(new WindowedCount(),
- metrics.metricName(baseName + "-rate", groupName,
- String.format("The number of %s per second",
descriptiveName)),
- metrics.metricName(baseName + "-total", groupName,
- String.format("The total number of %s",
descriptiveName)));
- }
-
/**
* Visible for testing.
*/
@@ -1345,7 +1339,22 @@ public abstract class AbstractCoordinator implements
Closeable {
return heartbeatThread;
}
- private class GroupCoordinatorMetrics {
+ protected class AbstractCoordinatorMetrics extends
AbstractConsumerMetricsManager {
+
+ protected AbstractCoordinatorMetrics(MetricsLedger metrics) {
+ super(metrics);
+ }
+
+ protected final Meter createMeter(String groupName, String baseName,
String descriptiveName) {
+ return new Meter(new WindowedCount(),
+ metrics.metricName(baseName + "-rate", groupName,
+ String.format("The number of %s per second",
descriptiveName)),
+ metrics.metricName(baseName + "-total", groupName,
+ String.format("The total number of %s", descriptiveName)));
+ }
+ }
+
+ private class GroupCoordinatorMetrics extends AbstractCoordinatorMetrics {
public final String metricGrpName;
public final Sensor heartbeatSensor;
@@ -1355,13 +1364,18 @@ public abstract class AbstractCoordinator implements
Closeable {
public final Sensor failedRebalanceSensor;
public GroupCoordinatorMetrics(Metrics metrics, String
metricGrpPrefix) {
+ this(new MetricsLedger(metrics), metricGrpPrefix);
+ }
+
+ private GroupCoordinatorMetrics(MetricsLedger metrics, String
metricGrpPrefix) {
+ super(metrics);
this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
this.heartbeatSensor = metrics.sensor("heartbeat-latency");
this.heartbeatSensor.add(metrics.metricName("heartbeat-response-time-max",
this.metricGrpName,
"The max time taken to receive a response to a heartbeat
request"), new Max());
- this.heartbeatSensor.add(createMeter(metrics, metricGrpName,
"heartbeat", "heartbeats"));
+ this.heartbeatSensor.add(createMeter(metricGrpName, "heartbeat",
"heartbeats"));
this.joinSensor = metrics.sensor("join-latency");
this.joinSensor.add(metrics.metricName("join-time-avg",
@@ -1370,7 +1384,7 @@ public abstract class AbstractCoordinator implements
Closeable {
this.joinSensor.add(metrics.metricName("join-time-max",
this.metricGrpName,
"The max time taken for a group rejoin"), new Max());
- this.joinSensor.add(createMeter(metrics, metricGrpName, "join",
"group joins"));
+ this.joinSensor.add(createMeter(metricGrpName, "join", "group
joins"));
this.syncSensor = metrics.sensor("sync-latency");
this.syncSensor.add(metrics.metricName("sync-time-avg",
@@ -1379,7 +1393,7 @@ public abstract class AbstractCoordinator implements
Closeable {
this.syncSensor.add(metrics.metricName("sync-time-max",
this.metricGrpName,
"The max time taken for a group sync"), new Max());
- this.syncSensor.add(createMeter(metrics, metricGrpName, "sync",
"group syncs"));
+ this.syncSensor.add(createMeter(metricGrpName, "sync", "group
syncs"));
this.successfulRebalanceSensor =
metrics.sensor("rebalance-latency");
this.successfulRebalanceSensor.add(metrics.metricName("rebalance-latency-avg",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index c08dcfa325a..3176225c2d4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -333,6 +333,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final AtomicReference<Optional<ConsumerGroupMetadata>>
groupMetadata = new AtomicReference<>(Optional.empty());
+ private final FetchMetricsManager fetchMetricsManager;
+ private final RebalanceCallbackMetricsManager
rebalanceCallbackMetricsManager;
private final AsyncConsumerMetrics asyncConsumerMetrics;
private final KafkaConsumerMetrics kafkaConsumerMetrics;
private Logger log;
@@ -462,7 +464,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
- FetchMetricsManager fetchMetricsManager =
createFetchMetricsManager(metrics);
+ this.fetchMetricsManager = createFetchMetricsManager(metrics);
FetchConfig fetchConfig = new FetchConfig(config);
this.isolationLevel = fetchConfig.isolationLevel;
@@ -525,11 +527,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
requestManagersSupplier,
asyncConsumerMetrics
);
+ this.rebalanceCallbackMetricsManager = new
RebalanceCallbackMetricsManager(metrics);
this.rebalanceListenerInvoker = new
ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
- new RebalanceCallbackMetricsManager(metrics)
+ rebalanceCallbackMetricsManager
);
this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s
->
new StreamsRebalanceListenerInvoker(logContext, s));
@@ -569,6 +572,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
Deserializers<K, V> deserializers,
FetchBuffer fetchBuffer,
FetchCollector<K, V> fetchCollector,
+ FetchMetricsManager fetchMetricsManager,
+ RebalanceCallbackMetricsManager
rebalanceCallbackMetricsManager,
ConsumerInterceptors<K, V> interceptors,
Time time,
ApplicationEventHandler applicationEventHandler,
@@ -589,6 +594,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.clientId = clientId;
this.fetchBuffer = fetchBuffer;
this.fetchCollector = fetchCollector;
+ this.fetchMetricsManager = fetchMetricsManager;
+ this.rebalanceCallbackMetricsManager = rebalanceCallbackMetricsManager;
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
@@ -643,7 +650,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.clientTelemetryReporter = Optional.empty();
ConsumerMetrics metricsRegistry = new ConsumerMetrics();
- FetchMetricsManager fetchMetricsManager = new
FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
+ this.fetchMetricsManager = new FetchMetricsManager(metrics,
metricsRegistry.fetcherMetrics);
this.fetchCollector = new FetchCollector<>(logContext,
metadata,
subscriptions,
@@ -668,11 +675,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
time,
asyncConsumerMetrics
);
+ this.rebalanceCallbackMetricsManager = new
RebalanceCallbackMetricsManager(metrics);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
- new RebalanceCallbackMetricsManager(metrics)
+ rebalanceCallbackMetricsManager
);
ApiVersions apiVersions = new ApiVersions();
this.positionsValidator = new PositionsValidator(logContext, time,
subscriptions, metadata);
@@ -1618,6 +1626,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics",
firstException);
closeQuietly(asyncConsumerMetrics, "async consumer metrics",
firstException);
+ closeQuietly(fetchMetricsManager, "consumer fetch metrics",
firstException);
+ closeQuietly(rebalanceCallbackMetricsManager, "consumer rebalance
callback metrics");
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter,
"async consumer telemetry reporter", firstException));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 5b54a759a98..b8e181743a2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -125,6 +125,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final Optional<String> groupId;
private final ConsumerCoordinator coordinator;
private final Deserializers<K, V> deserializers;
+ private final FetchMetricsManager fetchMetricsManager;
private final Fetcher<K, V> fetcher;
private final OffsetFetcher offsetFetcher;
private final TopicMetadataFetcher topicMetadataFetcher;
@@ -191,7 +192,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
this.metadata.bootstrap(addresses);
- FetchMetricsManager fetchMetricsManager =
createFetchMetricsManager(metrics);
+ this.fetchMetricsManager = createFetchMetricsManager(metrics);
FetchConfig fetchConfig = new FetchConfig(config);
this.isolationLevel = fetchConfig.isolationLevel;
@@ -361,7 +362,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
boolean checkCrcs =
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
ConsumerMetrics metricsRegistry = new ConsumerMetrics();
- FetchMetricsManager metricsManager = new FetchMetricsManager(metrics,
metricsRegistry.fetcherMetrics);
+ this.fetchMetricsManager = new FetchMetricsManager(metrics,
metricsRegistry.fetcherMetrics);
ApiVersions apiVersions = new ApiVersions();
FetchConfig fetchConfig = new FetchConfig(
minBytes,
@@ -380,7 +381,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
subscriptions,
fetchConfig,
deserializers,
- metricsManager,
+ fetchMetricsManager,
time,
apiVersions
);
@@ -1160,6 +1161,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics",
firstException);
+ closeQuietly(fetchMetricsManager, "kafka fetch metrics",
firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(client, "consumer network client", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9834bd2582f..9e999c72a65 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import
org.apache.kafka.clients.consumer.internals.Utils.TopicPartitionComparator;
+import org.apache.kafka.clients.consumer.internals.metrics.MetricsLedger;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
@@ -108,6 +109,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
private final List<ConsumerPartitionAssignor> assignors;
private final ConsumerMetadata metadata;
private final ConsumerCoordinatorMetrics coordinatorMetrics;
+ private final RebalanceCallbackMetricsManager
rebalanceCallbackMetricsManager;
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
@@ -272,11 +274,12 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
protocol = null;
}
+ this.rebalanceCallbackMetricsManager = new
RebalanceCallbackMetricsManager(metrics, metricGrpPrefix);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
- new RebalanceCallbackMetricsManager(metrics, metricGrpPrefix)
+ rebalanceCallbackMetricsManager
);
this.metadata.requestUpdate(true);
}
@@ -1026,6 +1029,8 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
}
} finally {
super.close(timer, membershipOperation);
+ Utils.closeQuietly(coordinatorMetrics, "consumer coordinator
metrics");
+ Utils.closeQuietly(rebalanceCallbackMetricsManager, "consumer
rebalance callback metrics");
}
}
@@ -1623,10 +1628,15 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
}
}
- private class ConsumerCoordinatorMetrics {
+ private class ConsumerCoordinatorMetrics extends
AbstractCoordinatorMetrics {
private final Sensor commitSensor;
private ConsumerCoordinatorMetrics(Metrics metrics, String
metricGrpPrefix) {
+ this(new MetricsLedger(metrics), metricGrpPrefix);
+ }
+
+ private ConsumerCoordinatorMetrics(MetricsLedger metrics, String
metricGrpPrefix) {
+ super(metrics);
String metricGrpName = metricGrpPrefix +
COORDINATOR_METRICS_SUFFIX;
this.commitSensor = metrics.sensor("commit-latency");
@@ -1636,7 +1646,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
this.commitSensor.add(metrics.metricName("commit-latency-max",
metricGrpName,
"The max time taken for a commit request"), new Max());
- this.commitSensor.add(createMeter(metrics, metricGrpName,
"commit", "commit calls"));
+ this.commitSensor.add(createMeter(metricGrpName, "commit", "commit
calls"));
Measurable numParts = (config, now) ->
subscriptions.numAssignedPartitions();
metrics.addMetric(metrics.metricName("assigned-partitions",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
index b9a1af69359..646e33333a4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import
org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager;
+import org.apache.kafka.clients.consumer.internals.metrics.MetricsLedger;
+import org.apache.kafka.clients.consumer.internals.metrics.SensorBuilder;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Gauge;
@@ -38,9 +41,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
* <p>Note: metric tag maps use {@code Utils.mkMap} to preserve insertion
order; do not replace
* with {@code Map.of} as tag order affects JMX MBean names.
*/
-public class FetchMetricsManager {
+public class FetchMetricsManager extends AbstractConsumerMetricsManager {
- private final Metrics metrics;
private final FetchMetricsRegistry metricsRegistry;
private final Sensor throttleTime;
private final Sensor bytesFetched;
@@ -52,8 +54,14 @@ public class FetchMetricsManager {
private int assignmentId = 0;
private Set<TopicPartition> assignedPartitions = Collections.emptySet();
+ @SuppressWarnings("this-escape")
public FetchMetricsManager(Metrics metrics, FetchMetricsRegistry
metricsRegistry) {
- this.metrics = metrics;
+ this(new MetricsLedger(metrics), metricsRegistry);
+ }
+
+ @SuppressWarnings("this-escape")
+ private FetchMetricsManager(MetricsLedger metrics, FetchMetricsRegistry
metricsRegistry) {
+ super(metrics);
this.metricsRegistry = metricsRegistry;
this.throttleTime = new SensorBuilder(metrics, "fetch-throttle-time")
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 448e3436bd9..6e221877609 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -168,6 +168,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
+ private final ShareFetchMetricsManager shareFetchMetricsManager;
private final KafkaShareConsumerMetrics kafkaShareConsumerMetrics;
private final AsyncConsumerMetrics asyncConsumerMetrics;
private Logger log;
@@ -273,7 +274,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
- ShareFetchMetricsManager shareFetchMetricsManager =
createShareFetchMetricsManager(metrics);
+ this.shareFetchMetricsManager =
createShareFetchMetricsManager(metrics);
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
this.acknowledgementEventHandler = new
ShareAcknowledgementEventHandler(acknowledgementEventQueue);
@@ -384,7 +385,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.completedAcknowledgements = new LinkedList<>();
ShareConsumerMetrics metricsRegistry = new ShareConsumerMetrics();
- ShareFetchMetricsManager shareFetchMetricsManager = new
ShareFetchMetricsManager(metrics, metricsRegistry.shareFetchMetrics);
+ this.shareFetchMetricsManager = new ShareFetchMetricsManager(metrics,
metricsRegistry.shareFetchMetrics);
this.fetchCollector = new ShareFetchCollector<>(
logContext,
metadata,
@@ -456,6 +457,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final Deserializer<V> valueDeserializer,
final ShareFetchBuffer fetchBuffer,
final ShareFetchCollector<K, V> fetchCollector,
+ final ShareFetchMetricsManager shareFetchMetricsManager,
final Time time,
final ApplicationEventHandler applicationEventHandler,
final BlockingQueue<ShareAcknowledgementEvent>
acknowledgementEventQueue,
@@ -474,6 +476,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.groupId = groupId;
this.fetchBuffer = fetchBuffer;
this.fetchCollector = fetchCollector;
+ this.shareFetchMetricsManager = shareFetchMetricsManager;
this.time = time;
this.acknowledgementEventQueue = acknowledgementEventQueue;
this.acknowledgementEventProcessor = new
ShareAcknowledgementEventProcessor();
@@ -1041,6 +1044,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
closeQuietly(kafkaShareConsumerMetrics, "kafka share consumer
metrics", firstException);
closeQuietly(asyncConsumerMetrics, "kafka async consumer metrics",
firstException);
+ closeQuietly(shareFetchMetricsManager, "kafka share consumer fetch
metrics", firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter,
"consumer telemetry reporter", firstException));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java
index d3e60a3dfaa..cdfd65bed5f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java
@@ -16,15 +16,14 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import
org.apache.kafka.clients.consumer.internals.metrics.AbstractConsumerMetricsManager;
+import org.apache.kafka.clients.consumer.internals.metrics.MetricsLedger;
+import org.apache.kafka.clients.consumer.internals.metrics.SensorBuilder;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.WindowedCount;
-import java.io.IOException;
-import java.util.Arrays;
-
-public class ShareFetchMetricsManager implements AutoCloseable {
- private final Metrics metrics;
+public class ShareFetchMetricsManager extends AbstractConsumerMetricsManager {
private final Sensor throttleTime;
private final Sensor bytesFetched;
private final Sensor recordsFetched;
@@ -32,9 +31,14 @@ public class ShareFetchMetricsManager implements
AutoCloseable {
private final Sensor sentAcknowledgements;
private final Sensor failedAcknowledgements;
+ @SuppressWarnings({"this-escape"})
public ShareFetchMetricsManager(Metrics metrics, ShareFetchMetricsRegistry
metricsRegistry) {
- this.metrics = metrics;
+ this(new MetricsLedger(metrics), metricsRegistry);
+ }
+ @SuppressWarnings({"this-escape"})
+ private ShareFetchMetricsManager(MetricsLedger metrics,
ShareFetchMetricsRegistry metricsRegistry) {
+ super(metrics);
this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched")
.withAvg(metricsRegistry.fetchSizeAvg)
.withMax(metricsRegistry.fetchSizeMax)
@@ -95,16 +99,4 @@ public class ShareFetchMetricsManager implements
AutoCloseable {
void recordFailedAcknowledgements(int acknowledgements) {
failedAcknowledgements.record(acknowledgements);
}
-
- @Override
- public void close() throws IOException {
- Arrays.asList(
- throttleTime.name(),
- bytesFetched.name(),
- recordsFetched.name(),
- fetchLatency.name(),
- sentAcknowledgements.name(),
- failedAcknowledgements.name()
- ).forEach(metrics::removeSensor);
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java
similarity index 56%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
copy to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java
index 16ad1b39817..c5d7e474eeb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java
@@ -16,26 +16,22 @@
*/
package org.apache.kafka.clients.consumer.internals.metrics;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.Metrics;
+import java.util.Objects;
-public abstract class RebalanceMetricsManager {
- protected final String metricGroupName;
+/**
+ * Utility class that serves as a common abstraction point for consumers to
create and register their
+ * metrics, and to ensure they're removed on {@link #close()} via the {@link
MetricsLedger} instance.
+ */
+public abstract class AbstractConsumerMetricsManager implements AutoCloseable {
- RebalanceMetricsManager(String metricGroupName) {
- this.metricGroupName = metricGroupName;
- }
+ protected final MetricsLedger metrics;
- protected MetricName createMetric(Metrics metrics, String name, String
description) {
- return metrics.metricName(name, metricGroupName, description);
+ protected AbstractConsumerMetricsManager(MetricsLedger metrics) {
+ this.metrics = Objects.requireNonNull(metrics);
}
- public abstract void recordRebalanceStarted(long nowMs);
-
- public abstract void recordRebalanceEnded(long nowMs);
-
- public void maybeRecordRebalanceFailed() {
+ @Override
+ public void close() {
+ metrics.close();
}
-
- public abstract boolean rebalanceStarted();
}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
index 2f90440a662..6b640dc015b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
@@ -22,10 +22,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Value;
-import java.util.Arrays;
-
-public class AsyncConsumerMetrics implements AutoCloseable {
- private final Metrics metrics;
+public class AsyncConsumerMetrics extends AbstractConsumerMetricsManager {
public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME =
"time-between-network-thread-poll";
public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME =
"application-event-queue-size";
@@ -49,7 +46,11 @@ public class AsyncConsumerMetrics implements AutoCloseable {
private final Sensor unsentRequestsQueueTimeSensor;
public AsyncConsumerMetrics(Metrics metrics, String groupName) {
- this.metrics = metrics;
+ this(new MetricsLedger(metrics), groupName);
+ }
+
+ private AsyncConsumerMetrics(MetricsLedger metrics, String groupName) {
+ super(metrics);
this.timeBetweenNetworkThreadPollSensor =
metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
this.timeBetweenNetworkThreadPollSensor.add(
metrics.metricName(
@@ -237,20 +238,4 @@ public class AsyncConsumerMetrics implements AutoCloseable
{
public void recordBackgroundEventQueueProcessingTime(long processingTime) {
this.backgroundEventQueueProcessingTimeSensor.record(processingTime);
}
-
- @Override
- public void close() {
- Arrays.asList(
- timeBetweenNetworkThreadPollSensor.name(),
- applicationEventQueueSizeSensor.name(),
- applicationEventQueueTimeSensor.name(),
- applicationEventQueueProcessingTimeSensor.name(),
- applicationEventExpiredSizeSensor.name(),
- backgroundEventQueueSizeSensor.name(),
- backgroundEventQueueTimeSensor.name(),
- backgroundEventQueueProcessingTimeSensor.name(),
- unsentRequestsQueueSizeSensor.name(),
- unsentRequestsQueueTimeSensor.name()
- ).forEach(metrics::removeSensor);
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
index a9091e060b2..43513e9a0c3 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManager.java
@@ -51,27 +51,29 @@ public final class ConsumerRebalanceMetricsManager extends
RebalanceMetricsManag
public final MetricName assignedPartitionsCount;
private long lastRebalanceEndMs = -1L;
private long lastRebalanceStartMs = -1L;
- private final Metrics metrics;
public ConsumerRebalanceMetricsManager(Metrics metrics, SubscriptionState
subscriptions) {
- super(CONSUMER_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX);
- this.metrics = metrics;
+ this(new MetricsLedger(metrics), subscriptions);
+ }
+
+ private ConsumerRebalanceMetricsManager(MetricsLedger metrics,
SubscriptionState subscriptions) {
+ super(metrics, CONSUMER_METRIC_GROUP_PREFIX +
COORDINATOR_METRICS_SUFFIX);
- rebalanceLatencyAvg = createMetric(metrics, "rebalance-latency-avg",
+ rebalanceLatencyAvg = createMetric("rebalance-latency-avg",
"The average time in ms taken for a group to complete a
rebalance");
- rebalanceLatencyMax = createMetric(metrics, "rebalance-latency-max",
+ rebalanceLatencyMax = createMetric("rebalance-latency-max",
"The max time in ms taken for a group to complete a
rebalance");
- rebalanceLatencyTotal = createMetric(metrics,
"rebalance-latency-total",
+ rebalanceLatencyTotal = createMetric("rebalance-latency-total",
"The total number of milliseconds spent in rebalances");
- rebalanceTotal = createMetric(metrics, "rebalance-total",
+ rebalanceTotal = createMetric("rebalance-total",
"The total number of rebalance events");
- rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour",
+ rebalanceRatePerHour = createMetric("rebalance-rate-per-hour",
"The number of rebalance events per hour");
- failedRebalanceTotal = createMetric(metrics, "failed-rebalance-total",
+ failedRebalanceTotal = createMetric("failed-rebalance-total",
"The total number of failed rebalance events");
- failedRebalanceRate = createMetric(metrics,
"failed-rebalance-rate-per-hour",
+ failedRebalanceRate = createMetric("failed-rebalance-rate-per-hour",
"The number of failed rebalance events per hour");
- assignedPartitionsCount = createMetric(metrics, "assigned-partitions",
+ assignedPartitionsCount = createMetric("assigned-partitions",
"The number of partitions currently assigned to this
consumer");
registerAssignedPartitionCount(subscriptions);
@@ -92,7 +94,7 @@ public final class ConsumerRebalanceMetricsManager extends
RebalanceMetricsManag
else
return TimeUnit.SECONDS.convert(now - lastRebalanceEndMs,
TimeUnit.MILLISECONDS);
};
- lastRebalanceSecondsAgo = createMetric(metrics,
+ lastRebalanceSecondsAgo = createMetric(
"last-rebalance-seconds-ago",
"The number of seconds since the last rebalance event");
metrics.addMetric(lastRebalanceSecondsAgo, lastRebalance);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java
index 926e267d989..cabea5b2394 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManager.java
@@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
-public class HeartbeatMetricsManager {
+public class HeartbeatMetricsManager extends AbstractConsumerMetricsManager {
// MetricName visible for testing
final MetricName heartbeatResponseTimeMax;
final MetricName heartbeatRate;
@@ -43,6 +43,11 @@ public class HeartbeatMetricsManager {
}
public HeartbeatMetricsManager(Metrics metrics, String metricGroupPrefix) {
+ this(new MetricsLedger(metrics), metricGroupPrefix);
+ }
+
+ private HeartbeatMetricsManager(MetricsLedger metrics, String
metricGroupPrefix) {
+ super(metrics);
final String metricGroupName = metricGroupPrefix +
COORDINATOR_METRICS_SUFFIX;
heartbeatSensor = metrics.sensor("heartbeat-latency");
heartbeatResponseTimeMax =
metrics.metricName("heartbeat-response-time-max",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
index 1b2bb4518f9..0a2cf694d49 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
@@ -28,8 +28,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
-public class KafkaConsumerMetrics implements AutoCloseable {
- private final Metrics metrics;
+public class KafkaConsumerMetrics extends AbstractConsumerMetricsManager {
private final MetricName lastPollMetricName;
private final Sensor timeBetweenPollSensor;
private final Sensor pollIdleSensor;
@@ -40,7 +39,11 @@ public class KafkaConsumerMetrics implements AutoCloseable {
private long timeSinceLastPollMs;
public KafkaConsumerMetrics(Metrics metrics) {
- this.metrics = metrics;
+ this(new MetricsLedger(metrics));
+ }
+
+ private KafkaConsumerMetrics(MetricsLedger metrics) {
+ super(metrics);
final String metricGroupName = CONSUMER_METRIC_GROUP;
Measurable lastPoll = (mConfig, now) -> {
if (lastPollMs == 0L)
@@ -110,13 +113,4 @@ public class KafkaConsumerMetrics implements AutoCloseable
{
public void recordCommitted(long duration) {
this.committedSensor.record(duration);
}
-
- @Override
- public void close() {
- metrics.removeMetric(lastPollMetricName);
- metrics.removeSensor(timeBetweenPollSensor.name());
- metrics.removeSensor(pollIdleSensor.name());
- metrics.removeSensor(commitSyncSensor.name());
- metrics.removeSensor(committedSensor.name());
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
index e154b97da5a..8903f046c5e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
@@ -27,8 +27,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
-public class KafkaShareConsumerMetrics implements AutoCloseable {
- private final Metrics metrics;
+public class KafkaShareConsumerMetrics extends AbstractConsumerMetricsManager {
private final MetricName lastPollMetricName;
private final Sensor timeBetweenPollSensor;
private final Sensor pollIdleSensor;
@@ -37,7 +36,11 @@ public class KafkaShareConsumerMetrics implements
AutoCloseable {
private long timeSinceLastPollMs;
public KafkaShareConsumerMetrics(Metrics metrics) {
- this.metrics = metrics;
+ this(new MetricsLedger(metrics));
+ }
+
+ private KafkaShareConsumerMetrics(MetricsLedger metrics) {
+ super(metrics);
final String metricGroupName = CONSUMER_SHARE_METRIC_GROUP;
Measurable lastPoll = (mConfig, now) -> {
if (lastPollMs == 0L)
@@ -79,11 +82,4 @@ public class KafkaShareConsumerMetrics implements
AutoCloseable {
double pollIdleRatio = pollTimeMs * 1.0 / (pollTimeMs +
timeSinceLastPollMs);
this.pollIdleSensor.record(pollIdleRatio);
}
-
- @Override
- public void close() {
- metrics.removeMetric(lastPollMetricName);
- metrics.removeSensor(timeBetweenPollSensor.name());
- metrics.removeSensor(pollIdleSensor.name());
- }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/MetricsLedger.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/MetricsLedger.java
new file mode 100644
index 00000000000..3cbd0d6fd03
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/MetricsLedger.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * {@code MetricsLedger} records the {@link MetricName}s and {@link Sensor}s that are created
+ * using the internal {@link Metrics} instance in a ledger. Then, in {@link #close()}, the
+ * ledger is reviewed and each of the {@link MetricName}s and {@link Sensor}s is removed from
+ * the underlying {@link Metrics} instance.
+ *
+ * <p>
+ *
+ * Because {@link Metrics} is a <code>final</code> class, we cannot extend it
in a delegation
+ * pattern. Instead, we mimic the subset of APIs that are needed by the
callers.
+ */
+public class MetricsLedger implements AutoCloseable {
+
+ private final Metrics metrics;
+ private final Set<MetricName> metricNames;
+ private final Set<Sensor> sensors;
+
+ public MetricsLedger(Metrics metrics) {
+ this.metrics = Objects.requireNonNull(metrics);
+ this.metricNames = new HashSet<>();
+ this.sensors = new HashSet<>();
+ }
+
+ public MetricName metricName(String name, String metricGroupName, String
description) {
+ MetricName metricName = metrics.metricName(name, metricGroupName,
description);
+ metricNames.add(metricName);
+ return metricName;
+ }
+
+ public MetricName metricInstance(MetricNameTemplate template, Map<String,
String> tags) {
+ MetricName metricName = metrics.metricInstance(template, tags);
+ metricNames.add(metricName);
+ return metricName;
+ }
+
+ public void addMetricIfAbsent(MetricName metricName, MetricConfig config,
MetricValueProvider<?> metricValueProvider) {
+ metrics.addMetricIfAbsent(metricName, config, metricValueProvider);
+ metricNames.add(metricName);
+ }
+
+ public void addMetric(MetricName metricName, Measurable measurable) {
+ metrics.addMetric(metricName, measurable);
+ metricNames.add(metricName);
+ }
+
+ public void removeMetric(MetricName metricName) {
+ metrics.removeMetric(metricName);
+ metricNames.remove(metricName);
+ }
+
+ public Sensor sensor(String name) {
+ Sensor sensor = metrics.sensor(name);
+ sensors.add(sensor);
+ return sensor;
+ }
+
+ public Sensor getSensor(String name) {
+ Sensor sensor = metrics.getSensor(name);
+
+ if (sensor != null)
+ sensors.add(sensor);
+
+ return sensor;
+ }
+
+ public void removeSensor(String name) {
+ Sensor s = getSensor(name);
+ metrics.removeSensor(name);
+ sensors.remove(s);
+ }
+
+ @Override
+ public final void close() {
+ sensors.forEach(s -> {
+ metrics.removeSensor(s.name());
+ });
+
+ metricNames.forEach(metrics::removeMetric);
+ }
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java
index d700299ef18..53044f2dc05 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManager.java
@@ -27,7 +27,7 @@ import org.apache.kafka.common.metrics.stats.WindowedCount;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
-public class OffsetCommitMetricsManager {
+public class OffsetCommitMetricsManager extends AbstractConsumerMetricsManager
{
final MetricName commitLatencyAvg;
final MetricName commitLatencyMax;
final MetricName commitRate;
@@ -35,6 +35,11 @@ public class OffsetCommitMetricsManager {
private final Sensor commitSensor;
public OffsetCommitMetricsManager(Metrics metrics) {
+ this(new MetricsLedger(metrics));
+ }
+
+ private OffsetCommitMetricsManager(MetricsLedger metrics) {
+ super(metrics);
final String metricGroupName = CONSUMER_METRIC_GROUP_PREFIX +
COORDINATOR_METRICS_SUFFIX;
commitSensor = metrics.sensor("commit-latency");
commitLatencyAvg = metrics.metricName("commit-latency-avg",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java
index f70b891864f..c22c388c3f2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManager.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
-public class RebalanceCallbackMetricsManager {
+public class RebalanceCallbackMetricsManager extends
AbstractConsumerMetricsManager {
final MetricName partitionRevokeLatencyAvg;
final MetricName partitionAssignLatencyAvg;
final MetricName partitionLostLatencyAvg;
@@ -41,6 +41,11 @@ public class RebalanceCallbackMetricsManager {
}
public RebalanceCallbackMetricsManager(Metrics metrics, String
grpMetricsPrefix) {
+ this(new MetricsLedger(metrics), grpMetricsPrefix);
+ }
+
+ private RebalanceCallbackMetricsManager(MetricsLedger metrics, String
grpMetricsPrefix) {
+ super(metrics);
final String metricGroupName = grpMetricsPrefix +
COORDINATOR_METRICS_SUFFIX;
partitionRevokeCallbackSensor =
metrics.sensor("partition-revoked-latency");
partitionRevokeLatencyAvg =
metrics.metricName("partition-revoked-latency-avg",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
index 16ad1b39817..238de4f527d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
@@ -17,16 +17,16 @@
package org.apache.kafka.clients.consumer.internals.metrics;
import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.Metrics;
-public abstract class RebalanceMetricsManager {
+public abstract class RebalanceMetricsManager extends
AbstractConsumerMetricsManager {
protected final String metricGroupName;
- RebalanceMetricsManager(String metricGroupName) {
+ RebalanceMetricsManager(MetricsLedger metrics, String metricGroupName) {
+ super(metrics);
this.metricGroupName = metricGroupName;
}
- protected MetricName createMetric(Metrics metrics, String name, String
description) {
+ protected MetricName createMetric(String name, String description) {
return metrics.metricName(name, metricGroupName, description);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/SensorBuilder.java
similarity index 79%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/SensorBuilder.java
index a2346a3b376..c29c599d45a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/SensorBuilder.java
@@ -14,11 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.clients.consumer.internals;
+package org.apache.kafka.clients.consumer.internals.metrics;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricNameTemplate;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
@@ -37,7 +36,7 @@ import java.util.function.Supplier;
*/
public class SensorBuilder {
- private final Metrics metrics;
+ private final MetricsLedger metrics;
private final Sensor sensor;
@@ -45,11 +44,11 @@ public class SensorBuilder {
private final Map<String, String> tags;
- public SensorBuilder(Metrics metrics, String name) {
+ public SensorBuilder(MetricsLedger metrics, String name) {
this(metrics, name, Collections::emptyMap);
}
- public SensorBuilder(Metrics metrics, String name, Supplier<Map<String,
String>> tagsSupplier) {
+ public SensorBuilder(MetricsLedger metrics, String name,
Supplier<Map<String, String>> tagsSupplier) {
this.metrics = metrics;
Sensor s = metrics.getSensor(name);
@@ -64,35 +63,35 @@ public class SensorBuilder {
}
}
- SensorBuilder withAvg(MetricNameTemplate name) {
+ public SensorBuilder withAvg(MetricNameTemplate name) {
if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Avg());
return this;
}
- SensorBuilder withMin(MetricNameTemplate name) {
+ public SensorBuilder withMin(MetricNameTemplate name) {
if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Min());
return this;
}
- SensorBuilder withMax(MetricNameTemplate name) {
+ public SensorBuilder withMax(MetricNameTemplate name) {
if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Max());
return this;
}
- SensorBuilder withValue(MetricNameTemplate name) {
+ public SensorBuilder withValue(MetricNameTemplate name) {
if (!preexisting)
sensor.add(metrics.metricInstance(name, tags), new Value());
return this;
}
- SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate
totalName) {
+ public SensorBuilder withMeter(MetricNameTemplate rateName,
MetricNameTemplate totalName) {
if (!preexisting) {
sensor.add(new Meter(metrics.metricInstance(rateName, tags),
metrics.metricInstance(totalName, tags)));
}
@@ -100,7 +99,7 @@ public class SensorBuilder {
return this;
}
- SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate
rateName, MetricNameTemplate totalName) {
+ public SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate
rateName, MetricNameTemplate totalName) {
if (!preexisting) {
sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName,
tags), metrics.metricInstance(totalName, tags)));
}
@@ -108,8 +107,7 @@ public class SensorBuilder {
return this;
}
- Sensor build() {
+ public Sensor build() {
return sensor;
}
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
index a346c4bbcec..1a9f57e9b68 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManager.java
@@ -36,11 +36,15 @@ public final class ShareRebalanceMetricsManager extends
RebalanceMetricsManager
private long lastRebalanceStartMs = -1L;
public ShareRebalanceMetricsManager(Metrics metrics) {
- super(CONSUMER_SHARE_METRIC_GROUP_PREFIX + COORDINATOR_METRICS_SUFFIX);
+ this(new MetricsLedger(metrics));
+ }
+
+ private ShareRebalanceMetricsManager(MetricsLedger metrics) {
+ super(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX +
COORDINATOR_METRICS_SUFFIX);
- rebalanceTotal = createMetric(metrics, "rebalance-total",
+ rebalanceTotal = createMetric("rebalance-total",
"The total number of rebalance events");
- rebalanceRatePerHour = createMetric(metrics, "rebalance-rate-per-hour",
+ rebalanceRatePerHour = createMetric("rebalance-rate-per-hour",
"The number of rebalance events per hour");
rebalanceSensor = metrics.sensor("rebalance-latency");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f70a618bb65..498f1fcbf41 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -395,6 +395,58 @@ public class KafkaConsumerTest {
consumer.close(CloseOptions.timeout(Duration.ZERO));
}
+ @ParameterizedTest
+ @EnumSource(GroupProtocol.class)
+ public void testMetricsRemovedOnClose(GroupProtocol groupProtocol) {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+
+ assertMetricsMap(true);
+ consumer.close(CloseOptions.timeout(Duration.ZERO));
+ assertMetricsMap(false);
+ }
+
+ private void assertMetricsMap(boolean metricsShouldBePresent) {
+ // Copy the map because we're going to modify it.
+ Map<MetricName, ? extends Metric> metrics = new
HashMap<>(consumer.metrics());
+
+ // There's a meta-metric named "count" that is automatically added to
the metrics map.
+ Optional<MetricName> countMetricNameOpt = metrics.keySet().stream()
+ .filter(metricName -> metricName.name().equals("count") &&
metricName.group().equals("kafka-metrics-count"))
+ .findAny();
+
+ // Make sure the meta-metric is present and has an entry.
+ assertTrue(
+ countMetricNameOpt.isPresent(),
+ "The \"count\" meta-metric was unexpectedly missing from the
Consumer metrics"
+ );
+ MetricName countMetricName = countMetricNameOpt.get();
+ assertNotNull(
+ metrics.remove(countMetricName),
+ "The \"count\" meta-metric key was removed from the Consumer
metrics map, but it unexpectedly had no entry"
+ );
+
+ if (metricsShouldBePresent) {
+ assertFalse(
+ metrics.isEmpty(),
+ "The consumer should have created metrics, but they are
unexpectedly empty"
+ );
+ } else {
+ List<String> expected = List.of();
+ List<String> actual = metrics.keySet().stream()
+ .map(metricName -> metricName.group() + ":" +
metricName.name())
+ .sorted()
+ .collect(Collectors.toList());
+ assertEquals(
+ expected,
+ actual,
+ "The consumer should have removed its metrics on close(), but
there are metrics remaining"
+ );
+ }
+ }
+
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testDisableJmxAndClientTelemetryReporter(GroupProtocol
groupProtocol) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 4c7b97524ca..b3086caba56 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -57,6 +57,7 @@ import
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscr
import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
@@ -258,6 +259,8 @@ public class AsyncKafkaConsumerTest {
new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), metrics),
fetchBuffer,
fetchCollector,
+ mock(FetchMetricsManager.class),
+ mock(RebalanceCallbackMetricsManager.class),
interceptors,
time,
applicationEventHandler,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 24baa003d5a..7f88af47e93 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -40,6 +40,8 @@ import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
@@ -71,6 +73,7 @@ import org.mockito.Mockito;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -85,6 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import javax.security.auth.login.LoginException;
@@ -107,7 +111,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-@SuppressWarnings("unchecked")
+@SuppressWarnings({"ClassFanOutComplexity", "unchecked"})
public class ShareConsumerImplTest {
private static final Optional<Integer> DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS
= Optional.of(30000);
@@ -115,6 +119,7 @@ public class ShareConsumerImplTest {
private final Time time = new MockTime(1);
private final ShareFetchCollector<String, String> fetchCollector =
mock(ShareFetchCollector.class);
+ private final ShareFetchMetricsManager shareFetchMetricsManager =
mock(ShareFetchMetricsManager.class);
private final ShareConsumerMetadata metadata =
mock(ShareConsumerMetadata.class);
private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
private final LinkedBlockingQueue<ShareAcknowledgementEvent>
acknowledgementEventQueue = new LinkedBlockingQueue<>();
@@ -191,6 +196,7 @@ public class ShareConsumerImplTest {
new StringDeserializer(),
fetchBuffer,
fetchCollector,
+ shareFetchMetricsManager,
time,
applicationEventHandler,
acknowledgementEventQueue,
@@ -953,6 +959,53 @@ public class ShareConsumerImplTest {
assertEquals(0, timer.remainingMs());
}
+ @Test
+ public void testMetricsRemovedOnClose() {
+ consumer = newConsumer();
+ assertMetricsMap(true);
+ consumer.close(Duration.ZERO);
+ assertMetricsMap(false);
+ }
+
+ private void assertMetricsMap(boolean metricsShouldBePresent) {
+ // Copy the map because we're going to modify it.
+ Map<MetricName, ? extends Metric> metrics = new
HashMap<>(consumer.metrics());
+
+ // There's a meta-metric named "count" that is automatically added to
the metrics map.
+ Optional<MetricName> countMetricNameOpt = metrics.keySet().stream()
+ .filter(metricName -> metricName.name().equals("count") &&
metricName.group().equals("kafka-metrics-count"))
+ .findAny();
+
+ // Make sure the meta-metric is present and has an entry.
+ assertTrue(
+ countMetricNameOpt.isPresent(),
+ "The \"count\" meta-metric was unexpectedly missing from the
Consumer metrics"
+ );
+ MetricName countMetricName = countMetricNameOpt.get();
+ assertNotNull(
+ metrics.remove(countMetricName),
+ "The \"count\" meta-metric key was removed from the Consumer
metrics map, but it unexpectedly had no entry"
+ );
+
+ if (metricsShouldBePresent) {
+ assertFalse(
+ metrics.isEmpty(),
+ "The consumer should have created metrics, but they are
unexpectedly empty"
+ );
+ } else {
+ List<String> expected = List.of();
+ List<String> actual = metrics.keySet().stream()
+ .map(metricName -> metricName.group() + ":" +
metricName.name())
+ .sorted()
+ .collect(Collectors.toList());
+ assertEquals(
+ expected,
+ actual,
+ "The consumer should have removed its metrics on close(), but
there are metrics remaining"
+ );
+ }
+ }
+
/**
* This test ensures that the {@link ShareConsumer} implementation fails
on creation when the underlying
* {@link NetworkClient} fails creation.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java
similarity index 55%
copy from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
copy to
clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java
index 16ad1b39817..1917ddd6df9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceMetricsManager.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManagerTest.java
@@ -16,26 +16,27 @@
*/
package org.apache.kafka.clients.consumer.internals.metrics;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
-public abstract class RebalanceMetricsManager {
- protected final String metricGroupName;
+import org.junit.jupiter.api.Test;
- RebalanceMetricsManager(String metricGroupName) {
- this.metricGroupName = metricGroupName;
- }
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
- protected MetricName createMetric(Metrics metrics, String name, String
description) {
- return metrics.metricName(name, metricGroupName, description);
- }
+public abstract class AbstractConsumerMetricsManagerTest {
- public abstract void recordRebalanceStarted(long nowMs);
+ protected abstract AbstractConsumerMetricsManager metricsManager(Metrics
metrics, String groupDescription);
- public abstract void recordRebalanceEnded(long nowMs);
+ @Test
+ public void testCleanup() {
+ try (Metrics metrics = new Metrics()) {
+ int metricCount = metrics.metrics().size();
- public void maybeRecordRebalanceFailed() {
- }
+ try (AbstractConsumerMetricsManager metricsManager =
metricsManager(metrics, "test")) {
+ assertTrue(metrics.metrics().size() > metricCount);
+ }
- public abstract boolean rebalanceStarted();
-}
\ No newline at end of file
+ assertEquals(metricCount, metrics.metrics().size());
+ }
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
index 876bc3ffa12..bec5ec2785d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -32,7 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class AsyncConsumerMetricsTest {
+public class AsyncConsumerMetricsTest extends
AbstractConsumerMetricsManagerTest {
private static final long METRIC_VALUE = 123L;
private final Metrics metrics = new Metrics();
@@ -53,6 +53,11 @@ public class AsyncConsumerMetricsTest {
metrics.close();
}
+ @Override
+ protected AbstractConsumerMetricsManager metricsManager(Metrics metrics,
String groupDescription) {
+ return new AsyncConsumerMetrics(metrics, groupDescription);
+ }
+
@ParameterizedTest
@MethodSource("groupNameProvider")
public void shouldMetricNames(String groupName) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
index 346035f1caf..e3fea75ef81 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ConsumerRebalanceMetricsManagerTest.java
@@ -39,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-class ConsumerRebalanceMetricsManagerTest {
+class ConsumerRebalanceMetricsManagerTest extends
AbstractConsumerMetricsManagerTest {
private Time time;
private Metrics metrics;
@@ -68,6 +68,12 @@ class ConsumerRebalanceMetricsManagerTest {
metrics.close();
}
+ @Override
+ protected AbstractConsumerMetricsManager metricsManager(Metrics metrics,
String groupDescription) {
+ SubscriptionState subscriptionState = new
SubscriptionState(mock(LogContext.class), AutoOffsetResetStrategy.EARLIEST);
+ return new ConsumerRebalanceMetricsManager(metrics, subscriptionState);
+ }
+
@Test
public void testAssignedPartitionCountMetric() {
assertNotNull(metrics.metric(metricsManager.assignedPartitionsCount),
"Metric assigned-partitions has not been registered as expected");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManagerTest.java
index fe7f8a0a023..a2ea0e04f12 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/HeartbeatMetricsManagerTest.java
@@ -28,10 +28,15 @@ import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-public class HeartbeatMetricsManagerTest {
+public class HeartbeatMetricsManagerTest extends
AbstractConsumerMetricsManagerTest {
private final Time time = new MockTime();
private final Metrics metrics = new Metrics(time);
+ @Override // overrides a supertype hook (this class extends AbstractConsumerMetricsManagerTest) to supply the manager under test
+ protected AbstractConsumerMetricsManager metricsManager(Metrics metrics,
String groupDescription) {
+ return new HeartbeatMetricsManager(metrics, groupDescription); // both arguments are passed straight to the constructor
+ }
+
@Test
public void testHeartbeatMetrics() {
// Assuming 'metrics' is an instance of your Metrics class
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManagerTest.java
index 4d0f3f619a1..e541f817edb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/OffsetCommitMetricsManagerTest.java
@@ -25,10 +25,15 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-public class OffsetCommitMetricsManagerTest {
+public class OffsetCommitMetricsManagerTest extends
AbstractConsumerMetricsManagerTest {
private final Time time = new MockTime();
private final Metrics metrics = new Metrics(time);
+ @Override // supplies the manager exercised by OffsetCommitMetricsManagerTest, so it must be an OffsetCommitMetricsManager
+ protected AbstractConsumerMetricsManager metricsManager(Metrics metrics,
String groupDescription) {
+ return new OffsetCommitMetricsManager(metrics); // NOTE(review): assumes a Metrics-only constructor and that this type extends AbstractConsumerMetricsManager - confirm; groupDescription is unused
+ }
+
@Test
public void testOffsetCommitMetrics() {
// Assuming 'metrics' is an instance of your Metrics class
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManagerTest.java
index 876d06a9967..e2cbcd5f3e5 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/RebalanceCallbackMetricsManagerTest.java
@@ -25,10 +25,15 @@ import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-public class RebalanceCallbackMetricsManagerTest {
+public class RebalanceCallbackMetricsManagerTest extends
AbstractConsumerMetricsManagerTest {
private final Time time = new MockTime();
private final Metrics metrics = new Metrics(time);
+ @Override // supplies the manager exercised by RebalanceCallbackMetricsManagerTest, so it must be a RebalanceCallbackMetricsManager
+ protected AbstractConsumerMetricsManager metricsManager(Metrics metrics,
String groupDescription) {
+ return new RebalanceCallbackMetricsManager(metrics); // same constructor form as testRebalanceCallbackMetrics; groupDescription is unused (NOTE(review): confirm this type extends AbstractConsumerMetricsManager)
+ }
+
@Test
public void testRebalanceCallbackMetrics() {
RebalanceCallbackMetricsManager metricsManager = new
RebalanceCallbackMetricsManager(metrics);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManagerTest.java
index 6a28dd8ea0f..0acee6825de 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/ShareRebalanceMetricsManagerTest.java
@@ -26,11 +26,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-class ShareRebalanceMetricsManagerTest {
+class ShareRebalanceMetricsManagerTest extends
AbstractConsumerMetricsManagerTest {
private final Time time = new MockTime();
private final Metrics metrics = new Metrics(time);
+ @Override // supplies the manager exercised by ShareRebalanceMetricsManagerTest, so it must be a ShareRebalanceMetricsManager
+ protected AbstractConsumerMetricsManager metricsManager(Metrics metrics,
String groupDescription) {
+ return new ShareRebalanceMetricsManager(metrics); // same constructor form as testRebalanceMetrics; groupDescription is unused (NOTE(review): confirm this type extends AbstractConsumerMetricsManager)
+ }
+
@Test
public void testRebalanceMetrics() {
ShareRebalanceMetricsManager shareRebalanceMetricsManager = new
ShareRebalanceMetricsManager(metrics);