This is an automated email from the ASF dual-hosted git repository.
viktorsomogyi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dc5f8164037 KAFKA-9914: Fix replication cycle detection (#22079)
dc5f8164037 is described below
commit dc5f816403724b2e558f826f6bf64c41821230b0
Author: Tommi Vainikainen <[email protected]>
AuthorDate: Tue Apr 21 14:52:31 2026 +0300
KAFKA-9914: Fix replication cycle detection (#22079)
Fix replication cycle detection to detect recursive topic cycles when
Kafka cluster naming differs between multiple MM2 instances, with special
handling for IdentityReplicationPolicy.
Reviewers: Viktor Somogyi-Vass <[email protected]>
---------
Co-authored-by: Ivan Yurchenko <[email protected]>
Co-authored-by: Viktor Somogyi-Vass <[email protected]>
---
.../kafka/connect/mirror/MirrorSourceConnector.java | 11 ++++++++++-
.../connect/mirror/MirrorSourceConnectorTest.java | 20 +++++++++++++++++++-
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 4c060b5c1ab..ecaf6bd06d1 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -720,7 +720,16 @@ public class MirrorSourceConnector extends SourceConnector
{
String source = replicationPolicy.topicSource(topic);
if (source == null) {
return false;
- } else if (source.equals(sourceAndTarget.target())) {
+ }
+
+ final boolean condition;
+ if (replicationPolicy instanceof IdentityReplicationPolicy) {
+ condition = source.equals(sourceAndTarget.target());
+ } else {
+ condition = source.equals(sourceAndTarget.source()) ||
source.equals(sourceAndTarget.target());
+ }
+
+ if (condition) {
return true;
} else {
String upstreamTopic = replicationPolicy.upstreamTopic(topic);
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 8d17e2ac9ae..8fea16d1c91 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -130,13 +130,31 @@ public class MirrorSourceConnectorTest {
public void testNoCycles() {
MirrorSourceConnector connector = new MirrorSourceConnector(new
SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), x -> true,
getConfigPropertyFilter());
+ assertFalse(connector.shouldReplicateTopic("source.topic1"), "should
not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.topic1"), "should
not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.topic1"),
"should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.topic1"),
"should not allow cycles");
assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"),
"should not allow cycles");
assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"),
"should not allow cycles");
assertTrue(connector.shouldReplicateTopic("topic1"), "should allow
anything else");
- assertTrue(connector.shouldReplicateTopic("source.topic1"), "should
allow anything else");
+ assertTrue(connector.shouldReplicateTopic("othersource.topic1"),
"should allow anything else");
+ assertTrue(connector.shouldReplicateTopic("othertarget.topic1"),
"should allow anything else");
+ assertTrue(connector.shouldReplicateTopic("other.another.topic1"),
"should allow anything else");
+
+ final IdentityReplicationPolicy identityReplicationPolicy = new
IdentityReplicationPolicy();
+ final HashMap<String, String> props = new HashMap<>();
+ props.put("source.cluster.alias", "source");
+ identityReplicationPolicy.configure(props);
+ connector = new MirrorSourceConnector(new SourceAndTarget("source",
"target"),
+ identityReplicationPolicy, x -> true, x -> true);
+ assertTrue(connector.shouldReplicateTopic("source.topic1"), "should
not consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("target.topic1"), "should
not consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("target.source.topic1"),
"should not consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("source.target.topic1"),
"should not consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("topic1"), "should not
consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("othersource.topic1"),
"should not consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("othertarget.topic1"),
"should not consider this a cycle");
+ assertTrue(connector.shouldReplicateTopic("other.another.topic1"),
"should not consider this a cycle");
}
@Test