http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 7c8bc7f..56a3906 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.types.{DataType, ObjectType}
 case class DeserializeToObject(
     deserializer: Expression,
     outputObjAttr: Attribute,
-    child: SparkPlan) extends UnaryNode with CodegenSupport {
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -67,9 +67,9 @@ case class DeserializeToObject(
  * Takes the input object from child and turns in into unsafe row using the 
given serializer
  * expression.  The output of its child must be a single-field row containing 
the input object.
  */
-case class SerializeFromObject(
+case class SerializeFromObjectExec(
     serializer: Seq[NamedExpression],
-    child: SparkPlan) extends UnaryNode with CodegenSupport {
+    child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
 
@@ -136,10 +136,11 @@ trait ObjectOperator extends SparkPlan {
  * Applies the given function to input object iterator.
  * The output of its child must be a single-field row containing the input 
object.
  */
-case class MapPartitions(
+case class MapPartitionsExec(
     func: Iterator[Any] => Iterator[Any],
     outputObjAttr: Attribute,
-    child: SparkPlan) extends UnaryNode with ObjectOperator {
+    child: SparkPlan)
+  extends UnaryExecNode with ObjectOperator {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -157,13 +158,14 @@ case class MapPartitions(
  * Applies the given function to each input object.
  * The output of its child must be a single-field row containing the input 
object.
  *
- * This operator is kind of a safe version of [[Project]], as it's output is 
custom object, we need
- * to use safe row to contain it.
+ * This operator is kind of a safe version of [[ProjectExec]], as it's output 
is custom object,
+ * we need to use safe row to contain it.
  */
-case class MapElements(
+case class MapElementsExec(
     func: AnyRef,
     outputObjAttr: Attribute,
-    child: SparkPlan) extends UnaryNode with ObjectOperator with 
CodegenSupport {
+    child: SparkPlan)
+  extends UnaryExecNode with ObjectOperator with CodegenSupport {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -211,11 +213,11 @@ case class MapElements(
 /**
  * Applies the given function to each input row, appending the encoded result 
at the end of the row.
  */
-case class AppendColumns(
+case class AppendColumnsExec(
     func: Any => Any,
     deserializer: Expression,
     serializer: Seq[NamedExpression],
-    child: SparkPlan) extends UnaryNode with ObjectOperator {
+    child: SparkPlan) extends UnaryExecNode with ObjectOperator {
 
   override def output: Seq[Attribute] = child.output ++ 
serializer.map(_.toAttribute)
 
@@ -236,13 +238,14 @@ case class AppendColumns(
 }
 
 /**
- * An optimized version of [[AppendColumns]], that can be executed on 
deserialized object directly.
+ * An optimized version of [[AppendColumnsExec]], that can be executed
+ * on deserialized object directly.
  */
-case class AppendColumnsWithObject(
+case class AppendColumnsWithObjectExec(
     func: Any => Any,
     inputSerializer: Seq[NamedExpression],
     newColumnsSerializer: Seq[NamedExpression],
-    child: SparkPlan) extends UnaryNode with ObjectOperator {
+    child: SparkPlan) extends UnaryExecNode with ObjectOperator {
 
   override def output: Seq[Attribute] = (inputSerializer ++ 
newColumnsSerializer).map(_.toAttribute)
 
@@ -269,14 +272,14 @@ case class AppendColumnsWithObject(
  * Groups the input rows together and calls the function with each group and 
an iterator containing
  * all elements in the group.  The result of this function is flattened before 
being output.
  */
-case class MapGroups(
+case class MapGroupsExec(
     func: (Any, Iterator[Any]) => TraversableOnce[Any],
     keyDeserializer: Expression,
     valueDeserializer: Expression,
     groupingAttributes: Seq[Attribute],
     dataAttributes: Seq[Attribute],
     outputObjAttr: Attribute,
-    child: SparkPlan) extends UnaryNode with ObjectOperator {
+    child: SparkPlan) extends UnaryExecNode with ObjectOperator {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
@@ -310,7 +313,7 @@ case class MapGroups(
  * iterators containing all elements in the group from left and right side.
  * The result of this function is flattened before being output.
  */
-case class CoGroup(
+case class CoGroupExec(
     func: (Any, Iterator[Any], Iterator[Any]) => TraversableOnce[Any],
     keyDeserializer: Expression,
     leftDeserializer: Expression,
@@ -321,7 +324,7 @@ case class CoGroup(
     rightAttr: Seq[Attribute],
     outputObjAttr: Attribute,
     left: SparkPlan,
-    right: SparkPlan) extends BinaryNode with ObjectOperator {
+    right: SparkPlan) extends BinaryExecNode with ObjectOperator {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
new file mode 100644
index 0000000..061d7c7
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
@@ -0,0 +1,149 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import net.razorvine.pickle.{Pickler, Unpickler}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+
+
+/**
+ * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at 
a time.
+ *
+ * Python evaluation works by sending the necessary (projected) input data via 
a socket to an
+ * external Python process, and combine the result from the Python process 
with the original row.
+ *
+ * For each row we send to Python, we also put it in a queue. For each output 
row from Python,
+ * we drain the queue to find the original input row. Note that if the Python 
process is way too
+ * slow, this could lead to the queue growing unbounded and eventually run out 
of memory.
+ */
+case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], 
child: SparkPlan)
+  extends SparkPlan {
+
+  def children: Seq[SparkPlan] = child :: Nil
+
+  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
+    udf.children match {
+      case Seq(u: PythonUDF) =>
+        val (chained, children) = collectFunctions(u)
+        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
+      case children =>
+        // There should not be any other UDFs, or the children can't be 
evaluated directly.
+        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
+        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
+    }
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val inputRDD = child.execute().map(_.copy())
+    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", 
defaultValue = true)
+
+    inputRDD.mapPartitions { iter =>
+      EvaluatePython.registerPicklers()  // register pickler for Row
+
+      // The queue used to buffer input rows so we can drain it to
+      // combine input with output from Python.
+      val queue = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
+
+      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
+
+      // flatten all the arguments
+      val allInputs = new ArrayBuffer[Expression]
+      val dataTypes = new ArrayBuffer[DataType]
+      val argOffsets = inputs.map { input =>
+        input.map { e =>
+          if (allInputs.exists(_.semanticEquals(e))) {
+            allInputs.indexWhere(_.semanticEquals(e))
+          } else {
+            allInputs += e
+            dataTypes += e.dataType
+            allInputs.length - 1
+          }
+        }.toArray
+      }.toArray
+      val projection = newMutableProjection(allInputs, child.output)
+      val schema = StructType(dataTypes.map(dt => StructField("", dt)))
+      val needConversion = 
dataTypes.exists(EvaluatePython.needConversionInPython)
+
+      // enable memo iff we serialize the row with schema (schema and class 
should be memorized)
+      val pickle = new Pickler(needConversion)
+      // Input iterator to Python: input rows are grouped so we send them in 
batches to Python.
+      // For each row, add it to the queue.
+      val inputIterator = iter.grouped(100).map { inputRows =>
+        val toBePickled = inputRows.map { inputRow =>
+          queue.add(inputRow)
+          val row = projection(inputRow)
+          if (needConversion) {
+            EvaluatePython.toJava(row, schema)
+          } else {
+            // fast path for these types that does not need conversion in 
Python
+            val fields = new Array[Any](row.numFields)
+            var i = 0
+            while (i < row.numFields) {
+              val dt = dataTypes(i)
+              fields(i) = EvaluatePython.toJava(row.get(i, dt), dt)
+              i += 1
+            }
+            fields
+          }
+        }.toArray
+        pickle.dumps(toBePickled)
+      }
+
+      val context = TaskContext.get()
+
+      // Output iterator for results from Python.
+      val outputIterator = new PythonRunner(pyFuncs, bufferSize, reuseWorker, 
true, argOffsets)
+        .compute(inputIterator, context.partitionId(), context)
+
+      val unpickle = new Unpickler
+      val mutableRow = new GenericMutableRow(1)
+      val joined = new JoinedRow
+      val resultType = if (udfs.length == 1) {
+        udfs.head.dataType
+      } else {
+        StructType(udfs.map(u => StructField("", u.dataType, u.nullable)))
+      }
+      val resultProj = UnsafeProjection.create(output, output)
+
+      outputIterator.flatMap { pickedResult =>
+        val unpickledBatch = unpickle.loads(pickedResult)
+        unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
+      }.map { result =>
+        val row = if (udfs.length == 1) {
+          // fast path for single UDF
+          mutableRow(0) = EvaluatePython.fromJava(result, resultType)
+          mutableRow
+        } else {
+          EvaluatePython.fromJava(result, resultType).asInstanceOf[InternalRow]
+        }
+        resultProj(joined(queue.poll(), row))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
deleted file mode 100644
index c49f173..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchPythonEvaluation.scala
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.execution.python
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import net.razorvine.pickle.{Pickler, Unpickler}
-
-import org.apache.spark.TaskContext
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-
-
-/**
- * A physical plan that evaluates a [[PythonUDF]], one partition of tuples at 
a time.
- *
- * Python evaluation works by sending the necessary (projected) input data via 
a socket to an
- * external Python process, and combine the result from the Python process 
with the original row.
- *
- * For each row we send to Python, we also put it in a queue. For each output 
row from Python,
- * we drain the queue to find the original input row. Note that if the Python 
process is way too
- * slow, this could lead to the queue growing unbounded and eventually run out 
of memory.
- */
-case class BatchPythonEvaluation(udfs: Seq[PythonUDF], output: Seq[Attribute], 
child: SparkPlan)
-  extends SparkPlan {
-
-  def children: Seq[SparkPlan] = child :: Nil
-
-  private def collectFunctions(udf: PythonUDF): (ChainedPythonFunctions, 
Seq[Expression]) = {
-    udf.children match {
-      case Seq(u: PythonUDF) =>
-        val (chained, children) = collectFunctions(u)
-        (ChainedPythonFunctions(chained.funcs ++ Seq(udf.func)), children)
-      case children =>
-        // There should not be any other UDFs, or the children can't be 
evaluated directly.
-        assert(children.forall(_.find(_.isInstanceOf[PythonUDF]).isEmpty))
-        (ChainedPythonFunctions(Seq(udf.func)), udf.children)
-    }
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    val inputRDD = child.execute().map(_.copy())
-    val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
-    val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", 
defaultValue = true)
-
-    inputRDD.mapPartitions { iter =>
-      EvaluatePython.registerPicklers()  // register pickler for Row
-
-      // The queue used to buffer input rows so we can drain it to
-      // combine input with output from Python.
-      val queue = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
-
-      val (pyFuncs, inputs) = udfs.map(collectFunctions).unzip
-
-      // flatten all the arguments
-      val allInputs = new ArrayBuffer[Expression]
-      val dataTypes = new ArrayBuffer[DataType]
-      val argOffsets = inputs.map { input =>
-        input.map { e =>
-          if (allInputs.exists(_.semanticEquals(e))) {
-            allInputs.indexWhere(_.semanticEquals(e))
-          } else {
-            allInputs += e
-            dataTypes += e.dataType
-            allInputs.length - 1
-          }
-        }.toArray
-      }.toArray
-      val projection = newMutableProjection(allInputs, child.output)
-      val schema = StructType(dataTypes.map(dt => StructField("", dt)))
-      val needConversion = 
dataTypes.exists(EvaluatePython.needConversionInPython)
-
-      // enable memo iff we serialize the row with schema (schema and class 
should be memorized)
-      val pickle = new Pickler(needConversion)
-      // Input iterator to Python: input rows are grouped so we send them in 
batches to Python.
-      // For each row, add it to the queue.
-      val inputIterator = iter.grouped(100).map { inputRows =>
-        val toBePickled = inputRows.map { inputRow =>
-          queue.add(inputRow)
-          val row = projection(inputRow)
-          if (needConversion) {
-            EvaluatePython.toJava(row, schema)
-          } else {
-            // fast path for these types that does not need conversion in 
Python
-            val fields = new Array[Any](row.numFields)
-            var i = 0
-            while (i < row.numFields) {
-              val dt = dataTypes(i)
-              fields(i) = EvaluatePython.toJava(row.get(i, dt), dt)
-              i += 1
-            }
-            fields
-          }
-        }.toArray
-        pickle.dumps(toBePickled)
-      }
-
-      val context = TaskContext.get()
-
-      // Output iterator for results from Python.
-      val outputIterator = new PythonRunner(pyFuncs, bufferSize, reuseWorker, 
true, argOffsets)
-        .compute(inputIterator, context.partitionId(), context)
-
-      val unpickle = new Unpickler
-      val mutableRow = new GenericMutableRow(1)
-      val joined = new JoinedRow
-      val resultType = if (udfs.length == 1) {
-        udfs.head.dataType
-      } else {
-        StructType(udfs.map(u => StructField("", u.dataType, u.nullable)))
-      }
-      val resultProj = UnsafeProjection.create(output, output)
-
-      outputIterator.flatMap { pickedResult =>
-        val unpickledBatch = unpickle.loads(pickedResult)
-        unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala
-      }.map { result =>
-        val row = if (udfs.length == 1) {
-          // fast path for single UDF
-          mutableRow(0) = EvaluatePython.fromJava(result, resultType)
-          mutableRow
-        } else {
-          EvaluatePython.fromJava(result, resultType).asInstanceOf[InternalRow]
-        }
-        resultProj(joined(queue.poll(), row))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index d72b3d3..ab19236 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -79,7 +79,7 @@ private[spark] object ExtractPythonUDFs extends 
Rule[SparkPlan] {
           val resultAttrs = udfs.zipWithIndex.map { case (u, i) =>
             AttributeReference(s"pythonUDF$i", u.dataType)()
           }
-          val evaluation = BatchPythonEvaluation(validUdfs, child.output ++ 
resultAttrs, child)
+          val evaluation = BatchEvalPythonExec(validUdfs, child.output ++ 
resultAttrs, child)
           attributeMap ++= validUdfs.zip(resultAttrs)
           evaluation
         } else {
@@ -105,7 +105,7 @@ private[spark] object ExtractPythonUDFs extends 
Rule[SparkPlan] {
       val newPlan = extract(rewritten)
       if (newPlan.output != plan.output) {
         // Trim away the new UDF value if it was only used for filtering or 
something.
-        execution.Project(plan.output, newPlan)
+        execution.ProjectExec(plan.output, newPlan)
       } else {
         newPlan
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 81244ed..a1a1108 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.{OutputMode, 
UnsupportedOperationChecker}
+import org.apache.spark.sql.catalyst.analysis.OutputMode
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, 
SparkPlanner, UnaryNode}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, 
SparkPlanner, UnaryExecNode}
 
 /**
  * A variant of [[QueryExecution]] that allows the execution of the given 
[[LogicalPlan]]
@@ -54,17 +53,17 @@ class IncrementalExecution(
   /** Locates save/restore pairs surrounding aggregation. */
   val state = new Rule[SparkPlan] {
     override def apply(plan: SparkPlan): SparkPlan = plan transform {
-      case StateStoreSave(keys, None,
-             UnaryNode(agg,
-               StateStoreRestore(keys2, None, child))) =>
+      case StateStoreSaveExec(keys, None,
+             UnaryExecNode(agg,
+               StateStoreRestoreExec(keys2, None, child))) =>
         val stateId = OperatorStateId(checkpointLocation, operatorId, 
currentBatchId - 1)
         operatorId += 1
 
-        StateStoreSave(
+        StateStoreSaveExec(
           keys,
           Some(stateId),
           agg.withNewChildren(
-            StateStoreRestore(
+            StateStoreRestoreExec(
               keys,
               Some(stateId),
               child) :: Nil))

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 5957747..de4305f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -50,10 +50,11 @@ trait StatefulOperator extends SparkPlan {
  * For each input tuple, the key is calculated and the value from the 
[[StateStore]] is added
  * to the stream (in addition to the input tuple) if present.
  */
-case class StateStoreRestore(
+case class StateStoreRestoreExec(
     keyExpressions: Seq[Attribute],
     stateId: Option[OperatorStateId],
-    child: SparkPlan) extends execution.UnaryNode with StatefulOperator {
+    child: SparkPlan)
+  extends execution.UnaryExecNode with StatefulOperator {
 
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsWithStateStore(
@@ -78,10 +79,11 @@ case class StateStoreRestore(
 /**
  * For each input tuple, the key is calculated and the tuple is `put` into the 
[[StateStore]].
  */
-case class StateStoreSave(
+case class StateStoreSaveExec(
     keyExpressions: Seq[Attribute],
     stateId: Option[OperatorStateId],
-    child: SparkPlan) extends execution.UnaryNode with StatefulOperator {
+    child: SparkPlan)
+  extends execution.UnaryExecNode with StatefulOperator {
 
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsWithStateStore(

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index c023cc5..1341e45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -40,7 +40,7 @@ case class ScalarSubquery(
   override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
     throw new UnsupportedOperationException
   }
-  override def plan: SparkPlan = Subquery(simpleString, executedPlan)
+  override def plan: SparkPlan = SubqueryExec(simpleString, executedPlan)
 
   override def dataType: DataType = executedPlan.schema.fields.head.dataType
   override def children: Seq[Expression] = Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index c6fcb69..1959f1e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 
 import org.apache.commons.lang3.StringEscapeUtils
 
-import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegen}
+import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -178,7 +178,7 @@ private[ui] class SparkPlanGraphCluster(
   extends SparkPlanGraphNode(id, name, desc, Map.empty, metrics) {
 
   override def makeDotNode(metricsValue: Map[Long, String]): String = {
-    val duration = 
metrics.filter(_.name.startsWith(WholeStageCodegen.PIPELINE_DURATION_METRIC))
+    val duration = 
metrics.filter(_.name.startsWith(WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
     val labelStr = if (duration.nonEmpty) {
       require(duration.length == 1)
       val id = duration(0).accumulatorId

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 82b79c7..4aea21e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -23,7 +23,7 @@ import scala.language.postfixOps
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.Accumulators
-import org.apache.spark.sql.execution.PhysicalRDD
+import org.apache.spark.sql.execution.RDDScanExec
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.functions._
@@ -38,7 +38,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
   def rddIdOf(tableName: String): Int = {
     val plan = sqlContext.table(tableName).queryExecution.sparkPlan
     plan.collect {
-      case InMemoryColumnarTableScan(_, _, relation) =>
+      case InMemoryTableScanExec(_, _, relation) =>
         relation.cachedColumnBuffers.id
       case _ =>
         fail(s"Table $tableName is not cached\n" + plan)
@@ -167,7 +167,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
     sqlContext.cacheTable("testData")
     assertResult(0, "Double InMemoryRelations found, cacheTable() is not 
idempotent") {
       sqlContext.table("testData").queryExecution.withCachedData.collect {
-        case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan, _) 
=> r
+        case r @ InMemoryRelation(_, _, _, _, _: InMemoryTableScanExec, _) => r
       }.size
     }
 
@@ -351,8 +351,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
         |abc a join abc b on a.key=b.key
         |join abc c on a.key=c.key""".stripMargin).queryExecution.sparkPlan
 
-    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size 
=== 3)
-    assert(sparkPlan.collect { case e: PhysicalRDD => e }.size === 0)
+    assert(sparkPlan.collect { case e: InMemoryTableScanExec => e }.size === 3)
+    assert(sparkPlan.collect { case e: RDDScanExec => e }.size === 0)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 351b03b..19fe29a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.scalatest.Matchers._
 
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.execution.Project
+import org.apache.spark.sql.execution.ProjectExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -631,7 +631,7 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
 
     def checkNumProjects(df: DataFrame, expectedNumProjects: Int): Unit = {
       val projects = df.queryExecution.sparkPlan.collect {
-        case tungstenProject: Project => tungstenProject
+        case tungstenProject: ProjectExec => tungstenProject
       }
       assert(projects.size === expectedNumProjects)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 067a62d..0414fa1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.execution.joins.BroadcastHashJoin
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -142,11 +142,11 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext {
 
     // equijoin - should be converted into broadcast join
     val plan1 = df1.join(broadcast(df2), "key").queryExecution.sparkPlan
-    assert(plan1.collect { case p: BroadcastHashJoin => p }.size === 1)
+    assert(plan1.collect { case p: BroadcastHashJoinExec => p }.size === 1)
 
     // no join key -- should not be a broadcast join
     val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan
-    assert(plan2.collect { case p: BroadcastHashJoin => p }.size === 0)
+    assert(plan2.collect { case p: BroadcastHashJoinExec => p }.size === 0)
 
     // planner should not crash without a join
     broadcast(df1).queryExecution.sparkPlan

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index e953a6e..4c18784 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.exchange.{BroadcastExchange, 
ReusedExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, 
SharedSQLContext}
@@ -1355,16 +1355,18 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
       checkAnswer(join, df)
       assert(
         join.queryExecution.executedPlan.collect { case e: ShuffleExchange => 
true }.size === 1)
-      assert(join.queryExecution.executedPlan.collect { case e: ReusedExchange 
=> true }.size === 1)
+      assert(
+        join.queryExecution.executedPlan.collect { case e: ReusedExchangeExec 
=> true }.size === 1)
       val broadcasted = broadcast(join)
       val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
       checkAnswer(join2, df)
       assert(
         join2.queryExecution.executedPlan.collect { case e: ShuffleExchange => 
true }.size === 1)
       assert(
-        join2.queryExecution.executedPlan.collect { case e: BroadcastExchange 
=> true }.size === 1)
+        join2.queryExecution.executedPlan
+          .collect { case e: BroadcastExchangeExec => true }.size === 1)
       assert(
-        join2.queryExecution.executedPlan.collect { case e: ReusedExchange => 
true }.size === 4)
+        join2.queryExecution.executedPlan.collect { case e: ReusedExchangeExec 
=> true }.size === 4)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a87a41c..9e5a41d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -46,11 +46,11 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     val df = sql(sqlString)
     val physical = df.queryExecution.sparkPlan
     val operators = physical.collect {
-      case j: BroadcastHashJoin => j
-      case j: ShuffledHashJoin => j
-      case j: CartesianProduct => j
-      case j: BroadcastNestedLoopJoin => j
-      case j: SortMergeJoin => j
+      case j: BroadcastHashJoinExec => j
+      case j: ShuffledHashJoinExec => j
+      case j: CartesianProductExec => j
+      case j: BroadcastNestedLoopJoinExec => j
+      case j: SortMergeJoinExec => j
     }
 
     assert(operators.size === 1)
@@ -64,39 +64,43 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0") {
       Seq(
-        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", 
classOf[ShuffledHashJoin]),
-        ("SELECT * FROM testData LEFT SEMI JOIN testData2", 
classOf[BroadcastNestedLoopJoin]),
-        ("SELECT * FROM testData JOIN testData2", classOf[CartesianProduct]),
-        ("SELECT * FROM testData JOIN testData2 WHERE key = 2", 
classOf[CartesianProduct]),
-        ("SELECT * FROM testData LEFT JOIN testData2", 
classOf[BroadcastNestedLoopJoin]),
-        ("SELECT * FROM testData RIGHT JOIN testData2", 
classOf[BroadcastNestedLoopJoin]),
-        ("SELECT * FROM testData FULL OUTER JOIN testData2", 
classOf[BroadcastNestedLoopJoin]),
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+          classOf[ShuffledHashJoinExec]),
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2", 
classOf[BroadcastNestedLoopJoinExec]),
+        ("SELECT * FROM testData JOIN testData2", 
classOf[CartesianProductExec]),
+        ("SELECT * FROM testData JOIN testData2 WHERE key = 2", 
classOf[CartesianProductExec]),
+        ("SELECT * FROM testData LEFT JOIN testData2", 
classOf[BroadcastNestedLoopJoinExec]),
+        ("SELECT * FROM testData RIGHT JOIN testData2", 
classOf[BroadcastNestedLoopJoinExec]),
+        ("SELECT * FROM testData FULL OUTER JOIN testData2", 
classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2",
-          classOf[BroadcastNestedLoopJoin]),
-        ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2", 
classOf[CartesianProduct]),
+          classOf[BroadcastNestedLoopJoinExec]),
+        ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2",
+          classOf[CartesianProductExec]),
         ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2",
-          classOf[BroadcastNestedLoopJoin]),
-        ("SELECT * FROM testData JOIN testData2 WHERE key > a", 
classOf[CartesianProduct]),
+          classOf[BroadcastNestedLoopJoinExec]),
+        ("SELECT * FROM testData JOIN testData2 WHERE key > a", 
classOf[CartesianProductExec]),
         ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a",
-          classOf[CartesianProduct]),
-        ("SELECT * FROM testData JOIN testData2 ON key = a", 
classOf[SortMergeJoin]),
-        ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", 
classOf[SortMergeJoin]),
-        ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", 
classOf[SortMergeJoin]),
-        ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", 
classOf[SortMergeJoin]),
+          classOf[CartesianProductExec]),
+        ("SELECT * FROM testData JOIN testData2 ON key = a", 
classOf[SortMergeJoinExec]),
+        ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2",
+          classOf[SortMergeJoinExec]),
+        ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2",
+          classOf[SortMergeJoinExec]),
+        ("SELECT * FROM testData LEFT JOIN testData2 ON key = a", 
classOf[SortMergeJoinExec]),
         ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 
2",
-          classOf[SortMergeJoin]),
+          classOf[SortMergeJoinExec]),
         ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
-          classOf[SortMergeJoin]),
+          classOf[SortMergeJoinExec]),
         ("SELECT * FROM testData full outer join testData2 ON key = a",
-          classOf[SortMergeJoin]),
+          classOf[SortMergeJoinExec]),
         ("SELECT * FROM testData left JOIN testData2 ON (key * a != key + a)",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData right JOIN testData2 ON (key * a != key + a)",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData full JOIN testData2 ON (key * a != key + a)",
-          classOf[BroadcastNestedLoopJoin]),
-        ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", 
classOf[ShuffledHashJoin]),
-        ("SELECT * FROM testData LEFT ANTI JOIN testData2", 
classOf[BroadcastNestedLoopJoin])
+          classOf[BroadcastNestedLoopJoinExec]),
+        ("SELECT * FROM testData ANTI JOIN testData2 ON key = a", 
classOf[ShuffledHashJoinExec]),
+        ("SELECT * FROM testData LEFT ANTI JOIN testData2", 
classOf[BroadcastNestedLoopJoinExec])
       ).foreach(assertJoin)
     }
   }
@@ -112,11 +116,11 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     sql("CACHE TABLE testData")
     Seq(
       ("SELECT * FROM testData join testData2 ON key = a",
-        classOf[BroadcastHashJoin]),
+        classOf[BroadcastHashJoinExec]),
       ("SELECT * FROM testData join testData2 ON key = a and key = 2",
-        classOf[BroadcastHashJoin]),
+        classOf[BroadcastHashJoinExec]),
       ("SELECT * FROM testData join testData2 ON key = a where key = 2",
-        classOf[BroadcastHashJoin])
+        classOf[BroadcastHashJoinExec])
     ).foreach(assertJoin)
     sql("UNCACHE TABLE testData")
   }
@@ -127,11 +131,11 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     sql("CACHE TABLE testData2")
     Seq(
       ("SELECT * FROM testData LEFT JOIN testData2 ON key = a",
-        classOf[BroadcastHashJoin]),
+        classOf[BroadcastHashJoinExec]),
       ("SELECT * FROM testData RIGHT JOIN testData2 ON key = a where key = 2",
-        classOf[BroadcastHashJoin]),
+        classOf[BroadcastHashJoinExec]),
       ("SELECT * FROM testData right join testData2 ON key = a and key = 2",
-        classOf[BroadcastHashJoin])
+        classOf[BroadcastHashJoinExec])
     ).foreach(assertJoin)
     sql("UNCACHE TABLE testData")
   }
@@ -428,15 +432,18 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
       Seq(
-        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", 
classOf[BroadcastHashJoin]),
-        ("SELECT * FROM testData ANT JOIN testData2 ON key = a", 
classOf[BroadcastHashJoin])
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+          classOf[BroadcastHashJoinExec]),
+        ("SELECT * FROM testData ANT JOIN testData2 ON key = a", 
classOf[BroadcastHashJoinExec])
       ).foreach(assertJoin)
     }
 
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       Seq(
-        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", 
classOf[ShuffledHashJoin]),
-        ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a", 
classOf[ShuffledHashJoin])
+        ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
+          classOf[ShuffledHashJoinExec]),
+        ("SELECT * FROM testData LEFT ANTI JOIN testData2 ON key = a",
+          classOf[ShuffledHashJoinExec])
       ).foreach(assertJoin)
     }
 
@@ -460,35 +467,35 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
       Seq(
         ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
-          classOf[ShuffledHashJoin]),
+          classOf[ShuffledHashJoinExec]),
         ("SELECT * FROM testData LEFT SEMI JOIN testData2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData JOIN testData2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData JOIN testData2 WHERE key = 2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData LEFT JOIN testData2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData RIGHT JOIN testData2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData FULL OUTER JOIN testData2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData LEFT JOIN testData2 WHERE key = 2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData RIGHT JOIN testData2 WHERE key = 2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key = 2",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData JOIN testData2 WHERE key > a",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData FULL OUTER JOIN testData2 WHERE key > a",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData left JOIN testData2 WHERE (key * a != key + 
a)",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData right JOIN testData2 WHERE (key * a != key + 
a)",
-          classOf[BroadcastNestedLoopJoin]),
+          classOf[BroadcastNestedLoopJoinExec]),
         ("SELECT * FROM testData full JOIN testData2 WHERE (key * a != key + 
a)",
-          classOf[BroadcastNestedLoopJoin])
+          classOf[BroadcastNestedLoopJoinExec])
       ).foreach(assertJoin)
 
       checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9e64049..84f0c00 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.aggregate
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, 
CartesianProduct, SortMergeJoin}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
@@ -866,12 +866,12 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   test("SPARK-11111 null-safe join should not use cartesian product") {
     val df = sql("select count(*) from testData a join testData b on (a.key 
<=> b.key)")
     val cp = df.queryExecution.sparkPlan.collect {
-      case cp: CartesianProduct => cp
+      case cp: CartesianProductExec => cp
     }
     assert(cp.isEmpty, "should not use CartesianProduct for null-safe join")
     val smj = df.queryExecution.sparkPlan.collect {
-      case smj: SortMergeJoin => smj
-      case j: BroadcastHashJoin => j
+      case smj: SortMergeJoinExec => smj
+      case j: BroadcastHashJoinExec => j
     }
     assert(smj.size > 0, "should use SortMergeJoin or BroadcastHashJoin")
     checkAnswer(df, Row(100) :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
index 17f2343..ba16810 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
IdentityBroadcastMode, SinglePartition}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchange, 
ReusedExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ReusedExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -55,13 +55,13 @@ class ExchangeSuite extends SparkPlanTest with 
SharedSQLContext {
     val output = plan.output
     assert(plan sameResult plan)
 
-    val exchange1 = BroadcastExchange(IdentityBroadcastMode, plan)
+    val exchange1 = BroadcastExchangeExec(IdentityBroadcastMode, plan)
     val hashMode = HashedRelationBroadcastMode(output)
-    val exchange2 = BroadcastExchange(hashMode, plan)
+    val exchange2 = BroadcastExchangeExec(hashMode, plan)
     val hashMode2 =
       HashedRelationBroadcastMode(Alias(output.head, "id2")() :: Nil)
-    val exchange3 = BroadcastExchange(hashMode2, plan)
-    val exchange4 = ReusedExchange(output, exchange3)
+    val exchange3 = BroadcastExchangeExec(hashMode2, plan)
+    val exchange4 = ReusedExchangeExec(output, exchange3)
 
     assert(exchange1 sameResult exchange1)
     assert(exchange2 sameResult exchange2)
@@ -87,7 +87,7 @@ class ExchangeSuite extends SparkPlanTest with 
SharedSQLContext {
     val exchange3 = ShuffleExchange(part2, plan)
     val part3 = HashPartitioning(output ++ output, 2)
     val exchange4 = ShuffleExchange(part3, plan)
-    val exchange5 = ReusedExchange(output, exchange4)
+    val exchange5 = ReusedExchangeExec(output, exchange4)
 
     assert(exchange1 sameResult exchange1)
     assert(exchange2 sameResult exchange2)

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index bdbcf84..3b2911d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
-import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchange, ReuseExchange, ShuffleExchange}
-import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
+import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReusedExchangeExec, ReuseExchange, ShuffleExchange}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -86,8 +86,8 @@ class PlannerSuite extends SharedSQLContext {
             |FROM testData2 l JOIN (SELECT * FROM testLimit LIMIT 1) r ON (l.a 
= r.key)
           """.stripMargin).queryExecution.sparkPlan
 
-        val broadcastHashJoins = planned.collect { case join: 
BroadcastHashJoin => join }
-        val sortMergeJoins = planned.collect { case join: SortMergeJoin => 
join }
+        val broadcastHashJoins = planned.collect { case join: 
BroadcastHashJoinExec => join }
+        val sortMergeJoins = planned.collect { case join: SortMergeJoinExec => 
join }
 
         assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
         assert(sortMergeJoins.isEmpty, "Should not use sort merge join")
@@ -139,8 +139,8 @@ class PlannerSuite extends SharedSQLContext {
         val b = sqlContext.table("tiny").as("b")
         val planned = a.join(b, $"a.key" === $"b.key").queryExecution.sparkPlan
 
-        val broadcastHashJoins = planned.collect { case join: 
BroadcastHashJoin => join }
-        val sortMergeJoins = planned.collect { case join: SortMergeJoin => 
join }
+        val broadcastHashJoins = planned.collect { case join: 
BroadcastHashJoinExec => join }
+        val sortMergeJoins = planned.collect { case join: SortMergeJoinExec => 
join }
 
         assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
         assert(sortMergeJoins.isEmpty, "Should not use shuffled hash join")
@@ -167,34 +167,34 @@ class PlannerSuite extends SharedSQLContext {
   test("efficient terminal limit -> sort should use TakeOrderedAndProject") {
     val query = testData.select('key, 'value).sort('key).limit(2)
     val planned = query.queryExecution.executedPlan
-    assert(planned.isInstanceOf[execution.TakeOrderedAndProject])
+    assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec])
     assert(planned.output === testData.select('key, 'value).logicalPlan.output)
   }
 
   test("terminal limit -> project -> sort should use TakeOrderedAndProject") {
     val query = testData.select('key, 'value).sort('key).select('value, 
'key).limit(2)
     val planned = query.queryExecution.executedPlan
-    assert(planned.isInstanceOf[execution.TakeOrderedAndProject])
+    assert(planned.isInstanceOf[execution.TakeOrderedAndProjectExec])
     assert(planned.output === testData.select('value, 'key).logicalPlan.output)
   }
 
   test("terminal limits that are not handled by TakeOrderedAndProject should 
use CollectLimit") {
     val query = testData.select('value).limit(2)
     val planned = query.queryExecution.sparkPlan
-    assert(planned.isInstanceOf[CollectLimit])
+    assert(planned.isInstanceOf[CollectLimitExec])
     assert(planned.output === testData.select('value).logicalPlan.output)
   }
 
   test("TakeOrderedAndProject can appear in the middle of plans") {
     val query = testData.select('key, 'value).sort('key).limit(2).filter('key 
=== 3)
     val planned = query.queryExecution.executedPlan
-    assert(planned.find(_.isInstanceOf[TakeOrderedAndProject]).isDefined)
+    assert(planned.find(_.isInstanceOf[TakeOrderedAndProjectExec]).isDefined)
   }
 
   test("CollectLimit can appear in the middle of a plan when caching is used") 
{
     val query = testData.select('key, 'value).limit(2).cache()
     val planned = 
query.queryExecution.optimizedPlan.asInstanceOf[InMemoryRelation]
-    assert(planned.child.isInstanceOf[CollectLimit])
+    assert(planned.child.isInstanceOf[CollectLimitExec])
   }
 
   test("PartitioningCollection") {
@@ -394,7 +394,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = 
EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: Sort => true }.isEmpty) {
+    if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
       fail(s"Sort should have been added:\n$outputPlan")
     }
   }
@@ -410,7 +410,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = 
EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: Sort => true }.nonEmpty) {
+    if (outputPlan.collect { case s: SortExec => true }.nonEmpty) {
       fail(s"No sorts should have been added:\n$outputPlan")
     }
   }
@@ -427,7 +427,7 @@ class PlannerSuite extends SharedSQLContext {
     )
     val outputPlan = 
EnsureRequirements(sqlContext.sessionState.conf).apply(inputPlan)
     assertDistributionRequirementsAreSatisfied(outputPlan)
-    if (outputPlan.collect { case s: Sort => true }.isEmpty) {
+    if (outputPlan.collect { case s: SortExec => true }.isEmpty) {
       fail(s"Sort should have been added:\n$outputPlan")
     }
   }
@@ -485,7 +485,7 @@ class PlannerSuite extends SharedSQLContext {
         requiredChildOrdering = Seq(Seq.empty)),
       None)
 
-    val inputPlan = SortMergeJoin(
+    val inputPlan = SortMergeJoinExec(
         Literal(1) :: Nil,
         Literal(1) :: Nil,
         Inner,
@@ -494,7 +494,7 @@ class PlannerSuite extends SharedSQLContext {
         shuffle)
 
     val outputPlan = 
ReuseExchange(sqlContext.sessionState.conf).apply(inputPlan)
-    if (outputPlan.collect { case e: ReusedExchange => true }.size != 1) {
+    if (outputPlan.collect { case e: ReusedExchangeExec => true }.size != 1) {
       fail(s"Should re-use the shuffle:\n$outputPlan")
     }
     if (outputPlan.collect { case e: ShuffleExchange => true }.size != 1) {
@@ -502,7 +502,7 @@ class PlannerSuite extends SharedSQLContext {
     }
 
     // nested exchanges
-    val inputPlan2 = SortMergeJoin(
+    val inputPlan2 = SortMergeJoinExec(
       Literal(1) :: Nil,
       Literal(1) :: Nil,
       Inner,
@@ -511,7 +511,7 @@ class PlannerSuite extends SharedSQLContext {
       ShuffleExchange(finalPartitioning, inputPlan))
 
     val outputPlan2 = 
ReuseExchange(sqlContext.sessionState.conf).apply(inputPlan2)
-    if (outputPlan2.collect { case e: ReusedExchange => true }.size != 2) {
+    if (outputPlan2.collect { case e: ReusedExchangeExec => true }.size != 2) {
       fail(s"Should re-use the two shuffles:\n$outputPlan2")
     }
     if (outputPlan2.collect { case e: ShuffleExchange => true }.size != 2) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index 2963a85..a19ea51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -34,7 +34,7 @@ case class ReferenceSort(
     sortOrder: Seq[SortOrder],
     global: Boolean,
     child: SparkPlan)
-  extends UnaryNode {
+  extends UnaryExecNode {
 
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else 
UnspecifiedDistribution :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 7784776..ebeb39b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -43,13 +43,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext 
{
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      (child: SparkPlan) => Sort('a.asc :: 'b.asc :: Nil, global = true, child 
= child),
+      (child: SparkPlan) => SortExec('a.asc :: 'b.asc :: Nil, global = true, 
child = child),
       input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
       sortAnswers = false)
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      (child: SparkPlan) => Sort('b.asc :: 'a.asc :: Nil, global = true, child 
= child),
+      (child: SparkPlan) => SortExec('b.asc :: 'a.asc :: Nil, global = true, 
child = child),
       input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
       sortAnswers = false)
   }
@@ -57,8 +57,10 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
   test("sort followed by limit") {
     checkThatPlansAgree(
       (1 to 100).map(v => Tuple1(v)).toDF("a"),
-      (child: SparkPlan) => GlobalLimit(10, Sort('a.asc :: Nil, global = true, 
child = child)),
-      (child: SparkPlan) => GlobalLimit(10, ReferenceSort('a.asc :: Nil, 
global = true, child)),
+      (child: SparkPlan) =>
+        GlobalLimitExec(10, SortExec('a.asc :: Nil, global = true, child = 
child)),
+      (child: SparkPlan) =>
+        GlobalLimitExec(10, ReferenceSort('a.asc :: Nil, global = true, 
child)),
       sortAnswers = false
     )
   }
@@ -68,7 +70,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
     val stringLength = 1024 * 1024 * 2
     checkThatPlansAgree(
       Seq(Tuple1("a" * stringLength), Tuple1("b" * 
stringLength)).toDF("a").repartition(1),
-      Sort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
+      SortExec(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1),
       ReferenceSort(sortOrder, global = true, _: SparkPlan),
       sortAnswers = false
     )
@@ -78,7 +80,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
     AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "unsafe 
external sort") {
       checkThatPlansAgree(
         (1 to 100).map(v => Tuple1(v)).toDF("a"),
-        (child: SparkPlan) => Sort('a.asc :: Nil, global = true, child = 
child),
+        (child: SparkPlan) => SortExec('a.asc :: Nil, global = true, child = 
child),
         (child: SparkPlan) => ReferenceSort('a.asc :: Nil, global = true, 
child),
         sortAnswers = false)
     }
@@ -99,7 +101,7 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
       )
       checkThatPlansAgree(
         inputDf,
-        p => Sort(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 
23),
+        p => SortExec(sortOrder, global = true, p: SparkPlan, 
testSpillFrequency = 23),
         ReferenceSort(sortOrder, global = true, _: SparkPlan),
         sortAnswers = false
       )

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
index a4c6d07..fba04d0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
@@ -49,7 +49,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with 
SharedSQLContext {
    * Adds a no-op filter to the child plan in order to prevent 
executeCollect() from being
    * called directly on the child plan.
    */
-  private def noOpFilter(plan: SparkPlan): SparkPlan = Filter(Literal(true), 
plan)
+  private def noOpFilter(plan: SparkPlan): SparkPlan = 
FilterExec(Literal(true), plan)
 
   val limit = 250
   val sortOrder = 'a.desc :: 'b.desc :: Nil
@@ -59,11 +59,11 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with 
SharedSQLContext {
       checkThatPlansAgree(
         generateRandomInputData(),
         input =>
-          noOpFilter(TakeOrderedAndProject(limit, sortOrder, None, input)),
+          noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, None, input)),
         input =>
-          GlobalLimit(limit,
-            LocalLimit(limit,
-              Sort(sortOrder, true, input))),
+          GlobalLimitExec(limit,
+            LocalLimitExec(limit,
+              SortExec(sortOrder, true, input))),
         sortAnswers = false)
     }
   }
@@ -73,12 +73,13 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with 
SharedSQLContext {
       checkThatPlansAgree(
         generateRandomInputData(),
         input =>
-          noOpFilter(TakeOrderedAndProject(limit, sortOrder, 
Some(Seq(input.output.last)), input)),
+          noOpFilter(
+            TakeOrderedAndProjectExec(limit, sortOrder, 
Some(Seq(input.output.last)), input)),
         input =>
-          GlobalLimit(limit,
-            LocalLimit(limit,
-              Project(Seq(input.output.last),
-                Sort(sortOrder, true, input)))),
+          GlobalLimitExec(limit,
+            LocalLimitExec(limit,
+              ProjectExec(Seq(input.output.last),
+                SortExec(sortOrder, true, input)))),
         sortAnswers = false)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index d7cf1dc..233104a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
-import org.apache.spark.sql.execution.joins.BroadcastHashJoin
+import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.expressions.scala.typed
 import org.apache.spark.sql.functions.{avg, broadcast, col, max}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -30,7 +30,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
   test("range/filter should be combined") {
     val df = sqlContext.range(10).filter("id = 1").selectExpr("id + 1")
     val plan = df.queryExecution.executedPlan
-    assert(plan.find(_.isInstanceOf[WholeStageCodegen]).isDefined)
+    assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined)
     assert(df.collect() === Array(Row(2)))
   }
 
@@ -38,8 +38,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val df = sqlContext.range(10).groupBy().agg(max(col("id")), avg(col("id")))
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
     assert(df.collect() === Array(Row(9, 4.5)))
   }
 
@@ -47,8 +47,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val df = sqlContext.range(3).groupBy("id").count().orderBy("id")
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
     assert(df.collect() === Array(Row(0, 1), Row(1, 1), Row(2, 1)))
   }
 
@@ -58,8 +58,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val smallDF = sqlContext.createDataFrame(rdd, schema)
     val df = sqlContext.range(10).join(broadcast(smallDF), col("k") === 
col("id"))
     assert(df.queryExecution.executedPlan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[BroadcastHashJoin]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[BroadcastHashJoinExec]).isDefined)
     assert(df.collect() === Array(Row(1, 1, "1"), Row(1, 1, "1"), Row(2, 2, 
"2")))
   }
 
@@ -67,8 +67,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val df = sqlContext.range(3, 0, -1).toDF().sort(col("id"))
     val plan = df.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[Sort]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]).isDefined)
     assert(df.collect() === Array(Row(1), Row(2), Row(3)))
   }
 
@@ -78,8 +78,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val ds = sqlContext.range(10).map(_.toString)
     val plan = ds.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
     assert(ds.collect() === 0.until(10).map(_.toString).toArray)
   }
 
@@ -87,8 +87,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val ds = sqlContext.range(10).filter(_ % 2 == 0)
     val plan = ds.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[Filter]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[FilterExec]).isDefined)
     assert(ds.collect() === Array(0, 2, 4, 6, 8))
   }
 
@@ -96,8 +96,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
     val ds = sqlContext.range(10).filter(_ % 2 == 0).filter(_ % 3 == 0)
     val plan = ds.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[SerializeFromObject]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+      
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SerializeFromObjectExec]).isDefined)
     assert(ds.collect() === Array(0, 6))
   }
 
@@ -109,8 +109,8 @@ class WholeStageCodegenSuite extends SparkPlanTest with 
SharedSQLContext {
 
     val plan = ds.queryExecution.executedPlan
     assert(plan.find(p =>
-      p.isInstanceOf[WholeStageCodegen] &&
-        
p.asInstanceOf[WholeStageCodegen].child.isInstanceOf[TungstenAggregate]).isDefined)
+      p.isInstanceOf[WholeStageCodegenExec] &&
+        
p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[TungstenAggregate]).isDefined)
     assert(ds.collect() === Array(("a", 10.0), ("b", 3.0), ("c", 1.0)))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
index 4f185ed..9164074 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/PartitionBatchPruningSuite.scala
@@ -133,7 +133,7 @@ class PartitionBatchPruningSuite
       }
 
       val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
-        case in: InMemoryColumnarTableScan => (in.readPartitions.value, 
in.readBatches.value)
+        case in: InMemoryTableScanExec => (in.readPartitions.value, 
in.readBatches.value)
       }.head
 
       assert(readBatches === expectedReadBatches, s"Wrong number of read 
batches: $queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 50cd03a..fb70dbd 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, 
PredicateHelper}
 import org.apache.spark.sql.catalyst.util
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
@@ -375,7 +375,7 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
   def getPhysicalFilters(df: DataFrame): ExpressionSet = {
     ExpressionSet(
       df.queryExecution.executedPlan.collect {
-        case execution.Filter(f, _) => splitConjunctivePredicates(f)
+        case execution.FilterExec(f, _) => splitConjunctivePredicates(f)
       }.flatten)
   }
 
@@ -422,7 +422,7 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
 
   def getFileScanRDD(df: DataFrame): FileScanRDD = {
     df.queryExecution.executedPlan.collect {
-      case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+      case scan: DataSourceScanExec if scan.rdd.isInstanceOf[FileScanRDD] =>
         scan.rdd.asInstanceOf[FileScanRDD]
     }.headOption.getOrElse {
       fail(s"No FileScan in query\n${df.queryExecution}")

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index babe7ef..b9df43d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -71,15 +71,15 @@ class BroadcastJoinSuite extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("unsafe broadcast hash join updates peak execution memory") {
-    testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash join", "inner")
+    testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast hash join", 
"inner")
   }
 
   test("unsafe broadcast hash outer join updates peak execution memory") {
-    testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast hash outer join", 
"left_outer")
+    testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast hash outer 
join", "left_outer")
   }
 
   test("unsafe broadcast left semi join updates peak execution memory") {
-    testBroadcastJoin[BroadcastHashJoin]("unsafe broadcast left semi join", 
"leftsemi")
+    testBroadcastJoin[BroadcastHashJoinExec]("unsafe broadcast left semi 
join", "leftsemi")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
index 8cdfa8a..bc838ee 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala
@@ -83,7 +83,7 @@ class ExistenceJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
           checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: 
SparkPlan) =>
             EnsureRequirements(left.sqlContext.sessionState.conf).apply(
-              ShuffledHashJoin(
+              ShuffledHashJoinExec(
                 leftKeys, rightKeys, joinType, BuildRight, boundCondition, 
left, right)),
             expectedAnswer,
             sortAnswers = true)
@@ -96,7 +96,7 @@ class ExistenceJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
           checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: 
SparkPlan) =>
             EnsureRequirements(left.sqlContext.sessionState.conf).apply(
-              BroadcastHashJoin(
+              BroadcastHashJoinExec(
                 leftKeys, rightKeys, joinType, BuildRight, boundCondition, 
left, right)),
             expectedAnswer,
             sortAnswers = true)
@@ -108,7 +108,7 @@ class ExistenceJoinSuite extends SparkPlanTest with 
SharedSQLContext {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
           EnsureRequirements(left.sqlContext.sessionState.conf).apply(
-            BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, 
Some(condition))),
+            BroadcastNestedLoopJoinExec(left, right, BuildLeft, joinType, 
Some(condition))),
           expectedAnswer,
           sortAnswers = true)
       }
@@ -118,7 +118,7 @@ class ExistenceJoinSuite extends SparkPlanTest with 
SharedSQLContext {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
           EnsureRequirements(left.sqlContext.sessionState.conf).apply(
-            BroadcastNestedLoopJoin(left, right, BuildRight, joinType, 
Some(condition))),
+            BroadcastNestedLoopJoinExec(left, right, BuildRight, joinType, 
Some(condition))),
           expectedAnswer,
           sortAnswers = true)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
index 3cb3ef1..933f32e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala
@@ -91,7 +91,7 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         leftPlan: SparkPlan,
         rightPlan: SparkPlan,
         side: BuildSide) = {
-      val broadcastJoin = joins.BroadcastHashJoin(
+      val broadcastJoin = joins.BroadcastHashJoinExec(
         leftKeys,
         rightKeys,
         Inner,
@@ -110,9 +110,9 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         rightPlan: SparkPlan,
         side: BuildSide) = {
       val shuffledHashJoin =
-        joins.ShuffledHashJoin(leftKeys, rightKeys, Inner, side, None, 
leftPlan, rightPlan)
+        joins.ShuffledHashJoinExec(leftKeys, rightKeys, Inner, side, None, 
leftPlan, rightPlan)
       val filteredJoin =
-        boundCondition.map(Filter(_, 
shuffledHashJoin)).getOrElse(shuffledHashJoin)
+        boundCondition.map(FilterExec(_, 
shuffledHashJoin)).getOrElse(shuffledHashJoin)
       EnsureRequirements(sqlContext.sessionState.conf).apply(filteredJoin)
     }
 
@@ -123,7 +123,7 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         leftPlan: SparkPlan,
         rightPlan: SparkPlan) = {
       val sortMergeJoin =
-        joins.SortMergeJoin(leftKeys, rightKeys, Inner, boundCondition, 
leftPlan, rightPlan)
+        joins.SortMergeJoinExec(leftKeys, rightKeys, Inner, boundCondition, 
leftPlan, rightPlan)
       EnsureRequirements(sqlContext.sessionState.conf).apply(sortMergeJoin)
     }
 
@@ -189,7 +189,7 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
     test(s"$testName using CartesianProduct") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
-          Filter(condition(), CartesianProduct(left, right)),
+          FilterExec(condition(), CartesianProductExec(left, right)),
           expectedAnswer.map(Row.fromTuple),
           sortAnswers = true)
       }
@@ -198,7 +198,7 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
     test(s"$testName using BroadcastNestedLoopJoin build left") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
-          BroadcastNestedLoopJoin(left, right, BuildLeft, Inner, 
Some(condition())),
+          BroadcastNestedLoopJoinExec(left, right, BuildLeft, Inner, 
Some(condition())),
           expectedAnswer.map(Row.fromTuple),
           sortAnswers = true)
       }
@@ -207,7 +207,7 @@ class InnerJoinSuite extends SparkPlanTest with 
SharedSQLContext {
     test(s"$testName using BroadcastNestedLoopJoin build right") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
-          BroadcastNestedLoopJoin(left, right, BuildRight, Inner, 
Some(condition())),
+          BroadcastNestedLoopJoinExec(left, right, BuildRight, Inner, 
Some(condition())),
           expectedAnswer.map(Row.fromTuple),
           sortAnswers = true)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
index 4cacb20..c26cb84 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala
@@ -83,7 +83,7 @@ class OuterJoinSuite extends SparkPlanTest with 
SharedSQLContext {
             val buildSide = if (joinType == LeftOuter) BuildRight else 
BuildLeft
             checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: 
SparkPlan) =>
               EnsureRequirements(sqlContext.sessionState.conf).apply(
-                ShuffledHashJoin(
+                ShuffledHashJoinExec(
                   leftKeys, rightKeys, joinType, buildSide, boundCondition, 
left, right)),
               expectedAnswer.map(Row.fromTuple),
               sortAnswers = true)
@@ -102,7 +102,7 @@ class OuterJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         extractJoinParts().foreach { case (_, leftKeys, rightKeys, 
boundCondition, _, _) =>
           withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
             checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: 
SparkPlan) =>
-              BroadcastHashJoin(
+              BroadcastHashJoinExec(
                 leftKeys, rightKeys, joinType, buildSide, boundCondition, 
left, right),
               expectedAnswer.map(Row.fromTuple),
               sortAnswers = true)
@@ -116,7 +116,7 @@ class OuterJoinSuite extends SparkPlanTest with 
SharedSQLContext {
         withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
           checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: 
SparkPlan) =>
             EnsureRequirements(sqlContext.sessionState.conf).apply(
-              SortMergeJoin(leftKeys, rightKeys, joinType, boundCondition, 
left, right)),
+              SortMergeJoinExec(leftKeys, rightKeys, joinType, boundCondition, 
left, right)),
             expectedAnswer.map(Row.fromTuple),
             sortAnswers = true)
         }
@@ -126,7 +126,7 @@ class OuterJoinSuite extends SparkPlanTest with 
SharedSQLContext {
     test(s"$testName using BroadcastNestedLoopJoin build left") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
-          BroadcastNestedLoopJoin(left, right, BuildLeft, joinType, 
Some(condition)),
+          BroadcastNestedLoopJoinExec(left, right, BuildLeft, joinType, 
Some(condition)),
           expectedAnswer.map(Row.fromTuple),
           sortAnswers = true)
       }
@@ -135,7 +135,7 @@ class OuterJoinSuite extends SparkPlanTest with 
SharedSQLContext {
     test(s"$testName using BroadcastNestedLoopJoin build right") {
       withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
         checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) 
=>
-          BroadcastNestedLoopJoin(left, right, BuildRight, joinType, 
Some(condition)),
+          BroadcastNestedLoopJoinExec(left, right, BuildRight, joinType, 
Some(condition)),
           expectedAnswer.map(Row.fromTuple),
           sortAnswers = true)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index f66deea..c24abf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.execution.DataSourceScan
+import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD
@@ -208,10 +208,10 @@ class JDBCSuite extends SparkFunSuite
       val parentPlan = df.queryExecution.executedPlan
       // Check if SparkPlan Filter is removed in a physical plan and
       // the plan only has PhysicalRDD to scan JDBCRelation.
-      
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
-      val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
-      
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScan])
-      
assert(node.child.asInstanceOf[DataSourceScan].nodeName.contains("JDBCRelation"))
+      
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+      val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+      
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.DataSourceScanExec])
+      
assert(node.child.asInstanceOf[DataSourceScanExec].nodeName.contains("JDBCRelation"))
       df
     }
     assert(checkPushdown(sql("SELECT * FROM foobar WHERE THEID < 
1")).collect().size == 0)
@@ -246,9 +246,9 @@ class JDBCSuite extends SparkFunSuite
       val parentPlan = df.queryExecution.executedPlan
       // Check if SparkPlan Filter is not removed in a physical plan because 
JDBCRDD
       // cannot compile given predicates.
-      
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen])
-      val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegen]
-      assert(node.child.isInstanceOf[org.apache.spark.sql.execution.Filter])
+      
assert(parentPlan.isInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec])
+      val node = 
parentPlan.asInstanceOf[org.apache.spark.sql.execution.WholeStageCodegenExec]
+      
assert(node.child.isInstanceOf[org.apache.spark.sql.execution.FilterExec])
       df
     }
     assert(checkNotPushdown(sql("SELECT * FROM foobar WHERE (THEID + 1) < 
2")).collect().size == 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/d7d0cad0/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 19e34b4..1470777 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -312,7 +312,7 @@ class FilteredScanSuite extends DataSourceTest with 
SharedSQLContext with Predic
       try {
         val queryExecution = sql(sqlString).queryExecution
         val rawPlan = queryExecution.executedPlan.collect {
-          case p: execution.DataSourceScan => p
+          case p: execution.DataSourceScanExec => p
         } match {
           case Seq(p) => p
           case _ => fail(s"More than one PhysicalRDD found\n$queryExecution")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to