Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f36624983 -> 30eea40ff
[SPARK-10577] [PYSPARK] DataFrame hint for broadcast join

https://issues.apache.org/jira/browse/SPARK-10577

Author: Jian Feng <[email protected]>

Closes #8801 from Jianfeng-chs/master.

(cherry picked from commit 0180b849dbaf191826231eda7dfaaf146a19602b)
Signed-off-by: Reynold Xin <[email protected]>

Conflicts:
	python/pyspark/sql/tests.py

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30eea40f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30eea40f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30eea40f

Branch: refs/heads/branch-1.5
Commit: 30eea40fff97391b8ee3201dd7c6ea7440521386
Parents: f366249
Author: Jian Feng <[email protected]>
Authored: Mon Sep 21 23:36:41 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Oct 14 12:23:22 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/functions.py |  9 +++++++++
 python/pyspark/sql/tests.py     | 27 +++++++++++++++++++++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/30eea40f/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 4b74a50..3c631a0 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -30,6 +30,7 @@ from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.sql import since
 from pyspark.sql.types import StringType
 from pyspark.sql.column import Column, _to_java_column, _to_seq
+from pyspark.sql.dataframe import DataFrame
 
 
 def _create_function(name, doc=""):
@@ -190,6 +191,14 @@ def approxCountDistinct(col, rsd=None):
     return Column(jc)
 
 
+@since(1.6)
+def broadcast(df):
+    """Marks a DataFrame as small enough for use in broadcast joins."""
+
+    sc = SparkContext._active_spark_context
+    return DataFrame(sc._jvm.functions.broadcast(df._jdf), df.sql_ctx)
+
+
 @since(1.4)
 def coalesce(*cols):
     """Returns the first column that is not null.

http://git-wip-us.apache.org/repos/asf/spark/blob/30eea40f/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6b647f3..14414b3 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1056,6 +1056,33 @@ class SQLTests(ReusedPySparkTestCase):
         keys = self.df.withColumn("key", self.df.key).select("key").collect()
         self.assertEqual([r.key for r in keys], list(range(100)))
 
+    # regression test for SPARK-10417
+    def test_column_iterator(self):
+
+        def foo():
+            for x in self.df.key:
+                break
+
+        self.assertRaises(TypeError, foo)
+
+    # add test for SPARK-10577 (test broadcast join hint)
+    def test_functions_broadcast(self):
+        from pyspark.sql.functions import broadcast
+
+        df1 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+        df2 = self.sqlCtx.createDataFrame([(1, "1"), (2, "2")], ("key", "value"))
+
+        # equijoin - should be converted into broadcast join
+        plan1 = df1.join(broadcast(df2), "key")._jdf.queryExecution().executedPlan()
+        self.assertEqual(1, plan1.toString().count("BroadcastHashJoin"))
+
+        # no join key -- should not be a broadcast join
+        plan2 = df1.join(broadcast(df2))._jdf.queryExecution().executedPlan()
+        self.assertEqual(0, plan2.toString().count("BroadcastHashJoin"))
+
+        # planner should not crash without a join
+        broadcast(df1)._jdf.queryExecution().executedPlan()
+
 
 class HiveContextSQLTests(ReusedPySparkTestCase):
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
