Repository: spark
Updated Branches:
  refs/heads/master 723db13e0 -> f0ebab3f6


[SPARK-9336][SQL] Remove extra JoinedRows

They were added to improve performance (so JIT can inline the JoinedRow calls). 
However, we can also just improve it by projecting output out to UnsafeRow in 
Tungsten variant of the operators.

Author: Reynold Xin <[email protected]>

Closes #7659 from rxin/remove-joinedrows and squashes the following commits:

7510447 [Reynold Xin] [SPARK-9336][SQL] Remove extra JoinedRows


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0ebab3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0ebab3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0ebab3f

Branch: refs/heads/master
Commit: f0ebab3f6d3a9231474acf20110db72c0fb51882
Parents: 723db13
Author: Reynold Xin <[email protected]>
Authored: Sat Jul 25 01:28:46 2015 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sat Jul 25 01:28:46 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/Projection.scala   | 494 +------------------
 .../apache/spark/sql/execution/Aggregate.scala  |   2 +-
 .../sql/execution/GeneratedAggregate.scala      |   2 +-
 .../org/apache/spark/sql/execution/Window.scala |   2 +-
 .../aggregate/sortBasedIterators.scala          |   2 +-
 .../spark/sql/execution/joins/HashJoin.scala    |   2 +-
 .../sql/execution/joins/SortMergeJoin.scala     |   2 +-
 7 files changed, 8 insertions(+), 498 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index dbda05a..6023a2c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -44,7 +44,7 @@ class InterpretedProjection(expressions: Seq[Expression]) 
extends Projection {
     new GenericInternalRow(outputArray)
   }
 
-  override def toString: String = s"Row => [${exprArray.mkString(",")}]"
+  override def toString(): String = s"Row => [${exprArray.mkString(",")}]"
 }
 
 /**
@@ -58,7 +58,7 @@ case class InterpretedMutableProjection(expressions: 
Seq[Expression]) extends Mu
     this(expressions.map(BindReferences.bindReference(_, inputSchema)))
 
   private[this] val exprArray = expressions.toArray
-  private[this] var mutableRow: MutableRow = new 
GenericMutableRow(exprArray.size)
+  private[this] var mutableRow: MutableRow = new 
GenericMutableRow(exprArray.length)
   def currentValue: InternalRow = mutableRow
 
   override def target(row: MutableRow): MutableProjection = {
@@ -237,493 +237,3 @@ class JoinedRow extends InternalRow {
     }
   }
 }
-
-/**
- * JIT HACK: Replace with macros
- * The `JoinedRow` class is used in many performance critical situation.  
Unfortunately, since there
- * are multiple different types of `Rows` that could be stored as `row1` and 
`row2` most of the
- * calls in the critical path are polymorphic.  By creating special versions 
of this class that are
- * used in only a single location of the code, we increase the chance that 
only a single type of
- * Row will be referenced, increasing the opportunity for the JIT to play 
tricks.  This sounds
- * crazy but in benchmarks it had noticeable effects.
- */
-class JoinedRow2 extends InternalRow {
-  private[this] var row1: InternalRow = _
-  private[this] var row2: InternalRow = _
-
-  def this(left: InternalRow, right: InternalRow) = {
-    this()
-    row1 = left
-    row2 = right
-  }
-
-  /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
-    row1 = r1
-    row2 = r2
-    this
-  }
-
-  /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
-    row1 = newLeft
-    this
-  }
-
-  /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
-    row2 = newRight
-    this
-  }
-
-  override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
-  override def numFields: Int = row1.numFields + row2.numFields
-
-  override def getUTF8String(i: Int): UTF8String = {
-    if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - 
row1.numFields)
-  }
-
-  override def getBinary(i: Int): Array[Byte] = {
-    if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - 
row1.numFields)
-  }
-
-  override def get(i: Int): Any =
-    if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
-  override def isNullAt(i: Int): Boolean =
-    if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - 
row1.numFields)
-
-  override def getInt(i: Int): Int =
-    if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
-  override def getLong(i: Int): Long =
-    if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - 
row1.numFields)
-
-  override def getDouble(i: Int): Double =
-    if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - 
row1.numFields)
-
-  override def getBoolean(i: Int): Boolean =
-    if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - 
row1.numFields)
-
-  override def getShort(i: Int): Short =
-    if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - 
row1.numFields)
-
-  override def getByte(i: Int): Byte =
-    if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - 
row1.numFields)
-
-  override def getFloat(i: Int): Float =
-    if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - 
row1.numFields)
-
-  override def copy(): InternalRow = {
-    val totalSize = row1.numFields + row2.numFields
-    val copiedValues = new Array[Any](totalSize)
-    var i = 0
-    while(i < totalSize) {
-      copiedValues(i) = get(i)
-      i += 1
-    }
-    new GenericInternalRow(copiedValues)
-  }
-
-  override def toString: String = {
-    // Make sure toString never throws NullPointerException.
-    if ((row1 eq null) && (row2 eq null)) {
-      "[ empty row ]"
-    } else if (row1 eq null) {
-      row2.mkString("[", ",", "]")
-    } else if (row2 eq null) {
-      row1.mkString("[", ",", "]")
-    } else {
-      mkString("[", ",", "]")
-    }
-  }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow3 extends InternalRow {
-  private[this] var row1: InternalRow = _
-  private[this] var row2: InternalRow = _
-
-  def this(left: InternalRow, right: InternalRow) = {
-    this()
-    row1 = left
-    row2 = right
-  }
-
-  /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
-    row1 = r1
-    row2 = r2
-    this
-  }
-
-  /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
-    row1 = newLeft
-    this
-  }
-
-  /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
-    row2 = newRight
-    this
-  }
-
-  override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
-  override def numFields: Int = row1.numFields + row2.numFields
-
-  override def getUTF8String(i: Int): UTF8String = {
-    if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - 
row1.numFields)
-  }
-
-  override def getBinary(i: Int): Array[Byte] = {
-    if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - 
row1.numFields)
-  }
-
-
-  override def get(i: Int): Any =
-    if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
-  override def isNullAt(i: Int): Boolean =
-    if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - 
row1.numFields)
-
-  override def getInt(i: Int): Int =
-    if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
-  override def getLong(i: Int): Long =
-    if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - 
row1.numFields)
-
-  override def getDouble(i: Int): Double =
-    if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - 
row1.numFields)
-
-  override def getBoolean(i: Int): Boolean =
-    if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - 
row1.numFields)
-
-  override def getShort(i: Int): Short =
-    if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - 
row1.numFields)
-
-  override def getByte(i: Int): Byte =
-    if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - 
row1.numFields)
-
-  override def getFloat(i: Int): Float =
-    if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - 
row1.numFields)
-
-  override def copy(): InternalRow = {
-    val totalSize = row1.numFields + row2.numFields
-    val copiedValues = new Array[Any](totalSize)
-    var i = 0
-    while(i < totalSize) {
-      copiedValues(i) = get(i)
-      i += 1
-    }
-    new GenericInternalRow(copiedValues)
-  }
-
-  override def toString: String = {
-    // Make sure toString never throws NullPointerException.
-    if ((row1 eq null) && (row2 eq null)) {
-      "[ empty row ]"
-    } else if (row1 eq null) {
-      row2.mkString("[", ",", "]")
-    } else if (row2 eq null) {
-      row1.mkString("[", ",", "]")
-    } else {
-      mkString("[", ",", "]")
-    }
-  }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow4 extends InternalRow {
-  private[this] var row1: InternalRow = _
-  private[this] var row2: InternalRow = _
-
-  def this(left: InternalRow, right: InternalRow) = {
-    this()
-    row1 = left
-    row2 = right
-  }
-
-  /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
-    row1 = r1
-    row2 = r2
-    this
-  }
-
-  /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
-    row1 = newLeft
-    this
-  }
-
-  /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
-    row2 = newRight
-    this
-  }
-
-  override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
-  override def numFields: Int = row1.numFields + row2.numFields
-
-  override def getUTF8String(i: Int): UTF8String = {
-    if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - 
row1.numFields)
-  }
-
-  override def getBinary(i: Int): Array[Byte] = {
-    if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - 
row1.numFields)
-  }
-
-
-  override def get(i: Int): Any =
-    if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
-  override def isNullAt(i: Int): Boolean =
-    if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - 
row1.numFields)
-
-  override def getInt(i: Int): Int =
-    if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
-  override def getLong(i: Int): Long =
-    if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - 
row1.numFields)
-
-  override def getDouble(i: Int): Double =
-    if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - 
row1.numFields)
-
-  override def getBoolean(i: Int): Boolean =
-    if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - 
row1.numFields)
-
-  override def getShort(i: Int): Short =
-    if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - 
row1.numFields)
-
-  override def getByte(i: Int): Byte =
-    if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - 
row1.numFields)
-
-  override def getFloat(i: Int): Float =
-    if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - 
row1.numFields)
-
-  override def copy(): InternalRow = {
-    val totalSize = row1.numFields + row2.numFields
-    val copiedValues = new Array[Any](totalSize)
-    var i = 0
-    while(i < totalSize) {
-      copiedValues(i) = get(i)
-      i += 1
-    }
-    new GenericInternalRow(copiedValues)
-  }
-
-  override def toString: String = {
-    // Make sure toString never throws NullPointerException.
-    if ((row1 eq null) && (row2 eq null)) {
-      "[ empty row ]"
-    } else if (row1 eq null) {
-      row2.mkString("[", ",", "]")
-    } else if (row2 eq null) {
-      row1.mkString("[", ",", "]")
-    } else {
-      mkString("[", ",", "]")
-    }
-  }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow5 extends InternalRow {
-  private[this] var row1: InternalRow = _
-  private[this] var row2: InternalRow = _
-
-  def this(left: InternalRow, right: InternalRow) = {
-    this()
-    row1 = left
-    row2 = right
-  }
-
-  /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
-    row1 = r1
-    row2 = r2
-    this
-  }
-
-  /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
-    row1 = newLeft
-    this
-  }
-
-  /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
-    row2 = newRight
-    this
-  }
-
-  override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
-  override def numFields: Int = row1.numFields + row2.numFields
-
-  override def getUTF8String(i: Int): UTF8String = {
-    if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - 
row1.numFields)
-  }
-
-  override def getBinary(i: Int): Array[Byte] = {
-    if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - 
row1.numFields)
-  }
-
-
-  override def get(i: Int): Any =
-    if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
-  override def isNullAt(i: Int): Boolean =
-    if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - 
row1.numFields)
-
-  override def getInt(i: Int): Int =
-    if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
-  override def getLong(i: Int): Long =
-    if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - 
row1.numFields)
-
-  override def getDouble(i: Int): Double =
-    if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - 
row1.numFields)
-
-  override def getBoolean(i: Int): Boolean =
-    if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - 
row1.numFields)
-
-  override def getShort(i: Int): Short =
-    if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - 
row1.numFields)
-
-  override def getByte(i: Int): Byte =
-    if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - 
row1.numFields)
-
-  override def getFloat(i: Int): Float =
-    if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - 
row1.numFields)
-
-  override def copy(): InternalRow = {
-    val totalSize = row1.numFields + row2.numFields
-    val copiedValues = new Array[Any](totalSize)
-    var i = 0
-    while(i < totalSize) {
-      copiedValues(i) = get(i)
-      i += 1
-    }
-    new GenericInternalRow(copiedValues)
-  }
-
-  override def toString: String = {
-    // Make sure toString never throws NullPointerException.
-    if ((row1 eq null) && (row2 eq null)) {
-      "[ empty row ]"
-    } else if (row1 eq null) {
-      row2.mkString("[", ",", "]")
-    } else if (row2 eq null) {
-      row1.mkString("[", ",", "]")
-    } else {
-      mkString("[", ",", "]")
-    }
-  }
-}
-
-/**
- * JIT HACK: Replace with macros
- */
-class JoinedRow6 extends InternalRow {
-  private[this] var row1: InternalRow = _
-  private[this] var row2: InternalRow = _
-
-  def this(left: InternalRow, right: InternalRow) = {
-    this()
-    row1 = left
-    row2 = right
-  }
-
-  /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
-    row1 = r1
-    row2 = r2
-    this
-  }
-
-  /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
-    row1 = newLeft
-    this
-  }
-
-  /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
-    row2 = newRight
-    this
-  }
-
-  override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq
-
-  override def numFields: Int = row1.numFields + row2.numFields
-
-  override def getUTF8String(i: Int): UTF8String = {
-    if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - 
row1.numFields)
-  }
-
-  override def getBinary(i: Int): Array[Byte] = {
-    if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - 
row1.numFields)
-  }
-
-
-  override def get(i: Int): Any =
-    if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields)
-
-  override def isNullAt(i: Int): Boolean =
-    if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - 
row1.numFields)
-
-  override def getInt(i: Int): Int =
-    if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields)
-
-  override def getLong(i: Int): Long =
-    if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - 
row1.numFields)
-
-  override def getDouble(i: Int): Double =
-    if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - 
row1.numFields)
-
-  override def getBoolean(i: Int): Boolean =
-    if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - 
row1.numFields)
-
-  override def getShort(i: Int): Short =
-    if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - 
row1.numFields)
-
-  override def getByte(i: Int): Byte =
-    if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - 
row1.numFields)
-
-  override def getFloat(i: Int): Float =
-    if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - 
row1.numFields)
-
-  override def copy(): InternalRow = {
-    val totalSize = row1.numFields + row2.numFields
-    val copiedValues = new Array[Any](totalSize)
-    var i = 0
-    while(i < totalSize) {
-      copiedValues(i) = get(i)
-      i += 1
-    }
-    new GenericInternalRow(copiedValues)
-  }
-
-  override def toString: String = {
-    // Make sure toString never throws NullPointerException.
-    if ((row1 eq null) && (row2 eq null)) {
-      "[ empty row ]"
-    } else if (row1 eq null) {
-      row2.mkString("[", ",", "]")
-    } else if (row2 eq null) {
-      row1.mkString("[", ",", "]")
-    } else {
-      mkString("[", ",", "]")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index c2c9453..e8c6a0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -172,7 +172,7 @@ case class Aggregate(
           private[this] val resultProjection =
             new InterpretedMutableProjection(
               resultExpressions, computedSchema ++ namedGroups.map(_._2))
-          private[this] val joinedRow = new JoinedRow4
+          private[this] val joinedRow = new JoinedRow
 
           override final def hasNext: Boolean = hashTableIter.hasNext
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
index 5ed158b..5ad4691 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala
@@ -269,7 +269,7 @@ case class GeneratedAggregate(
           namedGroups.map(_._2) ++ computationSchema)
       log.info(s"Result Projection: ${resultExpressions.mkString(",")}")
 
-      val joinedRow = new JoinedRow3
+      val joinedRow = new JoinedRow
 
       if (!iter.hasNext) {
         // This is an empty input, so return early so that we do not allocate 
data structures

http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index de04132..91c8a02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -298,7 +298,7 @@ case class Window(
         var rowsSize = 0
         override final def hasNext: Boolean = rowIndex < rowsSize || 
nextRowAvailable
 
-        val join = new JoinedRow6
+        val join = new JoinedRow
         val windowFunctionResult = new 
GenericMutableRow(unboundExpressions.size)
         override final def next(): InternalRow = {
           // Load the next partition if we need to.

http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
index b8e95a5..1b89eda 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala
@@ -106,7 +106,7 @@ private[sql] abstract class SortAggregationIterator(
     new GenericMutableRow(size)
   }
 
-  protected val joinedRow = new JoinedRow4
+  protected val joinedRow = new JoinedRow
 
   protected val placeholderExpressions = Seq.fill(initialBufferOffset)(NoOp)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index ae34409..46ab5b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -69,7 +69,7 @@ trait HashJoin {
       private[this] var currentMatchPosition: Int = -1
 
       // Mutable per row objects.
-      private[this] val joinRow = new JoinedRow2
+      private[this] val joinRow = new JoinedRow
       private[this] val resultProjection: Projection = {
         if (supportUnsafe) {
           UnsafeProjection.create(self.schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 981447e..bb18b54 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -66,7 +66,7 @@ case class SortMergeJoin(
     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
       new Iterator[InternalRow] {
         // Mutable per row objects.
-        private[this] val joinRow = new JoinedRow5
+        private[this] val joinRow = new JoinedRow
         private[this] var leftElement: InternalRow = _
         private[this] var rightElement: InternalRow = _
         private[this] var leftKey: InternalRow = _


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to