This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch kylin-soft-affinity-local-cache
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ec4103306c67e323732ac3bc2368f8dfb0de0c73
Author: Zhichao Zhang <zhan...@apache.org>
AuthorDate: Sat Sep 25 23:44:38 2021 +0800

    Pre-init KylinCacheFileSystem to fix s3a issue
---
 .../org/apache/kylin/common/KylinConfigBase.java |  7 ++++-
 .../sql/execution/datasource/FilePruner.scala    |  9 +++++++
 .../org/apache/spark/sql/SparderContext.scala    | 31 +++++++++++-----------
 3 files changed, 31 insertions(+), 16 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 710f8e18e4..361ba30403 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -37,6 +37,7 @@ import java.util.regex.Pattern;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.annotation.ConfigTag;
@@ -315,6 +316,10 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public String getHdfsWorkingDirectory() {
+        return getHdfsWorkingDirectoryInternal(HadoopUtil.getCurrentConfiguration());
+    }
+
+    public String getHdfsWorkingDirectoryInternal(Configuration hadoopConf) {
         if (cachedHdfsWorkingDirectory != null) {
             return cachedHdfsWorkingDirectory;
         }
@@ -326,7 +331,7 @@
             throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
 
         try {
-            FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+            FileSystem fs = path.getFileSystem(hadoopConf);
             path = fs.makeQualified(path);
         } catch (IOException e) {
             throw new RuntimeException(e);
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 0c5cfbff7b..34fc967c29 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -211,15 +211,20 @@ class FilePruner(cubeInstance: CubeInstance,
   var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
 
   private def getFileStatusBySeg(seg: SegmentDirectory, fsc: FileStatusCache): SegmentDirectory = {
+    var startT = System.currentTimeMillis()
     val path = new Path(toPath(seg.segmentName, seg.identifier))
     val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
+    logInfo(s"Get segment filesystem: ${System.currentTimeMillis() - startT}")
+    startT = System.currentTimeMillis()
     if (fs.isDirectory(path) && fs.exists(path)) {
       val maybeStatuses = fsc.getLeafFiles(path)
       if (maybeStatuses.isDefined) {
+        logInfo(s"Get segment status from cache: ${System.currentTimeMillis() - startT}")
        SegmentDirectory(seg.segmentName, seg.identifier, maybeStatuses.get)
       } else {
         val statuses = fs.listStatus(path)
         fsc.putLeafFiles(path, statuses)
+        logInfo(s"Get segment status and cache: ${System.currentTimeMillis() - startT}")
         SegmentDirectory(seg.segmentName, seg.identifier, statuses)
       }
     } else {
@@ -239,17 +244,21 @@
     val timePartitionFilters =
       getSegmentFilter(dataFilters, timePartitionColumn)
     logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
 
+    var startT = System.currentTimeMillis()
     val fsc = ShardFileStatusCache.getFileStatusCache(session)
+    logInfo(s"Get file status cache: ${System.currentTimeMillis() - startT}")
     // segment pruning
     var selected = afterPruning("segment", timePartitionFilters, segmentDirs) {
       pruneSegments
     }
 
+    startT = System.currentTimeMillis()
     // fetch segment directories info in parallel
     selected = selected.par.map(seg => {
       getFileStatusBySeg(seg, fsc)
     }).filter(_.files.nonEmpty).seq
+    logInfo(s"Get segment status: ${System.currentTimeMillis() - startT}")
 
     // shards pruning
     selected = afterPruning("shard", dataFilters, selected) {
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index faa921230d..e6b73d66cc 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -18,30 +18,28 @@
 
 package org.apache.spark.sql
 
-import java.lang.{Boolean => JBoolean, String => JString}
-import java.nio.file.Paths
-
-import org.apache.spark.memory.MonitorEnv
-import org.apache.spark.util.Utils
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
-import org.apache.kylin.query.UdfManager
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.KylinSession._
-import java.util.concurrent.atomic.AtomicReference
-
 import org.apache.commons.io.FileUtils
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.ToolUtil
+import org.apache.kylin.query.UdfManager
 import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.deploy.StandaloneAppClient
-import org.apache.spark.sql.SparderContext.master_app_url
-import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MonitorEnv
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.KylinSession._
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
 import org.apache.spark.sql.metrics.SparderMetricsListener
+import org.apache.spark.util.Utils
 import org.apache.spark.utils.YarnInfoFetcherUtils
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
+
+import java.lang.{Boolean => JBoolean, String => JString}
+import java.nio.file.Paths
+import java.util.concurrent.atomic.AtomicReference
 
 // scalastyle:off
 object SparderContext extends Logging {
@@ -192,6 +190,9 @@
            case _ =>
              master_app_url = YarnInfoFetcherUtils.getTrackingUrl(appid)
          }
+
+          // pre-init FileSystem, fix s3a issue
+          kylinConf.getHdfsWorkingDirectoryInternal(spark.sparkContext.hadoopConfiguration)
        } catch {
          case throwable: Throwable =>
            logError("Error for initializing spark ", throwable)
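
For context on the fix itself (this note is not part of the commit): the patch lets KylinConfigBase resolve and cache the HDFS working directory against a caller-supplied Hadoop Configuration, and SparderContext now calls the new overload with Spark's own Hadoop configuration during Spark initialization. Below is a minimal sketch of that call pattern; the helper object is hypothetical, and the explanation that Spark's configuration carries the fs.s3a.* settings which a default HadoopUtil configuration may lack is an inference from the commit message, not something the commit states.

import org.apache.kylin.common.KylinConfig
import org.apache.spark.sql.SparkSession

// Hypothetical helper, for illustration only; the commit makes the equivalent call
// inline inside SparderContext's Spark initialization block (see the hunk at +190 above).
object PreInitWorkingDirFs {
  def apply(spark: SparkSession, kylinConf: KylinConfig): String = {
    // Resolve the working directory through the new overload so that
    // Path.getFileSystem()/makeQualified() run against Spark's Hadoop configuration,
    // assumed here to carry the fs.s3a.* settings for an s3a:// working directory.
    // KylinConfigBase caches the qualified path, so later calls to the plain
    // getHdfsWorkingDirectory() (which falls back to HadoopUtil.getCurrentConfiguration())
    // reuse it instead of building a FileSystem from a possibly incomplete configuration.
    kylinConf.getHdfsWorkingDirectoryInternal(spark.sparkContext.hadoopConfiguration)
  }
}

A caller would invoke it once with the active SparkSession and the KylinConfig instance right after the Spark application is up, mirroring the single line the commit adds to SparderContext.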