Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c1c498006 -> fb1c69714


[SPARK-17061][SPARK-17093][SQL] `MapObjects` should make copies of unsafe-backed data

Currently `MapObjects` does not make copies of unsafe-backed data, leading to 
problems such as [SPARK-17061](https://issues.apache.org/jira/browse/SPARK-17061) and 
[SPARK-17093](https://issues.apache.org/jira/browse/SPARK-17093).

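This patch changes `MapObjects` to copy each converted element that is unsafe-backed (`UnsafeRow`, `UnsafeArrayData`, or `UnsafeMapData`) before storing it in the output array.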

Generated code - prior to this patch:
```java
...
/* 295 */ if (isNull12) {
/* 296 */   convertedArray1[loopIndex1] = null;
/* 297 */ } else {
/* 298 */   convertedArray1[loopIndex1] = value12;
/* 299 */ }
...
```

Generated code - after this patch:
```java
...
/* 295 */ if (isNull12) {
/* 296 */   convertedArray1[loopIndex1] = null;
/* 297 */ } else {
/* 298 */   convertedArray1[loopIndex1] = value12 instanceof UnsafeRow? 
value12.copy() : value12;
/* 299 */ }
...
```

Adds a new test case that fails without this patch.

Author: Liwei Lin <[email protected]>

Closes #14698 from lw-lin/mapobjects-copy.

(cherry picked from commit e0b20f9f24d5c3304bf517a4dcfb0da93be5bc75)
Signed-off-by: Herman van Hovell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb1c6971
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb1c6971
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb1c6971

Branch: refs/heads/branch-2.0
Commit: fb1c697143a5bb2df69d9f2c9cbddc4eb526f047
Parents: c1c4980
Author: Liwei Lin <[email protected]>
Authored: Thu Aug 25 11:24:40 2016 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Thu Aug 25 11:45:51 2016 +0200

----------------------------------------------------------------------
 .../maven_app_core/src/main/java/SimpleApp.java | 41 ++++++++++++++++++++
 .../catalyst/expressions/objects/objects.scala  | 12 +++++-
 .../expressions/ExpressionEvalHelper.scala      |  2 +-
 3 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fb1c6971/dev/audit-release/maven_app_core/src/main/java/SimpleApp.java
----------------------------------------------------------------------
diff --git a/dev/audit-release/maven_app_core/src/main/java/SimpleApp.java 
b/dev/audit-release/maven_app_core/src/main/java/SimpleApp.java
index 5217689..059e51b 100644
--- a/dev/audit-release/maven_app_core/src/main/java/SimpleApp.java
+++ b/dev/audit-release/maven_app_core/src/main/java/SimpleApp.java
@@ -18,11 +18,20 @@
 import org.apache.spark.api.java.*;
 import org.apache.spark.api.java.function.Function;
 
+<<<<<<< HEAD:dev/audit-release/maven_app_core/src/main/java/SimpleApp.java
 public class SimpleApp {
   public static void main(String[] args) {
     String logFile = "input.txt";
     JavaSparkContext sc = new JavaSparkContext("local", "Simple App");
     JavaRDD<String> logData = sc.textFile(logFile).cache();
+=======
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.objects.Invoke
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.types.{IntegerType, ObjectType}
+>>>>>>> e0b20f9... [SPARK-17061][SPARK-17093][SQL] MapObjects` should make 
copies of unsafe-backed 
data:sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala
 
     long numAs = logData.filter(new Function<String, Boolean>() {
       public Boolean call(String s) { return s.contains("a"); }
@@ -39,4 +48,36 @@ public class SimpleApp {
    System.out.println("Test succeeded");
    sc.stop();
   }
+
+  test("MapObjects should make copies of unsafe-backed data") {
+    // test UnsafeRow-backed data
+    val structEncoder = ExpressionEncoder[Array[Tuple2[java.lang.Integer, 
java.lang.Integer]]]
+    val structInputRow = InternalRow.fromSeq(Seq(Array((1, 2), (3, 4))))
+    val structExpected = new GenericArrayData(
+      Array(InternalRow.fromSeq(Seq(1, 2)), InternalRow.fromSeq(Seq(3, 4))))
+    checkEvalutionWithUnsafeProjection(
+      structEncoder.serializer.head, structExpected, structInputRow)
+
+    // test UnsafeArray-backed data
+    val arrayEncoder = ExpressionEncoder[Array[Array[Int]]]
+    val arrayInputRow = InternalRow.fromSeq(Seq(Array(Array(1, 2), Array(3, 
4))))
+    val arrayExpected = new GenericArrayData(
+      Array(new GenericArrayData(Array(1, 2)), new GenericArrayData(Array(3, 
4))))
+    checkEvalutionWithUnsafeProjection(
+      arrayEncoder.serializer.head, arrayExpected, arrayInputRow)
+
+    // test UnsafeMap-backed data
+    val mapEncoder = ExpressionEncoder[Array[Map[Int, Int]]]
+    val mapInputRow = InternalRow.fromSeq(Seq(Array(
+      Map(1 -> 100, 2 -> 200), Map(3 -> 300, 4 -> 400))))
+    val mapExpected = new GenericArrayData(Seq(
+      new ArrayBasedMapData(
+        new GenericArrayData(Array(1, 2)),
+        new GenericArrayData(Array(100, 200))),
+      new ArrayBasedMapData(
+        new GenericArrayData(Array(3, 4)),
+        new GenericArrayData(Array(300, 400)))))
+    checkEvalutionWithUnsafeProjection(
+      mapEncoder.serializer.head, mapExpected, mapInputRow)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1c6971/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 37ec1a6..1cdda53 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -481,6 +481,16 @@ case class MapObjects private(
           s"$seq == null ? $array[$loopIndex] : $seq.apply($loopIndex)"
     }
 
+    // Make a copy of the data if it's unsafe-backed
+    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
+      s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
+    val genFunctionValue = lambdaFunction.dataType match {
+      case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], 
genFunction.value)
+      case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], 
genFunction.value)
+      case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], 
genFunction.value)
+      case _ => genFunction.value
+    }
+
     val loopNullCheck = inputDataType match {
       case _: ArrayType => s"$loopIsNull = 
${genInputData.value}.isNullAt($loopIndex);"
       // The element of primitive array will never be null.
@@ -508,7 +518,7 @@ case class MapObjects private(
           if (${genFunction.isNull}) {
             $convertedArray[$loopIndex] = null;
           } else {
-            $convertedArray[$loopIndex] = ${genFunction.value};
+            $convertedArray[$loopIndex] = $genFunctionValue;
           }
 
           $loopIndex += 1;

http://git-wip-us.apache.org/repos/asf/spark/blob/fb1c6971/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
index d6a9672..668543a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
@@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
     // some expression is reusing variable names across different instances.
     // This behavior is tested in ExpressionEvalHelperSuite.
     val plan = generateProject(
-      GenerateUnsafeProjection.generate(
+      UnsafeProjection.create(
         Alias(expression, s"Optimized($expression)1")() ::
           Alias(expression, s"Optimized($expression)2")() :: Nil),
       expression)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to