http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
index 0266084..f9e8150 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregates.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.catalyst.expressions
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
+import org.apache.spark.sql.catalyst.trees
+import org.apache.spark.sql.types._
 import org.apache.spark.util.collection.OpenHashSet
 
 abstract class AggregateExpression extends Expression {
@@ -37,7 +38,7 @@ abstract class AggregateExpression extends Expression {
    * [[AggregateExpression.eval]] should never be invoked because [[AggregateExpression]]'s are
    * replaced with a physical aggregate operator at runtime.
    */
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 }
 
@@ -80,7 +81,7 @@ abstract class AggregateFunction
   override def nullable: Boolean = base.nullable
   override def dataType: DataType = base.dataType
 
-  def update(input: Row): Unit
+  def update(input: catalyst.InternalRow): Unit
 
   // Do we really need this?
   override def newInstance(): AggregateFunction = {
@@ -108,7 +109,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
   val currentMin: MutableLiteral = MutableLiteral(null, expr.dataType)
   val cmp = GreaterThan(currentMin, expr)
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     if (currentMin.value == null) {
       currentMin.value = expr.eval(input)
     } else if (cmp.eval(input) == true) {
@@ -116,7 +117,7 @@ case class MinFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
     }
   }
 
-  override def eval(input: Row): Any = currentMin.value
+  override def eval(input: catalyst.InternalRow): Any = currentMin.value
 }
 
 case class Max(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -139,7 +140,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
   val currentMax: MutableLiteral = MutableLiteral(null, expr.dataType)
   val cmp = LessThan(currentMax, expr)
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     if (currentMax.value == null) {
       currentMax.value = expr.eval(input)
     } else if (cmp.eval(input) == true) {
@@ -147,7 +148,7 @@ case class MaxFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
     }
   }
 
-  override def eval(input: Row): Any = currentMax.value
+  override def eval(input: catalyst.InternalRow): Any = currentMax.value
 }
 
 case class Count(child: Expression) extends PartialAggregate with trees.UnaryNode[Expression] {
@@ -205,14 +206,14 @@ case class CollectHashSetFunction(
 
   @transient
   val distinctValue = new InterpretedProjection(expr)
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = distinctValue(input)
     if (!evaluatedExpr.anyNull) {
       seen.add(evaluatedExpr)
     }
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     seen
   }
 }
@@ -238,7 +239,7 @@ case class CombineSetsAndCountFunction(
 
   val seen = new OpenHashSet[Any]()
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val inputSetEval = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]]
     val inputIterator = inputSetEval.iterator
     while (inputIterator.hasNext) {
@@ -246,7 +247,7 @@ case class CombineSetsAndCountFunction(
     }
   }
 
-  override def eval(input: Row): Any = seen.size.toLong
+  override def eval(input: catalyst.InternalRow): Any = seen.size.toLong
 }
 
 /** The data type of ApproxCountDistinctPartition since its output is a HyperLogLog object. */
@@ -453,7 +454,7 @@ case class CombineSetsAndSumFunction(
 
   val seen = new OpenHashSet[Any]()
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val inputSetEval = inputSet.eval(input).asInstanceOf[OpenHashSet[Any]]
     val inputIterator = inputSetEval.iterator
     while (inputIterator.hasNext) {
@@ -461,8 +462,8 @@ case class CombineSetsAndSumFunction(
     }
   }
 
-  override def eval(input: Row): Any = {
-    val casted = seen.asInstanceOf[OpenHashSet[Row]]
+  override def eval(input: catalyst.InternalRow): Any = {
+    val casted = seen.asInstanceOf[OpenHashSet[catalyst.InternalRow]]
     if (casted.size == 0) {
       null
     } else {
@@ -524,7 +525,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
   private def addFunction(value: Any) = Add(sum,
     Cast(Literal.create(value, expr.dataType), calcType))
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     if (count == 0L) {
       null
     } else {
@@ -541,7 +542,7 @@ case class AverageFunction(expr: Expression, base: AggregateExpression)
     }
   }
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = expr.eval(input)
     if (evaluatedExpr != null) {
       count += 1
@@ -555,14 +556,14 @@ case class CountFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
 
   var count: Long = _
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = expr.eval(input)
     if (evaluatedExpr != null) {
       count += 1L
     }
   }
 
-  override def eval(input: Row): Any = count
+  override def eval(input: catalyst.InternalRow): Any = count
 }
 
 case class ApproxCountDistinctPartitionFunction(
@@ -574,14 +575,14 @@ case class ApproxCountDistinctPartitionFunction(
 
   private val hyperLogLog = new HyperLogLog(relativeSD)
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = expr.eval(input)
     if (evaluatedExpr != null) {
       hyperLogLog.offer(evaluatedExpr)
     }
   }
 
-  override def eval(input: Row): Any = hyperLogLog
+  override def eval(input: catalyst.InternalRow): Any = hyperLogLog
 }
 
 case class ApproxCountDistinctMergeFunction(
@@ -593,12 +594,12 @@ case class ApproxCountDistinctMergeFunction(
 
   private val hyperLogLog = new HyperLogLog(relativeSD)
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = expr.eval(input)
     hyperLogLog.addAll(evaluatedExpr.asInstanceOf[HyperLogLog])
   }
 
-  override def eval(input: Row): Any = hyperLogLog.cardinality()
+  override def eval(input: catalyst.InternalRow): Any = hyperLogLog.cardinality()
 }
 
 case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -619,11 +620,11 @@ case class SumFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
   private val addFunction =
     Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     sum.update(addFunction, input)
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     expr.dataType match {
       case DecimalType.Fixed(_, _) =>
         Cast(sum, dataType).eval(null)
@@ -652,7 +653,7 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)
   private val addFunction =
     Coalesce(Seq(Add(Coalesce(Seq(sum, zero)), Cast(expr, calcType)), sum, zero))
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val result = expr.eval(input)
     // partial sum result can be null only when no input rows present
     if(result != null) {
@@ -660,7 +661,7 @@ case class CombineSumFunction(expr: Expression, base: AggregateExpression)
     }
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     expr.dataType match {
       case DecimalType.Fixed(_, _) =>
         Cast(sum, dataType).eval(null)
@@ -676,14 +677,14 @@ case class SumDistinctFunction(expr: Expression, base: AggregateExpression)
 
   private val seen = new scala.collection.mutable.HashSet[Any]()
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = expr.eval(input)
     if (evaluatedExpr != null) {
       seen += evaluatedExpr
     }
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     if (seen.size == 0) {
       null
     } else {
@@ -707,14 +708,14 @@ case class CountDistinctFunction(
 
   @transient
   val distinctValue = new InterpretedProjection(expr)
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     val evaluatedExpr = distinctValue(input)
     if (!evaluatedExpr.anyNull) {
       seen.add(evaluatedExpr)
     }
   }
 
-  override def eval(input: Row): Any = seen.size.toLong
+  override def eval(input: catalyst.InternalRow): Any = seen.size.toLong
 }
 
 case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -722,13 +723,13 @@ case class FirstFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
 
   var result: Any = null
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     if (result == null) {
       result = expr.eval(input)
     }
   }
 
-  override def eval(input: Row): Any = result
+  override def eval(input: catalyst.InternalRow): Any = result
 }
 
 case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
@@ -736,11 +737,11 @@ case class LastFunction(expr: Expression, base: AggregateExpression) extends AggregateFunction {
 
   var result: Any = null
 
-  override def update(input: Row): Unit = {
+  override def update(input: catalyst.InternalRow): Unit = {
     result = input
  }
 
-  override def eval(input: Row): Any = {
-    if (result != null) expr.eval(result.asInstanceOf[Row]) else null
+  override def eval(input: catalyst.InternalRow): Any = {
+    if (result != null) expr.eval(result.asInstanceOf[catalyst.InternalRow]) else null
   }
 }
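The hunks above are mechanical: every AggregateFunction keeps the same two-phase contract, where update(input) is called once per input row and eval() returns the final aggregate, and only the row type changes from Row to catalyst.InternalRow. A minimal standalone sketch of that contract, with a hypothetical MyRow standing in for InternalRow and a raw column ordinal standing in for the bound child expression of a real MaxFunction:

    // Sketch only: MyRow and MaxOfIntColumn are illustrative stand-ins,
    // not Spark classes.
    case class MyRow(values: Array[Any]) {
      def apply(i: Int): Any = values(i)
    }

    class MaxOfIntColumn(ordinal: Int) {
      private var currentMax: Any = null

      // Called once per input row, like AggregateFunction.update.
      def update(input: MyRow): Unit = {
        val v = input(ordinal)
        if (v != null &&
            (currentMax == null || v.asInstanceOf[Int] > currentMax.asInstanceOf[Int])) {
          currentMax = v
        }
      }

      // Called once at the end, like AggregateFunction.eval.
      def eval(): Any = currentMax
    }

    val agg = new MaxOfIntColumn(0)
    Seq(MyRow(Array(3)), MyRow(Array(7)), MyRow(Array(5))).foreach(agg.update)
    assert(agg.eval() == 7)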
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 124274c..0ba2ff7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
 
@@ -29,7 +30,7 @@ abstract class UnaryArithmetic extends UnaryExpression {
   override def nullable: Boolean = child.nullable
   override def dataType: DataType = child.dataType
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE = child.eval(input)
     if (evalE == null) {
       null
@@ -124,7 +125,7 @@ abstract class BinaryArithmetic extends BinaryExpression {
 
   protected def checkTypesInternal(t: DataType): TypeCheckResult
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE1 = left.eval(input)
     if(evalE1 == null) {
       null
@@ -219,7 +220,7 @@ case class Divide(left: Expression, right: Expression) extends BinaryArithmetic {
     case it: IntegralType => it.integral.asInstanceOf[Integral[Any]].quot
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE2 = right.eval(input)
     if (evalE2 == null || evalE2 == 0) {
       null
@@ -279,7 +280,7 @@ case class Remainder(left: Expression, right: Expression) extends BinaryArithmetic {
     case i: FractionalType => i.asIntegral.asInstanceOf[Integral[Any]]
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE2 = right.eval(input)
     if (evalE2 == null || evalE2 == 0) {
       null
@@ -330,7 +331,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   private lazy val ordering = TypeUtils.getOrdering(dataType)
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE1 = left.eval(input)
     val evalE2 = right.eval(input)
     if (evalE1 == null) {
@@ -384,7 +385,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   private lazy val ordering = TypeUtils.getOrdering(dataType)
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE1 = left.eval(input)
     val evalE2 = right.eval(input)
     if (evalE1 == null) {
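The arithmetic hunks all touch the same eval skeleton: evaluate the left child, short-circuit to null if it is null, then evaluate the right child and do the same (Divide and Remainder additionally return null for a zero divisor). A sketch of that null-propagation pattern in isolation, with thunks standing in for child expressions:

    // Sketch of BinaryArithmetic.eval's short-circuit null handling: a null
    // on either side makes the result null, and the right side is never
    // evaluated when the left is already null.
    def nullSafeEval(left: () => Any, right: () => Any)(f: (Any, Any) => Any): Any = {
      val evalE1 = left()
      if (evalE1 == null) {
        null
      } else {
        val evalE2 = right()
        if (evalE2 == null) null else f(evalE1, evalE2)
      }
    }

    val sum = nullSafeEval(() => 2, () => 3) { (a, b) =>
      a.asInstanceOf[Int] + b.asInstanceOf[Int]
    }
    assert(sum == 5)
    assert(nullSafeEval(() => null, () => sys.error("never evaluated"))((a, _) => a) == null)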
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 536e477..244a066 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -24,6 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.codehaus.janino.ClassBodyEvaluator
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -34,7 +35,7 @@ class IntegerHashSet extends org.apache.spark.util.collection.OpenHashSet[Int]
 class LongHashSet extends org.apache.spark.util.collection.OpenHashSet[Long]
 
 /**
- * Java source for evaluating an [[Expression]] given a [[Row]] of input.
+ * Java source for evaluating an [[Expression]] given a [[catalyst.InternalRow]] of input.
  *
  * @param code The sequence of statements required to evaluate the expression.
  * @param isNull A term that holds a boolean value representing whether the expression evaluated
@@ -183,13 +184,13 @@ class CodeGenContext {
   }
 
   /**
-   * List of data types that have special accessors and setters in [[Row]].
+   * List of data types that have special accessors and setters in [[catalyst.InternalRow]].
    */
   val nativeTypes =
     Seq(IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType)
 
   /**
-   * Returns true if the data type has a special accessor and setter in [[Row]].
+   * Returns true if the data type has a special accessor and setter in [[catalyst.InternalRow]].
    */
   def isNativeType(dt: DataType): Boolean = nativeTypes.contains(dt)

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index ed3df54..35cb954 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions._
 
 // MutableProjection is not accessible in Java
@@ -24,7 +25,7 @@ abstract class BaseMutableProjection extends MutableProjection {}
 
 /**
  * Generates byte code that produces a [[MutableRow]] object that can update itself based on a new
- * input [[Row]] for a fixed set of [[Expression Expressions]].
+ * input [[catalyst.InternalRow]] for a fixed set of [[Expression Expressions]].
  */
 object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
@@ -47,7 +48,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
       """
     }.mkString("\n")
     val code = s"""
-      import org.apache.spark.sql.Row;
+      import org.apache.spark.sql.catalyst.InternalRow;
 
       public SpecificProjection generate($exprType[] expr) {
         return new SpecificProjection(expr);
@@ -69,12 +70,12 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => MutableProjection] {
       }
 
      /* Provide immutable access to the last projected row. */
-      public Row currentValue() {
-        return mutableRow;
+      public InternalRow currentValue() {
+        return (InternalRow) mutableRow;
       }
 
       public Object apply(Object _i) {
-        Row i = (Row) _i;
+        InternalRow i = (InternalRow) _i;
         $projectionCode
 
         return mutableRow;

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 56ecc5f..db5d570 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.catalyst.expressions.codegen
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Private
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{catalyst, Row}
 import org.apache.spark.sql.catalyst.expressions._
 
 /**
  * Inherits some default implementation for Java from `Ordering[Row]`
  */
 @Private
-class BaseOrdering extends Ordering[Row] {
-  def compare(a: Row, b: Row): Int = {
+class BaseOrdering extends Ordering[catalyst.InternalRow] {
+  def compare(a: catalyst.InternalRow, b: catalyst.InternalRow): Int = {
     throw new UnsupportedOperationException
   }
 }
@@ -36,7 +36,8 @@ class BaseOrdering extends Ordering[Row] {
  * Generates bytecode for an [[Ordering]] of [[Row Rows]] for a given set of
  * [[Expression Expressions]].
  */
-object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging {
+object GenerateOrdering
+  extends CodeGenerator[Seq[SortOrder], Ordering[catalyst.InternalRow]] with Logging {
   import scala.reflect.runtime.universe._
 
   protected def canonicalize(in: Seq[SortOrder]): Seq[SortOrder] =
@@ -45,7 +46,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging {
   protected def bind(in: Seq[SortOrder], inputSchema: Seq[Attribute]): Seq[SortOrder] =
     in.map(BindReferences.bindReference(_, inputSchema))
 
-  protected def create(ordering: Seq[SortOrder]): Ordering[Row] = {
+  protected def create(ordering: Seq[SortOrder]): Ordering[catalyst.InternalRow] = {
     val a = newTermName("a")
     val b = newTermName("b")
     val ctx = newCodeGenContext()
@@ -75,7 +76,7 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging {
     }.mkString("\n")
 
     val code = s"""
-      import org.apache.spark.sql.Row;
+      import org.apache.spark.sql.catalyst.InternalRow;
 
      public SpecificOrdering generate($exprType[] expr) {
         return new SpecificOrdering(expr);
@@ -90,8 +91,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[Row]] with Logging {
       }
 
       @Override
-      public int compare(Row a, Row b) {
-        Row i = null;  // Holds current row being evaluated.
+      public int compare(InternalRow a, InternalRow b) {
+        InternalRow i = null;  // Holds current row being evaluated.
        $comparisons
        return 0;
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 4a547b5..9e191dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -17,30 +17,31 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions._
 
 /**
  * Interface for generated predicate
  */
 abstract class Predicate {
-  def eval(r: Row): Boolean
+  def eval(r: catalyst.InternalRow): Boolean
 }
 
 /**
- * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[Row]].
+ * Generates bytecode that evaluates a boolean [[Expression]] on a given input [[InternalRow]].
 */
-object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
+object GeneratePredicate extends CodeGenerator[Expression, (catalyst.InternalRow) => Boolean] {
 
   protected def canonicalize(in: Expression): Expression = ExpressionCanonicalizer.execute(in)
 
   protected def bind(in: Expression, inputSchema: Seq[Attribute]): Expression =
     BindReferences.bindReference(in, inputSchema)
 
-  protected def create(predicate: Expression): ((Row) => Boolean) = {
+  protected def create(predicate: Expression): ((catalyst.InternalRow) => Boolean) = {
     val ctx = newCodeGenContext()
     val eval = predicate.gen(ctx)
     val code = s"""
-      import org.apache.spark.sql.Row;
+      import org.apache.spark.sql.catalyst.InternalRow;
 
       public SpecificPredicate generate($exprType[] expr) {
        return new SpecificPredicate(expr);
@@ -53,7 +54,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
      }
 
      @Override
-      public boolean eval(Row i) {
+      public boolean eval(InternalRow i) {
        ${eval.code}
        return !${eval.isNull} && ${eval.primitive};
      }
@@ -65,6 +66,6 @@ object GeneratePredicate extends CodeGenerator[Expression, (Row) => Boolean] {
     // fetch the only one method `generate(Expression[])`
     val m = c.getDeclaredMethods()(0)
     val p = m.invoke(c.newInstance(), ctx.references.toArray).asInstanceOf[Predicate]
-    (r: Row) => p.eval(r)
+    (r: catalyst.InternalRow) => p.eval(r)
   }
 }
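All three generators follow the same recipe: splice the expression's generated Java into a class template whose entry point now takes an InternalRow, compile it with Janino, and reflectively instantiate the result, which is finally wrapped in a plain Scala function. A simplified sketch of the shape GeneratePredicate hands back, with a hypothetical MyRow standing in for InternalRow (real generated code reads typed, primitive accessors instead of boxed values):

    // Sketch: a compiled SpecificPredicate is morally just a closure from
    // row to Boolean; here a hand-written one over a column ordinal.
    case class MyRow(values: Array[Any]) {
      def getInt(i: Int): Int = values(i).asInstanceOf[Int]
    }

    def createPredicate(ordinal: Int, threshold: Int): MyRow => Boolean =
      (r: MyRow) => r.getInt(ordinal) > threshold  // the eval(InternalRow i) analogue

    val pred = createPredicate(0, 10)
    assert(pred(MyRow(Array(42))))
    assert(!pred(MyRow(Array(7))))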
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
index 9b906c3..8b5dc19 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
@@ -27,9 +27,10 @@ import org.apache.spark.sql.types._
 
 abstract class BaseProject extends Projection {}
 
 /**
- * Generates bytecode that produces a new [[Row]] object based on a fixed set of input
- * [[Expression Expressions]] and a given input [[Row]]. The returned [[Row]] object is custom
- * generated based on the output types of the [[Expression]] to avoid boxing of primitive values.
+ * Generates bytecode that produces a new [[InternalRow]] object based on a fixed set of input
+ * [[Expression Expressions]] and a given input [[InternalRow]]. The returned [[InternalRow]]
+ * object is custom generated based on the output types of the [[Expression]] to avoid boxing of
+ * primitive values.
  */
 object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
   import scala.reflect.runtime.universe._
@@ -146,7 +147,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
     }.mkString("\n")
 
     val code = s"""
-      import org.apache.spark.sql.Row;
+      import org.apache.spark.sql.catalyst.InternalRow;
 
      public SpecificProjection generate($exprType[] expr) {
        return new SpecificProjection(expr);
@@ -161,7 +162,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
 
      @Override
      public Object apply(Object r) {
-        return new SpecificRow(expressions, (Row) r);
+        return new SpecificRow(expressions, (InternalRow) r);
      }
    }
 
@@ -169,7 +170,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
 
      $columns
 
-      public SpecificRow($exprType[] expressions, Row i) {
+      public SpecificRow($exprType[] expressions, InternalRow i) {
        $initColumns
      }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 6398b8f..a6913cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.types._
 
 
@@ -41,7 +42,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
 
   override def nullable: Boolean = false
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     children.map(_.eval(input))
   }
 
@@ -69,7 +70,7 @@ case class CreateStruct(children: Seq[NamedExpression]) extends Expression {
 
   override def nullable: Boolean = false
 
-  override def eval(input: Row): Any = {
-    Row(children.map(_.eval(input)): _*)
+  override def eval(input: catalyst.InternalRow): Any = {
+    InternalRow(children.map(_.eval(input)): _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 72b9f23..a119c31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types.{BooleanType, DataType}
@@ -42,7 +43,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
 
   override def dataType: DataType = trueValue.dataType
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     if (true == predicate.eval(input)) {
       trueValue.eval(input)
     } else {
@@ -137,7 +138,7 @@ case class CaseWhen(branches: Seq[Expression]) extends CaseWhenLike {
   }
 
   /** Written in imperative fashion for performance considerations. */
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val len = branchesArr.length
     var i = 0
     // If all branches fail and an elseVal is not provided, the whole statement
@@ -229,7 +230,7 @@ case class CaseKeyWhen(key: Expression, branches: Seq[Expression]) extends CaseWhenLike {
   }
 
   /** Written in imperative fashion for performance considerations. */
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evaluatedKey = key.eval(input)
     val len = branchesArr.length
     var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
index 8ab6d97..de8b66b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalFunctions.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
+import org.apache.spark.sql.catalyst
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.types._
 
 /** Return the unscaled Long value of a Decimal, assuming it fits in a Long */
@@ -28,7 +29,7 @@ case class UnscaledValue(child: Expression) extends UnaryExpression {
   override def nullable: Boolean = child.nullable
   override def toString: String = s"UnscaledValue($child)"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val childResult = child.eval(input)
     if (childResult == null) {
       null
@@ -50,7 +51,7 @@ case class MakeDecimal(child: Expression, precision: Int, scale: Int) extends UnaryExpression {
   override def nullable: Boolean = child.nullable
   override def toString: String = s"MakeDecimal($child,$precision,$scale)"
 
-  override def eval(input: Row): Decimal = {
+  override def eval(input: catalyst.InternalRow): Decimal = {
     val childResult = child.eval(input)
     if (childResult == null) {
       null

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index b6191ea..a80c255 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import scala.collection.Map
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
 import org.apache.spark.sql.types._
 
@@ -53,13 +54,13 @@ abstract class Generator extends Expression {
   def elementTypes: Seq[(DataType, Boolean)]
 
   /** Should be implemented by child classes to perform specific Generators. */
-  override def eval(input: Row): TraversableOnce[Row]
+  override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow]
 
   /**
    * Notifies that there are no more rows to process, clean up code, and additional
    * rows can be made here.
    */
-  def terminate(): TraversableOnce[Row] = Nil
+  def terminate(): TraversableOnce[catalyst.InternalRow] = Nil
 }
 
 /**
@@ -67,22 +68,22 @@
  */
 case class UserDefinedGenerator(
     elementTypes: Seq[(DataType, Boolean)],
-    function: Row => TraversableOnce[Row],
+    function: catalyst.InternalRow => TraversableOnce[catalyst.InternalRow],
     children: Seq[Expression])
   extends Generator {
 
   @transient private[this] var inputRow: InterpretedProjection = _
-  @transient private[this] var convertToScala: (Row) => Row = _
+  @transient private[this] var convertToScala: (catalyst.InternalRow) => catalyst.InternalRow = _
 
   private def initializeConverters(): Unit = {
     inputRow = new InterpretedProjection(children)
     convertToScala = {
       val inputSchema = StructType(children.map(e => StructField(e.simpleString, e.dataType, true)))
       CatalystTypeConverters.createToScalaConverter(inputSchema)
-    }.asInstanceOf[(Row => Row)]
+    }.asInstanceOf[(catalyst.InternalRow => catalyst.InternalRow)]
   }
 
-  override def eval(input: Row): TraversableOnce[Row] = {
+  override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow] = {
     if (inputRow == null) {
       initializeConverters()
     }
@@ -108,7 +109,7 @@ case class Explode(child: Expression)
     case MapType(kt, vt, valueContainsNull) => (kt, false) :: (vt, valueContainsNull) :: Nil
   }
 
-  override def eval(input: Row): TraversableOnce[Row] = {
+  override def eval(input: catalyst.InternalRow): TraversableOnce[catalyst.InternalRow] = {
     child.dataType match {
       case ArrayType(_, _) =>
         val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
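A Generator's eval is the one place where a single input row fans out into zero or more output rows, which is why its signature returns TraversableOnce[InternalRow] rather than a single value. A sketch of the Explode case over an array column, using plain Seq[Any] as a stand-in row type:

    // Sketch of the Explode pattern: one input row carrying an array yields
    // one output row per element (Seq[Any] stands in for InternalRow).
    def explodeArray(inputArray: Seq[Any]): TraversableOnce[Seq[Any]] =
      if (inputArray == null) Nil else inputArray.map(v => Seq(v))

    assert(explodeArray(Seq(1, 2, 3)).toSeq == Seq(Seq(1), Seq(2), Seq(3)))
    assert(explodeArray(null).toSeq.isEmpty)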
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
index a33007b..d8fff2b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.sql.{Date, Timestamp}
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
 import org.apache.spark.sql.catalyst.util.DateUtils
@@ -87,7 +88,7 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpression {
     case _ => false
   }
 
-  override def eval(input: Row): Any = value
+  override def eval(input: catalyst.InternalRow): Any = value
 
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
     // change the isNull and primitive to consts, to inline them
@@ -142,9 +143,9 @@ case class Literal protected (value: Any, dataType: DataType) extends LeafExpression {
 
 case class MutableLiteral(var value: Any, dataType: DataType, nullable: Boolean = true)
   extends LeafExpression {
 
-  def update(expression: Expression, input: Row): Unit = {
+  def update(expression: Expression, input: catalyst.InternalRow): Unit = {
     value = expression.eval(input)
   }
 
-  override def eval(input: Row): Any = value
+  override def eval(input: catalyst.InternalRow): Any = value
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
index 97e960b..6f90d60 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/math.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types.{DataType, DoubleType}
 
@@ -34,7 +35,7 @@ abstract class LeafMathExpression(c: Double, name: String)
   override def nullable: Boolean = false
   override def toString: String = s"$name()"
 
-  override def eval(input: Row): Any = c
+  override def eval(input: catalyst.InternalRow): Any = c
 
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
     s"""
@@ -60,7 +61,7 @@ abstract class UnaryMathExpression(f: Double => Double, name: String)
   override def nullable: Boolean = true
   override def toString: String = s"$name($child)"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE = child.eval(input)
     if (evalE == null) {
       null
@@ -103,7 +104,7 @@ abstract class BinaryMathExpression(f: (Double, Double) => Double, name: String)
 
   override def dataType: DataType = DoubleType
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE1 = left.eval(input)
     if (evalE1 == null) {
       null
@@ -215,7 +216,7 @@ case class ToRadians(child: Expression) extends UnaryMathExpression(math.toRadians, "RADIANS")
 
 case class Atan2(left: Expression, right: Expression)
   extends BinaryMathExpression(math.atan2, "ATAN2") {
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     val evalE1 = left.eval(input)
     if (evalE1 == null) {
       null

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 2e4b9ba..2050512 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode}
@@ -114,7 +115,7 @@ case class Alias(child: Expression, name: String)(
   // Alias(Generator, xx) need to be transformed into Generate(generator, ...)
   override lazy val resolved = childrenResolved && !child.isInstanceOf[Generator]
 
-  override def eval(input: Row): Any = child.eval(input)
+  override def eval(input: catalyst.InternalRow): Any = child.eval(input)
 
   override def gen(ctx: CodeGenContext): GeneratedExpressionCode = child.gen(ctx)
 
@@ -230,7 +231,7 @@ case class AttributeReference(
   }
 
   // Unresolved attributes are transient at compile time and don't get evaluated during execution.
-  override def eval(input: Row = null): Any =
+  override def eval(input: catalyst.InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
@@ -252,7 +253,7 @@ case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
   override def withName(newName: String): Attribute = throw new UnsupportedOperationException
   override def qualifiers: Seq[String] = throw new UnsupportedOperationException
   override def exprId: ExprId = throw new UnsupportedOperationException
-  override def eval(input: Row): Any = throw new UnsupportedOperationException
+  override def eval(input: catalyst.InternalRow): Any = throw new UnsupportedOperationException
   override def nullable: Boolean = throw new UnsupportedOperationException
   override def dataType: DataType = NullType
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
index c2d1a4e..292d626 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullFunctions.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
@@ -43,7 +44,7 @@ case class Coalesce(children: Seq[Expression]) extends Expression {
       this, s"Coalesce cannot have children of different types. $childTypes")
   }
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     var i = 0
     var result: Any = null
     val childIterator = children.iterator
@@ -77,7 +78,7 @@ case class IsNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
   override def foldable: Boolean = child.foldable
   override def nullable: Boolean = false
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     child.eval(input) == null
   }
 
@@ -96,7 +97,7 @@ case class IsNotNull(child: Expression) extends Predicate with trees.UnaryNode[Expression] {
   override def nullable: Boolean = false
   override def toString: String = s"IS NOT NULL $child"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: catalyst.InternalRow): Any = {
     child.eval(input) != null
   }
 
@@ -118,7 +119,7 @@ case class AtLeastNNonNulls(n: Int, children: Seq[Expression]) extends Predicate {
 
   private[this] val childrenArray = children.toArray
 
-  override def eval(input: Row): Boolean = {
+  override def eval(input: catalyst.InternalRow): Boolean = {
     var numNonNulls = 0
     var i = 0
     while (i < childrenArray.length && numNonNulls < n) {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index fbc97b2..c2e57b4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst
 
+import org.apache.spark.sql.catalyst
+
 /**
  * A set of classes that can be used to represent trees of relational expressions. A key goal of
  * the expression library is to hide the details of naming and scoping from developers who want to
@@ -49,30 +51,30 @@
  */
 package object expressions {
 
-  type Row = org.apache.spark.sql.Row
+  type InternalRow = catalyst.InternalRow
 
-  val Row = org.apache.spark.sql.Row
+  val InternalRow = catalyst.InternalRow
 
   /**
-   * Converts a [[Row]] to another Row given a sequence of expression that define each column of the
-   * new row. If the schema of the input row is specified, then the given expression will be bound
-   * to that schema.
+   * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
+   * column of the new row. If the schema of the input row is specified, then the given expression
+   * will be bound to that schema.
    */
-  abstract class Projection extends (Row => Row)
+  abstract class Projection extends (InternalRow => InternalRow)
 
   /**
-   * Converts a [[Row]] to another Row given a sequence of expression that define each column of the
-   * new row. If the schema of the input row is specified, then the given expression will be bound
-   * to that schema.
+   * Converts a [[InternalRow]] to another Row given a sequence of expression that define each
+   * column of the new row. If the schema of the input row is specified, then the given expression
+   * will be bound to that schema.
    *
    * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
-   * each time an input row is added.  This significantly reduces the cost of calculating the
-   * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()`
-   * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
-   * and hold on to the returned [[Row]] before calling `next()`.
+   * each time an input row is added.  This significantly reduces the cost of calculating the
+   * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
+   * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
+   * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
    */
   abstract class MutableProjection extends Projection {
-    def currentValue: Row
+    def currentValue: InternalRow
 
     /** Uses the given row to store the output of the projection. */
     def target(row: MutableRow): MutableProjection

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 7574d1c..082d72e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.types._
 
 object InterpretedPredicate {
-  def create(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
+  def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) =
     create(BindReferences.bindReference(expression, inputSchema))
 
-  def create(expression: Expression): (Row => Boolean) = {
-    (r: Row) => expression.eval(r).asInstanceOf[Boolean]
+  def create(expression: Expression): (InternalRow => Boolean) = {
+    (r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
   }
 }
 
@@ -77,7 +77,7 @@ case class Not(child: Expression) extends UnaryExpression with Predicate with ExpectsInputTypes {
 
   override def expectedChildTypes: Seq[DataType] = Seq(BooleanType)
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     child.eval(input) match {
       case null => null
       case b: Boolean => !b
@@ -98,7 +98,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate {
   override def nullable: Boolean = true // TODO: Figure out correct nullability semantics of IN.
   override def toString: String = s"$value IN ${list.mkString("(", ",", ")")}"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val evaluatedValue = value.eval(input)
     list.exists(e => e.eval(input) == evaluatedValue)
   }
@@ -117,7 +117,7 @@ case class InSet(value: Expression, hset: Set[Any])
 
   override def nullable: Boolean = true // TODO: Figure out correct nullability semantics of IN.
   override def toString: String = s"$value INSET ${hset.mkString("(", ",", ")")}"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     hset.contains(value.eval(input))
   }
 }
@@ -129,7 +129,7 @@ case class And(left: Expression, right: Expression)
 
   override def symbol: String = "&&"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val l = left.eval(input)
     if (l == false) {
       false
@@ -178,7 +178,7 @@ case class Or(left: Expression, right: Expression)
 
   override def symbol: String = "||"
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val l = left.eval(input)
     if (l == true) {
       true
@@ -235,7 +235,7 @@ abstract class BinaryComparison extends BinaryExpression with Predicate {
 
   protected def checkTypesInternal(t: DataType): TypeCheckResult
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val evalE1 = left.eval(input)
     if (evalE1 == null) {
       null
@@ -288,7 +288,7 @@ case class EqualNullSafe(left: Expression, right: Expression) extends BinaryComparison {
 
   override protected def checkTypesInternal(t: DataType) = TypeCheckResult.TypeCheckSuccess
 
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val l = left.eval(input)
     val r = right.eval(input)
     if (l == null && r == null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
index 6e4e9cb..7e80333 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/random.scala
@@ -48,7 +48,7 @@ abstract class RDG(seed: Long) extends LeafExpression with Serializable {
 
 /** Generate a random column with i.i.d. uniformly distributed values in [0, 1). */
 case class Rand(seed: Long) extends RDG(seed) {
-  override def eval(input: Row): Double = rng.nextDouble()
+  override def eval(input: InternalRow): Double = rng.nextDouble()
 }
 
 object Rand {
@@ -62,7 +62,7 @@ object Rand {
 
 /** Generate a random column with i.i.d. gaussian random distribution. */
 case class Randn(seed: Long) extends RDG(seed) {
-  override def eval(input: Row): Double = rng.nextGaussian()
+  override def eval(input: InternalRow): Double = rng.nextGaussian()
 }
 
 object Randn {
+ * An extended interface to [[InternalRow]] that allows the values for each column to be updated. + * Setting a value through a primitive function implicitly marks that column as not null. */ -trait MutableRow extends Row { +trait MutableRow extends InternalRow { def setNullAt(i: Int): Unit def update(ordinal: Int, value: Any) @@ -37,13 +37,12 @@ trait MutableRow extends Row { def setByte(ordinal: Int, value: Byte) def setFloat(ordinal: Int, value: Float) def setString(ordinal: Int, value: String) - // TODO(davies): add setDate() and setDecimal() } /** * A row with no data. Calling any methods will result in an error. Can be used as a placeholder. */ -object EmptyRow extends Row { +object EmptyRow extends InternalRow { override def apply(i: Int): Any = throw new UnsupportedOperationException override def toSeq: Seq[Any] = Seq.empty override def length: Int = 0 @@ -57,7 +56,7 @@ object EmptyRow extends Row { override def getByte(i: Int): Byte = throw new UnsupportedOperationException override def getString(i: Int): String = throw new UnsupportedOperationException override def getAs[T](i: Int): T = throw new UnsupportedOperationException - override def copy(): Row = this + override def copy(): InternalRow = this } /** @@ -65,7 +64,7 @@ object EmptyRow extends Row { * the array is not copied, and thus could technically be mutated after creation, this is not * allowed. */ -class GenericRow(protected[sql] val values: Array[Any]) extends Row { +class GenericRow(protected[sql] val values: Array[Any]) extends InternalRow { /** No-arg constructor for serialization. */ protected def this() = this(null) @@ -154,7 +153,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { } override def equals(o: Any): Boolean = o match { - case other: Row => + case other: InternalRow => if (values.length != other.length) { return false } @@ -174,7 +173,7 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { case _ => false } - override def copy(): Row = this + override def copy(): InternalRow = this } class GenericRowWithSchema(values: Array[Any], override val schema: StructType) @@ -207,15 +206,15 @@ class GenericMutableRow(v: Array[Any]) extends GenericRow(v) with MutableRow { override def update(ordinal: Int, value: Any): Unit = { values(ordinal) = value } - override def copy(): Row = new GenericRow(values.clone()) + override def copy(): InternalRow = new GenericRow(values.clone()) } -class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] { +class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] { def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = this(ordering.map(BindReferences.bindReference(_, inputSchema))) - def compare(a: Row, b: Row): Int = { + def compare(a: InternalRow, b: InternalRow): Int = { var i = 0 while (i < ordering.size) { val order = ordering(i) http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala index 2bcb960..30e4167 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/sets.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.expressions -import 
org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenContext, GeneratedExpressionCode} import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet @@ -57,7 +57,7 @@ case class NewSet(elementType: DataType) extends LeafExpression { override def dataType: OpenHashSetUDT = new OpenHashSetUDT(elementType) - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { new OpenHashSet[Any]() } @@ -87,7 +87,7 @@ case class AddItemToSet(item: Expression, set: Expression) extends Expression { override def dataType: OpenHashSetUDT = set.dataType.asInstanceOf[OpenHashSetUDT] - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val itemEval = item.eval(input) val setEval = set.eval(input).asInstanceOf[OpenHashSet[Any]] @@ -137,7 +137,7 @@ case class CombineSets(left: Expression, right: Expression) extends BinaryExpres override def symbol: String = "++=" - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val leftEval = left.eval(input).asInstanceOf[OpenHashSet[Any]] if(leftEval != null) { val rightEval = right.eval(input).asInstanceOf[OpenHashSet[Any]] @@ -183,7 +183,7 @@ case class CountSet(child: Expression) extends UnaryExpression { override def dataType: DataType = LongType - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val childEval = child.eval(input).asInstanceOf[OpenHashSet[Any]] if (childEval != null) { childEval.size.toLong http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala index 4f4c195..8ca8d22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala @@ -49,7 +49,7 @@ trait StringRegexExpression extends ExpectsInputTypes { protected def pattern(str: String) = if (cache == null) compile(str) else cache - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val l = left.eval(input) if (l == null) { null @@ -121,7 +121,7 @@ trait CaseConversionExpression extends ExpectsInputTypes { override def dataType: DataType = StringType override def expectedChildTypes: Seq[DataType] = Seq(StringType) - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val evaluated = child.eval(input) if (evaluated == null) { null @@ -169,7 +169,7 @@ trait StringComparison extends ExpectsInputTypes { override def expectedChildTypes: Seq[DataType] = Seq(StringType, StringType) - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val leftEval = left.eval(input) if(leftEval == null) { null @@ -262,7 +262,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression) (start, end) } - override def eval(input: Row): Any = { + override def eval(input: InternalRow): Any = { val string = str.eval(input) val po = pos.eval(input) val ln = len.eval(input) @@ -303,7 +303,7 @@ case class StringLength(child: Expression) extends 
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
index 4f4c195..8ca8d22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala
@@ -49,7 +49,7 @@ trait StringRegexExpression extends ExpectsInputTypes {
   protected def pattern(str: String) = if (cache == null) compile(str) else cache
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val l = left.eval(input)
     if (l == null) {
       null
@@ -121,7 +121,7 @@ trait CaseConversionExpression extends ExpectsInputTypes {
   override def dataType: DataType = StringType
   override def expectedChildTypes: Seq[DataType] = Seq(StringType)
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val evaluated = child.eval(input)
     if (evaluated == null) {
       null
@@ -169,7 +169,7 @@ trait StringComparison extends ExpectsInputTypes {
   override def expectedChildTypes: Seq[DataType] = Seq(StringType, StringType)
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val leftEval = left.eval(input)
     if(leftEval == null) {
       null
@@ -262,7 +262,7 @@ case class Substring(str: Expression, pos: Expression, len: Expression)
     (start, end)
   }
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val string = str.eval(input)
     val po = pos.eval(input)
     val ln = len.eval(input)
@@ -303,7 +303,7 @@ case class StringLength(child: Expression) extends UnaryExpression with ExpectsI
   override def dataType: DataType = IntegerType
   override def expectedChildTypes: Seq[DataType] = Seq(StringType)
-  override def eval(input: Row): Any = {
+  override def eval(input: InternalRow): Any = {
     val string = child.eval(input)
     if (string == null) null else string.asInstanceOf[UTF8String].length
   }
@@ -314,5 +314,3 @@ case class StringLength(child: Expression) extends UnaryExpression with ExpectsI
     defineCodeGen(ctx, ev, c => s"($c).length()")
   }
 }
-
-
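Substring's slicePos above encodes SQL semantics: positions are 1-based, and a negative pos counts back from the end of the string. A rough sketch of the resulting behaviour on ordinary Scala strings (a simplified stand-in, not the UTF8String-based implementation):

    def sqlSubstring(s: String, pos: Int, len: Int): String = {
      val start = if (pos > 0) pos - 1 else if (pos < 0) s.length + pos else 0
      val from  = math.max(start, 0)
      val until = math.min(start + math.max(len, 0), s.length)
      if (until <= from) "" else s.substring(from, until)
    }

    assert(sqlSubstring("Spark SQL", 1, 5)  == "Spark")  // 1-based start
    assert(sqlSubstring("Spark SQL", -3, 3) == "SQL")    // negative pos counts from the end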
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index 82c4d46..056f170 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -74,7 +74,7 @@ case class WindowSpecDefinition(
   override def toString: String = simpleString
-  override def eval(input: Row): Any = throw new UnsupportedOperationException
+  override def eval(input: InternalRow): Any = throw new UnsupportedOperationException
   override def nullable: Boolean = true
   override def foldable: Boolean = false
   override def dataType: DataType = throw new UnsupportedOperationException
@@ -259,7 +259,7 @@ trait WindowFunction extends Expression {
   def reset(): Unit
-  def prepareInputParameters(input: Row): AnyRef
+  def prepareInputParameters(input: InternalRow): AnyRef
   def update(input: AnyRef): Unit
@@ -286,7 +286,7 @@ case class UnresolvedWindowFunction(
     throw new UnresolvedException(this, "init")
   override def reset(): Unit =
     throw new UnresolvedException(this, "reset")
-  override def prepareInputParameters(input: Row): AnyRef =
+  override def prepareInputParameters(input: InternalRow): AnyRef =
     throw new UnresolvedException(this, "prepareInputParameters")
   override def update(input: AnyRef): Unit =
     throw new UnresolvedException(this, "update")
@@ -297,7 +297,7 @@ case class UnresolvedWindowFunction(
   override def get(index: Int): Any = throw new UnresolvedException(this, "get")
   // Unresolved functions are transient at compile time and don't get evaluated during execution.
-  override def eval(input: Row = null): Any =
+  override def eval(input: InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
   override def toString: String = s"'$name(${children.mkString(",")})"
@@ -316,7 +316,7 @@ case class UnresolvedWindowExpression(
   override lazy val resolved = false
   // Unresolved functions are transient at compile time and don't get evaluated during execution.
-  override def eval(input: Row = null): Any =
+  override def eval(input: InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 }
@@ -327,7 +327,7 @@ case class WindowExpression(
   override def children: Seq[Expression] = windowFunction :: windowSpec :: Nil
-  override def eval(input: Row): Any =
+  override def eval(input: InternalRow): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
   override def dataType: DataType = windowFunction.dataType
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index e3e070f..2c946cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -17,10 +17,9 @@
 package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, analysis}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{StructType, StructField}
+import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters, analysis}
+import org.apache.spark.sql.types.{StructField, StructType}
 object LocalRelation {
   def apply(output: Attribute*): LocalRelation = new LocalRelation(output)
@@ -32,11 +31,11 @@ object LocalRelation {
   def fromProduct(output: Seq[Attribute], data: Seq[Product]): LocalRelation = {
     val schema = StructType.fromAttributes(output)
     val converter = CatalystTypeConverters.createToCatalystConverter(schema)
-    LocalRelation(output, data.map(converter(_).asInstanceOf[Row]))
+    LocalRelation(output, data.map(converter(_).asInstanceOf[InternalRow]))
   }
 }
-case class LocalRelation(output: Seq[Attribute], data: Seq[Row] = Nil)
+case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
   extends LeafNode with analysis.MultiInstanceRelation {
 /**
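LocalRelation.fromProduct is the piece that lets tests hold real data inline: a converter derived from the schema maps each Product (e.g. a case class instance) to an InternalRow before it is stored in the relation. A minimal sketch (the Person case class and attribute names are hypothetical):

    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

    case class Person(name: String, age: Int)

    // Each Product is run through the schema-derived converter, so the
    // relation's data ends up as Seq[InternalRow], matching the new signature.
    val relation = LocalRelation.fromProduct(
      Seq('name.string, 'age.int),
      Seq(Person("alice", 30), Person("bob", 25)))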
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 80ba57a..42dead7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
-import org.apache.spark.sql.catalyst.expressions.{Expression, Row, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}
 import org.apache.spark.sql.types.{DataType, IntegerType}
 /**
@@ -169,7 +170,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def keyExpressions: Seq[Expression] = expressions
-  override def eval(input: Row = null): Any =
+  override def eval(input: InternalRow = null): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 }
@@ -213,6 +214,6 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
   override def keyExpressions: Seq[Expression] = ordering.map(_.child)
-  override def eval(input: Row): Any =
+  override def eval(input: InternalRow): Any =
     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")
 }
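For intuition about these two partitionings: HashPartitioning routes each row by hashing the values of its key expressions into one of numPartitions buckets, while RangePartitioning assigns contiguous key ranges according to the sort order. A toy sketch of the hash-routing idea only (a hypothetical helper, not Spark's actual hashing code):

    // Map already-evaluated key values to a partition id in [0, numPartitions).
    def bucket(keys: Seq[Any], numPartitions: Int): Int = {
      val mod = keys.hashCode() % numPartitions
      if (mod < 0) mod + numPartitions else mod   // keep the id non-negative
    }

    assert((0 until 100).forall(i => bucket(Seq(i, "x"), 8) < 8))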
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 9a24b23..b4d5e01 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -21,7 +21,7 @@ import java.math.BigInteger
 import java.sql.{Date, Timestamp}
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
 case class PrimitiveData(
@@ -257,7 +257,7 @@ class ScalaReflectionSuite extends SparkFunSuite {
   test("convert PrimitiveData to catalyst") {
     val data = PrimitiveData(1, 1, 1, 1, 1, 1, true)
-    val convertedData = Row(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
+    val convertedData = InternalRow(1, 1.toLong, 1.toDouble, 1.toFloat, 1.toShort, 1.toByte, true)
     val dataType = schemaFor[PrimitiveData].dataType
     assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
   }
@@ -267,8 +267,8 @@ class ScalaReflectionSuite extends SparkFunSuite {
     val data = OptionalData(Some(2), Some(2), Some(2), Some(2), Some(2), Some(2), Some(true),
       Some(primitiveData))
     val dataType = schemaFor[OptionalData].dataType
-    val convertedData = Row(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
-      Row(1, 1, 1, 1, 1, 1, true))
+    val convertedData = InternalRow(2, 2.toLong, 2.toDouble, 2.toFloat, 2.toShort, 2.toByte, true,
+      InternalRow(1, 1, 1, 1, 1, 1, true))
     assert(CatalystTypeConverters.convertToCatalyst(data, dataType) === convertedData)
   }
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index 969c6cc..e407f6f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -437,14 +437,14 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("cast from struct") {
     val struct = Literal.create(
-      Row("123", "abc", "", null),
+      InternalRow("123", "abc", "", null),
       StructType(Seq(
         StructField("a", StringType, nullable = true),
         StructField("b", StringType, nullable = true),
         StructField("c", StringType, nullable = true),
         StructField("d", StringType, nullable = true))))
     val struct_notNull = Literal.create(
-      Row("123", "abc", ""),
+      InternalRow("123", "abc", ""),
       StructType(Seq(
         StructField("a", StringType, nullable = false),
         StructField("b", StringType, nullable = false),
@@ -457,7 +457,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
         StructField("c", IntegerType, nullable = true),
         StructField("d", IntegerType, nullable = true))))
       assert(ret.resolved === true)
-      checkEvaluation(ret, Row(123, null, null, null))
+      checkEvaluation(ret, InternalRow(123, null, null, null))
     }
     {
       val ret = cast(struct, StructType(Seq(
@@ -474,7 +474,7 @@
         StructField("c", BooleanType, nullable = true),
         StructField("d", BooleanType, nullable = true))))
       assert(ret.resolved === true)
-      checkEvaluation(ret, Row(true, true, false, null))
+      checkEvaluation(ret, InternalRow(true, true, false, null))
     }
     {
       val ret = cast(struct, StructType(Seq(
@@ -491,7 +491,7 @@
         StructField("b", IntegerType, nullable = true),
         StructField("c", IntegerType, nullable = true))))
       assert(ret.resolved === true)
-      checkEvaluation(ret, Row(123, null, null))
+      checkEvaluation(ret, InternalRow(123, null, null))
     }
     {
       val ret = cast(struct_notNull, StructType(Seq(
@@ -506,7 +506,7 @@
         StructField("b", BooleanType, nullable = true),
         StructField("c", BooleanType, nullable = true))))
       assert(ret.resolved === true)
-      checkEvaluation(ret, Row(true, true, false))
+      checkEvaluation(ret, InternalRow(true, true, false))
     }
     {
       val ret = cast(struct_notNull, StructType(Seq(
@@ -514,7 +514,7 @@
         StructField("b", BooleanType, nullable = true),
         StructField("c", BooleanType, nullable = false))))
       assert(ret.resolved === true)
-      checkEvaluation(ret, Row(true, true, false))
+      checkEvaluation(ret, InternalRow(true, true, false))
     }
     {
@@ -532,10 +532,10 @@
   test("complex casting") {
     val complex = Literal.create(
-      Row(
+      InternalRow(
        Seq("123", "abc", ""),
        Map("a" -> "123", "b" -> "abc", "c" -> ""),
-       Row(0)),
+       InternalRow(0)),
      StructType(Seq(
        StructField("a", ArrayType(StringType, containsNull = false), nullable = true),
@@ -555,10 +555,10 @@
            StructField("l", LongType, nullable = true)))))))
     assert(ret.resolved === true)
-    checkEvaluation(ret, Row(
+    checkEvaluation(ret, InternalRow(
       Seq(123, null, null),
       Map("a" -> true, "b" -> true, "c" -> false),
-      Row(0L)))
+      InternalRow(0L)))
   }
 }
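The struct-cast expectations above reduce to Spark's scalar cast rules applied field by field: a string that does not parse as an integer casts to null, and (in this version) a string casts to boolean by non-emptiness. A hypothetical mirror of those per-field rules, not the Cast implementation itself:

    def stringToInt(s: String): Any =
      if (s == null) null
      else try s.toInt catch { case _: NumberFormatException => null }

    def stringToBoolean(s: String): Any =
      if (s == null) null else s.nonEmpty   // "abc" -> true, "" -> false

    assert(stringToInt("123") == 123 && stringToInt("abc") == null)
    assert(stringToBoolean("123") == true && stringToBoolean("") == false)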
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
index bcc594c..2b0f461 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ComplexTypeSuite.scala
@@ -27,10 +27,10 @@ import org.apache.spark.unsafe.types.UTF8String
 class ComplexTypeSuite extends SparkFunSuite with ExpressionEvalHelper {
   test("CreateStruct") {
-    val row = Row(1, 2, 3)
+    val row = InternalRow(1, 2, 3)
     val c1 = 'a.int.at(0).as("a")
     val c3 = 'c.int.at(2).as("c")
-    checkEvaluation(CreateStruct(Seq(c1, c3)), Row(1, 3), row)
+    checkEvaluation(CreateStruct(Seq(c1, c3)), InternalRow(1, 3), row)
   }
   test("complex type") {
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index 4a241d3..12d2da8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -32,26 +32,26 @@ import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
 trait ExpressionEvalHelper {
   self: SparkFunSuite =>
-  protected def create_row(values: Any*): Row = {
+  protected def create_row(values: Any*): InternalRow = {
     new GenericRow(values.map(CatalystTypeConverters.convertToCatalyst).toArray)
   }
   protected def checkEvaluation(
-      expression: Expression, expected: Any, inputRow: Row = EmptyRow): Unit = {
+      expression: Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
     checkEvaluationWithoutCodegen(expression, expected, inputRow)
     checkEvaluationWithGeneratedMutableProjection(expression, expected, inputRow)
     checkEvaluationWithGeneratedProjection(expression, expected, inputRow)
     checkEvaluationWithOptimization(expression, expected, inputRow)
   }
-  protected def evaluate(expression: Expression, inputRow: Row = EmptyRow): Any = {
+  protected def evaluate(expression: Expression, inputRow: InternalRow = EmptyRow): Any = {
     expression.eval(inputRow)
   }
   protected def checkEvaluationWithoutCodegen(
       expression: Expression,
       expected: Any,
-      inputRow: Row = EmptyRow): Unit = {
+      inputRow: InternalRow = EmptyRow): Unit = {
     val actual = try evaluate(expression, inputRow) catch {
       case e: Exception => fail(s"Exception evaluating $expression", e)
     }
@@ -66,7 +66,7 @@ trait ExpressionEvalHelper {
   protected def checkEvaluationWithGeneratedMutableProjection(
       expression: Expression,
       expected: Any,
-      inputRow: Row = EmptyRow): Unit = {
+      inputRow: InternalRow = EmptyRow): Unit = {
     val plan = try {
       GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil)()
@@ -92,7 +92,7 @@ trait ExpressionEvalHelper {
   protected def checkEvaluationWithGeneratedProjection(
       expression: Expression,
       expected: Any,
-      inputRow: Row = EmptyRow): Unit = {
+      inputRow: InternalRow = EmptyRow): Unit = {
     val ctx = GenerateProjection.newCodeGenContext()
     lazy val evaluated = expression.gen(ctx)
@@ -128,7 +128,7 @@ trait ExpressionEvalHelper {
   protected def checkEvaluationWithOptimization(
       expression: Expression,
       expected: Any,
-      inputRow: Row = EmptyRow): Unit = {
+      inputRow: InternalRow = EmptyRow): Unit = {
     val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
     val optimizedPlan = DefaultOptimizer.execute(plan)
     checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
@@ -137,7 +137,7 @@ trait ExpressionEvalHelper {
   protected def checkDoubleEvaluation(
       expression: Expression,
       expected: Spread[Double],
-      inputRow: Row = EmptyRow): Unit = {
+      inputRow: InternalRow = EmptyRow): Unit = {
     val actual = try evaluate(expression, inputRow) catch {
       case e: Exception => fail(s"Exception evaluating $expression", e)
     }
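checkEvaluation is the workhorse here: one call runs the expression through the interpreted evaluator, both code-generated projections, and the optimizer, all against the same expected value. A minimal sketch of a suite using the trait (the Add-of-bound-references test is illustrative, not from the patch):

    import org.apache.spark.SparkFunSuite
    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types._

    class ArithmeticEvalSuite extends SparkFunSuite with ExpressionEvalHelper {
      test("addition of two bound columns") {
        val row = create_row(1, 2)   // converts Scala values to catalyst values
        val a = BoundReference(0, IntegerType, nullable = false)
        val b = BoundReference(1, IntegerType, nullable = false)
        checkEvaluation(Add(a, b), 3, row)   // exercised on all four evaluation paths
      }
    }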
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
index 72bbc4e..7aae2bb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeFixedWidthAggregationMapSuite.scala
@@ -37,7 +37,7 @@ class UnsafeFixedWidthAggregationMapSuite
   private val groupKeySchema = StructType(StructField("product", StringType) :: Nil)
   private val aggBufferSchema = StructType(StructField("salePrice", IntegerType) :: Nil)
-  private def emptyAggregationBuffer: Row = new GenericRow(Array[Any](0))
+  private def emptyAggregationBuffer: InternalRow = new GenericRow(Array[Any](0))
   private var memoryManager: TaskMemoryManager = null
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
index 61722f1..577c7a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
@@ -86,7 +86,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
       DoubleType)
     val converter = new UnsafeRowConverter(fieldTypes)
-    val rowWithAllNullColumns: Row = {
+    val rowWithAllNullColumns: InternalRow = {
       val r = new SpecificMutableRow(fieldTypes)
       for (i <- 0 to fieldTypes.length - 1) {
         r.setNullAt(i)
@@ -117,7 +117,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
     // If we have an UnsafeRow with columns that are initially non-null and we null out those
     // columns, then the serialized row representation should be identical to what we would get by
     // creating an entirely null row via the converter
-    val rowWithNoNullColumns: Row = {
+    val rowWithNoNullColumns: InternalRow = {
       val r = new SpecificMutableRow(fieldTypes)
       r.setNullAt(0)
       r.setBoolean(1, false)
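SpecificMutableRow chooses a primitive-specialized column buffer per field type, which is why these suites use it to build input rows without boxing. A sketch of the null-out pattern from the test above (the two field types here are chosen arbitrarily):

    import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
    import org.apache.spark.sql.types._

    val fieldTypes: Array[DataType] = Array(LongType, DoubleType)
    val r = new SpecificMutableRow(fieldTypes)
    r.setLong(0, 1L)
    r.setDouble(1, 2.0)
    r.setNullAt(0)    // null out a previously non-null column
    assert(r.isNullAt(0) && r.getDouble(1) == 2.0)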
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
index 6841bd9..54e8c64 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala
@@ -17,10 +17,10 @@
 package org.apache.spark.sql.catalyst.optimizer
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -37,13 +37,11 @@ class ConvertToLocalRelationSuite extends PlanTest {
   test("Project on LocalRelation should be turned into a single LocalRelation") {
     val testRelation = LocalRelation(
       LocalRelation('a.int, 'b.int).output,
-      Row(1, 2) ::
-      Row(4, 5) :: Nil)
+      InternalRow(1, 2) :: InternalRow(4, 5) :: Nil)
     val correctAnswer = LocalRelation(
       LocalRelation('a1.int, 'b1.int).output,
-      Row(1, 3) ::
-      Row(4, 6) :: Nil)
+      InternalRow(1, 3) :: InternalRow(4, 6) :: Nil)
     val projectOnLocal = testRelation.select(
       UnresolvedAttribute("a").as("a1"),
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 8ec79c3..bda2179 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -28,7 +28,7 @@ case class Dummy(optKey: Option[Expression]) extends Expression {
   override def nullable: Boolean = true
   override def dataType: NullType = NullType
   override lazy val resolved = true
-  override def eval(input: Row): Any = null.asInstanceOf[Any]
+  override def eval(input: InternalRow): Any = null.asInstanceOf[Any]
 }
 case class ComplexPlan(exprs: Seq[Seq[Expression]])
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
index a424554..4d8fe4a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateUtilsSuite.scala
@@ -21,7 +21,6 @@ import java.sql.Timestamp
 import org.apache.spark.SparkFunSuite
-
 class DateUtilsSuite extends SparkFunSuite {
   test("timestamp") {
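What the ConvertToLocalRelationSuite test above pins down is that the rule evaluates a Project directly against the LocalRelation's rows, collapsing plan and data into a single node. A sketch of the rule wiring, mirroring the suite's setup (the final execute call is abbreviated, with a hypothetical analyzedProjectOnLocal plan):

    import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.RuleExecutor

    object LocalOptimizer extends RuleExecutor[LogicalPlan] {
      val batches = Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil
    }

    // Applied to the projected testRelation from the test, this leaves a single
    // LocalRelation whose data is InternalRow(1, 3) :: InternalRow(4, 6) :: Nil.
    // val optimized = LocalOptimizer.execute(analyzedProjectOnLocal)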
http://git-wip-us.apache.org/repos/asf/spark/blob/d46f8e5d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index f041fd3..f1acdfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql
 import java.io.CharArrayWriter
 import java.util.Properties
-import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -33,7 +32,7 @@
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, UnresolvedAttribute, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{MultiAlias, ResolvedStar, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
@@ -1032,7 +1031,8 @@ class DataFrame private[sql](
     val names = schema.toAttributes.map(_.name)
     val rowFunction =
-      f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema).asInstanceOf[Row]))
+      f.andThen(_.map(CatalystTypeConverters.convertToCatalyst(_, schema)
+        .asInstanceOf[InternalRow]))
     val generator = UserDefinedGenerator(elementTypes, rowFunction, input.map(_.expr))
     Generate(generator, join = true, outer = false,
@@ -1058,8 +1058,9 @@ class DataFrame private[sql](
     val elementTypes = attributes.map { attr => (attr.dataType, attr.nullable) }
     val names = attributes.map(_.name)
-    def rowFunction(row: Row): TraversableOnce[Row] = {
-      f(row(0).asInstanceOf[A]).map(o => Row(CatalystTypeConverters.convertToCatalyst(o, dataType)))
+    def rowFunction(row: Row): TraversableOnce[InternalRow] = {
+      f(row(0).asInstanceOf[A]).map(o =>
+        catalyst.InternalRow(CatalystTypeConverters.convertToCatalyst(o, dataType)))
     }
     val generator = UserDefinedGenerator(elementTypes, rowFunction, apply(inputColumn).expr :: Nil)
@@ -1221,7 +1222,7 @@ class DataFrame private[sql](
     val outputCols = (if (cols.isEmpty) numericColumns.map(_.prettyString) else cols).toList
-    val ret: Seq[Row] = if (outputCols.nonEmpty) {
+    val ret: Seq[InternalRow] = if (outputCols.nonEmpty) {
       val aggExprs = statistics.flatMap { case (_, colToAgg) =>
         outputCols.map(c => Column(Cast(colToAgg(Column(c).expr), StringType)).as(c))
       }
@@ -1230,11 +1231,12 @@ class DataFrame private[sql](
       // Pivot the data so each summary is one row
       row.grouped(outputCols.size).toSeq.zip(statistics).map {
-        case (aggregation, (statistic, _)) => Row(statistic :: aggregation.toList: _*)
+        case (aggregation, (statistic, _)) =>
+          catalyst.InternalRow(statistic :: aggregation.toList: _*)
       }
     } else {
       // If there are no output columns, just output a single column that contains the stats.
-      statistics.map { case (name, _) => Row(name) }
+      statistics.map { case (name, _) => catalyst.InternalRow(name) }
     }
     // All columns are string type
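Of the two explode overloads touched here, the second is the one user code usually reaches: df.explode(inputColumn, outputColumn)(f) maps one column to zero or more output values, each of which the generator wraps into a catalyst InternalRow. A sketch of calling it (the sqlContext value and the "sentence" column are hypothetical):

    val df = sqlContext.createDataFrame(Seq(Tuple1("hello spark world"))).toDF("sentence")

    // One output row per produced word, alongside the original columns.
    val words = df.explode("sentence", "word") { (s: String) => s.split(" ").toSeq }
    words.show()   // three rows: "hello", "spark", "world"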
