This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 60da9e0ce1fa789ad580ca62ae4d47162f9d3201
Author: Yinghao Lin <39019287+yhca...@users.noreply.github.com>
AuthorDate: Fri Jul 21 21:09:31 2023 +0800

    KYLIN-5768 Refine and adjust some code for localCache
    
    ---------
    Co-authored-by: sibingzhang <74443791+sibingzh...@users.noreply.github.com>
    Co-authored-by: sibing.zhang <sibing.zh...@qq.com>
---
 pom.xml                                            |  12 +-
 .../rest/controller/NBasicControllerTest.java      |   2 +-
 .../apache/kylin/rest/service/ProjectService.java  |   9 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   8 ++
 .../kylin/metadata/cube/model/IndexPlan.java       | 136 +++++++++++++++------
 .../cube/optimization/AbstractOptStrategy.java     |   4 +-
 .../cube/optimization/GarbageLayoutType.java       |   2 +-
 .../optimization/IncludedLayoutOptStrategy.java    |   2 +-
 .../metadata/cube/optimization/IndexOptimizer.java |  14 ++-
 .../cube/optimization/IndexOptimizerFactory.java   |  12 +-
 .../optimization/LowFreqLayoutOptStrategy.java     |   2 +-
 .../cube/optimization/MergedLayoutOptStrategy.java |  59 +++++++++
 .../optimization/SimilarLayoutOptStrategy.java     |   2 +-
 .../cube/optimization/event/ApproveRecsEvent.java} |  27 ++--
 .../cube/optimization/event/BuildIndexEvent.java}  |  26 ++--
 .../cube/storage/GarbageStorageCollector.java      |   2 +-
 .../metadata/cube/utils/IndexPlanReduceUtil.java   |  52 +++++++-
 .../storage/ProjectStorageInfoCollectorTest.java   |  74 +++++++++--
 .../cube/utils/IndexPlanReduceUtilTest.java        |  27 +++-
 .../kylin/rest/initialize/BuildAppInitializer.java |   5 +
 .../kylin/rest/service/ModelBuildService.java      |  38 ++++--
 .../kylin/rest/service/ModelServiceBuildTest.java  |  36 +++++-
 .../kylin/rest/controller/NProjectController.java  |  14 ++-
 .../rest/controller/NProjectControllerTest.java    |   2 +-
 .../kylin/rest/service/AbstractModelService.java   | 114 ++++++++++++++++-
 .../kylin/rest/service/BaseIndexUpdateHelper.java  |  17 ++-
 .../kylin/rest/service/IndexPlanService.java       |  66 +++++++---
 .../apache/kylin/rest/service/ModelService.java    |  33 +++--
 .../kylin/rest/service/IndexPlanServiceTest.java   |  52 ++++----
 .../kylin/rest/service/ProjectServiceTest.java     |   2 +-
 .../plugin/diagnose/DiagnoseExecutorPlugin.scala   |   9 +-
 .../common/logging/AbstractHdfsLogAppender.java    |   7 +-
 .../common/logging/SparkDriverHdfsLogAppender.java |   2 +-
 .../logging/SparkExecutorHdfsLogAppender.java      |   4 +-
 .../org/apache/spark/utils/SparkHadoopUtils.scala  |  32 ++++-
 .../org/apache/kylin/helper/RoutineToolHelper.java |   8 +-
 .../kylin/tool/garbage/ExecutableCleaner.java      |  10 ++
 .../apache/kylin/tool/garbage/GarbageCleaner.java  |  11 +-
 .../apache/kylin/tool/garbage/IndexCleaner.java    | 129 ++++++++++++++++++-
 .../apache/kylin/tool/garbage/MetadataCleaner.java |   6 +
 .../apache/kylin/tool/garbage/SnapshotCleaner.java |  10 ++
 41 files changed, 888 insertions(+), 191 deletions(-)

diff --git a/pom.xml b/pom.xml
index 9b68f4a83d..ff1f965ea3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -258,7 +258,11 @@
             
**/org/apache/kylin/tool/hadoop/KapGetPathWithoutSchemeAndAuthorityCLI.java,
             **/org/apache/kylin/engine/spark/streaming/**/*,
             **/org/apache/kylin/rest/security/KerberosLoginManager.java,
-            **/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+            
**/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java,
+            **/org/apache/kylin/tool/garbage/IndexCleaner.java,
+            
**/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java,
+            **/org/apache/kylin/rest/service/AbstractModelService.java,
+            **/org/apache/spark/utils/SparkHadoopUtils.scala
         </sonar.jacoco.excludes>
         <sonar.coverage.exclusions>
             **/org/apache/kylin/**/*Exception.java,
@@ -304,7 +308,11 @@
             **/org/apache/kylin/rest/util/InitResourceGroupUtils.java,
             **/org/apache/kylin/rest/QueryNodeFilter.java,
             **/org/apache/kylin/rest/config/SwaggerConfig.java,
-            **/org/apache/kylin/rest/config/SwaggerCompatibilityConfig.java
+            **/org/apache/kylin/rest/config/SwaggerCompatibilityConfig.java,
+            **/org/apache/kylin/tool/garbage/IndexCleaner.java,
+            
**/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java,
+            **/org/apache/kylin/rest/service/AbstractModelService.java,
+            **/org/apache/spark/utils/SparkHadoopUtils.scala
         </sonar.coverage.exclusions>
         <sonar.organization>kylin</sonar.organization>
         <!--suppress UnresolvedMavenProperty -->
diff --git a/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java b/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java
index 34ac02cab2..586fcc62b6 100644
--- a/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java
+++ b/src/common-server/src/test/java/org/apache/kylin/rest/controller/NBasicControllerTest.java
@@ -256,7 +256,7 @@ public class NBasicControllerTest extends 
NLocalFileMetadataTestCase {
     @Test
     public void testCheckParamLength() {
         thrown.expect(KylinException.class);
-        
thrown.expectMessage(String.format(Message.getInstance().getParamTooLarge(), 
"tag", 1000));
+        
thrown.expectMessage(String.format(MsgPicker.getMsg().getParamTooLarge(), 
"tag", 1000));
         List param = new ArrayList();
         param.add(1);
         param.add(6);
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index 416f577e0e..aafde56676 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -358,7 +358,10 @@ public class ProjectService extends BasicService {
                 logger.info("Start to cleanup garbage for project<{}>", 
project.getName());
                 try {
                     projectSmartService.cleanupGarbage(project.getName(), 
remainingTime);
-                    GarbageCleaner.cleanMetadata(project.getName());
+                    boolean needAggressiveOpt = 
Arrays.stream(config.getProjectsAggressiveOptimizationIndex())
+                            
.map(StringUtils::lowerCase).collect(Collectors.toList())
+                            
.contains(StringUtils.toRootLowerCase(project.getName()));
+                    GarbageCleaner.cleanMetadata(project.getName(), 
needAggressiveOpt);
                     EventBusFactory.getInstance().callService(new 
ProjectCleanOldQueryResultEvent(project.getName()));
                 } catch (Exception e) {
                     logger.warn("clean project<" + project.getName() + "> 
failed", e);
@@ -407,9 +410,9 @@ public class ProjectService extends BasicService {
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or 
hasPermission(#project, 'ADMINISTRATION')")
-    public void cleanupGarbage(String project) throws Exception {
+    public void cleanupGarbage(String project, boolean needAggressiveOpt) 
throws Exception {
         projectSmartService.cleanupGarbage(project, 0);
-        GarbageCleaner.cleanMetadata(project);
+        GarbageCleaner.cleanMetadata(project, needAggressiveOpt);
         asyncTaskService.cleanupStorage();
     }
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7f5f2fb9e3..f48c808cdc 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -4017,4 +4017,12 @@ public abstract class KylinConfigBase implements 
Serializable {
         String defaultValue = 
"org.apache.kylin.common.extension.KylinInfoExtension$Factory";
         return getOptional("kylin.extension.info.factory", defaultValue);
     }
+
+    public String[] getProjectsAggressiveOptimizationIndex() {
+        return 
getOptionalStringArray("kylin.index.projects-optimized-aggressively", new 
String[0]);
+    }
+
+    public int getExpectedIndexSizeOptimized() {
+        return 
Integer.parseInt(getOptional("kylin.index.expected-size-after-optimization", 
"0"));
+    }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java
index 22336c3bc4..e44fa1b838 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/model/IndexPlan.java
@@ -36,6 +36,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,12 +51,26 @@ import 
org.apache.kylin.common.persistence.MissingRootPersistentEntity;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.base.Joiner;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.collect.BiMap;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableSortedSet;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -64,18 +79,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonManagedReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.base.Joiner;
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.collect.BiMap;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableBiMap;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableMap;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableSortedSet;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -180,6 +184,11 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
      */
     private final List<String> errors = Lists.newLinkedList();
 
+    @Setter
+    @Getter
+    @JsonProperty("base_agg_index_reduce_high_cardinality_dim")
+    private boolean baseAggIndexReduceHighCardinalityDim;
+
     public void initAfterReload(KylinConfig config, String p) {
         this.project = p;
         initConfig4IndexPlan(config);
@@ -866,6 +875,19 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
                 
getRuleBaseLayouts().stream().map(LayoutEntity::getId).collect(Collectors.toSet()),
 layoutIds));
     }
 
+    private boolean isHighCardinalityDim(NTableMetadataManager tableManager, 
TblColRef colRef) {
+        String tableIdentity = colRef.getTableRef().getTableIdentity();
+        TableDesc tableDesc = tableManager.getTableDesc(tableIdentity);
+        TableExtDesc tableExtIfExists = 
tableManager.getTableExtIfExists(tableDesc);
+        TableExtDesc.ColumnStats columnStats = 
tableExtIfExists.getColumnStatsByName(colRef.getName());
+
+        if (Objects.isNull(columnStats)) {
+            return false;
+        }
+
+        return (double) (columnStats.getCardinality()) / 
tableExtIfExists.getTotalRows() > 0.2;
+    }
+
     private void removeLayouts(Collection<IndexEntity> indexes, Set<Long> 
layoutIds, boolean deleteAuto,
             boolean deleteManual) {
         checkIsNotCachedAndShared();
@@ -937,10 +959,24 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
     }
 
     public LayoutEntity createBaseAggIndex(NDataModel model) {
-        List<Integer> dims = model.getEffectiveDimensions().keySet().asList();
+        NTableMetadataManager tableManager = 
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(),
+                model.getProject());
+        List<Integer> dims;
+        if (this.isBaseAggIndexReduceHighCardinalityDim()) {
+            List<Integer> list = new ArrayList<>();
+            for (Integer dimId : model.getEffectiveDimensions().keySet()) {
+                TblColRef colRef = model.getColRef(dimId);
+                if (!isHighCardinalityDim(tableManager, colRef)) {
+                    list.add(dimId);
+                }
+            }
+            dims = list;
+        } else {
+            dims = model.getEffectiveDimensions().keySet().asList();
+        }
         List<Integer> measures = 
model.getEffectiveMeasures().keySet().asList();
         List<Integer> colOrder = naturalOrderCombine(dims, measures);
-        return createLayout(colOrder, true);
+        return createLayout(colOrder, true, true, Lists.newArrayList());
     }
 
     public LayoutEntity createBaseTableIndex() {
@@ -954,16 +990,17 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
         if (colOrder.isEmpty()) {
             return null;
         }
-        return createLayout(colOrder, false);
+        return createLayout(colOrder, false, true, Lists.newArrayList());
     }
 
-    private LayoutEntity createLayout(List<Integer> colOrder, boolean isAgg) {
-        LayoutEntity newBaseLayout = new LayoutEntity();
-        newBaseLayout.setColOrder(colOrder);
-        newBaseLayout.setUpdateTime(System.currentTimeMillis());
-        newBaseLayout.setBase(true);
-        newBaseLayout.initalId(isAgg);
-        return newBaseLayout;
+    public LayoutEntity createLayout(List<Integer> colOrder, boolean isAgg, 
boolean isBase, List<Integer> shardByCols) {
+        LayoutEntity newLayout = new LayoutEntity();
+        newLayout.setColOrder(colOrder);
+        newLayout.setUpdateTime(System.currentTimeMillis());
+        newLayout.setBase(isBase);
+        newLayout.initalId(isAgg);
+        newLayout.setShardByColumns(shardByCols);
+        return newLayout;
     }
 
     private List<Integer> naturalOrderCombine(List<Integer> col1, 
List<Integer> col2) {
@@ -1143,6 +1180,10 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
         }
 
         public boolean add(LayoutEntity layout, boolean isAgg) {
+            return add(layout, isAgg, true);
+        }
+
+        public boolean add(LayoutEntity layout, boolean isAgg, boolean 
needUpdateApprovedRecs) {
             val identifier = createIndexIdentifier(layout, isAgg);
             if (allIndexesMap.get(identifier) != null && 
allIndexesMap.get(identifier).getLayouts().contains(layout)) {
                 return false;
@@ -1176,11 +1217,20 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
                 indexEntity.getLayouts().add(layout);
                 addedLayouts.add(layout.getId());
             }
-            approvedAdditionalRecs += 1;
+
+            if (needUpdateApprovedRecs) {
+                approvedAdditionalRecs += 1;
+            }
+
             return true;
         }
 
         public boolean remove(LayoutEntity layout, boolean isAgg, boolean 
needAddBlackList) {
+            return remove(layout, isAgg, needAddBlackList, true);
+        }
+
+        public boolean remove(LayoutEntity layout, boolean isAgg, boolean 
needAddBlackList,
+                              boolean needUpdateApprovedRecs) {
             IndexEntity.IndexIdentifier identifier = 
createIndexIdentifier(layout, isAgg);
             if (allIndexesMap.containsKey(identifier) && 
allIndexesMap.get(identifier).getLayouts().contains(layout)) {
                 IndexEntity indexEntity = allIndexesMap.get(identifier);
@@ -1190,33 +1240,45 @@ public class IndexPlan extends RootPersistentEntity 
implements Serializable, IEn
                 }
 
                 if (layoutInIndexPlan.isManual()) {
-                    if (isAgg && needAddBlackList) {
-                        // For similar strategy only works on AggIndex, we 
need add this to black list.
-                        
indexPlan.addRuleBasedBlackList(Lists.newArrayList(layout.getId()));
-                        if (layoutInIndexPlan.isAuto()) {
-                            indexEntity.getLayouts().remove(layoutInIndexPlan);
-                            whiteIndexesMap.values().stream().filter(
-                                    indexEntityInIndexPlan -> 
indexEntityInIndexPlan.getId() == indexEntity.getId())
-                                    
.findFirst().ifPresent(indexEntityInIndexPlan -> 
indexEntityInIndexPlan.getLayouts()
-                                            .remove(layoutInIndexPlan));
-                        }
-                        this.approvedRemovalRecs += 1;
-                        return true;
-                    }
-                    return false;
+                    return removeManualPlan(layout, isAgg, needAddBlackList, 
needUpdateApprovedRecs, indexEntity, layoutInIndexPlan);
                 }
                 indexEntity.getLayouts().remove(layoutInIndexPlan);
                 whiteIndexesMap.values().stream()
                         .filter(indexEntityInIndexPlan -> 
indexEntityInIndexPlan.getId() == indexEntity.getId())
                         .findFirst().ifPresent(indexEntityInIndexPlan -> 
indexEntityInIndexPlan.getLayouts()
                                 .remove(layoutInIndexPlan));
-                this.approvedRemovalRecs += 1;
+
+                if (needUpdateApprovedRecs) {
+                    this.approvedRemovalRecs += 1;
+                }
                 return true;
             } else {
                 return false;
             }
         }
 
+        private boolean removeManualPlan(LayoutEntity layout, boolean isAgg, 
boolean needAddBlackList,
+                                         boolean needUpdateApprovedRecs, 
IndexEntity indexEntity,
+                                         LayoutEntity layoutInIndexPlan) {
+            if (isAgg && needAddBlackList) {
+                // For similar strategy only works on AggIndex, we need add 
this to black list.
+                
indexPlan.addRuleBasedBlackList(Lists.newArrayList(layout.getId()));
+                if (layoutInIndexPlan.isAuto()) {
+                    indexEntity.getLayouts().remove(layoutInIndexPlan);
+                    whiteIndexesMap.values().stream().filter(
+                                    indexEntityInIndexPlan -> 
indexEntityInIndexPlan.getId() == indexEntity.getId())
+                            .findFirst().ifPresent(indexEntityInIndexPlan -> 
indexEntityInIndexPlan.getLayouts()
+                                    .remove(layoutInIndexPlan));
+                }
+
+                if (needUpdateApprovedRecs) {
+                    this.approvedRemovalRecs += 1;
+                }
+                return true;
+            }
+            return false;
+        }
+
         public IndexPlan complete() {
             
indexPlan.setIndexes(whiteIndexesMap.values().stream().sorted(Comparator.comparingLong(IndexEntity::getId))
                     .collect(Collectors.toList()));
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java
index fc9c38b2c2..a977583f8f 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/AbstractOptStrategy.java
@@ -52,11 +52,11 @@ public abstract class AbstractOptStrategy {
 
     private List<LayoutEntity> beforeCollect(List<LayoutEntity> inputLayouts) {
         List<LayoutEntity> layoutsToHandle = Lists.newArrayList(inputLayouts);
-        skipOptimizeTableIndex(layoutsToHandle);
+        skipOptimizeIndex(layoutsToHandle);
         return layoutsToHandle;
     }
 
-    protected abstract void skipOptimizeTableIndex(List<LayoutEntity> 
inputLayouts);
+    protected abstract void skipOptimizeIndex(List<LayoutEntity> inputLayouts);
 
     private void afterCollect(List<LayoutEntity> inputLayouts, Set<Long> 
garbages) {
         inputLayouts.removeIf(layout -> garbages.contains(layout.getId()));
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java
index 935bd57724..397da688b9 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/GarbageLayoutType.java
@@ -19,5 +19,5 @@
 package org.apache.kylin.metadata.cube.optimization;
 
 public enum GarbageLayoutType {
-    LOW_FREQUENCY, INCLUDED, SIMILAR
+    LOW_FREQUENCY, INCLUDED, SIMILAR, MERGED
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java
index c0b224a807..ffdf0dfbab 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IncludedLayoutOptStrategy.java
@@ -58,7 +58,7 @@ public class IncludedLayoutOptStrategy extends 
AbstractOptStrategy {
     }
 
     @Override
-    protected void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts) {
+    protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         if (!kylinConfig.isIncludedStrategyConsiderTableIndex()) {
             inputLayouts.removeIf(layout -> 
IndexEntity.isTableIndex(layout.getId()));
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java
index 8317b2592c..9a81ea0890 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizer.java
@@ -25,13 +25,12 @@ import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
-import org.apache.kylin.metadata.cube.model.NDataflow;
-import org.apache.kylin.metadata.project.NProjectManager;
-
 import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.project.NProjectManager;
 
 import lombok.Getter;
 
@@ -54,7 +53,9 @@ public class IndexOptimizer {
     protected List<LayoutEntity> filterAutoLayouts(NDataflow dataflow) {
         return dataflow.extractReadyLayouts().stream() //
                 .filter(layout -> !layout.isManual() && layout.isAuto()) //
-                .filter(layout -> 
!layout.isBase()).collect(Collectors.toList());
+                .filter(layout -> !layout.isBase()) //
+                .filter(layout -> !layout.isToBeDeleted()) //
+                .collect(Collectors.toList());
     }
 
     protected List<LayoutEntity> filterManualLayouts(NDataflow dataflow) {
@@ -70,8 +71,9 @@ public class IndexOptimizer {
         }
 
         Map<Long, GarbageLayoutType> garbageLayoutTypeMap = Maps.newHashMap();
+        List<LayoutEntity> autoLayouts = filterAutoLayouts(dataflow);
         for (AbstractOptStrategy strategy : getStrategiesForAuto()) {
-            strategy.collectGarbageLayouts(filterAutoLayouts(dataflow), 
dataflow, needLog)
+            strategy.collectGarbageLayouts(autoLayouts, dataflow, needLog)
                     .forEach(id -> garbageLayoutTypeMap.put(id, 
strategy.getType()));
         }
 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java
index 850b884420..b68f37180d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/IndexOptimizerFactory.java
@@ -20,9 +20,8 @@ package org.apache.kylin.metadata.cube.optimization;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.annotation.Clarification;
-import org.apache.kylin.metadata.cube.model.NDataflow;
-
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -36,8 +35,9 @@ public class IndexOptimizerFactory {
     private static final AbstractOptStrategy INCLUDED_OPT_STRATEGY = new 
IncludedLayoutOptStrategy();
     private static final AbstractOptStrategy LOW_FREQ_OPT_STRATEGY = new 
LowFreqLayoutOptStrategy();
     private static final AbstractOptStrategy SIMILAR_OPT_STRATEGY = new 
SimilarLayoutOptStrategy();
+    private static final AbstractOptStrategy MERGED_OPT_STRATEGY = new 
MergedLayoutOptStrategy();
 
-    public static IndexOptimizer getOptimizer(NDataflow dataflow, boolean 
needLog) {
+    public static IndexOptimizer getOptimizer(NDataflow dataflow, boolean 
needAggressiveOpt, boolean needLog) {
         IndexOptimizer optimizer = new IndexOptimizer(needLog);
         final int indexOptimizationLevel = 
KylinConfig.getInstanceFromEnv().getIndexOptimizationLevel();
         if (indexOptimizationLevel == 1) {
@@ -49,6 +49,12 @@ public class IndexOptimizerFactory {
             optimizer.getStrategiesForManual().add(SIMILAR_OPT_STRATEGY);
         }
 
+        if (needAggressiveOpt) {
+            optimizer.getStrategiesForAuto().clear();
+            optimizer.getStrategiesForAuto()
+                    .addAll(Lists.newArrayList(INCLUDED_OPT_STRATEGY, 
LOW_FREQ_OPT_STRATEGY, MERGED_OPT_STRATEGY));
+        }
+
         // log if needed
         printLog(needLog, indexOptimizationLevel, 
dataflow.getIndexPlan().isFastBitmapEnabled());
         return optimizer;
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java
index cfb2d4f22d..85250a4bdf 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/LowFreqLayoutOptStrategy.java
@@ -72,7 +72,7 @@ public class LowFreqLayoutOptStrategy extends 
AbstractOptStrategy {
     }
 
     @Override
-    protected void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts) {
+    protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         if (!kylinConfig.isLowFreqStrategyConsiderTableIndex()) {
             inputLayouts.removeIf(layout -> 
IndexEntity.isTableIndex(layout.getId()));
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/MergedLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/MergedLayoutOptStrategy.java
new file mode 100644
index 0000000000..a6bc091c38
--- /dev/null
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/MergedLayoutOptStrategy.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.cube.optimization;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.kylin.metadata.cube.model.IndexEntity;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.utils.IndexPlanReduceUtil;
+
+import com.google.common.collect.Sets;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class MergedLayoutOptStrategy extends AbstractOptStrategy {
+
+    public MergedLayoutOptStrategy() {
+        this.setType(GarbageLayoutType.MERGED);
+    }
+
+    @Override
+    protected Set<Long> doCollect(List<LayoutEntity> inputLayouts, NDataflow 
dataflow, boolean needLog) {
+        Set<Long> garbageLayouts = Sets.newHashSet();
+        List<Set<LayoutEntity>> sameDimAggLayouts = 
IndexPlanReduceUtil.collectSameDimAggLayouts(inputLayouts);
+        garbageLayouts.addAll(
+                
sameDimAggLayouts.stream().flatMap(Set::stream).map(LayoutEntity::getId).collect(Collectors.toSet()));
+
+        if (needLog) {
+            log.info("In dataflow({}), MergeLayoutOptStrategy found garbage 
laoyouts: {}.", dataflow.getId(),
+                    garbageLayouts);
+        }
+        return garbageLayouts;
+    }
+
+    @Override
+    protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) {
+        inputLayouts.removeIf(layout -> layout.isBase() || 
IndexEntity.isTableIndex(layout.getId()));
+    }
+}
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java
index dba099e210..b7eecdfd07 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/SimilarLayoutOptStrategy.java
@@ -69,7 +69,7 @@ public class SimilarLayoutOptStrategy extends 
AbstractOptStrategy {
     }
 
     @Override
-    protected void skipOptimizeTableIndex(List<LayoutEntity> inputLayouts) {
+    protected void skipOptimizeIndex(List<LayoutEntity> inputLayouts) {
         inputLayouts.removeIf(layout -> 
IndexEntity.isTableIndex(layout.getId()));
     }
 
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/ApproveRecsEvent.java
similarity index 62%
copy from 
src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
copy to 
src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/ApproveRecsEvent.java
index 84f04556d8..fb99ac98a2 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/ApproveRecsEvent.java
@@ -16,19 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.tool.garbage;
+package org.apache.kylin.metadata.cube.optimization.event;
 
-public abstract class MetadataCleaner {
-    protected final String project;
+import java.util.Map;
 
-    protected MetadataCleaner(String project) {
-        this.project = project;
-    }
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.optimization.GarbageLayoutType;
 
-    // do in transaction
-    public abstract void cleanup();
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 
-    public void prepare() {
-        // default do nothing
-    }
+/**
+ * Event payload pairing a project name with a map from dataflow to a
+ * {@code Long -> GarbageLayoutType} map. The {@code Long} keys are presumably layout ids
+ * (NOTE(review): confirm against the code that publishes this event).
+ * Accessors, the all-args constructor and {@code toString} are generated by Lombok.
+ */
+@Setter
+@Getter
+@AllArgsConstructor
+@ToString
+public class ApproveRecsEvent {
+    private String project;
+
+    // Private like {@code project}; callers use the Lombok-generated getter and setter.
+    private Map<NDataflow, Map<Long, GarbageLayoutType>> needOptAggressivelyModels;
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/BuildIndexEvent.java
similarity index 67%
copy from 
src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
copy to 
src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/BuildIndexEvent.java
index 84f04556d8..19d984c76a 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/optimization/event/BuildIndexEvent.java
@@ -16,19 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.tool.garbage;
+package org.apache.kylin.metadata.cube.optimization.event;
 
-public abstract class MetadataCleaner {
-    protected final String project;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 
-    protected MetadataCleaner(String project) {
-        this.project = project;
-    }
+import java.util.List;
 
-    // do in transaction
-    public abstract void cleanup();
-
-    public void prepare() {
-        // default do nothing
-    }
+/**
+ * Event payload pairing a project name with a list of dataflows.
+ * Accessors, the all-args constructor and {@code toString} are generated by Lombok.
+ */
+@Setter
+@Getter
+@AllArgsConstructor
+@ToString
+public class BuildIndexEvent {
+    private String project;
+    private List<NDataflow> dataflows;
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
index 24d4d850f3..1a87822766 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/storage/GarbageStorageCollector.java
@@ -49,7 +49,7 @@ public class GarbageStorageCollector implements 
StorageInfoCollector {
         for (val model : getModels(project)) {
             val dataflow = getDataflow(model).copy();
 
-            final IndexOptimizer indexOptimizer = 
IndexOptimizerFactory.getOptimizer(dataflow, false);
+            final IndexOptimizer indexOptimizer = 
IndexOptimizerFactory.getOptimizer(dataflow, false, false);
             val garbageLayouts = 
indexOptimizer.getGarbageLayoutMap(dataflow).keySet();
             if (CollectionUtils.isNotEmpty(garbageLayouts)) {
                 storageSize += calculateLayoutSize(garbageLayouts, dataflow);
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java
index 08f2343733..736a8f80c3 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtil.java
@@ -18,21 +18,25 @@
 
 package org.apache.kylin.metadata.cube.utils;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import org.apache.kylin.metadata.cube.model.IndexEntity;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
-import org.apache.kylin.metadata.model.NDataModel;
-
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.metadata.cube.model.IndexEntity;
+import org.apache.kylin.metadata.cube.model.IndexPlan;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.model.NDataModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class IndexPlanReduceUtil {
+    private static final Logger log = 
LoggerFactory.getLogger(IndexPlanReduceUtil.class);
 
     private IndexPlanReduceUtil() {
     }
@@ -65,6 +69,46 @@ public class IndexPlanReduceUtil {
         return redundantMap;
     }
 
+    /**
+     * Groups the non-base aggregate layouts of {@code inputLayouts} by their dimension list and
+     * returns the groups that contain more than one layout.
+     * <p>
+     * The dimension list of a layout is made of the entries of its column order that are smaller
+     * than {@link NDataModel#MEASURE_ID_BASE}, in column-order sequence, so two layouts only fall
+     * into the same group when their dimensions match in both content and order.
+     *
+     * @param inputLayouts layouts to inspect; base layouts and table-index layouts are ignored
+     * @return the groups holding at least two layouts; empty when there is none
+     */
+    public static List<Set<LayoutEntity>> collectSameDimAggLayouts(List<LayoutEntity> inputLayouts) {
+        Map<List<Integer>, Set<LayoutEntity>> layoutsByDims = Maps.newHashMap();
+        for (LayoutEntity candidate : inputLayouts) {
+            if (candidate.isBase() || IndexEntity.isTableIndex(candidate.getId())) {
+                continue;
+            }
+            List<Integer> dims = new ArrayList<>();
+            for (Integer colId : candidate.getColOrder()) {
+                if (colId < NDataModel.MEASURE_ID_BASE) {
+                    dims.add(colId);
+                }
+            }
+            layoutsByDims.computeIfAbsent(dims, key -> Sets.newHashSet()).add(candidate);
+        }
+
+        // Only groups with at least two members describe layouts that share their dimensions.
+        Set<Set<LayoutEntity>> sharedDimGroups = layoutsByDims.values().stream()
+                .filter(group -> group.size() > 1).collect(Collectors.toSet());
+        return Lists.newArrayList(sharedDimGroups);
+    }
+
+    /**
+     * Creates one merged layout per group and adds it to the index plan through an update handler.
+     * The merged column order is the union of the group's column orders, keeping the position of
+     * the first occurrence of every column id.
+     * <p>
+     * NOTE(review): the shard-by columns of the merged layout are those of the layout the group
+     * iterates last; for a hash-based set that choice is not fixed - confirm this is intended.
+     *
+     * @param indexPlan      plan used to create the merged layouts and the update handler
+     * @param sameDimLayouts groups of layouts to merge, one merged layout per group
+     * @return the index plan returned by the update handler's {@code complete()}
+     */
+    public static IndexPlan mergeSameDimLayout(IndexPlan indexPlan, List<Set<LayoutEntity>> sameDimLayouts) {
+        IndexPlan.IndexPlanUpdateHandler handler = indexPlan.createUpdateHandler();
+        for (Set<LayoutEntity> group : sameDimLayouts) {
+            Set<Integer> mergedColOrder = Sets.newLinkedHashSet();
+            List<Integer> sourceColOrders = Lists.newArrayList();
+            List<Integer> shardBy = Lists.newArrayList();
+            for (LayoutEntity member : group) {
+                List<Integer> memberColOrder = member.getColOrder();
+                mergedColOrder.addAll(memberColOrder);
+                sourceColOrders.addAll(memberColOrder);
+                shardBy = member.getShardByColumns();
+            }
+
+            List<Integer> finalColOrder = Lists.newArrayList(mergedColOrder);
+            LayoutEntity merged = indexPlan.createLayout(finalColOrder, true, false, shardBy);
+            log.info("merge colOrders: {} into {}", sourceColOrders, merged.getColOrder());
+            handler.add(merged, true, false);
+        }
+
+        return handler.complete();
+    }
+
     /**
      * Collect a redundant map from included layout to reserved layout.
      * @param sortedLayouts sorted by layout's colOrder
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java
index b1fd2eb198..e324f2673e 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/storage/ProjectStorageInfoCollectorTest.java
@@ -33,6 +33,9 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.common.util.Unsafe;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.LayoutEntity;
@@ -46,6 +49,7 @@ import 
org.apache.kylin.metadata.cube.optimization.FrequencyMap;
 import org.apache.kylin.metadata.cube.optimization.IncludedLayoutOptStrategy;
 import org.apache.kylin.metadata.cube.optimization.IndexOptimizer;
 import org.apache.kylin.metadata.cube.optimization.LowFreqLayoutOptStrategy;
+import org.apache.kylin.metadata.cube.optimization.MergedLayoutOptStrategy;
 import org.apache.kylin.metadata.cube.optimization.SimilarLayoutOptStrategy;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
 import org.apache.kylin.metrics.HdfsCapacityMetrics;
@@ -55,10 +59,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
@@ -88,8 +88,8 @@ public class ProjectStorageInfoCollectorTest extends 
NLocalFileMetadataTestCase
         getTestConfig().setProperty("kylin.metadata.semi-automatic-mode", 
"true");
         initTestData();
 
-        val collector = new 
ProjectStorageInfoCollector(Lists.newArrayList(StorageInfoEnum.GARBAGE_STORAGE, 
StorageInfoEnum.STORAGE_QUOTA,
-                StorageInfoEnum.TOTAL_STORAGE));
+        val collector = new 
ProjectStorageInfoCollector(Lists.newArrayList(StorageInfoEnum.GARBAGE_STORAGE,
+                StorageInfoEnum.STORAGE_QUOTA, StorageInfoEnum.TOTAL_STORAGE));
         val volumeInfo = collector.getStorageVolumeInfo(getTestConfig(), 
DEFAULT_PROJECT);
 
         Assert.assertEquals(10240L * 1024 * 1024 * 1024, 
volumeInfo.getStorageQuotaSize());
@@ -236,6 +236,65 @@ public class ProjectStorageInfoCollectorTest extends 
NLocalFileMetadataTestCase
         Assert.assertEquals(new Integer(200), dateFrequency.get(currentDate - 
DAY_IN_MILLIS));
     }
 
+    /**
+     * Replaces the indexes of the default model with three aggregate indexes whose layouts share
+     * the dimensions (1, 2, 3) but carry different measures, adds data layouts to the latest
+     * ready segment, and expects {@link MergedLayoutOptStrategy} to report two garbage layouts.
+     */
+    @Test
+    public void testMergedLayoutGcStrategy() {
+        NIndexPlanManager indexPlanManager = 
NIndexPlanManager.getInstance(getTestConfig(), DEFAULT_PROJECT);
+        NDataflowManager dataflowManager = 
NDataflowManager.getInstance(getTestConfig(), DEFAULT_PROJECT);
+        IndexPlan indexPlan = 
indexPlanManager.getIndexPlan(DEFAULT_MODEL_BASIC_ID);
+        indexPlanManager.updateIndexPlan(indexPlan.getUuid(), copyForWrite -> {
+            LayoutEntity layout1 = new LayoutEntity();
+            layout1.setId(40001L);
+            layout1.setColOrder(Lists.newArrayList(1, 2, 3, 100006, 100007));
+            layout1.setAuto(false);
+            IndexEntity index1 = new IndexEntity();
+            index1.setId(40000L);
+            index1.setDimensions(Lists.newArrayList(1, 2, 3));
+            index1.setMeasures(Lists.newArrayList(100006, 100007));
+            index1.setLayouts(Lists.newArrayList(layout1));
+
+            LayoutEntity layout2 = new LayoutEntity();
+            layout2.setId(50001L);
+            layout2.setColOrder(Lists.newArrayList(1, 2, 3, 100001, 100002));
+            layout2.setAuto(true);
+            IndexEntity index2 = new IndexEntity();
+            index2.setId(50000L);
+            index2.setDimensions(Lists.newArrayList(1, 2, 3));
+            index2.setMeasures(Lists.newArrayList(100001, 100002));
+            index2.setLayouts(Lists.newArrayList(layout2));
+
+            LayoutEntity layout3 = new LayoutEntity();
+            layout3.setId(60001L);
+            layout3.setColOrder(Lists.newArrayList(1, 2, 3, 100003, 100004, 
100005));
+            layout3.setAuto(true);
+            IndexEntity index3 = new IndexEntity();
+            index3.setId(60000L);
+            index3.setDimensions(Lists.newArrayList(1, 2, 3));
+            index3.setMeasures(Lists.newArrayList(100003, 100004, 100005));
+            index3.setLayouts(Lists.newArrayList(layout3));
+
+            // Table Index
+            // NOTE(review): layout4 is never attached to index4, and index4 is not passed to
+            // setIndexes below, so this table index does not reach the index plan.
+            LayoutEntity layout4 = new LayoutEntity();
+            layout4.setId(20_000_040_001L);
+            layout4.setColOrder(Lists.newArrayList(1, 2, 3, 4, 5, 6));
+            layout4.setAuto(true);
+            IndexEntity index4 = new IndexEntity();
+            index4.setId(20_000_040_001L);
+            index4.setDimensions(Lists.newArrayList(1, 2, 3, 4, 5, 6));
+            copyForWrite.setIndexes(Lists.newArrayList(index1, index2, 
index3));
+        });
+
+        // change all layouts' status to ready.
+        // NOTE(review): indexPlan was fetched before updateIndexPlan and dataflow before
+        // updateDataflow; presumably both are pre-update snapshots - confirm that the layout ids
+        // and the assertion below are meant to be based on them.
+        NDataflow dataflow = 
dataflowManager.getDataflow(DEFAULT_MODEL_BASIC_ID);
+        NDataflowUpdate update = new NDataflowUpdate(dataflow.getUuid());
+        NDataSegment latestReadySegment = dataflow.getLatestReadySegment();
+        Set<Long> ids = 
indexPlan.getAllLayouts().stream().map(LayoutEntity::getId).collect(Collectors.toSet());
+        update.setToAddOrUpdateLayouts(genCuboids(dataflow, 
latestReadySegment.getId(), ids));
+        dataflowManager.updateDataflow(update);
+
+        Set<Long> garbageLayouts = IndexOptimizer.findGarbageLayouts(dataflow, 
new MergedLayoutOptStrategy());
+        Assert.assertEquals(2, garbageLayouts.size());
+    }
+
     @Test
     public void testSimilarLayoutGcStrategy() {
         /*
@@ -408,7 +467,8 @@ public class ProjectStorageInfoCollectorTest extends 
NLocalFileMetadataTestCase
         NDataflow df = dataflowManager.getDataflow(DEFAULT_MODEL_BASIC_ID);
         NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
         NDataSegment latestReadySegment = df.getLatestReadySegment();
-        Set<Long> ids = Sets.newHashSet(2_000_020_001L, 2_000_030_001L, 
2_000_040_001L, 40_001L, 40_002L);
+        // 60_001L and 60_002L are non-existing layouts
+        Set<Long> ids = Sets.newHashSet(60_001L, 60_002L, 40_001L, 40_002L);
         update.setToAddOrUpdateLayouts(genCuboids(df, 
latestReadySegment.getId(), ids));
         dataflowManager.updateDataflow(update);
     }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java
index 498c78325e..71e8a7c895 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/cube/utils/IndexPlanReduceUtilTest.java
@@ -18,15 +18,20 @@
 
 package org.apache.kylin.metadata.cube.utils;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
+import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.junit.Assert;
 import org.junit.Test;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 
 public class IndexPlanReduceUtilTest {
 
@@ -140,6 +145,26 @@ public class IndexPlanReduceUtilTest {
 
     }
 
+    /**
+     * Two layouts with the same dimensions are merged into a single layout whose column order is
+     * the ordered union of both column orders.
+     */
+    @Test
+    public void testMergeSameDimLayout() {
+        LayoutEntity narrowLayout = new LayoutEntity();
+        narrowLayout.setId(1);
+        narrowLayout.setColOrder(Lists.newArrayList(1, 2, 3, 4, 100000, 100001));
+        narrowLayout.setInProposing(false);
+
+        LayoutEntity wideLayout = new LayoutEntity();
+        wideLayout.setId(10001);
+        wideLayout.setColOrder(Lists.newArrayList(1, 2, 3, 4, 100000, 100001, 100002));
+        wideLayout.setInProposing(true);
+
+        List<Set<LayoutEntity>> sameDimLayouts = Lists
+                .newArrayList(Collections.singleton(Sets.newHashSet(narrowLayout, wideLayout)));
+        IndexPlan mergedPlan = IndexPlanReduceUtil.mergeSameDimLayout(new IndexPlan(), sameDimLayouts);
+
+        Assert.assertEquals(1, mergedPlan.getAllIndexes().size());
+        ImmutableList<Integer> expectedColOrder = ImmutableList.of(1, 2, 3, 4, 100000, 100001, 100002);
+        Assert.assertEquals(expectedColOrder, mergedPlan.getAllIndexes().get(0).getLayouts().get(0).getColOrder());
+    }
+
     @Test
     public void testCollectIncludedAggIndexLayoutsWithProposingSmallerIndex() {
         LayoutEntity layout1 = new LayoutEntity(); // a proposed layout
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java
index 848b29e1a8..d478f42a5c 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/initialize/BuildAppInitializer.java
@@ -19,6 +19,7 @@ package org.apache.kylin.rest.initialize;
 
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.rest.service.JobService;
+import org.apache.kylin.rest.service.ModelBuildService;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -32,8 +33,12 @@ public class BuildAppInitializer implements InitializingBean 
{
     @Autowired
     private JobService jobService;
 
+    @Autowired
+    private ModelBuildService modelBuildService;
+
     @Override
     public void afterPropertiesSet() throws Exception {
         EventBusFactory.getInstance().registerService(jobService);
+        EventBusFactory.getInstance().registerService(modelBuildService);
     }
 }
diff --git 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
index 4fcd8f5d16..dd4afcb0f7 100644
--- 
a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
+++ 
b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
@@ -45,6 +45,11 @@ import org.apache.kylin.common.exception.JobErrorCode;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
+import org.apache.kylin.guava30.shaded.common.eventbus.Subscribe;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.JobSubmissionException;
 import org.apache.kylin.job.execution.JobTypeEnum;
@@ -56,6 +61,7 @@ import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.SegmentPartition;
+import org.apache.kylin.metadata.cube.optimization.event.BuildIndexEvent;
 import org.apache.kylin.metadata.model.ManagementType;
 import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -85,14 +91,11 @@ import 
org.apache.kylin.rest.service.params.IncrementBuildSegmentParams;
 import org.apache.kylin.rest.service.params.MergeSegmentParams;
 import org.apache.kylin.rest.service.params.RefreshSegmentParams;
 import org.apache.kylin.source.SourceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.base.Strings;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import lombok.val;
 import lombok.var;
 
@@ -105,6 +108,8 @@ public class ModelBuildService extends AbstractModelService 
implements ModelBuil
     @Autowired
     private SegmentHelper segmentHelper;
 
+    private static final Logger logger = 
LoggerFactory.getLogger(ModelBuildService.class);
+
     //only fo test
     public JobInfoResponse buildSegmentsManually(String project, String 
modelId, String start, String end)
             throws Exception {
@@ -431,6 +436,12 @@ public class ModelBuildService extends 
AbstractModelService implements ModelBuil
     public BuildIndexResponse buildIndicesManually(String modelId, String 
project, int priority, String yarnQueue,
             Object tag) {
         aclEvaluate.checkProjectOperationPermission(project);
+        String username = getUsername();
+        return buildIndicesInternal(modelId, project, priority, yarnQueue, 
tag, username);
+    }
+
+    private BuildIndexResponse buildIndicesInternal(String modelId, String 
project, int priority, String yarnQueue,
+            Object tag, String userName) {
         NDataModel modelDesc = getManager(NDataModelManager.class, 
project).getDataModelDesc(modelId);
         if (ManagementType.MODEL_BASED != modelDesc.getManagementType()) {
             throw new KylinException(PERMISSION_DENIED, 
String.format(Locale.ROOT,
@@ -444,13 +455,26 @@ public class ModelBuildService extends 
AbstractModelService implements ModelBuil
         }
 
         String jobId = 
getManager(SourceUsageManager.class).licenseCheckWrap(project,
-                () -> getManager(JobManager.class, project).addIndexJob(new 
JobParam(modelId, getUsername())
-                        
.withPriority(priority).withYarnQueue(yarnQueue).withTag(tag)));
+                () -> getManager(JobManager.class, project).addIndexJob(
+                        new JobParam(modelId, 
userName).withPriority(priority).withYarnQueue(yarnQueue).withTag(tag)));
 
         return new BuildIndexResponse(StringUtils.isBlank(jobId) ? 
BuildIndexResponse.BuildIndexType.NO_LAYOUT
                 : BuildIndexResponse.BuildIndexType.NORM_BUILD, jobId);
     }
 
+    /**
+     * Event-bus subscriber for {@link BuildIndexEvent}: submits an index build for every online
+     * dataflow carried by the event, using the default priority, no yarn queue, no tag and the
+     * user name "System". Dataflows that are not online are skipped.
+     *
+     * @param event the project and the dataflows whose indexes should be built
+     */
+    @Subscribe
+    public void buildIndicesManually(BuildIndexEvent event) {
+        String project = event.getProject();
+        for (NDataflow dataflow : event.getDataflows()) {
+            if (dataflow.isOnline()) {
+                logger.info("build indexes for model: {} under project: {}", dataflow.getModelAlias(), project);
+                buildIndicesInternal(dataflow.getId(), project, ExecutablePO.DEFAULT_PRIORITY, null, null, "System");
+            }
+        }
+    }
+
     @Transaction(project = 0)
     public JobInfoResponse buildSegmentPartitionByValue(String project, String 
modelId, String segmentId,
             List<String[]> partitionValues, boolean parallelBuild, boolean 
buildAllPartitions, int priority,
diff --git 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 109a8a16e7..f884d87aac 100644
--- 
a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ 
b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -57,6 +57,9 @@ import 
org.apache.kylin.engine.spark.job.ExecutableAddSegmentHandler;
 import org.apache.kylin.engine.spark.job.ExecutableMergeOrRefreshHandler;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableParams;
@@ -78,6 +81,7 @@ import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.cube.model.PartitionStatusEnum;
+import org.apache.kylin.metadata.cube.optimization.event.BuildIndexEvent;
 import org.apache.kylin.metadata.job.JobBucket;
 import org.apache.kylin.metadata.model.ManagementType;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -122,10 +126,6 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import lombok.val;
 import lombok.var;
 
@@ -877,6 +877,32 @@ public class ModelServiceBuildTest extends SourceTestCase {
 
     }
 
+    /**
+     * Verifies that handling a {@link BuildIndexEvent} for one model leaves exactly one running
+     * executable for it, handled by {@code ExecutableAddCuboidHandler}.
+     */
+    @Test
+    public void testBuildIndexManuallyForEvent() {
+        val project = "default";
+        val modelId = "abe3bf1a-c4bc-458d-8278-7ea8b00f5e96";
+        val dataflowManager = 
NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        val df = dataflowManager.getDataflow(modelId);
+        // NOTE(review): the event captures df as fetched here, before the updates below.
+        BuildIndexEvent buildIndexEvent = new BuildIndexEvent(project, 
Lists.newArrayList(df));
+
+        // Remove every data layout from all segments of the dataflow; presumably this leaves
+        // layouts for the index job to build - confirm.
+        val dfUpdate = new NDataflowUpdate(df.getId());
+        List<NDataLayout> tobeRemoveCuboidLayouts = Lists.newArrayList();
+        Segments<NDataSegment> segments = df.getSegments();
+        for (NDataSegment segment : segments) {
+            tobeRemoveCuboidLayouts.addAll(segment.getLayoutsMap().values());
+        }
+        dfUpdate.setToRemoveLayouts(tobeRemoveCuboidLayouts.toArray(new 
NDataLayout[0]));
+        dataflowManager.updateDataflow(dfUpdate);
+        // The service rejects models whose management type is not MODEL_BASED.
+        val modelManager = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        modelManager.updateDataModel(modelId,
+                copyForWrite -> 
copyForWrite.setManagementType(ManagementType.MODEL_BASED));
+        modelBuildService.buildIndicesManually(buildIndexEvent);
+        val executables = getRunningExecutables(project, modelId);
+        Assert.assertEquals(1, executables.size());
+        Assert.assertTrue(((NSparkCubingJob) executables.get(0)).getHandler() 
instanceof ExecutableAddCuboidHandler);
+        // presumably 3 is ExecutablePO.DEFAULT_PRIORITY, which the event handler passes - confirm
+        Assert.assertEquals(3, executables.get(0).getPriority());
+    }
+
     @Test
     public void testBuildIndexManuallyWithoutLayout() {
         val project = "default";
@@ -1392,7 +1418,7 @@ public class ModelServiceBuildTest extends SourceTestCase 
{
         // build all partition values
         IncrementBuildSegmentParams incrParams2 = new 
IncrementBuildSegmentParams(project, modelId, "1633104000000",
                 "1633190400000", model.getPartitionDesc(), 
model.getMultiPartitionDesc(), null, true, null)
-                        .withBuildAllSubPartitions(true);
+                .withBuildAllSubPartitions(true);
         val jobInfo2 = 
modelBuildService.incrementBuildSegmentsManually(incrParams2);
         Assert.assertEquals(1, jobInfo2.getJobs().size());
         Assert.assertEquals(jobInfo2.getJobs().get(0).getJobName(), 
JobTypeEnum.INC_BUILD.name());
diff --git 
a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
 
b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
index 05f36400af..8cd51f7b9d 100644
--- 
a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
+++ 
b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NProjectController.java
@@ -243,7 +243,19 @@ public class NProjectController extends NBasicController {
         if (projectInstance == null) {
             throw new KylinException(PROJECT_NOT_EXIST, project);
         }
-        projectService.cleanupGarbage(project);
+        projectService.cleanupGarbage(project, false);
+        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, true, "");
+    }
+
+    /**
+     * Handles {@code PUT /{project}/optimize_index}: checks that the project exists and then
+     * calls {@code projectService.cleanupGarbage(project, true)}.
+     * NOTE(review): the second argument presumably switches on the index optimization added with
+     * this endpoint - confirm against {@code ProjectService#cleanupGarbage}.
+     *
+     * @param project project name taken from the request path
+     * @return a success envelope carrying {@code true}
+     * @throws KylinException with {@code PROJECT_NOT_EXIST} when no such project is found
+     */
+    @ApiOperation(value = "optimizeIndex", tags = { "SM" }, notes = "Add URL: {project}; ")
+    @PutMapping(value = "/{project:.+}/optimize_index")
+    @ResponseBody
+    public EnvelopeResponse<Boolean> optimizeIndex(@PathVariable(value = "project") String project) throws Exception {
+        ProjectInstance projectInstance = projectService.getManager(NProjectManager.class).getProject(project);
+        if (projectInstance == null) {
+            throw new KylinException(PROJECT_NOT_EXIST, project);
+        }
+        projectService.cleanupGarbage(project, true);
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, true, "");
     }
 
diff --git 
a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
 
b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
index 12f047236e..7677b38d94 100644
--- 
a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
+++ 
b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NProjectControllerTest.java
@@ -231,7 +231,7 @@ public class NProjectControllerTest extends 
NLocalFileMetadataTestCase {
         NProjectManager projectManager = Mockito.mock(NProjectManager.class);
         
Mockito.doReturn(projectInstance).when(projectManager).getProject("default");
         
Mockito.doReturn(projectManager).when(projectService).getManager(NProjectManager.class);
-        Mockito.doNothing().when(projectService).cleanupGarbage("default");
+        Mockito.doNothing().when(projectService).cleanupGarbage("default", 
false);
         
mockMvc.perform(MockMvcRequestBuilders.put("/api/projects/{project}/storage", 
"default")
                 .contentType(MediaType.APPLICATION_JSON).param("project", 
"default")
                 .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_JSON)))
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
index 28c102a2d1..63d048f86e 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/AbstractModelService.java
@@ -26,30 +26,42 @@ import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NOT_EXIST;
 
 import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.acl.AclTCRManager;
+import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
-
 import lombok.val;
 import lombok.var;
+import lombok.extern.slf4j.Slf4j;
 
+@Slf4j
 public class AbstractModelService extends BasicService {
 
     public static final String VALID_NAME_FOR_MODEL = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890_";
+    protected static final String SAVE_INDEXES_STRATEGY = 
"single_dim_and_reduce_hc";
 
     @Autowired
     public AclEvaluate aclEvaluate;
@@ -137,4 +149,102 @@ public class AbstractModelService extends BasicService {
         }
     }
 
+    /**
+     * Tells whether the column behind {@code colRef} counts as a high cardinality dimension,
+     * i.e. whether the cardinality recorded in its column statistics exceeds 20% of the
+     * total row count recorded for its table.
+     *
+     * @param tableManager metadata manager used to look up the table and its ext descriptor
+     * @param colRef       the dimension column to inspect
+     * @return {@code true} only when statistics are available and the ratio is above 0.2;
+     *         {@code false} when the table, its ext descriptor, the column statistics or a
+     *         positive row count is missing
+     */
+    private boolean isHighCardinalityDim(NTableMetadataManager tableManager, TblColRef colRef) {
+        String tableIdentity = colRef.getTableRef().getTableIdentity();
+        TableDesc tableDesc = tableManager.getTableDesc(tableIdentity);
+        // table metadata missing: nothing to base a decision on
+        if (Objects.isNull(tableDesc)) {
+            return false;
+        }
+
+        TableExtDesc tableExtIfExists = tableManager.getTableExtIfExists(tableDesc);
+        // table not sampled: the ext descriptor is looked up "if exists", so guard against its absence
+        if (Objects.isNull(tableExtIfExists)) {
+            return false;
+        }
+
+        TableExtDesc.ColumnStats columnStats = tableExtIfExists.getColumnStatsByName(colRef.getName());
+        // column not sampled
+        if (Objects.isNull(columnStats)) {
+            return false;
+        }
+
+        // A non-positive row count would make the ratio below Infinity or NaN; treat it as "no statistics".
+        if (tableExtIfExists.getTotalRows() <= 0) {
+            return false;
+        }
+
+        return (double) (columnStats.getCardinality()) / tableExtIfExists.getTotalRows() > 0.2;
+    }
+
+    /**
+     * Decides from the dimension count of {@code index} whether its layouts should be removed.
+     * An index with several dimensions always qualifies, an index without dimensions never does,
+     * and a single-dimension index qualifies only when that dimension is high cardinality.
+     *
+     * @param tableManager metadata manager handed on to the high cardinality check
+     * @param model        model used to resolve the dimension id into a column reference
+     * @param index        the index being examined
+     * @return {@code true} when the layouts of the index should be removed
+     */
+    private boolean needToDeleteLayout(NTableMetadataManager tableManager, NDataModel model, IndexEntity index) {
+        List<Integer> dimensions = index.getDimensions();
+        if (dimensions.size() > 1) {
+            return true;
+        }
+        if (dimensions.isEmpty()) {
+            return false;
+        }
+
+        // exactly one dimension left: keep the layout unless that dimension is high cardinality
+        TblColRef onlyDimension = model.getColRef(dimensions.get(0));
+        return isHighCardinalityDim(tableManager, onlyDimension);
+    }
+
+    /**
+     * Walks the layouts of {@code index} from last to first, registering removals on
+     * {@code updateHandler} and collecting what the caller needs to split the index.
+     *
+     * @param model         model the index belongs to
+     * @param index         the index whose layouts are inspected
+     * @param updateHandler handler on which layout removals are registered
+     * @param tableManager  metadata manager handed on to the deletion check
+     * @return a pair of (whether the index still needs to be split, the first shard-by column of the
+     *         last visited non-base layout that declares shard-by columns, or {@code null} if none does)
+     */
+    private Pair<Boolean, Integer> processIndex(NDataModel model, IndexEntity index,
+            IndexPlan.IndexPlanUpdateHandler updateHandler, NTableMetadataManager tableManager) {
+        boolean splitRequired = true;
+        Integer shardByColumn = null;
+
+        List<LayoutEntity> candidates = index.getLayouts();
+        int position = candidates.size();
+        // iterate backwards by position, exactly like the index-based loop this replaces
+        while (--position >= 0) {
+            LayoutEntity current = candidates.get(position);
+
+            // When the recommended index and the agg index overlap, this index holds two layouts
+            // that differ only in column order. Such an index must not be split.
+            if (current.isBaseIndex()) {
+                splitRequired = false;
+                continue;
+            }
+
+            List<Integer> shardByColumns = current.getShardByColumns();
+            if (CollectionUtils.isNotEmpty(shardByColumns)) {
+                shardByColumn = shardByColumns.get(0);
+            }
+
+            if (!needToDeleteLayout(tableManager, model, index)) {
+                continue;
+            }
+            updateHandler.remove(current, IndexEntity.isAggIndex(index.getId()), false, false);
+        }
+
+        return Pair.newPair(splitRequired, shardByColumn);
+    }
+
+    /**
+     * Processes every index of {@code indexPlan} and, for each index that still needs splitting,
+     * registers one single-dimension agg layout per dimension that is not high cardinality.
+     * Each new layout consists of that dimension followed by all measures of the index, and keeps
+     * the shard-by column only when it is the dimension itself. All changes go through one update
+     * handler that is completed at the end.
+     *
+     * @param model     model used to resolve dimension ids and to locate the project metadata
+     * @param indexPlan index plan whose indexes are rewritten
+     */
+    protected void splitIndexesIntoSingleDimIndexes(NDataModel model, IndexPlan indexPlan) {
+        NTableMetadataManager tableManager = getManager(NTableMetadataManager.class, model.getProject());
+        IndexPlan.IndexPlanUpdateHandler updateHandler = indexPlan.createUpdateHandler();
+
+        for (IndexEntity index : indexPlan.getIndexes()) {
+            Pair<Boolean, Integer> outcome = processIndex(model, index, updateHandler, tableManager);
+            boolean splitRequired = outcome.getFirst();
+            if (!splitRequired) {
+                continue;
+            }
+            Integer shardByCol = outcome.getSecond();
+
+            List<Integer> measures = index.getMeasures();
+            for (Integer dimension : index.getDimensions()) {
+                TblColRef colRef = model.getColRef(dimension);
+                if (isHighCardinalityDim(tableManager, colRef)) {
+                    log.warn("The col {} is high cardinality dimension, not recommended as an index", colRef.getName());
+                    continue;
+                }
+
+                // column order of the new layout: the single dimension first, then every measure
+                List<Integer> colOrder = Lists.newArrayList();
+                colOrder.add(dimension);
+                colOrder.addAll(measures);
+
+                // the shard-by column survives only if it is the dimension of this layout
+                List<Integer> shardByCols = Lists.newArrayList();
+                if (Objects.equals(dimension, shardByCol)) {
+                    shardByCols.add(shardByCol);
+                }
+
+                LayoutEntity singleDimensionAggLayout = indexPlan.createLayout(colOrder, true, false, shardByCols);
+                updateHandler.add(singleDimensionAggLayout, true, true);
+            }
+        }
+
+        updateHandler.complete();
+    }
+
 }
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java
index 2a88e7f4b6..e9d7e80e50 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/BaseIndexUpdateHelper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
 import java.util.List;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.LayoutEntity;
@@ -29,8 +30,6 @@ import org.apache.kylin.rest.request.CreateBaseIndexRequest;
 import org.apache.kylin.rest.response.BuildBaseIndexResponse;
 import org.apache.kylin.rest.util.SpringContext;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 import io.kyligence.kap.secondstorage.SecondStorageUpdater;
 import io.kyligence.kap.secondstorage.SecondStorageUtil;
 import lombok.Setter;
@@ -89,6 +88,10 @@ public class BaseIndexUpdateHelper {
     }
 
     public BuildBaseIndexResponse update(IndexPlanService service) {
+        return update(service, true);
+    }
+
+    public BuildBaseIndexResponse update(IndexPlanService service, boolean 
checkProjectOperation) {
         if (!needUpdate) {
             return BuildBaseIndexResponse.EMPTY;
         }
@@ -116,8 +119,14 @@ public class BaseIndexUpdateHelper {
         CreateBaseIndexRequest indexRequest = new CreateBaseIndexRequest();
         indexRequest.setModelId(modelId);
         indexRequest.setProject(project);
-        BuildBaseIndexResponse response = service.updateBaseIndex(project, 
indexRequest, needCreateBaseTable,
-                needCreateBaseAgg, true);
+        BuildBaseIndexResponse response;
+        if (checkProjectOperation) {
+            response = service.updateBaseIndex(project, indexRequest, 
needCreateBaseTable, needCreateBaseAgg, true);
+        } else {
+            response = service.updateBaseIndexInternal(project, indexRequest, 
needCreateBaseTable, needCreateBaseAgg,
+                    true);
+        }
+
         response.judgeIndexOperateType(exist(preBaseAggLayout), true);
         response.judgeIndexOperateType(exist(preBaseTableLayout), false);
 
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
index aa7b481bbb..9a50ab5b75 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/IndexPlanService.java
@@ -52,7 +52,6 @@ import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.spark.smarter.IndexDependencyParser;
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
 import org.apache.kylin.guava30.shaded.common.base.Preconditions;
 import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
@@ -112,6 +111,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Service;
 
 import io.kyligence.kap.secondstorage.SecondStorageUpdater;
@@ -265,7 +265,7 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
         return createTableIndex(project, request.getModelId(), newLayout, 
request.isLoadData());
     }
 
-    private BuildIndexResponse createTableIndex(String project, String 
modelId, LayoutEntity newLayout,
+    public BuildIndexResponse createTableIndex(String project, String modelId, 
LayoutEntity newLayout,
             boolean loadData) {
         NIndexPlanManager indexPlanManager = 
getManager(NIndexPlanManager.class, project);
         val jobManager = getManager(JobManager.class, project);
@@ -494,6 +494,7 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
                                     StringUtils.join(notExistCols.iterator(), 
",")));
                 }
             }
+
             indexPlan.setRuleBasedIndex(ruleBasedIndex);
         } catch (OutOfMaxCombinationException oe) {
             invalid = true;
@@ -652,13 +653,13 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
     }
 
     public List<IndexResponse> getIndexes(String project, String modelId, 
String key, List<IndexEntity.Status> status,
-                                          String orderBy, Boolean desc, 
List<IndexEntity.Source> sources) {
+            String orderBy, Boolean desc, List<IndexEntity.Source> sources) {
         return getIndexes(new IndexPlanParams(project, modelId, null, null, 
sources, status, null),
-                new PaginationParams(null, null, orderBy, desc),
-                key);
+                new PaginationParams(null, null, orderBy, desc), key);
     }
 
-    public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, 
PaginationParams paginationParams, String key) {
+    public List<IndexResponse> getIndexes(IndexPlanParams indexPlanParams, 
PaginationParams paginationParams,
+            String key) {
         String project = indexPlanParams.getProject();
         String modelId = indexPlanParams.getModelId();
         String segmentId = indexPlanParams.getSegmentId();
@@ -674,10 +675,11 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
         Preconditions.checkState(indexPlan != null);
         val model = indexPlan.getModel();
         val layouts = indexPlan.getAllLayouts();
-        Set<Long> layoutsByRunningJobs = getLayoutsByRunningJobs(project, 
modelId);
+        Map<String, Set<Long>> layoutsByRunningJobs = 
getLayoutsByRunningJobs(project, modelId);
         if (StringUtils.isBlank(key)) {
             return sortAndFilterLayouts(layouts.stream()
-                    .map(layoutEntity -> convertToResponse(layoutEntity, 
indexPlan.getModel(), layoutsByRunningJobs, segmentId))
+                    .map(layoutEntity -> convertToResponse(layoutEntity, 
indexPlan.getModel(), layoutsByRunningJobs,
+                            segmentId))
                     .filter(indexResponse -> statusSet.isEmpty() || 
statusSet.contains(indexResponse.getStatus())),
                     orderBy, desc, sources);
         }
@@ -840,8 +842,7 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
         return response;
     }
 
-    @VisibleForTesting
-    public Set<Long> getLayoutsByRunningJobs(String project, String modelId) {
+    public Map<String, Set<Long>> getLayoutsByRunningJobs(String project, 
String modelId) {
         List<AbstractExecutable> runningJobList = NExecutableManager
                 .getInstance(KylinConfig.getInstanceFromEnv(), project) //
                 .getPartialExecutablesByStatusList(
@@ -849,17 +850,27 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
                                 ExecutableState.ERROR), //
                         path -> StringUtils.endsWith(path, modelId));
 
-        return runningJobList.stream()
-                .filter(abstractExecutable -> Objects.equals(modelId, 
abstractExecutable.getTargetSubject()))
-                
.map(AbstractExecutable::getToBeDeletedLayoutIds).flatMap(Set::stream).collect(Collectors.toSet());
+        Map<String, Set<Long>> underConstructionLayoutsMap = new HashMap<>();
+        runningJobList.stream().filter(ae -> Objects.equals(modelId, 
ae.getTargetSubject())).forEach(ae -> {
+            Set<Long> layoutIds = ae.getLayoutIds();
+            for (String segmentId : ae.getSegmentIds()) {
+                Set<Long> segmentLayoutIds = 
underConstructionLayoutsMap.get(segmentId);
+                if (segmentLayoutIds == null) {
+                    underConstructionLayoutsMap.put(segmentId, 
Sets.newHashSet(layoutIds));
+                } else {
+                    segmentLayoutIds.addAll(layoutIds);
+                }
+            }
+        });
+        return underConstructionLayoutsMap;
     }
 
     private IndexResponse convertToResponse(LayoutEntity layoutEntity, 
NDataModel model) {
-        return convertToResponse(layoutEntity, model, Sets.newHashSet(), null);
+        return convertToResponse(layoutEntity, model, Maps.newHashMap(), null);
     }
 
     private IndexResponse convertToResponse(LayoutEntity layoutEntity, 
NDataModel model,
-            Set<Long> layoutIdsOfRunningJobs, String segmentId) {
+            Map<String, Set<Long>> layoutIdsOfRunningJobs, String segmentId) {
 
         // remove all internal measures
         val colOrders = Lists.newArrayList(layoutEntity.getColOrder());
@@ -880,8 +891,14 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
         boolean hasDataInconsistent = false;
 
         List<NDataSegment> segments = StringUtils.isBlank(segmentId) ? 
dataflow.getSegments()
-                : dataflow.getSegments().stream().filter(seg -> 
seg.getId().equals(segmentId)).collect(Collectors.toList());
+                : dataflow.getSegments().stream().filter(seg -> 
seg.getId().equals(segmentId))
+                        .collect(Collectors.toList());
+        Set<Long> layoutIdsOnLoading = Sets.newHashSet();
         for (NDataSegment segment : segments) {
+            Set<Long> segmentLayoutIdsOnLoading = 
layoutIdsOfRunningJobs.get(segment.getId());
+            if (CollectionUtils.isNotEmpty(segmentLayoutIdsOnLoading)) {
+                layoutIdsOnLoading.addAll(segmentLayoutIdsOnLoading);
+            }
             NDataLayout layout = segment.getLayout(layoutEntity.getId(), true);
             if (layout == null) {
                 continue;
@@ -896,7 +913,7 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
 
         IndexEntity.Status status;
         if (readyCount <= 0) {
-            if (layoutIdsOfRunningJobs.contains(layoutEntity.getId())) {
+            if (layoutIdsOnLoading.contains(layoutEntity.getId())) {
                 status = IndexEntity.Status.BUILDING;
             } else {
                 status = IndexEntity.Status.NO_BUILD;
@@ -1110,6 +1127,13 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
     public BuildBaseIndexResponse updateBaseIndex(String project, 
CreateBaseIndexRequest request,
             boolean createIfNotExistTableLayout, boolean 
createIfNotExistAggLayout, boolean isAuto) {
         aclEvaluate.checkProjectOperationDesignPermission(project);
+        return updateBaseIndexInternal(project, request, 
createIfNotExistTableLayout, createIfNotExistAggLayout,
+                isAuto);
+    }
+
+    @Transaction(project = 0)
+    public BuildBaseIndexResponse updateBaseIndexInternal(String project, 
CreateBaseIndexRequest request,
+            boolean createIfNotExistTableLayout, boolean 
createIfNotExistAggLayout, boolean isAuto) {
         // update = delete + create
         Set<Long> needDelete = checkNeedUpdateBaseIndex(project, request, 
isAuto);
         List<LayoutEntity> needRetainAggLayout = 
getNeedRetainAggLayout(project, request, needDelete);
@@ -1126,7 +1150,7 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
             return BuildBaseIndexResponse.EMPTY;
         }
 
-        BuildBaseIndexResponse response = createBaseIndex(project, request);
+        BuildBaseIndexResponse response = createBaseIndexInternal(project, 
request);
         response.setIndexUpdateType(needDelete);
         return response;
     }
@@ -1246,6 +1270,10 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
     @Transaction(project = 0)
     public BuildBaseIndexResponse createBaseIndex(String project, 
CreateBaseIndexRequest request) {
         aclEvaluate.checkProjectOperationDesignPermission(project);
+        return createBaseIndexInternal(project, request);
+    }
+
+    public BuildBaseIndexResponse createBaseIndexInternal(String project, 
CreateBaseIndexRequest request) {
         NDataModel model = getManager(NDataModelManager.class, 
project).getDataModelDesc(request.getModelId());
         NIndexPlanManager indexPlanManager = 
getManager(NIndexPlanManager.class, project);
         IndexPlan indexPlan = 
indexPlanManager.getIndexPlan(request.getModelId());
@@ -1283,7 +1311,7 @@ public class IndexPlanService extends BasicService 
implements TableIndexPlanSupp
     }
 
     private void overrideLayout(LayoutEntity layout, LayoutProperty 
layoutProperty, NDataModel model) {
-        layout.setOwner(BasicService.getUsername());
+        layout.setOwner(SecurityContextHolder.getContext() == null ? "System" 
: getUsername());
         if (layoutProperty == null) {
             return;
         }
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 59cc0a9203..e542fe6698 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -124,14 +124,22 @@ import 
org.apache.kylin.common.persistence.transaction.TransactionException;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.persistence.transaction.UnitOfWorkContext;
 import org.apache.kylin.common.scheduler.EventBusFactory;
-import org.apache.kylin.common.util.SqlIdentifierFormatterVisitor;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.common.util.SqlIdentifierFormatterVisitor;
 import org.apache.kylin.common.util.StringHelper;
 import org.apache.kylin.common.util.ThreadUtil;
 import org.apache.kylin.engine.spark.utils.ComputedColumnEvalUtil;
+import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
+import org.apache.kylin.guava30.shaded.common.base.Preconditions;
+import org.apache.kylin.guava30.shaded.common.base.Strings;
+import org.apache.kylin.guava30.shaded.common.base.Supplier;
+import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.job.SecondStorageJobParamUtil;
 import org.apache.kylin.job.common.SegmentUtil;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -267,15 +275,7 @@ import 
org.springframework.security.core.userdetails.UserDetails;
 import org.springframework.stereotype.Component;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.kylin.guava30.shaded.common.annotations.VisibleForTesting;
-import org.apache.kylin.guava30.shaded.common.base.Preconditions;
-import org.apache.kylin.guava30.shaded.common.base.Strings;
-import org.apache.kylin.guava30.shaded.common.collect.ImmutableList;
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-import org.apache.kylin.guava30.shaded.common.collect.Maps;
-import org.apache.kylin.guava30.shaded.common.collect.Sets;
 
-import org.apache.kylin.guava30.shaded.common.base.Supplier;
 import io.kyligence.kap.secondstorage.SecondStorage;
 import io.kyligence.kap.secondstorage.SecondStorageNodeHelper;
 import io.kyligence.kap.secondstorage.SecondStorageUpdater;
@@ -1847,6 +1847,10 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
     }
 
     public void saveNewModelsAndIndexes(String project, List<ModelRequest> 
newModels) {
+        saveNewModelsAndIndexes(project, null, newModels);
+    }
+
+    public void saveNewModelsAndIndexes(String project, String 
saveIndexesStrategy, List<ModelRequest> newModels) {
         if (CollectionUtils.isEmpty(newModels)) {
             return;
         }
@@ -1883,6 +1887,9 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
             emptyIndex.setUuid(expanded.getUuid());
             indexPlanManager.createIndexPlan(emptyIndex);
             indexPlanService.expandIndexPlanRequest(indexPlan, expanded);
+            if (SAVE_INDEXES_STRATEGY.equalsIgnoreCase(saveIndexesStrategy)) {
+                indexPlan.setBaseAggIndexReduceHighCardinalityDim(true);
+            }
             addBaseIndex(modelRequest, expanded, indexPlan);
 
             // create DataFlow
@@ -1896,7 +1903,7 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
             }
 
             createStreamingJob(project, expanded, modelRequest);
-            updateIndexPlan(project, indexPlan);
+            updateIndexPlan(project, indexPlan, expanded, saveIndexesStrategy);
             UnitOfWorkContext context = UnitOfWork.get();
             context.doAfterUnit(() -> EventBusFactory.getInstance()
                     .postSync(new ModelAddEvent(project, expanded.getId(), 
expanded.getAlias())));
@@ -2149,9 +2156,13 @@ public class ModelService extends AbstractModelService 
implements TableModelSupp
         copy.getPartitionDesc().changeTableAlias(oldAliasName, tableName);
     }
 
-    void updateIndexPlan(String project, IndexPlan indexPlan) {
+    void updateIndexPlan(String project, IndexPlan indexPlan, NDataModel 
model, String saveIndexesStrategy) {
         NIndexPlanManager indexPlanManager = 
NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
         indexPlanManager.updateIndexPlan(indexPlan.getId(), copyForWrite -> {
+            if (SAVE_INDEXES_STRATEGY.equalsIgnoreCase(saveIndexesStrategy)) {
+                copyForWrite.setBaseAggIndexReduceHighCardinalityDim(true);
+                splitIndexesIntoSingleDimIndexes(model, indexPlan);
+            }
             if (indexPlan.getAggShardByColumns() != null) {
                 
copyForWrite.setAggShardByColumns(indexPlan.getAggShardByColumns());
             }
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
index 05d3d4d107..73b9a94fb7 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/IndexPlanServiceTest.java
@@ -17,9 +17,28 @@
  */
 package org.apache.kylin.rest.service;
 
-import lombok.extern.slf4j.Slf4j;
-import lombok.val;
-import lombok.var;
+import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.INDEX_DUPLICATE;
+import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.LAYOUT_NOT_EXISTS;
+import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.SHARD_BY_COLUMN_NOT_IN_INDEX;
+import static 
org.apache.kylin.metadata.cube.model.IndexEntity.Source.CUSTOM_TABLE_INDEX;
+import static 
org.apache.kylin.metadata.cube.model.IndexEntity.Source.RECOMMENDED_TABLE_INDEX;
+import static org.apache.kylin.metadata.model.SegmentStatusEnum.READY;
+import static org.apache.kylin.metadata.model.SegmentStatusEnum.WARNING;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
 import org.apache.commons.collections.ListUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -29,6 +48,7 @@ import org.apache.kylin.cube.model.SelectRule;
 import org.apache.kylin.engine.spark.job.ExecutableAddCuboidHandler;
 import org.apache.kylin.engine.spark.job.NSparkCubingJob;
 import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
 import org.apache.kylin.guava30.shaded.common.collect.Sets;
 import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
@@ -69,25 +89,9 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-
-import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.INDEX_DUPLICATE;
-import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.LAYOUT_NOT_EXISTS;
-import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.SHARD_BY_COLUMN_NOT_IN_INDEX;
-import static 
org.apache.kylin.metadata.cube.model.IndexEntity.Source.CUSTOM_TABLE_INDEX;
-import static 
org.apache.kylin.metadata.cube.model.IndexEntity.Source.RECOMMENDED_TABLE_INDEX;
-import static org.apache.kylin.metadata.model.SegmentStatusEnum.READY;
-import static org.apache.kylin.metadata.model.SegmentStatusEnum.WARNING;
-import static org.hamcrest.Matchers.is;
+import lombok.val;
+import lombok.var;
+import lombok.extern.slf4j.Slf4j;
 
 @Slf4j
 public class IndexPlanServiceTest extends SourceTestCase {
@@ -1148,7 +1152,9 @@ public class IndexPlanServiceTest extends SourceTestCase {
         Assert.assertEquals(1, response.size());
         Assert.assertTrue(ids.contains(20000010001L));
 
-        
Mockito.doReturn(Sets.newHashSet(20000020001L)).when(indexPlanService).getLayoutsByRunningJobs(getProject(),
+        Map<String, Set<Long>> underConstructionLayoutsMap = Maps.newHashMap();
+        
underConstructionLayoutsMap.put("ef5e0663-feba-4ed2-b71c-21958122bbff", 
Sets.newHashSet(20000020001L));
+        
Mockito.doReturn(underConstructionLayoutsMap).when(indexPlanService).getLayoutsByRunningJobs(getProject(),
                 modelId);
         response = indexPlanService.getIndexes(getProject(), modelId, "",
                 Lists.newArrayList(IndexEntity.Status.BUILDING), "data_size", 
false, null);
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
index 054755f307..de48d76570 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/ProjectServiceTest.java
@@ -1037,7 +1037,7 @@ public class ProjectServiceTest extends 
NLocalFileMetadataTestCase {
 
     @Test
     public void testCleanupGarbage() throws Exception {
-        projectService.cleanupGarbage(PROJECT);
+        projectService.cleanupGarbage(PROJECT, false);
     }
 
     @Test
diff --git 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
index 89e9a2c58d..a1e15c8de4 100644
--- 
a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
+++ 
b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
@@ -18,12 +18,13 @@
 
 package org.apache.kylin.query.plugin.diagnose
 
-import 
org.apache.kylin.guava30.shaded.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.kylin.common.util.ExecutorServiceUtil
+import 
org.apache.kylin.guava30.shaded.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.utils.SparkHadoopUtils
 import org.joda.time.DateTime
 
 import java.io.File
@@ -36,8 +37,10 @@ class DiagnoseExecutorPlugin extends ExecutorPlugin with 
Logging {
   private val LOCAL_GC_FILE_PREFIX: String = "gc"
   private val DATE_PATTERN = "yyyy-MM-dd"
   private val checkingInterval: Long = 10000L
-  private val configuration: Configuration = new Configuration()
-  private val fileSystem: FileSystem = FileSystem.get(configuration)
+  // Default hadoop configuration for the unique fileSystem below is sufficient
+  private val configuration: Configuration = 
SparkHadoopUtils.newConfiguration()
+  // Use FileSystem.newInstance(...) to prevent contamination of global file 
system cache
+  private val fileSystem: FileSystem = FileSystem.newInstance(configuration)
 
   private val scheduledExecutorService = Executors.newScheduledThreadPool(1,
     new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Diagnose-%d").build())
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java
index e80fbc9513..805647b3e5 100644
--- 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/AbstractHdfsLogAppender.java
@@ -109,7 +109,7 @@ public abstract class AbstractHdfsLogAppender
 
     public FileSystem getFileSystem() {
         if (null == fileSystem) {
-            return 
getFileSystem(SparkHadoopUtils.newConfigurationWithSparkConf());
+            return getFileSystem(SparkHadoopUtils.newConfiguration());
         }
         return fileSystem;
     }
@@ -122,7 +122,10 @@ public abstract class AbstractHdfsLogAppender
                         workingDir = 
System.getProperty("kylin.hdfs.working.dir");
                         StatusLogger.getLogger().warn("hdfsWorkingDir -> " + 
getWorkingDir());
                     }
-                    fileSystem = new Path(workingDir).getFileSystem(conf);
+
+                    // Use FileSystem.newInstance(...) to prevent 
contamination of global file system cache
+                    fileSystem = FileSystem.newInstance(new 
Path(workingDir).toUri(), conf);
+
                 } catch (IOException e) {
                     StatusLogger.getLogger().error("Failed to create the file 
system, ", e);
                     throw new RuntimeException("Failed to create the file 
system, ", e);
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java
index a09c338f6f..3aaaff1f0f 100644
--- 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkDriverHdfsLogAppender.java
@@ -112,7 +112,7 @@ public class SparkDriverHdfsLogAppender extends 
AbstractHdfsLogAppender {
     @Override
     public void doWriteLog(int eventSize, List<LogEvent> transaction) throws 
IOException, InterruptedException {
         if (!isWriterInited()) {
-            Configuration conf = 
SparkHadoopUtils.newConfigurationWithSparkConf();
+            Configuration conf = SparkHadoopUtils.newConfiguration();
             if (!initHdfsWriter(new Path(getLogPath()), conf)) {
                 StatusLogger.getLogger().error("init the hdfs writer failed!");
             }
diff --git 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java
 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java
index b1712befb7..4d5132333e 100644
--- 
a/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java
+++ 
b/src/spark-project/spark-common/src/main/java/org/apache/kylin/spark/common/logging/SparkExecutorHdfsLogAppender.java
@@ -173,14 +173,14 @@ public class SparkExecutorHdfsLogAppender extends 
AbstractHdfsLogAppender {
                 if (ugi != null) {
                     StatusLogger.getLogger().warn("Login user hashcode is " + 
ugi.hashCode());
                     ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
-                        if (!initHdfsWriter(file, 
SparkHadoopUtils.newConfigurationWithSparkConf())) {
+                        if (!initHdfsWriter(file, 
SparkHadoopUtils.newConfiguration())) {
                             StatusLogger.getLogger().error("Failed to init the 
hdfs writer!");
                         }
                         doRollingClean(loggingEvent);
                         return null;
                     });
                 } else {
-                    if (!initHdfsWriter(file, 
SparkHadoopUtils.newConfigurationWithSparkConf())) {
+                    if (!initHdfsWriter(file, 
SparkHadoopUtils.newConfiguration())) {
                         StatusLogger.getLogger().error("Failed to init the 
hdfs writer!");
                     }
                     doRollingClean(loggingEvent);
diff --git 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala
 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala
index d6e77356bf..d3bf23a4ca 100644
--- 
a/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala
+++ 
b/src/spark-project/spark-common/src/main/scala/org/apache/spark/utils/SparkHadoopUtils.scala
@@ -18,16 +18,40 @@
 package org.apache.spark.utils
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
 
+/**
+ * Convenience utility object for invoking [[SparkHadoopUtil]].
+ */
 object SparkHadoopUtils {
 
+  /**
+   * Simply return a new default hadoop [[Configuration]].
+   * @return Newly created default hadoop configuration
+   */
+  def newConfiguration(): Configuration = {
+    new Configuration()
+  }
+
+  /**
+   * Returns a new hadoop [[Configuration]] with current [[SparkConf]] from 
[[SparkEnv]].
+   * @return Newly created hadoop configuration with extra spark properties
+   */
   def newConfigurationWithSparkConf(): Configuration = {
     val sparkEnv = SparkEnv.get
-    if (sparkEnv != null) {
-      SparkHadoopUtil.newConfiguration(sparkEnv.conf)
+    if (sparkEnv == null) {
+      throw new IllegalStateException("sparkEnv should not be null")
     }
-    new Configuration()
+    SparkHadoopUtil.newConfiguration(sparkEnv.conf)
+  }
+
+  /**
+   * Returns a new hadoop [[Configuration]] with [[SparkConf]] given.
+   * @param sparkConf A [[SparkConf]]
+   * @return Newly created hadoop configuration with extra spark properties 
given
+   */
+  def newConfigurationWithSparkConf(sparkConf: SparkConf): Configuration = {
+    SparkHadoopUtil.newConfiguration(sparkConf)
   }
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java 
b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
index 7d796992b4..9bdbd5a3ff 100644
--- a/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
+++ b/src/tool/src/main/java/org/apache/kylin/helper/RoutineToolHelper.java
@@ -18,12 +18,14 @@
 
 package org.apache.kylin.helper;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.SetThreadName;
@@ -128,7 +130,11 @@ public class RoutineToolHelper {
     public static void cleanMetaByProject(String projectName) {
         log.info("Start to clean up {} meta", projectName);
         try {
-            GarbageCleaner.cleanMetadata(projectName);
+            boolean needAggressiveOpt = Arrays
+                    
.stream(KylinConfig.getInstanceFromEnv().getProjectsAggressiveOptimizationIndex())
+                    .map(StringUtils::lowerCase).collect(Collectors.toList())
+                    .contains(StringUtils.toRootLowerCase(projectName));
+            GarbageCleaner.cleanMetadata(projectName, needAggressiveOpt);
         } catch (Exception e) {
             log.error("Project[{}] cleanup Metadata failed", projectName, e);
         }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java
index 498e0fcb35..04313ee7a5 100644
--- 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java
+++ 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/ExecutableCleaner.java
@@ -36,6 +36,11 @@ public class ExecutableCleaner extends MetadataCleaner {
         super(project);
     }
 
+    @Override
+    public void beforeCleanup() {
+        // do nothing
+    }
+
     @Override
     public void cleanup() {
 
@@ -62,4 +67,9 @@ public class ExecutableCleaner extends MetadataCleaner {
         logger.info("Clean executable in project {} finished", project);
     }
 
+    @Override
+    public void afterCleanup() {
+        // do nothing
+    }
+
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java
index 5a05e52f8e..527f25527a 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/GarbageCleaner.java
@@ -41,16 +41,18 @@ public class GarbageCleaner {
      * Clean up metadata
      * @param project
      */
-    public static void cleanMetadata(String project) {
+    public static void cleanMetadata(String project, boolean 
needAggressiveOpt) {
         val projectInstance = 
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject(project);
         if (projectInstance == null) {
             return;
         }
 
-        List<MetadataCleaner> cleaners = initCleaners(project);
+        List<MetadataCleaner> cleaners = initCleaners(project, 
needAggressiveOpt);
         cleaners.forEach(MetadataCleaner::prepare);
         EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
+            cleaners.forEach(MetadataCleaner::beforeCleanup);
             cleaners.forEach(MetadataCleaner::cleanup);
+            cleaners.forEach(MetadataCleaner::afterCleanup);
             return 0;
         }, project);
 
@@ -58,8 +60,9 @@ public class GarbageCleaner {
         MetricsGroup.hostTagCounterInc(MetricsName.METADATA_CLEAN, 
MetricsCategory.PROJECT, project);
     }
 
-    private static List<MetadataCleaner> initCleaners(String project) {
-        return Arrays.asList(new SnapshotCleaner(project), new 
IndexCleaner(project), new ExecutableCleaner(project));
+    private static List<MetadataCleaner> initCleaners(String project, boolean 
needAggressiveOpt) {
+        return Arrays.asList(new SnapshotCleaner(project), new 
IndexCleaner(project, needAggressiveOpt),
+                new ExecutableCleaner(project));
     }
 
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java
index 6c7b88fa4c..7036648383 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/IndexCleaner.java
@@ -20,21 +20,33 @@ package org.apache.kylin.tool.garbage;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.annotation.Clarification;
+import org.apache.kylin.common.scheduler.EventBusFactory;
+import org.apache.kylin.guava30.shaded.common.collect.Lists;
+import org.apache.kylin.guava30.shaded.common.collect.Maps;
+import org.apache.kylin.metadata.cube.model.IndexPlan;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.cube.optimization.GarbageLayoutType;
 import org.apache.kylin.metadata.cube.optimization.IndexOptimizerFactory;
+import org.apache.kylin.metadata.cube.optimization.event.ApproveRecsEvent;
+import org.apache.kylin.metadata.cube.optimization.event.BuildIndexEvent;
+import org.apache.kylin.metadata.cube.utils.IndexPlanReduceUtil;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.recommendation.ref.OptRecManagerV2;
 import org.apache.kylin.metadata.recommendation.ref.OptRecV2;
 
-import org.apache.kylin.guava30.shaded.common.collect.Lists;
-
 import lombok.val;
 import lombok.extern.slf4j.Slf4j;
 
@@ -42,10 +54,15 @@ import lombok.extern.slf4j.Slf4j;
 @Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
 public class IndexCleaner extends MetadataCleaner {
 
-    List<String> needUpdateModels = Lists.newArrayList();
+    private List<String> needUpdateModels = Lists.newArrayList();
 
-    public IndexCleaner(String project) {
+    private Map<NDataflow, Map<Long, GarbageLayoutType>> 
needOptAggressivelyModels = Maps.newHashMap();
+
+    private final boolean needAggressiveOpt;
+
+    public IndexCleaner(String project, boolean needAggressiveOpt) {
         super(project);
+        this.needAggressiveOpt = needAggressiveOpt;
     }
 
     @Override
@@ -59,15 +76,22 @@ public class IndexCleaner extends MetadataCleaner {
             log.info("not semiautomode, can't run index clean");
             return;
         }
+
         OptRecManagerV2 recManagerV2 = OptRecManagerV2.getInstance(project);
         for (val model : dataflowManager.listUnderliningDataModels()) {
             val dataflow = dataflowManager.getDataflow(model.getId()).copy();
-            Map<Long, GarbageLayoutType> garbageLayouts = 
IndexOptimizerFactory.getOptimizer(dataflow, true)
-                    .getGarbageLayoutMap(dataflow);
+            Map<Long, GarbageLayoutType> garbageLayouts = IndexOptimizerFactory
+                    .getOptimizer(dataflow, needAggressiveOpt, 
true).getGarbageLayoutMap(dataflow);
+
+            if (needAggressiveOpt) {
+                needOptAggressivelyModels.put(dataflow, garbageLayouts);
+                continue;
+            }
 
             if (MapUtils.isEmpty(garbageLayouts)) {
                 continue;
             }
+
             boolean hasNewRecItem = 
recManagerV2.genRecItemsFromIndexOptimizer(project, model.getUuid(),
                     garbageLayouts);
             if (hasNewRecItem) {
@@ -78,8 +102,22 @@ public class IndexCleaner extends MetadataCleaner {
         log.info("Clean index in project {} finished", project);
     }
 
+    @Override
+    public void beforeCleanup() {
+        if (MapUtils.isEmpty(needOptAggressivelyModels)) {
+            return;
+        }
+
+        approveRec();
+        mergeSameDimAggLayout();
+    }
+
     @Override
     public void cleanup() {
+        if (MapUtils.isNotEmpty(needOptAggressivelyModels)) {
+            cleanUpIndexAggressively();
+        }
+
         if (needUpdateModels.isEmpty()) {
             return;
         }
@@ -96,4 +134,83 @@ public class IndexCleaner extends MetadataCleaner {
         });
     }
 
+    @Override
+    public void afterCleanup() {
+        if (MapUtils.isEmpty(needOptAggressivelyModels)) {
+            return;
+        }
+
+        buildIndexes();
+    }
+
+    private List<Set<LayoutEntity>> getMergedLayouts(IndexPlan indexPlan, 
Map<Long, GarbageLayoutType> garbageLayouts) {
+        List<LayoutEntity> merged = Lists.newArrayList();
+        garbageLayouts.forEach(((layoutId, garbageLayoutType) -> {
+            LayoutEntity layout = indexPlan.getLayoutEntity(layoutId);
+            if (GarbageLayoutType.MERGED.equals(garbageLayoutType) && 
!Objects.isNull(layout)) {
+                merged.add(layout);
+            }
+        }));
+
+        return IndexPlanReduceUtil.collectSameDimAggLayouts(merged);
+    }
+
+    private void approveRec() {
+        EventBusFactory eventBusF = EventBusFactory.getInstance();
+        eventBusF.callService(new ApproveRecsEvent(project, 
needOptAggressivelyModels));
+    }
+
+    private void mergeSameDimAggLayout() {
+        NIndexPlanManager indexPlanManager = 
NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        needOptAggressivelyModels.forEach((dataflow, garbageLayouts) -> {
+            IndexPlan indexPlan = 
indexPlanManager.getIndexPlan(dataflow.getId());
+            List<Set<LayoutEntity>> mergedLayouts = 
getMergedLayouts(indexPlan, garbageLayouts);
+            if (CollectionUtils.isEmpty(mergedLayouts)) {
+                return;
+            }
+
+            log.info("merge same dimension index for model: {} under project: 
{}", dataflow.getModelAlias(), project);
+            indexPlanManager.updateIndexPlan(dataflow.getId(), copyForWrite -> 
{
+                IndexPlan indexPlanMerged = 
IndexPlanReduceUtil.mergeSameDimLayout(indexPlan, mergedLayouts);
+                copyForWrite.setIndexes(indexPlanMerged.getIndexes());
+                copyForWrite.setLastModified(System.currentTimeMillis());
+            });
+        });
+    }
+
+    private void cleanUpIndexAggressively() {
+        NIndexPlanManager indexPlanManager = 
NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        needOptAggressivelyModels.forEach((dataflow, garbageLayouts) -> {
+            if (MapUtils.isEmpty(garbageLayouts)) {
+                return;
+            }
+            log.info("aggressively clean up index for model: {} under project: 
{}", dataflow.getModelAlias(), project);
+            IndexPlan indexPlan = 
indexPlanManager.getIndexPlan(dataflow.getId());
+            deleteIndexes(indexPlan, garbageLayouts.keySet());
+        });
+    }
+
+    private void deleteIndexes(IndexPlan indexPlan, Set<Long> garbageLayouts) {
+        garbageLayouts.stream().map(layoutId -> 
indexPlan.getLayoutEntity(layoutId).getIndex())
+                .forEachOrdered(index -> {
+                    indexPlan.getIndexes().remove(index);
+                    indexPlan.getToBeDeletedIndexes().add(index);
+                });
+
+        NIndexPlanManager indexPlanManager = 
NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        indexPlanManager.updateIndexPlan(indexPlan.getId(), copyForWrite -> {
+            copyForWrite.setLastModified(System.currentTimeMillis());
+            copyForWrite.getToBeDeletedIndexes()
+                    .addAll(indexPlan.getToBeDeletedIndexes().stream()
+                            .filter(index -> 
!copyForWrite.getToBeDeletedIndexes().contains(index))
+                            .collect(Collectors.toList()));
+            
copyForWrite.getIndexes().removeAll(indexPlan.getToBeDeletedIndexes());
+        });
+    }
+
+    private void buildIndexes() {
+        EventBusFactory eventBusFactory = EventBusFactory.getInstance();
+        eventBusFactory
+                .callService(new BuildIndexEvent(project, 
Lists.newArrayList(needOptAggressivelyModels.keySet())));
+    }
 }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
index 84f04556d8..2f96bfa281 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/MetadataCleaner.java
@@ -25,9 +25,15 @@ public abstract class MetadataCleaner {
         this.project = project;
     }
 
+    // do in transaction
+    public abstract void beforeCleanup();
+
     // do in transaction
     public abstract void cleanup();
 
+    // do in transaction
+    public abstract void afterCleanup();
+
     public void prepare() {
         // default do nothing
     }
diff --git 
a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java 
b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java
index 1add64dc9c..3c1d897be8 100644
--- a/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java
+++ b/src/tool/src/main/java/org/apache/kylin/tool/garbage/SnapshotCleaner.java
@@ -43,6 +43,11 @@ public class SnapshotCleaner extends MetadataCleaner {
         super(project);
     }
 
+    @Override
+    public void beforeCleanup() {
+        // do nothing
+    }
+
     @Override
     public void prepare() {
         NTableMetadataManager tMgr = 
NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
@@ -88,4 +93,9 @@ public class SnapshotCleaner extends MetadataCleaner {
         }
         logger.info("Clean snapshot in project {} finished", project);
     }
+
+    @Override
+    public void afterCleanup() {
+        // do nothing
+    }
 }

Reply via email to