cadonna commented on code in PR #16433:
URL: https://github.com/apache/kafka/pull/16433#discussion_r1687751846
##########
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java:
##########
@@ -26,7 +26,8 @@
*/
public class DefaultProductionExceptionHandler implements ProductionExceptionHandler {
@Override
- public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+ public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
Review Comment:
I do not think we can modify this signature without deprecating the old method and
mentioning it in the KIP, since it is part of the public API,
even though I am aware that this method is only called internally.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1705,4 +1742,70 @@ public byte[] serialize(final String topic, final Headers headers, final String
return serialize(topic, data);
}
}
+
+ public static class ProductionExceptionHandlerMock implements ProductionExceptionHandler {
+ private final ProductionExceptionHandlerResponse response;
+ private InternalProcessorContext<Void, Void> expectedContext;
+ private String expectedProcessorNodeId;
+ private TaskId expectedTaskId;
+ private SerializationExceptionOrigin expectedSerializationExceptionOrigin;
+
+ public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response) {
+ this.response = response;
+ }
+
+ public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response,
+ final InternalProcessorContext<Void, Void> context,
+ final String processorNodeId,
+ final TaskId taskId) {
+ this(response);
+ this.expectedContext = context;
+ this.expectedProcessorNodeId = processorNodeId;
+ this.expectedTaskId = taskId;
+ }
+
+ public ProductionExceptionHandlerMock(final ProductionExceptionHandlerResponse response,
+ final InternalProcessorContext<Void, Void> context,
+ final String processorNodeId,
+ final TaskId taskId,
+ final SerializationExceptionOrigin origin) {
+ this(response, context, processorNodeId, taskId);
+ this.expectedSerializationExceptionOrigin = origin;
+ }
+
+ @Override
+ public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
+ final ProducerRecord<byte[], byte[]> record,
+ final Exception exception) {
+ assertInputs(context, exception);
+ return response;
+ }
+
+ @Override
+ public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
+ final ProducerRecord record,
+ final Exception exception,
+ final SerializationExceptionOrigin origin) {
+ assertInputs(context, exception);
+ assertEquals(expectedSerializationExceptionOrigin, origin);
+ return response;
+ }
+
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ // do nothing
+ }
+
+ private void assertInputs(final ErrorHandlerContext context, final Exception exception) {
+ assertEquals(expectedContext.recordContext().topic(), context.topic());
+ assertEquals(expectedContext.recordContext().partition(), context.partition());
+ assertEquals(expectedContext.recordContext().offset(), context.offset());
+ assertEquals(expectedContext.recordContext().rawRecord().key(), context.sourceRawKey());
+ assertEquals(expectedContext.recordContext().rawRecord().value(), context.sourceRawValue());
Review Comment:
I think you need to use `assertArrayEquals()` here to verify the byte arrays.
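To illustrate the difference, here is a minimal sketch using only JDK calls. The class name `ByteArrayEquality` and its helper methods are hypothetical; `Objects.equals` stands in for an `Object`-based equality assertion, and `Arrays.equals` stands in for a content-based array assertion. Arrays do not override `equals()`, so two distinct `byte[]` instances with identical content compare as unequal unless the contents are compared explicitly.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical demo class, not part of the PR.
public class ByteArrayEquality {

    // Object-based comparison: for arrays this is reference identity.
    static boolean sameByEquals(final byte[] expected, final byte[] actual) {
        return Objects.equals(expected, actual);
    }

    // Content-based comparison: element by element.
    static boolean sameByContent(final byte[] expected, final byte[] actual) {
        return Arrays.equals(expected, actual);
    }

    public static void main(final String[] args) {
        final byte[] expected = {1, 2, 3};
        final byte[] copy = expected.clone(); // same content, different instance

        System.out.println(sameByEquals(expected, copy));  // false
        System.out.println(sameByContent(expected, copy)); // true
    }
}
```

If the test happens to pass the very same array instance through, the `Object`-based check would still succeed, so the content-based check is the more robust of the two.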
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -217,16 +218,32 @@ public <K, V> void send(final String topic,
valueClass),
exception);
} catch (final Exception exception) {
+ final ProductionExceptionHandler.SerializationExceptionOrigin origin;
+ if (keyBytes == null) {
+ origin = ProductionExceptionHandler.SerializationExceptionOrigin.KEY;
+ } else {
+ origin = ProductionExceptionHandler.SerializationExceptionOrigin.VALUE;
+ }
Review Comment:
I think it would be better to have something like:
```java
try {
    keyBytes = keySerializer.serialize(topic, headers, key);
} catch (final ClassCastException exception) {
    handleClassCastException(...)
} catch (final Exception exception) {
    handleException(ProductionExceptionHandler.SerializationExceptionOrigin.KEY, ...)
}
try {
    valBytes = valueSerializer.serialize(topic, headers, value);
} catch (final ClassCastException exception) {
    handleClassCastException(...)
} catch (final Exception exception) {
    handleException(ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, ...)
}
```
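One case where the two approaches can differ, sketched below under stated assumptions: if the key serializer legitimately returns `null` (for example for a `null` key) and the value serializer then throws, a `keyBytes == null` check would attribute the failure to the key. Everything in this sketch is hypothetical (the class `SerializationOriginSketch`, the `Origin` enum, and the `Function`-based serializers); it does not use the Kafka Streams classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in for the logic under discussion; not the Kafka Streams code.
public class SerializationOriginSketch {
    enum Origin { KEY, VALUE }

    // Assumed key serializer that maps a null key to null bytes.
    static final Function<String, byte[]> NULL_TOLERANT_KEY_SERIALIZER =
        key -> key == null ? null : key.getBytes(StandardCharsets.UTF_8);

    // Assumed value serializer that always fails, to simulate a serialization error.
    static final Function<String, byte[]> FAILING_VALUE_SERIALIZER =
        value -> { throw new IllegalStateException("cannot serialize value"); };

    // Single try block: the origin is inferred from whether keyBytes is still null.
    static Origin originByNullCheck(final Function<String, byte[]> keySerializer,
                                    final Function<String, byte[]> valueSerializer,
                                    final String key,
                                    final String value) {
        byte[] keyBytes = null;
        try {
            keyBytes = keySerializer.apply(key);
            valueSerializer.apply(value);
            return null; // nothing failed
        } catch (final Exception exception) {
            return keyBytes == null ? Origin.KEY : Origin.VALUE;
        }
    }

    // Separate try blocks: the origin is known from which block threw.
    static Origin originBySeparateBlocks(final Function<String, byte[]> keySerializer,
                                         final Function<String, byte[]> valueSerializer,
                                         final String key,
                                         final String value) {
        try {
            keySerializer.apply(key);
        } catch (final Exception exception) {
            return Origin.KEY;
        }
        try {
            valueSerializer.apply(value);
        } catch (final Exception exception) {
            return Origin.VALUE;
        }
        return null; // nothing failed
    }

    public static void main(final String[] args) {
        // Null key, failing value serializer.
        System.out.println(originByNullCheck(
            NULL_TOLERANT_KEY_SERIALIZER, FAILING_VALUE_SERIALIZER, null, "v"));
        System.out.println(originBySeparateBlocks(
            NULL_TOLERANT_KEY_SERIALIZER, FAILING_VALUE_SERIALIZER, null, "v"));
    }
}
```

With a `null` key and a failing value serializer, the null-check variant reports `KEY` while the separate-blocks variant reports `VALUE`.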
##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -30,22 +30,60 @@ public interface ProductionExceptionHandler extends Configurable {
*
* @param record The record that failed to produce
* @param exception The exception that occurred during production
+ * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead
*/
- ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
- final Exception exception);
+ @Deprecated
+ default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
+ final Exception exception) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Inspect a record that we attempted to produce, and the exception that resulted
+ * from attempting to produce it and determine whether or not to continue processing.
+ *
+ * @param context The error handler context metadata
+ * @param record The record that failed to produce
+ * @param exception The exception that occurred during production
+ */
+ @SuppressWarnings("deprecation")
+ default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
+ final ProducerRecord<byte[], byte[]> record,
+ final Exception exception) {
+ return handle(record, exception);
+ }
/**
* Handles serialization exception and determine if the process should continue. The default implementation is to
* fail the process.
*
- * @param record the record that failed to serialize
- * @param exception the exception that occurred during serialization
+ * @param record the record that failed to serialize
+ * @param exception the exception that occurred during serialization
Review Comment:
Could you please revert the formatting changes in these two lines?