This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7cab52b1f03102d43bfac700f3b7cddc3de1cd96
Author: Shuai li <loney...@live.cn>
AuthorDate: Wed Oct 12 19:54:30 2022 +0800

    fix secondstorage index refresh locked
---
 .../kap/secondstorage/SecondStorageIndexTest.java  |  2 +-
 .../job/ClickhouseRefreshSecondaryIndex.java       | 47 +++--------------
 .../kap/clickhouse/job/RefreshSecondaryIndex.java  | 60 ++++++++++------------
 .../management/SecondStorageService.java           |  8 +--
 4 files changed, 39 insertions(+), 78 deletions(-)

diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
index b5473c9f1f..767b045172 100644
--- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
+++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java
@@ -349,7 +349,7 @@ public class SecondStorageIndexTest implements JobWaiter {
         String jobId = updatePrimaryIndexAndSecondaryIndex(modelName, null, Sets.newHashSet());
         waitJobEnd(getProject(), jobId);
 
-        assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), getProject()),
+        assertThrows(String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), getProject()),
                 KylinException.class, () -> updatePrimaryIndexAndSecondaryIndex(modelName, null, secondaryIndex));
         clickhouse[0].start();
         ClickHouseUtils.internalConfigClickHouse(clickhouse, replica);
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
index 6f818af516..632cf60a88 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/ClickhouseRefreshSecondaryIndex.java
@@ -22,14 +22,12 @@ import static io.kyligence.kap.secondstorage.SecondStorageConstants.STEP_SECOND_
 import static io.kyligence.kap.secondstorage.SecondStorageUtil.getTableFlow;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
@@ -39,9 +37,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -102,14 +98,15 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable {
                 }
             }
 
+            val dataflow = NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId);
+            String database = NameUtil.getDatabase(getConfig(), getProject());
+            String table = NameUtil.getTable(dataflow, layoutId);
+            List<Future<?>> results = Lists.newArrayList();
             List<SecondStorageNode> nodes = SecondStorageUtil.listProjectNodes(getProject());
-            List<RefreshSecondaryIndex> allJob = getAddIndexJob(nodes, newIndexes, layoutId);
-            allJob.addAll(getToBeDeleteIndexJob(nodes, toBeDeleteIndexed, layoutId));
-
-            List<Future<?>> results = new ArrayList<>();
             val taskPool = new ThreadPoolExecutor(nodes.size(), nodes.size(), 0L, TimeUnit.MILLISECONDS,
                     new LinkedBlockingQueue<>(), new NamedThreadFactory("Refresh Tiered Storage Index"));
-            allJob.forEach(job -> results.add(taskPool.submit(job::refresh)));
+            nodes.forEach(node -> results.add(taskPool.submit(() -> new RefreshSecondaryIndex(node.getName(), database,
+                    table, newIndexes, toBeDeleteIndexed, dataflow).refresh())));
 
             try {
                 for (Future<?> result : results) {
@@ -128,36 +125,4 @@ public class ClickhouseRefreshSecondaryIndex extends AbstractExecutable {
             return ExecuteResult.createSucceed();
         });
     }
-
-    private List<RefreshSecondaryIndex> getAddIndexJob(List<SecondStorageNode> nodes, Set<Integer> newIndexes,
-            long layoutId) {
-        String modelId = getTargetSubject();
-        val indexPlan = NIndexPlanManager.getInstance(getConfig(), project).getIndexPlan(modelId);
-
-        if (indexPlan == null || indexPlan.getLayoutEntity(layoutId) == null) {
-            return Lists.newArrayList();
-        }
-
-        LayoutEntity layout = indexPlan.getLayoutEntity(layoutId);
-        String database = NameUtil.getDatabase(getConfig(), getProject());
-        String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId),
-                layoutId);
-        return nodes.stream()
-                .flatMap(node -> newIndexes.stream().map(column -> new RefreshSecondaryIndex(node.getName(), database,
-                        table, column, layout, RefreshSecondaryIndex.Type.ADD)))
-                .collect(Collectors.toList());
-    }
-
-    private List<RefreshSecondaryIndex> getToBeDeleteIndexJob(List<SecondStorageNode> nodes,
-            Set<Integer> toBeDeleteIndexed, long layoutId) {
-        String modelId = getTargetSubject();
-        String database = NameUtil.getDatabase(getConfig(), getProject());
-        String table = NameUtil.getTable(NDataflowManager.getInstance(getConfig(), getProject()).getDataflow(modelId),
-                layoutId);
-
-        return nodes.stream()
-                .flatMap(node -> toBeDeleteIndexed.stream().map(column -> new RefreshSecondaryIndex(node.getName(),
-                        database, table, column, null, RefreshSecondaryIndex.Type.DELETE)))
-                .collect(Collectors.toList());
-    }
 }
diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
index 6c6c71193c..8b021828cc 100644
--- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
+++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/job/RefreshSecondaryIndex.java
@@ -26,9 +26,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.cube.model.LayoutEntity;
-import org.apache.kylin.metadata.cube.model.NDataflowManager;
-import org.apache.kylin.metadata.model.NDataModel;
+import org.apache.kylin.metadata.cube.model.NDataflow;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -49,33 +47,32 @@ import lombok.extern.slf4j.Slf4j;
 @Getter
 @Slf4j
 public class RefreshSecondaryIndex {
-
     @JsonProperty("node")
     private String node;
     @JsonProperty("database")
     private String database;
     @JsonProperty("table")
     private String table;
-    @JsonProperty("column_id")
-    private Integer columnId;
-    @JsonProperty("type")
-    private Type type;
+    @JsonProperty("add_indexes")
+    private Set<Integer> addIndexes;
+    @JsonProperty("delete_indexes")
+    private Set<Integer> deleteIndexes;
 
     @JsonIgnore
-    private LayoutEntity layoutEntity;
+    private NDataflow dataflow;
 
     public RefreshSecondaryIndex() {
         // empty
     }
 
-    public RefreshSecondaryIndex(String node, String database, String table, Integer columnId,
-            LayoutEntity layoutEntity, Type type) {
+    public RefreshSecondaryIndex(String node, String database, String table, Set<Integer> addIndexes,
+            Set<Integer> deleteIndexes, NDataflow dataflow) {
         this.node = node;
         this.database = database;
         this.table = table;
-        this.columnId = columnId;
-        this.layoutEntity = layoutEntity;
-        this.type = type;
+        this.dataflow = dataflow;
+        this.addIndexes = addIndexes;
+        this.deleteIndexes = deleteIndexes;
     }
 
     public void refresh() {
@@ -88,29 +85,31 @@ public class RefreshSecondaryIndex {
                 return;
             }
            Set<String> existSkipIndex = existSkippingIndex(clickHouse, database, table);
-            String column = getPrefixColumn(String.valueOf(columnId));
-            String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column);
-            if (type == Type.ADD) {
-                addSkippingIndex(clickHouse, tableIdentifier, column, indexName, existSkipIndex);
-            } else if (type == Type.DELETE) {
-                deleteSkippingIndex(clickHouse, tableIdentifier, indexName, existSkipIndex);
+
+            for (Integer deleteIndexColumnId : deleteIndexes) {
+                deleteSkippingIndex(clickHouse, tableIdentifier, deleteIndexColumnId, existSkipIndex);
+            }
+
+            for (Integer addIndexColumnId : addIndexes) {
+                addSkippingIndex(clickHouse, tableIdentifier, addIndexColumnId, existSkipIndex);
             }
         } catch (SQLException e) {
-            log.error("node {} clean index {}.{} failed", node, database, table);
+            log.error("node {} update index {}.{} failed", node, database, table);
             ExceptionUtils.rethrow(e);
         }
     }
 
-    private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String column,
-            String indexName, Set<String> existSkipIndex) throws SQLException {
-        NDataModel model = layoutEntity.getModel();
-        KylinConfig modelConfig = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), model.getProject())
-                .getDataflow(model.getId()).getConfig();
+    private void addSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId,
+            Set<String> existSkipIndex) throws SQLException {
+        String column = getPrefixColumn(String.valueOf(columnId));
+        String indexName = ClickHouseNameUtil.getSkippingIndexName(table, column);
+        KylinConfig modelConfig = dataflow.getConfig();
         int granularity = modelConfig.getSecondStorageSkippingIndexGranularity();
         val render = new ClickHouseRender();
 
         String expr = SkippingIndexChooser
-                .getSkippingIndexType(layoutEntity.getOrderedDimensions().get(columnId).getType()).toSql(modelConfig);
+                .getSkippingIndexType(dataflow.getModel().getEffectiveDimensions().get(columnId).getType())
+                .toSql(modelConfig);
         AlterTable alterTable = new AlterTable(tableIdentifier,
                 new AlterTable.ManipulateIndex(indexName, column, expr, granularity));
         AlterTable materializeTable = new AlterTable(tableIdentifier,
@@ -122,8 +121,9 @@ public class RefreshSecondaryIndex {
         clickHouse.apply(materializeTable.toSql(render));
     }
 
-    private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, String indexName,
+    private void deleteSkippingIndex(ClickHouse clickHouse, TableIdentifier tableIdentifier, int columnId,
             Set<String> existSkipIndex) throws SQLException {
+        String indexName = ClickHouseNameUtil.getSkippingIndexName(table, getPrefixColumn(String.valueOf(columnId)));
         if (!existSkipIndex.contains(indexName)) {
             return;
         }
@@ -150,8 +150,4 @@ public class RefreshSecondaryIndex {
 
         return Sets.newHashSet();
     }
-
-    enum Type {
-        ADD, DELETE;
-    }
 }
diff --git a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
index eb43fddf83..b0886204aa 100644
--- a/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
+++ b/src/second-storage/core-ui/src/main/java/io/kyligence/kap/secondstorage/management/SecondStorageService.java
@@ -1158,14 +1158,14 @@ public class SecondStorageService extends BasicService implements SecondStorageU
         SecondStorageUtil.validateProjectLock(project, Collections.singletonList(LockTypeEnum.LOAD.name()));
         List<AbstractExecutable> jobs = getRelationJobsWithoutFinish(project, modelId);
         if (!jobs.isEmpty()) {
-            throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
-                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+            throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
         }
         jobs = getJobs(project, modelId, Sets.newHashSet(ExecutableState.ERROR),
                 Sets.newHashSet(JobTypeEnum.SECOND_STORAGE_REFRESH_SECONDARY_INDEXES));
         if (!jobs.isEmpty()) {
-            throw new KylinException(JobErrorCode.SECOND_STORAGE_PROJECT_JOB_EXISTS,
-                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageProjectJobExists(), project));
+            throw new KylinException(JobErrorCode.SECOND_STORAGE_JOB_EXISTS,
+                    String.format(Locale.ROOT, MsgPicker.getMsg().getSecondStorageConcurrentOperate(), project));
         }
     }
 

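Below is a minimal, self-contained sketch of the submission pattern the patch moves to in ClickhouseRefreshSecondaryIndex: instead of building one job per (node, column) pair via getAddIndexJob/getToBeDeleteIndexJob, one task per tiered-storage node is submitted, carrying the full add and delete index sets, on a pool sized to the node count and joined so any per-node failure surfaces to the caller. It uses only java.util.concurrent; the class RefreshPatternSketch, the refreshOn helper, and the node/column/table values are illustrative stand-ins, not Kylin or ClickHouse API.

// Minimal sketch of the per-node refresh pattern introduced by this commit.
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RefreshPatternSketch {

    // One task per node; it receives both index sets, so per-column jobs
    // (and the old ADD/DELETE Type enum) are no longer needed.
    static void refreshOn(String node, String database, String table,
            Set<Integer> addIndexes, Set<Integer> deleteIndexes) {
        // The real task opens a ClickHouse connection here and issues
        // ALTER TABLE ... DROP INDEX / ADD INDEX / MATERIALIZE INDEX per column id.
        deleteIndexes.forEach(id -> System.out.printf(
                "%s: drop skipping index for column %d on %s.%s%n", node, id, database, table));
        addIndexes.forEach(id -> System.out.printf(
                "%s: add skipping index for column %d on %s.%s%n", node, id, database, table));
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> nodes = List.of("node01", "node02");   // illustrative node names
        Set<Integer> newIndexes = Set.of(3, 5);              // column ids to index
        Set<Integer> toBeDeleteIndexed = Set.of(7);          // column ids to drop

        // Pool sized to the node count, mirroring the patch.
        ThreadPoolExecutor taskPool = new ThreadPoolExecutor(nodes.size(), nodes.size(), 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        List<Future<?>> results = new ArrayList<>();
        nodes.forEach(node -> results.add(taskPool.submit(
                () -> refreshOn(node, "demo_db", "demo_table", newIndexes, toBeDeleteIndexed))));

        // Join every future so a failure on any node propagates to the caller.
        try {
            for (Future<?> result : results) {
                result.get();
            }
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            taskPool.shutdown();
        }
    }
}

Batching all column ids into a single RefreshSecondaryIndex per node keeps the index refresh to one task per node, which is what the reworked refresh() loop (delete first, then add) in the patch operates on.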