cadonna commented on code in PR #17169:
URL: https://github.com/apache/kafka/pull/17169#discussion_r1756478889
##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -42,6 +42,8 @@ public interface ErrorHandlerContext {
* {@link
org.apache.kafka.streams.kstream.KTable#transformValues(ValueTransformerWithKeySupplier,
String...)}
* (and siblings), that do not always guarantee to provide a valid topic
name, as they might be
* executed "out-of-band" due to some internal optimizations applied by
the Kafka Streams DSL.
+ * Additionally, when writing into a changelog topic, there is no
associated input record,
+ * and thus no topic name is available.
Review Comment:
Could you be more explicit here?
The error handler context is used in processing exception handler,
production exception handler, and deserialization exception handler. It is not
immediately clear that this sentence only applies to the production exception
handler.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -282,7 +284,22 @@ public <K, V> void send(final String topic,
topicProducedSensor.record(bytesProduced,
context.currentSystemTimeMs());
}
} else {
- recordSendError(topic, exception, serializedRecord, context,
processorNodeId);
+ final RecordContext recordContext = context.recordContext();
Review Comment:
Isn't the issue here that `context` might be `null`?
See
https://github.com/apache/kafka/blob/a4ea9aec73a3da470d079bad82d92810cac49d55/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L143
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -220,7 +222,7 @@ public <K, V> void send(final String topic,
partition,
timestamp,
processorNodeId,
- context,
+ context.recordContext(),
Review Comment:
See comment below.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -298,19 +315,19 @@ private <K, V> void handleException(final
ProductionExceptionHandler.Serializati
final Integer partition,
final Long timestamp,
final String processorNodeId,
- final InternalProcessorContext<Void,
Void> context,
+ final RecordContext recordContext,
final RuntimeException
serializationException) {
log.debug(String.format("Error serializing record for topic %s",
topic), serializationException);
final DefaultErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
- context.recordContext().topic(),
- context.recordContext().partition(),
- context.recordContext().offset(),
- context.recordContext().headers(),
+ recordContext != null ? recordContext.topic() : null,
+ recordContext != null ? recordContext.partition() : -1,
+ recordContext != null ? recordContext.offset() : -1,
+ recordContext != null ? recordContext.headers() : new
RecordHeaders(),
Review Comment:
To avoid code duplication, I think it is better to do this when you verify
that `context` is `null` in `send()`.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -244,7 +246,7 @@ public <K, V> void send(final String topic,
partition,
timestamp,
processorNodeId,
- context,
+ context.recordContext(),
Review Comment:
See comment below.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -282,7 +284,22 @@ public <K, V> void send(final String topic,
topicProducedSensor.record(bytesProduced,
context.currentSystemTimeMs());
}
} else {
- recordSendError(topic, exception, serializedRecord, context,
processorNodeId);
+ final RecordContext recordContext = context.recordContext();
+ recordSendError(
+ topic,
+ exception,
+ serializedRecord,
+ recordContext != null ?
+ new ProcessorRecordContext(
Review Comment:
I am not sure, whether this makes a difference. Once you get the reference
with `context.recordContext()` the record context will not change anymore since
it is an immutable object.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]