This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db3ece1042c KAFKA-20264: fix overwrite in recursive parent-node 
matching (#21540)
db3ece1042c is described below

commit db3ece1042cd6d66af7a251f0caf502d4f9438f3
Author: Gyeongwon, Do <[email protected]>
AuthorDate: Thu Apr 9 08:42:05 2026 +0900

    KAFKA-20264: fix overwrite in recursive parent-node matching (#21540)
    
    Description:
    
    This change fixes a bug in recursive parent-node matching where a
    previously found ancestor match could be overwritten by a later branch
    returning null.
    
    The issue occurs when traversing multiple parents: the previous
    implementation kept only the result of the last parent branch it visited.
    If an earlier parent path found a matching ancestor but a later parent
    path did not, it returned null instead of preserving the first valid
    match. This can lead to incorrect ancestor detection during topology
    optimization for merged streams and may prevent the expected
    repartition-topic sharing. The fix returns immediately when a
    recursive parent traversal finds a non-null match, and returns null only
    after all parent branches have been checked without a match.
    
    Testing strategy:
    
    - Added a unit test that reproduces the multi-parent traversal case and
    verifies the matching ancestor is preserved.
    - Added topology optimization tests for merged streams with a
    key-changing operation on either the left or the right branch, verifying
    that repartition-topic sharing still produces a single repartition node.
    
    Reviewers: Nikita Shupletsov <[email protected]>, Bill Bejeck
    <[email protected]>
---
 .../kstream/internals/InternalStreamsBuilder.java  | 13 ++--
 .../internals/InternalStreamsBuilderTest.java      | 82 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index d42f2de9167..c64e13d3cf6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -688,20 +688,25 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         return isVersionedUpstream(startSeekingNode);
     }
 
-    private GraphNode findParentNodeMatching(final GraphNode startSeekingNode,
+    // Visible for testing.
+    GraphNode findParentNodeMatching(final GraphNode startSeekingNode,
                                              final Predicate<GraphNode> 
parentNodePredicate) {
         if (parentNodePredicate.test(startSeekingNode)) {
             return startSeekingNode;
         }
-        GraphNode foundParentNode = null;
 
         for (final GraphNode parentNode : startSeekingNode.parentNodes()) {
             if (parentNodePredicate.test(parentNode)) {
                 return parentNode;
             }
-            foundParentNode = findParentNodeMatching(parentNode, 
parentNodePredicate);
+
+            final GraphNode foundParentNode = 
findParentNodeMatching(parentNode, parentNodePredicate);
+            if (foundParentNode != null) {
+                return foundParentNode;
+            }
         }
-        return foundParentNode;
+
+        return null;
     }
 
     public GraphNode root() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 1aad07eba27..0d12532a800 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.streams.internals.AutoOffsetResetInternal;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -36,6 +37,7 @@ import 
org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionS
 import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
 import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
+import 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
 import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
 import 
org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
@@ -1235,6 +1237,79 @@ public class InternalStreamsBuilderTest {
         verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, true);
     }
 
+    @Test
+    public void shouldFindMatchingAncestorWhenLaterParentHasNoMatch() throws 
Exception {
+        // Given:
+        final GraphNode matchingNode = newTestGraphNode("matchingNode");
+        matchingNode.setKeyChangingOperation(true);
+
+        final GraphNode p1 = newTestGraphNode("p1");
+        final GraphNode p2 = newTestGraphNode("p2");
+        final GraphNode startSeekNode = newTestGraphNode("startSeekNode");
+        matchingNode.addChild(p1);
+        p1.addChild(startSeekNode);
+        p2.addChild(startSeekNode);
+
+        // When:
+        final GraphNode result = builder.findParentNodeMatching(startSeekNode, 
GraphNode::isKeyChangingOperation);
+
+        // Then:
+        assertNotNull(result);
+        assertEquals(matchingNode, result);
+    }
+
+    @Test
+    public void 
shouldShareRepartitionTopicForMergedStreamWithKeyChangingOpOnLeftBranch() {
+        // Given:
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        final KStream<String, String> left = 
builder.stream(Collections.singleton("topic-1"), consumed)
+                .selectKey((k, v) -> v)
+                .filter((k, v) -> v != null);
+        final KStream<String, String> right = 
builder.stream(Collections.singleton("topic-2"), consumed);
+
+        final KStream<String, String> merged = left.merge(right);
+
+        final KGroupedStream<String, String> grouped = merged.groupByKey();
+        grouped.count(Materialized.as("count-store"));
+        grouped.aggregate(
+                () -> null,
+                (k, v, agg) -> k, Materialized.as("latest-store"));
+
+        // When:
+        builder.buildAndOptimizeTopology(props);
+
+        // Then:
+        final List<GraphNode> repartitionNodes = new ArrayList<>();
+        getNodesByType(builder.root, OptimizableRepartitionNode.class, new 
HashSet<>(), repartitionNodes);
+        assertEquals(1, repartitionNodes.size());
+    }
+
+    @Test
+    public void 
shouldShareRepartitionTopicForMergedStreamWithKeyChangingOpOnRightBranch() {
+        // Given:
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
+        final KStream<String, String> left = 
builder.stream(Collections.singleton("topic-1"), consumed);
+        final KStream<String, String> right = 
builder.stream(Collections.singleton("topic-2"), consumed)
+                .selectKey((k, v) -> v)
+                .filter((k, v) -> v != null);
+
+        final KStream<String, String> merged = left.merge(right);
+
+        final KGroupedStream<String, String> grouped = merged.groupByKey();
+        grouped.count(Materialized.as("count-store"));
+        grouped.aggregate(
+                () -> null,
+                (k, v, agg) -> k, Materialized.as("latest-store"));
+
+        // When:
+        builder.buildAndOptimizeTopology(props);
+
+        // Then:
+        final List<GraphNode> repartitionNodes = new ArrayList<>();
+        getNodesByType(builder.root, OptimizableRepartitionNode.class, new 
HashSet<>(), repartitionNodes);
+        assertEquals(1, repartitionNodes.size());
+    }
+
     private void verifyVersionedSemantics(final TableFilterNode<?, ?> 
filterNode, final boolean expectedValue) {
         final ProcessorSupplier<?, ?, ?, ?> processorSupplier = 
filterNode.processorParameters().processorSupplier();
         assertInstanceOf(KTableFilter.class, processorSupplier);
@@ -1326,4 +1401,11 @@ public class InternalStreamsBuilderTest {
             }
         }
     }
+
+    private static GraphNode newTestGraphNode(final String name) {
+        return new GraphNode(name) {
+            @Override
+            public void writeToTopology(final InternalTopologyBuilder 
topologyBuilder) { }
+        };
+    }
 }

Reply via email to