Repository: spark
Updated Branches:
refs/heads/branch-2.1 2971ae564 -> cd297c390
[SPARK-18281] [SQL] [PYSPARK] Remove timeout for reading data through socket
for local iterator
## What changes were proposed in this pull request?
Calling `rdd.toLocalIterator()` or `df.toLocalIterator()` on a PySpark RDD or
DataFrame can fail with a timeout:
df = spark.createDataFrame([[1],[2],[3]])
it = df.toLocalIterator()
row = next(it)
# Many empty partitions increase materialization time and trigger the timeout.
df2 = df.repartition(1000)
it2 = df2.toLocalIterator()
row = next(it2)
The cause is that the JVM side opens a socket to serve the data, and the
Python side sets a timeout that covers both connecting to the socket and
reading from it. Python reads the data with a generator, so it only connects
to the socket when the first item is requested. If the iterator is not
consumed immediately, the connection times out.
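
The lazy behavior described above can be shown with a minimal sketch. This is
not the PySpark code: `lazy_reader`, `eager_reader`, and the `log` list are
hypothetical names, and appending "connect" stands in for opening the socket.
A function that contains `yield` runs none of its body until the first
`next()` call, while a regular function that returns an iterator runs its
setup at call time.

```python
def lazy_reader(log):
    # Generator function: nothing in this body runs until the first next().
    log.append("connect")  # stand-in for connecting to the socket
    for item in (1, 2, 3):
        yield item


def eager_reader(log):
    # Regular function: the setup runs at call time, only iteration is deferred.
    log.append("connect")  # stand-in for connecting to the socket
    return iter((1, 2, 3))


lazy_log, eager_log = [], []
lazy_it = lazy_reader(lazy_log)
eager_it = eager_reader(eager_log)

# Snapshot what has happened before any item is requested.
lazy_before = list(lazy_log)
eager_before = list(eager_log)

# Only now does the generator body start running.
first = next(lazy_it)
```

In this sketch the "connect" step of `lazy_reader` is delayed until `next()`
is called, which mirrors how a delay between creating and consuming the local
iterator can outlast a connection timeout.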
On the other hand, the materialization time for RDD partitions is
unpredictable, so we can't set a timeout for reading data through the socket;
otherwise the read is very likely to fail.
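
As a rough illustration of why a read timeout is fragile when the producer's
latency is unknown, the following sketch uses only the Python standard
library. The names `slow_producer` and `read_with_timeout` and the delay
values are assumptions made for this example; the producer thread merely
stands in for the JVM side materializing a partition.

```python
import socket
import threading
import time


def slow_producer(conn, delay):
    # Stand-in for the JVM side: the data is only ready after `delay` seconds.
    time.sleep(delay)
    conn.sendall(b"row")
    conn.close()


def read_with_timeout(timeout, delay=0.3):
    reader, writer = socket.socketpair()
    worker = threading.Thread(target=slow_producer, args=(writer, delay))
    worker.start()
    reader.settimeout(timeout)  # None means block until data arrives
    try:
        return reader.recv(16)
    except socket.timeout:
        # The read gave up before the producer had anything to send.
        return None
    finally:
        # Let the producer finish before closing our end of the pair.
        worker.join()
        reader.close()


timed_out = read_with_timeout(0.05)  # timeout shorter than the producer delay
blocking = read_with_timeout(None)   # no timeout, as with settimeout(None)
```

With a timeout shorter than the producer's delay the read returns nothing,
while the blocking read waits as long as needed, which is the behavior
`sock.settimeout(None)` selects.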
## How was this patch tested?
Added tests to PySpark.
Author: Liang-Chi Hsieh <[email protected]>
Closes #16263 from viirya/fix-pyspark-localiterator.
(cherry picked from commit 95c95b71ed31b2971475aec6d7776dc234845d0a)
Signed-off-by: Davies Liu <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd297c39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd297c39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd297c39
Branch: refs/heads/branch-2.1
Commit: cd297c390daedbfcaea8431dec4a37ca39dd26e3
Parents: 2971ae5
Author: Liang-Chi Hsieh <[email protected]>
Authored: Tue Dec 20 13:12:16 2016 -0800
Committer: Davies Liu <[email protected]>
Committed: Tue Dec 20 13:12:36 2016 -0800
----------------------------------------------------------------------
python/pyspark/rdd.py | 11 +++++------
python/pyspark/tests.py | 12 ++++++++++++
2 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cd297c39/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 9e05da8..b384b2b 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -135,12 +135,11 @@ def _load_from_socket(port, serializer):
         break
     if not sock:
         raise Exception("could not open socket")
-    try:
-        rf = sock.makefile("rb", 65536)
-        for item in serializer.load_stream(rf):
-            yield item
-    finally:
-        sock.close()
+    # The RDD materialization time is unpredicable, if we set a timeout for socket reading
+    # operation, it will very possibly fail. See SPARK-18281.
+    sock.settimeout(None)
+    # The socket will be automatically closed when garbage-collected.
+    return serializer.load_stream(sock.makefile("rb", 65536))
 
 
 def ignore_unicode_prefix(f):
http://git-wip-us.apache.org/repos/asf/spark/blob/cd297c39/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 89fce8a..fe314c5 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -502,6 +502,18 @@ class RDDTests(ReusedPySparkTestCase):
         self.assertEqual(0, self.sc.emptyRDD().sum())
         self.assertEqual(6, self.sc.parallelize([1, 2, 3]).sum())
 
+    def test_to_localiterator(self):
+        from time import sleep
+        rdd = self.sc.parallelize([1, 2, 3])
+        it = rdd.toLocalIterator()
+        sleep(5)
+        self.assertEqual([1, 2, 3], sorted(it))
+
+        rdd2 = rdd.repartition(1000)
+        it2 = rdd2.toLocalIterator()
+        sleep(5)
+        self.assertEqual([1, 2, 3], sorted(it2))
+
     def test_save_as_textfile_with_unicode(self):
         # Regression test for SPARK-970
         x = u"\u00A1Hola, mundo!"