[
https://issues.apache.org/jira/browse/KAFKA-19430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18013562#comment-18013562
]
Matthias J. Sax commented on KAFKA-19430:
-----------------------------------------
Well, that's all up for discussion I guess :) – And there are multiple issues we
need to consider.
# The idea to maybe reuse `DeserializationExceptionHandler` was to avoid
having too many callback interfaces and to keep the API surface area small.
Even if this error is not really about deserialization, it is about a broken
read path, so if we squint a little it might be fine to reuse (misuse?) the
existing handler? (Btw: there is actually another ticket to add RETRY to the
other handlers, https://issues.apache.org/jira/browse/KAFKA-17441, so it's
certainly a valid option to add RETRY and reuse
`DeserializationExceptionHandler` for this case. But I am not even sure if we
would want a RETRY option (cf. below) – also, the user wanted to be able to
skip over this error via CONTINUE.)
# Of course, there is some mismatch, because `CorruptRecordException` applies
to a batch of records, and thus we don't have a single `ConsumerRecord` to
pass into the handler...
# I was also just looking into the code a little more, and into the originally
reported stack trace, and it seems that we might not even get
`CorruptRecordException` directly: `CorruptRecordException` is used internally
only, and the consumer throws a `KafkaException` from `poll()` for this case
instead – this would make it even more difficult to handle. It also makes it
questionable whether RETRY is actually a valid option.
# In the end, yes, we could also have one unified handler for the read and
write paths, but would it really make it easier to use? What API would you
propose to unify both?
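To make point 1 a bit more concrete, here is a minimal sketch of what a handler reused for both deserialization errors and a broken read path might decide. The types below are simplified stand-ins I made up so the sketch is self-contained – the real `DeserializationExceptionHandler` lives in `org.apache.kafka.streams.errors` and takes a `ConsumerRecord`, and the RETRY option is only proposed (KAFKA-17441), not released:

```java
// Self-contained sketch with stand-in types; NOT the real Kafka Streams API.
public class ReadPathHandlerSketch {

    /** Stand-in for org.apache.kafka.common.errors.CorruptRecordException. */
    static class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String msg) { super(msg); }
    }

    /** Stand-in response enum; RETRY is the hypothetical KAFKA-17441 addition. */
    enum HandlerResponse { CONTINUE, FAIL, RETRY }

    /**
     * One handler for both error kinds. A corrupt batch has no single
     * record, so the record argument would be null in that case -- the
     * API mismatch from point 2 above.
     */
    static HandlerResponse handle(Object record, Exception exception) {
        if (exception instanceof CorruptRecordException) {
            // Broken read path: skip past the corrupt batch, as the
            // user in the linked issue asked for.
            return HandlerResponse.CONTINUE;
        }
        // Genuine deserialization failure: fail fast by default.
        return HandlerResponse.FAIL;
    }
}
```

The interesting design question is visible even in this toy version: the `record` parameter is meaningless for the corrupt-batch branch, which is exactly the mismatch that makes plain reuse awkward.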
I also don't have all the answers, and that is what I meant by "I just don't
know how complex it will be". Happy to keep brainstorming a little, but it
could also turn out that this issue will be a "won't fix", as it might be too
complex (or we could change the consumer to throw `CorruptRecordException`
directly). – We might also need a KIP for this, if we make complex changes or
touch any public APIs.
Maybe [~lianetm] and/or [~kirktrue] could comment, as they know the consumer
internals much better. It would be good to better understand what internal
state the consumer is in when such an error happens. – But even if the error is
not directly retriable, we could always close the consumer, do some cleanup,
create a new one, and resume from the last committed offset (well, that's
technically already possible via the REPLACE_THREAD option of the uncaught
exception handler). Btw: if the consumer is in a non-reusable state for this
error, we would also need some clever cleanup for the CONTINUE case...
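For comparison, the REPLACE_THREAD escape hatch mentioned above looks roughly like this today. Again the Kafka types are replaced by minimal stand-ins so the sketch compiles on its own – in real code the handler is registered via `KafkaStreams#setUncaughtExceptionHandler` and returns `StreamThreadExceptionResponse.REPLACE_THREAD`:

```java
// Self-contained sketch with stand-in types; NOT the real Kafka Streams API.
public class ReplaceThreadSketch {

    /** Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse. */
    enum StreamThreadExceptionResponse { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    /** Stand-in for org.apache.kafka.common.errors.RetriableException. */
    static class RetriableException extends RuntimeException {
        RetriableException(String msg) { super(msg); }
    }

    /**
     * Replace the stream thread only for retriable errors; the new thread
     * resumes from the last committed offset -- the heavyweight recovery
     * path the ticket would like to avoid.
     */
    static StreamThreadExceptionResponse handle(Throwable exception) {
        // Per point 3 above, the consumer may surface the corrupt-record
        // error wrapped in a plain KafkaException, so walk the cause chain.
        for (Throwable t = exception; t != null; t = t.getCause()) {
            if (t instanceof RetriableException) {
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
        }
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }
}
```

Replacing the whole thread is a blunt instrument, but it shows why CONTINUE in a handler would need equivalent cleanup if the consumer is left in a non-reusable state.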
> Don't fail on RecordCorruptedException
> --------------------------------------
>
> Key: KAFKA-19430
> URL: https://issues.apache.org/jira/browse/KAFKA-19430
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Uladzislau Blok
> Priority: Major
>
> From [https://github.com/confluentinc/kafka-streams-examples/issues/524]
> Currently, the existing `DeserializationExceptionHandler` is applied when
> de-serializing the record key/value byte[] inside Kafka Streams. This implies
> that a `RecordCorruptedException` is not handled.
> We should explore not letting Kafka Streams crash, but maybe retrying this
> error automatically (as `RecordCorruptedException extends RetriableException`),
> and finding a way to pump the error into the existing exception handler.
> If the error is transient, users can still use `REPLACE_THREAD` in the
> uncaught exception handler, but this is a rather heavyweight approach.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)