mjsax commented on code in PR #14426:
URL: https://github.com/apache/kafka/pull/14426#discussion_r1511998434
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -438,13 +438,13 @@ public void testOrdering() {
inputTopic1.pipeInput(1, "A1", 100L);
processor.checkAndClearProcessResult();
- // push one item to the other window that has a join; this should produce non-joined records with a closed window first, then
- // the joined records
- // by the time they were produced before
+ // push one item to the other window that has a join;
+ // this should produce the joined record first;
+ // then the not-joined record
Review Comment:
This change to the comment needs to be rolled back, right? We indeed produce
the left-null join result first.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamOuterJoinTest.java:
##########
@@ -884,11 +886,13 @@ public void shouldNotEmitLeftJoinResultForAsymmetricAfterWindow() {
processor.checkAndClearProcessResult();
- // push one item to the first stream; this should produce one full-join item
+ // push one item to the first stream;
+ // this should produce one inner-join item;
+ // and a right-joined item for a3
Review Comment:
We don't produce output for `a3`
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
}
}
+ @Test
+ public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+ StreamJoined.with(Serdes.Integer(),
+ Serdes.String(),
+ Serdes.String())
+ );
+ joined.process(supplier);
+
+ final Collection<Set<String>> copartitionGroups =
+ TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+ processor.init(null);
+ // push four items with increasing timestamps to the primary stream; this should emit null-joined items
+ // w1 = {}
+ // w2 = {}
+ // --> w1 = { 0:B0 (ts: 1000), 1:B1 (ts: 1001), 2:B2 (ts: 1002), 3:B3 (ts: 1003) }
Review Comment:
Why are we using `B` rather than `A` for the left input?
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
}
}
+ @Test
+ public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
Review Comment:
Why do we need this test case? It seems to be fully covered by
`testLeftJoinedRecordsWithZeroAfterAreEmitted` below.
##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java:
##########
@@ -436,6 +436,239 @@ public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
}
}
+ @Test
+ public void testLeftNonJoinedRecordsWithZeroAfterAreEmitted() {
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ final int[] expectedKeys = new int[] {0, 1, 2, 3};
+
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+ final KStream<Integer, String> joined;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
+ stream1 = builder.stream(topic1, consumed);
+ stream2 = builder.stream(topic2, consumed);
+
+ joined = stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)).after(ZERO),
+ StreamJoined.with(Serdes.Integer(),
+ Serdes.String(),
+ Serdes.String())
+ );
+ joined.process(supplier);
+
+ final Collection<Set<String>> copartitionGroups =
+ TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
+
+ assertEquals(1, copartitionGroups.size());
+ assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), PROPS)) {
+ final TestInputTopic<Integer, String> inputTopic1 =
+ driver.createInputTopic(topic1, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final TestInputTopic<Integer, String> inputTopic2 =
+ driver.createInputTopic(topic2, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
+ final MockApiProcessor<Integer, String, Void, Void> processor = supplier.theCapturedProcessor();
+
+ processor.init(null);
+ // push four items with increasing timestamps to the primary stream; this should emit null-joined items
Review Comment:
`this should emit [three] null-joined items; the last one does not emit yet`
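To make the "three, not four" count concrete, here is a rough plain-Java sketch of the timing rule this comment relies on. It is a simplified model, not Kafka Streams code: the class `LeftJoinEmitSketch` and the method `emittedNullJoins` are hypothetical names. It assumes left-side timestamps arrive in non-decreasing order, that no right-side record ever matches, and that a left-null result is emitted once stream time moves strictly past `timestamp + after + grace`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of when left-null join results become emittable.
// It does not use or reproduce the Kafka Streams implementation.
final class LeftJoinEmitSketch {

    // Returns "key@timestamp" for every left record whose window has closed
    // after all records in leftTimestamps have been processed in order.
    // Assumption: timestamps are non-decreasing and the key is the array index.
    static List<String> emittedNullJoins(final long[] leftTimestamps,
                                         final long afterMs,
                                         final long graceMs) {
        final List<String> emitted = new ArrayList<>();
        int nextPending = 0;              // oldest record with no result emitted yet
        long streamTime = Long.MIN_VALUE; // highest timestamp observed so far

        for (int i = 0; i < leftTimestamps.length; i++) {
            streamTime = Math.max(streamTime, leftTimestamps[i]);
            // A pending record is emitted only once stream time is strictly
            // greater than the end of its window plus the grace period.
            while (nextPending <= i
                    && leftTimestamps[nextPending] + afterMs + graceMs < streamTime) {
                emitted.add(nextPending + "@" + leftTimestamps[nextPending]);
                nextPending++;
            }
        }
        return emitted;
    }
}
```

Under these assumptions, `emittedNullJoins(new long[] {1000L, 1001L, 1002L, 1003L}, 0L, 0L)` yields the records for keys 0, 1, and 2. Key 3 stays pending because nothing has yet advanced stream time past 1003.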