Repository: spark
Updated Branches:
  refs/heads/master 2ef4c5963 -> ba8c86d06


[SPARK-13671] [SPARK-13311] [SQL] Use different physical plans for RDD and data 
sources

## What changes were proposed in this pull request?

This PR split the PhysicalRDD into two classes, PhysicalRDD and PhysicalScan. 
PhysicalRDD is used for DataFrames that is created from existing RDD. 
PhysicalScan is used for DataFrame that is created from data sources. This 
enable use to apply different optimization on both of them.

Also fix the problem for sameResult() on two DataSourceScan.

Also fix the equality check to toString for `In`. It's better to use Seq there, 
but we can't break this public API (sad).

## How was this patch tested?

Existing tests. Manually tested with TPCDS query Q59 and Q64, all those 
duplicated exchanges can be re-used now, also saw there are 40+% performance 
improvement (saving half of the scan).

Author: Davies Liu <[email protected]>

Closes #11514 from davies/existing_rdd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba8c86d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba8c86d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba8c86d0

Branch: refs/heads/master
Commit: ba8c86d06f5968c1af4db8dd9a458005bc5f214c
Parents: 2ef4c59
Author: Davies Liu <[email protected]>
Authored: Sat Mar 12 00:48:36 2016 -0800
Committer: Davies Liu <[email protected]>
Committed: Sat Mar 12 00:48:36 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |   3 +-
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  12 +--
 .../spark/sql/execution/ExistingRDD.scala       | 107 ++++++++++++-------
 .../spark/sql/execution/WholeStageCodegen.scala |   1 +
 .../datasources/DataSourceStrategy.scala        |   8 +-
 .../spark/sql/execution/ui/SparkPlanGraph.scala |   6 +-
 .../org/apache/spark/sql/sources/filters.scala  |  19 +++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |   6 +-
 .../spark/sql/sources/FilteredScanSuite.scala   |   2 +-
 .../spark/sql/sources/PrunedScanSuite.scala     |   2 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |   4 +-
 .../spark/sql/sources/BucketedReadSuite.scala   |   4 +-
 12 files changed, 110 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 99d665f..7008e8f 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -173,8 +173,7 @@ class DataFrame(object):
 
         >>> df.explain()
         == Physical Plan ==
-        WholeStageCodegen
-        :  +- Scan ExistingRDD[age#0,name#1]
+        Scan ExistingRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index c222571..920e989 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -280,12 +280,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] 
extends TreeNode[PlanT
    * can do better should override this function.
    */
   def sameResult(plan: PlanType): Boolean = {
-    val canonicalizedLeft = this.canonicalized
-    val canonicalizedRight = plan.canonicalized
-    canonicalizedLeft.getClass == canonicalizedRight.getClass &&
-      canonicalizedLeft.children.size == canonicalizedRight.children.size &&
-      canonicalizedLeft.cleanArgs == canonicalizedRight.cleanArgs &&
-      (canonicalizedLeft.children, 
canonicalizedRight.children).zipped.forall(_ sameResult _)
+    val left = this.canonicalized
+    val right = plan.canonicalized
+    left.getClass == right.getClass &&
+      left.children.size == right.children.size &&
+      left.cleanArgs == right.cleanArgs &&
+      (left.children, right.children).zipped.forall(_ sameResult _)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3662ed7..d363cb0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -101,17 +101,76 @@ private[sql] case class LogicalRDD(
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
-    override val metadata: Map[String, String] = Map.empty,
-    isUnsafeRow: Boolean = false,
-    override val outputPartitioning: Partitioning = UnknownPartitioning(0))
+    override val nodeName: String) extends LeafNode {
+
+  private[sql] override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val numOutputRows = longMetric("numOutputRows")
+    rdd.mapPartitionsInternal { iter =>
+      val proj = UnsafeProjection.create(schema)
+      iter.map { r =>
+        numOutputRows += 1
+        proj(r)
+      }
+    }
+  }
+
+  override def simpleString: String = {
+    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+  }
+}
+
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class DataSourceScan(
+    output: Seq[Attribute],
+    rdd: RDD[InternalRow],
+    @transient relation: BaseRelation,
+    override val metadata: Map[String, String] = Map.empty)
   extends LeafNode with CodegenSupport {
 
+  override val nodeName: String = relation.toString
+
+  // Ignore rdd when checking results
+  override def sameResult(plan: SparkPlan ): Boolean = plan match {
+    case other: DataSourceScan => relation == other.relation && metadata == 
other.metadata
+    case _ => false
+  }
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of 
output rows"))
 
+  val outputUnsafeRows = relation match {
+    case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
+      
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
+    case _: HadoopFsRelation => true
+    case _ => false
+  }
+
+  override val outputPartitioning = {
+    val bucketSpec = relation match {
+      // TODO: this should be closer to bucket planning.
+      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => 
r.bucketSpec
+      case _ => None
+    }
+
+    def toAttribute(colName: String): Attribute = output.find(_.name == 
colName).getOrElse {
+      throw new AnalysisException(s"bucket column $colName not found in 
existing columns " +
+        s"(${output.map(_.name).mkString(", ")})")
+    }
+
+    bucketSpec.map { spec =>
+      val numBuckets = spec.numBuckets
+      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+      HashPartitioning(bucketColumns, numBuckets)
+    }.getOrElse {
+      UnknownPartitioning(0)
+    }
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
-    val unsafeRow = if (isUnsafeRow) {
+    val unsafeRow = if (outputUnsafeRows) {
       rdd
     } else {
       rdd.mapPartitionsInternal { iter =>
@@ -187,7 +246,7 @@ private[sql] case class PhysicalRDD(
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columns2 = exprs.map(_.gen(ctx))
-    val inputRow = if (isUnsafeRow) row else null
+    val inputRow = if (outputUnsafeRows) row else null
     val scanRows = ctx.freshName("processRows")
     ctx.addNewFunction(scanRows,
       s"""
@@ -221,42 +280,8 @@ private[sql] case class PhysicalRDD(
   }
 }
 
-private[sql] object PhysicalRDD {
+private[sql] object DataSourceScan {
   // Metadata keys
   val INPUT_PATHS = "InputPaths"
   val PUSHED_FILTERS = "PushedFilters"
-
-  def createFromDataSource(
-      output: Seq[Attribute],
-      rdd: RDD[InternalRow],
-      relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): PhysicalRDD = {
-
-    val outputUnsafeRows = relation match {
-      case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
-        
!SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED)
-      case _: HadoopFsRelation => true
-      case _ => false
-    }
-
-    val bucketSpec = relation match {
-      // TODO: this should be closer to bucket planning.
-      case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled() => 
r.bucketSpec
-      case _ => None
-    }
-
-    def toAttribute(colName: String): Attribute = output.find(_.name == 
colName).getOrElse {
-      throw new AnalysisException(s"bucket column $colName not found in 
existing columns " +
-        s"(${output.map(_.name).mkString(", ")})")
-    }
-
-    bucketSpec.map { spec =>
-      val numBuckets = spec.numBuckets
-      val bucketColumns = spec.bucketColumnNames.map(toAttribute)
-      val partitioning = HashPartitioning(bucketColumns, numBuckets)
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows, 
partitioning)
-    }.getOrElse {
-      PhysicalRDD(output, rdd, relation.toString, metadata, outputUnsafeRows)
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 52c2971..8fb4705 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -41,6 +41,7 @@ trait CodegenSupport extends SparkPlan {
     case _: BroadcastHashJoin => "bhj"
     case _: SortMergeJoin => "smj"
     case _: PhysicalRDD => "rdd"
+    case _: DataSourceScan => "scan"
     case _ => nodeName.toLowerCase
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2944a8f..1adf3b6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.PhysicalRDD.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.DataSourceScan.{INPUT_PATHS, 
PUSHED_FILTERS}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.vectorized.{ColumnarBatch, 
ColumnVectorUtils}
@@ -239,7 +239,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
       }
 
     case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
-      execution.PhysicalRDD.createFromDataSource(
+      execution.DataSourceScan(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: 
Nil
 
     case i @ logical.InsertIntoTable(l @ LogicalRelation(t: 
InsertableRelation, _, _),
@@ -639,7 +639,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
         // Don't request columns that are only referenced by pushed filters.
         .filterNot(handledSet.contains)
 
-      val scan = execution.PhysicalRDD.createFromDataSource(
+      val scan = execution.DataSourceScan(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)
@@ -649,7 +649,7 @@ private[sql] object DataSourceStrategy extends Strategy 
with Logging {
       val requestedColumns =
         (projectSet ++ filterSet -- 
handledSet).map(relation.attributeMap).toSeq
 
-      val scan = execution.PhysicalRDD.createFromDataSource(
+      val scan = execution.DataSourceScan(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
         relation.relation, metadata)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 94d318e..8a36d32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -93,6 +93,10 @@ private[sql] object SparkPlanGraph {
       case "Subquery" if subgraph != null =>
         // Subquery should not be included in WholeStageCodegen
         buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, 
parent, null, exchanges)
+      case "ReusedExchange" =>
+        // Point to the re-used exchange
+        val node = exchanges(planInfo.children.head)
+        edges += SparkPlanGraphEdge(node.id, parent.id)
       case name =>
         val metrics = planInfo.metrics.map { metric =>
           SQLPlanMetric(metric.name, metric.accumulatorId,
@@ -106,7 +110,7 @@ private[sql] object SparkPlanGraph {
         } else {
           subgraph.nodes += node
         }
-        if (name == "ShuffleExchange" || name == "BroadcastExchange") {
+        if (name.contains("Exchange")) {
           exchanges += planInfo -> node
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 3780cbb..9130e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -82,7 +82,24 @@ case class LessThanOrEqual(attribute: String, value: Any) 
extends Filter
  *
  * @since 1.3.0
  */
-case class In(attribute: String, values: Array[Any]) extends Filter
+case class In(attribute: String, values: Array[Any]) extends Filter {
+  override def hashCode(): Int = {
+    var h = attribute.hashCode
+    values.foreach { v =>
+      h *= 41
+      h += v.hashCode()
+    }
+    h
+  }
+  override def equals(o: Any): Boolean = o match {
+    case In(a, vs) =>
+      a == attribute && vs.length == values.length && vs.zip(values).forall(x 
=> x._1 == x._2)
+    case _ => false
+  }
+  override def toString: String = {
+    s"In($attribute, [${values.mkString(",")}]"
+  }
+}
 
 /**
  * A filter that evaluates to `true` iff the attribute evaluates to null.

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index dfffa58..1ef5173 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -210,8 +210,8 @@ class JDBCSuite extends SparkFunSuite
       // the plan only has PhysicalRDD to scan JDBCRelation.
       
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
       val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
-      
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD])
-      
assert(node.child.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation"))
+      
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
+      
assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
       df
     }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 
1")).collect().size == 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2ff79a2..19e34b4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext with Predic
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
-          case p: execution.PhysicalRDD => p
+          case p: execution.DataSourceScan => p
         } match {
           case Seq(p) => p
           case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index db72297..62f991f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -124,7 +124,7 @@ class PrunedScanSuite extends DataSourceTest with 
SharedSQLContext {
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
-          case p: execution.PhysicalRDD => p
+          case p: execution.DataSourceScan => p
         } match {
           case Seq(p) => p
           case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index a0f09d6..8fdbbd9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.command.ExecutedCommand
 import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, 
InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScan
@@ -196,7 +196,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest 
{
       }.isEmpty)
     assert(
       sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
-        case _: PhysicalRDD => true
+        case _: DataSourceScan => true
       }.nonEmpty)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ba8c86d0/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 35573f6..a0be55c 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.DataSourceScan
 import org.apache.spark.sql.execution.datasources.{BucketSpec, 
DataSourceStrategy}
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.SortMergeJoin
@@ -93,7 +93,7 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils 
with TestHiveSinglet
 
       // Filter could hide the bug in bucket pruning. Thus, skipping all the 
filters
       val plan = 
bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan
-      val rdd = plan.find(_.isInstanceOf[PhysicalRDD])
+      val rdd = plan.find(_.isInstanceOf[DataSourceScan])
       assert(rdd.isDefined, plan)
 
       val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case 
(index, iter) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to