Repository: spark Updated Branches: refs/heads/master f14ae4900 -> 745ab8bc5
[SPARK-18379][SQL] Make the parallelism of parallelPartitionDiscovery configurable. ## What changes were proposed in this pull request? The maximum parallelism in PartitioningAwareFileIndex#listLeafFilesInParallel() is hard-coded to 10000. We may need to make this number configurable. In the PR, I proposed reducing it to 100; note that the default committed in the diff below is 10000. ## How was this patch tested? Existing unit tests. Author: genmao.ygm <[email protected]> Author: dylon <[email protected]> Closes #15829 from uncleGen/SPARK-18379. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/745ab8bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/745ab8bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/745ab8bc Branch: refs/heads/master Commit: 745ab8bc50da89c42b297de9dcb833e5f2074481 Parents: f14ae49 Author: genmao.ygm <[email protected]> Authored: Tue Nov 15 10:32:43 2016 -0800 Committer: Yin Huai <[email protected]> Committed: Tue Nov 15 10:32:43 2016 -0800 ---------------------------------------------------------------------- .../datasources/PartitioningAwareFileIndex.scala | 4 +++- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/745ab8bc/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala index 3740caa..705a1e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala @@ 
-315,10 +315,12 @@ object PartitioningAwareFileIndex extends Logging { val sparkContext = sparkSession.sparkContext val serializableConfiguration = new SerializableConfiguration(hadoopConf) val serializedPaths = paths.map(_.toString) + val parallelPartitionDiscoveryParallelism = + sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism // Set the number of parallelism to prevent following file listing from generating many tasks // in case of large #defaultParallelism. - val numParallelism = Math.min(paths.size, 10000) + val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism) val statusMap = sparkContext .parallelize(serializedPaths, numParallelism) http://git-wip-us.apache.org/repos/asf/spark/blob/745ab8bc/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 71f3a67..6372936 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -396,6 +396,14 @@ object SQLConf { .intConf .createWithDefault(32) + val PARALLEL_PARTITION_DISCOVERY_PARALLELISM = + SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.parallelism") + .doc("The number of parallelism to list a collection of path recursively, Set the " + + "number to prevent file listing from generating too many tasks.") + .internal() + .intConf + .createWithDefault(10000) + // Whether to automatically resolve ambiguity in join conditions for self-joins. // See SPARK-6231. 
val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = @@ -774,6 +782,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def parallelPartitionDiscoveryThreshold: Int = getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD) + def parallelPartitionDiscoveryParallelism: Int = + getConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_PARALLELISM) + def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED) def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
