This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f241d43dca49719c9848510ca936ecc63c06bab1
Author: Jiale He <35652389+jial...@users.noreply.github.com>
AuthorDate: Thu Nov 3 19:14:06 2022 +0800

    KYLIN-5343 Add column datatype check when import model
---
 .../org/apache/kylin/metadata/model/TableDesc.java |  18 ---
 .../metadata/model/schema/ModelImportChecker.java  |  31 ++--
 .../model/schema/SchemaChangeCheckResult.java      |  99 +++++++------
 .../kylin/metadata/model/schema/SchemaUtil.java    |  24 ++-
 .../schema/strategy/ComputedColumnStrategy.java    |  37 +++--
 .../schema/strategy/MultiplePartitionStrategy.java |   8 +-
 .../schema/strategy/OverWritableStrategy.java      |  16 +-
 .../schema/strategy/SchemaChangeStrategy.java      |  32 ++--
 .../model/schema/strategy/TableColumnStrategy.java |  27 ++--
 .../model/schema/strategy/TableStrategy.java       |   9 +-
 .../schema/strategy/UnOverWritableStrategy.java    |  18 ++-
 .../apache/kylin/metadata/model/TableDescTest.java |  55 -------
 .../metadata/model/schema/SchemaUtilTest.java      |  14 ++
 .../localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json |  14 +-
 .../kylin/rest/service/MetaStoreService.java       | 158 ++++++++++----------
 .../kylin/rest/service/MetaStoreServiceTest.java   | 161 ++++-----------------
 16 files changed, 316 insertions(+), 405 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 33231ff7b9..1202ba6b07 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -307,23 +306,6 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
         return null;
     }
 
-    public Pair<Set<ColumnDesc>, Set<ColumnDesc>> findColumns(Set<ColumnDesc> columnDescSet) {
-        Set<ColumnDesc> existColSet = Sets.newHashSet();
-        Set<ColumnDesc> notExistColSet = Sets.newHashSet();
-        if (CollectionUtils.isEmpty(columnDescSet)) {
-            return Pair.newPair(existColSet, notExistColSet);
-        }
-        for (ColumnDesc searchColumnDesc : columnDescSet) {
-            ColumnDesc columnDesc = findColumnByName(searchColumnDesc.getName());
-            if (Objects.isNull(columnDesc)) {
-                notExistColSet.add(searchColumnDesc);
-            } else {
-                existColSet.add(columnDesc);
-            }
-        }
-        return Pair.newPair(existColSet, notExistColSet);
-    }
-
     @Override
     public String getResourcePath() {
         return concatResourcePath(getIdentity(), project);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java
index 560ca58538..49e8977d2d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.schema.strategy.ComputedColumnStrategy;
 import org.apache.kylin.metadata.model.schema.strategy.MultiplePartitionStrategy;
@@ -43,24 +45,29 @@ public class ModelImportChecker {
             new UnOverWritableStrategy(), new TableColumnStrategy(), new 
TableStrategy(), new OverWritableStrategy(),
             new MultiplePartitionStrategy());
 
-    public static SchemaChangeCheckResult check(SchemaUtil.SchemaDifference 
difference,
-            ImportModelContext importModelContext) {
+    public static SchemaChangeCheckResult check(SchemaUtil.SchemaDifference 
diff, ImportModelContext context) {
+        String targetProject = context.getTargetProject();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
         Set<String> importModels = NDataModelManager
-                .getInstance(importModelContext.getImportKylinConfig(), 
importModelContext.getTargetProject())
-                .listAllModelAlias().stream().map(model -> 
importModelContext.getNewModels().getOrDefault(model, model))
+                .getInstance(context.getImportKylinConfig(), 
targetProject).listAllModelAlias().stream()
+                .map(model -> context.getNewModels().getOrDefault(model, 
model))
                 .collect(Collectors.toSet());
 
-        Set<String> originalModels = NDataModelManager
-                .getInstance(KylinConfig.getInstanceFromEnv(), 
importModelContext.getTargetProject())
-                .listAllModelAlias();
+        // all models include broken
+        Set<String> originalModels = 
NDataModelManager.getInstance(kylinConfig, targetProject).listAllModelAlias();
+        // broken models
+        Set<String> originBrokenModels = 
NDataflowManager.getInstance(kylinConfig, targetProject).listAllDataflows(true)
+                .stream().filter(NDataflow::checkBrokenWithRelatedInfo).map(df 
-> df.getModel().getAlias())
+                .collect(Collectors.toSet());
 
         val result = new SchemaChangeCheckResult();
         for (SchemaChangeStrategy strategy : strategies) {
-            result.addMissingItems(strategy.missingItems(difference, 
importModels, originalModels));
-            result.addNewItems(strategy.newItems(difference, importModels, 
originalModels));
-            result.addReduceItems(strategy.reduceItems(difference, 
importModels, originalModels));
-            result.addUpdateItems(strategy.updateItems(difference, 
importModels, originalModels));
-            result.areEqual(strategy.areEqual(difference, importModels));
+            result.addMissingItems(strategy.missingItems(diff, importModels, 
originalModels, originBrokenModels));
+            result.addNewItems(strategy.newItems(diff, importModels, 
originalModels, originBrokenModels));
+            result.addReduceItems(strategy.reduceItems(diff, importModels, 
originalModels, originBrokenModels));
+            result.addUpdateItems(strategy.updateItems(diff, importModels, 
originalModels, originBrokenModels));
+            result.areEqual(strategy.areEqual(diff, importModels));
         }
         return result;
     }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
index 90b116cd85..fbe1b0489d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
@@ -24,19 +24,20 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.kylin.metadata.model.TableDesc;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonUnwrapped;
-import com.google.common.collect.Sets;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+import lombok.val;
 
 @Data
 public class SchemaChangeCheckResult {
@@ -44,6 +45,9 @@ public class SchemaChangeCheckResult {
     @JsonProperty
     private Map<String, ModelSchemaChange> models = new HashMap<>();
 
+    @JsonIgnore
+    private List<TableDesc> existTableList = new ArrayList<>();
+
     @Data
     public static class ModelSchemaChange {
         private int differences;
@@ -67,13 +71,13 @@ public class SchemaChangeCheckResult {
         @JsonProperty("importable")
         public boolean importable() {
             return Stream.of(missingItems, newItems, updateItems, 
reduceItems).flatMap(Collection::stream)
-                    .allMatch(BaseItem::isImportable) || isLoadTableAble();
+                    .allMatch(BaseItem::isImportable);
         }
 
         @JsonProperty("creatable")
         public boolean creatable() {
             return Stream.of(missingItems, newItems, updateItems, 
reduceItems).flatMap(Collection::stream)
-                    .allMatch(BaseItem::isCreatable) || isLoadTableAble();
+                    .allMatch(BaseItem::isCreatable);
         }
 
         @JsonProperty("")
@@ -92,13 +96,12 @@ public class SchemaChangeCheckResult {
                     .allMatch(BaseItem::isHasSameName);
         }
 
-        @Setter
-        @JsonIgnore
-        private boolean loadTableAble = false;
-
-        @Getter
-        @JsonIgnore
-        private Set<String> loadTables = Sets.newHashSet();
+        @JsonProperty("has_same_name_broken")
+        public boolean hasSameNameBroken() {
+            val set = Stream.of(missingItems, newItems, updateItems, 
reduceItems).flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+            return !set.isEmpty() && 
set.stream().allMatch(BaseItem::isHasSameNameBroken);
+        }
     }
 
     @Data
@@ -117,6 +120,9 @@ public class SchemaChangeCheckResult {
         @JsonProperty("has_same_name")
         boolean hasSameName;
 
+        @JsonProperty("has_same_name_broken")
+        boolean hasSameNameBroken;
+
         @JsonProperty("importable")
         boolean importable;
         @JsonProperty("creatable")
@@ -142,40 +148,45 @@ public class SchemaChangeCheckResult {
     @NoArgsConstructor
     @AllArgsConstructor
     public static class ChangedItem extends BaseItem {
-        @Getter(PRIVATE)
+
         private SchemaNode schemaNode;
 
         public ChangedItem(SchemaNodeType type, SchemaNode schemaNode, String 
modelAlias, UN_IMPORT_REASON reason,
-                String conflictItem, boolean hasSameName, boolean importable, 
boolean creatable, boolean overwritable) {
-            super(type, modelAlias, new ConflictReason(reason, conflictItem), 
hasSameName, importable, creatable,
-                    overwritable);
+                String conflictItem, BaseItemParameter parameter) {
+            super(type, modelAlias, new ConflictReason(reason, conflictItem), 
parameter.hasSameName,
+                    parameter.hasSameNameBroken, parameter.importable, 
parameter.creatable, parameter.overwritable);
             this.schemaNode = schemaNode;
         }
 
         public static ChangedItem createUnImportableSchemaNode(SchemaNodeType 
type, SchemaNode schemaNode,
-                UN_IMPORT_REASON reason, String conflictItem, boolean 
hasSameName) {
-            return new ChangedItem(type, schemaNode, null, reason, 
conflictItem, hasSameName, false, false, false);
+                UN_IMPORT_REASON reason, String conflictItem, boolean 
hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, null, reason, 
conflictItem,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, 
false, false, false));
         }
 
         public static ChangedItem createUnImportableSchemaNode(SchemaNodeType 
type, SchemaNode schemaNode,
-                String modelAlias, UN_IMPORT_REASON reason, String 
conflictItem, boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, modelAlias, reason, 
conflictItem, hasSameName, false, false,
-                    false);
+                String modelAlias, UN_IMPORT_REASON reason, String 
conflictItem, boolean hasSameName,
+                boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, modelAlias, reason, 
conflictItem,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, 
false, false, false));
         }
 
         public static ChangedItem createOverwritableSchemaNode(SchemaNodeType 
type, SchemaNode schemaNode,
-                boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, null, null, null, 
hasSameName, true, true, true);
+                boolean hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, null, null, null,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, 
true, true, true));
         }
 
         public static ChangedItem createOverwritableSchemaNode(SchemaNodeType 
type, SchemaNode schemaNode,
-                String modelAlias, boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, modelAlias, null, null, 
hasSameName, true, true, true);
+                String modelAlias, boolean hasSameName, boolean 
hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, modelAlias, null, null,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, 
true, true, true));
         }
 
         public static ChangedItem createCreatableSchemaNode(SchemaNodeType 
type, SchemaNode schemaNode,
-                boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, null, null, null, 
hasSameName, true, true, false);
+                boolean hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, null, null, null,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, 
true, true, false));
         }
 
         public String getModelAlias() {
@@ -219,26 +230,22 @@ public class SchemaChangeCheckResult {
             return getDetail(secondSchemaNode);
         }
 
-        public UpdatedItem(SchemaNode firstSchemaNode, SchemaNode 
secondSchemaNode, String modelAlias,
-                UN_IMPORT_REASON reason, String conflictItem, boolean 
hasSameName, boolean importable,
-                boolean creatable, boolean overwritable) {
-            super(secondSchemaNode.getType(), modelAlias, new 
ConflictReason(reason, conflictItem), hasSameName,
-                    importable, creatable, overwritable);
-            this.firstSchemaNode = firstSchemaNode;
-            this.secondSchemaNode = secondSchemaNode;
+        public UpdatedItem(SchemaNode first, SchemaNode second, String 
modelAlias, UN_IMPORT_REASON reason,
+                String conflictItem, BaseItemParameter parameter) {
+            super(second.getType(), modelAlias, new ConflictReason(reason, 
conflictItem), parameter.hasSameName,
+                    parameter.hasSameNameBroken, parameter.importable, 
parameter.creatable, parameter.overwritable);
+            this.firstSchemaNode = first;
+            this.secondSchemaNode = second;
         }
 
         public static UpdatedItem getSchemaUpdate(SchemaNode first, SchemaNode 
second, String modelAlias,
-                UN_IMPORT_REASON reason, String conflictItem, boolean 
hasSameName, boolean importable,
-                boolean creatable, boolean overwritable) {
-            return new UpdatedItem(first, second, modelAlias, reason, 
conflictItem, hasSameName, importable, creatable,
-                    overwritable);
+                UN_IMPORT_REASON reason, String conflictItem, 
BaseItemParameter parameter) {
+            return new UpdatedItem(first, second, modelAlias, reason, 
conflictItem, parameter);
         }
 
         public static UpdatedItem getSchemaUpdate(SchemaNode first, SchemaNode 
second, String modelAlias,
-                boolean hasSameName, boolean importable, boolean creatable, 
boolean overwritable) {
-            return getSchemaUpdate(first, second, modelAlias, 
UN_IMPORT_REASON.NONE, null, hasSameName, importable,
-                    creatable, overwritable);
+                BaseItemParameter parameter) {
+            return getSchemaUpdate(first, second, modelAlias, 
UN_IMPORT_REASON.NONE, null, parameter);
         }
     }
 
@@ -302,4 +309,14 @@ public class SchemaChangeCheckResult {
         MISSING_TABLE, //
         NONE;
     }
+
+    @Data
+    @AllArgsConstructor
+    public static class BaseItemParameter {
+        private boolean hasSameName;
+        private boolean hasSameNameBroken;
+        private boolean importable;
+        private boolean creatable;
+        private boolean overwritable;
+    }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java
index e90867b965..24b2512a49 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java
@@ -25,15 +25,15 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.JoinTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
 
 import com.google.common.collect.Lists;
 
@@ -48,6 +48,22 @@ import lombok.val;
 
 public class SchemaUtil {
 
+    public static SchemaDifference diff(String project, KylinConfig 
sourceConfig, KylinConfig targetConfig,
+            List<TableDesc> incrTableDescList) {
+        val sourceGraph = dependencyGraph(project, sourceConfig, 
incrTableDescList);
+        val targetGraph = dependencyGraph(project, targetConfig);
+        return new SchemaDifference(sourceGraph, targetGraph);
+    }
+
+    public static Graph<SchemaNode> dependencyGraph(String project, 
KylinConfig config,
+            List<TableDesc> incrTableDescList) {
+        val tableManager = NTableMetadataManager.getInstance(config, project);
+        val planManager = NIndexPlanManager.getInstance(config, project);
+        List<TableDesc> tableDescs = 
Lists.newArrayList(tableManager.listAllTables());
+        tableDescs.addAll(incrTableDescList);
+        return dependencyGraph(tableDescs, planManager.listAllIndexPlans());
+    }
+
     public static SchemaDifference diff(String project, KylinConfig 
sourceConfig, KylinConfig targetConfig) {
         val sourceGraph = dependencyGraph(project, sourceConfig);
         val targetGraph = dependencyGraph(project, targetConfig);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java
index 6b3af9e78a..8274da560b 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java
@@ -48,7 +48,7 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         List<SchemaNode> allComputedColumns = 
difference.getSourceGraph().nodes().stream()
                 .filter(schemaNode -> 
supportedSchemaNodeTypes().contains(schemaNode.getType()))
                 .collect(Collectors.toList());
@@ -59,7 +59,7 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
         if (hasComputedColumnNameWithDifferentExpression(entry.getValue(), 
allComputedColumns)) {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                     entry.getKey().getType(), entry.getValue(), 
SAME_CC_NAME_HAS_DIFFERENT_EXPR, null,
-                    hasSameName(modelAlias, originalModels)));
+                    hasSameName(modelAlias, originalModels), 
hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
 
         // different cc name with same expression
@@ -67,21 +67,25 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
         if (optional.isPresent()) {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                     entry.getKey().getType(), entry.getValue(), 
DIFFERENT_CC_NAME_HAS_SAME_EXPR,
-                    optional.get().getDetail(), hasSameName(modelAlias, 
originalModels)));
+                    optional.get().getDetail(), hasSameName(modelAlias, 
originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
 
         if (overwritable(importModels, originalModels, modelAlias)) {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         } else {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
     }
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> 
updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         List<SchemaNode> allComputedColumns = 
difference.getSourceGraph().nodes().stream()
                 .filter(schemaNode -> 
supportedSchemaNodeTypes().contains(schemaNode.getType()))
                 .collect(Collectors.toList());
@@ -90,36 +94,43 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
         String modelAlias = diff.rightValue().getSubject();
         // same cc name with different expression
         if (hasComputedColumnNameWithDifferentExpression(schemaNode, 
allComputedColumns)) {
+            val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, 
originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels), 
false, false, false);
             return 
Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                    diff.rightValue(), modelAlias, 
SAME_CC_NAME_HAS_DIFFERENT_EXPR, null,
-                    hasSameName(modelAlias, originalModels), false, false, 
false));
+                    diff.rightValue(), modelAlias, 
SAME_CC_NAME_HAS_DIFFERENT_EXPR, null, parameter));
         }
 
         // different cc name with same expression
         val optional = hasExpressionWithDifferentComputedColumn(schemaNode, 
allComputedColumns);
         if (optional.isPresent()) {
+            val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, 
originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels), 
false, false, false);
             return 
Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
                     diff.rightValue(), modelAlias, 
DIFFERENT_CC_NAME_HAS_SAME_EXPR, optional.get().getDetail(),
-                    hasSameName(modelAlias, originalModels), false, false, 
false));
+                    parameter));
         }
 
         boolean overwritable = overwritable(importModels, originalModels, 
modelAlias);
+        val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, 
originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, 
true, overwritable);
         return 
Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, 
originalModels), true, true, overwritable));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         boolean overwritable = overwritable(importModels, originalModels, 
modelAlias);
         if (overwritable) {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         } else {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
     }
 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java
index 17c63fdec3..b53420bfd4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java
@@ -31,6 +31,7 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil;
 import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
 
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
+import lombok.val;
 
 public class MultiplePartitionStrategy extends UnOverWritableStrategy {
     @Override
@@ -40,7 +41,8 @@ public class MultiplePartitionStrategy extends UnOverWritableStrategy {
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> 
updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         String modelAlias = diff.rightValue().getSubject();
 
         boolean overwritable = overwritable(diff);
@@ -66,8 +68,10 @@ public class MultiplePartitionStrategy extends UnOverWritableStrategy {
             }
         }
 
+        val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, 
originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, 
true, overwritable);
         return 
Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, 
originalModels), true, true, overwritable));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     /**
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java
index 998b3c201a..6ccf37feda 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java
@@ -40,27 +40,29 @@ public class OverWritableStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
-        return createSchemaChange(difference, entry, importModels, 
originalModels);
+            Set<String> originalModels, Set<String> originalBrokenModels) {
+        return createSchemaChange(difference, entry, importModels, 
originalModels, originalBrokenModels);
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
-        return createSchemaChange(difference, entry, importModels, 
originalModels);
+            Set<String> originalModels, Set<String> originalBrokenModels) {
+        return createSchemaChange(difference, entry, importModels, 
originalModels, originalBrokenModels);
     }
 
     private List<SchemaChangeCheckResult.ChangedItem> 
createSchemaChange(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         if (overwritable(importModels, originalModels, modelAlias)) {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         } else {
             return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
     }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java
index ce77f66bf1..30c182d440 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java
@@ -33,64 +33,68 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil;
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
 import io.kyligence.kap.guava20.shaded.common.graph.Graph;
 import io.kyligence.kap.guava20.shaded.common.graph.Graphs;
+import lombok.val;
 
 public interface SchemaChangeStrategy {
     List<SchemaNodeType> supportedSchemaNodeTypes();
 
     default List<SchemaChangeCheckResult.ChangedItem> 
missingItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> 
missingItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> 
originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> 
newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> 
newItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> 
originalBrokenModels) {
         return 
difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream()
                 .filter(entry -> 
supportedSchemaNodeTypes().contains(entry.getKey().getType()))
-                .map(entry -> newItemFunction(difference, entry, importModels, 
originalModels))
+                .map(entry -> newItemFunction(difference, entry, importModels, 
originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> 
importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
 
     default List<SchemaChangeCheckResult.UpdatedItem> 
updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         String modelAlias = diff.rightValue().getSubject();
         boolean overwritable = overwritable(importModels, originalModels, 
modelAlias);
+        val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, 
originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, 
true, overwritable);
         return 
Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, 
originalModels), true, true, overwritable));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     default List<SchemaChangeCheckResult.UpdatedItem> 
updateItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> 
originalBrokenModels) {
         return difference.getNodeDiff().entriesDiffering().values().stream()
                 .filter(entry -> 
supportedSchemaNodeTypes().contains(entry.leftValue().getType()))
-                .map(diff -> updateItemFunction(difference, diff, 
importModels, originalModels))
+                .map(diff -> updateItemFunction(difference, diff, 
importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> 
importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> 
reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> 
reduceItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> 
originalBrokenModels) {
         return difference.getNodeDiff().entriesOnlyOnLeft().entrySet().stream()
                 .filter(entry -> 
supportedSchemaNodeTypes().contains(entry.getKey().getType()))
-                .map(entry -> reduceItemFunction(difference, entry, 
importModels, originalModels))
+                .map(entry -> reduceItemFunction(difference, entry, 
importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> 
importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
@@ -115,4 +119,8 @@ public interface SchemaChangeStrategy {
                 .map(SchemaNode::getSubject).collect(Collectors.toSet());
     }
 
+    default boolean hasSameWithBroken(String modelAlias, Set<String> 
originalBrokenModels) {
+        return originalBrokenModels.contains(modelAlias);
+    }
+
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
index 2157d95a88..752f57dfed 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
@@ -35,6 +35,7 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil;
 
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
 import io.kyligence.kap.guava20.shaded.common.graph.Graphs;
+import lombok.val;
 
 public class TableColumnStrategy implements SchemaChangeStrategy {
     @Override
@@ -44,10 +45,10 @@ public class TableColumnStrategy implements 
SchemaChangeStrategy {
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
missingItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> 
originalBrokenModels) {
         return 
difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream()
                 .filter(pair -> 
supportedSchemaNodeTypes().contains(pair.getKey().getType()))
-                .map(pair -> missingItemFunction(difference, pair, 
importModels, originalModels))
+                .map(pair -> missingItemFunction(difference, pair, 
importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> 
importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
@@ -55,34 +56,40 @@ public class TableColumnStrategy implements 
SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
missingItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return reachableModel(difference.getTargetGraph(), 
entry.getValue()).stream()
                 .map(modelAlias -> 
SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                         entry.getKey().getType(), entry.getValue(), 
modelAlias, USED_UNLOADED_TABLE,
-                        entry.getValue().getDetail(), hasSameName(modelAlias, 
originalModels)))
+                        entry.getValue().getDetail(), hasSameName(modelAlias, 
originalModels),
+                        hasSameWithBroken(modelAlias, originalBrokenModels)))
                 .collect(Collectors.toList());
     }
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> 
updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         return Graphs.reachableNodes(difference.getTargetGraph(), 
diff.rightValue()).stream()
                 
.filter(SchemaNode::isModelNode).map(SchemaNode::getSubject).distinct()
-                .map(modelAlias -> 
SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                        diff.rightValue(), modelAlias, 
TABLE_COLUMN_DATATYPE_CHANGED, diff.rightValue().getDetail(),
-                        hasSameName(modelAlias, originalModels), false, false, 
false))
+                .map(modelAlias -> {
+                    val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(
+                            hasSameName(modelAlias, originalModels),
+                            hasSameWithBroken(modelAlias, 
originalBrokenModels), false, false, false);
+                    return 
SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), 
diff.rightValue(),
+                            modelAlias, TABLE_COLUMN_DATATYPE_CHANGED, 
diff.rightValue().getDetail(), parameter);
+                })
                 .collect(Collectors.toList());
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Graphs.reachableNodes(difference.getSourceGraph(), 
entry.getValue()).stream()
                 
.filter(SchemaNode::isModelNode).map(SchemaNode::getSubject).distinct()
                 .map(modelAlias -> 
SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
                         entry.getKey().getType(), entry.getValue(), modelAlias,
-                        hasSameName(modelAlias, originalModels)))
+                        hasSameName(modelAlias, originalModels), 
hasSameWithBroken(modelAlias, originalBrokenModels)))
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
index b727aa8faa..55746d6fcf 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
@@ -40,10 +40,10 @@ public class TableStrategy implements SchemaChangeStrategy {
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
missingItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> 
originalBrokenModels) {
         return 
difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream()
                 .filter(pair -> 
supportedSchemaNodeTypes().contains(pair.getKey().getType()))
-                .map(pair -> missingItemFunction(difference, pair, 
importModels, originalModels))
+                .map(pair -> missingItemFunction(difference, pair, 
importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> 
importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
@@ -51,11 +51,12 @@ public class TableStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
missingItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return reachableModel(difference.getTargetGraph(), 
entry.getValue()).stream()
                 .map(modelAlias -> 
SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                         entry.getKey().getType(), entry.getValue(), 
modelAlias, MISSING_TABLE,
-                        entry.getValue().getDetail(), hasSameName(modelAlias, 
originalModels)))
+                        entry.getValue().getDetail(), hasSameName(modelAlias, 
originalModels),
+                        hasSameWithBroken(modelAlias, originalBrokenModels)))
                 .collect(Collectors.toList());
     }
 }
diff --git 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
index 20f85b7b24..f405c855e7 100644
--- 
a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
+++ 
b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
@@ -30,6 +30,7 @@ import org.apache.kylin.metadata.model.schema.SchemaNodeType;
 import org.apache.kylin.metadata.model.schema.SchemaUtil;
 
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
+import lombok.val;
 
 public class UnOverWritableStrategy implements SchemaChangeStrategy {
 
@@ -42,26 +43,31 @@ public class UnOverWritableStrategy implements 
SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels)));
     }
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> 
updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> 
importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         String modelAlias = diff.rightValue().getSubject();
+        val parameter = new 
SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, 
originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, 
true, false);
         return 
Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, 
originalModels), true, true, false));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> 
reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, 
Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         return 
Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels)));
+                entry.getKey().getType(), entry.getValue(), 
hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels)));
     }
 }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
index 7eff5cffa9..66e6036843 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
@@ -21,7 +21,6 @@ package org.apache.kylin.metadata.model;
 import static 
org.apache.kylin.metadata.model.NTableMetadataManager.getInstance;
 
 import java.util.Locale;
-import java.util.Set;
 
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.junit.After;
@@ -29,10 +28,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
-
-import lombok.val;
-
 public class TableDescTest extends NLocalFileMetadataTestCase {
     private final String project = "default";
     private NTableMetadataManager tableMetadataManager;
@@ -72,54 +67,4 @@ public class TableDescTest extends 
NLocalFileMetadataTestCase {
         final TableDesc tableDesc = 
tableMetadataManager.getTableDesc(tableName);
         Assert.assertFalse(tableDesc.isRangePartition());
     }
-
-    @Test
-    public void testFindColumns() {
-        final String tableName = "DEFAULT.TEST_KYLIN_FACT";
-        final TableDesc tableDesc = 
tableMetadataManager.getTableDesc(tableName);
-        ColumnDesc[] columns = tableDesc.getColumns();
-        Assert.assertEquals(12, columns.length);
-
-        {
-            // test search column empty
-            Set<ColumnDesc> searchColSet = Sets.newHashSet();
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertTrue(pair.getFirst().isEmpty());
-            Assert.assertTrue(pair.getSecond().isEmpty());
-        }
-
-        {
-            // test all founded
-            Set<ColumnDesc> searchColSet = Sets.newHashSet(
-                    new ColumnDesc("1", "TRANS_ID", "bigint", "TRANS_ID", "", 
"", ""),
-                    new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", 
"", ""));
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertFalse(pair.getFirst().isEmpty());
-            Assert.assertTrue(pair.getSecond().isEmpty());
-            Assert.assertEquals(2, pair.getFirst().size());
-        }
-
-        {
-            // test part founded
-            Set<ColumnDesc> searchColSet = Sets.newHashSet(
-                    new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", 
"", "", ""),
-                    new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", 
"", ""));
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertFalse(pair.getFirst().isEmpty());
-            Assert.assertFalse(pair.getSecond().isEmpty());
-            Assert.assertEquals(1, pair.getFirst().size());
-            Assert.assertEquals(1, pair.getSecond().size());
-        }
-
-        {
-            // test part founded
-            Set<ColumnDesc> searchColSet = Sets.newHashSet(
-                    new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", 
"", "", ""),
-                    new ColumnDesc("2", "ORDER_ID_1", "bigint", "TRANS_ID", 
"", "", ""));
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertTrue(pair.getFirst().isEmpty());
-            Assert.assertFalse(pair.getSecond().isEmpty());
-            Assert.assertEquals(2, pair.getSecond().size());
-        }
-    }
 }
diff --git 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
index 4b43a9e538..61a605201f 100644
--- 
a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
+++ 
b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
@@ -42,6 +42,8 @@ import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,6 +51,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import io.kyligence.kap.guava20.shaded.common.collect.Sets;
@@ -555,6 +558,17 @@ public class SchemaUtilTest extends 
NLocalFileMetadataTestCase {
                         && schemaChange.getConflictReason()
                                 .getReason() == 
SchemaChangeCheckResult.UN_IMPORT_REASON.MISSING_TABLE
                         && 
schemaChange.getConflictReason().getConflictItem().equals("SSB.CUSTOMER_NEW")));
+
+        NTableMetadataManager manager = 
NTableMetadataManager.getInstance(getTestConfig(), getTargetProject());
+        TableDesc tableDesc = manager.getTableDesc("SSB.CUSTOMER");
+        tableDesc.setName("CUSTOMER_NEW");
+        tableDesc.init(getTargetProject());
+        val diff = SchemaUtil.diff(getTargetProject(), 
KylinConfig.getInstanceFromEnv(),
+                importModelContext.getTargetKylinConfig(), 
Lists.newArrayList(tableDesc));
+        val checkResult = ModelImportChecker.check(diff, importModelContext);
+        Assert.assertFalse(checkResult.getModels().isEmpty());
+        val change = checkResult.getModels().get(getTargetModel());
+        Assert.assertTrue(change.getMissingItems().isEmpty());
     }
 
     @Test
diff --git 
a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json 
b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
index 88ecdea540..076c6e26ee 100644
--- a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
+++ b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
@@ -9,31 +9,31 @@
   }, {
     "id" : "2",
     "name" : "C_NAME",
-    "datatype" : "varchar(25)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "3",
     "name" : "C_ADDRESS",
-    "datatype" : "varchar(40)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "4",
     "name" : "C_CITY",
-    "datatype" : "varchar(10)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "5",
     "name" : "C_NATION",
-    "datatype" : "varchar(15)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "6",
     "name" : "C_REGION",
-    "datatype" : "varchar(12)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "7",
     "name" : "C_PHONE",
-    "datatype" : "varchar(15)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "8",
     "name" : "C_MKTSEGMENT",
-    "datatype" : "varchar(10)"
+    "datatype": "varchar(4096)"
   } ],
   "database" : "SSB",
   "last_modified" : 1457444146362,
diff --git 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
index 21097bcd85..32d3f93901 100644
--- 
a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
+++ 
b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
@@ -28,10 +28,8 @@ import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_
 import static 
org.apache.kylin.common.persistence.ResourceStore.METASTORE_UUID_TAG;
 import static org.apache.kylin.common.persistence.ResourceStore.VERSION_FILE;
 import static 
org.apache.kylin.metadata.model.schema.ImportModelContext.MODEL_REC_PATH;
-import static 
org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.DIFFERENT_CC_NAME_HAS_SAME_EXPR;
-import static 
org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR;
-import static 
org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.TABLE_COLUMN_DATATYPE_CHANGED;
-import static 
org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE;
+import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_DIM;
+import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_FACT;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -39,8 +37,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -72,13 +70,13 @@ import 
org.apache.kylin.common.persistence.metadata.MetadataStore;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.MetadataChecker;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.cube.model.RuleBasedIndex;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -99,6 +97,7 @@ import 
org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.aspect.Transaction;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
 import org.apache.kylin.rest.request.ModelImportRequest;
+import org.apache.kylin.rest.request.ModelImportRequest.ImportType;
 import org.apache.kylin.rest.request.UpdateRuleBasedCuboidRequest;
 import org.apache.kylin.rest.response.LoadTableResponse;
 import org.apache.kylin.rest.response.ModelPreviewResponse;
@@ -137,9 +136,6 @@ public class MetaStoreService extends BasicService {
     private static final Pattern MD5_PATTERN = 
Pattern.compile(".*([a-fA-F\\d]{32})\\.zip");
     private static final String RULE_SCHEDULER_DATA_KEY = 
"kylin.index.rule-scheduler-data";
 
-    private static final Set<SchemaChangeCheckResult.UN_IMPORT_REASON> 
UN_IMPORT_REASONS = Sets.newHashSet(
-            SAME_CC_NAME_HAS_DIFFERENT_EXPR, DIFFERENT_CC_NAME_HAS_SAME_EXPR, 
TABLE_COLUMN_DATATYPE_CHANGED);
-
     @Autowired
     public AclEvaluate aclEvaluate;
 
@@ -378,12 +374,12 @@ public class MetaStoreService extends BasicService {
 
         if (request != null) {
             val newModels = request.getModels().stream()
-                    .filter(modelImport -> modelImport.getImportType() == 
ModelImportRequest.ImportType.NEW)
+                    .filter(modelImport -> modelImport.getImportType() == 
ImportType.NEW)
                     
.collect(Collectors.toMap(ModelImportRequest.ModelImport::getOriginalName,
                             ModelImportRequest.ModelImport::getTargetName));
 
             val unImportModels = request.getModels().stream()
-                    .filter(modelImport -> modelImport.getImportType() == 
ModelImportRequest.ImportType.UN_IMPORT)
+                    .filter(modelImport -> modelImport.getImportType() == 
ImportType.UN_IMPORT)
                     
.map(ModelImportRequest.ModelImport::getOriginalName).collect(Collectors.toList());
 
             return new ImportModelContext(targetProject, srcProject, 
rawResourceMap, newModels, unImportModels);
@@ -418,83 +414,44 @@ public class MetaStoreService extends BasicService {
 
     public SchemaChangeCheckResult checkModelMetadata(String targetProject, 
ImportModelContext context,
             MultipartFile uploadFile) throws IOException {
-        Map<String, RawResource> rawResourceMap = 
getRawResourceFromUploadFile(uploadFile);
 
-        
checkModelMetadataFile(ResourceStore.getKylinMetaStore(context.getTargetKylinConfig()).getMetadataStore(),
+        KylinConfig targetKylinConfig = context.getTargetKylinConfig();
+        Map<String, RawResource> rawResourceMap = 
getRawResourceFromUploadFile(uploadFile);
+        
checkModelMetadataFile(ResourceStore.getKylinMetaStore(targetKylinConfig).getMetadataStore(),
                 rawResourceMap.keySet());
 
-        SchemaUtil.SchemaDifference difference = 
SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(),
-                context.getTargetKylinConfig());
-
-        SchemaChangeCheckResult checkResult = 
ModelImportChecker.check(difference, context);
-
-        Set<String> loadAbleTables = getLoadAbleTables(targetProject, 
context.getTargetMissTableList());
-        if (CollectionUtils.isEmpty(loadAbleTables)) {
-            return checkResult;
-        }
-        // mark every model loadTableAble
-        return checkTableLoadAble(loadAbleTables, checkResult);
-    }
-
-    public SchemaChangeCheckResult checkTableLoadAble(Set<String> 
loadAbleTables, SchemaChangeCheckResult checkResult) {
-        checkResult.getModels().forEach((modelName, change) -> {
-            if (change.creatable() || change.importable() || 
change.overwritable()) {
-                return;
-            }
-            // Verify that tables used by the model can be fully loaded
-            Set<String> missedTableSet = change.getMissingItems().stream()//
-                    .filter(item -> item.getType().equals(MODEL_TABLE))
-                    
.map(SchemaChangeCheckResult.ChangedItem::getDetail).collect(Collectors.toSet());
-            if (missedTableSet.isEmpty() || 
!loadAbleTables.containsAll(missedTableSet)) {
-                return;
-            }
-            // Verify that model has no conflicts
-            List<SchemaChangeCheckResult.BaseItem> items = 
Lists.newArrayList();
-            items.addAll(change.getNewItems());
-            items.addAll(change.getUpdateItems());
-            boolean hasConflict = items.stream().anyMatch(item -> {
-                val reason = item.getConflictReason().getReason();
-                return UN_IMPORT_REASONS.contains(reason);
-            });
-            if (hasConflict) {
-                return;
-            }
-            change.setLoadTableAble(true);
-            change.getLoadTables().addAll(missedTableSet);
-        });
+        // look up which of the missing tables exist in the datasource
+        List<TableDesc> existTableList = 
searchTablesInDataSource(targetProject, context.getTargetMissTableList());
+        // diff (local metadata + tables found in the datasource) against the imported metadata
+        val diff = SchemaUtil.diff(targetProject, 
KylinConfig.getInstanceFromEnv(), targetKylinConfig, existTableList);
+        SchemaChangeCheckResult checkResult = ModelImportChecker.check(diff, 
context);
+        checkResult.getExistTableList().addAll(existTableList);
         return checkResult;
     }
 
-    public Set<String> getLoadAbleTables(String targetProject, List<TableDesc> 
missTableList) {
+    public List<TableDesc> searchTablesInDataSource(String targetProject, 
List<TableDesc> missTableList) {
         if (CollectionUtils.isEmpty(missTableList)) {
-            return Sets.newHashSet();
+            return Collections.emptyList();
         }
         ProjectInstance projectInstance = 
NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
                 .getProject(targetProject);
         ISourceMetadataExplorer explorer = 
SourceFactory.getSource(projectInstance).getSourceMetadataExplorer();
-        Set<String> loadAbleList = Sets.newHashSet();
+
+        List<TableDesc> existTableSet = Lists.newArrayList();
         for (TableDesc missTableDesc : missTableList) {
             try {
-                // get new table desc from datasource
+                // check that the table exists in the datasource;
+                // no need to check its columns
                 TableDesc newTableDesc = explorer
                         .loadTableMetadata(missTableDesc.getDatabase(), 
missTableDesc.getName(), targetProject)
                         .getFirst();
-                // check column all exists
-                Set<ColumnDesc> columnDescList = 
Arrays.stream(missTableDesc.getColumns())
-                        .filter(col -> 
!col.isComputedColumn()).collect(Collectors.toSet());
-                Set<ColumnDesc> notExistColSet = 
newTableDesc.findColumns(columnDescList).getSecond();
-                if (CollectionUtils.isNotEmpty(notExistColSet)) {
-                    // some column not exist in new table desc, mark table 
cannot load
-                    String missCols = 
notExistColSet.stream().map(ColumnDesc::getName).collect(Collectors.joining(","));
-                    logger.warn("Can not find columns [{}] in table [{}]", 
missCols, newTableDesc.getIdentity());
-                    continue;
-                }
-                loadAbleList.add(newTableDesc.getIdentity());
+                newTableDesc.init(targetProject);
+                existTableSet.add(newTableDesc);
             } catch (Exception e) {
                 logger.warn("try load table: {} failed.", 
missTableDesc.getIdentity(), e);
             }
         }
-        return loadAbleList;
+        return existTableSet;
     }
 
     private void checkModelMetadataFile(MetadataStore metadataStore, 
Set<String> rawResourceList) {
@@ -654,24 +611,55 @@ public class MetaStoreService extends BasicService {
         }
     }
 
-    public LoadTableResponse innerLoadTables(String project, 
Set<SchemaChangeCheckResult.ModelSchemaChange> changes)
-            throws Exception {
-        Set<String> loadTables = Sets.newHashSet();
-        changes.forEach(change -> loadTables.addAll(change.getLoadTables()));
-        return tableExtService.loadDbTables(loadTables.toArray(new String[0]), 
project, false);
+    public LoadTableResponse innerLoadTables(String project, Set<String> 
needLoadTables) throws Exception {
+        return tableExtService.loadDbTables(needLoadTables.toArray(new 
String[0]), project, false);
+    }
+
+    public Pair<Set<String>, Map<String, Set<String>>> 
checkNewModelTables(SchemaChangeCheckResult checkResult,
+            ModelImportRequest request) {
+        List<String> existTableList = 
checkResult.getExistTableList().stream().map(TableDesc::getIdentity)
+                .collect(Collectors.toList());
+        List<String> newImportModelList = request.getModels().stream()
+                .filter(modelRequest -> modelRequest.getImportType() == 
ImportType.NEW)
+                
.map(ModelImportRequest.ModelImport::getTargetName).collect(Collectors.toList());
+        // all tables need to be loaded
+        Set<String> needLoadTableSet = Sets.newHashSet();
+        // every model need to be loaded tables
+        Map<String, Set<String>> modelTablesMap = Maps.newHashMap();
+
+        checkResult.getModels().forEach((modelName, change) -> {
+            if (!newImportModelList.contains(modelName) || 
!change.creatable()) {
+                return;
+            }
+            Set<String> modelTables = Sets.newHashSet();
+            change.getNewItems().stream()//
+                    .filter(item -> 
item.getSchemaNode().getType().equals(MODEL_DIM)
+                            || 
item.getSchemaNode().getType().equals(MODEL_FACT))
+                    
.map(SchemaChangeCheckResult.ChangedItem::getDetail).filter(existTableList::contains)
+                    .forEach(table -> {
+                        needLoadTableSet.add(table);
+                        modelTables.add(table);
+                    });
+            modelTablesMap.put(modelName, modelTables);
+        });
+        return Pair.newPair(needLoadTableSet, modelTablesMap);
     }
 
     private void innerImportModelMetadata(String project, MultipartFile 
metadataFile, ModelImportRequest request,
             ImportModelContext context, List<Exception> exceptions) throws 
Exception {
         val schemaChangeCheckResult = checkModelMetadata(project, context, 
metadataFile);
 
-        val schemaChanges = 
schemaChangeCheckResult.getModels().entrySet().stream()//
-                .filter(entry -> 
context.getNewModels().containsValue(entry.getKey())).map(Map.Entry::getValue)
-                .collect(Collectors.toSet());
-        boolean needLoadTable = schemaChanges.stream().anyMatch(change -> 
!change.getLoadTables().isEmpty());
+        val pair = checkNewModelTables(schemaChangeCheckResult, request);
+        Set<String> needLoadTableSet = pair.getFirst();
+        Map<String, Set<String>> modelTablesMap = pair.getSecond();
+
         LoadTableResponse loadTableResponse = null;
+        boolean needLoadTable = CollectionUtils.isNotEmpty(needLoadTableSet);
         if (needLoadTable) {
-            loadTableResponse = innerLoadTables(project, schemaChanges);
+            // try load tables
+            String needLoadTableStr = String.join(",", needLoadTableSet);
+            logger.info("try load tables: [{}]", needLoadTableStr);
+            loadTableResponse = innerLoadTables(project, needLoadTableSet);
             if (CollectionUtils.isNotEmpty(loadTableResponse.getFailed())) {
                 String loadFailedTables = String.join(",", 
loadTableResponse.getFailed());
                 logger.warn("Load Table failed: [{}]", loadFailedTables);
@@ -685,10 +673,10 @@ public class MetaStoreService extends BasicService {
         for (ModelImportRequest.ModelImport modelImport : request.getModels()) 
{
             try {
                 validateModelImport(project, modelImport, 
schemaChangeCheckResult);
-                if (modelImport.getImportType() == 
ModelImportRequest.ImportType.NEW) {
-                    val modelSchemaChange = 
schemaChangeCheckResult.getModels().get(modelImport.getTargetName());
-                    if (needLoadTable && modelSchemaChange.isLoadTableAble()) {
-                        Set<String> needLoadTables = 
modelSchemaChange.getLoadTables();
+                if (modelImport.getImportType() == ImportType.NEW) {
+                    if (needLoadTable) {
+                        Set<String> needLoadTables = 
modelTablesMap.getOrDefault(modelImport.getTargetName(),
+                                Collections.emptySet());
                         if 
(!loadTableResponse.getLoaded().containsAll(needLoadTables)) {
                             logger.warn("Import model [{}] failed, skip 
import.", modelImport.getOriginalName());
                             continue;
@@ -698,7 +686,7 @@ public class MetaStoreService extends BasicService {
                     var nDataModel = 
importDataModelManager.copyForWrite(importDataModel);
                     createNewModel(nDataModel, modelImport, project, 
importIndexPlanManager);
                     importRecommendations(project, nDataModel.getUuid(), 
importDataModel.getUuid(), targetKylinConfig);
-                } else if (modelImport.getImportType() == 
ModelImportRequest.ImportType.OVERWRITE) {
+                } else if (modelImport.getImportType() == 
ImportType.OVERWRITE) {
                     val importDataModel = 
importDataModelManager.getDataModelDescByAlias(modelImport.getOriginalName());
                     val nDataModel = 
importDataModelManager.copyForWrite(importDataModel);
 
@@ -733,7 +721,7 @@ public class MetaStoreService extends BasicService {
 
         Message msg = MsgPicker.getMsg();
 
-        if (modelImport.getImportType() == 
ModelImportRequest.ImportType.OVERWRITE) {
+        if (modelImport.getImportType() == ImportType.OVERWRITE) {
             NDataModel dataModel = 
NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                     .getDataModelDescByAlias(modelImport.getOriginalName());
 
@@ -753,7 +741,7 @@ public class MetaStoreService extends BasicService {
                         String.format(Locale.ROOT, 
msg.getUnSuitableImportType(createType), modelImport.getImportType(),
                                 modelImport.getOriginalName()));
             }
-        } else if (modelImport.getImportType() == 
ModelImportRequest.ImportType.NEW) {
+        } else if (modelImport.getImportType() == ImportType.NEW) {
 
             if 
(!org.apache.commons.lang.StringUtils.containsOnly(modelImport.getTargetName(),
                     ModelService.VALID_NAME_FOR_MODEL)) {
diff --git 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
index dbbedc48bc..948ee05789 100644
--- 
a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
+++ 
b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
@@ -20,8 +20,6 @@ package org.apache.kylin.rest.service;
 
 import static org.apache.kylin.common.constant.Constants.KE_VERSION;
 import static 
org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_ID_NOT_EXIST;
-import static 
org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR;
-import static 
org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -39,7 +37,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -71,6 +68,7 @@ import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult;
 import org.apache.kylin.metadata.model.schema.SchemaNodeType;
@@ -105,7 +103,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import io.kyligence.kap.guava20.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
@@ -755,17 +752,16 @@ public class MetaStoreServiceTest extends ServiceTestBase 
{
                 
"src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
         val multipartFile = new MockMultipartFile(file.getName(), 
file.getName(), null,
                 Files.newInputStream(file.toPath()));
-        val metadataCheckResponse = 
metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+        val checkResult = 
metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+        Assert.assertEquals(1, checkResult.getExistTableList().size());
+        Assert.assertEquals("SSB.CUSTOMER_NEW", 
checkResult.getExistTableList().get(0).getIdentity());
 
-        SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = 
metadataCheckResponse.getModels()
+        SchemaChangeCheckResult.ModelSchemaChange change = 
checkResult.getModels()
                 .get("missing_table_model");
-        Assert.assertNotNull(modelSchemaChange);
-
-        Assert.assertEquals(11, modelSchemaChange.getDifferences());
-        Assert.assertTrue(
-                modelSchemaChange.getMissingItems().stream().anyMatch(sc -> 
sc.getType() == SchemaNodeType.MODEL_TABLE
-                        && sc.getDetail().equals("SSB.CUSTOMER_NEW") && 
!sc.isImportable()));
-        Assert.assertTrue(modelSchemaChange.importable());
+        Assert.assertNotNull(change);
+        Assert.assertTrue(change.getMissingItems().isEmpty());
+        Assert.assertTrue(change.importable());
+        Assert.assertTrue(change.creatable());
     }
 
     @Test
@@ -1244,10 +1240,6 @@ public class MetaStoreServiceTest extends 
ServiceTestBase {
         Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW"));
         metaStoreService.importModelMetadata("original_project", 
multipartFile, request);
         Assert.assertNotNull(manager.getTableDesc("SSB.CUSTOMER_NEW"));
-
-        {
-
-        }
     }
 
     @Test
@@ -1389,135 +1381,46 @@ public class MetaStoreServiceTest extends 
ServiceTestBase {
 
     @Test
     public void testMissTable() throws IOException {
+        String table = "SSB.CUSTOMER_NEW";
         val file = new File(
                 
"src/test/resources/ut_meta/schema_utils/model_missing_table_update/model_table_missing_update_model_metadata_2020_11_16_02_37_33_3182D4A7694DA64E3D725C140CF80A47.zip");
         val multipartFile = new MockMultipartFile(file.getName(), 
file.getName(), null,
                 Files.newInputStream(file.toPath()));
-        val metadataCheckResponse = 
metaStoreService.checkModelMetadata("original_project", multipartFile, null);
-
-        val modelSchemaChange = 
metadataCheckResponse.getModels().get("ssb_model");
+        val checkResult = 
metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+        val modelSchemaChange = checkResult.getModels().get("ssb_model");
+        Assert.assertEquals(1, checkResult.getExistTableList().size());
+        Assert.assertEquals(table, 
checkResult.getExistTableList().get(0).getIdentity());
         Assert.assertNotNull(modelSchemaChange);
-        Assert.assertTrue(modelSchemaChange.isLoadTableAble());
-        Set<String> loadTables = modelSchemaChange.getLoadTables();
-        Assert.assertEquals(1, loadTables.size());
-        Assert.assertEquals("SSB.CUSTOMER_NEW", loadTables.iterator().next());
         Assert.assertTrue(modelSchemaChange.creatable());
         Assert.assertTrue(modelSchemaChange.importable());
         Assert.assertFalse(modelSchemaChange.overwritable());
-
-        testModelImportable(metadataCheckResponse);
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(false).when(mockChange).overwritable();
-            Mockito.doReturn(false).when(mockChange).creatable();
-            Mockito.doReturn(false).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertTrue(change.isLoadTableAble());
-            Assert.assertFalse(change.getLoadTables().isEmpty());
-        }
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEWNEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            val missItems = mockChange.getMissingItems().stream().filter(item 
-> item.getType() != MODEL_TABLE)
-                    .collect(Collectors.toList());
-            ReflectionTestUtils.setField(mockChange, "missingItems", 
missItems);
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            val newItems = mockChange.getNewItems().stream()
-                    .peek(item -> 
item.getConflictReason().setReason(SAME_CC_NAME_HAS_DIFFERENT_EXPR))
-                    .collect(Collectors.toList());
-            ReflectionTestUtils.setField(mockChange, "newItems", newItems);
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
     }
 
-    private void testModelImportable(SchemaChangeCheckResult 
metadataCheckResponse) {
-        val modelSchemaChange = 
metadataCheckResponse.getModels().get("ssb_model");
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(true).when(mockChange).overwritable();
-            Mockito.doReturn(true).when(mockChange).creatable();
-            Mockito.doReturn(true).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
-
+    @Test
+    public void testSearchTablesInDataSource() {
         {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(true).when(mockChange).overwritable();
-            Mockito.doReturn(false).when(mockChange).creatable();
-            Mockito.doReturn(false).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
+            val existTables = 
metaStoreService.searchTablesInDataSource("original_project", 
Lists.newArrayList());
+            Assert.assertTrue(existTables.isEmpty());
         }
 
         {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(false).when(mockChange).overwritable();
-            Mockito.doReturn(true).when(mockChange).creatable();
-            Mockito.doReturn(false).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
+            TableDesc tableDesc = new TableDesc();
+            tableDesc.setDatabase("SSB");
+            tableDesc.setName("CUSTOMER_NEW");
+            val existTables = 
metaStoreService.searchTablesInDataSource("original_project",
+                    Lists.newArrayList(tableDesc));
+            Assert.assertFalse(existTables.isEmpty());
+            Assert.assertEquals(1, existTables.size());
+            Assert.assertEquals("SSB.CUSTOMER_NEW", 
existTables.get(0).getIdentity());
         }
 
         {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(false).when(mockChange).overwritable();
-            Mockito.doReturn(false).when(mockChange).creatable();
-            Mockito.doReturn(true).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            
metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), 
metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
+            TableDesc tableDesc = new TableDesc();
+            tableDesc.setDatabase("SSB");
+            tableDesc.setName("CUSTOMER_NEW_NEW");
+            val existTables = 
metaStoreService.searchTablesInDataSource("original_project",
+                    Lists.newArrayList(tableDesc));
+            Assert.assertTrue(existTables.isEmpty());
         }
     }
 

Reply via email to