SPARK-977 Added Python RDD.zip function. This was raised earlier as part of apache/incubator-spark#486.
Author: Prabin Banka <[email protected]> Closes #76 from prabinb/python-api-zip and squashes the following commits: b1a31a0 [Prabin Banka] Added Python RDD.zip function Conflicts: python/pyspark/rdd.py Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1dc1e988 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1dc1e988 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1dc1e988 Branch: refs/heads/branch-0.9 Commit: 1dc1e988fd379bce8f661091bd8224724a7345a3 Parents: 249930a Author: Prabin Banka <[email protected]> Authored: Mon Mar 10 13:27:00 2014 -0700 Committer: Patrick Wendell <[email protected]> Committed: Sun Mar 16 22:16:17 2014 -0700 ---------------------------------------------------------------------- python/pyspark/rdd.py | 2 +- python/pyspark/serializers.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1dc1e988/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0fdb9e6..c29cefa 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -31,7 +31,7 @@ import warnings from heapq import heappush, heappop, heappushpop from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, pack_long + BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter http://git-wip-us.apache.org/repos/asf/spark/blob/1dc1e988/python/pyspark/serializers.py ---------------------------------------------------------------------- diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 8c6ad79..12c63f1 
100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -204,7 +204,7 @@ class CartesianDeserializer(FramedSerializer): self.key_ser = key_ser self.val_ser = val_ser - def load_stream(self, stream): + def prepare_keys_values(self, stream): key_stream = self.key_ser._load_stream_without_unbatching(stream) val_stream = self.val_ser._load_stream_without_unbatching(stream) key_is_batched = isinstance(self.key_ser, BatchedSerializer) @@ -212,6 +212,10 @@ class CartesianDeserializer(FramedSerializer): for (keys, vals) in izip(key_stream, val_stream): keys = keys if key_is_batched else [keys] vals = vals if val_is_batched else [vals] + yield (keys, vals) + + def load_stream(self, stream): + for (keys, vals) in self.prepare_keys_values(stream): for pair in product(keys, vals): yield pair @@ -224,6 +228,29 @@ class CartesianDeserializer(FramedSerializer): (str(self.key_ser), str(self.val_ser)) +class PairDeserializer(CartesianDeserializer): + """ + Deserializes the JavaRDD zip() of two PythonRDDs. + """ + + def __init__(self, key_ser, val_ser): + self.key_ser = key_ser + self.val_ser = val_ser + + def load_stream(self, stream): + for (keys, vals) in self.prepare_keys_values(stream): + for pair in izip(keys, vals): + yield pair + + def __eq__(self, other): + return isinstance(other, PairDeserializer) and \ + self.key_ser == other.key_ser and self.val_ser == other.val_ser + + def __str__(self): + return "PairDeserializer<%s, %s>" % \ + (str(self.key_ser), str(self.val_ser)) + + class NoOpSerializer(FramedSerializer): def loads(self, obj): return obj
