This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b9cd81a  KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner
b9cd81a is described below

commit b9cd81a0b1710913af469b04d44f3451d6a87f0c
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Mon Feb 1 16:57:02 2021 +0800

    KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner

    (cherry picked from commit 6e4d94d1c027d5877eb3013f37ef223aa0532cc2)
    (cherry picked from commit edebb98ca33e1f3ddf000842f12d0bff45109c57)
---
 .../sql/execution/datasource/FilePruner.scala      | 109 +++++++++++----------
 .../org/apache/spark/sql/SparderContext.scala      |   5 +-
 .../engine/spark2/NBadQueryAndPushDownTest.java    |   2 +-
 3 files changed, 62 insertions(+), 54 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 9f62f52..f0f7916 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -51,10 +51,9 @@ case class SegmentDirectory(segmentName: String, identifier: String, files: Seq[
  * @param shardColumnNames the names of the columns that used to generate the shard id.
  * @param sortColumnNames the names of the columns that used to sort data in each shard.
  */
-case class ShardSpec(
-    numShards: Int,
-    shardColumnNames: Seq[String],
-    sortColumnNames: Seq[String]) {
+case class ShardSpec(numShards: Int,
+                     shardColumnNames: Seq[String],
+                     sortColumnNames: Seq[String]) {
 
   if (numShards <= 0) {
     throw new AnalysisException(
@@ -72,28 +71,17 @@ case class ShardSpec(
   }
 }
 
-class FilePruner(
-    cubeInstance: CubeInstance,
-    cuboid: Cuboid,
-    val session: SparkSession,
-    val options: Map[String, String])
+class FilePruner(cubeInstance: CubeInstance,
+                 cuboid: Cuboid,
+                 val session: SparkSession,
+                 val options: Map[String, String])
   extends FileIndex with ResetShufflePartition with Logging {
 
   private lazy val segmentDirs: Seq[SegmentDirectory] = {
     cubeInstance.getSegments.asScala
       .filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => {
-      val segName = seg.getName
-      val path = PathManager.getParquetStoragePath(cubeInstance, segName, seg.getStorageLocationIdentifier, layoutEntity.getId)
-      val files = new InMemoryFileIndex(session,
-        Seq(new Path(path)),
-        options,
-        Some(dataSchema),
-        FileStatusCache.getOrCreate(session))
-        .listFiles(Nil, Nil)
-        .flatMap(_.files)
-        .filter(_.isFile)
-      SegmentDirectory(segName, seg.getStorageLocationIdentifier, files)
-    }).filter(_.files.nonEmpty)
+      SegmentDirectory(seg.getName, seg.getStorageLocationIdentifier, Nil)
+    })
   }
 
   val layoutEntity = MetadataConverter.toLayoutEntity(cubeInstance, cuboid)
@@ -103,7 +91,8 @@ class FilePruner(
       .map { column => StructField(column.id.toString, column.dataType) }
       .toSeq ++
       layoutEntity.getOrderedMeasures.asScala
-        .map { entry => StructField(entry._1.toString, SparkTypeUtil.generateFunctionReturnDataType(entry._2)) }
+        .map { entry =>
+          StructField(entry._1.toString, SparkTypeUtil.generateFunctionReturnDataType(entry._2)) }
         .toSeq)
   }
 
@@ -115,7 +104,6 @@ class FilePruner(
     PathManager.getParquetStoragePath(cubeInstance, segmentName, identifier, cuboid.getId)
   }
 
-
   override lazy val partitionSchema: StructType = {
     // we did not use the partitionBy mechanism of spark
     new StructType()
@@ -130,7 +118,8 @@ class FilePruner(
       val ref = desc.getPartitionDateColumnRef
       // only consider partition date column
       // we can only get col ID in layout cuz data schema is all ids.
-      val id = layoutEntity.getOrderedDimensions.asScala.values.find(column => column.columnName.equals(ref.getName))
+      val id = layoutEntity.getOrderedDimensions.asScala.values.find(
+        column => column.columnName.equals(ref.getName))
       if (id.isDefined && (ref.getType.isDateTimeFamily || ref.getType.isStringFamily)) {
         pattern = desc.getPartitionDateFormat
         dataSchema.filter(_.name == String.valueOf(id.get.id))
@@ -208,12 +197,32 @@ class FilePruner(
 
   var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
 
-  override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+  private def getFileStatusBySeg(seg: SegmentDirectory, fsc: FileStatusCache): SegmentDirectory = {
+    val path = new Path(toPath(seg.segmentName, seg.identifier))
+    val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
+    if (fs.isDirectory(path) && fs.exists(path)) {
+      val maybeStatuses = fsc.getLeafFiles(path)
+      if (maybeStatuses.isDefined) {
+        SegmentDirectory(seg.segmentName, seg.identifier, maybeStatuses.get)
+      } else {
+        val statuses = fs.listStatus(path)
+        fsc.putLeafFiles(path, statuses)
+        SegmentDirectory(seg.segmentName, seg.identifier, statuses)
+      }
+    } else {
+      logWarning(s"Segment path ${path.toString} not exists.")
+      SegmentDirectory(seg.segmentName, seg.identifier, Nil)
+    }
+  }
+
+  override def listFiles(partitionFilters: Seq[Expression],
+                         dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     if (cached.containsKey((partitionFilters, dataFilters))) {
       return cached.get((partitionFilters, dataFilters))
     }
     require(isResolved)
+    val startTime = System.nanoTime
     val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
     logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
@@ -223,19 +232,12 @@ class FilePruner(
     var selected = afterPruning("segment", timePartitionFilters, segmentDirs) {
       pruneSegments
     }
-    // QueryContextFacade.current().record("seg_pruning")
-    selected = selected.par.map { e =>
-      val path = new Path(toPath(e.segmentName, e.identifier))
-      val maybeStatuses = fsc.getLeafFiles(path)
-      if (maybeStatuses.isDefined) {
-        SegmentDirectory(e.segmentName, e.identifier, maybeStatuses.get)
-      } else {
-        val statuses = path.getFileSystem(session.sparkContext.hadoopConfiguration).listStatus(path)
-        fsc.putLeafFiles(path, statuses)
-        SegmentDirectory(e.segmentName, e.identifier, statuses)
-      }
-    }.toIterator.toSeq
-    // QueryContextFacade.current().record("fetch_file_status")
+
+    // fetch segment directories info in parallel
+    selected = selected.par.map(seg => {
+      getFileStatusBySeg(seg, fsc)
+    }).filter(_.files.nonEmpty).seq
+
     // shards pruning
     selected = afterPruning("shard", dataFilters, selected) {
       pruneShards
@@ -246,6 +248,7 @@
     val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum
     logInfo(s"totalFileSize is ${totalFileSize}")
     setShufflePartitions(totalFileSize, session)
+    logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble / 1000000} ms")
     if (selected.isEmpty) {
       val value = Seq.empty[PartitionDirectory]
       cached.put((partitionFilters, dataFilters), value)
@@ -255,11 +258,12 @@
       cached.put((partitionFilters, dataFilters), value)
       value
     }
-
   }
 
-  private def afterPruning(pruningType: String, specFilters: Seq[Expression], inputs: Seq[SegmentDirectory])
-                          (pruningFunc: (Seq[Expression], Seq[SegmentDirectory]) => Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+  private def afterPruning(pruningType: String, specFilters: Seq[Expression],
+                           inputs: Seq[SegmentDirectory])
+                          (pruningFunc: (Seq[Expression], Seq[SegmentDirectory]) =>
+                            Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
     if (specFilters.isEmpty) {
       inputs
     } else {
@@ -281,9 +285,8 @@ class FilePruner(
     dataFilters.filter(_.references.subsetOf(AttributeSet(col)))
   }
 
-  private def pruneSegments(
-    filters: Seq[Expression],
-    segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+  private def pruneSegments(filters: Seq[Expression],
+                            segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
     val filteredStatuses = if (filters.isEmpty) {
       segDirs
@@ -298,7 +301,8 @@ class FilePruner(
       val pruned = segDirs.filter {
         e => {
           val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
-          SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
+          SegFilters(tsRange.startValue, tsRange.endValue, pattern)
+            .foldFilter(reducedFilter) match {
             case Trivial(true) => true
             case Trivial(false) => false
           }
@@ -312,7 +316,7 @@ class FilePruner(
   }
 
   private def pruneShards(filters: Seq[Expression],
-    segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+                          segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
     val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
       segDirs
     } else {
@@ -323,7 +327,8 @@ class FilePruner(
         val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
         require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
 
-        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
+        val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name,
+          partitionNumber)
 
         val selected = files.filter(f => {
           val partitionId = FilePruner.getPartitionId(f.getPath)
@@ -331,7 +336,8 @@ class FilePruner(
         })
         SegmentDirectory(segName, segIdentifier, selected)
       }
-      logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
+      logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files)
+        .map(_.getPath.toString).mkString(";"))
       pruned
     }
     filteredStatuses
@@ -348,10 +354,9 @@ class FilePruner(
 
   override def refresh(): Unit = {}
 
-  private def getExpressionShards(
-    expr: Expression,
-    shardColumnName: String,
-    numShards: Int): BitSet = {
+  private def getExpressionShards(expr: Expression,
+                                  shardColumnName: String,
+                                  numShards: Int): BitSet = {
 
     def getShardNumber(attr: Attribute, v: Any): Int = {
       BucketingUtils.getBucketIdFromValue(attr, numShards, v)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index c3d089d..aaec4d5 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -36,7 +36,7 @@ import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
+import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
 import org.apache.spark.sql.metrics.SparderMetricsListener
 import org.apache.spark.utils.YarnInfoFetcherUtils
 
@@ -202,6 +202,9 @@ object SparderContext extends Logging {
             //monitor sparder
             SparderContextCanary.init()
           }
+
+          // init FileStatusCache
+          ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession)
         }
       }
 
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
index 3e50415..744922a 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
@@ -92,7 +92,7 @@ public class NBadQueryAndPushDownTest extends LocalWithSparkSessionTest {
 
     @Test
    public void testPushDownForFileNotExist() throws Exception {
-        final String sql = "select max(price) from test_kylin_fact";
+        final String sql = "select max(ITEM_COUNT) from test_kylin_fact";
         KylinConfig.getInstanceFromEnv().setProperty(PUSHDOWN_RUNNER_KEY,
                 "org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl");
         try {
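
Editorial note on the change: the patch stops building an InMemoryFileIndex eagerly for every READY segment in segmentDirs, and instead fetches file statuses lazily in listFiles via getFileStatusBySeg, reusing a shared FileStatusCache so repeated queries do not re-issue listStatus RPCs against the HDFS NameNode. The ShardFileStatusCache helper that SparderContext initializes is referenced here but its source is not part of this diff. The Scala sketch below is only an illustration of what such a helper could look like: the object name, the singleton shape, and listWithCache are assumptions, while the FileStatusCache.getOrCreate / getLeafFiles / putLeafFiles calls mirror the ones actually used in the patch above.

// Illustrative sketch only -- not the committed ShardFileStatusCache.
// Package mirrors where FilePruner lives so Spark's internal FileStatusCache is reachable.
package org.apache.spark.sql.execution.datasource

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.FileStatusCache

object ShardFileStatusCacheSketch {

  // One session-backed cache per JVM, shared by every FilePruner instance,
  // so file statuses survive across queries instead of being re-fetched.
  @volatile private var cache: FileStatusCache = _

  def getFileStatusCache(session: SparkSession): FileStatusCache = {
    if (cache == null) {
      this.synchronized {
        if (cache == null) {
          cache = FileStatusCache.getOrCreate(session)
        }
      }
    }
    cache
  }

  // The lookup-or-list pattern the patch applies per segment directory:
  // consult the cache first, fall back to a single listStatus RPC, then populate the cache.
  def listWithCache(session: SparkSession, path: Path): Array[FileStatus] = {
    val fsc = getFileStatusCache(session)
    fsc.getLeafFiles(path).getOrElse {
      val statuses = path.getFileSystem(session.sparkContext.hadoopConfiguration).listStatus(path)
      fsc.putLeafFiles(path, statuses)
      statuses
    }
  }
}

With this shape, the first query that touches a segment pays one listStatus call per segment path; later queries hit the in-memory cache, which is why the commit message describes the change as reducing the number of file-status fetches from the HDFS NameNode.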