This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dc3fd9ab25ecd03ec53f8855f5366db8de542ce5
Author: hujiahua <hujia...@youzan.com>
AuthorDate: Thu Nov 11 14:39:03 2021 +0800

    [KYLIN-5121] Make JobMetricsUtils.collectMetrics be working again
---
 .../kylin/engine/spark/job/OptimizeBuildJob.java   | 28 ++++----
 .../kylin/engine/spark/job/CubeBuildJob.java       | 68 +++++++++----------
 .../kylin/engine/spark/job/CubeMergeJob.java       | 41 +++++------
 .../kylin/engine/spark/utils/JobMetricsUtils.scala | 79 +++++----------------
 .../utils/QueryExecutionInterceptListener.scala    | 33 +++++++++
 5 files changed, 110 insertions(+), 139 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 96ca190..8a412d5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -41,16 +41,15 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
@@ -63,16 +62,13 @@ import scala.Tuple2;
 import scala.collection.JavaConversions;
 
 import java.io.IOException;
-
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.ArrayList;
-import java.util.UUID;
-
 import java.util.stream.Collectors;
 
 public class OptimizeBuildJob extends SparkApplication {
@@ -378,15 +374,15 @@ public class OptimizeBuildJob extends SparkApplication {
                                      long parentId) throws IOException {
         long layoutId = layout.getId();
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(),
                 seg.identifier(), String.valueOf(layoutId));
+        String tempPath = path + TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         logger.info("Cuboids are saved to temp path : " + tempPath);
         storage.saveTo(tempPath, dataset, ss);
@@ -402,14 +398,14 @@ public class OptimizeBuildJob extends SparkApplication {
             cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
             layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
+            cuboidsRowCount.putIfAbsent(layoutId, rowCount);
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short) shardNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index c30c206..a5f0e11 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -18,41 +18,21 @@ package org.apache.kylin.engine.spark.job;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.cuboid.CuboidModeEnum;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
-import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.shaded.com.google.common.base.Joiner;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
@@ -66,9 +46,14 @@ import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -77,14 +62,24 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 public class CubeBuildJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
     protected static String TEMP_DIR_SUFFIX = "_temp";
@@ -457,15 +452,14 @@ public class CubeBuildJob extends SparkApplication {
                                      long parentId) throws IOException {
         long layoutId = layout.getId();
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(),
                 seg.identifier(), String.valueOf(layoutId));
         String tempPath = path + TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         logger.info("Cuboids are saved to temp path : " + tempPath);
         storage.saveTo(tempPath, dataset, ss);
@@ -479,14 +473,14 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
             layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
+            cuboidsRowCount.putIfAbsent(layoutId, rowCount);
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short) shardNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index ef3325f..16fecd3 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -18,16 +18,13 @@ package org.apache.kylin.engine.spark.job;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
@@ -35,28 +32,27 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
+import org.apache.kylin.engine.spark.utils.JobMetrics;
+import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
+import org.apache.kylin.engine.spark.utils.Metrics;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-import org.apache.kylin.engine.spark.application.SparkApplication;
-import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.engine.spark.utils.JobMetrics;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
-import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
 import scala.collection.JavaConversions;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class CubeMergeJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
@@ -165,14 +161,14 @@ public class CubeMergeJob extends SparkApplication {
             sourceCount += cuboid.getSourceRows();
         }
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
         ss.sparkContext().setJobDescription("merge layout " + layoutId);
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(),
                 seg.identifier(), String.valueOf(layoutId));
         String tempPath = path + CubeBuildJob.TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         storage.saveTo(tempPath, dataset, ss);
@@ -191,9 +187,8 @@ public class CubeMergeJob extends SparkApplication {
         int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
         layout.setShardNum(partitionNum);
         cuboidShardNum.put(layoutId, (short)partitionNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
         ss.sparkContext().setJobDescription(null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
 
         BuildUtils.fillCuboidInfo(layout, path);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index dfb0d19..3130130 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.spark.utils
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -33,20 +32,18 @@ object JobMetricsUtils extends Logging {
   private val aggs = List(classOf[HashAggregateExec], classOf[SortAggregateExec], classOf[ObjectHashAggregateExec])
   private val joins = List(classOf[BroadcastHashJoinExec], classOf[ShuffledHashJoinExec], classOf[SortMergeJoinExec], classOf[BroadcastNestedLoopJoinExec], classOf[StreamingSymmetricHashJoinExec])
-  var sparkListener : SparkListener = _
+
+  private val executionIdToListener = new ConcurrentHashMap[String, QueryExecutionInterceptListener]()
+
   def collectMetrics(executionId: String): JobMetrics = {
     var metrics = new JobMetrics
-    val execution = QueryExecutionCache.getQueryExecution(executionId)
-    if (execution != null) {
-      metrics = collectOutputRows(execution.executedPlan)
+    val listener = executionIdToListener.getOrDefault(executionId,null)
+    if (listener != null && listener.queryExecution.isDefined) {
+      metrics = collectOutputRows(listener.queryExecution.get.executedPlan)
       logInfo(s"Collect output rows successfully. $metrics")
+    } else {
+      logInfo(s"Collect output rows failed.")
     }
-
-    // comment below source, because it always collect failed when using apache spark.
-
-    // else {
-    //  logDebug(s"Collect output rows failed.")
-    //}
     metrics
   }
@@ -90,61 +87,17 @@ object JobMetricsUtils extends Logging {
     rowMetrics
   }
 
-  /**
-    * When using a custom spark which sent event which contain QueryExecution belongs to a specific N_EXECUTION_ID_KEY,
-    * kylin can cache QueryExecution object into QueryExecutionCache and collect metrics such as bytes/row count for a cuboid
-    *
-   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-      case e: PostQueryExecutionForKylin =>
-        val nExecutionId = e.localProperties.getProperty(QueryExecutionCache.N_EXECUTION_ID_KEY, "")
-        if (nExecutionId != "" && e.queryExecution != null) {
-          QueryExecutionCache.setQueryExecution(nExecutionId, e.queryExecution)
-        } else {
-          logWarning("executionIdStr is null, can't get QueryExecution from SQLExecution.")
-        }
-      case _ => // Ignore
-    }
-  */
-  def registerListener(ss: SparkSession): Unit = {
-    sparkListener = new SparkListener {
-
-      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case _ => // Ignore
-      }
-    }
-    ss.sparkContext.addSparkListener(sparkListener)
+  def registerQueryExecutionListener(ss: SparkSession, executionId: String): Unit = {
+    val listener = new QueryExecutionInterceptListener(executionId)
+    executionIdToListener.put(executionId, listener)
+    ss.listenerManager.register(listener)
   }
 
-  def unRegisterListener(ss: SparkSession) : Unit = {
-    if (sparkListener != null) {
-      ss.sparkContext.removeSparkListener(sparkListener)
+  def unRegisterQueryExecutionListener(ss: SparkSession, executionId: String) : Unit = {
+    val listener = executionIdToListener.remove(executionId)
+    if (listener != null) {
+      ss.listenerManager.unregister(listener)
     }
   }
 }
-
-object QueryExecutionCache extends Logging {
-  val N_EXECUTION_ID_KEY = "kylin.query.execution.id"
-
-  private val executionIdToQueryExecution = new ConcurrentHashMap[String, QueryExecution]()
-
-  def getQueryExecution(executionId: String): QueryExecution = {
-    if (executionId != null) {
-      executionIdToQueryExecution.get(executionId)
-    } else {
-      null
-    }
-  }
-
-  def setQueryExecution(executionId: String, queryExecution: QueryExecution): Unit = {
-    if (executionId != null) {
-      executionIdToQueryExecution.put(executionId, queryExecution)
-    } else {
-      logWarning("kylin.query.execution.id is null, don't put QueryExecution into QueryExecutionCache.")
-    }
-  }
-
-  def removeQueryExecution(executionId: String): Unit = {
-    executionIdToQueryExecution.remove(executionId)
-  }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
new file mode 100644
index 0000000..d9d64ef
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
@@ -0,0 +1,33 @@
+package org.apache.kylin.engine.spark.utils
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.net.URI
+
+/**
+ * This QueryExecutionListener will intercept QueryExecution when outputPath matched, so make sure outputPath was unique.
+ */
+class QueryExecutionInterceptListener(outputPath: String) extends QueryExecutionListener{
+
+  var queryExecution : Option[QueryExecution] = None
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    qe.sparkPlan foreach {
+      case plan: DataWritingCommandExec =>{
+        //check if output path match
+        if (plan.cmd.isInstanceOf[InsertIntoHadoopFsRelationCommand]
+          && plan.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand].outputPath.toUri.equals(new URI(outputPath))) {
+          queryExecution = Some(qe)
+        }
+      }
+      case _ =>
+    }
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+
+  }
+}
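
A note for reviewers on how the pieces above fit together: the old QueryExecutionCache path relied on a Kylin-patched Spark that posted a PostQueryExecutionForKylin event carrying the QueryExecution for N_EXECUTION_ID_KEY, so on vanilla Apache Spark the cache was never filled and collectMetrics always failed (see the comment removed from JobMetricsUtils.scala). The patch instead registers a standard QueryExecutionListener per cuboid write and keys it by the unique temp output path. Below is a minimal sketch of the calling pattern, written in Scala for brevity; ss, storage, dataset and path stand in for the fields used in CubeBuildJob and are assumed to exist:

    // Sketch only, not part of the patch.
    val tempPath = path + "_temp"        // TEMP_DIR_SUFFIX; the unique path doubles as the id
    val queryExecutionId = tempPath

    // 1. Register a listener that will latch the QueryExecution writing to tempPath.
    JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId)

    // 2. Run the Parquet write; onSuccess fires and the listener captures the executed plan.
    storage.saveTo(tempPath, dataset, ss)

    // 3. Read row-count metrics back from the captured plan.
    val metrics = JobMetricsUtils.collectMetrics(queryExecutionId)

    // 4. Always unregister: the listener is added to the session-wide listenerManager,
    //    so leaking one per layout would leave stale listeners on the shared SparkSession.
    JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId)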
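
One caveat the new listener's scaladoc hints at: QueryExecutionInterceptListener matches the write by URI equality against the InsertIntoHadoopFsRelationCommand output path, so the id passed to registerQueryExecutionListener must be exactly the path later given to storage.saveTo(), and no two concurrent writes on the same session should share it. A small illustration with made-up paths:

    import java.net.URI
    // Per-layout temp paths differ, so concurrent writes on one SparkSession are each
    // captured by their own listener and never cross-talk.
    val registered = new URI("hdfs://ns/kylin/parquet/cube1/seg1/10001_temp")
    val written    = new URI("hdfs://ns/kylin/parquet/cube1/seg1/10001_temp")
    val other      = new URI("hdfs://ns/kylin/parquet/cube1/seg1/10002_temp")
    assert(registered.equals(written))   // same path  -> QueryExecution is latched
    assert(!registered.equals(other))    // other path -> ignored by this listener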