Repository: spark
Updated Branches:
  refs/heads/master a0300ea32 -> e47c38763
[SPARK-4391][SQL] Configure parquet filters using SQLConf

This is more uniform with the rest of SQL configuration and allows it to be
turned on and off without restarting the SparkContext.

In this PR I also turn off filter pushdown by default due to a number of
outstanding issues (in particular SPARK-4258). When those are fixed we
should turn it back on by default.

Author: Michael Armbrust <[email protected]>

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter pushdown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e47c3876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e47c3876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e47c3876

Branch: refs/heads/master
Commit: e47c38763914aaf89a7a851c5f41b7549a75615b
Parents: a0300ea
Author: Michael Armbrust <[email protected]>
Authored: Fri Nov 14 14:59:35 2014 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Fri Nov 14 14:59:35 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala |  8 +++++++-
 .../apache/spark/sql/execution/SparkStrategies.scala  |  7 +++++--
 .../org/apache/spark/sql/parquet/ParquetFilters.scala |  2 --
 .../spark/sql/parquet/ParquetTableOperations.scala    | 13 +++++++------
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala  |  2 ++
 5 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495a..cd7d78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 
 import java.util.Properties
 
-
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
   val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
   // This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
 
+  /** When true predicates will be passed to the parquet record reader when possible. */
+  private[spark] def parquetFilterPushDown =
+    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
   /**
    * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
    * that evaluates expressions found in queries. In general this custom code runs much faster
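
Since the pushdown flag is now an ordinary SQLConf property, it can be
flipped per session instead of through SparkConf at startup. A minimal
sketch of what that looks like from user code; the local-mode context
below is illustrative and not part of this commit:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext("local", "pushdown-example")
    val sqlContext = new SQLContext(sc)

    // Pushdown now defaults to "false"; opt in for this session only.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")

    // The same property is visible to the SQL dialect's SET command...
    sqlContext.sql("SET spark.sql.parquet.filterPushdown=false")

    // ...so it can be turned on and off without restarting the SparkContext.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")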

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c0..03cd5bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
-          if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
+          if (sqlContext.parquetFilterPushDown) {
             (filters: Seq[Expression]) => {
               filters.filter { filter =>
                 // Note: filters cannot be pushed down to Parquet if they contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           projectList,
           filters,
           prunePushedDownFilters,
-          ParquetTableScan(_, relation, filters)) :: Nil
+          ParquetTableScan(
+            _,
+            relation,
+            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
     }
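
As the note inside prunePushedDownFilters says, only predicates that can
be expressed as simple column predicate filters reach Parquet; everything
else stays in Spark's own Filter operator. A hypothetical illustration,
continuing the session sketched above and assuming a Parquet file
people.parquet with name and age columns (none of these names come from
the commit):

    // Register a Parquet-backed table; path and schema are assumptions.
    sqlContext.parquetFile("people.parquet").registerTempTable("people")

    // A plain column comparison can be rewritten as a Parquet record
    // filter, so with the flag on it is evaluated during the scan.
    sqlContext.sql("SELECT name FROM people WHERE age > 21").collect()

    // A UDF call is not a simple column predicate, so this filter is kept
    // out of the pushdown set and evaluated by Spark after the scan.
    sqlContext.registerFunction("isAdult", (age: Int) => age > 18)
    sqlContext.sql("SELECT name FROM people WHERE isAdult(age)").collect()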

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 1e67799..9a3f6d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._
 
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-  // set this to false if pushdown should be disabled
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
 
   def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
     val filters: Seq[CatalystFilter] = filterExpressions.collect {


http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 74c43e0..5f93279 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
 import java.util.concurrent.{Callable, TimeUnit}
 import java.util.{ArrayList, Collections, Date, List => JList}
 
+import org.apache.spark.annotation.DeveloperApi
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
+ * :: DeveloperApi ::
  * Parquet table scan operator. Imports the file that backs the given
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
@@ -108,15 +111,11 @@ case class ParquetTableScan(
     // Note 1: the input format ignores all predicates that cannot be expressed
     // as simple column predicate filters in Parquet. Here we just record
     // the whole pruning predicate.
-    // Note 2: you can disable filter predicate pushdown by setting
-    // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
-    if (columnPruningPred.length > 0 &&
-      sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
-
+    if (columnPruningPred.length > 0) {
       // Set this in configuration of ParquetInputFormat, needed for RowGroupFiltering
       val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
       if (filter != null){
-        val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+        val filterPredicate = filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
         ParquetInputFormat.setFilterPredicate(conf, filterPredicate)
       }
     }
@@ -193,6 +192,7 @@ case class ParquetTableScan(
 }
 
 /**
+ * :: DeveloperApi ::
  * Operator that acts as a sink for queries on RDDs and can be used to
  * store the output inside a directory of Parquet files. This operator
  * is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
  * cause unpredicted behaviour and therefore results in a RuntimeException
  * (only detected via filename pattern so will not catch all cases).
  */
+@DeveloperApi
 case class InsertIntoParquetTable(
     relation: ParquetRelation,
     child: SparkPlan,


http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3cccafe..80a3e0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
       .registerTempTable("testfiltersource")
+
+    setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
   }
 
   override def afterAll() {
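
Because the default flipped to off, the suite opts back in once in
beforeAll. A test that wanted to exercise both code paths could wrap
individual cases in a small helper; the one below is hypothetical and not
part of this commit:

    import org.apache.spark.sql.SQLContext

    // Toggle the flag around a block, restoring the previous value after,
    // so a single suite can cover the pushdown and non-pushdown paths.
    def withPushdown[T](sqlContext: SQLContext, enabled: Boolean)(body: => T): T = {
      val key = "spark.sql.parquet.filterPushdown"
      val previous = sqlContext.getConf(key, "false")
      sqlContext.setConf(key, enabled.toString)
      try body finally sqlContext.setConf(key, previous)
    }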

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]