mjsax commented on code in PR #20406:
URL: https://github.com/apache/kafka/pull/20406#discussion_r2408968508


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -376,104 +380,168 @@ public static VerificationResult verify(final String 
kafka,
                                             final Map<String, Set<Integer>> 
inputs,
                                             final int maxRecordsPerKey,
                                             final boolean eosEnabled) {
+        final Properties props = createConsumerProperties(kafka);
+        final KafkaConsumer<String, Number> consumer = new 
KafkaConsumer<>(props);

Review Comment:
   nit: let's user try-with-resource



##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -721,6 +789,28 @@ private static Number getMax(final String key) {
         return Integer.parseInt(key.split("-")[1]);
     }
 
+    private static VerificationResult performEosVerification(final boolean 
eosEnabled, final String kafka) {
+        if (!eosEnabled) {
+            return new VerificationResult(true, "EOS verification skipped");
+        }
+        
+        final Properties eosProps = new Properties();
+        eosProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "transaction-verifier");
+        eosProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        eosProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+        eosProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+        eosProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
IsolationLevel.READ_COMMITTED.toString());
+        
+        try (final KafkaConsumer<byte[], byte[]> eosConsumer = new 
KafkaConsumer<>(eosProps)) {
+            verifyAllTransactionFinished(eosConsumer, kafka);
+            return new VerificationResult(true, "EOS verification passed");

Review Comment:
   If we are working with `VerificationResult`, would it be better to let 
`verifyAllTransactionFinished` return what we need? Or at least a boolean, to 
avoid unnecessary exception handling?



##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java:
##########
@@ -732,4 +822,52 @@ private static List<TopicPartition> getAllPartitions(final 
KafkaConsumer<?, ?> c
         return partitions;
     }
 
+    private static void verifyAllTransactionFinished(final 
KafkaConsumer<byte[], byte[]> consumer,

Review Comment:
   Seems we copied this from `EosTestDriver`? Why did we drop the repartition 
step?
   
   If we use repartitioning in the EOS test, but not in the Smoke test, we 
should add a partition step to the smoke test, to not reduce test coverage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to