mjsax commented on code in PR #16988:
URL: https://github.com/apache/kafka/pull/16988#discussion_r1755671800


##########
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:
##########
@@ -1085,77 +1086,72 @@ public void shouldAllowToQueryAfterThreadDied() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> input = builder.stream(streamOne);
         input
-            .groupByKey()
-            .reduce((value1, value2) -> {
-                if (value1.length() > 1) {
-                    if (beforeFailure.compareAndSet(true, false)) {
-                        throw new RuntimeException("Injected test exception");
+                .groupByKey()
+                .reduce((value1, value2) -> {
+                    if (value1.length() > 1) {
+                        if (beforeFailure.compareAndSet(true, false)) {
+                            throw new RuntimeException("Injected test exception");
+                        }
                     }
-                }
-                return value1 + value2;
-            }, Materialized.as(storeName))
-            .toStream()
-            .to(outputTopic);
+                    return value1 + value2;
+                }, Materialized.as(storeName))
+                .toStream()
+                .to(outputTopic);
 
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+        kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD);

Review Comment:
   The test used the old handler, which translated to letting the thread die IIRC? Is this change to replace the thread correct?
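
   For context, a runnable sketch (not the PR's code) of the difference between the two handler contracts. `Response` and `NewHandler` below are hypothetical stand-ins for Kafka Streams' `StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse` and `StreamsUncaughtExceptionHandler`, so the example compiles without the library:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class HandlerContractSketch {

    // Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // Stand-in for the single-argument StreamsUncaughtExceptionHandler.
    interface NewHandler {
        Response handle(Throwable exception);
    }

    // New style: the return value tells the runtime how to react,
    // e.g. spin up a replacement for the failed thread.
    static Response onError(Throwable exception) {
        return Response.REPLACE_THREAD;
    }

    public static void main(String[] args) {
        AtomicBoolean failed = new AtomicBoolean(false);

        // Old style: the handler merely observes after the fact;
        // the failed thread stays dead no matter what the handler does.
        Thread.UncaughtExceptionHandler oldHandler = (t, e) -> failed.set(true);
        oldHandler.uncaughtException(Thread.currentThread(), new RuntimeException("boom"));
        System.out.println(failed.get());

        NewHandler newHandler = HandlerContractSketch::onError;
        System.out.println(newHandler.handle(new RuntimeException("boom")));
    }
}
```

   With the old API the test could only record that a thread died; with the new API the handler's return value changes runtime behavior, which is why swapping in `REPLACE_THREAD` is a semantic change, not a mechanical port.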



##########
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:
##########
@@ -1085,77 +1086,72 @@ public void shouldAllowToQueryAfterThreadDied() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> input = builder.stream(streamOne);
         input
-            .groupByKey()
-            .reduce((value1, value2) -> {
-                if (value1.length() > 1) {
-                    if (beforeFailure.compareAndSet(true, false)) {
-                        throw new RuntimeException("Injected test exception");
+                .groupByKey()

Review Comment:
   nit: seems these are just reformatting changes? (Makes the diff hard to read -- please avoid, so we can focus on the actual changes)



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -488,18 +461,7 @@ public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler us
     }
 
     private void defaultStreamsUncaughtExceptionHandler(final Throwable throwable, final boolean skipThreadReplacement) {

Review Comment:
   Given that this method just calls the other one, it seems we can remove this method and let the caller call `handleStreamsUncaughtException` directly?



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -561,12 +561,12 @@ public void shouldNotViolateEosIfOneTaskFailsWithState(final String eosConfig, f
                 getMaxPerKey(expectedResultBeforeFailure),
                 "The state store content before failure do not match what expected");
 
-            errorInjected.set(true);
+            //errorInjected.set(true);

Review Comment:
   Seems like an invalid change? (btw: this test failed on the CI build -- I assume this unnecessary change is the reason why it's failing?)



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -442,8 +442,8 @@ public void shouldNotViolateEosIfOneTaskFails(final String eosConfig, final bool
             writeInputData(dataAfterFailure);
 
             waitForCondition(
-                () -> uncaughtException != null, MAX_WAIT_TIME_MS,
-                "Should receive uncaught exception from one StreamThread.");
+                    () -> uncaughtException != null, MAX_WAIT_TIME_MS,
+                    "Should receive uncaught exception from one StreamThread.");

Review Comment:
   nit: avoid unnecessary reformatting (maybe you need to change your IDE settings to disable auto-reformatting)



##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1170,16 +1170,11 @@ public void process(final Record<Long, Long> record) {
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
 
-        streams.setUncaughtExceptionHandler((t, e) -> {
-            if (uncaughtException != null ||
-                !(e instanceof StreamsException) ||
-                !e.getCause().getMessage().equals("Injected test exception.")) {
-                e.printStackTrace(System.err);
-                hasUnexpectedError = true;
-            }
-            uncaughtException = e;
+        streams.setUncaughtExceptionHandler(e -> {

Review Comment:
   Seems the new implementation of the handler differs from the original one. Why?
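
   One way to keep the original checks under the new single-argument API is to carry the handler body over almost verbatim and only add a return value. A sketch with hypothetical stand-in types (`Response`, `Handler`) in place of the Kafka Streams interfaces; the original `instanceof StreamsException` check is omitted here since that class isn't available without the library:

```java
public class PreserveHandlerChecks {

    // Hypothetical stand-ins for StreamThreadExceptionResponse and
    // StreamsUncaughtExceptionHandler; illustration only.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    interface Handler {
        Response handle(Throwable exception);
    }

    static Throwable uncaughtException;
    static boolean hasUnexpectedError;

    // Same validation as the original two-argument handler; only the trailing
    // `return` is new. Which response best matches the old let-the-thread-die
    // semantics is exactly the open question in this review.
    static final Handler handler = e -> {
        if (uncaughtException != null
                || e.getCause() == null
                || !e.getCause().getMessage().equals("Injected test exception.")) {
            e.printStackTrace(System.err);
            hasUnexpectedError = true;
        }
        uncaughtException = e;
        return Response.SHUTDOWN_CLIENT;
    };

    public static void main(String[] args) {
        Throwable expected =
            new RuntimeException(new RuntimeException("Injected test exception."));
        System.out.println(handler.handle(expected));
        System.out.println(hasUnexpectedError);
    }
}
```

   The point of the sketch: the exception bookkeeping (`uncaughtException`, `hasUnexpectedError`) need not change when migrating to the new handler interface, so dropping it is a behavioral change the test probably relies on.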



##########
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:
##########
@@ -1085,77 +1086,72 @@ public void shouldAllowToQueryAfterThreadDied() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final KStream<String, String> input = builder.stream(streamOne);
         input
-            .groupByKey()
-            .reduce((value1, value2) -> {
-                if (value1.length() > 1) {
-                    if (beforeFailure.compareAndSet(true, false)) {
-                        throw new RuntimeException("Injected test exception");
+                .groupByKey()
+                .reduce((value1, value2) -> {
+                    if (value1.length() > 1) {
+                        if (beforeFailure.compareAndSet(true, false)) {
+                            throw new RuntimeException("Injected test exception");
+                        }
                     }
-                }
-                return value1 + value2;
-            }, Materialized.as(storeName))
-            .toStream()
-            .to(outputTopic);
+                    return value1 + value2;
+                }, Materialized.as(storeName))
+                .toStream()
+                .to(outputTopic);
 
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+        kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD);
 
         startApplicationAndWaitUntilRunning(kafkaStreams);
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamOne,
-            Arrays.asList(
-                KeyValue.pair("a", "1"),
-                KeyValue.pair("a", "2"),
-                KeyValue.pair("b", "3"),
-                KeyValue.pair("b", "4")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            mockTime);
+                streamOne,
+                Arrays.asList(
+                        KeyValue.pair("a", "1"),
+                        KeyValue.pair("a", "2"),
+                        KeyValue.pair("b", "3"),
+                        KeyValue.pair("b", "4")),
+                TestUtils.producerConfig(
+                        CLUSTER.bootstrapServers(),
+                        StringSerializer.class,
+                        StringSerializer.class,
+                        new Properties()),
+                mockTime);
 
         final int maxWaitMs = 30000;
 
         final ReadOnlyKeyValueStore<String, String> store =
-            IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
+                IntegrationTestUtils.getStore(storeName, kafkaStreams, keyValueStore());
 
         TestUtils.waitForCondition(
-            () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
-            maxWaitMs,
-            "wait for agg to be <a,12> and <b,34>");
+                () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
+                maxWaitMs,
+                "wait for agg to be <a,12> and <b,34>");
 
         IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamOne,
-            Collections.singleton(KeyValue.pair("a", "5")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                StringSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            mockTime);
-
-        TestUtils.waitForCondition(

Review Comment:
   Cf my comment above -- I think we need to keep this condition; we don't want to modify the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to