[
https://issues.apache.org/jira/browse/KAFKA-12396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17323977#comment-17323977
]
Matthias J. Sax commented on KAFKA-12396:
-----------------------------------------
Thanks for the PR [~Nathan22177] – I added you to the list of contributors and
assigned the ticket to you. You can now also self-assign tickets.
Also left a comment on the PR.
> Dedicated exception for kstreams when null key received
> -------------------------------------------------------
>
> Key: KAFKA-12396
> URL: https://issues.apache.org/jira/browse/KAFKA-12396
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.6.0
> Reporter: Veniamin Kalegin
> Assignee: Valery Kokorev
> Priority: Trivial
> Labels: beginner, newbie
>
> If a kstreams application receives null as a key (thanks to QA), the kstreams app
> gives a long and confusing stack trace. It would be nice to have a shorter and
> more specific exception instead of
> {{org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=(hidden),
> partition=0, offset=3722, stacktrace=java.lang.NullPointerException}}
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
> at
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
> at
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
> at
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
> at
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
> at
> org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> Caused by: java.lang.NullPointerException
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:286)
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.get(RocksDBStore.java:74)
> at
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:94)
> at
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.get(ChangeLoggingKeyValueBytesStore.java:29)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:133)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$$Lambda$1048/0x0000000060630fd0.get(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:133)
> at
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.get(AbstractReadWriteDecorator.java:78)
> at
> HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:103)
> at
> HPX.Utilities.Java.Streams.MessageProcessing.ErrorHandlingAggregateTransformer.transform(ErrorHandlingAggregateTransformer.java:29)
> at
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:64)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$$Lambda$1047/0x0000000060630b10.run(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
> at
> org.apache.kafka.streams.processor.internals.StreamTask$$Lambda$1046/0x00000000605250f0.run(Unknown
> Source)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
> ... 4 more
--
This message was sent by Atlassian Jira
(v8.3.4#803005)