This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cdc2d957edb KAFKA-16505: Adding dead letter queue in Kafka Streams
(#17942)
cdc2d957edb is described below
commit cdc2d957edb2551b4ace30acfc72f6853f00790b
Author: Gasparina Damien <[email protected]>
AuthorDate: Mon Jul 21 15:54:40 2025 +0200
KAFKA-16505: Adding dead letter queue in Kafka Streams (#17942)
Implements KIP-1034 to add support for a Dead Letter
Queue in Kafka Streams.
Reviewers: Lucas Brutschy <[email protected]>, Bruno Cadonna
<[email protected]>
Co-authored-by: Sebastien Viale <[email protected]>
---
.../ProcessingExceptionHandlerIntegrationTest.java | 10 +-
.../SwallowUnknownTopicErrorIntegrationTest.java | 10 +-
.../org/apache/kafka/streams/StreamsConfig.java | 10 +
.../errors/DefaultProductionExceptionHandler.java | 36 +--
.../errors/DeserializationExceptionHandler.java | 156 +++++++++++++
.../errors/LogAndContinueExceptionHandler.java | 39 +---
.../LogAndContinueProcessingExceptionHandler.java | 14 +-
.../streams/errors/LogAndFailExceptionHandler.java | 39 +---
.../LogAndFailProcessingExceptionHandler.java | 13 +-
.../streams/errors/ProcessingExceptionHandler.java | 162 +++++++++++++-
.../streams/errors/ProductionExceptionHandler.java | 217 +++++++++++++++++-
.../errors/internals/ExceptionHandlerUtils.java | 101 +++++++++
.../streams/processor/internals/ProcessorNode.java | 23 +-
.../processor/internals/RecordCollector.java | 7 +
.../processor/internals/RecordCollectorImpl.java | 74 ++++--
.../processor/internals/RecordDeserializer.java | 27 ++-
.../streams/processor/internals/StreamTask.java | 22 +-
.../apache/kafka/streams/StreamsConfigTest.java | 5 +
.../streams/errors/ExceptionHandlerUtilsTest.java | 114 ++++++++++
.../processor/internals/ProcessorNodeTest.java | 132 ++++++++++-
.../processor/internals/RecordCollectorTest.java | 247 ++++++++++++++++++---
.../internals/RecordDeserializerTest.java | 168 +++++++++++++-
.../processor/internals/StreamTaskTest.java | 4 +-
.../org/apache/kafka/test/MockRecordCollector.java | 17 ++
24 files changed, 1477 insertions(+), 170 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 38711093ff8..4406f292205 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -357,7 +357,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
assertEquals("Fatal user code error in processing error callback",
e.getMessage());
assertInstanceOf(NullPointerException.class, e.getCause());
- assertEquals("Invalid ProductionExceptionHandler response.",
e.getCause().getMessage());
+ assertEquals("Invalid ProcessingExceptionHandler response.",
e.getCause().getMessage());
assertFalse(isExecuted.get());
}
}
@@ -524,7 +524,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
public static class ContinueProcessingExceptionHandlerMockTest implements
ProcessingExceptionHandler {
@Override
- public ProcessingExceptionHandler.ProcessingHandlerResponse
handle(final ErrorHandlerContext context, final Record<?, ?> record, final
Exception exception) {
+ public Response handleError(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception) {
if (((String) record.key()).contains("FATAL")) {
throw new RuntimeException("KABOOM!");
}
@@ -532,7 +532,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
return null;
}
assertProcessingExceptionHandlerInputs(context, record, exception);
- return
ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
+ return Response.resume();
}
@Override
@@ -543,9 +543,9 @@ public class ProcessingExceptionHandlerIntegrationTest {
public static class FailProcessingExceptionHandlerMockTest implements
ProcessingExceptionHandler {
@Override
- public ProcessingExceptionHandler.ProcessingHandlerResponse
handle(final ErrorHandlerContext context, final Record<?, ?> record, final
Exception exception) {
+ public Response handleError(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception) {
assertProcessingExceptionHandlerInputs(context, record, exception);
- return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
+ return Response.fail();
}
@Override
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
index a82e832e21c..0535dd2465e 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
@@ -156,15 +156,15 @@ public class SwallowUnknownTopicErrorIntegrationTest {
public void configure(final Map<String, ?> configs) { }
@Override
- public ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context,
- final
ProducerRecord<byte[], byte[]> record,
- final Exception
exception) {
+ public Response handleError(final ErrorHandlerContext context,
+ final ProducerRecord<byte[], byte[]>
record,
+ final Exception exception) {
if (exception instanceof TimeoutException &&
exception.getCause() != null &&
exception.getCause() instanceof
UnknownTopicOrPartitionException) {
- return ProductionExceptionHandlerResponse.CONTINUE;
+ return Response.resume();
}
- return ProductionExceptionHandler.super.handle(context, record,
exception);
+ return ProductionExceptionHandler.super.handleError(context,
record, exception);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5e1eaff1162..6364e7e9126 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -619,6 +619,11 @@ public class StreamsConfig extends AbstractConfig {
"support \"classic\" or \"streams\". If \"streams\" is specified, then
the streams rebalance protocol will be " +
"used. Otherwise, the classic group protocol will be used.";
+ public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG =
"errors.dead.letter.queue.topic.name";
+
+ private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If
not null, the default exception handler will build and send a Dead Letter Queue
record to the topic with the provided name if an error occurs.\n" +
+ "If a custom deserialization/production or processing exception
handler is set, this parameter is ignored for this handler.";
+
/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG =
"log.summary.interval.ms";
private static final String LOG_SUMMARY_INTERVAL_MS_DOC = "The output
interval in milliseconds for logging summary information.\n" +
@@ -991,6 +996,11 @@ public class StreamsConfig extends AbstractConfig {
LogAndFailExceptionHandler.class.getName(),
Importance.MEDIUM,
DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC)
+ .define(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+ Type.STRING,
+ null,
+ Importance.MEDIUM,
+ ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC)
.define(MAX_TASK_IDLE_MS_CONFIG,
Type.LONG,
0L,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
index 5994326770c..3e9eb2fba86 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -18,38 +18,42 @@ package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
import java.util.Map;
+import static
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
/**
* {@code ProductionExceptionHandler} that always instructs streams to fail
when an exception
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements
ProductionExceptionHandler {
- /**
- * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext,
ProducerRecord, Exception)} instead.
- */
- @SuppressWarnings("deprecation")
- @Deprecated
+
+ private String deadLetterQueueTopic = null;
+
@Override
- public ProductionExceptionHandlerResponse handle(final
ProducerRecord<byte[], byte[]> record,
- final Exception
exception) {
+ public Response handleError(final ErrorHandlerContext context,
+ final ProducerRecord<byte[], byte[]> record,
+ final Exception exception) {
return exception instanceof RetriableException ?
- ProductionExceptionHandlerResponse.RETRY :
- ProductionExceptionHandlerResponse.FAIL;
+ Response.retry() :
+
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic,
context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
+ @SuppressWarnings("rawtypes")
@Override
- public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext
context,
- final
ProducerRecord<byte[], byte[]> record,
- final Exception
exception) {
- return exception instanceof RetriableException ?
- ProductionExceptionHandlerResponse.RETRY :
- ProductionExceptionHandlerResponse.FAIL;
+ public Response handleSerializationError(final ErrorHandlerContext context,
+ final ProducerRecord record,
+ final Exception exception,
+ final
SerializationExceptionOrigin origin) {
+ return
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic,
context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
+
@Override
public void configure(final Map<String, ?> configs) {
- // ignore
+ if
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+ deadLetterQueueTopic =
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
index 0b44e04d791..8c3667c20f4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -17,10 +17,14 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.ProcessorContext;
+import java.util.Collections;
+import java.util.List;
+
/**
* Interface that specifies how an exception from source node deserialization
* (e.g., reading from Kafka) should be handled.
@@ -63,16 +67,35 @@ public interface DeserializationExceptionHandler extends
Configurable {
* The actual exception.
*
* @return Whether to continue or stop processing.
+ *
+ * @deprecated Use {@link #handleError(ErrorHandlerContext,
ConsumerRecord, Exception)} instead.
*/
+ @Deprecated
default DeserializationHandlerResponse handle(final ErrorHandlerContext
context,
final ConsumerRecord<byte[],
byte[]> record,
final Exception exception) {
return handle(((DefaultErrorHandlerContext)
context).processorContext().orElse(null), record, exception);
}
+ /**
+ * Inspects a record and the exception received during deserialization.
+ *
+ * @param context
+ * Error handler context.
+ * @param record
+ * Record that failed deserialization.
+ * @param exception
+ * The actual exception.
+ *
+ * @return a {@link Response} object
+ */
+ default Response handleError(final ErrorHandlerContext context, final
ConsumerRecord<byte[], byte[]> record, final Exception exception) {
+ return new Response(Result.from(handle(context, record, exception)),
Collections.emptyList());
+ }
/**
* Enumeration that describes the response from the exception handler.
*/
+ @Deprecated
enum DeserializationHandlerResponse {
/** Continue processing. */
CONTINUE(0, "CONTINUE"),
@@ -95,4 +118,137 @@ public interface DeserializationExceptionHandler extends
Configurable {
}
}
+ /**
+ * Enumeration that describes the response from the exception handler.
+ */
+ enum Result {
+ /** Continue processing. */
+ RESUME(0, "RESUME"),
+ /** Fail processing. */
+ FAIL(1, "FAIL");
+
+ /**
+ * An English description for the used option. This is for debugging
only and may change.
+ */
+ public final String name;
+
+ /**
+ * The permanent and immutable id for the used option. This can't
change ever.
+ */
+ public final int id;
+
+ Result(final int id, final String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Converts the deprecated enum DeserializationHandlerResponse into
the new Result enum.
+ *
+ * @param value the old DeserializationHandlerResponse enum value
+ * @return a {@link Result} enum value
+ * @throws IllegalArgumentException if the provided value does not map
to a valid {@link Result}
+ */
+ private static DeserializationExceptionHandler.Result from(final
DeserializationHandlerResponse value) {
+ switch (value) {
+ case FAIL:
+ return Result.FAIL;
+ case CONTINUE:
+ return Result.RESUME;
+ default:
+ throw new IllegalArgumentException("No Result enum found
for old value: " + value);
+ }
+ }
+ }
+
+ /**
+ * Represents the result of handling a deserialization exception.
+ * <p>
+ * The {@code Response} class encapsulates a {@link Result},
+ * indicating whether processing should continue or fail, along with an
optional list of
+ * {@link ProducerRecord} instances to be sent to a dead letter queue.
+ * </p>
+ */
+ class Response {
+
+ private final Result result;
+
+ private final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords;
+
+ /**
+ * Constructs a new {@code Response} object.
+ *
+ * @param result the result indicating whether processing should
continue or fail;
+ * must not be {@code null}.
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ */
+ private Response(final Result result,
+ final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords) {
+ this.result = result;
+ this.deadLetterQueueRecords = deadLetterQueueRecords;
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should fail.
+ *
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ * @return a {@code Response} with a {@link
DeserializationExceptionHandler.Result#FAIL} status.
+ */
+ public static Response fail(final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords) {
+ return new Response(Result.FAIL, deadLetterQueueRecords);
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should fail.
+ *
+ * @return a {@code Response} with a {@link
DeserializationExceptionHandler.Result#FAIL} status.
+ */
+ public static Response fail() {
+ return fail(Collections.emptyList());
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should
continue.
+ *
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ * @return a {@code Response} with a {@link
DeserializationExceptionHandler.Result#RESUME} status.
+ */
+ public static Response resume(final List<ProducerRecord<byte[],
byte[]>> deadLetterQueueRecords) {
+ return new Response(Result.RESUME, deadLetterQueueRecords);
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should
continue.
+ *
+ * @return a {@code Response} with a {@link
DeserializationExceptionHandler.Result#RESUME} status.
+ */
+ public static Response resume() {
+ return resume(Collections.emptyList());
+ }
+
+ /**
+ * Retrieves the deserialization handler result.
+ *
+ * @return the {@link Result} indicating whether processing should
continue or fail.
+ */
+ public Result result() {
+ return result;
+ }
+
+ /**
+ * Retrieves an unmodifiable list of records to be sent to the dead
letter queue.
+ * <p>
+ * If the list is {@code null}, an empty list is returned.
+ * </p>
+ *
+ * @return an unmodifiable list of {@link ProducerRecord} instances
+ * for the dead letter queue, or an empty list if no records
are available.
+ */
+ public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+ if (deadLetterQueueRecords == null) {
+ return Collections.emptyList();
+ }
+ return Collections.unmodifiableList(deadLetterQueueRecords);
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index 6de997be986..63972b0840d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -17,47 +17,27 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import static
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to continue processing more records.
*/
public class LogAndContinueExceptionHandler implements
DeserializationExceptionHandler {
private static final Logger log =
LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
+ private String deadLetterQueueTopic = null;
- /**
- * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext,
ConsumerRecord, Exception)} instead.
- */
- @SuppressWarnings("deprecation")
- @Deprecated
@Override
- public DeserializationHandlerResponse handle(final ProcessorContext
context,
- final ConsumerRecord<byte[],
byte[]> record,
- final Exception exception) {
-
- log.warn(
- "Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
- context.taskId(),
- record.topic(),
- record.partition(),
- record.offset(),
- exception
- );
-
- return DeserializationHandlerResponse.CONTINUE;
- }
-
- @Override
- public DeserializationHandlerResponse handle(final ErrorHandlerContext
context,
- final ConsumerRecord<byte[],
byte[]> record,
- final Exception exception) {
-
+ public Response handleError(final ErrorHandlerContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception) {
log.warn(
"Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
context.taskId(),
@@ -67,11 +47,12 @@ public class LogAndContinueExceptionHandler implements
DeserializationExceptionH
exception
);
- return DeserializationHandlerResponse.CONTINUE;
+ return
Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic,
context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
- // ignore
+ if
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+ deadLetterQueueTopic =
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
index c832ab14200..17de09e5f0c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.errors;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
@@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to continue processing more records.
*/
public class LogAndContinueProcessingExceptionHandler implements
ProcessingExceptionHandler {
private static final Logger log =
LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);
+ private String deadLetterQueueTopic = null;
@Override
- public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record<?, ?> record, final Exception exception) {
+ public Response handleError(final ErrorHandlerContext context,
+ final Record<?, ?> record,
+ final Exception exception) {
log.warn(
"Exception caught during message processing, processor node: {},
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(),
@@ -41,12 +47,12 @@ public class LogAndContinueProcessingExceptionHandler
implements ProcessingExcep
context.offset(),
exception
);
-
- return ProcessingHandlerResponse.CONTINUE;
+ return
Response.resume(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic,
context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
- // ignore
+ if
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+ deadLetterQueueTopic =
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 20e6b9414de..6fc129b4d78 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -17,47 +17,27 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import static
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailExceptionHandler implements
DeserializationExceptionHandler {
private static final Logger log =
LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
+ private String deadLetterQueueTopic = null;
- /**
- * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext,
ConsumerRecord, Exception)} instead.
- */
- @SuppressWarnings("deprecation")
- @Deprecated
@Override
- public DeserializationHandlerResponse handle(final ProcessorContext
context,
- final ConsumerRecord<byte[],
byte[]> record,
- final Exception exception) {
-
- log.error(
- "Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
- context.taskId(),
- record.topic(),
- record.partition(),
- record.offset(),
- exception
- );
-
- return DeserializationHandlerResponse.FAIL;
- }
-
- @Override
- public DeserializationHandlerResponse handle(final ErrorHandlerContext
context,
- final ConsumerRecord<byte[],
byte[]> record,
- final Exception exception) {
-
+ public Response handleError(final ErrorHandlerContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception) {
log.error(
"Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
context.taskId(),
@@ -67,11 +47,12 @@ public class LogAndFailExceptionHandler implements
DeserializationExceptionHandl
exception
);
- return DeserializationHandlerResponse.FAIL;
+ return
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic,
context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
- // ignore
+ if
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+ deadLetterQueueTopic =
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
index f592663a6c0..5372d9ad0b6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.errors;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.api.Record;
import org.slf4j.Logger;
@@ -23,15 +24,20 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
/**
* Processing exception handler that logs a processing exception and then
* signals the processing pipeline to stop processing more records and fail.
*/
public class LogAndFailProcessingExceptionHandler implements
ProcessingExceptionHandler {
private static final Logger log =
LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class);
+ private String deadLetterQueueTopic = null;
@Override
- public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record<?, ?> record, final Exception exception) {
+ public Response handleError(final ErrorHandlerContext context,
+ final Record<?, ?> record,
+ final Exception exception) {
log.error(
"Exception caught during message processing, processor node: {},
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
context.processorNodeId(),
@@ -42,11 +48,12 @@ public class LogAndFailProcessingExceptionHandler
implements ProcessingException
exception
);
- return ProcessingHandlerResponse.FAIL;
+ return
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic,
context.sourceRawKey(), context.sourceRawValue(), context, exception));
}
@Override
public void configure(final Map<String, ?> configs) {
- // ignore
+ if
(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG) != null)
+ deadLetterQueueTopic =
String.valueOf(configs.get(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
index 7dc1b90bc2e..f4c32764877 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
@@ -16,13 +16,18 @@
*/
package org.apache.kafka.streams.errors;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.api.Record;
+import java.util.Collections;
+import java.util.List;
+
/**
* An interface that allows user code to inspect a record that has failed
processing
*/
public interface ProcessingExceptionHandler extends Configurable {
+
/**
* Inspect a record and the exception received
*
@@ -34,9 +39,30 @@ public interface ProcessingExceptionHandler extends
Configurable {
* The actual exception.
*
* @return Whether to continue or stop processing.
+ * @deprecated Use {@link #handleError(ErrorHandlerContext, Record,
Exception)} instead.
+ */
+ @Deprecated
+ default ProcessingHandlerResponse handle(final ErrorHandlerContext
context, final Record<?, ?> record, final Exception exception) {
+ throw new UnsupportedOperationException();
+ };
+
+ /**
+ * Inspects a record and the exception received during processing.
+ *
+ * @param context
+ * Processing context metadata.
+ * @param record
+ * Record where the exception occurred.
+ * @param exception
+ * The actual exception.
+ *
+ * @return a {@link Response} object
*/
- ProcessingHandlerResponse handle(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception);
+ default Response handleError(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception) {
+ return new
Response(ProcessingExceptionHandler.Result.from(handle(context, record,
exception)), Collections.emptyList());
+ }
+ @Deprecated
enum ProcessingHandlerResponse {
/** Continue processing. */
CONTINUE(1, "CONTINUE"),
@@ -58,4 +84,138 @@ public interface ProcessingExceptionHandler extends
Configurable {
this.name = name;
}
}
+
+ /**
+ * Enumeration that describes the response from the exception handler.
+ */
+ enum Result {
+ /** Resume processing. */
+ RESUME(1, "RESUME"),
+ /** Fail processing. */
+ FAIL(2, "FAIL");
+
+ /**
+ * An English description for the used option. This is for debugging
only and may change.
+ */
+ public final String name;
+
+ /**
+ * The permanent and immutable id for the used option. This can't
change ever.
+ */
+ public final int id;
+
+ Result(final int id, final String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Converts the deprecated enum ProcessingHandlerResponse into the new
Result enum.
+ *
+ * @param value the old ProcessingHandlerResponse enum value
+ * @return a {@link ProcessingExceptionHandler.Result} enum value
+ * @throws IllegalArgumentException if the provided value does not map
to a valid {@link ProcessingExceptionHandler.Result}
+ */
+ private static ProcessingExceptionHandler.Result from(final
ProcessingHandlerResponse value) {
+ switch (value) {
+ case FAIL:
+ return Result.FAIL;
+ case CONTINUE:
+ return Result.RESUME;
+ default:
+ throw new IllegalArgumentException("No Result enum found
for old value: " + value);
+ }
+ }
+ }
+
+ /**
+ * Represents the result of handling a processing exception.
+ * <p>
+ * The {@code Response} class encapsulates a {@link Result},
+ * indicating whether processing should continue or fail, along with an
optional list of
+ * {@link org.apache.kafka.clients.producer.ProducerRecord} instances to
be sent to a dead letter queue.
+ * </p>
+ */
+ class Response {
+
+ private final Result result;
+
+ private final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords;
+
+ /**
+ * Constructs a new {@code Response} object.
+ *
+ * @param result the result indicating whether processing should
continue or fail;
+ * must not be {@code null}.
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ */
+ private Response(final Result result,
+ final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords) {
+ this.result = result;
+ this.deadLetterQueueRecords = deadLetterQueueRecords;
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should fail.
+ *
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ * @return a {@code Response} with a {@link
ProcessingExceptionHandler.Result#FAIL} status.
+ */
+ public static Response fail(final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords) {
+ return new Response(Result.FAIL, deadLetterQueueRecords);
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should fail.
+ *
+ * @return a {@code Response} with a {@link
ProcessingExceptionHandler.Result#FAIL} status.
+ */
+ public static Response fail() {
+ return fail(Collections.emptyList());
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should
continue.
+ *
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ * @return a {@code Response} with a {@link
ProcessingExceptionHandler.Result#RESUME} status.
+ */
+ public static Response resume(final List<ProducerRecord<byte[],
byte[]>> deadLetterQueueRecords) {
+ return new Response(Result.RESUME, deadLetterQueueRecords);
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should
continue.
+ *
+ * @return a {@code Response} with a {@link
ProcessingExceptionHandler.Result#RESUME} status.
+ */
+ public static Response resume() {
+ return resume(Collections.emptyList());
+ }
+
+ /**
+ * Retrieves the processing handler result.
+ *
+ * @return the {@link Result} indicating whether processing should
continue or fail.
+ */
+ public Result result() {
+ return result;
+ }
+
+ /**
+ * Retrieves an unmodifiable list of records to be sent to the dead
letter queue.
+ * <p>
+ * If the list is {@code null}, an empty list is returned.
+ * </p>
+ *
+ * @return an unmodifiable list of {@link ProducerRecord} instances
+ * for the dead letter queue, or an empty list if no records
are available.
+ */
+ public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+ if (deadLetterQueueRecords == null) {
+ return Collections.emptyList();
+ }
+ return Collections.unmodifiableList(deadLetterQueueRecords);
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index ed6b38a5692..717866a9bed 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -19,6 +19,9 @@ package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
+import java.util.Collections;
+import java.util.List;
+
/**
* Interface that specifies how an exception when attempting to produce a
result to
* Kafka should be handled.
@@ -55,13 +58,34 @@ public interface ProductionExceptionHandler extends
Configurable {
* The exception that occurred during production.
*
* @return Whether to continue or stop processing, or retry the failed
operation.
+ * @deprecated Use {@link #handleError(ErrorHandlerContext,
ProducerRecord, Exception)} instead.
*/
+ @Deprecated
default ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context,
final
ProducerRecord<byte[], byte[]> record,
final Exception
exception) {
return handle(record, exception);
}
+ /**
+ * Inspect a record that we attempted to produce, and the exception that
resulted
+ * from attempting to produce it and determine whether to continue or stop
processing.
+ *
+ * @param context
+ * The error handler context metadata.
+ * @param record
+ * The record that failed to produce.
+ * @param exception
+ * The exception that occurred during production.
+ *
+ * @return a {@link Response} object
+ */
+ default Response handleError(final ErrorHandlerContext context,
+ final ProducerRecord<byte[], byte[]> record,
+ final Exception exception) {
+ return new Response(Result.from(handle(context, record, exception)),
Collections.emptyList());
+ }
+
/**
* Handles serialization exception and determine if the process should
continue. The default implementation is to
* fail the process.
@@ -79,7 +103,7 @@ public interface ProductionExceptionHandler extends
Configurable {
@Deprecated
default ProductionExceptionHandlerResponse
handleSerializationException(final ProducerRecord record,
final Exception exception) {
- return ProductionExceptionHandlerResponse.FAIL;
+ return
ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
}
/**
@@ -96,8 +120,11 @@ public interface ProductionExceptionHandler extends
Configurable {
* The origin of the serialization exception.
*
* @return Whether to continue or stop processing, or retry the failed
operation.
+ *
+ * @deprecated Use {@link #handleSerializationError(ErrorHandlerContext,
ProducerRecord, Exception, SerializationExceptionOrigin)} instead.
*/
@SuppressWarnings("rawtypes")
+ @Deprecated
default ProductionExceptionHandlerResponse
handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
@@ -105,6 +132,30 @@ public interface ProductionExceptionHandler extends
Configurable {
return handleSerializationException(record, exception);
}
+ /**
+ * Handles a serialization exception and determines whether processing should
continue. The default implementation is to
+ * fail the process.
+ *
+ * @param context
+ * The error handler context metadata.
+ * @param record
+ * The record that failed to serialize.
+ * @param exception
+ * The exception that occurred during serialization.
+ * @param origin
+ * The origin of the serialization exception.
+ *
+ * @return a {@link Response} object
+ */
+ @SuppressWarnings("rawtypes")
+ default Response handleSerializationError(final ErrorHandlerContext
context,
+ final ProducerRecord record,
+ final Exception exception,
+ final
SerializationExceptionOrigin origin) {
+ return new Response(Result.from(handleSerializationException(context,
record, exception, origin)), Collections.emptyList());
+ }
+
+ @Deprecated
enum ProductionExceptionHandlerResponse {
/** Continue processing.
*
@@ -147,10 +198,174 @@ public interface ProductionExceptionHandler extends
Configurable {
}
}
+ /**
+ * Enumeration that describes the response from the exception handler.
+ */
+ enum Result {
+ /** Resume processing.
+ *
+ * <p> For this case, output records which could not be written
successfully are lost.
+ * Use this option only if you can tolerate data loss.
+ */
+ RESUME(0, "RESUME"),
+ /** Fail processing.
+ *
+ * <p> Kafka Streams will raise an exception and the {@code
StreamsThread} will fail.
+ * No offsets (for {@link
org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or
transactions
+ * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2
exactly-once}) will be committed.
+ */
+ FAIL(1, "FAIL"),
+ /** Retry the failed operation.
+ *
+ * <p> Retrying might imply that a {@link TaskCorruptedException}
exception is thrown, and that the retry
+ * is started from the last committed offset.
+ *
+ * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
+ * {@link org.apache.kafka.common.errors.RetriableException retriable
exceptions}.
+ * If {@code RETRY} is returned for a non-retriable exception it will
be interpreted as {@link #FAIL}.
+ */
+ RETRY(2, "RETRY");
+
+ /**
+ * An English description for the used option. This is for debugging
only and may change.
+ */
+ public final String name;
+
+ /**
+ * The permanent and immutable id for the used option. This can't
change ever.
+ */
+ public final int id;
+
+ Result(final int id, final String name) {
+ this.id = id;
+ this.name = name;
+ }
+
+ /**
+ * Converts the deprecated enum ProductionExceptionHandlerResponse
into the new Result enum.
+ *
+ * @param value the old ProductionExceptionHandlerResponse enum value
+ * @return a {@link ProductionExceptionHandler.Result} enum value
+ * @throws IllegalArgumentException if the provided value does not map
to a valid {@link ProductionExceptionHandler.Result}
+ */
+ private static ProductionExceptionHandler.Result from(final
ProductionExceptionHandlerResponse value) {
+ switch (value) {
+ case FAIL:
+ return Result.FAIL;
+ case CONTINUE:
+ return Result.RESUME;
+ case RETRY:
+ return Result.RETRY;
+ default:
+ throw new IllegalArgumentException("No Result enum found
for old value: " + value);
+ }
+ }
+ }
+
enum SerializationExceptionOrigin {
/** Serialization exception occurred during serialization of the key.
*/
KEY,
/** Serialization exception occurred during serialization of the
value. */
VALUE
}
+
+ /**
+ * Represents the result of handling a production exception.
+ * <p>
+ * The {@code Response} class encapsulates a {@link Result},
+ * indicating whether processing should continue or fail, along with an
optional list of
+ * {@link ProducerRecord} instances to be sent to a dead letter queue.
+ * </p>
+ */
+ class Response {
+
+ private final Result result;
+
+ private final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords;
+
+ /**
+ * Constructs a new {@code Response} object.
+ *
+ * @param result the result indicating whether processing should
continue or fail;
+ * must not be {@code null}.
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ */
+ private Response(final Result result,
+ final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords) {
+ this.result = result;
+ this.deadLetterQueueRecords = deadLetterQueueRecords;
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should fail.
+ *
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ * @return a {@code Response} with a {@link
ProductionExceptionHandler.Result#FAIL} status.
+ */
+ public static Response fail(final List<ProducerRecord<byte[], byte[]>>
deadLetterQueueRecords) {
+ return new Response(Result.FAIL, deadLetterQueueRecords);
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should fail.
+ *
+ * @return a {@code Response} with a {@link
ProductionExceptionHandler.Result#FAIL} status.
+ */
+ public static Response fail() {
+ return fail(Collections.emptyList());
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should
continue.
+ *
+ * @param deadLetterQueueRecords the list of records to be sent to the
dead letter queue; may be {@code null}.
+ * @return a {@code Response} with a {@link
ProductionExceptionHandler.Result#RESUME} status.
+ */
+ public static Response resume(final List<ProducerRecord<byte[],
byte[]>> deadLetterQueueRecords) {
+ return new Response(Result.RESUME, deadLetterQueueRecords);
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should
continue.
+ *
+ * @return a {@code Response} with a {@link
ProductionExceptionHandler.Result#RESUME} status.
+ */
+ public static Response resume() {
+ return resume(Collections.emptyList());
+ }
+
+ /**
+ * Creates a {@code Response} indicating that processing should retry.
+ *
+ * @return a {@code Response} with a {@link
ProductionExceptionHandler.Result#RETRY} status.
+ */
+ public static Response retry() {
+ return new Response(Result.RETRY, Collections.emptyList());
+ }
+
+ /**
+ * Retrieves the production exception handler result.
+ *
+ * @return the {@link Result} indicating whether processing should
continue, fail or retry.
+ */
+ public Result result() {
+ return result;
+ }
+
+ /**
+ * Retrieves an unmodifiable list of records to be sent to the dead
letter queue.
+ * <p>
+ * If the list is {@code null}, an empty list is returned.
+ * </p>
+ *
+ * @return an unmodifiable list of {@link ProducerRecord} instances
+ * for the dead letter queue, or an empty list if no records
are available.
+ */
+ public List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords() {
+ if (deadLetterQueueRecords == null) {
+ return Collections.emptyList();
+ }
+ return Collections.unmodifiableList(deadLetterQueueRecords);
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
new file mode 100644
index 00000000000..d3fd221cea8
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code ExceptionHandlerUtils} contains utility methods that can be used
by all exception handlers.
+ */
+public class ExceptionHandlerUtils {
+ public static final String HEADER_ERRORS_EXCEPTION_NAME =
"__streams.errors.exception";
+ public static final String HEADER_ERRORS_STACKTRACE_NAME =
"__streams.errors.stacktrace";
+ public static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME =
"__streams.errors.message";
+ public static final String HEADER_ERRORS_TOPIC_NAME =
"__streams.errors.topic";
+ public static final String HEADER_ERRORS_PARTITION_NAME =
"__streams.errors.partition";
+ public static final String HEADER_ERRORS_OFFSET_NAME =
"__streams.errors.offset";
+
+
+ public static boolean shouldBuildDeadLetterQueueRecord(final String
deadLetterQueueTopicName) {
+ return deadLetterQueueTopicName != null;
+ }
+
+ /**
+ * Returns Dead Letter Queue records for the provided exception, or an empty list if the topic name is {@code null}
+ *
+ * @param key Serialized key for the records
+ * @param value Serialized value for the records
+ * @param context ErrorHandlerContext of the exception
+ * @param exception Thrown exception
+ * @return A list of Dead Letter Queue records to produce
+ */
+ public static List<ProducerRecord<byte[], byte[]>>
maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
+
final byte[] key,
+
final byte[] value,
+
final ErrorHandlerContext context,
+
final Exception exception) {
+ if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
+ return Collections.emptyList();
+ }
+
+ return
Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName,
key, value, context, exception));
+ }
+
+
+ /**
+ * Build dead letter queue record for the provided exception.
+ *
+ * @param key Serialized key for the record.
+ * @param value Serialized value for the record.
+ * @param context error handler context of the exception.
+ * @return A dead letter queue record to produce.
+ */
+ public static ProducerRecord<byte[], byte[]>
buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
+ final
byte[] key,
+ final
byte[] value,
+ final
ErrorHandlerContext context,
+ final
Exception e) {
+ if (deadLetterQueueTopicName == null) {
+ throw new InvalidConfigurationException(String.format("%s cannot
be null while building dead letter queue record",
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
+ }
+ final ProducerRecord<byte[], byte[]> producerRecord = new
ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key,
value);
+ final StringWriter stackTraceStringWriter = new StringWriter();
+ final PrintWriter stackTracePrintWriter = new
PrintWriter(stackTraceStringWriter);
+ e.printStackTrace(stackTracePrintWriter);
+
+ try (final StringSerializer stringSerializer = new StringSerializer())
{
+ producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME,
stringSerializer.serialize(null, e.toString()));
+ producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME,
stringSerializer.serialize(null, e.getMessage()));
+ producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME,
stringSerializer.serialize(null, stackTraceStringWriter.toString()));
+ producerRecord.headers().add(HEADER_ERRORS_TOPIC_NAME,
stringSerializer.serialize(null, context.topic()));
+ producerRecord.headers().add(HEADER_ERRORS_PARTITION_NAME,
stringSerializer.serialize(null, String.valueOf(context.partition())));
+ producerRecord.headers().add(HEADER_ERRORS_OFFSET_NAME,
stringSerializer.serialize(null, String.valueOf(context.offset())));
+ }
+
+ return producerRecord;
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 1dddc55ca3c..bbf82ff9033 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
@@ -220,11 +221,11 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
internalProcessorContext.recordContext().sourceRawValue()
);
- final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
+ final ProcessingExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
- processingExceptionHandler.handle(errorHandlerContext,
record, processingException),
- "Invalid ProductionExceptionHandler response."
+
processingExceptionHandler.handleError(errorHandlerContext, record,
processingException),
+ "Invalid ProcessingExceptionHandler response."
);
} catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions,
other languages
@@ -242,7 +243,21 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
);
}
- if (response ==
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+ final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords
= response.deadLetterQueueRecords();
+ if (!deadLetterQueueRecords.isEmpty()) {
+ final RecordCollector collector = ((RecordCollector.Supplier)
internalProcessorContext).recordCollector();
+ for (final ProducerRecord<byte[], byte[]>
deadLetterQueueRecord : deadLetterQueueRecords) {
+ collector.send(
+ deadLetterQueueRecord.key(),
+ deadLetterQueueRecord.value(),
+ name(),
+ internalProcessorContext,
+ deadLetterQueueRecord
+ );
+ }
+ }
+
+ if (response.result() == ProcessingExceptionHandler.Result.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the
streaming pipeline" +
" continue after a processing error, please set the " +
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index a48a671d460..2c14200f413 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
@@ -48,6 +49,12 @@ public interface RecordCollector {
final InternalProcessorContext<Void, Void> context,
final StreamPartitioner<? super K, ? super V>
partitioner);
+ <K, V> void send(K key,
+ V value,
+ String processorNodeId,
+ InternalProcessorContext<?, ?> context,
+ ProducerRecord<byte[], byte[]> serializedRecord);
+
/**
* Initialize the internal {@link Producer}; note this function should be
made idempotent
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 89cbf4d4c7d..c4178733ef3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -41,7 +41,6 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
-import
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -263,6 +262,15 @@ public class RecordCollectorImpl implements
RecordCollector {
// freeing raw records in the context to reduce memory pressure
freeRawInputRecordFromContext(context);
+ send(key, value, processorNodeId, context, serializedRecord);
+ }
+
+ public <K, V> void send(final K key,
+ final V value,
+ final String processorNodeId,
+ final InternalProcessorContext<?, ?> context,
+ final ProducerRecord<byte[], byte[]>
serializedRecord) {
+
streamsProducer.send(serializedRecord, (metadata, exception) -> {
try {
// if there's already an exception record, skip logging
offsets or new exceptions
@@ -278,16 +286,16 @@ public class RecordCollectorImpl implements
RecordCollector {
log.warn("Received offset={} in produce response for
{}", metadata.offset(), tp);
}
- if (!topic.endsWith("-changelog")) {
+ if (!serializedRecord.topic().endsWith("-changelog")) {
// we may not have created a sensor during
initialization if the node uses dynamic topic routing,
// as all topics are not known up front, so create the
sensor for this topic if absent
final Sensor topicProducedSensor =
producedSensorByTopic.computeIfAbsent(
- topic,
+ serializedRecord.topic(),
t -> TopicMetrics.producedSensor(
Thread.currentThread().getName(),
taskId.toString(),
processorNodeId,
- topic,
+ serializedRecord.topic(),
context.metrics()
)
);
@@ -299,7 +307,7 @@ public class RecordCollectorImpl implements RecordCollector
{
}
} else {
recordSendError(
- topic,
+ serializedRecord.topic(),
exception,
serializedRecord,
context,
@@ -307,7 +315,7 @@ public class RecordCollectorImpl implements RecordCollector
{
);
// KAFKA-7510 only put message key and value in TRACE
level log so we don't leak data by default
- log.trace("Failed record: (key {} value {} timestamp {})
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
+ log.trace("Failed record: (key {} value {} timestamp {})
topic=[{}] partition=[{}]", key, value, serializedRecord.timestamp(),
serializedRecord.topic(), serializedRecord.partition());
}
} catch (final RuntimeException fatal) {
sendException.set(new StreamsException("Producer.send
`Callback` failed", fatal));
@@ -329,16 +337,16 @@ public class RecordCollectorImpl implements
RecordCollector {
final Integer partition,
final Long timestamp,
final String processorNodeId,
- final InternalProcessorContext<Void,
Void> context,
+ final InternalProcessorContext<?, ?>
context,
final Exception
serializationException) {
log.debug(String.format("Error serializing record for topic %s",
topic), serializationException);
final ProducerRecord<K, V> record = new ProducerRecord<>(topic,
partition, timestamp, key, value, headers);
- final ProductionExceptionHandlerResponse response;
+ final ProductionExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
- productionExceptionHandler.handleSerializationException(
+ productionExceptionHandler.handleSerializationError(
errorHandlerContext(context, processorNodeId),
record,
serializationException,
@@ -365,7 +373,20 @@ public class RecordCollectorImpl implements
RecordCollector {
);
}
- if (maybeFailResponse(response) ==
ProductionExceptionHandlerResponse.FAIL) {
+ final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords =
response.deadLetterQueueRecords();
+ if (!deadLetterQueueRecords.isEmpty()) {
+ for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord :
deadLetterQueueRecords) {
+ this.send(
+ deadLetterQueueRecord.key(),
+ deadLetterQueueRecord.value(),
+ processorNodeId,
+ context,
+ deadLetterQueueRecord
+ );
+ }
+ }
+
+ if (maybeFailResponse(response.result()) ==
ProductionExceptionHandler.Result.FAIL) {
throw new StreamsException(
String.format(
"Unable to serialize record. ProducerRecord(topic=[%s],
partition=[%d], timestamp=[%d]",
@@ -385,7 +406,7 @@ public class RecordCollectorImpl implements RecordCollector
{
droppedRecordsSensor.record();
}
- private DefaultErrorHandlerContext errorHandlerContext(final
InternalProcessorContext<Void, Void> context,
+ private DefaultErrorHandlerContext errorHandlerContext(final
InternalProcessorContext<?, ?> context,
final String
processorNodeId) {
final RecordContext recordContext = context != null ?
context.recordContext() : null;
@@ -442,7 +463,7 @@ public class RecordCollectorImpl implements RecordCollector
{
private void recordSendError(final String topic,
final Exception productionException,
final ProducerRecord<byte[], byte[]>
serializedRecord,
- final InternalProcessorContext<Void, Void>
context,
+ final InternalProcessorContext<?, ?> context,
final String processorNodeId) {
String errorMessage = String.format(SEND_EXCEPTION_MESSAGE, topic,
taskId, productionException.toString());
@@ -462,10 +483,10 @@ public class RecordCollectorImpl implements
RecordCollector {
// TransactionAbortedException is only thrown after
`abortTransaction()` was called,
// so it's only a followup error, and Kafka Streams is already
handling the original error
} else {
- final ProductionExceptionHandlerResponse response;
+ final ProductionExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
- productionExceptionHandler.handle(
+ productionExceptionHandler.handleError(
errorHandlerContext(context, processorNodeId),
serializedRecord,
productionException
@@ -490,14 +511,27 @@ public class RecordCollectorImpl implements
RecordCollector {
return;
}
- if (productionException instanceof RetriableException && response
== ProductionExceptionHandlerResponse.RETRY) {
+ final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords
= response.deadLetterQueueRecords();
+ if (!deadLetterQueueRecords.isEmpty()) {
+ for (final ProducerRecord<byte[], byte[]>
deadLetterQueueRecord : deadLetterQueueRecords) {
+ this.send(
+ deadLetterQueueRecord.key(),
+ deadLetterQueueRecord.value(),
+ processorNodeId,
+ context,
+ deadLetterQueueRecord
+ );
+ }
+ }
+
+ if (productionException instanceof RetriableException &&
response.result() == ProductionExceptionHandler.Result.RETRY) {
errorMessage += "\nThe broker is either slow or in bad state
(like not having enough replicas) in responding the request, " +
"or the connection to broker was interrupted sending the
request or receiving the response. " +
"\nConsider overwriting `max.block.ms` and /or " +
"`delivery.timeout.ms` to a larger value to wait longer
for such scenarios and avoid timeout errors";
sendException.set(new
TaskCorruptedException(Collections.singleton(taskId)));
} else {
- if (maybeFailResponse(response) ==
ProductionExceptionHandlerResponse.FAIL) {
+ if (maybeFailResponse(response.result()) ==
ProductionExceptionHandler.Result.FAIL) {
errorMessage += "\nException handler choose to FAIL the
processing, no more records would be sent.";
sendException.set(new StreamsException(errorMessage,
productionException));
} else {
@@ -510,12 +544,12 @@ public class RecordCollectorImpl implements
RecordCollector {
log.error(errorMessage, productionException);
}
- private ProductionExceptionHandlerResponse maybeFailResponse(final
ProductionExceptionHandlerResponse response) {
- if (response == ProductionExceptionHandlerResponse.RETRY) {
+ private ProductionExceptionHandler.Result maybeFailResponse(final
ProductionExceptionHandler.Result result) {
+ if (result == ProductionExceptionHandler.Result.RETRY) {
log.warn("ProductionExceptionHandler returned RETRY for a
non-retriable exception. Will treat it as FAIL.");
- return ProductionExceptionHandlerResponse.FAIL;
+ return ProductionExceptionHandler.Result.FAIL;
} else {
- return response;
+ return result;
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 153ca2e02f1..88e756d785a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,17 +17,18 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import
org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
+import java.util.List;
import java.util.Objects;
import static
org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
@@ -50,7 +51,7 @@ public class RecordDeserializer {
/**
* @throws StreamsException if a deserialization error occurs and the
deserialization callback returns
- * {@link DeserializationHandlerResponse#FAIL
FAIL}
+ * {@link
DeserializationExceptionHandler.Result#FAIL FAIL}
* or throws an exception itself
*/
ConsumerRecord<Object, Object> deserialize(final ProcessorContext<?, ?>
processorContext,
@@ -100,11 +101,11 @@ public class RecordDeserializer {
rawRecord.value()
);
- final DeserializationHandlerResponse response;
+ final DeserializationExceptionHandler.Response response;
try {
response = Objects.requireNonNull(
- deserializationExceptionHandler.handle(errorHandlerContext,
rawRecord, deserializationException),
- "Invalid DeserializationExceptionHandler response."
+
deserializationExceptionHandler.handleError(errorHandlerContext, rawRecord,
deserializationException),
+ "Invalid DeserializationExceptionResponse response."
);
} catch (final Exception fatalUserException) {
// while Java distinguishes checked vs unchecked exceptions, other
languages
@@ -118,7 +119,21 @@ public class RecordDeserializer {
throw new StreamsException("Fatal user code error in
deserialization error callback", fatalUserException);
}
- if (response == DeserializationHandlerResponse.FAIL) {
+ final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords =
response.deadLetterQueueRecords();
+ if (!deadLetterQueueRecords.isEmpty()) {
+ final RecordCollector collector = ((RecordCollector.Supplier)
processorContext).recordCollector();
+ for (final ProducerRecord<byte[], byte[]> deadLetterQueueRecord :
deadLetterQueueRecords) {
+ collector.send(
+ deadLetterQueueRecord.key(),
+ deadLetterQueueRecord.value(),
+ sourceNodeName,
+ (InternalProcessorContext) processorContext,
+ deadLetterQueueRecord
+ );
+ }
+ }
+
+ if (response.result() == DeserializationExceptionHandler.Result.FAIL) {
throw new StreamsException("Deserialization exception handler is
set to fail upon" +
" a deserialization error. If you would rather have the
streaming pipeline" +
" continue after a deserialization error, please set the " +
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 82e9c8d7fb1..42b57e46aa4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
@@ -945,10 +946,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
recordContext.sourceRawValue()
);
- final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
+ final ProcessingExceptionHandler.Response
processingExceptionResponse;
try {
- response = Objects.requireNonNull(
- processingExceptionHandler.handle(errorHandlerContext,
null, processingException),
+ processingExceptionResponse = Objects.requireNonNull(
+
processingExceptionHandler.handleError(errorHandlerContext, null,
processingException),
"Invalid ProcessingExceptionHandler response."
);
} catch (final Exception fatalUserException) {
@@ -963,7 +964,20 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
throw new FailedProcessingException("Fatal user code error in
processing error callback", node.name(), fatalUserException);
}
- if (response ==
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+ final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords
= processingExceptionResponse.deadLetterQueueRecords();
+ if (!deadLetterQueueRecords.isEmpty()) {
+ final RecordCollector collector = ((RecordCollector.Supplier)
processorContext).recordCollector();
+ for (final ProducerRecord<byte[], byte[]>
deadLetterQueueRecord : deadLetterQueueRecords) {
+ collector.send(
+ deadLetterQueueRecord.key(),
+ deadLetterQueueRecord.value(),
+ node.name(),
+ processorContext,
+ deadLetterQueueRecord);
+ }
+ }
+
+ if (processingExceptionResponse.result() ==
ProcessingExceptionHandler.Result.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the
streaming pipeline" +
" continue after a processing error, please set the " +
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 6505cde08ed..bd8002782d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1683,6 +1683,11 @@ public class StreamsConfigTest {
"Please set group.protocol=classic or remove group.instance.id
from the configuration."));
}
+ public void shouldSetDefaultDeadLetterQueue() {
+ final StreamsConfig config = new StreamsConfig(props);
+
assertNull(config.getString(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
+ }
+
static class MisconfiguredSerde implements Serde<Object> {
@Override
public void configure(final Map<String, ?> configs, final boolean
isKey) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
new file mode 100644
index 00000000000..915f3a3f650
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
+import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@ExtendWith(MockitoExtension.class)
+public class ExceptionHandlerUtilsTest {
+ @Test
+ public void checkDeadLetterQueueRecords() {
+ final StringSerializer stringSerializer = new StringSerializer();
+ final StringDeserializer stringDeserializer = new StringDeserializer();
+ final MockRecordCollector collector = new MockRecordCollector();
+ final String key = "key";
+ final String value = "value";
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = new InternalMockProcessorContext<>(
+ new StateSerdes<>("sink", Serdes.ByteArray(),
Serdes.ByteArray()),
+ collector
+ );
+ internalProcessorContext.setRecordContext(new ProcessorRecordContext(
+ 1L,
+ 2,
+ 3,
+ "source",
+ new RecordHeaders(Collections.singletonList(
+ new RecordHeader("sourceHeader",
stringSerializer.serialize(null, "hello world")))),
+ key.getBytes(),
+ value.getBytes()
+ ));
+ final ErrorHandlerContext errorHandlerContext =
getErrorHandlerContext(internalProcessorContext);
+
+ final NullPointerException exception = new
NullPointerException("Oopsie!");
+ final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords =
ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords("dlq",
errorHandlerContext.sourceRawKey(), errorHandlerContext.sourceRawValue(),
errorHandlerContext, exception);
+ final Iterator<ProducerRecord<byte[], byte[]>> iterator =
dlqRecords.iterator();
+
+ assertTrue(iterator.hasNext());
+ final ProducerRecord<byte[], byte[]> dlqRecord = iterator.next();
+ final Headers headers = dlqRecord.headers();
+ assertFalse(iterator.hasNext()); // There should be only one record
+
+ assertEquals("dlq", dlqRecord.topic());
+ assertEquals(errorHandlerContext.timestamp(), dlqRecord.timestamp());
+ assertEquals(1, dlqRecord.timestamp());
+ assertEquals(key, new String(dlqRecord.key()));
+ assertEquals(value, new String(dlqRecord.value()));
+ assertEquals(exception.toString(),
stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_NAME).value()));
+ assertEquals(exception.getMessage(),
stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_EXCEPTION_MESSAGE_NAME).value()));
+ assertEquals("source", stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_TOPIC_NAME).value()));
+ assertEquals("3", stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_PARTITION_NAME).value()));
+ assertEquals("2", stringDeserializer.deserialize(null,
headers.lastHeader(ExceptionHandlerUtils.HEADER_ERRORS_OFFSET_NAME).value()));
+ }
+
+ @Test
+ public void doNotBuildDeadLetterQueueRecordsIfNotConfigured() {
+ final NullPointerException exception = new
NullPointerException("Oopsie!");
+ final Iterable<ProducerRecord<byte[], byte[]>> dlqRecords =
ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords(null, null, null, null,
exception);
+ final Iterator<ProducerRecord<byte[], byte[]>> iterator =
dlqRecords.iterator();
+
+ assertFalse(iterator.hasNext());
+ }
+
+ private static DefaultErrorHandlerContext getErrorHandlerContext(final
InternalProcessorContext<Object, Object> internalProcessorContext) {
+ return new DefaultErrorHandlerContext(
+ null,
+ internalProcessorContext.topic(),
+ internalProcessorContext.partition(),
+ internalProcessorContext.offset(),
+ internalProcessorContext.headers(),
+ internalProcessorContext.currentNode().name(),
+ internalProcessorContext.taskId(),
+ internalProcessorContext.timestamp(),
+ internalProcessorContext.recordContext().sourceRawKey(),
+ internalProcessorContext.recordContext().sourceRawValue());
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index e9669ac39f4..6b7c06580c0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
@@ -29,6 +30,8 @@ import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -39,7 +42,9 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Test;
@@ -52,10 +57,13 @@ import org.mockito.quality.Strictness;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static
org.apache.kafka.streams.errors.ProcessingExceptionHandler.Response;
+import static
org.apache.kafka.streams.errors.ProcessingExceptionHandler.Result;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -103,7 +111,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
- node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
internalProcessorContext, false));
+ node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.fail(),
internalProcessorContext, false));
final FailedProcessingException failedProcessingException =
assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@@ -120,7 +128,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
- node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
internalProcessorContext, false));
+ node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(),
internalProcessorContext, false));
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE,
TIMESTAMP)));
}
@@ -147,7 +155,7 @@ public class ProcessorNodeTest {
assertEquals(ignoredExceptionCause,
runtimeException.getCause().getClass());
assertEquals(ignoredExceptionCauseMessage,
runtimeException.getCause().getMessage());
- verify(processingExceptionHandler, never()).handle(any(), any(),
any());
+ verify(processingExceptionHandler, never()).handleError(any(), any(),
any());
}
@Test
@@ -156,7 +164,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
- node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
internalProcessorContext, true));
+ node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.Response.resume(),
internalProcessorContext, true));
final FailedProcessingException failedProcessingException =
assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@@ -166,6 +174,58 @@ public class ProcessorNodeTest {
assertEquals(NAME,
failedProcessingException.failedProcessorNodeName());
}
+
+ @Test
+ public void
shouldBuildDeadLetterQueueRecordsInDefaultProcessingExceptionHandler() {
+ final ProcessorNode<Object, Object, Object, Object> node = new
ProcessorNode<>("processor",
+ (Processor<Object, Object, Object, Object>) record -> {
+ throw new NullPointerException("Oopsie!");
+ }, Collections.emptySet());
+
+ final MockRecordCollector collector = new MockRecordCollector();
+ final InternalProcessorContext<Object, Object>
internalProcessorContext =
+ new InternalMockProcessorContext<>(
+ new StateSerdes<>("sink", Serdes.ByteArray(),
Serdes.ByteArray()),
+ collector
+ );
+ final ProcessingExceptionHandler processingExceptionHandler = new
LogAndFailProcessingExceptionHandler();
+
processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
"dlq"));
+ node.init(internalProcessorContext, processingExceptionHandler);
+
+ assertThrows(RuntimeException.class,
+ () -> node.process(new Record<>("hello", "world", 1L)));
+
+ assertEquals(1, collector.collected().size());
+ assertEquals("dlq", collector.collected().get(0).topic());
+ assertEquals("sourceKey", new String((byte[])
collector.collected().get(0).key()));
+ assertEquals("sourceValue", new String((byte[])
collector.collected().get(0).value()));
+ }
+
+ @Test
+ public void
shouldBuildDeadLetterQueueRecordsInLogAndContinueProcessingExceptionHandler() {
+ final ProcessorNode<Object, Object, Object, Object> node = new
ProcessorNode<>("processor",
+ (Processor<Object, Object, Object, Object>) record -> {
+ throw new NullPointerException("Oopsie!");
+ }, Collections.emptySet());
+
+ final MockRecordCollector collector = new MockRecordCollector();
+ final InternalProcessorContext<Object, Object>
internalProcessorContext =
+ new InternalMockProcessorContext<>(
+ new StateSerdes<>("sink", Serdes.ByteArray(),
Serdes.ByteArray()),
+ collector
+ );
+ final ProcessingExceptionHandler processingExceptionHandler = new
LogAndContinueProcessingExceptionHandler();
+
processingExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
"dlq"));
+ node.init(internalProcessorContext, processingExceptionHandler);
+
+ node.process(new Record<>("hello", "world", 0L));
+
+ assertEquals(1, collector.collected().size());
+ assertEquals("dlq", collector.collected().get(0).topic());
+ assertEquals("sourceKey", new String((byte[])
collector.collected().get(0).key()));
+ assertEquals("sourceValue", new String((byte[])
collector.collected().get(0).value()));
+ }
+
private static class ExceptionalProcessor implements Processor<Object,
Object, Object, Object> {
@Override
public void init(final ProcessorContext<Object, Object> context) {
@@ -318,6 +378,64 @@ public class ProcessorNodeTest {
assertTrue(se.getMessage().contains("pname"));
}
+ @Test
+ void shouldFailWithDeadLetterQueueRecords() {
+ final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+ final List<ProducerRecord<byte[], byte[]>> records =
Collections.singletonList(record);
+
+ final Response response = Response.fail(records);
+
+ assertEquals(Result.FAIL, response.result());
+ assertEquals(1, response.deadLetterQueueRecords().size());
+ assertEquals(record, response.deadLetterQueueRecords().get(0));
+ }
+
+ @Test
+ void shouldFailWithoutDeadLetterQueueRecords() {
+ final Response response = Response.fail();
+
+ assertEquals(Result.FAIL, response.result());
+ assertTrue(response.deadLetterQueueRecords().isEmpty());
+ }
+
+ @Test
+ void shouldResumeWithDeadLetterQueueRecords() {
+ final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+ final List<ProducerRecord<byte[], byte[]>> records =
Collections.singletonList(record);
+
+ final Response response = Response.resume(records);
+
+ assertEquals(Result.RESUME, response.result());
+ assertEquals(1, response.deadLetterQueueRecords().size());
+ assertEquals(record, response.deadLetterQueueRecords().get(0));
+ }
+
+ @Test
+ void shouldResumeWithoutDeadLetterQueueRecords() {
+ final Response response = Response.resume();
+
+ assertEquals(Result.RESUME, response.result());
+ assertTrue(response.deadLetterQueueRecords().isEmpty());
+ }
+
+
+ @Test
+ void shouldNotBeModifiable() {
+ final ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+ final List<ProducerRecord<byte[], byte[]>> records =
Collections.singletonList(record);
+
+ final Response response = Response.fail(records);
+
+ assertThrows(UnsupportedOperationException.class, () ->
response.deadLetterQueueRecords().add(record));
+ }
+
+ @Test
+ void shouldReturnsEmptyList() {
+ final Response response = Response.fail();
+
+ assertTrue(response.deadLetterQueueRecords().isEmpty());
+ }
+
@SuppressWarnings("unchecked")
private InternalProcessorContext<Object, Object>
mockInternalProcessorContext() {
final InternalProcessorContext<Object, Object>
internalProcessorContext = mock(InternalProcessorContext.class,
withSettings().strictness(Strictness.LENIENT));
@@ -342,12 +460,12 @@ public class ProcessorNodeTest {
}
public static class ProcessingExceptionHandlerMock implements
ProcessingExceptionHandler {
- private final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
+ private final Response response;
private final InternalProcessorContext<Object, Object>
internalProcessorContext;
private final boolean shouldThrowException;
- public ProcessingExceptionHandlerMock(final
ProcessingExceptionHandler.ProcessingHandlerResponse response,
+ public ProcessingExceptionHandlerMock(final Response response,
final
InternalProcessorContext<Object, Object> internalProcessorContext,
final boolean
shouldThrowException) {
this.response = response;
@@ -356,7 +474,7 @@ public class ProcessorNodeTest {
}
@Override
- public ProcessingExceptionHandler.ProcessingHandlerResponse
handle(final ErrorHandlerContext context, final Record<?, ?> record, final
Exception exception) {
+ public Response handleError(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception) {
assertEquals(internalProcessorContext.topic(), context.topic());
assertEquals(internalProcessorContext.partition(),
context.partition());
assertEquals(internalProcessorContext.offset(), context.offset());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index ebc92c65184..64564278a18 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -47,10 +47,10 @@ import
org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
-import
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationExceptionOrigin;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
@@ -89,6 +89,8 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.streams.errors.ProductionExceptionHandler.Response;
+import static
org.apache.kafka.streams.errors.ProductionExceptionHandler.Result;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE;
import static
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.producerRecordSizeInBytes;
@@ -1201,7 +1203,7 @@ public class RecordCollectorTest {
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
- new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
+ new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
streamsMetrics,
topology
);
@@ -1228,7 +1230,7 @@ public class RecordCollectorTest {
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
- new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
+ new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
streamsMetrics,
topology
);
@@ -1252,7 +1254,7 @@ public class RecordCollectorTest {
logContext,
taskId,
getExceptionalStreamsProducerOnSend(exception),
- new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE)),
+ new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume())),
streamsMetrics,
topology
);
@@ -1276,7 +1278,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(new
RuntimeException("KABOOM!")),
new ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
+ Optional.of(ProductionExceptionHandler.Response.resume()),
context,
sinkNodeName,
taskId
@@ -1347,7 +1349,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.FAIL),
+ Optional.of(ProductionExceptionHandler.Response.fail()),
context,
sinkNodeName,
taskId
@@ -1377,7 +1379,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
+ Optional.of(ProductionExceptionHandler.Response.resume()),
context,
sinkNodeName,
taskId
@@ -1400,7 +1402,7 @@ public class RecordCollectorTest {
taskId,
getExceptionalStreamsProducerOnSend(exception),
new ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.RETRY),
+ Optional.of(ProductionExceptionHandler.Response.retry()),
context,
sinkNodeName,
taskId
@@ -1535,7 +1537,7 @@ public class RecordCollectorTest {
public void shouldDropRecordExceptionUsingAlwaysContinueExceptionHandler()
{
try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(new
ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.CONTINUE),
+ Optional.of(ProductionExceptionHandler.Response.resume()),
context,
sinkNodeName,
taskId,
@@ -1564,7 +1566,7 @@ public class RecordCollectorTest {
public void
shouldThrowStreamsExceptionWhenValueSerializationFailedAndProductionExceptionHandlerRepliesWithFail()
{
try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(new
ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.FAIL),
+ Optional.of(ProductionExceptionHandler.Response.fail()),
context,
sinkNodeName,
taskId,
@@ -1585,7 +1587,7 @@ public class RecordCollectorTest {
public void
shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionExceptionHandlerRepliesWithFail()
{
try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(new
ProductionExceptionHandlerMock(
- Optional.of(ProductionExceptionHandlerResponse.FAIL),
+ Optional.of(ProductionExceptionHandler.Response.fail()),
context,
sinkNodeName,
taskId,
@@ -1795,7 +1797,7 @@ public class RecordCollectorTest {
public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
final RecordCollector collector = newRecordCollector(
- new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandlerResponse.CONTINUE))
+ new
ProductionExceptionHandlerMock(Optional.of(ProductionExceptionHandler.Response.resume()))
);
collector.initialize();
@@ -1834,6 +1836,58 @@ public class RecordCollectorTest {
collector.flush(); // need to call flush() to check for internal
exceptions
}
+ @Test
+ public void
shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandlerDuringDeserialization()
{
+ try (final ErrorStringSerializer errorSerializer = new
ErrorStringSerializer()) {
+ final DefaultProductionExceptionHandler productionExceptionHandler
= new DefaultProductionExceptionHandler();
+ productionExceptionHandler.configure(Collections.singletonMap(
+ StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+ "dlq"
+ ));
+ final RecordCollector collector =
newRecordCollector(productionExceptionHandler);
+ collector.initialize();
+
+ assertThat(mockProducer.history().isEmpty(), equalTo(true));
+ assertThrows(
+ StreamsException.class,
+ () ->
+ collector.send(topic, "hello", "world", null, 0, null,
errorSerializer, stringSerializer, sinkNodeName, context)
+ );
+
+ assertEquals(1, mockProducer.history().size());
+ assertEquals("dlq", mockProducer.history().get(0).topic());
+ }
+ }
+
+
+ @Test
+ public void shouldBuildDeadLetterQueueRecordsInDefaultExceptionHandler() {
+ final KafkaException exception = new KafkaException("KABOOM!");
+ final StreamsProducer streamProducer =
getExceptionalStreamsProducerOnSend(exception);
+ final MockProducer<byte[], byte[]> mockProducer =
(MockProducer<byte[], byte[]>) streamProducer.kafkaProducer();
+ final DefaultProductionExceptionHandler productionExceptionHandler =
new DefaultProductionExceptionHandler();
+ productionExceptionHandler.configure(Collections.singletonMap(
+ StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG,
+ "dlq"
+ ));
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ streamProducer,
+ productionExceptionHandler,
+ streamsMetrics,
+ topology
+ );
+
+ collector.initialize();
+
+ collector.send(topic, "hello", "world", null, 0, null,
stringSerializer, stringSerializer, sinkNodeName, context);
+ assertThrows(StreamsException.class, collector::flush);
+
+ assertEquals(1, mockProducer.history().size());
+ assertEquals("dlq", mockProducer.history().get(0).topic());
+ }
+
@Test
public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
final TaskId taskId1 = new TaskId(0, 0);
@@ -1954,6 +2008,116 @@ public class RecordCollectorTest {
}
}
+    // Verifies that a handler overriding only the two-argument handle(record, exception) callback is still
+    // consulted: the producer fails the send, the handler answers FAIL, and flush() must raise a
+    // StreamsException whose cause is the original producer exception.
+    // The @Test annotation is required; without it JUnit does not discover the method and it never runs.
+    @Test
+    public void shouldCallOldImplementationExceptionHandler() {
+        final KafkaException exception = new KafkaException("KABOOM!");
+        final StreamsProducer streamProducer = getExceptionalStreamsProducerOnSend(exception);
+        final OldProductionExceptionHandlerImplementation productionExceptionHandler = new OldProductionExceptionHandlerImplementation();
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            streamProducer,
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+
+        collector.initialize();
+
+        collector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
+        final Exception thrown = assertThrows(StreamsException.class, collector::flush);
+
+        assertEquals(exception, thrown.getCause());
+    }
+
+    // Same scenario as the two-argument variant, but with a handler that overrides only
+    // handle(context, record, exception): the failed send must end in a StreamsException on flush()
+    // whose cause is the exception raised by the producer.
+    @Test
+    public void shouldCallOldImplementationWithRecordContextExceptionHandler() {
+        final KafkaException sendFailure = new KafkaException("KABOOM!");
+        final StreamsProducer failingProducer = getExceptionalStreamsProducerOnSend(sendFailure);
+        final OldProductionExceptionHandlerWithRecordContextImplementation legacyHandler =
+            new OldProductionExceptionHandlerWithRecordContextImplementation();
+
+        final RecordCollector legacyCollector =
+            new RecordCollectorImpl(logContext, taskId, failingProducer, legacyHandler, streamsMetrics, topology);
+        legacyCollector.initialize();
+
+        legacyCollector.send(topic, "hello", "world", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
+
+        final Exception flushFailure = assertThrows(StreamsException.class, () -> legacyCollector.flush());
+        assertEquals(sendFailure, flushFailure.getCause());
+    }
+
+    // Response.fail(records) must report FAIL and hand back the single record it was given.
+    @Test
+    void shouldFailWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+
+        final Response failed = Response.fail(Collections.singletonList(dlqRecord));
+        final List<ProducerRecord<byte[], byte[]>> returned = failed.deadLetterQueueRecords();
+
+        assertEquals(Result.FAIL, failed.result());
+        assertEquals(1, returned.size());
+        assertEquals(dlqRecord, returned.get(0));
+    }
+
+    // Response.fail() without arguments must report FAIL and expose no dead letter queue records.
+    @Test
+    void shouldFailWithoutDeadLetterQueueRecords() {
+        final Response failed = Response.fail();
+
+        assertEquals(Result.FAIL, failed.result());
+        assertTrue(failed.deadLetterQueueRecords().isEmpty());
+    }
+
+    // Response.resume(records) must report RESUME and hand back the single record it was given.
+    @Test
+    void shouldResumeWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+
+        final Response resumed = Response.resume(Collections.singletonList(dlqRecord));
+        final List<ProducerRecord<byte[], byte[]>> returned = resumed.deadLetterQueueRecords();
+
+        assertEquals(Result.RESUME, resumed.result());
+        assertEquals(1, returned.size());
+        assertEquals(dlqRecord, returned.get(0));
+    }
+
+    // Response.resume() without arguments must report RESUME and expose no dead letter queue records.
+    @Test
+    void shouldResumeWithoutDeadLetterQueueRecords() {
+        final Response resumed = Response.resume();
+
+        assertEquals(Result.RESUME, resumed.result());
+        assertTrue(resumed.deadLetterQueueRecords().isEmpty());
+    }
+
+    // Response.retry() must report RETRY and expose no dead letter queue records.
+    @Test
+    void shouldRetryWithoutDeadLetterQueueRecords() {
+        final Response retried = Response.retry();
+
+        assertEquals(Result.RETRY, retried.result());
+        assertTrue(retried.deadLetterQueueRecords().isEmpty());
+    }
+
+    // The list returned by Response.deadLetterQueueRecords() must reject modification.
+    @Test
+    void shouldNotBeModifiable() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        // Start from a mutable list on purpose: Collections.singletonList is already immutable, so with it
+        // the add() below would throw no matter what Response does with the list it is given.
+        final List<ProducerRecord<byte[], byte[]>> records = new java.util.ArrayList<>(Collections.singletonList(record));
+
+        final Response response = Response.fail(records);
+
+        assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
+    }
+
+    // A Response built without records must expose an empty dead letter queue list.
+    @Test
+    void shouldReturnsEmptyList() {
+        final Response withoutRecords = Response.fail();
+
+        assertTrue(withoutRecords.deadLetterQueueRecords().isEmpty());
+    }
+
private RecordCollector newRecordCollector(final
ProductionExceptionHandler productionExceptionHandler) {
return new RecordCollectorImpl(
logContext,
@@ -1978,8 +2142,12 @@ public class RecordCollectorTest {
new MockProducer<>(cluster, true, null, byteArraySerializer,
byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
- callback.onCompletion(null, exception);
- return null;
+ if (record.topic().equals("dlq")) {
+ return super.send(record, callback);
+ } else {
+ callback.onCompletion(null, exception);
+ return null;
+ }
}
},
AT_LEAST_ONCE,
@@ -2023,7 +2191,7 @@ public class RecordCollectorTest {
}
public static class ProductionExceptionHandlerMock implements
ProductionExceptionHandler {
- private final Optional<ProductionExceptionHandlerResponse> response;
+ private final Optional<Response> response;
private boolean shouldThrowException;
private InternalProcessorContext<Void, Void> expectedContext;
private String expectedProcessorNodeId;
@@ -2040,11 +2208,11 @@ public class RecordCollectorTest {
this.expectedSerializationExceptionOrigin = null;
}
- public ProductionExceptionHandlerMock(final
Optional<ProductionExceptionHandlerResponse> response) {
+ public ProductionExceptionHandlerMock(final Optional<Response>
response) {
this.response = response;
}
- public ProductionExceptionHandlerMock(final
Optional<ProductionExceptionHandlerResponse> response,
+ public ProductionExceptionHandlerMock(final Optional<Response>
response,
final
InternalProcessorContext<Void, Void> context,
final String processorNodeId,
final TaskId taskId) {
@@ -2064,7 +2232,7 @@ public class RecordCollectorTest {
this.shouldThrowException = shouldThrowException;
}
- public ProductionExceptionHandlerMock(final
Optional<ProductionExceptionHandlerResponse> response,
+ public ProductionExceptionHandlerMock(final Optional<Response>
response,
final
InternalProcessorContext<Void, Void> context,
final String processorNodeId,
final TaskId taskId,
@@ -2075,9 +2243,9 @@ public class RecordCollectorTest {
}
@Override
- public ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context,
- final
ProducerRecord<byte[], byte[]> record,
- final Exception
exception) {
+ public Response handleError(final ErrorHandlerContext context,
+ final ProducerRecord<byte[], byte[]>
record,
+ final Exception exception) {
assertInputs(context, exception);
if (shouldThrowException) {
throw new RuntimeException("CRASH");
@@ -2087,10 +2255,10 @@ public class RecordCollectorTest {
@SuppressWarnings("rawtypes")
@Override
- public ProductionExceptionHandlerResponse
handleSerializationException(final ErrorHandlerContext context,
-
final ProducerRecord record,
-
final Exception exception,
-
final SerializationExceptionOrigin origin) {
+ public Response handleSerializationError(final ErrorHandlerContext
context,
+ final ProducerRecord record,
+ final Exception exception,
+ final
SerializationExceptionOrigin origin) {
assertInputs(context, exception);
assertEquals(expectedSerializationExceptionOrigin, origin);
if (shouldThrowException) {
@@ -2115,4 +2283,33 @@ public class RecordCollectorTest {
assertEquals("KABOOM!", exception.getMessage());
}
}
+
+    /**
+     * Test handler that overrides only the two-argument {@code handle(record, exception)} callback
+     * (compiled with deprecation warnings suppressed) and always answers {@code FAIL}.
+     */
+    public static class OldProductionExceptionHandlerImplementation implements ProductionExceptionHandler {
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+            // intentionally empty: this handler has no configuration
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+                                                         final Exception exception) {
+            return ProductionExceptionHandlerResponse.FAIL;
+        }
+    }
+
+    /**
+     * Test handler that overrides only the three-argument {@code handle(context, record, exception)}
+     * callback (compiled with deprecation warnings suppressed) and always answers {@code FAIL}.
+     */
+    public static class OldProductionExceptionHandlerWithRecordContextImplementation implements ProductionExceptionHandler {
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+            // intentionally empty: this handler has no configuration
+        }
+
+        @SuppressWarnings("deprecation")
+        @Override
+        public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
+                                                         final ProducerRecord<byte[], byte[]> record,
+                                                         final Exception exception) {
+            return ProductionExceptionHandlerResponse.FAIL;
+        }
+    }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index aa3bb57c7a6..db726d78dcc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -17,32 +17,44 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import
org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import static
org.apache.kafka.streams.StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+import static
org.apache.kafka.streams.errors.DeserializationExceptionHandler.Response;
+import static
org.apache.kafka.streams.errors.DeserializationExceptionHandler.Result;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class RecordDeserializerTest {
private final String sourceNodeName = "source-node";
@@ -108,7 +120,7 @@ public class RecordDeserializerTest {
"value"
),
new DeserializationExceptionHandlerMock(
- Optional.of(DeserializationHandlerResponse.FAIL),
+
Optional.of(DeserializationExceptionHandler.Response.fail()),
rawRecord,
sourceNodeName,
taskId
@@ -147,7 +159,7 @@ public class RecordDeserializerTest {
"value"
),
new DeserializationExceptionHandlerMock(
-
Optional.of(DeserializationHandlerResponse.CONTINUE),
+
Optional.of(DeserializationExceptionHandler.Response.resume()),
rawRecord,
sourceNodeName,
taskId
@@ -188,7 +200,7 @@ public class RecordDeserializerTest {
);
assertEquals("Fatal user code error in deserialization error
callback", exception.getMessage());
assertInstanceOf(NullPointerException.class, exception.getCause());
- assertEquals("Invalid DeserializationExceptionHandler response.",
exception.getCause().getMessage());
+ assertEquals("Invalid DeserializationExceptionResponse response.",
exception.getCause().getMessage());
}
}
@@ -222,6 +234,144 @@ public class RecordDeserializerTest {
}
}
+
+    // Configures LogAndFailExceptionHandler with a "dlq" topic and reports a deserialization failure for a
+    // raw record with key "hello" and value "world". The call must throw a StreamsException and the mock
+    // collector must have received exactly one record for "dlq" carrying the original key and value bytes.
+    @Test
+    public void shouldBuildDeadLetterQueueRecordsInDefaultDeserializationException() {
+        try (Metrics metrics = new Metrics()) {
+            final MockRecordCollector collector = new MockRecordCollector();
+            final InternalProcessorContext<Object, Object> internalProcessorContext =
+                new InternalMockProcessorContext<>(
+                    new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
+                    collector
+                );
+            final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndFailExceptionHandler();
+            deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
+
+            assertThrows(StreamsException.class, () -> RecordDeserializer.handleDeserializationFailure(
+                deserializationExceptionHandler,
+                internalProcessorContext,
+                new RuntimeException(new NullPointerException("Oopsie")),
+                new ConsumerRecord<>("source",
+                    0,
+                    0,
+                    123,
+                    TimestampType.CREATE_TIME,
+                    -1,
+                    -1,
+                    "hello".getBytes(StandardCharsets.UTF_8),
+                    "world".getBytes(StandardCharsets.UTF_8),
+                    new RecordHeaders(),
+                    Optional.empty()),
+                new LogContext().logger(this.getClass()),
+                metrics.sensor("dropped-records"),
+                "sourceNode"
+            ));
+
+            assertEquals(1, collector.collected().size());
+            assertEquals("dlq", collector.collected().get(0).topic());
+            // Decode with the same charset that was used to encode, instead of the platform default.
+            assertEquals("hello", new String((byte[]) collector.collected().get(0).key(), StandardCharsets.UTF_8));
+            assertEquals("world", new String((byte[]) collector.collected().get(0).value(), StandardCharsets.UTF_8));
+        }
+    }
+
+
+    // Configures LogAndContinueExceptionHandler with a "dlq" topic and reports a deserialization failure
+    // for a raw record with key "hello" and value "world". The call is expected to return normally, and
+    // the mock collector must have received exactly one record for "dlq" carrying the original bytes.
+    @Test
+    public void shouldBuildDeadLetterQueueRecordsInLogAndContinueDeserializationException() {
+        try (Metrics metrics = new Metrics()) {
+            final MockRecordCollector collector = new MockRecordCollector();
+            final InternalProcessorContext<Object, Object> internalProcessorContext =
+                new InternalMockProcessorContext<>(
+                    new StateSerdes<>("sink", Serdes.ByteArray(), Serdes.ByteArray()),
+                    collector
+                );
+            final DeserializationExceptionHandler deserializationExceptionHandler = new LogAndContinueExceptionHandler();
+            deserializationExceptionHandler.configure(Collections.singletonMap(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "dlq"));
+
+            RecordDeserializer.handleDeserializationFailure(
+                deserializationExceptionHandler,
+                internalProcessorContext,
+                new RuntimeException(new NullPointerException("Oopsie")),
+                new ConsumerRecord<>("source",
+                    0,
+                    0,
+                    123,
+                    TimestampType.CREATE_TIME,
+                    -1,
+                    -1,
+                    "hello".getBytes(StandardCharsets.UTF_8),
+                    "world".getBytes(StandardCharsets.UTF_8),
+                    new RecordHeaders(),
+                    Optional.empty()),
+                new LogContext().logger(this.getClass()),
+                metrics.sensor("dropped-records"),
+                "sourceNode"
+            );
+
+            assertEquals(1, collector.collected().size());
+            assertEquals("dlq", collector.collected().get(0).topic());
+            // Decode with the same charset that was used to encode, instead of the platform default.
+            assertEquals("hello", new String((byte[]) collector.collected().get(0).key(), StandardCharsets.UTF_8));
+            assertEquals("world", new String((byte[]) collector.collected().get(0).value(), StandardCharsets.UTF_8));
+        }
+    }
+
+    // Response.fail(records) must report FAIL and hand back the single record it was given.
+    @Test
+    void shouldFailWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+
+        final Response failed = Response.fail(Collections.singletonList(dlqRecord));
+        final List<ProducerRecord<byte[], byte[]>> returned = failed.deadLetterQueueRecords();
+
+        assertEquals(Result.FAIL, failed.result());
+        assertEquals(1, returned.size());
+        assertEquals(dlqRecord, returned.get(0));
+    }
+
+    // Response.fail() without arguments must report FAIL and expose no dead letter queue records.
+    @Test
+    void shouldFailWithoutDeadLetterQueueRecords() {
+        final Response failed = Response.fail();
+
+        assertEquals(Result.FAIL, failed.result());
+        assertTrue(failed.deadLetterQueueRecords().isEmpty());
+    }
+
+    // Response.resume(records) must report RESUME and hand back the single record it was given.
+    @Test
+    void shouldResumeWithDeadLetterQueueRecords() {
+        final ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+
+        final Response resumed = Response.resume(Collections.singletonList(dlqRecord));
+        final List<ProducerRecord<byte[], byte[]>> returned = resumed.deadLetterQueueRecords();
+
+        assertEquals(Result.RESUME, resumed.result());
+        assertEquals(1, returned.size());
+        assertEquals(dlqRecord, returned.get(0));
+    }
+
+    // Response.resume() without arguments must report RESUME and expose no dead letter queue records.
+    @Test
+    void shouldResumeWithoutDeadLetterQueueRecords() {
+        final Response resumed = Response.resume();
+
+        assertEquals(Result.RESUME, resumed.result());
+        assertTrue(resumed.deadLetterQueueRecords().isEmpty());
+    }
+
+
+    // The list returned by Response.deadLetterQueueRecords() must reject modification.
+    @Test
+    void shouldNotBeModifiable() {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic", new byte[]{}, new byte[]{});
+        // Start from a mutable list on purpose: Collections.singletonList is already immutable, so with it
+        // the add() below would throw no matter what Response does with the list it is given.
+        final List<ProducerRecord<byte[], byte[]>> records = new java.util.ArrayList<>(Collections.singletonList(record));
+
+        final Response response = Response.fail(records);
+
+        assertThrows(UnsupportedOperationException.class, () -> response.deadLetterQueueRecords().add(record));
+    }
+
+    // A Response built without records must expose an empty dead letter queue list.
+    @Test
+    void shouldReturnsEmptyList() {
+        final Response withoutRecords = Response.fail();
+
+        assertTrue(withoutRecords.deadLetterQueueRecords().isEmpty());
+    }
+
static class TheSourceNode extends SourceNode<Object, Object> {
private final boolean keyThrowsException;
private final boolean valueThrowsException;
@@ -258,12 +408,12 @@ public class RecordDeserializerTest {
}
public static class DeserializationExceptionHandlerMock implements
DeserializationExceptionHandler {
- private final Optional<DeserializationHandlerResponse> response;
+ private final Optional<Response> response;
private final ConsumerRecord<byte[], byte[]> expectedRecord;
private final String expectedProcessorNodeId;
private final TaskId expectedTaskId;
- public DeserializationExceptionHandlerMock(final
Optional<DeserializationHandlerResponse> response,
+ public DeserializationExceptionHandlerMock(final Optional<Response>
response,
final
ConsumerRecord<byte[], byte[]> record,
final String
processorNodeId,
final TaskId taskId) {
@@ -274,9 +424,9 @@ public class RecordDeserializerTest {
}
@Override
- public DeserializationHandlerResponse handle(final ErrorHandlerContext
context,
- final
ConsumerRecord<byte[], byte[]> record,
- final Exception
exception) {
+ public Response handleError(final ErrorHandlerContext context,
+ final ConsumerRecord<byte[], byte[]>
record,
+ final Exception exception) {
assertEquals(expectedRecord.topic(), context.topic());
assertEquals(expectedRecord.partition(), context.partition());
assertEquals(expectedRecord.offset(), context.offset());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 701f38eda0c..385719530b9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -3032,7 +3032,7 @@ public class StreamTaskTest {
public static class CrashingProcessingExceptionHandler implements
ProcessingExceptionHandler {
@Override
- public ProcessingHandlerResponse handle(final ErrorHandlerContext
context, final Record<?, ?> record, final Exception exception) {
+ public Response handleError(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception) {
throw new RuntimeException("KABOOM from
ProcessingExceptionHandlerMock!");
}
@@ -3044,7 +3044,7 @@ public class StreamTaskTest {
public static class NullProcessingExceptionHandler implements
ProcessingExceptionHandler {
@Override
- public ProcessingHandlerResponse handle(final ErrorHandlerContext
context, final Record<?, ?> record, final Exception exception) {
+ public Response handleError(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception) {
return null;
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
index 8a7f5434963..ed6898b3713 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java
@@ -81,6 +81,23 @@ public class MockRecordCollector implements RecordCollector {
);
}
+    // Records a send of an already serialized record. Topic, partition, timestamp and headers are taken
+    // from serializedRecord, while the key and value stored are the typed arguments passed by the caller.
+    // processorNodeId and context are not used by this mock.
+    @Override
+    public <K, V> void send(final K key,
+                            final V value,
+                            final String processorNodeId,
+                            final InternalProcessorContext<?, ?> context,
+                            final ProducerRecord<byte[], byte[]> serializedRecord) {
+        // A fresh ProducerRecord is required because the key and value types differ from byte[].
+        final ProducerRecord<Object, Object> typedRecord = new ProducerRecord<>(
+            serializedRecord.topic(),
+            serializedRecord.partition(),
+            serializedRecord.timestamp(),
+            key,
+            value,
+            serializedRecord.headers()
+        );
+        collected.add(typedRecord);
+    }
+
@Override
public void initialize() {}