Repository: spark
Updated Branches:
refs/heads/master f1b220eee -> b4c32c495
[SPARK-15549][SQL] Disable bucketing when the output doesn't contain all
bucketing columns
## What changes were proposed in this pull request?
I create a bucketed table `bucketed_table` with bucket column `i`,
```scala
case class Data(i: Int, j: Int, k: Int)
sc.makeRDD(Array((1, 2, 3))).map(x => Data(x._1, x._2,
x._3)).toDF.write.bucketBy(2, "i").saveAsTable("bucketed_table")
```
and I run the following SQL queries, both of which fail with an error:
```sql
SELECT j FROM bucketed_table;
Error in query: bucket column i not found in existing columns (j);
SELECT j, MAX(k) FROM bucketed_table GROUP BY j;
Error in query: bucket column i not found in existing columns (j, k);
```
I think we should add a check so that bucketing is enabled only when all of the
conditions below are satisfied:
1. the conf is enabled
2. the relation is bucketed
3. the output contains all bucketing columns
## How was this patch tested?
Updated test cases to reflect the changes.
Author: Yadong Qi <[email protected]>
Closes #13321 from watermen/SPARK-15549.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4c32c49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4c32c49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4c32c49
Branch: refs/heads/master
Commit: b4c32c4952f7af2733258aa4e27f21e8832c8a3a
Parents: f1b220e
Author: Yadong Qi <[email protected]>
Authored: Sat May 28 10:19:29 2016 -0700
Committer: Wenchen Fan <[email protected]>
Committed: Sat May 28 10:19:29 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/execution/ExistingRDD.scala | 13 ++++++-------
.../apache/spark/sql/sources/BucketedReadSuite.scala | 11 +++++++++++
2 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b4c32c49/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 412f5fa..fef3255 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -347,15 +347,14 @@ private[sql] object DataSourceScanExec {
case _ => None
}
- def toAttribute(colName: String): Attribute = output.find(_.name ==
colName).getOrElse {
- throw new AnalysisException(s"bucket column $colName not found in
existing columns " +
- s"(${output.map(_.name).mkString(", ")})")
- }
-
bucketSpec.map { spec =>
val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.map(toAttribute)
- HashPartitioning(bucketColumns, numBuckets)
+ val bucketColumns = spec.bucketColumnNames.flatMap { n =>
output.find(_.name == n) }
+ if (bucketColumns.size == spec.bucketColumnNames.size) {
+ HashPartitioning(bucketColumns, numBuckets)
+ } else {
+ UnknownPartitioning(0)
+ }
}.getOrElse {
UnknownPartitioning(0)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b4c32c49/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index f9891ac..bab0092 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -362,4 +362,15 @@ class BucketedReadSuite extends QueryTest with
SQLTestUtils with TestHiveSinglet
assert(error.toString contains "Invalid bucket file")
}
}
+
+ test("disable bucketing when the output doesn't contain all bucketing
columns") {
+ withTable("bucketed_table") {
+ df1.write.format("parquet").bucketBy(8,
"i").saveAsTable("bucketed_table")
+
+ checkAnswer(hiveContext.table("bucketed_table").select("j"),
df1.select("j"))
+
+
checkAnswer(hiveContext.table("bucketed_table").groupBy("j").agg(max("k")),
+ df1.groupBy("j").agg(max("k")))
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]