This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 59f05cd5725604cff96df3f89a39454a73deb169 Author: Shuai li <loney...@live.cn> AuthorDate: Thu Oct 13 18:01:11 2022 +0800 fix second storage skipping index status error after rebalance --- .../kap/secondstorage/SecondStorageIndexTest.java | 10 ++ .../metadata/ClickHouseMetadataOperator.java | 101 +++++++++++---------- .../kap/secondstorage/metadata/TableEntity.java | 9 +- 3 files changed, 65 insertions(+), 55 deletions(-) diff --git a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java index 767b045172..38065420ca 100644 --- a/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java +++ b/src/second-storage/clickhouse-it/src/test/java/io/kyligence/kap/secondstorage/SecondStorageIndexTest.java @@ -488,6 +488,16 @@ public class SecondStorageIndexTest implements JobWaiter { assertEquals(SecondStorageIndexLoadStatus.ALL, r.getSecondaryIndexStatus()); }); + secondStorageService.sizeInNode(getProject()); + EnvelopeResponse<List<SecondStorageIndexResponse>> res3 = secondStorageEndpoint.listIndex(getProject(), + modelName); + assertEquals(KylinException.CODE_SUCCESS, res3.getCode()); + assertEquals(1, res3.getData().size()); + res3.getData().forEach(r -> { + assertEquals(SecondStorageIndexLoadStatus.ALL, r.getPrimaryIndexStatus()); + assertEquals(SecondStorageIndexLoadStatus.ALL, r.getSecondaryIndexStatus()); + }); + secondStorageService.triggerSegmentsClean(getProject(), modelId, getDataFlow(modelId).getSegments().stream().map(NDataSegment::getId).collect(Collectors.toSet())); waitAllJoEnd(); diff --git a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/metadata/ClickHouseMetadataOperator.java b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/metadata/ClickHouseMetadataOperator.java index 18a04037a0..6185ba5765 100644 --- a/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/metadata/ClickHouseMetadataOperator.java +++ b/src/second-storage/clickhouse/src/main/java/io/kyligence/kap/clickhouse/metadata/ClickHouseMetadataOperator.java @@ -17,13 +17,28 @@ */ package io.kyligence.kap.clickhouse.metadata; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.transaction.UnitOfWork; +import org.apache.kylin.metadata.project.EnhancedUnitOfWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.Preconditions; + import io.kyligence.kap.clickhouse.job.ClickHouse; import io.kyligence.kap.clickhouse.job.ClickHouseTableStorageMetric; import io.kyligence.kap.clickhouse.parser.ExistsQueryParser; import io.kyligence.kap.clickhouse.parser.ShowCreateQueryParser; -import org.apache.kylin.common.persistence.transaction.UnitOfWork; -import org.apache.kylin.metadata.project.EnhancedUnitOfWork; import io.kyligence.kap.secondstorage.SecondStorageConstants; import io.kyligence.kap.secondstorage.SecondStorageNodeHelper; import io.kyligence.kap.secondstorage.SecondStorageQueryRouteUtil; @@ -49,19 +64,6 @@ import io.kyligence.kap.secondstorage.response.SizeInNodeResponse; import io.kyligence.kap.secondstorage.response.TableSyncResponse; import io.kyligence.kap.secondstorage.util.SecondStorageSqlUtils; import lombok.val; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.kylin.common.KylinConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; public class ClickHouseMetadataOperator implements MetadataOperator { private static final Logger logger = LoggerFactory.getLogger(ClickHouseMetadataOperator.class); @@ -80,13 +82,10 @@ public class ClickHouseMetadataOperator implements MetadataOperator { KylinConfig config = KylinConfig.getInstanceFromEnv(); List<NodeGroup> nodeGroups = SecondStorageUtil.listNodeGroup(config, project); - Set<String> nodes = nodeGroups.stream() - .flatMap(x -> x.getNodeNames().stream()) - .collect(Collectors.toSet()); + Set<String> nodes = nodeGroups.stream().flatMap(x -> x.getNodeNames().stream()).collect(Collectors.toSet()); List<TableFlow> tableFlows = SecondStorageUtil.listTableFlow(config, project); - tableFlows = tableFlows.stream() - .filter(x -> x.getTableDataList() != null && x.getTableDataList().size() > 0) + tableFlows = tableFlows.stream().filter(x -> x.getTableDataList() != null && x.getTableDataList().size() > 0) .collect(Collectors.toList()); if (tableFlows.isEmpty()) { @@ -97,9 +96,7 @@ public class ClickHouseMetadataOperator implements MetadataOperator { //one project one database String database = tableFlows.get(0).getTableDataList().get(0).getDatabase(); - Set<String> tables = tableFlows.stream() - .flatMap(x -> x.getTableDataList().stream()) - .map(TableData::getTable) + Set<String> tables = tableFlows.stream().flatMap(x -> x.getTableDataList().stream()).map(TableData::getTable) .collect(Collectors.toSet()); Map<String, String> tableCreateSqlMap = new HashMap<>(); @@ -112,17 +109,24 @@ public class ClickHouseMetadataOperator implements MetadataOperator { try (ClickHouse clickHouse = new ClickHouse(SecondStorageNodeHelper.resolve(node))) { if (databaseCreateSql == null) { - int existCode = clickHouse.query(new ExistsDatabase(database).toSql(), ExistsQueryParser.EXISTS).get(0); + int existCode = clickHouse.query(new ExistsDatabase(database).toSql(), ExistsQueryParser.EXISTS) + .get(0); if (existCode == 1) { - databaseCreateSql = clickHouse.query(new ShowCreateDatabase(database).toSql(), ShowCreateQueryParser.SHOW_CREATE).get(0); + databaseCreateSql = clickHouse + .query(new ShowCreateDatabase(database).toSql(), ShowCreateQueryParser.SHOW_CREATE) + .get(0); } } for (String table : tables) { if (tableCreateSqlMap.get(table) == null) { - int existCode = clickHouse.query(new ExistsTable(TableIdentifier.table(database, table)).toSql(), ExistsQueryParser.EXISTS).get(0); + int existCode = clickHouse + .query(new ExistsTable(TableIdentifier.table(database, table)).toSql(), + ExistsQueryParser.EXISTS) + .get(0); if (existCode == 1) { tableCreateSqlMap.put(table, - SecondStorageSqlUtils.addIfNotExists(clickHouse.query(new ShowCreateTable(TableIdentifier.table(database, table)).toSql(), + SecondStorageSqlUtils.addIfNotExists(clickHouse.query( + new ShowCreateTable(TableIdentifier.table(database, table)).toSql(), ShowCreateQueryParser.SHOW_CREATE).get(0), "TABLE") ); } @@ -150,15 +154,15 @@ public class ClickHouseMetadataOperator implements MetadataOperator { return new TableSyncResponse(project, new ArrayList<>(nodes), database, new ArrayList<>(tables)); } - private NodeGroup getNodeGroup(List<NodeGroup> nodeGroups, Set<String> existShardNodes){ + private NodeGroup getNodeGroup(List<NodeGroup> nodeGroups, Set<String> existShardNodes) { Preconditions.checkArgument(!nodeGroups.isEmpty()); val existShardNodesList = new ArrayList<>(existShardNodes); NodeGroup addGroup = nodeGroups.get(0); if (existShardNodesList.size() > 0) { - for (NodeGroup nodeGroup : nodeGroups){ + for (NodeGroup nodeGroup : nodeGroups) { val nodeNames = nodeGroup.getNodeNames(); val item = existShardNodesList.get(0); - if (nodeNames.contains(item)){ + if (nodeNames.contains(item)) { addGroup = nodeGroup; break; } @@ -169,7 +173,8 @@ public class ClickHouseMetadataOperator implements MetadataOperator { @Override public SizeInNodeResponse sizeInNode() { - SecondStorageProjectModelSegment projectModelSegment = properties.get(new ConfigOption<>(SecondStorageConstants.PROJECT_MODEL_SEGMENT_PARAM, SecondStorageProjectModelSegment.class)); + SecondStorageProjectModelSegment projectModelSegment = properties.get(new ConfigOption<>( + SecondStorageConstants.PROJECT_MODEL_SEGMENT_PARAM, SecondStorageProjectModelSegment.class)); String project = projectModelSegment.getProject(); Map<String, SecondStorageModelSegment> modelSegmentMap = projectModelSegment.getModelSegmentMap(); KylinConfig config = KylinConfig.getInstanceFromEnv(); @@ -177,9 +182,7 @@ public class ClickHouseMetadataOperator implements MetadataOperator { List<TableFlow> tableFlows = SecondStorageUtil.listTableFlow(config, project); List<NodeGroup> nodeGroups = SecondStorageUtil.listNodeGroup(config, project); - Set<String> nodes = nodeGroups.stream() - .flatMap(x -> x.getNodeNames().stream()) - .collect(Collectors.toSet()); + Set<String> nodes = nodeGroups.stream().flatMap(x -> x.getNodeNames().stream()).collect(Collectors.toSet()); ClickHouseTableStorageMetric storageMetric = new ClickHouseTableStorageMetric(new ArrayList<>(nodes)); storageMetric.collect(true); @@ -191,28 +194,28 @@ public class ClickHouseMetadataOperator implements MetadataOperator { val newTablePartitions = new ArrayList<TablePartition>(); for (TablePartition tablePartition : tablePartitions) { SecondStorageModelSegment modelSegment = modelSegmentMap.get(tableFlow.getUuid()); - SecondStorageSegment secondStorageSegment = modelSegment.getSegmentMap().get(tablePartition.getSegmentId()); - Map<String, Long> sizeInNodeMap = storageMetric.getByPartitions(tableData.getDatabase(), tableData.getTable(), secondStorageSegment.getSegmentRange(), modelSegment.getDateFormat()); + SecondStorageSegment secondStorageSegment = modelSegment.getSegmentMap() + .get(tablePartition.getSegmentId()); + Map<String, Long> sizeInNodeMap = storageMetric.getByPartitions(tableData.getDatabase(), + tableData.getTable(), secondStorageSegment.getSegmentRange(), + modelSegment.getDateFormat()); Set<String> existShardNodes = new HashSet<>(tablePartition.getShardNodes()); NodeGroup addGroup = getNodeGroup(nodeGroups, existShardNodes); List<String> addShardNodes = addGroup.getNodeNames().stream() - .filter(node -> !existShardNodes.contains(node)) - .collect(Collectors.toList()); + .filter(node -> !existShardNodes.contains(node)).collect(Collectors.toList()); - tablePartition.getSizeInNode().entrySet().forEach( - e -> e.setValue(sizeInNodeMap.getOrDefault(e.getKey(), 0L)) - ); + tablePartition.getSizeInNode().entrySet() + .forEach(e -> e.setValue(sizeInNodeMap.getOrDefault(e.getKey(), 0L))); List<String> shardNodes = new ArrayList<>(tablePartition.getShardNodes()); shardNodes.addAll(addShardNodes); Map<String, Long> sizeInNode = new HashMap<>(tablePartition.getSizeInNode()); - sizeInNode.entrySet().forEach( - e -> e.setValue(sizeInNodeMap.getOrDefault(e.getKey(), 0L)) - ); + sizeInNode.entrySet().forEach(e -> e.setValue(sizeInNodeMap.getOrDefault(e.getKey(), 0L))); - Map<String, List<SegmentFileStatus>> nodeFileMap = new HashMap<>(tablePartition.getNodeFileMap()); + Map<String, List<SegmentFileStatus>> nodeFileMap = new HashMap<>( + tablePartition.getNodeFileMap()); for (String node : addShardNodes) { sizeInNode.put(node, sizeInNodeMap.getOrDefault(node, 0L)); @@ -220,11 +223,9 @@ public class ClickHouseMetadataOperator implements MetadataOperator { } TablePartition.Builder builder = new TablePartition.Builder(); - builder.setId(tablePartition.getId()) - .setSegmentId(tablePartition.getSegmentId()) - .setShardNodes(shardNodes) - .setSizeInNode(sizeInNode) - .setNodeFileMap(nodeFileMap); + builder.setId(tablePartition.getId()).setSegmentId(tablePartition.getSegmentId()) + .setShardNodes(shardNodes).setSizeInNode(sizeInNode).setNodeFileMap(nodeFileMap) + .setSecondaryIndexColumns(tablePartition.getSecondaryIndexColumns()); newTablePartitions.add(builder.build()); } newTablePartitions.forEach(tableData::addPartition); diff --git a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableEntity.java b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableEntity.java index c21feb7ebd..3bc5719b8a 100644 --- a/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableEntity.java +++ b/src/second-storage/core/src/main/java/io/kyligence/kap/secondstorage/metadata/TableEntity.java @@ -31,11 +31,7 @@ import io.kyligence.kap.guava20.shaded.common.collect.Lists; import io.kyligence.kap.guava20.shaded.common.collect.Sets; import io.kyligence.kap.secondstorage.metadata.annotation.TableDefinition; -@JsonAutoDetect( - fieldVisibility = JsonAutoDetect.Visibility.NONE, - getterVisibility = JsonAutoDetect.Visibility.NONE, - isGetterVisibility = JsonAutoDetect.Visibility.NONE, - setterVisibility = JsonAutoDetect.Visibility.NONE) +@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) @TableDefinition public class TableEntity implements Serializable, WithLayout { @@ -67,13 +63,16 @@ public class TableEntity implements Serializable, WithLayout { table.layoutID = layoutEntity.getId(); if (primaryIndexColumns != null) { table.primaryIndexColumns = primaryIndexColumns; + table.primaryIndexLastModified = System.currentTimeMillis(); } if (secondaryIndexColumns != null) { table.secondaryIndexColumns = secondaryIndexColumns; + table.secondaryIndexLastModified = System.currentTimeMillis(); } return table; } } + public static Builder builder() { return new Builder(); }