HannesDS opened a new issue, #6266:
URL: https://github.com/apache/iceberg/issues/6266

   ### Apache Iceberg version
   
   1.0.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   **Application**:
   
   We are trying to use Spark Structured Streaming in combination with Iceberg. 
The application is containerised and runs on EKS. Spark Structured Streaming 
worked well until we tried to add the Iceberg extension. We have previously 
used Iceberg successfully in a local Spark session (not Structured Streaming). 
   
   **Environment**
   We have tried with the following versions:
   
   Iceberg version: 1.0.0 ; 0.14.1 ; 0.13.1
   Spark version: 3.2.0 ; 3.0.3
   AWS SDK version: 2.17.257 ; 2.18.22
   Scala version: 2.12
   
   Extra configuration included into the spark session:
   
   `
   REMOTE_CATALOG_NAME: str = "remote_glue_catalog"
   
   config.setAll(
           [
               (
                   "spark.jars.packages",
                   "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1",
               ),
               ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
               (
                   "spark.jars.packages",
                   "software.amazon.awssdk:url-connection-client:2.17.257",
               ),
               (
                   f"spark.sql.catalog.{REMOTE_CATALOG_NAME}",
                   "org.apache.iceberg.spark.SparkCatalog",
               ),
               (
                   f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.warehouse",
                   f"{aws.datalake_root_path()}/{table_name}/",
               ),
               (
                   f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.catalog-impl",
                   "org.apache.iceberg.aws.glue.GlueCatalog",
               ),
               (
                   f"spark.sql.catalog.{REMOTE_CATALOG_NAME}.io-impl",
                   "org.apache.iceberg.aws.s3.S3FileIO",
               ),
               (
                   "spark.sql.extensions",
                   "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
               ),
               (f"spark.sql.defaultCatalog", REMOTE_CATALOG_NAME),
           ]
       )`
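
One thing that may be worth ruling out in the configuration above: `SparkConf.setAll` applies the pairs in order, and setting a key a second time replaces its earlier value, so only the last `spark.jars.packages` entry would survive. Spark documents `spark.jars.packages` as a single comma-separated list of Maven coordinates. The sketch below merges repeated list-valued keys before they are handed to `setAll`; `merge_conf_pairs` and `LIST_VALUED_KEYS` are hypothetical names introduced for illustration, not part of Spark or Iceberg, and whether this explains the error in this environment is an assumption.

```python
from typing import Dict, List, Tuple

# Hypothetical: keys that Spark reads as comma-separated lists.
LIST_VALUED_KEYS = {"spark.jars.packages"}


def merge_conf_pairs(pairs: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Collapse repeated list-valued keys into one comma-separated entry."""
    merged: Dict[str, str] = {}
    for key, value in pairs:
        if key in LIST_VALUED_KEYS and key in merged:
            # Append instead of overwriting the coordinates seen so far.
            merged[key] = f"{merged[key]},{value}"
        else:
            # Any other repeated key keeps the usual last-one-wins behaviour.
            merged[key] = value
    return list(merged.items())


# The three package entries from the configuration above.
pairs = merge_conf_pairs(
    [
        ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12:0.14.1"),
        ("spark.jars.packages", "software.amazon.awssdk:bundle:2.17.257"),
        ("spark.jars.packages", "software.amazon.awssdk:url-connection-client:2.17.257"),
    ]
)
```

After merging, `pairs` holds a single `spark.jars.packages` entry and could be passed to `config.setAll(pairs)` together with the catalog settings.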
   
   **Stacktrace**
   
   `22/11/24 09:18:36 WARN SparkSession: Cannot use org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions to configure session extensions.
   java.lang.ClassNotFoundException: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
   at java.base/java.net.URLClassLoader.findClass(Unknown Source)
   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
   at java.base/java.lang.Class.forName0(Native Method)
   at java.base/java.lang.Class.forName(Unknown Source)
   at org.apache.spark.util.Utils$.classForName(Utils.scala:206)
   at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1160)
   at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1158)
   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1158)
   at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:101)
   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
   at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
   at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
   at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
   at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
   at py4j.Gateway.invoke(Gateway.java:238)
   at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
   at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
   at py4j.GatewayConnection.run(GatewayConnection.java:238)
   at java.base/java.lang.Thread.run(Unknown Source)
   
   ....
   22/11/24 09:18:45
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
   at org.apache.spark.sql.execution.streaming.OneTimeExecutor.execute(TriggerExecutor.scala:39)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
   22/11/24 09:18:45 ERROR MicroBatchExecution: Query [id = , runId = ] terminated with error
   org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog
   at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:66)
   at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
   at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
   at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
   at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
   at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$currentNamespace$1(CatalogManager.scala:100)
   at scala.Option.getOrElse(Option.scala:189)
   at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:100)
   at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:98)
   at org.apache.spark.sql.catalyst.optimizer.GetCurrentDatabase.apply(finishAnalysis.scala:95)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
   at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
   at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
   at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
   at scala.collection.immutable.List.foreach(List.scala:392)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
   at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
   at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
   at org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
   at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
   at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:138)
   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
   at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:138)
   at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
   at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
   at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:90)
   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:108)
   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:105)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:574)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:564)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
   at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)
   ERROR:root:<traceback object at 0x7f50f454a640>
   Traceback (most recent call last):
   File "/opt/spark/work-dir/src/app/streaming/main.py", line 11, in <module>
   streaming_main(registry=streaming_registry, )
   File "/opt/spark/work-dir/src/app/app.py", line 20, in main
   start_job(session, job=job, environment=args.env, kwargs=args.kwargs)
   File "/opt/spark/work-dir/src/app/app.py", line 44, in start_job
   job.run(session, kwargs=kwargs)
   File "/opt/spark/work-dir/src/elia_streaming/jobs/current_system_imbalance/producer.py", line 31, in run
   iceberg.awaitTermination()
   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
   File "<string>", line 3, in raise_from
   pyspark.sql.utils.StreamingQueryException: Cannot find catalog plugin class for catalog 'remote_glue_catalog': org.apache.iceberg.spark.SparkCatalog`
   
   **Debug checks**:
   - [x] We checked that the jars are included in the docker container.
   - [x] We checked the Spark config at runtime (logged) to make sure all 
options were included, which they were.
   - [x] We checked several versions of Spark and Iceberg.
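
The runtime config check in the list above could be narrowed to the package list itself. A minimal sketch, assuming the pairs come from `spark.sparkContext.getConf().getAll()` on the running session; `missing_packages` and `REQUIRED` are hypothetical names introduced here, and the pairs below are simulated rather than taken from a real run.

```python
from typing import Iterable, List, Tuple

# Hypothetical: group:artifact prefixes the job expects to be resolved.
REQUIRED = [
    "org.apache.iceberg:iceberg-spark-runtime-3.0_2.12",
    "software.amazon.awssdk:bundle",
]


def missing_packages(
    conf_pairs: Iterable[Tuple[str, str]], required: Iterable[str]
) -> List[str]:
    """Return the required group:artifact prefixes absent from spark.jars.packages."""
    conf = dict(conf_pairs)
    raw = conf.get("spark.jars.packages", "")
    # Split the comma-separated coordinate list and drop empty entries.
    present = [coord.strip() for coord in raw.split(",") if coord.strip()]
    return [
        prefix
        for prefix in required
        if not any(coord.startswith(prefix + ":") for coord in present)
    ]


# Simulated effective config in which only the last package entry survived.
effective = [
    ("spark.jars.packages", "software.amazon.awssdk:url-connection-client:2.17.257"),
]
missing = missing_packages(effective, REQUIRED)
```

A non-empty `missing` list would suggest that the Iceberg runtime was never requested from Spark, independent of whether the jar file exists somewhere in the container.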
   
   **Question**
   Does anybody know what is going on?
   My guess is that with Spark Structured Streaming the jars are not actually 
added to the classpath of the executors, but we are not sure. 
   
   Thanks in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

