mjsax commented on code in PR #16988:
URL: https://github.com/apache/kafka/pull/16988#discussion_r1755671800
##########
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:
##########
@@ -1085,77 +1086,72 @@ public void shouldAllowToQueryAfterThreadDied() throws
Exception {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(streamOne);
input
- .groupByKey()
- .reduce((value1, value2) -> {
- if (value1.length() > 1) {
- if (beforeFailure.compareAndSet(true, false)) {
- throw new RuntimeException("Injected test exception");
+ .groupByKey()
+ .reduce((value1, value2) -> {
+ if (value1.length() > 1) {
+ if (beforeFailure.compareAndSet(true, false)) {
+ throw new RuntimeException("Injected test
exception");
+ }
}
- }
- return value1 + value2;
- }, Materialized.as(storeName))
- .toStream()
- .to(outputTopic);
+ return value1 + value2;
+ }, Materialized.as(storeName))
+ .toStream()
+ .to(outputTopic);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+ kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD);
Review Comment:
The test used the old handler, which (IIRC) translated to letting the thread
die. Is this change to replace the thread correct?
##########
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:
##########
@@ -1085,77 +1086,72 @@ public void shouldAllowToQueryAfterThreadDied() throws
Exception {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(streamOne);
input
- .groupByKey()
- .reduce((value1, value2) -> {
- if (value1.length() > 1) {
- if (beforeFailure.compareAndSet(true, false)) {
- throw new RuntimeException("Injected test exception");
+ .groupByKey()
Review Comment:
nit: this seems to be just a reformatting change? (It makes the diff hard to
read -- please avoid it, so we can focus on the actual changes.)
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -488,18 +461,7 @@ public void setUncaughtExceptionHandler(final
StreamsUncaughtExceptionHandler us
}
private void defaultStreamsUncaughtExceptionHandler(final Throwable
throwable, final boolean skipThreadReplacement) {
Review Comment:
Given that this method just calls the other one, it seems we can remove this
method, and let the caller call `handleStreamsUncaughtException` directly?
##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -561,12 +561,12 @@ public void
shouldNotViolateEosIfOneTaskFailsWithState(final String eosConfig, f
getMaxPerKey(expectedResultBeforeFailure),
"The state store content before failure do not match what
expected");
- errorInjected.set(true);
+ //errorInjected.set(true);
Review Comment:
Seems like an invalid change? (btw: this test failed on the CI build -- I
assume this unnecessary change is the reason why it's failing?)
##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -442,8 +442,8 @@ public void shouldNotViolateEosIfOneTaskFails(final String
eosConfig, final bool
writeInputData(dataAfterFailure);
waitForCondition(
- () -> uncaughtException != null, MAX_WAIT_TIME_MS,
- "Should receive uncaught exception from one StreamThread.");
+ () -> uncaughtException != null, MAX_WAIT_TIME_MS,
+ "Should receive uncaught exception from one
StreamThread.");
Review Comment:
nit: avoid unnecessary reformatting (you may need to change your IDE settings
to disable auto-reformatting)
##########
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -1170,16 +1170,11 @@ public void process(final Record<Long, Long> record) {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
- streams.setUncaughtExceptionHandler((t, e) -> {
- if (uncaughtException != null ||
- !(e instanceof StreamsException) ||
- !e.getCause().getMessage().equals("Injected test exception."))
{
- e.printStackTrace(System.err);
- hasUnexpectedError = true;
- }
- uncaughtException = e;
+ streams.setUncaughtExceptionHandler(e -> {
Review Comment:
It seems the new handler implementation differs from the original one. Why?
##########
streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java:
##########
@@ -1085,77 +1086,72 @@ public void shouldAllowToQueryAfterThreadDied() throws
Exception {
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> input = builder.stream(streamOne);
input
- .groupByKey()
- .reduce((value1, value2) -> {
- if (value1.length() > 1) {
- if (beforeFailure.compareAndSet(true, false)) {
- throw new RuntimeException("Injected test exception");
+ .groupByKey()
+ .reduce((value1, value2) -> {
+ if (value1.length() > 1) {
+ if (beforeFailure.compareAndSet(true, false)) {
+ throw new RuntimeException("Injected test
exception");
+ }
}
- }
- return value1 + value2;
- }, Materialized.as(storeName))
- .toStream()
- .to(outputTopic);
+ return value1 + value2;
+ }, Materialized.as(storeName))
+ .toStream()
+ .to(outputTopic);
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+ kafkaStreams.setUncaughtExceptionHandler(exception -> REPLACE_THREAD);
startApplicationAndWaitUntilRunning(kafkaStreams);
IntegrationTestUtils.produceKeyValuesSynchronously(
- streamOne,
- Arrays.asList(
- KeyValue.pair("a", "1"),
- KeyValue.pair("a", "2"),
- KeyValue.pair("b", "3"),
- KeyValue.pair("b", "4")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- mockTime);
+ streamOne,
+ Arrays.asList(
+ KeyValue.pair("a", "1"),
+ KeyValue.pair("a", "2"),
+ KeyValue.pair("b", "3"),
+ KeyValue.pair("b", "4")),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ mockTime);
final int maxWaitMs = 30000;
final ReadOnlyKeyValueStore<String, String> store =
- IntegrationTestUtils.getStore(storeName, kafkaStreams,
keyValueStore());
+ IntegrationTestUtils.getStore(storeName, kafkaStreams,
keyValueStore());
TestUtils.waitForCondition(
- () -> "12".equals(store.get("a")) && "34".equals(store.get("b")),
- maxWaitMs,
- "wait for agg to be <a,12> and <b,34>");
+ () -> "12".equals(store.get("a")) &&
"34".equals(store.get("b")),
+ maxWaitMs,
+ "wait for agg to be <a,12> and <b,34>");
IntegrationTestUtils.produceKeyValuesSynchronously(
- streamOne,
- Collections.singleton(KeyValue.pair("a", "5")),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- mockTime);
-
- TestUtils.waitForCondition(
Review Comment:
Cf my comment above -- I think we need to keep this condition; we don't want
to modify the test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]