This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b2048a5908e KAFKA-19149: Add KIP-877 support to MirrorMaker (#21576)
b2048a5908e is described below
commit b2048a5908ef5cea5d6716d40d2a48f6c614c9e3
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Mar 17 09:56:53 2026 +0100
KAFKA-19149: Add KIP-877 support to MirrorMaker (#21576)
Implements [KIP-1280](https://cwiki.apache.org/confluence/x/a4s8G)
Reviewers: Luke Chen <[email protected]>
---
.../connect/mirror/MirrorCheckpointConfig.java | 21 ++-
...ics.java => MirrorCheckpointLegacyMetrics.java} | 40 ++---
.../connect/mirror/MirrorCheckpointMetrics.java | 92 +++++------
.../kafka/connect/mirror/MirrorCheckpointTask.java | 22 ++-
.../connect/mirror/MirrorCheckpointTaskConfig.java | 9 +-
.../connect/mirror/MirrorConnectorConfig.java | 5 +
.../connect/mirror/MirrorHeartbeatConfig.java | 1 +
.../kafka/connect/mirror/MirrorSourceConfig.java | 20 +++
.../connect/mirror/MirrorSourceLegacyMetrics.java | 180 +++++++++++++++++++++
.../kafka/connect/mirror/MirrorSourceMetrics.java | 166 ++++++++-----------
.../kafka/connect/mirror/MirrorSourceTask.java | 36 ++++-
.../connect/mirror/MirrorSourceTaskConfig.java | 9 +-
...tricsTest.java => MirrorLegacyMetricsTest.java} | 39 ++++-
...urceMetricsTest.java => MirrorMetricsTest.java} | 78 +++++----
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 6 +-
.../MirrorConnectorsIntegrationBaseTest.java | 133 ++++++++++++++-
16 files changed, 615 insertions(+), 242 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
index b7625da619d..b397ff742e8 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -68,6 +68,12 @@ public class MirrorCheckpointConfig extends
MirrorConnectorConfig {
private static final String SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DOC =
"Frequency of consumer group offset sync.";
public static final long SYNC_GROUP_OFFSETS_INTERVAL_SECONDS_DEFAULT = 60;
+ public static final String METRIC_NAMES_FORMAT_DOC = "Deprecated. The
formats in which metrics are emitted. Valid values are legacy and new. " +
+ "When set to legacy, the metrics have the following name
\"kafka.connect.mirror:type=MirrorCheckpointConnector,source={SOURCE},target={TARGET},group={GROUP},topic={TOPIC},partition={PARTITION}\".
" +
+ "When set to new, the metrics have the following name
\"kafka.connect:type=plugins,connector={CONNECTOR},task={TASK},source={SOURCE},target={TARGET},group={GROUP},topic={TOPIC},partition={PARTITION}\".
" +
+ "When set to \"legacy,new\" the connector will emit metrics with
both names, this can be useful when migrating to the new format but it doubles
the amount of metrics that are emitted.\n" +
+ "In Kafka 5.0 the legacy format and this configuration will be
removed and the metrics will always use the new names.";
+
public static final String GROUP_FILTER_CLASS = "group.filter.class";
private static final String GROUP_FILTER_CLASS_DOC = "GroupFilter to use.
Selects consumer groups to replicate.";
public static final Class<?> GROUP_FILTER_CLASS_DEFAULT =
DefaultGroupFilter.class;
@@ -162,6 +168,10 @@ public class MirrorCheckpointConfig extends
MirrorConnectorConfig {
return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
}
+ List<String> metricNamesFormats() {
+ return getList(METRIC_NAMES_FORMAT);
+ }
+
public static Map<String, String> validate(Map<String, String> configs) {
Map<String, String> invalidConfigs = new HashMap<>();
@@ -264,7 +274,16 @@ public class MirrorCheckpointConfig extends
MirrorConnectorConfig {
ConfigDef.Type.CLASS,
TOPIC_FILTER_CLASS_DEFAULT,
ConfigDef.Importance.LOW,
- TOPIC_FILTER_CLASS_DOC);
+ TOPIC_FILTER_CLASS_DOC
+ )
+ .define(
+ METRIC_NAMES_FORMAT,
+ ConfigDef.Type.LIST,
+ METRIC_NAMES_FORMAT_DEFAULT,
+ ConfigDef.ValidList.in(false, METRIC_NAMES_LEGACY,
METRIC_NAMES_NEW),
+ ConfigDef.Importance.LOW,
+ METRIC_NAMES_FORMAT_DOC
+ );
}
protected static final ConfigDef CONNECTOR_CONFIG_DEF =
defineCheckpointConfig(new ConfigDef(BASE_CONNECTOR_CONFIG_DEF));
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointLegacyMetrics.java
similarity index 72%
copy from
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
copy to
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointLegacyMetrics.java
index 71e3edebf5b..dc592f512f5 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointLegacyMetrics.java
@@ -27,37 +27,44 @@ import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_AVG;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_AVG_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MAX;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MAX_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MIN;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.LATENCY_MS_MIN_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorCheckpointMetrics.groupTags;
+
/** Metrics for replicated topic-partitions */
-class MirrorCheckpointMetrics implements AutoCloseable {
+class MirrorCheckpointLegacyMetrics implements AutoCloseable {
private static final String CHECKPOINT_CONNECTOR_GROUP =
MirrorCheckpointConnector.class.getSimpleName();
private static final Set<String> GROUP_TAGS = Set.of("source", "target",
"group", "topic", "partition");
-
private static final MetricNameTemplate CHECKPOINT_LATENCY = new
MetricNameTemplate(
- "checkpoint-latency-ms", CHECKPOINT_CONNECTOR_GROUP,
- "Time it takes consumer group offsets to replicate from source to
target cluster.", GROUP_TAGS);
+ LATENCY_MS, CHECKPOINT_CONNECTOR_GROUP,
+ LATENCY_MS_DESCRIPTION, GROUP_TAGS);
private static final MetricNameTemplate CHECKPOINT_LATENCY_MAX = new
MetricNameTemplate(
- "checkpoint-latency-ms-max", CHECKPOINT_CONNECTOR_GROUP,
- "Max time it takes consumer group offsets to replicate from source
to target cluster.", GROUP_TAGS);
+ LATENCY_MS_MAX, CHECKPOINT_CONNECTOR_GROUP,
+ LATENCY_MS_MAX_DESCRIPTION, GROUP_TAGS);
private static final MetricNameTemplate CHECKPOINT_LATENCY_MIN = new
MetricNameTemplate(
- "checkpoint-latency-ms-min", CHECKPOINT_CONNECTOR_GROUP,
- "Min time it takes consumer group offsets to replicate from source
to target cluster.", GROUP_TAGS);
+ LATENCY_MS_MIN, CHECKPOINT_CONNECTOR_GROUP,
+ LATENCY_MS_MIN_DESCRIPTION, GROUP_TAGS);
private static final MetricNameTemplate CHECKPOINT_LATENCY_AVG = new
MetricNameTemplate(
- "checkpoint-latency-ms-avg", CHECKPOINT_CONNECTOR_GROUP,
- "Average time it takes consumer group offsets to replicate from
source to target cluster.", GROUP_TAGS);
-
+ LATENCY_MS_AVG, CHECKPOINT_CONNECTOR_GROUP,
+ LATENCY_MS_AVG_DESCRIPTION, GROUP_TAGS);
private final Metrics metrics;
private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
private final String source;
private final String target;
- MirrorCheckpointMetrics(MirrorCheckpointTaskConfig taskConfig) {
+ MirrorCheckpointLegacyMetrics(MirrorCheckpointTaskConfig taskConfig) {
this.target = taskConfig.targetClusterAlias();
this.source = taskConfig.sourceClusterAlias();
this.metrics = new Metrics();
@@ -91,12 +98,7 @@ class MirrorCheckpointMetrics implements AutoCloseable {
private final Sensor checkpointLatencySensor;
GroupMetrics(TopicPartition topicPartition, String group) {
- Map<String, String> tags = new LinkedHashMap<>();
- tags.put("source", source);
- tags.put("target", target);
- tags.put("group", group);
- tags.put("topic", topicPartition.topic());
- tags.put("partition",
Integer.toString(topicPartition.partition()));
+ Map<String, String> tags = groupTags(source, target, group,
topicPartition);
checkpointLatencySensor = metrics.sensor("checkpoint-latency");
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY, tags),
new Value());
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
index 71e3edebf5b..e7798e71141 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointMetrics.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.connect.mirror;
-import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
@@ -29,49 +28,27 @@ import org.apache.kafka.common.metrics.stats.Value;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
-/** Metrics for replicated topic-partitions */
-class MirrorCheckpointMetrics implements AutoCloseable {
+public class MirrorCheckpointMetrics {
- private static final String CHECKPOINT_CONNECTOR_GROUP =
MirrorCheckpointConnector.class.getSimpleName();
+ static final String LATENCY_MS = "checkpoint-latency-ms";
+ static final String LATENCY_MS_DESCRIPTION = "Time it takes consumer group
offsets to replicate from source to target cluster.";
+ static final String LATENCY_MS_MAX = "checkpoint-latency-ms-max";
+ static final String LATENCY_MS_MAX_DESCRIPTION = "Max time it takes
consumer group offsets to replicate from source to target cluster.";
+ static final String LATENCY_MS_MIN = "checkpoint-latency-ms-min";
+ static final String LATENCY_MS_MIN_DESCRIPTION = "Min time it takes
consumer group offsets to replicate from source to target cluster.";
+ static final String LATENCY_MS_AVG = "checkpoint-latency-ms-avg";
+ static final String LATENCY_MS_AVG_DESCRIPTION = "Average time it takes
consumer group offsets to replicate from source to target cluster.";
- private static final Set<String> GROUP_TAGS = Set.of("source", "target",
"group", "topic", "partition");
-
- private static final MetricNameTemplate CHECKPOINT_LATENCY = new
MetricNameTemplate(
- "checkpoint-latency-ms", CHECKPOINT_CONNECTOR_GROUP,
- "Time it takes consumer group offsets to replicate from source to
target cluster.", GROUP_TAGS);
- private static final MetricNameTemplate CHECKPOINT_LATENCY_MAX = new
MetricNameTemplate(
- "checkpoint-latency-ms-max", CHECKPOINT_CONNECTOR_GROUP,
- "Max time it takes consumer group offsets to replicate from source
to target cluster.", GROUP_TAGS);
- private static final MetricNameTemplate CHECKPOINT_LATENCY_MIN = new
MetricNameTemplate(
- "checkpoint-latency-ms-min", CHECKPOINT_CONNECTOR_GROUP,
- "Min time it takes consumer group offsets to replicate from source
to target cluster.", GROUP_TAGS);
- private static final MetricNameTemplate CHECKPOINT_LATENCY_AVG = new
MetricNameTemplate(
- "checkpoint-latency-ms-avg", CHECKPOINT_CONNECTOR_GROUP,
- "Average time it takes consumer group offsets to replicate from
source to target cluster.", GROUP_TAGS);
-
-
- private final Metrics metrics;
private final Map<String, GroupMetrics> groupMetrics = new HashMap<>();
+ private final PluginMetrics metrics;
private final String source;
private final String target;
- MirrorCheckpointMetrics(MirrorCheckpointTaskConfig taskConfig) {
- this.target = taskConfig.targetClusterAlias();
+ MirrorCheckpointMetrics(PluginMetrics metrics, MirrorCheckpointTaskConfig
taskConfig) {
+ this.metrics = metrics;
this.source = taskConfig.sourceClusterAlias();
- this.metrics = new Metrics();
-
- // for side-effect
- metrics.sensor("record-count");
- metrics.sensor("byte-rate");
- metrics.sensor("record-age");
- metrics.sensor("replication-latency");
- }
-
- @Override
- public void close() {
- metrics.close();
+ this.target = taskConfig.targetClusterAlias();
}
void checkpointLatency(TopicPartition topicPartition, String group, long
millis) {
@@ -80,29 +57,34 @@ class MirrorCheckpointMetrics implements AutoCloseable {
GroupMetrics group(TopicPartition topicPartition, String group) {
return groupMetrics.computeIfAbsent(String.join("-",
topicPartition.toString(), group),
- x -> new GroupMetrics(topicPartition, group));
- }
-
- void addReporter(MetricsReporter reporter) {
- metrics.addReporter(reporter);
+ x -> new GroupMetrics(topicPartition, group));
}
private class GroupMetrics {
private final Sensor checkpointLatencySensor;
GroupMetrics(TopicPartition topicPartition, String group) {
- Map<String, String> tags = new LinkedHashMap<>();
- tags.put("source", source);
- tags.put("target", target);
- tags.put("group", group);
- tags.put("topic", topicPartition.topic());
- tags.put("partition",
Integer.toString(topicPartition.partition()));
-
- checkpointLatencySensor = metrics.sensor("checkpoint-latency");
-
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY, tags),
new Value());
-
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_MAX,
tags), new Max());
-
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_MIN,
tags), new Min());
-
checkpointLatencySensor.add(metrics.metricInstance(CHECKPOINT_LATENCY_AVG,
tags), new Avg());
+ LinkedHashMap<String, String> tags = groupTags(source, target,
group, topicPartition);
+
+ checkpointLatencySensor = metrics.addSensor("checkpoint-latency");
+ MetricName checkpointLatency = metrics.metricName(LATENCY_MS,
LATENCY_MS_DESCRIPTION, tags);
+ MetricName checkpointLatencyMax =
metrics.metricName(LATENCY_MS_MAX, LATENCY_MS_MAX_DESCRIPTION, tags);
+ MetricName checkpointLatencyMin =
metrics.metricName(LATENCY_MS_MIN, LATENCY_MS_MIN_DESCRIPTION, tags);
+ MetricName checkpointLatencyAvg =
metrics.metricName(LATENCY_MS_AVG, LATENCY_MS_AVG_DESCRIPTION, tags);
+ checkpointLatencySensor.add(checkpointLatency, new Value());
+ checkpointLatencySensor.add(checkpointLatencyMax, new Max());
+ checkpointLatencySensor.add(checkpointLatencyMin, new Min());
+ checkpointLatencySensor.add(checkpointLatencyAvg, new Avg());
}
}
+
+ static LinkedHashMap<String, String> groupTags(String source, String
target, String group, TopicPartition topicPartition) {
+ LinkedHashMap<String, String> tags = new LinkedHashMap<>();
+ tags.put("source", source);
+ tags.put("target", target);
+ tags.put("group", group);
+ tags.put("topic", topicPartition.topic());
+ tags.put("partition", Integer.toString(topicPartition.partition()));
+ return tags;
+ }
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index db86fbdb40b..f09aae0b077 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -46,6 +46,8 @@ import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_LEGACY;
+import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_NEW;
import static org.apache.kafka.connect.mirror.MirrorUtils.adminCall;
/** Emits checkpoints for upstream consumer groups. */
@@ -65,6 +67,7 @@ public class MirrorCheckpointTask extends SourceTask {
private ReplicationPolicy replicationPolicy;
private OffsetSyncStore offsetSyncStore;
private boolean stopping;
+ private MirrorCheckpointLegacyMetrics legacyMetrics;
private MirrorCheckpointMetrics metrics;
private Scheduler scheduler;
private Map<String, Map<TopicPartition, OffsetAndMetadata>>
idleConsumerGroupsOffset;
@@ -104,7 +107,9 @@ public class MirrorCheckpointTask extends SourceTask {
offsetSyncStore = new OffsetSyncStore(config);
sourceAdminClient =
config.forwardingAdmin(config.sourceAdminConfig("checkpoint-source-admin"));
targetAdminClient =
config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"));
- metrics = config.metrics();
+ List<String> metricNamesFormats = config.metricNamesFormats();
+ legacyMetrics = metricNamesFormats.contains(METRIC_NAMES_LEGACY) ?
config.legacyMetrics() : null;
+ metrics = metricNamesFormats.contains(METRIC_NAMES_NEW) ?
config.metrics(context.pluginMetrics()) : null;
idleConsumerGroupsOffset = new HashMap<>();
checkpointStore = new CheckpointStore(config, consumerGroups);
scheduler = new Scheduler(getClass(), config.entityLabel(),
config.adminTimeout());
@@ -136,7 +141,7 @@ public class MirrorCheckpointTask extends SourceTask {
Utils.closeQuietly(offsetSyncStore, "offset sync store");
Utils.closeQuietly(sourceAdminClient, "source admin client");
Utils.closeQuietly(targetAdminClient, "target admin client");
- Utils.closeQuietly(metrics, "metrics");
+ Utils.closeQuietly(legacyMetrics, "metrics");
Utils.closeQuietly(scheduler, "scheduler");
log.info("Stopping {} took {} ms.", Thread.currentThread().getName(),
System.currentTimeMillis() - start);
}
@@ -282,9 +287,16 @@ public class MirrorCheckpointTask extends SourceTask {
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
-
metrics.checkpointLatency(MirrorUtils.unwrapPartition(record.sourcePartition()),
- Checkpoint.unwrapGroup(record.sourcePartition()),
- System.currentTimeMillis() - record.timestamp());
+ TopicPartition topicPartition =
MirrorUtils.unwrapPartition(record.sourcePartition());
+ String group = Checkpoint.unwrapGroup(record.sourcePartition());
+ long millis = System.currentTimeMillis() - record.timestamp();
+ if (legacyMetrics != null) {
+ legacyMetrics.checkpointLatency(topicPartition, group, millis);
+ }
+ if (metrics != null) {
+ metrics.checkpointLatency(topicPartition, group, millis);
+ }
+
}
private void refreshIdleConsumerGroupOffset() throws ExecutionException,
InterruptedException {
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
index 3d2cfda6dcc..c985c6be5f1 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.metrics.PluginMetrics;
import java.util.HashSet;
import java.util.Map;
@@ -34,12 +35,16 @@ public class MirrorCheckpointTaskConfig extends
MirrorCheckpointConfig {
return new HashSet<>(getList(TASK_CONSUMER_GROUPS));
}
- MirrorCheckpointMetrics metrics() {
- MirrorCheckpointMetrics metrics = new MirrorCheckpointMetrics(this);
+ MirrorCheckpointLegacyMetrics legacyMetrics() {
+ MirrorCheckpointLegacyMetrics metrics = new
MirrorCheckpointLegacyMetrics(this);
metricsReporters().forEach(metrics::addReporter);
return metrics;
}
+ MirrorCheckpointMetrics metrics(PluginMetrics pluginMetrics) {
+ return new MirrorCheckpointMetrics(pluginMetrics, this);
+ }
+
@Override
String entityLabel() {
return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" :
getInt(TASK_INDEX));
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 9baf7c1f35c..96aec8330e0 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -122,6 +122,11 @@ public abstract class MirrorConnectorConfig extends
AbstractConfig {
EMIT_OFFSET_SYNCS_ENABLED + " is disabled.";
public static final boolean EMIT_OFFSET_SYNCS_ENABLED_DEFAULT = true;
+ public static final String METRIC_NAMES_FORMAT = "metric.names.formats";
+ public static final String METRIC_NAMES_LEGACY = "legacy";
+ public static final String METRIC_NAMES_NEW = "new";
+ public static final String METRIC_NAMES_FORMAT_DEFAULT =
METRIC_NAMES_LEGACY;
+
public static final String OFFSET_SYNCS_CLIENT_ROLE_PREFIX =
"offset-syncs-";
public static final String TASK_INDEX = "task.index";
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
index 15558567ef2..b629ac73882 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorHeartbeatConfig.java
@@ -57,6 +57,7 @@ public class MirrorHeartbeatConfig extends
MirrorConnectorConfig {
return getShort(HEARTBEATS_TOPIC_REPLICATION_FACTOR);
}
+
private static ConfigDef defineHeartbeatConfig(ConfigDef baseConfig) {
return baseConfig
.define(
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 2ec663ad2fc..13b2aa0bbbe 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -94,6 +94,14 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
" If set to false, heartbeats topics will only be replicated if
the topic filter allows.";
public static final boolean HEARTBEATS_REPLICATION_ENABLED_DEFAULT = true;
+ public static final String METRIC_NAMES_FORMAT = "metric.names.formats";
+ public static final List<String> METRIC_NAMES_FORMAT_DEFAULT =
List.of(METRIC_NAMES_LEGACY);
+ public static final String METRIC_NAMES_FORMAT_DOC = "Deprecated. The
formats in which metrics are emitted. Valid values are legacy and new. " +
+ "When set to legacy, the metrics have the following name
\"kafka.connect.mirror:type=MirrorSourceConnector,source={SOURCE},target={TARGET},topic={TOPIC},partition={PARTITION}\".
" +
+ "When set to new, the metrics have the following name
\"kafka.connect:type=plugins,connector={CONNECTOR},task={TASK},source={SOURCE},target={TARGET},topic={TOPIC},partition={PARTITION}\".
" +
+ "When set to \"legacy,new\" the connector will emit metrics with
both names, this can be useful when migrating to the new format but it doubles
the amount of metrics that are emitted.\n" +
+ "In Kafka 5.0 the legacy format and this configuration will be
removed and the metrics will always use the new names.";
+
public static final String OFFSET_SYNCS_SOURCE_PRODUCER_ROLE =
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-producer";
public static final String OFFSET_SYNCS_TARGET_PRODUCER_ROLE =
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "target-producer";
public static final String OFFSET_SYNCS_SOURCE_ADMIN_ROLE =
OFFSET_SYNCS_CLIENT_ROLE_PREFIX + "source-admin";
@@ -203,6 +211,10 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
return getBoolean(HEARTBEATS_REPLICATION_ENABLED);
}
+ List<String> metricNamesFormats() {
+ return getList(METRIC_NAMES_FORMAT);
+ }
+
private static ConfigDef defineSourceConfig(ConfigDef baseConfig) {
return baseConfig
.define(
@@ -318,6 +330,14 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
HEARTBEATS_REPLICATION_ENABLED_DEFAULT,
ConfigDef.Importance.LOW,
HEARTBEATS_REPLICATION_ENABLED_DOC
+ )
+ .define(
+ METRIC_NAMES_FORMAT,
+ ConfigDef.Type.LIST,
+ METRIC_NAMES_FORMAT_DEFAULT,
+ ConfigDef.ValidList.in(false, METRIC_NAMES_LEGACY,
METRIC_NAMES_NEW),
+ ConfigDef.Importance.LOW,
+ METRIC_NAMES_FORMAT_DOC
);
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceLegacyMetrics.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceLegacyMetrics.java
new file mode 100644
index 00000000000..4df43a6ca34
--- /dev/null
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceLegacyMetrics.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.BYTE_COUNT_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.BYTE_RATE_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_AVG;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_AVG_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MAX;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MAX_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MIN;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_AGE_MS_MIN_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_COUNT_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.RECORD_RATE_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_AVG;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_AVG_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MAX;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MAX_DESCRIPTION;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MIN;
+import static
org.apache.kafka.connect.mirror.MirrorSourceMetrics.REPLICATION_LATENCY_MS_MIN_DESCRIPTION;
+
+/** Metrics for replicated topic-partitions */
+class MirrorSourceLegacyMetrics implements AutoCloseable {
+
+ private static final String SOURCE_CONNECTOR_GROUP =
MirrorSourceConnector.class.getSimpleName();
+
+ private static final Set<String> PARTITION_TAGS = Set.of("source",
"target", "topic", "partition");
+ private static final MetricNameTemplate RECORD_COUNT = new
MetricNameTemplate(
+ MirrorSourceMetrics.RECORD_COUNT, SOURCE_CONNECTOR_GROUP,
+ RECORD_COUNT_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate RECORD_RATE = new
MetricNameTemplate(
+ MirrorSourceMetrics.RECORD_RATE, SOURCE_CONNECTOR_GROUP,
+ RECORD_RATE_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate RECORD_AGE = new
MetricNameTemplate(
+ RECORD_AGE_MS, SOURCE_CONNECTOR_GROUP,
+ RECORD_AGE_MS_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate RECORD_AGE_MAX = new
MetricNameTemplate(
+ RECORD_AGE_MS_MAX, SOURCE_CONNECTOR_GROUP,
+ RECORD_AGE_MS_MAX_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate RECORD_AGE_MIN = new
MetricNameTemplate(
+ RECORD_AGE_MS_MIN, SOURCE_CONNECTOR_GROUP,
+ RECORD_AGE_MS_MIN_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate RECORD_AGE_AVG = new
MetricNameTemplate(
+ RECORD_AGE_MS_AVG, SOURCE_CONNECTOR_GROUP,
+ RECORD_AGE_MS_AVG_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate BYTE_COUNT = new
MetricNameTemplate(
+ MirrorSourceMetrics.BYTE_COUNT, SOURCE_CONNECTOR_GROUP,
+ BYTE_COUNT_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate BYTE_RATE = new MetricNameTemplate(
+ MirrorSourceMetrics.BYTE_RATE, SOURCE_CONNECTOR_GROUP,
+ BYTE_RATE_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate REPLICATION_LATENCY = new
MetricNameTemplate(
+ REPLICATION_LATENCY_MS, SOURCE_CONNECTOR_GROUP,
+ REPLICATION_LATENCY_MS_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate REPLICATION_LATENCY_MAX = new
MetricNameTemplate(
+ REPLICATION_LATENCY_MS_MAX, SOURCE_CONNECTOR_GROUP,
+ REPLICATION_LATENCY_MS_MAX_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate REPLICATION_LATENCY_MIN = new
MetricNameTemplate(
+ REPLICATION_LATENCY_MS_MIN, SOURCE_CONNECTOR_GROUP,
+ REPLICATION_LATENCY_MS_MIN_DESCRIPTION, PARTITION_TAGS);
+ private static final MetricNameTemplate REPLICATION_LATENCY_AVG = new
MetricNameTemplate(
+ REPLICATION_LATENCY_MS_AVG, SOURCE_CONNECTOR_GROUP,
+ REPLICATION_LATENCY_MS_AVG_DESCRIPTION, PARTITION_TAGS);
+
+ private final Metrics metrics;
+ private final Map<TopicPartition, PartitionMetrics> partitionMetrics;
+ private final String source;
+ private final String target;
+
+ MirrorSourceLegacyMetrics(MirrorSourceTaskConfig taskConfig) {
+ this.target = taskConfig.targetClusterAlias();
+ this.source = taskConfig.sourceClusterAlias();
+ this.metrics = new Metrics();
+
+ // for side-effect
+ metrics.sensor("record-count");
+ metrics.sensor("byte-rate");
+ metrics.sensor("record-age");
+ metrics.sensor("replication-latency");
+
+ ReplicationPolicy replicationPolicy = taskConfig.replicationPolicy();
+ partitionMetrics = taskConfig.taskTopicPartitions().stream()
+ .map(x -> new
TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()),
x.partition()))
+ .collect(Collectors.toMap(x -> x, PartitionMetrics::new));
+ }
+
+ @Override
+ public void close() {
+ metrics.close();
+ }
+
+ void countRecord(TopicPartition topicPartition) {
+ partitionMetrics.get(topicPartition).recordSensor.record();
+ }
+
+ void recordAge(TopicPartition topicPartition, long ageMillis) {
+ partitionMetrics.get(topicPartition).recordAgeSensor.record((double)
ageMillis);
+ }
+
+ void replicationLatency(TopicPartition topicPartition, long millis) {
+
partitionMetrics.get(topicPartition).replicationLatencySensor.record((double)
millis);
+ }
+
+ void recordBytes(TopicPartition topicPartition, long bytes) {
+ partitionMetrics.get(topicPartition).byteSensor.record((double) bytes);
+ }
+
+ void addReporter(MetricsReporter reporter) {
+ metrics.addReporter(reporter);
+ }
+
+ private class PartitionMetrics {
+ private final Sensor recordSensor;
+ private final Sensor byteSensor;
+ private final Sensor recordAgeSensor;
+ private final Sensor replicationLatencySensor;
+
+ PartitionMetrics(TopicPartition topicPartition) {
+ String prefix = topicPartition.topic() + "-" +
topicPartition.partition() + "-";
+
+ Map<String, String> tags = new LinkedHashMap<>();
+ tags.put("source", source);
+ tags.put("target", target);
+ tags.put("topic", topicPartition.topic());
+ tags.put("partition",
Integer.toString(topicPartition.partition()));
+
+ recordSensor = metrics.sensor(prefix + "records-sent");
+ recordSensor.add(new Meter(metrics.metricInstance(RECORD_RATE,
tags), metrics.metricInstance(RECORD_COUNT, tags)));
+
+ byteSensor = metrics.sensor(prefix + "bytes-sent");
+ byteSensor.add(new Meter(metrics.metricInstance(BYTE_RATE, tags),
metrics.metricInstance(BYTE_COUNT, tags)));
+
+ recordAgeSensor = metrics.sensor(prefix + "record-age");
+ recordAgeSensor.add(metrics.metricInstance(RECORD_AGE, tags), new
Value());
+ recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MAX, tags),
new Max());
+ recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_MIN, tags),
new Min());
+ recordAgeSensor.add(metrics.metricInstance(RECORD_AGE_AVG, tags),
new Avg());
+
+ replicationLatencySensor = metrics.sensor(prefix +
"replication-latency");
+
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY, tags),
new Value());
+
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MAX,
tags), new Max());
+
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_MIN,
tags), new Min());
+
replicationLatencySensor.add(metrics.metricInstance(REPLICATION_LATENCY_AVG,
tags), new Avg());
+ }
+ }
+}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
index c297c4c5fcf..ad2763c9710 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceMetrics.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.connect.mirror;
-import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
@@ -29,94 +28,52 @@ import org.apache.kafka.common.metrics.stats.Value;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
-/** Metrics for replicated topic-partitions */
-class MirrorSourceMetrics implements AutoCloseable {
-
- private static final String SOURCE_CONNECTOR_GROUP =
MirrorSourceConnector.class.getSimpleName();
-
- private final MetricNameTemplate recordCount;
- private final MetricNameTemplate recordRate;
- private final MetricNameTemplate recordAge;
- private final MetricNameTemplate recordAgeMax;
- private final MetricNameTemplate recordAgeMin;
- private final MetricNameTemplate recordAgeAvg;
- private final MetricNameTemplate byteCount;
- private final MetricNameTemplate byteRate;
- private final MetricNameTemplate replicationLatency;
- private final MetricNameTemplate replicationLatencyMax;
- private final MetricNameTemplate replicationLatencyMin;
- private final MetricNameTemplate replicationLatencyAvg;
-
- private final Metrics metrics;
+public class MirrorSourceMetrics {
+
+ static final String RECORD_COUNT = "record-count";
+ static final String RECORD_COUNT_DESCRIPTION = "Number of source records
replicated to the target cluster.";
+ static final String RECORD_RATE = "record-rate";
+ static final String RECORD_RATE_DESCRIPTION = "Average number of source
records replicated to the target cluster per second.";
+ static final String RECORD_AGE_MS = "record-age-ms";
+ static final String RECORD_AGE_MS_DESCRIPTION = "The age of incoming
source records when replicated to the target cluster.";
+ static final String RECORD_AGE_MS_MAX = "record-age-ms-max";
+ static final String RECORD_AGE_MS_MAX_DESCRIPTION = "The max age of
incoming source records when replicated to the target cluster.";
+ static final String RECORD_AGE_MS_MIN = "record-age-ms-min";
+ static final String RECORD_AGE_MS_MIN_DESCRIPTION = "The min age of
incoming source records when replicated to the target cluster.";
+ static final String RECORD_AGE_MS_AVG = "record-age-ms-avg";
+ static final String RECORD_AGE_MS_AVG_DESCRIPTION = "The average age of
incoming source records when replicated to the target cluster.";
+ static final String BYTE_COUNT = "byte-count";
+ static final String BYTE_COUNT_DESCRIPTION = "Number of bytes replicated
to the target cluster.";
+ static final String BYTE_RATE = "byte-rate";
+ static final String BYTE_RATE_DESCRIPTION = "Average number of bytes
replicated per second.";
+ static final String REPLICATION_LATENCY_MS = "replication-latency-ms";
+ static final String REPLICATION_LATENCY_MS_DESCRIPTION = "Time it takes
records to replicate from source to target cluster.";
+ static final String REPLICATION_LATENCY_MS_MAX =
"replication-latency-ms-max";
+ static final String REPLICATION_LATENCY_MS_MAX_DESCRIPTION = "Max time it
takes records to replicate from source to target cluster.";
+ static final String REPLICATION_LATENCY_MS_MIN =
"replication-latency-ms-min";
+ static final String REPLICATION_LATENCY_MS_MIN_DESCRIPTION = "Min time it
takes records to replicate from source to target cluster.";
+ static final String REPLICATION_LATENCY_MS_AVG =
"replication-latency-ms-avg";
+ static final String REPLICATION_LATENCY_MS_AVG_DESCRIPTION = "Average time
it takes records to replicate from source to target cluster.";
+
+ private final PluginMetrics metrics;
private final Map<TopicPartition, PartitionMetrics> partitionMetrics;
private final String source;
private final String target;
- MirrorSourceMetrics(MirrorSourceTaskConfig taskConfig) {
+ MirrorSourceMetrics(PluginMetrics pluginMetrics, MirrorSourceTaskConfig
taskConfig) {
+ this.metrics = pluginMetrics;
this.target = taskConfig.targetClusterAlias();
this.source = taskConfig.sourceClusterAlias();
- this.metrics = new Metrics();
-
- Set<String> partitionTags = Set.of("source", "target", "topic",
"partition");
-
- recordCount = new MetricNameTemplate(
- "record-count", SOURCE_CONNECTOR_GROUP,
- "Number of source records replicated to the target cluster.",
partitionTags);
- recordRate = new MetricNameTemplate(
- "record-rate", SOURCE_CONNECTOR_GROUP,
- "Average number of source records replicated to the target
cluster per second.", partitionTags);
- recordAge = new MetricNameTemplate(
- "record-age-ms", SOURCE_CONNECTOR_GROUP,
- "The age of incoming source records when replicated to the
target cluster.", partitionTags);
- recordAgeMax = new MetricNameTemplate(
- "record-age-ms-max", SOURCE_CONNECTOR_GROUP,
- "The max age of incoming source records when replicated to the
target cluster.", partitionTags);
- recordAgeMin = new MetricNameTemplate(
- "record-age-ms-min", SOURCE_CONNECTOR_GROUP,
- "The min age of incoming source records when replicated to the
target cluster.", partitionTags);
- recordAgeAvg = new MetricNameTemplate(
- "record-age-ms-avg", SOURCE_CONNECTOR_GROUP,
- "The average age of incoming source records when replicated to
the target cluster.", partitionTags);
- byteCount = new MetricNameTemplate(
- "byte-count", SOURCE_CONNECTOR_GROUP,
- "Number of bytes replicated to the target cluster.",
partitionTags);
- byteRate = new MetricNameTemplate(
- "byte-rate", SOURCE_CONNECTOR_GROUP,
- "Average number of bytes replicated per second.",
partitionTags);
- replicationLatency = new MetricNameTemplate(
- "replication-latency-ms", SOURCE_CONNECTOR_GROUP,
- "Time it takes records to replicate from source to target
cluster.", partitionTags);
- replicationLatencyMax = new MetricNameTemplate(
- "replication-latency-ms-max", SOURCE_CONNECTOR_GROUP,
- "Max time it takes records to replicate from source to target
cluster.", partitionTags);
- replicationLatencyMin = new MetricNameTemplate(
- "replication-latency-ms-min", SOURCE_CONNECTOR_GROUP,
- "Min time it takes records to replicate from source to target
cluster.", partitionTags);
- replicationLatencyAvg = new MetricNameTemplate(
- "replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
- "Average time it takes records to replicate from source to
target cluster.", partitionTags);
-
- // for side-effect
- metrics.sensor("record-count");
- metrics.sensor("byte-rate");
- metrics.sensor("record-age");
- metrics.sensor("replication-latency");
ReplicationPolicy replicationPolicy = taskConfig.replicationPolicy();
partitionMetrics = taskConfig.taskTopicPartitions().stream()
- .map(x -> new
TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()),
x.partition()))
- .collect(Collectors.toMap(x -> x, PartitionMetrics::new));
+ .map(x -> new
TopicPartition(replicationPolicy.formatRemoteTopic(source, x.topic()),
x.partition()))
+ .collect(Collectors.toMap(x -> x,
MirrorSourceMetrics.PartitionMetrics::new));
}
- @Override
- public void close() {
- metrics.close();
- }
-
void countRecord(TopicPartition topicPartition) {
partitionMetrics.get(topicPartition).recordSensor.record();
}
@@ -133,10 +90,6 @@ class MirrorSourceMetrics implements AutoCloseable {
partitionMetrics.get(topicPartition).byteSensor.record((double) bytes);
}
- void addReporter(MetricsReporter reporter) {
- metrics.addReporter(reporter);
- }
-
private class PartitionMetrics {
private final Sensor recordSensor;
private final Sensor byteSensor;
@@ -146,29 +99,42 @@ class MirrorSourceMetrics implements AutoCloseable {
PartitionMetrics(TopicPartition topicPartition) {
String prefix = topicPartition.topic() + "-" +
topicPartition.partition() + "-";
- Map<String, String> tags = new LinkedHashMap<>();
+ LinkedHashMap<String, String> tags = new LinkedHashMap<>();
tags.put("source", source);
- tags.put("target", target);
+ tags.put("target", target);
tags.put("topic", topicPartition.topic());
tags.put("partition",
Integer.toString(topicPartition.partition()));
- recordSensor = metrics.sensor(prefix + "records-sent");
- recordSensor.add(new Meter(metrics.metricInstance(recordRate,
tags), metrics.metricInstance(recordCount, tags)));
-
- byteSensor = metrics.sensor(prefix + "bytes-sent");
- byteSensor.add(new Meter(metrics.metricInstance(byteRate, tags),
metrics.metricInstance(byteCount, tags)));
-
- recordAgeSensor = metrics.sensor(prefix + "record-age");
- recordAgeSensor.add(metrics.metricInstance(recordAge, tags), new
Value());
- recordAgeSensor.add(metrics.metricInstance(recordAgeMax, tags),
new Max());
- recordAgeSensor.add(metrics.metricInstance(recordAgeMin, tags),
new Min());
- recordAgeSensor.add(metrics.metricInstance(recordAgeAvg, tags),
new Avg());
-
- replicationLatencySensor = metrics.sensor(prefix +
"replication-latency");
-
replicationLatencySensor.add(metrics.metricInstance(replicationLatency, tags),
new Value());
-
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyMax,
tags), new Max());
-
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyMin,
tags), new Min());
-
replicationLatencySensor.add(metrics.metricInstance(replicationLatencyAvg,
tags), new Avg());
+ MetricName recordCount = metrics.metricName(RECORD_COUNT,
RECORD_COUNT_DESCRIPTION, tags);
+ MetricName recordRate = metrics.metricName(RECORD_RATE,
RECORD_RATE_DESCRIPTION, tags);
+ MetricName recordAge = metrics.metricName(RECORD_AGE_MS,
RECORD_AGE_MS_DESCRIPTION, tags);
+ MetricName recordAgeMax = metrics.metricName(RECORD_AGE_MS_MAX,
RECORD_AGE_MS_MAX_DESCRIPTION, tags);
+ MetricName recordAgeMin = metrics.metricName(RECORD_AGE_MS_MIN,
RECORD_AGE_MS_MIN_DESCRIPTION, tags);
+ MetricName recordAgeAvg = metrics.metricName(RECORD_AGE_MS_AVG,
RECORD_AGE_MS_AVG_DESCRIPTION, tags);
+ MetricName byteCount = metrics.metricName(BYTE_COUNT,
BYTE_COUNT_DESCRIPTION, tags);
+ MetricName byteRate = metrics.metricName(BYTE_RATE,
BYTE_RATE_DESCRIPTION, tags);
+ MetricName replicationLatency =
metrics.metricName(REPLICATION_LATENCY_MS, REPLICATION_LATENCY_MS_DESCRIPTION,
tags);
+ MetricName replicationLatencyMax =
metrics.metricName(REPLICATION_LATENCY_MS_MAX,
REPLICATION_LATENCY_MS_MAX_DESCRIPTION, tags);
+ MetricName replicationLatencyMin =
metrics.metricName(REPLICATION_LATENCY_MS_MIN,
REPLICATION_LATENCY_MS_MIN_DESCRIPTION, tags);
+ MetricName replicationLatencyAvg =
metrics.metricName(REPLICATION_LATENCY_MS_AVG,
REPLICATION_LATENCY_MS_AVG_DESCRIPTION, tags);
+
+ recordSensor = metrics.addSensor(prefix + "records-sent");
+ recordSensor.add(new Meter(recordRate, recordCount));
+
+ byteSensor = metrics.addSensor(prefix + "bytes-sent");
+ byteSensor.add(new Meter(byteRate, byteCount));
+
+ recordAgeSensor = metrics.addSensor(prefix + "record-age");
+ recordAgeSensor.add(recordAge, new Value());
+ recordAgeSensor.add(recordAgeMax, new Max());
+ recordAgeSensor.add(recordAgeMin, new Min());
+ recordAgeSensor.add(recordAgeAvg, new Avg());
+
+ replicationLatencySensor = metrics.addSensor(prefix +
"replication-latency");
+ replicationLatencySensor.add(replicationLatency, new Value());
+ replicationLatencySensor.add(replicationLatencyMax, new Max());
+ replicationLatencySensor.add(replicationLatencyMin, new Min());
+ replicationLatencySensor.add(replicationLatencyAvg, new Avg());
}
}
}
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 6ab65bebdca..cd6b9b01eed 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -42,6 +42,9 @@ import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
+import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_LEGACY;
+import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_NEW;
+
/** Replicates a set of topic-partitions. */
public class MirrorSourceTask extends SourceTask {
@@ -51,6 +54,7 @@ public class MirrorSourceTask extends SourceTask {
private String sourceClusterAlias;
private Duration pollTimeout;
private ReplicationPolicy replicationPolicy;
+ private MirrorSourceLegacyMetrics legacyMetrics;
private MirrorSourceMetrics metrics;
private boolean stopping = false;
private Semaphore consumerAccess;
@@ -59,11 +63,11 @@ public class MirrorSourceTask extends SourceTask {
public MirrorSourceTask() {}
// for testing
- MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer,
MirrorSourceMetrics metrics, String sourceClusterAlias,
+ MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer,
MirrorSourceLegacyMetrics metrics, String sourceClusterAlias,
ReplicationPolicy replicationPolicy,
OffsetSyncWriter offsetSyncWriter) {
this.consumer = consumer;
- this.metrics = metrics;
+ this.legacyMetrics = metrics;
this.sourceClusterAlias = sourceClusterAlias;
this.replicationPolicy = replicationPolicy;
consumerAccess = new Semaphore(1);
@@ -75,7 +79,9 @@ public class MirrorSourceTask extends SourceTask {
MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
consumerAccess = new Semaphore(1); // let one thread at a time access
the consumer
sourceClusterAlias = config.sourceClusterAlias();
- metrics = config.metrics();
+ List<String> metricNamesFormats = config.metricNamesFormats();
+ legacyMetrics = metricNamesFormats.contains(METRIC_NAMES_LEGACY) ?
config.legacyMetrics() : null;
+ metrics = metricNamesFormats.contains(METRIC_NAMES_NEW) ?
config.metrics(context.pluginMetrics()) : null;
pollTimeout = config.consumerPollTimeout();
replicationPolicy = config.replicationPolicy();
if (config.emitOffsetSyncsEnabled()) {
@@ -114,7 +120,7 @@ public class MirrorSourceTask extends SourceTask {
}
Utils.closeQuietly(consumer, "source consumer");
Utils.closeQuietly(offsetSyncWriter, "offset sync writer");
- Utils.closeQuietly(metrics, "metrics");
+ Utils.closeQuietly(legacyMetrics, "metrics");
log.info("Stopping {} took {} ms.", Thread.currentThread().getName(),
System.currentTimeMillis() - start);
}
@@ -138,8 +144,16 @@ public class MirrorSourceTask extends SourceTask {
SourceRecord converted = convertRecord(record);
sourceRecords.add(converted);
TopicPartition topicPartition = new
TopicPartition(converted.topic(), converted.kafkaPartition());
- metrics.recordAge(topicPartition, System.currentTimeMillis() -
record.timestamp());
- metrics.recordBytes(topicPartition, byteSize(record.value()));
+ long age = System.currentTimeMillis() - record.timestamp();
+ long size = byteSize(record.value());
+ if (legacyMetrics != null) {
+ legacyMetrics.recordAge(topicPartition, age);
+ legacyMetrics.recordBytes(topicPartition, size);
+ }
+ if (metrics != null) {
+ metrics.recordAge(topicPartition, age);
+ metrics.recordBytes(topicPartition, size);
+ }
}
if (sourceRecords.isEmpty()) {
// WorkerSourceTasks expects non-zero batch size
@@ -177,8 +191,14 @@ public class MirrorSourceTask extends SourceTask {
}
TopicPartition topicPartition = new TopicPartition(record.topic(),
record.kafkaPartition());
long latency = System.currentTimeMillis() - record.timestamp();
- metrics.countRecord(topicPartition);
- metrics.replicationLatency(topicPartition, latency);
+ if (legacyMetrics != null) {
+ legacyMetrics.countRecord(topicPartition);
+ legacyMetrics.replicationLatency(topicPartition, latency);
+ }
+ if (metrics != null) {
+ metrics.countRecord(topicPartition);
+ metrics.replicationLatency(topicPartition, latency);
+ }
// Queue offset syncs only when offsetWriter is available
if (offsetSyncWriter != null) {
TopicPartition sourceTopicPartition =
MirrorUtils.unwrapPartition(record.sourcePartition());
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
index aa5d300c00a..1996ea2c632 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTaskConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.metrics.PluginMetrics;
import java.util.List;
import java.util.Map;
@@ -39,12 +40,16 @@ public class MirrorSourceTaskConfig extends
MirrorSourceConfig {
.collect(Collectors.toSet());
}
- MirrorSourceMetrics metrics() {
- MirrorSourceMetrics metrics = new MirrorSourceMetrics(this);
+ MirrorSourceLegacyMetrics legacyMetrics() {
+ MirrorSourceLegacyMetrics metrics = new
MirrorSourceLegacyMetrics(this);
metricsReporters().forEach(metrics::addReporter);
return metrics;
}
+ MirrorSourceMetrics metrics(PluginMetrics pluginMetrics) {
+ return new MirrorSourceMetrics(pluginMetrics, this);
+ }
+
@Override
String entityLabel() {
return super.entityLabel() + "-" + (getInt(TASK_INDEX) == null ? "?" :
getInt(TASK_INDEX));
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorLegacyMetricsTest.java
similarity index 66%
copy from
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
copy to
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorLegacyMetricsTest.java
index 57eae629baa..42f413e8120 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorLegacyMetricsTest.java
@@ -31,12 +31,13 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class MirrorSourceMetricsTest {
+public class MirrorLegacyMetricsTest {
private static final String SOURCE = "source";
private static final String TARGET = "target";
private static final TopicPartition TP = new TopicPartition("topic", 0);
private static final TopicPartition SOURCE_TP = new TopicPartition(SOURCE
+ "." + TP.topic(), TP.partition());
+ private static final String GROUP = "my-group";
private final Map<String, String> configs = new HashMap<>();
private TestReporter reporter;
@@ -44,25 +45,51 @@ public class MirrorSourceMetricsTest {
@BeforeEach
public void setUp() {
configs.put(ConnectorConfig.NAME_CONFIG, "name");
- configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
MirrorSourceConnector.class.getName());
configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE);
configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET);
configs.put(MirrorConnectorConfig.TASK_INDEX, "0");
- configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS,
TP.toString());
reporter = new TestReporter();
}
@Test
- public void testTags() {
+ public void testSourceTags() {
+ configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
MirrorSourceConnector.class.getName());
+ configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS,
TP.toString());
MirrorSourceTaskConfig taskConfig = new
MirrorSourceTaskConfig(configs);
- MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
+ MirrorSourceLegacyMetrics metrics = new
MirrorSourceLegacyMetrics(taskConfig);
metrics.addReporter(reporter);
metrics.countRecord(SOURCE_TP);
+ // 12 MirrorSourceConnector metrics + the metrics count metric
assertEquals(13, reporter.metrics.size());
- Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
+ Map<String, String> tags = reporter.metrics.stream()
+ .filter(m ->
m.metricName().group().equals(MirrorSourceConnector.class.getSimpleName()))
+ .findFirst()
+ .get().metricName().tags();
+ assertEquals(SOURCE, tags.get("source"));
+ assertEquals(TARGET, tags.get("target"));
+ assertEquals(SOURCE_TP.topic(), tags.get("topic"));
+ assertEquals(String.valueOf(SOURCE_TP.partition()),
tags.get("partition"));
+ }
+
+ @Test
+ public void testCheckpointTags() {
+ configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
MirrorCheckpointConnector.class.getName());
+ configs.put(MirrorCheckpointTaskConfig.TASK_CONSUMER_GROUPS, GROUP);
+ MirrorCheckpointTaskConfig taskConfig = new
MirrorCheckpointTaskConfig(configs);
+ MirrorCheckpointLegacyMetrics metrics = new
MirrorCheckpointLegacyMetrics(taskConfig);
+ metrics.addReporter(reporter);
+
+ metrics.checkpointLatency(SOURCE_TP, GROUP, 1L);
+ // 4 MirrorCheckpointConnector metrics + the metrics count metric
+ assertEquals(5, reporter.metrics.size());
+ Map<String, String> tags = reporter.metrics.stream()
+ .filter(m ->
m.metricName().group().equals(MirrorCheckpointConnector.class.getSimpleName()))
+ .findFirst()
+ .get().metricName().tags();
assertEquals(SOURCE, tags.get("source"));
assertEquals(TARGET, tags.get("target"));
+ assertEquals(GROUP, tags.get("group"));
assertEquals(SOURCE_TP.topic(), tags.get("topic"));
assertEquals(String.valueOf(SOURCE_TP.partition()),
tags.get("partition"));
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMetricsTest.java
similarity index 54%
rename from
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
rename to
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMetricsTest.java
index 57eae629baa..566ab236bc9 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceMetricsTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMetricsTest.java
@@ -17,79 +17,77 @@
package org.apache.kafka.connect.mirror;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.PluginMetrics;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.ArrayList;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class MirrorSourceMetricsTest {
+public class MirrorMetricsTest {
private static final String SOURCE = "source";
private static final String TARGET = "target";
private static final TopicPartition TP = new TopicPartition("topic", 0);
private static final TopicPartition SOURCE_TP = new TopicPartition(SOURCE
+ "." + TP.topic(), TP.partition());
+ private static final String GROUP = "my-group";
private final Map<String, String> configs = new HashMap<>();
- private TestReporter reporter;
+ private final Metrics metrics = new Metrics();
+ private final PluginMetrics pluginMetrics = new PluginMetricsImpl(metrics,
Map.of());
@BeforeEach
public void setUp() {
configs.put(ConnectorConfig.NAME_CONFIG, "name");
- configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
MirrorSourceConnector.class.getName());
configs.put(MirrorConnectorConfig.SOURCE_CLUSTER_ALIAS, SOURCE);
configs.put(MirrorConnectorConfig.TARGET_CLUSTER_ALIAS, TARGET);
configs.put(MirrorConnectorConfig.TASK_INDEX, "0");
- configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS,
TP.toString());
- reporter = new TestReporter();
}
@Test
- public void testTags() {
+ public void testSourceTags() {
+ configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
MirrorSourceConnector.class.getName());
+ configs.put(MirrorSourceTaskConfig.TASK_TOPIC_PARTITIONS,
TP.toString());
MirrorSourceTaskConfig taskConfig = new
MirrorSourceTaskConfig(configs);
- MirrorSourceMetrics metrics = new MirrorSourceMetrics(taskConfig);
- metrics.addReporter(reporter);
-
- metrics.countRecord(SOURCE_TP);
- assertEquals(13, reporter.metrics.size());
- Map<String, String> tags = reporter.metrics.get(0).metricName().tags();
+ MirrorSourceMetrics sourceMetrics = new
MirrorSourceMetrics(pluginMetrics, taskConfig);
+
+ sourceMetrics.countRecord(SOURCE_TP);
+ // 12 MirrorSourceConnector metrics + the metrics count metric
+ assertEquals(13, metrics.metrics().size());
+ Map<String, String> tags = metrics.metrics().keySet().stream()
+ .filter(m -> m.group().equals("plugins"))
+ .findFirst()
+ .get().tags();
assertEquals(SOURCE, tags.get("source"));
assertEquals(TARGET, tags.get("target"));
assertEquals(SOURCE_TP.topic(), tags.get("topic"));
assertEquals(String.valueOf(SOURCE_TP.partition()),
tags.get("partition"));
}
- static class TestReporter implements MetricsReporter {
-
- List<KafkaMetric> metrics = new ArrayList<>();
-
- @Override
- public void init(List<KafkaMetric> metrics) {
- for (KafkaMetric metric : metrics) {
- metricChange(metric);
- }
- }
-
- @Override
- public void metricChange(KafkaMetric metric) {
- metrics.add(metric);
- }
-
- @Override
- public void metricRemoval(KafkaMetric metric) {}
-
- @Override
- public void close() {}
-
- @Override
- public void configure(Map<String, ?> configs) {}
+ @Test
+ public void testCheckpointTags() {
+ configs.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG,
MirrorCheckpointConnector.class.getName());
+ configs.put(MirrorCheckpointTaskConfig.TASK_CONSUMER_GROUPS, GROUP);
+ MirrorCheckpointTaskConfig taskConfig = new
MirrorCheckpointTaskConfig(configs);
+ MirrorCheckpointMetrics checkpointMetrics = new
MirrorCheckpointMetrics(pluginMetrics, taskConfig);
+
+ checkpointMetrics.checkpointLatency(SOURCE_TP, GROUP, 1L);
+ // 4 MirrorCheckpointConnector metrics + the metrics count metric
+ assertEquals(5, metrics.metrics().size());
+ Map<String, String> tags = metrics.metrics().keySet().stream()
+ .filter(m -> m.group().equals("plugins"))
+ .findFirst()
+ .get().tags();
+ assertEquals(SOURCE, tags.get("source"));
+ assertEquals(TARGET, tags.get("target"));
+ assertEquals(GROUP, tags.get("group"));
+ assertEquals(SOURCE_TP.topic(), tags.get("topic"));
+ assertEquals(String.valueOf(SOURCE_TP.partition()),
tags.get("partition"));
}
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 4a676855378..ba23b42b80b 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -181,7 +181,7 @@ public class MirrorSourceTaskTest {
KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
when(consumer.poll(any())).thenReturn(consumerRecords);
- MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+ MirrorSourceLegacyMetrics metrics =
mock(MirrorSourceLegacyMetrics.class);
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
@@ -280,7 +280,7 @@ public class MirrorSourceTaskTest {
@SuppressWarnings("unchecked")
KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
- MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+ MirrorSourceLegacyMetrics metrics =
mock(MirrorSourceLegacyMetrics.class);
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
@@ -310,7 +310,7 @@ public class MirrorSourceTaskTest {
@SuppressWarnings("unchecked")
KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
- MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+ MirrorSourceLegacyMetrics metrics =
mock(MirrorSourceLegacyMetrics.class);
PartitionState partitionState = new PartitionState(maxOffsetLag);
Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
OffsetSyncWriter offsetSyncWriter = mock(OffsetSyncWriter.class);
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 6d1d50f558b..e266acce74a 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.mirror.integration;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
@@ -26,15 +27,20 @@ import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -56,6 +62,7 @@ import
org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
+import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestCondition;
import org.junit.jupiter.api.AfterEach;
@@ -84,10 +91,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_LEGACY;
+import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.METRIC_NAMES_NEW;
import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_CLIENT_ROLE_PREFIX;
import static
org.apache.kafka.connect.mirror.MirrorConnectorConfig.OFFSET_SYNCS_TOPIC_CONFIG_PREFIX;
import static org.apache.kafka.test.TestUtils.waitForCondition;
@@ -202,7 +212,9 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Config = new MirrorMakerConfig(mm2Props);
primaryWorkerProps = mm2Config.workerConfig(new
SourceAndTarget(BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS));
backupWorkerProps.putAll(mm2Config.workerConfig(new
SourceAndTarget(PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS)));
-
+ // Add the custom reporter to the worker configuration to collect
MirrorMarker metrics with the new formats
+
backupWorkerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
CollectAllMetricsReporter.class.getName());
+
primary = new EmbeddedConnectCluster.Builder()
.name(PRIMARY_CLUSTER_ALIAS + "-connect-cluster")
.numWorkers(NUM_WORKERS)
@@ -1001,6 +1013,108 @@ public class MirrorConnectorsIntegrationBaseTest {
});
}
+ @Test
+ public void testConnectorMetricsNew() throws InterruptedException,
ExecutionException {
+ testConnectorMetrics(METRIC_NAMES_NEW, () -> assertMetrics(false));
+ }
+
+ @Test
+ public void testConnectorMetricsLegacy() throws InterruptedException,
ExecutionException {
+ testConnectorMetrics(METRIC_NAMES_LEGACY, () -> assertMetrics(true));
+ }
+
+ @Test
+ public void testConnectorMetricsDefault() throws InterruptedException,
ExecutionException {
+ testConnectorMetrics(null, () -> assertMetrics(true));
+ }
+
+ @Test
+ public void testConnectorMetricsNewAndLegacy() throws
InterruptedException, ExecutionException {
+ testConnectorMetrics(METRIC_NAMES_NEW + "," + METRIC_NAMES_LEGACY, ()
-> assertMetrics(true) && assertMetrics(false));
+ }
+
+ private void testConnectorMetrics(String format, Supplier<Boolean>
assertions) throws InterruptedException, ExecutionException {
+ // one way replication from primary to backup
+ mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
+ if (format != null) {
+ mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS +
"." + MirrorConnectorConfig.METRIC_NAMES_FORMAT, format);
+ }
+ // Add the custom reporter to the connector configuration to collect
MirrorMarker metrics with the old formats
+ mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + "."
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
CollectAllMetricsReporter.class.getName());
+
+
backupWorkerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
CollectAllMetricsReporter.class.getName());
+ mm2Config = new MirrorMakerConfig(mm2Props);
+
+ String topic = "test-topic-metrics";
+ primary.kafka().createTopic(topic);
+ try (KafkaProducer<byte[], byte[]> producer =
primary.kafka().createProducer(Map.of())) {
+ for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+ producer.send(new ProducerRecord<>(topic, ("value" +
i).getBytes())).get();
+ }
+ }
+
+ waitUntilMirrorMakerIsRunning(backup,
+ List.of(MirrorSourceConnector.class,
MirrorCheckpointConnector.class),
+ mm2Config,
+ PRIMARY_CLUSTER_ALIAS,
+ BACKUP_CLUSTER_ALIAS);
+
+ try (KafkaConsumer<byte[], byte[]> consumer =
primary.kafka().createConsumer(
+ Map.of("group.id", "consumer-group-metrics",
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
+ ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "1"))) {
+ int consumed = 0;
+ consumer.subscribe(List.of(topic));
+ while (consumed < NUM_RECORDS_PRODUCED) {
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(100));
+ consumer.commitSync();
+ consumed += records.count();
+ }
+ }
+
+ waitForCondition(assertions::get, 30_000L, "Unable to find the
MirrorMaker metrics");
+ }
+
+ private boolean assertMetrics(boolean legacy) {
+ Set<String> expectedSourceTags = legacy
+ ? Set.of("source", "target", "topic", "partition")
+ : Set.of("connector", "task", "source", "target", "topic",
"partition");
+ Set<String> expectedCheckpointTags = legacy
+ ? Set.of("source", "target", "group", "topic", "partition")
+ : Set.of("connector", "task", "source", "target", "group",
"topic", "partition");
+ String source = MirrorSourceConnector.class.getSimpleName();
+ String checkpoint = MirrorCheckpointConnector.class.getSimpleName();
+ // We have 4 topics totalling 13 partitions
+ // primary.test-topic - 10 partitions
+ // primary.test-topic-no-checkpoints - 1 partition
+ // primary.test-topic-metrics - 1 partition
+ // primary.heartbeats - 1 partition
+ // There are 12 metrics per partition, 12 x 13 = 156
+ int expectedSourceCount = 156;
+ // We have 1 consumer group (consumer-group-metrics)
+ // There are 4 metrics per group
+ int expectedCheckpointCount = 4;
+
+ int sourceMetrics = 0;
+ int checkpointMetrics = 0;
+ for (MetricName metricName :
CollectAllMetricsReporter.METRICS.keySet()) {
+ Map<String, String> tags = metricName.tags();
+ if ((legacy && metricName.group().equals(source)) ||
+ (!legacy && metricName.group().equals("plugins") &&
source.equals(tags.get("connector")))) {
+ assertEquals(expectedSourceTags, tags.keySet());
+ sourceMetrics++;
+ }
+
+
+ if ((legacy && metricName.group().equals(checkpoint)) ||
+ (!legacy && metricName.group().equals("plugins") &&
checkpoint.equals(tags.get("connector")))) {
+ assertEquals(expectedCheckpointTags, tags.keySet());
+ checkpointMetrics++;
+ }
+ }
+ return sourceMetrics == expectedSourceCount && checkpointMetrics ==
expectedCheckpointCount;
+ }
+
private TopicPartition remoteTopicPartition(TopicPartition tp, String
alias) {
return new TopicPartition(remoteTopicName(tp.topic(), alias),
tp.partition());
}
@@ -1498,4 +1612,21 @@ public class MirrorConnectorsIntegrationBaseTest {
);
}
}
+
+ public static class CollectAllMetricsReporter extends MockMetricsReporter {
+
+ public static final Map<MetricName, KafkaMetric> METRICS = new
HashMap<>();
+
+ @Override
+ public void init(List<KafkaMetric> metrics) {
+ for (KafkaMetric metric : metrics) {
+ METRICS.put(metric.metricName(), metric);
+ }
+ }
+
+ @Override
+ public void metricChange(KafkaMetric metric) {
+ METRICS.put(metric.metricName(), metric);
+ }
+ }
}