Repository: spark
Updated Branches:
refs/heads/branch-1.5 a3ab67146 -> 0887e5e87
[SPARK-11153][SQL] Disables Parquet filter push-down for string and binary
columns
Due to PARQUET-251, `BINARY` columns in existing Parquet files may be written
with corrupted statistics information. This information is used by filter
push-down optimization. Since Spark 1.5 turns on Parquet filter push-down by
default, we may end up with wrong query results. PARQUET-251 has been fixed in
parquet-mr 1.8.1, but Spark 1.5 is still using 1.7.0.
This affects all Spark SQL data types that can be mapped to Parquet `BINARY`,
namely:
- `StringType`
- `BinaryType`
- `DecimalType`
(Spark SQL does not currently support pushing down filters involving
`DecimalType` columns, so they are not affected by this issue.)
To avoid wrong query results, we should disable filter push-down for columns of
`StringType` and `BinaryType` until we upgrade to parquet-mr 1.8.1 or later.
Author: Cheng Lian <[email protected]>
Closes #9152 from liancheng/spark-11153.workaround-parquet-251.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0887e5e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0887e5e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0887e5e8
Branch: refs/heads/branch-1.5
Commit: 0887e5e87891e8e22f534ca6d0406daf86ec2dad
Parents: a3ab671
Author: Cheng Lian <[email protected]>
Authored: Wed Oct 21 09:02:20 2015 +0800
Committer: Cheng Lian <[email protected]>
Committed: Wed Oct 21 09:02:20 2015 +0800
----------------------------------------------------------------------
.../datasources/parquet/ParquetFilters.scala | 27 ++++++++++++++++++++
.../parquet/ParquetFilterSuite.scala | 6 +++--
2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/0887e5e8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c6b3fe7..1f0405f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -60,6 +60,8 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Any) => FilterApi.eq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
// Binary.fromString and Binary.fromByteArray don't accept null values
case StringType =>
(n: String, v: Any) => FilterApi.eq(
@@ -69,6 +71,7 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.eq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ */
}
private val makeNotEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
@@ -82,6 +85,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.notEq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
@@ -90,6 +96,7 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.notEq(
binaryColumn(n),
Option(v).map(b =>
Binary.fromByteArray(v.asInstanceOf[Array[Byte]])).orNull)
+ */
}
private val makeLt: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
@@ -101,6 +108,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.lt(floatColumn(n),
v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.lt(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
@@ -108,6 +118,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.lt(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeLtEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
@@ -119,6 +130,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.ltEq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.ltEq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
@@ -126,6 +140,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.ltEq(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeGt: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
@@ -137,6 +152,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gt(floatColumn(n),
v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gt(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
@@ -144,6 +162,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gt(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeGtEq: PartialFunction[DataType, (String, Any) =>
FilterPredicate] = {
@@ -155,6 +174,9 @@ private[sql] object ParquetFilters {
(n: String, v: Any) => FilterApi.gtEq(floatColumn(n),
v.asInstanceOf[java.lang.Float])
case DoubleType =>
(n: String, v: Any) => FilterApi.gtEq(doubleColumn(n),
v.asInstanceOf[java.lang.Double])
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
@@ -162,6 +184,7 @@ private[sql] object ParquetFilters {
case BinaryType =>
(n: String, v: Any) =>
FilterApi.gtEq(binaryColumn(n),
Binary.fromByteArray(v.asInstanceOf[Array[Byte]]))
+ */
}
private val makeInSet: PartialFunction[DataType, (String, Set[Any]) =>
FilterPredicate] = {
@@ -177,6 +200,9 @@ private[sql] object ParquetFilters {
case DoubleType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(doubleColumn(n),
SetInFilter(v.asInstanceOf[Set[java.lang.Double]]))
+
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ /*
case StringType =>
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
@@ -185,6 +211,7 @@ private[sql] object ParquetFilters {
(n: String, v: Set[Any]) =>
FilterApi.userDefined(binaryColumn(n),
SetInFilter(v.map(e =>
Binary.fromByteArray(e.asInstanceOf[Array[Byte]]))))
+ */
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/0887e5e8/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45ad3fd..7f4d367 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -219,7 +219,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest
with SharedSQLContex
}
}
- test("filter pushdown - string") {
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ ignore("filter pushdown - string") {
withParquetDataFrame((1 to 4).map(i => Tuple1(i.toString))) { implicit df
=>
checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
checkFilterPredicate(
@@ -247,7 +248,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest
with SharedSQLContex
}
}
- test("filter pushdown - binary") {
+ // See https://issues.apache.org/jira/browse/SPARK-11153
+ ignore("filter pushdown - binary") {
implicit class IntToBinary(int: Int) {
def b: Array[Byte] = int.toString.getBytes("UTF-8")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]