Repository: spark Updated Branches: refs/heads/master 76f2e393a -> 4a8bb9d00
Revert "[SPARK-9458] Avoid object allocation in prefix generation." This reverts commit 9514d874f0cf61f1eb4ec4f5f66e053119f769c9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a8bb9d0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a8bb9d0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a8bb9d0 Branch: refs/heads/master Commit: 4a8bb9d00d8181aff5f5183194d9aa2a65deacdf Parents: 76f2e39 Author: Reynold Xin <[email protected]> Authored: Thu Jul 30 01:04:24 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Thu Jul 30 01:04:24 2015 -0700 ---------------------------------------------------------------------- .../unsafe/sort/PrefixComparators.java | 16 ++++++ .../unsafe/sort/PrefixComparatorsSuite.scala | 12 +++++ .../sql/execution/UnsafeExternalRowSorter.java | 2 +- .../spark/sql/execution/SortPrefixUtils.scala | 51 +++++++++++--------- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../org/apache/spark/sql/execution/sort.scala | 5 +- .../execution/RowFormatConvertersSuite.scala | 2 +- .../sql/execution/UnsafeExternalSortSuite.scala | 10 ++-- 8 files changed, 67 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java index a9ee604..600aff7 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java @@ -29,6 +29,7 @@ public class PrefixComparators { public static final StringPrefixComparator STRING = new StringPrefixComparator(); public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator(); + public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator(); public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator(); public static final class StringPrefixComparator extends PrefixComparator { @@ -54,6 +55,21 @@ public class PrefixComparators { public final long NULL_PREFIX = Long.MIN_VALUE; } + public static final class FloatPrefixComparator extends PrefixComparator { + @Override + public int compare(long aPrefix, long bPrefix) { + float a = Float.intBitsToFloat((int) aPrefix); + float b = Float.intBitsToFloat((int) bPrefix); + return Utils.nanSafeCompareFloats(a, b); + } + + public long computePrefix(float value) { + return Float.floatToIntBits(value) & 0xffffffffL; + } + + public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY); + } + public static final class DoublePrefixComparator extends PrefixComparator { @Override public int compare(long aPrefix, long bPrefix) { http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala index 26b7a9e..cf53a8a 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala @@ -55,6 +55,18 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks { forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) } } + test("float prefix comparator handles NaN properly") { + val nan1: Float = java.lang.Float.intBitsToFloat(0x7f800001) + val nan2: Float = java.lang.Float.intBitsToFloat(0x7fffffff) + assert(nan1.isNaN) + assert(nan2.isNaN) + val nan1Prefix = PrefixComparators.FLOAT.computePrefix(nan1) + val nan2Prefix = PrefixComparators.FLOAT.computePrefix(nan2) + assert(nan1Prefix === nan2Prefix) + val floatMaxPrefix = PrefixComparators.FLOAT.computePrefix(Float.MaxValue) + assert(PrefixComparators.FLOAT.compare(nan1Prefix, floatMaxPrefix) === 1) + } + test("double prefix comparator handles NaNs properly") { val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L) val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL) http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 8342833..4c3f2c6 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -121,7 +121,7 @@ final class UnsafeExternalRowSorter { // here in order to prevent memory leaks. cleanupResources(); } - return new AbstractScalaRowIterator<InternalRow>() { + return new AbstractScalaRowIterator() { private final int numFields = schema.length(); private UnsafeRow row = new UnsafeRow(); http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala index 050d27f..2dee354 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{BoundReference, SortOrder} +import org.apache.spark.sql.catalyst.expressions.SortOrder import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, PrefixComparator} @@ -39,54 +39,57 @@ object SortPrefixUtils { sortOrder.dataType match { case StringType => PrefixComparators.STRING case BooleanType | ByteType | ShortType | IntegerType | LongType => PrefixComparators.INTEGRAL - case FloatType | DoubleType => PrefixComparators.DOUBLE + case FloatType => PrefixComparators.FLOAT + case DoubleType => PrefixComparators.DOUBLE case _ => NoOpPrefixComparator } } def getPrefixComputer(sortOrder: SortOrder): InternalRow => Long = { - val bound = sortOrder.child.asInstanceOf[BoundReference] - val pos = bound.ordinal sortOrder.dataType match { - case StringType => - (row: InternalRow) => { - PrefixComparators.STRING.computePrefix(row.getUTF8String(pos)) - } + case StringType => (row: InternalRow) => { + PrefixComparators.STRING.computePrefix(sortOrder.child.eval(row).asInstanceOf[UTF8String]) + } case BooleanType => (row: InternalRow) => { - if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX - else if (row.getBoolean(pos)) 1 + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX + else if (sortOrder.child.eval(row).asInstanceOf[Boolean]) 1 else 0 } case ByteType => (row: InternalRow) => { - if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getByte(pos) + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX + else sortOrder.child.eval(row).asInstanceOf[Byte] } case ShortType => (row: InternalRow) => { - if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getShort(pos) + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX + else sortOrder.child.eval(row).asInstanceOf[Short] } case IntegerType => (row: InternalRow) => { - if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getInt(pos) + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX + else sortOrder.child.eval(row).asInstanceOf[Int] } case LongType => (row: InternalRow) => { - if (row.isNullAt(pos)) PrefixComparators.INTEGRAL.NULL_PREFIX else row.getLong(pos) + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.INTEGRAL.NULL_PREFIX + else sortOrder.child.eval(row).asInstanceOf[Long] } case FloatType => (row: InternalRow) => { - if (row.isNullAt(pos)) { - PrefixComparators.DOUBLE.NULL_PREFIX - } else { - PrefixComparators.DOUBLE.computePrefix(row.getFloat(pos).toDouble) - } + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.FLOAT.NULL_PREFIX + else PrefixComparators.FLOAT.computePrefix(sortOrder.child.eval(row).asInstanceOf[Float]) } case DoubleType => (row: InternalRow) => { - if (row.isNullAt(pos)) { - PrefixComparators.DOUBLE.NULL_PREFIX - } else { - PrefixComparators.DOUBLE.computePrefix(row.getDouble(pos)) - } + val exprVal = sortOrder.child.eval(row) + if (exprVal == null) PrefixComparators.DOUBLE.NULL_PREFIX + else PrefixComparators.DOUBLE.computePrefix(sortOrder.child.eval(row).asInstanceOf[Double]) } case _ => (row: InternalRow) => 0L } http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4ab2c41..f3ef066 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = { if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled && - TungstenSort.supportsSchema(child.schema)) { - execution.TungstenSort(sortExprs, global, child) + UnsafeExternalSort.supportsSchema(child.schema)) { + execution.UnsafeExternalSort(sortExprs, global, child) } else if (sqlContext.conf.externalSortEnabled) { execution.ExternalSort(sortExprs, global, child) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala index d0ad310..f822088 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala @@ -97,7 +97,7 @@ case class ExternalSort( * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will * spill every `frequency` records. */ -case class TungstenSort( +case class UnsafeExternalSort( sortOrder: Seq[SortOrder], global: Boolean, child: SparkPlan, @@ -110,6 +110,7 @@ case class TungstenSort( if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { + assert(codegenEnabled, "UnsafeExternalSort requires code generation to be enabled") def doSort(iterator: Iterator[InternalRow]): Iterator[InternalRow] = { val ordering = newOrdering(sortOrder, child.output) val boundSortExpression = BindReferences.bindReference(sortOrder.head, child.output) @@ -148,7 +149,7 @@ case class TungstenSort( } @DeveloperApi -object TungstenSort { +object UnsafeExternalSort { /** * Return true if UnsafeExternalSort can sort rows with the given schema, false otherwise. */ http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index c458f95..7b75f75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -31,7 +31,7 @@ class RowFormatConvertersSuite extends SparkPlanTest { private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)) assert(!outputsSafe.outputsUnsafeRows) - private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null)) + private val outputsUnsafe = UnsafeExternalSort(Nil, false, PhysicalRDD(Seq.empty, null)) assert(outputsUnsafe.outputsUnsafeRows) test("planner should insert unsafe->safe conversions when required") { http://git-wip-us.apache.org/repos/asf/spark/blob/4a8bb9d0/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala index 9cabc4b..7a4baa9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala @@ -42,7 +42,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll { TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "false") checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)), + (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)), (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)), sortAnswers = false ) @@ -53,7 +53,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll { try { checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => Limit(10, TungstenSort('a.asc :: Nil, true, child)), + (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)), (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)), sortAnswers = false ) @@ -68,7 +68,7 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll { val stringLength = 1024 * 1024 * 2 checkThatPlansAgree( Seq(Tuple1("a" * stringLength), Tuple1("b" * stringLength)).toDF("a").repartition(1), - TungstenSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1), + UnsafeExternalSort(sortOrder, global = true, _: SparkPlan, testSpillFrequency = 1), Sort(sortOrder, global = true, _: SparkPlan), sortAnswers = false ) @@ -88,11 +88,11 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll { TestSQLContext.sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))), StructType(StructField("a", dataType, nullable = true) :: Nil) ) - assert(TungstenSort.supportsSchema(inputDf.schema)) + assert(UnsafeExternalSort.supportsSchema(inputDf.schema)) checkThatPlansAgree( inputDf, plan => ConvertToSafe( - TungstenSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)), + UnsafeExternalSort(sortOrder, global = true, plan: SparkPlan, testSpillFrequency = 23)), Sort(sortOrder, global = true, _: SparkPlan), sortAnswers = false ) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
