brandboat commented on code in PR #17126:
URL: https://github.com/apache/kafka/pull/17126#discussion_r1754695410


##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java:
##########
@@ -193,7 +193,7 @@ public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
         produceMessages(timestamp);
 
         stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))

Review Comment:
   In this test, we create the topic with 3 partitions,
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241
   but all records share the same timestamp, so no record can arrive late and be dropped. Hence the grace period is not required.
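   To make the reasoning concrete, here is a simplified model in plain Java (no Kafka dependency; the class and method names are hypothetical). It assumes that a windowed aggregation skips a record when the end of the record's window is <= streamTime - grace, where streamTime is the largest timestamp seen so far. It ignores per-task stream time and window retention. Under that assumption, identical timestamps can never fall behind stream time, so zero grace drops nothing regardless of how the 3 partitions interleave:

```java
/**
 * Simplified model, NOT Kafka Streams source code. Assumed rule: a record is
 * skipped when its window end (exclusive) is <= streamTime - grace, where
 * streamTime is the largest timestamp observed so far.
 */
final class SameTimestampModel {

    /** Counts records that would be skipped when processed in the given order. */
    static int countDropped(long[] timestamps, long windowSizeMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);       // stream time only moves forward
            long windowStart = ts - (ts % windowSizeMs); // tumbling window holding ts (ts >= 0)
            long windowEnd = windowStart + windowSizeMs; // exclusive end, always > ts
            if (windowEnd <= streamTime - graceMs) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        // Hypothetical input: every record carries the same timestamp, so any
        // interleaving across partitions yields the same sequence.
        long[] sameTimestamp = {1_000L, 1_000L, 1_000L, 1_000L, 1_000L};
        System.out.println(countDropped(sameTimestamp, 500L, 0L)); // prints 0
    }
}
```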
   



##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java:
##########
@@ -157,7 +157,7 @@ public void shouldReduceWindowed(final TestInfo testInfo) throws Exception {
         produceMessages(secondBatchTimestamp);
 
         groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMinutes(1L)))

Review Comment:
   In this test, we create the topic with 3 partitions,
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241
   so records with firstBatchTimestamp and secondBatchTimestamp are not guaranteed to be processed in timestamp order: a record with firstBatchTimestamp may be processed after a record with secondBatchTimestamp has already advanced stream time. We therefore need a grace period here to avoid dropping records.
   
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L154-L157
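   Applying the same assumed rule to an out-of-order arrival shows why ofSizeAndGrace is needed. This is a sketch, not Kafka Streams code. The class name is hypothetical, and the 1,000 ms gap between the two batches is an assumption for illustration, not necessarily the value the test uses:

```java
/**
 * Simplified model, NOT Kafka Streams source code, using the same assumed
 * rule: a record is skipped when windowEnd <= streamTime - grace.
 */
final class OutOfOrderModel {

    static int countDropped(long[] timestamps, long windowSizeMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long windowEnd = ts - (ts % windowSizeMs) + windowSizeMs; // tumbling window end (exclusive)
            if (windowEnd <= streamTime - graceMs) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long first = 10_000L;         // hypothetical firstBatchTimestamp
        long second = first + 1_000L; // hypothetical secondBatchTimestamp, 1 s later
        // A first-batch record from another partition is processed only after a
        // second-batch record has already advanced stream time.
        long[] arrival = {first, second, first};
        System.out.println(countDropped(arrival, 500L, 0L));      // prints 1 (no grace)
        System.out.println(countDropped(arrival, 500L, 60_000L)); // prints 0 (1-minute grace)
    }
}
```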



##########
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java:
##########
@@ -349,7 +348,7 @@ private Topology setupTopologyWithIntermediateTopic(final boolean useRepartition
             stream = builder.stream(INTERMEDIATE_USER_TOPIC);
         }
         stream.groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(35)).advanceBy(ofMillis(10)))

Review Comment:
   I removed the grace period from the test code wherever possible, as I believe most test cases don't require it. IMO, we should only add a grace period when it's necessary. However, if this complicates the code review, I can add the grace period (1 day) back to these test cases.
   
   I'm not entirely sure I know all the details of Streams, so please correct me if I'm wrong: in this test class, we create topics with only one partition, which ensures that all records arrive in sequential order.
   
https://github.com/apache/kafka/blob/f59d829381a7021971b0b2835fbbacb4abd996a5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L205
   And in
https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java#L177
   all record timestamps are in ascending order. As a result, there's no need for a grace period here.
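   Here is a sketch of the hopping-window case (size 35 ms, advance 10 ms) under the same assumed rule. The class name and timestamps are hypothetical, and window starts are aligned to the advance the way I understand TimeWindows assigns them. With non-decreasing timestamps, stream time always equals the current record's timestamp, and every window containing that record ends after it, so nothing is skipped without a grace period. An out-of-order record is included for contrast:

```java
/**
 * Simplified model, NOT Kafka Streams source code, of a hopping window.
 * Assumed rule: a record is skipped for a window when that window's
 * end (exclusive) is <= streamTime - grace.
 */
final class HoppingWindowModel {

    /** Counts (record, window) pairs that would be skipped. */
    static int countDroppedUpdates(long[] timestamps, long sizeMs, long advanceMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        int dropped = 0;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            // Earliest window start (aligned to the advance) whose window still contains ts; assumes ts >= 0.
            long start = (Math.max(0L, ts - sizeMs + advanceMs) / advanceMs) * advanceMs;
            for (; start <= ts; start += advanceMs) {
                long windowEnd = start + sizeMs; // exclusive end, always > ts
                if (windowEnd <= streamTime - graceMs) {
                    dropped++;
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long[] ascending = {0L, 5L, 12L, 27L, 40L, 41L, 90L}; // hypothetical single-partition input
        long[] outOfOrder = {100L, 20L};                      // hypothetical contrast case
        System.out.println(countDroppedUpdates(ascending, 35L, 10L, 0L));  // prints 0
        System.out.println(countDroppedUpdates(outOfOrder, 35L, 10L, 0L)); // prints 3
    }
}
```

   In the contrast case, the record at 20 ms belongs to the windows starting at 0, 10 and 20 ms, which end at 35, 45 and 55 ms, all at or before stream time 100 ms, so all three updates are skipped.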
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.