This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b43fa19ae5ee0af889c8c263150ccffadc49e5a6
Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com>
AuthorDate: Fri Mar 17 19:22:24 2023 +0800

    KYLIN-5571 Optimize the procedure of pushing down the query
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 ++
 .../kylin/query/util/QueryInterruptChecker.java    |   0
 .../org/apache/kylin/query/util/QueryLimiter.java  |   0
 .../apache/kylin/query/util/SlowQueryDetector.java |   1 +
 .../hive/utils/TestResourceDetectUtilsByMock.scala |   2 +-
 .../kylin/query/pushdown/SparkSqlClient.scala      |  52 +++++++---
 .../pushdown/PushDownRunnerSparkImplTest.java      |  58 +++++++++++
 .../spark/sql/hive/utils/ResourceDetectUtils.scala | 108 +++++++++++++--------
 8 files changed, 176 insertions(+), 53 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 483de79997..1fc95ac5cd 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3934,4 +3934,12 @@ public abstract class KylinConfigBase implements Serializable {
     public int getBloomBuildColumnNvd() {
         return Integer.parseInt(getOptional("kylin.bloom.build.column.nvd", "200000"));
     }
+
+    public int getAutoShufflePartitionMultiple() {
+        return Integer.parseInt(getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-multiple", "3"));
+    }
+
+    public int getAutoShufflePartitionTimeOut() {
+        return Integer.parseInt(getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "30"));
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
similarity index 100%
rename from src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
rename to src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/QueryLimiter.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryLimiter.java
similarity index 100%
rename from src/core-common/src/main/java/org/apache/kylin/query/util/QueryLimiter.java
rename to src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryLimiter.java
diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
similarity index 99%
rename from src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
rename to src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
index cd55a4d690..a2ca994c5f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
@@ -21,6 +21,7 @@ package org.apache.kylin.query.util;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.calcite.util.CancelFlag;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.scheduler.EventBusFactory;
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
index 164022af61..197dbe07cb 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
@@ -21,8 +21,8 @@ package org.apache.spark.sql.hive.utils
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.common.SharedSparkSession
-import org.apache.spark.sql.execution.{FileSourceScanExec, LayoutFileSourceScanExec}
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.{FileSourceScanExec, LayoutFileSourceScanExec}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.wordspec.AnyWordSpec
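For context, the two entries added to KylinConfigBase above drive the new pushdown behaviour: the timeout (in seconds) bounds how long the pushdown query waits for the source-size detection, and the multiple is applied to the current spark.sql.shuffle.partitions value when that detection times out. A minimal illustrative sketch of how they combine; only the property names and defaults come from the diff, everything else here is hypothetical, not Kylin code:

    // Illustrative sketch only -- property names/defaults are taken from the KylinConfigBase diff above.
    object ShufflePartitionKnobs {
      // kylin.query.pushdown.auto-set-shuffle-partitions-timeout (seconds), default 30
      val timeoutSec: Int = 30
      // kylin.query.pushdown.auto-set-shuffle-partitions-multiple, default 3
      val multiple: Int = 3

      // On timeout the commit falls back to scaling the current setting,
      // e.g. spark.sql.shuffle.partitions = 200 becomes 200 * 3 = 600.
      def fallbackPartitions(currentShufflePartitions: Int): Int = currentShufflePartitions * multiple
    }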
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 0c3662cb0a..2b09a05d66 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -20,6 +20,7 @@ package org.apache.kylin.query.pushdown
 
 import java.sql.Timestamp
 import java.util
+import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}
 import java.util.{UUID, List => JList}
 
 import org.apache.commons.lang3.StringUtils
@@ -42,10 +43,13 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
 import scala.collection.{immutable, mutable}
+import scala.concurrent.duration.Duration
 
 object SparkSqlClient {
   val DEFAULT_DB: String = "spark.sql.default.database"
 
+  val SHUFFLE_PARTITION: String = "spark.sql.shuffle.partitions"
+
   val logger: Logger = LoggerFactory.getLogger(classOf[SparkSqlClient])
 
   @deprecated
@@ -83,23 +87,49 @@ object SparkSqlClient {
     }
   }
 
-  private def autoSetShufflePartitions(df: DataFrame) = {
+  def autoSetShufflePartitions(df: DataFrame): Unit = {
     val config = KylinConfig.getInstanceFromEnv
-    if (config.isAutoSetPushDownPartitions) {
-      try {
+    val oriShufflePartition = df.sparkSession.sessionState.conf.getConfString(SHUFFLE_PARTITION).toInt
+    val isSkip = !config.isAutoSetPushDownPartitions || !ResourceDetectUtils.checkPartitionFilter(df.queryExecution.sparkPlan)
+    if (isSkip) {
+      logger.info(s"Skip auto set $SHUFFLE_PARTITION, use origin value $oriShufflePartition")
+      return
+    }
+    val isConcurrency = config.isConcurrencyFetchDataSourceSize
+    val executor = Executors.newSingleThreadExecutor()
+    val timeOut = config.getAutoShufflePartitionTimeOut
+    val future = executor.submit(new Callable[Unit] {
+      override def call(): Unit = {
         val basePartitionSize = config.getBaseShufflePartitionSize
         val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
-        val sourceTableSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),
-          config.isConcurrencyFetchDataSourceSize, paths: _*) + "b"
+        var sourceTableSize: String = ""
+        if (isConcurrency) {
+          sourceTableSize = ResourceDetectUtils.getResourceSizeWithTimeoutByConcurrency(
+            Duration(timeOut, TimeUnit.SECONDS), SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
+        } else {
+          sourceTableSize = ResourceDetectUtils.getResourceSizBySerial(SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
+        }
         val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
-        df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitions)
+        df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, partitions)
         QueryContext.current().setShufflePartitions(partitions.toInt)
-        logger.info(s"Auto set spark.sql.shuffle.partitions $partitions, " +
+        logger.info(s"Auto set $SHUFFLE_PARTITION $partitions, " +
          s"sourceTableSize $sourceTableSize, basePartitionSize $basePartitionSize")
-      } catch {
-        case e: Throwable =>
-          logger.error("Auto set spark.sql.shuffle.partitions failed.", e)
       }
+    })
+    try {
+      future.get(timeOut, TimeUnit.SECONDS)
+    } catch {
+      case e: TimeoutException =>
+        val oriShufflePartition = df.sparkSession.sessionState.conf.getConfString(SHUFFLE_PARTITION).toInt
+        val partitions = oriShufflePartition * config.getAutoShufflePartitionMultiple
+        df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, partitions.toString)
+        QueryContext.current().setShufflePartitions(partitions)
+        logger.info(s"Auto set shuffle partitions timeout. set $SHUFFLE_PARTITION $partitions.")
+      case e: Exception =>
+        logger.error(s"Auto set $SHUFFLE_PARTITION failed.", e)
+        throw e
+    } finally {
+      executor.shutdownNow()
     }
   }
 
@@ -145,7 +175,7 @@ object SparkSqlClient {
         throw e
     } finally {
       QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(QueryContext.current().getQueryId))
-      df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
+      df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, null)
       HadoopUtil.setCurrentConfiguration(null)
     }
   }
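The core of the SparkSqlClient change above is a timeout guard around the source-size estimation: the estimation runs on a single-thread executor, and if it does not finish within kylin.query.pushdown.auto-set-shuffle-partitions-timeout seconds, the shuffle-partition count falls back to the current value multiplied by the configured multiple instead of blocking the query. A stand-alone sketch of that pattern; only the pattern itself is taken from the diff, the helper names and parameters here are illustrative:

    // Sketch of the timeout-guarded pattern used by autoSetShufflePartitions above.
    // estimateSizeMb is a caller-supplied (possibly slow) size estimation; all names are illustrative.
    import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

    def decideShufflePartitions(current: Int, multiple: Int, timeoutSec: Long,
                                basePartitionSizeMb: Long)(estimateSizeMb: () => Long): Int = {
      val executor = Executors.newSingleThreadExecutor()
      val future = executor.submit(new Callable[Int] {
        // Partition count derived from the estimated source size, never below 1.
        override def call(): Int = Math.max(1L, estimateSizeMb() / basePartitionSizeMb).toInt
      })
      try {
        future.get(timeoutSec, TimeUnit.SECONDS)
      } catch {
        case _: TimeoutException => current * multiple // estimation too slow: scale the current value
      } finally {
        executor.shutdownNow()
      }
    }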
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
index 06f8c2aec1..baebc21bfa 100644
--- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
@@ -189,4 +190,61 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase {
         String sql = "select * from TEST_KYLIN_FACT";
         SparkSqlClient.executeSql(ss, sql, UUID.randomUUID(), "tpch");
     }
+
+    @Test
+    public void testAutoSetShufflePartitionConcurrency() throws Exception {
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance().overrideSparkSession(ss)) {
+            ss.sql("DROP TABLE if exists mypartitionedtable");
+            ss.sql("CREATE TABLE mypartitionedtable (col1 INT, col2 INT) USING PARQUET PARTITIONED BY (col3 STRING)");
+            ss.sql("INSERT INTO mypartitionedtable PARTITION (col3='partitionvalue') SELECT TRANS_ID, ORDER_ID FROM test_kylin_fact");
+            ss.sql("REFRESH TABLE mypartitionedtable");
+            KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "0");
+            PushdownResponse timeoutRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, timeoutRes.getColumns().size());
+            Assert.assertEquals(1, timeoutRes.getSize());
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "30");
+            PushdownResponse successRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, successRes.getColumns().size());
+            Assert.assertEquals(1, successRes.getSize());
+            ss.sql("DROP TABLE mypartitionedtable");
+        }
+    }
+
+    @Test
+    public void testAutoSetShufflePartition() throws Exception {
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance().overrideSparkSession(ss)) {
+            ss.sql("DROP TABLE if exists mypartitionedtable");
+            ss.sql("CREATE TABLE mypartitionedtable (col1 INT, col2 INT) USING PARQUET PARTITIONED BY (col3 STRING)");
+            ss.sql("INSERT INTO mypartitionedtable PARTITION (col3='partitionvalue') SELECT TRANS_ID, ORDER_ID FROM test_kylin_fact");
+            ss.sql("REFRESH TABLE mypartitionedtable");
+            KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+            kylinconfig.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "false");
+            PushdownResponse timeoutRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, timeoutRes.getColumns().size());
+            Assert.assertEquals(1, timeoutRes.getSize());
+            kylinconfig.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "true");
+            PushdownResponse successRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, successRes.getColumns().size());
+            Assert.assertEquals(1, successRes.getSize());
+            ss.sql("DROP TABLE mypartitionedtable");
+        }
+    }
+
+    @Test
+    public void testAutoSetShufflePartitionOff() throws Exception {
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance().overrideSparkSession(ss)) {
+            KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "false");
+            PushdownResponse resp = SparkSubmitter.getInstance()
+                    .submitPushDownTask("select count(order_id) from test_kylin_fact limit 1", "tpch");
+            Assert.assertEquals(1, resp.getColumns().size());
+            Assert.assertEquals(1, resp.getSize());
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true");
+        }
+    }
 }
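The ResourceDetectUtils changes below add the checkPartitionFilter helper that SparkSqlClient uses as its skip condition: shuffle-partition tuning is only attempted when at least one scan in the physical plan is actually restricted by a partition filter (or, for Hive tables, resolves to a non-empty set of raw partitions). A simplified sketch of that traversal, reduced to FileSourceScanExec for brevity; the committed method also handles LayoutFileSourceScanExec and HiveTableScanExec:

    // Simplified sketch of the partition-filter check; the full method appears in the diff below.
    import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}

    def hasPartitionFilter(plan: SparkPlan): Boolean = {
      var found = false
      plan.foreach {
        case scan: FileSourceScanExec => found ||= scan.partitionFilters.nonEmpty // pruned file scan
        case _ => // other nodes: ignore
      }
      found
    }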
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index 495f3298df..e7f97a5655 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -18,21 +18,15 @@ package org.apache.spark.sql.hive.utils
 
-import java.io.IOException
-import java.nio.charset.Charset
-import java.util.concurrent.ForkJoinPool
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Map => JMap}
-
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity}
 import org.apache.kylin.query.util.QueryInterruptChecker
-import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution._
@@ -41,10 +35,16 @@ import org.apache.spark.sql.execution.datasources.FileIndex
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.sources.NBaseRelation
+import org.apache.spark.util.ThreadUtils
+
+import java.io.IOException
+import java.nio.charset.Charset
+import java.util.concurrent.Executors
+import java.util.{Map => JMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 
 object ResourceDetectUtils extends Logging {
   private val json = new Gson()
@@ -100,6 +100,22 @@ object ResourceDetectUtils extends Logging {
     paths
   }
 
+  def checkPartitionFilter(plan: SparkPlan): Boolean = {
+    var isIncludeFilter = false
+    plan.foreach {
+      case plan: FileSourceScanExec =>
+        isIncludeFilter ||= plan.partitionFilters.nonEmpty
+      case plan: LayoutFileSourceScanExec =>
+        isIncludeFilter ||= plan.partitionFilters.nonEmpty
+      case plan: HiveTableScanExec =>
+        if (plan.relation.isPartitioned) {
+          isIncludeFilter ||= plan.rawPartitions.nonEmpty
+        }
+      case _ =>
+    }
+    isIncludeFilter
+  }
+
   def getPartitions(plan: SparkPlan): String = {
     val leafNodePartitionsLengthMap: mutable.Map[String, Int] = mutable.Map()
     var pNum = 0
@@ -147,47 +163,57 @@ object ResourceDetectUtils extends Logging {
     false
   }
 
-  def getResourceSizeConcurrency(configuration: Configuration, paths: Path*): Long = {
+  def getResourceSize(configuration: Configuration, isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
+    val resourceSize = {
+      if (isConcurrencyFetchDataSourceSize) {
+        getResourceSizeWithTimeoutByConcurrency(Duration.Inf, configuration, paths: _*)
+      } else {
+        getResourceSizBySerial(configuration, paths: _*)
+      }
+    }
+    resourceSize
+  }
+
+  def getResourceSizBySerial(configuration: Configuration, paths: Path*): Long = {
+    paths.map(path => {
+      QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, "Current step: get resource size.")
+      val fs = path.getFileSystem(configuration)
+      if (fs.exists(path)) {
+        HadoopUtil.getContentSummary(fs, path).getLength
+      } else {
+        0L
+      }
+    }).sum
+  }
+
+  def getResourceSizeWithTimeoutByConcurrency(timeout: Duration, configuration: Configuration, paths: Path*): Long = {
     val kylinConfig = KylinConfig.getInstanceFromEnv
     val threadNumber = kylinConfig.getConcurrencyFetchDataSourceSizeThreadNumber
     logInfo(s"Get resource size concurrency, thread number is $threadNumber")
-    val forkJoinPool = new ForkJoinPool(threadNumber)
-    val parallel = paths.par
-    val sum = new AtomicLong()
+    val executor = Executors.newFixedThreadPool(threadNumber)
+    implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
+    val futures: Seq[Future[Long]] = getResourceSize(configuration, executionContext, paths: _*)
     try {
-      parallel.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
-      parallel.foreach {
-        path => {
-          val fs = path.getFileSystem(configuration)
-          if (fs.exists(path)) {
-            sum.addAndGet(HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, kylinConfig).getLength)
-          }
-        }
-      }
-    }
-    finally {
-      forkJoinPool.shutdownNow()
+      val combinedFuture = Future.sequence(futures)
+      val results: Seq[Long] = ThreadUtils.awaitResult(combinedFuture, timeout)
+      results.sum
+    } finally {
+      executor.shutdownNow()
     }
-    sum.get()
   }
-
-  def getResourceSize(configuration: Configuration, isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
-    val resourceSize = {
-      if (isConcurrencyFetchDataSourceSize) {
-        getResourceSizeConcurrency(configuration, paths: _*)
-      } else {
-        paths.map(path => {
-          val fs = path.getFileSystem(configuration)
-          if (fs.exists(path)) {
-            HadoopUtil.getContentSummary(fs, path).getLength
-          } else {
-            0L
-          }
-        }).sum
-      }
+  def getResourceSize(configuration: Configuration, executionContext: ExecutionContextExecutor, paths: Path*): Seq[Future[Long]] = {
+    val kylinConfig = KylinConfig.getInstanceFromEnv
+    paths.map { path =>
+      Future {
+        val fs = path.getFileSystem(configuration)
+        if (fs.exists(path)) {
+          HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, kylinConfig).getLength
+        } else {
+          0L
+        }
+      }(executionContext)
     }
-    resourceSize
   }
 
   def getResourceSize(isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
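Finally, the concurrent branch above estimates the total source size by launching one Future per path on a fixed-size pool, combining them with Future.sequence, and waiting with a bounded timeout (Duration.Inf from the legacy getResourceSize entry point, the configured pushdown timeout when called from SparkSqlClient). A self-contained sketch of the same pattern; HadoopUtil, ContentSummary and ThreadUtils.awaitResult are replaced with plain local-file stand-ins so it can run outside Kylin:

    // Stand-alone sketch of getResourceSizeWithTimeoutByConcurrency: one Future per path,
    // combined with Future.sequence and awaited with a bounded timeout.
    import java.io.File
    import java.util.concurrent.Executors
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    def totalSizeWithTimeout(paths: Seq[String], threads: Int, timeout: Duration): Long = {
      val executor = Executors.newFixedThreadPool(threads)
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
      try {
        val futures = paths.map { p =>
          Future {
            val f = new File(p)
            if (f.exists()) f.length() else 0L // missing paths count as 0, like the serial variant
          }
        }
        Await.result(Future.sequence(futures), timeout).sum
      } finally {
        executor.shutdownNow()
      }
    }

    // Example: totalSizeWithTimeout(Seq("/tmp/a.parquet", "/tmp/b.parquet"), threads = 4, timeout = 30.seconds)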