This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a5a8ec2 [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more
Pythonic
a5a8ec2 is described below
commit a5a8ec2ca8e75207b4e6c7b76ab6214be4a4237e
Author: HyukjinKwon <[email protected]>
AuthorDate: Mon Jun 1 09:45:21 2020 +0900
[SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
### What changes were proposed in this pull request?
This PR proposes to make PySpark exceptions more Pythonic by hiding the JVM
stacktrace by default. The JVM stacktrace can be shown again by enabling the
`spark.sql.pyspark.jvmStacktrace.enabled` configuration.
```
Traceback (most recent call last):
...
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is
the Python worker stacktrace.
Traceback (most recent call last):
...
```
If `spark.sql.pyspark.jvmStacktrace.enabled` is enabled, the JVM stacktrace is appended as below:
```
JVM stacktrace:
org.apache.spark.Exception: ...
...
```
For example, the code below:
```python
from pyspark.sql.functions import udf
@udf
def divide_by_zero(v):
    raise v / 0
spark.range(1).select(divide_by_zero("id")).show()
```
will show an error message that looks like a Python exception raised
locally.
<details>
<summary>Python exception message when
<code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary>
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
print(self._jdf.showString(n, 20, vertical))
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is
the Python worker stacktrace.
Traceback (most recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in
main
process()
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in
process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
141, in dump_stream
for obj in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
212, in _batched
for item in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
<genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
<lambda>
return lambda *a: f(*a)
File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in
wrapper
return f(*args, **kwargs)
File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
```
</details>
<details>
<summary>Python exception message when
<code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary>
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
print(self._jdf.showString(n, 20, vertical))
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 137, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is
the Python worker stacktrace.
Traceback (most recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in
main
process()
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in
process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
141, in dump_stream
for obj in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
212, in _batched
for item in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
<genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
<lambda>
return lambda *a: f(*a)
File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in
wrapper
return f(*args, **kwargs)
File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0
(TID 4, 192.168.35.193, executor 0):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in
main
process()
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in
process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
141, in dump_stream
for obj in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
212, in _batched
for item in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
<genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
<lambda>
return lambda *a: f(*a)
File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in
wrapper
return f(*args, **kwargs)
File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most
recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in
main
process()
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in
process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
141, in dump_stream
for obj in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
212, in _batched
for item in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
<genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
<lambda>
return lambda *a: f(*a)
File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in
wrapper
return f(*args, **kwargs)
File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```
</details>
<details>
<summary>Python exception message without this change</summary>
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
print(self._jdf.showString(n, 20, vertical))
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
return f(*a, **kw)
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
o160.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task
10 in stage 5.0 failed 4 times, most recent failure: Lost task 10.3 in stage
5.0 (TID 37, 192.168.35.193, executor 3):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in
main
process()
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in
process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
141, in dump_stream
for obj in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
212, in _batched
for item in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
<genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
<lambda>
return lambda *a: f(*a)
File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in
wrapper
return f(*args, **kwargs)
File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
at
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
at
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most
recent call last):
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in
main
process()
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in
process
serializer.dump_stream(out_iter, outfile)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
223, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
141, in dump_stream
for obj in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line
212, in _batched
for item in iterator:
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in
<genexpr>
result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in
udfs)
File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in
<lambda>
return lambda *a: f(*a)
File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in
wrapper
return f(*args, **kwargs)
File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
at
org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
```
</details>
<br/>
Another example with Python 3.7:
```python
sql("a")
```
<details>
<summary>Python exception message when
<code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary>
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE',
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS',
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST',
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE',
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE',
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos
0)
== SQL ==
a
^^^
```
</details>
<details>
<summary>Python exception message when
<code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary>
```
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE',
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS',
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST',
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE',
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE',
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos
0)
== SQL ==
a
^^^
JVM stacktrace:
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE',
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS',
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST',
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE',
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE',
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos
0)
== SQL ==
a
^^^
at
org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
at
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
at
org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
at
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
```
</details>
<details>
<summary>Python exception message without this change</summary>
```
Traceback (most recent call last):
File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
return f(*a, **kw)
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE',
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS',
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST',
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE',
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE',
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos
0)
== SQL ==
a
^^^
at
org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
at
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
at
org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
at
org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
at
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at
org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1305, in __call__
File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco
raise converted
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE',
'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS',
'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST',
'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE',
'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE',
'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos
0)
== SQL ==
a
^^^
```
</details>
### Why are the changes needed?
Currently, PySpark exceptions are very unfriendly to Python users because they
include a long JVM stacktrace. See "Python exception message without this
change" above.
### Does this PR introduce _any_ user-facing change?
Yes, it will change the exception message. See the examples above.
### How was this patch tested?
Manually tested by
```bash
./bin/pyspark --conf spark.sql.pyspark.jvmStacktrace.enabled=true
```
and running the examples above.
Closes #28661 from HyukjinKwon/python-debug.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit e69466056fb2c121b7bbb6ad082f09deb1c41063)
Signed-off-by: HyukjinKwon <[email protected]>
---
python/pyspark/sql/tests/test_pandas_udf.py | 14 +++---
python/pyspark/sql/utils.py | 53 ++++++++++++++++++----
.../org/apache/spark/sql/internal/SQLConf.scala | 11 +++++
3 files changed, 61 insertions(+), 17 deletions(-)
diff --git a/python/pyspark/sql/tests/test_pandas_udf.py
b/python/pyspark/sql/tests/test_pandas_udf.py
index 4218f5c..7fa65f0 100644
--- a/python/pyspark/sql/tests/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/test_pandas_udf.py
@@ -19,14 +19,12 @@ import unittest
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
from pyspark.sql.types import *
-from pyspark.sql.utils import ParseException
+from pyspark.sql.utils import ParseException, PythonException
from pyspark.rdd import PythonEvalType
from pyspark.testing.sqlutils import ReusedSQLTestCase, have_pandas,
have_pyarrow, \
pandas_requirement_message, pyarrow_requirement_message
from pyspark.testing.utils import QuietTest
-from py4j.protocol import Py4JJavaError
-
@unittest.skipIf(
not have_pandas or not have_pyarrow,
@@ -157,14 +155,14 @@ class PandasUDFTests(ReusedSQLTestCase):
# plain udf (test for SPARK-23754)
self.assertRaisesRegexp(
- Py4JJavaError,
+ PythonException,
exc_message,
df.withColumn('v', udf(foo)('id')).collect
)
# pandas scalar udf
self.assertRaisesRegexp(
- Py4JJavaError,
+ PythonException,
exc_message,
df.withColumn(
'v', pandas_udf(foo, 'double', PandasUDFType.SCALAR)('id')
@@ -173,7 +171,7 @@ class PandasUDFTests(ReusedSQLTestCase):
# pandas grouped map
self.assertRaisesRegexp(
- Py4JJavaError,
+ PythonException,
exc_message,
df.groupBy('id').apply(
pandas_udf(foo, df.schema, PandasUDFType.GROUPED_MAP)
@@ -181,7 +179,7 @@ class PandasUDFTests(ReusedSQLTestCase):
)
self.assertRaisesRegexp(
- Py4JJavaError,
+ PythonException,
exc_message,
df.groupBy('id').apply(
pandas_udf(foofoo, df.schema, PandasUDFType.GROUPED_MAP)
@@ -190,7 +188,7 @@ class PandasUDFTests(ReusedSQLTestCase):
# pandas grouped agg
self.assertRaisesRegexp(
- Py4JJavaError,
+ PythonException,
exc_message,
df.groupBy('id').agg(
pandas_udf(foo, 'double', PandasUDFType.GROUPED_AGG)('id')
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 147ac33..27adc23 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -18,8 +18,19 @@
import py4j
import sys
+from pyspark import SparkContext
+
if sys.version_info.major >= 3:
unicode = str
+ # Disable exception chaining (PEP 3134) in captured exceptions
+ # in order to hide JVM stacktace.
+ exec("""
+def raise_from(e):
+ raise e from None
+""")
+else:
+ def raise_from(e):
+ raise e
class CapturedException(Exception):
@@ -29,7 +40,11 @@ class CapturedException(Exception):
self.cause = convert_exception(cause) if cause is not None else None
def __str__(self):
+ sql_conf =
SparkContext._jvm.org.apache.spark.sql.internal.SQLConf.get()
+ debug_enabled = sql_conf.pysparkJVMStacktraceEnabled()
desc = self.desc
+ if debug_enabled:
+ desc = desc + "\nJVM stacktrace:\n%s" % self.stackTrace
# encode unicode instance for python2 for human readable description
if sys.version_info.major < 3 and isinstance(desc, unicode):
return str(desc.encode('utf-8'))
@@ -67,6 +82,12 @@ class QueryExecutionException(CapturedException):
"""
+class PythonException(CapturedException):
+ """
+ Exceptions thrown from Python workers.
+ """
+
+
class UnknownException(CapturedException):
"""
None of the above exceptions.
@@ -75,21 +96,33 @@ class UnknownException(CapturedException):
def convert_exception(e):
s = e.toString()
- stackTrace = '\n\t at '.join(map(lambda x: x.toString(),
e.getStackTrace()))
c = e.getCause()
+
+ jvm = SparkContext._jvm
+ jwriter = jvm.java.io.StringWriter()
+ e.printStackTrace(jvm.java.io.PrintWriter(jwriter))
+ stacktrace = jwriter.toString()
if s.startswith('org.apache.spark.sql.AnalysisException: '):
- return AnalysisException(s.split(': ', 1)[1], stackTrace, c)
+ return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
if s.startswith('org.apache.spark.sql.catalyst.analysis'):
- return AnalysisException(s.split(': ', 1)[1], stackTrace, c)
+ return AnalysisException(s.split(': ', 1)[1], stacktrace, c)
if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
- return ParseException(s.split(': ', 1)[1], stackTrace, c)
+ return ParseException(s.split(': ', 1)[1], stacktrace, c)
if s.startswith('org.apache.spark.sql.streaming.StreamingQueryException:
'):
- return StreamingQueryException(s.split(': ', 1)[1], stackTrace, c)
+ return StreamingQueryException(s.split(': ', 1)[1], stacktrace, c)
if s.startswith('org.apache.spark.sql.execution.QueryExecutionException:
'):
- return QueryExecutionException(s.split(': ', 1)[1], stackTrace, c)
+ return QueryExecutionException(s.split(': ', 1)[1], stacktrace, c)
if s.startswith('java.lang.IllegalArgumentException: '):
- return IllegalArgumentException(s.split(': ', 1)[1], stackTrace, c)
- return UnknownException(s, stackTrace, c)
+ return IllegalArgumentException(s.split(': ', 1)[1], stacktrace, c)
+ if c is not None and (
+
c.toString().startswith('org.apache.spark.api.python.PythonException: ')
+ # To make sure this only catches Python UDFs.
+ and any(map(lambda v: "org.apache.spark.sql.execution.python" in
v.toString(),
+ c.getStackTrace()))):
+ msg = ("\n An exception was thrown from Python worker in the
executor. "
+ "The below is the Python worker stacktrace.\n%s" %
c.getMessage())
+ return PythonException(msg, stacktrace)
+ return UnknownException(s, stacktrace, c)
def capture_sql_exception(f):
@@ -99,7 +132,9 @@ def capture_sql_exception(f):
except py4j.protocol.Py4JJavaError as e:
converted = convert_exception(e.java_exception)
if not isinstance(converted, UnknownException):
- raise converted
+ # Hide where the exception came from that shows a non-Pythonic
+ # JVM exception message.
+ raise_from(converted)
else:
raise
return deco
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b36c01d..b8b0f32 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1783,6 +1783,15 @@ object SQLConf {
.version("3.0.0")
.fallbackConf(ARROW_EXECUTION_ENABLED)
+ val PYSPARK_JVM_STACKTRACE_ENABLED =
+ buildConf("spark.sql.pyspark.jvmStacktrace.enabled")
+ .doc("When true, it shows the JVM stacktrace in the user-facing PySpark
exception " +
+ "together with Python stacktrace. By default, it is disabled and hides
JVM stacktrace " +
+ "and shows a Python-friendly exception only.")
+ .version("3.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
val ARROW_SPARKR_EXECUTION_ENABLED =
buildConf("spark.sql.execution.arrow.sparkr.enabled")
.doc("When true, make use of Apache Arrow for columnar data transfers in
SparkR. " +
@@ -3042,6 +3051,8 @@ class SQLConf extends Serializable with Logging {
def arrowPySparkEnabled: Boolean = getConf(ARROW_PYSPARK_EXECUTION_ENABLED)
+ def pysparkJVMStacktraceEnabled: Boolean =
getConf(PYSPARK_JVM_STACKTRACE_ENABLED)
+
def arrowSparkREnabled: Boolean = getConf(ARROW_SPARKR_EXECUTION_ENABLED)
def arrowPySparkFallbackEnabled: Boolean =
getConf(ARROW_PYSPARK_FALLBACK_ENABLED)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]