sebastienviale commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650931604
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -99,30 +81,57 @@ public void
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
- node.setProcessingExceptionHandler(new
LogAndFailProcessingExceptionHandler());
+ node.setProcessingExceptionHandler(new
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext);
- assertThrows(StreamsException.class, () -> node.process(new
Record<>("key", "value", 0)));
+ final StreamsException processingException =
assertThrows(StreamsException.class,
+ () -> node.process(new Record<>("key", "value", 0)));
+
+ assertEquals("Processing exception handler is set to fail upon" +
+ " a processing error. If you would rather have the streaming
pipeline" +
+ " continue after a processing error, please set the " +
+ PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
processingException.getMessage());
+
+ assertTrue(processingException.getCause() instanceof RuntimeException);
+ assertEquals("Processing exception should be caught and handled by the
processing exception handler.",
+ processingException.getCause().getMessage());
}
@Test
public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue()
{
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
- node.setProcessingExceptionHandler(new
LogAndContinueProcessingExceptionHandler());
+ node.setProcessingExceptionHandler(new
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext);
assertDoesNotThrow(() -> node.process(new Record<>("key", "value",
0)));
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldNotHandleStreamsExceptionAsProcessingException() {
+ final ProcessingExceptionHandler processingExceptionHandler =
spy(ProcessingExceptionHandler.class);
Review Comment:
I changed for mock(ProcessingExceptionHandler.class) to check if handle()
method is called
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]