This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new db3ece1042c KAFKA-20264: fix overwrite in recursive parent-node
matching (#21540)
db3ece1042c is described below
commit db3ece1042cd6d66af7a251f0caf502d4f9438f3
Author: Gyeongwon, Do <[email protected]>
AuthorDate: Thu Apr 9 08:42:05 2026 +0900
KAFKA-20264: fix overwrite in recursive parent-node matching (#21540)
Description:
This change fixes a bug in recursive parent-node matching where a
previously found ancestor match could be overwritten by a later branch
returning null.
The issue occurs when traversing multiple parents. If an earlier parent
path finds a matching ancestor but a later parent path does not, the
previous implementation could return null instead of preserving the
first valid match. This can lead to incorrect ancestor detection
during topology optimization for merged streams and may prevent expected
repartition-topic sharing behavior. The fix returns immediately when a
recursive parent traversal finds a non-null match, and returns null only
after all parent branches have been checked with no match.
Testing strategy:
- Added a unit test that reproduces the multi-parent traversal case and
verifies the matching ancestor is preserved.
- Added topology optimization tests for merged streams with key-changing
operations on both left and right branches, verifying repartition-topic
sharing still produces a single repartition node.
Reviewers: Nikita Shupletsov <[email protected]>, Bill Bejeck
<[email protected]>
---
.../kstream/internals/InternalStreamsBuilder.java | 13 ++--
.../internals/InternalStreamsBuilderTest.java | 82 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index d42f2de9167..c64e13d3cf6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -688,20 +688,25 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
return isVersionedUpstream(startSeekingNode);
}
- private GraphNode findParentNodeMatching(final GraphNode startSeekingNode,
+ // Visible for testing.
+ GraphNode findParentNodeMatching(final GraphNode startSeekingNode,
final Predicate<GraphNode>
parentNodePredicate) {
if (parentNodePredicate.test(startSeekingNode)) {
return startSeekingNode;
}
- GraphNode foundParentNode = null;
for (final GraphNode parentNode : startSeekingNode.parentNodes()) {
if (parentNodePredicate.test(parentNode)) {
return parentNode;
}
- foundParentNode = findParentNodeMatching(parentNode,
parentNodePredicate);
+
+ final GraphNode foundParentNode =
findParentNodeMatching(parentNode, parentNodePredicate);
+ if (foundParentNode != null) {
+ return foundParentNode;
+ }
}
- return foundParentNode;
+
+ return null;
}
public GraphNode root() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 1aad07eba27..0d12532a800 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -36,6 +37,7 @@ import
org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionS
import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
+import
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
import
org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
@@ -1235,6 +1237,79 @@ public class InternalStreamsBuilderTest {
verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, true);
}
+ @Test
+ public void shouldFindMatchingAncestorWhenLaterParentHasNoMatch() throws
Exception {
+ // Given:
+ final GraphNode matchingNode = newTestGraphNode("matchingNode");
+ matchingNode.setKeyChangingOperation(true);
+
+ final GraphNode p1 = newTestGraphNode("p1");
+ final GraphNode p2 = newTestGraphNode("p2");
+ final GraphNode startSeekNode = newTestGraphNode("startSeekNode");
+ matchingNode.addChild(p1);
+ p1.addChild(startSeekNode);
+ p2.addChild(startSeekNode);
+
+ // When:
+ final GraphNode result = builder.findParentNodeMatching(startSeekNode,
GraphNode::isKeyChangingOperation);
+
+ // Then:
+ assertNotNull(result);
+ assertEquals(matchingNode, result);
+ }
+
+ @Test
+ public void
shouldShareRepartitionTopicForMergedStreamWithKeyChangingOpOnLeftBranch() {
+ // Given:
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ final KStream<String, String> left =
builder.stream(Collections.singleton("topic-1"), consumed)
+ .selectKey((k, v) -> v)
+ .filter((k, v) -> v != null);
+ final KStream<String, String> right =
builder.stream(Collections.singleton("topic-2"), consumed);
+
+ final KStream<String, String> merged = left.merge(right);
+
+ final KGroupedStream<String, String> grouped = merged.groupByKey();
+ grouped.count(Materialized.as("count-store"));
+ grouped.aggregate(
+ () -> null,
+ (k, v, agg) -> k, Materialized.as("latest-store"));
+
+ // When:
+ builder.buildAndOptimizeTopology(props);
+
+ // Then:
+ final List<GraphNode> repartitionNodes = new ArrayList<>();
+ getNodesByType(builder.root, OptimizableRepartitionNode.class, new
HashSet<>(), repartitionNodes);
+ assertEquals(1, repartitionNodes.size());
+ }
+
+ @Test
+ public void
shouldShareRepartitionTopicForMergedStreamWithKeyChangingOpOnRightBranch() {
+ // Given:
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ final KStream<String, String> left =
builder.stream(Collections.singleton("topic-1"), consumed);
+ final KStream<String, String> right =
builder.stream(Collections.singleton("topic-2"), consumed)
+ .selectKey((k, v) -> v)
+ .filter((k, v) -> v != null);
+
+ final KStream<String, String> merged = left.merge(right);
+
+ final KGroupedStream<String, String> grouped = merged.groupByKey();
+ grouped.count(Materialized.as("count-store"));
+ grouped.aggregate(
+ () -> null,
+ (k, v, agg) -> k, Materialized.as("latest-store"));
+
+ // When:
+ builder.buildAndOptimizeTopology(props);
+
+ // Then:
+ final List<GraphNode> repartitionNodes = new ArrayList<>();
+ getNodesByType(builder.root, OptimizableRepartitionNode.class, new
HashSet<>(), repartitionNodes);
+ assertEquals(1, repartitionNodes.size());
+ }
+
private void verifyVersionedSemantics(final TableFilterNode<?, ?>
filterNode, final boolean expectedValue) {
final ProcessorSupplier<?, ?, ?, ?> processorSupplier =
filterNode.processorParameters().processorSupplier();
assertInstanceOf(KTableFilter.class, processorSupplier);
@@ -1326,4 +1401,11 @@ public class InternalStreamsBuilderTest {
}
}
}
+
+ private static GraphNode newTestGraphNode(final String name) {
+ return new GraphNode(name) {
+ @Override
+ public void writeToTopology(final InternalTopologyBuilder
topologyBuilder) { }
+ };
+ }
}