This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit b43fa19ae5ee0af889c8c263150ccffadc49e5a6
Author: fanfanAlice <41991994+fanfanal...@users.noreply.github.com>
AuthorDate: Fri Mar 17 19:22:24 2023 +0800

    KYLIN-5571 Optimize the procedure of pushing down the query
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 ++
 .../kylin/query/util/QueryInterruptChecker.java    |   0
 .../org/apache/kylin/query/util/QueryLimiter.java  |   0
 .../apache/kylin/query/util/SlowQueryDetector.java |   1 +
 .../hive/utils/TestResourceDetectUtilsByMock.scala |   2 +-
 .../kylin/query/pushdown/SparkSqlClient.scala      |  52 +++++++---
 .../pushdown/PushDownRunnerSparkImplTest.java      |  58 +++++++++++
 .../spark/sql/hive/utils/ResourceDetectUtils.scala | 108 +++++++++++++--------
 8 files changed, 176 insertions(+), 53 deletions(-)

diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 483de79997..1fc95ac5cd 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3934,4 +3934,12 @@ public abstract class KylinConfigBase implements Serializable {
     public int getBloomBuildColumnNvd() {
         return Integer.parseInt(getOptional("kylin.bloom.build.column.nvd", 
"200000"));
     }
+
+    public int getAutoShufflePartitionMultiple() {
+        return Integer.parseInt(getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-multiple", "3"));
+    }
+
+    public int getAutoShufflePartitionTimeOut() {
+        return Integer.parseInt(getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "30"));
+    }
 }
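
The two accessors above back a pair of new configuration knobs for push-down shuffle-partition tuning: a timeout (in seconds) for the source-size estimate, and a fallback multiplier applied to the current partition count when that estimate times out. A minimal kylin.properties sketch using the defaults from the code above (values are illustrative, not tuning advice):

    kylin.query.pushdown.auto-set-shuffle-partitions-multiple=3
    kylin.query.pushdown.auto-set-shuffle-partitions-timeout=30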
diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
similarity index 100%
rename from src/core-common/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
rename to src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryInterruptChecker.java
diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/QueryLimiter.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryLimiter.java
similarity index 100%
rename from src/core-common/src/main/java/org/apache/kylin/query/util/QueryLimiter.java
rename to src/core-metadata/src/main/java/org/apache/kylin/query/util/QueryLimiter.java
diff --git a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
similarity index 99%
rename from src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
rename to src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
index cd55a4d690..a2ca994c5f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/query/util/SlowQueryDetector.java
@@ -21,6 +21,7 @@ package org.apache.kylin.query.util;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.calcite.util.CancelFlag;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.scheduler.EventBusFactory;
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
index 164022af61..197dbe07cb 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/sql/hive/utils/TestResourceDetectUtilsByMock.scala
@@ -21,8 +21,8 @@ package org.apache.spark.sql.hive.utils
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.common.SharedSparkSession
-import org.apache.spark.sql.execution.{FileSourceScanExec, LayoutFileSourceScanExec}
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.{FileSourceScanExec, LayoutFileSourceScanExec}
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalamock.scalatest.MockFactory
 import org.scalatest.wordspec.AnyWordSpec
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 0c3662cb0a..2b09a05d66 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -20,6 +20,7 @@ package org.apache.kylin.query.pushdown
 
 import java.sql.Timestamp
 import java.util
+import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}
 import java.util.{UUID, List => JList}
 
 import org.apache.commons.lang3.StringUtils
@@ -42,10 +43,13 @@ import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.JavaConverters._
 import scala.collection.{immutable, mutable}
+import scala.concurrent.duration.Duration
 
 object SparkSqlClient {
   val DEFAULT_DB: String = "spark.sql.default.database"
 
+  val SHUFFLE_PARTITION: String = "spark.sql.shuffle.partitions"
+
   val logger: Logger = LoggerFactory.getLogger(classOf[SparkSqlClient])
 
   @deprecated
@@ -83,23 +87,49 @@ object SparkSqlClient {
     }
   }
 
-  private def autoSetShufflePartitions(df: DataFrame) = {
+  def autoSetShufflePartitions(df: DataFrame): Unit = {
     val config = KylinConfig.getInstanceFromEnv
-    if (config.isAutoSetPushDownPartitions) {
-      try {
+    val oriShufflePartition = df.sparkSession.sessionState.conf.getConfString(SHUFFLE_PARTITION).toInt
+    val isSkip = !config.isAutoSetPushDownPartitions || !ResourceDetectUtils.checkPartitionFilter(df.queryExecution.sparkPlan)
+    if (isSkip) {
+      logger.info(s"Skip auto set $SHUFFLE_PARTITION, use origin value $oriShufflePartition")
+      return
+    }
+    val isConcurrency = config.isConcurrencyFetchDataSourceSize
+    val executor = Executors.newSingleThreadExecutor()
+    val timeOut = config.getAutoShufflePartitionTimeOut
+    val future = executor.submit(new Callable[Unit] {
+      override def call(): Unit = {
         val basePartitionSize = config.getBaseShufflePartitionSize
         val paths = ResourceDetectUtils.getPaths(df.queryExecution.sparkPlan)
-        val sourceTableSize = ResourceDetectUtils.getResourceSize(SparderEnv.getHadoopConfiguration(),
-          config.isConcurrencyFetchDataSourceSize, paths: _*) + "b"
+        var sourceTableSize: String = ""
+        if (isConcurrency) {
+          sourceTableSize = ResourceDetectUtils.getResourceSizeWithTimeoutByConcurrency(
+            Duration(timeOut, TimeUnit.SECONDS), SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
+        } else {
+          sourceTableSize = ResourceDetectUtils.getResourceSizBySerial(SparderEnv.getHadoopConfiguration(), paths: _*) + "b"
+        }
         val partitions = Math.max(1, JavaUtils.byteStringAsMb(sourceTableSize) / basePartitionSize).toString
-        df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", partitions)
+        df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, partitions)
         QueryContext.current().setShufflePartitions(partitions.toInt)
-        logger.info(s"Auto set spark.sql.shuffle.partitions $partitions, " +
+        logger.info(s"Auto set $SHUFFLE_PARTITION $partitions, " +
           s"sourceTableSize $sourceTableSize, basePartitionSize $basePartitionSize")
-      } catch {
-        case e: Throwable =>
-          logger.error("Auto set spark.sql.shuffle.partitions failed.", e)
       }
+    })
+    try {
+      future.get(timeOut, TimeUnit.SECONDS)
+    } catch {
+      case e: TimeoutException =>
+        val oriShufflePartition = df.sparkSession.sessionState.conf.getConfString(SHUFFLE_PARTITION).toInt
+        val partitions = oriShufflePartition * config.getAutoShufflePartitionMultiple
+        df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, partitions.toString)
+        QueryContext.current().setShufflePartitions(partitions)
+        logger.info(s"Auto set shuffle partitions timeout. set $SHUFFLE_PARTITION $partitions.")
+      case e: Exception =>
+        logger.error(s"Auto set $SHUFFLE_PARTITION failed.", e)
+        throw e
+    } finally {
+      executor.shutdownNow()
     }
   }
 
@@ -145,7 +175,7 @@ object SparkSqlClient {
         throw e
     } finally {
       QueryContext.current().setExecutionID(QueryToExecutionIDCache.getQueryExecutionID(QueryContext.current().getQueryId))
-      df.sparkSession.sessionState.conf.setLocalProperty("spark.sql.shuffle.partitions", null)
+      df.sparkSession.sessionState.conf.setLocalProperty(SHUFFLE_PARTITION, null)
       HadoopUtil.setCurrentConfiguration(null)
     }
   }
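
The new autoSetShufflePartitions above follows a submit/await/fallback shape: the source-size estimate runs in a Callable on a single-thread executor, the caller waits at most getAutoShufflePartitionTimeOut seconds, and on timeout it multiplies the current partition count rather than blocking the query. A minimal, self-contained Scala sketch of that pattern (estimatePartitions, currentPartitions, and fallbackMultiple are illustrative stand-ins, not Kylin APIs):

    import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

    object TimeoutFallbackSketch {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newSingleThreadExecutor()
        val timeoutSeconds = 30L
        val currentPartitions = 200 // stand-in for the session's spark.sql.shuffle.partitions
        val fallbackMultiple = 3    // stand-in for getAutoShufflePartitionMultiple
        // Run the potentially slow estimate off the caller's thread so it can be bounded.
        val future = executor.submit(new Callable[Int] {
          override def call(): Int = estimatePartitions()
        })
        val partitions =
          try future.get(timeoutSeconds, TimeUnit.SECONDS)
          catch {
            // Estimate took too long: fall back to a coarse multiple of the current value.
            case _: TimeoutException => currentPartitions * fallbackMultiple
          } finally executor.shutdownNow()
        println(s"shuffle partitions = $partitions")
      }

      // Stand-in for the size-based computation in the real code.
      private def estimatePartitions(): Int = 42
    }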
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
index 06f8c2aec1..baebc21bfa 100644
--- a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/pushdown/PushDownRunnerSparkImplTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
@@ -189,4 +190,61 @@ public class PushDownRunnerSparkImplTest extends NLocalFileMetadataTestCase {
         String sql = "select * from TEST_KYLIN_FACT";
         SparkSqlClient.executeSql(ss, sql, UUID.randomUUID(), "tpch");
     }
+
+    @Test
+    public void testAutoSetShufflePartitionConcurrency() throws Exception {
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance().overrideSparkSession(ss)) {
+            ss.sql("DROP TABLE if exists mypartitionedtable");
+            ss.sql("CREATE TABLE mypartitionedtable (col1 INT, col2 INT) USING PARQUET PARTITIONED BY (col3 STRING)");
+            ss.sql("INSERT INTO mypartitionedtable PARTITION (col3='partitionvalue') SELECT TRANS_ID, ORDER_ID FROM test_kylin_fact");
+            ss.sql("REFRESH TABLE mypartitionedtable");
+            KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "0");
+            PushdownResponse timeoutRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, timeoutRes.getColumns().size());
+            Assert.assertEquals(1, timeoutRes.getSize());
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-timeout", "30");
+            PushdownResponse successRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, successRes.getColumns().size());
+            Assert.assertEquals(1, successRes.getSize());
+            ss.sql("DROP TABLE mypartitionedtable");
+        }
+    }
+
+    @Test
+    public void testAutoSetShufflePartition() throws Exception {
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance().overrideSparkSession(ss)) {
+            ss.sql("DROP TABLE if exists mypartitionedtable");
+            ss.sql("CREATE TABLE mypartitionedtable (col1 INT, col2 INT) USING PARQUET PARTITIONED BY (col3 STRING)");
+            ss.sql("INSERT INTO mypartitionedtable PARTITION (col3='partitionvalue') SELECT TRANS_ID, ORDER_ID FROM test_kylin_fact");
+            ss.sql("REFRESH TABLE mypartitionedtable");
+            KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+            kylinconfig.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "false");
+            PushdownResponse timeoutRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, timeoutRes.getColumns().size());
+            Assert.assertEquals(1, timeoutRes.getSize());
+            kylinconfig.setProperty("kylin.job.concurrency-fetch-datasource-size-enabled", "true");
+            PushdownResponse successRes = SparkSubmitter.getInstance().submitPushDownTask(
+                    "select col1 from mypartitionedtable where col3='partitionvalue' limit 1", "tpch");
+            Assert.assertEquals(1, successRes.getColumns().size());
+            Assert.assertEquals(1, successRes.getSize());
+            ss.sql("DROP TABLE mypartitionedtable");
+        }
+    }
+
+    @Test
+    public void testAutoSetShufflePartitionOff() throws Exception {
+        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance().overrideSparkSession(ss)) {
+            KylinConfig kylinconfig = KylinConfig.getInstanceFromEnv();
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "false");
+            PushdownResponse resp = SparkSubmitter.getInstance()
+                    .submitPushDownTask("select count(order_id) from test_kylin_fact limit 1", "tpch");
+            Assert.assertEquals(1, resp.getColumns().size());
+            Assert.assertEquals(1, resp.getSize());
+            kylinconfig.setProperty("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true");
+        }
+    }
 }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
index 495f3298df..e7f97a5655 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/hive/utils/ResourceDetectUtils.scala
@@ -18,21 +18,15 @@
 
 package org.apache.spark.sql.hive.utils
 
-import java.io.IOException
-import java.nio.charset.Charset
-import java.util.concurrent.ForkJoinPool
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{Map => JMap}
-
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.kylin.metadata.cube.model.{DimensionRangeInfo, LayoutEntity}
 import org.apache.kylin.query.util.QueryInterruptChecker
-import org.apache.kylin.guava30.shaded.common.collect.Maps
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution._
@@ -41,10 +35,16 @@ import org.apache.spark.sql.execution.datasources.FileIndex
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.sources.NBaseRelation
+import org.apache.spark.util.ThreadUtils
 
+import java.io.IOException
+import java.nio.charset.Charset
+import java.util.concurrent.Executors
+import java.util.{Map => JMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
 
 object ResourceDetectUtils extends Logging {
   private val json = new Gson()
@@ -100,6 +100,22 @@ object ResourceDetectUtils extends Logging {
     paths
   }
 
+  def checkPartitionFilter(plan: SparkPlan): Boolean = {
+    var isIncludeFilter = false
+    plan.foreach {
+      case plan: FileSourceScanExec =>
+        isIncludeFilter ||= plan.partitionFilters.nonEmpty
+      case plan: LayoutFileSourceScanExec =>
+        isIncludeFilter ||= plan.partitionFilters.nonEmpty
+      case plan: HiveTableScanExec =>
+        if (plan.relation.isPartitioned) {
+          isIncludeFilter ||= plan.rawPartitions.nonEmpty
+        }
+      case _ =>
+    }
+    isIncludeFilter
+  }
+
   def getPartitions(plan: SparkPlan): String = {
     val leafNodePartitionsLengthMap: mutable.Map[String, Int] = mutable.Map()
     var pNum = 0
@@ -147,47 +163,57 @@ object ResourceDetectUtils extends Logging {
     false
   }
 
-  def getResourceSizeConcurrency(configuration: Configuration, paths: Path*): Long = {
+  def getResourceSize(configuration: Configuration, isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
+    val resourceSize = {
+      if (isConcurrencyFetchDataSourceSize) {
+        getResourceSizeWithTimeoutByConcurrency(Duration.Inf, configuration, paths: _*)
+      } else {
+        getResourceSizBySerial(configuration, paths: _*)
+      }
+    }
+    resourceSize
+  }
+
+  def getResourceSizBySerial(configuration: Configuration, paths: Path*): Long = {
+    paths.map(path => {
+      QueryInterruptChecker.checkThreadInterrupted(errorMsgLog, "Current step: get resource size.")
+      val fs = path.getFileSystem(configuration)
+      if (fs.exists(path)) {
+        HadoopUtil.getContentSummary(fs, path).getLength
+      } else {
+        0L
+      }
+    }).sum
+  }
+
+  def getResourceSizeWithTimeoutByConcurrency(timeout: Duration, configuration: Configuration, paths: Path*): Long = {
     val kylinConfig = KylinConfig.getInstanceFromEnv
     val threadNumber = kylinConfig.getConcurrencyFetchDataSourceSizeThreadNumber
     logInfo(s"Get resource size concurrency, thread number is $threadNumber")
-    val forkJoinPool = new ForkJoinPool(threadNumber)
-    val parallel = paths.par
-    val sum = new AtomicLong()
+    val executor = Executors.newFixedThreadPool(threadNumber)
+    implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
+    val futures: Seq[Future[Long]] = getResourceSize(configuration, executionContext, paths: _*)
     try {
-      parallel.tasksupport = new ForkJoinTaskSupport(forkJoinPool)
-      parallel.foreach {
-        path => {
-          val fs = path.getFileSystem(configuration)
-          if (fs.exists(path)) {
-            sum.addAndGet(HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, kylinConfig).getLength)
-          }
-        }
-      }
-    }
-    finally {
-      forkJoinPool.shutdownNow()
+      val combinedFuture = Future.sequence(futures)
+      val results: Seq[Long] = ThreadUtils.awaitResult(combinedFuture, timeout)
+      results.sum
+    } finally {
+      executor.shutdownNow()
     }
-    sum.get()
   }
 
-
-  def getResourceSize(configuration: Configuration, isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {
-    val resourceSize = {
-      if (isConcurrencyFetchDataSourceSize) {
-        getResourceSizeConcurrency(configuration, paths: _*)
-      } else {
-        paths.map(path => {
-          val fs = path.getFileSystem(configuration)
-          if (fs.exists(path)) {
-            HadoopUtil.getContentSummary(fs, path).getLength
-          } else {
-            0L
-          }
-        }).sum
-      }
+  def getResourceSize(configuration: Configuration, executionContext: ExecutionContextExecutor, paths: Path*): Seq[Future[Long]] = {
+    val kylinConfig = KylinConfig.getInstanceFromEnv
+    paths.map { path =>
+      Future {
+        val fs = path.getFileSystem(configuration)
+        if (fs.exists(path)) {
+          HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, kylinConfig).getLength
+        } else {
+          0L
+        }
+      }(executionContext)
     }
-    resourceSize
   }
 
   def getResourceSize(isConcurrencyFetchDataSourceSize: Boolean, paths: Path*): Long = {

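For reference, the concurrent path above fans out one Future per path on a fixed pool, combines them with Future.sequence, and awaits a single result under the given timeout (via Spark's ThreadUtils.awaitResult). A minimal Scala sketch of the same shape, with plain Await.result standing in for ThreadUtils.awaitResult and sizeOf standing in for the HDFS content-summary call:

    import java.util.concurrent.Executors
    import scala.concurrent.duration._
    import scala.concurrent.{Await, ExecutionContext, Future}

    object ConcurrentSizeSketch {
      def main(args: Array[String]): Unit = {
        val executor = Executors.newFixedThreadPool(4) // stand-in for the configured thread number
        implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
        val paths = Seq("/warehouse/a", "/warehouse/b", "/warehouse/c")
        // One future per path; the pool size caps real concurrency.
        val futures: Seq[Future[Long]] = paths.map(p => Future(sizeOf(p)))
        try {
          // Collapse Seq[Future[Long]] into Future[Seq[Long]] and bound the total wait.
          val total = Await.result(Future.sequence(futures), 30.seconds).sum
          println(s"total source size = ${total}b")
        } finally executor.shutdownNow()
      }

      // Stand-in for HadoopUtil.getContentSummaryFromHdfsKylinConfig(fs, path, config).getLength.
      private def sizeOf(path: String): Long = path.length.toLong
    }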