This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
     new d9a5c70  KYLIN-5111 Record the time spent for each stage of query in kylin4's log
d9a5c70 is described below

commit d9a5c70136c29b77f981058c7166492ce53ea2a4
Author: yaqian.zhang <598593...@qq.com>
AuthorDate: Mon Nov 1 10:55:37 2021 +0800

    KYLIN-5111 Record the time spent for each stage of query in kylin4's log
---
 .../java/org/apache/kylin/common/QueryContext.java |  10 +-
 .../java/org/apache/kylin/common/QueryTrace.java   | 141 +++++++++++++++++++++
 .../org/apache/kylin/common/QueryTraceTest.java    |  47 +++++++
 .../apache/kylin/query/runtime/SparkEngine.java    |   3 +
 .../kylin/query/runtime/plans/ResultPlan.scala     |   5 +
 .../apache/kylin/query/util/SparkJobTrace.scala    | 118 +++++++++++++++++
 .../org/apache/spark/sql/metrics/AppStatus.scala   | 101 +++++++++++++++
 .../query/relnode/OLAPToEnumerableConverter.java   |   3 +
 .../apache/kylin/rest/response/SQLResponse.java    |  10 ++
 .../kylin/rest/response/SQLResponseTrace.java      |  61 +++++++++
 .../apache/kylin/rest/service/QueryService.java    |  16 ++-
 .../kylin/rest/response/SQLResponseTest.java       |   2 +-
 12 files changed, 513 insertions(+), 4 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 53f67f9..69e2994 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -69,12 +69,12 @@ public class QueryContext {
     private boolean isHighPriorityQuery = false;
     private boolean isTableIndex = false;
     private boolean withoutSyntaxError;
+    private QueryTrace queryTrace = new QueryTrace();
 
     private AtomicBoolean isRunning = new AtomicBoolean(true);
     private AtomicReference<Throwable> throwable = new AtomicReference<>();
     private String stopReason;
     private List<QueryStopListener> stopListeners = Lists.newCopyOnWriteArrayList();
-
     private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
     private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
 
@@ -205,6 +205,14 @@ public class QueryContext {
         return metadataTime.addAndGet(time);
     }
 
+    public QueryTrace getQueryTrace() {
+        return queryTrace;
+    }
+
+    public void setQueryTrace(QueryTrace queryTrace) {
+        this.queryTrace = queryTrace;
+    }
+
     //Scanned time with Spark
     public long getScanTime() {
         return scanTime.get();
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
new file mode 100644
index 0000000..4387e0a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+public class QueryTrace {
+
+    // span name
+    public static final String SQL_TRANSFORMATION = "SQL_TRANSFORMATION";
+    public static final String SQL_PARSE_AND_OPTIMIZE = "SQL_PARSE_AND_OPTIMIZE";
+    public static final String CUBE_MATCHING = "CUBE_MATCHING";
+    public static final String PREPARE_AND_SUBMIT_JOB = "PREPARE_AND_SUBMIT_JOB";
+    public static final String WAIT_FOR_EXECUTION = "WAIT_FOR_EXECUTION";
+    public static final String EXECUTION = "EXECUTION";
+    public static final String FETCH_RESULT = "FETCH_RESULT";
+
+    // group name
+    static final String PREPARATION = "PREPARATION";
+
+    static final Map<String, String> SPAN_GROUPS = new HashMap<>();
+    static {
+        SPAN_GROUPS.put(SQL_TRANSFORMATION, PREPARATION);
+        SPAN_GROUPS.put(SQL_PARSE_AND_OPTIMIZE, PREPARATION);
+        SPAN_GROUPS.put(CUBE_MATCHING, PREPARATION);
+    }
+
+    private List<Span> spans = new LinkedList<>();
+
+    public Optional<Span> getLastSpan() {
+        return spans.isEmpty() ? Optional.empty() : Optional.of(spans.get(spans.size() - 1));
+    }
+
+    public void endLastSpan() {
+        getLastSpan().ifPresent(span -> {
+            if (span.duration == -1) {
+                span.duration = System.currentTimeMillis() - span.start;
+            }
+        });
+    }
+
+    public void startSpan(String name) {
+        endLastSpan();
+        spans.add(new Span(name, System.currentTimeMillis()));
+    }
+
+    public void appendSpan(String name, long duration) {
+        spans.add(new Span(name,
+                getLastSpan().map(span -> span.getStart() + span.getDuration()).orElse(System.currentTimeMillis()),
+                duration));
+    }
+
+    public void amendLast(String name, long endAt) {
+        for (int i = spans.size() - 1; i >= 0; i--) {
+            if (spans.get(i).name.equals(name)) {
+                spans.get(i).duration = endAt - spans.get(i).start;
+                return;
+            }
+        }
+    }
+
+    public List<Span> spans() {
+        return spans;
+    }
+
+    public static class Span {
+        String name;
+
+        String group;
+
+        long start;
+
+        long duration = -1;
+
+        public long getDuration() {
+            return duration;
+        }
+
+        public long getStart() {
+            return start;
+        }
+
+        public String getGroup() {
+            return group;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setDuration(long duration) {
+            this.duration = duration;
+        }
+
+        public void setGroup(String group) {
+            this.group = group;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public void setStart(long start) {
+            this.start = start;
+        }
+
+        public Span(String name, long start, long duration) {
+            this.name = name;
+            this.start = start;
+            this.duration = duration;
+            this.group = SPAN_GROUPS.get(name);
+        }
+
+        public Span(String name, long start) {
+            this.name = name;
+            this.start = start;
+            this.group = SPAN_GROUPS.get(name);
+        }
+    }
+}
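For orientation, here is a minimal sketch (not part of the commit) of how the QueryTrace span API composes: startSpan() closes any open span before opening a new one, appendSpan() stitches a fixed-duration span onto the end of the timeline, and amendLast() back-dates the end of the most recent span with a given name. Durations below are illustrative only.

    QueryTrace trace = new QueryTrace();
    trace.startSpan(QueryTrace.SQL_TRANSFORMATION);   // opens the first span
    trace.startSpan(QueryTrace.CUBE_MATCHING);        // ends the previous span, opens a new one
    trace.endLastSpan();                              // closes CUBE_MATCHING
    trace.appendSpan(QueryTrace.EXECUTION, 1200L);    // fixed-duration span appended to the timeline
    trace.amendLast(QueryTrace.EXECUTION, System.currentTimeMillis()); // re-anchors its end time
    trace.spans().forEach(s -> System.out.println(s.getName() + " : " + s.getDuration() + "ms"));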
diff --git a/core-common/src/test/java/org/apache/kylin/common/QueryTraceTest.java b/core-common/src/test/java/org/apache/kylin/common/QueryTraceTest.java
new file mode 100644
index 0000000..5261b73
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/QueryTraceTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueryTraceTest {
+
+    @Test
+    public void test() throws InterruptedException {
+        QueryTrace trace = new QueryTrace();
+        trace.startSpan("span 1");
+        Thread.sleep(100);
+        trace.startSpan("span 2");
+        Thread.sleep(100);
+        trace.endLastSpan();
+        Assert.assertEquals(2, trace.spans().size());
+        Assert.assertTrue(trace.getLastSpan().isPresent());
+        Assert.assertEquals("span 2", trace.getLastSpan().get().name);
+        assertTimeEqual(100, trace.getLastSpan().get().duration);
+
+        trace.amendLast("span 2", trace.getLastSpan().get().start + trace.getLastSpan().get().getDuration() + 1000);
+        assertTimeEqual(1100, trace.getLastSpan().get().duration);
+    }
+
+    private void assertTimeEqual(long expected, long actual) {
+        Assert.assertTrue(Math.abs(expected - actual) < 1000);
+    }
+
+}

diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index c2b0cc4..3a504ae 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -22,6 +22,8 @@ import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.common.QueryTrace;
 import org.apache.kylin.query.exec.QueryEngine;
 import org.apache.kylin.query.runtime.plans.ResultPlan;
 import org.apache.kylin.query.runtime.plans.ResultType;
@@ -55,6 +57,7 @@ public class SparkEngine implements QueryEngine {
 
     private Dataset<Row> toSparkPlan(DataContext dataContext, RelNode relNode) {
         log.trace("Begin planning spark plan.");
+        QueryContextFacade.current().getQueryTrace().startSpan(QueryTrace.PREPARE_AND_SUBMIT_JOB);
         long start = System.currentTimeMillis();
         CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext);
         calciteToSparkPlaner.go(relNode);
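Combined with the CUBE_MATCHING span started in OLAPToEnumerableConverter and the SQL_TRANSFORMATION / SQL_PARSE_AND_OPTIMIZE spans started in QueryService (both later in this diff), a traced query should record spans roughly in this order; per SPAN_GROUPS, only the first three belong to the PREPARATION group:

    SQL_TRANSFORMATION       (PREPARATION)
    SQL_PARSE_AND_OPTIMIZE   (PREPARATION)
    CUBE_MATCHING            (PREPARATION)
    PREPARE_AND_SUBMIT_JOB
    WAIT_FOR_EXECUTION
    EXECUTION
    FETCH_RESULT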
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 991a7f2..c0558b5 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -27,6 +27,7 @@ import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
 import org.apache.kylin.common.util.HadoopUtil
 import org.apache.kylin.metadata.project.ProjectManager
 import org.apache.kylin.query.runtime.plans.ResultType.ResultType
+import org.apache.kylin.query.util.SparkJobTrace
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparderContext}
 import org.apache.spark.sql.hive.utils.QueryMetricUtils
@@ -104,8 +105,12 @@ object ResultPlan extends Logging {
     sparkContext.setJobGroup(jobGroup,
       "Query Id: " + QueryContextFacade.current().getQueryId, interruptOnCancel = true)
+    val currentTrace = QueryContextFacade.current().getQueryTrace
+    currentTrace.endLastSpan()
+    val jobTrace = new SparkJobTrace(jobGroup, currentTrace, sparkContext)
     try {
       val rows = df.collect()
+      jobTrace.jobFinished()
       val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) =
         QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
       QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
       QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)

diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
new file mode 100644
index 0000000..e6de31a
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util
+
+import org.apache.kylin.common.QueryTrace
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.metrics.AppStatus
+import org.apache.spark.utils.LogEx
+
+/**
+ * Helper class for tracing the Spark job execution time during a query.
+ */
+class SparkJobTrace(jobGroup: String,
+                    queryTrace: QueryTrace,
+                    sparkContext: SparkContext,
+                    startAt: Long = System.currentTimeMillis()) extends LogEx {
+
+  val appStatus = new AppStatus(sparkContext)
+
+  /**
+   * Called right after job execution is done; the helper then calculates estimated
+   * durations for the job execution steps (WAIT_FOR_EXECUTION, EXECUTION, FETCH_RESULT).
+   *
+   * As stages and tasks are executed in parallel, it is hard to get a precise duration
+   * trace for each step.
+   * In this helper, we estimate the durations of WAIT_FOR_EXECUTION, EXECUTION and
+   * FETCH_RESULT as follows:
+   * 1. Calculate the mean task launch delay, task execution duration and task fetch-result
+   *    time, where launch delay = task launch time - stage submission time.
+   * 2. Sum the mean task launch delay, task execution duration and task fetch-result time
+   *    across all stages, and calculate the proportion of each part.
+   * 3. Calculate the duration of each step by multiplying the corresponding proportion
+   *    by the total job execution duration.
+   *
+   * We use the mean task launch delay as it gives a rough estimate of how much time
+   * the tasks in a stage spend waiting for a free executor. If the delay is
+   * long, it may imply that the executor/core configuration is insufficient for the
+   * number of tasks, or that the cluster is under heavy workload.
+   */
+  def jobFinished(): Unit = {
+    try {
+      val jobDataSeq = appStatus.getJobData(jobGroup)
+
+      if (jobDataSeq.isEmpty) {
+        endAbnormalExecutionTrace()
+        return
+      }
+
+      var jobExecutionTime = System.currentTimeMillis() - startAt
+      val submissionTime = jobDataSeq.map(_.submissionTime).min
+      if (submissionTime.isDefined) {
+        queryTrace.amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, submissionTime.get.getTime)
+      }
+      val completionTime = jobDataSeq.map(_.completionTime).max
+      if (submissionTime.isDefined && completionTime.isDefined) {
+        jobExecutionTime = completionTime.get.getTime - submissionTime.get.getTime
+      }
+
+      val jobMetrics = jobDataSeq.map(_.jobId)
+        .flatMap(appStatus.getJobStagesSummary(_, 0.5))
+        .foldLeft((0.0, 0.0)) { (acc, taskMetrics) =>
+          (
+            acc._1 + taskMetrics.executorRunTime.head + taskMetrics.executorDeserializeTime.head,
+            acc._2 + taskMetrics.gettingResultTime.head
+          )
+        }
+      val launchDelayTimeSum = jobDataSeq.flatMap(_.stageIds).flatMap(appStatus.getStage).map { stage =>
+        appStatus.getTaskLaunchTime(stage.stageId(), 0.5) - stage.submissionTime()
+      }.sum
+      val sum = jobMetrics._1 + jobMetrics._2 + launchDelayTimeSum
+      val computingTime = jobMetrics._1 * jobExecutionTime / sum
+      val getResultTime = jobMetrics._2 * jobExecutionTime / sum
+      val launchDelayTime = launchDelayTimeSum * jobExecutionTime / sum
+
+      queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, launchDelayTime.longValue())
+      queryTrace.appendSpan(QueryTrace.EXECUTION, computingTime.longValue())
+      queryTrace.appendSpan(QueryTrace.FETCH_RESULT, getResultTime.longValue())
+    } catch {
+      case e: Throwable =>
+        logWarning(s"Failed to trace spark job execution for $jobGroup", e)
+        endAbnormalExecutionTrace()
+    }
+  }
+
+  /**
+   * Called right after result transformation is done, to count the
+   * transformation time toward the total result fetch duration.
+   */
+  def resultConverted(): Unit = {
+    queryTrace.amendLast(QueryTrace.FETCH_RESULT, System.currentTimeMillis())
+  }
+
+  /**
+   * Adds dummy spans so the trace is still complete when job metrics
+   * cannot be read.
+   */
+  def endAbnormalExecutionTrace(): Unit = {
+    queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, 0)
+    queryTrace.appendSpan(QueryTrace.EXECUTION, System.currentTimeMillis() - startAt)
+    queryTrace.appendSpan(QueryTrace.FETCH_RESULT, 0)
+  }
+}
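A worked example (made-up numbers, not from the commit) of the proportional scaling in jobFinished(): each step's share of the summed task-level times is applied to the job's wall-clock duration.

    // Sums of per-stage median task times, in milliseconds (illustrative values):
    double launchDelaySum = 200;  // task launch time - stage submission time
    double runTimeSum = 600;      // executor run time + deserialize time
    double fetchSum = 200;        // getting-result time
    long jobWallClock = 2000;     // completionTime - submissionTime

    double total = launchDelaySum + runTimeSum + fetchSum;                  // 1000
    long waitForExecution = (long) (launchDelaySum * jobWallClock / total); // 400 ms
    long execution = (long) (runTimeSum * jobWallClock / total);            // 1200 ms
    long fetchResult = (long) (fetchSum * jobWallClock / total);            // 400 ms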
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
new file mode 100644
index 0000000..e1eb6f8
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metrics
+
+import org.apache.spark.{SparkContext, SparkStageInfo}
+import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
+import org.apache.spark.util.Utils
+import org.apache.spark.status.api.v1
+
+class AppStatus(sparkContext: SparkContext) {
+
+  def getTaskLaunchTime(stageId: Int, quantile: Double): Double = {
+    scanTasks(stageId, TaskIndexNames.LAUNCH_TIME, quantile) { t => t.launchTime }
+  }
+
+  // copied from org.apache.spark.status.AppStatusStore.taskSummary
+  def scanTasks(stageId: Int, index: String, quantile: Double)(fn: TaskDataWrapper => Long): Double = {
+    val stageKey = Array(stageId, 0)
+    val count = {
+      Utils.tryWithResource(
+        sparkContext.statusStore.store.view(classOf[TaskDataWrapper])
+          .parent(stageKey)
+          .index(TaskIndexNames.EXEC_RUN_TIME)
+          .first(0L)
+          .closeableIterator()
+      ) { it =>
+        var _count = 0L
+        while (it.hasNext()) {
+          _count += 1
+          it.skip(1)
+        }
+        _count
+      }
+    }
+
+    val idx = math.min((quantile * count).toLong, count - 1)
+    Utils.tryWithResource(
+      sparkContext.statusStore.store.view(classOf[TaskDataWrapper])
+        .parent(stageKey)
+        .index(index)
+        .first(0L)
+        .closeableIterator()
+    ) { it =>
+      var last = Double.NaN
+      var currentIdx = -1L
+      if (idx == currentIdx) {
+        last
+      } else {
+        val diff = idx - currentIdx
+        currentIdx = idx
+        if (it.skip(diff - 1)) {
+          last = fn(it.next()).toDouble
+          last
+        } else {
+          Double.NaN
+        }
+      }
+    }
+  }
+
+  def getJobStagesSummary(jobId: Int, quantile: Double): Seq[v1.TaskMetricDistributions] = {
+    getJobData(jobId).map { jobData =>
+      jobData.stageIds.flatMap { stageId =>
+        sparkContext.statusStore.taskSummary(stageId, 0, Array(quantile))
+      }
+    }.getOrElse(Seq.empty)
+  }
+
+  def getStage(stageId: Int): Option[SparkStageInfo] = {
+    sparkContext.statusTracker.getStageInfo(stageId)
+  }
+
+  def getJobData(jobGroup: String): Seq[v1.JobData] = {
+    sparkContext.statusTracker.getJobIdsForGroup(jobGroup).map(getJobData).filter(_.isDefined).map(_.get)
+  }
+
+  def getJobData(jobId: Int): Option[v1.JobData] = {
+    try {
+      Some(sparkContext.statusStore.job(jobId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+}

diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index 20e5f21..5a4094f 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
+import org.apache.kylin.common.QueryTrace;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,6 +108,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
             logger.debug(dumpPlan);
         }
 
+        QueryContextFacade.current().getQueryTrace().startSpan(QueryTrace.CUBE_MATCHING);
+
         RealizationChooser.selectRealization(contexts);
 
         QueryInfoCollector.current().setCubeNames(contexts.stream()
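The scanTasks helper above mirrors Spark's own AppStatusStore.taskSummary trick: one indexed pass to count the tasks, then a skip straight to the element at the requested quantile. The index arithmetic reduces to the following (a sketch with illustrative values):

    long taskCount = 42;   // counted in the first iterator pass
    double quantile = 0.5; // median, as used by SparkJobTrace
    long idx = Math.min((long) (quantile * taskCount), taskCount - 1); // clamp to last element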
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 37578ec..fcfc8ad 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -95,6 +95,8 @@ public class SQLResponse implements Serializable {
     // indicating the lazy query start time, -1 indicating not enabled
     protected long lazyQueryStartTime = -1L;
 
+    private List<SQLResponseTrace> traces;
+
     public SQLResponse() {
     }
 
@@ -296,6 +298,14 @@ public class SQLResponse implements Serializable {
         this.realizationTypes = realizationTypes;
     }
 
+    public void setTraces(List<SQLResponseTrace> traces) {
+        this.traces = traces;
+    }
+
+    public List<SQLResponseTrace> getTraces() {
+        return traces;
+    }
+
     @JsonIgnore
     public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
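Since getTraces() is not annotated with @JsonIgnore, the new field should appear in the serialized SQLResponse. A quick sanity check of the wire format (a sketch; assumes Jackson's default bean naming and an ObjectMapper at hand, with an illustrative duration):

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new SQLResponseTrace("EXECUTION", null, 1350L));
    // => something like {"name":"EXECUTION","group":null,"duration":1350}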
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponseTrace.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponseTrace.java
new file mode 100644
index 0000000..3145f69
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponseTrace.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.response;
+
+public class SQLResponseTrace {
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public void setDuration(long duration) {
+        this.duration = duration;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public long getDuration() {
+        return duration;
+    }
+
+    private String name;
+
+    private String group;
+
+    private long duration;
+
+    public SQLResponseTrace() {
+    }
+
+    public SQLResponseTrace(String name, String group, long duration) {
+        this.name = name;
+        this.group = group;
+        this.duration = duration;
+    }
+}

diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index d1fc461..fcf3be2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
 
 import javax.annotation.PostConstruct;
 
@@ -66,6 +67,7 @@ import org.apache.kylin.cache.cachemanager.MemcachedCacheManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.common.QueryTrace;
 import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -113,6 +115,7 @@ import org.apache.kylin.rest.msg.MsgPicker;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.response.SQLResponseTrace;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
 import org.apache.kylin.rest.util.QueryRequestLimits;
@@ -219,6 +222,9 @@ public class QueryService extends BasicService {
             badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user, queryId);
 
             ret = queryWithSqlMassage(sqlRequest);
+            ret.setTraces(QueryContextFacade.current().getQueryTrace().spans().stream()
+                    .map(span -> new SQLResponseTrace(span.getName(), span.getGroup(), span.getDuration()))
+                    .collect(Collectors.toList()));
             return ret;
 
         } finally {
@@ -386,6 +392,11 @@ public class QueryService extends BasicService {
         stringBuilder.append("Used Spark pool: ").append(response.getSparkPool()).append(newLine);
         stringBuilder.append("Trace URL: ").append(response.getTraceUrl()).append(newLine);
         stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine);
+        if (response.getTraces() != null) {
+            stringBuilder.append("Time consuming for each query stage: -----------------").append(newLine);
+            response.getTraces().forEach(trace -> stringBuilder.append(trace.getName() + " : " + trace.getDuration() + "ms").append(newLine));
+            stringBuilder.append("Time consuming for each query stage: -----------------").append(newLine);
+        }
         stringBuilder.append("==========================[QUERY]===============================").append(newLine);
         logger.info(stringBuilder.toString());
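With the logging block above in place, the per-stage timings land in the query log between the two header lines; assuming illustrative durations, the output would look something like:

    Time consuming for each query stage: -----------------
    SQL_TRANSFORMATION : 5ms
    SQL_PARSE_AND_OPTIMIZE : 12ms
    CUBE_MATCHING : 3ms
    PREPARE_AND_SUBMIT_JOB : 40ms
    WAIT_FOR_EXECUTION : 60ms
    EXECUTION : 350ms
    FETCH_RESULT : 25ms
    Time consuming for each query stage: -----------------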
@@ -408,8 +419,10 @@ public class QueryService extends BasicService {
     public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect) {
         Message msg = MsgPicker.getMsg();
         sqlRequest.setUsername(getUserName());
+        final QueryContext queryContext = QueryContextFacade.current();
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        queryContext.getQueryTrace().startSpan(QueryTrace.SQL_TRANSFORMATION);
 
         if (!ServerMode.SERVER_MODE.canServeQuery()) {
             throw new BadRequestException(
                     String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), kylinConfig.getServerMode()));
@@ -430,8 +443,6 @@ public class QueryService extends BasicService {
         if (sqlRequest.getBackdoorToggles() != null)
             BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
 
-        final QueryContext queryContext = QueryContextFacade.current();
-
         try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
             // force clear the query context before a new query
             OLAPContext.clearThreadLocalContexts();
@@ -1023,6 +1034,7 @@ public class QueryService extends BasicService {
         try {
             stat = conn.createStatement();
             processStatementAttr(stat, sqlRequest);
+            QueryContextFacade.current().getQueryTrace().startSpan(QueryTrace.SQL_PARSE_AND_OPTIMIZE);
             resultSet = stat.executeQuery(correctedSql);
 
             r = createResponseFromResultSet(resultSet);

diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index 370b1ea..deeafc9 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -36,7 +36,7 @@ public class SQLResponseTest {
                 "realizationTypes", "affectedRowCount", "isException", "exceptionMessage", "duration", "partial",
                 "totalScanCount", "hitExceptionCache", "storageCacheUsed", "sparkPool", "pushDown", "traceUrl", "totalScanBytes",
-                "totalScanFiles", "metadataTime", "totalSparkScanTime" };
+                "totalScanFiles", "metadataTime", "totalSparkScanTime", "traces"};
 
         SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
         String jsonStr = JsonUtil.writeValueAsString(sqlResponse);