mjsax commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2893711125


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+                        final ProcessingExceptionHandler.Response response;
+                        // Processing phase
+                        try {
+                            ((Processor) source).process(deserializedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    null,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                try {
+                                    response =
+                                        
Objects.requireNonNull(processingExceptionHandler.handleError(
+                                            errorHandlerContext,
+                                            deserializedRecord,
+                                            processingException
+                                        ), "Invalid ProcessingExceptionHandler 
response");
+                                    if 
(!response.deadLetterQueueRecords().isEmpty()) {
+                                        log.warn("Dead letter queue records 
cannot be sent for GlobalKTable processors " +
+                                                "(no producer available). DLQ 
support for GlobalKTable will be addressed in a future KIP. " + "Record 
context: {}",
+                                            errorHandlerContext);
+                                    }
+                                } catch (final Exception fatalUserException) {
+                                    log.error(
+                                            "Processing error callback failed 
after processing error for record: {}",
+                                            errorHandlerContext,
+                                            processingException
+                                    );
+                                    throw new FailedProcessingException(
+                                            "Fatal user code error in 
processing error callback",
+                                            null,
+                                            fatalUserException
+                                    );
+                                }
+                                
+                                if (response.result() == 
ProcessingExceptionHandler.Result.FAIL) {
+                                    log.error("Processing exception handler is 
set to fail upon" +
+                                            " a processing error. If you would 
rather have the streaming pipeline" +
+                                            " continue after a processing 
error, please set the " +
+                                            
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+                                    throw new FailedProcessingException(null, 
processingException);
+                                }
+                                // RESUME - log and continue
+                                log.warn("Processing exception handler chose 
to resume for record at offset {}", record.offset(), processingException);
+                                droppedRecordsSensor(

Review Comment:
   Should we refactor this, and add a member variable `Sensor 
droppedRecordSensor` which we set inside `initialize()` and just use here? And 
also change the code in L343 above?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+                        final ProcessingExceptionHandler.Response response;
+                        // Processing phase
+                        try {
+                            ((Processor) source).process(deserializedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    null,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                try {
+                                    response =
+                                        
Objects.requireNonNull(processingExceptionHandler.handleError(
+                                            errorHandlerContext,
+                                            deserializedRecord,
+                                            processingException
+                                        ), "Invalid ProcessingExceptionHandler 
response");
+                                    if 
(!response.deadLetterQueueRecords().isEmpty()) {
+                                        log.warn("Dead letter queue records 
cannot be sent for GlobalKTable processors " +
+                                                "(no producer available). DLQ 
support for GlobalKTable will be addressed in a future KIP. " + "Record 
context: {}",
+                                            errorHandlerContext);
+                                    }
+                                } catch (final Exception fatalUserException) {
+                                    log.error(
+                                            "Processing error callback failed 
after processing error for record: {}",
+                                            errorHandlerContext,
+                                            processingException
+                                    );
+                                    throw new FailedProcessingException(
+                                            "Fatal user code error in 
processing error callback",
+                                            null,
+                                            fatalUserException
+                                    );
+                                }
+                                
+                                if (response.result() == 
ProcessingExceptionHandler.Result.FAIL) {
+                                    log.error("Processing exception handler is 
set to fail upon" +
+                                            " a processing error. If you would 
rather have the streaming pipeline" +
+                                            " continue after a processing 
error, please set the " +
+                                            
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+                                    throw new FailedProcessingException(null, 
processingException);
+                                }
+                                // RESUME - log and continue
+                                log.warn("Processing exception handler chose 
to resume for record at offset {}", record.offset(), processingException);

Review Comment:
   I don't think this should be logged at WARN -- I'm not even sure we want to 
log at all. And if we do, only at DEBUG or TRACE, I would say.
   
   We don't log anything in the existing code that uses the processing exception 
handler -- we record a metric, which we believe is enough.



##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must 
implement the `Serde` inter
 >         }
 >     }
 
+### processing.exception.handler.global.enabled

Review Comment:
   ```suggestion
   ### processing.exception.handler.global.enabled (deprecated)
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,23 +252,29 @@ public void process(final Record<KIn, VIn> record) {
 
             final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords 
= response.deadLetterQueueRecords();
             if (!deadLetterQueueRecords.isEmpty()) {
-                final RecordCollector collector = ((RecordCollector.Supplier) 
internalProcessorContext).recordCollector();
-                for (final ProducerRecord<byte[], byte[]> 
deadLetterQueueRecord : deadLetterQueueRecords) {
-                    collector.send(
-                            deadLetterQueueRecord.key(),
-                            deadLetterQueueRecord.value(),
-                            name(),
-                            internalProcessorContext,
-                            deadLetterQueueRecord
-                    );
+                if (!(internalProcessorContext instanceof 
RecordCollector.Supplier)) {
+                    log.warn("Dead letter queue records cannot be sent for 
GlobalKTable processors " +
+                            "(no producer available). DLQ support for 
GlobalKTable will be addressed in a future KIP. " + "Record context: {}",

Review Comment:
   ```suggestion
                               "DLQ support for global state/KTable will be 
addressed in a future release. " + "Record context: {}",
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+                        final ProcessingExceptionHandler.Response response;
+                        // Processing phase
+                        try {
+                            ((Processor) source).process(deserializedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    null,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                try {
+                                    response =
+                                        
Objects.requireNonNull(processingExceptionHandler.handleError(
+                                            errorHandlerContext,
+                                            deserializedRecord,
+                                            processingException
+                                        ), "Invalid ProcessingExceptionHandler 
response");
+                                    if 
(!response.deadLetterQueueRecords().isEmpty()) {
+                                        log.warn("Dead letter queue records 
cannot be sent for GlobalKTable processors " +
+                                                "(no producer available). DLQ 
support for GlobalKTable will be addressed in a future KIP. " + "Record 
context: {}",

Review Comment:
   ```suggestion
                                                   "DLQ support for global 
state/KTable will be added in a future release. " + "Record context: {}",
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+                        final ProcessingExceptionHandler.Response response;
+                        // Processing phase
+                        try {
+                            ((Processor) source).process(deserializedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    null,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                try {
+                                    response =
+                                        
Objects.requireNonNull(processingExceptionHandler.handleError(
+                                            errorHandlerContext,
+                                            deserializedRecord,
+                                            processingException
+                                        ), "Invalid ProcessingExceptionHandler 
response");
+                                    if 
(!response.deadLetterQueueRecords().isEmpty()) {
+                                        log.warn("Dead letter queue records 
cannot be sent for GlobalKTable processors " +

Review Comment:
   ```suggestion
                                           log.warn("Dead letter queue records 
cannot be sent for global state/KTable processors. " +
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -861,6 +843,13 @@ public class StreamsConfig extends AbstractConfig {
             ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
             ProducerConfig.TRANSACTIONAL_ID_CONFIG
         };
+    @Deprecated

Review Comment:
   ```suggestion
   
       /**
        * {@code processing.exception.handler.global.enabled}
        * @deprecated Since 4.3. Default will change to {@code true} when 
removed.
        */
       @Deprecated
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+                        final ProcessingExceptionHandler.Response response;
+                        // Processing phase
+                        try {
+                            ((Processor) source).process(deserializedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    null,

Review Comment:
   Can't we use `globalProcessorContext.currentNode().name()`  here?



##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1446,7 +1447,7 @@ Serde for the inner class of a windowed record. Must 
implement the `Serde` inter
 
 > The processing exception handler allows you to manage exceptions triggered 
 > during the processing of a record. The implemented exception handler needs 
 > to return a `FAIL` or `CONTINUE` depending on the record and the exception 
 > thrown. Returning `FAIL` will signal that Streams should shut down and 
 > `CONTINUE` will signal that Streams should ignore the issue and continue 
 > processing.
 > 
-> **Note:** This handler applies only to regular stream processing tasks. It 
does not apply to global state store updates (global threads). Exceptions 
occurring in global threads will bubble up to the configured uncaught exception 
handler.
+> **Note:** By default, this handler applies only to regular stream processing 
tasks. To enable exception handling for Global KTable processing, see 
`processing.exception.handler.global.enabled` below. When global exception 
handling is disabled (default), exceptions occurring during Global KTable 
processing will bubble up to the configured uncaught exception handler.

Review Comment:
   ```suggestion
   > **Note:** By default, this handler applies only to regular stream 
processing tasks. To enable exception handling for global store/KTable 
processing (which is recommended), see 
`processing.exception.handler.global.enabled` below. When global exception 
handling is disabled (default), exceptions occurring during global store/KTable 
processing will bubble up to the configured uncaught exception handler.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1274,7 +1257,12 @@ public class StreamsConfig extends AbstractConfig {
                     Type.LONG,
                     null,
                     Importance.LOW,
-                    WINDOW_SIZE_MS_DOC);
+                    WINDOW_SIZE_MS_DOC)
+            .define(PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
+                    Type.BOOLEAN,
+                    false,
+                    Importance.HIGH,

Review Comment:
   We order config per importance -- HIGH goes into the first section (we order 
by name alphabetically within each section)



##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must 
implement the `Serde` inter
 >         }
 >     }
 
+### processing.exception.handler.global.enabled
+
+> Controls whether the configured `ProcessingExceptionHandler` is invoked for 
exceptions occurring during Global KTable processing. When set to `true`, the 
handler specified via `processing.exception.handler` will be invoked for 
exceptions occurring during Global KTable processing. When set to `false` 
(default), exceptions from Global KTables will not invoke the processing 
exception handler and will instead bubble up to the configured uncaught 
exception handler.

Review Comment:
   ```suggestion
   > Controls whether the configured `ProcessingExceptionHandler` is invoked 
for exceptions occurring during global store/KTable processing. When set to 
`true` (recommended), the handler specified via `processing.exception.handler` 
will be invoked for exceptions occurring during global store/KTable processing. 
When set to `false` (default), exceptions from global store/KTables will not 
invoke the processing exception handler and will instead bubble up to the 
configured uncaught exception handler.
   ```



##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must 
implement the `Serde` inter
 >         }
 >     }
 
+### processing.exception.handler.global.enabled
+
+> Controls whether the configured `ProcessingExceptionHandler` is invoked for 
exceptions occurring during Global KTable processing. When set to `true`, the 
handler specified via `processing.exception.handler` will be invoked for 
exceptions occurring during Global KTable processing. When set to `false` 
(default), exceptions from Global KTables will not invoke the processing 
exception handler and will instead bubble up to the configured uncaught 
exception handler.
+> 
+> **Default value:** `false`
+> 
+> **Important Notes:**
+> 
+>   * Dead Letter Queue (DLQ) functionality is not supported for Global 
KTables. For Global KTable exceptions, the record metadata will be logged and 
the record will not be sent to the DLQ.

Review Comment:
   ```suggestion
   >   * Dead Letter Queue (DLQ) functionality is not supported for global 
state/KTables. For global state/KTable exceptions, the record metadata will be 
logged and the record will not be sent to the DLQ.
   ```



##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must 
implement the `Serde` inter
 >         }
 >     }
 
+### processing.exception.handler.global.enabled
+
+> Controls whether the configured `ProcessingExceptionHandler` is invoked for 
exceptions occurring during Global KTable processing. When set to `true`, the 
handler specified via `processing.exception.handler` will be invoked for 
exceptions occurring during Global KTable processing. When set to `false` 
(default), exceptions from Global KTables will not invoke the processing 
exception handler and will instead bubble up to the configured uncaught 
exception handler.
+> 
+> **Default value:** `false`

Review Comment:
   Insert:
   ```
   **Deprecated:** The config is deprecated for removal in the 5.0 release. With 
the removal of the config, the processing exception handler will be applied 
during global state/KTable processing and cannot be disabled any longer. Thus, 
it's recommended to enable this config now, to avoid backward incompatibilities 
in the future.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -404,6 +405,7 @@ private StateConsumer initialize() {
                     globalProcessorContext,
                     stateMgr,
                     config.deserializationExceptionHandler(),
+                    processingExceptionHandler,

Review Comment:
   Should be fine as-is. There is no contract that says we create only one 
instance.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,23 +252,29 @@ public void process(final Record<KIn, VIn> record) {
 
             final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords 
= response.deadLetterQueueRecords();
             if (!deadLetterQueueRecords.isEmpty()) {
-                final RecordCollector collector = ((RecordCollector.Supplier) 
internalProcessorContext).recordCollector();
-                for (final ProducerRecord<byte[], byte[]> 
deadLetterQueueRecord : deadLetterQueueRecords) {
-                    collector.send(
-                            deadLetterQueueRecord.key(),
-                            deadLetterQueueRecord.value(),
-                            name(),
-                            internalProcessorContext,
-                            deadLetterQueueRecord
-                    );
+                if (!(internalProcessorContext instanceof 
RecordCollector.Supplier)) {
+                    log.warn("Dead letter queue records cannot be sent for 
GlobalKTable processors " +

Review Comment:
   ```suggestion
                       log.warn("Dead letter queue records cannot be sent for 
global state/KTable processors. " +
   ```



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -861,6 +843,13 @@ public class StreamsConfig extends AbstractConfig {
             ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
             ProducerConfig.TRANSACTIONAL_ID_CONFIG
         };
+    @Deprecated
+    @SuppressWarnings("WeakerAccess")
+    public static final String 
PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG = 
"processing.exception.handler.global.enabled";
+    private static final String 
PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_DOC =
+            "When false (default), maintains backwards-compatible behaviour 
where global exceptions terminate or gracefully shut down the application. " +
+                    "When true, enables the ProcessingExceptionHandler for 
GlobalKTable exceptions. " +
+                    "This config will be removed in Kafka Streams 5.0, where 
global exception handling will be enabled by default";

Review Comment:
   I would rephrase this slightly.
   ```
   "Whether to use the configured <code>" + 
PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + "</code> during global store/KTable 
processing. " +
   "Disabled by default. This config will be removed in Kafka Streams 5.0, where 
global exception handling will be enabled by default."
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -209,10 +209,8 @@ public void process(final Record<KIn, VIn> record) {
             // like Scala or Kotlin do not, and thus we need to catch 
`Exception`
             // (instead of `RuntimeException`) to work well with those 
languages
 
-            // If the processing exception handler is not set (e.g., for 
global threads),
+            // If the processing exception handler is not set,
             // rethrow the exception to let it bubble up to the uncaught 
exception handler.
-            // The processing exception handler is only set for regular stream 
tasks, not for
-            // global state update tasks which use a different error handling 
mechanism.
             if (processingExceptionHandler == null) {

Review Comment:
   I think we should leave this comment as-is, because by default this is still 
correct.
   
   But we could extend the comment and say: can be removed in the AK 5.0 release 
(cf. KIP-1270)



##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -75,6 +75,7 @@ This section contains the most common Streams configuration 
parameters. For a fu
     * num.stream.threads
     * probing.rebalance.interval.ms
     * processing.exception.handler
+    * processing.exception.handler.global.enabled

Review Comment:
   ```suggestion
       * processing.exception.handler.global.enabled (deprecated)
   ```



##########
docs/streams/upgrade-guide.md:
##########
@@ -65,13 +65,9 @@ Since 2.6.0 release, Kafka Streams depends on a RocksDB 
version that requires Ma
 
 ## Streams API changes in 4.3.0
 
-The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, 
and `poll-ratio`, along with streams state updater metrics 
`active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and 
`checkpoint-ratio` have been updated. Each metric now reports, over a rolling 
measurement window, the ratio of time this thread spends performing the given 
action (`{action}`) to the total elapsed time in that window. The effective 
window duration is determined by the metrics configuration: 
`metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` 
(number of rolling windows).
-
-### Deprecation of streams-scala module (KIP-1244)
+Kafka Streams now supports `ProcessingExceptionHandler` for Global KTable 
processing via 
[KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread).
 Previously, the `ProcessingExceptionHandler` only applied to regular stream 
tasks. With this release, you can now configure exception handling for Global 
KTables by setting the new config `processing.exception.handler.global.enabled` 
to `true`. When enabled, the configured `ProcessingExceptionHandler` will be 
invoked for exceptions occurring during Global KTable processing. Note that 
Dead Letter Queue (DLQ) support is not yet available for Global KTables and 
will be added in an upcoming release. More details can be found in 
[KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread).

Review Comment:
   ```suggestion
   Kafka Streams now supports `ProcessingExceptionHandler` for global 
store/KTable processing via 
[KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread).
 Previously, the `ProcessingExceptionHandler` only applied to regular stream 
tasks. With this release, you can now configure exception handling for global 
stores/KTables by setting the new config 
`processing.exception.handler.global.enabled` to `true` (recommended). When 
enabled, the configured `ProcessingExceptionHandler` will be invoked for 
exceptions occurring during global store/KTable processing. Note that Dead 
Letter Queue (DLQ) support is not yet available for global store/KTables and 
will be added in an upcoming release. More details can be found in 
[KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread).
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to