HannesDS opened a new issue, #6266: URL: https://github.com/apache/iceberg/issues/6266
### Apache Iceberg version

1.0.0 (latest release)

### Query engine

Spark

### Please describe the bug 🐞

**Application**: We are trying to use Spark Structured Streaming in combination with Iceberg. The application is containerised and runs on EKS behind the scenes. Spark Structured Streaming worked well until we tried to add the Iceberg extension. We have previously used Iceberg successfully in a local Spark session (not Structured Streaming).

**Environment**

We have tried the following versions:

- Iceberg version: 1.0.0 ; 0.14.1 ; 0.13.1
- Spark version: 3.2.0 ; 3.0.3
- AWS SDK version: 2.17.257 ; 2.18.22
- Scala version: 2.12

Extra configuration included in the Spark session:

`
REMOTE_CATALOG_NAME: str = "remote_glue_catalog"
config.setAll(
    [
        (
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1",
        ),
        ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
        (
            "spark.jars.packages",
            "software.amazon.awssdk:url-connection-client:2.17.257",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}",
            "org.apache.iceberg.spark.SparkCatalog",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.warehouse",
            f"{aws.datalake_root_path()}/{table_name}/",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog",
        ),
        (
            f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO",
        ),
        (
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        ),
        (f"spark.sql.defaultCatalog", REMOTE_CATALOG_NAME),
    ]
)`

**Stacktrace**

`22/11/24 09:18:36 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  at java.base/java.net.URLClassLoader.findClass(Unknown Source)
  at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
  at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
  at java.base/java.lang.Class.forName0(Native Method)
  at java.base/java.lang.Class.forName(Unknown Source)
  at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
  at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1160)
  at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1158)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1158)
  at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:101)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
  at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
  at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:238)
  at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
  at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.base/java.lang.Thread.run(Unknown Source)
....
22/11/24 09:18:45
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
22/11/24 09:18:45 ERROR MicroBatchExecution: Query [id = , runId = ] terminated with error
org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog
  at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:66)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
  at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
  at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
  at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$currentNamespace$1(CatalogManager.scala:100)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:100)
  at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:98)
  at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:95)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
  at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:138)
  at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
  at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
  at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:90)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:108)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:105)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:574)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
ERROR:root:<traceback object at 0x7f50f454a640>
Traceback (most recent call last):
  File "/opt/spark/work-dir/src/app/streaming/main.py", line 11, in <module>
    streaming_main(registry=streaming_registry, )
  File "/opt/spark/work-dir/src/app/app.py", line 20, in main
    start_job(session, job=job, environment=args.env, kwargs=args.kwargs)
  File "/opt/spark/work-dir/src/app/app.py", line 44, in start_job
    job.run(session, kwargs=kwargs)
  File "/opt/spark/work-dir/src/elia_streaming/jobs/current_system_imbalance/producer.py", line 31, in run
    iceberg.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
  File "<string>", line 3, in raise_from
pyspark.sql.utils.StreamingQueryException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog`

**Debug checks**:

- [x] We checked that the jars are included in the Docker container.
- [x] We checked the Spark config at runtime (logged) to make sure all options were included, which they were.
- [x] We checked several versions of Spark and Iceberg.

**Question**

Does anybody know what is going on? My guess is that with Spark Structured Streaming the jars are not actually added to the classpath of the executors, but I am not sure.

Thanks in advance!
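One thing that may be worth ruling out in the configuration above: `spark.jars.packages` is set three times in the same `setAll` call. A Spark configuration holds one value per key, so a later `set` for the same key replaces the earlier one, and `spark.jars.packages` expects a single comma-separated list of Maven coordinates. If that is what happens here, only the last coordinate would be resolved and the Iceberg runtime jar would never be requested, which would be consistent with both `ClassNotFoundException`s. This is an unconfirmed guess, not a diagnosis. The sketch below is plain Python rather than Spark itself; `effective_conf` and `merge_packages` are hypothetical helper names used only to illustrate the last-write-wins behaviour and the merged form.

```python
from typing import Dict, Iterable, List, Tuple


def effective_conf(pairs: Iterable[Tuple[str, str]]) -> Dict[str, str]:
    """Model a key/value config where a later value for a key replaces the earlier one."""
    conf: Dict[str, str] = {}
    for key, value in pairs:
        conf[key] = value  # last write wins
    return conf


def merge_packages(coordinates: List[str]) -> str:
    """Join Maven coordinates into the comma-separated form spark.jars.packages expects."""
    return ",".join(coordinates)


# The three entries as they appear in the reported configuration.
as_reported = [
    ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1"),
    ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
    ("spark.jars.packages", "software.amazon.awssdk:url-connection-client:2.17.257"),
]

# Assumed alternative: the same coordinates passed as a single entry.
PACKAGES = [coordinate for _, coordinate in as_reported]
merged = [("spark.jars.packages", merge_packages(PACKAGES))]

if __name__ == "__main__":
    print("as reported:", effective_conf(as_reported)["spark.jars.packages"])
    print("merged:     ", effective_conf(merged)["spark.jars.packages"])
```

In the real session, the merged tuple would take the place of the three separate `spark.jars.packages` tuples passed to `config.setAll`. The artifact versions are copied unchanged from the report; whether they match the Spark version in the container is a separate check.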