This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1738c30ab5e KAFKA-19863: Add documentation on how to implement a 
custom exception(with ref to DLQ) (#21606)
1738c30ab5e is described below

commit 1738c30ab5e890b25f1727f4a9c53c818b414463
Author: Rajarshi Misra <[email protected]>
AuthorDate: Thu Apr 9 05:41:04 2026 +0530

    KAFKA-19863: Add documentation on how to implement a custom exception(with 
ref to DLQ) (#21606)
    
    KAFKA-19863: Add documentation on how to implement a custom
    exception(with ref to DLQ)
    
    This PR updates the Streams documentation to include guidance on
    implementing a custom exception handler with built-in Dead Letter Queue
    (DLQ support introduced by KIP-1034.
    
    Reviewers Nikita Shuplestov <[email protected]>, Bill Bejeck
    <[email protected]>
---
 docs/streams/developer-guide/config-streams.md | 54 ++++++++++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/docs/streams/developer-guide/config-streams.md 
b/docs/streams/developer-guide/config-streams.md
index 40802a67c1f..7b1a9e03de3 100644
--- a/docs/streams/developer-guide/config-streams.md
+++ b/docs/streams/developer-guide/config-streams.md
@@ -1501,6 +1501,60 @@ Serde for the inner class of a windowed record. Must 
implement the `Serde` inter
 >         }
 >     }
 
+>**Note: The example above demonstrates manual production to a DLQ topic. The 
following example shows the recommended approach using the built-in DLQ 
support.**
+> A custom processing exception handler can decide whether to continue or fail 
processing when user logic throws an exception. If DLQ behavior is required, 
return DLQ records from the handler response.
+>
+> **Custom Exception Handler Implementation**
+>
+> The following example forwards failed records to a configured DLQ topic:
+>
+> ```java
+> public class DlqProcessingExceptionHandler implements 
ProcessingExceptionHandler {
+>
+>     private String deadLetterQueueTopic;
+>
+>     @Override
+>     public Response handleError(final ErrorHandlerContext context,
+>                                 final Record<?, ?> record,
+>                                 final Exception exception) {
+>
+>       // Example: forward the raw record to a DLQ topic
+>       Record<byte[], byte[]> dlqRecord =
+>           new Record<>(deadLetterQueueTopic,
+>                        context.sourceRawKey(),
+>                        context.sourceRawValue(),
+>                        context.timestamp());
+>
+>       // Applications may choose how to construct DLQ records. For example,
+>       // they may forward the raw key/value bytes, transform the payload,
+>       // or add headers with error metadata.
+>       return Response.resume(List.of(dlqRecord));
+>      }
+>
+>     @Override
+>     public void configure(final Map<String, ?> configs) {
+>         // Retrieve the DLQ topic name from the configs map, or any other 
source
+>         deadLetterQueueTopic = (String) 
configs.get("my.dlq.topic.config.key");
+>     }
+> }
+> ```
+> To enable the custom exception handler and configure the DLQ topic:
+>
+> ```java
+> Properties props = new Properties();
+>
+> props.put(
+>     StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+>     DlqProcessingExceptionHandler.class
+> );
+>
+>//   Optional: if your custom handler reads the DLQ topic from StreamsConfig,
+>//   set it here. Otherwise, configure the topic name via your own properties.
+> //  props.put(
+> //    StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+> //    "dlq-topic"
+> //  );
+> ```
 ### processing.exception.handler.global.enabled (deprecated)
 
 > Controls whether the configured `ProcessingExceptionHandler` is invoked for 
 > exceptions occurring during global store/KTable processing. When set to 
 > `true` (recommended), the handler specified via 
 > `processing.exception.handler` will be invoked for exceptions occurring 
 > during global store/KTable processing. When set to `false` (default), 
 > exceptions from global store/KTable will not invoke the processing exception 
 > handler and will instead bubble up to the configured uncaught exception 
 > handler.

Reply via email to