Repository: spark Updated Branches: refs/heads/master 723db13e0 -> f0ebab3f6
[SPARK-9336][SQL] Remove extra JoinedRows They were added to improve performance (so that the JIT can inline the JoinedRow calls). However, we can also improve performance simply by projecting the output to UnsafeRow in the Tungsten variants of the operators. Author: Reynold Xin <[email protected]> Closes #7659 from rxin/remove-joinedrows and squashes the following commits: 7510447 [Reynold Xin] [SPARK-9336][SQL] Remove extra JoinedRows Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0ebab3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0ebab3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0ebab3f Branch: refs/heads/master Commit: f0ebab3f6d3a9231474acf20110db72c0fb51882 Parents: 723db13 Author: Reynold Xin <[email protected]> Authored: Sat Jul 25 01:28:46 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Sat Jul 25 01:28:46 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/expressions/Projection.scala | 494 +------------------ .../apache/spark/sql/execution/Aggregate.scala | 2 +- .../sql/execution/GeneratedAggregate.scala | 2 +- .../org/apache/spark/sql/execution/Window.scala | 2 +- .../aggregate/sortBasedIterators.scala | 2 +- .../spark/sql/execution/joins/HashJoin.scala | 2 +- .../sql/execution/joins/SortMergeJoin.scala | 2 +- 7 files changed, 8 insertions(+), 498 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index dbda05a..6023a2c 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -44,7 +44,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection { new GenericInternalRow(outputArray) } - override def toString: String = s"Row => [${exprArray.mkString(",")}]" + override def toString(): String = s"Row => [${exprArray.mkString(",")}]" } /** @@ -58,7 +58,7 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mu this(expressions.map(BindReferences.bindReference(_, inputSchema))) private[this] val exprArray = expressions.toArray - private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.size) + private[this] var mutableRow: MutableRow = new GenericMutableRow(exprArray.length) def currentValue: InternalRow = mutableRow override def target(row: MutableRow): MutableProjection = { @@ -237,493 +237,3 @@ class JoinedRow extends InternalRow { } } } - -/** - * JIT HACK: Replace with macros - * The `JoinedRow` class is used in many performance critical situation. Unfortunately, since there - * are multiple different types of `Rows` that could be stored as `row1` and `row2` most of the - * calls in the critical path are polymorphic. By creating special versions of this class that are - * used in only a single location of the code, we increase the chance that only a single type of - * Row will be referenced, increasing the opportunity for the JIT to play tricks. This sounds - * crazy but in benchmarks it had noticeable effects. - */ -class JoinedRow2 extends InternalRow { - private[this] var row1: InternalRow = _ - private[this] var row2: InternalRow = _ - - def this(left: InternalRow, right: InternalRow) = { - this() - row1 = left - row2 = right - } - - /** Updates this JoinedRow to used point at two new base rows. Returns itself. 
*/ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { - row1 = r1 - row2 = r2 - this - } - - /** Updates this JoinedRow by updating its left base row. Returns itself. */ - def withLeft(newLeft: InternalRow): InternalRow = { - row1 = newLeft - this - } - - /** Updates this JoinedRow by updating its right base row. Returns itself. */ - def withRight(newRight: InternalRow): InternalRow = { - row2 = newRight - this - } - - override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq - - override def numFields: Int = row1.numFields + row2.numFields - - override def getUTF8String(i: Int): UTF8String = { - if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields) - } - - override def getBinary(i: Int): Array[Byte] = { - if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields) - } - - override def get(i: Int): Any = - if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields) - - override def isNullAt(i: Int): Boolean = - if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields) - - override def getInt(i: Int): Int = - if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields) - - override def getLong(i: Int): Long = - if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields) - - override def getDouble(i: Int): Double = - if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields) - - override def getBoolean(i: Int): Boolean = - if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields) - - override def getShort(i: Int): Short = - if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields) - - override def getByte(i: Int): Byte = - if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields) - - override def getFloat(i: Int): Float = - if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields) - - override def copy(): InternalRow = { - 
val totalSize = row1.numFields + row2.numFields - val copiedValues = new Array[Any](totalSize) - var i = 0 - while(i < totalSize) { - copiedValues(i) = get(i) - i += 1 - } - new GenericInternalRow(copiedValues) - } - - override def toString: String = { - // Make sure toString never throws NullPointerException. - if ((row1 eq null) && (row2 eq null)) { - "[ empty row ]" - } else if (row1 eq null) { - row2.mkString("[", ",", "]") - } else if (row2 eq null) { - row1.mkString("[", ",", "]") - } else { - mkString("[", ",", "]") - } - } -} - -/** - * JIT HACK: Replace with macros - */ -class JoinedRow3 extends InternalRow { - private[this] var row1: InternalRow = _ - private[this] var row2: InternalRow = _ - - def this(left: InternalRow, right: InternalRow) = { - this() - row1 = left - row2 = right - } - - /** Updates this JoinedRow to used point at two new base rows. Returns itself. */ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { - row1 = r1 - row2 = r2 - this - } - - /** Updates this JoinedRow by updating its left base row. Returns itself. */ - def withLeft(newLeft: InternalRow): InternalRow = { - row1 = newLeft - this - } - - /** Updates this JoinedRow by updating its right base row. Returns itself. 
*/ - def withRight(newRight: InternalRow): InternalRow = { - row2 = newRight - this - } - - override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq - - override def numFields: Int = row1.numFields + row2.numFields - - override def getUTF8String(i: Int): UTF8String = { - if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields) - } - - override def getBinary(i: Int): Array[Byte] = { - if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields) - } - - - override def get(i: Int): Any = - if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields) - - override def isNullAt(i: Int): Boolean = - if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields) - - override def getInt(i: Int): Int = - if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields) - - override def getLong(i: Int): Long = - if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields) - - override def getDouble(i: Int): Double = - if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields) - - override def getBoolean(i: Int): Boolean = - if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields) - - override def getShort(i: Int): Short = - if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields) - - override def getByte(i: Int): Byte = - if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields) - - override def getFloat(i: Int): Float = - if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields) - - override def copy(): InternalRow = { - val totalSize = row1.numFields + row2.numFields - val copiedValues = new Array[Any](totalSize) - var i = 0 - while(i < totalSize) { - copiedValues(i) = get(i) - i += 1 - } - new GenericInternalRow(copiedValues) - } - - override def toString: String = { - // Make sure toString never throws NullPointerException. 
- if ((row1 eq null) && (row2 eq null)) { - "[ empty row ]" - } else if (row1 eq null) { - row2.mkString("[", ",", "]") - } else if (row2 eq null) { - row1.mkString("[", ",", "]") - } else { - mkString("[", ",", "]") - } - } -} - -/** - * JIT HACK: Replace with macros - */ -class JoinedRow4 extends InternalRow { - private[this] var row1: InternalRow = _ - private[this] var row2: InternalRow = _ - - def this(left: InternalRow, right: InternalRow) = { - this() - row1 = left - row2 = right - } - - /** Updates this JoinedRow to used point at two new base rows. Returns itself. */ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { - row1 = r1 - row2 = r2 - this - } - - /** Updates this JoinedRow by updating its left base row. Returns itself. */ - def withLeft(newLeft: InternalRow): InternalRow = { - row1 = newLeft - this - } - - /** Updates this JoinedRow by updating its right base row. Returns itself. */ - def withRight(newRight: InternalRow): InternalRow = { - row2 = newRight - this - } - - override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq - - override def numFields: Int = row1.numFields + row2.numFields - - override def getUTF8String(i: Int): UTF8String = { - if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields) - } - - override def getBinary(i: Int): Array[Byte] = { - if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields) - } - - - override def get(i: Int): Any = - if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields) - - override def isNullAt(i: Int): Boolean = - if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields) - - override def getInt(i: Int): Int = - if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields) - - override def getLong(i: Int): Long = - if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields) - - override def getDouble(i: Int): Double = - if (i < row1.numFields) row1.getDouble(i) else 
row2.getDouble(i - row1.numFields) - - override def getBoolean(i: Int): Boolean = - if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields) - - override def getShort(i: Int): Short = - if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields) - - override def getByte(i: Int): Byte = - if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields) - - override def getFloat(i: Int): Float = - if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields) - - override def copy(): InternalRow = { - val totalSize = row1.numFields + row2.numFields - val copiedValues = new Array[Any](totalSize) - var i = 0 - while(i < totalSize) { - copiedValues(i) = get(i) - i += 1 - } - new GenericInternalRow(copiedValues) - } - - override def toString: String = { - // Make sure toString never throws NullPointerException. - if ((row1 eq null) && (row2 eq null)) { - "[ empty row ]" - } else if (row1 eq null) { - row2.mkString("[", ",", "]") - } else if (row2 eq null) { - row1.mkString("[", ",", "]") - } else { - mkString("[", ",", "]") - } - } -} - -/** - * JIT HACK: Replace with macros - */ -class JoinedRow5 extends InternalRow { - private[this] var row1: InternalRow = _ - private[this] var row2: InternalRow = _ - - def this(left: InternalRow, right: InternalRow) = { - this() - row1 = left - row2 = right - } - - /** Updates this JoinedRow to used point at two new base rows. Returns itself. */ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { - row1 = r1 - row2 = r2 - this - } - - /** Updates this JoinedRow by updating its left base row. Returns itself. */ - def withLeft(newLeft: InternalRow): InternalRow = { - row1 = newLeft - this - } - - /** Updates this JoinedRow by updating its right base row. Returns itself. 
*/ - def withRight(newRight: InternalRow): InternalRow = { - row2 = newRight - this - } - - override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq - - override def numFields: Int = row1.numFields + row2.numFields - - override def getUTF8String(i: Int): UTF8String = { - if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields) - } - - override def getBinary(i: Int): Array[Byte] = { - if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields) - } - - - override def get(i: Int): Any = - if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields) - - override def isNullAt(i: Int): Boolean = - if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields) - - override def getInt(i: Int): Int = - if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields) - - override def getLong(i: Int): Long = - if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields) - - override def getDouble(i: Int): Double = - if (i < row1.numFields) row1.getDouble(i) else row2.getDouble(i - row1.numFields) - - override def getBoolean(i: Int): Boolean = - if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields) - - override def getShort(i: Int): Short = - if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields) - - override def getByte(i: Int): Byte = - if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields) - - override def getFloat(i: Int): Float = - if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields) - - override def copy(): InternalRow = { - val totalSize = row1.numFields + row2.numFields - val copiedValues = new Array[Any](totalSize) - var i = 0 - while(i < totalSize) { - copiedValues(i) = get(i) - i += 1 - } - new GenericInternalRow(copiedValues) - } - - override def toString: String = { - // Make sure toString never throws NullPointerException. 
- if ((row1 eq null) && (row2 eq null)) { - "[ empty row ]" - } else if (row1 eq null) { - row2.mkString("[", ",", "]") - } else if (row2 eq null) { - row1.mkString("[", ",", "]") - } else { - mkString("[", ",", "]") - } - } -} - -/** - * JIT HACK: Replace with macros - */ -class JoinedRow6 extends InternalRow { - private[this] var row1: InternalRow = _ - private[this] var row2: InternalRow = _ - - def this(left: InternalRow, right: InternalRow) = { - this() - row1 = left - row2 = right - } - - /** Updates this JoinedRow to used point at two new base rows. Returns itself. */ - def apply(r1: InternalRow, r2: InternalRow): InternalRow = { - row1 = r1 - row2 = r2 - this - } - - /** Updates this JoinedRow by updating its left base row. Returns itself. */ - def withLeft(newLeft: InternalRow): InternalRow = { - row1 = newLeft - this - } - - /** Updates this JoinedRow by updating its right base row. Returns itself. */ - def withRight(newRight: InternalRow): InternalRow = { - row2 = newRight - this - } - - override def toSeq: Seq[Any] = row1.toSeq ++ row2.toSeq - - override def numFields: Int = row1.numFields + row2.numFields - - override def getUTF8String(i: Int): UTF8String = { - if (i < row1.numFields) row1.getUTF8String(i) else row2.getUTF8String(i - row1.numFields) - } - - override def getBinary(i: Int): Array[Byte] = { - if (i < row1.numFields) row1.getBinary(i) else row2.getBinary(i - row1.numFields) - } - - - override def get(i: Int): Any = - if (i < row1.numFields) row1.get(i) else row2.get(i - row1.numFields) - - override def isNullAt(i: Int): Boolean = - if (i < row1.numFields) row1.isNullAt(i) else row2.isNullAt(i - row1.numFields) - - override def getInt(i: Int): Int = - if (i < row1.numFields) row1.getInt(i) else row2.getInt(i - row1.numFields) - - override def getLong(i: Int): Long = - if (i < row1.numFields) row1.getLong(i) else row2.getLong(i - row1.numFields) - - override def getDouble(i: Int): Double = - if (i < row1.numFields) row1.getDouble(i) else 
row2.getDouble(i - row1.numFields) - - override def getBoolean(i: Int): Boolean = - if (i < row1.numFields) row1.getBoolean(i) else row2.getBoolean(i - row1.numFields) - - override def getShort(i: Int): Short = - if (i < row1.numFields) row1.getShort(i) else row2.getShort(i - row1.numFields) - - override def getByte(i: Int): Byte = - if (i < row1.numFields) row1.getByte(i) else row2.getByte(i - row1.numFields) - - override def getFloat(i: Int): Float = - if (i < row1.numFields) row1.getFloat(i) else row2.getFloat(i - row1.numFields) - - override def copy(): InternalRow = { - val totalSize = row1.numFields + row2.numFields - val copiedValues = new Array[Any](totalSize) - var i = 0 - while(i < totalSize) { - copiedValues(i) = get(i) - i += 1 - } - new GenericInternalRow(copiedValues) - } - - override def toString: String = { - // Make sure toString never throws NullPointerException. - if ((row1 eq null) && (row2 eq null)) { - "[ empty row ]" - } else if (row1 eq null) { - row2.mkString("[", ",", "]") - } else if (row2 eq null) { - row1.mkString("[", ",", "]") - } else { - mkString("[", ",", "]") - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala index c2c9453..e8c6a0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala @@ -172,7 +172,7 @@ case class Aggregate( private[this] val resultProjection = new InterpretedMutableProjection( resultExpressions, computedSchema ++ namedGroups.map(_._2)) - private[this] val joinedRow = new JoinedRow4 + private[this] val joinedRow = new JoinedRow override final def hasNext: Boolean = hashTableIter.hasNext 
http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala index 5ed158b..5ad4691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala @@ -269,7 +269,7 @@ case class GeneratedAggregate( namedGroups.map(_._2) ++ computationSchema) log.info(s"Result Projection: ${resultExpressions.mkString(",")}") - val joinedRow = new JoinedRow3 + val joinedRow = new JoinedRow if (!iter.hasNext) { // This is an empty input, so return early so that we do not allocate data structures http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index de04132..91c8a02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -298,7 +298,7 @@ case class Window( var rowsSize = 0 override final def hasNext: Boolean = rowIndex < rowsSize || nextRowAvailable - val join = new JoinedRow6 + val join = new JoinedRow val windowFunctionResult = new GenericMutableRow(unboundExpressions.size) override final def next(): InternalRow = { // Load the next partition if we need to. 
http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala index b8e95a5..1b89eda 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/sortBasedIterators.scala @@ -106,7 +106,7 @@ private[sql] abstract class SortAggregationIterator( new GenericMutableRow(size) } - protected val joinedRow = new JoinedRow4 + protected val joinedRow = new JoinedRow protected val placeholderExpressions = Seq.fill(initialBufferOffset)(NoOp) http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index ae34409..46ab5b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -69,7 +69,7 @@ trait HashJoin { private[this] var currentMatchPosition: Int = -1 // Mutable per row objects. 
- private[this] val joinRow = new JoinedRow2 + private[this] val joinRow = new JoinedRow private[this] val resultProjection: Projection = { if (supportUnsafe) { UnsafeProjection.create(self.schema) http://git-wip-us.apache.org/repos/asf/spark/blob/f0ebab3f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 981447e..bb18b54 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -66,7 +66,7 @@ case class SortMergeJoin( leftResults.zipPartitions(rightResults) { (leftIter, rightIter) => new Iterator[InternalRow] { // Mutable per row objects. - private[this] val joinRow = new JoinedRow5 + private[this] val joinRow = new JoinedRow private[this] var leftElement: InternalRow = _ private[this] var rightElement: InternalRow = _ private[this] var leftKey: InternalRow = _ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
