Repository: spark
Updated Branches:
  refs/heads/branch-1.1 0faea1761 -> fa3b3e384
SPARK-785 [CORE] ClosureCleaner not invoked on most PairRDDFunctions

This looked like perhaps a simple and important one. `combineByKey` looks
like it should clean its arguments' closures, and that in turn covers
apparently all remaining functions in `PairRDDFunctions` which delegate to it.

Author: Sean Owen <so...@cloudera.com>

Closes #3690 from srowen/SPARK-785 and squashes the following commits:

8df68fe [Sean Owen] Clean context of most remaining functions in PairRDDFunctions, which ultimately call combineByKey

(cherry picked from commit 2a28bc61009a170af3853c78f7f36970898a6d56)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa3b3e38
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa3b3e38
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa3b3e38

Branch: refs/heads/branch-1.1
Commit: fa3b3e384531ad1e8a3f6f138110c89a5ba42ec7
Parents: 0faea17
Author: Sean Owen <so...@cloudera.com>
Authored: Mon Dec 15 16:06:15 2014 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 15 16:07:30 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa3b3e38/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index f6d9d12..869321d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -84,7 +84,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
         throw new SparkException("Default partitioner cannot partition array keys.")
       }
     }
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    val aggregator = new Aggregator[K, V, C](
+      self.context.clean(createCombiner),
+      self.context.clean(mergeValue),
+      self.context.clean(mergeCombiners))
     if (self.partitioner == Some(partitioner)) {
       self.mapPartitionsWithContext((context, iter) => {
         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
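For readers following along: the wrappers the commit message refers to
(reduceByKey, foldByKey, aggregateByKey, groupByKey and friends) all bottom
out in combineByKey, so cleaning its three function arguments at this one
call site covers their closures as well. The snippet below is only an
illustrative sketch (the application and object names are made up, not from
the patch); it shows one of the PairRDDFunctions paths that now routes its
closures through SparkContext.clean:

import org.apache.spark.{SparkConf, SparkContext}
// Needed on branch-1.1 for the implicit conversion to PairRDDFunctions.
import org.apache.spark.SparkContext._

// Illustrative local-mode job; names here are hypothetical, not from the patch.
// The reduceByKey call delegates to combineByKey, so its function argument now
// gets the same ClosureCleaner treatment that RDD.map already applies.
object ClosureCleanDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("closure-clean-demo").setMaster("local[2]"))

    val counts = sc.parallelize(Seq("spark", "scala", "spark"))
      .map(word => (word, 1))   // map cleans its closure via sc.clean
      .reduceByKey(_ + _)       // delegates to combineByKey, whose
                                // createCombiner/mergeValue/mergeCombiners
                                // arguments are cleaned by this change
      .collect()

    counts.foreach(println)
    sc.stop()
  }
}

Under the hood, self.context.clean hands the closure to ClosureCleaner, which
nulls out references in the closure's chain of enclosing objects that the
function body never uses, so that only the state the function actually needs
is serialized into the task.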