Disentinel opened a new issue, #7800:
URL: https://github.com/apache/iceberg/issues/7800
### Apache Iceberg version
1.2.1 (latest release)
### Query engine
Spark
### Please describe the bug 🐞
I got `The Spark SQL phase optimization failed with an internal error. Please, fill a bug report in, and provide the full stack trace.`

High-level description:
- I have a Kafka structured streaming job that passes Mongo documents from a Kafka topic to an Iceberg table on S3.
- I'm trying to speed up the sync by adding more jobs in parallel. Each job reads its own partition from Kafka and writes to its own partition in Iceberg (see the sketch below).
- The AWS Glue catalog holds the metadata.
- This is an intermediate version of the script, since I'm running into trouble with concurrent writes to Iceberg.
- I'm filing this bug report because the Spark log asked me to =)
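To make the parallelization scheme concrete: each job pins itself to a fixed slice of the topic through the Kafka source's `assign` option (the `ASSIGN_PATTERN` in the full script below). A minimal sketch of that pattern, where the topic name, broker address, and partition list are placeholders, not values from the actual deployment:

```
from pyspark.sql import SparkSession
import json

spark = SparkSession.builder.appName("partitioned-sync-worker").getOrCreate()

topic = "dl.my_collection"  # placeholder topic name
partitions = [0, 1, 2]      # the Kafka partitions owned by this particular job

# "assign" takes a JSON map of topic -> partition list, so each parallel
# job consumes only its own slice of the topic.
stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
    .option("assign", json.dumps({topic: partitions}))
    .load()
)
```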
Spark log with trace:
```
23/06/08 06:33:01 ERROR MicroBatchExecution: Query [id = 3e585ace-8106-4bdd-a011-967c7db7799a, runId = 3ce50cdc-7dd7-44c8-8921-150568ccf4ae] terminated with error
py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 276, in call
    raise e
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 273, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 131, in process
    df.sparkSession.sql("""
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o92.sql.
: org.apache.spark.SparkException: The Spark SQL phase optimization failed with an internal error. Please, fill a bug report in, and provide the full stack trace.
    at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
    at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy38.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.NullPointerException
    at org.apache.iceberg.DeleteFileIndex.forEntry(DeleteFileIndex.java:122)
    at org.apache.iceberg.ManifestGroup.lambda$createFileScanTasks$13(ManifestGroup.java:351)
    at org.apache.iceberg.io.CloseableIterable$7$1.next(CloseableIterable.java:202)
    at org.apache.iceberg.util.ParallelIterable$ParallelIterator.lambda$new$1(ParallelIterable.java:69)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy38.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:51)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:32)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:665)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:663)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Traceback (most recent call last):
  File "/tmp/spark-fad2f6c4-d2e6-4343-840c-6295df177f61/partitioned_sync.py", line 184, in <module>
    SQ.awaitTermination()
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 107, in awaitTermination
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in deco
pyspark.sql.utils.StreamingQueryException: Query [id = 3e585ace-8106-4bdd-a011-967c7db7799a, runId = 3ce50cdc-7dd7-44c8-8921-150568ccf4ae] terminated with exception: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
[... the same Py4JJavaError, Spark stack trace, and NullPointerException as above, repeated verbatim ...]
```
Job source code:
```
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.column import *
import sys
import os
import http.client
import json
from datetime import datetime as dt
import time

# Endpoint for reporting job metrics
dl_service = http.client.HTTPSConnection(sys.argv[5])
headers = {'Content-Type': 'application/json'}

HOSTNAME = sys.argv[1]
DATABASE = sys.argv[2]
COLLECTION = sys.argv[3]
KAFKA_BOOTSTRAP_SERVER = sys.argv[4]
OFFSETS_LIMIT = os.getenv("OFFSETS_LIMIT")
LOG_LEVEL = os.getenv("LOG_LEVEL")
JOB_NAME = os.getenv("JOB_NAME")
PARTITIONS = os.getenv("PARTITIONS") or "[0,1,2,3,4,5,6,7,8,9,10,11]"

if __name__ == "__main__":
    spark = (
        SparkSession.builder.appName(HOSTNAME + "-" + DATABASE + "-" + COLLECTION)
        .config('spark.sql.sources.partitionOverwriteMode', 'dynamic')
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel(LOG_LEVEL)

    KAFKA_PASSWORD = os.getenv("KAFKA_PASSWORD")
    KAFKA_USER = os.getenv("KAFKA_USER")
    KAFKA_TOPIC_NAME = "dl." + COLLECTION
    # JSON map of topic -> partition list consumed by this job
    ASSIGN_PATTERN = '{"' + KAFKA_TOPIC_NAME + '":' + PARTITIONS + "}"
    print('Starting kafka on topic ' + KAFKA_TOPIC_NAME + PARTITIONS)

    # Schemas for the Mongo change-stream payload carried in the Kafka value
    payloadSchema = StructType([
        StructField('documentKey', StringType()),
        StructField('fullDocument', StringType()),
        StructField('operationType', StringType()),
        StructField('clusterTime', StringType()),
    ])
    keySchema = StructType([
        StructField('_id', StringType())
    ])
    clusterTimeSchema = StructType([
        StructField('$timestamp', StructType([
            StructField('t', StringType()),
            StructField('i', StringType()),
        ]))
    ])

    last_exec = dt.now()
    print('OFFSETS_LIMIT', OFFSETS_LIMIT)

    kafkaStream = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER) \
        .option("kafka.sasl.mechanism", "PLAIN") \
        .option("kafka.security.protocol", "SASL_PLAINTEXT") \
        .option("kafka.sasl.jaas.config",
                'org.apache.kafka.common.security.plain.PlainLoginModule required '
                'username="' + KAFKA_USER + '" password="' + KAFKA_PASSWORD + '";') \
        .option("kafka.group.id", COLLECTION) \
        .option("failOnDataLoss", "false") \
        .option("assign", ASSIGN_PATTERN) \
        .option("maxOffsetsPerTrigger", OFFSETS_LIMIT) \
        .option("startingOffsets", "earliest")

    streamingDF = kafkaStream.load() \
        .selectExpr("CAST(value AS STRING)", "offset", "timestamp") \
        .withColumn("val", from_json("value", payloadSchema)) \
        .withColumn("key", from_json("val.documentKey", keySchema)) \
        .withColumn("clusterTime", from_json("val.clusterTime", clusterTimeSchema)) \
        .select(
            col('key._id').alias('_id'),
            col('val.fullDocument').alias('doc'),
            col('val.operationType').alias('op_type'),
            'offset',
            'timestamp',
            col('clusterTime.$timestamp.t').cast('int').alias('oplog_ts_t'),
            col('clusterTime.$timestamp.i').cast('int').alias('oplog_ts_i')
        )

    SQ = None

    # foreachBatch handler: upsert each micro-batch into the Iceberg table
    def process(df, epoch_id):
        global last_exec
        global SQ
        print("========= batch =========")
        df.show()
        df.persist()
        df.createOrReplaceTempView("update_batch")
        # create database and table if they don't exist yet
        temp_view_start = time.perf_counter()
        df.sparkSession.sql("""
            CREATE DATABASE IF NOT EXISTS `{host}`;
        """.format(host=HOSTNAME))
        df.sparkSession.sql("""
            CREATE TABLE IF NOT EXISTS `{host}`.`{db}__{table}` (
                _id string COMMENT 'unique id',
                doc string,
                offset bigint,
                oplog_ts_t bigint,
                oplog_ts_i bigint,
                timestamp timestamp
            )
            USING iceberg PARTITIONED BY (bucket(12, _id));
        """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))
        # NOTE! I'm also running ALTER TABLE from another script:
        # ALTER TABLE `{host}`.`{db}__{table}` SET TBLPROPERTIES (
        #     'write.delete.mode' = 'merge-on-read',
        #     'write.update.mode' = 'merge-on-read',
        #     'write.merge.mode' = 'merge-on-read',
        #     'option.format-version' = '2',
        #     'format-version' = '2'
        # )
        # create a temp view holding only the latest change event per _id
        df.sparkSession.sql("""
            SELECT
                _id,
                FIRST(doc) as doc,
                FIRST(op_type) as op_type,
                FIRST(offset) as offset,
                FIRST(timestamp) as timestamp,
                FIRST(oplog_ts_t) as oplog_ts_t,
                FIRST(oplog_ts_i) as oplog_ts_i
            FROM (
                SELECT * FROM update_batch
                ORDER BY oplog_ts_t DESC, oplog_ts_i DESC
            )
            GROUP BY _id
        """).createOrReplaceTempView("last_state")
        # apply the changes to the table
        temp_view_end = time.perf_counter()
        print("temp view time", temp_view_end - temp_view_start)
        merge_start = time.perf_counter()
        df.sparkSession.sql("""
            MERGE INTO `{host}`.`{db}__{table}` t
            USING (
                SELECT _id, doc, op_type, offset, timestamp, oplog_ts_t, oplog_ts_i
                FROM last_state
            ) s
            ON t._id = s._id
            WHEN MATCHED AND s.op_type = 'delete' THEN DELETE
            WHEN MATCHED
                AND ISNULL(t.oplog_ts_t)
                OR s.oplog_ts_t > t.oplog_ts_t
                OR s.oplog_ts_t = t.oplog_ts_t AND s.oplog_ts_i > t.oplog_ts_i
            THEN UPDATE SET
                t.doc = s.doc,
                t.offset = s.offset,
                t.timestamp = s.timestamp,
                t.oplog_ts_t = s.oplog_ts_t,
                t.oplog_ts_i = s.oplog_ts_i
            WHEN NOT MATCHED AND s.op_type != 'delete'
            THEN INSERT (_id, doc, offset, oplog_ts_t, oplog_ts_i, timestamp)
            VALUES (s._id, s.doc, s.offset, s.oplog_ts_t, s.oplog_ts_i, s.timestamp)
        """.format(host=HOSTNAME, db=DATABASE, table=COLLECTION))
        merge_end = time.perf_counter()
        print("merge time", merge_end - merge_start)
        print('processed')
        df.unpersist()

    SQ = streamingDF.writeStream.format('console') \
        .outputMode('append') \
        .option("checkpointLocation", "s3a://MY_BUCKET_NAME/checkpoints/" + JOB_NAME) \
        .foreachBatch(process) \
        .start()

    # Periodically report the stream's progress to the metrics service
    while SQ.isActive:
        if SQ.lastProgress is not None:
            endOffsets = SQ.lastProgress['sources'][0]['endOffset'][KAFKA_TOPIC_NAME]
            print('SENDING LAST OFFSET', endOffsets)
            data = {
                'spark': spark.sparkContext.getConf().getAll(),
                'topic': KAFKA_TOPIC_NAME,
                'last_progress': SQ.lastProgress,
                'exec': sys.argv[0]
            }
            json_data = json.dumps(data)
            dl_service.request('POST', '/datalake/metric', json_data, headers)
            response = dl_service.getresponse()
            print(response.read().decode())
        else:
            print('NO LAST PROGRESS DEFINED YET')
        time.sleep(60)

    SQ.awaitTermination()
    print('Waiting!')
```
Some related Spark config options:
```
'spark.jars.packages': 'org.apache.spark:'
+'spark-sql-kafka-0-10_2.12:3.3.2,'
+'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1,'
+'software.amazon.awssdk:bundle:2.20.56,'
+'software.amazon.awssdk:url-connection-client:2.20.56',
'spark.sql.extensions': 'org.apache.iceberg.spark.'
+'extensions.IcebergSparkSessionExtensions',
'spark.sql.broadcastTimeout': '3600',
'spark.sql.defaultCatalog': 'datalake',
'spark.sql.catalog.datalake': 'org.apache.iceberg.'
+'spark.SparkCatalog',
'spark.sql.catalog.datalake.warehouse': 's3://MY_BUCKET_NAME/dbs',
'spark.sql.catalog.datalake.catalog-impl':
'org.apache.iceberg.aws.glue.GlueCatalog',
'spark.sql.catalog.datalake.io-impl':
'org.apache.iceberg.aws.s3.S3FileIO',
'spark.kafka.session.timeout.ms': '1800000',
```
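For reference, a minimal sketch of how these options could be applied when building the session; the package list, catalog name, and bucket are copied from the snippet above, and in the real job they may instead be passed at submit time (e.g. via `spark-submit --conf`):

```
from pyspark.sql import SparkSession

# Sketch only: applies the config options listed above via the builder.
spark = (
    SparkSession.builder.appName("partitioned-sync")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2,"
            "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.2.1,"
            "software.amazon.awssdk:bundle:2.20.56,"
            "software.amazon.awssdk:url-connection-client:2.20.56")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.broadcastTimeout", "3600")
    .config("spark.sql.defaultCatalog", "datalake")
    .config("spark.sql.catalog.datalake", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.datalake.warehouse", "s3://MY_BUCKET_NAME/dbs")
    .config("spark.sql.catalog.datalake.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.datalake.io-impl",
            "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.kafka.session.timeout.ms", "1800000")
    .getOrCreate()
)
```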