loicgreffier commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1619416340
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java:
##########
@@ -288,7 +301,31 @@ private <K, V> void forwardInternal(final ProcessorNode<K, V, ?, ?> child,
                                         final Record<K, V> record) {
         setCurrentNode(child);
-        child.process(record);
+        try {
Review Comment:
@cadonna Catching exceptions right here:
https://github.com/apache/kafka/blob/0f0c9ecbf330923ad653cc2ff4fca6c4dced1cf7/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L847
was our original intention, before we decided to move the catch to
`ProcessorContextImpl#forwardInternal`.
Unless we're mistaken, the precise name of the node where the exception
occurred is not available at the `StreamTask#doProcess` level.
Could we do both of the following?
- catch exceptions at the `StreamTask#doProcess` level
- still catch exceptions at the `ProcessorContextImpl#forwardInternal` level,
but instead of rethrowing a `StreamsException`
(https://github.com/loicgreffier/kafka/blob/960c2a3153b30e48963387b5756b0310275bf48b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L320),
throw a `new ChildNodeProcessingException(child.name, e)` that is caught and
handled at the `StreamTask#doProcess` level.
`ChildNodeProcessingException` would be a new exception that wraps the root
cause and additionally carries the name of the child node in which the
exception occurred, so that the first node learns that something went wrong
in one of its children.
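
To make the proposal concrete, here is a minimal, self-contained sketch. It is
not Kafka code: `Node`, `forwardInternal` and `doProcess` are simplified
stand-ins for `ProcessorNode`, `ProcessorContextImpl#forwardInternal` and
`StreamTask#doProcess`, and `ChildNodeProcessingException` is the hypothetical
wrapper described above. Passing an already-wrapped exception through
untouched, so that the innermost node name is kept, is an assumption of this
sketch.

```java
import java.util.function.Consumer;

class ChildNodeProcessingSketch {

    /** Hypothetical wrapper proposed in this comment; not an existing Kafka class. */
    static class ChildNodeProcessingException extends RuntimeException {
        private final String childNodeName;

        ChildNodeProcessingException(final String childNodeName, final Throwable cause) {
            super("Exception caught while processing in node " + childNodeName, cause);
            this.childNodeName = childNodeName;
        }

        String childNodeName() {
            return childNodeName;
        }
    }

    /** Simplified stand-in for ProcessorNode: a name plus a processing function. */
    static class Node {
        final String name;
        final Consumer<String> processor;

        Node(final String name, final Consumer<String> processor) {
            this.name = name;
            this.processor = processor;
        }
    }

    /** Stand-in for ProcessorContextImpl#forwardInternal. */
    static void forwardInternal(final Node child, final String record) {
        try {
            child.processor.accept(record);
        } catch (final ChildNodeProcessingException e) {
            // Assumption: a deeper node already tagged the failure, keep its name.
            throw e;
        } catch (final RuntimeException e) {
            // Tag the root cause with the name of the node that failed.
            throw new ChildNodeProcessingException(child.name, e);
        }
    }

    /** Stand-in for StreamTask#doProcess: the single place where failures are handled. */
    static String doProcess(final Node source, final String record) {
        try {
            source.processor.accept(record);
            return "ok";
        } catch (final ChildNodeProcessingException e) {
            // The failing child's name is available here through the wrapper.
            return "failed in " + e.childNodeName() + ": " + e.getCause().getMessage();
        } catch (final RuntimeException e) {
            // The source node itself failed, so no wrapper is involved.
            return "failed in " + source.name + ": " + e.getMessage();
        }
    }

    public static void main(final String[] args) {
        final Node sink = new Node("sink", r -> { });
        final Node mapper = new Node("mapper", r -> {
            if (r.isEmpty()) {
                throw new IllegalArgumentException("empty record");
            }
            forwardInternal(sink, r);
        });
        final Node source = new Node("source", r -> forwardInternal(mapper, r));

        System.out.println(doProcess(source, "a")); // prints "ok"
        System.out.println(doProcess(source, ""));  // prints "failed in mapper: empty record"
    }
}
```

With this shape, `forwardInternal` only tags the failure with a node name, and
any decision about how to handle it stays in `doProcess`.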
Hope that is clear. If it sounds like a plan to you, we can give this
implementation a try.