cadonna commented on code in PR #16093:
URL: https://github.com/apache/kafka/pull/16093#discussion_r1650786044
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -99,30 +81,57 @@ public void
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
- node.setProcessingExceptionHandler(new
LogAndFailProcessingExceptionHandler());
+ node.setProcessingExceptionHandler(new
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext);
- assertThrows(StreamsException.class, () -> node.process(new
Record<>("key", "value", 0)));
+ final StreamsException processingException =
assertThrows(StreamsException.class,
+ () -> node.process(new Record<>("key", "value", 0)));
+
+ assertEquals("Processing exception handler is set to fail upon" +
+ " a processing error. If you would rather have the streaming
pipeline" +
+ " continue after a processing error, please set the " +
+ PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
processingException.getMessage());
+
+ assertTrue(processingException.getCause() instanceof RuntimeException);
+ assertEquals("Processing exception should be caught and handled by the
processing exception handler.",
+ processingException.getCause().getMessage());
}
@Test
public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue()
{
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
- node.setProcessingExceptionHandler(new
LogAndContinueProcessingExceptionHandler());
+ node.setProcessingExceptionHandler(new
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext);
assertDoesNotThrow(() -> node.process(new Record<>("key", "value",
0)));
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldNotHandleStreamsExceptionAsProcessingException() {
+ final ProcessingExceptionHandler processingExceptionHandler =
spy(ProcessingExceptionHandler.class);
Review Comment:
Please do not use spies! They are bad practice, because they do not really
decouple the code to test from other code. Here a
`mock(ProcessingExceptionHandler.class)` should be fine since
`ProcessingExceptionHandler` is an interface.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -99,30 +81,57 @@ public void
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
- node.setProcessingExceptionHandler(new
LogAndFailProcessingExceptionHandler());
+ node.setProcessingExceptionHandler(new
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL));
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext);
- assertThrows(StreamsException.class, () -> node.process(new
Record<>("key", "value", 0)));
+ final StreamsException processingException =
assertThrows(StreamsException.class,
+ () -> node.process(new Record<>("key", "value", 0)));
+
+ assertEquals("Processing exception handler is set to fail upon" +
+ " a processing error. If you would rather have the streaming
pipeline" +
+ " continue after a processing error, please set the " +
+ PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
processingException.getMessage());
+
+ assertTrue(processingException.getCause() instanceof RuntimeException);
+ assertEquals("Processing exception should be caught and handled by the
processing exception handler.",
+ processingException.getCause().getMessage());
}
@Test
public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue()
{
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
- node.setProcessingExceptionHandler(new
LogAndContinueProcessingExceptionHandler());
+ node.setProcessingExceptionHandler(new
ProcessingExceptionHandlerMockTest(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE));
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
node.init(internalProcessorContext);
assertDoesNotThrow(() -> node.process(new Record<>("key", "value",
0)));
}
@Test
+ @SuppressWarnings("unchecked")
public void shouldNotHandleStreamsExceptionAsProcessingException() {
+ final ProcessingExceptionHandler processingExceptionHandler =
spy(ProcessingExceptionHandler.class);
+
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>("name", new StreamsExceptionProcessor(),
Collections.emptySet());
+ node.setProcessingExceptionHandler(processingExceptionHandler);
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mock(InternalProcessorContext.class);
+ when(internalProcessorContext.taskId()).thenReturn(new TaskId(0, 0));
+ when(internalProcessorContext.metrics()).thenReturn(new
StreamsMetricsImpl(new Metrics(), "test-client", StreamsConfig.METRICS_LATEST,
new MockTime()));
node.init(internalProcessorContext);
- assertThrows("Streams exception", StreamsException.class,
+ final StreamsException streamsException =
assertThrows(StreamsException.class,
() -> node.process(new Record<>("key", "value", 0)));
+
+ assertEquals("Streams exception should not be caught and handled by
the processing exception handler.",
+ streamsException.getMessage());
+
Review Comment:
nit:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -63,6 +95,36 @@ public void
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
assertThrows(StreamsException.class, () -> node.init(null));
}
+ @Test
+ public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
+ node.setProcessingExceptionHandler(new
LogAndFailProcessingExceptionHandler());
Review Comment:
I am fine if you want to use `ProcessingExceptionHandlerMockTest` but could
you please rename it to `ProcessingExceptionHandlerMock` so that it is clear
that it is a mock and not a test.
Could you also use constants in `mockInternalProcessorContext()` for the
returning values of the stubs and correspondingly use the same constants in the
assertions in `handle()` of `ProcessingExceptionHandlerMock`. It is not
important that `context.topic()` returns `"topic"`, it is important that it
returns the same value as set in the processor context.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java:
##########
@@ -63,6 +95,36 @@ public void
shouldThrowStreamsExceptionIfExceptionCaughtDuringClose() {
assertThrows(StreamsException.class, () -> node.init(null));
}
+ @Test
+ public void shouldThrowStreamsExceptionWhenProcessingMarkedAsFail() {
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
+ node.setProcessingExceptionHandler(new
LogAndFailProcessingExceptionHandler());
+ node.init(internalProcessorContext);
+
+ assertThrows(StreamsException.class, () -> node.process(new
Record<>("key", "value", 0)));
+ }
+
+ @Test
+ public void shouldNotThrowStreamsExceptionWhenProcessingMarkedAsContinue()
{
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new ProcessingExceptionProcessor(),
Collections.emptySet());
+ node.setProcessingExceptionHandler(new
LogAndContinueProcessingExceptionHandler());
+ node.init(internalProcessorContext);
+
+ assertDoesNotThrow(() -> node.process(new Record<>("key", "value",
0)));
+ }
+
+ @Test
+ public void shouldNotHandleStreamsExceptionAsProcessingException() {
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>("name", new StreamsExceptionProcessor(),
Collections.emptySet());
+ node.init(internalProcessorContext);
Review Comment:
Sorry, I probably made a mistake while writing my comment. I wanted to say
to verify that `handle()` is NOT called.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]