Repository: spark
Updated Branches:
  refs/heads/branch-0.9 6fe72dd38 -> 886a466f3


SPARK-1115: Catch depickling errors

This surrounds the complete worker code in a try/except block so that we catch
any error that occurs. An example would be the depickling failing for some reason.
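
A minimal sketch of the pattern, for illustration only (the real worker reads
framed data with PySpark's serializers; this simplified main() is hypothetical):

import io
import pickle
import traceback

def main(infile, outfile):
    try:
        # Unpickling now happens inside the try block, so a corrupt
        # command stream raises here and is reported instead of killing
        # the worker before it can respond.
        func = pickle.load(infile)
        outfile.write(repr(func(1)).encode())
    except Exception:
        # Pass the traceback back over the output stream.
        outfile.write(traceback.format_exc().encode())

# Feed a corrupt stream: the UnpicklingError is caught and written out.
main(io.BytesIO(b"not a pickle"), io.BytesIO())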

@JoshRosen

Author: Bouke van der Bijl <[email protected]>

Closes #644 from bouk/catch-depickling-errors and squashes the following commits:

f0f67cc [Bouke van der Bijl] Lol indentation
0e4d504 [Bouke van der Bijl] Surround the complete python worker with the try block
(cherry picked from commit 12738c1aec136acd7f2e3e2f8f2b541db0890630)

Signed-off-by: Josh Rosen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/886a466f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/886a466f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/886a466f

Branch: refs/heads/branch-0.9
Commit: 886a466f32b06846a7f3f664c4a906fb6c0cb25d
Parents: 6fe72dd
Author: Bouke van der Bijl <[email protected]>
Authored: Wed Feb 26 14:50:37 2014 -0800
Committer: Josh Rosen <[email protected]>
Committed: Wed Feb 26 14:53:30 2014 -0800

----------------------------------------------------------------------
 python/pyspark/worker.py | 48 +++++++++++++++++++++----------------------
 1 file changed, 24 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/886a466f/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4be4063..4e47d02 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -45,34 +45,34 @@ def report_times(outfile, boot, init, finish):
 
 
 def main(infile, outfile):
-    boot_time = time.time()
-    split_index = read_int(infile)
-    if split_index == -1:  # for unit tests
-        return
+    try:
+        boot_time = time.time()
+        split_index = read_int(infile)
+        if split_index == -1:  # for unit tests
+            return
 
-    # fetch name of workdir
-    spark_files_dir = utf8_deserializer.loads(infile)
-    SparkFiles._root_directory = spark_files_dir
-    SparkFiles._is_running_on_worker = True
+        # fetch name of workdir
+        spark_files_dir = utf8_deserializer.loads(infile)
+        SparkFiles._root_directory = spark_files_dir
+        SparkFiles._is_running_on_worker = True
 
-    # fetch names and values of broadcast variables
-    num_broadcast_variables = read_int(infile)
-    for _ in range(num_broadcast_variables):
-        bid = read_long(infile)
-        value = pickleSer._read_with_length(infile)
-        _broadcastRegistry[bid] = Broadcast(bid, value)
+        # fetch names and values of broadcast variables
+        num_broadcast_variables = read_int(infile)
+        for _ in range(num_broadcast_variables):
+            bid = read_long(infile)
+            value = pickleSer._read_with_length(infile)
+            _broadcastRegistry[bid] = Broadcast(bid, value)
 
-    # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
-    sys.path.append(spark_files_dir) # *.py files that were added will be copied here
-    num_python_includes =  read_int(infile)
-    for _ in range(num_python_includes):
-        filename = utf8_deserializer.loads(infile)
-        sys.path.append(os.path.join(spark_files_dir, filename))
+        # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+        sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+        num_python_includes =  read_int(infile)
+        for _ in range(num_python_includes):
+            filename = utf8_deserializer.loads(infile)
+            sys.path.append(os.path.join(spark_files_dir, filename))
 
-    command = pickleSer._read_with_length(infile)
-    (func, deserializer, serializer) = command
-    init_time = time.time()
-    try:
+        command = pickleSer._read_with_length(infile)
+        (func, deserializer, serializer) = command
+        init_time = time.time()
         iterator = deserializer.load_stream(infile)
         serializer.dump_stream(func(split_index, iterator), outfile)
     except Exception as e:
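----------------------------------------------------------------------

To see why widening the try block matters: in the old code above, the pickled
command was read outside the try/except, so a malformed stream raised straight
out of main() before any error could be written back. A simplified sketch of
that failure mode (illustrative only, not the actual PySpark code):

import io
import pickle

# Before the patch, the equivalent of this load ran outside any
# try/except, so a corrupt stream crashed the worker outright.
try:
    pickle.load(io.BytesIO(b"\x00garbage"))
except pickle.UnpicklingError as e:
    print("depickling failed:", e)  # now caught and reported instead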
