Repository: spark
Updated Branches:
refs/heads/master c41fdf04f -> 4415722e9
[SQL][SPARK-2212]Hash Outer Join
This patch is to support the hash based outer join. Currently, outer join for
big relations are resort to `BoradcastNestedLoopJoin`, which is super slow.
This PR will create 2 hash tables for both relations in the same partition,
which greatly reduce the table scans.
Here is the testing code that I used:
```
package org.apache.spark.sql.hive
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._
case class Record(key: String, value: String)
object JoinTablePrepare extends App {
import TestHive2._
val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"${i %
828193}", s"val_$i")))
runSqlHive("SHOW TABLES")
runSqlHive("DROP TABLE if exists a")
runSqlHive("DROP TABLE if exists b")
runSqlHive("DROP TABLE if exists result")
rdd.registerAsTable("records")
runSqlHive("""CREATE TABLE a (key STRING, value STRING)
| ROW FORMAT SERDE
|
'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
| STORED AS RCFILE
""".stripMargin)
runSqlHive("""CREATE TABLE b (key STRING, value STRING)
| ROW FORMAT SERDE
|
'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
| STORED AS RCFILE
""".stripMargin)
runSqlHive("""CREATE TABLE result (key STRING, value STRING)
| ROW FORMAT SERDE
|
'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
| STORED AS RCFILE
""".stripMargin)
hql(s"""from records
| insert into table a
| select key, value
""".stripMargin)
hql(s"""from records
| insert into table b select key + 100000, value
""".stripMargin)
}
object JoinTablePerformanceTest extends App {
import TestHive2._
hql("SHOW TABLES")
hql("set spark.sql.shuffle.partitions=20")
val leftOuterJoin = "insert overwrite table result select a.key, b.value from
a left outer join b on a.key=b.key"
val rightOuterJoin = "insert overwrite table result select a.key, b.value
from a right outer join b on a.key=b.key"
val fullOuterJoin = "insert overwrite table result select a.key, b.value from
a full outer join b on a.key=b.key"
val results = ("LeftOuterJoin", benchmark(leftOuterJoin)) ::
("LeftOuterJoin", benchmark(leftOuterJoin)) ::
("RightOuterJoin", benchmark(rightOuterJoin)) ::
("RightOuterJoin", benchmark(rightOuterJoin)) ::
("FullOuterJoin", benchmark(fullOuterJoin)) ::
("FullOuterJoin", benchmark(fullOuterJoin)) :: Nil
val explains = hql(s"explain $leftOuterJoin").collect ++ hql(s"explain
$rightOuterJoin").collect ++ hql(s"explain $fullOuterJoin").collect
println(explains.mkString(",\n"))
results.foreach { case (prompt, result) => {
println(s"$prompt: took ${result._1} ms (${result._2} records)")
}
}
def benchmark(cmd: String) = {
val begin = System.currentTimeMillis()
val result = hql(cmd)
val end = System.currentTimeMillis()
val count = hql("select count(1) from result").collect.mkString("")
((end - begin), count)
}
}
```
And the result as shown below:
```
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#95,value#98]],
[ HashOuterJoin [key#95], [key#97], LeftOuter, None],
[ Exchange (HashPartitioning [key#95], 20)],
[ HiveTableScan [key#95], (MetastoreRelation default, a, None), None],
[ Exchange (HashPartitioning [key#97], 20)],
[ HiveTableScan [key#97,value#98], (MetastoreRelation default, b, None),
None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#102,value#105]],
[ HashOuterJoin [key#102], [key#104], RightOuter, None],
[ Exchange (HashPartitioning [key#102], 20)],
[ HiveTableScan [key#102], (MetastoreRelation default, a, None), None],
[ Exchange (HashPartitioning [key#104], 20)],
[ HiveTableScan [key#104,value#105], (MetastoreRelation default, b, None),
None],
[Physical execution plan:],
[InsertIntoHiveTable (MetastoreRelation default, result, None), Map(), true],
[ Project [key#109,value#112]],
[ HashOuterJoin [key#109], [key#111], FullOuter, None],
[ Exchange (HashPartitioning [key#109], 20)],
[ HiveTableScan [key#109], (MetastoreRelation default, a, None), None],
[ Exchange (HashPartitioning [key#111], 20)],
[ HiveTableScan [key#111,value#112], (MetastoreRelation default, b, None),
None]
LeftOuterJoin: took 16072 ms ([3000000] records)
LeftOuterJoin: took 14394 ms ([3000000] records)
RightOuterJoin: took 14802 ms ([3000000] records)
RightOuterJoin: took 14747 ms ([3000000] records)
FullOuterJoin: took 17715 ms ([6000000] records)
FullOuterJoin: took 17629 ms ([6000000] records)
```
Without this PR, the benchmark will run seems never end.
Author: Cheng Hao <[email protected]>
Closes #1147 from chenghao-intel/hash_based_outer_join and squashes the
following commits:
65c599e [Cheng Hao] Fix issues with the community comments
72b1394 [Cheng Hao] Fix bug of stale value in joinedRow
55baef7 [Cheng Hao] Add HashOuterJoin
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4415722e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4415722e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4415722e
Branch: refs/heads/master
Commit: 4415722e9199d04c2c18bfbd29113ebc40f732f5
Parents: c41fdf0
Author: Cheng Hao <[email protected]>
Authored: Fri Aug 1 11:27:12 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Fri Aug 1 11:27:12 2014 -0700
----------------------------------------------------------------------
.../spark/sql/execution/SparkStrategies.scala | 4 +
.../org/apache/spark/sql/execution/joins.scala | 183 ++++++++++++++++++-
.../scala/org/apache/spark/sql/JoinSuite.scala | 138 +++++++++++++-
3 files changed, 319 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4415722e/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d57b6ea..8bec015 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -94,6 +94,10 @@ private[sql] abstract class SparkStrategies extends
QueryPlanner[SparkPlan] {
leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
+ case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left,
right) =>
+ execution.HashOuterJoin(
+ leftKeys, rightKeys, joinType, condition, planLater(left),
planLater(right)) :: Nil
+
case _ => Nil
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4415722e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index b068579..82f0a74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -72,7 +72,7 @@ trait HashJoin {
while (buildIter.hasNext) {
currentRow = buildIter.next()
val rowKey = buildSideKeyGenerator(currentRow)
- if(!rowKey.anyNull) {
+ if (!rowKey.anyNull) {
val existingMatchList = hashTable.get(rowKey)
val matchList = if (existingMatchList == null) {
val newMatchList = new ArrayBuffer[Row]()
@@ -137,6 +137,185 @@ trait HashJoin {
}
/**
+ * Constant Value for Binary Join Node
+ */
+object HashOuterJoin {
+ val DUMMY_LIST = Seq[Row](null)
+ val EMPTY_LIST = Seq[Row]()
+}
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash based outer join for two child relations by shuffling the
data using
+ * the join keys. This operator requires loading the associated partition in
both side into memory.
+ */
+@DeveloperApi
+case class HashOuterJoin(
+ leftKeys: Seq[Expression],
+ rightKeys: Seq[Expression],
+ joinType: JoinType,
+ condition: Option[Expression],
+ left: SparkPlan,
+ right: SparkPlan) extends BinaryNode {
+
+ override def outputPartitioning: Partitioning = left.outputPartitioning
+
+ override def requiredChildDistribution =
+ ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+ def output = left.output ++ right.output
+
+ // TODO we need to rewrite all of the iterators with our own implementation
instead of the Scala
+ // iterator for performance purpose.
+
+ private[this] def leftOuterIterator(
+ key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]):
Iterator[Row] = {
+ val joinedRow = new JoinedRow()
+ val rightNullRow = new GenericRow(right.output.length)
+ val boundCondition =
+ condition.map(newPredicate(_, left.output ++
right.output)).getOrElse((row: Row) => true)
+
+ leftIter.iterator.flatMap { l =>
+ joinedRow.withLeft(l)
+ var matched = false
+ (if (!key.anyNull) rightIter.collect { case r if
(boundCondition(joinedRow.withRight(r))) =>
+ matched = true
+ joinedRow.copy
+ } else {
+ Nil
+ }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+ // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to
add additional row,
+ // as we don't know whether we need to append it until finish
iterating all of the
+ // records in right side.
+ // If we didn't get any proper row, then append a single row with
empty right
+ joinedRow.withRight(rightNullRow).copy
+ })
+ }
+ }
+
+ private[this] def rightOuterIterator(
+ key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]):
Iterator[Row] = {
+ val joinedRow = new JoinedRow()
+ val leftNullRow = new GenericRow(left.output.length)
+ val boundCondition =
+ condition.map(newPredicate(_, left.output ++
right.output)).getOrElse((row: Row) => true)
+
+ rightIter.iterator.flatMap { r =>
+ joinedRow.withRight(r)
+ var matched = false
+ (if (!key.anyNull) leftIter.collect { case l if
(boundCondition(joinedRow.withLeft(l))) =>
+ matched = true
+ joinedRow.copy
+ } else {
+ Nil
+ }) ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+ // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to
add additional row,
+ // as we don't know whether we need to append it until finish
iterating all of the
+ // records in left side.
+ // If we didn't get any proper row, then append a single row with
empty left.
+ joinedRow.withLeft(leftNullRow).copy
+ })
+ }
+ }
+
+ private[this] def fullOuterIterator(
+ key: Row, leftIter: Iterable[Row], rightIter: Iterable[Row]):
Iterator[Row] = {
+ val joinedRow = new JoinedRow()
+ val leftNullRow = new GenericRow(left.output.length)
+ val rightNullRow = new GenericRow(right.output.length)
+ val boundCondition =
+ condition.map(newPredicate(_, left.output ++
right.output)).getOrElse((row: Row) => true)
+
+ if (!key.anyNull) {
+ // Store the positions of records in right, if one of its associated row
satisfy
+ // the join condition.
+ val rightMatchedSet = scala.collection.mutable.Set[Int]()
+ leftIter.iterator.flatMap[Row] { l =>
+ joinedRow.withLeft(l)
+ var matched = false
+ rightIter.zipWithIndex.collect {
+ // 1. For those matched (satisfy the join condition) records with
both sides filled,
+ // append them directly
+
+ case (r, idx) if (boundCondition(joinedRow.withRight(r)))=> {
+ matched = true
+ // if the row satisfy the join condition, add its index into the
matched set
+ rightMatchedSet.add(idx)
+ joinedRow.copy
+ }
+ } ++ HashOuterJoin.DUMMY_LIST.filter(_ => !matched).map( _ => {
+ // 2. For those unmatched records in left, append additional records
with empty right.
+
+ // HashOuterJoin.DUMMY_LIST.filter(_ => !matched) is a tricky way to
add additional row,
+ // as we don't know whether we need to append it until finish
iterating all
+ // of the records in right side.
+ // If we didn't get any proper row, then append a single row with
empty right.
+ joinedRow.withRight(rightNullRow).copy
+ })
+ } ++ rightIter.zipWithIndex.collect {
+ // 3. For those unmatched records in right, append additional records
with empty left.
+
+ // Re-visiting the records in right, and append additional row with
empty left, if its not
+ // in the matched set.
+ case (r, idx) if (!rightMatchedSet.contains(idx)) => {
+ joinedRow(leftNullRow, r).copy
+ }
+ }
+ } else {
+ leftIter.iterator.map[Row] { l =>
+ joinedRow(l, rightNullRow).copy
+ } ++ rightIter.iterator.map[Row] { r =>
+ joinedRow(leftNullRow, r).copy
+ }
+ }
+ }
+
+ private[this] def buildHashTable(
+ iter: Iterator[Row], keyGenerator: Projection): Map[Row,
ArrayBuffer[Row]] = {
+ // TODO: Use Spark's HashMap implementation.
+ val hashTable = scala.collection.mutable.Map[Row, ArrayBuffer[Row]]()
+ while (iter.hasNext) {
+ val currentRow = iter.next()
+ val rowKey = keyGenerator(currentRow)
+
+ val existingMatchList = hashTable.getOrElseUpdate(rowKey, {new
ArrayBuffer[Row]()})
+ existingMatchList += currentRow.copy()
+ }
+
+ hashTable.toMap[Row, ArrayBuffer[Row]]
+ }
+
+ def execute() = {
+ left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+ // TODO this probably can be replaced by external sort (sort merged
join?)
+ // Build HashMap for current partition in left relation
+ val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys,
left.output))
+ // Build HashMap for current partition in right relation
+ val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys,
right.output))
+
+ val boundCondition =
+ condition.map(newPredicate(_, left.output ++
right.output)).getOrElse((row: Row) => true)
+ joinType match {
+ case LeftOuter => leftHashTable.keysIterator.flatMap { key =>
+ leftOuterIterator(key, leftHashTable.getOrElse(key,
HashOuterJoin.EMPTY_LIST),
+ rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+ }
+ case RightOuter => rightHashTable.keysIterator.flatMap { key =>
+ rightOuterIterator(key, leftHashTable.getOrElse(key,
HashOuterJoin.EMPTY_LIST),
+ rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+ }
+ case FullOuter => (leftHashTable.keySet ++
rightHashTable.keySet).iterator.flatMap { key =>
+ fullOuterIterator(key,
+ leftHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST),
+ rightHashTable.getOrElse(key, HashOuterJoin.EMPTY_LIST))
+ }
+ case x => throw new Exception(s"Need to add implementation for $x")
+ }
+ }
+ }
+}
+
+/**
* :: DeveloperApi ::
* Performs an inner hash join of two child relations by first shuffling the
data using the join
* keys.
@@ -189,7 +368,7 @@ case class LeftSemiJoinHash(
while (buildIter.hasNext) {
currentRow = buildIter.next()
val rowKey = buildSideKeyGenerator(currentRow)
- if(!rowKey.anyNull) {
+ if (!rowKey.anyNull) {
val keyExists = hashSet.contains(rowKey)
if (!keyExists) {
hashSet.add(rowKey)
http://git-wip-us.apache.org/repos/asf/spark/blob/4415722e/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 025c396..0378906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -17,11 +17,17 @@
package org.apache.spark.sql
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter,
Inner}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter,
Inner, LeftSemi}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._
-class JoinSuite extends QueryTest {
+class JoinSuite extends QueryTest with BeforeAndAfterEach {
// Ensures tables are loaded.
TestData
@@ -34,6 +40,56 @@ class JoinSuite extends QueryTest {
assert(planned.size === 1)
}
+ test("join operator selection") {
+ def assertJoin(sqlString: String, c: Class[_]): Any = {
+ val rdd = sql(sqlString)
+ val physical = rdd.queryExecution.sparkPlan
+ val operators = physical.collect {
+ case j: ShuffledHashJoin => j
+ case j: HashOuterJoin => j
+ case j: LeftSemiJoinHash => j
+ case j: BroadcastHashJoin => j
+ case j: LeftSemiJoinBNL => j
+ case j: CartesianProduct => j
+ case j: BroadcastNestedLoopJoin => j
+ }
+
+ assert(operators.size === 1)
+ if (operators(0).getClass() != c) {
+ fail(s"$sqlString expected operator: $c, but got ${operators(0)}\n
physical: \n$physical")
+ }
+ }
+
+ val cases1 = Seq(
+ ("SELECT * FROM testData left semi join testData2 ON key = a",
classOf[LeftSemiJoinHash]),
+ ("SELECT * FROM testData left semi join testData2",
classOf[LeftSemiJoinBNL]),
+ ("SELECT * FROM testData join testData2", classOf[CartesianProduct]),
+ ("SELECT * FROM testData join testData2 where key=2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData left join testData2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData right join testData2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData full outer join testData2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData left join testData2 where key=2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData right join testData2 where key=2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData full outer join testData2 where key=2",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData join testData2 where key>a",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData full outer join testData2 where key>a",
classOf[CartesianProduct]),
+ ("SELECT * FROM testData join testData2 ON key = a",
classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a and key=2",
classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a where key=2",
classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData left join testData2 ON key = a",
classOf[HashOuterJoin]),
+ ("SELECT * FROM testData right join testData2 ON key = a where key=2",
+ classOf[HashOuterJoin]),
+ ("SELECT * FROM testData right join testData2 ON key = a and key=2",
+ classOf[HashOuterJoin]),
+ ("SELECT * FROM testData full outer join testData2 ON key = a",
classOf[HashOuterJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a",
classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a and key=2",
classOf[ShuffledHashJoin]),
+ ("SELECT * FROM testData join testData2 ON key = a where key=2",
classOf[ShuffledHashJoin])
+ // TODO add BroadcastNestedLoopJoin
+ )
+ cases1.foreach { c => assertJoin(c._1, c._2) }
+ }
+
test("multiple-key equi-join is hash-join") {
val x = testData2.as('x)
val y = testData2.as('y)
@@ -114,6 +170,33 @@ class JoinSuite extends QueryTest {
(4, "D", 4, "d") ::
(5, "E", null, null) ::
(6, "F", null, null) :: Nil)
+
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, LeftOuter, Some('n === 'N && 'n > 1)),
+ (1, "A", null, null) ::
+ (2, "B", 2, "b") ::
+ (3, "C", 3, "c") ::
+ (4, "D", 4, "d") ::
+ (5, "E", null, null) ::
+ (6, "F", null, null) :: Nil)
+
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, LeftOuter, Some('n === 'N && 'N > 1)),
+ (1, "A", null, null) ::
+ (2, "B", 2, "b") ::
+ (3, "C", 3, "c") ::
+ (4, "D", 4, "d") ::
+ (5, "E", null, null) ::
+ (6, "F", null, null) :: Nil)
+
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, LeftOuter, Some('n === 'N && 'l > 'L)),
+ (1, "A", 1, "a") ::
+ (2, "B", 2, "b") ::
+ (3, "C", 3, "c") ::
+ (4, "D", 4, "d") ::
+ (5, "E", null, null) ::
+ (6, "F", null, null) :: Nil)
}
test("right outer join") {
@@ -125,11 +208,38 @@ class JoinSuite extends QueryTest {
(4, "d", 4, "D") ::
(null, null, 5, "E") ::
(null, null, 6, "F") :: Nil)
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, RightOuter, Some('n === 'N && 'n > 1)),
+ (null, null, 1, "A") ::
+ (2, "b", 2, "B") ::
+ (3, "c", 3, "C") ::
+ (4, "d", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, RightOuter, Some('n === 'N && 'N > 1)),
+ (null, null, 1, "A") ::
+ (2, "b", 2, "B") ::
+ (3, "c", 3, "C") ::
+ (4, "d", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, RightOuter, Some('n === 'N && 'l >
'L)),
+ (1, "a", 1, "A") ::
+ (2, "b", 2, "B") ::
+ (3, "c", 3, "C") ::
+ (4, "d", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
}
test("full outer join") {
- val left = upperCaseData.where('N <= 4).as('left)
- val right = upperCaseData.where('N >= 3).as('right)
+ upperCaseData.where('N <= 4).registerAsTable("left")
+ upperCaseData.where('N >= 3).registerAsTable("right")
+
+ val left = UnresolvedRelation(None, "left", None)
+ val right = UnresolvedRelation(None, "right", None)
checkAnswer(
left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
@@ -139,5 +249,25 @@ class JoinSuite extends QueryTest {
(4, "D", 4, "D") ::
(null, null, 5, "E") ::
(null, null, 6, "F") :: Nil)
+
+ checkAnswer(
+ left.join(right, FullOuter, Some(("left.N".attr === "right.N".attr) &&
("left.N".attr !== 3))),
+ (1, "A", null, null) ::
+ (2, "B", null, null) ::
+ (3, "C", null, null) ::
+ (null, null, 3, "C") ::
+ (4, "D", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+
+ checkAnswer(
+ left.join(right, FullOuter, Some(("left.N".attr === "right.N".attr) &&
("right.N".attr !== 3))),
+ (1, "A", null, null) ::
+ (2, "B", null, null) ::
+ (3, "C", null, null) ::
+ (null, null, 3, "C") ::
+ (4, "D", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
}
}