Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8d32ed5f2 -> 0fc5533e5


[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size

## What changes were proposed in this pull request?

as per discussion in 
https://github.com/apache/spark/pull/19864#discussion_r156847927

The current size estimate of HadoopFsRelation is based purely on the underlying file size, which 
is inaccurate (e.g. when the data is compressed in the file) and makes execution vulnerable to errors such as OOM.

Users can enable CBO with the functionalities in 
https://github.com/apache/spark/pull/19864 to avoid this issue

This JIRA proposes adding a configurable factor to the sizeInBytes method of the 
HadoopFsRelation class so that users can mitigate this problem without CBO.

## How was this patch tested?

Existing tests, plus a new test added to HadoopFsRelationSuite

Author: CodingCat <[email protected]>
Author: Nan Zhu <[email protected]>

Closes #20072 from CodingCat/SPARK-22790.

(cherry picked from commit ba891ec993c616dc4249fc786c56ea82ed04a827)
Signed-off-by: gatorsmile <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fc5533e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fc5533e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fc5533e

Branch: refs/heads/branch-2.3
Commit: 0fc5533e53ad03eb67590ddd231f40c2713150c3
Parents: 8d32ed5
Author: CodingCat <[email protected]>
Authored: Sun Jan 14 02:36:32 2018 +0800
Committer: gatorsmile <[email protected]>
Committed: Sun Jan 14 02:36:52 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++-
 .../datasources/HadoopFsRelation.scala          |  6 ++-
 .../datasources/HadoopFsRelationSuite.scala     | 41 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0fc5533e/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 36e802a..6746fbc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -249,7 +249,7 @@ object SQLConf {
   val CONSTRAINT_PROPAGATION_ENABLED = 
buildConf("spark.sql.constraintPropagation.enabled")
     .internal()
     .doc("When true, the query optimizer will infer and propagate data 
constraints in the query " +
-      "plan to optimize them. Constraint propagation can sometimes be 
computationally expensive" +
+      "plan to optimize them. Constraint propagation can sometimes be 
computationally expensive " +
       "for certain kinds of query plans (such as those with a large number of 
predicates and " +
       "aliases) which might negatively impact overall runtime.")
     .booleanConf
@@ -263,6 +263,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val FILE_COMRESSION_FACTOR = 
buildConf("spark.sql.sources.fileCompressionFactor")
+    .internal()
+    .doc("When estimating the output data size of a table scan, multiply the 
file size with this " +
+      "factor as the estimated data size, in case the data is compressed in 
the file and lead to" +
+      " a heavily underestimated result.")
+    .doubleConf
+    .checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0")
+    .createWithDefault(1.0)
+
   val PARQUET_SCHEMA_MERGING_ENABLED = 
buildConf("spark.sql.parquet.mergeSchema")
     .doc("When true, the Parquet data source merges schemas collected from all 
data files, " +
          "otherwise the schema is picked from the summary file or a random 
data file " +
@@ -1255,6 +1264,8 @@ class SQLConf extends Serializable with Logging {
 
   def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)
 
+  def fileCompressionFactor: Double = getConf(FILE_COMRESSION_FACTOR)
+
   def stringRedationPattern: Option[Regex] = 
SQL_STRING_REDACTION_PATTERN.readFrom(reader)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0fc5533e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 89d8a85..6b34638 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
     }
   }
 
-  override def sizeInBytes: Long = location.sizeInBytes
+  override def sizeInBytes: Long = {
+    val compressionFactor = sqlContext.conf.fileCompressionFactor
+    (location.sizeInBytes * compressionFactor).toLong
+  }
+
 
   override def inputFiles: Array[String] = location.inputFiles
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0fc5533e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index caf0388..c1f2c18 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{File, FilenameFilter}
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with 
SharedSQLContext {
       assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
     }
   }
+
+  test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
+    import testImplicits._
+    Seq(1.0, 0.5).foreach { compressionFactor =>
+      withSQLConf("spark.sql.sources.fileCompressionFactor" -> 
compressionFactor.toString,
+        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        withTempPath { workDir =>
+          // the file size is 740 bytes
+          val workDirPath = workDir.getAbsolutePath
+          val data1 = Seq(100, 200, 300, 400).toDF("count")
+          data1.write.parquet(workDirPath + "/data1")
+          val df1FromFile = spark.read.parquet(workDirPath + "/data1")
+          val data2 = Seq(100, 200, 300, 400).toDF("count")
+          data2.write.parquet(workDirPath + "/data2")
+          val df2FromFile = spark.read.parquet(workDirPath + "/data2")
+          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
+          if (compressionFactor == 0.5) {
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.nonEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.isEmpty)
+          } else {
+            // compressionFactor is 1.0
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.isEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.nonEmpty)
+          }
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to