Hi all,
I am trying to run a Flink Scala application that reads from Kafka, applies
some lookup transformations, and then writes to Kafka.
I am using Flink version 1.12.1.
I tested it locally and it works fine. But when I run it on a cluster using
the native Kubernetes integration, I get the error shown below.
The cluster itself looks fine: I ran a WordCount application on it and that
worked without problems.
The exception message is not clear, and the stack trace contains only
TaskManager frames, so I have no idea where in the application the problem
is. Could this be a serialization issue? Is there a way to debug such issues
and find the actual point in the application code that causes the problem?
```
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.
at
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
... 8 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.ClassNotFoundException:
__wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
... 8 more
Caused by: java.lang.ClassNotFoundException:
__wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
at
scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Unknown Source) ~[?:?]
at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown
Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
at
org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214)
~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
... 8 more
```