http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index ab84c12..e18c817 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.joins import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.util.{HashMap => JavaHashMap} -import org.apache.spark.sql.catalyst.expressions.{Projection, Row} +import org.apache.spark.sql.catalyst.expressions.{Projection, InternalRow} import org.apache.spark.sql.execution.SparkSqlSerializer import org.apache.spark.util.collection.CompactBuffer @@ -30,7 +30,7 @@ import org.apache.spark.util.collection.CompactBuffer * object. */ private[joins] sealed trait HashedRelation { - def get(key: Row): CompactBuffer[Row] + def get(key: InternalRow): CompactBuffer[InternalRow] // This is a helper method to implement Externalizable, and is used by // GeneralHashedRelation and UniqueKeyHashedRelation @@ -54,12 +54,12 @@ private[joins] sealed trait HashedRelation { * A general [[HashedRelation]] backed by a hash map that maps the key into a sequence of values. */ private[joins] final class GeneralHashedRelation( - private var hashTable: JavaHashMap[Row, CompactBuffer[Row]]) + private var hashTable: JavaHashMap[InternalRow, CompactBuffer[InternalRow]]) extends HashedRelation with Externalizable { def this() = this(null) // Needed for serialization - override def get(key: Row): CompactBuffer[Row] = hashTable.get(key) + override def get(key: InternalRow): CompactBuffer[InternalRow] = hashTable.get(key) override def writeExternal(out: ObjectOutput): Unit = { writeBytes(out, SparkSqlSerializer.serialize(hashTable)) @@ -75,17 +75,18 @@ private[joins] final class GeneralHashedRelation( * A specialized [[HashedRelation]] that maps key into a single value. This implementation * assumes the key is unique. */ -private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[Row, Row]) +private[joins] +final class UniqueKeyHashedRelation(private var hashTable: JavaHashMap[InternalRow, InternalRow]) extends HashedRelation with Externalizable { def this() = this(null) // Needed for serialization - override def get(key: Row): CompactBuffer[Row] = { + override def get(key: InternalRow): CompactBuffer[InternalRow] = { val v = hashTable.get(key) if (v eq null) null else CompactBuffer(v) } - def getValue(key: Row): Row = hashTable.get(key) + def getValue(key: InternalRow): InternalRow = hashTable.get(key) override def writeExternal(out: ObjectOutput): Unit = { writeBytes(out, SparkSqlSerializer.serialize(hashTable)) @@ -103,13 +104,13 @@ private[joins] final class UniqueKeyHashedRelation(private var hashTable: JavaHa private[joins] object HashedRelation { def apply( - input: Iterator[Row], + input: Iterator[InternalRow], keyGenerator: Projection, sizeEstimate: Int = 64): HashedRelation = { // TODO: Use Spark's HashMap implementation. - val hashTable = new JavaHashMap[Row, CompactBuffer[Row]](sizeEstimate) - var currentRow: Row = null + val hashTable = new JavaHashMap[InternalRow, CompactBuffer[InternalRow]](sizeEstimate) + var currentRow: InternalRow = null // Whether the join key is unique. If the key is unique, we can convert the underlying // hash map into one specialized for this. @@ -122,7 +123,7 @@ private[joins] object HashedRelation { if (!rowKey.anyNull) { val existingMatchList = hashTable.get(rowKey) val matchList = if (existingMatchList == null) { - val newMatchList = new CompactBuffer[Row]() + val newMatchList = new CompactBuffer[InternalRow]() hashTable.put(rowKey, newMatchList) newMatchList } else { @@ -134,7 +135,7 @@ private[joins] object HashedRelation { } if (keyIsUnique) { - val uniqHashTable = new JavaHashMap[Row, Row](hashTable.size) + val uniqHashTable = new JavaHashMap[InternalRow, InternalRow](hashTable.size) val iter = hashTable.entrySet().iterator() while (iter.hasNext) { val entry = iter.next()
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala index 036423e..2a6d4d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinBNL.scala @@ -47,7 +47,7 @@ case class LeftSemiJoinBNL( @transient private lazy val boundCondition = newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output) - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val broadcastedRelation = sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala index 8ad27ea..20d7427 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/LeftSemiJoinHash.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow} import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} @@ -42,10 +42,10 @@ case class LeftSemiJoinHash( override def output: Seq[Attribute] = left.output - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => - val hashSet = new java.util.HashSet[Row]() - var currentRow: Row = null + val hashSet = new java.util.HashSet[InternalRow]() + var currentRow: InternalRow = null // Create a Hash set of buildKeys while (buildIter.hasNext) { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala index 219525d..5439e10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoin.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Partitioning} import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} @@ -43,7 +43,7 @@ case class ShuffledHashJoin( override def requiredChildDistribution: Seq[ClusteredDistribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => val hashed = HashedRelation(buildIter, buildSideKeyGenerator) hashJoin(streamIter, hashed) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 1a39fb4..2abe65a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -21,9 +21,7 @@ import java.util.NoSuchElementException import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} import org.apache.spark.util.collection.CompactBuffer @@ -60,29 +58,29 @@ case class SortMergeJoin( private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = keys.map(SortOrder(_, Ascending)) - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val leftResults = left.execute().map(_.copy()) val rightResults = right.execute().map(_.copy()) leftResults.zipPartitions(rightResults) { (leftIter, rightIter) => - new Iterator[Row] { + new Iterator[InternalRow] { // Mutable per row objects. private[this] val joinRow = new JoinedRow5 - private[this] var leftElement: Row = _ - private[this] var rightElement: Row = _ - private[this] var leftKey: Row = _ - private[this] var rightKey: Row = _ - private[this] var rightMatches: CompactBuffer[Row] = _ + private[this] var leftElement: InternalRow = _ + private[this] var rightElement: InternalRow = _ + private[this] var leftKey: InternalRow = _ + private[this] var rightKey: InternalRow = _ + private[this] var rightMatches: CompactBuffer[InternalRow] = _ private[this] var rightPosition: Int = -1 private[this] var stop: Boolean = false - private[this] var matchKey: Row = _ + private[this] var matchKey: InternalRow = _ // initialize iterator initialize() override final def hasNext: Boolean = nextMatchingPair() - override final def next(): Row = { + override final def next(): InternalRow = { if (hasNext) { // we are using the buffered right rows and run down left iterator val joinedRow = joinRow(leftElement, rightMatches(rightPosition)) @@ -145,7 +143,7 @@ case class SortMergeJoin( fetchLeft() } } - rightMatches = new CompactBuffer[Row]() + rightMatches = new CompactBuffer[InternalRow]() if (stop) { stop = false // iterate the right side to buffer all rows that matches http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 2b45a83..1ce150c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -29,7 +29,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Row, _} +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule @@ -56,7 +57,7 @@ private[spark] case class PythonUDF( def nullable: Boolean = true - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { throw new UnsupportedOperationException("PythonUDFs can not be directly evaluated.") } } @@ -241,7 +242,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: def children: Seq[SparkPlan] = child :: Nil - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { val childResults = child.execute().map(_.copy()) val parent = childResults.mapPartitions { iter => @@ -276,7 +277,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: val row = new GenericMutableRow(1) iter.map { result => row(0) = EvaluatePython.fromJava(result, udf.dataType) - row: Row + row: InternalRow } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala index c41c21c..8df1da0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.stat import scala.collection.mutable.{Map => MutableMap} import org.apache.spark.Logging -import org.apache.spark.sql.{Column, DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types.{ArrayType, StructField, StructType} +import org.apache.spark.sql.{Column, DataFrame} private[sql] object FrequentItems extends Logging { @@ -110,7 +111,7 @@ private[sql] object FrequentItems extends Logging { } ) val justItems = freqItems.map(m => m.baseMap.keys.toSeq) - val resultRow = Row(justItems : _*) + val resultRow = InternalRow(justItems : _*) // append frequent Items to the column name for easy debugging val outputCols = colInfo.map { v => StructField(v._1 + "_freqItems", ArrayType(v._2, false)) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index e75e668..667fc70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow} +import org.apache.spark.sql.catalyst.expressions.{InternalRow, SpecificMutableRow} import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.sources._ @@ -211,7 +211,7 @@ private[sql] object JDBCRDD extends Logging { fqTable: String, requiredColumns: Array[String], filters: Array[Filter], - parts: Array[Partition]): RDD[Row] = { + parts: Array[Partition]): RDD[InternalRow] = { val dialect = JdbcDialects.get(url) val quotedColumns = requiredColumns.map(colName => dialect.quoteIdentifier(colName)) new JDBCRDD( @@ -240,7 +240,7 @@ private[sql] class JDBCRDD( filters: Array[Filter], partitions: Array[Partition], properties: Properties) - extends RDD[Row](sc, Nil) { + extends RDD[InternalRow](sc, Nil) { /** * Retrieve the list of partitions corresponding to this RDD. @@ -348,12 +348,12 @@ private[sql] class JDBCRDD( /** * Runs the SQL query against the JDBC driver. */ - override def compute(thePart: Partition, context: TaskContext): Iterator[Row] = new Iterator[Row] - { + override def compute(thePart: Partition, context: TaskContext): Iterator[InternalRow] = + new Iterator[InternalRow] { var closed = false var finished = false var gotNext = false - var nextValue: Row = null + var nextValue: InternalRow = null context.addTaskCompletionListener{ context => close() } val part = thePart.asInstanceOf[JDBCPartition] @@ -375,7 +375,7 @@ private[sql] class JDBCRDD( val conversions = getConversions(schema) val mutableRow = new SpecificMutableRow(schema.fields.map(x => x.dataType)) - def getNext(): Row = { + def getNext(): InternalRow = { if (rs.next()) { var i = 0 while (i < conversions.length) { @@ -443,7 +443,7 @@ private[sql] class JDBCRDD( mutableRow } else { finished = true - null.asInstanceOf[Row] + null.asInstanceOf[InternalRow] } } @@ -486,7 +486,7 @@ private[sql] class JDBCRDD( !finished } - override def next(): Row = { + override def next(): InternalRow = { if (!hasNext) { throw new NoSuchElementException("End of stream") } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index 30f9190..4d3aac4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -23,10 +23,9 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.Partition import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} -import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} /** * Instructions on how to partition the table among workers. @@ -138,7 +137,7 @@ private[sql] case class JDBCRelation( table, requiredColumns, filters, - parts) + parts).map(_.asInstanceOf[Row]) } override def insert(data: DataFrame, overwrite: Boolean): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index c772cd1..69bf13e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -22,10 +22,10 @@ import java.io.IOException import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} private[sql] class DefaultSource @@ -154,12 +154,12 @@ private[sql] class JSONRelation( JacksonParser( baseRDD(), schema, - sqlContext.conf.columnNameOfCorruptRecord) + sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) } else { JsonRDD.jsonStringToRow( baseRDD(), schema, - sqlContext.conf.columnNameOfCorruptRecord) + sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) } } @@ -168,12 +168,12 @@ private[sql] class JSONRelation( JacksonParser( baseRDD(), StructType.fromAttributes(requiredColumns), - sqlContext.conf.columnNameOfCorruptRecord) + sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) } else { JsonRDD.jsonStringToRow( baseRDD(), StructType.fromAttributes(requiredColumns), - sqlContext.conf.columnNameOfCorruptRecord) + sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala index 325f54b..1e6b119 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonGenerator.scala @@ -21,7 +21,7 @@ import scala.collection.Map import com.fasterxml.jackson.core._ -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.Row import org.apache.spark.sql.types._ private[sql] object JacksonGenerator { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala index f16075c..817e8a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala @@ -35,7 +35,7 @@ private[sql] object JacksonParser { def apply( json: RDD[String], schema: StructType, - columnNameOfCorruptRecords: String): RDD[Row] = { + columnNameOfCorruptRecords: String): RDD[InternalRow] = { parseJson(json, schema, columnNameOfCorruptRecords) } @@ -130,7 +130,10 @@ private[sql] object JacksonParser { * * Fields in the json that are not defined in the requested schema will be dropped. */ - private def convertObject(factory: JsonFactory, parser: JsonParser, schema: StructType): Row = { + private def convertObject( + factory: JsonFactory, + parser: JsonParser, + schema: StructType): InternalRow = { val row = new GenericMutableRow(schema.length) while (nextUntil(parser, JsonToken.END_OBJECT)) { schema.getFieldIndex(parser.getCurrentName) match { @@ -176,9 +179,9 @@ private[sql] object JacksonParser { private def parseJson( json: RDD[String], schema: StructType, - columnNameOfCorruptRecords: String): RDD[Row] = { + columnNameOfCorruptRecords: String): RDD[InternalRow] = { - def failedRecord(record: String): Seq[Row] = { + def failedRecord(record: String): Seq[InternalRow] = { // create a row even if no corrupt record column is present val row = new GenericMutableRow(schema.length) for (corruptIndex <- schema.getFieldIndex(columnNameOfCorruptRecords)) { @@ -202,7 +205,7 @@ private[sql] object JacksonParser { // convertField wrap an object into a single value array when necessary. convertField(factory, parser, ArrayType(schema)) match { case null => failedRecord(record) - case list: Seq[Row @unchecked] => list + case list: Seq[InternalRow @unchecked] => list case _ => sys.error( s"Failed to parse record $record. Please make sure that each line of the file " + http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index e4acf1d..44594c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -38,7 +38,7 @@ private[sql] object JsonRDD extends Logging { private[sql] def jsonStringToRow( json: RDD[String], schema: StructType, - columnNameOfCorruptRecords: String): RDD[Row] = { + columnNameOfCorruptRecords: String): RDD[InternalRow] = { parseJson(json, columnNameOfCorruptRecords).map(parsed => asRow(parsed, schema)) } @@ -434,7 +434,7 @@ private[sql] object JsonRDD extends Logging { } } - private def asRow(json: Map[String, Any], schema: StructType): Row = { + private def asRow(json: Map[String, Any], schema: StructType): InternalRow = { // TODO: Reuse the row instead of creating a new one for every record. val row = new GenericMutableRow(schema.fields.length) schema.fields.zipWithIndex.foreach { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala index ab9f878..4da5e96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala @@ -79,7 +79,7 @@ private[sql] object CatalystConverter { // TODO: consider using Array[T] for arrays to avoid boxing of primitive types type ArrayScalaType[T] = Seq[T] - type StructScalaType[T] = Row + type StructScalaType[T] = InternalRow type MapScalaType[K, V] = Map[K, V] protected[parquet] def createConverter( @@ -240,7 +240,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { * * @return */ - def getCurrentRecord: Row = throw new UnsupportedOperationException + def getCurrentRecord: InternalRow = throw new UnsupportedOperationException /** * Read a decimal value from a Parquet Binary into "dest". Only supports decimals that fit in @@ -275,7 +275,7 @@ private[parquet] abstract class CatalystConverter extends GroupConverter { /** * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record - * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. + * to a [[org.apache.spark.sql.catalyst.expressions.InternalRow]] object. * * @param schema The corresponding Catalyst schema in the form of a list of attributes. */ @@ -284,7 +284,7 @@ private[parquet] class CatalystGroupConverter( protected[parquet] val index: Int, protected[parquet] val parent: CatalystConverter, protected[parquet] var current: ArrayBuffer[Any], - protected[parquet] var buffer: ArrayBuffer[Row]) + protected[parquet] var buffer: ArrayBuffer[InternalRow]) extends CatalystConverter { def this(schema: Array[FieldType], index: Int, parent: CatalystConverter) = @@ -293,7 +293,7 @@ private[parquet] class CatalystGroupConverter( index, parent, current = null, - buffer = new ArrayBuffer[Row]( + buffer = new ArrayBuffer[InternalRow]( CatalystArrayConverter.INITIAL_ARRAY_SIZE)) /** @@ -309,7 +309,7 @@ private[parquet] class CatalystGroupConverter( override val size = schema.size - override def getCurrentRecord: Row = { + override def getCurrentRecord: InternalRow = { assert(isRootConverter, "getCurrentRecord should only be called in root group converter!") // TODO: use iterators if possible // Note: this will ever only be called in the root converter when the record has been @@ -347,7 +347,7 @@ private[parquet] class CatalystGroupConverter( /** * A `parquet.io.api.GroupConverter` that is able to convert a Parquet record - * to a [[org.apache.spark.sql.catalyst.expressions.Row]] object. Note that his + * to a [[org.apache.spark.sql.catalyst.expressions.InternalRow]] object. Note that his * converter is optimized for rows of primitive types (non-nested records). */ private[parquet] class CatalystPrimitiveRowConverter( @@ -373,7 +373,7 @@ private[parquet] class CatalystPrimitiveRowConverter( override val parent = null // Should be only called in root group converter! - override def getCurrentRecord: Row = current + override def getCurrentRecord: InternalRow = current override def getConverter(fieldIndex: Int): Converter = converters(fieldIndex) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 272608d..39360e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -46,7 +46,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLConf -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row, _} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InternalRow, _} import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode} import org.apache.spark.sql.types.StructType import org.apache.spark.{Logging, SerializableWritable, TaskContext} @@ -54,7 +54,7 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext} /** * :: DeveloperApi :: * Parquet table scan operator. Imports the file that backs the given - * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``. + * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[InternalRow]``. */ private[sql] case class ParquetTableScan( attributes: Seq[Attribute], @@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan( } }.toArray - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat val sc = sqlContext.sparkContext @@ -125,7 +125,7 @@ private[sql] case class ParquetTableScan( sc, classOf[FilteringParquetRowInputFormat], classOf[Void], - classOf[Row], + classOf[InternalRow], conf) if (requestedPartitionOrdinals.nonEmpty) { @@ -154,9 +154,9 @@ private[sql] case class ParquetTableScan( .map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow)) if (primitiveRow) { - new Iterator[Row] { + new Iterator[InternalRow] { def hasNext: Boolean = iter.hasNext - def next(): Row = { + def next(): InternalRow = { // We are using CatalystPrimitiveRowConverter and it returns a SpecificMutableRow. val row = iter.next()._2.asInstanceOf[SpecificMutableRow] @@ -173,12 +173,12 @@ private[sql] case class ParquetTableScan( } else { // Create a mutable row since we need to fill in values from partition columns. val mutableRow = new GenericMutableRow(outputSize) - new Iterator[Row] { + new Iterator[InternalRow] { def hasNext: Boolean = iter.hasNext - def next(): Row = { + def next(): InternalRow = { // We are using CatalystGroupConverter and it returns a GenericRow. // Since GenericRow is not mutable, we just cast it to a Row. - val row = iter.next()._2.asInstanceOf[Row] + val row = iter.next()._2.asInstanceOf[InternalRow] var i = 0 while (i < row.size) { @@ -258,7 +258,7 @@ private[sql] case class InsertIntoParquetTable( /** * Inserts all rows into the Parquet file. */ - protected override def doExecute(): RDD[Row] = { + protected override def doExecute(): RDD[InternalRow] = { // TODO: currently we do not check whether the "schema"s are compatible // That means if one first creates a table and then INSERTs data with // and incompatible schema the execution will fail. It would be nice @@ -321,13 +321,13 @@ private[sql] case class InsertIntoParquetTable( * @param conf A [[org.apache.hadoop.conf.Configuration]]. */ private def saveAsHadoopFile( - rdd: RDD[Row], + rdd: RDD[InternalRow], path: String, conf: Configuration) { val job = new Job(conf) val keyType = classOf[Void] job.setOutputKeyClass(keyType) - job.setOutputValueClass(classOf[Row]) + job.setOutputValueClass(classOf[InternalRow]) NewFileOutputFormat.setOutputPath(job, new Path(path)) val wrappedConf = new SerializableWritable(job.getConfiguration) val formatter = new SimpleDateFormat("yyyyMMddHHmm") @@ -342,7 +342,7 @@ private[sql] case class InsertIntoParquetTable( .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1 } - def writeShard(context: TaskContext, iter: Iterator[Row]): Int = { + def writeShard(context: TaskContext, iter: Iterator[InternalRow]): Int = { /* "reduce task" <split #> <attempt # = spark task #> */ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, context.attemptNumber) @@ -381,7 +381,7 @@ private[sql] case class InsertIntoParquetTable( * to imported ones. */ private[parquet] class AppendingParquetOutputFormat(offset: Int) - extends org.apache.parquet.hadoop.ParquetOutputFormat[Row] { + extends org.apache.parquet.hadoop.ParquetOutputFormat[InternalRow] { // override to accept existing directories as valid output directory override def checkOutputSpecs(job: JobContext): Unit = {} var committer: OutputCommitter = null @@ -434,25 +434,25 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int) * RecordFilter we want to use. */ private[parquet] class FilteringParquetRowInputFormat - extends org.apache.parquet.hadoop.ParquetInputFormat[Row] with Logging { + extends org.apache.parquet.hadoop.ParquetInputFormat[InternalRow] with Logging { private var fileStatuses = Map.empty[Path, FileStatus] override def createRecordReader( inputSplit: InputSplit, - taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = { + taskAttemptContext: TaskAttemptContext): RecordReader[Void, InternalRow] = { import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter - val readSupport: ReadSupport[Row] = new RowReadSupport() + val readSupport: ReadSupport[InternalRow] = new RowReadSupport() val filter = ParquetInputFormat.getFilter(ContextUtil.getConfiguration(taskAttemptContext)) if (!filter.isInstanceOf[NoOpFilter]) { - new ParquetRecordReader[Row]( + new ParquetRecordReader[InternalRow]( readSupport, filter) } else { - new ParquetRecordReader[Row](readSupport) + new ParquetRecordReader[InternalRow](readSupport) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index c62c592..a8775a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -28,7 +28,7 @@ import org.apache.parquet.io.api._ import org.apache.parquet.schema.MessageType import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.expressions.{Attribute, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, InternalRow} import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -39,12 +39,12 @@ import org.apache.spark.unsafe.types.UTF8String *@param root The root group converter for the record. */ private[parquet] class RowRecordMaterializer(root: CatalystConverter) - extends RecordMaterializer[Row] { + extends RecordMaterializer[InternalRow] { def this(parquetSchema: MessageType, attributes: Seq[Attribute]) = this(CatalystConverter.createRootConverter(parquetSchema, attributes)) - override def getCurrentRecord: Row = root.getCurrentRecord + override def getCurrentRecord: InternalRow = root.getCurrentRecord override def getRootConverter: GroupConverter = root.asInstanceOf[GroupConverter] } @@ -52,13 +52,13 @@ private[parquet] class RowRecordMaterializer(root: CatalystConverter) /** * A `parquet.hadoop.api.ReadSupport` for Row objects. */ -private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { +private[parquet] class RowReadSupport extends ReadSupport[InternalRow] with Logging { override def prepareForRead( conf: Configuration, stringMap: java.util.Map[String, String], fileSchema: MessageType, - readContext: ReadContext): RecordMaterializer[Row] = { + readContext: ReadContext): RecordMaterializer[InternalRow] = { log.debug(s"preparing for read with Parquet file schema $fileSchema") // Note: this very much imitates AvroParquet val parquetSchema = readContext.getRequestedSchema @@ -133,7 +133,7 @@ private[parquet] object RowReadSupport { /** * A `parquet.hadoop.api.WriteSupport` for Row objects. */ -private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { +private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Logging { private[parquet] var writer: RecordConsumer = null private[parquet] var attributes: Array[Attribute] = null @@ -157,7 +157,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { log.debug(s"preparing for write with schema $attributes") } - override def write(record: Row): Unit = { + override def write(record: InternalRow): Unit = { val attributesSize = attributes.size if (attributesSize > record.size) { throw new IndexOutOfBoundsException( @@ -322,7 +322,7 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging { // Optimized for non-nested rows private[parquet] class MutableRowWriteSupport extends RowWriteSupport { - override def write(record: Row): Unit = { + override def write(record: InternalRow): Unit = { val attributesSize = attributes.size if (attributesSize > record.size) { throw new IndexOutOfBoundsException( @@ -345,7 +345,7 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport { private def consumeType( ctype: DataType, - record: Row, + record: InternalRow, index: Int): Unit = { ctype match { case StringType => writer.addBinary( http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 7af4eb1..bc27a9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -34,15 +34,15 @@ import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.{Partition => SparkPartition, SerializableWritable, Logging, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.spark.sql.{Row, SQLConf, SQLContext} import org.apache.spark.util.Utils private[sql] class DefaultSource extends HadoopFsRelationProvider { @@ -60,7 +60,7 @@ private[sql] class DefaultSource extends HadoopFsRelationProvider { private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext) extends OutputWriter { - private val recordWriter: RecordWriter[Void, Row] = { + private val recordWriter: RecordWriter[Void, InternalRow] = { val conf = context.getConfiguration val outputFormat = { // When appending new Parquet files to an existing Parquet file directory, to avoid @@ -93,7 +93,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext } } - new ParquetOutputFormat[Row]() { + new ParquetOutputFormat[InternalRow]() { // Here we override `getDefaultWorkFile` for two reasons: // // 1. To allow appending. We need to generate output file name based on the max available @@ -112,7 +112,7 @@ private[sql] class ParquetOutputWriter(path: String, context: TaskAttemptContext outputFormat.getRecordWriter(context) } - override def write(row: Row): Unit = recordWriter.write(null, row) + override def write(row: Row): Unit = recordWriter.write(null, row.asInstanceOf[InternalRow]) override def close(): Unit = recordWriter.close(context) } @@ -286,7 +286,7 @@ private[sql] class ParquetRelation2( initLocalJobFuncOpt = Some(initLocalJobFuncOpt), inputFormatClass = classOf[FilteringParquetRowInputFormat], keyClass = classOf[Void], - valueClass = classOf[Row]) { + valueClass = classOf[InternalRow]) { val cacheMetadata = useMetadataCache @@ -331,7 +331,7 @@ private[sql] class ParquetRelation2( new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) } } - }.values + }.values.map(_.asInstanceOf[Row]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index edda3f2..4cf6743 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -17,48 +17,48 @@ package org.apache.spark.sql.sources -import org.apache.spark.{Logging, SerializableWritable, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.{MapPartitionsRDD, RDD, UnionRDD} +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{SaveMode, Strategy, execution, sources} import org.apache.spark.util.Utils import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.{Logging, SerializableWritable, TaskContext} /** * A Strategy for planning scans over data sources defined using the sources API. */ private[sql] object DataSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: CatalystScan)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan)) => pruneFilterProjectRaw( l, - projectList, + projects, filters, - (a, f) => t.buildScan(a, f)) :: Nil + (a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedFilteredScan)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan)) => pruneFilterProject( l, - projectList, + projects, filters, - (a, f) => t.buildScan(a, f)) :: Nil + (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: PrunedScan)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan)) => pruneFilterProject( l, - projectList, + projects, filters, - (a, _) => t.buildScan(a)) :: Nil + (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil // Scanning partitioned HadoopFsRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) if t.partitionSpec.partitionColumns.nonEmpty => val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray @@ -80,13 +80,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { buildPartitionedTableScan( l, - projectList, + projects, pushedFilters, t.partitionSpec.partitionColumns, selectedPartitions) :: Nil // Scanning non-partitioned HadoopFsRelation - case PhysicalOperation(projectList, filters, l @ LogicalRelation(t: HadoopFsRelation)) => + case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation)) => // See buildPartitionedTableScan for the reason that we need to create a shard // broadcast HadoopConf. val sharedHadoopConf = SparkHadoopUtil.get.conf @@ -94,12 +94,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { t.sqlContext.sparkContext.broadcast(new SerializableWritable(sharedHadoopConf)) pruneFilterProject( l, - projectList, + projects, filters, - (a, f) => t.buildScan(a, f, t.paths, confBroadcast)) :: Nil + (a, f) => + toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil case l @ LogicalRelation(t: TableScan) => - createPhysicalRDD(l.relation, l.output, t.buildScan()) :: Nil + execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil case i @ logical.InsertIntoTable( l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty => @@ -119,7 +120,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { filters: Seq[Expression], partitionColumns: StructType, partitions: Array[Partition]) = { - val output = projections.map(_.toAttribute) val relation = logicalRelation.relation.asInstanceOf[HadoopFsRelation] // Because we are creating one RDD per partition, we need to have a shared HadoopConf. @@ -138,23 +138,23 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { logicalRelation, projections, filters, - (requiredColumns, filters) => { + (columns: Seq[Attribute], filters) => { val partitionColNames = partitionColumns.fieldNames // Don't scan any partition columns to save I/O. Here we are being optimistic and // assuming partition columns data stored in data files are always consistent with those // partition values encoded in partition directory paths. - val nonPartitionColumns = requiredColumns.filterNot(partitionColNames.contains) + val needed = columns.filterNot(a => partitionColNames.contains(a.name)) val dataRows = - relation.buildScan(nonPartitionColumns, filters, Array(dir), confBroadcast) + relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast) // Merges data values with partition values. mergeWithPartitionValues( relation.schema, - requiredColumns, + columns.map(_.name).toArray, partitionColNames, partitionValues, - dataRows) + toCatalystRDD(logicalRelation, needed, dataRows)) }) scan.execute() @@ -167,15 +167,15 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) } - createPhysicalRDD(logicalRelation.relation, output, unionedRows) + execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows) } private def mergeWithPartitionValues( schema: StructType, requiredColumns: Array[String], partitionColumns: Array[String], - partitionValues: Row, - dataRows: RDD[Row]): RDD[Row] = { + partitionValues: InternalRow, + dataRows: RDD[InternalRow]): RDD[InternalRow] = { val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains) // If output columns contain any partition column(s), we need to merge scanned data @@ -186,13 +186,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val i = partitionColumns.indexOf(name) if (i != -1) { // If yes, gets column value from partition values. - (mutableRow: MutableRow, dataRow: expressions.Row, ordinal: Int) => { + (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { mutableRow(ordinal) = partitionValues(i) } } else { // Otherwise, inherits the value from scanned data. val i = nonPartitionColumns.indexOf(name) - (mutableRow: MutableRow, dataRow: expressions.Row, ordinal: Int) => { + (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => { mutableRow(ordinal) = dataRow(i) } } @@ -201,7 +201,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Since we know for sure that this closure is serializable, we can avoid the overhead // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718). - val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[Row]) => { + val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => { val dataTypes = requiredColumns.map(schema(_).dataType) val mutableRow = new SpecificMutableRow(dataTypes) iterator.map { dataRow => @@ -210,7 +210,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { mergers(i)(mutableRow, dataRow, i) i += 1 } - mutableRow.asInstanceOf[expressions.Row] + mutableRow.asInstanceOf[InternalRow] } } @@ -256,26 +256,26 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Based on Public API. protected def pruneFilterProject( relation: LogicalRelation, - projectList: Seq[NamedExpression], + projects: Seq[NamedExpression], filterPredicates: Seq[Expression], - scanBuilder: (Array[String], Array[Filter]) => RDD[Row]) = { + scanBuilder: (Seq[Attribute], Array[Filter]) => RDD[InternalRow]) = { pruneFilterProjectRaw( relation, - projectList, + projects, filterPredicates, (requestedColumns, pushedFilters) => { - scanBuilder(requestedColumns.map(_.name).toArray, selectFilters(pushedFilters).toArray) + scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray) }) } // Based on Catalyst expressions. protected def pruneFilterProjectRaw( relation: LogicalRelation, - projectList: Seq[NamedExpression], + projects: Seq[NamedExpression], filterPredicates: Seq[Expression], - scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[Row]) = { + scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = { - val projectSet = AttributeSet(projectList.flatMap(_.references)) + val projectSet = AttributeSet(projects.flatMap(_.references)) val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) val filterCondition = filterPredicates.reduceLeftOption(expressions.And) @@ -283,38 +283,47 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes. }} - if (projectList.map(_.toAttribute) == projectList && - projectSet.size == projectList.size && + if (projects.map(_.toAttribute) == projects && + projectSet.size == projects.size && filterSet.subsetOf(projectSet)) { // When it is possible to just use column pruning to get the right projection and // when the columns of this projection are enough to evaluate all filter conditions, // just do a scan followed by a filter, with no extra project. val requestedColumns = - projectList.asInstanceOf[Seq[Attribute]] // Safe due to if above. + projects.asInstanceOf[Seq[Attribute]] // Safe due to if above. .map(relation.attributeMap) // Match original case of attributes. - val scan = createPhysicalRDD(relation.relation, projectList.map(_.toAttribute), - scanBuilder(requestedColumns, pushedFilters)) + val scan = execution.PhysicalRDD(projects.map(_.toAttribute), + scanBuilder(requestedColumns, pushedFilters)) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq - val scan = createPhysicalRDD(relation.relation, requestedColumns, + val scan = execution.PhysicalRDD(requestedColumns, scanBuilder(requestedColumns, pushedFilters)) - execution.Project(projectList, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) + execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } } - private[this] def createPhysicalRDD( - relation: BaseRelation, + /** + * Convert RDD of Row into RDD of InternalRow with objects in catalyst types + */ + private[this] def toCatalystRDD( + relation: LogicalRelation, output: Seq[Attribute], - rdd: RDD[Row]): SparkPlan = { - val converted = if (relation.needConversion) { - execution.RDDConversions.rowToRowRdd(rdd, output.map(_.dataType)) + rdd: RDD[Row]): RDD[InternalRow] = { + if (relation.relation.needConversion) { + execution.RDDConversions.rowToRowRdd(rdd.asInstanceOf[RDD[Row]], output.map(_.dataType)) } else { - rdd + rdd.map(_.asInstanceOf[InternalRow]) } - execution.PhysicalRDD(output, converted) + } + + /** + * Convert RDD of Row into RDD of InternalRow with objects in catalyst types + */ + private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = { + toCatalystRDD(relation, relation.output, rdd) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala index 7a2b5b9..c6f535d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala @@ -25,12 +25,11 @@ import scala.util.Try import org.apache.hadoop.fs.Path import org.apache.hadoop.util.Shell - -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types._ -private[sql] case class Partition(values: Row, path: String) +private[sql] case class Partition(values: InternalRow, path: String) private[sql] case class PartitionSpec(partitionColumns: StructType, partitions: Seq[Partition]) @@ -100,7 +99,7 @@ private[sql] object PartitioningUtils { // Finally, we create `Partition`s based on paths and resolved partition values. val partitions = resolvedPartitionValues.zip(pathsWithPartitionValues).map { case (PartitionValues(_, literals), (path, _)) => - Partition(Row.fromSeq(literals.map(_.value)), path.toString) + Partition(InternalRow.fromSeq(literals.map(_.value)), path.toString) } PartitionSpec(StructType(fields), partitions) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index c94199b..1763cee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext, SaveMode} +import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode} private[sql] case class InsertIntoDataSource( logicalRelation: LogicalRelation, @@ -44,18 +44,17 @@ private[sql] case class InsertIntoDataSource( overwrite: Boolean) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[Row] = { + override def run(sqlContext: SQLContext): Seq[InternalRow] = { val relation = logicalRelation.relation.asInstanceOf[InsertableRelation] val data = DataFrame(sqlContext, query) // Apply the schema of the existing table to the new data. - val df = sqlContext.createDataFrame( - data.queryExecution.toRdd, logicalRelation.schema, needsConversion = false) + val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema) relation.insert(df, overwrite) // Invalidate the cache. sqlContext.cacheManager.invalidateCache(logicalRelation) - Seq.empty[Row] + Seq.empty[InternalRow] } } @@ -65,7 +64,7 @@ private[sql] case class InsertIntoHadoopFsRelation( mode: SaveMode) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[Row] = { + override def run(sqlContext: SQLContext): Seq[InternalRow] = { require( relation.paths.length == 1, s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}") @@ -90,7 +89,7 @@ private[sql] case class InsertIntoHadoopFsRelation( if (doInsertion) { val job = new Job(hadoopConf) job.setOutputKeyClass(classOf[Void]) - job.setOutputValueClass(classOf[Row]) + job.setOutputValueClass(classOf[InternalRow]) FileOutputFormat.setOutputPath(job, qualifiedOutputPath) // We create a DataFrame by applying the schema of relation to the data to make sure. @@ -103,10 +102,8 @@ private[sql] case class InsertIntoHadoopFsRelation( val project = Project( relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query) - sqlContext.createDataFrame( - DataFrame(sqlContext, project).queryExecution.toRdd, - relation.schema, - needsConversion = false) + sqlContext.internalCreateDataFrame( + DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema) } val partitionColumns = relation.partitionColumns.fieldNames @@ -119,7 +116,7 @@ private[sql] case class InsertIntoHadoopFsRelation( } } - Seq.empty[Row] + Seq.empty[InternalRow] } private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = { @@ -141,22 +138,19 @@ private[sql] case class InsertIntoHadoopFsRelation( throw new SparkException("Job aborted.", cause) } - def writeRows(taskContext: TaskContext, iterator: Iterator[Row]): Unit = { + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { // If anything below fails, we should abort the task. try { writerContainer.executorSideSetup(taskContext) - if (needsConversion) { - val converter = CatalystTypeConverters.createToScalaConverter(dataSchema) - while (iterator.hasNext) { - val row = converter(iterator.next()).asInstanceOf[Row] - writerContainer.outputWriterForRow(row).write(row) - } + val converter = if (needsConversion) { + CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] } else { - while (iterator.hasNext) { - val row = iterator.next() - writerContainer.outputWriterForRow(row).write(row) - } + r: InternalRow => r.asInstanceOf[Row] + } + while (iterator.hasNext) { + val row = converter(iterator.next()) + writerContainer.outputWriterForRow(row).write(row) } writerContainer.commitTask() @@ -210,32 +204,28 @@ private[sql] case class InsertIntoHadoopFsRelation( throw new SparkException("Job aborted.", cause) } - def writeRows(taskContext: TaskContext, iterator: Iterator[Row]): Unit = { + def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = { // If anything below fails, we should abort the task. try { writerContainer.executorSideSetup(taskContext) val partitionProj = newProjection(codegenEnabled, partitionOutput, output) val dataProj = newProjection(codegenEnabled, dataOutput, output) - - if (needsConversion) { - val converter = CatalystTypeConverters.createToScalaConverter(dataSchema) - while (iterator.hasNext) { - val row = iterator.next() - val partitionPart = partitionProj(row) - val dataPart = dataProj(row) - val convertedDataPart = converter(dataPart).asInstanceOf[Row] - writerContainer.outputWriterForRow(partitionPart).write(convertedDataPart) - } + val dataConverter: InternalRow => Row = if (needsConversion) { + CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row] } else { - val partitionSchema = StructType.fromAttributes(partitionOutput) - val converter = CatalystTypeConverters.createToScalaConverter(partitionSchema) - while (iterator.hasNext) { - val row = iterator.next() - val partitionPart = converter(partitionProj(row)).asInstanceOf[Row] - val dataPart = dataProj(row) - writerContainer.outputWriterForRow(partitionPart).write(dataPart) - } + r: InternalRow => r.asInstanceOf[Row] + } + val partitionSchema = StructType.fromAttributes(partitionOutput) + val partConverter: InternalRow => Row = + CatalystTypeConverters.createToScalaConverter(partitionSchema) + .asInstanceOf[InternalRow => Row] + + while (iterator.hasNext) { + val row = iterator.next() + val partitionPart = partConverter(partitionProj(row)) + val dataPart = dataConverter(dataProj(row)) + writerContainer.outputWriterForRow(partitionPart).write(dataPart) } writerContainer.commitTask() http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 20afd60..01c67db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -26,7 +26,7 @@ import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.catalyst.AbstractSparkSQLParser import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InternalRow} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types._ @@ -404,7 +404,7 @@ private[sql] case class CreateTempTableUsing( provider: String, options: Map[String, String]) extends RunnableCommand { - def run(sqlContext: SQLContext): Seq[Row] = { + def run(sqlContext: SQLContext): Seq[InternalRow] = { val resolved = ResolvedDataSource( sqlContext, userSpecifiedSchema, Array.empty[String], provider, options) sqlContext.registerDataFrameAsTable( @@ -421,7 +421,7 @@ private[sql] case class CreateTempTableUsingAsSelect( options: Map[String, String], query: LogicalPlan) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[Row] = { + override def run(sqlContext: SQLContext): Seq[InternalRow] = { val df = DataFrame(sqlContext, query) val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df) sqlContext.registerDataFrameAsTable( @@ -434,7 +434,7 @@ private[sql] case class CreateTempTableUsingAsSelect( private[sql] case class RefreshTable(databaseName: String, tableName: String) extends RunnableCommand { - override def run(sqlContext: SQLContext): Seq[Row] = { + override def run(sqlContext: SQLContext): Seq[InternalRow] = { // Refresh the given table's metadata first. sqlContext.catalog.refreshTable(databaseName, tableName) @@ -453,7 +453,7 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String) sqlContext.cacheManager.cacheQuery(df, Some(tableName)) } - Seq.empty[Row] + Seq.empty[InternalRow] } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index d1547fb..27534a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -28,7 +28,8 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.SerializableWritable -import org.apache.spark.sql.{Row, _} +import org.apache.spark.sql.execution.RDDConversions +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection import org.apache.spark.sql.types.StructType @@ -195,6 +196,8 @@ abstract class BaseRelation { * java.lang.String -> UTF8String * java.lang.Decimal -> Decimal * + * If `needConversion` is `false`, buildScan() should return an [[RDD]] of [[InternalRow]] + * * Note: The internal representation is not stable across releases and thus data sources outside * of Spark SQL should leave this as true. * @@ -443,7 +446,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio val castedValues = partitionSchema.zip(literals).map { case (field, literal) => Cast(literal, field.dataType).eval() } - p.copy(values = Row.fromSeq(castedValues)) + p.copy(values = InternalRow.fromSeq(castedValues)) } PartitionSpec(partitionSchema, castedPartitions) } @@ -579,15 +582,21 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio BoundReference(dataSchema.fieldIndex(col), field.dataType, field.nullable) }.toSeq - buildScan(inputFiles).mapPartitions { rows => + val rdd = buildScan(inputFiles) + val converted = + if (needConversion) { + RDDConversions.rowToRowRdd(rdd, dataSchema.fields.map(_.dataType)) + } else { + rdd.map(_.asInstanceOf[InternalRow]) + } + converted.mapPartitions { rows => val buildProjection = if (codegenEnabled) { GenerateMutableProjection.generate(requiredOutput, dataSchema.toAttributes) } else { () => new InterpretedMutableProjection(requiredOutput, dataSchema.toAttributes) } - val mutableProjection = buildProjection() - rows.map(mutableProjection) + rows.map(r => mutableProjection(r).asInstanceOf[Row]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 17a3cec..eb3e913 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -94,7 +94,7 @@ class CachedTableSuite extends QueryTest { } test("too big for memory") { - val data = "*" * 10000 + val data = "*" * 1000 ctx.sparkContext.parallelize(1 to 200000, 1).map(_ => BigData(data)).toDF() .registerTempTable("bigData") ctx.table("bigData").persist(StorageLevel.MEMORY_AND_DISK) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala index 1683662..1f37455 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala @@ -18,25 +18,28 @@ package org.apache.spark.sql.columnar import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.expressions.InternalRow import org.apache.spark.sql.types._ class ColumnStatsSuite extends SparkFunSuite { - testColumnStats(classOf[ByteColumnStats], BYTE, Row(Byte.MaxValue, Byte.MinValue, 0)) - testColumnStats(classOf[ShortColumnStats], SHORT, Row(Short.MaxValue, Short.MinValue, 0)) - testColumnStats(classOf[IntColumnStats], INT, Row(Int.MaxValue, Int.MinValue, 0)) - testColumnStats(classOf[LongColumnStats], LONG, Row(Long.MaxValue, Long.MinValue, 0)) - testColumnStats(classOf[FloatColumnStats], FLOAT, Row(Float.MaxValue, Float.MinValue, 0)) - testColumnStats(classOf[DoubleColumnStats], DOUBLE, Row(Double.MaxValue, Double.MinValue, 0)) - testColumnStats(classOf[FixedDecimalColumnStats], FIXED_DECIMAL(15, 10), Row(null, null, 0)) - testColumnStats(classOf[StringColumnStats], STRING, Row(null, null, 0)) - testColumnStats(classOf[DateColumnStats], DATE, Row(Int.MaxValue, Int.MinValue, 0)) - testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, Row(Long.MaxValue, Long.MinValue, 0)) + testColumnStats(classOf[ByteColumnStats], BYTE, InternalRow(Byte.MaxValue, Byte.MinValue, 0)) + testColumnStats(classOf[ShortColumnStats], SHORT, InternalRow(Short.MaxValue, Short.MinValue, 0)) + testColumnStats(classOf[IntColumnStats], INT, InternalRow(Int.MaxValue, Int.MinValue, 0)) + testColumnStats(classOf[LongColumnStats], LONG, InternalRow(Long.MaxValue, Long.MinValue, 0)) + testColumnStats(classOf[FloatColumnStats], FLOAT, InternalRow(Float.MaxValue, Float.MinValue, 0)) + testColumnStats(classOf[DoubleColumnStats], DOUBLE, + InternalRow(Double.MaxValue, Double.MinValue, 0)) + testColumnStats(classOf[FixedDecimalColumnStats], + FIXED_DECIMAL(15, 10), InternalRow(null, null, 0)) + testColumnStats(classOf[StringColumnStats], STRING, InternalRow(null, null, 0)) + testColumnStats(classOf[DateColumnStats], DATE, InternalRow(Int.MaxValue, Int.MinValue, 0)) + testColumnStats(classOf[TimestampColumnStats], TIMESTAMP, + InternalRow(Long.MaxValue, Long.MinValue, 0)) def testColumnStats[T <: AtomicType, U <: ColumnStats]( columnStatsClass: Class[U], columnType: NativeColumnType[T], - initialStatistics: Row): Unit = { + initialStatistics: InternalRow): Unit = { val columnStatsName = columnStatsClass.getSimpleName http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala index 1bc7eb3..7c86eae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala @@ -19,14 +19,11 @@ package org.apache.spark.sql.columnar import scala.collection.immutable.HashSet import scala.util.Random - -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.types.{AtomicType, DataType, Decimal} import org.apache.spark.sql.types.{DataType, Decimal, AtomicType} import org.apache.spark.unsafe.types.UTF8String - object ColumnarTestUtils { def makeNullRow(length: Int): GenericMutableRow = { val row = new GenericMutableRow(length) @@ -79,9 +76,9 @@ object ColumnarTestUtils { def makeRandomRow( head: ColumnType[_ <: DataType, _], - tail: ColumnType[_ <: DataType, _]*): Row = makeRandomRow(Seq(head) ++ tail) + tail: ColumnType[_ <: DataType, _]*): InternalRow = makeRandomRow(Seq(head) ++ tail) - def makeRandomRow(columnTypes: Seq[ColumnType[_ <: DataType, _]]): Row = { + def makeRandomRow(columnTypes: Seq[ColumnType[_ <: DataType, _]]): InternalRow = { val row = new GenericMutableRow(columnTypes.length) makeRandomValues(columnTypes).zipWithIndex.foreach { case (value, index) => row(index) = value http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index fa3b814..12f95eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.columnar import java.sql.{Date, Timestamp} import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.types._ -import org.apache.spark.sql.{QueryTest, TestData} +import org.apache.spark.sql.{QueryTest, Row, TestData} import org.apache.spark.storage.StorageLevel.MEMORY_ONLY class InMemoryColumnarQuerySuite extends QueryTest { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala index 20d65a7..f606e21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.columnar.compression import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericMutableRow -import org.apache.spark.sql.columnar.{NoopColumnStats, BOOLEAN} import org.apache.spark.sql.columnar.ColumnarTestUtils._ +import org.apache.spark.sql.columnar.{BOOLEAN, NoopColumnStats} class BooleanBitSetSuite extends SparkFunSuite { import BooleanBitSet._ @@ -32,7 +32,7 @@ class BooleanBitSetSuite extends SparkFunSuite { // ------------- val builder = TestCompressibleColumnBuilder(new NoopColumnStats, BOOLEAN, BooleanBitSet) - val rows = Seq.fill[Row](count)(makeRandomRow(BOOLEAN)) + val rows = Seq.fill[InternalRow](count)(makeRandomRow(BOOLEAN)) val values = rows.map(_(0)) rows.foreach(builder.appendFrom(_, 0)) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 45a7e8f..3e27f58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -18,16 +18,15 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{SQLConf, execution} -import org.apache.spark.sql.functions._ import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.sql.test.TestSQLContext.implicits._ import org.apache.spark.sql.test.TestSQLContext.planner._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.{Row, SQLConf, execution} class PlannerSuite extends SparkFunSuite { http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala index 5290c28..71db6a2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions.{Projection, Row} +import org.apache.spark.sql.catalyst.expressions.{Projection, InternalRow} import org.apache.spark.util.collection.CompactBuffer @@ -26,37 +26,37 @@ class HashedRelationSuite extends SparkFunSuite { // Key is simply the record itself private val keyProjection = new Projection { - override def apply(row: Row): Row = row + override def apply(row: InternalRow): InternalRow = row } test("GeneralHashedRelation") { - val data = Array(Row(0), Row(1), Row(2), Row(2)) + val data = Array(InternalRow(0), InternalRow(1), InternalRow(2), InternalRow(2)) val hashed = HashedRelation(data.iterator, keyProjection) assert(hashed.isInstanceOf[GeneralHashedRelation]) - assert(hashed.get(data(0)) == CompactBuffer[Row](data(0))) - assert(hashed.get(data(1)) == CompactBuffer[Row](data(1))) - assert(hashed.get(Row(10)) === null) + assert(hashed.get(data(0)) == CompactBuffer[InternalRow](data(0))) + assert(hashed.get(data(1)) == CompactBuffer[InternalRow](data(1))) + assert(hashed.get(InternalRow(10)) === null) - val data2 = CompactBuffer[Row](data(2)) + val data2 = CompactBuffer[InternalRow](data(2)) data2 += data(2) assert(hashed.get(data(2)) == data2) } test("UniqueKeyHashedRelation") { - val data = Array(Row(0), Row(1), Row(2)) + val data = Array(InternalRow(0), InternalRow(1), InternalRow(2)) val hashed = HashedRelation(data.iterator, keyProjection) assert(hashed.isInstanceOf[UniqueKeyHashedRelation]) - assert(hashed.get(data(0)) == CompactBuffer[Row](data(0))) - assert(hashed.get(data(1)) == CompactBuffer[Row](data(1))) - assert(hashed.get(data(2)) == CompactBuffer[Row](data(2))) - assert(hashed.get(Row(10)) === null) + assert(hashed.get(data(0)) == CompactBuffer[InternalRow](data(0))) + assert(hashed.get(data(1)) == CompactBuffer[InternalRow](data(1))) + assert(hashed.get(data(2)) == CompactBuffer[InternalRow](data(2))) + assert(hashed.get(InternalRow(10)) === null) val uniqHashed = hashed.asInstanceOf[UniqueKeyHashedRelation] assert(uniqHashed.getValue(data(0)) == data(0)) assert(uniqHashed.getValue(data(1)) == data(1)) assert(uniqHashed.getValue(data(2)) == data(2)) - assert(uniqHashed.getValue(Row(10)) == null) + assert(uniqHashed.getValue(InternalRow(10)) == null) } } http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala index 17f5f9a..fa5d4ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ -import org.apache.spark.sql.{Column, DataFrame, QueryTest, SQLConf} +import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf} /** * A test suite that tests Parquet filter2 API based filter pushdown optimization. http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 46b2585..fc827bc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -35,11 +35,10 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException +import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, QueryTest, SQLConf, SaveMode} // Write support class for nested groups: ParquetWriter initializes GroupWriteSupport // with an empty configuration (it is after all not intended to be used in this way?) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
