cadonna commented on code in PR #16432:
URL: https://github.com/apache/kafka/pull/16432#discussion_r1689250700
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final
DeserializationExceptionHa
final
ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor
droppedRecordsSensor) {
+ handleDeserializationFailure(deserializationExceptionHandler,
processorContext, deserializationException, rawRecord, log,
droppedRecordsSensor, null);
+ }
+
+ public static void handleDeserializationFailure(final
DeserializationExceptionHandler deserializationExceptionHandler,
+ final ProcessorContext<?,
?> processorContext,
+ final Exception
deserializationException,
+ final
ConsumerRecord<byte[], byte[]> rawRecord,
+ final Logger log,
+ final Sensor
droppedRecordsSensor,
+ final String
sourceNodeName) {
final DeserializationExceptionHandler.DeserializationHandlerResponse
response;
try {
- response = deserializationExceptionHandler.handle(
+ final DefaultErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
(InternalProcessorContext<?, ?>) processorContext,
- rawRecord,
- deserializationException);
+ rawRecord.topic(),
+ rawRecord.partition(),
+ rawRecord.offset(),
+ rawRecord.headers(),
+ rawRecord.key(),
+ rawRecord.value(),
+ sourceNodeName,
+ processorContext.taskId());
+ response =
deserializationExceptionHandler.handle(errorHandlerContext, rawRecord,
deserializationException);
} catch (final Exception fatalUserException) {
Review Comment:
I think, we misunderstood each other. On this line an exception originating
from the user-specified handler is caught. The exception can be anything that
is thrown by user code. We did not consider exceptions thrown from user code
for the processing exception handler.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]