mjsax commented on code in PR #16432:
URL: https://github.com/apache/kafka/pull/16432#discussion_r1693433703
##########
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java:
##########
@@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
+ * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)}
Review Comment:
```suggestion
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead.
```
##########
streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java:
##########
@@ -90,4 +105,8 @@ public String processorNodeId() {
public TaskId taskId() {
return this.taskId;
}
+
+ public ProcessorContext processorContext() {
Review Comment:
Should this return `Optional<ProcessorContext>`, given that it could be
`null`?
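A minimal sketch of the `Optional` variant, for illustration only. `ProcessorContextStub` and `ErrorHandlerContextSketch` are hypothetical stand-ins, not the real Kafka `ProcessorContext` or `DefaultErrorHandlerContext`, and the actual class has more fields than shown here.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.streams.processor.ProcessorContext.
interface ProcessorContextStub {
    String applicationId();
}

// Hypothetical, simplified stand-in for DefaultErrorHandlerContext.
final class ErrorHandlerContextSketch {
    // May be null when no processor context is available.
    private final ProcessorContextStub processorContext;

    ErrorHandlerContextSketch(final ProcessorContextStub processorContext) {
        this.processorContext = processorContext;
    }

    // Wrap the nullable field so callers have to handle absence explicitly.
    Optional<ProcessorContextStub> processorContext() {
        return Optional.ofNullable(processorContext);
    }
}
```

With this shape a caller can write `ctx.processorContext().map(ProcessorContextStub::applicationId)` instead of adding a `null` check at every call site.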
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java:
##########
@@ -80,12 +81,29 @@ public static void handleDeserializationFailure(final DeserializationExceptionHa
final ConsumerRecord<byte[], byte[]> rawRecord,
final Logger log,
final Sensor droppedRecordsSensor) {
+ handleDeserializationFailure(deserializationExceptionHandler, processorContext, deserializationException, rawRecord, log, droppedRecordsSensor, null);
Review Comment:
Why do we still need this overload? Can't we update `GlobalStateManagerImpl` to
also pass in `sourceNode.name`?
##########
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java:
##########
@@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
+ * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)}
+ */
+ @Deprecated
+ default DeserializationHandlerResponse handle(final ProcessorContext context,
+ final ConsumerRecord<byte[], byte[]> record,
+ final Exception exception) {
+ throw new UnsupportedOperationException();
Review Comment:
Nit. KIP says `NotImplementedException()` -- we should update the KIP
accordingly.
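For reference, a minimal sketch of the pattern under discussion: the deprecated overload gets a default body that throws `java.lang.UnsupportedOperationException`, so new implementations only need to override the replacement method. `HandlerSketch`, `NewStyleHandler`, and the simplified parameter and return types are hypothetical and do not mirror the real Kafka signatures.

```java
// Hypothetical, simplified stand-in for DeserializationExceptionHandler.
interface HandlerSketch {
    /**
     * @deprecated Since 3.9. Use {@link #handle(String, byte[])} instead.
     */
    @Deprecated
    default String handle(final byte[] record) {
        // Default body so that new implementations need not override the old overload.
        throw new UnsupportedOperationException();
    }

    // Replacement overload; "context" stands in for ErrorHandlerContext.
    String handle(String context, byte[] record);
}

// An implementation written against the new API only.
final class NewStyleHandler implements HandlerSketch {
    @Override
    public String handle(final String context, final byte[] record) {
        return "CONTINUE";
    }
}
```

Calling the deprecated overload on `NewStyleHandler` throws `UnsupportedOperationException`, which is the behavior the KIP text would need to describe.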
##########
streams/src/main/java/org/apache/kafka/streams/errors/internals/DefaultErrorHandlerContext.java:
##########
@@ -51,6 +53,19 @@ public DefaultErrorHandlerContext(final String topic,
this.taskId = taskId;
}
+ public DefaultErrorHandlerContext(final ProcessorContext processorContext,
Review Comment:
Should this constructor replace the other one, so that we just pass in `null`
for `processorContext`?
Or we could use `setRecordContext()`. Either way, I think constructor overloads
are better avoided if we can.
##########
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java:
##########
@@ -37,11 +37,27 @@ public interface DeserializationExceptionHandler extends Configurable {
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
+ * @deprecated Please {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)}
Review Comment:
Not sure about already merged PRs, but if they also have a deprecation notice,
this is the format we usually use. Including the version helps users see how
far behind they are and when they can expect the old API to be removed, so we
want to include it.
Can you double-check the other PRs so we can update them if necessary?