This is an automated email from the ASF dual-hosted git repository.

viktorsomogyi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dc5f8164037 KAFKA-9914: Fix replication cycle detection (#22079)
dc5f8164037 is described below

commit dc5f816403724b2e558f826f6bf64c41821230b0
Author: Tommi Vainikainen <[email protected]>
AuthorDate: Tue Apr 21 14:52:31 2026 +0300

    KAFKA-9914: Fix replication cycle detection (#22079)
    
    Fix replication cycle detection so that recursive topic cycles are
    detected even when Kafka cluster naming differs between multiple MM2
    instances, with special handling for IdentityReplicationPolicy.
    
    Reviewers: Viktor Somogyi-Vass <[email protected]>
    
    ---------
    
    Co-authored-by: Ivan Yurchenko <[email protected]>
    Co-authored-by: Viktor Somogyi-Vass <[email protected]>
---
 .../kafka/connect/mirror/MirrorSourceConnector.java  | 11 ++++++++++-
 .../connect/mirror/MirrorSourceConnectorTest.java    | 20 +++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index 4c060b5c1ab..ecaf6bd06d1 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -720,7 +720,16 @@ public class MirrorSourceConnector extends SourceConnector {
         String source = replicationPolicy.topicSource(topic);
         if (source == null) {
             return false;
-        } else if (source.equals(sourceAndTarget.target())) {
+        }
+
+        final boolean condition;
+        if (replicationPolicy instanceof IdentityReplicationPolicy) {
+            condition = source.equals(sourceAndTarget.target());
+        } else {
+            condition = source.equals(sourceAndTarget.source()) || 
source.equals(sourceAndTarget.target());
+        }
+
+        if (condition) {
             return true;
         } else {
             String upstreamTopic = replicationPolicy.upstreamTopic(topic);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 8d17e2ac9ae..8fea16d1c91 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -130,13 +130,31 @@ public class MirrorSourceConnectorTest {
     public void testNoCycles() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),
             new DefaultReplicationPolicy(), x -> true, 
getConfigPropertyFilter());
+        assertFalse(connector.shouldReplicateTopic("source.topic1"), "should 
not allow cycles");
         assertFalse(connector.shouldReplicateTopic("target.topic1"), "should 
not allow cycles");
         assertFalse(connector.shouldReplicateTopic("target.source.topic1"), 
"should not allow cycles");
         assertFalse(connector.shouldReplicateTopic("source.target.topic1"), 
"should not allow cycles");
         
assertFalse(connector.shouldReplicateTopic("target.source.target.topic1"), 
"should not allow cycles");
         
assertFalse(connector.shouldReplicateTopic("source.target.source.topic1"), 
"should not allow cycles");
         assertTrue(connector.shouldReplicateTopic("topic1"), "should allow 
anything else");
-        assertTrue(connector.shouldReplicateTopic("source.topic1"), "should 
allow anything else");
+        assertTrue(connector.shouldReplicateTopic("othersource.topic1"), 
"should allow anything else");
+        assertTrue(connector.shouldReplicateTopic("othertarget.topic1"), 
"should allow anything else");
+        assertTrue(connector.shouldReplicateTopic("other.another.topic1"), 
"should allow anything else");
+
+        final IdentityReplicationPolicy identityReplicationPolicy = new 
IdentityReplicationPolicy();
+        final HashMap<String, String> props = new HashMap<>();
+        props.put("source.cluster.alias", "source");
+        identityReplicationPolicy.configure(props);
+        connector = new MirrorSourceConnector(new SourceAndTarget("source", 
"target"),
+            identityReplicationPolicy, x -> true, x -> true);
+        assertTrue(connector.shouldReplicateTopic("source.topic1"), "should 
not consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("target.topic1"), "should 
not consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("target.source.topic1"), 
"should not consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("source.target.topic1"), 
"should not consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("topic1"), "should not 
consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("othersource.topic1"), 
"should not consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("othertarget.topic1"), 
"should not consider this a cycle");
+        assertTrue(connector.shouldReplicateTopic("other.another.topic1"), 
"should not consider this a cycle");
     }
 
     @Test

Reply via email to