loicgreffier commented on code in PR #16432:
URL: https://github.com/apache/kafka/pull/16432#discussion_r1687815892


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
                                                     final ConsumerRecord<byte[], byte[]> rawRecord,
                                                     final Logger log,
                                                     final Sensor droppedRecordsSensor) {
+        handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null);
+    }
+
+    public static void handleDeserializationFailure(final DeserializationExceptionHandler deserializationExceptionHandler,
+                                                    final ProcessorContext<?, ?> processorContext,
+                                                    final Exception deserializationException,
+                                                    final ConsumerRecord<byte[], byte[]> rawRecord,
+                                                    final Logger log,
+                                                    final Sensor droppedRecordsSensor,
+                                                    final String sourceNodeName) {
         final DeserializationExceptionHandler.DeserializationHandlerResponse response;
         try {
-            response = deserializationExceptionHandler.handle(
+            final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
                 (InternalProcessorContext<?, ?>) processorContext,
-                rawRecord,
-                deserializationException);
+                rawRecord.topic(),
+                rawRecord.partition(),
+                rawRecord.offset(),
+                rawRecord.headers(),
+                rawRecord.key(),
+                rawRecord.value(),
+                sourceNodeName,
+                processorContext.taskId());
+            response = deserializationExceptionHandler.handle(errorHandlerContext, rawRecord, deserializationException);
         } catch (final Exception fatalUserException) {

Review Comment:
   @cadonna Since the processing exception handler handles exceptions at the `ProcessorNode` level, exceptions thrown during deserialization are indeed not caught right now. Is that your point?
   
   Maybe we should consider catching and handling exceptions when getting the `nextRecord` at the `StreamTask` level, right before processing it:
   
   
https://github.com/apache/kafka/blob/a5bfc2190c3448039c9361909e547f64f7fdb6e2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L770
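
To make the idea concrete, here is a minimal sketch of guarding the "get next record" step so that a deserialization failure is routed to a handler instead of escaping uncaught. Every name in it (`NextRecordGuardSketch`, `Response`, `Handler`, `processAll`) is a hypothetical stand-in invented for illustration, not the actual Kafka Streams classes or the code in this PR, and the real `StreamTask` logic is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

public class NextRecordGuardSketch {

    /** Hypothetical stand-in for a handler response; not the Kafka Streams enum. */
    enum Response { CONTINUE, FAIL }

    /** Hypothetical stand-in for a user-supplied exception handler. */
    interface Handler {
        Response handle(byte[] rawValue, Exception error);
    }

    /**
     * Drains the raw queue, deserializing each record right before it is processed.
     * A failure while obtaining the next record is passed to the handler, which
     * decides whether to drop the record (CONTINUE) or stop (FAIL).
     */
    static List<String> processAll(final Queue<byte[]> rawRecords,
                                   final Function<byte[], String> deserializer,
                                   final Handler handler) {
        final List<String> processed = new ArrayList<>();
        while (!rawRecords.isEmpty()) {
            final byte[] raw = rawRecords.poll();
            final String next;
            try {
                // Stand-in for "getting the nextRecord".
                next = deserializer.apply(raw);
            } catch (final RuntimeException e) {
                if (handler.handle(raw, e) == Response.FAIL) {
                    throw new IllegalStateException("Handler requested failure", e);
                }
                // CONTINUE: drop this record and move on to the next one.
                continue;
            }
            // Stand-in for downstream processing of the record.
            processed.add(next);
        }
        return processed;
    }
}
```

In this sketch the catch sits around the deserialization call only, so exceptions from downstream processing would still go through whatever handles them at the processor level.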
   
   Going to sleep on it



