cadonna commented on code in PR #16222:
URL: https://github.com/apache/kafka/pull/16222#discussion_r1629028760
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext,
this.log = logContext.logger(getClass());
this.taskId = taskId;
this.streamsProducer = streamsProducer;
+ this.sendException = streamsProducer.sendException();
Review Comment:
That is basically the fix.
Notice that now an exception caused by one task can be thrown by a different
task. For example:
```
[2024-05-30 10:20:35,916] ERROR [i-0af25f5c2bd9bba31-StreamThread-1]
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Error flushing caches of
dirty task 0_0 (org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered
sending record to topic stream-soak-test-network-id-repartition for task 1_1
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent
since the producer is fenced, indicating the task may be migrated out; it means
all tasks belonging to this thread should be migrated.
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:305)
```
Task `0_0` throws the error caused by task `1_1`.
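The cross-task behavior follows from the cached reference in the diff above: every `RecordCollectorImpl` on a thread now holds the same `sendException` holder obtained from the shared `StreamsProducer`, so a failure recorded by one task's send callback surfaces on whichever task checks it next. Below is a minimal standalone sketch of that sharing pattern (the `Collector`, `recordSendError`, and `flush` names here are illustrative stand-ins, not the real Kafka Streams classes):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: two collectors share one exception holder, mirroring how each
// RecordCollectorImpl caches streamsProducer.sendException() from the
// thread-shared StreamsProducer.
class SharedExceptionDemo {
    // Stands in for the AtomicReference held by the shared StreamsProducer.
    static final AtomicReference<RuntimeException> sendException = new AtomicReference<>();

    static class Collector {
        final String taskId;
        Collector(final String taskId) { this.taskId = taskId; }

        // A producer callback on this task records the first failure.
        void recordSendError(final RuntimeException e) {
            sendException.compareAndSet(null, e);
        }

        // Flushing any task rethrows the pending failure, even if it
        // originated on a different task.
        void flush() {
            final RuntimeException e = sendException.get();
            if (e != null) {
                throw e;
            }
        }
    }

    public static void main(final String[] args) {
        final Collector task11 = new Collector("1_1");
        final Collector task00 = new Collector("0_0");

        // Task 1_1 hits a producer error...
        task11.recordSendError(
            new RuntimeException("Error encountered sending record for task 1_1"));

        // ...but task 0_0 is the one that throws it while flushing.
        try {
            task00.flush();
        } catch (final RuntimeException e) {
            System.out.println("task 0_0 threw: " + e.getMessage());
        }
    }
}
```

Because the holder belongs to the producer, which is shared by all tasks on the thread, the task id in the thrown exception's message reflects where the send failed, not which task happened to flush.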
--
This is an automated message from the Apache Git Service.