This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new a4a8480 KYLIN-4888, Performance optimization of union query with spark engine a4a8480 is described below commit a4a8480a05449b7ab19c7eace096dc3e39697dcb Author: feng.zhu <fish...@outlook.com> AuthorDate: Wed Jan 27 20:06:28 2021 +0800 KYLIN-4888, Performance optimization of union query with spark engine --- .../apache/kylin/query/runtime/CalciteToSparkPlaner.scala | 8 ++++---- .../scala/org/apache/kylin/query/runtime/SparkEngine.java | 1 - .../org/apache/kylin/query/runtime/plans/UnionPlan.scala | 13 +++++-------- .../main/java/org/apache/kylin/query/exec/SparkExec.java | 4 ---- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala index 4d966d9..4642b52 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/CalciteToSparkPlaner.scala @@ -23,8 +23,8 @@ import java.util import org.apache.kylin.shaded.com.google.common.collect.Lists import org.apache.calcite.DataContext import org.apache.calcite.rel.{RelNode, RelVisitor} -import org.apache.kylin.query.relnode.{OLAPAggregateRel, OLAPFilterRel, OLAPJoinRel, OLAPLimitRel, OLAPProjectRel, OLAPSortRel, OLAPTableScan, OLAPUnionRel, OLAPValuesRel, OLAPWindowRel} -import org.apache.kylin.query.runtime.plans.{AggregatePlan, FilterPlan, LimitPlan, ProjectPlan, SortPlan, TableScanPlan, ValuesPlan, WindowPlan} +import org.apache.kylin.query.relnode._ +import org.apache.kylin.query.runtime.plans._ import org.apache.spark.internal.Logging import org.apache.spark.sql.DataFrame @@ -95,14 +95,14 @@ class CalciteToSparkPlaner(dataContext: DataContext) extends 
RelVisitor with Log val right = stack.pop() val left = stack.pop() logTime("join") { - plans.JoinPlan.join(Lists.newArrayList(left, right), rel) + JoinPlan.join(Lists.newArrayList(left, right), rel) } } case rel: OLAPUnionRel => val size = unionStack.pop() val java = Range(0, stack.size() - size).map(a => stack.pop()).asJava logTime("union") { - plans.UnionPlan.union(Lists.newArrayList(java), rel, dataContext) + UnionPlan.union(Lists.newArrayList(java), rel, dataContext) } case rel: OLAPValuesRel => logTime("values") { diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java index ed51427..c2b0cc4 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java @@ -57,7 +57,6 @@ public class SparkEngine implements QueryEngine { log.trace("Begin planning spark plan."); long start = System.currentTimeMillis(); CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext); - long t = System.currentTimeMillis(); calciteToSparkPlaner.go(relNode); long takeTime = System.currentTimeMillis() - start; log.trace("Plan take {} ms", takeTime); diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala index ac45f0e..88647de 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/UnionPlan.scala @@ -31,14 +31,11 @@ object UnionPlan { dataContext: DataContext): DataFrame = { var df = inputs.get(0) val drop = 
inputs.asScala.drop(1) - if (rel.all) { - for (other <- drop) { - df = df.union(other) - } - } else { - for (other <- drop) { - df = df.union(other).distinct() - } + for (other <- drop) { + df = df.union(other) + } + if (!rel.all) { + df = df.distinct() } df } diff --git a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java index b469446..03c5837 100644 --- a/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java +++ b/query/src/main/java/org/apache/kylin/query/exec/SparkExec.java @@ -25,13 +25,9 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.kylin.common.QueryContextFacade; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.query.relnode.OLAPRel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class SparkExec { - private static final Logger logger = LoggerFactory.getLogger(SparkExec.class); - public static Enumerable<Object[]> collectToEnumerable(DataContext dataContext) { if (BackdoorToggles.getPrepareOnly()) { return Linq4j.emptyEnumerable();