cadonna commented on code in PR #16433:
URL: https://github.com/apache/kafka/pull/16433#discussion_r1687751846


##########
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java:
##########
@@ -26,7 +26,8 @@
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
     @Override
-    public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
+    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
context,

Review Comment:
   I do not think we can modify this without deprecating the old method and 
mentioning it in the KIP since it is part of the public API. 
   I am aware that this method is only called internally.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1705,4 +1742,70 @@ public byte[] serialize(final String topic, final 
Headers headers, final String
             return serialize(topic, data);
         }
     }
+
+    public static class ProductionExceptionHandlerMock implements 
ProductionExceptionHandler {
+        private final ProductionExceptionHandlerResponse response;
+        private InternalProcessorContext<Void, Void> expectedContext;
+        private String expectedProcessorNodeId;
+        private TaskId expectedTaskId;
+        private SerializationExceptionOrigin 
expectedSerializationExceptionOrigin;
+
+        public ProductionExceptionHandlerMock(final 
ProductionExceptionHandlerResponse response) {
+            this.response = response;
+        }
+
+        public ProductionExceptionHandlerMock(final 
ProductionExceptionHandlerResponse response,
+                                              final 
InternalProcessorContext<Void, Void> context,
+                                              final String processorNodeId,
+                                              final TaskId taskId) {
+            this(response);
+            this.expectedContext = context;
+            this.expectedProcessorNodeId = processorNodeId;
+            this.expectedTaskId = taskId;
+        }
+
+        public ProductionExceptionHandlerMock(final 
ProductionExceptionHandlerResponse response,
+                                              final 
InternalProcessorContext<Void, Void> context,
+                                              final String processorNodeId,
+                                              final TaskId taskId,
+                                              final 
SerializationExceptionOrigin origin) {
+            this(response, context, processorNodeId, taskId);
+            this.expectedSerializationExceptionOrigin = origin;
+        }
+
+        @Override
+        public ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
+                                                         final 
ProducerRecord<byte[], byte[]> record,
+                                                         final Exception 
exception) {
+            assertInputs(context, exception);
+            return response;
+        }
+
+        @Override
+        public ProductionExceptionHandlerResponse 
handleSerializationException(final ErrorHandlerContext context,
+                                                                               
final ProducerRecord record,
+                                                                               
final Exception exception,
+                                                                               
final SerializationExceptionOrigin origin) {
+            assertInputs(context, exception);
+            assertEquals(expectedSerializationExceptionOrigin, origin);
+            return response;
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs) {
+            // do nothing
+        }
+
+        private void assertInputs(final ErrorHandlerContext context, final 
Exception exception) {
+            assertEquals(expectedContext.recordContext().topic(), 
context.topic());
+            assertEquals(expectedContext.recordContext().partition(), 
context.partition());
+            assertEquals(expectedContext.recordContext().offset(), 
context.offset());
+            assertEquals(expectedContext.recordContext().rawRecord().key(), 
context.sourceRawKey());
+            assertEquals(expectedContext.recordContext().rawRecord().value(), 
context.sourceRawValue());

Review Comment:
   I think you need to use `assertArrayEquals()` here to verify the byte arrays.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -217,16 +218,32 @@ public <K, V> void send(final String topic,
                     valueClass),
                 exception);
         } catch (final Exception exception) {
+            final ProductionExceptionHandler.SerializationExceptionOrigin 
origin;
+            if (keyBytes == null) {
+                origin = 
ProductionExceptionHandler.SerializationExceptionOrigin.KEY;
+            } else {
+                origin = 
ProductionExceptionHandler.SerializationExceptionOrigin.VALUE;
+            }

Review Comment:
   I think, it would be better to have something like:
   ```java
   try {
       keyBytes = keySerializer.serialize(topic, headers, key);
   } catch (final ClassCastException exception) {
       handleClassCastException(...)
   } catch (final Exception exception) {
      
handleException(ProductionExceptionHandler.SerializationExceptionOrigin.KEY, 
...)
   }
   
   try {
       valBytes = valueSerializer.serialize(topic, headers, value);
   } catch (final ClassCastException exception) {
       handleClassCastException(...)
   } catch (final Exception exception) {
      
handleException(ProductionExceptionHandler.SerializationExceptionOrigin.VALUE, 
...)
   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -30,22 +30,60 @@ public interface ProductionExceptionHandler extends 
Configurable {
      *
      * @param record The record that failed to produce
      * @param exception The exception that occurred during production
+     * @deprecated Use {@link #handle(ErrorHandlerContext, ProducerRecord, 
Exception)} instead
      */
-    ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], 
byte[]> record,
-                                              final Exception exception);
+    @Deprecated
+    default ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
+                                                      final Exception 
exception) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Inspect a record that we attempted to produce, and the exception that 
resulted
+     * from attempting to produce it and determine whether or not to continue 
processing.
+     *
+     * @param context The error handler context metadata
+     * @param record The record that failed to produce
+     * @param exception The exception that occurred during production
+     */
+    @SuppressWarnings("deprecation")
+    default ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
+                                                      final 
ProducerRecord<byte[], byte[]> record,
+                                                      final Exception 
exception) {
+        return handle(record, exception);
+    }
 
     /**
      * Handles serialization exception and determine if the process should 
continue. The default implementation is to
      * fail the process.
      *
-     * @param record        the record that failed to serialize
-     * @param exception     the exception that occurred during serialization
+     * @param record the record that failed to serialize
+     * @param exception the exception that occurred during serialization

Review Comment:
   Could you please revert the formatting changes in this two lines?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to