Repository: spark
Updated Branches:
  refs/heads/master ca458618d -> dadf0138b


[SPARK-14259][SQL] Add a FileSourceStrategy option for limiting #files in a 
partition

## What changes were proposed in this pull request?
This pr is to add a config to control the maximum number of files as even small 
files have a non-trivial fixed cost. The current packing can put a lot of small 
files together which cases straggler tasks.

## How was this patch tested?
I added tests to check if many files get split into partitions in 
FileSourceStrategySuite.

Author: Takeshi YAMAMURO <[email protected]>

Closes #12068 from maropu/SPARK-14259.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dadf0138
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dadf0138
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dadf0138

Branch: refs/heads/master
Commit: dadf0138b3f6fd618677a2c26f40ab66b7a1139d
Parents: ca45861
Author: Takeshi YAMAMURO <[email protected]>
Authored: Wed Mar 30 16:02:48 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Wed Mar 30 16:02:48 2016 -0700

----------------------------------------------------------------------
 .../datasources/FileSourceStrategy.scala        |  7 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala |  7 +++
 .../datasources/FileSourceStrategySuite.scala   | 47 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dadf0138/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 4448796..d653408 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -136,7 +136,9 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
 
         case _ =>
           val maxSplitBytes = files.sqlContext.conf.filesMaxPartitionBytes
-          logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes 
bytes")
+          val maxFileNumInPartition = 
files.sqlContext.conf.filesMaxNumInPartition
+          logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes 
bytes, " +
+            s"max #files: $maxFileNumInPartition")
 
           val splitFiles = selectedPartitions.flatMap { partition =>
             partition.files.flatMap { file =>
@@ -174,7 +176,8 @@ private[sql] object FileSourceStrategy extends Strategy 
with Logging {
           // Assign files to partitions using "First Fit Decreasing" (FFD)
           // TODO: consider adding a slop factor here?
           splitFiles.foreach { file =>
-            if (currentSize + file.length > maxSplitBytes) {
+            if (currentSize + file.length > maxSplitBytes ||
+                currentFiles.length >= maxFileNumInPartition) {
               closePartition()
               addFile(file)
             } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/dadf0138/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ca6ba4c..d06e908 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -524,6 +524,11 @@ object SQLConf {
     doc = "The maximum number of bytes to pack into a single partition when 
reading files.",
     isPublic = true)
 
+  val FILES_MAX_NUM_IN_PARTITION = 
longConf("spark.sql.files.maxNumInPartition",
+    defaultValue = Some(32),
+    doc = "The maximum number of files to pack into a single partition when 
reading files.",
+    isPublic = true)
+
   val EXCHANGE_REUSE_ENABLED = booleanConf("spark.sql.exchange.reuse",
     defaultValue = Some(true),
     doc = "When true, the planner will try to find out duplicated exchanges 
and re-use them.",
@@ -581,6 +586,8 @@ class SQLConf extends Serializable with CatalystConf with 
ParserConf with Loggin
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
+  def filesMaxNumInPartition: Long = getConf(FILES_MAX_NUM_IN_PARTITION)
+
   def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   def useFileScan: Boolean = getConf(USE_FILE_SCAN)

http://git-wip-us.apache.org/repos/asf/spark/blob/dadf0138/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 1fa1573..45620bc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -121,6 +121,53 @@ class FileSourceStrategySuite extends QueryTest with 
SharedSQLContext with Predi
     }
   }
 
+  test("Unpartitioned table, many files that get split") {
+    val table =
+      createTable(
+        files = Seq(
+          "file1" -> 2,
+          "file2" -> 2,
+          "file3" -> 1,
+          "file4" -> 1,
+          "file5" -> 1,
+          "file6" -> 1))
+
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "3",
+        SQLConf.FILES_MAX_NUM_IN_PARTITION.key -> "2") {
+      checkScan(table.select('c1)) { partitions =>
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), 
(file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
+        assert(partitions(1).files.size == 2, "when checking partition 2")
+        assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
+
+        // First partition reads (file1)
+        assert(partitions(0).files(0).start == 0)
+        assert(partitions(0).files(0).length == 2)
+
+        // Second partition reads (file2, file3)
+        assert(partitions(1).files(0).start == 0)
+        assert(partitions(1).files(0).length == 2)
+        assert(partitions(1).files(1).start == 0)
+        assert(partitions(1).files(1).length == 1)
+
+        // Third partition reads (file4, file5)
+        assert(partitions(2).files(0).start == 0)
+        assert(partitions(2).files(0).length == 1)
+        assert(partitions(2).files(1).start == 0)
+        assert(partitions(2).files(1).length == 1)
+
+        // Final partition reads (file6)
+        assert(partitions(3).files(0).start == 0)
+        assert(partitions(3).files(0).length == 1)
+      }
+
+      checkPartitionSchema(StructType(Nil))
+      checkDataSchema(StructType(Nil).add("c1", IntegerType))
+    }
+  }
+
   test("partitioned table") {
     val table =
       createTable(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to