This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b341568  KYLIN-4761 Update some missing values of new segment when merge segments
b341568 is described below

commit b3415685d5de9abbf47ee12a5fcdcca4a56accdc
Author: Zhichao Zhang <441586...@qq.com>
AuthorDate: Sat Sep 19 23:04:22 2020 +0800

    KYLIN-4761 Update some missing values of new segment when merge segments
    
    (cherry picked from commit 878874ba1d833a60b941c0655b4338618cc32a80)
---
 .../kylin/engine/spark/job/NSparkCubingStep.java   |  43 +------
 .../kylin/engine/spark/job/NSparkExecutable.java   |   8 +-
 .../kylin/engine/spark/job/NSparkMergingStep.java  |   6 +
 .../NSparkUpdateMetaAndCleanupAfterMergeStep.java  |  41 +++----
 .../merger/AfterMergeOrRefreshResourceMerger.java  | 133 --------------------
 .../kylin/engine/spark/merger/MetadataMerger.java  |  40 ------
 .../engine/spark/utils/UpdateMetadataUtil.java     | 136 +++++++++++++++++++++
 .../kylin/engine/spark/job/CubeMergeJob.java       |  44 ++++++-
 .../engine/spark/LocalWithSparkSessionTest.java    |  12 +-
 9 files changed, 215 insertions(+), 248 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index c3234f1..5c073e1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -18,30 +18,23 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -79,38 +72,8 @@ public class NSparkCubingStep extends NSparkExecutable {
     }
 
     @Override
-    protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
-        CubeManager cubeManager = CubeManager.getInstance(config);
-        CubeInstance currentInstanceCopy = cubeManager.getCube(getCubeName()).latestCopyForWrite();
-        KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(getDistMetaUrl());
-        CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).reloadCube(getCubeName());
-        CubeUpdate update = new CubeUpdate(currentInstanceCopy);
-        Set<String> segmentIds = Sets.newHashSet(org.apache.hadoop.util.StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS)));
-        CubeSegment toUpdateSegs = distCube.getSegmentById(segmentIds.iterator().next());
-
-        List<CubeSegment> tobe = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
-
-        if (!tobe.contains(toUpdateSegs))
-            throw new IllegalStateException(
-                    String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
-                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobe.toString()));
-
-        toUpdateSegs.setStatus(SegmentStatusEnum.READY);
-
-        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-        for (CubeSegment segment : currentInstanceCopy.getSegments()) {
-            if (!tobe.contains(segment))
-                toRemoveSegs.add(segment);
-        }
-
-        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
-
-        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
-                .setToUpdateSegs(toUpdateSegs);
-        if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
-            update.setStatus(RealizationStatusEnum.READY);
-        }
-        cubeManager.updateCube(update);
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+        UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
     }
 
     @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7f5e4f0..f4efb76 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -266,7 +266,7 @@ public class NSparkExecutable extends AbstractExecutable {
             CliCommandExecutor exec = new CliCommandExecutor();
             exec.execute(cmd, patternedLogger, jobId);
 
-            updateMetaAfterBuilding(config);
+            updateMetaAfterOperation(config);
             //Add metrics information to execute result for JobMetricsFacade
             getManager().addJobInfo(getId(), getJobMetricsInfo(config));
             Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
@@ -278,7 +278,7 @@ public class NSparkExecutable extends AbstractExecutable {
         }
     }
 
-    protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
     }
 
    protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
@@ -343,14 +343,14 @@ public class NSparkExecutable extends AbstractExecutable {
 
     protected void appendSparkConf(StringBuilder sb, String key, String value) {
         // Multiple parameters in "--conf" need to be enclosed in single quotes
-        sb.append(" --conf '").append(key).append("=").append(value).append("' ");
+        sb.append(" --conf '").append(key).append("=").append(value.trim()).append("' ");
     }
 
     private ExecuteResult runLocalMode(String appArgs, KylinConfig config) {
         try {
             Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
             appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
 
-            updateMetaAfterBuilding(config);
+            updateMetaAfterOperation(config);
             //Add metrics information to execute result for JobMetricsFacade
             getManager().addJobInfo(getId(), getJobMetricsInfo(config));
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
index 9699f23..90e5953 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.job;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Set;
 
@@ -25,6 +26,7 @@ import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
@@ -59,4 +61,8 @@ public class NSparkMergingStep extends NSparkExecutable {
         return true;
     }
 
+    @Override
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+        UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
+    }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index 2ccd810..d01f714 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -21,13 +21,12 @@ package org.apache.kylin.engine.spark.job;
 import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
@@ -44,35 +43,31 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         String cubeId = getParam(MetadataConstants.P_CUBE_ID);
         String mergedSegmentUuid = getParam(CubingExecutableUtil.SEGMENT_ID);
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final KylinConfig config = wrapConfig(context);
         CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
 
-        updateMetadataAfterMerge(cubeId);
+        try {
+            // update segments
+            UpdateMetadataUtil.updateMetadataAfterMerge(cubeId, mergedSegmentUuid, config);
+        } catch (IOException e) {
+            throw new ExecuteException("Can not update metadata of cube: " + cube.getName());
+        }
 
-        CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
-        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
-        // delete segments which were merged
-        for (CubeSegment segment : mergingSegments) {
-            try {
-                PathManager.deleteSegmentParquetStoragePath(cube, segment);
-            } catch (IOException e) {
-                throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
+        if (config.cleanStorageAfterDelOperation()) {
+            CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
+            Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
+            // delete segments which were merged
+            for (CubeSegment segment : mergingSegments) {
+                try {
+                    PathManager.deleteSegmentParquetStoragePath(cube, segment);
+                } catch (IOException e) {
+                    throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
+                }
             }
         }
-
         return ExecuteResult.createSucceed();
     }
 
-    private void updateMetadataAfterMerge(String cubeId) {
-        String buildStepUrl = getParam(MetadataConstants.P_OUTPUT_META_URL);
-        KylinConfig buildConfig = KylinConfig.createKylinConfig(this.getConfig());
-        buildConfig.setMetadataUrl(buildStepUrl);
-        ResourceStore resourceStore = ResourceStore.getStore(buildConfig);
-        String mergedSegmentId = getParam(CubingExecutableUtil.SEGMENT_ID);
-        AfterMergeOrRefreshResourceMerger merger = new AfterMergeOrRefreshResourceMerger(buildConfig);
-        merger.merge(cubeId, mergedSegmentId, resourceStore, getParam(MetadataConstants.P_JOB_TYPE));
-    }
-
     @Override
     public void cleanup(ExecuteResult result) throws ExecuteException {
         // delete job tmp dir
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
deleted file mode 100644
index 2fb1812..0000000
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.merger;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.Segments;
-
-import com.google.common.collect.Lists;
-
-public class AfterMergeOrRefreshResourceMerger extends MetadataMerger {
-
-    public AfterMergeOrRefreshResourceMerger(KylinConfig config) {
-        super(config);
-    }
-
-    @Override
-    public void merge(String cubeId, String segmentId, ResourceStore remoteResourceStore, String jobType) {
-
-        CubeManager cubeManager = CubeManager.getInstance(getConfig());
-        CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
-        CubeUpdate update = new CubeUpdate(cubeInstance.latestCopyForWrite());
-
-        CubeManager distManager = CubeManager.getInstance(remoteResourceStore.getConfig());
-        CubeInstance distCube = distManager.getCubeByUuid(cubeId).latestCopyForWrite();
-
-        List<CubeSegment> toUpdateSegments = Lists.newArrayList();
-
-        CubeSegment mergedSegment = distCube.getSegmentById(segmentId);
-        mergedSegment.setStatus(SegmentStatusEnum.READY);
-        Map<String, String> additionalInfo = mergedSegment.getAdditionalInfo();
-        additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
-        mergedSegment.setAdditionalInfo(additionalInfo);
-        toUpdateSegments.add(mergedSegment);
-
-        List<CubeSegment> toRemoveSegments = getToRemoveSegs(distCube, mergedSegment);
-        Collections.sort(toRemoveSegments);
-        makeSnapshotForNewSegment(mergedSegment, toRemoveSegments);
-
-        if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
-            Optional<Long> reduce = toRemoveSegments.stream().map(CubeSegment::getSizeKB).filter(size -> size != -1)
-                    .reduce(Long::sum);
-            Optional<Long> inputRecords = toRemoveSegments.stream().map(CubeSegment::getInputRecords).filter(records -> records != -1)
-                    .reduce(Long::sum);
-            if (reduce.isPresent()) {
-                long totalSourceSize = reduce.get();
-                mergedSegment.setSizeKB(totalSourceSize);
-                mergedSegment.setInputRecords(inputRecords.get());
-                mergedSegment.setLastBuildTime(System.currentTimeMillis());
-            }
-        }
-
-        update.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[0]));
-        update.setToUpdateSegs(toUpdateSegments.toArray(new CubeSegment[0]));
-
-        try {
-            cubeManager.updateCube(update);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-    }
-
-    List<CubeSegment> getToRemoveSegs(CubeInstance cube, CubeSegment mergedSegment) {
-        Segments tobe = cube.calculateToBeSegments(mergedSegment);
-
-        if (!tobe.contains(mergedSegment))
-            throw new IllegalStateException(
-                    "For Cube " + cube + ", segment " + mergedSegment + " is expected but not in the tobe " + tobe);
-
-        if (mergedSegment.getStatus() == SegmentStatusEnum.NEW)
-            mergedSegment.setStatus(SegmentStatusEnum.READY);
-
-        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-        for (CubeSegment s : cube.getSegments()) {
-            if (!tobe.contains(s))
-                toRemoveSegs.add(s);
-        }
-
-        return toRemoveSegs;
-    }
-
-    private void makeSnapshotForNewSegment(CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    @Override
-    public void merge(AbstractExecutable abstractExecutable) {
-        String buildStepUrl = abstractExecutable.getParam(MetadataConstants.P_OUTPUT_META_URL);
-        KylinConfig buildConfig = KylinConfig.createKylinConfig(this.getConfig());
-        buildConfig.setMetadataUrl(buildStepUrl);
-        ResourceStore resourceStore = ResourceStore.getStore(buildConfig);
-        String cubeId = abstractExecutable.getParam(MetadataConstants.P_CUBE_ID);
-        String segmentId = abstractExecutable.getParam(CubingExecutableUtil.SEGMENT_ID);
-        merge(cubeId, segmentId, resourceStore, abstractExecutable.getParam(MetadataConstants.P_JOB_TYPE));
-    }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
deleted file mode 100644
index b5e2d9e..0000000
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.merger;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public abstract class MetadataMerger {
-    private final KylinConfig config;
-
-    protected MetadataMerger(KylinConfig config) {
-        this.config = config;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public abstract void merge(String cubeId, String segmentIds, ResourceStore remoteResourceStore, String jobType);
-
-    public abstract void merge(AbstractExecutable abstractExecutable);
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
new file mode 100644
index 0000000..ab89f44
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.utils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateMetadataUtil {
+
+    protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
+
+    public static void syncLocalMetadataToRemote(KylinConfig config,
+                                                 NSparkExecutable nsparkExecutable) throws IOException {
+        String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID);
+        Set<String> segmentIds = Sets.newHashSet(StringUtils.split(
+                nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " "));
+        String segmentId = segmentIds.iterator().next();
+        String remoteResourceStore = nsparkExecutable.getDistMetaUrl();
+        String jobType = nsparkExecutable.getParam(MetadataConstants.P_JOB_TYPE);
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+
+        // load the meta from local meta path of this job
+        KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(remoteResourceStore);
+        CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).getCubeByUuid(cubeId);
+        CubeSegment toUpdateSegs = distCube.getSegmentById(segmentId);
+
+        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
+        if (!tobeSegments.contains(toUpdateSegs))
+            throw new IllegalStateException(
+                    String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
+                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+
+        CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+
+        if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
+            toUpdateSegs.getSnapshots().clear();
+            // update the snapshot table path
+            for (Map.Entry<String, String> entry :
+                    currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) {
+                toUpdateSegs.putSnapshotResPath(entry.getKey(), entry.getValue());
+            }
+        } else {
+            toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+            for (CubeSegment segment : currentInstanceCopy.getSegments()) {
+                if (!tobeSegments.contains(segment))
+                    toRemoveSegs.add(segment);
+            }
+            Collections.sort(toRemoveSegs);
+            if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
+                update.setStatus(RealizationStatusEnum.READY);
+            }
+        }
+
+        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+
+        toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
+                .setToUpdateSegs(toUpdateSegs);
+        cubeManager.updateCube(update);
+    }
+
+    public static void updateMetadataAfterMerge(String cubeId, String segmentId,
+                                                KylinConfig config) throws IOException {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+
+        CubeSegment toUpdateSegs = currentInstanceCopy.getSegmentById(segmentId);
+
+        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
+        if (!tobeSegments.contains(toUpdateSegs))
+            throw new IllegalStateException(
+                    String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
+                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+
+        CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+
+        toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+        for (CubeSegment segment : currentInstanceCopy.getSegments()) {
+            if (!tobeSegments.contains(segment))
+                toRemoveSegs.add(segment);
+        }
+        Collections.sort(toRemoveSegs);
+        if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
+            update.setStatus(RealizationStatusEnum.READY);
+        }
+
+        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+
+        toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
+                .setToUpdateSegs(toUpdateSegs);
+        cubeManager.updateCube(update);
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 2772118..af46217 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
@@ -34,8 +35,8 @@ import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.storage.StorageFactory;
-import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -57,9 +58,12 @@ import scala.collection.JavaConversions;
 
 public class CubeMergeJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
 
+    private BuildLayoutWithUpdate buildLayoutWithUpdate;
+    private Map<Long, CubeMergeAssist> mergeCuboidsAssist;
     private List<CubeSegment> mergingSegments = Lists.newArrayList();
     private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
+    private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
 
     @Override
     protected void doExecute() throws Exception {
@@ -74,8 +78,10 @@ public class CubeMergeJob extends SparkApplication {
             SegmentInfo segInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), segment.getUuid());
             mergingSegInfos.add(segInfo);
         }
-        //merge and save segments
+        // merge segments
         mergeSegments(cubeId, newSegmentId);
+        // update segment
+        updateSegmentInfo(cubeId, newSegmentId);
     }
 
     private void mergeSegments(String cubeId, String segmentId) throws IOException {
@@ -84,7 +90,7 @@
         CubeSegment mergedSeg = cube.getSegmentById(segmentId);
         SegmentInfo mergedSegInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), mergedSeg.getUuid());
 
-        Map<Long, CubeMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
+        mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
         for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
             SpanningTree spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()));
             Dataset<Row> afterMerge = assist.merge(config, cube.getName());
@@ -178,6 +184,7 @@
 
             int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
             layout.setShardNum(partitionNum);
+            cuboidShardNum.put(layoutId, (short)partitionNum);
             ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
             ss.sparkContext().setJobDescription(null);
             QueryExecutionCache.removeQueryExecution(queryExecutionId);
@@ -187,6 +194,37 @@
         return layout;
     }
 
+    private void updateSegmentInfo(String cubeId, String segmentId) throws IOException {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cubeCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+        CubeUpdate update = new CubeUpdate(cubeCopy);
+
+        List<CubeSegment> cubeSegments = Lists.newArrayList();
+        CubeSegment segment = cubeCopy.getSegmentById(segmentId);
+        long totalSourceSize = 0l;
+        long totalInputRecords = 0l;
+        long totalInputRecordsSize = 0l;
+        for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
+            totalSourceSize += assist.getLayout().getByteSize();
+        }
+        for (CubeSegment toRemoveSeg : mergingSegments) {
+            totalInputRecords += toRemoveSeg.getInputRecords();
+            totalInputRecordsSize += toRemoveSeg.getInputRecordsSize();
+        }
+        // Unit: KB
+        segment.setSizeKB(totalSourceSize / 1024);
+        segment.setInputRecords(totalInputRecords);
+        segment.setInputRecordsSize(totalInputRecordsSize);
+        segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
+        segment.setCuboidShardNums(cuboidShardNum);
+        Map<String, String> additionalInfo = segment.getAdditionalInfo();
+        additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
+        segment.setAdditionalInfo(additionalInfo);
+        cubeSegments.add(segment);
+        update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
+        cubeManager.updateCube(update);
+    }
+
     @Override
     protected String generateInfo() {
         return LogJobInfoUtils.dfMergeJobInfo();
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 2d26105..1d29074 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -203,11 +203,13 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
         NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN");
         execMgr.addJob(mergeJob);
         ExecutableState result = wait(mergeJob);
-        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
-        for (CubeSegment segment : mergingSegments) {
-            String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
-                    segment.getStorageLocationIdentifier());
-            Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+        if (config.cleanStorageAfterDelOperation()) {
+            Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
+            for (CubeSegment segment : mergingSegments) {
+                String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+                        segment.getStorageLocationIdentifier());
+                Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+            }
         }
         checkJobTmpPathDeleted(config, mergeJob);
         return result;