This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit b0a6fa377109fe6d2b0dd2601d7aa7073b534ed9 Author: Liang.Hua <36814772+jacob...@users.noreply.github.com> AuthorDate: Tue Apr 25 17:51:20 2023 +0800 KYLIN-5643 add public api for batch delete index Co-authored-by: liang.hua <liang....@kyligence.io> --- .../common/exception/code/ErrorCodeServer.java | 1 + .../resources/kylin_error_msg_conf_cn.properties | 1 + .../resources/kylin_error_msg_conf_en.properties | 1 + .../kylin_error_suggestion_conf_cn.properties | 1 + .../kylin_error_suggestion_conf_en.properties | 1 + .../main/resources/kylin_errorcode_conf.properties | 1 + .../rest/controller/NIndexPlanController.java | 2 +- .../controller/open/OpenIndexPlanController.java | 32 ++++++++++++ .../open/OpenIndexPlanControllerTest.java | 28 ++++++++++ .../kylin/rest/service/FusionIndexService.java | 61 ++++++++++++++++++---- .../kylin/rest/service/FusionIndexServiceTest.java | 49 +++++++++++++++++ 11 files changed, 167 insertions(+), 11 deletions(-) diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java index 91a14bb8ee..9f62f94734 100644 --- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java +++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java @@ -121,6 +121,7 @@ public enum ErrorCodeServer implements ErrorCodeProducer { HIERARCHY_NOT_IN_DIMENSION("KE-010012209"), JOINT_NOT_IN_DIMENSION("KE-010012210"), DIMENSION_ONLY_SET_ONCE("KE-010012211"), + BASE_TABLE_INDEX_DELETE_DISABLE("KE-010012212"), // 10043XX parameter check REQUEST_PARAMETER_EMPTY_OR_VALUE_EMPTY("KE-010043201"), diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties index ee75b68336..4af7f8862c 100644 --- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties +++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties @@ -122,6 +122,7 @@ KE-010012208=必需维度必须在 'dimension' 参数中,请修改后重试。 KE-010012209=层级维度必须在 'dimension' 参数中,请修改后重试。 KE-010012210=联合维度必须在 'dimension' 参数中,请修改后重试。 KE-010012211=任一维度只能在必需维度、层级维度或联合维度中设置一次,请修改后重试。 +KE-010012212=删除基础明细索引,请先关闭模型的分层存储。 ## 10043XX parameter check KE-010043201=请求参数 “%s” 为空或值为空。请检查请求参数是否正确填写。 diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties index 511f9b6ee0..c4a10ecbf1 100644 --- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties @@ -121,6 +121,7 @@ KE-010012208=The mandatory dimension must be included in the 'dimension' paramet KE-010012209=The hierarchy dimension must be included in the 'dimension' parameter. Please modify it and try again. KE-010012210=The joint dimension must be included in the 'dimension' parameter. Please modify it and try again. KE-010012211=Any dimension can only be set once in mandatory dimension, hierarchy dimension or joint dimension. Please modify and try again. +KE-010012212=To delete the base table index, please turn off tiered storage for the model. ## 10043XX parameter check KE-010043201=Request parameter "%s" is empty or value is empty. Please check the request parameters. diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties index 43af745ba9..758fd6b11a 100644 --- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties +++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties @@ -109,6 +109,7 @@ KE-010010205= KE-010012201= KE-010012202= KE-010012203= +KE-010012212= ## 10043XX parameter check KE-010043201= diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties index 6dc32307e2..0b45ebf647 100644 --- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties +++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties @@ -109,6 +109,7 @@ KE-010010205= KE-010012201= KE-010012202= KE-010012203= +KE-010012212= ## 10043XX parameter check KE-010043201= diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties index 30f304c8e0..21699ac510 100644 --- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties +++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties @@ -133,6 +133,7 @@ KE-010012208 KE-010012209 KE-010012210 KE-010012211 +KE-010012212 ## 10043XX parameter check KE-010043201 diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java index fe5bdc8cfc..68b30631f9 100644 --- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java +++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NIndexPlanController.java @@ -223,7 +223,7 @@ public class NIndexPlanController extends NBasicController { } @ApiOperation(value = "batch deleteIndex", tags = { "AI" }) - @DeleteMapping(value = "/index") + @DeleteMapping(value = "/index", produces = HTTP_VND_APACHE_KYLIN_JSON) public EnvelopeResponse<String> batchDeleteIndex(@RequestParam(value = "layout_ids") Set<Long> layoutIds, @RequestParam(value = "project") String project, @RequestParam(value = "model") String modelId) { checkProjectName(project); diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenIndexPlanController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenIndexPlanController.java index d032bd14fb..ec2bf2b7c3 100644 --- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenIndexPlanController.java +++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenIndexPlanController.java @@ -19,10 +19,15 @@ package org.apache.kylin.rest.controller.open; import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON; +import static org.apache.kylin.common.exception.code.ErrorCodeServer.LAYOUT_LIST_EMPTY; import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NOT_EXIST; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.exception.KylinException; +import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.rest.controller.NBasicController; @@ -35,11 +40,14 @@ import org.apache.kylin.rest.response.OpenAddAggGroupResponse; import org.apache.kylin.rest.service.FusionIndexService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import io.swagger.annotations.ApiOperation; import lombok.val; @RestController @@ -48,6 +56,8 @@ public class OpenIndexPlanController extends NBasicController { private static final String MODEL_ALIAS = "model"; + private static final String MODEL_NAME = "model_name"; + private static final String AGGREGATION_GROUPS = "aggregation_groups"; @Autowired @@ -75,6 +85,28 @@ public class OpenIndexPlanController extends NBasicController { return convertResponse(response); } + @ApiOperation(value = "batch deleteIndex", tags = { "AI" }) + @DeleteMapping(value = "/index") + public EnvelopeResponse<String> batchDeleteIndex(@RequestParam(value = "index_ids") Set<Long> layoutIds, + @RequestParam(value = "project") String project, @RequestParam(value = "model_name") String modelName, + @RequestParam(value = "index_range", required = false) IndexEntity.Range indexRange) { + checkProjectName(project); + checkRequiredArg(MODEL_NAME, modelName); + if (null == indexRange) { + indexRange = IndexEntity.Range.BATCH; + } + NDataModel dataModel = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project) + .getDataModelDescByAlias(modelName); + if (null == dataModel) { + throw new KylinException(MODEL_NOT_EXIST); + } + if (CollectionUtils.isEmpty(layoutIds)) { + throw new KylinException(LAYOUT_LIST_EMPTY); + } + fusionIndexService.batchRemoveIndex(project, dataModel.getUuid(), layoutIds, indexRange); + return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, "", ""); + } + private EnvelopeResponse<OpenAddAggGroupResponse> convertResponse( EnvelopeResponse<DiffRuleBasedIndexResponse> internal) { if (internal != null && internal.getData() != null) { diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/open/OpenIndexPlanControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/open/OpenIndexPlanControllerTest.java index 6fba06521e..6d43a7828b 100644 --- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/open/OpenIndexPlanControllerTest.java +++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/open/OpenIndexPlanControllerTest.java @@ -29,6 +29,7 @@ import org.apache.kylin.common.exception.KylinException; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.NLocalFileMetadataTestCase; import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.rest.constant.Constant; @@ -156,4 +157,31 @@ public class OpenIndexPlanControllerTest extends NLocalFileMetadataTestCase { Assert.assertTrue(mvcResult.getResponse().getContentAsString().contains("KE-010043201")); Mockito.verify(openIndexPlanController).updateRule(openRequest); } + + @Test + public void testBatchDeleteIndex() throws Exception { + String project = "default"; + String modelName = "nmodel_basic"; + mockMvc.perform(MockMvcRequestBuilders.delete("/api/index_plans/index").contentType(MediaType.APPLICATION_JSON) + .param("project", project).param("model_name", modelName).param("index_ids", "1,1001") + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON))) + .andExpect(MockMvcResultMatchers.status().isOk()); + Mockito.doNothing().when(fusionIndexService).batchRemoveIndex(project, modelName, Sets.newHashSet(1L, 1001L), + null); + Mockito.verify(openIndexPlanController).batchDeleteIndex(Sets.newHashSet(1L, 1001L), project, modelName, null); + + MvcResult result = mockMvc + .perform(MockMvcRequestBuilders.delete("/api/index_plans/index").contentType(MediaType.APPLICATION_JSON) + .param("project", project).param("model_name", "no_exist_model").param("index_ids", "1,1001") + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON))) + .andExpect(MockMvcResultMatchers.status().is5xxServerError()).andReturn(); + Assert.assertTrue(result.getResponse().getContentAsString().contains("KE-010002201")); + + MvcResult mvcResult = mockMvc + .perform(MockMvcRequestBuilders.delete("/api/index_plans/index").contentType(MediaType.APPLICATION_JSON) + .param("project", project).param("model_name", modelName).param("index_ids", "") + .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON))) + .andExpect(MockMvcResultMatchers.status().is5xxServerError()).andReturn(); + Assert.assertTrue(mvcResult.getResponse().getContentAsString().contains("KE-010043212")); + } } diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java index 2d5c51c47d..a7f2c67d68 100644 --- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java +++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/FusionIndexService.java @@ -29,6 +29,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -78,6 +79,7 @@ import org.apache.kylin.streaming.metadata.StreamingJobMeta; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import io.kyligence.kap.secondstorage.SecondStorageUtil; import lombok.val; import lombok.extern.slf4j.Slf4j; @@ -277,18 +279,14 @@ public class FusionIndexService extends BasicService { checkStreamingIndexEnabled(project, modelDesc); if (modelDesc.fusionModelStreamingPart()) { - if (!indexChangeEnable(project, model, indexRange, - Lists.newArrayList(IndexEntity.Range.HYBRID, Range.STREAMING, Range.EMPTY))) { - throw new KylinException(ServerErrorCode.STREAMING_INDEX_UPDATE_DISABLE, - String.format(Locale.ROOT, MsgPicker.getMsg().getStreamingIndexesDelete())); - } + checkStreamingIndexDeleteEnabledWithIndexRange(project, model, indexRange); FusionModel fusionModel = getManager(FusionModelManager.class, project).getFusionModel(model); String batchId = fusionModel.getBatchModel().getUuid(); if (IndexEntity.Range.BATCH == indexRange) { indexPlanService.removeIndex(project, batchId, id); return; } else if (IndexEntity.Range.HYBRID == indexRange) { - removeHybridIndex(project, batchId, id); + removeHybridIndex(project, batchId, Sets.newHashSet(id)); } } indexPlanService.removeIndex(project, model, id); @@ -305,13 +303,44 @@ public class FusionIndexService extends BasicService { indexPlanService.removeIndexes(project, modelId, ids); } - private void removeHybridIndex(String project, String model, final long id) { - val indexPlan = getManager(NIndexPlanManager.class, project).getIndexPlan(model); - if (indexPlan.getLayoutEntity(id) != null) { - indexPlanService.removeIndex(project, model, id); + @Transaction(project = 0) + public void batchRemoveIndex(String project, String modelId, Set<Long> ids, IndexEntity.Range indexRange) { + NDataModel modelDesc = getManager(NDataModelManager.class, project).getDataModelDesc(modelId); + checkSecondStorageBaseTableIndexEnabled(project, modelDesc, ids); + checkStreamingIndexEnabled(project, modelDesc); + if (!modelDesc.fusionModelStreamingPart()) { + indexPlanService.removeIndexes(project, modelId, ids); + return; + } + checkStreamingIndexDeleteEnabledWithIndexRange(project, modelId, indexRange); + FusionModel fusionModel = getManager(FusionModelManager.class, project).getFusionModel(modelId); + String batchId = fusionModel.getBatchModel().getUuid(); + if (IndexEntity.Range.BATCH == indexRange) { + indexPlanService.removeIndexes(project, batchId, ids); + return; + } + + indexPlanService.removeIndexes(project, modelId, ids); + if (IndexEntity.Range.HYBRID == indexRange) { + removeHybridIndex(project, batchId, ids); } } + private void checkStreamingIndexDeleteEnabledWithIndexRange(String project, String modelId, + IndexEntity.Range indexRange) { + if (!indexChangeEnable(project, modelId, indexRange, + Lists.newArrayList(IndexEntity.Range.HYBRID, Range.STREAMING, Range.EMPTY))) { + throw new KylinException(ServerErrorCode.STREAMING_INDEX_UPDATE_DISABLE, + String.format(Locale.ROOT, MsgPicker.getMsg().getStreamingIndexesDelete())); + } + } + + private void removeHybridIndex(String project, String model, final Set<Long> ids) { + val indexPlan = getManager(NIndexPlanManager.class, project).getIndexPlan(model); + ids.stream().filter(id -> indexPlan.getLayoutEntity(id) != null) + .forEach(id -> indexPlanService.removeIndex(project, model, id)); + } + public AggIndexResponse calculateAggIndexCount(UpdateRuleBasedCuboidRequest request) { if (isFusionModel(request.getProject(), request.getModelId())) { @@ -593,6 +622,18 @@ public class FusionIndexService extends BasicService { } } + private static void checkSecondStorageBaseTableIndexEnabled(String project, NDataModel model, Set<Long> ids) + throws KylinException { + IndexPlan indexPlan = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project) + .getIndexPlan(model.getUuid()); + boolean checkCannotDeleteEnabled = SecondStorageUtil.isModelEnable(project, model.getUuid()) + && ids.stream().map(indexPlan::getLayoutEntity).filter(Objects::nonNull) + .anyMatch(layout -> layout.isBase() && layout.getIndex().isTableIndex()); + if (checkCannotDeleteEnabled) { + throw new KylinException(ErrorCodeServer.BASE_TABLE_INDEX_DELETE_DISABLE); + } + } + private static boolean indexChangeEnable(String project, String modelId, IndexEntity.Range range, List<IndexEntity.Range> ranges) { if (!ranges.contains(range)) { diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java index 674692a341..52435701a5 100644 --- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java +++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/FusionIndexServiceTest.java @@ -34,6 +34,7 @@ import org.apache.kylin.common.exception.code.ErrorCodeServer; import org.apache.kylin.common.util.RandomUtil; import org.apache.kylin.cube.model.SelectRule; import org.apache.kylin.guava30.shaded.common.collect.Lists; +import org.apache.kylin.guava30.shaded.common.collect.Sets; import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup; import org.apache.kylin.metadata.cube.model.IndexEntity; import org.apache.kylin.metadata.cube.model.IndexEntity.Range; @@ -42,6 +43,8 @@ import org.apache.kylin.metadata.cube.model.LayoutEntity; import org.apache.kylin.metadata.cube.model.NDataflow; import org.apache.kylin.metadata.cube.model.NDataflowManager; import org.apache.kylin.metadata.cube.model.NIndexPlanManager; +import org.apache.kylin.metadata.model.FusionModel; +import org.apache.kylin.metadata.model.FusionModelManager; import org.apache.kylin.metadata.model.NDataModel; import org.apache.kylin.metadata.model.NDataModelManager; import org.apache.kylin.metadata.model.SegmentRange; @@ -822,6 +825,52 @@ public class FusionIndexServiceTest extends SourceTestCase { Assert.assertEquals(0, indexPlanManager.getIndexPlan(batchId).getAllLayouts().size()); } + @Test + public void testBatchRemoveIndex() { + val project = "streaming_test"; + val model = "4965c827-fbb4-4ea1-a744-3f341a3b030d"; + + val indexPlanManager = NIndexPlanManager.getInstance(KylinConfig.getInstanceFromEnv(), project); + val dfMgr = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), "streaming_test"); + val df = dfMgr.getDataflow(model); + val segRange = new SegmentRange.KafkaOffsetPartitionedSegmentRange(10L, 100L, + createKafkaPartitionOffset(0, 200L), createKafkaPartitionOffset(0, 400L)); + dfMgr.appendSegmentForStreaming(df, segRange, RandomUtil.randomUUIDStr()); + try { + fusionIndexService.batchRemoveIndex("streaming_test", model, + indexPlanManager.getIndexPlan(model).getAllLayoutIds(false), Range.STREAMING); + } catch (KylinException e) { + Assert.assertEquals(ServerErrorCode.STREAMING_INDEX_UPDATE_DISABLE.toErrorCode().getCodeString(), + e.getErrorCode().getCodeString()); + } + + val modelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40"; + Assert.assertEquals(5, indexPlanManager.getIndexPlan(modelId).getAllLayouts().size()); + fusionIndexService.batchRemoveIndex(project, modelId, Sets.newHashSet(20000040001L), Range.STREAMING); + FusionModel fusionModel = KylinConfig.getInstanceFromEnv() + .getManager("streaming_test", FusionModelManager.class).getFusionModel(modelId); + String batchId = fusionModel.getBatchModel().getUuid(); + Assert.assertEquals(4, indexPlanManager.getIndexPlan(modelId).getAllLayouts().size()); + + Assert.assertEquals(3, indexPlanManager.getIndexPlan(batchId).getAllLayouts().size()); + fusionIndexService.batchRemoveIndex(project, modelId, Sets.newHashSet(10001L), Range.BATCH); + Assert.assertEquals(4, indexPlanManager.getIndexPlan(modelId).getAllLayouts().size()); + Assert.assertEquals(2, indexPlanManager.getIndexPlan(batchId).getAllLayouts().size()); + + fusionIndexService.batchRemoveIndex(project, batchId, Sets.newHashSet(1L), Range.BATCH); + Assert.assertEquals(1, indexPlanManager.getIndexPlan(batchId).getAllLayouts().size()); + + Assert.assertTrue( + indexPlanManager.getIndexPlan(modelId).getAllLayouts().stream().anyMatch(e -> e.getId() == 20001L)); + Assert.assertTrue( + indexPlanManager.getIndexPlan(batchId).getAllLayouts().stream().anyMatch(e -> e.getId() == 20001L)); + fusionIndexService.batchRemoveIndex(project, modelId, Sets.newHashSet(20001L), Range.HYBRID); + Assert.assertFalse( + indexPlanManager.getIndexPlan(modelId).getAllLayouts().stream().anyMatch(e -> e.getId() == 20001L)); + Assert.assertFalse( + indexPlanManager.getIndexPlan(batchId).getAllLayouts().stream().anyMatch(e -> e.getId() == 20001L)); + } + @Test public void testRemoveIndexe() throws Exception { val modelId = "b05034a8-c037-416b-aa26-9e6b4a41ee40";