This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new edc3b37 [SPARK-32142][SQL][TESTS] Keep the original tests and codes
to avoid potential conflicts in dev
edc3b37 is described below
commit edc3b37087f084fbabec8f53c45d99dc3370a129
Author: HyukjinKwon <[email protected]>
AuthorDate: Wed Jul 1 14:15:02 2020 +0900
[SPARK-32142][SQL][TESTS] Keep the original tests and codes to avoid
potential conflicts in dev
### What changes were proposed in this pull request?
This PR proposes to partially revert the test and code changes made in
https://github.com/apache/spark/pull/27728, without changing any behaviours.
Most of the test changes are restored to their state before #27728 by combining
`withNestedDataFrame` and `withParquetDataFrame` into a single helper, `withNestedParquetDataFrame`.
Basically, it addresses the comment at
https://github.com/apache/spark/pull/27728#discussion_r397655390, and my own
comment in another PR at
https://github.com/apache/spark/pull/28761#discussion_r446761037
### Why are the changes needed?
For maintenance purposes, and to avoid potential conflicts during
backports. It also helps when other code needs to be kept consistent with this.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Manually tested.
Closes #28955 from HyukjinKwon/SPARK-25556-followup.
Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 8194d9ef788278ec23902da851f2a3c95f5f71bf)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../datasources/parquet/ParquetFilterSuite.scala | 789 +++++++++++----------
.../datasources/parquet/ParquetIOSuite.scala | 20 +-
.../datasources/parquet/ParquetTest.scala | 14 +-
3 files changed, 411 insertions(+), 412 deletions(-)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d20a07f..8b922aa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
import java.time.{LocalDate, LocalDateTime, ZoneId}
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate,
Operators}
import org.apache.parquet.filter2.predicate.FilterApi._
import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
@@ -106,10 +109,18 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
/**
- * Takes single level `inputDF` dataframe to generate multi-level nested
- * dataframes as new test data.
+ * Takes a sequence of products `data` to generate multi-level nested
+ * dataframes as new test data. It tests both non-nested and nested
dataframes
+ * which are written and read back with Parquet datasource.
+ *
+ * This is different from [[ParquetTest.withParquetDataFrame]] which does not
+ * test nested cases.
*/
- private def withNestedDataFrame(inputDF: DataFrame)
+ private def withNestedParquetDataFrame[T <: Product: ClassTag:
TypeTag](data: Seq[T])
+ (runTest: (DataFrame, String, Any => Any) => Unit): Unit =
+ withNestedParquetDataFrame(spark.createDataFrame(data))(runTest)
+
+ private def withNestedParquetDataFrame(inputDF: DataFrame)
(runTest: (DataFrame, String, Any => Any) => Unit): Unit = {
assert(inputDF.schema.fields.length == 1)
assert(!inputDF.schema.fields.head.dataType.isInstanceOf[StructType])
@@ -138,8 +149,11 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
"`a.b`.`c.d`", // one level nesting with column names containing `dots`
(x: Any) => Row(x)
)
- ).foreach { case (df, colName, resultFun) =>
- runTest(df, colName, resultFun)
+ ).foreach { case (newDF, colName, resultFun) =>
+ withTempPath { file =>
+ newDF.write.format(dataSourceName).save(file.getCanonicalPath)
+ readParquetFile(file.getCanonicalPath) { df => runTest(df, colName,
resultFun) }
+ }
}
}
@@ -155,7 +169,9 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
import testImplicits._
val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF()
- withNestedDataFrame(df) { case (inputDF, colName, fun) =>
+ withNestedParquetDataFrame(df) { case (parquetDF, colName, fun) =>
+ implicit val df: DataFrame = parquetDF
+
def resultFun(tsStr: String): Any = {
val parsed = if (java8Api) {
LocalDateTime.parse(tsStr.replace(" ", "T"))
@@ -166,36 +182,35 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
fun(parsed)
}
- withParquetDataFrame(inputDF) { implicit df =>
- val tsAttr = df(colName).expr
- assert(df(colName).expr.dataType === TimestampType)
-
- checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
- data.map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]],
- Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1))
- checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]],
- Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
- checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]],
resultFun(ts1))
- checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]],
resultFun(ts4))
-
- checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]],
resultFun(ts1))
- checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]],
resultFun(ts1))
- checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]],
resultFun(ts1))
- checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]],
resultFun(ts4))
- checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]],
resultFun(ts1))
- checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]],
resultFun(ts4))
-
- checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]],
resultFun(ts4))
- checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts,
classOf[Operators.Or],
- Seq(Row(resultFun(ts1)), Row(resultFun(ts4))))
- }
+
+ val tsAttr = df(colName).expr
+ assert(df(colName).expr.dataType === TimestampType)
+
+ checkFilterPredicate(tsAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]],
+ data.map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]],
+ Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]],
+ Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i))))
+ checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1))
+ checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4))
+
+ checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]],
resultFun(ts1))
+ checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]],
resultFun(ts1))
+ checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]],
resultFun(ts1))
+ checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]],
resultFun(ts4))
+ checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]],
resultFun(ts1))
+ checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]],
resultFun(ts4))
+
+ checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]],
resultFun(ts4))
+ checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts,
classOf[Operators.Or],
+ Seq(Row(resultFun(ts1)), Row(resultFun(ts4))))
}
}
@@ -226,272 +241,264 @@ abstract class ParquetFilterSuite extends QueryTest
with ParquetTest with Shared
test("filter pushdown - boolean") {
val data = (true :: false :: Nil).map(b => Tuple1.apply(Option(b)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val booleanAttr = df(colName).expr
- assert(df(colName).expr.dataType === BooleanType)
-
- checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]],
Seq.empty[Row])
- checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
- Seq(Row(resultFun(true)), Row(resultFun(false))))
-
- checkFilterPredicate(booleanAttr === true, classOf[Eq[_]],
resultFun(true))
- checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]],
resultFun(true))
- checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]],
resultFun(false))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val booleanAttr = df(colName).expr
+ assert(df(colName).expr.dataType === BooleanType)
+
+ checkFilterPredicate(booleanAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(booleanAttr.isNotNull, classOf[NotEq[_]],
+ Seq(Row(resultFun(true)), Row(resultFun(false))))
+
+ checkFilterPredicate(booleanAttr === true, classOf[Eq[_]],
resultFun(true))
+ checkFilterPredicate(booleanAttr <=> true, classOf[Eq[_]],
resultFun(true))
+ checkFilterPredicate(booleanAttr =!= true, classOf[NotEq[_]],
resultFun(false))
}
}
test("filter pushdown - tinyint") {
val data = (1 to 4).map(i => Tuple1(Option(i.toByte)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val tinyIntAttr = df(colName).expr
- assert(df(colName).expr.dataType === ByteType)
-
- checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]],
Seq.empty[Row])
- checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(Literal(1.toByte) === tinyIntAttr,
classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr,
classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr,
classOf[LtEq[_]], resultFun(1))
- checkFilterPredicate(Literal(4.toByte) <= tinyIntAttr,
classOf[GtEq[_]], resultFun(4))
-
- checkFilterPredicate(!(tinyIntAttr < 4.toByte), classOf[GtEq[_]],
resultFun(4))
- checkFilterPredicate(tinyIntAttr < 2.toByte || tinyIntAttr > 3.toByte,
- classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val tinyIntAttr = df(colName).expr
+ assert(df(colName).expr.dataType === ByteType)
+
+ checkFilterPredicate(tinyIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(tinyIntAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(tinyIntAttr === 1.toByte, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(tinyIntAttr <=> 1.toByte, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(tinyIntAttr =!= 1.toByte, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(tinyIntAttr < 2.toByte, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(tinyIntAttr > 3.toByte, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(tinyIntAttr <= 1.toByte, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(tinyIntAttr >= 4.toByte, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(Literal(1.toByte) === tinyIntAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(1.toByte) <=> tinyIntAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(2.toByte) > tinyIntAttr, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(Literal(3.toByte) < tinyIntAttr, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(Literal(1.toByte) >= tinyIntAttr, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(4.toByte) <= tinyIntAttr, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(!(tinyIntAttr < 4.toByte), classOf[GtEq[_]],
resultFun(4))
+ checkFilterPredicate(tinyIntAttr < 2.toByte || tinyIntAttr > 3.toByte,
+ classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
test("filter pushdown - smallint") {
val data = (1 to 4).map(i => Tuple1(Option(i.toShort)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val smallIntAttr = df(colName).expr
- assert(df(colName).expr.dataType === ShortType)
-
- checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]],
Seq.empty[Row])
- checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(smallIntAttr === 1.toShort, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(smallIntAttr <=> 1.toShort, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(smallIntAttr =!= 1.toShort, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(smallIntAttr < 2.toShort, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(smallIntAttr > 3.toShort, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(smallIntAttr <= 1.toShort, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(smallIntAttr >= 4.toShort, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(Literal(1.toShort) === smallIntAttr,
classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(Literal(1.toShort) <=> smallIntAttr,
classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(Literal(2.toShort) > smallIntAttr,
classOf[Lt[_]], resultFun(1))
- checkFilterPredicate(Literal(3.toShort) < smallIntAttr,
classOf[Gt[_]], resultFun(4))
- checkFilterPredicate(Literal(1.toShort) >= smallIntAttr,
classOf[LtEq[_]], resultFun(1))
- checkFilterPredicate(Literal(4.toShort) <= smallIntAttr,
classOf[GtEq[_]], resultFun(4))
-
- checkFilterPredicate(!(smallIntAttr < 4.toShort), classOf[GtEq[_]],
resultFun(4))
- checkFilterPredicate(smallIntAttr < 2.toShort || smallIntAttr >
3.toShort,
- classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val smallIntAttr = df(colName).expr
+ assert(df(colName).expr.dataType === ShortType)
+
+ checkFilterPredicate(smallIntAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(smallIntAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(smallIntAttr === 1.toShort, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(smallIntAttr <=> 1.toShort, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(smallIntAttr =!= 1.toShort, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(smallIntAttr < 2.toShort, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(smallIntAttr > 3.toShort, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(smallIntAttr <= 1.toShort, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(smallIntAttr >= 4.toShort, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(Literal(1.toShort) === smallIntAttr,
classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(Literal(1.toShort) <=> smallIntAttr,
classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(Literal(2.toShort) > smallIntAttr, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(Literal(3.toShort) < smallIntAttr, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(Literal(1.toShort) >= smallIntAttr,
classOf[LtEq[_]], resultFun(1))
+ checkFilterPredicate(Literal(4.toShort) <= smallIntAttr,
classOf[GtEq[_]], resultFun(4))
+
+ checkFilterPredicate(!(smallIntAttr < 4.toShort), classOf[GtEq[_]],
resultFun(4))
+ checkFilterPredicate(smallIntAttr < 2.toShort || smallIntAttr >
3.toShort,
+ classOf[Operators.Or], Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
test("filter pushdown - integer") {
val data = (1 to 4).map(i => Tuple1(Option(i)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val intAttr = df(colName).expr
- assert(df(colName).expr.dataType === IntegerType)
-
- checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(intAttr === 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(intAttr < 2, classOf[Lt[_]], resultFun(1))
- checkFilterPredicate(intAttr > 3, classOf[Gt[_]], resultFun(4))
- checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], resultFun(1))
- checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
- checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], resultFun(4))
- checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or],
- Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val intAttr = df(colName).expr
+ assert(df(colName).expr.dataType === IntegerType)
+
+ checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(intAttr === 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(intAttr < 2, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(intAttr > 3, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], resultFun(1))
+ checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+ checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], resultFun(4))
+ checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or],
+ Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
test("filter pushdown - long") {
val data = (1 to 4).map(i => Tuple1(Option(i.toLong)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val longAttr = df(colName).expr
- assert(df(colName).expr.dataType === LongType)
-
- checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(longAttr === 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(longAttr < 2, classOf[Lt[_]], resultFun(1))
- checkFilterPredicate(longAttr > 3, classOf[Gt[_]], resultFun(4))
- checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], resultFun(1))
- checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
- checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], resultFun(4))
- checkFilterPredicate(longAttr < 2 || longAttr > 3,
classOf[Operators.Or],
- Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val longAttr = df(colName).expr
+ assert(df(colName).expr.dataType === LongType)
+
+ checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(longAttr === 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(longAttr < 2, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(longAttr > 3, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], resultFun(1))
+ checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+ checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], resultFun(4))
+ checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or],
+ Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
test("filter pushdown - float") {
val data = (1 to 4).map(i => Tuple1(Option(i.toFloat)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val floatAttr = df(colName).expr
- assert(df(colName).expr.dataType === FloatType)
-
- checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(floatAttr === 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(floatAttr <=> 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(floatAttr =!= 1, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(floatAttr < 2, classOf[Lt[_]], resultFun(1))
- checkFilterPredicate(floatAttr > 3, classOf[Gt[_]], resultFun(4))
- checkFilterPredicate(floatAttr <= 1, classOf[LtEq[_]], resultFun(1))
- checkFilterPredicate(floatAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
- checkFilterPredicate(Literal(1) === floatAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(1) <=> floatAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(2) > floatAttr, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(Literal(3) < floatAttr, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(Literal(1) >= floatAttr, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(Literal(4) <= floatAttr, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(!(floatAttr < 4), classOf[GtEq[_]], resultFun(4))
- checkFilterPredicate(floatAttr < 2 || floatAttr > 3,
classOf[Operators.Or],
- Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val floatAttr = df(colName).expr
+ assert(df(colName).expr.dataType === FloatType)
+
+ checkFilterPredicate(floatAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(floatAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(floatAttr === 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(floatAttr <=> 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(floatAttr =!= 1, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(floatAttr < 2, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(floatAttr > 3, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(floatAttr <= 1, classOf[LtEq[_]], resultFun(1))
+ checkFilterPredicate(floatAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+ checkFilterPredicate(Literal(1) === floatAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(1) <=> floatAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(2) > floatAttr, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(Literal(3) < floatAttr, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(Literal(1) >= floatAttr, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(4) <= floatAttr, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(!(floatAttr < 4), classOf[GtEq[_]], resultFun(4))
+ checkFilterPredicate(floatAttr < 2 || floatAttr > 3,
classOf[Operators.Or],
+ Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
test("filter pushdown - double") {
val data = (1 to 4).map(i => Tuple1(Option(i.toDouble)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val doubleAttr = df(colName).expr
- assert(df(colName).expr.dataType === DoubleType)
-
- checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(doubleAttr === 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(doubleAttr <=> 1, classOf[Eq[_]], resultFun(1))
- checkFilterPredicate(doubleAttr =!= 1, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(doubleAttr < 2, classOf[Lt[_]], resultFun(1))
- checkFilterPredicate(doubleAttr > 3, classOf[Gt[_]], resultFun(4))
- checkFilterPredicate(doubleAttr <= 1, classOf[LtEq[_]], resultFun(1))
- checkFilterPredicate(doubleAttr >= 4, classOf[GtEq[_]], resultFun(4))
-
- checkFilterPredicate(Literal(1) === doubleAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(1) <=> doubleAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(2) > doubleAttr, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(Literal(3) < doubleAttr, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(Literal(1) >= doubleAttr, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(Literal(4) <= doubleAttr, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(!(doubleAttr < 4), classOf[GtEq[_]], resultFun(4))
- checkFilterPredicate(doubleAttr < 2 || doubleAttr > 3,
classOf[Operators.Or],
- Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val doubleAttr = df(colName).expr
+ assert(df(colName).expr.dataType === DoubleType)
+
+ checkFilterPredicate(doubleAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(doubleAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(doubleAttr === 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(doubleAttr <=> 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(doubleAttr =!= 1, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(doubleAttr < 2, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(doubleAttr > 3, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(doubleAttr <= 1, classOf[LtEq[_]], resultFun(1))
+ checkFilterPredicate(doubleAttr >= 4, classOf[GtEq[_]], resultFun(4))
+
+ checkFilterPredicate(Literal(1) === doubleAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(1) <=> doubleAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(2) > doubleAttr, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(Literal(3) < doubleAttr, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(Literal(1) >= doubleAttr, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(4) <= doubleAttr, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(!(doubleAttr < 4), classOf[GtEq[_]], resultFun(4))
+ checkFilterPredicate(doubleAttr < 2 || doubleAttr > 3,
classOf[Operators.Or],
+ Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
test("filter pushdown - string") {
val data = (1 to 4).map(i => Tuple1(Option(i.toString)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val stringAttr = df(colName).expr
- assert(df(colName).expr.dataType === StringType)
-
- checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i.toString))))
-
- checkFilterPredicate(stringAttr === "1", classOf[Eq[_]],
resultFun("1"))
- checkFilterPredicate(stringAttr <=> "1", classOf[Eq[_]],
resultFun("1"))
- checkFilterPredicate(stringAttr =!= "1", classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i.toString))))
-
- checkFilterPredicate(stringAttr < "2", classOf[Lt[_]], resultFun("1"))
- checkFilterPredicate(stringAttr > "3", classOf[Gt[_]], resultFun("4"))
- checkFilterPredicate(stringAttr <= "1", classOf[LtEq[_]],
resultFun("1"))
- checkFilterPredicate(stringAttr >= "4", classOf[GtEq[_]],
resultFun("4"))
-
- checkFilterPredicate(Literal("1") === stringAttr, classOf[Eq[_]],
resultFun("1"))
- checkFilterPredicate(Literal("1") <=> stringAttr, classOf[Eq[_]],
resultFun("1"))
- checkFilterPredicate(Literal("2") > stringAttr, classOf[Lt[_]],
resultFun("1"))
- checkFilterPredicate(Literal("3") < stringAttr, classOf[Gt[_]],
resultFun("4"))
- checkFilterPredicate(Literal("1") >= stringAttr, classOf[LtEq[_]],
resultFun("1"))
- checkFilterPredicate(Literal("4") <= stringAttr, classOf[GtEq[_]],
resultFun("4"))
-
- checkFilterPredicate(!(stringAttr < "4"), classOf[GtEq[_]],
resultFun("4"))
- checkFilterPredicate(stringAttr < "2" || stringAttr > "3",
classOf[Operators.Or],
- Seq(Row(resultFun("1")), Row(resultFun("4"))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val stringAttr = df(colName).expr
+ assert(df(colName).expr.dataType === StringType)
+
+ checkFilterPredicate(stringAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(stringAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i.toString))))
+
+ checkFilterPredicate(stringAttr === "1", classOf[Eq[_]], resultFun("1"))
+ checkFilterPredicate(stringAttr <=> "1", classOf[Eq[_]], resultFun("1"))
+ checkFilterPredicate(stringAttr =!= "1", classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i.toString))))
+
+ checkFilterPredicate(stringAttr < "2", classOf[Lt[_]], resultFun("1"))
+ checkFilterPredicate(stringAttr > "3", classOf[Gt[_]], resultFun("4"))
+ checkFilterPredicate(stringAttr <= "1", classOf[LtEq[_]], resultFun("1"))
+ checkFilterPredicate(stringAttr >= "4", classOf[GtEq[_]], resultFun("4"))
+
+ checkFilterPredicate(Literal("1") === stringAttr, classOf[Eq[_]],
resultFun("1"))
+ checkFilterPredicate(Literal("1") <=> stringAttr, classOf[Eq[_]],
resultFun("1"))
+ checkFilterPredicate(Literal("2") > stringAttr, classOf[Lt[_]],
resultFun("1"))
+ checkFilterPredicate(Literal("3") < stringAttr, classOf[Gt[_]],
resultFun("4"))
+ checkFilterPredicate(Literal("1") >= stringAttr, classOf[LtEq[_]],
resultFun("1"))
+ checkFilterPredicate(Literal("4") <= stringAttr, classOf[GtEq[_]],
resultFun("4"))
+
+ checkFilterPredicate(!(stringAttr < "4"), classOf[GtEq[_]],
resultFun("4"))
+ checkFilterPredicate(stringAttr < "2" || stringAttr > "3",
classOf[Operators.Or],
+ Seq(Row(resultFun("1")), Row(resultFun("4"))))
}
}
@@ -501,38 +508,37 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
val data = (1 to 4).map(i => Tuple1(Option(i.b)))
- import testImplicits._
- withNestedDataFrame(data.toDF()) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val binaryAttr: Expression = df(colName).expr
- assert(df(colName).expr.dataType === BinaryType)
-
- checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]],
resultFun(1.b))
- checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]],
resultFun(1.b))
-
- checkFilterPredicate(binaryAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
- checkFilterPredicate(binaryAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i.b))))
-
- checkFilterPredicate(binaryAttr =!= 1.b, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i.b))))
-
- checkFilterPredicate(binaryAttr < 2.b, classOf[Lt[_]], resultFun(1.b))
- checkFilterPredicate(binaryAttr > 3.b, classOf[Gt[_]], resultFun(4.b))
- checkFilterPredicate(binaryAttr <= 1.b, classOf[LtEq[_]],
resultFun(1.b))
- checkFilterPredicate(binaryAttr >= 4.b, classOf[GtEq[_]],
resultFun(4.b))
-
- checkFilterPredicate(Literal(1.b) === binaryAttr, classOf[Eq[_]],
resultFun(1.b))
- checkFilterPredicate(Literal(1.b) <=> binaryAttr, classOf[Eq[_]],
resultFun(1.b))
- checkFilterPredicate(Literal(2.b) > binaryAttr, classOf[Lt[_]],
resultFun(1.b))
- checkFilterPredicate(Literal(3.b) < binaryAttr, classOf[Gt[_]],
resultFun(4.b))
- checkFilterPredicate(Literal(1.b) >= binaryAttr, classOf[LtEq[_]],
resultFun(1.b))
- checkFilterPredicate(Literal(4.b) <= binaryAttr, classOf[GtEq[_]],
resultFun(4.b))
-
- checkFilterPredicate(!(binaryAttr < 4.b), classOf[GtEq[_]],
resultFun(4.b))
- checkFilterPredicate(binaryAttr < 2.b || binaryAttr > 3.b,
classOf[Operators.Or],
- Seq(Row(resultFun(1.b)), Row(resultFun(4.b))))
- }
+ withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val binaryAttr: Expression = df(colName).expr
+ assert(df(colName).expr.dataType === BinaryType)
+
+ checkFilterPredicate(binaryAttr === 1.b, classOf[Eq[_]], resultFun(1.b))
+ checkFilterPredicate(binaryAttr <=> 1.b, classOf[Eq[_]], resultFun(1.b))
+
+ checkFilterPredicate(binaryAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(binaryAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i.b))))
+
+ checkFilterPredicate(binaryAttr =!= 1.b, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i.b))))
+
+ checkFilterPredicate(binaryAttr < 2.b, classOf[Lt[_]], resultFun(1.b))
+ checkFilterPredicate(binaryAttr > 3.b, classOf[Gt[_]], resultFun(4.b))
+ checkFilterPredicate(binaryAttr <= 1.b, classOf[LtEq[_]], resultFun(1.b))
+ checkFilterPredicate(binaryAttr >= 4.b, classOf[GtEq[_]], resultFun(4.b))
+
+ checkFilterPredicate(Literal(1.b) === binaryAttr, classOf[Eq[_]],
resultFun(1.b))
+ checkFilterPredicate(Literal(1.b) <=> binaryAttr, classOf[Eq[_]],
resultFun(1.b))
+ checkFilterPredicate(Literal(2.b) > binaryAttr, classOf[Lt[_]],
resultFun(1.b))
+ checkFilterPredicate(Literal(3.b) < binaryAttr, classOf[Gt[_]],
resultFun(4.b))
+ checkFilterPredicate(Literal(1.b) >= binaryAttr, classOf[LtEq[_]],
resultFun(1.b))
+ checkFilterPredicate(Literal(4.b) <= binaryAttr, classOf[GtEq[_]],
resultFun(4.b))
+
+ checkFilterPredicate(!(binaryAttr < 4.b), classOf[GtEq[_]],
resultFun(4.b))
+ checkFilterPredicate(binaryAttr < 2.b || binaryAttr > 3.b,
classOf[Operators.Or],
+ Seq(Row(resultFun(1.b)), Row(resultFun(4.b))))
}
}
@@ -546,56 +552,57 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
Seq(false, true).foreach { java8Api =>
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
- val df = data.map(i => Tuple1(Date.valueOf(i))).toDF()
- withNestedDataFrame(df) { case (inputDF, colName, fun) =>
+ val dates = data.map(i => Tuple1(Date.valueOf(i))).toDF()
+ withNestedParquetDataFrame(dates) { case (inputDF, colName, fun) =>
+ implicit val df: DataFrame = inputDF
+
def resultFun(dateStr: String): Any = {
val parsed = if (java8Api) LocalDate.parse(dateStr) else
Date.valueOf(dateStr)
fun(parsed)
}
- withParquetDataFrame(inputDF) { implicit df =>
- val dateAttr: Expression = df(colName).expr
- assert(df(colName).expr.dataType === DateType)
-
- checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]],
Seq.empty[Row])
- checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
- data.map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(dateAttr === "2018-03-18".date,
classOf[Eq[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(dateAttr <=> "2018-03-18".date,
classOf[Eq[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(dateAttr =!= "2018-03-18".date,
classOf[NotEq[_]],
- Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i =>
Row.apply(resultFun(i))))
-
- checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
- resultFun("2018-03-21"))
- checkFilterPredicate(dateAttr <= "2018-03-18".date,
classOf[LtEq[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(dateAttr >= "2018-03-21".date,
classOf[GtEq[_]],
- resultFun("2018-03-21"))
-
- checkFilterPredicate(Literal("2018-03-18".date) === dateAttr,
classOf[Eq[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr,
classOf[Eq[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(Literal("2018-03-19".date) > dateAttr,
classOf[Lt[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(Literal("2018-03-20".date) < dateAttr,
classOf[Gt[_]],
- resultFun("2018-03-21"))
- checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr,
classOf[LtEq[_]],
- resultFun("2018-03-18"))
- checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr,
classOf[GtEq[_]],
- resultFun("2018-03-21"))
-
- checkFilterPredicate(!(dateAttr < "2018-03-21".date),
classOf[GtEq[_]],
- resultFun("2018-03-21"))
- checkFilterPredicate(
- dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
- classOf[Operators.Or],
- Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
- }
+
+ val dateAttr: Expression = df(colName).expr
+ assert(df(colName).expr.dataType === DateType)
+
+ checkFilterPredicate(dateAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+ checkFilterPredicate(dateAttr.isNotNull, classOf[NotEq[_]],
+ data.map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(dateAttr === "2018-03-18".date, classOf[Eq[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(dateAttr <=> "2018-03-18".date, classOf[Eq[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(dateAttr =!= "2018-03-18".date,
classOf[NotEq[_]],
+ Seq("2018-03-19", "2018-03-20", "2018-03-21").map(i =>
Row.apply(resultFun(i))))
+
+ checkFilterPredicate(dateAttr < "2018-03-19".date, classOf[Lt[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(dateAttr > "2018-03-20".date, classOf[Gt[_]],
+ resultFun("2018-03-21"))
+ checkFilterPredicate(dateAttr <= "2018-03-18".date, classOf[LtEq[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(dateAttr >= "2018-03-21".date, classOf[GtEq[_]],
+ resultFun("2018-03-21"))
+
+ checkFilterPredicate(Literal("2018-03-18".date) === dateAttr,
classOf[Eq[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(Literal("2018-03-18".date) <=> dateAttr,
classOf[Eq[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(Literal("2018-03-19".date) > dateAttr,
classOf[Lt[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(Literal("2018-03-20".date) < dateAttr,
classOf[Gt[_]],
+ resultFun("2018-03-21"))
+ checkFilterPredicate(Literal("2018-03-18".date) >= dateAttr,
classOf[LtEq[_]],
+ resultFun("2018-03-18"))
+ checkFilterPredicate(Literal("2018-03-21".date) <= dateAttr,
classOf[GtEq[_]],
+ resultFun("2018-03-21"))
+
+ checkFilterPredicate(!(dateAttr < "2018-03-21".date),
classOf[GtEq[_]],
+ resultFun("2018-03-21"))
+ checkFilterPredicate(
+ dateAttr < "2018-03-19".date || dateAttr > "2018-03-20".date,
+ classOf[Operators.Or],
+ Seq(Row(resultFun("2018-03-18")), Row(resultFun("2018-03-21"))))
}
}
}
@@ -603,7 +610,9 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
test("filter pushdown - timestamp") {
Seq(true, false).foreach { java8Api =>
- withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ withSQLConf(
+ SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString,
+ SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") {
// spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS
val millisData = Seq(
"1000-06-14 08:28:53.123",
@@ -630,11 +639,14 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
ParquetOutputTimestampType.INT96.toString) {
import testImplicits._
- withParquetDataFrame(
- millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) {
implicit df =>
- val schema = new
SparkToParquetSchemaConverter(conf).convert(df.schema)
- assertResult(None) {
- createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+ withTempPath { file =>
+ millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF
+ .write.format(dataSourceName).save(file.getCanonicalPath)
+ readParquetFile(file.getCanonicalPath) { df =>
+ val schema = new
SparkToParquetSchemaConverter(conf).convert(df.schema)
+ assertResult(None) {
+ createParquetFilters(schema).createFilter(sources.IsNull("_1"))
+ }
}
}
}
@@ -653,36 +665,36 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
val rdd =
spark.sparkContext.parallelize((1 to 4).map(i => Row(new
java.math.BigDecimal(i))))
val dataFrame = spark.createDataFrame(rdd, StructType.fromDDL(s"a
decimal($precision, 2)"))
- withNestedDataFrame(dataFrame) { case (inputDF, colName, resultFun) =>
- withParquetDataFrame(inputDF) { implicit df =>
- val decimalAttr: Expression = df(colName).expr
- assert(df(colName).expr.dataType === DecimalType(precision, 2))
-
- checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]],
Seq.empty[Row])
- checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]],
- (1 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(decimalAttr === 1, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(decimalAttr <=> 1, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(decimalAttr =!= 1, classOf[NotEq[_]],
- (2 to 4).map(i => Row.apply(resultFun(i))))
-
- checkFilterPredicate(decimalAttr < 2, classOf[Lt[_]], resultFun(1))
- checkFilterPredicate(decimalAttr > 3, classOf[Gt[_]], resultFun(4))
- checkFilterPredicate(decimalAttr <= 1, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(decimalAttr >= 4, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(Literal(1) === decimalAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(1) <=> decimalAttr, classOf[Eq[_]],
resultFun(1))
- checkFilterPredicate(Literal(2) > decimalAttr, classOf[Lt[_]],
resultFun(1))
- checkFilterPredicate(Literal(3) < decimalAttr, classOf[Gt[_]],
resultFun(4))
- checkFilterPredicate(Literal(1) >= decimalAttr, classOf[LtEq[_]],
resultFun(1))
- checkFilterPredicate(Literal(4) <= decimalAttr, classOf[GtEq[_]],
resultFun(4))
-
- checkFilterPredicate(!(decimalAttr < 4), classOf[GtEq[_]],
resultFun(4))
- checkFilterPredicate(decimalAttr < 2 || decimalAttr > 3,
classOf[Operators.Or],
- Seq(Row(resultFun(1)), Row(resultFun(4))))
- }
+ withNestedParquetDataFrame(dataFrame) { case (inputDF, colName,
resultFun) =>
+ implicit val df: DataFrame = inputDF
+
+ val decimalAttr: Expression = df(colName).expr
+ assert(df(colName).expr.dataType === DecimalType(precision, 2))
+
+ checkFilterPredicate(decimalAttr.isNull, classOf[Eq[_]],
Seq.empty[Row])
+ checkFilterPredicate(decimalAttr.isNotNull, classOf[NotEq[_]],
+ (1 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(decimalAttr === 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(decimalAttr <=> 1, classOf[Eq[_]], resultFun(1))
+ checkFilterPredicate(decimalAttr =!= 1, classOf[NotEq[_]],
+ (2 to 4).map(i => Row.apply(resultFun(i))))
+
+ checkFilterPredicate(decimalAttr < 2, classOf[Lt[_]], resultFun(1))
+ checkFilterPredicate(decimalAttr > 3, classOf[Gt[_]], resultFun(4))
+ checkFilterPredicate(decimalAttr <= 1, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(decimalAttr >= 4, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(Literal(1) === decimalAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(1) <=> decimalAttr, classOf[Eq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(2) > decimalAttr, classOf[Lt[_]],
resultFun(1))
+ checkFilterPredicate(Literal(3) < decimalAttr, classOf[Gt[_]],
resultFun(4))
+ checkFilterPredicate(Literal(1) >= decimalAttr, classOf[LtEq[_]],
resultFun(1))
+ checkFilterPredicate(Literal(4) <= decimalAttr, classOf[GtEq[_]],
resultFun(4))
+
+ checkFilterPredicate(!(decimalAttr < 4), classOf[GtEq[_]],
resultFun(4))
+ checkFilterPredicate(decimalAttr < 2 || decimalAttr > 3,
classOf[Operators.Or],
+ Seq(Row(resultFun(1)), Row(resultFun(4))))
}
}
}
@@ -1195,8 +1207,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
test("SPARK-16371 Do not push down filters when inner name and outer name
are the same") {
- import testImplicits._
- withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i))).toDF()) {
implicit df =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
// Here the schema becomes as below:
//
// root
@@ -1336,10 +1347,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
test("filter pushdown - StringStartsWith") {
- withParquetDataFrame {
- import testImplicits._
- (1 to 4).map(i => Tuple1(i + "str" + i)).toDF()
- } { implicit df =>
+ withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit
df =>
checkFilterPredicate(
'_1.startsWith("").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
@@ -1385,10 +1393,7 @@ abstract class ParquetFilterSuite extends QueryTest with
ParquetTest with Shared
}
// SPARK-28371: make sure filter is null-safe.
- withParquetDataFrame {
- import testImplicits._
- Seq(Tuple1[String](null)).toDF()
- } { implicit df =>
+ withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df =>
checkFilterPredicate(
'_1.startsWith("blah").asInstanceOf[Predicate],
classOf[UserDefinedByInstance[_, _]],
@@ -1607,7 +1612,7 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
expected: Seq[Row]): Unit = {
val output = predicate.collect { case a: Attribute => a }.distinct
- Seq(("parquet", true), ("", false)).map { case (pushdownDsList,
nestedPredicatePushdown) =>
+ Seq(("parquet", true), ("", false)).foreach { case (pushdownDsList,
nestedPredicatePushdown) =>
withSQLConf(
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_FILTER_PUSHDOWN_DATE_ENABLED.key -> "true",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 79c3297..2dc8a06 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -85,7 +85,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with
SharedSparkSession
* Writes `data` to a Parquet file, reads it back and check file contents.
*/
protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data:
Seq[T]): Unit = {
- withParquetDataFrame(data.toDF())(r => checkAnswer(r,
data.map(Row.fromTuple)))
+ withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
}
test("basic data types (without binary)") {
@@ -97,7 +97,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with
SharedSparkSession
test("raw binary") {
val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
assertResult(data.map(_._1.mkString(",")).sorted) {
df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
}
@@ -200,7 +200,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
testStandardAndLegacyModes("struct") {
val data = (1 to 4).map(i => Tuple1((i, s"val_$i")))
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(struct) =>
Row(Row(struct.productIterator.toSeq: _*))
@@ -217,7 +217,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
)
)
}
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(array) =>
Row(array.map(struct => Row(struct.productIterator.toSeq: _*)))
@@ -236,7 +236,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
)
)
}
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(array) =>
Row(array.map { case Tuple1(Tuple1(str)) => Row(Row(str))})
@@ -246,7 +246,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
testStandardAndLegacyModes("nested struct with array of array as field") {
val data = (1 to 4).map(i => Tuple1((i, Seq(Seq(s"val_$i")))))
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(struct) =>
Row(Row(struct.productIterator.toSeq: _*))
@@ -263,7 +263,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
)
)
}
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(m) =>
Row(m.map { case (k, v) => Row(k.productIterator.toSeq: _*) -> v })
@@ -280,7 +280,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
)
)
}
- withParquetDataFrame(data.toDF()) { df =>
+ withParquetDataFrame(data) { df =>
// Structs are converted to `Row`s
checkAnswer(df, data.map { case Tuple1(m) =>
Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
@@ -296,7 +296,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
null.asInstanceOf[java.lang.Float],
null.asInstanceOf[java.lang.Double])
- withParquetDataFrame((allNulls :: Nil).toDF()) { df =>
+ withParquetDataFrame(allNulls :: Nil) { df =>
val rows = df.collect()
assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(5)(null): _*))
@@ -309,7 +309,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest
with SharedSparkSession
None.asInstanceOf[Option[Long]],
None.asInstanceOf[Option[String]])
- withParquetDataFrame((allNones :: Nil).toDF()) { df =>
+ withParquetDataFrame(allNones :: Nil) { df =>
val rows = df.collect()
assert(rows.length === 1)
assert(rows.head === Row(Seq.fill(3)(null): _*))
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
index 105f025..db8ee72 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
@@ -63,18 +63,12 @@ private[sql] trait ParquetTest extends
FileBasedDataSourceTest {
(f: String => Unit): Unit = withDataSourceFile(data)(f)
/**
- * Writes `df` dataframe to a Parquet file and reads it back as a
[[DataFrame]],
+ * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
* which is then passed to `f`. The Parquet file will be deleted after `f`
returns.
*/
- protected def withParquetDataFrame(df: DataFrame, testVectorized: Boolean =
true)
- (f: DataFrame => Unit): Unit = {
- withTempPath { file =>
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key ->
"CORRECTED") {
- df.write.format(dataSourceName).save(file.getCanonicalPath)
- }
- readFile(file.getCanonicalPath, testVectorized)(f)
- }
- }
+ protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
+ (data: Seq[T], testVectorized: Boolean = true)
+ (f: DataFrame => Unit): Unit = withDataSourceDataFrame(data,
testVectorized)(f)
/**
* Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and
registers it as a
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]