This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 7cab52b1f03102d43bfac700f3b7cddc3de1cd96 Author: Shuai li <loney...@live.cn> AuthorDate: Wed Oct 12 19:54:30 2022 +0800 fix secondstorage index refresh locked --- .../kap/secondstorage/SecondStorageIndexTest.java | 2 +- .../job/ClickhouseRefreshSecondaryIndex.java | 47 +++-------------- .../kap/clickhouse/job/RefreshSecondaryIndex.java | 60 ++++++++++------------ .../management/SecondStorageService.java | 8 +-- 4 files changed, 39 insertions(+), 78 deletions(-) diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java index b5473c9f1f..767b045172 100644 --- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java +++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java @@ -349,7 +349,7 @@ public class SecondStorageIndexTest implements JobWaiter { String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet()); waitJobEnd(getProject(), jobId); - assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), getProject()), + assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), getProject()), KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex)); clickhouse[0].start(); ClickHouseUtils.internalConfigClickHouse(clickhouse, replica); diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java index 6f818af516..632cf60a88 100644 --- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java +++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java @@ -22,14 +22,12 @@ import static io.kyligence.kap.secondstorage.SecondStorageConstants.STEP_SECOND_ import static io.kyligence.kap.secondstorage.SecondStorageUtil.getTableFlow; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kylin.common.persistence.transaction.UnitOfWork; @@ -39,9 +37,7 @@ import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; -import org.apache.kylin.metadata.cube.model.LayoutEntity; import org.apache.kylin.metadata.cube.model.NDataflowManager; -import org.apache.kylin.metadata.cube.model.NIndexPlanManager; import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import com.fasterxml.jackson.core.type.TypeReference; @@ -102,14 +98,15 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable { } } + val dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId); + String database = NameUtil.getDatabase(getConfig(), getProject()); + String table = NameUtil.getTable(dataflow, layoutId); + List<Future<?>> results = Lists.newArrayList(); List<SecondStorageNode> nodes = SecondStorageUtil.listProjectNodes(getProject()); - List<RefreshSecondaryIndex> allJob = getAddIndexJob(nodes, newIndexes, layoutId); - allJob.addAll(getToBeDeleteIndexJob(nodes, toBeDeleteIndexed, layoutId)); - - List<Future<?>> results = new ArrayList<>(); val taskPool = new ThreadPoolExecutor(nodes.size(), nodes.size(), 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("Refresh Tiered Storage Index")); - allJob.forEach(job -> results.add(taskPool.submit(job::refresh))); + nodes.forEach(node -> results.add(taskPool.submit(() -> new RefreshSecondaryIndex(node.getName(), database, + table, newIndexes, toBeDeleteIndexed, dataflow).refresh()))); try { for (Future<?> result : results) { @@ -128,36 +125,4 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable { return ExecuteResult.createSucceed(); }); } - - private List<RefreshSecondaryIndex> getAddIndexJob(List<SecondStorageNode> nodes, Set<Integer> newIndexes, - long layoutId) { - String modelId = getTargetSubject(); - val indexPlan = NIndexPlanManager.getInstance(getConfig(), project).getIndexPlan(modelId); - - if (indexPlan == null || indexPlan.getLayoutEntity(layoutId) == null) { - return Lists.newArrayList(); - } - - LayoutEntity layout = indexPlan.getLayoutEntity(layoutId); - String database = NameUtil.getDatabase(getConfig(), getProject()); - String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId), - layoutId); - return nodes.stream() - .flatMap(node -> newIndexes.stream().map(column -> new RefreshSecondaryIndex(node.getName(), database, - table, column, layout, RefreshSecondaryIndex.Type.ADD))) - .collect(Collectors.toList()); - } - - private List<RefreshSecondaryIndex> getToBeDeleteIndexJob(List<SecondStorageNode> nodes, - Set<Integer> toBeDeleteIndexed, long layoutId) { - String modelId = getTargetSubject(); - String database = NameUtil.getDatabase(getConfig(), getProject()); - String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId), - layoutId); - - return nodes.stream() - .flatMap(node -> toBeDeleteIndexed.stream().map(column -> new RefreshSecondaryIndex(node.getName(), - database, table, column, null, RefreshSecondaryIndex.Type.DELETE))) - .collect(Collectors.toList()); - } } diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java index 6c6c71193c..8b021828cc 100644 --- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java +++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java @@ -26,9 +26,7 @@ import java.util.Set; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.metadata.cube.model.LayoutEntity; -import org.apache.kylin.metadata.cube.model.NDataflowManager; -import org.apache.kylin.metadata.model.NDataModel; +import org.apache.kylin.metadata.cube.model.NDataflow; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; @@ -49,33 +47,32 @@ import lombok.extern.slf4j.Slf4j; @Getter @Slf4j public class RefreshSecondaryIndex { - @JsonProperty("node") private String node; @JsonProperty("database") private String database; @JsonProperty("table") private String table; - @JsonProperty("column_id") - private Integer columnId; - @JsonProperty("type") - private Type type; + @JsonProperty("add_indexes") + private Set<Integer> addIndexes; + @JsonProperty("delete_indexes") + private Set<Integer> deleteIndexes; @JsonIgnore - private LayoutEntity layoutEntity; + private NDataflow dataflow; public RefreshSecondaryIndex() { // empty } - public RefreshSecondaryIndex(String node, String database, String table, Integer columnId, - LayoutEntity layoutEntity, Type type) { + public RefreshSecondaryIndex(String node, String database, String table, Set<Integer> addIndexes, + Set<Integer> deleteIndexes, NDataflow dataflow) { this.node = node; this.database = database; this.table = table; - this.columnId = columnId; - this.layoutEntity = layoutEntity; - this.type = type; + this.dataflow = dataflow; + this.addIndexes = addIndexes; + this.deleteIndexes = deleteIndexes; } public void refresh() { @@ -88,29 +85,31 @@ public class RefreshSecondaryIndex { return; } Set<String> existSkipIndex = existSkippingIndex(clickHouse, database, table); - String column = getPrefixColumn(String.valueOf(columnId)); - String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column); - if (type == Type.ADD) { - addSkippingIndex(clickHouse, tableIdentifier, column, indexName, existSkipIndex); - } else if (type == Type.DELETE) { - deleteSkippingIndex(clickHouse, tableIdentifier, indexName, existSkipIndex); + + for (Integer deleteIndexColumnId : deleteIndexes) { + deleteSkippingIndex(clickHouse, tableIdentifier, deleteIndexColumnId, existSkipIndex); + } + + for (Integer addIndexColumnId : addIndexes) { + addSkippingIndex(clickHouse, tableIdentifier, addIndexColumnId, existSkipIndex); } } catch (SQLException e) { - log.error("node {} clean index {}.{} failed", node, database, table); + log.error("node {} update index {}.{} failed", node, database, table); ExceptionUtils.rethrow(e); } } - private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String column, - String indexName, Set<String> existSkipIndex) throws SQLException { - NDataModel model = layoutEntity.getModel(); - KylinConfig modelConfig = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject()) - .getDataflow(model.getId()).getConfig(); + private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId, + Set<String> existSkipIndex) throws SQLException { + String column = getPrefixColumn(String.valueOf(columnId)); + String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column); + KylinConfig modelConfig = dataflow.getConfig(); int granularity = modelConfig.getSecondStorageSkippingIndexGranularity(); val render = new ClickHouseRender(); String expr = SkippingIndexChooser - .getSkippingIndexType(layoutEntity.getOrderedDimensions().get(columnId).getType()).toSql(modelConfig); + .getSkippingIndexType(dataflow.getModel().getEffectiveDimensions().get(columnId).getType()) + .toSql(modelConfig); AlterTable alterTable = new AlterTable(tableIdentifier, new AlterTable.ManipulateIndex(indexName, column, expr, granularity)); AlterTable materializeTable = new AlterTable(tableIdentifier, @@ -122,8 +121,9 @@ public class RefreshSecondaryIndex { clickHouse.apply(materializeTable.toSql(render)); } - private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String indexName, + private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId, Set<String> existSkipIndex) throws SQLException { + String indexName = ClickHouseNameUtil.getSkippingIndexName(table, getPrefixColumn(String.valueOf(columnId))); if (!existSkipIndex.contains(indexName)) { return; } @@ -150,8 +150,4 @@ public class RefreshSecondaryIndex { return Sets.newHashSet(); } - - enum Type { - ADD, DELETE; - } } diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java index eb43fddf83..b0886204aa 100644 --- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java +++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java @@ -1158,14 +1158,14 @@ public class SecondStorageService extends BasicService implements SecondStorageU SecondStorageUtil.validateProjectLock(project, Collections.singletonList(LockTypeEnum.LOAD.name())); List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId); if (!jobs.isEmpty()) { - throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS, - String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project)); + throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS, + String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project)); } jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR), Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES)); if (!jobs.isEmpty()) { - throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS, - String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project)); + throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS, + String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project)); } }