cadonna commented on code in PR #17169:
URL: https://github.com/apache/kafka/pull/17169#discussion_r1758243292


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
         final ProducerRecord<byte[], byte[]> serializedRecord = new 
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
-            // if there's already an exception record, skip logging offsets or 
new exceptions
-            if (sendException.get() != null) {
-                return;
-            }
-
-            if (exception == null) {
-                final TopicPartition tp = new TopicPartition(metadata.topic(), 
metadata.partition());
-                if (metadata.offset() >= 0L) {
-                    offsets.put(tp, metadata.offset());
-                } else {
-                    log.warn("Received offset={} in produce response for {}", 
metadata.offset(), tp);
+            try {
+                // if there's already an exception record, skip logging 
offsets or new exceptions
+                if (sendException.get() != null) {
+                    return;
                 }
 
-                if (!topic.endsWith("-changelog")) {
-                    // we may not have created a sensor during initialization 
if the node uses dynamic topic routing,
-                    // as all topics are not known up front, so create the 
sensor for this topic if absent
-                    final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
-                        topic,
-                        t -> TopicMetrics.producedSensor(
-                            Thread.currentThread().getName(),
-                            taskId.toString(),
-                            processorNodeId,
+                if (exception == null) {
+                    final TopicPartition tp = new 
TopicPartition(metadata.topic(), metadata.partition());
+                    if (metadata.offset() >= 0L) {
+                        offsets.put(tp, metadata.offset());
+                    } else {
+                        log.warn("Received offset={} in produce response for 
{}", metadata.offset(), tp);
+                    }
+
+                    if (!topic.endsWith("-changelog")) {
+                        // we may not have created a sensor during 
initialization if the node uses dynamic topic routing,
+                        // as all topics are not known up front, so create the 
sensor for this topic if absent
+                        final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
                             topic,
-                            context.metrics()
-                        )
+                            t -> TopicMetrics.producedSensor(
+                                Thread.currentThread().getName(),
+                                taskId.toString(),
+                                processorNodeId,
+                                topic,
+                                // no `null` check required, as `context` can 
only be null for changelogs what we check above
+                                context.metrics()
+                            )
+                        );
+                        final long bytesProduced = 
producerRecordSizeInBytes(serializedRecord);
+                        topicProducedSensor.record(
+                            bytesProduced,
+                            // no `null` check required, as `context` can only 
be null for changelogs what we check above
+                            context.currentSystemTimeMs()
+                        );
+                    }
+                } else {
+                    recordSendError(
+                        topic,
+                        exception,
+                        serializedRecord,
+                        context, // ok as-is; `null` check done inside 
`recordSendError(...)`

Review Comment:
   Could you please remove the inline comment? 🙏



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
         final ProducerRecord<byte[], byte[]> serializedRecord = new 
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
-            // if there's already an exception record, skip logging offsets or 
new exceptions
-            if (sendException.get() != null) {
-                return;
-            }
-
-            if (exception == null) {
-                final TopicPartition tp = new TopicPartition(metadata.topic(), 
metadata.partition());
-                if (metadata.offset() >= 0L) {
-                    offsets.put(tp, metadata.offset());
-                } else {
-                    log.warn("Received offset={} in produce response for {}", 
metadata.offset(), tp);
+            try {
+                // if there's already an exception record, skip logging 
offsets or new exceptions
+                if (sendException.get() != null) {
+                    return;
                 }
 
-                if (!topic.endsWith("-changelog")) {
-                    // we may not have created a sensor during initialization 
if the node uses dynamic topic routing,
-                    // as all topics are not known up front, so create the 
sensor for this topic if absent
-                    final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
-                        topic,
-                        t -> TopicMetrics.producedSensor(
-                            Thread.currentThread().getName(),
-                            taskId.toString(),
-                            processorNodeId,
+                if (exception == null) {
+                    final TopicPartition tp = new 
TopicPartition(metadata.topic(), metadata.partition());
+                    if (metadata.offset() >= 0L) {
+                        offsets.put(tp, metadata.offset());
+                    } else {
+                        log.warn("Received offset={} in produce response for 
{}", metadata.offset(), tp);
+                    }
+
+                    if (!topic.endsWith("-changelog")) {
+                        // we may not have created a sensor during 
initialization if the node uses dynamic topic routing,
+                        // as all topics are not known up front, so create the 
sensor for this topic if absent
+                        final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
                             topic,
-                            context.metrics()
-                        )
+                            t -> TopicMetrics.producedSensor(
+                                Thread.currentThread().getName(),
+                                taskId.toString(),
+                                processorNodeId,
+                                topic,
+                                // no `null` check required, as `context` can 
only be null for changelogs what we check above

Review Comment:
   Could you please remove the inline comment?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1580,6 +1580,98 @@ public void 
shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionEx
         }
     }
 
+    @Test
+    public void shouldNotFailIfContextIsNotAvailableOnSerializationError() {
+        try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
+            final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                streamsProducer,
+                productionExceptionHandler,
+                streamsMetrics,
+                topology
+            );
+
+            assertThrows(
+                StreamsException.class, // should not crash with 
NullPointerException
+                () -> collector.send(
+                    topic,
+                    "key",
+                    "val",
+                    null,
+                    0,
+                    null,
+                    errorSerializer,
+                    stringSerializer,
+                    sinkNodeName,
+                    null // pass `null` context for testing
+                )
+            );
+        }
+    }
+    
+    @Test
+    public void 
shouldNotFailIfRecordContextIsNotAvailableOnSerializationError() {
+        try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
+            final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                streamsProducer,
+                productionExceptionHandler,
+                streamsMetrics,
+                topology
+            );
+
+            // RecordContext is null when writing into a changelog topic
+            context.setRecordContext(null);
+            assertThrows(
+                StreamsException.class, // should not crash with 
NullPointerException
+                () -> collector.send(topic, "key", "val", null, 0, null, 
errorSerializer, stringSerializer, sinkNodeName, context)
+            );
+        }
+    }
+
+    @Test
+    public void shouldNotFailIfContextIsNotAvailableOnSendError() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducerOnSend(new 
RuntimeException("Kaboom!")),
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+
+        collector.send(
+            topic,
+            "key",
+            "val",
+            null,
+            0,
+            null,
+            stringSerializer,
+            stringSerializer,
+            sinkNodeName,
+            null // pass `null` context for testing
+        );
+    }
+
+    @Test
+    public void shouldNotFailIfRecordContextIsNotAvailableOnSendError() {
+        final RecordCollector collector = new RecordCollectorImpl(
+            logContext,
+            taskId,
+            getExceptionalStreamsProducerOnSend(new 
RuntimeException("Kaboom!")),
+            productionExceptionHandler,
+            streamsMetrics,
+            topology
+        );
+
+        // RecordContext is null when writing into a changelog topic
+        context.setRecordContext(null);

Review Comment:
   See my comment above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -352,6 +364,33 @@ private <K, V> void handleException(final 
ProductionExceptionHandler.Serializati
         droppedRecordsSensor.record();
     }
 
+    private DefaultErrorHandlerContext errorHandlerContext(final 
InternalProcessorContext<Void, Void> context,
+                                                           final String 
processorNodeId) {
+        final RecordContext recordContext = context != null ? 
context.recordContext() : null;
+
+        return recordContext != null ?
+            new DefaultErrorHandlerContext(
+                null, // only required to pass for 
DeserializationExceptionHandler

Review Comment:
   Then maybe it makes sense to have an overload of the constructor instead
of the inline comment.
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1580,6 +1580,98 @@ public void 
shouldThrowStreamsExceptionWhenKeySerializationFailedAndProductionEx
         }
     }
 
+    @Test
+    public void shouldNotFailIfContextIsNotAvailableOnSerializationError() {
+        try (final ErrorStringSerializer errorSerializer = new 
ErrorStringSerializer()) {
+            final RecordCollector collector = new RecordCollectorImpl(
+                logContext,
+                taskId,
+                streamsProducer,
+                productionExceptionHandler,
+                streamsMetrics,
+                topology
+            );
+
+            assertThrows(
+                StreamsException.class, // should not crash with 
NullPointerException
+                () -> collector.send(
+                    topic,
+                    "key",
+                    "val",
+                    null,
+                    0,
+                    null,
+                    errorSerializer,
+                    stringSerializer,
+                    sinkNodeName,
+                    null // pass `null` context for testing

Review Comment:
   A variable `notAvailableContext = null` would be better than the inline 
comment.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -42,6 +42,8 @@ public interface ErrorHandlerContext {
      * {@link 
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
 String...)}
      * (and siblings), that do not always guarantee to provide a valid topic 
name, as they might be
      * executed "out-of-band" due to some internal optimizations applied by 
the Kafka Streams DSL.
+     * Additionally, when writing into a changelog topic, there is no 
associated input record,
+     * and thus no topic name is available.

Review Comment:
   The issue probably already starts in the class javadoc, since it only
mentions failed processing.
   I would extend that javadoc to describe all three cases in which the error
context is used, and give names to the cases — something like deserialization
exception handling, processing exception handling, and production exception
handling. Then here we could write that, during production exception handling,
no input topic name is available when writing into a changelog topic.
   
   We can do that in a follow-up PR. Let us not block this PR on this.
   
   Comments are always hard to keep up-to-date. That is the reason I do not
like inline comments: they eventually start to lie. Javadocs are also
challenging to keep up-to-date, but they are worth the pain, IMO.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -252,40 +254,55 @@ public <K, V> void send(final String topic,
         final ProducerRecord<byte[], byte[]> serializedRecord = new 
ProducerRecord<>(topic, partition, timestamp, keyBytes, valBytes, headers);
 
         streamsProducer.send(serializedRecord, (metadata, exception) -> {
-            // if there's already an exception record, skip logging offsets or 
new exceptions
-            if (sendException.get() != null) {
-                return;
-            }
-
-            if (exception == null) {
-                final TopicPartition tp = new TopicPartition(metadata.topic(), 
metadata.partition());
-                if (metadata.offset() >= 0L) {
-                    offsets.put(tp, metadata.offset());
-                } else {
-                    log.warn("Received offset={} in produce response for {}", 
metadata.offset(), tp);
+            try {
+                // if there's already an exception record, skip logging 
offsets or new exceptions
+                if (sendException.get() != null) {
+                    return;
                 }
 
-                if (!topic.endsWith("-changelog")) {
-                    // we may not have created a sensor during initialization 
if the node uses dynamic topic routing,
-                    // as all topics are not known up front, so create the 
sensor for this topic if absent
-                    final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
-                        topic,
-                        t -> TopicMetrics.producedSensor(
-                            Thread.currentThread().getName(),
-                            taskId.toString(),
-                            processorNodeId,
+                if (exception == null) {
+                    final TopicPartition tp = new 
TopicPartition(metadata.topic(), metadata.partition());
+                    if (metadata.offset() >= 0L) {
+                        offsets.put(tp, metadata.offset());
+                    } else {
+                        log.warn("Received offset={} in produce response for 
{}", metadata.offset(), tp);
+                    }
+
+                    if (!topic.endsWith("-changelog")) {
+                        // we may not have created a sensor during 
initialization if the node uses dynamic topic routing,
+                        // as all topics are not known up front, so create the 
sensor for this topic if absent
+                        final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
                             topic,
-                            context.metrics()
-                        )
+                            t -> TopicMetrics.producedSensor(
+                                Thread.currentThread().getName(),
+                                taskId.toString(),
+                                processorNodeId,
+                                topic,
+                                // no `null` check required, as `context` can 
only be null for changelogs what we check above
+                                context.metrics()
+                            )
+                        );
+                        final long bytesProduced = 
producerRecordSizeInBytes(serializedRecord);
+                        topicProducedSensor.record(
+                            bytesProduced,
+                            // no `null` check required, as `context` can only 
be null for changelogs what we check above

Review Comment:
   Please remove.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to