Repository: spark Updated Branches: refs/heads/branch-0.9 87e4dd58c -> ef74e44e0
SPARK-1019: pyspark RDD take() throws an NPE

Author: Patrick Wendell <[email protected]>

Closes #112 from pwendell/pyspark-take and squashes the following commits:

daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
(cherry picked from commit 4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d)

Signed-off-by: Patrick Wendell <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef74e44e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef74e44e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef74e44e

Branch: refs/heads/branch-0.9
Commit: ef74e44e04517abd4c7058c87abd1f4e8fbaa09d
Parents: 87e4dd5
Author: Patrick Wendell <[email protected]>
Authored: Wed Mar 12 23:16:59 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed Mar 12 23:17:17 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TaskContext.scala | 3 ++-
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 8 ++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ef74e44e/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983e..be53ca2 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ef74e44e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 9cbd26b..e03c6f9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -99,6 +99,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {
