http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala index 7c8bc7f..56a3906 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{DataType, ObjectType} case class DeserializeToObject( deserializer: Expression, outputObjAttr: Attribute, - child: SparkPlan) extends UnaryNode with CodegenSupport { + child: SparkPlan) extends UnaryExecNode with CodegenSupport { override def output: Seq[Attribute] = outputObjAttr :: Nil override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) @@ -67,9 +67,9 @@ case class DeserializeToObject( * Takes the input object from child and turns in into unsafe row using the given serializer * expression. The output of its child must be a single-field row containing the input object. */ -case class SerializeFromObject( +case class SerializeFromObjectExec( serializer: Seq[NamedExpression], - child: SparkPlan) extends UnaryNode with CodegenSupport { + child: SparkPlan) extends UnaryExecNode with CodegenSupport { override def output: Seq[Attribute] = serializer.map(_.toAttribute) @@ -136,10 +136,11 @@ trait ObjectOperator extends SparkPlan { * Applies the given function to input object iterator. * The output of its child must be a single-field row containing the input object. */ -case class MapPartitions( +case class MapPartitionsExec( func: Iterator[Any] => Iterator[Any], outputObjAttr: Attribute, - child: SparkPlan) extends UnaryNode with ObjectOperator { + child: SparkPlan) + extends UnaryExecNode with ObjectOperator { override def output: Seq[Attribute] = outputObjAttr :: Nil override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) @@ -157,13 +158,14 @@ case class MapPartitions( * Applies the given function to each input object. * The output of its child must be a single-field row containing the input object. * - * This operator is kind of a safe version of [[Project]], as it's output is custom object, we need - * to use safe row to contain it. + * This operator is kind of a safe version of [[ProjectExec]], as it's output is custom object, + * we need to use safe row to contain it. */ -case class MapElements( +case class MapElementsExec( func: AnyRef, outputObjAttr: Attribute, - child: SparkPlan) extends UnaryNode with ObjectOperator with CodegenSupport { + child: SparkPlan) + extends UnaryExecNode with ObjectOperator with CodegenSupport { override def output: Seq[Attribute] = outputObjAttr :: Nil override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) @@ -211,11 +213,11 @@ case class MapElements( /** * Applies the given function to each input row, appending the encoded result at the end of the row. 
*/ -case class AppendColumns( +case class AppendColumnsExec( func: Any => Any, deserializer: Expression, serializer: Seq[NamedExpression], - child: SparkPlan) extends UnaryNode with ObjectOperator { + child: SparkPlan) extends UnaryExecNode with ObjectOperator { override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute) @@ -236,13 +238,14 @@ case class AppendColumns( } /** - * An optimized version of [[AppendColumns]], that can be executed on deserialized object directly. + * An optimized version of [[AppendColumnsExec]], that can be executed + * on deserialized object directly. */ -case class AppendColumnsWithObject( +case class AppendColumnsWithObjectExec( func: Any => Any, inputSerializer: Seq[NamedExpression], newColumnsSerializer: Seq[NamedExpression], - child: SparkPlan) extends UnaryNode with ObjectOperator { + child: SparkPlan) extends UnaryExecNode with ObjectOperator { override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute) @@ -269,14 +272,14 @@ case class AppendColumnsWithObject( * Groups the input rows together and calls the function with each group and an iterator containing * all elements in the group. The result of this function is flattened before being output. */ -case class MapGroups( +case class MapGroupsExec( func: (Any, Iterator[Any]) => TraversableOnce[Any], keyDeserializer: Expression, valueDeserializer: Expression, groupingAttributes: Seq[Attribute], dataAttributes: Seq[Attribute], outputObjAttr: Attribute, - child: SparkPlan) extends UnaryNode with ObjectOperator { + child: SparkPlan) extends UnaryExecNode with ObjectOperator { override def output: Seq[Attribute] = outputObjAttr :: Nil override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr) @@ -310,7 +313,7 @@ case class MapGroups( * iterators containing all elements in the group from left and right side. * The result of this function is flattened before being output. */ -case class CoGroup( +case class CoGroupExec( func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any], keyDeserializer: Expression, leftDeserializer: Expression, @@ -321,7 +324,7 @@ case class CoGroup( rightAttr: Seq[Attribute], outputObjAttr: Attribute, left: SparkPlan, - right: SparkPlan) extends BinaryNode with ObjectOperator { + right: SparkPlan) extends BinaryExecNode with ObjectOperator { override def output: Seq[Attribute] = outputObjAttr :: Nil override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
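[Editor's note, not part of the commit: the rename above gives every physical operator an *Exec suffix, so any code that pattern-matches on a physical plan must now match the new class names, exactly as the test updates later in this commit do. The short sketch below is illustrative only; the helper name countExecOperators and the particular set of operators counted are the editor's own, while the class names and import paths are taken from this diff.]

import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}

// Count the renamed physical operators appearing in a plan. Before this commit the
// cases would have matched Project, Filter, Sort, BroadcastHashJoin and SortMergeJoin.
def countExecOperators(plan: SparkPlan): Map[String, Int] = {
  plan.collect {
    case _: ProjectExec           => "ProjectExec"
    case _: FilterExec            => "FilterExec"
    case _: SortExec              => "SortExec"
    case _: BroadcastHashJoinExec => "BroadcastHashJoinExec"
    case _: SortMergeJoinExec     => "SortMergeJoinExec"
  }.groupBy(identity).mapValues(_.size).toMap
}

// Typical use, assuming a DataFrame named df:
//   countExecOperators(df.queryExecution.executedPlan)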
http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala new file mode 100644 index 0000000..061d7c7 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -0,0 +1,149 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import net.razorvine.pickle.{Pickler, Unpickler} + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.{DataType, StructField, StructType} + + +/** + * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time. + * + * Python evaluation works by sending the necessary (projected) input data via a socket to an + * external Python process, and combine the result from the Python process with the original row. + * + * For each row we send to Python, we also put it in a queue. For each output row from Python, + * we drain the queue to find the original input row. Note that if the Python process is way too + * slow, this could lead to the queue growing unbounded and eventually run out of memory. + */ +case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan) + extends SparkPlan { + + def children: Seq[SparkPlan] = child :: Nil + + private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = { + udf.children match { + case Seq(u: PythonUDF) => + val (chained, children) = collectFunctions(u) + (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children) + case children => + // There should not be any other UDFs, or the children can't be evaluated directly. 
+ assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty)) + (ChainedPythonFunctions(Seq(udf.func)), udf.children) + } + } + + protected override def doExecute(): RDD[InternalRow] = { + val inputRDD = child.execute().map(_.copy()) + val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) + val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) + + inputRDD.mapPartitions { iter => + EvaluatePython.registerPicklers() // register pickler for Row + + // The queue used to buffer input rows so we can drain it to + // combine input with output from Python. + val queue = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]() + + val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip + + // flatten all the arguments + val allInputs = new ArrayBuffer[Expression] + val dataTypes = new ArrayBuffer[DataType] + val argOffsets = inputs.map { input => + input.map { e => + if (allInputs.exists(_.semanticEquals(e))) { + allInputs.indexWhere(_.semanticEquals(e)) + } else { + allInputs += e + dataTypes += e.dataType + allInputs.length - 1 + } + }.toArray + }.toArray + val projection = newMutableProjection(allInputs, child.output) + val schema = StructType(dataTypes.map(dt => StructField("", dt))) + val needConversion = dataTypes.exists(EvaluatePython.needConversionInPython) + + // enable memo iff we serialize the row with schema (schema and class should be memorized) + val pickle = new Pickler(needConversion) + // Input iterator to Python: input rows are grouped so we send them in batches to Python. + // For each row, add it to the queue. + val inputIterator = iter.grouped(100).map { inputRows => + val toBePickled = inputRows.map { inputRow => + queue.add(inputRow) + val row = projection(inputRow) + if (needConversion) { + EvaluatePython.toJava(row, schema) + } else { + // fast path for these types that does not need conversion in Python + val fields = new Array[Any](row.numFields) + var i = 0 + while (i < row.numFields) { + val dt = dataTypes(i) + fields(i) = EvaluatePython.toJava(row.get(i, dt), dt) + i += 1 + } + fields + } + }.toArray + pickle.dumps(toBePickled) + } + + val context = TaskContext.get() + + // Output iterator for results from Python. 
+ val outputIterator = new PythonRunner(pyFuncs, bufferSize, reuseWorker, true, argOffsets) + .compute(inputIterator, context.partitionId(), context) + + val unpickle = new Unpickler + val mutableRow = new GenericMutableRow(1) + val joined = new JoinedRow + val resultType = if (udfs.length == 1) { + udfs.head.dataType + } else { + StructType(udfs.map(u => StructField("", u.dataType, u.nullable))) + } + val resultProj = UnsafeProjection.create(output, output) + + outputIterator.flatMap { pickedResult => + val unpickledBatch = unpickle.loads(pickedResult) + unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala + }.map { result => + val row = if (udfs.length == 1) { + // fast path for single UDF + mutableRow(0) = EvaluatePython.fromJava(result, resultType) + mutableRow + } else { + EvaluatePython.fromJava(result, resultType).asInstanceOf[InternalRow] + } + resultProj(joined(queue.poll(), row)) + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala deleted file mode 100644 index c49f173..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ - -package org.apache.spark.sql.execution.python - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import net.razorvine.pickle.{Pickler, Unpickler} - -import org.apache.spark.TaskContext -import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.types.{DataType, StructField, StructType} - - -/** - * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at a time. - * - * Python evaluation works by sending the necessary (projected) input data via a socket to an - * external Python process, and combine the result from the Python process with the original row. - * - * For each row we send to Python, we also put it in a queue. For each output row from Python, - * we drain the queue to find the original input row. Note that if the Python process is way too - * slow, this could lead to the queue growing unbounded and eventually run out of memory. 
- */ -case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan) - extends SparkPlan { - - def children: Seq[SparkPlan] = child :: Nil - - private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, Seq[Expression]) = { - udf.children match { - case Seq(u: PythonUDF) => - val (chained, children) = collectFunctions(u) - (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children) - case children => - // There should not be any other UDFs, or the children can't be evaluated directly. - assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty)) - (ChainedPythonFunctions(Seq(udf.func)), udf.children) - } - } - - protected override def doExecute(): RDD[InternalRow] = { - val inputRDD = child.execute().map(_.copy()) - val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) - val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) - - inputRDD.mapPartitions { iter => - EvaluatePython.registerPicklers() // register pickler for Row - - // The queue used to buffer input rows so we can drain it to - // combine input with output from Python. - val queue = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]() - - val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip - - // flatten all the arguments - val allInputs = new ArrayBuffer[Expression] - val dataTypes = new ArrayBuffer[DataType] - val argOffsets = inputs.map { input => - input.map { e => - if (allInputs.exists(_.semanticEquals(e))) { - allInputs.indexWhere(_.semanticEquals(e)) - } else { - allInputs += e - dataTypes += e.dataType - allInputs.length - 1 - } - }.toArray - }.toArray - val projection = newMutableProjection(allInputs, child.output) - val schema = StructType(dataTypes.map(dt => StructField("", dt))) - val needConversion = dataTypes.exists(EvaluatePython.needConversionInPython) - - // enable memo iff we serialize the row with schema (schema and class should be memorized) - val pickle = new Pickler(needConversion) - // Input iterator to Python: input rows are grouped so we send them in batches to Python. - // For each row, add it to the queue. - val inputIterator = iter.grouped(100).map { inputRows => - val toBePickled = inputRows.map { inputRow => - queue.add(inputRow) - val row = projection(inputRow) - if (needConversion) { - EvaluatePython.toJava(row, schema) - } else { - // fast path for these types that does not need conversion in Python - val fields = new Array[Any](row.numFields) - var i = 0 - while (i < row.numFields) { - val dt = dataTypes(i) - fields(i) = EvaluatePython.toJava(row.get(i, dt), dt) - i += 1 - } - fields - } - }.toArray - pickle.dumps(toBePickled) - } - - val context = TaskContext.get() - - // Output iterator for results from Python. 
- val outputIterator = new PythonRunner(pyFuncs, bufferSize, reuseWorker, true, argOffsets) - .compute(inputIterator, context.partitionId(), context) - - val unpickle = new Unpickler - val mutableRow = new GenericMutableRow(1) - val joined = new JoinedRow - val resultType = if (udfs.length == 1) { - udfs.head.dataType - } else { - StructType(udfs.map(u => StructField("", u.dataType, u.nullable))) - } - val resultProj = UnsafeProjection.create(output, output) - - outputIterator.flatMap { pickedResult => - val unpickledBatch = unpickle.loads(pickedResult) - unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala - }.map { result => - val row = if (udfs.length == 1) { - // fast path for single UDF - mutableRow(0) = EvaluatePython.fromJava(result, resultType) - mutableRow - } else { - EvaluatePython.fromJava(result, resultType).asInstanceOf[InternalRow] - } - resultProj(joined(queue.poll(), row)) - } - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index d72b3d3..ab19236 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -79,7 +79,7 @@ private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] { val resultAttrs = udfs.zipWithIndex.map { case (u, i) => AttributeReference(s"pythonUDF$i", u.dataType)() } - val evaluation = BatchPythonEvaluation(validUdfs, child.output ++ resultAttrs, child) + val evaluation = BatchEvalPythonExec(validUdfs, child.output ++ resultAttrs, child) attributeMap ++= validUdfs.zip(resultAttrs) evaluation } else { @@ -105,7 +105,7 @@ private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] { val newPlan = extract(rewritten) if (newPlan.output != plan.output) { // Trim away the new UDF value if it was only used for filtering or something. 
- execution.Project(plan.output, newPlan) + execution.ProjectExec(plan.output, newPlan) } else { newPlan } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 81244ed..a1a1108 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -18,11 +18,10 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.analysis.{OutputMode, UnsupportedOperationChecker} +import org.apache.spark.sql.catalyst.analysis.OutputMode import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryNode} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} /** * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]] @@ -54,17 +53,17 @@ class IncrementalExecution( /** Locates save/restore pairs surrounding aggregation. */ val state = new Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = plan transform { - case StateStoreSave(keys, None, - UnaryNode(agg, - StateStoreRestore(keys2, None, child))) => + case StateStoreSaveExec(keys, None, + UnaryExecNode(agg, + StateStoreRestoreExec(keys2, None, child))) => val stateId = OperatorStateId(checkpointLocation, operatorId, currentBatchId - 1) operatorId += 1 - StateStoreSave( + StateStoreSaveExec( keys, Some(stateId), agg.withNewChildren( - StateStoreRestore( + StateStoreRestoreExec( keys, Some(stateId), child) :: Nil)) http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala index 5957747..de4305f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala @@ -50,10 +50,11 @@ trait StatefulOperator extends SparkPlan { * For each input tuple, the key is calculated and the value from the [[StateStore]] is added * to the stream (in addition to the input tuple) if present. */ -case class StateStoreRestore( +case class StateStoreRestoreExec( keyExpressions: Seq[Attribute], stateId: Option[OperatorStateId], - child: SparkPlan) extends execution.UnaryNode with StatefulOperator { + child: SparkPlan) + extends execution.UnaryExecNode with StatefulOperator { override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsWithStateStore( @@ -78,10 +79,11 @@ case class StateStoreRestore( /** * For each input tuple, the key is calculated and the tuple is `put` into the [[StateStore]]. 
*/ -case class StateStoreSave( +case class StateStoreSaveExec( keyExpressions: Seq[Attribute], stateId: Option[OperatorStateId], - child: SparkPlan) extends execution.UnaryNode with StatefulOperator { + child: SparkPlan) + extends execution.UnaryExecNode with StatefulOperator { override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsWithStateStore( http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index c023cc5..1341e45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -40,7 +40,7 @@ case class ScalarSubquery( override def withNewPlan(plan: LogicalPlan): SubqueryExpression = { throw new UnsupportedOperationException } - override def plan: SparkPlan = Subquery(simpleString, executedPlan) + override def plan: SparkPlan = SubqueryExec(simpleString, executedPlan) override def dataType: DataType = executedPlan.schema.fields.head.dataType override def children: Seq[Expression] = Nil http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index c6fcb69..1959f1e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import org.apache.commons.lang3.StringEscapeUtils -import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegen} +import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -178,7 +178,7 @@ private[ui] class SparkPlanGraphCluster( extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) { override def makeDotNode(metricsValue: Map[Long, String]): String = { - val duration = metrics.filter(_.name.startsWith(WholeStageCodegen.PIPELINE_DURATION_METRIC)) + val duration = metrics.filter(_.name.startsWith(WholeStageCodegenExec.PIPELINE_DURATION_METRIC)) val labelStr = if (duration.nonEmpty) { require(duration.length == 1) val id = duration(0).accumulatorId http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 82b79c7..4aea21e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -23,7 +23,7 @@ import scala.language.postfixOps import org.scalatest.concurrent.Eventually._ import org.apache.spark.Accumulators -import org.apache.spark.sql.execution.PhysicalRDD +import org.apache.spark.sql.execution.RDDScanExec import org.apache.spark.sql.execution.columnar._ import 
org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.functions._ @@ -38,7 +38,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext def rddIdOf(tableName: String): Int = { val plan = sqlContext.table(tableName).queryExecution.sparkPlan plan.collect { - case InMemoryColumnarTableScan(_, _, relation) => + case InMemoryTableScanExec(_, _, relation) => relation.cachedColumnBuffers.id case _ => fail(s"Table $tableName is not cached\n" + plan) @@ -167,7 +167,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext sqlContext.cacheTable("testData") assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") { sqlContext.table("testData").queryExecution.withCachedData.collect { - case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) => r + case r @ InMemoryRelation(_, _, _, _, _: InMemoryTableScanExec, _) => r }.size } @@ -351,8 +351,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext |abc a join abc b on a.key=b.key |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan - assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 3) - assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0) + assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 3) + assert(sparkPlan.collect { case e: RDDScanExec => e }.size === 0) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index 351b03b..19fe29a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import org.scalatest.Matchers._ import org.apache.spark.sql.catalyst.expressions.NamedExpression -import org.apache.spark.sql.execution.Project +import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -631,7 +631,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = { val projects = df.queryExecution.sparkPlan.collect { - case tungstenProject: Project => tungstenProject + case tungstenProject: ProjectExec => tungstenProject } assert(projects.size === expectedNumProjects) } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 067a62d..0414fa1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.plans.logical.Join -import org.apache.spark.sql.execution.joins.BroadcastHashJoin +import 
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext @@ -142,11 +142,11 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext { // equijoin - should be converted into broadcast join val plan1 = df1.join(broadcast(df2), "key").queryExecution.sparkPlan - assert(plan1.collect { case p: BroadcastHashJoin => p }.size === 1) + assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size === 1) // no join key -- should not be a broadcast join val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan - assert(plan2.collect { case p: BroadcastHashJoin => p }.size === 0) + assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size === 0) // planner should not crash without a join broadcast(df1).queryExecution.sparkPlan http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e953a6e..4c18784 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.aggregate.TungstenAggregate -import org.apache.spark.sql.execution.exchange.{BroadcastExchange, ReusedExchange, ShuffleExchange} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext} @@ -1355,16 +1355,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { checkAnswer(join, df) assert( join.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1) - assert(join.queryExecution.executedPlan.collect { case e: ReusedExchange => true }.size === 1) + assert( + join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 1) val broadcasted = broadcast(join) val join2 = join.join(broadcasted, "id").join(broadcasted, "id") checkAnswer(join2, df) assert( join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => true }.size === 1) assert( - join2.queryExecution.executedPlan.collect { case e: BroadcastExchange => true }.size === 1) + join2.queryExecution.executedPlan + .collect { case e: BroadcastExchangeExec => true }.size === 1) assert( - join2.queryExecution.executedPlan.collect { case e: ReusedExchange => true }.size === 4) + join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec => true }.size === 4) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index a87a41c..9e5a41d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -46,11 +46,11 @@ class JoinSuite extends 
QueryTest with SharedSQLContext { val df = sql(sqlString) val physical = df.queryExecution.sparkPlan val operators = physical.collect { - case j: BroadcastHashJoin => j - case j: ShuffledHashJoin => j - case j: CartesianProduct => j - case j: BroadcastNestedLoopJoin => j - case j: SortMergeJoin => j + case j: BroadcastHashJoinExec => j + case j: ShuffledHashJoinExec => j + case j: CartesianProductExec => j + case j: BroadcastNestedLoopJoinExec => j + case j: SortMergeJoinExec => j } assert(operators.size === 1) @@ -64,39 +64,43 @@ class JoinSuite extends QueryTest with SharedSQLContext { withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") { Seq( - ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]), - ("SELECT * FROM testData LEFT SEMI JOIN testData2", classOf[BroadcastNestedLoopJoin]), - ("SELECT * FROM testData JOIN testData2", classOf[CartesianProduct]), - ("SELECT * FROM testData JOIN testData2 WHERE key = 2", classOf[CartesianProduct]), - ("SELECT * FROM testData LEFT JOIN testData2", classOf[BroadcastNestedLoopJoin]), - ("SELECT * FROM testData RIGHT JOIN testData2", classOf[BroadcastNestedLoopJoin]), - ("SELECT * FROM testData FULL OUTER JOIN testData2", classOf[BroadcastNestedLoopJoin]), + ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", + classOf[ShuffledHashJoinExec]), + ("SELECT * FROM testData LEFT SEMI JOIN testData2", classOf[BroadcastNestedLoopJoinExec]), + ("SELECT * FROM testData JOIN testData2", classOf[CartesianProductExec]), + ("SELECT * FROM testData JOIN testData2 WHERE key = 2", classOf[CartesianProductExec]), + ("SELECT * FROM testData LEFT JOIN testData2", classOf[BroadcastNestedLoopJoinExec]), + ("SELECT * FROM testData RIGHT JOIN testData2", classOf[BroadcastNestedLoopJoinExec]), + ("SELECT * FROM testData FULL OUTER JOIN testData2", classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2", - classOf[BroadcastNestedLoopJoin]), - ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2", classOf[CartesianProduct]), + classOf[BroadcastNestedLoopJoinExec]), + ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2", + classOf[CartesianProductExec]), ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2", - classOf[BroadcastNestedLoopJoin]), - ("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProduct]), + classOf[BroadcastNestedLoopJoinExec]), + ("SELECT * FROM testData JOIN testData2 WHERE key > a", classOf[CartesianProductExec]), ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a", - classOf[CartesianProduct]), - ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]), - ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]), - ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin]), - ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoin]), + classOf[CartesianProductExec]), + ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoinExec]), + ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", + classOf[SortMergeJoinExec]), + ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", + classOf[SortMergeJoinExec]), + ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", classOf[SortMergeJoinExec]), ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2", - classOf[SortMergeJoin]), + classOf[SortMergeJoinExec]), ("SELECT * FROM testData right 
join testData2 ON key = a and key = 2", - classOf[SortMergeJoin]), + classOf[SortMergeJoinExec]), ("SELECT * FROM testData full outer join testData2 ON key = a", - classOf[SortMergeJoin]), + classOf[SortMergeJoinExec]), ("SELECT * FROM testData left JOIN testData2 ON (key * a != key + a)", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData right JOIN testData2 ON (key * a != key + a)", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)", - classOf[BroadcastNestedLoopJoin]), - ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]), - ("SELECT * FROM testData LEFT ANTI JOIN testData2", classOf[BroadcastNestedLoopJoin]) + classOf[BroadcastNestedLoopJoinExec]), + ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoinExec]), + ("SELECT * FROM testData LEFT ANTI JOIN testData2", classOf[BroadcastNestedLoopJoinExec]) ).foreach(assertJoin) } } @@ -112,11 +116,11 @@ class JoinSuite extends QueryTest with SharedSQLContext { sql("CACHE TABLE testData") Seq( ("SELECT * FROM testData join testData2 ON key = a", - classOf[BroadcastHashJoin]), + classOf[BroadcastHashJoinExec]), ("SELECT * FROM testData join testData2 ON key = a and key = 2", - classOf[BroadcastHashJoin]), + classOf[BroadcastHashJoinExec]), ("SELECT * FROM testData join testData2 ON key = a where key = 2", - classOf[BroadcastHashJoin]) + classOf[BroadcastHashJoinExec]) ).foreach(assertJoin) sql("UNCACHE TABLE testData") } @@ -127,11 +131,11 @@ class JoinSuite extends QueryTest with SharedSQLContext { sql("CACHE TABLE testData2") Seq( ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", - classOf[BroadcastHashJoin]), + classOf[BroadcastHashJoinExec]), ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2", - classOf[BroadcastHashJoin]), + classOf[BroadcastHashJoinExec]), ("SELECT * FROM testData right join testData2 ON key = a and key = 2", - classOf[BroadcastHashJoin]) + classOf[BroadcastHashJoinExec]) ).foreach(assertJoin) sql("UNCACHE TABLE testData") } @@ -428,15 +432,18 @@ class JoinSuite extends QueryTest with SharedSQLContext { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") { Seq( - ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastHashJoin]), - ("SELECT * FROM testData ANT JOIN testData2 ON key = a", classOf[BroadcastHashJoin]) + ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", + classOf[BroadcastHashJoinExec]), + ("SELECT * FROM testData ANT JOIN testData2 ON key = a", classOf[BroadcastHashJoinExec]) ).foreach(assertJoin) } withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { Seq( - ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]), - ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a", classOf[ShuffledHashJoin]) + ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", + classOf[ShuffledHashJoinExec]), + ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a", + classOf[ShuffledHashJoinExec]) ).foreach(assertJoin) } @@ -460,35 +467,35 @@ class JoinSuite extends QueryTest with SharedSQLContext { Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", - classOf[ShuffledHashJoin]), + classOf[ShuffledHashJoinExec]), ("SELECT * FROM testData LEFT SEMI JOIN testData2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM 
testData JOIN testData2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData JOIN testData2 WHERE key = 2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData LEFT JOIN testData2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData RIGHT JOIN testData2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData FULL OUTER JOIN testData2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData JOIN testData2 WHERE key > a", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData left JOIN testData2 WHERE (key * a != key + a)", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData right JOIN testData2 WHERE (key * a != key + a)", - classOf[BroadcastNestedLoopJoin]), + classOf[BroadcastNestedLoopJoinExec]), ("SELECT * FROM testData full JOIN testData2 WHERE (key * a != key + a)", - classOf[BroadcastNestedLoopJoin]) + classOf[BroadcastNestedLoopJoinExec]) ).foreach(assertJoin) checkAnswer( http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 9e64049..84f0c00 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.execution.aggregate -import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} @@ -866,12 +866,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("SPARK-11111 null-safe join should not use cartesian product") { val df = sql("select count(*) from testData a join testData b on (a.key <=> b.key)") val cp = df.queryExecution.sparkPlan.collect { - case cp: CartesianProduct => cp + case cp: CartesianProductExec => cp } assert(cp.isEmpty, "should not use CartesianProduct for null-safe join") val smj = df.queryExecution.sparkPlan.collect { - case smj: SortMergeJoin => smj - case j: BroadcastHashJoin => j + case smj: SortMergeJoinExec => smj + case j: BroadcastHashJoinExec => j } 
assert(smj.size > 0, "should use SortMergeJoin or BroadcastHashJoin") checkAnswer(df, Row(100) :: Nil) http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala index 17f2343..ba16810 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, IdentityBroadcastMode, SinglePartition} -import org.apache.spark.sql.execution.exchange.{BroadcastExchange, ReusedExchange, ShuffleExchange} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange} import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode import org.apache.spark.sql.test.SharedSQLContext @@ -55,13 +55,13 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext { val output = plan.output assert(plan sameResult plan) - val exchange1 = BroadcastExchange(IdentityBroadcastMode, plan) + val exchange1 = BroadcastExchangeExec(IdentityBroadcastMode, plan) val hashMode = HashedRelationBroadcastMode(output) - val exchange2 = BroadcastExchange(hashMode, plan) + val exchange2 = BroadcastExchangeExec(hashMode, plan) val hashMode2 = HashedRelationBroadcastMode(Alias(output.head, "id2")() :: Nil) - val exchange3 = BroadcastExchange(hashMode2, plan) - val exchange4 = ReusedExchange(output, exchange3) + val exchange3 = BroadcastExchangeExec(hashMode2, plan) + val exchange4 = ReusedExchangeExec(output, exchange3) assert(exchange1 sameResult exchange1) assert(exchange2 sameResult exchange2) @@ -87,7 +87,7 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext { val exchange3 = ShuffleExchange(part2, plan) val part3 = HashPartitioning(output ++ output, 2) val exchange4 = ShuffleExchange(part3, plan) - val exchange5 = ReusedExchange(output, exchange4) + val exchange5 = ReusedExchangeExec(output, exchange4) assert(exchange1 sameResult exchange1) assert(exchange2 sameResult exchange2) http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index bdbcf84..3b2911d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.columnar.InMemoryRelation -import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchange, ReuseExchange, ShuffleExchange} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin} +import 
org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchange} +import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -86,8 +86,8 @@ class PlannerSuite extends SharedSQLContext { |FROM testData2 l JOIN (SELECT * FROM testLimit LIMIT 1) r ON (l.a = r.key) """.stripMargin).queryExecution.sparkPlan - val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join } - val sortMergeJoins = planned.collect { case join: SortMergeJoin => join } + val broadcastHashJoins = planned.collect { case join: BroadcastHashJoinExec => join } + val sortMergeJoins = planned.collect { case join: SortMergeJoinExec => join } assert(broadcastHashJoins.size === 1, "Should use broadcast hash join") assert(sortMergeJoins.isEmpty, "Should not use sort merge join") @@ -139,8 +139,8 @@ class PlannerSuite extends SharedSQLContext { val b = sqlContext.table("tiny").as("b") val planned = a.join(b, $"a.key" === $"b.key").queryExecution.sparkPlan - val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join } - val sortMergeJoins = planned.collect { case join: SortMergeJoin => join } + val broadcastHashJoins = planned.collect { case join: BroadcastHashJoinExec => join } + val sortMergeJoins = planned.collect { case join: SortMergeJoinExec => join } assert(broadcastHashJoins.size === 1, "Should use broadcast hash join") assert(sortMergeJoins.isEmpty, "Should not use shuffled hash join") @@ -167,34 +167,34 @@ class PlannerSuite extends SharedSQLContext { test("efficient terminal limit -> sort should use TakeOrderedAndProject") { val query = testData.select('key, 'value).sort('key).limit(2) val planned = query.queryExecution.executedPlan - assert(planned.isInstanceOf[execution.TakeOrderedAndProject]) + assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec]) assert(planned.output === testData.select('key, 'value).logicalPlan.output) } test("terminal limit -> project -> sort should use TakeOrderedAndProject") { val query = testData.select('key, 'value).sort('key).select('value, 'key).limit(2) val planned = query.queryExecution.executedPlan - assert(planned.isInstanceOf[execution.TakeOrderedAndProject]) + assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec]) assert(planned.output === testData.select('value, 'key).logicalPlan.output) } test("terminal limits that are not handled by TakeOrderedAndProject should use CollectLimit") { val query = testData.select('value).limit(2) val planned = query.queryExecution.sparkPlan - assert(planned.isInstanceOf[CollectLimit]) + assert(planned.isInstanceOf[CollectLimitExec]) assert(planned.output === testData.select('value).logicalPlan.output) } test("TakeOrderedAndProject can appear in the middle of plans") { val query = testData.select('key, 'value).sort('key).limit(2).filter('key === 3) val planned = query.queryExecution.executedPlan - assert(planned.find(_.isInstanceOf[TakeOrderedAndProject]).isDefined) + assert(planned.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined) } test("CollectLimit can appear in the middle of a plan when caching is used") { val query = testData.select('key, 'value).limit(2).cache() val planned = query.queryExecution.optimizedPlan.asInstanceOf[InMemoryRelation] - assert(planned.child.isInstanceOf[CollectLimit]) + assert(planned.child.isInstanceOf[CollectLimitExec]) } 
test("PartitioningCollection") { @@ -394,7 +394,7 @@ class PlannerSuite extends SharedSQLContext { ) val outputPlan = EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan) assertDistributionRequirementsAreSatisfied(outputPlan) - if (outputPlan.collect { case s: Sort => true }.isEmpty) { + if (outputPlan.collect { case s: SortExec => true }.isEmpty) { fail(s"Sort should have been added:\n$outputPlan") } } @@ -410,7 +410,7 @@ class PlannerSuite extends SharedSQLContext { ) val outputPlan = EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan) assertDistributionRequirementsAreSatisfied(outputPlan) - if (outputPlan.collect { case s: Sort => true }.nonEmpty) { + if (outputPlan.collect { case s: SortExec => true }.nonEmpty) { fail(s"No sorts should have been added:\n$outputPlan") } } @@ -427,7 +427,7 @@ class PlannerSuite extends SharedSQLContext { ) val outputPlan = EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan) assertDistributionRequirementsAreSatisfied(outputPlan) - if (outputPlan.collect { case s: Sort => true }.isEmpty) { + if (outputPlan.collect { case s: SortExec => true }.isEmpty) { fail(s"Sort should have been added:\n$outputPlan") } } @@ -485,7 +485,7 @@ class PlannerSuite extends SharedSQLContext { requiredChildOrdering = Seq(Seq.empty)), None) - val inputPlan = SortMergeJoin( + val inputPlan = SortMergeJoinExec( Literal(1) :: Nil, Literal(1) :: Nil, Inner, @@ -494,7 +494,7 @@ class PlannerSuite extends SharedSQLContext { shuffle) val outputPlan = ReuseExchange(sqlContext.sessionState.conf).apply(inputPlan) - if (outputPlan.collect { case e: ReusedExchange => true }.size != 1) { + if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) { fail(s"Should re-use the shuffle:\n$outputPlan") } if (outputPlan.collect { case e: ShuffleExchange => true }.size != 1) { @@ -502,7 +502,7 @@ class PlannerSuite extends SharedSQLContext { } // nested exchanges - val inputPlan2 = SortMergeJoin( + val inputPlan2 = SortMergeJoinExec( Literal(1) :: Nil, Literal(1) :: Nil, Inner, @@ -511,7 +511,7 @@ class PlannerSuite extends SharedSQLContext { ShuffleExchange(finalPartitioning, inputPlan)) val outputPlan2 = ReuseExchange(sqlContext.sessionState.conf).apply(inputPlan2) - if (outputPlan2.collect { case e: ReusedExchange => true }.size != 2) { + if (outputPlan2.collect { case e: ReusedExchangeExec => true }.size != 2) { fail(s"Should re-use the two shuffles:\n$outputPlan2") } if (outputPlan2.collect { case e: ShuffleExchange => true }.size != 2) { http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala index 2963a85..a19ea51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala @@ -34,7 +34,7 @@ case class ReferenceSort( sortOrder: Seq[SortOrder], global: Boolean, child: SparkPlan) - extends UnaryNode { + extends UnaryExecNode { override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala index 7784776..ebeb39b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala @@ -43,13 +43,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { checkAnswer( input.toDF("a", "b", "c"), - (child: SparkPlan) => Sort('a.asc :: 'b.asc :: Nil, global = true, child = child), + (child: SparkPlan) => SortExec('a.asc :: 'b.asc :: Nil, global = true, child = child), input.sortBy(t => (t._1, t._2)).map(Row.fromTuple), sortAnswers = false) checkAnswer( input.toDF("a", "b", "c"), - (child: SparkPlan) => Sort('b.asc :: 'a.asc :: Nil, global = true, child = child), + (child: SparkPlan) => SortExec('b.asc :: 'a.asc :: Nil, global = true, child = child), input.sortBy(t => (t._2, t._1)).map(Row.fromTuple), sortAnswers = false) } @@ -57,8 +57,10 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { test("sort followed by limit") { checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => GlobalLimit(10, Sort('a.asc :: Nil, global = true, child = child)), - (child: SparkPlan) => GlobalLimit(10, ReferenceSort('a.asc :: Nil, global = true, child)), + (child: SparkPlan) => + GlobalLimitExec(10, SortExec('a.asc :: Nil, global = true, child = child)), + (child: SparkPlan) => + GlobalLimitExec(10, ReferenceSort('a.asc :: Nil, global = true, child)), sortAnswers = false ) } @@ -68,7 +70,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { val stringLength = 1024 * 1024 * 2 checkThatPlansAgree( Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1), - Sort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1), + SortExec(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1), ReferenceSort(sortOrder, global = true, _: SparkPlan), sortAnswers = false ) @@ -78,7 +80,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe external sort") { checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child = child), + (child: SparkPlan) => SortExec('a.asc :: Nil, global = true, child = child), (child: SparkPlan) => ReferenceSort('a.asc :: Nil, global = true, child), sortAnswers = false) } @@ -99,7 +101,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { ) checkThatPlansAgree( inputDf, - p => Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23), + p => SortExec(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23), ReferenceSort(sortOrder, global = true, _: SparkPlan), sortAnswers = false ) http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala index a4c6d07..fba04d0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala @@ -49,7 +49,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext { * Adds a no-op filter to the child plan in order to prevent executeCollect() from being * called directly on the child plan. */ - private def noOpFilter(plan: SparkPlan): SparkPlan = Filter(Literal(true), plan) + private def noOpFilter(plan: SparkPlan): SparkPlan = FilterExec(Literal(true), plan) val limit = 250 val sortOrder = 'a.desc :: 'b.desc :: Nil @@ -59,11 +59,11 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext { checkThatPlansAgree( generateRandomInputData(), input => - noOpFilter(TakeOrderedAndProject(limit, sortOrder, None, input)), + noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, None, input)), input => - GlobalLimit(limit, - LocalLimit(limit, - Sort(sortOrder, true, input))), + GlobalLimitExec(limit, + LocalLimitExec(limit, + SortExec(sortOrder, true, input))), sortAnswers = false) } } @@ -73,12 +73,13 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext { checkThatPlansAgree( generateRandomInputData(), input => - noOpFilter(TakeOrderedAndProject(limit, sortOrder, Some(Seq(input.output.last)), input)), + noOpFilter( + TakeOrderedAndProjectExec(limit, sortOrder, Some(Seq(input.output.last)), input)), input => - GlobalLimit(limit, - LocalLimit(limit, - Project(Seq(input.output.last), - Sort(sortOrder, true, input)))), + GlobalLimitExec(limit, + LocalLimitExec(limit, + ProjectExec(Seq(input.output.last), + SortExec(sortOrder, true, input)))), sortAnswers = false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index d7cf1dc..233104a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Row import org.apache.spark.sql.execution.aggregate.TungstenAggregate -import org.apache.spark.sql.execution.joins.BroadcastHashJoin +import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec import org.apache.spark.sql.expressions.scala.typed import org.apache.spark.sql.functions.{avg, broadcast, col, max} import org.apache.spark.sql.test.SharedSQLContext @@ -30,7 +30,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { test("range/filter should be combined") { val df = sqlContext.range(10).filter("id = 1").selectExpr("id + 1") val plan = df.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[WholeStageCodegen]).isDefined) + assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined) assert(df.collect() === Array(Row(2))) } @@ -38,8 +38,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val df = sqlContext.range(10).groupBy().agg(max(col("id")), avg(col("id"))) val plan = df.queryExecution.executedPlan assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + 
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined) assert(df.collect() === Array(Row(9, 4.5))) } @@ -47,8 +47,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val df = sqlContext.range(3).groupBy("id").count().orderBy("id") val plan = df.queryExecution.executedPlan assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined) assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1))) } @@ -58,8 +58,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val smallDF = sqlContext.createDataFrame(rdd, schema) val df = sqlContext.range(10).join(broadcast(smallDF), col("k") === col("id")) assert(df.queryExecution.executedPlan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[BroadcastHashJoin]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined) assert(df.collect() === Array(Row(1, 1, "1"), Row(1, 1, "1"), Row(2, 2, "2"))) } @@ -67,8 +67,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val df = sqlContext.range(3, 0, -1).toDF().sort(col("id")) val plan = df.queryExecution.executedPlan assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[Sort]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]).isDefined) assert(df.collect() === Array(Row(1), Row(2), Row(3))) } @@ -78,8 +78,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val ds = sqlContext.range(10).map(_.toString) val plan = ds.queryExecution.executedPlan assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined) assert(ds.collect() === 0.until(10).map(_.toString).toArray) } @@ -87,8 +87,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val ds = sqlContext.range(10).filter(_ % 2 == 0) val plan = ds.queryExecution.executedPlan assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[Filter]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined) assert(ds.collect() === Array(0, 2, 4, 6, 8)) } @@ -96,8 +96,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val ds = sqlContext.range(10).filter(_ % 2 == 0).filter(_ % 3 == 0) val plan = ds.queryExecution.executedPlan assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined) assert(ds.collect() === Array(0, 6)) } @@ -109,8 +109,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val plan = ds.queryExecution.executedPlan assert(plan.find(p => - 
p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined) + p.isInstanceOf[WholeStageCodegenExec] && + p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined) assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0))) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala index 4f185ed..9164074 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala @@ -133,7 +133,7 @@ class PartitionBatchPruningSuite } val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect { - case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value) + case in: InMemoryTableScanExec => (in.readPartitions.value, in.readBatches.value) }.head assert(readBatches === expectedReadBatches, s"Wrong number of read batches: $queryExecution") http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 50cd03a..fb70dbd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper} import org.apache.spark.sql.catalyst.util -import org.apache.spark.sql.execution.DataSourceScan +import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ @@ -375,7 +375,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi def getPhysicalFilters(df: DataFrame): ExpressionSet = { ExpressionSet( df.queryExecution.executedPlan.collect { - case execution.Filter(f, _) => splitConjunctivePredicates(f) + case execution.FilterExec(f, _) => splitConjunctivePredicates(f) }.flatten) } @@ -422,7 +422,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi def getFileScanRDD(df: DataFrame): FileScanRDD = { df.queryExecution.executedPlan.collect { - case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] => + case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] => scan.rdd.asInstanceOf[FileScanRDD] }.headOption.getOrElse { fail(s"No FileScan in query\n${df.queryExecution}") http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala index babe7ef..b9df43d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala @@ -71,15 +71,15 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll { } test("unsafe broadcast hash join updates peak execution memory") { - testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash join", "inner") + testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast hash join", "inner") } test("unsafe broadcast hash outer join updates peak execution memory") { - testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash outer join", "left_outer") + testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast hash outer join", "left_outer") } test("unsafe broadcast left semi join updates peak execution memory") { - testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast left semi join", "leftsemi") + testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast left semi join", "leftsemi") } } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala index 8cdfa8a..bc838ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala @@ -83,7 +83,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => EnsureRequirements(left.sqlContext.sessionState.conf).apply( - ShuffledHashJoin( + ShuffledHashJoinExec( leftKeys, rightKeys, joinType, BuildRight, boundCondition, left, right)), expectedAnswer, sortAnswers = true) @@ -96,7 +96,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => EnsureRequirements(left.sqlContext.sessionState.conf).apply( - BroadcastHashJoin( + BroadcastHashJoinExec( leftKeys, rightKeys, joinType, BuildRight, boundCondition, left, right)), expectedAnswer, sortAnswers = true) @@ -108,7 +108,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => EnsureRequirements(left.sqlContext.sessionState.conf).apply( - BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, Some(condition))), + BroadcastNestedLoopJoinExec(left, right, BuildLeft, joinType, Some(condition))), expectedAnswer, sortAnswers = true) } @@ -118,7 +118,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => EnsureRequirements(left.sqlContext.sessionState.conf).apply( - 
BroadcastNestedLoopJoin(left, right, BuildRight, joinType, Some(condition))), + BroadcastNestedLoopJoinExec(left, right, BuildRight, joinType, Some(condition))), expectedAnswer, sortAnswers = true) } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 3cb3ef1..933f32e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -91,7 +91,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { leftPlan: SparkPlan, rightPlan: SparkPlan, side: BuildSide) = { - val broadcastJoin = joins.BroadcastHashJoin( + val broadcastJoin = joins.BroadcastHashJoinExec( leftKeys, rightKeys, Inner, @@ -110,9 +110,9 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { rightPlan: SparkPlan, side: BuildSide) = { val shuffledHashJoin = - joins.ShuffledHashJoin(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan) + joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner, side, None, leftPlan, rightPlan) val filteredJoin = - boundCondition.map(Filter(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) + boundCondition.map(FilterExec(_, shuffledHashJoin)).getOrElse(shuffledHashJoin) EnsureRequirements(sqlContext.sessionState.conf).apply(filteredJoin) } @@ -123,7 +123,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { leftPlan: SparkPlan, rightPlan: SparkPlan) = { val sortMergeJoin = - joins.SortMergeJoin(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan) + joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, leftPlan, rightPlan) EnsureRequirements(sqlContext.sessionState.conf).apply(sortMergeJoin) } @@ -189,7 +189,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { test(s"$testName using CartesianProduct") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - Filter(condition(), CartesianProduct(left, right)), + FilterExec(condition(), CartesianProductExec(left, right)), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } @@ -198,7 +198,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { test(s"$testName using BroadcastNestedLoopJoin build left") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastNestedLoopJoin(left, right, BuildLeft, Inner, Some(condition())), + BroadcastNestedLoopJoinExec(left, right, BuildLeft, Inner, Some(condition())), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } @@ -207,7 +207,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { test(s"$testName using BroadcastNestedLoopJoin build right") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastNestedLoopJoin(left, right, BuildRight, Inner, Some(condition())), + BroadcastNestedLoopJoinExec(left, right, BuildRight, Inner, Some(condition())), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } 
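A note on the pattern above: after the rename, the join suites build the physical *Exec operator directly from key expressions and child plans, optionally wrap it in FilterExec when there is a bound condition, and then run EnsureRequirements over it so the planner inserts whatever exchanges and sorts the operator requires. A minimal sketch of that shape follows; the helper name, the choice of join keys, and the exact import paths are illustrative assumptions, not code from the suites.

    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.exchange.EnsureRequirements
    import org.apache.spark.sql.execution.joins.SortMergeJoinExec

    // Hypothetical helper: join two child plans on their first output column
    // and let EnsureRequirements add the required shuffles and sorts.
    def sortMergeJoinPlan(left: SparkPlan, right: SparkPlan): SparkPlan = {
      val join = SortMergeJoinExec(
        left.output.head :: Nil,   // left join keys (illustrative)
        right.output.head :: Nil,  // right join keys (illustrative)
        Inner,
        None,                      // no extra join condition
        left,
        right)
      EnsureRequirements(left.sqlContext.sessionState.conf).apply(join)
    }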
http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 4cacb20..c26cb84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -83,7 +83,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => EnsureRequirements(sqlContext.sessionState.conf).apply( - ShuffledHashJoin( + ShuffledHashJoinExec( leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right)), expectedAnswer.map(Row.fromTuple), sortAnswers = true) @@ -102,7 +102,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastHashJoin( + BroadcastHashJoinExec( leftKeys, rightKeys, joinType, buildSide, boundCondition, left, right), expectedAnswer.map(Row.fromTuple), sortAnswers = true) @@ -116,7 +116,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => EnsureRequirements(sqlContext.sessionState.conf).apply( - SortMergeJoin(leftKeys, rightKeys, joinType, boundCondition, left, right)), + SortMergeJoinExec(leftKeys, rightKeys, joinType, boundCondition, left, right)), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } @@ -126,7 +126,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { test(s"$testName using BroadcastNestedLoopJoin build left") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, Some(condition)), + BroadcastNestedLoopJoinExec(left, right, BuildLeft, joinType, Some(condition)), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } @@ -135,7 +135,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { test(s"$testName using BroadcastNestedLoopJoin build right") { withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => - BroadcastNestedLoopJoin(left, right, BuildRight, joinType, Some(condition)), + BroadcastNestedLoopJoinExec(left, right, BuildRight, joinType, Some(condition)), expectedAnswer.map(Row.fromTuple), sortAnswers = true) } http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index f66deea..c24abf1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -26,7 +26,7 @@ import 
org.scalatest.{BeforeAndAfter, PrivateMethodTester} import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.execution.DataSourceScan +import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD @@ -208,10 +208,10 @@ class JDBCSuite extends SparkFunSuite val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is removed in a physical plan and // the plan only has PhysicalRDD to scan JDBCRelation. - assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) - val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan]) - assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation")) + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec] + assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec]) + assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation")) df } assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 1")).collect().size == 0) @@ -246,9 +246,9 @@ class JDBCSuite extends SparkFunSuite val parentPlan = df.queryExecution.executedPlan // Check if SparkPlan Filter is not removed in a physical plan because JDBCRDD // cannot compile given predicates. - assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]) - val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen] - assert(node.child.isInstanceOf[org.apache.spark.sql.execution.Filter]) + assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]) + val node = parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec] + assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec]) df } assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 2")).collect().size == 0) http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala index 19e34b4..1470777 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala @@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic try { val queryExecution = sql(sqlString).queryExecution val rawPlan = queryExecution.executedPlan.collect { - case p: execution.DataSourceScan => p + case p: execution.DataSourceScanExec => p } match { case Seq(p) => p case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")
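The WholeStageCodegenSuite and JDBCSuite changes above all reduce to the same assertion shape: search the executed plan for a WholeStageCodegenExec node and check the class of its child operator. A small helper capturing that check is sketched below; the helper name and the ClassTag-based formulation are assumptions for illustration, not part of the patch.

    import scala.reflect.ClassTag

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}

    // Hypothetical helper: true if the executed plan contains a whole-stage
    // compiled subtree rooted at an operator of type T.
    def hasCodegenedOperator[T <: SparkPlan : ClassTag](df: DataFrame): Boolean = {
      val cls = implicitly[ClassTag[T]].runtimeClass
      df.queryExecution.executedPlan.find {
        case w: WholeStageCodegenExec => cls.isInstance(w.child)
        case _ => false
      }.isDefined
    }

    // Usage (illustrative): hasCodegenedOperator[SortExec](df) after an orderBy.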
