This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 84af3394b95a38765eefa977f1a1077881bed0da
Author: Yinghao Lin <yinghao....@kyligence.io>
AuthorDate: Thu Mar 2 00:04:01 2023 +0800

    KYLIN-5527 [FOLLOWUP] Rename working layouts to effective layouts
---
 .../kylin/metadata/cube/model/NDataLayout.java     |  4 ++--
 .../kylin/metadata/cube/model/NDataSegDetails.java | 16 ++++++-------
 .../kylin/metadata/cube/model/NDataSegment.java    | 28 +++++++++++-----------
 .../metadata/cube/model/SegmentPartition.java      |  2 +-
 .../org/apache/kylin/metadata/model/ISegment.java  |  2 +-
 .../kylin/rest/response/NDataSegmentResponse.java  |  4 ++--
 .../apache/kylin/rest/service/ModelService.java    |  2 +-
 .../builder/PartitionDictionaryBuilderHelper.java  |  2 +-
 .../spark/merger/AfterBuildResourceMerger.java     |  2 +-
 .../merger/AfterMergeOrRefreshResourceMerger.java  |  4 ++--
 .../spark/builder/DictionaryBuilderHelper.java     |  2 +-
 .../apache/kylin/engine/spark/job/DFMergeJob.java  |  2 +-
 .../spark/job/stage/build/GenerateFlatTable.scala  |  2 +-
 .../engine/spark/job/stage/merge/MergeStage.scala  |  2 +-
 .../merge/partition/PartitionMergeStage.scala      |  2 +-
 15 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
index 08a99459e1..5055613787 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataLayout.java
@@ -59,7 +59,7 @@ public class NDataLayout implements Serializable {
         return r;
     }
 
-    public static boolean filterWorkingLayout(NDataLayout layout) {
+    public static boolean filterEffectiveLayout(NDataLayout layout) {
         if (layout == null) {
             return false;
         }
@@ -277,7 +277,7 @@ public class NDataLayout implements Serializable {
         if (segDetails == null || !segDetails.isCachedAndShared())
             return false;
 
-        for (NDataLayout cached : segDetails.getWorkingLayouts()) {
+        for (NDataLayout cached : segDetails.getEffectiveLayouts()) {
             if (cached == this)
                 return true;
         }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
index e9866f3509..f66dd54c9c 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegDetails.java
@@ -115,7 +115,7 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
 
     public long getTotalRowCount() {
         long count = 0L;
-        for (NDataLayout cuboid : getWorkingLayouts()) {
+        for (NDataLayout cuboid : getEffectiveLayouts()) {
             count += cuboid.getRows();
         }
         return count;
@@ -123,16 +123,16 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
 
     /**
     * @deprecated Deprecated because of non-working layouts were added.
-     * <p>Use {@link NDataSegDetails#getWorkingLayouts} or {@link NDataSegDetails#getAllLayouts} instead.
+     * <p>Use {@link NDataSegDetails#getEffectiveLayouts} or {@link NDataSegDetails#getAllLayouts} instead.
     */
    @Deprecated
    public List<NDataLayout> getLayouts() {
        return getAllLayouts();
    }
 
-    public List<NDataLayout> getWorkingLayouts() {
-        List<NDataLayout> workingLayouts = getLayouts0(false);
-        return isCachedAndShared() ? ImmutableList.copyOf(workingLayouts) : workingLayouts;
+    public List<NDataLayout> getEffectiveLayouts() {
+        List<NDataLayout> effectiveLayouts = getLayouts0(false);
+        return isCachedAndShared() ? ImmutableList.copyOf(effectiveLayouts) : effectiveLayouts;
     }
 
     public List<NDataLayout> getAllLayouts() {
@@ -140,11 +140,11 @@ public class NDataSegDetails extends RootPersistentEntity implements Serializabl
         return isCachedAndShared() ? ImmutableList.copyOf(allLayouts) : allLayouts;
     }
 
-    private List<NDataLayout> getLayouts0(boolean includingNonWorkingLayouts) {
-        if (includingNonWorkingLayouts) {
+    private List<NDataLayout> getLayouts0(boolean includingNonEffectiveLayouts) {
+        if (includingNonEffectiveLayouts) {
             return layouts;
         }
-        return layouts.stream().filter(NDataLayout::filterWorkingLayout).collect(Collectors.toList());
+        return layouts.stream().filter(NDataLayout::filterEffectiveLayout).collect(Collectors.toList());
     }
 
     public NDataLayout getLayoutById(long layoutId) {
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
index 00c2d105a7..3ce4790ad6 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/NDataSegment.java
@@ -276,9 +276,9 @@ public class NDataSegment implements ISegment, Serializable {
     }
 
     @Override
-    public int getWorkingLayoutSize() {
+    public int getEffectiveLayoutSize() {
         return (int) getLayoutInfo().getLayoutsMap().values().stream()
-                .filter(NDataLayout::filterWorkingLayout).count();
+                .filter(NDataLayout::filterEffectiveLayout).count();
     }
 
     public NDataLayout getLayout(long layoutId) {
@@ -332,7 +332,7 @@ public class NDataSegment implements ISegment, Serializable {
         private NDataSegDetails segDetails;
         // not required by spark cubing
         private Map<Long, NDataLayout> allLayoutsMap = Collections.emptyMap();
-        private Map<Long, NDataLayout> workingLayoutsMap = Collections.emptyMap();
+        private Map<Long, NDataLayout> effectiveLayoutsMap = Collections.emptyMap();
         /**
          * for each layout, partition id -> bucket id
          */
@@ -348,8 +348,8 @@ public class NDataSegment implements ISegment, Serializable {
 
         public LayoutInfo(Map<Long, NDataLayout> layoutsMap) {
             this.allLayoutsMap = layoutsMap;
-            this.workingLayoutsMap = layoutsMap.entrySet().stream()
-                    .filter(entry -> NDataLayout.filterWorkingLayout(entry.getValue()))
+            this.effectiveLayoutsMap = layoutsMap.entrySet().stream()
+                    .filter(entry -> NDataLayout.filterEffectiveLayout(entry.getValue()))
                     .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
         }
 
@@ -373,11 +373,11 @@ public class NDataSegment implements ISegment, Serializable {
             segDetails.setCachedAndShared(dataflow.isCachedAndShared());
             List<NDataLayout> allLayouts = segDetails.getAllLayouts();
             allLayoutsMap = Maps.newHashMap();
-            workingLayoutsMap = Maps.newHashMap();
+            effectiveLayoutsMap = Maps.newHashMap();
             for (NDataLayout layout : allLayouts) {
                 allLayoutsMap.put(layout.getLayoutId(), layout);
-                if (NDataLayout.filterWorkingLayout(layout)) {
-                    workingLayoutsMap.put(layout.getLayoutId(), layout);
+                if (NDataLayout.filterEffectiveLayout(layout)) {
+                    effectiveLayoutsMap.put(layout.getLayoutId(), layout);
                     Map<Long, Long> cuboidBucketMap = Maps.newHashMap();
                     layout.getMultiPartition().forEach(dataPartition -> cuboidBucketMap
                             .put(dataPartition.getPartitionId(), dataPartition.getBucketId()));
@@ -387,15 +387,15 @@ public class NDataSegment implements ISegment, Serializable {
         }
 
         public int getLayoutSize() {
-            return workingLayoutsMap.size();
+            return effectiveLayoutsMap.size();
         }
 
         public NDataLayout getLayout(long layoutId) {
-            return workingLayoutsMap.get(layoutId);
+            return effectiveLayoutsMap.get(layoutId);
         }
 
         public Map<Long, NDataLayout> getLayoutsMap() {
-            return workingLayoutsMap;
+            return effectiveLayoutsMap;
         }
 
         public Map<Long, NDataLayout> getAllLayoutsMap() {
@@ -403,7 +403,7 @@ public class NDataSegment implements ISegment, Serializable {
         }
 
         public Set<Long> getLayoutIds() {
-            return workingLayoutsMap.keySet();
+            return effectiveLayoutsMap.keySet();
         }
 
         public List<Long> getMultiPartitionIds() {
@@ -419,8 +419,8 @@ public class NDataSegment implements ISegment, Serializable {
         }
 
         public boolean isAlreadyBuilt(long layoutId) {
-            if (Objects.nonNull(workingLayoutsMap) && workingLayoutsMap.containsKey(layoutId)) {
-                return workingLayoutsMap.get(layoutId).isReady();
+            if (Objects.nonNull(effectiveLayoutsMap) && effectiveLayoutsMap.containsKey(layoutId)) {
+                return effectiveLayoutsMap.get(layoutId).isReady();
             }
             return false;
         }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
index 0b4b8ca873..3cd6dd051b 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/SegmentPartition.java
@@ -142,7 +142,7 @@ public class SegmentPartition implements Serializable {
             return 0;
         }
         storageSize = dataSegment.getSegDetails() //
-                .getWorkingLayouts().stream() //
+                .getEffectiveLayouts().stream() //
                 .flatMap(layout -> layout.getMultiPartition().stream()) //
                 .filter(partition -> partition.getPartitionId() == partitionId) //
                 .mapToLong(LayoutPartition::getByteSize).sum();
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
index 146eead0f4..34d3dcfa44 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISegment.java
@@ -43,7 +43,7 @@ public interface ISegment extends Comparable<ISegment> {
 
     public int getLayoutSize();
 
-    public int getWorkingLayoutSize();
+    public int getEffectiveLayoutSize();
 
     public NDataModel getModel();
 
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
index 8af94f0868..9edd9fbba2 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/response/NDataSegmentResponse.java
@@ -120,14 +120,14 @@ public class NDataSegmentResponse extends NDataSegment {
         startTime = Long.parseLong(getSegRange().getStart().toString());
         endTime = Long.parseLong(getSegRange().getEnd().toString());
         storage = bytesSize;
-        indexCount = segment.getWorkingLayoutSize();
+        indexCount = segment.getEffectiveLayoutSize();
         indexCountTotal = segment.getIndexPlan().getAllLayoutsSize(true);
         multiPartitionCount = segment.getMultiPartitions().size();
         hasBaseAggIndex = segment.getIndexPlan().containBaseAggLayout();
         hasBaseTableIndex = segment.getIndexPlan().containBaseTableLayout();
         if (segment.getIndexPlan().getBaseTableLayout() != null) {
             val indexPlan = segment.getDataflow().getIndexPlan();
-            long segmentFileCount = segment.getSegDetails().getWorkingLayouts().stream()
+            long segmentFileCount = segment.getSegDetails().getEffectiveLayouts().stream()
                     .filter(layout -> indexPlan.getLayoutEntity(layout.getLayoutId()) != null
                             && indexPlan.getLayoutEntity(layout.getLayoutId()).isBaseIndex())
                     .mapToLong(NDataLayout::getFileCount).sum();
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index a7b83f9ceb..7c077551db 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -1191,7 +1191,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp
             boolean allToComplement, Set<Long> allIndexWithoutTobeDel, NDataSegment segment) {
         if (allToComplement) {
             // find seg that does not have all indexes(don't include tobeDeleted)
-            val segLayoutIds = segment.getSegDetails().getWorkingLayouts().stream().map(NDataLayout::getLayoutId)
+            val segLayoutIds = segment.getSegDetails().getEffectiveLayouts().stream().map(NDataLayout::getLayoutId)
                     .collect(Collectors.toSet());
             return !Sets.difference(allIndexWithoutTobeDel, segLayoutIds).isEmpty();
         }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
index 2d376b51bc..2fa95ccba8 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/builder/PartitionDictionaryBuilderHelper.java
@@ -49,7 +49,7 @@ public class PartitionDictionaryBuilderHelper extends DictionaryBuilderHelper {
                     .filter(partition -> !partition.getStatus().equals(PartitionStatusEnum.READY))
                     .collect(Collectors.toSet());
             if (CollectionUtils.isEmpty(newPartitions)) {
-                for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
+                for (NDataLayout cuboid : seg.getSegDetails().getEffectiveLayouts()) {
                     buildedLayouts.add(cuboid.getLayout());
                 }
             }
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
index d5efe0722a..66960d62f2 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterBuildResourceMerger.java
@@ -110,7 +110,7 @@ public class AfterBuildResourceMerger extends SparkJobMetadataMerger {
         theSeg.setStatus(SegmentStatusEnum.READY);
         dfUpdate.setToUpdateSegs(theSeg);
         dfUpdate.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[toRemoveSegments.size()]));
-        dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getWorkingLayouts().toArray(new NDataLayout[0]));
+        dfUpdate.setToAddOrUpdateLayouts(theSeg.getSegDetails().getEffectiveLayouts().toArray(new NDataLayout[0]));
 
         localDataflowManager.updateDataflow(dfUpdate);
         updateIndexPlan(flowName, remoteStore);
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
index 8aa4073ae7..52943ecd2e 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
@@ -83,7 +83,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
                     partition.setLastBuildTime(lastBuildTime);
                 });
                 mergedSegment.setLastBuildTime(lastBuildTime);
-                toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getWorkingLayouts()));
+                toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getEffectiveLayouts()));
             } else {
                 mergedSegment = upsertSegmentPartition(localSegment, remoteSegment, partitions);
                 for (val segId : segmentIds) {
@@ -158,7 +158,7 @@ public class AfterMergeOrRefreshResourceMerger extends SparkJobMetadataMerger {
                 mergedSegment.setStatus(SegmentStatusEnum.WARNING);
             }
         }
-        toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getWorkingLayouts()));
+        toUpdateCuboids.addAll(new ArrayList<>(mergedSegment.getSegDetails().getEffectiveLayouts()));
 
         update.setToAddOrUpdateLayouts(toUpdateCuboids.toArray(new NDataLayout[0]));
         update.setToRemoveSegs(toRemoveSegments.toArray(new NDataSegment[0]));
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
index 698de6d745..3c3a5357fe 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/DictionaryBuilderHelper.java
@@ -138,7 +138,7 @@ public class DictionaryBuilderHelper {
 
         List<LayoutEntity> buildedLayouts = Lists.newArrayList();
         if (seg.getSegDetails() != null) {
-            for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
+            for (NDataLayout cuboid : seg.getSegDetails().getEffectiveLayouts()) {
                 buildedLayouts.add(cuboid.getLayout());
             }
         }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
index 7109e0a297..2e49d3f33b 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/DFMergeJob.java
@@ -73,7 +73,7 @@ public class DFMergeJob extends SparkApplication {
 
         // collect layouts need to merge
         Map<Long, DFLayoutMergeAssist> mergeCuboidsAssist = Maps.newConcurrentMap();
         for (NDataSegment seg : mergingSegments) {
-            for (NDataLayout cuboid : seg.getSegDetails().getWorkingLayouts()) {
+            for (NDataLayout cuboid : seg.getSegDetails().getEffectiveLayouts()) {
                 long layoutId = cuboid.getLayoutId();
                 DFLayoutMergeAssist assist = mergeCuboidsAssist.get(layoutId);
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
index 823079a59b..d1db397aba 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/build/GenerateFlatTable.scala
@@ -91,7 +91,7 @@ class GenerateFlatTable(jobContext: SegmentJob, dataSegment: NDataSegment, build
   * @return <tt>true</tt> if data count check pass, <tt>false</tt> otherwise
   */
  private def checkDataCount(): Boolean = {
-    val layouts = dataSegment.getSegDetails.getWorkingLayouts.asScala.map(lay => jobContext.getIndexPlan.getLayoutEntity(lay.getLayoutId))
+    val layouts = dataSegment.getSegDetails.getEffectiveLayouts.asScala.map(lay => jobContext.getIndexPlan.getLayoutEntity(lay.getLayoutId))
    val tasks = layouts.map(layout => new DataCountCheckTask(layout, StorageStoreUtils.toDF(dataSegment, layout, sparkSession)))
    val resultsQueue = Queues.newLinkedBlockingQueue[DataCountCheckResult]()
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
index 54aebff88c..0b19eec92e 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/MergeStage.scala
@@ -78,7 +78,7 @@ abstract class MergeStage(private val jobContext: SegmentJob,
     // Cleanup previous potentially left temp layout data.
     cleanupLayoutTempData(dataSegment, jobContext.getReadOnlyLayouts.asScala.toSeq)
 
-    val tasks = unmerged.flatMap(segment => segment.getSegDetails.getWorkingLayouts.asScala) //
+    val tasks = unmerged.flatMap(segment => segment.getSegDetails.getEffectiveLayouts.asScala) //
       .groupBy(_.getLayoutId).values.map(LayoutMergeTask)
     slowStartExec(tasks.iterator, mergeLayout)
   }
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
index b932331a8a..52b11c304c 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/stage/merge/partition/PartitionMergeStage.scala
@@ -50,7 +50,7 @@ abstract class PartitionMergeStage(private val jobContext: SegmentJob,
 
   override protected def mergeIndices(): Unit = {
     val tasks = unmerged.flatMap(segment =>
-      segment.getSegDetails.getWorkingLayouts.asScala.flatMap(layout =>
+      segment.getSegDetails.getEffectiveLayouts.asScala.flatMap(layout =>
         layout.getMultiPartition.asScala.map(partition => (layout, partition))
       )).groupBy(tp => (tp._1.getLayoutId, tp._2.getPartitionId)).values.map(PartitionMergeTask)
     slowStartExec(tasks.iterator, mergePartition)
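
For readers skimming the diff: the rename is purely mechanical (getWorkingLayouts -> getEffectiveLayouts, filterWorkingLayout -> filterEffectiveLayout, getWorkingLayoutSize -> getEffectiveLayoutSize) and no call site changes behavior. The sketch below restates the contract the new names document, using only methods visible in this diff; the wrapper class and its method names are illustrative, not part of the commit.

```java
// Caller-side sketch of the renamed API (illustrative; not part of this commit).
// getEffectiveLayouts/getAllLayouts/filterEffectiveLayout/getRows appear in the
// diff; the class and static helper names here are hypothetical.
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegDetails;

public class EffectiveLayoutsSketch {

    // Mirrors NDataSegDetails#getTotalRowCount: aggregate over effective
    // layouts only, so rows from non-effective layouts are excluded.
    static long totalRowCount(NDataSegDetails details) {
        long count = 0L;
        for (NDataLayout layout : details.getEffectiveLayouts()) {
            count += layout.getRows();
        }
        return count;
    }

    // The invariant the rename documents: the effective layouts are exactly
    // the subset of all layouts accepted by NDataLayout.filterEffectiveLayout.
    static List<NDataLayout> effectiveSubset(NDataSegDetails details) {
        return details.getAllLayouts().stream()
                .filter(NDataLayout::filterEffectiveLayout)
                .collect(Collectors.toList());
    }
}
```

Note that the deprecated NDataSegDetails#getLayouts() delegates to getAllLayouts(), so callers that want the filtered view must call getEffectiveLayouts() explicitly.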