[
https://issues.apache.org/jira/browse/KAFKA-10049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117976#comment-17117976
]
Adam Bellemare commented on KAFKA-10049:
----------------------------------------
Is this actually a bug on our side? I think this is the reporter's serializer
complaining about the type of its input.
{code:java}
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to
com.messages.JSONSerdeCompatible
at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1)
~[classes/:?]
{code}
Isn't the problem that com.messages.JSONSerdeComp.serialize() expects a
com.messages.JSONSerdeCompatible but is being handed a java.lang.String?
com.messages.* isn't showing up in any Maven repo for me, so I'm pretty sure
this is an issue in a private library.
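A minimal sketch of the suspected failure mode, with no Kafka dependency. The names Compatible, Order, and CastingSerializer are hypothetical stand-ins, and the unconditional cast inside serialize() is only an assumption about what JSONSerdeComp does, since its source is not available here.

```java
import java.nio.charset.StandardCharsets;

public class CastDemo {
    // Hypothetical stand-in for com.messages.JSONSerdeCompatible.
    interface Compatible {
        String toJson();
    }

    // Hypothetical value type that implements the marker interface.
    static final class Order implements Compatible {
        public String toJson() {
            return "{\"orderId\":\"1\"}";
        }
    }

    // Assumed shape of the serializer: generic, but it casts unconditionally.
    static final class CastingSerializer<T> {
        byte[] serialize(String topic, T data) {
            // Generics are erased at runtime, so nothing stops a String from
            // reaching this line; the cast is where the failure surfaces.
            Compatible c = (Compatible) data;
            return c.toJson().getBytes(StandardCharsets.UTF_8);
        }
    }

    // Returns true if serializing the given value raises ClassCastException.
    static boolean failsWithClassCast(Object value) {
        CastingSerializer<Object> serializer = new CastingSerializer<>();
        try {
            serializer.serialize("some-topic", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Order fails:  " + failsWithClassCast(new Order()));
        System.out.println("String fails: " + failsWithClassCast("AAPL"));
    }
}
```

If the real serializer behaves like this, any String value handed to it (for example a foreign key extracted by the join) would fail in the same way as the trace above.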
[~amicngh], please try one of the built-in serdes (for example Serdes.String())
instead and see if this still happens. The unit tests suggest it should not, but
I want to rule out the com.messages.JSONSerdeComp library before going any
further.
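One way to set up that diagnostic run is sketched below, using only java.util.Properties and the string forms of the config keys. This assumes Kafka Streams resolves serde class names given as strings, and that the topology is temporarily changed to treat values as plain strings (including the serde passed to Materialized.with); the class name StringSerdeConfig and the method diagnosticProps are hypothetical.

```java
import java.util.Properties;

public class StringSerdeConfig {
    // Binary name of the StringSerde class nested in Serdes (kafka-clients).
    static final String STRING_SERDE =
        "org.apache.kafka.common.serialization.Serdes$StringSerde";

    // Builds a config in which both default serdes are the built-in String
    // serde, so the custom JSONSerdeComp is taken out of the picture.
    static Properties diagnosticProps() {
        Properties props = new Properties();
        props.put("application.id", "my-stream-processing-application-2");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.key.serde", STRING_SERDE);
        props.put("default.value.serde", STRING_SERDE);
        return props;
    }

    public static void main(String[] args) {
        diagnosticProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If the join runs cleanly with this configuration, that would point at the custom serde rather than at the foreign-key join itself.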
> KTable-KTable Foreign Key join throwing Serialization Exception
> ----------------------------------------------------------------
>
> Key: KAFKA-10049
> URL: https://issues.apache.org/jira/browse/KAFKA-10049
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.5.0, 2.6.0
> Reporter: Amit Chauhan
> Assignee: John Roesler
> Priority: Blocker
>
> I want to use the _KTable-KTable_ foreign-key join feature released in
> *_2.5.0_*, but I am hitting an exception when running the code below.
> {code:java}
>
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application-2");
>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>     props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
>     props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, new JSONSerdeComp<>().getClass());
>     props.put(StreamsConfig.STATE_DIR_CONFIG, "C:\\temp");
>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
>     StreamsBuilder builder = new StreamsBuilder();
>     KTable<String, OrderObject> ordersTable = builder.<String, OrderObject>table(TOPIC_Agora);
>     KTable<String, StockMarketData> stockTable = builder.<String, StockMarketData>table(TOPIC_Stock_Data);
>
>     KTable<String, EnrichedOrder> enriched = ordersTable.leftJoin(stockTable, OrderObject::getSymbol,
>         new ValueJoiner<OrderObject, StockMarketData, EnrichedOrder>() {
>             @Override
>             public EnrichedOrder apply(OrderObject order, StockMarketData stock) {
>                 EnrichedOrder enOrder = EnrichedOrder.builder()
>                     .orderId(order.getOrderId())
>                     .execPrice(order.getPrice())
>                     .symbol(order.getSymbol())
>                     .quanity(order.getQuanity())
>                     .side(order.getSide())
>                     .filledQty(order.getFilledQty())
>                     .leaveQty(order.getLeaveQty())
>                     .index(order.getIndex())
>                     .vWaprelative(order.getVWaprelative())
>                     .stockAsk(stock != null ? stock.getAsk().doubleValue() : 0.0)
>                     .stockBid(stock != null ? stock.getBid().doubleValue() : 0.0)
>                     .stockLast(stock != null ? stock.getLast().doubleValue() : 0.0)
>                     .stockClose(stock != null ? stock.getClose().doubleValue() : 0.0)
>                     .build();
>                 return enOrder;
>             }
>         }, Materialized.with(Serdes.String(), new JSONSerdeComp<>()));
>
>     enriched.toStream().foreach(new ForeachAction<String, EnrichedOrder>() {
>         @Override
>         public void apply(String arg0, EnrichedOrder arg1) {
>             logger.info(String.format("key = %s, value = %s", arg0, arg1));
>         }
>     });
>
>     KafkaStreams streams = new KafkaStreams(builder.build(), props);
>     streams.start();
>     Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));
> }
>
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>2.5.0</version>
> </dependency>
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-streams</artifactId>
> <version>2.5.0</version>
> </dependency>
> {code}
> *+Exception:+*
> {code:java}
> 18:49:31.525
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
> ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager -
> stream-thread
> [my-stream-processing-application-2-37cfd34a-6eb4-411a-a2dc-faa9194ce04e-StreamThread-1]
> task [0_0] Failed to flush state store orders-STATE-STORE-0000000000:
> org.apache.kafka.streams.errors.StreamsException: ClassCastException
> while producing data to a sink topic. A serializer (key:
> org.apache.kafka.common.serialization.StringSerializer / value:
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer)
> is not compatible to the actual key or value type (key type:
> java.lang.String / value type:
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper).
> Change the default Serdes in StreamConfig or provide correct Serdes via
> method parameters (for example if using the DSL, `#to(String topic,
> Produced<K, V> produced)` with
> `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).
> at
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:157)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:71)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:119)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:282)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
> [kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> [kafka-streams-2.5.0.jar:?]
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
> to com.messages.JSONSerdeCompatible
> at com.messages.JSONSerdeComp.serialize(JSONSerdeComp.java:1)
> ~[classes/:?]
> at
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:79)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:51)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
> ~[kafka-clients-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
> ~[kafka-streams-2.5.0.jar:?]
> at
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
> ~[kafka-streams-2.5.0.jar:?]
> ... 34 more
> {code}