Repository: spark Updated Branches: refs/heads/branch-0.9 70491642f -> 51a77e977
SPARK-1162 Added top in python. Author: Prashant Sharma <[email protected]> Closes #93 from ScrapCodes/SPARK-1162/pyspark-top-takeOrdered and squashes the following commits: ece1fa4 [Prashant Sharma] Added top in python. (cherry picked from commit b8afe3052086547879ebf28d6e36207e0d370710) Signed-off-by: Matei Zaharia <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51a77e97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51a77e97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51a77e97 Branch: refs/heads/branch-0.9 Commit: 51a77e9779b64e464d07580de12ac3e1fe77e41a Parents: 7049164 Author: Prashant Sharma <[email protected]> Authored: Wed Mar 12 15:57:44 2014 -0700 Committer: Matei Zaharia <[email protected]> Committed: Wed Mar 12 15:57:54 2014 -0700 ---------------------------------------------------------------------- python/pyspark/rdd.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/51a77e97/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 60d4cb2..678b005 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -28,6 +28,7 @@ from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread import warnings +from heapq import heappush, heappop, heappushpop from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, pack_long @@ -616,6 +617,30 @@ class RDD(object): m1[k] += v return m1 return self.mapPartitions(countPartition).reduce(mergeMaps) + + def top(self, num): + """ + Get the num largest elements from an RDD. + + Note: It returns the list sorted in ascending order (largest element last); for num <= 0 it returns an empty list. 
+ >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
+ [12]
+ >>> sc.parallelize([2, 3, 4, 5, 6]).cache().top(2)
+ [5, 6]
+ """
+ def topIterator(iterator):  # one pass over a partition, keeping at most num of its largest elements
+ q = []  # heapq min-heap, so q[0] is the smallest element currently retained
+ for k in iterator:
+ if len(q) < num:  # fill phase; never true when num <= 0, so q stays empty
+ heappush(q, k)
+ else:
+ heappushpop(q, k)  # heap full: push k, then pop the smallest, so q keeps the num largest
+ yield q  # a single candidate list per partition (heap order, not sorted)
+
+ def merge(a, b):  # combine two candidate lists by re-running the bounded heap over their concatenation
+ return next(topIterator(a + b))
+
+ return sorted(self.mapPartitions(topIterator).reduce(merge))  # ascending, as the docstring states; NOTE(review): compare this order with the Scala RDD.top API and confirm it is intended
 def take(self, num):
 """
