This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new ea8e979dcc2 KAFKA-20264: fix overwrite in recursive parent-node
matching (#21540)
ea8e979dcc2 is described below
commit ea8e979dcc261062a63010fd6f74f609f9791ee4
Author: Gyeongwon, Do <[email protected]>
AuthorDate: Thu Apr 9 08:42:05 2026 +0900
KAFKA-20264: fix overwrite in recursive parent-node matching (#21540)
Description:
This change fixes a bug in recursive parent-node matching where a
previously found ancestor match could be overwritten by a later branch
returning null.
The issue occurs when traversing multiple parents. If an earlier parent
path finds a matching ancestor but a later parent path does not, the
previous implementation could return null instead of preserving the
first valid match. This can lead to incorrect ancestor detection
during topology optimization for merged streams and may prevent expected
repartition-topic sharing behavior. The fix returns immediately when a
recursive parent traversal finds a non-null match, and returns null only
after all parent branches have been checked with no match.
Testing strategy:
- Added a unit test that reproduces the multi-parent traversal case and
verifies the matching ancestor is preserved.
- Added topology optimization tests for merged streams with key-changing
operations on both left and right branches, verifying repartition-topic
sharing still produces a single repartition node.
Reviewers: Nikita Shupletsov <[email protected]>, Bill Bejeck
<[email protected]>
---
.../kstream/internals/InternalStreamsBuilder.java | 13 ++--
.../internals/InternalStreamsBuilderTest.java | 82 ++++++++++++++++++++++
2 files changed, 91 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index d42f2de9167..c64e13d3cf6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -688,20 +688,25 @@ public class InternalStreamsBuilder implements
InternalNameProvider {
return isVersionedUpstream(startSeekingNode);
}
- private GraphNode findParentNodeMatching(final GraphNode startSeekingNode,
+ // Visible for testing.
+ GraphNode findParentNodeMatching(final GraphNode startSeekingNode,
final Predicate<GraphNode>
parentNodePredicate) {
if (parentNodePredicate.test(startSeekingNode)) {
return startSeekingNode;
}
- GraphNode foundParentNode = null;
for (final GraphNode parentNode : startSeekingNode.parentNodes()) {
if (parentNodePredicate.test(parentNode)) {
return parentNode;
}
- foundParentNode = findParentNodeMatching(parentNode,
parentNodePredicate);
+
+ final GraphNode foundParentNode =
findParentNodeMatching(parentNode, parentNodePredicate);
+ if (foundParentNode != null) {
+ return foundParentNode;
+ }
}
- return foundParentNode;
+
+ return null;
}
public GraphNode root() {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 1aad07eba27..0d12532a800 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -26,6 +26,7 @@ import
org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -36,6 +37,7 @@ import
org.apache.kafka.streams.kstream.internals.graph.ForeignJoinSubscriptionS
import org.apache.kafka.streams.kstream.internals.graph.ForeignTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode;
+import
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.TableFilterNode;
import
org.apache.kafka.streams.kstream.internals.graph.TableRepartitionMapNode;
@@ -1235,6 +1237,79 @@ public class InternalStreamsBuilderTest {
verifyVersionedSemantics((ForeignTableJoinNode<?, ?>) joinOther, true);
}
+ @Test
+ public void shouldFindMatchingAncestorWhenLaterParentHasNoMatch() throws
Exception {
+ // Given:
+ final GraphNode matchingNode = newTestGraphNode("matchingNode");
+ matchingNode.setKeyChangingOperation(true);
+
+ final GraphNode p1 = newTestGraphNode("p1");
+ final GraphNode p2 = newTestGraphNode("p2");
+ final GraphNode startSeekNode = newTestGraphNode("startSeekNode");
+ matchingNode.addChild(p1);
+ p1.addChild(startSeekNode);
+ p2.addChild(startSeekNode);
+
+ // When:
+ final GraphNode result = builder.findParentNodeMatching(startSeekNode,
GraphNode::isKeyChangingOperation);
+
+ // Then:
+ assertNotNull(result);
+ assertEquals(matchingNode, result);
+ }
+
+ @Test
+ public void
shouldShareRepartitionTopicForMergedStreamWithKeyChangingOpOnLeftBranch() {
+ // Given:
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ final KStream<String, String> left =
builder.stream(Collections.singleton("topic-1"), consumed)
+ .selectKey((k, v) -> v)
+ .filter((k, v) -> v != null);
+ final KStream<String, String> right =
builder.stream(Collections.singleton("topic-2"), consumed);
+
+ final KStream<String, String> merged = left.merge(right);
+
+ final KGroupedStream<String, String> grouped = merged.groupByKey();
+ grouped.count(Materialized.as("count-store"));
+ grouped.aggregate(
+ () -> null,
+ (k, v, agg) -> k, Materialized.as("latest-store"));
+
+ // When:
+ builder.buildAndOptimizeTopology(props);
+
+ // Then:
+ final List<GraphNode> repartitionNodes = new ArrayList<>();
+ getNodesByType(builder.root, OptimizableRepartitionNode.class, new
HashSet<>(), repartitionNodes);
+ assertEquals(1, repartitionNodes.size());
+ }
+
+ @Test
+ public void
shouldShareRepartitionTopicForMergedStreamWithKeyChangingOpOnRightBranch() {
+ // Given:
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
+ final KStream<String, String> left =
builder.stream(Collections.singleton("topic-1"), consumed);
+ final KStream<String, String> right =
builder.stream(Collections.singleton("topic-2"), consumed)
+ .selectKey((k, v) -> v)
+ .filter((k, v) -> v != null);
+
+ final KStream<String, String> merged = left.merge(right);
+
+ final KGroupedStream<String, String> grouped = merged.groupByKey();
+ grouped.count(Materialized.as("count-store"));
+ grouped.aggregate(
+ () -> null,
+ (k, v, agg) -> k, Materialized.as("latest-store"));
+
+ // When:
+ builder.buildAndOptimizeTopology(props);
+
+ // Then:
+ final List<GraphNode> repartitionNodes = new ArrayList<>();
+ getNodesByType(builder.root, OptimizableRepartitionNode.class, new
HashSet<>(), repartitionNodes);
+ assertEquals(1, repartitionNodes.size());
+ }
+
private void verifyVersionedSemantics(final TableFilterNode<?, ?>
filterNode, final boolean expectedValue) {
final ProcessorSupplier<?, ?, ?, ?> processorSupplier =
filterNode.processorParameters().processorSupplier();
assertInstanceOf(KTableFilter.class, processorSupplier);
@@ -1326,4 +1401,11 @@ public class InternalStreamsBuilderTest {
}
}
}
+
+ private static GraphNode newTestGraphNode(final String name) {
+ return new GraphNode(name) {
+ @Override
+ public void writeToTopology(final InternalTopologyBuilder
topologyBuilder) { }
+ };
+ }
}