[
https://issues.apache.org/jira/browse/KAFKA-16573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ayoub Omari reassigned KAFKA-16573:
-----------------------------------
Assignee: Ayoub Omari
> Streams does not specify where a Serde is needed
> ------------------------------------------------
>
> Key: KAFKA-16573
> URL: https://issues.apache.org/jira/browse/KAFKA-16573
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Ayoub Omari
> Assignee: Ayoub Omari
> Priority: Minor
>
> Example topology:
> {code:java}
> builder
> .table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
> .groupBy((key, value) => new KeyValue(value, key))
> .count()
> .toStream()
> .to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
> {code}
> At runtime, we get the following exception
> {code:java}
> Please specify a key serde or set one through
> StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> org.apache.kafka.common.config.ConfigException: Please specify a key serde or
> set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
> at
> org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
> at
> org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
> at
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
> at
> org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
> at
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
> The error does not give information about the line or the processor causing
> the issue.
> Here a Grouped was missing inside the groupBy, but because the groupBy api
> doesn't force to define Grouped, this one can be missed, and it could be
> difficult to spot on a more complex topology.
> Also, for someone who needs control over serdes in the topology and doesn't
> want to define default serdes.
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)