This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push: new ea119412aa KYLIN-5377 Import models auto load data source tables. ea119412aa is described below commit ea119412aa082999b7c8881e4828778e1955ff81 Author: Jiale He <35652389+jial...@users.noreply.github.com> AuthorDate: Fri Oct 28 17:35:05 2022 +0800 KYLIN-5377 Import models auto load data source tables. Co-authored-by: Jiale He <jiale...@kyligence.io> --- .../org/apache/kylin/metadata/model/TableDesc.java | 20 +- .../metadata/model/schema/ImportModelContext.java | 56 ++-- .../model/schema/SchemaChangeCheckResult.java | 14 +- .../apache/kylin/metadata/model/TableDescTest.java | 55 ++++ .../model/schema/ImportModelContextTest.java | 5 +- .../metadata/model/schema/SchemaUtilTest.java | 2 +- .../localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json | 41 +++ .../rest/controller/NMetaStoreController.java | 17 +- .../rest/controller/NMetaStoreControllerTest.java | 8 +- .../kylin/rest/service/MetaStoreService.java | 171 ++++++++---- .../kylin/rest/service/MetaStoreServiceTest.java | 309 ++++++++++++++++----- .../kylin/rest/service/TableServiceTest.java | 59 ++-- 12 files changed, 553 insertions(+), 204 deletions(-) diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java index 6c4da9b82e..33231ff7b9 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java @@ -28,6 +28,7 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; @@ -36,8 +37,8 @@ import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.StringSplitter; import org.apache.kylin.metadata.MetadataConstants; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.streaming.KafkaConfig; import org.apache.kylin.metadata.streaming.KafkaConfigManager; @@ -306,6 +307,23 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo return null; } + public Pair<Set<ColumnDesc>, Set<ColumnDesc>> findColumns(Set<ColumnDesc> columnDescSet) { + Set<ColumnDesc> existColSet = Sets.newHashSet(); + Set<ColumnDesc> notExistColSet = Sets.newHashSet(); + if (CollectionUtils.isEmpty(columnDescSet)) { + return Pair.newPair(existColSet, notExistColSet); + } + for (ColumnDesc searchColumnDesc : columnDescSet) { + ColumnDesc columnDesc = findColumnByName(searchColumnDesc.getName()); + if (Objects.isNull(columnDesc)) { + notExistColSet.add(searchColumnDesc); + } else { + existColSet.add(columnDesc); + } + } + return Pair.newPair(existColSet, notExistColSet); + } + @Override public String getResourcePath() { return concatResourcePath(getIdentity(), project); diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java index d0b673f883..24fa7559ca 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java @@ -41,10 +41,9 @@ import org.apache.kylin.common.persistence.InMemResourceStore; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.cube.model.SelectRule; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup; import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexPlan; @@ -56,6 +55,8 @@ import org.apache.kylin.metadata.cube.model.RuleBasedIndex; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.NTableMetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.recommendation.candidate.RawRecItem; import org.apache.kylin.metadata.recommendation.entity.DimensionRecItemV2; import org.apache.kylin.metadata.recommendation.entity.LayoutRecItemV2; @@ -94,6 +95,10 @@ public class ImportModelContext implements AutoCloseable { private final NTableMetadataManager importTableMetadataManager; private final NIndexPlanManager importIndexPlanManager; + @Getter + private final List<TableDesc> targetMissTableList; + @Getter + private final List<TableDesc> loadTableList; @Getter private final Map<String, String> newModels; private final List<String> unImportModels; @@ -133,31 +138,39 @@ public class ImportModelContext implements AutoCloseable { targetKylinConfig.setProperty("kylin.metadata.validate-computed-column", "false"); + val pairTable = getPairTable(); + targetMissTableList = pairTable.getFirst(); + loadTableList = pairTable.getSecond(); loadTable(); loadModel(); } - private void loadTable() { + private Pair<List<TableDesc>, List<TableDesc>> getPairTable() { + List<TableDesc> missTables = Lists.newArrayList(); + List<TableDesc> loadTables = Lists.newArrayList(); + List<TableDesc> tables = importTableMetadataManager.listAllTables(); for (TableDesc tableDesc : tables) { TableDesc newTable = targetTableMetadataManager.copyForWrite(tableDesc); TableDesc originalTable = targetTableMetadataManager.getTableDesc(newTable.getIdentity()); - long mvcc = -1; - if (originalTable != null) { - mvcc = originalTable.getMvcc(); - } - newTable.setMvcc(mvcc); newTable.setLastModified(System.currentTimeMillis()); - targetTableMetadataManager.saveSourceTable(newTable); + if (Objects.isNull(originalTable)) { + newTable.setMvcc(-1); + missTables.add(newTable); + } else { + newTable.setMvcc(originalTable.getMvcc()); + } + loadTables.add(newTable); + } + return Pair.newPair(missTables, loadTables); + } + + private void loadTable() { + for (TableDesc tableDesc : loadTableList) { + targetTableMetadataManager.saveSourceTable(tableDesc); } } - /** - * - * @param newDataModel - * @param importModel - * @throws IOException - */ private void createNewModel(NDataModel newDataModel, NDataModel importModel) throws IOException { newDataModel.setProject(targetProject); newDataModel.setAlias(newModels.getOrDefault(importModel.getAlias(), newDataModel.getAlias())); @@ -180,7 +193,6 @@ public class ImportModelContext implements AutoCloseable { * * @param originalDataModel model from current env * @param newDataModel model from import - * @return */ private static Map<Integer, Integer> prepareIdChangedMap(NDataModel originalDataModel, NDataModel newDataModel) { Map<Integer, Integer> idChangedMap = new HashMap<>(); @@ -237,12 +249,6 @@ public class ImportModelContext implements AutoCloseable { return idChangedMap; } - /** - * - * @param newDataModel - * @param originalDataModel - * @param hasModelOverrideProps - */ private void updateModel(NDataModel newDataModel, NDataModel originalDataModel, boolean hasModelOverrideProps) { newDataModel.setUuid(originalDataModel.getUuid()); newDataModel.setProject(targetProject); @@ -254,12 +260,6 @@ public class ImportModelContext implements AutoCloseable { targetDataModelManager.updateDataModelDesc(newDataModel); } - /** - * - * @param originalDataModel - * @param targetIndexPlan - * @param hasModelOverrideProps - */ private void updateIndexPlan(NDataModel originalDataModel, IndexPlan targetIndexPlan, boolean hasModelOverrideProps) { targetIndexPlanManger.updateIndexPlan(originalDataModel.getUuid(), copyForWrite -> { diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java index b64c1321fb..90b116cd85 100644 --- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java +++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java @@ -24,11 +24,13 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Stream; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonUnwrapped; +import com.google.common.collect.Sets; import lombok.AllArgsConstructor; import lombok.Data; @@ -65,13 +67,13 @@ public class SchemaChangeCheckResult { @JsonProperty("importable") public boolean importable() { return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream) - .allMatch(BaseItem::isImportable); + .allMatch(BaseItem::isImportable) || isLoadTableAble(); } @JsonProperty("creatable") public boolean creatable() { return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream) - .allMatch(BaseItem::isCreatable); + .allMatch(BaseItem::isCreatable) || isLoadTableAble(); } @JsonProperty("") @@ -89,6 +91,14 @@ public class SchemaChangeCheckResult { return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream) .allMatch(BaseItem::isHasSameName); } + + @Setter + @JsonIgnore + private boolean loadTableAble = false; + + @Getter + @JsonIgnore + private Set<String> loadTables = Sets.newHashSet(); } @Data diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java index 66e6036843..7eff5cffa9 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java @@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model; import static org.apache.kylin.metadata.model.NTableMetadataManager.getInstance; import java.util.Locale; +import java.util.Set; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.junit.After; @@ -28,6 +29,10 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Sets; + +import lombok.val; + public class TableDescTest extends NLocalFileMetadataTestCase { private final String project = "default"; private NTableMetadataManager tableMetadataManager; @@ -67,4 +72,54 @@ public class TableDescTest extends NLocalFileMetadataTestCase { final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName); Assert.assertFalse(tableDesc.isRangePartition()); } + + @Test + public void testFindColumns() { + final String tableName = "DEFAULT.TEST_KYLIN_FACT"; + final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName); + ColumnDesc[] columns = tableDesc.getColumns(); + Assert.assertEquals(12, columns.length); + + { + // test search column empty + Set<ColumnDesc> searchColSet = Sets.newHashSet(); + val pair = tableDesc.findColumns(searchColSet); + Assert.assertTrue(pair.getFirst().isEmpty()); + Assert.assertTrue(pair.getSecond().isEmpty()); + } + + { + // test all founded + Set<ColumnDesc> searchColSet = Sets.newHashSet( + new ColumnDesc("1", "TRANS_ID", "bigint", "TRANS_ID", "", "", ""), + new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", "")); + val pair = tableDesc.findColumns(searchColSet); + Assert.assertFalse(pair.getFirst().isEmpty()); + Assert.assertTrue(pair.getSecond().isEmpty()); + Assert.assertEquals(2, pair.getFirst().size()); + } + + { + // test part founded + Set<ColumnDesc> searchColSet = Sets.newHashSet( + new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""), + new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", "")); + val pair = tableDesc.findColumns(searchColSet); + Assert.assertFalse(pair.getFirst().isEmpty()); + Assert.assertFalse(pair.getSecond().isEmpty()); + Assert.assertEquals(1, pair.getFirst().size()); + Assert.assertEquals(1, pair.getSecond().size()); + } + + { + // test part founded + Set<ColumnDesc> searchColSet = Sets.newHashSet( + new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""), + new ColumnDesc("2", "ORDER_ID_1", "bigint", "TRANS_ID", "", "", "")); + val pair = tableDesc.findColumns(searchColSet); + Assert.assertTrue(pair.getFirst().isEmpty()); + Assert.assertFalse(pair.getSecond().isEmpty()); + Assert.assertEquals(2, pair.getSecond().size()); + } + } } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java index 8cd432a3ef..a066863c12 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java @@ -150,9 +150,8 @@ public class ImportModelContextTest extends NLocalFileMetadataTestCase { ResourceStore.setRS(importKylinConfig, importResourceStore); - rawResourceMap.forEach((resPath, raw) -> { - importResourceStore.putResourceWithoutCheck(resPath, raw.getByteSource(), raw.getTimestamp(), 0); - }); + rawResourceMap.forEach((resPath, raw) -> importResourceStore.putResourceWithoutCheck(resPath, + raw.getByteSource(), raw.getTimestamp(), 0)); return importKylinConfig; } diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java index 92cb7d2d53..4b43a9e538 100644 --- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java +++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java @@ -40,8 +40,8 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.msg.MsgPicker; import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; +import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum; import org.junit.After; import org.junit.Assert; import org.junit.Before; diff --git a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json new file mode 100644 index 0000000000..88ecdea540 --- /dev/null +++ b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json @@ -0,0 +1,41 @@ +{ + "uuid" : "d70320ec-949f-44df-8bf4-92dc005dd07d", + "version" : "2.1", + "name" : "CUSTOMER_NEW", + "columns" : [ { + "id" : "1", + "name" : "C_CUSTKEY", + "datatype" : "integer" + }, { + "id" : "2", + "name" : "C_NAME", + "datatype" : "varchar(25)" + }, { + "id" : "3", + "name" : "C_ADDRESS", + "datatype" : "varchar(40)" + }, { + "id" : "4", + "name" : "C_CITY", + "datatype" : "varchar(10)" + }, { + "id" : "5", + "name" : "C_NATION", + "datatype" : "varchar(15)" + }, { + "id" : "6", + "name" : "C_REGION", + "datatype" : "varchar(12)" + }, { + "id" : "7", + "name" : "C_PHONE", + "datatype" : "varchar(15)" + }, { + "id" : "8", + "name" : "C_MKTSEGMENT", + "datatype" : "varchar(10)" + } ], + "database" : "SSB", + "last_modified" : 1457444146362, + "source_type" : 9 +} \ No newline at end of file diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java index fbccca969e..84649c4bcb 100644 --- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java +++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java @@ -18,10 +18,12 @@ package org.apache.kylin.rest.controller; +import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; import static org.apache.kylin.common.exception.ServerErrorCode.EMPTY_MODEL_ID; import static org.apache.kylin.common.exception.ServerErrorCode.FILE_FORMAT_ERROR; import static org.apache.kylin.common.exception.ServerErrorCode.FILE_NOT_EXIST; -import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; +import static org.apache.kylin.rest.request.ModelImportRequest.ImportType.NEW; +import static org.apache.kylin.rest.request.ModelImportRequest.ImportType.OVERWRITE; import static org.springframework.http.MediaType.MULTIPART_FORM_DATA_VALUE; import java.io.ByteArrayInputStream; @@ -37,14 +39,14 @@ import javax.xml.bind.DatatypeConverter; import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.exception.KylinException; -import org.apache.kylin.common.util.ZipFileUtils; -import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.common.persistence.transaction.UnitOfWork; +import org.apache.kylin.common.util.ZipFileUtils; import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult; import org.apache.kylin.rest.request.MetadataCleanupRequest; import org.apache.kylin.rest.request.ModelImportRequest; import org.apache.kylin.rest.request.ModelPreviewRequest; import org.apache.kylin.rest.request.StorageCleanupRequest; +import org.apache.kylin.rest.response.EnvelopeResponse; import org.apache.kylin.rest.response.ModelPreviewResponse; import org.apache.kylin.rest.service.MetaStoreService; import org.apache.kylin.tool.util.HashFunction; @@ -61,6 +63,8 @@ import org.springframework.web.bind.annotation.RequestPart; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.multipart.MultipartFile; +import com.google.common.collect.Lists; + import io.swagger.annotations.ApiOperation; @Controller @@ -71,6 +75,8 @@ public class NMetaStoreController extends NBasicController { @Qualifier("metaStoreService") private MetaStoreService metaStoreService; + private static final List<ModelImportRequest.ImportType> IMPORT_TYPE = Lists.newArrayList(NEW, OVERWRITE); + @ApiOperation(value = "previewModels", tags = { "MID" }) @GetMapping(value = "/previews/models") @ResponseBody @@ -133,13 +139,10 @@ public class NMetaStoreController extends NBasicController { checkProjectName(project); checkUploadFile(metadataFile); if (request.getModels().stream() - .noneMatch(modelImport -> modelImport.getImportType() == ModelImportRequest.ImportType.NEW - || modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE)) { + .noneMatch(modelImport -> IMPORT_TYPE.contains(modelImport.getImportType()))) { throw new KylinException(EMPTY_MODEL_ID, "At least one model should be selected to import!"); } - metaStoreService.importModelMetadata(project, metadataFile, request); - return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, "", ""); } diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java index 2999d12365..d592092295 100644 --- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java +++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java @@ -21,8 +21,8 @@ package org.apache.kylin.rest.controller; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON; import java.io.File; -import java.io.FileInputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -30,9 +30,9 @@ import java.util.List; import javax.servlet.http.HttpServletResponse; import org.apache.kylin.common.util.JsonUtil; -import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult; +import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.ModelImportRequest; import org.apache.kylin.rest.request.ModelPreviewRequest; import org.apache.kylin.rest.service.MetaStoreService; @@ -116,7 +116,7 @@ public class NMetaStoreControllerTest extends NLocalFileMetadataTestCase { public void testUploadAndCheckModelMetadata() throws Exception { File file = new File("src/test/resources/ut_model_metadata/ut_model_matadata.zip"); MockMultipartFile multipartFile = new MockMultipartFile("file", "ut_model_matadata.zip", "text/plain", - new FileInputStream(file)); + Files.newInputStream(file.toPath())); SchemaChangeCheckResult schemaChangeCheckResult = new SchemaChangeCheckResult(); Mockito.when(metaStoreService.checkModelMetadata("default", multipartFile, null)) @@ -135,7 +135,7 @@ public class NMetaStoreControllerTest extends NLocalFileMetadataTestCase { public void testImportModelMetadata() throws Throwable { File file = new File("src/test/resources/ut_model_metadata/ut_model_matadata.zip"); MockMultipartFile multipartFile = new MockMultipartFile("file", "ut_model_matadata.zip", "text/plain", - new FileInputStream(file)); + Files.newInputStream(file.toPath())); final ModelImportRequest request = new ModelImportRequest(); List<ModelImportRequest.ModelImport> models = new ArrayList<>(); diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java index d6df59f000..21097bcd85 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java @@ -28,6 +28,10 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_ import static org.apache.kylin.common.persistence.ResourceStore.METASTORE_UUID_TAG; import static org.apache.kylin.common.persistence.ResourceStore.VERSION_FILE; import static org.apache.kylin.metadata.model.schema.ImportModelContext.MODEL_REC_PATH; +import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.DIFFERENT_CC_NAME_HAS_SAME_EXPR; +import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR; +import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.TABLE_COLUMN_DATATYPE_CHANGED; +import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE; import java.io.ByteArrayOutputStream; import java.io.File; @@ -35,6 +39,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.LinkedHashMap; @@ -73,6 +78,7 @@ import org.apache.kylin.metadata.cube.model.IndexPlan; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.cube.model.RuleBasedIndex; +import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.MultiPartitionDesc; import org.apache.kylin.metadata.model.NDataModel; @@ -87,16 +93,20 @@ import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult; import org.apache.kylin.metadata.model.schema.SchemaNodeType; import org.apache.kylin.metadata.model.schema.SchemaUtil; import org.apache.kylin.metadata.project.NProjectManager; +import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.metadata.query.util.QueryHisStoreUtil; import org.apache.kylin.metadata.realization.RealizationStatusEnum; import org.apache.kylin.rest.aspect.Transaction; import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum; import org.apache.kylin.rest.request.ModelImportRequest; import org.apache.kylin.rest.request.UpdateRuleBasedCuboidRequest; +import org.apache.kylin.rest.response.LoadTableResponse; import org.apache.kylin.rest.response.ModelPreviewResponse; import org.apache.kylin.rest.response.SimplifiedTablePreviewResponse; import org.apache.kylin.rest.util.AclEvaluate; import org.apache.kylin.rest.util.AclPermissionUtil; +import org.apache.kylin.source.ISourceMetadataExplorer; +import org.apache.kylin.source.SourceFactory; import org.apache.kylin.tool.routine.RoutineTool; import org.apache.kylin.tool.util.HashFunction; import org.slf4j.Logger; @@ -107,6 +117,7 @@ import org.springframework.web.multipart.MultipartFile; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import io.kyligence.kap.guava20.shaded.common.io.ByteSource; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; @@ -126,6 +137,9 @@ public class MetaStoreService extends BasicService { private static final Pattern MD5_PATTERN = Pattern.compile(".*([a-fA-F\\d]{32})\\.zip"); private static final String RULE_SCHEDULER_DATA_KEY = "kylin.index.rule-scheduler-data"; + private static final Set<SchemaChangeCheckResult.UN_IMPORT_REASON> UN_IMPORT_REASONS = Sets.newHashSet( + SAME_CC_NAME_HAS_DIFFERENT_EXPR, DIFFERENT_CC_NAME_HAS_SAME_EXPR, TABLE_COLUMN_DATATYPE_CHANGED); + @Autowired public AclEvaluate aclEvaluate; @@ -135,6 +149,9 @@ public class MetaStoreService extends BasicService { @Autowired public IndexPlanService indexPlanService; + @Autowired + public TableExtService tableExtService; + @Setter @Autowired(required = false) private List<ModelChangeSupporter> modelChangeSupporters = Lists.newArrayList(); @@ -409,7 +426,75 @@ public class MetaStoreService extends BasicService { SchemaUtil.SchemaDifference difference = SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(), context.getTargetKylinConfig()); - return ModelImportChecker.check(difference, context); + SchemaChangeCheckResult checkResult = ModelImportChecker.check(difference, context); + + Set<String> loadAbleTables = getLoadAbleTables(targetProject, context.getTargetMissTableList()); + if (CollectionUtils.isEmpty(loadAbleTables)) { + return checkResult; + } + // mark every model loadTableAble + return checkTableLoadAble(loadAbleTables, checkResult); + } + + public SchemaChangeCheckResult checkTableLoadAble(Set<String> loadAbleTables, SchemaChangeCheckResult checkResult) { + checkResult.getModels().forEach((modelName, change) -> { + if (change.creatable() || change.importable() || change.overwritable()) { + return; + } + // Verify that tables used by the model can be fully loaded + Set<String> missedTableSet = change.getMissingItems().stream()// + .filter(item -> item.getType().equals(MODEL_TABLE)) + .map(SchemaChangeCheckResult.ChangedItem::getDetail).collect(Collectors.toSet()); + if (missedTableSet.isEmpty() || !loadAbleTables.containsAll(missedTableSet)) { + return; + } + // Verify that model has no conflicts + List<SchemaChangeCheckResult.BaseItem> items = Lists.newArrayList(); + items.addAll(change.getNewItems()); + items.addAll(change.getUpdateItems()); + boolean hasConflict = items.stream().anyMatch(item -> { + val reason = item.getConflictReason().getReason(); + return UN_IMPORT_REASONS.contains(reason); + }); + if (hasConflict) { + return; + } + change.setLoadTableAble(true); + change.getLoadTables().addAll(missedTableSet); + }); + return checkResult; + } + + public Set<String> getLoadAbleTables(String targetProject, List<TableDesc> missTableList) { + if (CollectionUtils.isEmpty(missTableList)) { + return Sets.newHashSet(); + } + ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getProject(targetProject); + ISourceMetadataExplorer explorer = SourceFactory.getSource(projectInstance).getSourceMetadataExplorer(); + Set<String> loadAbleList = Sets.newHashSet(); + for (TableDesc missTableDesc : missTableList) { + try { + // get new table desc from datasource + TableDesc newTableDesc = explorer + .loadTableMetadata(missTableDesc.getDatabase(), missTableDesc.getName(), targetProject) + .getFirst(); + // check column all exists + Set<ColumnDesc> columnDescList = Arrays.stream(missTableDesc.getColumns()) + .filter(col -> !col.isComputedColumn()).collect(Collectors.toSet()); + Set<ColumnDesc> notExistColSet = newTableDesc.findColumns(columnDescList).getSecond(); + if (CollectionUtils.isNotEmpty(notExistColSet)) { + // some column not exist in new table desc, mark table cannot load + String missCols = notExistColSet.stream().map(ColumnDesc::getName).collect(Collectors.joining(",")); + logger.warn("Can not find columns [{}] in table [{}]", missCols, newTableDesc.getIdentity()); + continue; + } + loadAbleList.add(newTableDesc.getIdentity()); + } catch (Exception e) { + logger.warn("try load table: {} failed.", missTableDesc.getIdentity(), e); + } + } + return loadAbleList; } private void checkModelMetadataFile(MetadataStore metadataStore, Set<String> rawResourceList) { @@ -431,13 +516,6 @@ public class MetaStoreService extends BasicService { return anyPath.split(File.separator)[1]; } - /** - * - * @param nDataModel - * @param modelImport - * @param project - * @param importIndexPlanManager - */ private void createNewModel(NDataModel nDataModel, ModelImportRequest.ModelImport modelImport, String project, NIndexPlanManager importIndexPlanManager) { NDataModelManager dataModelManager = getManager(NDataModelManager.class, project); @@ -460,13 +538,6 @@ public class MetaStoreService extends BasicService { dataflowManager.createDataflow(indexPlan, nDataModel.getOwner(), RealizationStatusEnum.OFFLINE); } - /** - * - * @param project - * @param nDataModel - * @param modelImport - * @param hasModelOverrideProps - */ private void updateModel(String project, NDataModel nDataModel, ModelImportRequest.ModelImport modelImport, boolean hasModelOverrideProps) { NDataModelManager dataModelManager = getManager(NDataModelManager.class, project); @@ -497,13 +568,6 @@ public class MetaStoreService extends BasicService { dataModelManager.updateDataModelDesc(nDataModel); } - /** - * - * @param project - * @param nDataModel - * @param targetIndexPlan - * @param hasModelOverrideProps - */ private void updateIndexPlan(String project, NDataModel nDataModel, IndexPlan targetIndexPlan, boolean hasModelOverrideProps) { NIndexPlanManager indexPlanManager = getManager(NIndexPlanManager.class, project); @@ -532,12 +596,6 @@ public class MetaStoreService extends BasicService { }); } - /** - * - * @param project - * @param modelSchemaChange - * @param targetIndexPlan - */ private void removeIndexes(String project, SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange, IndexPlan targetIndexPlan) { if (modelSchemaChange != null) { @@ -555,12 +613,6 @@ public class MetaStoreService extends BasicService { } } - /** - * - * @param project - * @param modelSchemaChange - * @param targetIndexPlan - */ private void addWhiteListIndex(String project, SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange, IndexPlan targetIndexPlan) { if (modelSchemaChange != null) { @@ -585,7 +637,7 @@ public class MetaStoreService extends BasicService { @Transaction(project = 0, retry = 1) public void importModelMetadata(String project, MultipartFile metadataFile, ModelImportRequest request) - throws IOException { + throws Exception { aclEvaluate.checkProjectWritePermission(project); List<Exception> exceptions = new ArrayList<>(); @@ -602,23 +654,50 @@ public class MetaStoreService extends BasicService { } } + public LoadTableResponse innerLoadTables(String project, Set<SchemaChangeCheckResult.ModelSchemaChange> changes) + throws Exception { + Set<String> loadTables = Sets.newHashSet(); + changes.forEach(change -> loadTables.addAll(change.getLoadTables())); + return tableExtService.loadDbTables(loadTables.toArray(new String[0]), project, false); + } + private void innerImportModelMetadata(String project, MultipartFile metadataFile, ModelImportRequest request, - ImportModelContext importModelContext, List<Exception> exceptions) throws IOException { - val schemaChangeCheckResult = checkModelMetadata(project, importModelContext, metadataFile); + ImportModelContext context, List<Exception> exceptions) throws Exception { + val schemaChangeCheckResult = checkModelMetadata(project, context, metadataFile); - val importDataModelManager = NDataModelManager.getInstance(importModelContext.getTargetKylinConfig(), project); - val importIndexPlanManager = NIndexPlanManager.getInstance(importModelContext.getTargetKylinConfig(), project); + val schemaChanges = schemaChangeCheckResult.getModels().entrySet().stream()// + .filter(entry -> context.getNewModels().containsValue(entry.getKey())).map(Map.Entry::getValue) + .collect(Collectors.toSet()); + boolean needLoadTable = schemaChanges.stream().anyMatch(change -> !change.getLoadTables().isEmpty()); + LoadTableResponse loadTableResponse = null; + if (needLoadTable) { + loadTableResponse = innerLoadTables(project, schemaChanges); + if (CollectionUtils.isNotEmpty(loadTableResponse.getFailed())) { + String loadFailedTables = String.join(",", loadTableResponse.getFailed()); + logger.warn("Load Table failed: [{}]", loadFailedTables); + } + } + + KylinConfig targetKylinConfig = context.getTargetKylinConfig(); + val importDataModelManager = NDataModelManager.getInstance(targetKylinConfig, project); + val importIndexPlanManager = NIndexPlanManager.getInstance(targetKylinConfig, project); for (ModelImportRequest.ModelImport modelImport : request.getModels()) { try { validateModelImport(project, modelImport, schemaChangeCheckResult); if (modelImport.getImportType() == ModelImportRequest.ImportType.NEW) { + val modelSchemaChange = schemaChangeCheckResult.getModels().get(modelImport.getTargetName()); + if (needLoadTable && modelSchemaChange.isLoadTableAble()) { + Set<String> needLoadTables = modelSchemaChange.getLoadTables(); + if (!loadTableResponse.getLoaded().containsAll(needLoadTables)) { + logger.warn("Import model [{}] failed, skip import.", modelImport.getOriginalName()); + continue; + } + } var importDataModel = importDataModelManager.getDataModelDescByAlias(modelImport.getTargetName()); var nDataModel = importDataModelManager.copyForWrite(importDataModel); - createNewModel(nDataModel, modelImport, project, importIndexPlanManager); - importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(), - importModelContext.getTargetKylinConfig()); + importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(), targetKylinConfig); } else if (modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE) { val importDataModel = importDataModelManager.getDataModelDescByAlias(modelImport.getOriginalName()); val nDataModel = importDataModelManager.copyForWrite(importDataModel); @@ -640,7 +719,7 @@ public class MetaStoreService extends BasicService { addWhiteListIndex(project, modelSchemaChange, targetIndexPlan); importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(), - importModelContext.getTargetKylinConfig()); + targetKylinConfig); } } catch (Exception e) { logger.warn("Import model {} exception", modelImport.getOriginalName(), e); @@ -698,12 +777,6 @@ public class MetaStoreService extends BasicService { } } - /** - * @param project - * @param targetModelId - * @param srcModelId - * @param kylinConfig - */ private void importRecommendations(String project, String targetModelId, String srcModelId, KylinConfig kylinConfig) throws IOException { val projectManager = getManager(NProjectManager.class); diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java index 50ea987fc5..dbbedc48bc 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java @@ -20,6 +20,8 @@ package org.apache.kylin.rest.service; import static org.apache.kylin.common.constant.Constants.KE_VERSION; import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_ID_NOT_EXIST; +import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR; +import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -28,6 +30,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -36,6 +39,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; @@ -65,6 +69,7 @@ import org.apache.kylin.metadata.model.MultiPartitionDesc; import org.apache.kylin.metadata.model.MultiPartitionKeyMappingImpl; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; +import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.kylin.metadata.model.PartitionDesc; import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult; @@ -74,6 +79,7 @@ import org.apache.kylin.metadata.project.ProjectInstance; import org.apache.kylin.rest.constant.Constant; import org.apache.kylin.rest.request.ModelConfigRequest; import org.apache.kylin.rest.request.ModelImportRequest; +import org.apache.kylin.rest.response.LoadTableResponse; import org.apache.kylin.rest.response.ModelPreviewResponse; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -99,6 +105,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.base.Objects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import io.kyligence.kap.guava20.shaded.common.io.ByteSource; import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore; @@ -129,7 +136,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void setup() { createTestMetadata("src/test/resources/ut_meta/metastore_model"); overwriteSystemProp("HADOOP_USER_NAME", "root"); - ReflectionTestUtils.setField(metaStoreService, "modelChangeSupporters", Arrays.asList(modelChangeSupporter)); + ReflectionTestUtils.setField(metaStoreService, "modelChangeSupporters", + Collections.singletonList(modelChangeSupporter)); try { SecurityContextHolder.getContext().setAuthentication(authentication); jdbcTemplate = JdbcUtil.getJdbcTemplate(getTestConfig()); @@ -166,11 +174,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { Assert.assertTrue( modelPreviewResponseList.stream().anyMatch(ModelPreviewResponse::isHasMultiplePartitionValues)); - val dfMgr = modelService.getManager(NDataflowManager.class, "default"); val id = "7212bf0c-0716-4cef-b623-69c161981262"; - val dataflow = dfMgr.getDataflow(id); val idxPlanMgr = modelService.getManager(NIndexPlanManager.class, "default"); - val indexPlan = idxPlanMgr.getIndexPlan(id); idxPlanMgr.updateIndexPlan(id, updater -> { val overrideProps = new LinkedHashMap<String, String>(); @@ -365,7 +370,7 @@ public class MetaStoreServiceTest extends ServiceTestBase { RawResource rw = rawResourceMap.get(ResourceStore.VERSION_FILE); try (InputStream inputStream = rw.getByteSource().openStream()) { - Assert.assertEquals("unknown", IOUtils.toString(inputStream)); + Assert.assertEquals("unknown", IOUtils.toString(inputStream, StandardCharsets.UTF_8)); } overwriteSystemProp(KE_VERSION, "4.3.x"); @@ -378,38 +383,37 @@ public class MetaStoreServiceTest extends ServiceTestBase { rw = rawResourceMap.get(ResourceStore.VERSION_FILE); try (InputStream inputStream = rw.getByteSource().openStream()) { - Assert.assertEquals("4.3.x", IOUtils.toString(inputStream)); + Assert.assertEquals("4.3.x", IOUtils.toString(inputStream, StandardCharsets.UTF_8)); } } @Test - public void testExportNotExistsModel() throws Exception { + public void testExportNotExistsModel() { String notExistsUuid = RandomUtil.randomUUIDStr(); - thrown.expect(KylinException.class); - thrown.expectMessage(MODEL_ID_NOT_EXIST.getMsg(notExistsUuid)); - metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, Lists.newArrayList(notExistsUuid), false, false, - false); + List<String> modelList = Lists.newArrayList(notExistsUuid); + Assert.assertThrows(MODEL_ID_NOT_EXIST.getMsg(notExistsUuid), KylinException.class, + () -> metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, modelList, false, false, false)); } @Test - public void testExportBrokenModel() throws Exception { + public void testExportBrokenModel() { // broken model id String brokenModelId = "8b5a2d39-304f-4a20-a9da-942f461534d8"; - thrown.expect(KylinException.class); - thrown.expectMessage(String.format(Locale.ROOT, + String msg = String.format(Locale.ROOT, "Can’t export model \"%s\" as it’s in \"BROKEN\" status. Please re-select and try again.", - brokenModelId)); - metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, Lists.newArrayList(brokenModelId), false, false, - false); + brokenModelId); + List<String> modelList = Lists.newArrayList(brokenModelId); + Assert.assertThrows(msg, KylinException.class, + () -> metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, modelList, false, false, false)); } @Test - public void testExportEmptyModel() throws Exception { + public void testExportEmptyModel() { // empty model list - thrown.expect(KylinException.class); - thrown.expectMessage("Please select at least one model to export."); - metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, Lists.newArrayList(), false, false, false); + List<String> emptyList = Lists.newArrayList(); + Assert.assertThrows("Please select at least one model to export.", KylinException.class, + () -> metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, emptyList, false, false, false)); } private Map<String, RawResource> getRawResourceFromZipFile(InputStream inputStream) throws IOException { @@ -434,7 +438,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelCCUpdate() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -455,7 +460,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataNoChanges() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -469,7 +475,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelAggUpdate() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -488,7 +495,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelDimConflict() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -512,7 +520,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelJoinConflict() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -545,7 +554,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelFactConflict() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -570,7 +580,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelColumnUpdate() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -594,7 +605,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelFilterConflict() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -609,7 +621,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelPartitionConflict() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -679,7 +692,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelDifferentMultiplePartitionColumnWithEmptyValue() throws IOException { val file = new File( "src/test/resources/ut_meta/schema_utils/model_different_multiple_column_with_empty_partition_value/model_different_multiple_column_with_empty_partition_value_2021_01_18_11_30_10_E70AE88EBB2371A8F3FE3979B9DCBB06.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -693,7 +707,7 @@ public class MetaStoreServiceTest extends ServiceTestBase { && pair.getFirstAttributes().get("partitions") .equals(Arrays.asList(Collections.singletonList("p1"), Collections.singletonList("p2"), Collections.singletonList("p3"))) - && ((List) pair.getSecondAttributes().get("partitions")).isEmpty())); + && ((List<?>) pair.getSecondAttributes().get("partitions")).isEmpty())); } @Test @@ -739,7 +753,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testCheckModelMetadataModelMissingTable() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -750,14 +765,15 @@ public class MetaStoreServiceTest extends ServiceTestBase { Assert.assertTrue( modelSchemaChange.getMissingItems().stream().anyMatch(sc -> sc.getType() == SchemaNodeType.MODEL_TABLE && sc.getDetail().equals("SSB.CUSTOMER_NEW") && !sc.isImportable())); - Assert.assertFalse(modelSchemaChange.importable()); + Assert.assertTrue(modelSchemaChange.importable()); } @Test public void testCheckModelMetadataModelIndex() throws IOException { val file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels() @@ -770,7 +786,7 @@ public class MetaStoreServiceTest extends ServiceTestBase { .filter(sc -> sc.getType() == SchemaNodeType.WHITE_LIST_INDEX) .filter(sc -> sc.getDetail().equals("20000000001")) .filter(SchemaChangeCheckResult.BaseItem::isOverwritable).anyMatch(sc -> { - String col_orders = String.join(",", ((ArrayList<String>) sc.getAttributes().get("col_orders"))); + String col_orders = String.join(",", ((List<String>) sc.getAttributes().get("col_orders"))); return col_orders.equals( "P_LINEORDER.LO_CUSTKEY,P_LINEORDER.LO_SUPPKEY,P_LINEORDER.LO_ORDERDATE,P_LINEORDER.LO_QUANTITY,P_LINEORDER.LO_DISCOUNT,P_LINEORDER.LO_LINENUMBER,P_LINEORDER.LO_PARTKEY,P_LINEORDER.LO_ORDERKEY"); })); @@ -780,7 +796,7 @@ public class MetaStoreServiceTest extends ServiceTestBase { .filter(sc -> sc.getDetail().equals("20000000001")) .filter(SchemaChangeCheckResult.BaseItem::isOverwritable).anyMatch(sc -> { String col_orders = String.join(",", - ((ArrayList<String>) sc.getAttributes().get("col_orders"))); + ((List<String>) sc.getAttributes().get("col_orders"))); return col_orders.equals( "P_LINEORDER.LO_LINENUMBER,P_LINEORDER.LO_SUPPKEY,P_LINEORDER.LO_QUANTITY,P_LINEORDER.LO_PARTKEY,P_LINEORDER.LO_ORDERKEY,P_LINEORDER.LO_CUSTKEY,P_LINEORDER.LO_DISCOUNT,P_LINEORDER.LO_ORDERDATE"); })); @@ -790,7 +806,7 @@ public class MetaStoreServiceTest extends ServiceTestBase { .filter(sc -> sc.getDetail().equals("20000010001")) .filter(SchemaChangeCheckResult.BaseItem::isOverwritable).anyMatch(sc -> { String col_orders = String.join(",", - ((ArrayList<String>) sc.getAttributes().get("col_orders"))); + ((List<String>) sc.getAttributes().get("col_orders"))); return col_orders.equals("P_LINEORDER.LO_SUPPKEY,P_LINEORDER.LO_QUANTITY"); })); } @@ -798,29 +814,30 @@ public class MetaStoreServiceTest extends ServiceTestBase { @Test public void testCheckModelMetadataWithoutMD5Checksum() throws Exception { File file = new File("src/test/resources/ut_model_metadata/metastore_model_metadata.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); - thrown.expect(KylinException.class); - thrown.expectMessage( - "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip."); - metaStoreService.checkModelMetadata("default", multipartFile, null); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); + Assert.assertThrows( + "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip.", + KylinException.class, () -> metaStoreService.checkModelMetadata("default", multipartFile, null)); } @Test public void testCheckModelMetadataWithWrongMD5Checksum() throws Exception { File file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b1.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); - thrown.expect(KylinException.class); - thrown.expectMessage( - "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip."); - metaStoreService.checkModelMetadata("default", multipartFile, null); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); + Assert.assertThrows( + "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip.", + KylinException.class, () -> metaStoreService.checkModelMetadata("default", multipartFile, null)); } @Test public void testImportModelMetadata() throws Exception { File file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); ModelImportRequest request = new ModelImportRequest(); List<ModelImportRequest.ModelImport> models = new ArrayList<>(); models.add(new ModelImportRequest.ModelImport("model_index", "model_index", @@ -885,12 +902,11 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testImportModelMetadataWithRecInExpertModeProject() throws Exception { String id = "761215ee-3f21-4d1a-aae5-3d0d9d6ede85"; NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(getTestConfig(), "original_project"); - indexPlanManager.updateIndexPlan(id, copyForWrite -> { - copyForWrite.setRuleBasedIndex(new RuleBasedIndex()); - }); + indexPlanManager.updateIndexPlan(id, copyForWrite -> copyForWrite.setRuleBasedIndex(new RuleBasedIndex())); String fileName = "issue_model_metadata_2022_06_17_14_54_54_F89122A7E22F485D8359616BC1C30718.zip"; File file = new File("src/test/resources/ut_model_metadata/" + fileName); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); ModelImportRequest request = new ModelImportRequest(); List<ModelImportRequest.ModelImport> models = new ArrayList<>(); models.add(new ModelImportRequest.ModelImport("model_index", "model_index", @@ -1219,27 +1235,43 @@ public class MetaStoreServiceTest extends ServiceTestBase { public void testImportModelMetadataWithUnCreatable() throws Exception { File file = new File( "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); - var multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + var multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); ModelImportRequest request = new ModelImportRequest(); - List<ModelImportRequest.ModelImport> models = new ArrayList<>(); - models.add(new ModelImportRequest.ModelImport("missing_table_model", "missing_table_model_1", - ModelImportRequest.ImportType.NEW)); - - request.setModels(models); + request.setModels(Lists.newArrayList(new ModelImportRequest.ModelImport("missing_table_model", + "missing_table_model_1", ModelImportRequest.ImportType.NEW))); + val manager = NTableMetadataManager.getInstance(getTestConfig(), "original_project"); + Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW")); + metaStoreService.importModelMetadata("original_project", multipartFile, request); + Assert.assertNotNull(manager.getTableDesc("SSB.CUSTOMER_NEW")); - thrown.expectCause(new BaseMatcher<Throwable>() { - @Override - public boolean matches(Object item) { - return ((Exception) item).getMessage().contains( - "Can’t select ImportType \"NEW\" for the model \"missing_table_model_1\". Please select \"UN_IMPORT\"."); - } + { - @Override - public void describeTo(Description description) { + } + } - } - }); - metaStoreService.importModelMetadata("original_project", multipartFile, request); + @Test + public void testImportModelWithLoadTableFailed() throws Exception { + File file = new File( + "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip"); + var multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); + ModelImportRequest request = new ModelImportRequest(); + request.setModels(Lists.newArrayList(new ModelImportRequest.ModelImport("missing_table_model", + "missing_table_model_1", ModelImportRequest.ImportType.NEW))); + val manager = NTableMetadataManager.getInstance(getTestConfig(), "original_project"); + Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW")); + val spyService = Mockito.spy(metaStoreService); + val tableExtService = (TableExtService) ReflectionTestUtils.getField(spyService, "tableExtService"); + val spyTableService = Mockito.spy(tableExtService); + LoadTableResponse loadTableResponse = new LoadTableResponse(); + loadTableResponse.getFailed().add("SSB.CUSTOMER_NEW"); + Mockito.doReturn(loadTableResponse).when(spyTableService).loadDbTables(new String[] { "SSB.CUSTOMER_NEW" }, + "original_project", false); + ReflectionTestUtils.setField(spyService, "tableExtService", spyTableService); + Mockito.doReturn(loadTableResponse).when(spyService).innerLoadTables(Mockito.anyString(), Mockito.anySet()); + spyService.importModelMetadata("original_project", multipartFile, request); + Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW")); } @Test @@ -1355,6 +1387,140 @@ public class MetaStoreServiceTest extends ServiceTestBase { && changedItem.getDetail().equals("SSB.P_LINEORDER.LO_SUPPKEY"))); } + @Test + public void testMissTable() throws IOException { + val file = new File( + "src/test/resources/ut_meta/schema_utils/model_missing_table_update/model_table_missing_update_model_metadata_2020_11_16_02_37_33_3182D4A7694DA64E3D725C140CF80A47.zip"); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); + val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null); + + val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertNotNull(modelSchemaChange); + Assert.assertTrue(modelSchemaChange.isLoadTableAble()); + Set<String> loadTables = modelSchemaChange.getLoadTables(); + Assert.assertEquals(1, loadTables.size()); + Assert.assertEquals("SSB.CUSTOMER_NEW", loadTables.iterator().next()); + Assert.assertTrue(modelSchemaChange.creatable()); + Assert.assertTrue(modelSchemaChange.importable()); + Assert.assertFalse(modelSchemaChange.overwritable()); + + testModelImportable(metadataCheckResponse); + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + Mockito.doReturn(false).when(mockChange).overwritable(); + Mockito.doReturn(false).when(mockChange).creatable(); + Mockito.doReturn(false).when(mockChange).importable(); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertTrue(change.isLoadTableAble()); + Assert.assertFalse(change.getLoadTables().isEmpty()); + } + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEWNEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + val missItems = mockChange.getMissingItems().stream().filter(item -> item.getType() != MODEL_TABLE) + .collect(Collectors.toList()); + ReflectionTestUtils.setField(mockChange, "missingItems", missItems); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + val newItems = mockChange.getNewItems().stream() + .peek(item -> item.getConflictReason().setReason(SAME_CC_NAME_HAS_DIFFERENT_EXPR)) + .collect(Collectors.toList()); + ReflectionTestUtils.setField(mockChange, "newItems", newItems); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + } + + private void testModelImportable(SchemaChangeCheckResult metadataCheckResponse) { + val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model"); + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + Mockito.doReturn(true).when(mockChange).overwritable(); + Mockito.doReturn(true).when(mockChange).creatable(); + Mockito.doReturn(true).when(mockChange).importable(); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + Mockito.doReturn(true).when(mockChange).overwritable(); + Mockito.doReturn(false).when(mockChange).creatable(); + Mockito.doReturn(false).when(mockChange).importable(); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + Mockito.doReturn(false).when(mockChange).overwritable(); + Mockito.doReturn(true).when(mockChange).creatable(); + Mockito.doReturn(false).when(mockChange).importable(); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + + { + val mockChange = Mockito.spy(modelSchemaChange); + mockChange.setLoadTableAble(false); + mockChange.setLoadTables(Sets.newHashSet()); + Mockito.doReturn(false).when(mockChange).overwritable(); + Mockito.doReturn(false).when(mockChange).creatable(); + Mockito.doReturn(true).when(mockChange).importable(); + metadataCheckResponse.getModels().put("ssb_model", mockChange); + metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse); + val change = metadataCheckResponse.getModels().get("ssb_model"); + Assert.assertFalse(change.isLoadTableAble()); + Assert.assertTrue(change.getLoadTables().isEmpty()); + } + } + @Test public void testGetModelMetadataProjectName() throws IOException { File file = new File( @@ -1373,7 +1539,8 @@ public class MetaStoreServiceTest extends ServiceTestBase { @Test public void testMetadataChecker() throws IOException { File file = new File("src/test/resources/ut_model_metadata/ut_model_matadata.zip"); - val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file)); + val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, + Files.newInputStream(file.toPath())); KylinConfig modelConfig = KylinConfig.createKylinConfig(KylinConfig.getInstanceFromEnv()); MetadataChecker metadataChecker = new MetadataChecker(MetadataStore.createMetadataStore(modelConfig)); Map<String, RawResource> rawResourceMap = getRawResourceFromUploadFile(multipartFile); diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java index ff8a49d330..1747593dbb 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java @@ -148,7 +148,7 @@ public class TableServiceTest extends CSVSourceTestCase { @InjectMocks private FusionModelService fusionModelService = Mockito.spy(new FusionModelService()); - private StreamingJobListener eventListener = new StreamingJobListener(); + private final StreamingJobListener eventListener = new StreamingJobListener(); @Before public void setup() { @@ -458,8 +458,8 @@ public class TableServiceTest extends CSVSourceTestCase { @Test public void testAddAndBroadcastSparkSession() { getTestConfig().setProperty("kylin.env.use-dynamic-S3-role-credential-in-table", "true"); - TableExtDesc.S3RoleCredentialInfo roleCredentialInfo = null; - tableService.addAndBroadcastSparkSession(roleCredentialInfo); + tableService.addAndBroadcastSparkSession(null); + TableExtDesc.S3RoleCredentialInfo roleCredentialInfo; roleCredentialInfo = new TableExtDesc.S3RoleCredentialInfo("testbucket2", "", ""); tableService.addAndBroadcastSparkSession(roleCredentialInfo); assert !SparderEnv.getSparkSession().conf().contains("fs.s3a.bucket2.testbucket.aws.credentials.provider"); @@ -588,7 +588,6 @@ public class TableServiceTest extends CSVSourceTestCase { val originSize = nTableMetadataManager.listAllTables().size(); // Add partition_key and data_loading_range - DateRangeRequest request = mockDateRangeRequest(); tableService.setPartitionKey(tableName, "default", "CAL_DT", "yyyy-MM-dd"); // unload table @@ -607,7 +606,6 @@ public class TableServiceTest extends CSVSourceTestCase { @Test public void testUnloadKafkaTable() { String project = "streaming_test"; - NProjectManager npr = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()); NTableMetadataManager tableManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project); StreamingJobManager mgr = StreamingJobManager.getInstance(getTestConfig(), project); @@ -868,7 +866,7 @@ public class TableServiceTest extends CSVSourceTestCase { NDataLoadingRangeManager rangeManager = NDataLoadingRangeManager.getInstance(KylinConfig.getInstanceFromEnv(), "default"); NDataLoadingRange dataLoadingRange = rangeManager.getDataLoadingRange("DEFAULT.TEST_KYLIN_FACT"); - SegmentRange segmentRange = new SegmentRange.TimePartitionedSegmentRange(1294364400000L, 1294364500000L); + SegmentRange<Long> segmentRange = new SegmentRange.TimePartitionedSegmentRange(1294364400000L, 1294364500000L); dataLoadingRange.setCoveredRange(segmentRange); NDataLoadingRange updateRange = rangeManager.copyForWrite(dataLoadingRange); rangeManager.updateDataLoadingRange(updateRange); @@ -1059,13 +1057,6 @@ public class TableServiceTest extends CSVSourceTestCase { return request; } - private DateRangeRequest mockeDateRangeRequestWithoutTime() { - DateRangeRequest request = new DateRangeRequest(); - request.setProject("default"); - request.setTable("DEFAULT.TEST_KYLIN_FACT"); - return request; - } - @Test public void testGetProjectTables() throws Exception { NInitTablesResponse response; @@ -1073,36 +1064,31 @@ public class TableServiceTest extends CSVSourceTestCase { (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName)); Assert.assertEquals(0, response.getDatabases().size()); - response = tableService.getProjectTables("default", "SSB.CU", 0, 14, true, (databaseName, tableName) -> { - return tableService.getTableNameResponses("default", databaseName, tableName); - }); + response = tableService.getProjectTables("default", "SSB.CU", 0, 14, true, + (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName)); Assert.assertEquals(1, response.getDatabases().size()); - Assert.assertEquals(1, response.getDatabases().get(0).getTables().size()); + Assert.assertEquals(2, response.getDatabases().get(0).getTables().size()); - response = tableService.getProjectTables("default", "", 0, 14, true, (databaseName, tableName) -> { - return tableService.getTableNameResponses("default", databaseName, tableName); - }); + response = tableService.getProjectTables("default", "", 0, 14, true, + (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName)); Assert.assertEquals(3, response.getDatabases().size()); - Assert.assertEquals(20, + Assert.assertEquals(21, response.getDatabases().get(0).getTables().size() + response.getDatabases().get(1).getTables().size() + response.getDatabases().get(2).getTables().size()); - response = tableService.getProjectTables("default", "TEST", 0, 14, true, (databaseName, tableName) -> { - return tableService.getTableNameResponses("default", databaseName, tableName); - }); + response = tableService.getProjectTables("default", "TEST", 0, 14, true, + (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName)); Assert.assertEquals(2, response.getDatabases().size()); Assert.assertEquals(13, response.getDatabases().get(0).getTables().size() + response.getDatabases().get(1).getTables().size()); - response = tableService.getProjectTables("default", "EDW.", 0, 14, true, (databaseName, tableName) -> { - return tableService.getTableNameResponses("default", databaseName, tableName); - }); + response = tableService.getProjectTables("default", "EDW.", 0, 14, true, + (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName)); Assert.assertEquals(1, response.getDatabases().size()); Assert.assertEquals(3, response.getDatabases().get(0).getTables().size()); - response = tableService.getProjectTables("default", "EDW.", 0, 14, false, (databaseName, tableName) -> { - return tableService.getTableDesc("default", true, tableName, databaseName, true); - }); + response = tableService.getProjectTables("default", "EDW.", 0, 14, false, + (databaseName, tableName) -> tableService.getTableDesc("default", true, tableName, databaseName, true)); Assert.assertEquals(1, response.getDatabases().size()); Assert.assertEquals(3, response.getDatabases().get(0).getTables().size()); @@ -1167,7 +1153,7 @@ public class TableServiceTest extends CSVSourceTestCase { .setProperty("kylin.source.hive.databases", "ssb"); tableService.loadProjectHiveTableNameToCacheImmediately("default", true); tables = tableService.getTableNameResponsesInCache("default", "SSB", ""); - Assert.assertEquals(6, tables.size()); + Assert.assertEquals(7, tables.size()); NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject("default").setPrincipal("default"); tableService.loadHiveTableNameToCache(); @@ -1199,7 +1185,6 @@ public class TableServiceTest extends CSVSourceTestCase { testData.put("t", Arrays.asList("aa", "ab", "bc")); NHiveSourceInfo sourceInfo = new NHiveSourceInfo(); sourceInfo.setTables(testData); - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); DataSourceState.getInstance().putCache("project#default", sourceInfo, Arrays.asList("aa", "ab", "bc")); List<?> tables = tableService.getTableNameResponsesInCache("default", "t", "a"); Assert.assertEquals(2, tables.size()); @@ -1306,7 +1291,7 @@ public class TableServiceTest extends CSVSourceTestCase { KylinConfig config = getTestConfig(); config.setProperty("kylin.source.load-hive-tablename-enabled", "false"); config.setProperty("kylin.query.security.acl-tcr-enabled", "true"); - Assert.assertEquals(6, tableService.getHiveTableNameResponses("default", "SSB", "").size()); + Assert.assertEquals(7, tableService.getHiveTableNameResponses("default", "SSB", "").size()); Assert.assertEquals(11, tableService.getHiveTableNameResponses("default", "DEFAULT", "").size()); val table = NTableMetadataManager.getInstance(getTestConfig(), "default").getTableDesc("DEFAULT.TEST_ENCODING"); @@ -1321,7 +1306,7 @@ public class TableServiceTest extends CSVSourceTestCase { acl.setTable(aclTable); manager.updateAclTCR(acl, "test", true); - Assert.assertEquals(6, tableService.getHiveTableNameResponses("default", "SSB", "").size()); + Assert.assertEquals(7, tableService.getHiveTableNameResponses("default", "SSB", "").size()); Assert.assertEquals(11, tableService.getHiveTableNameResponses("default", "DEFAULT", "").size()); config.setProperty("kylin.source.load-hive-tablename-enabled", "true"); config.setProperty("kylin.query.security.acl-tcr-enabled", "false"); @@ -1393,10 +1378,8 @@ public class TableServiceTest extends CSVSourceTestCase { try { List<Integer> sourceTypes = Arrays.asList(1, 9); val tableDescs2 = tableService.getTableDescByTypes(project, true, "", "SSB", false, sourceTypes); - assert tableDescs2.stream().filter(tableDesc -> tableDesc.getSourceType() == 1).collect(Collectors.toList()) - .size() > 0; - assert tableDescs2.stream().filter(tableDesc -> tableDesc.getSourceType() == 9).collect(Collectors.toList()) - .size() > 0; + assert tableDescs2.stream().anyMatch(tableDesc -> tableDesc.getSourceType() == 1); + assert tableDescs2.stream().anyMatch(tableDesc -> tableDesc.getSourceType() == 9); } catch (Exception e) { Assert.fail(); }