cadonna commented on code in PR #16432:
URL: https://github.com/apache/kafka/pull/16432#discussion_r1687725502
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java:
##########
@@ -69,17 +85,76 @@ public void
shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
assertEquals(rawRecord.headers(), record.headers());
}
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "false, true"
+ })
+ public void
shouldThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithFail(final
boolean keyThrowsException,
+
final boolean valueThrowsException) {
+ final RecordDeserializer recordDeserializer = new RecordDeserializer(
+ new TheSourceNode(
+ sourceNodeName,
+ keyThrowsException,
+ valueThrowsException,
+ "key",
+ "value"
+ ),
+ new DeserializationExceptionHandlerMock(
+
DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL,
+ rawRecord,
+ sourceNodeName,
+ taskId
+ ),
+ new LogContext(),
+ new Metrics().sensor("dropped-records")
+ );
+
+ final StreamsException e = assertThrows(StreamsException.class, () ->
recordDeserializer.deserialize(context, rawRecord));
+ assertEquals(e.getMessage(), "Deserialization exception handler is set
to fail upon a deserialization error. If you would rather have the streaming
pipeline continue after a deserialization error, please set the
default.deserialization.exception.handler appropriately.");
Review Comment:
This line is definitely too long. Please add some line breaks.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final
DeserializationExceptionHa
final
ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor
droppedRecordsSensor) {
+ handleDeserializationFailure(deserializationExceptionHandler,
processorContext, deserializationException, rawRecord, log,
droppedRecordsSensor, null);
+ }
+
+ public static void handleDeserializationFailure(final
DeserializationExceptionHandler deserializationExceptionHandler,
+ final ProcessorContext<?,
?> processorContext,
+ final Exception
deserializationException,
+ final
ConsumerRecord<byte[], byte[]> rawRecord,
+ final Logger log,
+ final Sensor
droppedRecordsSensor,
+ final String
sourceNodeName) {
final DeserializationExceptionHandler.DeserializationHandlerResponse
response;
try {
- response = deserializationExceptionHandler.handle(
+ final DefaultErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
(InternalProcessorContext<?, ?>) processorContext,
- rawRecord,
- deserializationException);
+ rawRecord.topic(),
+ rawRecord.partition(),
+ rawRecord.offset(),
+ rawRecord.headers(),
+ rawRecord.key(),
+ rawRecord.value(),
+ sourceNodeName,
+ processorContext.taskId());
+ response =
deserializationExceptionHandler.handle(errorHandlerContext, rawRecord,
deserializationException);
} catch (final Exception fatalUserException) {
Review Comment:
I only now realized that, for the processing exception handler, you did not
consider that the handler itself can also throw an exception (and I, as the
reviewer, missed it too). Could you open a separate PR that catches exceptions
thrown by the processing exception handler and re-throws them, as is done here?
Sorry for not noticing it earlier!
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java:
##########
@@ -69,17 +85,76 @@ public void
shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
assertEquals(rawRecord.headers(), record.headers());
}
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "false, true"
+ })
+ public void
shouldThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithFail(final
boolean keyThrowsException,
+
final boolean valueThrowsException) {
+ final RecordDeserializer recordDeserializer = new RecordDeserializer(
+ new TheSourceNode(
+ sourceNodeName,
+ keyThrowsException,
+ valueThrowsException,
+ "key",
+ "value"
+ ),
+ new DeserializationExceptionHandlerMock(
+
DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL,
+ rawRecord,
+ sourceNodeName,
+ taskId
+ ),
+ new LogContext(),
+ new Metrics().sensor("dropped-records")
+ );
+
+ final StreamsException e = assertThrows(StreamsException.class, () ->
recordDeserializer.deserialize(context, rawRecord));
+ assertEquals(e.getMessage(), "Deserialization exception handler is set
to fail upon a deserialization error. If you would rather have the streaming
pipeline continue after a deserialization error, please set the
default.deserialization.exception.handler appropriately.");
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "false, true"
+ })
+ public void
shouldNotThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithContinue(final
boolean keyThrowsException,
Review Comment:
nit:
```suggestion
public void
shouldNotThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithContinue(final
boolean keyThrowsException,
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java:
##########
@@ -69,17 +85,76 @@ public void
shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
assertEquals(rawRecord.headers(), record.headers());
}
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "false, true"
+ })
+ public void
shouldThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithFail(final
boolean keyThrowsException,
Review Comment:
nit:
```suggestion
public void
shouldThrowStreamsExceptionWhenDeserializationFailsAndExceptionHandlerRepliesWithFail(final
boolean keyThrowsException,
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java:
##########
@@ -69,17 +85,76 @@ public void
shouldReturnConsumerRecordWithDeserializedValueWhenNoExceptions() {
assertEquals(rawRecord.headers(), record.headers());
}
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "false, true"
+ })
+ public void
shouldThrowStreamsExceptionWhenDeserializationFailsAndDeserializationExceptionHandlerRepliesWithFail(final
boolean keyThrowsException,
+
final boolean valueThrowsException) {
+ final RecordDeserializer recordDeserializer = new RecordDeserializer(
+ new TheSourceNode(
+ sourceNodeName,
+ keyThrowsException,
+ valueThrowsException,
+ "key",
+ "value"
+ ),
+ new DeserializationExceptionHandlerMock(
+
DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL,
+ rawRecord,
+ sourceNodeName,
+ taskId
+ ),
+ new LogContext(),
+ new Metrics().sensor("dropped-records")
+ );
+
+ final StreamsException e = assertThrows(StreamsException.class, () ->
recordDeserializer.deserialize(context, rawRecord));
+ assertEquals(e.getMessage(), "Deserialization exception handler is set
to fail upon a deserialization error. If you would rather have the streaming
pipeline continue after a deserialization error, please set the
default.deserialization.exception.handler appropriately.");
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "true, false",
+ "false, true"
Review Comment:
Do you also need to test `"true, true"`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org