Disentinel opened a new issue, #7800:
URL: https://github.com/apache/iceberg/issues/7800

   ### Apache Iceberg version
   
   1.2.1 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   I got `The Spark SQL phase optimization failed with an internal error. Please, fill a bug report in, and provide the full stack trace.`
   High-level description:
   - I have a Kafka structured streaming job that passes Mongo documents from the Kafka topic to an Iceberg table on S3.
   - I'm trying to speed up the sync by adding more jobs in parallel. Each job reads its own partition from Kafka and writes to its own partition in Iceberg (see the sketch below).
   - A Glue catalog is used for the metadata.
   - This is an intermediate version of the script, since I'm having trouble with concurrent writes to Iceberg.
   - I'm submitting this bug because the Spark log asked me to =)
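
   Roughly how the jobs are parallelized, shown as a simplified, hypothetical launcher sketch (the real deployment is more involved): each job is started with a disjoint subset of the topic's 12 partitions in the `PARTITIONS` env var, which the script below turns into a Kafka `assign` pattern.

   ```
   # Hypothetical launcher sketch only (not the real deployment script): start
   # several copies of partitioned_sync.py, each assigned a disjoint slice of
   # the 12 Kafka partitions via the PARTITIONS env var read by the job below.
   import os
   import subprocess

   PARTITION_SLICES = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]  # example split

   for i, parts in enumerate(PARTITION_SLICES):
       env = dict(os.environ)
       env["PARTITIONS"] = str(parts).replace(" ", "")  # e.g. "[0,1,2,3]"
       env["JOB_NAME"] = "sync-job-" + str(i)           # used in the checkpoint path
       # Positional args expected by the job: HOSTNAME DATABASE COLLECTION
       # KAFKA_BOOTSTRAP_SERVER METRICS_HOST (placeholders here).
       subprocess.Popen(
           ["spark-submit", "partitioned_sync.py",
            "HOSTNAME", "DATABASE", "COLLECTION", "KAFKA_BOOTSTRAP_SERVER", "METRICS_HOST"],
           env=env,
       )
   ```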
   
   Spark log with trace:
   ```
   23/06/08 06:33:01 ERROR MicroBatchExecution: Query [id = 
3e585ace-8106-4bdd-a011-967c7db7799a, runId = 
3ce50cdc-7dd7-44c8-8921-150568ccf4ae] terminated with error
   py4j.Py4JException: An exception was raised by the Python Proxy. Return 
Message: Traceback (most recent call last):
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", 
line 617, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 276, in call
       raise e
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 273, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
     File 
"/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 
131, in process
       df.sparkSession.sql("""
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/session.py", 
line 1034, in sql
       return DataFrame(self._jsparkSession.sql(sqlQuery), self)
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", 
line 1321, in __call__
       return_value = get_return_value(
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 190, in deco
       return f(*a, **kw)
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 
326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o92.sql.
   : org.apache.spark.SparkException: The Spark SQL phase optimization failed 
with an internal error. Please, fill a bug report in, and provide the full 
stack trace.
        at 
org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
        at 
org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
        at 
org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
        at 
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
        at 
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy38.call(Unknown Source)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   Caused by: java.lang.NullPointerException
        at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:122)
        at 
org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:351)
        at 
org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
        at 
org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$new$1(ParallelIterable.java:69)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   
        at py4j.Protocol.getReturnValue(Protocol.java:476)
        at 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
        at com.sun.proxy.$Proxy38.call(Unknown Source)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   Traceback (most recent call last):
     File 
"/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 
184, in <module>
       SQ.awaitTermination()
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", 
line 107, in awaitTermination
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", 
line 1321, in __call__
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 196, in deco
   pyspark.sql.utils.StreamingQueryException: Query [id = 
3e585ace-8106-4bdd-a011-967c7db7799a, runId = 
3ce50cdc-7dd7-44c8-8921-150568ccf4ae] terminated with exception: An exception 
was raised by the Python Proxy. Return Message: Traceback (most recent call 
last):
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", 
line 617, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 276, in call
       raise e
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 273, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
     File 
"/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 
131, in process
       df.sparkSession.sql("""
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/session.py", 
line 1034, in sql
       return DataFrame(self._jsparkSession.sql(sqlQuery), self)
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", 
line 1321, in __call__
       return_value = get_return_value(
     File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", 
line 190, in deco
       return f(*a, **kw)
     File 
"/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 
326, in get_return_value
       raise Py4JJavaError(
   py4j.protocol.Py4JJavaError: An error occurred while calling o92.sql.
   : org.apache.spark.SparkException: The Spark SQL phase optimization failed 
with an internal error. Please, fill a bug report in, and provide the full 
stack trace.
        at 
org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
        at 
org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
        at 
org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
        at 
org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
        at 
org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
        at 
org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at 
org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at 
org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at 
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy38.call(Unknown Source)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
   Caused by: java.lang.NullPointerException
        at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:122)
        at 
org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:351)
        at 
org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
        at 
org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$new$1(ParallelIterable.java:69)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ```
   
   Job source code:
   ```
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import *
   from pyspark.sql.types import *
   from pyspark.sql.column import *
   import sys
   import os
   import http.client
   import json
   from datetime import datetime as dt
   import time
   
   dl_service = http.client.HTTPSConnection(sys.argv[5])
   headers = {'Content-Type': 'application/json'}
   
   HOSTNAME = sys.argv[1]
   DATABASE = sys.argv[2]
   COLLECTION = sys.argv[3]
   KAFKA_BOOTSTRAP_SERVER = sys.argv[4]
   OFFSETS_LIMIT = os.getenv("OFFSETS_LIMIT")
   LOG_LEVEL = os.getenv("LOG_LEVEL")
   JOB_NAME = os.getenv("JOB_NAME")
   PARTITIONS = os.getenv("PARTITIONS") or "[0,1,2,3,4,5,6,7,8,9,10,11]"
   
   if __name__ == "__main__":
       spark = (
           SparkSession.builder.appName(HOSTNAME+"-"+DATABASE+"-"+COLLECTION)
           .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
               .getOrCreate()
       )
       spark.sparkContext.setLogLevel(LOG_LEVEL)
   
   KAFKA_PASSWORD = os.getenv("KAFKA_PASSWORD")
   KAFKA_USER = os.getenv("KAFKA_USER")
   KAFKA_TOPIC_NAME = "dl."+COLLECTION
   ASSIGN_PATTERN = '{"'+KAFKA_TOPIC_NAME+'":'+PARTITIONS+"}"
   
   print('Starting kafka on topic '+KAFKA_TOPIC_NAME+PARTITIONS)
   
   payloadSchema = StructType([
       StructField('documentKey', StringType()),
       StructField('fullDocument', StringType()),
       StructField('operationType', StringType()),
       StructField('clusterTime', StringType()),
   ])
   
   keySchema = StructType([
       StructField('_id', StringType())
   ])
   
   clusterTimeSchema = StructType([
       StructField('$timestamp', StructType([
           StructField('t', StringType()),
           StructField('i', StringType()),
       ]))
   ])
   
   last_exec = dt.now()
   
   print('OFFSETS_LIMIT', OFFSETS_LIMIT)
   
   kafkaStream = spark.readStream.format("kafka") \
       .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
       .option("kafka.sasl.mechanism", "PLAIN") \
       .option("kafka.security.protocol", "SASL_PLAINTEXT") \
       .option("kafka.sasl.jaas.config", 
'org.apache.kafka.common.security.plain.PlainLoginModule required 
username="'+KAFKA_USER+'" password="'+KAFKA_PASSWORD+'";') \
       .option("kafka.group.id", COLLECTION) \
       .option("failOnDataLoss", "false") \
       .option("assign", ASSIGN_PATTERN) \
       .option("maxOffsetsPerTrigger", OFFSETS_LIMIT) \
       .option("startingOffsets", "earliest")
   streamingDF = kafkaStream.load() \
       .selectExpr("CAST(value AS STRING)", "offset", "timestamp") \
       .withColumn("val", from_json("value", payloadSchema))  \
       .withColumn("key", from_json("val.documentKey", keySchema)) \
       .withColumn("clusterTime", from_json("val.clusterTime", 
clusterTimeSchema)) \
       .select(
           col('key._id').alias('_id'),
           col('val.fullDocument').alias('doc'),
           col('val.operationType').alias('op_type'),
           'offset',
           'timestamp',
           col('clusterTime.$timestamp.t').cast('int').alias('oplog_ts_t'),
           col('clusterTime.$timestamp.i').cast('int').alias('oplog_ts_i')
       )
   
   SQ = None
   
   def process(df, epoch_id):
       global last_exec
       global SQ
       print("========= batch =========")
       df.show()
       df.persist()
       df.createOrReplaceTempView("update_batch")
       # create database
       temp_view_start = time.perf_counter()
       df.sparkSession.sql("""
       CREATE DATABASE IF NOT EXISTS `{host}`;
       """.format(host=HOSTNAME))
       df.sparkSession.sql("""
       CREATE TABLE IF NOT EXISTS `{host}`.`{db}__{table}` (
           _id string COMMENT 'unique id',
           doc string,
           offset bigint,
           oplog_ts_t bigint,
           oplog_ts_i bigint,
           timestamp timestamp
       )
       USING iceberg PARTITIONED BY (bucket(12, _id));
       """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))
       
   # NOTE! I'm also running ALTER TABLE from another script:
   # ALTER TABLE `{host}`.`{db}__{table}` SET TBLPROPERTIES (
   #    'write.delete.mode' = 'merge-on-read',
   #    'write.update.mode' = 'merge-on-read',
   #    'write.merge.mode' = 'merge-on-read',
   #    'option.format-version' = '2',
   #    'format-version' = '2'
   # )
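   #
   # (An assumed sketch of how that other script applies these properties; the
   # exact script isn't shown in this report.)
   #
   #   spark.sql("""
   #   ALTER TABLE `{host}`.`{db}__{table}` SET TBLPROPERTIES (
   #       'write.delete.mode' = 'merge-on-read',
   #       'write.update.mode' = 'merge-on-read',
   #       'write.merge.mode' = 'merge-on-read',
   #       'format-version' = '2'
   #   )
   #   """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))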
   
   # create temp view
       df.sparkSession.sql("""
       SELECT
           _id,
           FIRST(doc) as doc,
           FIRST(op_type) as op_type,
           FIRST(offset) as offset,
           FIRST(timestamp) as timestamp,
           FIRST(oplog_ts_t) as oplog_ts_t,
           FIRST(oplog_ts_i) as oplog_ts_i
       FROM (
           SELECT * FROM update_batch
           ORDER BY oplog_ts_t DESC, oplog_ts_i DESC
       )
       GROUP BY _id
       """).createOrReplaceTempView("last_state")
       # apply changes to table
       temp_view_end = time.perf_counter()
       print("temp view time", temp_view_end - temp_view_start)
       merge_start = time.perf_counter()
       df.sparkSession.sql("""
       MERGE INTO `{host}`.`{db}__{table}` t
       USING (
           SELECT _id, doc, op_type, offset, timestamp, oplog_ts_t, oplog_ts_i
           FROM last_state
       ) s
       ON t._id = s._id
       WHEN MATCHED AND
           s.op_type = 'delete' THEN DELETE
       WHEN MATCHED
           AND ISNULL(t.oplog_ts_t)
           OR s.oplog_ts_t > t.oplog_ts_t
           OR s.oplog_ts_t = t.oplog_ts_t AND s.oplog_ts_i > t.oplog_ts_i
       THEN UPDATE SET
           t.doc = s.doc,
           t.offset = s.offset,
           t.timestamp = s.timestamp,
           t.oplog_ts_t = s.oplog_ts_t,
           t.oplog_ts_i = s.oplog_ts_i
       WHEN NOT MATCHED AND
           s.op_type != 'delete'
       THEN INSERT (_id, doc, offset, oplog_ts_t, oplog_ts_i, timestamp)
       VALUES (s._id, s.doc, s.offset, s.oplog_ts_t, s.oplog_ts_i, s.timestamp)
       """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))
       merge_end = time.perf_counter()
       print("merge time", merge_end - merge_start)
       print('processed')
       df.unpersist()
   
   SQ = streamingDF.writeStream.format('console') \
       .outputMode('append') \
       .option("checkpointLocation", 
"s3a://MY_BUCKET_NAME/checkpoints/"+JOB_NAME) \
       .foreachBatch(process) \
       .start()
   
   while SQ.isActive:
       if SQ.lastProgress is not None:
           endOffsets = SQ.lastProgress['sources'][0]['endOffset'][KAFKA_TOPIC_NAME]
           print('SENDING LAST OFFSET', endOffsets)
           data = {
               'spark': spark.sparkContext.getConf().getAll(),
               'topic': KAFKA_TOPIC_NAME,
               'last_progress': SQ.lastProgress,
               'exec': sys.argv[0]
               }
           json_data = json.dumps(data)
           dl_service.request('POST', '/datalake/metric', json_data, headers)
           response = dl_service.getresponse()
           print(response.read().decode())
       else:
           print('NO LAST PROGRESS DEFINED YET')
       time.sleep(60)
   
   SQ.awaitTermination()
   
   print('Waiting!')
   ```
   
   Some of the related Spark config options:
   ```
   'spark.jars.packages': 'org.apache.spark:'
           +'spark-sql-kafka-0-10_2.12:3.3.2,'
           +'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1,'
           +'software.amazon.awssdk:bundle:2.20.56,'
           +'software.amazon.awssdk:url-connection-client:2.20.56',
       'spark.sql.extensions': 'org.apache.iceberg.spark.'
           +'extensions.IcebergSparkSessionExtensions',
       'spark.sql.broadcastTimeout': '3600',
       'spark.sql.defaultCatalog': 'datalake',
       'spark.sql.catalog.datalake': 'org.apache.iceberg.'
           +'spark.SparkCatalog',
       'spark.sql.catalog.datalake.warehouse': 's3://MY_BUCKET_NAME/dbs',
       'spark.sql.catalog.datalake.catalog-impl':
           'org.apache.iceberg.aws.glue.GlueCatalog',
       'spark.sql.catalog.datalake.io-impl':
           'org.apache.iceberg.aws.s3.S3FileIO',
       'spark.kafka.session.timeout.ms': '1800000',
   ```
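
   For context, a minimal sketch of applying the Iceberg/Glue catalog options above through `SparkSession.builder` (illustrative only; the keys and values are taken from the snippet above):

   ```
   from pyspark.sql import SparkSession

   # Sketch: apply the Iceberg/Glue catalog options listed above when building a session.
   spark = (
       SparkSession.builder.appName("iceberg-glue-example")
       .config("spark.sql.extensions",
               "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
       .config("spark.sql.defaultCatalog", "datalake")
       .config("spark.sql.catalog.datalake", "org.apache.iceberg.spark.SparkCatalog")
       .config("spark.sql.catalog.datalake.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
       .config("spark.sql.catalog.datalake.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
       .config("spark.sql.catalog.datalake.warehouse", "s3://MY_BUCKET_NAME/dbs")
       .getOrCreate()
   )
   ```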

