http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index ab84c12..e18c817 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.joins
 import java.io.{ObjectInput, ObjectOutput, Externalizable}
 import java.util.{HashMap => JavaHashMap}
 
-import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
+import org.apache.spark.sql.catalyst.expressions.{Projection, InternalRow}
 import org.apache.spark.sql.execution.SparkSqlSerializer
 import org.apache.spark.util.collection.CompactBuffer
 
@@ -30,7 +30,7 @@ import org.apache.spark.util.collection.CompactBuffer
  * object.
  */
 private[joins] sealed trait HashedRelation {
-  def get(key: Row): CompactBuffer[Row]
+  def get(key: InternalRow): CompactBuffer[InternalRow]
 
   // This is a helper method to implement Externalizable, and is used by
   // GeneralHashedRelation and UniqueKeyHashedRelation
@@ -54,12 +54,12 @@ private[joins] sealed trait HashedRelation {
  * A general [[HashedRelation]] backed by a hash map that maps the key into a sequence of values.
  */
 private[joins] final class GeneralHashedRelation(
-    private var hashTable: JavaHashMap[Row, CompactBuffer[Row]])
+    private var hashTable: JavaHashMap[InternalRow, CompactBuffer[InternalRow]])
   extends HashedRelation with Externalizable {
 
   def this() = this(null) // Needed for serialization
 
-  override def get(key: Row): CompactBuffer[Row] = hashTable.get(key)
+  override def get(key: InternalRow): CompactBuffer[InternalRow] = hashTable.get(key)
 
   override def writeExternal(out: ObjectOutput): Unit = {
     writeBytes(out, SparkSqlSerializer.serialize(hashTable))
@@ -75,17 +75,18 @@ private[joins] final class GeneralHashedRelation(
  * A specialized [[HashedRelation]] that maps key into a single value. This implementation
  * assumes the key is unique.
  */
-private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[Row, Row])
+private[joins]
+final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[InternalRow, InternalRow])
   extends HashedRelation with Externalizable {
 
   def this() = this(null) // Needed for serialization
 
-  override def get(key: Row): CompactBuffer[Row] = {
+  override def get(key: InternalRow): CompactBuffer[InternalRow] = {
     val v = hashTable.get(key)
     if (v eq null) null else CompactBuffer(v)
   }
 
-  def getValue(key: Row): Row = hashTable.get(key)
+  def getValue(key: InternalRow): InternalRow = hashTable.get(key)
 
   override def writeExternal(out: ObjectOutput): Unit = {
     writeBytes(out, SparkSqlSerializer.serialize(hashTable))
@@ -103,13 +104,13 @@ private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHa
 private[joins] object HashedRelation {
 
   def apply(
-      input: Iterator[Row],
+      input: Iterator[InternalRow],
       keyGenerator: Projection,
       sizeEstimate: Int = 64): HashedRelation = {
 
     // TODO: Use Spark's HashMap implementation.
-    val hashTable = new JavaHashMap[Row, CompactBuffer[Row]](sizeEstimate)
-    var currentRow: Row = null
+    val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]](sizeEstimate)
+    var currentRow: InternalRow = null
 
     // Whether the join key is unique. If the key is unique, we can convert the underlying
     // hash map into one specialized for this.
@@ -122,7 +123,7 @@ private[joins] object HashedRelation {
       if (!rowKey.anyNull) {
         val existingMatchList = hashTable.get(rowKey)
         val matchList = if (existingMatchList == null) {
-          val newMatchList = new CompactBuffer[Row]()
+          val newMatchList = new CompactBuffer[InternalRow]()
           hashTable.put(rowKey, newMatchList)
           newMatchList
         } else {
@@ -134,7 +135,7 @@ private[joins] object HashedRelation {
     }
 
     if (keyIsUnique) {
-      val uniqHashTable = new JavaHashMap[Row, Row](hashTable.size)
+      val uniqHashTable = new JavaHashMap[InternalRow, InternalRow](hashTable.size)
       val iter = hashTable.entrySet().iterator()
       while (iter.hasNext) {
         val entry = iter.next()

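The hunks above switch HashedRelation from the external Row type to InternalRow without changing its build-and-probe logic: build-side rows are grouped into a hash map keyed by the join key (rows with null keys are skipped), and a probe returns the buffered matches. A minimal, self-contained Scala sketch of that pattern follows; SimpleRow and buildHashedRelation are illustrative stand-ins, not Spark's InternalRow/Projection/CompactBuffer API.

import scala.collection.mutable

object HashedRelationSketch {
  type SimpleRow = Seq[Any]

  // Build phase: group build-side rows by the key extracted from each row,
  // skipping rows whose key contains a null (mirroring `!rowKey.anyNull` above).
  def buildHashedRelation(
      input: Iterator[SimpleRow],
      keyGenerator: SimpleRow => SimpleRow): mutable.Map[SimpleRow, mutable.ArrayBuffer[SimpleRow]] = {
    val hashTable = mutable.Map.empty[SimpleRow, mutable.ArrayBuffer[SimpleRow]]
    for (row <- input) {
      val key = keyGenerator(row)
      if (!key.contains(null)) {
        hashTable.getOrElseUpdate(key, mutable.ArrayBuffer.empty[SimpleRow]) += row
      }
    }
    hashTable
  }

  def main(args: Array[String]): Unit = {
    val buildSide = Iterator(Seq(1, "a"), Seq(2, "b"), Seq(1, "c"))
    val relation = buildHashedRelation(buildSide, row => Seq(row.head))
    // Probe phase: look up all build-side matches for a streamed-side key.
    println(relation.getOrElse(Seq(1), Nil)) // ArrayBuffer(List(1, a), List(1, c))
  }
}
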
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
index 036423e..2a6d4d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala
@@ -47,7 +47,7 @@ case class LeftSemiJoinBNL(
   @transient private lazy val boundCondition =
     newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val broadcastedRelation =
       sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
index 8ad27ea..20d7427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow}
 import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 
@@ -42,10 +42,10 @@ case class LeftSemiJoinHash(
 
   override def output: Seq[Attribute] = left.output
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
-      val hashSet = new java.util.HashSet[Row]()
-      var currentRow: Row = null
+      val hashSet = new java.util.HashSet[InternalRow]()
+      var currentRow: InternalRow = null
 
       // Create a Hash set of buildKeys
       while (buildIter.hasNext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
index 219525d..5439e10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning}
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
@@ -43,7 +43,7 @@ case class ShuffledHashJoin(
   override def requiredChildDistribution: Seq[ClusteredDistribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
       val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
       hashJoin(streamIter, hashed)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 1a39fb4..2abe65a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -21,9 +21,7 @@ import java.util.NoSuchElementException
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.util.collection.CompactBuffer
@@ -60,29 +58,29 @@ case class SortMergeJoin(
   private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
     keys.map(SortOrder(_, Ascending))
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val leftResults = left.execute().map(_.copy())
     val rightResults = right.execute().map(_.copy())
 
     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
-      new Iterator[Row] {
+      new Iterator[InternalRow] {
         // Mutable per row objects.
         private[this] val joinRow = new JoinedRow5
-        private[this] var leftElement: Row = _
-        private[this] var rightElement: Row = _
-        private[this] var leftKey: Row = _
-        private[this] var rightKey: Row = _
-        private[this] var rightMatches: CompactBuffer[Row] = _
+        private[this] var leftElement: InternalRow = _
+        private[this] var rightElement: InternalRow = _
+        private[this] var leftKey: InternalRow = _
+        private[this] var rightKey: InternalRow = _
+        private[this] var rightMatches: CompactBuffer[InternalRow] = _
         private[this] var rightPosition: Int = -1
         private[this] var stop: Boolean = false
-        private[this] var matchKey: Row = _
+        private[this] var matchKey: InternalRow = _
 
         // initialize iterator
         initialize()
 
         override final def hasNext: Boolean = nextMatchingPair()
 
-        override final def next(): Row = {
+        override final def next(): InternalRow = {
           if (hasNext) {
             // we are using the buffered right rows and run down left iterator
             val joinedRow = joinRow(leftElement, rightMatches(rightPosition))
@@ -145,7 +143,7 @@ case class SortMergeJoin(
                 fetchLeft()
               }
             }
-            rightMatches = new CompactBuffer[Row]()
+            rightMatches = new CompactBuffer[InternalRow]()
             if (stop) {
               stop = false
               // iterate the right side to buffer all rows that matches

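For context on the iterator being retyped above: SortMergeJoin walks two key-sorted inputs, advances whichever side has the smaller key, buffers all right rows that share the current key (rightMatches), and emits the joined rows for each matching left row. A simplified, self-contained Scala sketch of that matching loop is shown below; it operates on plain key/value pairs and none of its names are Spark APIs.

object SortMergeSketch {
  def sortMergeJoin[K: Ordering, L, R](left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(L, R)] = {
    val ord = implicitly[Ordering[K]]
    val out = Seq.newBuilder[(L, R)]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val cmp = ord.compare(left(i)._1, right(j)._1)
      if (cmp < 0) i += 1          // left key is smaller: advance left
      else if (cmp > 0) j += 1     // right key is smaller: advance right
      else {
        // Buffer all right rows sharing this key, then emit the cross
        // product with every left row carrying the same key.
        val key = left(i)._1
        val matches = right.drop(j).takeWhile(r => ord.equiv(r._1, key))
        while (i < left.length && ord.equiv(left(i)._1, key)) {
          matches.foreach(r => out += ((left(i)._2, r._2)))
          i += 1
        }
        j += matches.length
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val l = Seq(1 -> "l1", 2 -> "l2", 2 -> "l3")
    val r = Seq(2 -> "r1", 2 -> "r2", 3 -> "r3")
    println(sortMergeJoin(l, r)) // List((l2,r1), (l2,r2), (l3,r1), (l3,r2))
  }
}
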
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 2b45a83..1ce150c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -29,7 +29,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.{PythonBroadcast, PythonRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Row, _}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -56,7 +57,7 @@ private[spark] case class PythonUDF(
 
   def nullable: Boolean = true
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     throw new UnsupportedOperationException("PythonUDFs can not be directly evaluated.")
   }
 }
@@ -241,7 +242,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
 
   def children: Seq[SparkPlan] = child :: Nil
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     val childResults = child.execute().map(_.copy())
 
     val parent = childResults.mapPartitions { iter =>
@@ -276,7 +277,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
       val row = new GenericMutableRow(1)
       iter.map { result =>
         row(0) = EvaluatePython.fromJava(result, udf.dataType)
-        row: Row
+        row: InternalRow
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
index c41c21c..8df1da0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.stat
 import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.{Column, DataFrame, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
+import org.apache.spark.sql.{Column, DataFrame}
 
 private[sql] object FrequentItems extends Logging {
 
@@ -110,7 +111,7 @@ private[sql] object FrequentItems extends Logging {
       }
     )
     val justItems = freqItems.map(m => m.baseMap.keys.toSeq)
-    val resultRow = Row(justItems : _*)
+    val resultRow = InternalRow(justItems : _*)
     // append frequent Items to the column name for easy debugging
     val outputCols = colInfo.map { v =>
       StructField(v._1 + "_freqItems", ArrayType(v._2, false))

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index e75e668..667fc70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow}
+import org.apache.spark.sql.catalyst.expressions.{InternalRow, SpecificMutableRow}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources._
@@ -211,7 +211,7 @@ private[sql] object JDBCRDD extends Logging {
       fqTable: String,
       requiredColumns: Array[String],
       filters: Array[Filter],
-      parts: Array[Partition]): RDD[Row] = {
+      parts: Array[Partition]): RDD[InternalRow] = {
     val dialect = JdbcDialects.get(url)
     val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName))
     new JDBCRDD(
@@ -240,7 +240,7 @@ private[sql] class JDBCRDD(
     filters: Array[Filter],
     partitions: Array[Partition],
     properties: Properties)
-  extends RDD[Row](sc, Nil) {
+  extends RDD[InternalRow](sc, Nil) {
 
   /**
    * Retrieve the list of partitions corresponding to this RDD.
@@ -348,12 +348,12 @@ private[sql] class JDBCRDD(
   /**
    * Runs the SQL query against the JDBC driver.
    */
-  override def compute(thePart: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row]
-  {
+  override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
     var closed = false
     var finished = false
     var gotNext = false
-    var nextValue: Row = null
+    var nextValue: InternalRow = null
 
     context.addTaskCompletionListener{ context => close() }
     val part = thePart.asInstanceOf[JDBCPartition]
@@ -375,7 +375,7 @@ private[sql] class JDBCRDD(
     val conversions = getConversions(schema)
     val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType))
 
-    def getNext(): Row = {
+    def getNext(): InternalRow = {
       if (rs.next()) {
         var i = 0
         while (i < conversions.length) {
@@ -443,7 +443,7 @@ private[sql] class JDBCRDD(
         mutableRow
       } else {
         finished = true
-        null.asInstanceOf[Row]
+        null.asInstanceOf[InternalRow]
       }
     }
 
@@ -486,7 +486,7 @@ private[sql] class JDBCRDD(
       !finished
     }
 
-    override def next(): Row = {
+    override def next(): InternalRow = {
       if (!hasNext) {
         throw new NoSuchElementException("End of stream")
       }

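The JDBCRDD.compute hunks above retype a hand-rolled lazy iterator: getNext() reads one row from the JDBC ResultSet and sets finished at end of stream, hasNext fetches and caches at most one row, and next() throws NoSuchElementException once the stream is exhausted. A self-contained Scala sketch of that getNext/hasNext pattern follows; LazyFetchIterator and fetchRow are illustrative names, not Spark or JDBC APIs.

// hasNext lazily fetches one record and caches it; next() hands it out.
class LazyFetchIterator[T](fetchRow: () => Option[T]) extends Iterator[T] {
  private var finished = false
  private var gotNext = false
  private var nextValue: T = _

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      fetchRow() match {
        case Some(v) => nextValue = v
        case None => finished = true
      }
      gotNext = true
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false // mark the cached value as consumed
    nextValue
  }
}

object LazyFetchIteratorDemo {
  def main(args: Array[String]): Unit = {
    val source = Iterator(1, 2, 3) // stand-in for rows read from a ResultSet
    val it = new LazyFetchIterator[Int](() => if (source.hasNext) Some(source.next()) else None)
    println(it.toList) // List(1, 2, 3)
  }
}
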
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 30f9190..4d3aac4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -23,10 +23,9 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 
 /**
  * Instructions on how to partition the table among workers.
@@ -138,7 +137,7 @@ private[sql] case class JDBCRelation(
       table,
       requiredColumns,
       filters,
-      parts)
+      parts).map(_.asInstanceOf[Row])
   }
 
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index c772cd1..69bf13e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,10 +22,10 @@ import java.io.IOException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 
 
 private[sql] class DefaultSource
@@ -154,12 +154,12 @@ private[sql] class JSONRelation(
       JacksonParser(
         baseRDD(),
         schema,
-        sqlContext.conf.columnNameOfCorruptRecord)
+        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
     } else {
       JsonRDD.jsonStringToRow(
         baseRDD(),
         schema,
-        sqlContext.conf.columnNameOfCorruptRecord)
+        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
     }
   }
 
@@ -168,12 +168,12 @@ private[sql] class JSONRelation(
       JacksonParser(
         baseRDD(),
         StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord)
+        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
     } else {
       JsonRDD.jsonStringToRow(
         baseRDD(),
         StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord)
+        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala
index 325f54b..1e6b119 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala
@@ -21,7 +21,7 @@ import scala.collection.Map
 
 import com.fasterxml.jackson.core._
 
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.types._
 
 private[sql] object JacksonGenerator {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
index f16075c..817e8a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala
@@ -35,7 +35,7 @@ private[sql] object JacksonParser {
   def apply(
       json: RDD[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[Row] = {
+      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
     parseJson(json, schema, columnNameOfCorruptRecords)
   }
 
@@ -130,7 +130,10 @@ private[sql] object JacksonParser {
    *
    * Fields in the json that are not defined in the requested schema will be dropped.
    */
-  private def convertObject(factory: JsonFactory, parser: JsonParser, schema: StructType): Row = {
+  private def convertObject(
+      factory: JsonFactory,
+      parser: JsonParser,
+      schema: StructType): InternalRow = {
     val row = new GenericMutableRow(schema.length)
     while (nextUntil(parser, JsonToken.END_OBJECT)) {
       schema.getFieldIndex(parser.getCurrentName) match {
@@ -176,9 +179,9 @@ private[sql] object JacksonParser {
   private def parseJson(
       json: RDD[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[Row] = {
+      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
 
-    def failedRecord(record: String): Seq[Row] = {
+    def failedRecord(record: String): Seq[InternalRow] = {
       // create a row even if no corrupt record column is present
       val row = new GenericMutableRow(schema.length)
       for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) {
@@ -202,7 +205,7 @@ private[sql] object JacksonParser {
           // convertField wrap an object into a single value array when necessary.
           convertField(factory, parser, ArrayType(schema)) match {
             case null => failedRecord(record)
-            case list: Seq[Row @unchecked] => list
+            case list: Seq[InternalRow @unchecked] => list
             case _ =>
               sys.error(
                 s"Failed to parse record $record. Please make sure that each line of the file " +

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index e4acf1d..44594c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -38,7 +38,7 @@ private[sql] object JsonRDD extends Logging {
   private[sql] def jsonStringToRow(
       json: RDD[String],
       schema: StructType,
-      columnNameOfCorruptRecords: String): RDD[Row] = {
+      columnNameOfCorruptRecords: String): RDD[InternalRow] = {
     parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema))
   }
 
@@ -434,7 +434,7 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private def asRow(json: Map[String, Any], schema: StructType): Row = {
+  private def asRow(json: Map[String, Any], schema: StructType): InternalRow = {
     // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index ab9f878..4da5e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -79,7 +79,7 @@ private[sql] object CatalystConverter {
 
   // TODO: consider using Array[T] for arrays to avoid boxing of primitive types
   type ArrayScalaType[T] = Seq[T]
-  type StructScalaType[T] = Row
+  type StructScalaType[T] = InternalRow
   type MapScalaType[K, V] = Map[K, V]
 
   protected[parquet] def createConverter(
@@ -240,7 +240,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
    *
    * @return
    */
-  def getCurrentRecord: Row = throw new UnsupportedOperationException
+  def getCurrentRecord: InternalRow = throw new UnsupportedOperationException
 
   /**
    * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in
@@ -275,7 +275,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter {
 
 /**
  * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
- * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object.
+ * to a [[org.apache.spark.sql.catalyst.expressions.InternalRow]] object.
  *
  * @param schema The corresponding Catalyst schema in the form of a list of attributes.
  */
@@ -284,7 +284,7 @@ private[parquet] class CatalystGroupConverter(
     protected[parquet] val index: Int,
     protected[parquet] val parent: CatalystConverter,
     protected[parquet] var current: ArrayBuffer[Any],
-    protected[parquet] var buffer: ArrayBuffer[Row])
+    protected[parquet] var buffer: ArrayBuffer[InternalRow])
   extends CatalystConverter {
 
   def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) =
@@ -293,7 +293,7 @@ private[parquet] class CatalystGroupConverter(
       index,
       parent,
       current = null,
-      buffer = new ArrayBuffer[Row](
+      buffer = new ArrayBuffer[InternalRow](
         CatalystArrayConverter.INITIAL_ARRAY_SIZE))
 
   /**
@@ -309,7 +309,7 @@ private[parquet] class CatalystGroupConverter(
 
   override val size = schema.size
 
-  override def getCurrentRecord: Row = {
+  override def getCurrentRecord: InternalRow = {
     assert(isRootConverter, "getCurrentRecord should only be called in root group converter!")
     // TODO: use iterators if possible
     // Note: this will ever only be called in the root converter when the record has been
@@ -347,7 +347,7 @@ private[parquet] class CatalystGroupConverter(
 
 /**
  * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record
- * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that his
+ * to a [[org.apache.spark.sql.catalyst.expressions.InternalRow]] object. Note that his
  * converter is optimized for rows of primitive types (non-nested records).
  */
 private[parquet] class CatalystPrimitiveRowConverter(
@@ -373,7 +373,7 @@ private[parquet] class CatalystPrimitiveRowConverter(
   override val parent = null
 
   // Should be only called in root group converter!
-  override def getCurrentRecord: Row = current
+  override def getCurrentRecord: InternalRow = current
 
   override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 272608d..39360e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -46,7 +46,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLConf
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _}
 import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
@@ -54,7 +54,7 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 /**
  * :: DeveloperApi ::
  * Parquet table scan operator. Imports the file that backs the given
- * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
+ * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[InternalRow]``.
  */
 private[sql] case class ParquetTableScan(
     attributes: Seq[Attribute],
@@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan(
     }
   }.toArray
 
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat
 
     val sc = sqlContext.sparkContext
@@ -125,7 +125,7 @@ private[sql] case class ParquetTableScan(
         sc,
         classOf[FilteringParquetRowInputFormat],
         classOf[Void],
-        classOf[Row],
+        classOf[InternalRow],
         conf)
 
     if (requestedPartitionOrdinals.nonEmpty) {
@@ -154,9 +154,9 @@ private[sql] case class ParquetTableScan(
             .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))
 
         if (primitiveRow) {
-          new Iterator[Row] {
+          new Iterator[InternalRow] {
             def hasNext: Boolean = iter.hasNext
-            def next(): Row = {
+            def next(): InternalRow = {
               // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow.
               val row = iter.next()._2.asInstanceOf[SpecificMutableRow]
 
@@ -173,12 +173,12 @@ private[sql] case class ParquetTableScan(
         } else {
           // Create a mutable row since we need to fill in values from partition columns.
           val mutableRow = new GenericMutableRow(outputSize)
-          new Iterator[Row] {
+          new Iterator[InternalRow] {
             def hasNext: Boolean = iter.hasNext
-            def next(): Row = {
+            def next(): InternalRow = {
               // We are using CatalystGroupConverter and it returns a GenericRow.
               // Since GenericRow is not mutable, we just cast it to a Row.
-              val row = iter.next()._2.asInstanceOf[Row]
+              val row = iter.next()._2.asInstanceOf[InternalRow]
 
               var i = 0
               while (i < row.size) {
@@ -258,7 +258,7 @@ private[sql] case class InsertIntoParquetTable(
   /**
    * Inserts all rows into the Parquet file.
    */
-  protected override def doExecute(): RDD[Row] = {
+  protected override def doExecute(): RDD[InternalRow] = {
     // TODO: currently we do not check whether the "schema"s are compatible
     // That means if one first creates a table and then INSERTs data with
     // and incompatible schema the execution will fail. It would be nice
@@ -321,13 +321,13 @@ private[sql] case class InsertIntoParquetTable(
    * @param conf A [[org.apache.hadoop.conf.Configuration]].
    */
   private def saveAsHadoopFile(
-      rdd: RDD[Row],
+      rdd: RDD[InternalRow],
       path: String,
       conf: Configuration) {
     val job = new Job(conf)
     val keyType = classOf[Void]
     job.setOutputKeyClass(keyType)
-    job.setOutputValueClass(classOf[Row])
+    job.setOutputValueClass(classOf[InternalRow])
     NewFileOutputFormat.setOutputPath(job, new Path(path))
     val wrappedConf = new SerializableWritable(job.getConfiguration)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -342,7 +342,7 @@ private[sql] case class InsertIntoParquetTable(
           .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
       }
 
-    def writeShard(context: TaskContext, iter: Iterator[Row]): Int = {
+    def writeShard(context: TaskContext, iter: Iterator[InternalRow]): Int = {
       /* "reduce task" <split #> <attempt # = spark task #> */
       val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
         context.attemptNumber)
@@ -381,7 +381,7 @@ private[sql] case class InsertIntoParquetTable(
  * to imported ones.
  */
 private[parquet] class AppendingParquetOutputFormat(offset: Int)
-  extends org.apache.parquet.hadoop.ParquetOutputFormat[Row] {
+  extends org.apache.parquet.hadoop.ParquetOutputFormat[InternalRow] {
   // override to accept existing directories as valid output directory
   override def checkOutputSpecs(job: JobContext): Unit = {}
   var committer: OutputCommitter = null
@@ -434,25 +434,25 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
  * RecordFilter we want to use.
  */
 private[parquet] class FilteringParquetRowInputFormat
-  extends org.apache.parquet.hadoop.ParquetInputFormat[Row] with Logging {
+  extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging {
 
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
       inputSplit: InputSplit,
-      taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {
+      taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = {
 
     import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter
 
-    val readSupport: ReadSupport[Row] = new RowReadSupport()
+    val readSupport: ReadSupport[InternalRow] = new RowReadSupport()
 
     val filter = ParquetInputFormat.getFilter(ContextUtil.getConfiguration(taskAttemptContext))
     if (!filter.isInstanceOf[NoOpFilter]) {
-      new ParquetRecordReader[Row](
+      new ParquetRecordReader[InternalRow](
         readSupport,
         filter)
     } else {
-      new ParquetRecordReader[Row](readSupport)
+      new ParquetRecordReader[InternalRow](readSupport)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index c62c592..a8775a2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -28,7 +28,7 @@ import org.apache.parquet.io.api._
 import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow}
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -39,12 +39,12 @@ import org.apache.spark.unsafe.types.UTF8String
  *@param root The root group converter for the record.
  */
 private[parquet] class RowRecordMaterializer(root: CatalystConverter)
-  extends RecordMaterializer[Row] {
+  extends RecordMaterializer[InternalRow] {
 
   def this(parquetSchema: MessageType, attributes: Seq[Attribute]) =
     this(CatalystConverter.createRootConverter(parquetSchema, attributes))
 
-  override def getCurrentRecord: Row = root.getCurrentRecord
+  override def getCurrentRecord: InternalRow = root.getCurrentRecord
 
   override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter]
 }
@@ -52,13 +52,13 @@ private[parquet] class RowRecordMaterializer(root: CatalystConverter)
 /**
  * A `parquet.hadoop.api.ReadSupport` for Row objects.
  */
-private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
+private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logging {
 
   override def prepareForRead(
       conf: Configuration,
       stringMap: java.util.Map[String, String],
       fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[Row] = {
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
     log.debug(s"preparing for read with Parquet file schema $fileSchema")
     // Note: this very much imitates AvroParquet
     val parquetSchema = readContext.getRequestedSchema
@@ -133,7 +133,7 @@ private[parquet] object RowReadSupport {
 /**
  * A `parquet.hadoop.api.WriteSupport` for Row objects.
  */
-private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
+private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging {
 
   private[parquet] var writer: RecordConsumer = null
   private[parquet] var attributes: Array[Attribute] = null
@@ -157,7 +157,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
     log.debug(s"preparing for write with schema $attributes")
   }
 
-  override def write(record: Row): Unit = {
+  override def write(record: InternalRow): Unit = {
     val attributesSize = attributes.size
     if (attributesSize > record.size) {
       throw new IndexOutOfBoundsException(
@@ -322,7 +322,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
 
 // Optimized for non-nested rows
 private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
-  override def write(record: Row): Unit = {
+  override def write(record: InternalRow): Unit = {
     val attributesSize = attributes.size
     if (attributesSize > record.size) {
       throw new IndexOutOfBoundsException(
@@ -345,7 +345,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
 
   private def consumeType(
       ctype: DataType,
-      record: Row,
+      record: InternalRow,
       index: Int): Unit = {
     ctype match {
       case StringType => writer.addBinary(

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 7af4eb1..bc27a9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -34,15 +34,15 @@ import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.hadoop.util.ContextUtil
 
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 import org.apache.spark.util.Utils
 
 private[sql] class DefaultSource extends HadoopFsRelationProvider {
@@ -60,7 +60,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider {
 private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
   extends OutputWriter {
 
-  private val recordWriter: RecordWriter[Void, Row] = {
+  private val recordWriter: RecordWriter[Void, InternalRow] = {
     val conf = context.getConfiguration
     val outputFormat = {
       // When appending new Parquet files to an existing Parquet file directory, to avoid
@@ -93,7 +93,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
         }
       }
 
-      new ParquetOutputFormat[Row]() {
+      new ParquetOutputFormat[InternalRow]() {
         // Here we override `getDefaultWorkFile` for two reasons:
         //
         //  1. To allow appending.  We need to generate output file name based on the max available
@@ -112,7 +112,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext
     outputFormat.getRecordWriter(context)
   }
 
-  override def write(row: Row): Unit = recordWriter.write(null, row)
+  override def write(row: Row): Unit = recordWriter.write(null, row.asInstanceOf[InternalRow])
 
   override def close(): Unit = recordWriter.close(context)
 }
@@ -286,7 +286,7 @@ private[sql] class ParquetRelation2(
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
         inputFormatClass = classOf[FilteringParquetRowInputFormat],
         keyClass = classOf[Void],
-        valueClass = classOf[Row]) {
+        valueClass = classOf[InternalRow]) {
 
         val cacheMetadata = useMetadataCache
 
@@ -331,7 +331,7 @@ private[sql] class ParquetRelation2(
             new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
           }
         }
-      }.values
+      }.values.map(_.asInstanceOf[Row])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index edda3f2..4cf6743 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -17,48 +17,48 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.{SaveMode, Strategy, execution, sources}
 import org.apache.spark.util.Utils
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
  * A Strategy for planning scans over data sources defined using the sources API.
  */
 private[sql] object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) =>
       pruneFilterProjectRaw(
         l,
-        projectList,
+        projects,
         filters,
-        (a, f) => t.buildScan(a, f)) :: Nil
+        (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
 
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) =>
       pruneFilterProject(
         l,
-        projectList,
+        projects,
         filters,
-        (a, f) => t.buildScan(a, f)) :: Nil
+        (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
 
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedScan)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) =>
       pruneFilterProject(
         l,
-        projectList,
+        projects,
         filters,
-        (a, _) => t.buildScan(a)) :: Nil
+        (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
 
     // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
 
@@ -80,13 +80,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
       buildPartitionedTableScan(
         l,
-        projectList,
+        projects,
         pushedFilters,
         t.partitionSpec.partitionColumns,
         selectedPartitions) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) =>
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -94,12 +94,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf))
       pruneFilterProject(
         l,
-        projectList,
+        projects,
         filters,
-        (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil
+        (a, f) =>
+          toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
 
     case l @ LogicalRelation(t: TableScan) =>
-      createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil
+      execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
 
     case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
@@ -119,7 +120,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       filters: Seq[Expression],
       partitionColumns: StructType,
       partitions: Array[Partition]) = {
-    val output = projections.map(_.toAttribute)
     val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation]
 
     // Because we are creating one RDD per partition, we need to have a shared HadoopConf.
@@ -138,23 +138,23 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
           logicalRelation,
           projections,
           filters,
-          (requiredColumns, filters) => {
+          (columns: Seq[Attribute], filters) => {
             val partitionColNames = partitionColumns.fieldNames
 
             // Don't scan any partition columns to save I/O.  Here we are being optimistic and
             // assuming partition columns data stored in data files are always consistent with those
             // partition values encoded in partition directory paths.
-            val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains)
+            val needed = columns.filterNot(a => partitionColNames.contains(a.name))
             val dataRows =
-              relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast)
+              relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
 
             // Merges data values with partition values.
             mergeWithPartitionValues(
               relation.schema,
-              requiredColumns,
+              columns.map(_.name).toArray,
               partitionColNames,
               partitionValues,
-              dataRows)
+              toCatalystRDD(logicalRelation, needed, dataRows))
           })
 
       scan.execute()
@@ -167,15 +167,15 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
       }
 
-    createPhysicalRDD(logicalRelation.relation, output, unionedRows)
+    execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows)
   }
 
   private def mergeWithPartitionValues(
       schema: StructType,
       requiredColumns: Array[String],
       partitionColumns: Array[String],
-      partitionValues: Row,
-      dataRows: RDD[Row]): RDD[Row] = {
+      partitionValues: InternalRow,
+      dataRows: RDD[InternalRow]): RDD[InternalRow] = {
     val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
 
     // If output columns contain any partition column(s), we need to merge scanned data
@@ -186,13 +186,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         val i = partitionColumns.indexOf(name)
         if (i != -1) {
           // If yes, gets column value from partition values.
-          (mutableRow: MutableRow, dataRow: expressions.Row, ordinal: Int) => {
+          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
             mutableRow(ordinal) = partitionValues(i)
           }
         } else {
           // Otherwise, inherits the value from scanned data.
           val i = nonPartitionColumns.indexOf(name)
-          (mutableRow: MutableRow, dataRow: expressions.Row, ordinal: Int) => {
+          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
             mutableRow(ordinal) = dataRow(i)
           }
         }
@@ -201,7 +201,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       // Since we know for sure that this closure is serializable, we can avoid the overhead
       // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
       // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
-      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => {
+      val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
         val dataTypes = requiredColumns.map(schema(_).dataType)
         val mutableRow = new SpecificMutableRow(dataTypes)
         iterator.map { dataRow =>
@@ -210,7 +210,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
             mergers(i)(mutableRow, dataRow, i)
             i += 1
           }
-          mutableRow.asInstanceOf[expressions.Row]
+          mutableRow.asInstanceOf[InternalRow]
         }
       }
 
@@ -256,26 +256,26 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
   // Based on Public API.
   protected def pruneFilterProject(
       relation: LogicalRelation,
-      projectList: Seq[NamedExpression],
+      projects: Seq[NamedExpression],
       filterPredicates: Seq[Expression],
-      scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = {
+      scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = {
     pruneFilterProjectRaw(
       relation,
-      projectList,
+      projects,
       filterPredicates,
       (requestedColumns, pushedFilters) => {
-        scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray)
+        scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray)
       })
   }
 
   // Based on Catalyst expressions.
   protected def pruneFilterProjectRaw(
       relation: LogicalRelation,
-      projectList: Seq[NamedExpression],
+      projects: Seq[NamedExpression],
       filterPredicates: Seq[Expression],
-      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = {
+      scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
 
-    val projectSet = AttributeSet(projectList.flatMap(_.references))
+    val projectSet = AttributeSet(projects.flatMap(_.references))
     val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
     val filterCondition = filterPredicates.reduceLeftOption(expressions.And)
 
@@ -283,38 +283,47 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
     }}
 
-    if (projectList.map(_.toAttribute) == projectList &&
-        projectSet.size == projectList.size &&
+    if (projects.map(_.toAttribute) == projects &&
+        projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
       // When it is possible to just use column pruning to get the right projection and
       // when the columns of this projection are enough to evaluate all filter conditions,
       // just do a scan followed by a filter, with no extra project.
       val requestedColumns =
-        projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above.
+        projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
           .map(relation.attributeMap)            // Match original case of attributes.
 
-      val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute),
-          scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
+        scanBuilder(requestedColumns, pushedFilters))
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
 
-      val scan = createPhysicalRDD(relation.relation, requestedColumns,
+      val scan = execution.PhysicalRDD(requestedColumns,
         scanBuilder(requestedColumns, pushedFilters))
-      execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
+      execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
   }
 
-  private[this] def createPhysicalRDD(
-      relation: BaseRelation,
+  /**
+   * Convert RDD of Row into RDD of InternalRow with objects in catalyst types
+   */
+  private[this] def toCatalystRDD(
+      relation: LogicalRelation,
       output: Seq[Attribute],
-      rdd: RDD[Row]): SparkPlan = {
-    val converted = if (relation.needConversion) {
-      execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType))
+      rdd: RDD[Row]): RDD[InternalRow] = {
+    if (relation.relation.needConversion) {
+      execution.RDDConversions.rowToRowRdd(rdd.asInstanceOf[RDD[Row]], output.map(_.dataType))
     } else {
-      rdd
+      rdd.map(_.asInstanceOf[InternalRow])
     }
-    execution.PhysicalRDD(output, converted)
+  }
+
+  /**
+   * Convert RDD of Row into RDD of InternalRow with objects in catalyst types
+   */
+  private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = {
+    toCatalystRDD(relation, relation.output, rdd)
   }
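
As an aside, the conditional conversion that the new toCatalystRDD helper performs can be modelled with plain Scala. The sketch below is illustrative only: ExternalRow and CatalystRow are hypothetical stand-ins for Spark's external Row and catalyst InternalRow, and the passthrough branch rebuilds the row where the real code simply reinterprets it.

// Hypothetical stand-ins; only the conditional-conversion pattern is modelled here.
final case class ExternalRow(values: Seq[Any])
final case class CatalystRow(values: Seq[Any])

object ToCatalystSketch {
  // Convert only when the relation reports needConversion; otherwise the rows
  // are assumed to already be in the internal representation.
  def toCatalyst(rows: Seq[ExternalRow], needConversion: Boolean)
                (convert: ExternalRow => CatalystRow): Seq[CatalystRow] =
    if (needConversion) rows.map(convert)
    else rows.map(r => CatalystRow(r.values)) // stands in for the asInstanceOf cast

  def main(args: Array[String]): Unit = {
    val rows = Seq(ExternalRow(Seq(1, "a")), ExternalRow(Seq(2, "b")))
    println(toCatalyst(rows, needConversion = true)(r => CatalystRow(r.values)))
  }
}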
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index 7a2b5b9..c6f535d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -25,12 +25,11 @@ import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.util.Shell
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.types._
 
-private[sql] case class Partition(values: Row, path: String)
+private[sql] case class Partition(values: InternalRow, path: String)
 
 private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition])
 
@@ -100,7 +99,7 @@ private[sql] object PartitioningUtils {
      // Finally, we create `Partition`s based on paths and resolved partition values.
      val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map {
         case (PartitionValues(_, literals), (path, _)) =>
-          Partition(Row.fromSeq(literals.map(_.value)), path.toString)
+          Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString)
       }
 
       PartitionSpec(StructType(fields), partitions)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index c94199b..1763cee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -44,18 +44,17 @@ private[sql] case class InsertIntoDataSource(
     overwrite: Boolean)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = DataFrame(sqlContext, query)
     // Apply the schema of the existing table to the new data.
-    val df = sqlContext.createDataFrame(
-      data.queryExecution.toRdd, logicalRelation.schema, needsConversion = false)
+    val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
     relation.insert(df, overwrite)
 
     // Invalidate the cache.
     sqlContext.cacheManager.invalidateCache(logicalRelation)
 
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
   }
 }
 
@@ -65,7 +64,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     mode: SaveMode)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
     require(
       relation.paths.length == 1,
       s"Cannot write to multiple destinations: 
${relation.paths.mkString(",")}")
@@ -90,7 +89,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
     if (doInsertion) {
       val job = new Job(hadoopConf)
       job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[Row])
+      job.setOutputValueClass(classOf[InternalRow])
       FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
 
      // We create a DataFrame by applying the schema of relation to the data to make sure.
@@ -103,10 +102,8 @@ private[sql] case class InsertIntoHadoopFsRelation(
         val project = Project(
          relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
 
-        sqlContext.createDataFrame(
-          DataFrame(sqlContext, project).queryExecution.toRdd,
-          relation.schema,
-          needsConversion = false)
+        sqlContext.internalCreateDataFrame(
+          DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
       }
 
       val partitionColumns = relation.partitionColumns.fieldNames
@@ -119,7 +116,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
       }
     }
 
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
   }
 
    private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
@@ -141,22 +138,19 @@ private[sql] case class InsertIntoHadoopFsRelation(
       throw new SparkException("Job aborted.", cause)
     }
 
-    def writeRows(taskContext: TaskContext, iterator: Iterator[Row]): Unit = {
+    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
       // If anything below fails, we should abort the task.
       try {
         writerContainer.executorSideSetup(taskContext)
 
-        if (needsConversion) {
-          val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
-          while (iterator.hasNext) {
-            val row = converter(iterator.next()).asInstanceOf[Row]
-            writerContainer.outputWriterForRow(row).write(row)
-          }
+        val converter = if (needsConversion) {
+          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
         } else {
-          while (iterator.hasNext) {
-            val row = iterator.next()
-            writerContainer.outputWriterForRow(row).write(row)
-          }
+          r: InternalRow => r.asInstanceOf[Row]
+        }
+        while (iterator.hasNext) {
+          val row = converter(iterator.next())
+          writerContainer.outputWriterForRow(row).write(row)
         }
 
         writerContainer.commitTask()
@@ -210,32 +204,28 @@ private[sql] case class InsertIntoHadoopFsRelation(
       throw new SparkException("Job aborted.", cause)
     }
 
-    def writeRows(taskContext: TaskContext, iterator: Iterator[Row]): Unit = {
+    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
       // If anything below fails, we should abort the task.
       try {
         writerContainer.executorSideSetup(taskContext)
 
        val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
         val dataProj = newProjection(codegenEnabled, dataOutput, output)
-
-        if (needsConversion) {
-          val converter = CatalystTypeConverters.createToScalaConverter(dataSchema)
-          while (iterator.hasNext) {
-            val row = iterator.next()
-            val partitionPart = partitionProj(row)
-            val dataPart = dataProj(row)
-            val convertedDataPart = converter(dataPart).asInstanceOf[Row]
-            writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart)
-          }
+        val dataConverter: InternalRow => Row = if (needsConversion) {
+          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
         } else {
-          val partitionSchema = StructType.fromAttributes(partitionOutput)
-          val converter = CatalystTypeConverters.createToScalaConverter(partitionSchema)
-          while (iterator.hasNext) {
-            val row = iterator.next()
-            val partitionPart = converter(partitionProj(row)).asInstanceOf[Row]
-            val dataPart = dataProj(row)
-            writerContainer.outputWriterForRow(partitionPart).write(dataPart)
-          }
+          r: InternalRow => r.asInstanceOf[Row]
+        }
+        val partitionSchema = StructType.fromAttributes(partitionOutput)
+        val partConverter: InternalRow => Row =
+          CatalystTypeConverters.createToScalaConverter(partitionSchema)
+            .asInstanceOf[InternalRow => Row]
+
+        while (iterator.hasNext) {
+          val row = iterator.next()
+          val partitionPart = partConverter(partitionProj(row))
+          val dataPart = dataConverter(dataProj(row))
+          writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
 
         writerContainer.commitTask()
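
The refactor in the two writeRows hunks above replaces duplicated write loops with a single loop that applies a row converter chosen once up front. A minimal, self-contained model of that pattern is sketched below; the type aliases and the write callback are illustrative assumptions, not Spark's real API.

object WriteRowsSketch {
  type InternalRowLike = Seq[Any]
  type RowLike = Seq[Any]

  // Chooses the converter once, outside the loop, then writes every row.
  def writeRows(rows: Iterator[InternalRowLike],
                needsConversion: Boolean,
                toScala: InternalRowLike => RowLike)
               (write: RowLike => Unit): Unit = {
    val converter: InternalRowLike => RowLike =
      if (needsConversion) toScala else (r: InternalRowLike) => r // identity stands in for the cast
    while (rows.hasNext) {
      write(converter(rows.next()))
    }
  }

  def main(args: Array[String]): Unit = {
    writeRows(Iterator(Seq(1, "a"), Seq(2, "b")), needsConversion = true,
      (r: InternalRowLike) => r.map(_.toString))(row => println(row))
  }
}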

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 20afd60..01c67db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -26,7 +26,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InternalRow}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types._
@@ -404,7 +404,7 @@ private[sql] case class CreateTempTableUsing(
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
-  def run(sqlContext: SQLContext): Seq[Row] = {
+  def run(sqlContext: SQLContext): Seq[InternalRow] = {
     val resolved = ResolvedDataSource(
       sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
     sqlContext.registerDataFrameAsTable(
@@ -421,7 +421,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
     options: Map[String, String],
     query: LogicalPlan) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
     val df = DataFrame(sqlContext, query)
    val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
     sqlContext.registerDataFrameAsTable(
@@ -434,7 +434,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
 private[sql] case class RefreshTable(databaseName: String, tableName: String)
   extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
     // Refresh the given table's metadata first.
     sqlContext.catalog.refreshTable(databaseName, tableName)
 
@@ -453,7 +453,7 @@ private[sql] case class RefreshTable(databaseName: String, 
tableName: String)
       sqlContext.cacheManager.cacheQuery(df, Some(tableName))
     }
 
-    Seq.empty[Row]
+    Seq.empty[InternalRow]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index d1547fb..27534a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -28,7 +28,8 @@ import org.apache.spark.annotation.{DeveloperApi, 
Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.SerializableWritable
-import org.apache.spark.sql.{Row, _}
+import org.apache.spark.sql.execution.RDDConversions
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.types.StructType
@@ -195,6 +196,8 @@ abstract class BaseRelation {
    *  java.lang.String -> UTF8String
    *  java.lang.Decimal -> Decimal
    *
+   * If `needConversion` is `false`, buildScan() should return an [[RDD]] of [[InternalRow]]
+   *
   * Note: The internal representation is not stable across releases and thus data sources outside
    * of Spark SQL should leave this as true.
    *
@@ -443,7 +446,7 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
              val castedValues = partitionSchema.zip(literals).map { case (field, literal) =>
                 Cast(literal, field.dataType).eval()
               }
-              p.copy(values = Row.fromSeq(castedValues))
+              p.copy(values = InternalRow.fromSeq(castedValues))
             }
             PartitionSpec(partitionSchema, castedPartitions)
           }
@@ -579,15 +582,21 @@ abstract class HadoopFsRelation 
private[sql](maybePartitionSpec: Option[Partitio
      BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable)
     }.toSeq
 
-    buildScan(inputFiles).mapPartitions { rows =>
+    val rdd = buildScan(inputFiles)
+    val converted =
+      if (needConversion) {
+        RDDConversions.rowToRowRdd(rdd, dataSchema.fields.map(_.dataType))
+      } else {
+        rdd.map(_.asInstanceOf[InternalRow])
+      }
+    converted.mapPartitions { rows =>
       val buildProjection = if (codegenEnabled) {
        GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes)
       } else {
        () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes)
       }
-
       val mutableProjection = buildProjection()
-      rows.map(mutableProjection)
+      rows.map(r => mutableProjection(r).asInstanceOf[Row])
     }
   }
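
The buildScan hunk above constructs the mutable projection once per partition and maps every row through it. The sketch below models that per-partition pattern in plain Scala, with iterators standing in for RDD partitions and a simple column-index projection standing in for the generated projection; all names here are illustrative assumptions.

object ScanProjectionSketch {
  type Row = Seq[Any]

  def scan(partitions: Seq[Iterator[Row]], requiredColumns: Seq[Int]): Seq[Iterator[Row]] =
    partitions.map { rows =>
      // Analogous to buildProjection(): constructed once per partition, applied per row.
      val project: Row => Row = row => requiredColumns.map(row)
      rows.map(project)
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Iterator[Row](Seq(1, "a", true), Seq(2, "b", false)))
    scan(parts, Seq(0, 2)).foreach(_.foreach(println)) // keeps columns 0 and 2
  }
}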
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 17a3cec..eb3e913 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -94,7 +94,7 @@ class CachedTableSuite extends QueryTest {
   }
 
   test("too big for memory") {
-    val data = "*" * 10000
+    val data = "*" * 1000
     ctx.sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF()
       .registerTempTable("bigData")
     ctx.table("bigData").persist(StorageLevel.MEMORY_AND_DISK)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index 1683662..1f37455 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -18,25 +18,28 @@
 package org.apache.spark.sql.columnar
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.InternalRow
 import org.apache.spark.sql.types._
 
 class ColumnStatsSuite extends SparkFunSuite {
-  testColumnStats(classOf[ByteColumnStats], BYTE, Row(Byte.MaxValue, Byte.MinValue, 0))
-  testColumnStats(classOf[ShortColumnStats], SHORT, Row(Short.MaxValue, Short.MinValue, 0))
-  testColumnStats(classOf[IntColumnStats], INT, Row(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[LongColumnStats], LONG, Row(Long.MaxValue, Long.MinValue, 0))
-  testColumnStats(classOf[FloatColumnStats], FLOAT, Row(Float.MaxValue, Float.MinValue, 0))
-  testColumnStats(classOf[DoubleColumnStats], DOUBLE, Row(Double.MaxValue, Double.MinValue, 0))
-  testColumnStats(classOf[FixedDecimalColumnStats], FIXED_DECIMAL(15, 10), Row(null, null, 0))
-  testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0))
-  testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(Long.MaxValue, Long.MinValue, 0))
+  testColumnStats(classOf[ByteColumnStats], BYTE, InternalRow(Byte.MaxValue, Byte.MinValue, 0))
+  testColumnStats(classOf[ShortColumnStats], SHORT, InternalRow(Short.MaxValue, Short.MinValue, 0))
+  testColumnStats(classOf[IntColumnStats], INT, InternalRow(Int.MaxValue, Int.MinValue, 0))
+  testColumnStats(classOf[LongColumnStats], LONG, InternalRow(Long.MaxValue, Long.MinValue, 0))
+  testColumnStats(classOf[FloatColumnStats], FLOAT, InternalRow(Float.MaxValue, Float.MinValue, 0))
+  testColumnStats(classOf[DoubleColumnStats], DOUBLE,
+    InternalRow(Double.MaxValue, Double.MinValue, 0))
+  testColumnStats(classOf[FixedDecimalColumnStats],
+    FIXED_DECIMAL(15, 10), InternalRow(null, null, 0))
+  testColumnStats(classOf[StringColumnStats], STRING, InternalRow(null, null, 0))
+  testColumnStats(classOf[DateColumnStats], DATE, InternalRow(Int.MaxValue, Int.MinValue, 0))
+  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP,
+    InternalRow(Long.MaxValue, Long.MinValue, 0))
 
   def testColumnStats[T <: AtomicType, U <: ColumnStats](
       columnStatsClass: Class[U],
       columnType: NativeColumnType[T],
-      initialStatistics: Row): Unit = {
+      initialStatistics: InternalRow): Unit = {
 
     val columnStatsName = columnStatsClass.getSimpleName
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 1bc7eb3..7c86eae 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -19,14 +19,11 @@ package org.apache.spark.sql.columnar
 
 import scala.collection.immutable.HashSet
 import scala.util.Random
-
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{AtomicType, DataType, Decimal}
 import org.apache.spark.sql.types.{DataType, Decimal, AtomicType}
 import org.apache.spark.unsafe.types.UTF8String
 
-
 object ColumnarTestUtils {
   def makeNullRow(length: Int): GenericMutableRow = {
     val row = new GenericMutableRow(length)
@@ -79,9 +76,9 @@ object ColumnarTestUtils {
 
   def makeRandomRow(
       head: ColumnType[_ <: DataType, _],
-      tail: ColumnType[_ <: DataType, _]*): Row = makeRandomRow(Seq(head) ++ tail)
+      tail: ColumnType[_ <: DataType, _]*): InternalRow = makeRandomRow(Seq(head) ++ tail)
 
-  def makeRandomRow(columnTypes: Seq[ColumnType[_ <: DataType, _]]): Row = {
+  def makeRandomRow(columnTypes: Seq[ColumnType[_ <: DataType, _]]): InternalRow = {
     val row = new GenericMutableRow(columnTypes.length)
     makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) =>
       row(index) = value

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index fa3b814..12f95eb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -20,9 +20,8 @@ package org.apache.spark.sql.columnar
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{QueryTest, TestData}
+import org.apache.spark.sql.{QueryTest, Row, TestData}
 import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
 
 class InMemoryColumnarQuerySuite extends QueryTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
index 20d65a7..f606e21 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
@@ -18,10 +18,10 @@
 package org.apache.spark.sql.columnar.compression
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.columnar.{NoopColumnStats, BOOLEAN}
 import org.apache.spark.sql.columnar.ColumnarTestUtils._
+import org.apache.spark.sql.columnar.{BOOLEAN, NoopColumnStats}
 
 class BooleanBitSetSuite extends SparkFunSuite {
   import BooleanBitSet._
@@ -32,7 +32,7 @@ class BooleanBitSetSuite extends SparkFunSuite {
     // -------------
 
    val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet)
-    val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN))
+    val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN))
     val values = rows.map(_(0))
 
     rows.foreach(builder.appendFrom(_, 0))

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 45a7e8f..3e27f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -18,16 +18,15 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{SQLConf, execution}
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.test.TestSQLContext.implicits._
 import org.apache.spark.sql.test.TestSQLContext.planner._
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SQLConf, execution}
 
 
 class PlannerSuite extends SparkFunSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 5290c28..71db6a2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.joins
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
+import org.apache.spark.sql.catalyst.expressions.{Projection, InternalRow}
 import org.apache.spark.util.collection.CompactBuffer
 
 
@@ -26,37 +26,37 @@ class HashedRelationSuite extends SparkFunSuite {
 
   // Key is simply the record itself
   private val keyProjection = new Projection {
-    override def apply(row: Row): Row = row
+    override def apply(row: InternalRow): InternalRow = row
   }
 
   test("GeneralHashedRelation") {
-    val data = Array(Row(0), Row(1), Row(2), Row(2))
+    val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2))
     val hashed = HashedRelation(data.iterator, keyProjection)
     assert(hashed.isInstanceOf[GeneralHashedRelation])
 
-    assert(hashed.get(data(0)) == CompactBuffer[Row](data(0)))
-    assert(hashed.get(data(1)) == CompactBuffer[Row](data(1)))
-    assert(hashed.get(Row(10)) === null)
+    assert(hashed.get(data(0)) == CompactBuffer[InternalRow](data(0)))
+    assert(hashed.get(data(1)) == CompactBuffer[InternalRow](data(1)))
+    assert(hashed.get(InternalRow(10)) === null)
 
-    val data2 = CompactBuffer[Row](data(2))
+    val data2 = CompactBuffer[InternalRow](data(2))
     data2 += data(2)
     assert(hashed.get(data(2)) == data2)
   }
 
   test("UniqueKeyHashedRelation") {
-    val data = Array(Row(0), Row(1), Row(2))
+    val data = Array(InternalRow(0), InternalRow(1), InternalRow(2))
     val hashed = HashedRelation(data.iterator, keyProjection)
     assert(hashed.isInstanceOf[UniqueKeyHashedRelation])
 
-    assert(hashed.get(data(0)) == CompactBuffer[Row](data(0)))
-    assert(hashed.get(data(1)) == CompactBuffer[Row](data(1)))
-    assert(hashed.get(data(2)) == CompactBuffer[Row](data(2)))
-    assert(hashed.get(Row(10)) === null)
+    assert(hashed.get(data(0)) == CompactBuffer[InternalRow](data(0)))
+    assert(hashed.get(data(1)) == CompactBuffer[InternalRow](data(1)))
+    assert(hashed.get(data(2)) == CompactBuffer[InternalRow](data(2)))
+    assert(hashed.get(InternalRow(10)) === null)
 
     val uniqHashed = hashed.asInstanceOf[UniqueKeyHashedRelation]
     assert(uniqHashed.getValue(data(0)) == data(0))
     assert(uniqHashed.getValue(data(1)) == data(1))
     assert(uniqHashed.getValue(data(2)) == data(2))
-    assert(uniqHashed.getValue(Row(10)) == null)
+    assert(uniqHashed.getValue(InternalRow(10)) == null)
   }
 }
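
For context on what this suite exercises: a HashedRelation is essentially a build-side multi-map from join key to matching rows, probed once per streamed row. The snippet below is a small, self-contained model of that behaviour using plain Scala collections; it is not the real GeneralHashedRelation or UniqueKeyHashedRelation implementation.

object HashedRelationSketch {
  type Row = Seq[Any]

  // Build a multi-map from join key to the rows that carry that key.
  def build(rows: Iterator[Row], keyOf: Row => Row): Map[Row, Seq[Row]] =
    rows.foldLeft(Map.empty[Row, Seq[Row]]) { (table, row) =>
      val key = keyOf(row)
      table.updated(key, table.getOrElse(key, Seq.empty) :+ row)
    }

  def main(args: Array[String]): Unit = {
    val buildSide = Iterator[Row](Seq(0, "a"), Seq(1, "b"), Seq(1, "c"))
    val table = build(buildSide, _.take(1))        // key on the first column
    println(table.getOrElse(Seq(1), Seq.empty))    // both rows with key 1
    println(table.get(Seq(9)))                     // no match -> None
  }
}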

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 17f5f9a..fa5d4ec 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, QueryTest, SQLConf}
+import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
 
 /**
 * A test suite that tests Parquet filter2 API based filter pushdown optimization.

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 46b2585..fc827bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -35,11 +35,10 @@ import org.apache.parquet.schema.{MessageType, 
MessageTypeParser}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode}
 
 // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport
 // with an empty configuration (it is after all not intended to be used in this way?)

