cadonna commented on code in PR #17169:
URL: https://github.com/apache/kafka/pull/17169#discussion_r1756478889


##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -42,6 +42,8 @@ public interface ErrorHandlerContext {
      * {@link org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier, String...)}
      * (and siblings), that do not always guarantee to provide a valid topic name, as they might be
      * executed "out-of-band" due to some internal optimizations applied by the Kafka Streams DSL.
+     * Additionally, when writing into a changelog topic, there is no associated input record,
+     * and thus no topic name is available.

Review Comment:
   Could you be more explicit here?
   The error handler context is used in the processing exception handler, the production exception handler, and the deserialization exception handler. It is not immediately clear that this sentence only applies to the production exception handler.
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -282,7 +284,22 @@ public <K, V> void send(final String topic,
                     topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
                 }
             } else {
-                recordSendError(topic, exception, serializedRecord, context, processorNodeId);
+                final RecordContext recordContext = context.recordContext();

Review Comment:
   Isn't the issue here that `context` might be `null`?
   See https://github.com/apache/kafka/blob/a4ea9aec73a3da470d079bad82d92810cac49d55/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L143
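A minimal sketch of the point above, assuming `context` can indeed be `null`. The `RecordContext` and `InternalProcessorContext` interfaces below are simplified stand-ins for the real Kafka Streams types, and `safeRecordContext` is a hypothetical helper, not existing code:

```java
class NullContextGuard {

    // Simplified stand-ins for the Kafka Streams internals referenced above.
    interface RecordContext {
        String topic();
    }

    interface InternalProcessorContext {
        RecordContext recordContext();
    }

    // Hypothetical helper: guard the outer context before dereferencing it,
    // so a null `context` cannot cause a NullPointerException in send().
    static RecordContext safeRecordContext(final InternalProcessorContext context) {
        return context == null ? null : context.recordContext();
    }

    public static void main(final String[] args) {
        // A null processing context is tolerated instead of throwing.
        System.out.println(safeRecordContext(null)); // prints "null"
    }
}
```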



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -220,7 +222,7 @@ public <K, V> void send(final String topic,
                 partition,
                 timestamp,
                 processorNodeId,
-                context,
+                context.recordContext(),

Review Comment:
   See comment below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -298,19 +315,19 @@ private <K, V> void handleException(final ProductionExceptionHandler.Serializati
                                         final Integer partition,
                                         final Long timestamp,
                                         final String processorNodeId,
-                                        final InternalProcessorContext<Void, Void> context,
+                                        final RecordContext recordContext,
                                         final RuntimeException serializationException) {
         log.debug(String.format("Error serializing record for topic %s", topic), serializationException);
 
         final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
             null, // only required to pass for DeserializationExceptionHandler
-            context.recordContext().topic(),
-            context.recordContext().partition(),
-            context.recordContext().offset(),
-            context.recordContext().headers(),
+            recordContext != null ? recordContext.topic() : null,
+            recordContext != null ? recordContext.partition() : -1,
+            recordContext != null ? recordContext.offset() : -1,
+            recordContext != null ? recordContext.headers() : new RecordHeaders(),

Review Comment:
   To avoid code duplication, I think it is better to do this when you verify that `context` is `null` in `send()`.
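The suggestion above could look roughly like the following sketch: resolve the null case once, then pass already-defaulted values down instead of repeating the ternaries for every field. The class and method names here are illustrative, not the actual Kafka Streams code:

```java
class RecordContextDefaulting {

    // Simplified stand-in for org.apache.kafka.streams.processor.RecordContext.
    interface RecordContext {
        String topic();
        int partition();
        long offset();
    }

    static final class SimpleRecordContext implements RecordContext {
        private final String topic;
        private final int partition;
        private final long offset;

        SimpleRecordContext(final String topic, final int partition, final long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override public String topic() { return topic; }
        @Override public int partition() { return partition; }
        @Override public long offset() { return offset; }
    }

    // Hypothetical helper, called once in send(): a null context (e.g. a
    // changelog write with no associated input record) is replaced by a
    // context carrying the same fallbacks used in the diff above
    // (null topic, -1 partition and offset).
    static RecordContext defaultedContext(final RecordContext recordContext) {
        return recordContext != null
            ? recordContext
            : new SimpleRecordContext(null, -1, -1L);
    }

    public static void main(final String[] args) {
        final RecordContext changelogWrite = defaultedContext(null);
        System.out.println(changelogWrite.partition()); // prints -1

        final RecordContext inputRecord =
            defaultedContext(new SimpleRecordContext("input-topic", 3, 42L));
        System.out.println(inputRecord.topic()); // prints input-topic
    }
}
```

With this shape, `handleException()` can read the fields unconditionally, so the four null checks collapse into one.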



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -244,7 +246,7 @@ public <K, V> void send(final String topic,
                 partition,
                 timestamp,
                 processorNodeId,
-                context,
+                context.recordContext(),

Review Comment:
   See comment below.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -282,7 +284,22 @@ public <K, V> void send(final String topic,
                     topicProducedSensor.record(bytesProduced, context.currentSystemTimeMs());
                 }
             } else {
-                recordSendError(topic, exception, serializedRecord, context, processorNodeId);
+                final RecordContext recordContext = context.recordContext();
+                recordSendError(
+                    topic,
+                    exception,
+                    serializedRecord,
+                    recordContext != null ?
+                        new ProcessorRecordContext(

Review Comment:
   I am not sure whether this makes a difference. Once you get the reference with `context.recordContext()`, the record context will not change anymore, since it is an immutable object.
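The immutability point can be illustrated with a small sketch (simplified stand-in classes, not the real `ProcessorRecordContext`): once a reference to an immutable record context is captured, later changes to the enclosing processor context cannot affect it, so defensively copying it into a new instance adds nothing:

```java
class ImmutableCapture {

    // Immutable record context: all fields final, no setters.
    static final class Ctx {
        final String topic;
        Ctx(final String topic) { this.topic = topic; }
    }

    // Mutable holder, mimicking how a processor context moves from record to record.
    static final class ProcessorContext {
        Ctx current;
    }

    static String capturedTopicAfterSwap() {
        final ProcessorContext pc = new ProcessorContext();
        pc.current = new Ctx("topic-a");

        final Ctx captured = pc.current;   // capture the reference, no copy
        pc.current = new Ctx("topic-b");   // context moves on to the next record

        return captured.topic;             // unaffected by the swap
    }

    public static void main(final String[] args) {
        System.out.println(capturedTopicAfterSwap()); // prints topic-a
    }
}
```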



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
