mjsax commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2893711125
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                     record.headers());
                 globalProcessorContext.setRecordContext(recordContext);
-                try {
-                    if (record.key() != null) {
-                        source.process(new Record(
+                if (record.key() != null) {
+                    // Deserialization phase
+                    final Record<?, ?> deserializedRecord;
+                    try {
+                        deserializedRecord = new Record<>(
                             reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                             reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
                             record.timestamp(),
-                            record.headers()));
+                            record.headers());
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        handleDeserializationFailure(
+                            deserializationExceptionHandler,
+                            globalProcessorContext,
+                            deserializationException,
+                            record,
+                            log,
+                            droppedRecordsSensor(
+                                Thread.currentThread().getName(),
+                                globalProcessorContext.taskId().toString(),
+                                globalProcessorContext.metrics()
+                            ),
+                            null
+                        );
+                        continue; // Skip this record
+                    }
+                    final ProcessingExceptionHandler.Response response;
+                    // Processing phase
+                    try {
+                        ((Processor) source).process(deserializedRecord);
                         restoreCount++;
                         batchRestoreCount++;
+                    } catch (final Exception processingException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        if (processingExceptionHandler != null) {
+                            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                                globalProcessorContext,
+                                record.topic(),
+                                record.partition(),
+                                record.offset(),
+                                record.headers(),
+                                null,
+                                globalProcessorContext.taskId(),
+                                record.timestamp(),
+                                record.key(),
+                                record.value()
+                            );
+                            try {
+                                response = Objects.requireNonNull(processingExceptionHandler.handleError(
+                                    errorHandlerContext,
+                                    deserializedRecord,
+                                    processingException
+                                ), "Invalid ProcessingExceptionHandler response");
+                                if (!response.deadLetterQueueRecords().isEmpty()) {
+                                    log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
+                                        "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " + "Record context: {}",
+                                        errorHandlerContext);
+                                }
+                            } catch (final Exception fatalUserException) {
+                                log.error(
+                                    "Processing error callback failed after processing error for record: {}",
+                                    errorHandlerContext,
+                                    processingException
+                                );
+                                throw new FailedProcessingException(
+                                    "Fatal user code error in processing error callback",
+                                    null,
+                                    fatalUserException
+                                );
+                            }
+
+                            if (response.result() == ProcessingExceptionHandler.Result.FAIL) {
+                                log.error("Processing exception handler is set to fail upon" +
+                                    " a processing error. If you would rather have the streaming pipeline" +
+                                    " continue after a processing error, please set the " +
+                                    PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+                                throw new FailedProcessingException(null, processingException);
+                            }
+                            // RESUME - log and continue
+                            log.warn("Processing exception handler chose to resume for record at offset {}", record.offset(), processingException);
+                            droppedRecordsSensor(
Review Comment:
Should we refactor this and add a member variable `Sensor droppedRecordsSensor` which we set inside `initialize()` and just use here? And also change the code in L343 above?
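To illustrate the suggested refactor, here is a minimal, self-contained sketch (the `Sensor` class below is a hypothetical stand-in for the Kafka metrics type, and `initialize()`/`onDroppedRecord()` are illustrative names, not the real method set): resolve the sensor once during initialization and reuse it, instead of re-resolving it per record:

```java
public class SensorRefactorSketch {
    // Hypothetical stand-in for org.apache.kafka.common.metrics.Sensor, for illustration only.
    static class Sensor {
        private long count = 0;
        void record() { count++; }
        long count() { return count; }
    }

    // Member variable set once in initialize(), as the review suggests,
    // instead of calling the droppedRecordsSensor(...) factory on every dropped record.
    private Sensor droppedRecordsSensor;

    void initialize() {
        // In the real code this would be:
        // droppedRecordsSensor = droppedRecordsSensor(threadName, taskId, metrics);
        droppedRecordsSensor = new Sensor();
    }

    void onDroppedRecord() {
        // Reuse the cached sensor; no per-record lookup.
        droppedRecordsSensor.record();
    }

    long droppedCount() { return droppedRecordsSensor.count(); }

    public static void main(String[] args) {
        SensorRefactorSketch s = new SensorRefactorSketch();
        s.initialize();
        s.onDroppedRecord();
        s.onDroppedRecord();
        System.out.println(s.droppedCount()); // prints 2
    }
}
```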
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                     record.headers());
                 globalProcessorContext.setRecordContext(recordContext);
-                try {
-                    if (record.key() != null) {
-                        source.process(new Record(
+                if (record.key() != null) {
+                    // Deserialization phase
+                    final Record<?, ?> deserializedRecord;
+                    try {
+                        deserializedRecord = new Record<>(
                             reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                             reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
                             record.timestamp(),
-                            record.headers()));
+                            record.headers());
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        handleDeserializationFailure(
+                            deserializationExceptionHandler,
+                            globalProcessorContext,
+                            deserializationException,
+                            record,
+                            log,
+                            droppedRecordsSensor(
+                                Thread.currentThread().getName(),
+                                globalProcessorContext.taskId().toString(),
+                                globalProcessorContext.metrics()
+                            ),
+                            null
+                        );
+                        continue; // Skip this record
+                    }
+                    final ProcessingExceptionHandler.Response response;
+                    // Processing phase
+                    try {
+                        ((Processor) source).process(deserializedRecord);
                         restoreCount++;
                         batchRestoreCount++;
+                    } catch (final Exception processingException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        if (processingExceptionHandler != null) {
+                            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                                globalProcessorContext,
+                                record.topic(),
+                                record.partition(),
+                                record.offset(),
+                                record.headers(),
+                                null,
+                                globalProcessorContext.taskId(),
+                                record.timestamp(),
+                                record.key(),
+                                record.value()
+                            );
+                            try {
+                                response = Objects.requireNonNull(processingExceptionHandler.handleError(
+                                    errorHandlerContext,
+                                    deserializedRecord,
+                                    processingException
+                                ), "Invalid ProcessingExceptionHandler response");
+                                if (!response.deadLetterQueueRecords().isEmpty()) {
+                                    log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
+                                        "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " + "Record context: {}",
+                                        errorHandlerContext);
+                                }
+                            } catch (final Exception fatalUserException) {
+                                log.error(
+                                    "Processing error callback failed after processing error for record: {}",
+                                    errorHandlerContext,
+                                    processingException
+                                );
+                                throw new FailedProcessingException(
+                                    "Fatal user code error in processing error callback",
+                                    null,
+                                    fatalUserException
+                                );
+                            }
+
+                            if (response.result() == ProcessingExceptionHandler.Result.FAIL) {
+                                log.error("Processing exception handler is set to fail upon" +
+                                    " a processing error. If you would rather have the streaming pipeline" +
+                                    " continue after a processing error, please set the " +
+                                    PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+                                throw new FailedProcessingException(null, processingException);
+                            }
+                            // RESUME - log and continue
+                            log.warn("Processing exception handler chose to resume for record at offset {}", record.offset(), processingException);
Review Comment:
I don't think this is a WARN -- not even sure if we would want to log at all? And if we do, only at DEBUG or TRACE I would say?
We don't log anything in the existing code using the processing exception handler -- we record a metric, which we believe is enough.
##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
> }
> }
+### processing.exception.handler.global.enabled
Review Comment:
```suggestion
### processing.exception.handler.global.enabled (deprecated)
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,23 +252,29 @@ public void process(final Record<KIn, VIn> record) {
         final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords();
         if (!deadLetterQueueRecords.isEmpty()) {
-            final RecordCollector collector = ((RecordCollector.Supplier) internalProcessorContext).recordCollector();
-            for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
-                collector.send(
-                    deadLetterQueueRecord.key(),
-                    deadLetterQueueRecord.value(),
-                    name(),
-                    internalProcessorContext,
-                    deadLetterQueueRecord
-                );
+            if (!(internalProcessorContext instanceof RecordCollector.Supplier)) {
+                log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
+                    "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " + "Record context: {}",
Review Comment:
```suggestion
"DLQ support for global state/KTable will be
addressed in a future release. " + "Record context: {}",
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                     record.headers());
                 globalProcessorContext.setRecordContext(recordContext);
-                try {
-                    if (record.key() != null) {
-                        source.process(new Record(
+                if (record.key() != null) {
+                    // Deserialization phase
+                    final Record<?, ?> deserializedRecord;
+                    try {
+                        deserializedRecord = new Record<>(
                             reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                             reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
                             record.timestamp(),
-                            record.headers()));
+                            record.headers());
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        handleDeserializationFailure(
+                            deserializationExceptionHandler,
+                            globalProcessorContext,
+                            deserializationException,
+                            record,
+                            log,
+                            droppedRecordsSensor(
+                                Thread.currentThread().getName(),
+                                globalProcessorContext.taskId().toString(),
+                                globalProcessorContext.metrics()
+                            ),
+                            null
+                        );
+                        continue; // Skip this record
+                    }
+                    final ProcessingExceptionHandler.Response response;
+                    // Processing phase
+                    try {
+                        ((Processor) source).process(deserializedRecord);
                         restoreCount++;
                         batchRestoreCount++;
+                    } catch (final Exception processingException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        if (processingExceptionHandler != null) {
+                            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                                globalProcessorContext,
+                                record.topic(),
+                                record.partition(),
+                                record.offset(),
+                                record.headers(),
+                                null,
+                                globalProcessorContext.taskId(),
+                                record.timestamp(),
+                                record.key(),
+                                record.value()
+                            );
+                            try {
+                                response = Objects.requireNonNull(processingExceptionHandler.handleError(
+                                    errorHandlerContext,
+                                    deserializedRecord,
+                                    processingException
+                                ), "Invalid ProcessingExceptionHandler response");
+                                if (!response.deadLetterQueueRecords().isEmpty()) {
+                                    log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
+                                        "(no producer available). DLQ support for GlobalKTable will be addressed in a future KIP. " + "Record context: {}",
Review Comment:
```suggestion
"DLQ support for global
state/KTable will be added in a future release. " + "Record context: {}",
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                     record.headers());
                 globalProcessorContext.setRecordContext(recordContext);
-                try {
-                    if (record.key() != null) {
-                        source.process(new Record(
+                if (record.key() != null) {
+                    // Deserialization phase
+                    final Record<?, ?> deserializedRecord;
+                    try {
+                        deserializedRecord = new Record<>(
                             reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                             reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
                             record.timestamp(),
-                            record.headers()));
+                            record.headers());
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        handleDeserializationFailure(
+                            deserializationExceptionHandler,
+                            globalProcessorContext,
+                            deserializationException,
+                            record,
+                            log,
+                            droppedRecordsSensor(
+                                Thread.currentThread().getName(),
+                                globalProcessorContext.taskId().toString(),
+                                globalProcessorContext.metrics()
+                            ),
+                            null
+                        );
+                        continue; // Skip this record
+                    }
+                    final ProcessingExceptionHandler.Response response;
+                    // Processing phase
+                    try {
+                        ((Processor) source).process(deserializedRecord);
                         restoreCount++;
                         batchRestoreCount++;
+                    } catch (final Exception processingException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        if (processingExceptionHandler != null) {
+                            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                                globalProcessorContext,
+                                record.topic(),
+                                record.partition(),
+                                record.offset(),
+                                record.headers(),
+                                null,
+                                globalProcessorContext.taskId(),
+                                record.timestamp(),
+                                record.key(),
+                                record.value()
+                            );
+                            try {
+                                response = Objects.requireNonNull(processingExceptionHandler.handleError(
+                                    errorHandlerContext,
+                                    deserializedRecord,
+                                    processingException
+                                ), "Invalid ProcessingExceptionHandler response");
+                                if (!response.deadLetterQueueRecords().isEmpty()) {
+                                    log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
Review Comment:
```suggestion
log.warn("Dead letter queue records
cannot be sent for global state/KTable processors. " +
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -861,6 +843,13 @@ public class StreamsConfig extends AbstractConfig {
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
ProducerConfig.TRANSACTIONAL_ID_CONFIG
};
+ @Deprecated
Review Comment:
```suggestion
/**
* {@code processing.exception.handler.global.enabled}
     * @deprecated Since 4.3. Default will change to {@code true} when removed.
*/
@Deprecated
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +321,100 @@ private void reprocessState(final List<TopicPartition> topicPartitions,
                     record.headers());
                 globalProcessorContext.setRecordContext(recordContext);
-                try {
-                    if (record.key() != null) {
-                        source.process(new Record(
+                if (record.key() != null) {
+                    // Deserialization phase
+                    final Record<?, ?> deserializedRecord;
+                    try {
+                        deserializedRecord = new Record<>(
                             reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                             reprocessFactory.valueDeserializer().deserialize(record.topic(), record.value()),
                             record.timestamp(),
-                            record.headers()));
+                            record.headers());
+                    } catch (final Exception deserializationException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        handleDeserializationFailure(
+                            deserializationExceptionHandler,
+                            globalProcessorContext,
+                            deserializationException,
+                            record,
+                            log,
+                            droppedRecordsSensor(
+                                Thread.currentThread().getName(),
+                                globalProcessorContext.taskId().toString(),
+                                globalProcessorContext.metrics()
+                            ),
+                            null
+                        );
+                        continue; // Skip this record
+                    }
+                    final ProcessingExceptionHandler.Response response;
+                    // Processing phase
+                    try {
+                        ((Processor) source).process(deserializedRecord);
                         restoreCount++;
                         batchRestoreCount++;
+                    } catch (final Exception processingException) {
+                        // while Java distinguishes checked vs unchecked exceptions, other languages
+                        // like Scala or Kotlin do not, and thus we need to catch `Exception`
+                        // (instead of `RuntimeException`) to work well with those languages
+                        if (processingExceptionHandler != null) {
+                            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                                globalProcessorContext,
+                                record.topic(),
+                                record.partition(),
+                                record.offset(),
+                                record.headers(),
+                                null,
Review Comment:
Can't we use `globalProcessorContext.currentNode().name()` here?
##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1446,7 +1447,7 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
 > The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception handler needs to return a `FAIL` or `CONTINUE` depending on the record and the exception thrown. Returning `FAIL` will signal that Streams should shut down and `CONTINUE` will signal that Streams should ignore the issue and continue processing.
 >
-> **Note:** This handler applies only to regular stream processing tasks. It does not apply to global state store updates (global threads). Exceptions occurring in global threads will bubble up to the configured uncaught exception handler.
+> **Note:** By default, this handler applies only to regular stream processing tasks. To enable exception handling for Global KTable processing, see `processing.exception.handler.global.enabled` below. When global exception handling is disabled (default), exceptions occurring during Global KTable processing will bubble up to the configured uncaught exception handler.
Review Comment:
```suggestion
> **Note:** By default, this handler applies only to regular stream processing tasks. To enable exception handling for global stores/KTable processing (which is recommended), see `processing.exception.handler.global.enabled` below. When global exception handling is disabled (default), exceptions occurring during global store/KTable processing will bubble up to the configured uncaught exception handler.
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1274,7 +1257,12 @@ public class StreamsConfig extends AbstractConfig {
Type.LONG,
null,
Importance.LOW,
- WINDOW_SIZE_MS_DOC);
+ WINDOW_SIZE_MS_DOC)
+ .define(PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
+ Type.BOOLEAN,
+ false,
+ Importance.HIGH,
Review Comment:
We order configs by importance -- HIGH goes into the first section (within each section, we order by name alphabetically).
##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
 > }
 > }
+### processing.exception.handler.global.enabled
+
+> Controls whether the configured `ProcessingExceptionHandler` is invoked for exceptions occurring during Global KTable processing. When set to `true`, the handler specified via `processing.exception.handler` will be invoked for exceptions occurring during Global KTable processing. When set to `false` (default), exceptions from Global KTables will not invoke the processing exception handler and will instead bubble up to the configured uncaught exception handler.
Review Comment:
```suggestion
> Controls whether the configured `ProcessingExceptionHandler` is invoked for exceptions occurring during global store/KTable processing. When set to `true` (recommended), the handler specified via `processing.exception.handler` will be invoked for exceptions occurring during global store/KTable processing. When set to `false` (default), exceptions from global store/KTables will not invoke the processing exception handler and will instead bubble up to the configured uncaught exception handler.
```
##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
 > }
 > }
+### processing.exception.handler.global.enabled
+
+> Controls whether the configured `ProcessingExceptionHandler` is invoked for exceptions occurring during Global KTable processing. When set to `true`, the handler specified via `processing.exception.handler` will be invoked for exceptions occurring during Global KTable processing. When set to `false` (default), exceptions from Global KTables will not invoke the processing exception handler and will instead bubble up to the configured uncaught exception handler.
+>
+> **Default value:** `false`
+>
+> **Important Notes:**
+>
+> * Dead Letter Queue (DLQ) functionality is not supported for Global KTables. For Global KTable exceptions, the record metadata will be logged and the record will not be sent to the DLQ.
Review Comment:
```suggestion
> * Dead Letter Queue (DLQ) functionality is not supported for global state/KTables. For global state/KTable exceptions, the record metadata will be logged and the record will not be sent to the DLQ.
```
##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -1484,6 +1485,30 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
 > }
 > }
+### processing.exception.handler.global.enabled
+
+> Controls whether the configured `ProcessingExceptionHandler` is invoked for exceptions occurring during Global KTable processing. When set to `true`, the handler specified via `processing.exception.handler` will be invoked for exceptions occurring during Global KTable processing. When set to `false` (default), exceptions from Global KTables will not invoke the processing exception handler and will instead bubble up to the configured uncaught exception handler.
+>
+> **Default value:** `false`
Review Comment:
Insert:
```
**Deprecated:** The config is deprecated for removal in the 5.0 release. With the removal of the config, the processing exception handler will be applied during global state/KTable processing and cannot be disabled any longer. Thus, it's recommended to enable this config now, to avoid backward incompatibilities in the future.
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -404,6 +405,7 @@ private StateConsumer initialize() {
globalProcessorContext,
stateMgr,
config.deserializationExceptionHandler(),
+ processingExceptionHandler,
Review Comment:
Should be fine as-is. There is no contract that says we create only one instance.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,23 +252,29 @@ public void process(final Record<KIn, VIn> record) {
         final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords = response.deadLetterQueueRecords();
         if (!deadLetterQueueRecords.isEmpty()) {
-            final RecordCollector collector = ((RecordCollector.Supplier) internalProcessorContext).recordCollector();
-            for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord : deadLetterQueueRecords) {
-                collector.send(
-                    deadLetterQueueRecord.key(),
-                    deadLetterQueueRecord.value(),
-                    name(),
-                    internalProcessorContext,
-                    deadLetterQueueRecord
-                );
+            if (!(internalProcessorContext instanceof RecordCollector.Supplier)) {
+                log.warn("Dead letter queue records cannot be sent for GlobalKTable processors " +
Review Comment:
```suggestion
log.warn("Dead letter queue records cannot be sent for
global state/KTable processors. " +
```
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -861,6 +843,13 @@ public class StreamsConfig extends AbstractConfig {
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
ProducerConfig.TRANSACTIONAL_ID_CONFIG
};
+    @Deprecated
+    @SuppressWarnings("WeakerAccess")
+    public static final String PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG = "processing.exception.handler.global.enabled";
+    private static final String PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_DOC =
+        "When false (default), maintains backwards-compatible behaviour where global exceptions terminate or gracefully shut down the application. " +
+        "When true, enables the ProcessingExceptionHandler for GlobalKTable exceptions. " +
+        "This config will be removed in Kafka Streams 5.0, where global exception handling will be enabled by default";
Review Comment:
I would rephrase this slightly.
```
"Whether to use the configured <code>" + PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + "</code> during global store/KTable processing. " +
"Disabled by default. This config will be removed in Kafka Streams 5.0, where global exception handling will be enabled by default."
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -209,10 +209,8 @@ public void process(final Record<KIn, VIn> record) {
         // like Scala or Kotlin do not, and thus we need to catch `Exception`
         // (instead of `RuntimeException`) to work well with those languages
-        // If the processing exception handler is not set (e.g., for global threads),
+        // If the processing exception handler is not set,
         // rethrow the exception to let it bubble up to the uncaught exception handler.
-        // The processing exception handler is only set for regular stream tasks, not for
-        // global state update tasks which use a different error handling mechanism.
         if (processingExceptionHandler == null) {
Review Comment:
I think we should leave this comment as-is, because by default this is still correct.
But we could extend the comment and say: can be removed in the AK 5.0 release (cf. KIP-1270).
##########
docs/streams/developer-guide/config-streams.md:
##########
@@ -75,6 +75,7 @@ This section contains the most common Streams configuration parameters. For a fu
* num.stream.threads
* probing.rebalance.interval.ms
* processing.exception.handler
+ * processing.exception.handler.global.enabled
Review Comment:
```suggestion
* processing.exception.handler.global.enabled (deprecated)
```
##########
docs/streams/upgrade-guide.md:
##########
@@ -65,13 +65,9 @@ Since 2.6.0 release, Kafka Streams depends on a RocksDB version that requires Ma
 ## Streams API changes in 4.3.0
-The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`, and `poll-ratio`, along with streams state updater metrics `active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and `checkpoint-ratio` have been updated. Each metric now reports, over a rolling measurement window, the ratio of time this thread spends performing the given action (`{action}`) to the total elapsed time in that window. The effective window duration is determined by the metrics configuration: `metrics.sample.window.ms` (per-sample window length) and `metrics.num.samples` (number of rolling windows).
-
-### Deprecation of streams-scala module (KIP-1244)
+Kafka Streams now supports `ProcessingExceptionHandler` for Global KTable processing via [KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread). Previously, the `ProcessingExceptionHandler` only applied to regular stream tasks. With this release, you can now configure exception handling for Global KTables by setting the new config `processing.exception.handler.global.enabled` to `true`. When enabled, the configured `ProcessingExceptionHandler` will be invoked for exceptions occurring during Global KTable processing. Note that Dead Letter Queue (DLQ) support is not yet available for Global KTables and will be added in an upcoming release. More details can be found in [KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread).
Review Comment:
```suggestion
Kafka Streams now supports `ProcessingExceptionHandler` for global store/KTable processing via [KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread). Previously, the `ProcessingExceptionHandler` only applied to regular stream tasks. With this release, you can now configure exception handling for global stores/KTables by setting the new config `processing.exception.handler.global.enabled` to `true` (recommended). When enabled, the configured `ProcessingExceptionHandler` will be invoked for exceptions occurring during global store/KTable processing. Note that Dead Letter Queue (DLQ) support is not yet available for global store/KTables and will be added in an upcoming release. More details can be found in [KIP-1270](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1270%3A+Extend+ProcessExceptionalHandler+for+GlobalThread).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]