Repository: spark Updated Branches: refs/heads/master cbb0b65ad -> 5a7af9e7a
[SPARK-13250] [SQL] Update PhysicallRDD to convert to UnsafeRow if using the vectorized scanner. Some parts of the engine rely on UnsafeRow which the vectorized parquet scanner does not want to produce. This add a conversion in Physical RDD. In the case where codegen is used (and the scan is the start of the pipeline), there is no requirement to use UnsafeRow. This patch adds update PhysicallRDD to support codegen, which eliminates the need for the UnsafeRow conversion in all cases. The result of these changes for TPCDS-Q19 at the 10gb sf reduces the query time from 9.5 seconds to 6.5 seconds. Author: Nong Li <[email protected]> Closes #11141 from nongli/spark-13250. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a7af9e7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a7af9e7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a7af9e7 Branch: refs/heads/master Commit: 5a7af9e7ac85e04aa4a420bc2887207bfa18f792 Parents: cbb0b65 Author: Nong Li <[email protected]> Authored: Wed Feb 24 17:16:45 2016 -0800 Committer: Davies Liu <[email protected]> Committed: Wed Feb 24 17:16:45 2016 -0800 ---------------------------------------------------------------------- python/pyspark/sql/dataframe.py | 3 +- .../spark/sql/execution/ExistingRDD.scala | 45 ++++++++++++++-- .../spark/sql/execution/WholeStageCodegen.scala | 1 + .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 6 ++- .../spark/sql/sources/FilteredScanSuite.scala | 54 +++++++++++--------- .../spark/sql/sources/PrunedScanSuite.scala | 45 +++++++++------- .../spark/sql/sources/BucketedReadSuite.scala | 46 +++++++++-------- 7 files changed, 129 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index bf43452..7275e69 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -173,7 +173,8 @@ class DataFrame(object): >>> df.explain() == Physical Plan == - Scan ExistingRDD[age#0,name#1] + WholeStageCodegen + : +- Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan == http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index cad7e25..8649d2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -18,12 +18,14 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{AnalysisException, Row, SQLContext} +import org.apache.spark.sql.{AnalysisException, Row, SQLConf, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} import org.apache.spark.sql.types.DataType @@ -102,7 +104,7 @@ private[sql] case class PhysicalRDD( override val metadata: Map[String, String] = Map.empty, isUnsafeRow: Boolean = false, override val outputPartitioning: Partitioning = UnknownPartitioning(0)) - extends LeafNode { + extends LeafNode with CodegenSupport { private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) @@ -128,6 +130,36 @@ private[sql] case class PhysicalRDD( val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" } + + override def upstreams(): Seq[RDD[InternalRow]] = { + rdd :: Nil + } + + // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen + // never requires UnsafeRow as input. + override protected def doProduce(ctx: CodegenContext): String = { + val input = ctx.freshName("input") + // PhysicalRDD always just has one input + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + + val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) + val row = ctx.freshName("row") + val numOutputRows = metricTerm(ctx, "numOutputRows") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columns = exprs.map(_.gen(ctx)) + s""" + | while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${columns.map(_.code).mkString("\n").trim} + | ${consume(ctx, columns).trim} + | if (shouldStop()) { + | return; + | } + | } + """.stripMargin + } } private[sql] object PhysicalRDD { @@ -140,8 +172,13 @@ private[sql] object PhysicalRDD { rdd: RDD[InternalRow], relation: BaseRelation, metadata: Map[String, String] = Map.empty): PhysicalRDD = { - // All HadoopFsRelations output UnsafeRows - val outputUnsafeRows = relation.isInstanceOf[HadoopFsRelation] + val outputUnsafeRows = if (relation.isInstanceOf[ParquetRelation]) { + // The vectorized parquet reader does not produce unsafe rows. + !SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) + } else { + // All HadoopFsRelations output UnsafeRows + relation.isInstanceOf[HadoopFsRelation] + } val bucketSpec = relation match { case r: HadoopFsRelation => r.getBucketSpec http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 08c52e5..afaddcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -42,6 +42,7 @@ trait CodegenSupport extends SparkPlan { case _: TungstenAggregate => "agg" case _: BroadcastHashJoin => "bhj" case _: SortMergeJoin => "smj" + case _: PhysicalRDD => "rdd" case _ => nodeName.toLowerCase } http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 7a0f7ab..f8a9a95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -187,8 +187,10 @@ class JDBCSuite extends SparkFunSuite val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is removed in a physical plan and // the plan only has PhysicalRDD to scan JDBCRelation. - assert(parentPlan.isInstanceOf[PhysicalRDD]) - assert(parentPlan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] + assert(node.plan.isInstanceOf[org.apache.spark.sql.execution.PhysicalRDD]) + assert(node.plan.asInstanceOf[PhysicalRDD].nodeName.contains("JDBCRelation")) df } assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 7196b6d..4b578a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -304,30 +304,38 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic expectedCount: Int, requiredColumnNames: Set[String], expectedUnhandledFilters: Set[Filter]): Unit = { + test(s"PushDown Returns $expectedCount: $sqlString") { - val queryExecution = sql(sqlString).queryExecution - val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p - } match { - case Seq(p) => p - case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") - } - val rawCount = rawPlan.execute().count() - assert(ColumnsRequired.set === requiredColumnNames) - - val table = caseInsensitiveContext.table("oneToTenFiltered") - val relation = table.queryExecution.logical.collectFirst { - case LogicalRelation(r, _, _) => r - }.get - - assert( - relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters) - - if (rawCount != expectedCount) { - fail( - s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" + - s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + - queryExecution) + // These tests check a particular plan, disable whole stage codegen. + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + try { + val queryExecution = sql(sqlString).queryExecution + val rawPlan = queryExecution.executedPlan.collect { + case p: execution.PhysicalRDD => p + } match { + case Seq(p) => p + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } + val rawCount = rawPlan.execute().count() + assert(ColumnsRequired.set === requiredColumnNames) + + val table = caseInsensitiveContext.table("oneToTenFiltered") + val relation = table.queryExecution.logical.collectFirst { + case LogicalRelation(r, _, _) => r + }.get + + assert( + relation.unhandledFilters(FiltersPushed.list.toArray).toSet === expectedUnhandledFilters) + + if (rawCount != expectedCount) { + fail( + s"Wrong # of results for pushed filter. Got $rawCount, Expected $expectedCount\n" + + s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + + queryExecution) + } + } finally { + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala index a89c5f8..02e8ab1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala @@ -117,28 +117,35 @@ class PrunedScanSuite extends DataSourceTest with SharedSQLContext { def testPruning(sqlString: String, expectedColumns: String*): Unit = { test(s"Columns output ${expectedColumns.mkString(",")}: $sqlString") { - val queryExecution = sql(sqlString).queryExecution - val rawPlan = queryExecution.executedPlan.collect { - case p: execution.PhysicalRDD => p - } match { - case Seq(p) => p - case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") - } - val rawColumns = rawPlan.output.map(_.name) - val rawOutput = rawPlan.execute().first() - - if (rawColumns != expectedColumns) { - fail( - s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" + - s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + - queryExecution) - } - if (rawOutput.numFields != expectedColumns.size) { - fail(s"Wrong output row. Got $rawOutput\n$queryExecution") + // These tests check a particular plan, disable whole stage codegen. + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, false) + try { + val queryExecution = sql(sqlString).queryExecution + val rawPlan = queryExecution.executedPlan.collect { + case p: execution.PhysicalRDD => p + } match { + case Seq(p) => p + case _ => fail(s"More than one PhysicalRDD found\n$queryExecution") + } + val rawColumns = rawPlan.output.map(_.name) + val rawOutput = rawPlan.execute().first() + + if (rawColumns != expectedColumns) { + fail( + s"Wrong column names. Got $rawColumns, Expected $expectedColumns\n" + + s"Filters pushed: ${FiltersPushed.list.mkString(",")}\n" + + queryExecution) + } + + if (rawOutput.numFields != expectedColumns.size) { + fail(s"Wrong output row. Got $rawOutput\n$queryExecution") + } + } finally { + caseInsensitiveContext.conf.setConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.defaultValue.get) } } } - } http://git-wip-us.apache.org/repos/asf/spark/blob/5a7af9e7/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 0ef42f4..62f6b99 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -74,32 +74,34 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with TestHiveSinglet bucketValues: Seq[Integer], filterCondition: Column, originalDataFrame: DataFrame): Unit = { + // This test verifies parts of the plan. Disable whole stage codegen. + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { + val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") + val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec + // Limit: bucket pruning only works when the bucket column has one and only one column + assert(bucketColumnNames.length == 1) + val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) + val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) + val matchedBuckets = new BitSet(numBuckets) + bucketValues.foreach { value => + matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) + } - val bucketedDataFrame = hiveContext.table("bucketed_table").select("i", "j", "k") - val BucketSpec(numBuckets, bucketColumnNames, _) = bucketSpec - // Limit: bucket pruning only works when the bucket column has one and only one column - assert(bucketColumnNames.length == 1) - val bucketColumnIndex = bucketedDataFrame.schema.fieldIndex(bucketColumnNames.head) - val bucketColumn = bucketedDataFrame.schema.toAttributes(bucketColumnIndex) - val matchedBuckets = new BitSet(numBuckets) - bucketValues.foreach { value => - matchedBuckets.set(DataSourceStrategy.getBucketId(bucketColumn, numBuckets, value)) - } + // Filter could hide the bug in bucket pruning. Thus, skipping all the filters + val plan = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan + val rdd = plan.find(_.isInstanceOf[PhysicalRDD]) + assert(rdd.isDefined, plan) - // Filter could hide the bug in bucket pruning. Thus, skipping all the filters - val rdd = bucketedDataFrame.filter(filterCondition).queryExecution.executedPlan - .find(_.isInstanceOf[PhysicalRDD]) - assert(rdd.isDefined) + val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => + if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty) + } + // checking if all the pruned buckets are empty + assert(checkedResult.collect().forall(_ == true)) - val checkedResult = rdd.get.execute().mapPartitionsWithIndex { case (index, iter) => - if (matchedBuckets.get(index % numBuckets)) Iterator(true) else Iterator(iter.isEmpty) + checkAnswer( + bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), + originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) } - // checking if all the pruned buckets are empty - assert(checkedResult.collect().forall(_ == true)) - - checkAnswer( - bucketedDataFrame.filter(filterCondition).orderBy("i", "j", "k"), - originalDataFrame.filter(filterCondition).orderBy("i", "j", "k")) } test("read partitioning bucketed tables with bucket pruning filters") { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
