[
https://issues.apache.org/jira/browse/KAFKA-10829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-10829:
----------------------------------
Labels: new-streams-runtime-should-fix (was: )
> Kafka Streams handle produce exception improvement
> --------------------------------------------------
>
> Key: KAFKA-10829
> URL: https://issues.apache.org/jira/browse/KAFKA-10829
> Project: Kafka
> Issue Type: Improvement
> Components: producer, streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: new-streams-runtime-should-fix
>
> A summary of some recent discussions on how we should improve embedded
> producer exception handling.
> Note that below, the baseline logic guarantees that our correctness
> semantics are not violated; the optimizations sit on top of the baseline to
> reduce the user's burden by letting the library auto-handle certain types of
> exceptions.
> 1) ``Producer.send()`` throws an exception directly:
> 1.a) baseline (to ensure correctness) logic is to always wrap it as a
> StreamsException, which would cause the thread to shut down and the
> exception handler to be triggered. The handler could look into the wrapped
> exception and decide whether the shut-down thread can be restarted.
> 1.b) optimization is to look at the exception and decide whether it can be
> wrapped as a TaskMigratedException instead (e.g. ProducerFenced). This would
> then be auto-handled by lost-all-tasks and re-join.
> 2) ``Producer.send()`` Callback has an exception:
> 2.a) baseline is to first check whether the exception is an instance of
> RetriableException.
> If it is retriable, pass it to the producer exception handler to decide
> whether to throw or to continue with the record dropped. If the handler
> decides to throw, always wrap it as a StreamsException and keep it locally;
> at the same time, do not send more records from the caller. In the next send
> call, check the remembered exception and throw it. This would cause the
> thread to shut down and the exception handler to be triggered.
> If the exception is not retriable, always throw it as a fatal
> StreamsException.
> 2.b) optimization one: if the non-retriable exception can be translated to a
> TaskMigratedException, then do not wrap it as a StreamsException, so that the
> library can handle it internally.
> 2.c) optimization two: if the retriable exception is a timeout exception,
> then do not pass it to the producer exception handler; treat it as
> TaskMigrated instead.
> 3) ``Producer.XXXTxn`` APIs except ``AbortTxn`` throw an exception directly:
> 3.a) baseline logic is to capture all KafkaExceptions except
> TimeoutException and handle them as *TaskCorrupted* (which includes aborting
> the transaction, resetting the state, and re-joining the group).
> TimeoutException would be rethrown.
> 3.b) optimization: some exceptions can be handled as TaskMigrated, which
> would be handled in a lighter way.
> 4) ``Producer.abortTxn`` throws an exception:
> 4.a) baseline logic is to capture all KafkaExceptions except TimeoutException
> and wrap them as fatal StreamsException. TimeoutException would be rethrown.
> 4.b) optimization: some exceptions can be ignored (e.g. invalidTxnTransition
> means the abort did not succeed).
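
The callback handling in 2.a) to 2.c) can be sketched roughly as below. This is
only an illustration under stated assumptions, not the actual Streams code: the
exception classes are local stand-ins named after their Kafka counterparts so
the sketch compiles without Kafka on the classpath, and SendCallbackPolicy,
Outcome, Handler, onCompletion and checkBeforeSend are hypothetical names. It
also assumes the reading implied by 2.b) and 2.c): retriable errors go to the
handler, timeouts bypass it, and other non-retriable errors are fatal.

```java
import java.util.concurrent.atomic.AtomicReference;

// Stand-in types (assumption): named after the Kafka classes, defined locally.
class RetriableException extends RuntimeException {}
class TimeoutException extends RetriableException {}
class ProducerFencedException extends RuntimeException {}
class StreamsException extends RuntimeException {
    StreamsException(String message, Throwable cause) { super(message, cause); }
}
class TaskMigratedException extends StreamsException {
    TaskMigratedException(String message, Throwable cause) { super(message, cause); }
}

// Hypothetical helper illustrating points 2.a) to 2.c).
final class SendCallbackPolicy {
    enum Outcome { DROP_RECORD, FATAL, TASK_MIGRATED }

    // Hypothetical stand-in for the producer exception handler:
    // true means "fail", false means "continue with the record dropped".
    interface Handler { boolean shouldFail(Exception e); }

    private final Handler handler;
    // The first error to surface is remembered and rethrown on the next send.
    private final AtomicReference<StreamsException> remembered = new AtomicReference<>();

    SendCallbackPolicy(Handler handler) { this.handler = handler; }

    static Outcome classify(Exception e, Handler handler) {
        // 2.c) timeouts bypass the handler and are treated as TaskMigrated.
        if (e instanceof TimeoutException) return Outcome.TASK_MIGRATED;
        // 2.a) other retriable errors: the handler decides to fail or to drop.
        if (e instanceof RetriableException) {
            return handler.shouldFail(e) ? Outcome.FATAL : Outcome.DROP_RECORD;
        }
        // 2.b) non-retriable errors that translate to TaskMigrated (e.g. fencing).
        if (e instanceof ProducerFencedException) return Outcome.TASK_MIGRATED;
        // 2.a) any other non-retriable error is fatal.
        return Outcome.FATAL;
    }

    // Called from the send callback; never throws, it only records the error.
    void onCompletion(Exception e) {
        if (e == null) return; // the send succeeded
        switch (classify(e, handler)) {
            case TASK_MIGRATED:
                remembered.compareAndSet(null, new TaskMigratedException("send failed", e));
                break;
            case FATAL:
                remembered.compareAndSet(null, new StreamsException("send failed", e));
                break;
            default:
                break; // record dropped, processing continues
        }
    }

    // Called at the start of the next send: rethrows the remembered error, if any.
    void checkBeforeSend() {
        StreamsException e = remembered.get();
        if (e != null) throw e;
    }
}
```

A caller would invoke checkBeforeSend() before each send and route the
producer callback to onCompletion(); catching TaskMigratedException separately
from its StreamsException parent is what lets the lighter recovery path in 2.b)
and 2.c) differ from a thread shutdown.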