brandboat commented on code in PR #17126:
URL: https://github.com/apache/kafka/pull/17126#discussion_r1754695410
##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java:
##########
@@ -193,7 +193,7 @@ public void shouldGroupByKey(final TestInfo testInfo)
throws Exception {
produceMessages(timestamp);
stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))
Review Comment:
In this test, we create the topic with 3 partitions,
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241
but all records share the same timestamp, so we don't need to worry about any
record being dropped here; hence a grace period is not required.
##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java:
##########
@@ -157,7 +157,7 @@ public void shouldReduceWindowed(final TestInfo testInfo)
throws Exception {
produceMessages(secondBatchTimestamp);
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMinutes(1L)))
Review Comment:
In this test, we create the topic with 3 partitions,
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241
and records with secondBatchTimestamp may come after records with
firstBatchTimestamp, so we need a grace period here to avoid records being
dropped.
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L154-L157
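To make the reasoning in the two comments above concrete, here is a minimal sketch of how a windowed aggregation might decide that a record is late. It is a simplified model and not Kafka Streams code: the class `GracePeriodSketch`, the method `droppedTimestamps`, and the timestamp values are hypothetical. It assumes that stream time is the largest timestamp seen so far and that a record is dropped once its window end is at or before stream time minus the grace period. It also assumes that, with several partitions, an older record can be processed after a newer one.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of late-record handling for tumbling windows (not Kafka Streams code). */
public class GracePeriodSketch {

    /**
     * Returns the timestamps that this model would drop as late, in arrival order.
     * Assumption: a record is dropped when its window end is at or before
     * (stream time - grace), where stream time is the max timestamp seen so far.
     * Timestamps are assumed to be non-negative.
     */
    public static List<Long> droppedTimestamps(long[] arrivalOrder, long windowSizeMs, long graceMs) {
        List<Long> dropped = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            streamTime = Math.max(streamTime, ts);
            long windowStart = ts - (ts % windowSizeMs); // tumbling window containing ts
            long windowEnd = windowStart + windowSizeMs;
            long closeTime = streamTime - graceMs;       // windows ending at or before this are closed
            if (windowEnd <= closeTime) {
                dropped.add(ts);
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long first = 10_000L;  // hypothetical stand-in for firstBatchTimestamp
        long second = 11_000L; // hypothetical stand-in for secondBatchTimestamp
        long[] outOfOrder = {second, first};

        // No grace: the older record arrives after its 500 ms window has closed.
        System.out.println(droppedTimestamps(outOfOrder, 500L, 0L));      // [10000]
        // One minute of grace: the older record is still accepted.
        System.out.println(droppedTimestamps(outOfOrder, 500L, 60_000L)); // []
        // All records share one timestamp: nothing can be late, even without grace.
        System.out.println(droppedTimestamps(new long[] {first, first, first}, 500L, 0L)); // []
    }
}
```

Under these assumptions, the same-timestamp case matches the shouldGroupByKey reasoning (no grace needed), while the out-of-order case matches the shouldReduceWindowed reasoning (grace needed).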
##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java:
##########
@@ -349,7 +348,7 @@ private Topology setupTopologyWithIntermediateTopic(final boolean useRepartition
stream = builder.stream(INTERMEDIATE_USER_TOPIC);
}
stream.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(35)).advanceBy(ofMillis(10)))
Review Comment:
I removed the grace period from the test code wherever possible, as I
believe most test cases don't require it. IMO, we should only add a grace
period when it's necessary. However, if this complicates the code review, I can
add the grace period (1 day) back to these test cases.
I'm not entirely sure I know the details of Streams, so please correct me if
I'm wrong. In this test class, we create topics with only one partition,
ensuring that all records arrive in sequential order.
https://github.com/apache/kafka/blob/f59d829381a7021971b0b2835fbbacb4abd996a5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L205
And in
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java#L177
all record timestamps are in ascending order. As a result, there's no need
for a grace period here.