loicgreffier commented on code in PR #16432:
URL: https://github.com/apache/kafka/pull/16432#discussion_r1697456251


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
                                                     final ConsumerRecord<byte[], byte[]> rawRecord,
                                                     final Logger log,
                                                     final Sensor droppedRecordsSensor) {
+        handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null);

Review Comment:
   @mjsax I've been looking for a way to update `GlobalStateManagerImpl` so that we can remove this method, but `GlobalStateManagerImpl` doesn't seem to have access to a source node name. There is a "source" processor:
   
   https://github.com/apache/kafka/blob/010ab19b724ae011e85686ce47320f4f85d9a11f/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L315
   
   But it doesn't appear to have a name. E.g., in the following scenario:
   
   ```java
   streamsBuilder
       .addGlobalStore(storeBuilder, "PERSON_TOPIC",
           Consumed.with(Serdes.String(), SerdesUtils.specificAvroValueSerdes()),
           () -> (Processor<String, SpecificRecord, Void, Void>) record ->
               log.info("Processing record with key = {}, value = {}", record.key(), record.value()));
   // Source var is this processor ⬆️
   ```
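
To make the constraint concrete, here is a minimal plain-Java sketch of the overload pattern shown in the diff: a shorter overload that delegates to the full one with `null` for the source node name. `FailureDescriber`, `describeFailure`, and the `<unnamed>` placeholder are hypothetical stand-ins for illustration only; they are not Kafka classes or methods, and the real handling in `RecordDeserializer` may differ.

```java
public class FailureDescriber {

    // Hypothetical full variant: callers that know the source node name pass it in.
    // A null name is treated as "no named source node is available".
    public static String describeFailure(final String topic, final String sourceNodeName) {
        final String node = sourceNodeName == null ? "<unnamed>" : sourceNodeName;
        return "topic=" + topic + ", sourceNode=" + node;
    }

    // Hypothetical convenience variant for callers that have no name to offer
    // (the situation described above for the global store path).
    // It simply delegates with null, mirroring the overload in the diff.
    public static String describeFailure(final String topic) {
        return describeFailure(topic, null);
    }

    public static void main(final String[] args) {
        // Caller that knows its source node name (the name here is made up).
        System.out.println(describeFailure("PERSON_TOPIC", "my-source"));
        // Caller without a name, as with the unnamed global store processor.
        System.out.println(describeFailure("PERSON_TOPIC"));
    }
}
```

Removing the shorter overload would require every caller to supply a name or pass `null` explicitly, which is why the missing name on the global store side matters here.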



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
