This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.3 by this push:
new 70106524b3a KAFKA-19863: Add documentation on how to implement a
custom exception(with ref to DLQ) (#21606)
70106524b3a is described below
commit 70106524b3a8b236dae55ba5c6080ef011d3a710
Author: Rajarshi Misra <[email protected]>
AuthorDate: Thu Apr 9 05:41:04 2026 +0530
KAFKA-19863: Add documentation on how to implement a custom exception(with
ref to DLQ) (#21606)
This PR updates the Streams documentation to include guidance on
implementing a custom exception handler with the built-in Dead Letter Queue
(DLQ) support introduced by KIP-1034.
Reviewers: Nikita Shuplestov <[email protected]>, Bill Bejeck
<[email protected]>
---
docs/streams/developer-guide/config-streams.md | 54 ++++++++++++++++++++++++++
1 file changed, 54 insertions(+)
diff --git a/docs/streams/developer-guide/config-streams.md b/docs/streams/developer-guide/config-streams.md
index 265e58f9c20..3b248ba06b9 100644
--- a/docs/streams/developer-guide/config-streams.md
+++ b/docs/streams/developer-guide/config-streams.md
@@ -1501,6 +1501,60 @@ Serde for the inner class of a windowed record. Must implement the `Serde` inter
> }
> }
+>**Note: The example above demonstrates manual production to a DLQ topic. The following example shows the recommended approach using the built-in DLQ support.**
+> A custom processing exception handler can decide whether to continue or fail processing when user logic throws an exception. If DLQ behavior is required, return DLQ records from the handler response.
+>
+> **Custom Exception Handler Implementation**
+>
+> The following example forwards failed records to a configured DLQ topic:
+>
+> ```java
+> public class DlqProcessingExceptionHandler implements ProcessingExceptionHandler {
+>
+>     private String deadLetterQueueTopic;
+>
+>     @Override
+>     public Response handleError(final ErrorHandlerContext context,
+>                                 final Record<?, ?> record,
+>                                 final Exception exception) {
+>
+>         // Example: forward the raw record to a DLQ topic
+>         ProducerRecord<byte[], byte[]> dlqRecord =
+>             new ProducerRecord<>(deadLetterQueueTopic,
+>                                  null, context.timestamp(), // no explicit partition
+>                                  context.sourceRawKey(),
+>                                  context.sourceRawValue());
+>
+>         // Applications may choose how to construct DLQ records. For example,
+>         // they may forward the raw key/value bytes, transform the payload,
+>         // or add headers with error metadata.
+>         return Response.resume(List.of(dlqRecord));
+>     }
+>
+>     @Override
+>     public void configure(final Map<String, ?> configs) {
+>         // Retrieve the DLQ topic name from the configs map, or any other source
+>         deadLetterQueueTopic = (String) configs.get("my.dlq.topic.config.key");
+>     }
+> }
+> ```
+> To enable the custom exception handler and configure the DLQ topic:
+>
+> ```java
+> Properties props = new Properties();
+>
+> props.put(
+>     StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+>     DlqProcessingExceptionHandler.class
+> );
+>
+> // Optional: if your custom handler reads the DLQ topic from StreamsConfig,
+> // set it here. Otherwise, configure the topic name via your own properties.
+> // props.put(
+> //     StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+> //     "dlq-topic"
+> // );
+> ```
### processing.exception.handler.global.enabled (deprecated)
> Controls whether the configured `ProcessingExceptionHandler` is invoked for
> exceptions occurring during global store/KTable processing. When set to
> `true` (recommended), the handler specified via
> `processing.exception.handler` will be invoked for exceptions occurring
> during global store/KTable processing. When set to `false` (default),
> exceptions from global store/KTable will not invoke the processing exception
> handler and will instead bubble up to the configured uncaught exception
> handler.
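
The handler in the diff above reads its DLQ topic from the custom key `my.dlq.topic.config.key`, so that key has to be present in the application's properties. The following is a minimal sketch of such a property set. It uses plain string keys so that it compiles without the Kafka jars. The package `com.example`, the class name `HandlerConfigSketch`, and the topic value `dlq-topic` are illustrative assumptions, and the sketch assumes Streams passes the full property map to the handler's `configure()` method.

```java
import java.util.Properties;

final class HandlerConfigSketch {

    // Builds the properties an application might pass to its Streams configuration.
    static Properties buildProps() {
        final Properties props = new Properties();

        // Config name behind StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG.
        // The package "com.example" is hypothetical.
        props.put("processing.exception.handler",
                  "com.example.DlqProcessingExceptionHandler");

        // Custom key looked up in DlqProcessingExceptionHandler.configure().
        // Without it, the handler's topic field would stay null.
        props.put("my.dlq.topic.config.key", "dlq-topic");

        // Optional and deprecated: also invoke the handler for exceptions
        // during global store/KTable processing.
        props.put("processing.exception.handler.global.enabled", "true");

        return props;
    }

    public static void main(final String[] args) {
        // Print the resulting configuration for inspection.
        buildProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application the string keys would normally be replaced by the corresponding `StreamsConfig` constants where they exist.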