Repository: spark
Updated Branches:
  refs/heads/master 65253502b -> 7d2a7a91f


[SPARK-3235][SQL] Ensure in-memory tables don't always broadcast.

Author: Michael Armbrust <[email protected]>

Closes #2147 from marmbrus/inMemDefaultSize and squashes the following commits:

5390360 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into inMemDefaultSize
14204d3 [Michael Armbrust] Set the context before creating SparkLogicalPlans.
8da4414 [Michael Armbrust] Make sure we throw errors when leaf nodes fail to provide statistics
18ce029 [Michael Armbrust] Ensure in-memory tables don't always broadcast.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d2a7a91
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d2a7a91
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d2a7a91

Branch: refs/heads/master
Commit: 7d2a7a91f263bb9fbf24dc4dbffde8fe5e2c7442
Parents: 6525350
Author: Michael Armbrust <[email protected]>
Authored: Wed Aug 27 15:14:08 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Wed Aug 27 15:14:08 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/plans/logical/LogicalPlan.scala      | 14 ++++++++------
 .../main/scala/org/apache/spark/sql/SQLContext.scala  |  4 +++-
 .../sql/columnar/InMemoryColumnarTableScan.scala      |  3 +++
 .../org/apache/spark/sql/execution/SparkPlan.scala    |  2 +-
 .../sql/columnar/InMemoryColumnarQuerySuite.scala     |  8 ++++++++
 5 files changed, 23 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d2a7a91/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 8616ac4..f81d911 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -41,9 +41,14 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   case class Statistics(
     sizeInBytes: BigInt
   )
-  lazy val statistics: Statistics = Statistics(
-    sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
-  )
+  lazy val statistics: Statistics = {
+    if (children.size == 0) {
+      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
+    }
+
+    Statistics(
+      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+  }
 
   /**
    * Returns the set of attributes that this node takes as
@@ -117,9 +122,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  */
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
-
-  override lazy val statistics: Statistics =
-    throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7d2a7a91/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6f0eed3..a75af94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -89,8 +89,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group userf
    */
-  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
+  implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) = {
+    SparkPlan.currentContext.set(self)
     new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
+  }
 
   /**
    * :: DeveloperApi ::

http://git-wip-us.apache.org/repos/asf/spark/blob/7d2a7a91/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 24e88ee..bc36bac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -39,6 +39,9 @@ private[sql] case class InMemoryRelation(
     (private var _cachedColumnBuffers: RDD[Array[ByteBuffer]] = null)
   extends LogicalPlan with MultiInstanceRelation {
 
+  override lazy val statistics =
+    Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+
   // If the cached column buffers were not passed in, we calculate them in the constructor.
   // As in Spark, the actual work of caching is lazy.
   if (_cachedColumnBuffers == null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7d2a7a91/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 7d33ea5..2b89139 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -49,7 +49,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * populated by the query planning infrastructure.
    */
   @transient
-  protected val sqlContext = SparkPlan.currentContext.get()
+  protected[spark] val sqlContext = SparkPlan.currentContext.get()
 
   protected def sparkContext = sqlContext.sparkContext
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d2a7a91/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 736c0f8..fdd2799 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -33,6 +33,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
     checkAnswer(scan, testData.collect().toSeq)
   }
 
+  test("default size avoids broadcast") {
+    // TODO: Improve this test when we have better statistics
+    sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst")
+    cacheTable("sizeTst")
+    assert(
+      table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold)
+  }
+
   test("projection") {
     val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
     val scan = InMemoryRelation(useCompression = true, 5, plan)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to