This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b2048a5908e KAFKA-19149: Add KIP-877 support to MirrorMaker (#21576)
b2048a5908e is described below

commit b2048a5908ef5cea5d6716d40d2a48f6c614c9e3
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Mar 17 09:56:53 2026 +0100

    KAFKA-19149: Add KIP-877 support to MirrorMaker (#21576)
    
    Implements [KIP-1280](https://cwiki.apache.org/confluence/x/a4s8G)
    
    Reviewers: Luke Chen <[email protected]>
---
 .../connect/mirror/MirrorCheckpointConfig.java     |  21 ++-
 ...ics.java => MirrorCheckpointLegacyMetrics.java} |  40 ++---
 .../connect/mirror/MirrorCheckpointMetrics.java    |  92 +++++------
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  22 ++-
 .../connect/mirror/MirrorCheckpointTaskConfig.java |   9 +-
 .../connect/mirror/MirrorConnectorConfig.java      |   5 +
 .../connect/mirror/MirrorHeartbeatConfig.java      |   1 +
 .../kafka/connect/mirror/MirrorSourceConfig.java   |  20 +++
 .../connect/mirror/MirrorSourceLegacyMetrics.java  | 180 +++++++++++++++++++++
 .../kafka/connect/mirror/MirrorSourceMetrics.java  | 166 ++++++++-----------
 .../kafka/connect/mirror/MirrorSourceTask.java     |  36 ++++-
 .../connect/mirror/MirrorSourceTaskConfig.java     |   9 +-
 ...tricsTest.java => MirrorLegacyMetricsTest.java} |  39 ++++-
 ...urceMetricsTest.java => MirrorMetricsTest.java} |  78 +++++----
 .../kafka/connect/mirror/MirrorSourceTaskTest.java |   6 +-
 .../MirrorConnectorsIntegrationBaseTest.java       | 133 ++++++++++++++-
 16 files changed, 615 insertions(+), 242 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
index b7625da619d..b397ff742e8 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -68,6 +68,12 @@ public class MirrorCheckpointConfig extends 
MirrorConnectorConfig {
     private static final String SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC = 
"Frequency of consumer group offset sync.";
     public static final long SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT = 60;
 
+    public static final String METRIC_NAMES_FORMAT_DOC = "Deprecated. The 
formats in which metrics are emitted. Valid values are legacy and new. " +
+            "When set to legacy, the metrics have the following name 
\"kafka.connect.mirror:type=MirrorCheckpointConnector,source={SOURCE},target={TARGET},group={GROUP},topic={TOPIC},partition={PARTITION}\".
 " +
+            "When set to new, the metrics have the following name 
\"kafka.connect:type=plugins,connector={CONNECTOR},task={TASK},source={SOURCE},target={TARGET},group={GROUP},topic={TOPIC},partition={PARTITION}\".
 " +
+            "When set to \"legacy,new\" the connector will emit metrics with 
both names, this can be useful when migrating to the new format but it doubles 
the amount of metrics that are emitted.\n" +
+            "In Kafka 5.0 the legacy format and this configuration will be 
removed and the metrics will always use the new names.";
+
     public static final String GROUP_FILTER_CLASS = "group.filter.class";
     private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use. 
Selects consumer groups to replicate.";
     public static final Class<?> GROUP_FILTER_CLASS_DEFAULT = 
DefaultGroupFilter.class;
@@ -162,6 +168,10 @@ public class MirrorCheckpointConfig extends 
MirrorConnectorConfig {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
 
+    List<String> metricNamesFormats() {
+        return getList(METRIC_NAMES_FORMAT);
+    }
+
     public static Map<String, String> validate(Map<String, String> configs) {
         Map<String, String> invalidConfigs = new HashMap<>();
 
@@ -264,7 +274,16 @@ public class MirrorCheckpointConfig extends 
MirrorConnectorConfig {
                         ConfigDef.Type.CLASS,
                         TOPIC_FILTER_CLASS_DEFAULT,
                         ConfigDef.Importance.LOW,
-                        TOPIC_FILTER_CLASS_DOC);
+                        TOPIC_FILTER_CLASS_DOC
+                )
+                .define(
+                        METRIC_NAMES_FORMAT,
+                        ConfigDef.Type.LIST,
+                        METRIC_NAMES_FORMAT_DEFAULT,
+                        ConfigDef.ValidList.in(false, METRIC_NAMES_LEGACY, 
METRIC_NAMES_NEW),
+                        ConfigDef.Importance.LOW,
+                        METRIC_NAMES_FORMAT_DOC
+                );
     }
 
     protected static final ConfigDef CONNECTOR_CONFIG_DEF = 
defineCheckpointConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointLegacyMetrics.java
similarity index 72%
copy from 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
copy to 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointLegacyMetrics.java
index 71e3edebf5b..dc592f512f5 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointLegacyMetrics.java
@@ -27,37 +27,44 @@ import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
 
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_AVG;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_AVG_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MAX;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MAX_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MIN;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MIN_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.groupTags;
+
 /** Metrics for replicated topic-partitions */
-class MirrorCheckpointMetrics implements AutoCloseable {
+class MirrorCheckpointLegacyMetrics implements AutoCloseable {
 
     private static final String CHECKPOINT_CONNECTOR_GROUP = 
MirrorCheckpointConnector.class.getSimpleName();
 
     private static final Set<String> GROUP_TAGS = Set.of("source", "target", 
"group", "topic", "partition");
-
     private static final MetricNameTemplate CHECKPOINT_LATENCY = new 
MetricNameTemplate(
-            "checkpoint-latency-ms", CHECKPOINT_CONNECTOR_GROUP,
-            "Time it takes consumer group offsets to replicate from source to 
target cluster.", GROUP_TAGS);
+            LATENCY_MS, CHECKPOINT_CONNECTOR_GROUP,
+            LATENCY_MS_DESCRIPTION, GROUP_TAGS);
     private static final MetricNameTemplate CHECKPOINT_LATENCY_MAX = new 
MetricNameTemplate(
-            "checkpoint-latency-ms-max", CHECKPOINT_CONNECTOR_GROUP,
-            "Max time it takes consumer group offsets to replicate from source 
to target cluster.", GROUP_TAGS);
+            LATENCY_MS_MAX, CHECKPOINT_CONNECTOR_GROUP,
+            LATENCY_MS_MAX_DESCRIPTION, GROUP_TAGS);
     private static final MetricNameTemplate CHECKPOINT_LATENCY_MIN = new 
MetricNameTemplate(
-            "checkpoint-latency-ms-min", CHECKPOINT_CONNECTOR_GROUP,
-            "Min time it takes consumer group offsets to replicate from source 
to target cluster.", GROUP_TAGS);
+            LATENCY_MS_MIN, CHECKPOINT_CONNECTOR_GROUP,
+            LATENCY_MS_MIN_DESCRIPTION, GROUP_TAGS);
     private static final MetricNameTemplate CHECKPOINT_LATENCY_AVG = new 
MetricNameTemplate(
-            "checkpoint-latency-ms-avg", CHECKPOINT_CONNECTOR_GROUP,
-            "Average time it takes consumer group offsets to replicate from 
source to target cluster.", GROUP_TAGS);
-
+            LATENCY_MS_AVG, CHECKPOINT_CONNECTOR_GROUP,
+            LATENCY_MS_AVG_DESCRIPTION, GROUP_TAGS);
 
     private final Metrics metrics;
     private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
     private final String source;
     private final String target;
 
-    MirrorCheckpointMetrics(MirrorCheckpointTaskConfig taskConfig) {
+    MirrorCheckpointLegacyMetrics(MirrorCheckpointTaskConfig taskConfig) {
         this.target = taskConfig.targetClusterAlias();
         this.source = taskConfig.sourceClusterAlias();
         this.metrics = new Metrics();
@@ -91,12 +98,7 @@ class MirrorCheckpointMetrics implements AutoCloseable {
         private final Sensor checkpointLatencySensor;
 
         GroupMetrics(TopicPartition topicPartition, String group) {
-            Map<String, String> tags = new LinkedHashMap<>();
-            tags.put("source", source); 
-            tags.put("target", target); 
-            tags.put("group", group);
-            tags.put("topic", topicPartition.topic());
-            tags.put("partition", 
Integer.toString(topicPartition.partition()));
+            Map<String, String> tags = groupTags(source, target, group, 
topicPartition);
  
             checkpointLatencySensor = metrics.sensor("checkpoint-latency");
             
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY, tags), 
new Value());
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
index 71e3edebf5b..e7798e71141 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -29,49 +28,27 @@ import org.apache.kafka.common.metrics.stats.Value;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 
-/** Metrics for replicated topic-partitions */
-class MirrorCheckpointMetrics implements AutoCloseable {
+public class MirrorCheckpointMetrics {
 
-    private static final String CHECKPOINT_CONNECTOR_GROUP = 
MirrorCheckpointConnector.class.getSimpleName();
+    static final String LATENCY_MS = "checkpoint-latency-ms";
+    static final String LATENCY_MS_DESCRIPTION = "Time it takes consumer group 
offsets to replicate from source to target cluster.";
+    static final String LATENCY_MS_MAX = "checkpoint-latency-ms-max";
+    static final String LATENCY_MS_MAX_DESCRIPTION = "Max time it takes 
consumer group offsets to replicate from source to target cluster.";
+    static final String LATENCY_MS_MIN = "checkpoint-latency-ms-min";
+    static final String LATENCY_MS_MIN_DESCRIPTION = "Min time it takes 
consumer group offsets to replicate from source to target cluster.";
+    static final String LATENCY_MS_AVG = "checkpoint-latency-ms-avg";
+    static final String LATENCY_MS_AVG_DESCRIPTION = "Average time it takes 
consumer group offsets to replicate from source to target cluster.";
 
-    private static final Set<String> GROUP_TAGS = Set.of("source", "target", 
"group", "topic", "partition");
-
-    private static final MetricNameTemplate CHECKPOINT_LATENCY = new 
MetricNameTemplate(
-            "checkpoint-latency-ms", CHECKPOINT_CONNECTOR_GROUP,
-            "Time it takes consumer group offsets to replicate from source to 
target cluster.", GROUP_TAGS);
-    private static final MetricNameTemplate CHECKPOINT_LATENCY_MAX = new 
MetricNameTemplate(
-            "checkpoint-latency-ms-max", CHECKPOINT_CONNECTOR_GROUP,
-            "Max time it takes consumer group offsets to replicate from source 
to target cluster.", GROUP_TAGS);
-    private static final MetricNameTemplate CHECKPOINT_LATENCY_MIN = new 
MetricNameTemplate(
-            "checkpoint-latency-ms-min", CHECKPOINT_CONNECTOR_GROUP,
-            "Min time it takes consumer group offsets to replicate from source 
to target cluster.", GROUP_TAGS);
-    private static final MetricNameTemplate CHECKPOINT_LATENCY_AVG = new 
MetricNameTemplate(
-            "checkpoint-latency-ms-avg", CHECKPOINT_CONNECTOR_GROUP,
-            "Average time it takes consumer group offsets to replicate from 
source to target cluster.", GROUP_TAGS);
-
-
-    private final Metrics metrics;
     private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
+    private final PluginMetrics metrics;
     private final String source;
     private final String target;
 
-    MirrorCheckpointMetrics(MirrorCheckpointTaskConfig taskConfig) {
-        this.target = taskConfig.targetClusterAlias();
+    MirrorCheckpointMetrics(PluginMetrics metrics, MirrorCheckpointTaskConfig 
taskConfig) {
+        this.metrics = metrics;
         this.source = taskConfig.sourceClusterAlias();
-        this.metrics = new Metrics();
-
-        // for side-effect
-        metrics.sensor("record-count");
-        metrics.sensor("byte-rate");
-        metrics.sensor("record-age");
-        metrics.sensor("replication-latency");
-    }
-
-    @Override
-    public void close() {
-        metrics.close();
+        this.target = taskConfig.targetClusterAlias();
     }
 
     void checkpointLatency(TopicPartition topicPartition, String group, long 
millis) {
@@ -80,29 +57,34 @@ class MirrorCheckpointMetrics implements AutoCloseable {
 
     GroupMetrics group(TopicPartition topicPartition, String group) {
         return groupMetrics.computeIfAbsent(String.join("-", 
topicPartition.toString(), group),
-            x -> new GroupMetrics(topicPartition, group));
-    }
-
-    void addReporter(MetricsReporter reporter) {
-        metrics.addReporter(reporter);
+                x -> new GroupMetrics(topicPartition, group));
     }
 
     private class GroupMetrics {
         private final Sensor checkpointLatencySensor;
 
         GroupMetrics(TopicPartition topicPartition, String group) {
-            Map<String, String> tags = new LinkedHashMap<>();
-            tags.put("source", source); 
-            tags.put("target", target); 
-            tags.put("group", group);
-            tags.put("topic", topicPartition.topic());
-            tags.put("partition", 
Integer.toString(topicPartition.partition()));
- 
-            checkpointLatencySensor = metrics.sensor("checkpoint-latency");
-            
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY, tags), 
new Value());
-            
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_MAX, 
tags), new Max());
-            
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_MIN, 
tags), new Min());
-            
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_AVG, 
tags), new Avg());
+            LinkedHashMap<String, String> tags = groupTags(source, target, 
group, topicPartition);
+
+            checkpointLatencySensor = metrics.addSensor("checkpoint-latency");
+            MetricName checkpointLatency = metrics.metricName(LATENCY_MS, 
LATENCY_MS_DESCRIPTION, tags);
+            MetricName checkpointLatencyMax = 
metrics.metricName(LATENCY_MS_MAX, LATENCY_MS_MAX_DESCRIPTION, tags);
+            MetricName checkpointLatencyMin = 
metrics.metricName(LATENCY_MS_MIN, LATENCY_MS_MIN_DESCRIPTION, tags);
+            MetricName checkpointLatencyAvg = 
metrics.metricName(LATENCY_MS_AVG, LATENCY_MS_AVG_DESCRIPTION, tags);
+            checkpointLatencySensor.add(checkpointLatency, new Value());
+            checkpointLatencySensor.add(checkpointLatencyMax, new Max());
+            checkpointLatencySensor.add(checkpointLatencyMin, new Min());
+            checkpointLatencySensor.add(checkpointLatencyAvg, new Avg());
         }
     }
+
+    static LinkedHashMap<String, String> groupTags(String source, String 
target, String group, TopicPartition topicPartition) {
+        LinkedHashMap<String, String> tags = new LinkedHashMap<>();
+        tags.put("source", source);
+        tags.put("target", target);
+        tags.put("group", group);
+        tags.put("topic", topicPartition.topic());
+        tags.put("partition", Integer.toString(topicPartition.partition()));
+        return tags;
+    }
 }
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index db86fbdb40b..f09aae0b077 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -46,6 +46,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_LEGACY;
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_NEW;
 import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
 
 /** Emits checkpoints for upstream consumer groups. */
@@ -65,6 +67,7 @@ public class MirrorCheckpointTask extends SourceTask {
     private ReplicationPolicy replicationPolicy;
     private OffsetSyncStore offsetSyncStore;
     private boolean stopping;
+    private MirrorCheckpointLegacyMetrics legacyMetrics;
     private MirrorCheckpointMetrics metrics;
     private Scheduler scheduler;
     private Map<String, Map<TopicPartition, OffsetAndMetadata>> 
idleConsumerGroupsOffset;
@@ -104,7 +107,9 @@ public class MirrorCheckpointTask extends SourceTask {
         offsetSyncStore = new OffsetSyncStore(config);
         sourceAdminClient = 
config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
         targetAdminClient = 
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
-        metrics = config.metrics();
+        List<String> metricNamesFormats = config.metricNamesFormats();
+        legacyMetrics = metricNamesFormats.contains(METRIC_NAMES_LEGACY) ? 
config.legacyMetrics() : null;
+        metrics = metricNamesFormats.contains(METRIC_NAMES_NEW) ? 
config.metrics(context.pluginMetrics()) : null;
         idleConsumerGroupsOffset = new HashMap<>();
         checkpointStore = new CheckpointStore(config, consumerGroups);
         scheduler = new Scheduler(getClass(), config.entityLabel(), 
config.adminTimeout());
@@ -136,7 +141,7 @@ public class MirrorCheckpointTask extends SourceTask {
         Utils.closeQuietly(offsetSyncStore, "offset sync store");
         Utils.closeQuietly(sourceAdminClient, "source admin client");
         Utils.closeQuietly(targetAdminClient, "target admin client");
-        Utils.closeQuietly(metrics, "metrics");
+        Utils.closeQuietly(legacyMetrics, "metrics");
         Utils.closeQuietly(scheduler, "scheduler");
         log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), 
System.currentTimeMillis() - start);
     }
@@ -282,9 +287,16 @@ public class MirrorCheckpointTask extends SourceTask {
 
     @Override
     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
-        
metrics.checkpointLatency(MirrorUtils.unwrapPartition(record.sourcePartition()),
-            Checkpoint.unwrapGroup(record.sourcePartition()),
-            System.currentTimeMillis() - record.timestamp());
+        TopicPartition topicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
+        String group = Checkpoint.unwrapGroup(record.sourcePartition());
+        long millis = System.currentTimeMillis() - record.timestamp();
+        if (legacyMetrics != null) {
+            legacyMetrics.checkpointLatency(topicPartition, group, millis);
+        }
+        if (metrics != null) {
+            metrics.checkpointLatency(topicPartition, group, millis);
+        }
+
     }
 
     private void refreshIdleConsumerGroupOffset() throws ExecutionException, 
InterruptedException {
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
index 3d2cfda6dcc..c985c6be5f1 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.metrics.PluginMetrics;
 
 import java.util.HashSet;
 import java.util.Map;
@@ -34,12 +35,16 @@ public class MirrorCheckpointTaskConfig extends 
MirrorCheckpointConfig {
         return new HashSet<>(getList(TASK_CONSUMER_GROUPS));
     }
 
-    MirrorCheckpointMetrics metrics() {
-        MirrorCheckpointMetrics metrics = new MirrorCheckpointMetrics(this);
+    MirrorCheckpointLegacyMetrics legacyMetrics() {
+        MirrorCheckpointLegacyMetrics metrics = new 
MirrorCheckpointLegacyMetrics(this);
         metricsReporters().forEach(metrics::addReporter);
         return metrics;
     }
 
+    MirrorCheckpointMetrics metrics(PluginMetrics pluginMetrics) {
+        return new MirrorCheckpointMetrics(pluginMetrics, this);
+    }
+
     @Override
     String entityLabel() {
         return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : 
getInt(TASK_INDEX));
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 9baf7c1f35c..96aec8330e0 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -122,6 +122,11 @@ public abstract class MirrorConnectorConfig extends 
AbstractConfig {
             EMIT_OFFSET_SYNCS_ENABLED + " is disabled.";
     public static final boolean EMIT_OFFSET_SYNCS_ENABLED_DEFAULT = true;
 
+    public static final String METRIC_NAMES_FORMAT = "metric.names.formats";
+    public static final String METRIC_NAMES_LEGACY = "legacy";
+    public static final String METRIC_NAMES_NEW = "new";
+    public static final String METRIC_NAMES_FORMAT_DEFAULT = 
METRIC_NAMES_LEGACY;
+
     public static final String OFFSET_SYNCS_CLIENT_ROLE_PREFIX = 
"offset-syncs-";
 
     public static final String TASK_INDEX = "task.index";
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
index 15558567ef2..b629ac73882 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
@@ -57,6 +57,7 @@ public class MirrorHeartbeatConfig extends 
MirrorConnectorConfig {
         return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR);
     }
 
+
     private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) {
         return baseConfig
                 .define(
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 2ec663ad2fc..13b2aa0bbbe 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -94,6 +94,14 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
             " If set to false, heartbeats topics will only be replicated if 
the topic filter allows.";
     public static final boolean HEARTBEATS_REPLICATION_ENABLED_DEFAULT = true;
 
+    public static final String METRIC_NAMES_FORMAT = "metric.names.formats";
+    public static final List<String> METRIC_NAMES_FORMAT_DEFAULT = 
List.of(METRIC_NAMES_LEGACY);
+    public static final String METRIC_NAMES_FORMAT_DOC = "Deprecated. The 
formats in which metrics are emitted. Valid values are legacy and new. " +
+            "When set to legacy, the metrics have the following name 
\"kafka.connect.mirror:type=MirrorSourceConnector,source={SOURCE},target={TARGET},topic={TOPIC},partition={PARTITION}\".
 " +
+            "When set to new, the metrics have the following name 
\"kafka.connect:type=plugins,connector={CONNECTOR},task={TASK},source={SOURCE},target={TARGET},topic={TOPIC},partition={PARTITION}\".
 " +
+            "When set to \"legacy,new\" the connector will emit metrics with 
both names, this can be useful when migrating to the new format but it doubles 
the amount of metrics that are emitted.\n" +
+            "In Kafka 5.0 the legacy format and this configuration will be 
removed and the metrics will always use the new names.";
+
     public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE = 
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer";
     public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE = 
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer";
     public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE = 
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
@@ -203,6 +211,10 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
         return getBoolean(HEARTBEATS_REPLICATION_ENABLED);
     }
 
+    List<String> metricNamesFormats() {
+        return getList(METRIC_NAMES_FORMAT);
+    }
+
     private static ConfigDef defineSourceConfig(ConfigDef baseConfig) {
         return baseConfig
                 .define(
@@ -318,6 +330,14 @@ public class MirrorSourceConfig extends 
MirrorConnectorConfig {
                         HEARTBEATS_REPLICATION_ENABLED_DEFAULT,
                         ConfigDef.Importance.LOW,
                         HEARTBEATS_REPLICATION_ENABLED_DOC
+                )
+                .define(
+                        METRIC_NAMES_FORMAT,
+                        ConfigDef.Type.LIST,
+                        METRIC_NAMES_FORMAT_DEFAULT,
+                        ConfigDef.ValidList.in(false, METRIC_NAMES_LEGACY, 
METRIC_NAMES_NEW),
+                        ConfigDef.Importance.LOW,
+                        METRIC_NAMES_FORMAT_DOC
                 );
     }
 
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceLegacyMetrics.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceLegacyMetrics.java
new file mode 100644
index 00000000000..4df43a6ca34
--- /dev/null
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceLegacyMetrics.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.BYTE_COUNT_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.BYTE_RATE_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_AVG;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_AVG_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MAX;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MAX_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MIN;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MIN_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_COUNT_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_RATE_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_AVG;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_AVG_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MAX;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MAX_DESCRIPTION;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MIN;
+import static 
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MIN_DESCRIPTION;
+
+/** Metrics for replicated topic-partitions */
+class MirrorSourceLegacyMetrics implements AutoCloseable {
+
+    private static final String SOURCE_CONNECTOR_GROUP = 
MirrorSourceConnector.class.getSimpleName();
+
+    private static final Set<String> PARTITION_TAGS = Set.of("source", 
"target", "topic", "partition");
+    private static final MetricNameTemplate RECORD_COUNT = new 
MetricNameTemplate(
+            MirrorSourceMetrics.RECORD_COUNT, SOURCE_CONNECTOR_GROUP,
+            RECORD_COUNT_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_RATE = new 
MetricNameTemplate(
+            MirrorSourceMetrics.RECORD_RATE, SOURCE_CONNECTOR_GROUP,
+            RECORD_RATE_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE = new 
MetricNameTemplate(
+            RECORD_AGE_MS, SOURCE_CONNECTOR_GROUP,
+            RECORD_AGE_MS_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE_MAX = new 
MetricNameTemplate(
+            RECORD_AGE_MS_MAX, SOURCE_CONNECTOR_GROUP,
+            RECORD_AGE_MS_MAX_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE_MIN = new 
MetricNameTemplate(
+            RECORD_AGE_MS_MIN, SOURCE_CONNECTOR_GROUP,
+            RECORD_AGE_MS_MIN_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate RECORD_AGE_AVG = new 
MetricNameTemplate(
+            RECORD_AGE_MS_AVG, SOURCE_CONNECTOR_GROUP,
+            RECORD_AGE_MS_AVG_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate BYTE_COUNT = new 
MetricNameTemplate(
+            MirrorSourceMetrics.BYTE_COUNT, SOURCE_CONNECTOR_GROUP,
+            BYTE_COUNT_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate BYTE_RATE = new MetricNameTemplate(
+            MirrorSourceMetrics.BYTE_RATE, SOURCE_CONNECTOR_GROUP,
+            BYTE_RATE_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY = new 
MetricNameTemplate(
+            REPLICATION_LATENCY_MS, SOURCE_CONNECTOR_GROUP,
+            REPLICATION_LATENCY_MS_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY_MAX = new 
MetricNameTemplate(
+            REPLICATION_LATENCY_MS_MAX, SOURCE_CONNECTOR_GROUP,
+            REPLICATION_LATENCY_MS_MAX_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY_MIN = new 
MetricNameTemplate(
+            REPLICATION_LATENCY_MS_MIN, SOURCE_CONNECTOR_GROUP,
+            REPLICATION_LATENCY_MS_MIN_DESCRIPTION, PARTITION_TAGS);
+    private static final MetricNameTemplate REPLICATION_LATENCY_AVG = new 
MetricNameTemplate(
+            REPLICATION_LATENCY_MS_AVG, SOURCE_CONNECTOR_GROUP,
+            REPLICATION_LATENCY_MS_AVG_DESCRIPTION, PARTITION_TAGS);
+
+    private final Metrics metrics;
+    private final Map<TopicPartition, PartitionMetrics> partitionMetrics;
+    private final String source;
+    private final String target;
+
+    MirrorSourceLegacyMetrics(MirrorSourceTaskConfig taskConfig) {
+        this.target = taskConfig.targetClusterAlias();
+        this.source = taskConfig.sourceClusterAlias();
+        this.metrics = new Metrics();
+
+        // for side-effect
+        metrics.sensor("record-count");
+        metrics.sensor("byte-rate");
+        metrics.sensor("record-age");
+        metrics.sensor("replication-latency");
+
+        ReplicationPolicy replicationPolicy = taskConfig.replicationPolicy();
+        partitionMetrics = taskConfig.taskTopicPartitions().stream()
+            .map(x -> new 
TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()), 
x.partition()))
+            .collect(Collectors.toMap(x -> x, PartitionMetrics::new));
+    }
+
+    @Override
+    public void close() {
+        metrics.close();
+    }
+
+    void countRecord(TopicPartition topicPartition) {
+        partitionMetrics.get(topicPartition).recordSensor.record();
+    }
+
+    void recordAge(TopicPartition topicPartition, long ageMillis) {
+        partitionMetrics.get(topicPartition).recordAgeSensor.record((double) 
ageMillis);
+    }
+
+    void replicationLatency(TopicPartition topicPartition, long millis) {
+        
partitionMetrics.get(topicPartition).replicationLatencySensor.record((double) 
millis);
+    }
+
+    void recordBytes(TopicPartition topicPartition, long bytes) {
+        partitionMetrics.get(topicPartition).byteSensor.record((double) bytes);
+    }
+
+    void addReporter(MetricsReporter reporter) {
+        metrics.addReporter(reporter);
+    }
+
+    private class PartitionMetrics {
+        private final Sensor recordSensor;
+        private final Sensor byteSensor;
+        private final Sensor recordAgeSensor;
+        private final Sensor replicationLatencySensor;
+
+        PartitionMetrics(TopicPartition topicPartition) {
+            String prefix = topicPartition.topic() + "-" + 
topicPartition.partition() + "-";
+
+            Map<String, String> tags = new LinkedHashMap<>();
+            tags.put("source", source);
+            tags.put("target", target); 
+            tags.put("topic", topicPartition.topic());
+            tags.put("partition", 
Integer.toString(topicPartition.partition()));
+
+            recordSensor = metrics.sensor(prefix + "records-sent");
+            recordSensor.add(new Meter(metrics.metricInstance(RECORD_RATE, 
tags), metrics.metricInstance(RECORD_COUNT, tags)));
+
+            byteSensor = metrics.sensor(prefix + "bytes-sent");
+            byteSensor.add(new Meter(metrics.metricInstance(BYTE_RATE, tags), 
metrics.metricInstance(BYTE_COUNT, tags)));
+
+            recordAgeSensor = metrics.sensor(prefix + "record-age");
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE, tags), new 
Value());
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MAX, tags), 
new Max());
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MIN, tags), 
new Min());
+            recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_AVG, tags), 
new Avg());
+
+            replicationLatencySensor = metrics.sensor(prefix + 
"replication-latency");
+            
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY, tags), 
new Value());
+            
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MAX, 
tags), new Max());
+            
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MIN, 
tags), new Min());
+            
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_AVG, 
tags), new Avg());
+        }
+    }
+}
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
index c297c4c5fcf..ad2763c9710 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -29,94 +28,52 @@ import org.apache.kafka.common.metrics.stats.Value;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
-/** Metrics for replicated topic-partitions */
-class MirrorSourceMetrics implements AutoCloseable {
-
-    private static final String SOURCE_CONNECTOR_GROUP = 
MirrorSourceConnector.class.getSimpleName();
-
-    private final MetricNameTemplate recordCount;
-    private final MetricNameTemplate recordRate;
-    private final MetricNameTemplate recordAge;
-    private final MetricNameTemplate recordAgeMax;
-    private final MetricNameTemplate recordAgeMin;
-    private final MetricNameTemplate recordAgeAvg;
-    private final MetricNameTemplate byteCount;
-    private final MetricNameTemplate byteRate;
-    private final MetricNameTemplate replicationLatency;
-    private final MetricNameTemplate replicationLatencyMax;
-    private final MetricNameTemplate replicationLatencyMin;
-    private final MetricNameTemplate replicationLatencyAvg;
-
-    private final Metrics metrics;
+public class MirrorSourceMetrics {
+
+    static final String RECORD_COUNT = "record-count";
+    static final String RECORD_COUNT_DESCRIPTION = "Number of source records 
replicated to the target cluster.";
+    static final String RECORD_RATE = "record-rate";
+    static final String RECORD_RATE_DESCRIPTION = "Average number of source 
records replicated to the target cluster per second.";
+    static final String RECORD_AGE_MS = "record-age-ms";
+    static final String RECORD_AGE_MS_DESCRIPTION = "The age of incoming 
source records when replicated to the target cluster.";
+    static final String RECORD_AGE_MS_MAX = "record-age-ms-max";
+    static final String RECORD_AGE_MS_MAX_DESCRIPTION = "The max age of 
incoming source records when replicated to the target cluster.";
+    static final String RECORD_AGE_MS_MIN = "record-age-ms-min";
+    static final String RECORD_AGE_MS_MIN_DESCRIPTION = "The min age of 
incoming source records when replicated to the target cluster.";
+    static final String RECORD_AGE_MS_AVG = "record-age-ms-avg";
+    static final String RECORD_AGE_MS_AVG_DESCRIPTION = "The average age of 
incoming source records when replicated to the target cluster.";
+    static final String BYTE_COUNT = "byte-count";
+    static final String BYTE_COUNT_DESCRIPTION = "Number of bytes replicated 
to the target cluster.";
+    static final String BYTE_RATE = "byte-rate";
+    static final String BYTE_RATE_DESCRIPTION = "Average number of bytes 
replicated per second.";
+    static final String REPLICATION_LATENCY_MS = "replication-latency-ms";
+    static final String REPLICATION_LATENCY_MS_DESCRIPTION = "Time it takes 
records to replicate from source to target cluster.";
+    static final String REPLICATION_LATENCY_MS_MAX = 
"replication-latency-ms-max";
+    static final String REPLICATION_LATENCY_MS_MAX_DESCRIPTION = "Max time it 
takes records to replicate from source to target cluster.";
+    static final String REPLICATION_LATENCY_MS_MIN = 
"replication-latency-ms-min";
+    static final String REPLICATION_LATENCY_MS_MIN_DESCRIPTION = "Min time it 
takes records to replicate from source to target cluster.";
+    static final String REPLICATION_LATENCY_MS_AVG = 
"replication-latency-ms-avg";
+    static final String REPLICATION_LATENCY_MS_AVG_DESCRIPTION = "Average time 
it takes records to replicate from source to target cluster.";
+
+    private final PluginMetrics metrics;
     private final Map<TopicPartition, PartitionMetrics> partitionMetrics;
     private final String source;
     private final String target;
 
-    MirrorSourceMetrics(MirrorSourceTaskConfig taskConfig) {
+    MirrorSourceMetrics(PluginMetrics pluginMetrics, MirrorSourceTaskConfig 
taskConfig) {
+        this.metrics = pluginMetrics;
         this.target = taskConfig.targetClusterAlias();
         this.source = taskConfig.sourceClusterAlias();
-        this.metrics = new Metrics();
-
-        Set<String> partitionTags = Set.of("source", "target", "topic", 
"partition");
-
-        recordCount = new MetricNameTemplate(
-                "record-count", SOURCE_CONNECTOR_GROUP,
-                "Number of source records replicated to the target cluster.", 
partitionTags);
-        recordRate = new MetricNameTemplate(
-                "record-rate", SOURCE_CONNECTOR_GROUP,
-                "Average number of source records replicated to the target 
cluster per second.", partitionTags);
-        recordAge = new MetricNameTemplate(
-                "record-age-ms", SOURCE_CONNECTOR_GROUP,
-                "The age of incoming source records when replicated to the 
target cluster.", partitionTags);
-        recordAgeMax = new MetricNameTemplate(
-                "record-age-ms-max", SOURCE_CONNECTOR_GROUP,
-                "The max age of incoming source records when replicated to the 
target cluster.", partitionTags);
-        recordAgeMin = new MetricNameTemplate(
-                "record-age-ms-min", SOURCE_CONNECTOR_GROUP,
-                "The min age of incoming source records when replicated to the 
target cluster.", partitionTags);
-        recordAgeAvg = new MetricNameTemplate(
-                "record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
-                "The average age of incoming source records when replicated to 
the target cluster.", partitionTags);
-        byteCount = new MetricNameTemplate(
-                "byte-count", SOURCE_CONNECTOR_GROUP,
-                "Number of bytes replicated to the target cluster.", 
partitionTags);
-        byteRate = new MetricNameTemplate(
-                "byte-rate", SOURCE_CONNECTOR_GROUP,
-                "Average number of bytes replicated per second.", 
partitionTags);
-        replicationLatency = new MetricNameTemplate(
-                "replication-latency-ms", SOURCE_CONNECTOR_GROUP,
-                "Time it takes records to replicate from source to target 
cluster.", partitionTags);
-        replicationLatencyMax = new MetricNameTemplate(
-                "replication-latency-ms-max", SOURCE_CONNECTOR_GROUP,
-                "Max time it takes records to replicate from source to target 
cluster.", partitionTags);
-        replicationLatencyMin = new MetricNameTemplate(
-                "replication-latency-ms-min", SOURCE_CONNECTOR_GROUP,
-                "Min time it takes records to replicate from source to target 
cluster.", partitionTags);
-        replicationLatencyAvg = new MetricNameTemplate(
-                "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
-                "Average time it takes records to replicate from source to 
target cluster.", partitionTags);
-
-        // for side-effect
-        metrics.sensor("record-count");
-        metrics.sensor("byte-rate");
-        metrics.sensor("record-age");
-        metrics.sensor("replication-latency");
 
         ReplicationPolicy replicationPolicy = taskConfig.replicationPolicy();
         partitionMetrics = taskConfig.taskTopicPartitions().stream()
-            .map(x -> new 
TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()), 
x.partition()))
-            .collect(Collectors.toMap(x -> x, PartitionMetrics::new));
+                .map(x -> new 
TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()), 
x.partition()))
+                .collect(Collectors.toMap(x -> x, 
MirrorSourceMetrics.PartitionMetrics::new));
 
     }
 
-    @Override
-    public void close() {
-        metrics.close();
-    }
-
     void countRecord(TopicPartition topicPartition) {
         partitionMetrics.get(topicPartition).recordSensor.record();
     }
@@ -133,10 +90,6 @@ class MirrorSourceMetrics implements AutoCloseable {
         partitionMetrics.get(topicPartition).byteSensor.record((double) bytes);
     }
 
-    void addReporter(MetricsReporter reporter) {
-        metrics.addReporter(reporter);
-    }
-
     private class PartitionMetrics {
         private final Sensor recordSensor;
         private final Sensor byteSensor;
@@ -146,29 +99,42 @@ class MirrorSourceMetrics implements AutoCloseable {
         PartitionMetrics(TopicPartition topicPartition) {
             String prefix = topicPartition.topic() + "-" + 
topicPartition.partition() + "-";
 
-            Map<String, String> tags = new LinkedHashMap<>();
+            LinkedHashMap<String, String> tags = new LinkedHashMap<>();
             tags.put("source", source);
-            tags.put("target", target); 
+            tags.put("target", target);
             tags.put("topic", topicPartition.topic());
             tags.put("partition", 
Integer.toString(topicPartition.partition()));
 
-            recordSensor = metrics.sensor(prefix + "records-sent");
-            recordSensor.add(new Meter(metrics.metricInstance(recordRate, 
tags), metrics.metricInstance(recordCount, tags)));
-
-            byteSensor = metrics.sensor(prefix + "bytes-sent");
-            byteSensor.add(new Meter(metrics.metricInstance(byteRate, tags), 
metrics.metricInstance(byteCount, tags)));
-
-            recordAgeSensor = metrics.sensor(prefix + "record-age");
-            recordAgeSensor.add(metrics.metricInstance(recordAge, tags), new 
Value());
-            recordAgeSensor.add(metrics.metricInstance(recordAgeMax, tags), 
new Max());
-            recordAgeSensor.add(metrics.metricInstance(recordAgeMin, tags), 
new Min());
-            recordAgeSensor.add(metrics.metricInstance(recordAgeAvg, tags), 
new Avg());
-
-            replicationLatencySensor = metrics.sensor(prefix + 
"replication-latency");
-            
replicationLatencySensor.add(metrics.metricInstance(replicationLatency, tags), 
new Value());
-            
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyMax, 
tags), new Max());
-            
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyMin, 
tags), new Min());
-            
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyAvg, 
tags), new Avg());
+            MetricName recordCount = metrics.metricName(RECORD_COUNT, 
RECORD_COUNT_DESCRIPTION, tags);
+            MetricName recordRate = metrics.metricName(RECORD_RATE, 
RECORD_RATE_DESCRIPTION, tags);
+            MetricName recordAge = metrics.metricName(RECORD_AGE_MS, 
RECORD_AGE_MS_DESCRIPTION, tags);
+            MetricName recordAgeMax = metrics.metricName(RECORD_AGE_MS_MAX, 
RECORD_AGE_MS_MAX_DESCRIPTION, tags);
+            MetricName recordAgeMin = metrics.metricName(RECORD_AGE_MS_MIN, 
RECORD_AGE_MS_MIN_DESCRIPTION, tags);
+            MetricName recordAgeAvg = metrics.metricName(RECORD_AGE_MS_AVG, 
RECORD_AGE_MS_AVG_DESCRIPTION, tags);
+            MetricName byteCount = metrics.metricName(BYTE_COUNT, 
BYTE_COUNT_DESCRIPTION, tags);
+            MetricName byteRate = metrics.metricName(BYTE_RATE, 
BYTE_RATE_DESCRIPTION, tags);
+            MetricName replicationLatency = 
metrics.metricName(REPLICATION_LATENCY_MS, REPLICATION_LATENCY_MS_DESCRIPTION, 
tags);
+            MetricName replicationLatencyMax = 
metrics.metricName(REPLICATION_LATENCY_MS_MAX, 
REPLICATION_LATENCY_MS_MAX_DESCRIPTION, tags);
+            MetricName replicationLatencyMin = 
metrics.metricName(REPLICATION_LATENCY_MS_MIN, 
REPLICATION_LATENCY_MS_MIN_DESCRIPTION, tags);
+            MetricName replicationLatencyAvg = 
metrics.metricName(REPLICATION_LATENCY_MS_AVG, 
REPLICATION_LATENCY_MS_AVG_DESCRIPTION, tags);
+
+            recordSensor = metrics.addSensor(prefix + "records-sent");
+            recordSensor.add(new Meter(recordRate, recordCount));
+
+            byteSensor = metrics.addSensor(prefix + "bytes-sent");
+            byteSensor.add(new Meter(byteRate, byteCount));
+
+            recordAgeSensor = metrics.addSensor(prefix + "record-age");
+            recordAgeSensor.add(recordAge, new Value());
+            recordAgeSensor.add(recordAgeMax, new Max());
+            recordAgeSensor.add(recordAgeMin, new Min());
+            recordAgeSensor.add(recordAgeAvg, new Avg());
+
+            replicationLatencySensor = metrics.addSensor(prefix + 
"replication-latency");
+            replicationLatencySensor.add(replicationLatency, new Value());
+            replicationLatencySensor.add(replicationLatencyMax, new Max());
+            replicationLatencySensor.add(replicationLatencyMin, new Min());
+            replicationLatencySensor.add(replicationLatencyAvg, new Avg());
         }
     }
 }
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 6ab65bebdca..cd6b9b01eed 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -42,6 +42,9 @@ import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_LEGACY;
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_NEW;
+
 /** Replicates a set of topic-partitions. */
 public class MirrorSourceTask extends SourceTask {
 
@@ -51,6 +54,7 @@ public class MirrorSourceTask extends SourceTask {
     private String sourceClusterAlias;
     private Duration pollTimeout;
     private ReplicationPolicy replicationPolicy;
+    private MirrorSourceLegacyMetrics legacyMetrics;
     private MirrorSourceMetrics metrics;
     private boolean stopping = false;
     private Semaphore consumerAccess;
@@ -59,11 +63,11 @@ public class MirrorSourceTask extends SourceTask {
     public MirrorSourceTask() {}
 
     // for testing
-    MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, 
MirrorSourceMetrics metrics, String sourceClusterAlias,
+    MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, 
MirrorSourceLegacyMetrics metrics, String sourceClusterAlias,
                      ReplicationPolicy replicationPolicy,
                      OffsetSyncWriter offsetSyncWriter) {
         this.consumer = consumer;
-        this.metrics = metrics;
+        this.legacyMetrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
         this.replicationPolicy = replicationPolicy;
         consumerAccess = new Semaphore(1);
@@ -75,7 +79,9 @@ public class MirrorSourceTask extends SourceTask {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
         consumerAccess = new Semaphore(1);  // let one thread at a time access 
the consumer
         sourceClusterAlias = config.sourceClusterAlias();
-        metrics = config.metrics();
+        List<String> metricNamesFormats = config.metricNamesFormats();
+        legacyMetrics = metricNamesFormats.contains(METRIC_NAMES_LEGACY) ? 
config.legacyMetrics() : null;
+        metrics = metricNamesFormats.contains(METRIC_NAMES_NEW) ? 
config.metrics(context.pluginMetrics()) : null;
         pollTimeout = config.consumerPollTimeout();
         replicationPolicy = config.replicationPolicy();
         if (config.emitOffsetSyncsEnabled()) {
@@ -114,7 +120,7 @@ public class MirrorSourceTask extends SourceTask {
         }
         Utils.closeQuietly(consumer, "source consumer");
         Utils.closeQuietly(offsetSyncWriter, "offset sync writer");
-        Utils.closeQuietly(metrics, "metrics");
+        Utils.closeQuietly(legacyMetrics, "metrics");
         log.info("Stopping {} took {} ms.", Thread.currentThread().getName(), 
System.currentTimeMillis() - start);
     }
    
@@ -138,8 +144,16 @@ public class MirrorSourceTask extends SourceTask {
                 SourceRecord converted = convertRecord(record);
                 sourceRecords.add(converted);
                 TopicPartition topicPartition = new 
TopicPartition(converted.topic(), converted.kafkaPartition());
-                metrics.recordAge(topicPartition, System.currentTimeMillis() - 
record.timestamp());
-                metrics.recordBytes(topicPartition, byteSize(record.value()));
+                long age = System.currentTimeMillis() - record.timestamp();
+                long size = byteSize(record.value());
+                if (legacyMetrics != null) {
+                    legacyMetrics.recordAge(topicPartition, age);
+                    legacyMetrics.recordBytes(topicPartition, size);
+                }
+                if (metrics != null) {
+                    metrics.recordAge(topicPartition, age);
+                    metrics.recordBytes(topicPartition, size);
+                }
             }
             if (sourceRecords.isEmpty()) {
                 // WorkerSourceTasks expects non-zero batch size
@@ -177,8 +191,14 @@ public class MirrorSourceTask extends SourceTask {
         }
         TopicPartition topicPartition = new TopicPartition(record.topic(), 
record.kafkaPartition());
         long latency = System.currentTimeMillis() - record.timestamp();
-        metrics.countRecord(topicPartition);
-        metrics.replicationLatency(topicPartition, latency);
+        if (legacyMetrics != null) {
+            legacyMetrics.countRecord(topicPartition);
+            legacyMetrics.replicationLatency(topicPartition, latency);
+        }
+        if (metrics != null) {
+            metrics.countRecord(topicPartition);
+            metrics.replicationLatency(topicPartition, latency);
+        }
         // Queue offset syncs only when offsetWriter is available
         if (offsetSyncWriter != null) {
             TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
index aa5d300c00a..1996ea2c632 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.metrics.PluginMetrics;
 
 import java.util.List;
 import java.util.Map;
@@ -39,12 +40,16 @@ public class MirrorSourceTaskConfig extends 
MirrorSourceConfig {
             .collect(Collectors.toSet());
     }
 
-    MirrorSourceMetrics metrics() {
-        MirrorSourceMetrics metrics = new MirrorSourceMetrics(this);
+    MirrorSourceLegacyMetrics legacyMetrics() {
+        MirrorSourceLegacyMetrics metrics = new 
MirrorSourceLegacyMetrics(this);
         metricsReporters().forEach(metrics::addReporter);
         return metrics;
     }
 
+    MirrorSourceMetrics metrics(PluginMetrics pluginMetrics) {
+        return new MirrorSourceMetrics(pluginMetrics, this);
+    }
+
     @Override
     String entityLabel() {
         return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" : 
getInt(TASK_INDEX));
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorLegacyMetricsTest.java
similarity index 66%
copy from 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
copy to 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorLegacyMetricsTest.java
index 57eae629baa..42f413e8120 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorLegacyMetricsTest.java
@@ -31,12 +31,13 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class MirrorSourceMetricsTest {
+public class MirrorLegacyMetricsTest {
 
     private static final String SOURCE = "source";
     private static final String TARGET = "target";
     private static final TopicPartition TP = new TopicPartition("topic", 0);
     private static final TopicPartition SOURCE_TP = new TopicPartition(SOURCE 
+ "." + TP.topic(), TP.partition());
+    private static final String GROUP = "my-group";
 
     private final Map<String, String> configs = new HashMap<>();
     private TestReporter reporter;
@@ -44,25 +45,51 @@ public class MirrorSourceMetricsTest {
     @BeforeEach
     public void setUp() {
         configs.put(ConnectorConfig.NAME_CONFIG, "name");
-        configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
MirrorSourceConnector.class.getName());
         configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE);
         configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET);
         configs.put(MirrorConnectorConfig.TASK_INDEX, "0");
-        configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, 
TP.toString());
         reporter = new TestReporter();
     }
 
     @Test
-    public void testTags() {
+    public void testSourceTags() {
+        configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
MirrorSourceConnector.class.getName());
+        configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, 
TP.toString());
         MirrorSourceTaskConfig taskConfig = new 
MirrorSourceTaskConfig(configs);
-        MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
+        MirrorSourceLegacyMetrics metrics = new 
MirrorSourceLegacyMetrics(taskConfig);
         metrics.addReporter(reporter);
 
         metrics.countRecord(SOURCE_TP);
+        // 12 MirrorSourceConnector metrics + the metrics count metric
         assertEquals(13, reporter.metrics.size());
-        Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
+        Map<String, String> tags = reporter.metrics.stream()
+                .filter(m -> 
m.metricName().group().equals(MirrorSourceConnector.class.getSimpleName()))
+                .findFirst()
+                .get().metricName().tags();
+        assertEquals(SOURCE, tags.get("source"));
+        assertEquals(TARGET, tags.get("target"));
+        assertEquals(SOURCE_TP.topic(), tags.get("topic"));
+        assertEquals(String.valueOf(SOURCE_TP.partition()), 
tags.get("partition"));
+    }
+
+    @Test
+    public void testCheckpointTags() {
+        configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
MirrorCheckpointConnector.class.getName());
+        configs.put(MirrorCheckpointTaskConfig.TASK_CONSUMER_GROUPS, GROUP);
+        MirrorCheckpointTaskConfig taskConfig = new 
MirrorCheckpointTaskConfig(configs);
+        MirrorCheckpointLegacyMetrics metrics = new 
MirrorCheckpointLegacyMetrics(taskConfig);
+        metrics.addReporter(reporter);
+
+        metrics.checkpointLatency(SOURCE_TP, GROUP, 1L);
+        // 4 MirrorCheckpointConnector metrics + the metrics count metric
+        assertEquals(5, reporter.metrics.size());
+        Map<String, String> tags = reporter.metrics.stream()
+                .filter(m -> 
m.metricName().group().equals(MirrorCheckpointConnector.class.getSimpleName()))
+                .findFirst()
+                .get().metricName().tags();
         assertEquals(SOURCE, tags.get("source"));
         assertEquals(TARGET, tags.get("target"));
+        assertEquals(GROUP, tags.get("group"));
         assertEquals(SOURCE_TP.topic(), tags.get("topic"));
         assertEquals(String.valueOf(SOURCE_TP.partition()), 
tags.get("partition"));
     }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMetricsTest.java
similarity index 54%
rename from 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
rename to 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMetricsTest.java
index 57eae629baa..566ab236bc9 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMetricsTest.java
@@ -17,79 +17,77 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.PluginMetrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class MirrorSourceMetricsTest {
+public class MirrorMetricsTest {
 
     private static final String SOURCE = "source";
     private static final String TARGET = "target";
     private static final TopicPartition TP = new TopicPartition("topic", 0);
     private static final TopicPartition SOURCE_TP = new TopicPartition(SOURCE 
+ "." + TP.topic(), TP.partition());
+    private static final String GROUP = "my-group";
 
     private final Map<String, String> configs = new HashMap<>();
-    private TestReporter reporter;
+    private final Metrics metrics = new Metrics();
+    private final PluginMetrics pluginMetrics = new PluginMetricsImpl(metrics, 
Map.of());
 
     @BeforeEach
     public void setUp() {
         configs.put(ConnectorConfig.NAME_CONFIG, "name");
-        configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
MirrorSourceConnector.class.getName());
         configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE);
         configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET);
         configs.put(MirrorConnectorConfig.TASK_INDEX, "0");
-        configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, 
TP.toString());
-        reporter = new TestReporter();
     }
 
     @Test
-    public void testTags() {
+    public void testSourceTags() {
+        configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
MirrorSourceConnector.class.getName());
+        configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS, 
TP.toString());
         MirrorSourceTaskConfig taskConfig = new 
MirrorSourceTaskConfig(configs);
-        MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
-        metrics.addReporter(reporter);
-
-        metrics.countRecord(SOURCE_TP);
-        assertEquals(13, reporter.metrics.size());
-        Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
+        MirrorSourceMetrics sourceMetrics = new 
MirrorSourceMetrics(pluginMetrics, taskConfig);
+
+        sourceMetrics.countRecord(SOURCE_TP);
+        // 12 MirrorSourceConnector metrics + the metrics count metric
+        assertEquals(13, metrics.metrics().size());
+        Map<String, String> tags = metrics.metrics().keySet().stream()
+                .filter(m -> m.group().equals("plugins"))
+                .findFirst()
+                .get().tags();
         assertEquals(SOURCE, tags.get("source"));
         assertEquals(TARGET, tags.get("target"));
         assertEquals(SOURCE_TP.topic(), tags.get("topic"));
         assertEquals(String.valueOf(SOURCE_TP.partition()), 
tags.get("partition"));
     }
 
-    static class TestReporter implements MetricsReporter {
-
-        List<KafkaMetric> metrics = new ArrayList<>();
-
-        @Override
-        public void init(List<KafkaMetric> metrics) {
-            for (KafkaMetric metric : metrics) {
-                metricChange(metric);
-            }
-        }
-
-        @Override
-        public void metricChange(KafkaMetric metric) {
-            metrics.add(metric);
-        }
-
-        @Override
-        public void metricRemoval(KafkaMetric metric) {}
-
-        @Override
-        public void close() {}
-
-        @Override
-        public void configure(Map<String, ?> configs) {}
+    @Test
+    public void testCheckpointTags() {
+        configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, 
MirrorCheckpointConnector.class.getName());
+        configs.put(MirrorCheckpointTaskConfig.TASK_CONSUMER_GROUPS, GROUP);
+        MirrorCheckpointTaskConfig taskConfig = new 
MirrorCheckpointTaskConfig(configs);
+        MirrorCheckpointMetrics checkpointMetrics = new 
MirrorCheckpointMetrics(pluginMetrics, taskConfig);
+
+        checkpointMetrics.checkpointLatency(SOURCE_TP, GROUP, 1L);
+        // 4 MirrorCheckpointConnector metrics + the metrics count metric
+        assertEquals(5, metrics.metrics().size());
+        Map<String, String> tags = metrics.metrics().keySet().stream()
+                .filter(m -> m.group().equals("plugins"))
+                .findFirst()
+                .get().tags();
+        assertEquals(SOURCE, tags.get("source"));
+        assertEquals(TARGET, tags.get("target"));
+        assertEquals(GROUP, tags.get("group"));
+        assertEquals(SOURCE_TP.topic(), tags.get("topic"));
+        assertEquals(String.valueOf(SOURCE_TP.partition()), 
tags.get("partition"));
     }
 }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 4a676855378..ba23b42b80b 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -181,7 +181,7 @@ public class MirrorSourceTaskTest {
         KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
         when(consumer.poll(any())).thenReturn(consumerRecords);
 
-        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+        MirrorSourceLegacyMetrics metrics = 
mock(MirrorSourceLegacyMetrics.class);
 
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
@@ -280,7 +280,7 @@ public class MirrorSourceTaskTest {
 
         @SuppressWarnings("unchecked")
         KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
-        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+        MirrorSourceLegacyMetrics metrics = 
mock(MirrorSourceLegacyMetrics.class);
 
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
@@ -310,7 +310,7 @@ public class MirrorSourceTaskTest {
 
         @SuppressWarnings("unchecked")
         KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
-        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+        MirrorSourceLegacyMetrics metrics = 
mock(MirrorSourceLegacyMetrics.class);
         PartitionState partitionState = new PartitionState(maxOffsetLag);
         Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
         OffsetSyncWriter offsetSyncWriter = mock(OffsetSyncWriter.class);
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 6d1d50f558b..e266acce74a 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.mirror.integration;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -26,15 +27,20 @@ import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -56,6 +62,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestCondition;
 
 import org.junit.jupiter.api.AfterEach;
@@ -84,10 +91,13 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.LongUnaryOperator;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_LEGACY;
+import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_NEW;
 import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;
 import static 
org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
@@ -202,7 +212,9 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Config = new MirrorMakerConfig(mm2Props);
         primaryWorkerProps = mm2Config.workerConfig(new 
SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
         backupWorkerProps.putAll(mm2Config.workerConfig(new 
SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
-        
+        // Add the custom reporter to the worker configuration to collect 
MirrorMarker metrics with the new formats
+        
backupWorkerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
CollectAllMetricsReporter.class.getName());
+
         primary = new EmbeddedConnectCluster.Builder()
                 .name(PRIMARY_CLUSTER_ALIAS + "-connect-cluster")
                 .numWorkers(NUM_WORKERS)
@@ -1001,6 +1013,108 @@ public class MirrorConnectorsIntegrationBaseTest {
         });
     }
 
+    @Test
+    public void testConnectorMetricsNew() throws InterruptedException, 
ExecutionException {
+        testConnectorMetrics(METRIC_NAMES_NEW, () -> assertMetrics(false));
+    }
+
+    @Test
+    public void testConnectorMetricsLegacy() throws InterruptedException, 
ExecutionException {
+        testConnectorMetrics(METRIC_NAMES_LEGACY, () -> assertMetrics(true));
+    }
+
+    @Test
+    public void testConnectorMetricsDefault() throws InterruptedException, 
ExecutionException {
+        testConnectorMetrics(null, () -> assertMetrics(true));
+    }
+
+    @Test
+    public void testConnectorMetricsNewAndLegacy() throws 
InterruptedException, ExecutionException {
+        testConnectorMetrics(METRIC_NAMES_NEW + "," + METRIC_NAMES_LEGACY, () 
-> assertMetrics(true) && assertMetrics(false));
+    }
+
+    private void testConnectorMetrics(String format, Supplier<Boolean> 
assertions) throws InterruptedException, ExecutionException {
+        // one way replication from primary to backup
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
+        if (format != null) {
+            mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + 
"." + MirrorConnectorConfig.METRIC_NAMES_FORMAT, format);
+        }
+        // Add the custom reporter to the connector configuration to collect 
MirrorMarker metrics with the old formats
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + "." 
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
CollectAllMetricsReporter.class.getName());
+
+        
backupWorkerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
CollectAllMetricsReporter.class.getName());
+        mm2Config = new MirrorMakerConfig(mm2Props);
+
+        String topic = "test-topic-metrics";
+        primary.kafka().createTopic(topic);
+        try (KafkaProducer<byte[], byte[]> producer = 
primary.kafka().createProducer(Map.of())) {
+            for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+                producer.send(new ProducerRecord<>(topic, ("value" + 
i).getBytes())).get();
+            }
+        }
+
+        waitUntilMirrorMakerIsRunning(backup,
+                List.of(MirrorSourceConnector.class, 
MirrorCheckpointConnector.class),
+                mm2Config,
+                PRIMARY_CLUSTER_ALIAS,
+                BACKUP_CLUSTER_ALIAS);
+
+        try (KafkaConsumer<byte[], byte[]> consumer = 
primary.kafka().createConsumer(
+                Map.of("group.id", "consumer-group-metrics",
+                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
+                        ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1"))) {
+            int consumed = 0;
+            consumer.subscribe(List.of(topic));
+            while (consumed < NUM_RECORDS_PRODUCED) {
+                ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(100));
+                consumer.commitSync();
+                consumed += records.count();
+            }
+        }
+
+        waitForCondition(assertions::get, 30_000L, "Unable to find the 
MirrorMaker metrics");
+    }
+
+    private boolean assertMetrics(boolean legacy) {
+        Set<String> expectedSourceTags = legacy
+                ? Set.of("source", "target", "topic", "partition")
+                : Set.of("connector", "task", "source", "target", "topic", 
"partition");
+        Set<String> expectedCheckpointTags = legacy
+                ? Set.of("source", "target", "group", "topic", "partition")
+                : Set.of("connector", "task", "source", "target", "group", 
"topic", "partition");
+        String source = MirrorSourceConnector.class.getSimpleName();
+        String checkpoint = MirrorCheckpointConnector.class.getSimpleName();
+        // We have 4 topics totalling 13 partitions
+        // primary.test-topic - 10 partitions
+        // primary.test-topic-no-checkpoints - 1 partition
+        // primary.test-topic-metrics - 1 partition
+        // primary.heartbeats - 1 partition
+        // There are 12 metrics per partition, 12 x 13 = 156
+        int expectedSourceCount = 156;
+        // We have 1 consumer group (consumer-group-metrics)
+        // There are 4 metrics per group
+        int expectedCheckpointCount = 4;
+
+        int sourceMetrics = 0;
+        int checkpointMetrics = 0;
+        for (MetricName metricName : 
CollectAllMetricsReporter.METRICS.keySet()) {
+            Map<String, String> tags = metricName.tags();
+            if ((legacy && metricName.group().equals(source)) ||
+                    (!legacy && metricName.group().equals("plugins") && 
source.equals(tags.get("connector")))) {
+                assertEquals(expectedSourceTags, tags.keySet());
+                sourceMetrics++;
+            }
+
+
+            if ((legacy && metricName.group().equals(checkpoint)) ||
+                    (!legacy && metricName.group().equals("plugins") && 
checkpoint.equals(tags.get("connector")))) {
+                assertEquals(expectedCheckpointTags, tags.keySet());
+                checkpointMetrics++;
+            }
+        }
+        return sourceMetrics == expectedSourceCount && checkpointMetrics == 
expectedCheckpointCount;
+    }
+
     private TopicPartition remoteTopicPartition(TopicPartition tp, String 
alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), 
tp.partition());
     }
@@ -1498,4 +1612,21 @@ public class MirrorConnectorsIntegrationBaseTest {
             );
         }
     }
+
+    public static class CollectAllMetricsReporter extends MockMetricsReporter {
+
+        public static final Map<MetricName, KafkaMetric> METRICS = new  
HashMap<>();
+
+        @Override
+        public void init(List<KafkaMetric> metrics) {
+            for (KafkaMetric metric : metrics) {
+                METRICS.put(metric.metricName(), metric);
+            }
+        }
+
+        @Override
+        public void metricChange(KafkaMetric metric) {
+            METRICS.put(metric.metricName(), metric);
+        }
+    }
 }

Reply via email to