Repository: spark
Updated Branches:
  refs/heads/branch-1.5 57596fb8c -> 7fa419535


[SPARK-9360] [SQL] Support BinaryType in PrefixComparators for 
UnsafeExternalSort

UnsafeExternalSort currently falls back to NoOpPrefixComparator for 
binary-typed data.
This change adds BinaryPrefixComparator (and its descending counterpart) to 
PrefixComparators so that BinaryType sort keys get a real prefix comparison.

Author: Takeshi YAMAMURO <[email protected]>

Closes #7676 from maropu/BinaryTypePrefixComparator and squashes the following 
commits:

fe6f31b [Takeshi YAMAMURO] Apply comments
d943c04 [Takeshi YAMAMURO] Add a codegen'd entry for BinaryType in SortPrefix
ecf3ac5 [Takeshi YAMAMURO] Support BinaryType in PrefixComparator

(cherry picked from commit 6d8a6e4161176e391514153d7535da14b52194be)
Signed-off-by: Davies Liu <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7fa41953
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7fa41953
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7fa41953

Branch: refs/heads/branch-1.5
Commit: 7fa41953536d60259dff8ee83e5a3b0050abc9ca
Parents: 57596fb
Author: Takeshi YAMAMURO <[email protected]>
Authored: Wed Aug 5 00:54:31 2015 -0700
Committer: Davies Liu <[email protected]>
Committed: Wed Aug 5 00:56:53 2015 -0700

----------------------------------------------------------------------
 .../unsafe/sort/PrefixComparators.java          | 35 ++++++++++++++++++
 .../unsafe/sort/PrefixComparatorsSuite.scala    | 38 ++++++++++++++++++++
 .../sql/catalyst/expressions/SortOrder.scala    |  3 ++
 .../spark/sql/execution/SortPrefixUtils.scala   |  2 ++
 4 files changed, 78 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7fa41953/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index 4d7e5b3..b5f661c 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -20,6 +20,7 @@ package org.apache.spark.util.collection.unsafe.sort;
 import com.google.common.primitives.UnsignedLongs;
 
 import org.apache.spark.annotation.Private;
+import org.apache.spark.unsafe.PlatformDependent;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.apache.spark.util.Utils;
 
@@ -29,6 +30,8 @@ public class PrefixComparators {
 
   public static final StringPrefixComparator STRING = new 
StringPrefixComparator();
   public static final StringPrefixComparatorDesc STRING_DESC = new 
StringPrefixComparatorDesc();
+  public static final BinaryPrefixComparator BINARY = new 
BinaryPrefixComparator();
+  public static final BinaryPrefixComparatorDesc BINARY_DESC = new 
BinaryPrefixComparatorDesc();
   public static final LongPrefixComparator LONG = new LongPrefixComparator();
   public static final LongPrefixComparatorDesc LONG_DESC = new 
LongPrefixComparatorDesc();
   public static final DoublePrefixComparator DOUBLE = new 
DoublePrefixComparator();
@@ -52,6 +55,38 @@ public class PrefixComparators {
     }
   }
 
+  public static final class BinaryPrefixComparator extends PrefixComparator {
+    @Override
+    public int compare(long aPrefix, long bPrefix) {
+      return UnsignedLongs.compare(aPrefix, bPrefix);
+    }
+
+    public static long computePrefix(byte[] bytes) {
+      if (bytes == null) {
+        return 0L;
+      } else {
+        /**
+         * TODO: If a wrapper for BinaryType is created (SPARK-8786),
+         * the code below should move into that wrapper class.
+         */
+        final int minLen = Math.min(bytes.length, 8);
+        long p = 0;
+        for (int i = 0; i < minLen; ++i) {
+          p |= (128L + PlatformDependent.UNSAFE.getByte(bytes, 
BYTE_ARRAY_OFFSET + i))
+              << (56 - 8 * i);
+        }
+        return p;
+      }
+    }
+  }
+
+  public static final class BinaryPrefixComparatorDesc extends 
PrefixComparator {
+    @Override
+    public int compare(long bPrefix, long aPrefix) {
+      return UnsignedLongs.compare(aPrefix, bPrefix);
+    }
+  }
+
   public static final class LongPrefixComparator extends PrefixComparator {
     @Override
     public int compare(long a, long b) {

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa41953/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
index 26a2e96..0326ed7 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala
@@ -55,6 +55,44 @@ class PrefixComparatorsSuite extends SparkFunSuite with 
PropertyChecks {
     forAll { (s1: String, s2: String) => testPrefixComparison(s1, s2) }
   }
 
+  test("Binary prefix comparator") {
+
+     def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
+      for (i <- 0 until x.length; if i < y.length) {
+        val res = x(i).compare(y(i))
+        if (res != 0) return res
+      }
+      x.length - y.length
+    }
+
+    def testPrefixComparison(x: Array[Byte], y: Array[Byte]): Unit = {
+      val s1Prefix = PrefixComparators.BinaryPrefixComparator.computePrefix(x)
+      val s2Prefix = PrefixComparators.BinaryPrefixComparator.computePrefix(y)
+      val prefixComparisonResult =
+        PrefixComparators.BINARY.compare(s1Prefix, s2Prefix)
+      assert(
+        (prefixComparisonResult == 0) ||
+        (prefixComparisonResult < 0 && compareBinary(x, y) < 0) ||
+        (prefixComparisonResult > 0 && compareBinary(x, y) > 0))
+    }
+
+    // scalastyle:off
+    val regressionTests = Table(
+      ("s1", "s2"),
+      ("abc", "世界"),
+      ("你好", "世界"),
+      ("你好123", "你好122")
+    )
+    // scalastyle:on
+
+    forAll (regressionTests) { (s1: String, s2: String) =>
+      testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8"))
+    }
+    forAll { (s1: String, s2: String) =>
+      testPrefixComparison(s1.getBytes("UTF-8"), s2.getBytes("UTF-8"))
+    }
+  }
+
   test("double prefix comparator handles NaNs properly") {
     val nan1: Double = java.lang.Double.longBitsToDouble(0x7ff0000000000001L)
     val nan2: Double = java.lang.Double.longBitsToDouble(0x7fffffffffffffffL)

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa41953/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index f6a872b..98e0290 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import 
org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, 
CodeGenContext}
 import org.apache.spark.sql.types._
+import 
org.apache.spark.util.collection.unsafe.sort.PrefixComparators.BinaryPrefixComparator
 import 
org.apache.spark.util.collection.unsafe.sort.PrefixComparators.DoublePrefixComparator
 
 abstract sealed class SortDirection
@@ -63,6 +64,7 @@ case class SortPrefix(child: SortOrder) extends 
UnaryExpression {
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): 
String = {
     val childCode = child.child.gen(ctx)
     val input = childCode.primitive
+    val BinaryPrefixCmp = classOf[BinaryPrefixComparator].getName
     val DoublePrefixCmp = classOf[DoublePrefixComparator].getName
 
     val (nullValue: Long, prefixCode: String) = child.child.dataType match {
@@ -76,6 +78,7 @@ case class SortPrefix(child: SortOrder) extends 
UnaryExpression {
         (DoublePrefixComparator.computePrefix(Double.NegativeInfinity),
           s"$DoublePrefixCmp.computePrefix((double)$input)")
       case StringType => (0L, s"$input.getPrefix()")
+      case BinaryType => (0L, s"$BinaryPrefixCmp.computePrefix($input)")
       case dt: DecimalType if dt.precision - dt.scale <= 
Decimal.MAX_LONG_DIGITS =>
         val prefix = if (dt.precision <= Decimal.MAX_LONG_DIGITS) {
           s"$input.toUnscaledLong()"

http://git-wip-us.apache.org/repos/asf/spark/blob/7fa41953/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
index 49adf21..e17b50e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala
@@ -38,6 +38,8 @@ object SortPrefixUtils {
     sortOrder.dataType match {
       case StringType =>
         if (sortOrder.isAscending) PrefixComparators.STRING else 
PrefixComparators.STRING_DESC
+      case BinaryType =>
+        if (sortOrder.isAscending) PrefixComparators.BINARY else 
PrefixComparators.BINARY_DESC
       case BooleanType | ByteType | ShortType | IntegerType | LongType | 
DateType | TimestampType =>
         if (sortOrder.isAscending) PrefixComparators.LONG else 
PrefixComparators.LONG_DESC
       case dt: DecimalType if dt.precision - dt.scale <= 
Decimal.MAX_LONG_DIGITS =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to