Repository: spark Updated Branches: refs/heads/master f4dd665c8 -> 21e40ed88
[SPARK-1161] Add saveAsPickleFile and SparkContext.pickleFile in Python Author: Kan Zhang <[email protected]> Closes #755 from kanzhang/SPARK-1161 and squashes the following commits: 24ed8a2 [Kan Zhang] [SPARK-1161] Fixing doc tests 44e0615 [Kan Zhang] [SPARK-1161] Adding an optional batchSize with default value 10 d929429 [Kan Zhang] [SPARK-1161] Add saveAsObjectFile and SparkContext.objectFile in Python Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21e40ed8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21e40ed8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21e40ed8 Branch: refs/heads/master Commit: 21e40ed88bf2c205c3d7f947fde5d5a6f3e29f7f Parents: f4dd665 Author: Kan Zhang <[email protected]> Authored: Tue Jun 3 18:18:25 2014 -0700 Committer: Matei Zaharia <[email protected]> Committed: Tue Jun 3 18:18:25 2014 -0700 ---------------------------------------------------------------------- python/pyspark/context.py | 14 ++++++++++++++ python/pyspark/rdd.py | 33 +++++++++++++++++++++++++-------- 2 files changed, 39 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/21e40ed8/python/pyspark/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 9ae9305..211918f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -271,6 +271,20 @@ class SparkContext(object): jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices) return RDD(jrdd, self, serializer) + def pickleFile(self, name, minPartitions=None): + """ + Load an RDD previously saved using L{RDD.saveAsPickleFile} method. + + >>> tmpFile = NamedTemporaryFile(delete=True) + >>> tmpFile.close() + >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5) + >>> sorted(sc.pickleFile(tmpFile.name, 3).collect()) + [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + """ + minPartitions = minPartitions or self.defaultMinPartitions + return RDD(self._jsc.objectFile(name, minPartitions), self, + BatchedSerializer(PickleSerializer())) + def textFile(self, name, minPartitions=None): """ Read a text file from HDFS, a local file system (available on all http://git-wip-us.apache.org/repos/asf/spark/blob/21e40ed8/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1b3c460..ca0a955 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -33,7 +33,8 @@ import heapq from random import Random from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ - BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long + BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ + PickleSerializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -427,11 +428,14 @@ class RDD(object): .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ .keys() - def _reserialize(self): - if self._jrdd_deserializer == self.ctx.serializer: + def _reserialize(self, serializer=None): + serializer = serializer or self.ctx.serializer + if self._jrdd_deserializer == serializer: return self else: - return self.map(lambda x: x, preservesPartitioning=True) + converted = self.map(lambda x: x, preservesPartitioning=True) + converted._jrdd_deserializer = serializer + return converted def __add__(self, other): """ @@ -897,6 +901,20 @@ class RDD(object): """ return self.take(1)[0] + def saveAsPickleFile(self, path, batchSize=10): + """ + Save this RDD as a SequenceFile of serialized objects. The serializer used is + L{pyspark.serializers.PickleSerializer}, default batch size is 10. + + >>> tmpFile = NamedTemporaryFile(delete=True) + >>> tmpFile.close() + >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3) + >>> sorted(sc.pickleFile(tmpFile.name, 5).collect()) + [1, 2, 'rdd', 'spark'] + """ + self._reserialize(BatchedSerializer(PickleSerializer(), + batchSize))._jrdd.saveAsObjectFile(path) + def saveAsTextFile(self, path): """ Save this RDD as a text file, using string representations of elements. @@ -1421,10 +1439,9 @@ class PipelinedRDD(RDD): if self._jrdd_val: return self._jrdd_val if self._bypass_serializer: - serializer = NoOpSerializer() - else: - serializer = self.ctx.serializer - command = (self.func, self._prev_jrdd_deserializer, serializer) + self._jrdd_deserializer = NoOpSerializer() + command = (self.func, self._prev_jrdd_deserializer, + self._jrdd_deserializer) pickled_command = CloudPickleSerializer().dumps(command) broadcast_vars = ListConverter().convert( [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
