This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 60da9e0ce1fa789ad580ca62ae4d47162f9d3201 Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com> AuthorDate: Fri Jul 21 21:09:31 2023 +0800 KYLIN-5768 Refine and adjust some code for localCache --------- Co-authored-by: sibingzhang <74443791+sibingzh...@users.noreply.github.com> Co-authored-by: sibing.zhang <sibing.zh...@qq.com> --- pom.xml | 12 +- .../rest/controller/NBasicControllerTest.java | 2 +- .../apache/kylin/rest/service/ProjectService.java | 9 +- .../org/apache/kylin/common/KylinConfigBase.java | 8 ++ .../kylin/metadata/cube/model/IndexPlan.java | 136 +++++++++++++++------ .../cube/optimization/AbstractOptStrategy.java | 4 +- .../cube/optimization/GarbageLayoutType.java | 2 +- .../optimization/IncludedLayoutOptStrategy.java | 2 +- .../metadata/cube/optimization/IndexOptimizer.java | 14 ++- .../cube/optimization/IndexOptimizerFactory.java | 12 +- .../optimization/LowFreqLayoutOptStrategy.java | 2 +- .../cube/optimization/MergedLayoutOptStrategy.java | 59 +++++++++ .../optimization/SimilarLayoutOptStrategy.java | 2 +- .../cube/optimization/event/ApproveRecsEvent.java} | 27 ++-- .../cube/optimization/event/BuildIndexEvent.java} | 26 ++-- .../cube/storage/GarbageStorageCollector.java | 2 +- .../metadata/cube/utils/IndexPlanReduceUtil.java | 52 +++++++- .../storage/ProjectStorageInfoCollectorTest.java | 74 +++++++++-- .../cube/utils/IndexPlanReduceUtilTest.java | 27 +++- .../kylin/rest/initialize/BuildAppInitializer.java | 5 + .../kylin/rest/service/ModelBuildService.java | 38 ++++-- .../kylin/rest/service/ModelServiceBuildTest.java | 36 +++++- .../kylin/rest/controller/NProjectController.java | 14 ++- .../rest/controller/NProjectControllerTest.java | 2 +- .../kylin/rest/service/AbstractModelService.java | 114 ++++++++++++++++- .../kylin/rest/service/BaseIndexUpdateHelper.java | 17 ++- .../kylin/rest/service/IndexPlanService.java | 66 +++++++--- .../apache/kylin/rest/service/ModelService.java | 33 +++-- .../kylin/rest/service/IndexPlanServiceTest.java | 52 ++++---- .../kylin/rest/service/ProjectServiceTest.java | 2 +- .../plugin/diagnose/DiagnoseExecutorPlugin.scala | 9 +- .../common/logging/AbstractHdfsLogAppender.java | 7 +- .../common/logging/SparkDriverHdfsLogAppender.java | 2 +- .../logging/SparkExecutorHdfsLogAppender.java | 4 +- .../org/apache/spark/utils/SparkHadoopUtils.scala | 32 ++++- .../org/apache/kylin/helper/RoutineToolHelper.java | 8 +- .../kylin/tool/garbage/ExecutableCleaner.java | 10 ++ .../apache/kylin/tool/garbage/GarbageCleaner.java | 11 +- .../apache/kylin/tool/garbage/IndexCleaner.java | 129 ++++++++++++++++++- .../apache/kylin/tool/garbage/MetadataCleaner.java | 6 + .../apache/kylin/tool/garbage/SnapshotCleaner.java | 10 ++ 41 files changed, 888 insertions(+), 191 deletions(-) diff --git a/pom.xml b/pom.xml index 9b68f4a83d..ff1f965ea3 100644 --- a/pom.xml +++ b/pom.xml @@ -258,7 +258,11 @@ **/org/apache/kylin/tool/hadoop/KapGetPathWithoutSchemeAndAuthorityCLI.java, **/org/apache/kylin/engine/spark/streaming/**/*, **/org/apache/kylin/rest/security/KerberosLoginManager.java, - **/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java + **/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java, + **/org/apache/kylin/tool/garbage/IndexCleaner.java, + **/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java, + **/org/apache/kylin/rest/service/AbstractModelService.java, + **/org/apache/spark/utils/SparkHadoopUtils.scala </sonar.jacoco.excludes> <sonar.coverage.exclusions> **/org/apache/kylin/**/*Exception.java, @@ -304,7 +308,11 @@ **/org/apache/kylin/rest/util/InitResourceGroupUtils.java, **/org/apache/kylin/rest/QueryNodeFilter.java, **/org/apache/kylin/rest/config/SwaggerConfig.java, - **/org/apache/kylin/rest/config/SwaggerCompatibilityConfig.java + **/org/apache/kylin/rest/config/SwaggerCompatibilityConfig.java, + **/org/apache/kylin/tool/garbage/IndexCleaner.java, + **/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java, + **/org/apache/kylin/rest/service/AbstractModelService.java, + **/org/apache/spark/utils/SparkHadoopUtils.scala </sonar.coverage.exclusions> <sonar.organization>kylin</sonar.organization> <!--suppress UnresolvedMavenProperty --> diff --git a/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java b/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java index 34ac02cab2..586fcc62b6 100644 --- a/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java +++ b/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java @@ -256,7 +256,7 @@ public class NBasicControllerTest extends NLocalFileMetadataTestCase { @Test public void testCheckParamLength() { thrown.expect(KylinException.class); - thrown.expectMessage(String.format(Message.getInstance().getParamTooLarge(), "tag", 1000)); + thrown.expectMessage(String.format(MsgPicker.getMsg().getParamTooLarge(), "tag", 1000)); List param = new ArrayList(); param.add(1); param.add(6); diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java index 416f577e0e..aafde56676 100644 --- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java +++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java @@ -358,7 +358,10 @@ public class ProjectService extends BasicService { logger.info("Start to cleanup garbage for project<{}>", project.getName()); try { projectSmartService.cleanupGarbage(project.getName(), remainingTime); - GarbageCleaner.cleanMetadata(project.getName()); + boolean needAggressiveOpt = Arrays.stream(config.getProjectsAggressiveOptimizationIndex()) + .map(StringUtils::lowerCase).collect(Collectors.toList()) + .contains(StringUtils.toRootLowerCase(project.getName())); + GarbageCleaner.cleanMetadata(project.getName(), needAggressiveOpt); EventBusFactory.getInstance().callService(new ProjectCleanOldQueryResultEvent(project.getName())); } catch (Exception e) { logger.warn("clean project<" + project.getName() + "> failed", e); @@ -407,9 +410,9 @@ public class ProjectService extends BasicService { } @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#project, 'ADMINISTRATION')") - public void cleanupGarbage(String project) throws Exception { + public void cleanupGarbage(String project, boolean needAggressiveOpt) throws Exception { projectSmartService.cleanupGarbage(project, 0); - GarbageCleaner.cleanMetadata(project); + GarbageCleaner.cleanMetadata(project, needAggressiveOpt); asyncTaskService.cleanupStorage(); } diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7f5f2fb9e3..f48c808cdc 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -4017,4 +4017,12 @@ public abstract class KylinConfigBase implements Serializable { String defaultValue = "org.apache.kylin.common.extension.KylinInfoExtension$Factory"; return getOptional("kylin.extension.info.factory", defaultValue); } + + public String[] getProjectsAggressiveOptimizationIndex() { + return getOptionalStringArray("kylin.index.projects-optimized-aggressively", new String[0]); + } + + public int getExpectedIndexSizeOptimized() { + return Integer.parseInt(getOptional("kylin.index.expected-size-after-optimization", "0")); + } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java index 22336c3bc4..e44fa1b838 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java @@ -36,6 +36,7 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -50,12 +51,26 @@ import org.apache.kylin.common.persistence.MissingRootPersistentEntity; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.base.Joiner; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.collect.BiMap; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableSortedSet; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.project.ProjectInstance; @@ -64,18 +79,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.base.Joiner; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.collect.BiMap; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableSortedSet; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Data; @@ -180,6 +184,11 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn */ private final List<String> errors = Lists.newLinkedList(); + @Setter + @Getter + @JsonProperty("base_agg_index_reduce_high_cardinality_dim") + private boolean baseAggIndexReduceHighCardinalityDim; + public void initAfterReload(KylinConfig config, String p) { this.project = p; initConfig4IndexPlan(config); @@ -866,6 +875,19 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn getRuleBaseLayouts().stream().map(LayoutEntity::getId).collect(Collectors.toSet()), layoutIds)); } + private boolean isHighCardinalityDim(NTableMetadataManager tableManager, TblColRef colRef) { + String tableIdentity = colRef.getTableRef().getTableIdentity(); + TableDesc tableDesc = tableManager.getTableDesc(tableIdentity); + TableExtDesc tableExtIfExists = tableManager.getTableExtIfExists(tableDesc); + TableExtDesc.ColumnStats columnStats = tableExtIfExists.getColumnStatsByName(colRef.getName()); + + if (Objects.isNull(columnStats)) { + return false; + } + + return (double) (columnStats.getCardinality()) / tableExtIfExists.getTotalRows() > 0.2; + } + private void removeLayouts(Collection<IndexEntity> indexes, Set<Long> layoutIds, boolean deleteAuto, boolean deleteManual) { checkIsNotCachedAndShared(); @@ -937,10 +959,24 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn } public LayoutEntity createBaseAggIndex(NDataModel model) { - List<Integer> dims = model.getEffectiveDimensions().keySet().asList(); + NTableMetadataManager tableManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), + model.getProject()); + List<Integer> dims; + if (this.isBaseAggIndexReduceHighCardinalityDim()) { + List<Integer> list = new ArrayList<>(); + for (Integer dimId : model.getEffectiveDimensions().keySet()) { + TblColRef colRef = model.getColRef(dimId); + if (!isHighCardinalityDim(tableManager, colRef)) { + list.add(dimId); + } + } + dims = list; + } else { + dims = model.getEffectiveDimensions().keySet().asList(); + } List<Integer> measures = model.getEffectiveMeasures().keySet().asList(); List<Integer> colOrder = naturalOrderCombine(dims, measures); - return createLayout(colOrder, true); + return createLayout(colOrder, true, true, Lists.newArrayList()); } public LayoutEntity createBaseTableIndex() { @@ -954,16 +990,17 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn if (colOrder.isEmpty()) { return null; } - return createLayout(colOrder, false); + return createLayout(colOrder, false, true, Lists.newArrayList()); } - private LayoutEntity createLayout(List<Integer> colOrder, boolean isAgg) { - LayoutEntity newBaseLayout = new LayoutEntity(); - newBaseLayout.setColOrder(colOrder); - newBaseLayout.setUpdateTime(System.currentTimeMillis()); - newBaseLayout.setBase(true); - newBaseLayout.initalId(isAgg); - return newBaseLayout; + public LayoutEntity createLayout(List<Integer> colOrder, boolean isAgg, boolean isBase, List<Integer> shardByCols) { + LayoutEntity newLayout = new LayoutEntity(); + newLayout.setColOrder(colOrder); + newLayout.setUpdateTime(System.currentTimeMillis()); + newLayout.setBase(isBase); + newLayout.initalId(isAgg); + newLayout.setShardByColumns(shardByCols); + return newLayout; } private List<Integer> naturalOrderCombine(List<Integer> col1, List<Integer> col2) { @@ -1143,6 +1180,10 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn } public boolean add(LayoutEntity layout, boolean isAgg) { + return add(layout, isAgg, true); + } + + public boolean add(LayoutEntity layout, boolean isAgg, boolean needUpdateApprovedRecs) { val identifier = createIndexIdentifier(layout, isAgg); if (allIndexesMap.get(identifier) != null && allIndexesMap.get(identifier).getLayouts().contains(layout)) { return false; @@ -1176,11 +1217,20 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn indexEntity.getLayouts().add(layout); addedLayouts.add(layout.getId()); } - approvedAdditionalRecs += 1; + + if (needUpdateApprovedRecs) { + approvedAdditionalRecs += 1; + } + return true; } public boolean remove(LayoutEntity layout, boolean isAgg, boolean needAddBlackList) { + return remove(layout, isAgg, needAddBlackList, true); + } + + public boolean remove(LayoutEntity layout, boolean isAgg, boolean needAddBlackList, + boolean needUpdateApprovedRecs) { IndexEntity.IndexIdentifier identifier = createIndexIdentifier(layout, isAgg); if (allIndexesMap.containsKey(identifier) && allIndexesMap.get(identifier).getLayouts().contains(layout)) { IndexEntity indexEntity = allIndexesMap.get(identifier); @@ -1190,33 +1240,45 @@ public class IndexPlan extends RootPersistentEntity implements Serializable, IEn } if (layoutInIndexPlan.isManual()) { - if (isAgg && needAddBlackList) { - // For similar strategy only works on AggIndex, we need add this to black list. - indexPlan.addRuleBasedBlackList(Lists.newArrayList(layout.getId())); - if (layoutInIndexPlan.isAuto()) { - indexEntity.getLayouts().remove(layoutInIndexPlan); - whiteIndexesMap.values().stream().filter( - indexEntityInIndexPlan -> indexEntityInIndexPlan.getId() == indexEntity.getId()) - .findFirst().ifPresent(indexEntityInIndexPlan -> indexEntityInIndexPlan.getLayouts() - .remove(layoutInIndexPlan)); - } - this.approvedRemovalRecs += 1; - return true; - } - return false; + return removeManualPlan(layout, isAgg, needAddBlackList, needUpdateApprovedRecs, indexEntity, layoutInIndexPlan); } indexEntity.getLayouts().remove(layoutInIndexPlan); whiteIndexesMap.values().stream() .filter(indexEntityInIndexPlan -> indexEntityInIndexPlan.getId() == indexEntity.getId()) .findFirst().ifPresent(indexEntityInIndexPlan -> indexEntityInIndexPlan.getLayouts() .remove(layoutInIndexPlan)); - this.approvedRemovalRecs += 1; + + if (needUpdateApprovedRecs) { + this.approvedRemovalRecs += 1; + } return true; } else { return false; } } + private boolean removeManualPlan(LayoutEntity layout, boolean isAgg, boolean needAddBlackList, + boolean needUpdateApprovedRecs, IndexEntity indexEntity, + LayoutEntity layoutInIndexPlan) { + if (isAgg && needAddBlackList) { + // For similar strategy only works on AggIndex, we need add this to black list. + indexPlan.addRuleBasedBlackList(Lists.newArrayList(layout.getId())); + if (layoutInIndexPlan.isAuto()) { + indexEntity.getLayouts().remove(layoutInIndexPlan); + whiteIndexesMap.values().stream().filter( + indexEntityInIndexPlan -> indexEntityInIndexPlan.getId() == indexEntity.getId()) + .findFirst().ifPresent(indexEntityInIndexPlan -> indexEntityInIndexPlan.getLayouts() + .remove(layoutInIndexPlan)); + } + + if (needUpdateApprovedRecs) { + this.approvedRemovalRecs += 1; + } + return true; + } + return false; + } + public IndexPlan complete() { indexPlan.setIndexes(whiteIndexesMap.values().stream().sorted(Comparator.comparingLong(IndexEntity::getId)) .collect(Collectors.toList())); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java index fc9c38b2c2..a977583f8f 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java @@ -52,11 +52,11 @@ public abstract class AbstractOptStrategy { private List<LayoutEntity> beforeCollect(List<LayoutEntity> inputLayouts) { List<LayoutEntity> layoutsToHandle = Lists.newArrayList(inputLayouts); - skipOptimizeTableIndex(layoutsToHandle); + skipOptimizeIndex(layoutsToHandle); return layoutsToHandle; } - protected abstract void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts); + protected abstract void skipOptimizeIndex(List<LayoutEntity> inputLayouts); private void afterCollect(List<LayoutEntity> inputLayouts, Set<Long> garbages) { inputLayouts.removeIf(layout -> garbages.contains(layout.getId())); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java index 935bd57724..397da688b9 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java @@ -19,5 +19,5 @@ package org.apache.kylin.metadata.cube.optimization; public enum GarbageLayoutType { - LOW_FREQUENCY, INCLUDED, SIMILAR + LOW_FREQUENCY, INCLUDED, SIMILAR, MERGED } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java index c0b224a807..ffdf0dfbab 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java @@ -58,7 +58,7 @@ public class IncludedLayoutOptStrategy extends AbstractOptStrategy { } @Override - protected void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts) { + protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); if (!kylinConfig.isIncludedStrategyConsiderTableIndex()) { inputLayouts.removeIf(layout -> IndexEntity.isTableIndex(layout.getId())); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java index 8317b2592c..9a81ea0890 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java @@ -25,13 +25,12 @@ import java.util.stream.Collectors; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.annotation.Clarification; -import org.apache.kylin.metadata.cube.model.LayoutEntity; -import org.apache.kylin.metadata.cube.model.NDataflow; -import org.apache.kylin.metadata.project.NProjectManager; - import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.project.NProjectManager; import lombok.Getter; @@ -54,7 +53,9 @@ public class IndexOptimizer { protected List<LayoutEntity> filterAutoLayouts(NDataflow dataflow) { return dataflow.extractReadyLayouts().stream() // .filter(layout -> !layout.isManual() && layout.isAuto()) // - .filter(layout -> !layout.isBase()).collect(Collectors.toList()); + .filter(layout -> !layout.isBase()) // + .filter(layout -> !layout.isToBeDeleted()) // + .collect(Collectors.toList()); } protected List<LayoutEntity> filterManualLayouts(NDataflow dataflow) { @@ -70,8 +71,9 @@ public class IndexOptimizer { } Map<Long, GarbageLayoutType> garbageLayoutTypeMap = Maps.newHashMap(); + List<LayoutEntity> autoLayouts = filterAutoLayouts(dataflow); for (AbstractOptStrategy strategy : getStrategiesForAuto()) { - strategy.collectGarbageLayouts(filterAutoLayouts(dataflow), dataflow, needLog) + strategy.collectGarbageLayouts(autoLayouts, dataflow, needLog) .forEach(id -> garbageLayoutTypeMap.put(id, strategy.getType())); } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java index 850b884420..b68f37180d 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java @@ -20,9 +20,8 @@ package org.apache.kylin.metadata.cube.optimization; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.annotation.Clarification; -import org.apache.kylin.metadata.cube.model.NDataflow; - import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.metadata.cube.model.NDataflow; import lombok.extern.slf4j.Slf4j; @@ -36,8 +35,9 @@ public class IndexOptimizerFactory { private static final AbstractOptStrategy INCLUDED_OPT_STRATEGY = new IncludedLayoutOptStrategy(); private static final AbstractOptStrategy LOW_FREQ_OPT_STRATEGY = new LowFreqLayoutOptStrategy(); private static final AbstractOptStrategy SIMILAR_OPT_STRATEGY = new SimilarLayoutOptStrategy(); + private static final AbstractOptStrategy MERGED_OPT_STRATEGY = new MergedLayoutOptStrategy(); - public static IndexOptimizer getOptimizer(NDataflow dataflow, boolean needLog) { + public static IndexOptimizer getOptimizer(NDataflow dataflow, boolean needAggressiveOpt, boolean needLog) { IndexOptimizer optimizer = new IndexOptimizer(needLog); final int indexOptimizationLevel = KylinConfig.getInstanceFromEnv().getIndexOptimizationLevel(); if (indexOptimizationLevel == 1) { @@ -49,6 +49,12 @@ public class IndexOptimizerFactory { optimizer.getStrategiesForManual().add(SIMILAR_OPT_STRATEGY); } + if (needAggressiveOpt) { + optimizer.getStrategiesForAuto().clear(); + optimizer.getStrategiesForAuto() + .addAll(Lists.newArrayList(INCLUDED_OPT_STRATEGY, LOW_FREQ_OPT_STRATEGY, MERGED_OPT_STRATEGY)); + } + // log if needed printLog(needLog, indexOptimizationLevel, dataflow.getIndexPlan().isFastBitmapEnabled()); return optimizer; diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java index cfb2d4f22d..85250a4bdf 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java @@ -72,7 +72,7 @@ public class LowFreqLayoutOptStrategy extends AbstractOptStrategy { } @Override - protected void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts) { + protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) { final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); if (!kylinConfig.isLowFreqStrategyConsiderTableIndex()) { inputLayouts.removeIf(layout -> IndexEntity.isTableIndex(layout.getId())); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/MergedLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/MergedLayoutOptStrategy.java new file mode 100644 index 0000000000..a6bc091c38 --- /dev/null +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/MergedLayoutOptStrategy.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.metadata.cube.optimization; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.kylin.metadata.cube.model.IndexEntity; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.utils.IndexPlanReduceUtil; + +import com.google.common.collect.Sets; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MergedLayoutOptStrategy extends AbstractOptStrategy { + + public MergedLayoutOptStrategy() { + this.setType(GarbageLayoutType.MERGED); + } + + @Override + protected Set<Long> doCollect(List<LayoutEntity> inputLayouts, NDataflow dataflow, boolean needLog) { + Set<Long> garbageLayouts = Sets.newHashSet(); + List<Set<LayoutEntity>> sameDimAggLayouts = IndexPlanReduceUtil.collectSameDimAggLayouts(inputLayouts); + garbageLayouts.addAll( + sameDimAggLayouts.stream().flatMap(Set::stream).map(LayoutEntity::getId).collect(Collectors.toSet())); + + if (needLog) { + log.info("In dataflow({}), MergeLayoutOptStrategy found garbage laoyouts: {}.", dataflow.getId(), + garbageLayouts); + } + return garbageLayouts; + } + + @Override + protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) { + inputLayouts.removeIf(layout -> layout.isBase() || IndexEntity.isTableIndex(layout.getId())); + } +} diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java index dba099e210..b7eecdfd07 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java @@ -69,7 +69,7 @@ public class SimilarLayoutOptStrategy extends AbstractOptStrategy { } @Override - protected void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts) { + protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) { inputLayouts.removeIf(layout -> IndexEntity.isTableIndex(layout.getId())); } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/ApproveRecsEvent.java similarity index 62% copy from src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java copy to src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/ApproveRecsEvent.java index 84f04556d8..fb99ac98a2 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/ApproveRecsEvent.java @@ -16,19 +16,24 @@ * limitations under the License. */ -package org.apache.kylin.tool.garbage; +package org.apache.kylin.metadata.cube.optimization.event; -public abstract class MetadataCleaner { - protected final String project; +import java.util.Map; - protected MetadataCleaner(String project) { - this.project = project; - } +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.optimization.GarbageLayoutType; - // do in transaction - public abstract void cleanup(); +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; - public void prepare() { - // default do nothing - } +@Setter +@Getter +@AllArgsConstructor +@ToString +public class ApproveRecsEvent { + private String project; + + Map<NDataflow, Map<Long, GarbageLayoutType>> needOptAggressivelyModels; } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/BuildIndexEvent.java similarity index 67% copy from src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java copy to src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/BuildIndexEvent.java index 84f04556d8..19d984c76a 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/BuildIndexEvent.java @@ -16,19 +16,21 @@ * limitations under the License. */ -package org.apache.kylin.tool.garbage; +package org.apache.kylin.metadata.cube.optimization.event; -public abstract class MetadataCleaner { - protected final String project; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.apache.kylin.metadata.cube.model.NDataflow; - protected MetadataCleaner(String project) { - this.project = project; - } +import java.util.List; - // do in transaction - public abstract void cleanup(); - - public void prepare() { - // default do nothing - } +@Setter +@Getter +@AllArgsConstructor +@ToString +public class BuildIndexEvent { + private String project; + private List<NDataflow> dataflows; } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java index 24d4d850f3..1a87822766 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java @@ -49,7 +49,7 @@ public class GarbageStorageCollector implements StorageInfoCollector { for (val model : getModels(project)) { val dataflow = getDataflow(model).copy(); - final IndexOptimizer indexOptimizer = IndexOptimizerFactory.getOptimizer(dataflow, false); + final IndexOptimizer indexOptimizer = IndexOptimizerFactory.getOptimizer(dataflow, false, false); val garbageLayouts = indexOptimizer.getGarbageLayoutMap(dataflow).keySet(); if (CollectionUtils.isNotEmpty(garbageLayouts)) { storageSize += calculateLayoutSize(garbageLayouts, dataflow); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java index 08f2343733..736a8f80c3 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java @@ -18,21 +18,25 @@ package org.apache.kylin.metadata.cube.utils; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; -import org.apache.kylin.metadata.cube.model.IndexEntity; -import org.apache.kylin.metadata.cube.model.LayoutEntity; -import org.apache.kylin.metadata.model.NDataModel; - import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.metadata.cube.model.IndexEntity; +import org.apache.kylin.metadata.cube.model.IndexPlan; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.model.NDataModel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IndexPlanReduceUtil { + private static final Logger log = LoggerFactory.getLogger(IndexPlanReduceUtil.class); private IndexPlanReduceUtil() { } @@ -65,6 +69,46 @@ public class IndexPlanReduceUtil { return redundantMap; } + public static List<Set<LayoutEntity>> collectSameDimAggLayouts(List<LayoutEntity> inputLayouts) { + List<Set<LayoutEntity>> sameDimAggLayouts = Lists.newArrayList(); + Map<List<Integer>, Set<LayoutEntity>> aggLayoutDimGroup = Maps.newHashMap(); + inputLayouts.stream().filter(layout -> !layout.isBase() && !IndexEntity.isTableIndex(layout.getId())) + .forEach(layout -> { + List<Integer> aggLayoutDims = new ArrayList<>(); + for (Integer idx : layout.getColOrder()) { + if (idx < NDataModel.MEASURE_ID_BASE) { + aggLayoutDims.add(idx); + } + } + aggLayoutDimGroup.putIfAbsent(aggLayoutDims, Sets.newHashSet()); + aggLayoutDimGroup.get(aggLayoutDims).add(layout); + }); + + sameDimAggLayouts.addAll( + aggLayoutDimGroup.values().stream().filter(layouts -> layouts.size() > 1).collect(Collectors.toSet())); + return sameDimAggLayouts; + } + + public static IndexPlan mergeSameDimLayout(IndexPlan indexPlan, List<Set<LayoutEntity>> sameDimLayouts) { + IndexPlan.IndexPlanUpdateHandler updateHandler = indexPlan.createUpdateHandler(); + for (Set<LayoutEntity> layoutEntities : sameDimLayouts) { + Set<Integer> colOrder = Sets.newLinkedHashSet(); + List<Integer> allColOrders = Lists.newArrayList(); + List<Integer> shardByCol = Lists.newArrayList(); + for (LayoutEntity layoutEntity : layoutEntities) { + colOrder.addAll(layoutEntity.getColOrder()); + allColOrders.addAll(layoutEntity.getColOrder()); + shardByCol = layoutEntity.getShardByColumns(); + } + + LayoutEntity mergedLayout = indexPlan.createLayout(Lists.newArrayList(colOrder), true, false, shardByCol); + log.info("merge colOrders: {} into {}", allColOrders, mergedLayout.getColOrder()); + updateHandler.add(mergedLayout, true, false); + } + + return updateHandler.complete(); + } + /** * Collect a redundant map from included layout to reserved layout. * @param sortedLayouts sorted by layout's colOrder diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java index b1fd2eb198..e324f2673e 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java @@ -33,6 +33,9 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.common.util.Unsafe; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexPlan; import org.apache.kylin.metadata.cube.model.LayoutEntity; @@ -46,6 +49,7 @@ import org.apache.kylin.metadata.cube.optimization.FrequencyMap; import org.apache.kylin.metadata.cube.optimization.IncludedLayoutOptStrategy; import org.apache.kylin.metadata.cube.optimization.IndexOptimizer; import org.apache.kylin.metadata.cube.optimization.LowFreqLayoutOptStrategy; +import org.apache.kylin.metadata.cube.optimization.MergedLayoutOptStrategy; import org.apache.kylin.metadata.cube.optimization.SimilarLayoutOptStrategy; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; import org.apache.kylin.metrics.HdfsCapacityMetrics; @@ -55,10 +59,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - import lombok.val; import lombok.extern.slf4j.Slf4j; @@ -88,8 +88,8 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", "true"); initTestData(); - val collector = new ProjectStorageInfoCollector(Lists.newArrayList(StorageInfoEnum.GARBAGE_STORAGE, StorageInfoEnum.STORAGE_QUOTA, - StorageInfoEnum.TOTAL_STORAGE)); + val collector = new ProjectStorageInfoCollector(Lists.newArrayList(StorageInfoEnum.GARBAGE_STORAGE, + StorageInfoEnum.STORAGE_QUOTA, StorageInfoEnum.TOTAL_STORAGE)); val volumeInfo = collector.getStorageVolumeInfo(getTestConfig(), DEFAULT_PROJECT); Assert.assertEquals(10240L * 1024 * 1024 * 1024, volumeInfo.getStorageQuotaSize()); @@ -236,6 +236,65 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase Assert.assertEquals(new Integer(200), dateFrequency.get(currentDate - DAY_IN_MILLIS)); } + @Test + public void testMergedLayoutGcStrategy() { + NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(getTestConfig(), DEFAULT_PROJECT); + NDataflowManager dataflowManager = NDataflowManager.getInstance(getTestConfig(), DEFAULT_PROJECT); + IndexPlan indexPlan = indexPlanManager.getIndexPlan(DEFAULT_MODEL_BASIC_ID); + indexPlanManager.updateIndexPlan(indexPlan.getUuid(), copyForWrite -> { + LayoutEntity layout1 = new LayoutEntity(); + layout1.setId(40001L); + layout1.setColOrder(Lists.newArrayList(1, 2, 3, 100006, 100007)); + layout1.setAuto(false); + IndexEntity index1 = new IndexEntity(); + index1.setId(40000L); + index1.setDimensions(Lists.newArrayList(1, 2, 3)); + index1.setMeasures(Lists.newArrayList(100006, 100007)); + index1.setLayouts(Lists.newArrayList(layout1)); + + LayoutEntity layout2 = new LayoutEntity(); + layout2.setId(50001L); + layout2.setColOrder(Lists.newArrayList(1, 2, 3, 100001, 100002)); + layout2.setAuto(true); + IndexEntity index2 = new IndexEntity(); + index2.setId(50000L); + index2.setDimensions(Lists.newArrayList(1, 2, 3)); + index2.setMeasures(Lists.newArrayList(100001, 100002)); + index2.setLayouts(Lists.newArrayList(layout2)); + + LayoutEntity layout3 = new LayoutEntity(); + layout3.setId(60001L); + layout3.setColOrder(Lists.newArrayList(1, 2, 3, 100003, 100004, 100005)); + layout3.setAuto(true); + IndexEntity index3 = new IndexEntity(); + index3.setId(60000L); + index3.setDimensions(Lists.newArrayList(1, 2, 3)); + index3.setMeasures(Lists.newArrayList(100003, 100004, 100005)); + index3.setLayouts(Lists.newArrayList(layout3)); + + // Table Index + LayoutEntity layout4 = new LayoutEntity(); + layout4.setId(20_000_040_001L); + layout4.setColOrder(Lists.newArrayList(1, 2, 3, 4, 5, 6)); + layout4.setAuto(true); + IndexEntity index4 = new IndexEntity(); + index4.setId(20_000_040_001L); + index4.setDimensions(Lists.newArrayList(1, 2, 3, 4, 5, 6)); + copyForWrite.setIndexes(Lists.newArrayList(index1, index2, index3)); + }); + + // change all layouts' status to ready. + NDataflow dataflow = dataflowManager.getDataflow(DEFAULT_MODEL_BASIC_ID); + NDataflowUpdate update = new NDataflowUpdate(dataflow.getUuid()); + NDataSegment latestReadySegment = dataflow.getLatestReadySegment(); + Set<Long> ids = indexPlan.getAllLayouts().stream().map(LayoutEntity::getId).collect(Collectors.toSet()); + update.setToAddOrUpdateLayouts(genCuboids(dataflow, latestReadySegment.getId(), ids)); + dataflowManager.updateDataflow(update); + + Set<Long> garbageLayouts = IndexOptimizer.findGarbageLayouts(dataflow, new MergedLayoutOptStrategy()); + Assert.assertEquals(2, garbageLayouts.size()); + } + @Test public void testSimilarLayoutGcStrategy() { /* @@ -408,7 +467,8 @@ public class ProjectStorageInfoCollectorTest extends NLocalFileMetadataTestCase NDataflow df = dataflowManager.getDataflow(DEFAULT_MODEL_BASIC_ID); NDataflowUpdate update = new NDataflowUpdate(df.getUuid()); NDataSegment latestReadySegment = df.getLatestReadySegment(); - Set<Long> ids = Sets.newHashSet(2_000_020_001L, 2_000_030_001L, 2_000_040_001L, 40_001L, 40_002L); + // 60_001L and 60_002L are non-existing layouts + Set<Long> ids = Sets.newHashSet(60_001L, 60_002L, 40_001L, 40_002L); update.setToAddOrUpdateLayouts(genCuboids(df, latestReadySegment.getId(), ids)); dataflowManager.updateDataflow(update); } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java index 498c78325e..71e8a7c895 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java @@ -18,15 +18,20 @@ package org.apache.kylin.metadata.cube.utils; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.cube.model.IndexEntity; +import org.apache.kylin.metadata.cube.model.IndexPlan; import org.apache.kylin.metadata.cube.model.LayoutEntity; import org.junit.Assert; import org.junit.Test; -import org.apache.kylin.guava30.shaded.common.collect.Lists; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; public class IndexPlanReduceUtilTest { @@ -140,6 +145,26 @@ public class IndexPlanReduceUtilTest { } + @Test + public void testMergeSameDimLayout() { + LayoutEntity layout1 = new LayoutEntity(); + layout1.setId(1); + layout1.setColOrder(Lists.newArrayList(1, 2, 3, 4, 100000, 100001)); + layout1.setInProposing(false); + LayoutEntity layout2 = new LayoutEntity(); + layout2.setId(10001); + layout2.setColOrder(Lists.newArrayList(1, 2, 3, 4, 100000, 100001, 100002)); + layout2.setInProposing(true); + + IndexPlan indexPlan = new IndexPlan(); + List<Set<LayoutEntity>> sameDimLayouts = Lists + .newArrayList(Collections.singleton(Sets.newHashSet(layout1, layout2))); + IndexPlan indexPlanMerged = IndexPlanReduceUtil.mergeSameDimLayout(indexPlan, sameDimLayouts); + Assert.assertEquals(1, indexPlanMerged.getAllIndexes().size()); + Assert.assertEquals(ImmutableList.of(1, 2, 3, 4, 100000, 100001, 100002), + indexPlanMerged.getAllIndexes().get(0).getLayouts().get(0).getColOrder()); + } + @Test public void testCollectIncludedAggIndexLayoutsWithProposingSmallerIndex() { LayoutEntity layout1 = new LayoutEntity(); // a proposed layout diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java index 848b29e1a8..d478f42a5c 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java @@ -19,6 +19,7 @@ package org.apache.kylin.rest.initialize; import org.apache.kylin.common.scheduler.EventBusFactory; import org.apache.kylin.rest.service.JobService; +import org.apache.kylin.rest.service.ModelBuildService; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -32,8 +33,12 @@ public class BuildAppInitializer implements InitializingBean { @Autowired private JobService jobService; + @Autowired + private ModelBuildService modelBuildService; + @Override public void afterPropertiesSet() throws Exception { EventBusFactory.getInstance().registerService(jobService); + EventBusFactory.getInstance().registerService(modelBuildService); } } diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java index 4fcd8f5d16..dd4afcb0f7 100644 --- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java +++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java @@ -45,6 +45,11 @@ import org.apache.kylin.common.exception.JobErrorCode; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.base.Strings; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; +import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.exception.JobSubmissionException; import org.apache.kylin.job.execution.JobTypeEnum; @@ -56,6 +61,7 @@ import org.apache.kylin.metadata.cube.model.NDataSegment; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.SegmentPartition; +import org.apache.kylin.metadata.cube.optimization.event.BuildIndexEvent; import org.apache.kylin.metadata.model.ManagementType; import org.apache.kylin.metadata.model.MultiPartitionDesc; import org.apache.kylin.metadata.model.NDataModel; @@ -85,14 +91,11 @@ import org.apache.kylin.rest.service.params.IncrementBuildSegmentParams; import org.apache.kylin.rest.service.params.MergeSegmentParams; import org.apache.kylin.rest.service.params.RefreshSegmentParams; import org.apache.kylin.source.SourceFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.base.Strings; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - import lombok.val; import lombok.var; @@ -105,6 +108,8 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil @Autowired private SegmentHelper segmentHelper; + private static final Logger logger = LoggerFactory.getLogger(ModelBuildService.class); + //only fo test public JobInfoResponse buildSegmentsManually(String project, String modelId, String start, String end) throws Exception { @@ -431,6 +436,12 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil public BuildIndexResponse buildIndicesManually(String modelId, String project, int priority, String yarnQueue, Object tag) { aclEvaluate.checkProjectOperationPermission(project); + String username = getUsername(); + return buildIndicesInternal(modelId, project, priority, yarnQueue, tag, username); + } + + private BuildIndexResponse buildIndicesInternal(String modelId, String project, int priority, String yarnQueue, + Object tag, String userName) { NDataModel modelDesc = getManager(NDataModelManager.class, project).getDataModelDesc(modelId); if (ManagementType.MODEL_BASED != modelDesc.getManagementType()) { throw new KylinException(PERMISSION_DENIED, String.format(Locale.ROOT, @@ -444,13 +455,26 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil } String jobId = getManager(SourceUsageManager.class).licenseCheckWrap(project, - () -> getManager(JobManager.class, project).addIndexJob(new JobParam(modelId, getUsername()) - .withPriority(priority).withYarnQueue(yarnQueue).withTag(tag))); + () -> getManager(JobManager.class, project).addIndexJob( + new JobParam(modelId, userName).withPriority(priority).withYarnQueue(yarnQueue).withTag(tag))); return new BuildIndexResponse(StringUtils.isBlank(jobId) ? BuildIndexResponse.BuildIndexType.NO_LAYOUT : BuildIndexResponse.BuildIndexType.NORM_BUILD, jobId); } + @Subscribe + public void buildIndicesManually(BuildIndexEvent event) { + String project = event.getProject(); + List<NDataflow> dataflows = event.getDataflows(); + dataflows.forEach(dataflow -> { + if (!dataflow.isOnline()) { + return; + } + logger.info("build indexes for model: {} under project: {}", dataflow.getModelAlias(), project); + buildIndicesInternal(dataflow.getId(), project, ExecutablePO.DEFAULT_PRIORITY, null, null, "System"); + }); + } + @Transaction(project = 0) public JobInfoResponse buildSegmentPartitionByValue(String project, String modelId, String segmentId, List<String[]> partitionValues, boolean parallelBuild, boolean buildAllPartitions, int priority, diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java index 109a8a16e7..f884d87aac 100644 --- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java +++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java @@ -57,6 +57,9 @@ import org.apache.kylin.engine.spark.job.ExecutableAddSegmentHandler; import org.apache.kylin.engine.spark.job.ExecutableMergeOrRefreshHandler; import org.apache.kylin.engine.spark.job.NSparkCubingJob; import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.job.dao.ExecutablePO; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableParams; @@ -78,6 +81,7 @@ import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NDataflowUpdate; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.cube.model.PartitionStatusEnum; +import org.apache.kylin.metadata.cube.optimization.event.BuildIndexEvent; import org.apache.kylin.metadata.job.JobBucket; import org.apache.kylin.metadata.model.ManagementType; import org.apache.kylin.metadata.model.NDataModel; @@ -122,10 +126,6 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.springframework.test.util.ReflectionTestUtils; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - import lombok.val; import lombok.var; @@ -877,6 +877,32 @@ public class ModelServiceBuildTest extends SourceTestCase { } + @Test + public void testBuildIndexManuallyForEvent() { + val project = "default"; + val modelId = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96"; + val dataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + val df = dataflowManager.getDataflow(modelId); + BuildIndexEvent buildIndexEvent = new BuildIndexEvent(project, Lists.newArrayList(df)); + + val dfUpdate = new NDataflowUpdate(df.getId()); + List<NDataLayout> tobeRemoveCuboidLayouts = Lists.newArrayList(); + Segments<NDataSegment> segments = df.getSegments(); + for (NDataSegment segment : segments) { + tobeRemoveCuboidLayouts.addAll(segment.getLayoutsMap().values()); + } + dfUpdate.setToRemoveLayouts(tobeRemoveCuboidLayouts.toArray(new NDataLayout[0])); + dataflowManager.updateDataflow(dfUpdate); + val modelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + modelManager.updateDataModel(modelId, + copyForWrite -> copyForWrite.setManagementType(ManagementType.MODEL_BASED)); + modelBuildService.buildIndicesManually(buildIndexEvent); + val executables = getRunningExecutables(project, modelId); + Assert.assertEquals(1, executables.size()); + Assert.assertTrue(((NSparkCubingJob) executables.get(0)).getHandler() instanceof ExecutableAddCuboidHandler); + Assert.assertEquals(3, executables.get(0).getPriority()); + } + @Test public void testBuildIndexManuallyWithoutLayout() { val project = "default"; @@ -1392,7 +1418,7 @@ public class ModelServiceBuildTest extends SourceTestCase { // build all partition values IncrementBuildSegmentParams incrParams2 = new IncrementBuildSegmentParams(project, modelId, "1633104000000", "1633190400000", model.getPartitionDesc(), model.getMultiPartitionDesc(), null, true, null) - .withBuildAllSubPartitions(true); + .withBuildAllSubPartitions(true); val jobInfo2 = modelBuildService.incrementBuildSegmentsManually(incrParams2); Assert.assertEquals(1, jobInfo2.getJobs().size()); Assert.assertEquals(jobInfo2.getJobs().get(0).getJobName(), JobTypeEnum.INC_BUILD.name()); diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java index 05f36400af..8cd51f7b9d 100644 --- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java +++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java @@ -243,7 +243,19 @@ public class NProjectController extends NBasicController { if (projectInstance == null) { throw new KylinException(PROJECT_NOT_EXIST, project); } - projectService.cleanupGarbage(project); + projectService.cleanupGarbage(project, false); + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, true, ""); + } + + @ApiOperation(value = "cleanupProjectStorage", tags = { "SM" }, notes = "Add URL: {project}; ") + @PutMapping(value = "/{project:.+}/optimize_index") + @ResponseBody + public EnvelopeResponse<Boolean> optimizeIndex(@PathVariable(value = "project") String project) throws Exception { + ProjectInstance projectInstance = projectService.getManager(NProjectManager.class).getProject(project); + if (projectInstance == null) { + throw new KylinException(PROJECT_NOT_EXIST, project); + } + projectService.cleanupGarbage(project, true); return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, true, ""); } diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java index 12f047236e..7677b38d94 100644 --- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java +++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java @@ -231,7 +231,7 @@ public class NProjectControllerTest extends NLocalFileMetadataTestCase { NProjectManager projectManager = Mockito.mock(NProjectManager.class); Mockito.doReturn(projectInstance).when(projectManager).getProject("default"); Mockito.doReturn(projectManager).when(projectService).getManager(NProjectManager.class); - Mockito.doNothing().when(projectService).cleanupGarbage("default"); + Mockito.doNothing().when(projectService).cleanupGarbage("default", false); mockMvc.perform(MockMvcRequestBuilders.put("/api/projects/{project}/storage", "default") .contentType(MediaType.APPLICATION_JSON).param("project", "default") .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON))) diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java index 28c102a2d1..63d048f86e 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java @@ -26,30 +26,42 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NOT_EXIST; import java.util.Arrays; +import java.util.List; +import java.util.Objects; import java.util.Set; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.acl.AclTCRManager; +import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexPlan; +import org.apache.kylin.metadata.cube.model.LayoutEntity; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableExtDesc; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclPermissionUtil; import org.springframework.beans.factory.annotation.Autowired; -import org.apache.kylin.guava30.shaded.common.collect.Sets; - import lombok.val; import lombok.var; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class AbstractModelService extends BasicService { public static final String VALID_NAME_FOR_MODEL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_"; + protected static final String SAVE_INDEXES_STRATEGY = "single_dim_and_reduce_hc"; @Autowired public AclEvaluate aclEvaluate; @@ -137,4 +149,102 @@ public class AbstractModelService extends BasicService { } } + private boolean isHighCardinalityDim(NTableMetadataManager tableManager, TblColRef colRef) { + String tableIdentity = colRef.getTableRef().getTableIdentity(); + TableDesc tableDesc = tableManager.getTableDesc(tableIdentity); + TableExtDesc tableExtIfExists = tableManager.getTableExtIfExists(tableDesc); + TableExtDesc.ColumnStats columnStats = tableExtIfExists.getColumnStatsByName(colRef.getName()); + + // table not sampled + if (Objects.isNull(columnStats)) { + return false; + } + + return (double) (columnStats.getCardinality()) / tableExtIfExists.getTotalRows() > 0.2; + } + + private boolean needToDeleteLayout(NTableMetadataManager tableManager, NDataModel model, IndexEntity index) { + int dimSize = index.getDimensions().size(); + if (dimSize > 1) { + return true; + } + + if (dimSize == 0) { + return false; + } + + return isHighCardinalityDim(tableManager, model.getColRef(index.getDimensions().get(0))); + } + + private Pair<Boolean, Integer> processIndex(NDataModel model, IndexEntity index, + IndexPlan.IndexPlanUpdateHandler updateHandler, NTableMetadataManager tableManager) { + boolean needSplit = true; + Integer shardByCol = null; + + List<LayoutEntity> layouts = index.getLayouts(); + for (int i = layouts.size() - 1; i >= 0; i--) { + LayoutEntity layout = layouts.get(i); + + // When the recommended index and agg index overlap, + // there will be two layouts under this index, and they are only in different col order. + // This index does not require split. + if (layout.isBaseIndex()) { + needSplit = false; + continue; + } + + List<Integer> shardByCols = layout.getShardByColumns(); + if (CollectionUtils.isNotEmpty(shardByCols)) { + shardByCol = shardByCols.get(0); + } + + if (needToDeleteLayout(tableManager, model, index)) { + updateHandler.remove(layouts.get(i), IndexEntity.isAggIndex(index.getId()), false, false); + } + } + + return Pair.newPair(needSplit, shardByCol); + } + + protected void splitIndexesIntoSingleDimIndexes(NDataModel model, IndexPlan indexPlan) { + NTableMetadataManager tableManager = getManager(NTableMetadataManager.class, model.getProject()); + IndexPlan.IndexPlanUpdateHandler updateHandler = indexPlan.createUpdateHandler(); + List<IndexEntity> indexes = indexPlan.getIndexes(); + for (IndexEntity index : indexes) { + Pair<Boolean, Integer> needSplitAndShardByCol = processIndex(model, index, updateHandler, tableManager); + boolean needSplit = needSplitAndShardByCol.getFirst(); + Integer shardByCol = needSplitAndShardByCol.getSecond(); + + if (!needSplit) { + continue; + } + + List<Integer> measures = index.getMeasures(); + for (Integer dimension : index.getDimensions()) { + TblColRef colRef = model.getColRef(dimension); + if (isHighCardinalityDim(tableManager, colRef)) { + log.warn("The col {} is high cardinality dimension, not recommended as an index", colRef.getName()); + continue; + } + + List<Integer> colOrder = Lists.newArrayList(); + colOrder.add(dimension); + boolean containShardByCol = colOrder.contains(shardByCol); + colOrder.addAll(measures); + + LayoutEntity singleDimensionAggLayout; + if (containShardByCol) { + singleDimensionAggLayout = indexPlan.createLayout(colOrder, true, false, + Lists.newArrayList(shardByCol)); + } else { + singleDimensionAggLayout = indexPlan.createLayout(colOrder, true, false, Lists.newArrayList()); + } + + updateHandler.add(singleDimensionAggLayout, true, true); + } + } + + updateHandler.complete(); + } + } diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java index 2a88e7f4b6..e9d7e80e50 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java @@ -20,6 +20,7 @@ package org.apache.kylin.rest.service; import java.util.List; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.guava30.shaded.common.collect.Lists; import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexPlan; import org.apache.kylin.metadata.cube.model.LayoutEntity; @@ -29,8 +30,6 @@ import org.apache.kylin.rest.request.CreateBaseIndexRequest; import org.apache.kylin.rest.response.BuildBaseIndexResponse; import org.apache.kylin.rest.util.SpringContext; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - import io.kyligence.kap.secondstorage.SecondStorageUpdater; import io.kyligence.kap.secondstorage.SecondStorageUtil; import lombok.Setter; @@ -89,6 +88,10 @@ public class BaseIndexUpdateHelper { } public BuildBaseIndexResponse update(IndexPlanService service) { + return update(service, true); + } + + public BuildBaseIndexResponse update(IndexPlanService service, boolean checkProjectOperation) { if (!needUpdate) { return BuildBaseIndexResponse.EMPTY; } @@ -116,8 +119,14 @@ public class BaseIndexUpdateHelper { CreateBaseIndexRequest indexRequest = new CreateBaseIndexRequest(); indexRequest.setModelId(modelId); indexRequest.setProject(project); - BuildBaseIndexResponse response = service.updateBaseIndex(project, indexRequest, needCreateBaseTable, - needCreateBaseAgg, true); + BuildBaseIndexResponse response; + if (checkProjectOperation) { + response = service.updateBaseIndex(project, indexRequest, needCreateBaseTable, needCreateBaseAgg, true); + } else { + response = service.updateBaseIndexInternal(project, indexRequest, needCreateBaseTable, needCreateBaseAgg, + true); + } + response.judgeIndexOperateType(exist(preBaseAggLayout), true); response.judgeIndexOperateType(exist(preBaseTableLayout), false); diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java index aa7b481bbb..9a50ab5b75 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java @@ -52,7 +52,6 @@ import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.engine.spark.smarter.IndexDependencyParser; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; import org.apache.kylin.guava30.shaded.common.base.Preconditions; import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; import org.apache.kylin.guava30.shaded.common.collect.Lists; @@ -112,6 +111,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service; import io.kyligence.kap.secondstorage.SecondStorageUpdater; @@ -265,7 +265,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp return createTableIndex(project, request.getModelId(), newLayout, request.isLoadData()); } - private BuildIndexResponse createTableIndex(String project, String modelId, LayoutEntity newLayout, + public BuildIndexResponse createTableIndex(String project, String modelId, LayoutEntity newLayout, boolean loadData) { NIndexPlanManager indexPlanManager = getManager(NIndexPlanManager.class, project); val jobManager = getManager(JobManager.class, project); @@ -494,6 +494,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp StringUtils.join(notExistCols.iterator(), ","))); } } + indexPlan.setRuleBasedIndex(ruleBasedIndex); } catch (OutOfMaxCombinationException oe) { invalid = true; @@ -652,13 +653,13 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp } public List<IndexResponse> getIndexes(String project, String modelId, String key, List<IndexEntity.Status> status, - String orderBy, Boolean desc, List<IndexEntity.Source> sources) { + String orderBy, Boolean desc, List<IndexEntity.Source> sources) { return getIndexes(new IndexPlanParams(project, modelId, null, null, sources, status, null), - new PaginationParams(null, null, orderBy, desc), - key); + new PaginationParams(null, null, orderBy, desc), key); } - public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, PaginationParams paginationParams, String key) { + public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, PaginationParams paginationParams, + String key) { String project = indexPlanParams.getProject(); String modelId = indexPlanParams.getModelId(); String segmentId = indexPlanParams.getSegmentId(); @@ -674,10 +675,11 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp Preconditions.checkState(indexPlan != null); val model = indexPlan.getModel(); val layouts = indexPlan.getAllLayouts(); - Set<Long> layoutsByRunningJobs = getLayoutsByRunningJobs(project, modelId); + Map<String, Set<Long>> layoutsByRunningJobs = getLayoutsByRunningJobs(project, modelId); if (StringUtils.isBlank(key)) { return sortAndFilterLayouts(layouts.stream() - .map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs, segmentId)) + .map(layoutEntity -> convertToResponse(layoutEntity, indexPlan.getModel(), layoutsByRunningJobs, + segmentId)) .filter(indexResponse -> statusSet.isEmpty() || statusSet.contains(indexResponse.getStatus())), orderBy, desc, sources); } @@ -840,8 +842,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp return response; } - @VisibleForTesting - public Set<Long> getLayoutsByRunningJobs(String project, String modelId) { + public Map<String, Set<Long>> getLayoutsByRunningJobs(String project, String modelId) { List<AbstractExecutable> runningJobList = NExecutableManager .getInstance(KylinConfig.getInstanceFromEnv(), project) // .getPartialExecutablesByStatusList( @@ -849,17 +850,27 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp ExecutableState.ERROR), // path -> StringUtils.endsWith(path, modelId)); - return runningJobList.stream() - .filter(abstractExecutable -> Objects.equals(modelId, abstractExecutable.getTargetSubject())) - .map(AbstractExecutable::getToBeDeletedLayoutIds).flatMap(Set::stream).collect(Collectors.toSet()); + Map<String, Set<Long>> underConstructionLayoutsMap = new HashMap<>(); + runningJobList.stream().filter(ae -> Objects.equals(modelId, ae.getTargetSubject())).forEach(ae -> { + Set<Long> layoutIds = ae.getLayoutIds(); + for (String segmentId : ae.getSegmentIds()) { + Set<Long> segmentLayoutIds = underConstructionLayoutsMap.get(segmentId); + if (segmentLayoutIds == null) { + underConstructionLayoutsMap.put(segmentId, Sets.newHashSet(layoutIds)); + } else { + segmentLayoutIds.addAll(layoutIds); + } + } + }); + return underConstructionLayoutsMap; } private IndexResponse convertToResponse(LayoutEntity layoutEntity, NDataModel model) { - return convertToResponse(layoutEntity, model, Sets.newHashSet(), null); + return convertToResponse(layoutEntity, model, Maps.newHashMap(), null); } private IndexResponse convertToResponse(LayoutEntity layoutEntity, NDataModel model, - Set<Long> layoutIdsOfRunningJobs, String segmentId) { + Map<String, Set<Long>> layoutIdsOfRunningJobs, String segmentId) { // remove all internal measures val colOrders = Lists.newArrayList(layoutEntity.getColOrder()); @@ -880,8 +891,14 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp boolean hasDataInconsistent = false; List<NDataSegment> segments = StringUtils.isBlank(segmentId) ? dataflow.getSegments() - : dataflow.getSegments().stream().filter(seg -> seg.getId().equals(segmentId)).collect(Collectors.toList()); + : dataflow.getSegments().stream().filter(seg -> seg.getId().equals(segmentId)) + .collect(Collectors.toList()); + Set<Long> layoutIdsOnLoading = Sets.newHashSet(); for (NDataSegment segment : segments) { + Set<Long> segmentLayoutIdsOnLoading = layoutIdsOfRunningJobs.get(segment.getId()); + if (CollectionUtils.isNotEmpty(segmentLayoutIdsOnLoading)) { + layoutIdsOnLoading.addAll(segmentLayoutIdsOnLoading); + } NDataLayout layout = segment.getLayout(layoutEntity.getId(), true); if (layout == null) { continue; @@ -896,7 +913,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp IndexEntity.Status status; if (readyCount <= 0) { - if (layoutIdsOfRunningJobs.contains(layoutEntity.getId())) { + if (layoutIdsOnLoading.contains(layoutEntity.getId())) { status = IndexEntity.Status.BUILDING; } else { status = IndexEntity.Status.NO_BUILD; @@ -1110,6 +1127,13 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp public BuildBaseIndexResponse updateBaseIndex(String project, CreateBaseIndexRequest request, boolean createIfNotExistTableLayout, boolean createIfNotExistAggLayout, boolean isAuto) { aclEvaluate.checkProjectOperationDesignPermission(project); + return updateBaseIndexInternal(project, request, createIfNotExistTableLayout, createIfNotExistAggLayout, + isAuto); + } + + @Transaction(project = 0) + public BuildBaseIndexResponse updateBaseIndexInternal(String project, CreateBaseIndexRequest request, + boolean createIfNotExistTableLayout, boolean createIfNotExistAggLayout, boolean isAuto) { // update = delete + create Set<Long> needDelete = checkNeedUpdateBaseIndex(project, request, isAuto); List<LayoutEntity> needRetainAggLayout = getNeedRetainAggLayout(project, request, needDelete); @@ -1126,7 +1150,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp return BuildBaseIndexResponse.EMPTY; } - BuildBaseIndexResponse response = createBaseIndex(project, request); + BuildBaseIndexResponse response = createBaseIndexInternal(project, request); response.setIndexUpdateType(needDelete); return response; } @@ -1246,6 +1270,10 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp @Transaction(project = 0) public BuildBaseIndexResponse createBaseIndex(String project, CreateBaseIndexRequest request) { aclEvaluate.checkProjectOperationDesignPermission(project); + return createBaseIndexInternal(project, request); + } + + public BuildBaseIndexResponse createBaseIndexInternal(String project, CreateBaseIndexRequest request) { NDataModel model = getManager(NDataModelManager.class, project).getDataModelDesc(request.getModelId()); NIndexPlanManager indexPlanManager = getManager(NIndexPlanManager.class, project); IndexPlan indexPlan = indexPlanManager.getIndexPlan(request.getModelId()); @@ -1283,7 +1311,7 @@ public class IndexPlanService extends BasicService implements TableIndexPlanSupp } private void overrideLayout(LayoutEntity layout, LayoutProperty layoutProperty, NDataModel model) { - layout.setOwner(BasicService.getUsername()); + layout.setOwner(SecurityContextHolder.getContext() == null ? "System" : getUsername()); if (layoutProperty == null) { return; } diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java index 59cc0a9203..e542fe6698 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java @@ -124,14 +124,22 @@ import org.apache.kylin.common.persistence.transaction.TransactionException; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.persistence.transaction.UnitOfWorkContext; import org.apache.kylin.common.scheduler.EventBusFactory; -import org.apache.kylin.common.util.SqlIdentifierFormatterVisitor; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; +import org.apache.kylin.common.util.SqlIdentifierFormatterVisitor; import org.apache.kylin.common.util.StringHelper; import org.apache.kylin.common.util.ThreadUtil; import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil; +import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; +import org.apache.kylin.guava30.shaded.common.base.Preconditions; +import org.apache.kylin.guava30.shaded.common.base.Strings; +import org.apache.kylin.guava30.shaded.common.base.Supplier; +import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.job.SecondStorageJobParamUtil; import org.apache.kylin.job.common.SegmentUtil; import org.apache.kylin.job.execution.AbstractExecutable; @@ -267,15 +275,7 @@ import org.springframework.security.core.userdetails.UserDetails; import org.springframework.stereotype.Component; import com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting; -import org.apache.kylin.guava30.shaded.common.base.Preconditions; -import org.apache.kylin.guava30.shaded.common.base.Strings; -import org.apache.kylin.guava30.shaded.common.collect.ImmutableList; -import org.apache.kylin.guava30.shaded.common.collect.Lists; -import org.apache.kylin.guava30.shaded.common.collect.Maps; -import org.apache.kylin.guava30.shaded.common.collect.Sets; -import org.apache.kylin.guava30.shaded.common.base.Supplier; import io.kyligence.kap.secondstorage.SecondStorage; import io.kyligence.kap.secondstorage.SecondStorageNodeHelper; import io.kyligence.kap.secondstorage.SecondStorageUpdater; @@ -1847,6 +1847,10 @@ public class ModelService extends AbstractModelService implements TableModelSupp } public void saveNewModelsAndIndexes(String project, List<ModelRequest> newModels) { + saveNewModelsAndIndexes(project, null, newModels); + } + + public void saveNewModelsAndIndexes(String project, String saveIndexesStrategy, List<ModelRequest> newModels) { if (CollectionUtils.isEmpty(newModels)) { return; } @@ -1883,6 +1887,9 @@ public class ModelService extends AbstractModelService implements TableModelSupp emptyIndex.setUuid(expanded.getUuid()); indexPlanManager.createIndexPlan(emptyIndex); indexPlanService.expandIndexPlanRequest(indexPlan, expanded); + if (SAVE_INDEXES_STRATEGY.equalsIgnoreCase(saveIndexesStrategy)) { + indexPlan.setBaseAggIndexReduceHighCardinalityDim(true); + } addBaseIndex(modelRequest, expanded, indexPlan); // create DataFlow @@ -1896,7 +1903,7 @@ public class ModelService extends AbstractModelService implements TableModelSupp } createStreamingJob(project, expanded, modelRequest); - updateIndexPlan(project, indexPlan); + updateIndexPlan(project, indexPlan, expanded, saveIndexesStrategy); UnitOfWorkContext context = UnitOfWork.get(); context.doAfterUnit(() -> EventBusFactory.getInstance() .postSync(new ModelAddEvent(project, expanded.getId(), expanded.getAlias()))); @@ -2149,9 +2156,13 @@ public class ModelService extends AbstractModelService implements TableModelSupp copy.getPartitionDesc().changeTableAlias(oldAliasName, tableName); } - void updateIndexPlan(String project, IndexPlan indexPlan) { + void updateIndexPlan(String project, IndexPlan indexPlan, NDataModel model, String saveIndexesStrategy) { NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project); indexPlanManager.updateIndexPlan(indexPlan.getId(), copyForWrite -> { + if (SAVE_INDEXES_STRATEGY.equalsIgnoreCase(saveIndexesStrategy)) { + copyForWrite.setBaseAggIndexReduceHighCardinalityDim(true); + splitIndexesIntoSingleDimIndexes(model, indexPlan); + } if (indexPlan.getAggShardByColumns() != null) { copyForWrite.setAggShardByColumns(indexPlan.getAggShardByColumns()); } diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java index 05d3d4d107..73b9a94fb7 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java @@ -17,9 +17,28 @@ */ package org.apache.kylin.rest.service; -import lombok.extern.slf4j.Slf4j; -import lombok.val; -import lombok.var; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.INDEX_DUPLICATE; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.LAYOUT_NOT_EXISTS; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.SHARD_BY_COLUMN_NOT_IN_INDEX; +import static org.apache.kylin.metadata.cube.model.IndexEntity.Source.CUSTOM_TABLE_INDEX; +import static org.apache.kylin.metadata.cube.model.IndexEntity.Source.RECOMMENDED_TABLE_INDEX; +import static org.apache.kylin.metadata.model.SegmentStatusEnum.READY; +import static org.apache.kylin.metadata.model.SegmentStatusEnum.WARNING; +import static org.hamcrest.Matchers.is; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + import org.apache.commons.collections.ListUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; @@ -29,6 +48,7 @@ import org.apache.kylin.cube.model.SelectRule; import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler; import org.apache.kylin.engine.spark.job.NSparkCubingJob; import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup; import org.apache.kylin.metadata.cube.model.IndexEntity; @@ -69,25 +89,9 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.springframework.test.util.ReflectionTestUtils; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; - -import static org.apache.kylin.common.exception.code.ErrorCodeServer.INDEX_DUPLICATE; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.LAYOUT_NOT_EXISTS; -import static org.apache.kylin.common.exception.code.ErrorCodeServer.SHARD_BY_COLUMN_NOT_IN_INDEX; -import static org.apache.kylin.metadata.cube.model.IndexEntity.Source.CUSTOM_TABLE_INDEX; -import static org.apache.kylin.metadata.cube.model.IndexEntity.Source.RECOMMENDED_TABLE_INDEX; -import static org.apache.kylin.metadata.model.SegmentStatusEnum.READY; -import static org.apache.kylin.metadata.model.SegmentStatusEnum.WARNING; -import static org.hamcrest.Matchers.is; +import lombok.val; +import lombok.var; +import lombok.extern.slf4j.Slf4j; @Slf4j public class IndexPlanServiceTest extends SourceTestCase { @@ -1148,7 +1152,9 @@ public class IndexPlanServiceTest extends SourceTestCase { Assert.assertEquals(1, response.size()); Assert.assertTrue(ids.contains(20000010001L)); - Mockito.doReturn(Sets.newHashSet(20000020001L)).when(indexPlanService).getLayoutsByRunningJobs(getProject(), + Map<String, Set<Long>> underConstructionLayoutsMap = Maps.newHashMap(); + underConstructionLayoutsMap.put("ef5e0663-feba-4ed2-b71c-21958122bbff", Sets.newHashSet(20000020001L)); + Mockito.doReturn(underConstructionLayoutsMap).when(indexPlanService).getLayoutsByRunningJobs(getProject(), modelId); response = indexPlanService.getIndexes(getProject(), modelId, "", Lists.newArrayList(IndexEntity.Status.BUILDING), "data_size", false, null); diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java index 054755f307..de48d76570 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java @@ -1037,7 +1037,7 @@ public class ProjectServiceTest extends NLocalFileMetadataTestCase { @Test public void testCleanupGarbage() throws Exception { - projectService.cleanupGarbage(PROJECT); + projectService.cleanupGarbage(PROJECT, false); } @Test diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala index 89e9a2c58d..a1e15c8de4 100644 --- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala +++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala @@ -18,12 +18,13 @@ package org.apache.kylin.query.plugin.diagnose -import org.apache.kylin.guava30.shaded.common.util.concurrent.ThreadFactoryBuilder import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.kylin.common.util.ExecutorServiceUtil +import org.apache.kylin.guava30.shaded.common.util.concurrent.ThreadFactoryBuilder import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext} import org.apache.spark.internal.Logging +import org.apache.spark.utils.SparkHadoopUtils import org.joda.time.DateTime import java.io.File @@ -36,8 +37,10 @@ class DiagnoseExecutorPlugin extends ExecutorPlugin with Logging { private val LOCAL_GC_FILE_PREFIX: String = "gc" private val DATE_PATTERN = "yyyy-MM-dd" private val checkingInterval: Long = 10000L - private val configuration: Configuration = new Configuration() - private val fileSystem: FileSystem = FileSystem.get(configuration) + // Default hadoop configuration for the unique fileSystem below is sufficient + private val configuration: Configuration = SparkHadoopUtils.newConfiguration() + // Use FileSystem.newInstance(...) to prevent contamination of global file system cache + private val fileSystem: FileSystem = FileSystem.newInstance(configuration) private val scheduledExecutorService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Diagnose-%d").build()) diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java index e80fbc9513..805647b3e5 100644 --- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java @@ -109,7 +109,7 @@ public abstract class AbstractHdfsLogAppender public FileSystem getFileSystem() { if (null == fileSystem) { - return getFileSystem(SparkHadoopUtils.newConfigurationWithSparkConf()); + return getFileSystem(SparkHadoopUtils.newConfiguration()); } return fileSystem; } @@ -122,7 +122,10 @@ public abstract class AbstractHdfsLogAppender workingDir = System.getProperty("kylin.hdfs.working.dir"); StatusLogger.getLogger().warn("hdfsWorkingDir -> " + getWorkingDir()); } - fileSystem = new Path(workingDir).getFileSystem(conf); + + // Use FileSystem.newInstance(...) to prevent contamination of global file system cache + fileSystem = FileSystem.newInstance(new Path(workingDir).toUri(), conf); + } catch (IOException e) { StatusLogger.getLogger().error("Failed to create the file system, ", e); throw new RuntimeException("Failed to create the file system, ", e); diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java index a09c338f6f..3aaaff1f0f 100644 --- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java @@ -112,7 +112,7 @@ public class SparkDriverHdfsLogAppender extends AbstractHdfsLogAppender { @Override public void doWriteLog(int eventSize, List<LogEvent> transaction) throws IOException, InterruptedException { if (!isWriterInited()) { - Configuration conf = SparkHadoopUtils.newConfigurationWithSparkConf(); + Configuration conf = SparkHadoopUtils.newConfiguration(); if (!initHdfsWriter(new Path(getLogPath()), conf)) { StatusLogger.getLogger().error("init the hdfs writer failed!"); } diff --git a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java index b1712befb7..4d5132333e 100644 --- a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java +++ b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java @@ -173,14 +173,14 @@ public class SparkExecutorHdfsLogAppender extends AbstractHdfsLogAppender { if (ugi != null) { StatusLogger.getLogger().warn("Login user hashcode is " + ugi.hashCode()); ugi.doAs((PrivilegedExceptionAction<Void>) () -> { - if (!initHdfsWriter(file, SparkHadoopUtils.newConfigurationWithSparkConf())) { + if (!initHdfsWriter(file, SparkHadoopUtils.newConfiguration())) { StatusLogger.getLogger().error("Failed to init the hdfs writer!"); } doRollingClean(loggingEvent); return null; }); } else { - if (!initHdfsWriter(file, SparkHadoopUtils.newConfigurationWithSparkConf())) { + if (!initHdfsWriter(file, SparkHadoopUtils.newConfiguration())) { StatusLogger.getLogger().error("Failed to init the hdfs writer!"); } doRollingClean(loggingEvent); diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala index d6e77356bf..d3bf23a4ca 100644 --- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala +++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala @@ -18,16 +18,40 @@ package org.apache.spark.utils import org.apache.hadoop.conf.Configuration -import org.apache.spark.SparkEnv +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil +/** + * Convenience utility object for invoking [[SparkHadoopUtil]]. + */ object SparkHadoopUtils { + /** + * Simply return a new default hadoop [[Configuration]]. + * @return Newly created default hadoop configuration + */ + def newConfiguration(): Configuration = { + new Configuration() + } + + /** + * Returns a new hadoop [[Configuration]] with current [[SparkConf]] from [[SparkEnv]]. + * @return Newly created hadoop configuration with extra spark properties + */ def newConfigurationWithSparkConf(): Configuration = { val sparkEnv = SparkEnv.get - if (sparkEnv != null) { - SparkHadoopUtil.newConfiguration(sparkEnv.conf) + if (sparkEnv == null) { + throw new IllegalStateException("sparkEnv should not be null") } - new Configuration() + SparkHadoopUtil.newConfiguration(sparkEnv.conf) + } + + /** + * Returns a new hadoop [[Configuration]] with [[SparkConf]] given. + * @param sparkConf A [[SparkConf]] + * @return Newly created hadoop configuration with extra spark properties given + */ + def newConfigurationWithSparkConf(sparkConf: SparkConf): Configuration = { + SparkHadoopUtil.newConfiguration(sparkConf) } } diff --git a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java index 7d796992b4..9bdbd5a3ff 100644 --- a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java +++ b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java @@ -18,12 +18,14 @@ package org.apache.kylin.helper; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.SetThreadName; @@ -128,7 +130,11 @@ public class RoutineToolHelper { public static void cleanMetaByProject(String projectName) { log.info("Start to clean up {} meta", projectName); try { - GarbageCleaner.cleanMetadata(projectName); + boolean needAggressiveOpt = Arrays + .stream(KylinConfig.getInstanceFromEnv().getProjectsAggressiveOptimizationIndex()) + .map(StringUtils::lowerCase).collect(Collectors.toList()) + .contains(StringUtils.toRootLowerCase(projectName)); + GarbageCleaner.cleanMetadata(projectName, needAggressiveOpt); } catch (Exception e) { log.error("Project[{}] cleanup Metadata failed", projectName, e); } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java index 498e0fcb35..04313ee7a5 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java @@ -36,6 +36,11 @@ public class ExecutableCleaner extends MetadataCleaner { super(project); } + @Override + public void beforeCleanup() { + // do nothing + } + @Override public void cleanup() { @@ -62,4 +67,9 @@ public class ExecutableCleaner extends MetadataCleaner { logger.info("Clean executable in project {} finished", project); } + @Override + public void afterCleanup() { + // do nothing + } + } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java index 5a05e52f8e..527f25527a 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java @@ -41,16 +41,18 @@ public class GarbageCleaner { * Clean up metadata * @param project */ - public static void cleanMetadata(String project) { + public static void cleanMetadata(String project, boolean needAggressiveOpt) { val projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project); if (projectInstance == null) { return; } - List<MetadataCleaner> cleaners = initCleaners(project); + List<MetadataCleaner> cleaners = initCleaners(project, needAggressiveOpt); cleaners.forEach(MetadataCleaner::prepare); EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> { + cleaners.forEach(MetadataCleaner::beforeCleanup); cleaners.forEach(MetadataCleaner::cleanup); + cleaners.forEach(MetadataCleaner::afterCleanup); return 0; }, project); @@ -58,8 +60,9 @@ public class GarbageCleaner { MetricsGroup.hostTagCounterInc(MetricsName.METADATA_CLEAN, MetricsCategory.PROJECT, project); } - private static List<MetadataCleaner> initCleaners(String project) { - return Arrays.asList(new SnapshotCleaner(project), new IndexCleaner(project), new ExecutableCleaner(project)); + private static List<MetadataCleaner> initCleaners(String project, boolean needAggressiveOpt) { + return Arrays.asList(new SnapshotCleaner(project), new IndexCleaner(project, needAggressiveOpt), + new ExecutableCleaner(project)); } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java index 6c7b88fa4c..7036648383 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java @@ -20,21 +20,33 @@ package org.apache.kylin.tool.garbage; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.annotation.Clarification; +import org.apache.kylin.common.scheduler.EventBusFactory; +import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Maps; +import org.apache.kylin.metadata.cube.model.IndexPlan; +import org.apache.kylin.metadata.cube.model.LayoutEntity; +import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; +import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.cube.optimization.GarbageLayoutType; import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory; +import org.apache.kylin.metadata.cube.optimization.event.ApproveRecsEvent; +import org.apache.kylin.metadata.cube.optimization.event.BuildIndexEvent; +import org.apache.kylin.metadata.cube.utils.IndexPlanReduceUtil; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.project.NProjectManager; import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2; import org.apache.kylin.metadata.recommendation.ref.OptRecV2; -import org.apache.kylin.guava30.shaded.common.collect.Lists; - import lombok.val; import lombok.extern.slf4j.Slf4j; @@ -42,10 +54,15 @@ import lombok.extern.slf4j.Slf4j; @Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise") public class IndexCleaner extends MetadataCleaner { - List<String> needUpdateModels = Lists.newArrayList(); + private List<String> needUpdateModels = Lists.newArrayList(); - public IndexCleaner(String project) { + private Map<NDataflow, Map<Long, GarbageLayoutType>> needOptAggressivelyModels = Maps.newHashMap(); + + private final boolean needAggressiveOpt; + + public IndexCleaner(String project, boolean needAggressiveOpt) { super(project); + this.needAggressiveOpt = needAggressiveOpt; } @Override @@ -59,15 +76,22 @@ public class IndexCleaner extends MetadataCleaner { log.info("not semiautomode, can't run index clean"); return; } + OptRecManagerV2 recManagerV2 = OptRecManagerV2.getInstance(project); for (val model : dataflowManager.listUnderliningDataModels()) { val dataflow = dataflowManager.getDataflow(model.getId()).copy(); - Map<Long, GarbageLayoutType> garbageLayouts = IndexOptimizerFactory.getOptimizer(dataflow, true) - .getGarbageLayoutMap(dataflow); + Map<Long, GarbageLayoutType> garbageLayouts = IndexOptimizerFactory + .getOptimizer(dataflow, needAggressiveOpt, true).getGarbageLayoutMap(dataflow); + + if (needAggressiveOpt) { + needOptAggressivelyModels.put(dataflow, garbageLayouts); + continue; + } if (MapUtils.isEmpty(garbageLayouts)) { continue; } + boolean hasNewRecItem = recManagerV2.genRecItemsFromIndexOptimizer(project, model.getUuid(), garbageLayouts); if (hasNewRecItem) { @@ -78,8 +102,22 @@ public class IndexCleaner extends MetadataCleaner { log.info("Clean index in project {} finished", project); } + @Override + public void beforeCleanup() { + if (MapUtils.isEmpty(needOptAggressivelyModels)) { + return; + } + + approveRec(); + mergeSameDimAggLayout(); + } + @Override public void cleanup() { + if (MapUtils.isNotEmpty(needOptAggressivelyModels)) { + cleanUpIndexAggressively(); + } + if (needUpdateModels.isEmpty()) { return; } @@ -96,4 +134,83 @@ public class IndexCleaner extends MetadataCleaner { }); } + @Override + public void afterCleanup() { + if (MapUtils.isEmpty(needOptAggressivelyModels)) { + return; + } + + buildIndexes(); + } + + private List<Set<LayoutEntity>> getMergedLayouts(IndexPlan indexPlan, Map<Long, GarbageLayoutType> garbageLayouts) { + List<LayoutEntity> merged = Lists.newArrayList(); + garbageLayouts.forEach(((layoutId, garbageLayoutType) -> { + LayoutEntity layout = indexPlan.getLayoutEntity(layoutId); + if (GarbageLayoutType.MERGED.equals(garbageLayoutType) && !Objects.isNull(layout)) { + merged.add(layout); + } + })); + + return IndexPlanReduceUtil.collectSameDimAggLayouts(merged); + } + + private void approveRec() { + EventBusFactory eventBusF = EventBusFactory.getInstance(); + eventBusF.callService(new ApproveRecsEvent(project, needOptAggressivelyModels)); + } + + private void mergeSameDimAggLayout() { + NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + needOptAggressivelyModels.forEach((dataflow, garbageLayouts) -> { + IndexPlan indexPlan = indexPlanManager.getIndexPlan(dataflow.getId()); + List<Set<LayoutEntity>> mergedLayouts = getMergedLayouts(indexPlan, garbageLayouts); + if (CollectionUtils.isEmpty(mergedLayouts)) { + return; + } + + log.info("merge same dimension index for model: {} under project: {}", dataflow.getModelAlias(), project); + indexPlanManager.updateIndexPlan(dataflow.getId(), copyForWrite -> { + IndexPlan indexPlanMerged = IndexPlanReduceUtil.mergeSameDimLayout(indexPlan, mergedLayouts); + copyForWrite.setIndexes(indexPlanMerged.getIndexes()); + copyForWrite.setLastModified(System.currentTimeMillis()); + }); + }); + } + + private void cleanUpIndexAggressively() { + NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + needOptAggressivelyModels.forEach((dataflow, garbageLayouts) -> { + if (MapUtils.isEmpty(garbageLayouts)) { + return; + } + log.info("aggressively clean up index for model: {} under project: {}", dataflow.getModelAlias(), project); + IndexPlan indexPlan = indexPlanManager.getIndexPlan(dataflow.getId()); + deleteIndexes(indexPlan, garbageLayouts.keySet()); + }); + } + + private void deleteIndexes(IndexPlan indexPlan, Set<Long> garbageLayouts) { + garbageLayouts.stream().map(layoutId -> indexPlan.getLayoutEntity(layoutId).getIndex()) + .forEachOrdered(index -> { + indexPlan.getIndexes().remove(index); + indexPlan.getToBeDeletedIndexes().add(index); + }); + + NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + indexPlanManager.updateIndexPlan(indexPlan.getId(), copyForWrite -> { + copyForWrite.setLastModified(System.currentTimeMillis()); + copyForWrite.getToBeDeletedIndexes() + .addAll(indexPlan.getToBeDeletedIndexes().stream() + .filter(index -> !copyForWrite.getToBeDeletedIndexes().contains(index)) + .collect(Collectors.toList())); + copyForWrite.getIndexes().removeAll(indexPlan.getToBeDeletedIndexes()); + }); + } + + private void buildIndexes() { + EventBusFactory eventBusFactory = EventBusFactory.getInstance(); + eventBusFactory + .callService(new BuildIndexEvent(project, Lists.newArrayList(needOptAggressivelyModels.keySet()))); + } } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java index 84f04556d8..2f96bfa281 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java @@ -25,9 +25,15 @@ public abstract class MetadataCleaner { this.project = project; } + // do in transaction + public abstract void beforeCleanup(); + // do in transaction public abstract void cleanup(); + // do in transaction + public abstract void afterCleanup(); + public void prepare() { // default do nothing } diff --git a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java index 1add64dc9c..3c1d897be8 100644 --- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java +++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java @@ -43,6 +43,11 @@ public class SnapshotCleaner extends MetadataCleaner { super(project); } + @Override + public void beforeCleanup() { + // do nothing + } + @Override public void prepare() { NTableMetadataManager tMgr = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project); @@ -88,4 +93,9 @@ public class SnapshotCleaner extends MetadataCleaner { } logger.info("Clean snapshot in project {} finished", project); } + + @Override + public void afterCleanup() { + // do nothing + } }