Loïc Greffier created KAFKA-17099:
-------------------------------------
Summary: Improve the process exception logs with the exact
processor node where the exception occurs
Key: KAFKA-17099
URL: https://issues.apache.org/jira/browse/KAFKA-17099
Project: Kafka
Issue Type: Improvement
Reporter: Loïc Greffier
h3. Current Behaviour
When an exception occurs in a processor node, the task executor does not log
the actual processor node where the exception occurs.
For example, consider the following topology:
```
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [PERSON_TOPIC])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-MAP-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAP-0000000002 (stores: [])
      --> KSTREAM-SINK-0000000003
      <-- KSTREAM-PEEK-0000000001
    Sink: KSTREAM-SINK-0000000003 (topic: PERSON_MAP_TOPIC)
      <-- KSTREAM-MAP-0000000002
```
When an exception is thrown in the processor KSTREAM-MAP-0000000002, the
following information will be logged by the task executor:
2024-07-08T22:17:19.926+02:00 INFO 10552 --- [-StreamThread-1] i.g.l.s.map.app.KafkaStreamsTopology : Received key = 0, value = {"id": 0, "firstName": "Ethan", "lastName": "Moore", "nationality": "CH", "birthDate": "2011-02-21T15:45:12Z"}
2024-07-08T22:17:30.082+02:00 ERROR 10552 --- [-StreamThread-1] o.a.k.s.p.internals.TaskExecutor : stream-thread [streams-map-StreamThread-1] Failed to process stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
    at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
Caused by: java.lang.RuntimeException: Something bad happened...
    at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33) ~[classes/:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
    ... 6 common frames omitted
The first line of the logged exception reports processor=KSTREAM-SOURCE-0000000000,
which suggests that the exception was caught in the source node, whereas it was
actually thrown in KSTREAM-MAP-0000000002.
h3. Expected Behaviour
The logged exception message should name the exact processor node in which the
exception occurred (e.g., processor=KSTREAM-MAP-0000000002).
h3. Current Limitation
Processing exceptions are currently caught in the [stream
task#process|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L802]
method, where it is not possible to determine the exact processor node in which
the exception occurred.
h3. Improvement Proposal
With the changes brought by
[KAFKA-16448|https://issues.apache.org/jira/browse/KAFKA-16448], processing
exceptions will be caught at the processor node level and wrapped into an
internal exception named *FailedProcessingException* before being thrown to the
stream task.
This change should make it possible to identify the precise processor node where
a processing exception occurs and to propagate its name up to the stream task,
where it will be used to build the
[StreamsException|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L803]
that appears in the logs.
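
To illustrate the idea, here is a minimal, self-contained sketch of the wrap-and-propagate pattern. It is not the Kafka Streams implementation: the Node class, the failedNodeName field, and the runTask and buildTopology helpers are all hypothetical, and the FailedProcessingException below is a toy stand-in that only borrows the name of the internal exception. Each node wraps an exception thrown by its own logic together with its own name, and lets an already-wrapped exception from a downstream node pass through unchanged, so the task-level catch sees the innermost failing node.

```java
import java.util.function.UnaryOperator;

public class NodeAwareErrorSketch {

    /** Toy stand-in for the wrapper exception (hypothetical shape): carries the failing node's name. */
    static class FailedProcessingException extends RuntimeException {
        final String failedNodeName;

        FailedProcessingException(String failedNodeName, RuntimeException cause) {
            super(cause);
            this.failedNodeName = failedNodeName;
        }
    }

    /** Toy processor node: applies its logic, then forwards the result to its child. */
    static class Node {
        final String name;
        final UnaryOperator<String> logic;
        Node child;

        Node(String name, UnaryOperator<String> logic) {
            this.name = name;
            this.logic = logic;
        }

        void process(String value) {
            try {
                String result = logic.apply(value);
                if (child != null) {
                    child.process(result);
                }
            } catch (FailedProcessingException e) {
                // Already wrapped by a downstream node: keep the innermost name.
                throw e;
            } catch (RuntimeException e) {
                // Thrown by this node's own logic: record this node's name.
                throw new FailedProcessingException(name, e);
            }
        }
    }

    /** Task-level catch: builds the message from the node name carried by the wrapper. */
    static String runTask(Node source, String record) {
        try {
            source.process(record);
            return "ok";
        } catch (FailedProcessingException e) {
            return String.format("Exception caught in process. processor=%s, cause=%s",
                    e.failedNodeName, e.getCause().getMessage());
        }
    }

    /** Mirrors the source -> peek -> map chain of the topology above, with a failing map. */
    static Node buildTopology() {
        Node source = new Node("KSTREAM-SOURCE-0000000000", v -> v);
        Node peek = new Node("KSTREAM-PEEK-0000000001", v -> v);
        Node map = new Node("KSTREAM-MAP-0000000002", v -> {
            throw new RuntimeException("Something bad happened...");
        });
        source.child = peek;
        peek.child = map;
        return source;
    }

    public static void main(String[] args) {
        System.out.println(runTask(buildTopology(), "record"));
    }
}
```

In this sketch the message built by runTask names KSTREAM-MAP-0000000002 rather than the source node, which is the behaviour requested in this ticket.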