[
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16831897#comment-16831897
]
John Roesler edited comment on KAFKA-8317 at 5/2/19 7:41 PM:
-------------------------------------------------------------
It looks like the windowed aggregation isn't properly setting the serde for
downstream. I thought I'd already fixed this, but there might be another edge
case...
Can you provide a more complete code snippet? In particular, we'd need to see
how you're building the grouped stream before aggregate. Also, how you're
building `materialized`.
Thanks for the report!
was (Author: vvcephei):
It looks like the windowed aggregation isn't properly setting the serde for
downstream. I'll take a look. I thought I'd already fixed this.
> ClassCastException using KTable.suppress()
> ------------------------------------------
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
> Issue Type: Bug
> Reporter: Andrew
> Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error :
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed
> cannot be cast to java.lang.String
> at
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
> at
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
> at
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
> at
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
> final KTable<Windowed<Object>, GenericRecord> groupTable =
> groupedStream
> .aggregate(lastAggregator, lastAggregator, materialized);
> final KTable<Windowed<Object>, GenericRecord> suppressedTable =
> groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
> // write the change-log stream to the topic
> suppressedTable.toStream((k, v) -> k.key())
> .mapValues(joinValueMapper::apply)
> .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... what am i doing wrong.
> Someone else has encountered the same issue :
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation :
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)