Repository: spark
Updated Branches:
  refs/heads/master a0300ea32 -> e47c38763


[SPARK-4391][SQL] Configure parquet filters using SQLConf

Configuring Parquet filter pushdown through SQLConf is more uniform with the 
rest of the SQL configuration and allows it to be turned on and off without 
restarting the SparkContext.  This PR also turns off filter pushdown by default 
due to a number of outstanding issues (in particular SPARK-4258).  Once those 
are fixed, we should turn it back on by default.

Author: Michael Armbrust <[email protected]>

Closes #3258 from marmbrus/parquetFilters and squashes the following commits:

5655bfe [Michael Armbrust] Remove extra line.
15e9a98 [Michael Armbrust] Enable filters for tests
75afd39 [Michael Armbrust] Fix comments
78fa02d [Michael Armbrust] off by default
e7f9e16 [Michael Armbrust] First draft of correctly configuring parquet filter 
pushdown


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e47c3876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e47c3876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e47c3876

Branch: refs/heads/master
Commit: e47c38763914aaf89a7a851c5f41b7549a75615b
Parents: a0300ea
Author: Michael Armbrust <[email protected]>
Authored: Fri Nov 14 14:59:35 2014 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Fri Nov 14 14:59:35 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/sql/SQLConf.scala  |  8 +++++++-
 .../apache/spark/sql/execution/SparkStrategies.scala   |  7 +++++--
 .../org/apache/spark/sql/parquet/ParquetFilters.scala  |  2 --
 .../spark/sql/parquet/ParquetTableOperations.scala     | 13 +++++++------
 .../apache/spark/sql/parquet/ParquetQuerySuite.scala   |  2 ++
 5 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 279495a..cd7d78e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
 
 import java.util.Properties
 
-
 private[spark] object SQLConf {
   val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
   val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
@@ -32,9 +31,12 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+
   val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
   val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
   val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
 
   // This is only used for the thriftserver
@@ -90,6 +92,10 @@ private[sql] trait SQLConf {
   /** Number of partitions to use for shuffle operators. */
   private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, 
"200").toInt
 
+  /** When true, predicates will be passed to the parquet record reader when 
possible. */
+  private[spark] def parquetFilterPushDown =
+    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+
   /**
    * When set to true, Spark SQL will use the Scala compiler at runtime to 
generate custom bytecode
    * that evaluates expressions found in queries.  In general this custom code 
runs much faster

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cc7e0c0..03cd5bd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -208,7 +208,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: 
ParquetRelation) =>
         val prunePushedDownFilters =
-          if 
(sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, 
true)) {
+          if (sqlContext.parquetFilterPushDown) {
             (filters: Seq[Expression]) => {
               filters.filter { filter =>
                 // Note: filters cannot be pushed down to Parquet if they 
contain more complex
@@ -234,7 +234,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           projectList,
           filters,
           prunePushedDownFilters,
-          ParquetTableScan(_, relation, filters)) :: Nil
+          ParquetTableScan(
+            _,
+            relation,
+            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index 1e67799..9a3f6d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -43,8 +43,6 @@ import org.apache.spark.sql.parquet.ParquetColumns._
 
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
-  // set this to false if pushdown should be disabled
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.hints.parquetFilterPushdown"
 
   def createRecordFilter(filterExpressions: Seq[Expression]): Filter = {
     val filters: Seq[CatalystFilter] = filterExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 74c43e0..5f93279 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -23,6 +23,8 @@ import java.text.SimpleDateFormat
 import java.util.concurrent.{Callable, TimeUnit}
 import java.util.{ArrayList, Collections, Date, List => JList}
 
+import org.apache.spark.annotation.DeveloperApi
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Try
@@ -52,6 +54,7 @@ import org.apache.spark.sql.execution.{LeafNode, SparkPlan, 
UnaryNode}
 import org.apache.spark.{Logging, SerializableWritable, TaskContext}
 
 /**
+ * :: DeveloperApi ::
  * Parquet table scan operator. Imports the file that backs the given
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
@@ -108,15 +111,11 @@ case class ParquetTableScan(
     // Note 1: the input format ignores all predicates that cannot be expressed
     // as simple column predicate filters in Parquet. Here we just record
     // the whole pruning predicate.
-    // Note 2: you can disable filter predicate pushdown by setting
-    // "spark.sql.hints.parquetFilterPushdown" to false inside SparkConf.
-    if (columnPruningPred.length > 0 &&
-      sc.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, 
true)) {
-      
+    if (columnPruningPred.length > 0) {
       // Set this in configuration of ParquetInputFormat, needed for 
RowGroupFiltering
       val filter: Filter = ParquetFilters.createRecordFilter(columnPruningPred)
       if (filter != null){
-        val filterPredicate = 
filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate()
+        val filterPredicate = 
filter.asInstanceOf[FilterPredicateCompat].getFilterPredicate
         ParquetInputFormat.setFilterPredicate(conf, filterPredicate)  
       }
     }
@@ -193,6 +192,7 @@ case class ParquetTableScan(
 }
 
 /**
+ * :: DeveloperApi ::
  * Operator that acts as a sink for queries on RDDs and can be used to
  * store the output inside a directory of Parquet files. This operator
  * is similar to Hive's INSERT INTO TABLE operation in the sense that
@@ -208,6 +208,7 @@ case class ParquetTableScan(
  * cause unpredicted behaviour and therefore results in a RuntimeException
  * (only detected via filename pattern so will not catch all cases).
  */
+@DeveloperApi
 case class InsertIntoParquetTable(
     relation: ParquetRelation,
     child: SparkPlan,

http://git-wip-us.apache.org/repos/asf/spark/blob/e47c3876/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3cccafe..80a3e0b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -95,6 +95,8 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike 
with BeforeAndAfterA
     testRDD.registerTempTable("testsource")
     parquetFile(ParquetTestData.testFilterDir.toString)
       .registerTempTable("testfiltersource")
+
+    setConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED, "true")
   }
 
   override def afterAll() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to