This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit f241d43dca49719c9848510ca936ecc63c06bab1 Author: Jiale He <35652389+jial...@users.noreply.github.com> AuthorDate: Thu Nov 3 19:14:06 2022 +0800 KYLIN-5343 Add column datatype check when import model --- .../org/apache/kylin/metadata/model/TableDesc.java | 18 --- .../metadata/model/schema/ModelImportChecker.java | 31 ++-- .../model/schema/SchemaChangeCheckResult.java | 99 +++++++------ .../kylin/metadata/model/schema/SchemaUtil.java | 24 ++- .../schema/strategy/ComputedColumnStrategy.java | 37 +++-- .../schema/strategy/MultiplePartitionStrategy.java | 8 +- .../schema/strategy/OverWritableStrategy.java | 16 +- .../schema/strategy/SchemaChangeStrategy.java | 32 ++-- .../model/schema/strategy/TableColumnStrategy.java | 27 ++-- .../model/schema/strategy/TableStrategy.java | 9 +- .../schema/strategy/UnOverWritableStrategy.java | 18 ++- .../apache/kylin/metadata/model/TableDescTest.java | 55 ------- .../metadata/model/schema/SchemaUtilTest.java | 14 ++ .../localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json | 14 +- .../kylin/rest/service/MetaStoreService.java | 158 ++++++++++---------- .../kylin/rest/service/MetaStoreServiceTest.java | 161 ++++----------------- 16 files changed, 316 insertions(+), 405 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index 33231ff7b9..1202ba6b07 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -28,7 +28,6 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -307,23 +306,6 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo return null; } - public Pair<Set<ColumnDesc>, Set<ColumnDesc>> findColumns(Set<ColumnDesc> columnDescSet) { - Set<ColumnDesc> existColSet = Sets.newHashSet(); - Set<ColumnDesc> notExistColSet = Sets.newHashSet(); - if (CollectionUtils.isEmpty(columnDescSet)) { - return Pair.newPair(existColSet, notExistColSet); - } - for (ColumnDesc searchColumnDesc : columnDescSet) { - ColumnDesc columnDesc = findColumnByName(searchColumnDesc.getName()); - if (Objects.isNull(columnDesc)) { - notExistColSet.add(searchColumnDesc); - } else { - existColSet.add(columnDesc); - } - } - return Pair.newPair(existColSet, notExistColSet); - } - @Override public String getResourcePath() { return concatResourcePath(getIdentity(), project); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java index 560ca58538..49e8977d2d 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java @@ -24,6 +24,8 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metadata.cube.model.NDataflow; +import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.schema.strategy.ComputedColumnStrategy; import org.apache.kylin.metadata.model.schema.strategy.MultiplePartitionStrategy; @@ -43,24 +45,29 @@ public class ModelImportChecker { new UnOverWritableStrategy(), new TableColumnStrategy(), new TableStrategy(), new OverWritableStrategy(), new MultiplePartitionStrategy()); - public static SchemaChangeCheckResult check(SchemaUtil.SchemaDifference difference, - ImportModelContext importModelContext) { + public static SchemaChangeCheckResult check(SchemaUtil.SchemaDifference diff, ImportModelContext context) { + String targetProject = context.getTargetProject(); + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Set<String> importModels = NDataModelManager - .getInstance(importModelContext.getImportKylinConfig(), importModelContext.getTargetProject()) - .listAllModelAlias().stream().map(model -> importModelContext.getNewModels().getOrDefault(model, model)) + .getInstance(context.getImportKylinConfig(), targetProject).listAllModelAlias().stream() + .map(model -> context.getNewModels().getOrDefault(model, model)) .collect(Collectors.toSet()); - Set<String> originalModels = NDataModelManager - .getInstance(KylinConfig.getInstanceFromEnv(), importModelContext.getTargetProject()) - .listAllModelAlias(); + // all models include broken + Set<String> originalModels = NDataModelManager.getInstance(kylinConfig, targetProject).listAllModelAlias(); + // broken models + Set<String> originBrokenModels = NDataflowManager.getInstance(kylinConfig, targetProject).listAllDataflows(true) + .stream().filter(NDataflow::checkBrokenWithRelatedInfo).map(df -> df.getModel().getAlias()) + .collect(Collectors.toSet()); val result = new SchemaChangeCheckResult(); for (SchemaChangeStrategy strategy : strategies) { - result.addMissingItems(strategy.missingItems(difference, importModels, originalModels)); - result.addNewItems(strategy.newItems(difference, importModels, originalModels)); - result.addReduceItems(strategy.reduceItems(difference, importModels, originalModels)); - result.addUpdateItems(strategy.updateItems(difference, importModels, originalModels)); - result.areEqual(strategy.areEqual(difference, importModels)); + result.addMissingItems(strategy.missingItems(diff, importModels, originalModels, originBrokenModels)); + result.addNewItems(strategy.newItems(diff, importModels, originalModels, originBrokenModels)); + result.addReduceItems(strategy.reduceItems(diff, importModels, originalModels, originBrokenModels)); + result.addUpdateItems(strategy.updateItems(diff, importModels, originalModels, originBrokenModels)); + result.areEqual(strategy.areEqual(diff, importModels)); } return result; } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java index 90b116cd85..fbe1b0489d 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java @@ -24,19 +24,20 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.kylin.metadata.model.TableDesc; + import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonUnwrapped; -import com.google.common.collect.Sets; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import lombok.val; @Data public class SchemaChangeCheckResult { @@ -44,6 +45,9 @@ public class SchemaChangeCheckResult { @JsonProperty private Map<String, ModelSchemaChange> models = new HashMap<>(); + @JsonIgnore + private List<TableDesc> existTableList = new ArrayList<>(); + @Data public static class ModelSchemaChange { private int differences; @@ -67,13 +71,13 @@ public class SchemaChangeCheckResult { @JsonProperty("importable") public boolean importable() { return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream) - .allMatch(BaseItem::isImportable) || isLoadTableAble(); + .allMatch(BaseItem::isImportable); } @JsonProperty("creatable") public boolean creatable() { return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream) - .allMatch(BaseItem::isCreatable) || isLoadTableAble(); + .allMatch(BaseItem::isCreatable); } @JsonProperty("") @@ -92,13 +96,12 @@ public class SchemaChangeCheckResult { .allMatch(BaseItem::isHasSameName); } - @Setter - @JsonIgnore - private boolean loadTableAble = false; - - @Getter - @JsonIgnore - private Set<String> loadTables = Sets.newHashSet(); + @JsonProperty("has_same_name_broken") + public boolean hasSameNameBroken() { + val set = Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream) + .collect(Collectors.toSet()); + return !set.isEmpty() && set.stream().allMatch(BaseItem::isHasSameNameBroken); + } } @Data @@ -117,6 +120,9 @@ public class SchemaChangeCheckResult { @JsonProperty("has_same_name") boolean hasSameName; + @JsonProperty("has_same_name_broken") + boolean hasSameNameBroken; + @JsonProperty("importable") boolean importable; @JsonProperty("creatable") @@ -142,40 +148,45 @@ public class SchemaChangeCheckResult { @NoArgsConstructor @AllArgsConstructor public static class ChangedItem extends BaseItem { - @Getter(PRIVATE) + private SchemaNode schemaNode; public ChangedItem(SchemaNodeType type, SchemaNode schemaNode, String modelAlias, UN_IMPORT_REASON reason, - String conflictItem, boolean hasSameName, boolean importable, boolean creatable, boolean overwritable) { - super(type, modelAlias, new ConflictReason(reason, conflictItem), hasSameName, importable, creatable, - overwritable); + String conflictItem, BaseItemParameter parameter) { + super(type, modelAlias, new ConflictReason(reason, conflictItem), parameter.hasSameName, + parameter.hasSameNameBroken, parameter.importable, parameter.creatable, parameter.overwritable); this.schemaNode = schemaNode; } public static ChangedItem createUnImportableSchemaNode(SchemaNodeType type, SchemaNode schemaNode, - UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName) { - return new ChangedItem(type, schemaNode, null, reason, conflictItem, hasSameName, false, false, false); + UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, boolean hasSameNameBroken) { + return new ChangedItem(type, schemaNode, null, reason, conflictItem, + new BaseItemParameter(hasSameName, hasSameNameBroken, false, false, false)); } public static ChangedItem createUnImportableSchemaNode(SchemaNodeType type, SchemaNode schemaNode, - String modelAlias, UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName) { - return new ChangedItem(type, schemaNode, modelAlias, reason, conflictItem, hasSameName, false, false, - false); + String modelAlias, UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, + boolean hasSameNameBroken) { + return new ChangedItem(type, schemaNode, modelAlias, reason, conflictItem, + new BaseItemParameter(hasSameName, hasSameNameBroken, false, false, false)); } public static ChangedItem createOverwritableSchemaNode(SchemaNodeType type, SchemaNode schemaNode, - boolean hasSameName) { - return new ChangedItem(type, schemaNode, null, null, null, hasSameName, true, true, true); + boolean hasSameName, boolean hasSameNameBroken) { + return new ChangedItem(type, schemaNode, null, null, null, + new BaseItemParameter(hasSameName, hasSameNameBroken, true, true, true)); } public static ChangedItem createOverwritableSchemaNode(SchemaNodeType type, SchemaNode schemaNode, - String modelAlias, boolean hasSameName) { - return new ChangedItem(type, schemaNode, modelAlias, null, null, hasSameName, true, true, true); + String modelAlias, boolean hasSameName, boolean hasSameNameBroken) { + return new ChangedItem(type, schemaNode, modelAlias, null, null, + new BaseItemParameter(hasSameName, hasSameNameBroken, true, true, true)); } public static ChangedItem createCreatableSchemaNode(SchemaNodeType type, SchemaNode schemaNode, - boolean hasSameName) { - return new ChangedItem(type, schemaNode, null, null, null, hasSameName, true, true, false); + boolean hasSameName, boolean hasSameNameBroken) { + return new ChangedItem(type, schemaNode, null, null, null, + new BaseItemParameter(hasSameName, hasSameNameBroken, true, true, false)); } public String getModelAlias() { @@ -219,26 +230,22 @@ public class SchemaChangeCheckResult { return getDetail(secondSchemaNode); } - public UpdatedItem(SchemaNode firstSchemaNode, SchemaNode secondSchemaNode, String modelAlias, - UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, boolean importable, - boolean creatable, boolean overwritable) { - super(secondSchemaNode.getType(), modelAlias, new ConflictReason(reason, conflictItem), hasSameName, - importable, creatable, overwritable); - this.firstSchemaNode = firstSchemaNode; - this.secondSchemaNode = secondSchemaNode; + public UpdatedItem(SchemaNode first, SchemaNode second, String modelAlias, UN_IMPORT_REASON reason, + String conflictItem, BaseItemParameter parameter) { + super(second.getType(), modelAlias, new ConflictReason(reason, conflictItem), parameter.hasSameName, + parameter.hasSameNameBroken, parameter.importable, parameter.creatable, parameter.overwritable); + this.firstSchemaNode = first; + this.secondSchemaNode = second; } public static UpdatedItem getSchemaUpdate(SchemaNode first, SchemaNode second, String modelAlias, - UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, boolean importable, - boolean creatable, boolean overwritable) { - return new UpdatedItem(first, second, modelAlias, reason, conflictItem, hasSameName, importable, creatable, - overwritable); + UN_IMPORT_REASON reason, String conflictItem, BaseItemParameter parameter) { + return new UpdatedItem(first, second, modelAlias, reason, conflictItem, parameter); } public static UpdatedItem getSchemaUpdate(SchemaNode first, SchemaNode second, String modelAlias, - boolean hasSameName, boolean importable, boolean creatable, boolean overwritable) { - return getSchemaUpdate(first, second, modelAlias, UN_IMPORT_REASON.NONE, null, hasSameName, importable, - creatable, overwritable); + BaseItemParameter parameter) { + return getSchemaUpdate(first, second, modelAlias, UN_IMPORT_REASON.NONE, null, parameter); } } @@ -302,4 +309,14 @@ public class SchemaChangeCheckResult { MISSING_TABLE, // NONE; } + + @Data + @AllArgsConstructor + public static class BaseItemParameter { + private boolean hasSameName; + private boolean hasSameNameBroken; + private boolean importable; + private boolean creatable; + private boolean overwritable; + } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java index e90867b965..24b2512a49 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java @@ -25,15 +25,15 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.JoinTableDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.model.TableRef; import org.apache.kylin.metadata.cube.model.IndexPlan; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.model.TableRef; import com.google.common.collect.Lists; @@ -48,6 +48,22 @@ import lombok.val; public class SchemaUtil { + public static SchemaDifference diff(String project, KylinConfig sourceConfig, KylinConfig targetConfig, + List<TableDesc> incrTableDescList) { + val sourceGraph = dependencyGraph(project, sourceConfig, incrTableDescList); + val targetGraph = dependencyGraph(project, targetConfig); + return new SchemaDifference(sourceGraph, targetGraph); + } + + public static Graph<SchemaNode> dependencyGraph(String project, KylinConfig config, + List<TableDesc> incrTableDescList) { + val tableManager = NTableMetadataManager.getInstance(config, project); + val planManager = NIndexPlanManager.getInstance(config, project); + List<TableDesc> tableDescs = Lists.newArrayList(tableManager.listAllTables()); + tableDescs.addAll(incrTableDescList); + return dependencyGraph(tableDescs, planManager.listAllIndexPlans()); + } + public static SchemaDifference diff(String project, KylinConfig sourceConfig, KylinConfig targetConfig) { val sourceGraph = dependencyGraph(project, sourceConfig); val targetGraph = dependencyGraph(project, targetConfig); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java index 6b3af9e78a..8274da560b 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java @@ -48,7 +48,7 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { List<SchemaNode> allComputedColumns = difference.getSourceGraph().nodes().stream() .filter(schemaNode -> supportedSchemaNodeTypes().contains(schemaNode.getType())) .collect(Collectors.toList()); @@ -59,7 +59,7 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy { if (hasComputedColumnNameWithDifferentExpression(entry.getValue(), allComputedColumns)) { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode( entry.getKey().getType(), entry.getValue(), SAME_CC_NAME_HAS_DIFFERENT_EXPR, null, - hasSameName(modelAlias, originalModels))); + hasSameName(modelAlias, originalModels), hasSameWithBroken(modelAlias, originalBrokenModels))); } // different cc name with same expression @@ -67,21 +67,25 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy { if (optional.isPresent()) { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode( entry.getKey().getType(), entry.getValue(), DIFFERENT_CC_NAME_HAS_SAME_EXPR, - optional.get().getDetail(), hasSameName(modelAlias, originalModels))); + optional.get().getDetail(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } if (overwritable(importModels, originalModels, modelAlias)) { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } else { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } } @Override public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference, - MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) { + MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels, + Set<String> originalBrokenModels) { List<SchemaNode> allComputedColumns = difference.getSourceGraph().nodes().stream() .filter(schemaNode -> supportedSchemaNodeTypes().contains(schemaNode.getType())) .collect(Collectors.toList()); @@ -90,36 +94,43 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy { String modelAlias = diff.rightValue().getSubject(); // same cc name with different expression if (hasComputedColumnNameWithDifferentExpression(schemaNode, allComputedColumns)) { + val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), false, false, false); return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), - diff.rightValue(), modelAlias, SAME_CC_NAME_HAS_DIFFERENT_EXPR, null, - hasSameName(modelAlias, originalModels), false, false, false)); + diff.rightValue(), modelAlias, SAME_CC_NAME_HAS_DIFFERENT_EXPR, null, parameter)); } // different cc name with same expression val optional = hasExpressionWithDifferentComputedColumn(schemaNode, allComputedColumns); if (optional.isPresent()) { + val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), false, false, false); return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), diff.rightValue(), modelAlias, DIFFERENT_CC_NAME_HAS_SAME_EXPR, optional.get().getDetail(), - hasSameName(modelAlias, originalModels), false, false, false)); + parameter)); } boolean overwritable = overwritable(importModels, originalModels, modelAlias); + val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable); return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), - diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, overwritable)); + diff.rightValue(), modelAlias, parameter)); } @Override public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { String modelAlias = entry.getValue().getSubject(); boolean overwritable = overwritable(importModels, originalModels, modelAlias); if (overwritable) { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } else { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java index 17c63fdec3..b53420bfd4 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java @@ -31,6 +31,7 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil; import org.apache.kylin.metadata.model.util.MultiPartitionUtil; import io.kyligence.kap.guava20.shaded.common.collect.MapDifference; +import lombok.val; public class MultiplePartitionStrategy extends UnOverWritableStrategy { @Override @@ -40,7 +41,8 @@ public class MultiplePartitionStrategy extends UnOverWritableStrategy { @Override public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference, - MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) { + MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels, + Set<String> originalBrokenModels) { String modelAlias = diff.rightValue().getSubject(); boolean overwritable = overwritable(diff); @@ -66,8 +68,10 @@ public class MultiplePartitionStrategy extends UnOverWritableStrategy { } } + val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable); return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), - diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, overwritable)); + diff.rightValue(), modelAlias, parameter)); } /** diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java index 998b3c201a..6ccf37feda 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java @@ -40,27 +40,29 @@ public class OverWritableStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { - return createSchemaChange(difference, entry, importModels, originalModels); + Set<String> originalModels, Set<String> originalBrokenModels) { + return createSchemaChange(difference, entry, importModels, originalModels, originalBrokenModels); } @Override public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { - return createSchemaChange(difference, entry, importModels, originalModels); + Set<String> originalModels, Set<String> originalBrokenModels) { + return createSchemaChange(difference, entry, importModels, originalModels, originalBrokenModels); } private List<SchemaChangeCheckResult.ChangedItem> createSchemaChange(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { String modelAlias = entry.getValue().getSubject(); if (overwritable(importModels, originalModels, modelAlias)) { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } else { return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java index ce77f66bf1..30c182d440 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java @@ -33,64 +33,68 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil; import io.kyligence.kap.guava20.shaded.common.collect.MapDifference; import io.kyligence.kap.guava20.shaded.common.graph.Graph; import io.kyligence.kap.guava20.shaded.common.graph.Graphs; +import lombok.val; public interface SchemaChangeStrategy { List<SchemaNodeType> supportedSchemaNodeTypes(); default List<SchemaChangeCheckResult.ChangedItem> missingItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { return Collections.emptyList(); } default List<SchemaChangeCheckResult.ChangedItem> missingItems(SchemaUtil.SchemaDifference difference, - Set<String> importModels, Set<String> originalModels) { + Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) { return Collections.emptyList(); } default List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { return Collections.emptyList(); } default List<SchemaChangeCheckResult.ChangedItem> newItems(SchemaUtil.SchemaDifference difference, - Set<String> importModels, Set<String> originalModels) { + Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) { return difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream() .filter(entry -> supportedSchemaNodeTypes().contains(entry.getKey().getType())) - .map(entry -> newItemFunction(difference, entry, importModels, originalModels)) + .map(entry -> newItemFunction(difference, entry, importModels, originalModels, originalBrokenModels)) .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias())) .collect(Collectors.toList()); } default List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference, - MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) { + MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels, + Set<String> originalBrokenModels) { String modelAlias = diff.rightValue().getSubject(); boolean overwritable = overwritable(importModels, originalModels, modelAlias); + val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable); return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), - diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, overwritable)); + diff.rightValue(), modelAlias, parameter)); } default List<SchemaChangeCheckResult.UpdatedItem> updateItems(SchemaUtil.SchemaDifference difference, - Set<String> importModels, Set<String> originalModels) { + Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) { return difference.getNodeDiff().entriesDiffering().values().stream() .filter(entry -> supportedSchemaNodeTypes().contains(entry.leftValue().getType())) - .map(diff -> updateItemFunction(difference, diff, importModels, originalModels)) + .map(diff -> updateItemFunction(difference, diff, importModels, originalModels, originalBrokenModels)) .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias())) .collect(Collectors.toList()); } default List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { return Collections.emptyList(); } default List<SchemaChangeCheckResult.ChangedItem> reduceItems(SchemaUtil.SchemaDifference difference, - Set<String> importModels, Set<String> originalModels) { + Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) { return difference.getNodeDiff().entriesOnlyOnLeft().entrySet().stream() .filter(entry -> supportedSchemaNodeTypes().contains(entry.getKey().getType())) - .map(entry -> reduceItemFunction(difference, entry, importModels, originalModels)) + .map(entry -> reduceItemFunction(difference, entry, importModels, originalModels, originalBrokenModels)) .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias())) .collect(Collectors.toList()); } @@ -115,4 +119,8 @@ public interface SchemaChangeStrategy { .map(SchemaNode::getSubject).collect(Collectors.toSet()); } + default boolean hasSameWithBroken(String modelAlias, Set<String> originalBrokenModels) { + return originalBrokenModels.contains(modelAlias); + } + } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java index 2157d95a88..752f57dfed 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java @@ -35,6 +35,7 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil; import io.kyligence.kap.guava20.shaded.common.collect.MapDifference; import io.kyligence.kap.guava20.shaded.common.graph.Graphs; +import lombok.val; public class TableColumnStrategy implements SchemaChangeStrategy { @Override @@ -44,10 +45,10 @@ public class TableColumnStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> missingItems(SchemaUtil.SchemaDifference difference, - Set<String> importModels, Set<String> originalModels) { + Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) { return difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream() .filter(pair -> supportedSchemaNodeTypes().contains(pair.getKey().getType())) - .map(pair -> missingItemFunction(difference, pair, importModels, originalModels)) + .map(pair -> missingItemFunction(difference, pair, importModels, originalModels, originalBrokenModels)) .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias())) .collect(Collectors.toList()); } @@ -55,34 +56,40 @@ public class TableColumnStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> missingItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { return reachableModel(difference.getTargetGraph(), entry.getValue()).stream() .map(modelAlias -> SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode( entry.getKey().getType(), entry.getValue(), modelAlias, USED_UNLOADED_TABLE, - entry.getValue().getDetail(), hasSameName(modelAlias, originalModels))) + entry.getValue().getDetail(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))) .collect(Collectors.toList()); } @Override public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference, - MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) { + MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels, + Set<String> originalBrokenModels) { return Graphs.reachableNodes(difference.getTargetGraph(), diff.rightValue()).stream() .filter(SchemaNode::isModelNode).map(SchemaNode::getSubject).distinct() - .map(modelAlias -> SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), - diff.rightValue(), modelAlias, TABLE_COLUMN_DATATYPE_CHANGED, diff.rightValue().getDetail(), - hasSameName(modelAlias, originalModels), false, false, false)) + .map(modelAlias -> { + val parameter = new SchemaChangeCheckResult.BaseItemParameter( + hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), false, false, false); + return SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), diff.rightValue(), + modelAlias, TABLE_COLUMN_DATATYPE_CHANGED, diff.rightValue().getDetail(), parameter); + }) .collect(Collectors.toList()); } @Override public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { return Graphs.reachableNodes(difference.getSourceGraph(), entry.getValue()).stream() .filter(SchemaNode::isModelNode).map(SchemaNode::getSubject).distinct() .map(modelAlias -> SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode( entry.getKey().getType(), entry.getValue(), modelAlias, - hasSameName(modelAlias, originalModels))) + hasSameName(modelAlias, originalModels), hasSameWithBroken(modelAlias, originalBrokenModels))) .collect(Collectors.toList()); } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java index b727aa8faa..55746d6fcf 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java @@ -40,10 +40,10 @@ public class TableStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> missingItems(SchemaUtil.SchemaDifference difference, - Set<String> importModels, Set<String> originalModels) { + Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) { return difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream() .filter(pair -> supportedSchemaNodeTypes().contains(pair.getKey().getType())) - .map(pair -> missingItemFunction(difference, pair, importModels, originalModels)) + .map(pair -> missingItemFunction(difference, pair, importModels, originalModels, originalBrokenModels)) .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias())) .collect(Collectors.toList()); } @@ -51,11 +51,12 @@ public class TableStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> missingItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { return reachableModel(difference.getTargetGraph(), entry.getValue()).stream() .map(modelAlias -> SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode( entry.getKey().getType(), entry.getValue(), modelAlias, MISSING_TABLE, - entry.getValue().getDetail(), hasSameName(modelAlias, originalModels))) + entry.getValue().getDetail(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))) .collect(Collectors.toList()); } } diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java index 20f85b7b24..f405c855e7 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java @@ -30,6 +30,7 @@ import org.apache.kylin.metadata.model.schema.SchemaNodeType; import org.apache.kylin.metadata.model.schema.SchemaUtil; import io.kyligence.kap.guava20.shaded.common.collect.MapDifference; +import lombok.val; public class UnOverWritableStrategy implements SchemaChangeStrategy { @@ -42,26 +43,31 @@ public class UnOverWritableStrategy implements SchemaChangeStrategy { @Override public List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { String modelAlias = entry.getValue().getSubject(); return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } @Override public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference, - MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) { + MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels, + Set<String> originalBrokenModels) { String modelAlias = diff.rightValue().getSubject(); + val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels), true, true, false); return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), - diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, false)); + diff.rightValue(), modelAlias, parameter)); } @Override public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference, Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels, - Set<String> originalModels) { + Set<String> originalModels, Set<String> originalBrokenModels) { String modelAlias = entry.getValue().getSubject(); return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode( - entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels))); + entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels), + hasSameWithBroken(modelAlias, originalBrokenModels))); } } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java index 7eff5cffa9..66e6036843 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java @@ -21,7 +21,6 @@ package org.apache.kylin.metadata.model; import static org.apache.kylin.metadata.model.NTableMetadataManager.getInstance; import java.util.Locale; -import java.util.Set; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.junit.After; @@ -29,10 +28,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Sets; - -import lombok.val; - public class TableDescTest extends NLocalFileMetadataTestCase { private final String project = "default"; private NTableMetadataManager tableMetadataManager; @@ -72,54 +67,4 @@ public class TableDescTest extends NLocalFileMetadataTestCase { final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName); Assert.assertFalse(tableDesc.isRangePartition()); } - - @Test - public void testFindColumns() { - final String tableName = "DEFAULT.TEST_KYLIN_FACT"; - final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName); - ColumnDesc[] columns = tableDesc.getColumns(); - Assert.assertEquals(12, columns.length); - - { - // test search column empty - Set<ColumnDesc> searchColSet = Sets.newHashSet(); - val pair = tableDesc.findColumns(searchColSet); - Assert.assertTrue(pair.getFirst().isEmpty()); - Assert.assertTrue(pair.getSecond().isEmpty()); - } - - { - // test all founded - Set<ColumnDesc> searchColSet = Sets.newHashSet( - new ColumnDesc("1", "TRANS_ID", "bigint", "TRANS_ID", "", "", ""), - new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", "")); - val pair = tableDesc.findColumns(searchColSet); - Assert.assertFalse(pair.getFirst().isEmpty()); - Assert.assertTrue(pair.getSecond().isEmpty()); - Assert.assertEquals(2, pair.getFirst().size()); - } - - { - // test part founded - Set<ColumnDesc> searchColSet = Sets.newHashSet( - new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""), - new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", "")); - val pair = tableDesc.findColumns(searchColSet); - Assert.assertFalse(pair.getFirst().isEmpty()); - Assert.assertFalse(pair.getSecond().isEmpty()); - Assert.assertEquals(1, pair.getFirst().size()); - Assert.assertEquals(1, pair.getSecond().size()); - } - - { - // test part founded - Set<ColumnDesc> searchColSet = Sets.newHashSet( - new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""), - new ColumnDesc("2", "ORDER_ID_1", "bigint", "TRANS_ID", "", "", "")); - val pair = tableDesc.findColumns(searchColSet); - Assert.assertTrue(pair.getFirst().isEmpty()); - Assert.assertFalse(pair.getSecond().isEmpty()); - Assert.assertEquals(2, pair.getSecond().size()); - } - } } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java index 4b43a9e538..61a605201f 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java @@ -42,6 +42,8 @@ import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum; +import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,6 +51,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.kyligence.kap.guava20.shaded.common.collect.Sets; @@ -555,6 +558,17 @@ public class SchemaUtilTest extends NLocalFileMetadataTestCase { && schemaChange.getConflictReason() .getReason() == SchemaChangeCheckResult.UN_IMPORT_REASON.MISSING_TABLE && schemaChange.getConflictReason().getConflictItem().equals("SSB.CUSTOMER_NEW"))); + + NTableMetadataManager manager = NTableMetadataManager.getInstance(getTestConfig(), getTargetProject()); + TableDesc tableDesc = manager.getTableDesc("SSB.CUSTOMER"); + tableDesc.setName("CUSTOMER_NEW"); + tableDesc.init(getTargetProject()); + val diff = SchemaUtil.diff(getTargetProject(), KylinConfig.getInstanceFromEnv(), + importModelContext.getTargetKylinConfig(), Lists.newArrayList(tableDesc)); + val checkResult = ModelImportChecker.check(diff, importModelContext); + Assert.assertFalse(checkResult.getModels().isEmpty()); + val change = checkResult.getModels().get(getTargetModel()); + Assert.assertTrue(change.getMissingItems().isEmpty()); } @Test diff --git a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json index 88ecdea540..076c6e26ee 100644 --- a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json +++ b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json @@ -9,31 +9,31 @@ }, { "id" : "2", "name" : "C_NAME", - "datatype" : "varchar(25)" + "datatype": "varchar(4096)" }, { "id" : "3", "name" : "C_ADDRESS", - "datatype" : "varchar(40)" + "datatype": "varchar(4096)" }, { "id" : "4", "name" : "C_CITY", - "datatype" : "varchar(10)" + "datatype": "varchar(4096)" }, { "id" : "5", "name" : "C_NATION", - "datatype" : "varchar(15)" + "datatype": "varchar(4096)" }, { "id" : "6", "name" : "C_REGION", - "datatype" : "varchar(12)" + "datatype": "varchar(4096)" }, { "id" : "7", "name" : "C_PHONE", - "datatype" : "varchar(15)" + "datatype": "varchar(4096)" }, { "id" : "8", "name" : "C_MKTSEGMENT", - "datatype" : "varchar(10)" + "datatype": "varchar(4096)" } ], "database" : "SSB", "last_modified" : 1457444146362, diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java index 21097bcd85..32d3f93901 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java @@ -28,10 +28,8 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_ import static org.apache.kylin.common.persistence.ResourceStore.METASTORE_UUID_TAG; import static org.apache.kylin.common.persistence.ResourceStore.VERSION_FILE; import static org.apache.kylin.metadata.model.schema.ImportModelContext.MODEL_REC_PATH; -import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.DIFFERENT_CC_NAME_HAS_SAME_EXPR; -import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR; -import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.TABLE_COLUMN_DATATYPE_CHANGED; -import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE; +import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_DIM; +import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_FACT; import java.io.ByteArrayOutputStream; import java.io.File; @@ -39,8 +37,8 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; @@ -72,13 +70,13 @@ import org.apache.kylin.common.persistence.metadata.MetadataStore; import org.apache.kylin.common.persistence.transaction.UnitOfWork; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.MetadataChecker; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexPlan; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.cube.model.RuleBasedIndex; -import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.MultiPartitionDesc; import org.apache.kylin.metadata.model.NDataModel; @@ -99,6 +97,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.aspect.Transaction; import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum; import org.apache.kylin.rest.request.ModelImportRequest; +import org.apache.kylin.rest.request.ModelImportRequest.ImportType; import org.apache.kylin.rest.request.UpdateRuleBasedCuboidRequest; import org.apache.kylin.rest.response.LoadTableResponse; import org.apache.kylin.rest.response.ModelPreviewResponse; @@ -137,9 +136,6 @@ public class MetaStoreService extends BasicService { private static final Pattern MD5_PATTERN = Pattern.compile(".*([a-fA-F\\d]{32})\\.zip"); private static final String RULE_SCHEDULER_DATA_KEY = "kylin.index.rule-scheduler-data"; - private static final Set<SchemaChangeCheckResult.UN_IMPORT_REASON> UN_IMPORT_REASONS = Sets.newHashSet( - SAME_CC_NAME_HAS_DIFFERENT_EXPR, DIFFERENT_CC_NAME_HAS_SAME_EXPR, TABLE_COLUMN_DATATYPE_CHANGED); - @Autowired public AclEvaluate aclEvaluate; @@ -378,12 +374,12 @@ public class MetaStoreService extends BasicService { if (request != null) { val newModels = request.getModels().stream() - .filter(modelImport -> modelImport.getImportType() == ModelImportRequest.ImportType.NEW) + .filter(modelImport -> modelImport.getImportType() == ImportType.NEW) .collect(Collectors.toMap(ModelImportRequest.ModelImport::getOriginalName, ModelImportRequest.ModelImport::getTargetName)); val unImportModels = request.getModels().stream() - .filter(modelImport -> modelImport.getImportType() == ModelImportRequest.ImportType.UN_IMPORT) + .filter(modelImport -> modelImport.getImportType() == ImportType.UN_IMPORT) .map(ModelImportRequest.ModelImport::getOriginalName).collect(Collectors.toList()); return new ImportModelContext(targetProject, srcProject, rawResourceMap, newModels, unImportModels); @@ -418,83 +414,44 @@ public class MetaStoreService extends BasicService { public SchemaChangeCheckResult checkModelMetadata(String targetProject, ImportModelContext context, MultipartFile uploadFile) throws IOException { - Map<String, RawResource> rawResourceMap = getRawResourceFromUploadFile(uploadFile); - checkModelMetadataFile(ResourceStore.getKylinMetaStore(context.getTargetKylinConfig()).getMetadataStore(), + KylinConfig targetKylinConfig = context.getTargetKylinConfig(); + Map<String, RawResource> rawResourceMap = getRawResourceFromUploadFile(uploadFile); + checkModelMetadataFile(ResourceStore.getKylinMetaStore(targetKylinConfig).getMetadataStore(), rawResourceMap.keySet()); - SchemaUtil.SchemaDifference difference = SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(), - context.getTargetKylinConfig()); - - SchemaChangeCheckResult checkResult = ModelImportChecker.check(difference, context); - - Set<String> loadAbleTables = getLoadAbleTables(targetProject, context.getTargetMissTableList()); - if (CollectionUtils.isEmpty(loadAbleTables)) { - return checkResult; - } - // mark every model loadTableAble - return checkTableLoadAble(loadAbleTables, checkResult); - } - - public SchemaChangeCheckResult checkTableLoadAble(Set<String> loadAbleTables, SchemaChangeCheckResult checkResult) { - checkResult.getModels().forEach((modelName, change) -> { - if (change.creatable() || change.importable() || change.overwritable()) { - return; - } - // Verify that tables used by the model can be fully loaded - Set<String> missedTableSet = change.getMissingItems().stream()// - .filter(item -> item.getType().equals(MODEL_TABLE)) - .map(SchemaChangeCheckResult.ChangedItem::getDetail).collect(Collectors.toSet()); - if (missedTableSet.isEmpty() || !loadAbleTables.containsAll(missedTableSet)) { - return; - } - // Verify that model has no conflicts - List<SchemaChangeCheckResult.BaseItem> items = Lists.newArrayList(); - items.addAll(change.getNewItems()); - items.addAll(change.getUpdateItems()); - boolean hasConflict = items.stream().anyMatch(item -> { - val reason = item.getConflictReason().getReason(); - return UN_IMPORT_REASONS.contains(reason); - }); - if (hasConflict) { - return; - } - change.setLoadTableAble(true); - change.getLoadTables().addAll(missedTableSet); - }); + // check missing table exists in datasource + List<TableDesc> existTableList = searchTablesInDataSource(targetProject, context.getTargetMissTableList()); + // diff (local metadata + searched tables) and import metadata + val diff = SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(), targetKylinConfig, existTableList); + SchemaChangeCheckResult checkResult = ModelImportChecker.check(diff, context); + checkResult.getExistTableList().addAll(existTableList); return checkResult; } - public Set<String> getLoadAbleTables(String targetProject, List<TableDesc> missTableList) { + public List<TableDesc> searchTablesInDataSource(String targetProject, List<TableDesc> missTableList) { if (CollectionUtils.isEmpty(missTableList)) { - return Sets.newHashSet(); + return Collections.emptyList(); } ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()) .getProject(targetProject); ISourceMetadataExplorer explorer = SourceFactory.getSource(projectInstance).getSourceMetadataExplorer(); - Set<String> loadAbleList = Sets.newHashSet(); + + List<TableDesc> existTableSet = Lists.newArrayList(); for (TableDesc missTableDesc : missTableList) { try { - // get new table desc from datasource + // check datasource exist table + // no need to check column TableDesc newTableDesc = explorer .loadTableMetadata(missTableDesc.getDatabase(), missTableDesc.getName(), targetProject) .getFirst(); - // check column all exists - Set<ColumnDesc> columnDescList = Arrays.stream(missTableDesc.getColumns()) - .filter(col -> !col.isComputedColumn()).collect(Collectors.toSet()); - Set<ColumnDesc> notExistColSet = newTableDesc.findColumns(columnDescList).getSecond(); - if (CollectionUtils.isNotEmpty(notExistColSet)) { - // some column not exist in new table desc, mark table cannot load - String missCols = notExistColSet.stream().map(ColumnDesc::getName).collect(Collectors.joining(",")); - logger.warn("Can not find columns [{}] in table [{}]", missCols, newTableDesc.getIdentity()); - continue; - } - loadAbleList.add(newTableDesc.getIdentity()); + newTableDesc.init(targetProject); + existTableSet.add(newTableDesc); } catch (Exception e) { logger.warn("try load table: {} failed.", missTableDesc.getIdentity(), e); } } - return loadAbleList; + return existTableSet; } private void checkModelMetadataFile(MetadataStore metadataStore, Set<String> rawResourceList) { @@ -654,24 +611,55 @@ public class MetaStoreService extends BasicService { } } - public LoadTableResponse innerLoadTables(String project, Set<SchemaChangeCheckResult.ModelSchemaChange> changes) - throws Exception { - Set<String> loadTables = Sets.newHashSet(); - changes.forEach(change -> loadTables.addAll(change.getLoadTables())); - return tableExtService.loadDbTables(loadTables.toArray(new String[0]), project, false); + public LoadTableResponse innerLoadTables(String project, Set<String> needLoadTables) throws Exception { + return tableExtService.loadDbTables(needLoadTables.toArray(new String[0]), project, false); + } + + public Pair<Set<String>, Map<String, Set<String>>> checkNewModelTables(SchemaChangeCheckResult checkResult, + ModelImportRequest request) { + List<String> existTableList = checkResult.getExistTableList().stream().map(TableDesc::getIdentity) + .collect(Collectors.toList()); + List<String> newImportModelList = request.getModels().stream() + .filter(modelRequest -> modelRequest.getImportType() == ImportType.NEW) + .map(ModelImportRequest.ModelImport::getTargetName).collect(Collectors.toList()); + // all tables need to be loaded + Set<String> needLoadTableSet = Sets.newHashSet(); + // every model need to be loaded tables + Map<String, Set<String>> modelTablesMap = Maps.newHashMap(); + + checkResult.getModels().forEach((modelName, change) -> { + if (!newImportModelList.contains(modelName) || !change.creatable()) { + return; + } + Set<String> modelTables = Sets.newHashSet(); + change.getNewItems().stream()// + .filter(item -> item.getSchemaNode().getType().equals(MODEL_DIM) + || item.getSchemaNode().getType().equals(MODEL_FACT)) + .map(SchemaChangeCheckResult.ChangedItem::getDetail).filter(existTableList::contains) + .forEach(table -> { + needLoadTableSet.add(table); + modelTables.add(table); + }); + modelTablesMap.put(modelName, modelTables); + }); + return Pair.newPair(needLoadTableSet, modelTablesMap); } private void innerImportModelMetadata(String project, MultipartFile metadataFile, ModelImportRequest request, ImportModelContext context, List<Exception> exceptions) throws Exception { val schemaChangeCheckResult = checkModelMetadata(project, context, metadataFile); - val schemaChanges = schemaChangeCheckResult.getModels().entrySet().stream()// - .filter(entry -> context.getNewModels().containsValue(entry.getKey())).map(Map.Entry::getValue) - .collect(Collectors.toSet()); - boolean needLoadTable = schemaChanges.stream().anyMatch(change -> !change.getLoadTables().isEmpty()); + val pair = checkNewModelTables(schemaChangeCheckResult, request); + Set<String> needLoadTableSet = pair.getFirst(); + Map<String, Set<String>> modelTablesMap = pair.getSecond(); + LoadTableResponse loadTableResponse = null; + boolean needLoadTable = CollectionUtils.isNotEmpty(needLoadTableSet); if (needLoadTable) { - loadTableResponse = innerLoadTables(project, schemaChanges); + // try load tables + String needLoadTableStr = String.join(",", needLoadTableSet); + logger.info("try load tables: [{}]", needLoadTableStr); + loadTableResponse = innerLoadTables(project, needLoadTableSet); if (CollectionUtils.isNotEmpty(loadTableResponse.getFailed())) { String loadFailedTables = String.join(",", loadTableResponse.getFailed()); logger.warn("Load Table failed: [{}]", loadFailedTables); @@ -685,10 +673,10 @@ public class MetaStoreService extends BasicService { for (ModelImportRequest.ModelImport modelImport : request.getModels()) { try { validateModelImport(project, modelImport, schemaChangeCheckResult); - if (modelImport.getImportType() == ModelImportRequest.ImportType.NEW) { - val modelSchemaChange = schemaChangeCheckResult.getModels().get(modelImport.getTargetName()); - if (needLoadTable && modelSchemaChange.isLoadTableAble()) { - Set<String> needLoadTables = modelSchemaChange.getLoadTables(); + if (modelImport.getImportType() == ImportType.NEW) { + if (needLoadTable) { + Set<String> needLoadTables = modelTablesMap.getOrDefault(modelImport.getTargetName(), + Collections.emptySet()); if (!loadTableResponse.getLoaded().containsAll(needLoadTables)) { logger.warn("Import model [{}] failed, skip import.", modelImport.getOriginalName()); continue; @@ -698,7 +686,7 @@ public class MetaStoreService extends BasicService { var nDataModel = importDataModelManager.copyForWrite(importDataModel); createNewModel(nDataModel, modelImport, project, importIndexPlanManager); importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(), targetKylinConfig); - } else if (modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE) { + } else if (modelImport.getImportType() == ImportType.OVERWRITE) { val importDataModel = importDataModelManager.getDataModelDescByAlias(modelImport.getOriginalName()); val nDataModel = importDataModelManager.copyForWrite(importDataModel); @@ -733,7 +721,7 @@ public class MetaStoreService extends BasicService { Message msg = MsgPicker.getMsg(); - if (modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE) { + if (modelImport.getImportType() == ImportType.OVERWRITE) { NDataModel dataModel = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project) .getDataModelDescByAlias(modelImport.getOriginalName()); @@ -753,7 +741,7 @@ public class MetaStoreService extends BasicService { String.format(Locale.ROOT, msg.getUnSuitableImportType(createType), modelImport.getImportType(), modelImport.getOriginalName())); } - } else if (modelImport.getImportType() == ModelImportRequest.ImportType.NEW) { + } else if (modelImport.getImportType() == ImportType.NEW) { if (!org.apache.commons.lang.StringUtils.containsOnly(modelImport.getTargetName(), ModelService.VALID_NAME_FOR_MODEL)) { diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java index dbbedc48bc..948ee05789 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java @@ -20,8 +20,6 @@ package org.apache.kylin.rest.service; import static org.apache.kylin.common.constant.Constants.KE_VERSION; import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_ID_NOT_EXIST; -import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR; -import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -39,7 +37,6 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -71,6 +68,7 @@ import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.PartitionDesc; +import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult; import org.apache.kylin.metadata.model.schema.SchemaNodeType; @@ -105,7 +103,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import io.kyligence.kap.guava20.shaded.common.io.ByteSource; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; @@ -755,17 +752,16 @@ public class MetaStoreServiceTest extends ServiceTestBase { "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, Files.newInputStream(file.toPath())); - val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); + val checkResult = metaStoreService.checkModelMetadata("original_project", multipartFile, null); + Assert.assertEquals(1, checkResult.getExistTableList().size()); + Assert.assertEquals("SSB.CUSTOMER_NEW", checkResult.getExistTableList().get(0).getIdentity()); - SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() + SchemaChangeCheckResult.ModelSchemaChange change = checkResult.getModels() .get("missing_table_model"); - Assert.assertNotNull(modelSchemaChange); - - Assert.assertEquals(11, modelSchemaChange.getDifferences()); - Assert.assertTrue( - modelSchemaChange.getMissingItems().stream().anyMatch(sc -> sc.getType() == SchemaNodeType.MODEL_TABLE - && sc.getDetail().equals("SSB.CUSTOMER_NEW") && !sc.isImportable())); - Assert.assertTrue(modelSchemaChange.importable()); + Assert.assertNotNull(change); + Assert.assertTrue(change.getMissingItems().isEmpty()); + Assert.assertTrue(change.importable()); + Assert.assertTrue(change.creatable()); } @Test @@ -1244,10 +1240,6 @@ public class MetaStoreServiceTest extends ServiceTestBase { Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW")); metaStoreService.importModelMetadata("original_project", multipartFile, request); Assert.assertNotNull(manager.getTableDesc("SSB.CUSTOMER_NEW")); - - { - - } } @Test @@ -1389,135 +1381,46 @@ public class MetaStoreServiceTest extends ServiceTestBase { @Test public void testMissTable() throws IOException { + String table = "SSB.CUSTOMER_NEW"; val file = new File( "src/test/resources/ut_meta/schema_utils/model_missing_table_update/model_table_missing_update_model_metadata_2020_11_16_02_37_33_3182D4A7694DA64E3D725C140CF80A47.zip"); val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, Files.newInputStream(file.toPath())); - val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); - - val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model"); + val checkResult = metaStoreService.checkModelMetadata("original_project", multipartFile, null); + val modelSchemaChange = checkResult.getModels().get("ssb_model"); + Assert.assertEquals(1, checkResult.getExistTableList().size()); + Assert.assertEquals(table, checkResult.getExistTableList().get(0).getIdentity()); Assert.assertNotNull(modelSchemaChange); - Assert.assertTrue(modelSchemaChange.isLoadTableAble()); - Set<String> loadTables = modelSchemaChange.getLoadTables(); - Assert.assertEquals(1, loadTables.size()); - Assert.assertEquals("SSB.CUSTOMER_NEW", loadTables.iterator().next()); Assert.assertTrue(modelSchemaChange.creatable()); Assert.assertTrue(modelSchemaChange.importable()); Assert.assertFalse(modelSchemaChange.overwritable()); - - testModelImportable(metadataCheckResponse); - - { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - Mockito.doReturn(false).when(mockChange).overwritable(); - Mockito.doReturn(false).when(mockChange).creatable(); - Mockito.doReturn(false).when(mockChange).importable(); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertTrue(change.isLoadTableAble()); - Assert.assertFalse(change.getLoadTables().isEmpty()); - } - - { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEWNEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); - } - - { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - val missItems = mockChange.getMissingItems().stream().filter(item -> item.getType() != MODEL_TABLE) - .collect(Collectors.toList()); - ReflectionTestUtils.setField(mockChange, "missingItems", missItems); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); - } - - { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - val newItems = mockChange.getNewItems().stream() - .peek(item -> item.getConflictReason().setReason(SAME_CC_NAME_HAS_DIFFERENT_EXPR)) - .collect(Collectors.toList()); - ReflectionTestUtils.setField(mockChange, "newItems", newItems); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); - } } - private void testModelImportable(SchemaChangeCheckResult metadataCheckResponse) { - val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model"); - { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - Mockito.doReturn(true).when(mockChange).overwritable(); - Mockito.doReturn(true).when(mockChange).creatable(); - Mockito.doReturn(true).when(mockChange).importable(); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); - } - + @Test + public void testSearchTablesInDataSource() { { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - Mockito.doReturn(true).when(mockChange).overwritable(); - Mockito.doReturn(false).when(mockChange).creatable(); - Mockito.doReturn(false).when(mockChange).importable(); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); + val existTables = metaStoreService.searchTablesInDataSource("original_project", Lists.newArrayList()); + Assert.assertTrue(existTables.isEmpty()); } { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - Mockito.doReturn(false).when(mockChange).overwritable(); - Mockito.doReturn(true).when(mockChange).creatable(); - Mockito.doReturn(false).when(mockChange).importable(); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); + TableDesc tableDesc = new TableDesc(); + tableDesc.setDatabase("SSB"); + tableDesc.setName("CUSTOMER_NEW"); + val existTables = metaStoreService.searchTablesInDataSource("original_project", + Lists.newArrayList(tableDesc)); + Assert.assertFalse(existTables.isEmpty()); + Assert.assertEquals(1, existTables.size()); + Assert.assertEquals("SSB.CUSTOMER_NEW", existTables.get(0).getIdentity()); } { - val mockChange = Mockito.spy(modelSchemaChange); - mockChange.setLoadTableAble(false); - mockChange.setLoadTables(Sets.newHashSet()); - Mockito.doReturn(false).when(mockChange).overwritable(); - Mockito.doReturn(false).when(mockChange).creatable(); - Mockito.doReturn(true).when(mockChange).importable(); - metadataCheckResponse.getModels().put("ssb_model", mockChange); - metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); - val change = metadataCheckResponse.getModels().get("ssb_model"); - Assert.assertFalse(change.isLoadTableAble()); - Assert.assertTrue(change.getLoadTables().isEmpty()); + TableDesc tableDesc = new TableDesc(); + tableDesc.setDatabase("SSB"); + tableDesc.setName("CUSTOMER_NEW_NEW"); + val existTables = metaStoreService.searchTablesInDataSource("original_project", + Lists.newArrayList(tableDesc)); + Assert.assertTrue(existTables.isEmpty()); } }