cadonna commented on code in PR #17711:
URL: https://github.com/apache/kafka/pull/17711#discussion_r1839727730
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -538,13 +539,16 @@ public void flush() {
             } catch (final RuntimeException exception) {
                 if (firstException == null) {
                     // do NOT wrap the error if it is actually caused by Streams itself
-                    if (exception instanceof StreamsException)
+                    // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                    if (exception instanceof FailedProcessingException)
+                        firstException = new StreamsException(exception.getCause());
+                    else if (exception instanceof StreamsException)
                         firstException = exception;
                     else
                         firstException = new ProcessorStateException(
                             format("%sFailed to flush state store %s", logPrefix, store.name()), exception);
                 }
-                log.error("Failed to flush state store {}: ", store.name(), exception);
+                log.error("Failed to flush state store {}: ", store.name(), exception.getCause());
Review Comment:
Why did you change this? If a `StreamsException` that is NOT a
`FailedProcessingException` is thrown, it would be perfectly fine to log the
exception.
You could do something like:
```java
log.error("Failed to flush state store {}: ", store.name(),
    exception instanceof FailedProcessingException ? exception.getCause() : exception);
```
Alternatively, it may be cleaner to copy the log statement inside the
`if (firstException == null) {` block.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -538,13 +539,16 @@ public void flush() {
             } catch (final RuntimeException exception) {
                 if (firstException == null) {
                     // do NOT wrap the error if it is actually caused by Streams itself
-                    if (exception instanceof StreamsException)
+                    // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                    if (exception instanceof FailedProcessingException)
Review Comment:
Could you please add unit tests to `ProcessorStateManagerTest` that verify
the correct behavior?
Please also verify the behavior when the configured processing error handler
continues instead of failing.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -538,13 +539,16 @@ public void flush() {
             } catch (final RuntimeException exception) {
                 if (firstException == null) {
                     // do NOT wrap the error if it is actually caused by Streams itself
-                    if (exception instanceof StreamsException)
+                    // In case of FailedProcessingException Do not keep the failed processing exception in the stack trace
+                    if (exception instanceof FailedProcessingException)
+                        firstException = new StreamsException(exception.getCause());
Review Comment:
Before we introduced `FailedProcessingException`, we did not wrap the
exception in a plain `StreamsException`; we wrapped it in a
`ProcessorStateException` in the `else` branch. We should keep that behavior.
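
A rough sketch of the handling suggested in the comments above, not the actual `ProcessorStateManager` code. The exception classes here are simplified stand-ins for the Kafka Streams types, and `toLog` and `toRethrow` are hypothetical helper names introduced only for illustration:

```java
public class FlushExceptionSketch {

    // Simplified stand-ins for the Kafka Streams exception types (assumption: not the real classes).
    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class ProcessorStateException extends StreamsException {
        ProcessorStateException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static class FailedProcessingException extends StreamsException {
        FailedProcessingException(final Throwable cause) {
            super("processing failed", cause);
        }
    }

    // Hypothetical helper: pick what to log. Only a FailedProcessingException is unwrapped;
    // every other exception is logged as-is.
    static Throwable toLog(final RuntimeException exception) {
        return exception instanceof FailedProcessingException ? exception.getCause() : exception;
    }

    // Hypothetical helper: pick what to keep as the first exception.
    static RuntimeException toRethrow(final RuntimeException exception,
                                      final String logPrefix,
                                      final String storeName) {
        final String message = String.format("%sFailed to flush state store %s", logPrefix, storeName);
        if (exception instanceof FailedProcessingException) {
            // Drop the FailedProcessingException from the stack trace, but keep wrapping
            // the underlying cause in a ProcessorStateException as before.
            return new ProcessorStateException(message, exception.getCause());
        } else if (exception instanceof StreamsException) {
            // Do NOT wrap errors that are caused by Streams itself.
            return exception;
        } else {
            return new ProcessorStateException(message, exception);
        }
    }

    public static void main(final String[] args) {
        final RuntimeException root = new IllegalStateException("boom");
        final RuntimeException result =
            toRethrow(new FailedProcessingException(root), "task [0_0] ", "store-a");
        System.out.println(result.getClass().getSimpleName() + " caused by " + result.getCause());
    }
}
```

With this shape, the log call and the rethrow decision stay independent, so a plain `StreamsException` is still logged and rethrown unchanged.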