loicgreffier commented on PR #16093:
URL: https://github.com/apache/kafka/pull/16093#issuecomment-2197739588
@cadonna PR updated. `FailedProcessingException` has been introduced.
Note that:
➡️ 1. By default, `FailedProcessingException` appears in the logs:
```console
2024-06-28T23:03:34.862+02:00 ERROR 31724 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams : stream-client [streams-map]
Encountered the following exception during processing and sent shutdown request
for the entire application.
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC,
partition=0, offset=0,
stacktrace=org.apache.kafka.streams.processor.internals.FailedProcessingException:
java.lang.RuntimeException: Something bad happened...
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)
at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848)
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
at
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
at
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
at
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: java.lang.RuntimeException: Something bad happened...
at
com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
... 26 more
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:804)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1982)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1000)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
Caused by:
org.apache.kafka.streams.processor.internals.FailedProcessingException:
java.lang.RuntimeException: Something bad happened...
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:217)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:292)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:271)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:95)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:848)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:848)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
... 6 common frames omitted
Caused by: java.lang.RuntimeException: Something bad happened...
at
com.example.kstreamplify.sandbox.MyKafkaStreams.lambda$topology$2(MyKafkaStreams.java:32)
~[classes/:na]
at
org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:172)
~[kafka-streams-3.9.0-SNAPSHOT.jar:na]
... 26 common frames omitted
```
When a `FailedProcessingException` is caught, its cause is used to create the
`StreamsException`, so the stack trace stays the same before and after the PR:
```java
// Unwrap the internal wrapper so the reported cause is the user's exception.
final Throwable processingException = e instanceof FailedProcessingException ? e.getCause() : e;
final StreamsException error = new StreamsException(
    String.format(
        "Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d, stacktrace=%s",
        id(),
        processorContext.currentNode().name(),
        record.topic(),
        record.partition(),
        record.offset(),
        getStacktraceString(processingException)
    ),
    processingException
);
```
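The unwrapping step can be tried in isolation. This is only a minimal sketch, not the PR's code: `WrapperException` is a hypothetical stand-in for the internal `FailedProcessingException`, `unwrap` is an illustrative helper name, and the extra null check on the cause is an assumption added for safety.

```java
public class UnwrapSketch {
    // Hypothetical stand-in for the internal FailedProcessingException.
    static class WrapperException extends RuntimeException {
        WrapperException(final Throwable cause) {
            super(cause);
        }
    }

    // Returns the wrapped cause for a WrapperException, otherwise the exception itself.
    static Throwable unwrap(final Throwable e) {
        return e instanceof WrapperException && e.getCause() != null ? e.getCause() : e;
    }

    public static void main(final String[] args) {
        final RuntimeException original = new RuntimeException("Something bad happened...");

        // A wrapped exception is reported as its original cause.
        System.out.println(unwrap(new WrapperException(original)) == original);

        // An exception that was never wrapped is reported unchanged.
        System.out.println(unwrap(original) == original);
    }
}
```

Because the same `Throwable` instance is handed to the outer exception in both cases, the "Caused by" section of the trace should not depend on whether the wrapper was involved.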
---
➡️ 2. The processing exception handler logs:
```console
2024-06-28T23:03:34.743+02:00 WARN 31724 --- [-StreamThread-1]
s.e.LogAndFailProcessingExceptionHandler : Exception caught during message
processing, processor node: KSTREAM-MAPVALUES-0000000003, taskId: 0_0, source
topic: PERSON_TOPIC, source partition: 0, source offset: 0
```
While the `TaskExecutor` logs:
```console
2024-06-28T23:58:14.337+02:00 ERROR 28932 --- [-StreamThread-1]
org.apache.kafka.streams.KafkaStreams : stream-client [streams-map]
Encountered the following exception during processing and sent shutdown request
for the entire application.
org.apache.kafka.streams.errors.StreamsException: Exception caught in
process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC,
partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad
happened...
```
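The two logs report the same exception against different processor nodes: the
handler names `KSTREAM-MAPVALUES-0000000003`, while the `StreamsException` names
`KSTREAM-SOURCE-0000000000`.
Maybe this could be improved (or not) in another PR. The
`FailedProcessingException` could be used to pass the precise name of the
processor node where the exception occurred to `StreamTask#process`.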
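One possible shape for that follow-up, purely as a sketch: nothing below exists in this PR or in Kafka Streams. `NodeAwareException`, `failedNodeName()` and `describe` are hypothetical names, and the message format is shortened for illustration.

```java
public class NodeNameSketch {
    // Hypothetical wrapper that remembers which processor node failed.
    static class NodeAwareException extends RuntimeException {
        private final String failedNodeName;

        NodeAwareException(final String failedNodeName, final Throwable cause) {
            super(cause);
            this.failedNodeName = failedNodeName;
        }

        String failedNodeName() {
            return failedNodeName;
        }
    }

    // Builds the error message, preferring the node name carried by the wrapper
    // over the node the task currently points at.
    static String describe(final Throwable e, final String currentNodeName) {
        final boolean wrapped = e instanceof NodeAwareException;
        final String node = wrapped ? ((NodeAwareException) e).failedNodeName() : currentNodeName;
        final Throwable cause = wrapped && e.getCause() != null ? e.getCause() : e;
        return String.format("Exception caught in process. processor=%s, cause=%s", node, cause);
    }

    public static void main(final String[] args) {
        final Throwable failure = new NodeAwareException(
            "KSTREAM-MAPVALUES-0000000003",
            new RuntimeException("Something bad happened...")
        );
        // The source node name is only a fallback when the wrapper is absent.
        System.out.println(describe(failure, "KSTREAM-SOURCE-0000000000"));
    }
}
```

With something along these lines, both log lines would name the same processor node, at the cost of the wrapper carrying one more field.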