This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push: new d56d72f KYLIN-5121 fix key not found: numOutputRows on s3 d56d72f is described below commit d56d72f736db5ea52bcc9a672dd20dc6ee4aaaae Author: yaqian.zhang <598593...@qq.com> AuthorDate: Thu Mar 10 10:03:08 2022 +0800 KYLIN-5121 fix key not found: numOutputRows on s3 --- .../apache/kylin/engine/spark/utils/JobMetricsUtils.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala index 3130130..437d8b0 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala @@ -56,11 +56,15 @@ object JobMetricsUtils extends Logging { case plan: UnaryExecNode => if (aggs.contains(plan.getClass) && !afterAgg) { afterAgg = true - rowMetrics.setMetrics(Metrics.CUBOID_ROWS_CNT, plan.metrics.apply("numOutputRows").value) + if (plan.metrics.contains("numOutputRows")) { + rowMetrics.setMetrics(Metrics.CUBOID_ROWS_CNT, plan.metrics.apply("numOutputRows").value) + } } case plan: BinaryExecNode => if (joins.contains(plan.getClass) && !afterJoin) { - rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, plan.metrics.apply("numOutputRows").value) + if (plan.metrics.contains("numOutputRows")) { + rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, plan.metrics.apply("numOutputRows").value) + } afterJoin = true } case plan: LeafExecNode => @@ -72,8 +76,10 @@ object JobMetricsUtils extends Logging { rowMetrics.getMetrics(Metrics.SOURCE_ROWS_CNT) } - val rowsCnt = preCnt + plan.metrics.apply("numOutputRows").value - rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, rowsCnt) + if (plan.metrics.contains("numOutputRows")) { + val rowsCnt = preCnt + plan.metrics.apply("numOutputRows").value + rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, rowsCnt) + } } case _ => }