This is an automated email from the ASF dual-hosted git repository. zhangzc pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push: new d554c76 KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building d554c76 is described below commit d554c762838ddac4bf5e38d294e049788e6b8423 Author: zhengshengjun <shengjun_zh...@sina.com> AuthorDate: Thu Mar 4 16:42:38 2021 +0800 KYLIN-4903 cache parent datasource to accelerate next layer's cuboid building --- .../org/apache/kylin/common/KylinConfigBase.java | 8 +++ .../engine/spark/job/BuildLayoutWithUpdate.java | 61 ++++++++++++++++++++++ .../kylin/engine/spark/job/CubeBuildJob.java | 16 ++++-- .../kylin/engine/spark/job/CubeMergeJob.java | 8 ++- 4 files changed, 89 insertions(+), 4 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index ed9a243..34f8ce9 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -3096,4 +3096,12 @@ public abstract class KylinConfigBase implements Serializable { public String getKerberosPrincipal() { return getOptional("kylin.kerberos.principal"); } + + public String getParentDatasetStorageLevel() { + return getOptional("kylin.engine.spark.parent-dataset.storage.level", "NONE"); + } + + public int getMaxParentDatasetPersistCount() { + return Integer.parseInt(getOptional("kylin.engine.spark.parent-dataset.max.persist.count", "1")); + } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java index b07f848..91024b9 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/BuildLayoutWithUpdate.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.spark.job; import java.io.IOException; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -26,10 +27,17 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.spark.builder.NBuildSourceInfo; import org.apache.kylin.engine.spark.metadata.SegmentInfo; import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,8 +46,48 @@ public class BuildLayoutWithUpdate { private ExecutorService pool = Executors.newCachedThreadPool(); private CompletionService<JobResult> completionService = new ExecutorCompletionService<>(pool); private int currentLayoutsNum = 0; + private Map<Long, AtomicLong> toBuildCuboidSize = new ConcurrentHashMap<>(); + private Semaphore semaphore; + private Map<Long, Dataset<Row>> layout2DataSet = new ConcurrentHashMap<>(); + private StorageLevel storageLevel; + private boolean persistParentDataset; + + public BuildLayoutWithUpdate(KylinConfig kylinConfig) { + this.storageLevel = StorageLevel.fromString(kylinConfig.getParentDatasetStorageLevel()); + this.persistParentDataset = !storageLevel.equals(StorageLevel.NONE()); + if (this.persistParentDataset) { + if (kylinConfig.getMaxParentDatasetPersistCount() < 1) { + throw new IllegalArgumentException("max parent dataset persist count should be larger than 1"); + } + this.semaphore = new Semaphore(kylinConfig.getMaxParentDatasetPersistCount()); + } + } + + public void cacheAndRegister(long layoutId, Dataset<Row> dataset) throws InterruptedException{ + if (!persistParentDataset) { + return; + } + logger.info("persist dataset of layout: {}", layoutId); + semaphore.acquire(); + layout2DataSet.put(layoutId, dataset); + dataset.persist(storageLevel); + } public void submit(JobEntity job, KylinConfig config) { + + //if job's BuildSourceInfo is empty, it means this is a merge job, no parent dataset to persist + if (persistParentDataset && job.getBuildSourceInfo() != null && job.getBuildSourceInfo().getToBuildCuboids().size() > 1) { + //when reuse parent dataset is enabled, ensure parent dataset is registered + if(!layout2DataSet.containsKey(job.getBuildSourceInfo().getLayoutId())){ + logger.error("persist parent dataset is enabled, but parent dataset not registered"); + throw new RuntimeException("parent dataset not registered"); + } + if (!toBuildCuboidSize.containsKey(job.getBuildSourceInfo().getLayoutId())) { + toBuildCuboidSize.put(job.getBuildSourceInfo().getLayoutId(), + new AtomicLong(job.getBuildSourceInfo().getToBuildCuboids().size())); + } + } + completionService.submit(new Callable<JobResult>() { @Override public JobResult call() throws Exception { @@ -52,6 +100,17 @@ public class BuildLayoutWithUpdate { } catch (Throwable t) { logger.error("Error occurred when run " + job.getName(), t); throwable = t; + } finally { + //unpersist parent dataset + if (persistParentDataset && job.getBuildSourceInfo() != null && job.getBuildSourceInfo().getToBuildCuboids().size() > 1) { + long remain = toBuildCuboidSize.get(job.getBuildSourceInfo().getLayoutId()).decrementAndGet(); + if (remain == 0) { + toBuildCuboidSize.remove(job.getBuildSourceInfo().getLayoutId()); + layout2DataSet.get(job.getBuildSourceInfo().getLayoutId()).unpersist(); + logger.info("dataset of layout: {} released", job.getBuildSourceInfo().getLayoutId()); + semaphore.release(); + } + } } return new JobResult(dataLayouts, throwable); } @@ -115,5 +174,7 @@ public class BuildLayoutWithUpdate { public abstract String getName(); public abstract LayoutEntity build() throws IOException; + + public abstract NBuildSourceInfo getBuildSourceInfo(); } } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java index b463dad..efce341 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java @@ -157,7 +157,7 @@ public class CubeBuildJob extends SparkApplication { logger.info("Triggered cube planner phase one ."); } - buildLayoutWithUpdate = new BuildLayoutWithUpdate(); + buildLayoutWithUpdate = new BuildLayoutWithUpdate(config); List<String> persistedFlatTable = new ArrayList<>(); List<String> persistedViewFactTable = new ArrayList<>(); Path shareDir = config.getJobTmpShareDir(project, jobId); @@ -297,7 +297,7 @@ public class CubeBuildJob extends SparkApplication { } } - private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) { + private void build(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, SpanningTree st) throws InterruptedException{ List<NBuildSourceInfo> theFirstLevelBuildInfos = buildLayer(buildSourceInfos, seg, st); LinkedList<List<NBuildSourceInfo>> queue = new LinkedList<>(); @@ -318,7 +318,7 @@ public class CubeBuildJob extends SparkApplication { // build current layer and return the next layer to be built. private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> buildSourceInfos, SegmentInfo seg, - SpanningTree st) { + SpanningTree st) throws InterruptedException{ int cuboidsNumInLayer = 0; // build current layer @@ -330,6 +330,11 @@ public class CubeBuildJob extends SparkApplication { cuboidsNumInLayer += toBuildCuboids.size(); Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty."); Dataset<Row> parentDS = info.getParentDS(); + + if (toBuildCuboids.size() > 1) { + buildLayoutWithUpdate.cacheAndRegister(info.getLayoutId(), parentDS); + } + // record the source count of flat table if (info.getLayoutId() == ParentSourceChooser.FLAT_TABLE_FLAG()) { cuboidsRowCount.putIfAbsent(info.getLayoutId(), parentDS.count()); @@ -347,6 +352,11 @@ public class CubeBuildJob extends SparkApplication { public LayoutEntity build() throws IOException { return buildCuboid(seg, index, parentDS, st, info.getLayoutId()); } + + @Override + public NBuildSourceInfo getBuildSourceInfo() { + return info; + } }, config); allIndexesInCurrentLayer.add(index); } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java index 12e939d..83bc601 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java @@ -28,6 +28,7 @@ import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.CubeUpdate; +import org.apache.kylin.engine.spark.builder.NBuildSourceInfo; import org.apache.kylin.engine.spark.metadata.SegmentInfo; import org.apache.kylin.engine.spark.metadata.cube.ManagerHub; import org.apache.kylin.engine.spark.metadata.cube.PathManager; @@ -67,7 +68,7 @@ public class CubeMergeJob extends SparkApplication { @Override protected void doExecute() throws Exception { - buildLayoutWithUpdate = new BuildLayoutWithUpdate(); + buildLayoutWithUpdate = new BuildLayoutWithUpdate(config); String cubeId = getParam(MetadataConstants.P_CUBE_ID); String newSegmentId = getParam(MetadataConstants.P_SEGMENT_IDS); final CubeManager cubeManager = CubeManager.getInstance(config); @@ -117,6 +118,11 @@ public class CubeMergeJob extends SparkApplication { public LayoutEntity build() throws IOException { return saveAndUpdateCuboid(afterSort, mergedSegInfo, layout, assist); } + + @Override + public NBuildSourceInfo getBuildSourceInfo() { + return null; + } }, config); buildLayoutWithUpdate.updateLayout(mergedSegInfo, config);