This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new fd860825c06 [improvement](checkpoint) checkpoint thread update tablet invert index #25098 (#25388) fd860825c06 is described below commit fd860825c06049f8883b361a35858f382899b6b8 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Fri Oct 13 16:23:59 2023 +0800 [improvement](checkpoint) checkpoint thread update tablet invert index #25098 (#25388) Checkpoint thread doesn't update tablet invert index, then in checkpoint thread TabletInvertedIndex.getTablet/getReplica will return null. It may cause some problems. Fix this, let checkpoint thread also update tablet invert index. --- .../doris/alter/MaterializedViewHandler.java | 8 +- .../apache/doris/catalog/CatalogRecycleBin.java | 6 +- .../main/java/org/apache/doris/catalog/Env.java | 9 +- .../apache/doris/catalog/TabletInvertedIndex.java | 12 --- .../org/apache/doris/catalog/TempPartitions.java | 2 +- .../apache/doris/datasource/InternalCatalog.java | 110 ++++++++++----------- 6 files changed, 61 insertions(+), 86 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index b9f1ba3585f..bf79c8a7359 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -1010,11 +1010,9 @@ public class MaterializedViewHandler extends AlterHandler { for (Partition partition : olapTable.getPartitions()) { MaterializedIndex rollupIndex = partition.deleteRollupIndex(rollupIndexId); - if (!Env.isCheckpointThread()) { - // remove from inverted index - for (Tablet tablet : rollupIndex.getTablets()) { - invertedIndex.deleteTablet(tablet.getId()); - } + // remove from inverted index + for (Tablet tablet : rollupIndex.getTablets()) { + invertedIndex.deleteTablet(tablet.getId()); } } String rollupIndexName = olapTable.getIndexNameById(rollupIndexId); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java index 759a34ad445..890a76ee24b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java @@ -409,7 +409,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { RecycleTableInfo tableInfo = idToTable.remove(tableId); idToRecycleTime.remove(tableId); Table table = tableInfo.getTable(); - if (table.getType() == TableType.OLAP && !Env.isCheckpointThread()) { + if (table.getType() == TableType.OLAP) { Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, true); } LOG.info("replay erase table[{}]", tableId); @@ -519,9 +519,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable { } Partition partition = partitionInfo.getPartition(); - if (!Env.isCheckpointThread()) { - Env.getCurrentEnv().onErasePartition(partition); - } + Env.getCurrentEnv().onErasePartition(partition); LOG.info("replay erase partition[{}]", partitionId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index ea4410f60c6..6748a8b4afc 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1888,10 +1888,8 @@ public class Env { public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException { recycleBin.readFields(dis); - if (!isCheckpointThread()) { - // add tablet in Recycle bin to TabletInvertedIndex - recycleBin.addTabletToInvertedIndex(); - } + // add tablet in Recycle bin to TabletInvertedIndex + recycleBin.addTabletToInvertedIndex(); // create DatabaseTransactionMgr for db in recycle bin. // these dbs do not exist in `idToDb` of the catalog. for (Long dbId : recycleBin.getAllDbIds()) { @@ -5281,7 +5279,7 @@ public class Env { } } - if (!isReplay) { + if (!isReplay && !Env.isCheckpointThread()) { // drop all replicas AgentBatchTask batchTask = new AgentBatchTask(); for (Partition partition : olapTable.getAllPartitions()) { @@ -5305,6 +5303,7 @@ public class Env { AgentTaskExecutor.submit(batchTask); } + // TODO: does checkpoint need update colocate index ? // colocation Env.getCurrentColocateIndex().removeTable(olapTable.getId()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index ece6ebd8b5b..2b601f9f030 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -506,9 +506,6 @@ public class TabletInvertedIndex { // always add tablet before adding replicas public void addTablet(long tabletId, TabletMeta tabletMeta) { - if (Env.isCheckpointThread()) { - return; - } long stamp = writeLock(); try { if (tabletMetaMap.containsKey(tabletId)) { @@ -527,9 +524,6 @@ public class TabletInvertedIndex { } public void deleteTablet(long tabletId) { - if (Env.isCheckpointThread()) { - return; - } long stamp = writeLock(); try { Map<Long, Replica> replicas = replicaMetaTable.rowMap().remove(tabletId); @@ -555,9 +549,6 @@ public class TabletInvertedIndex { } public void addReplica(long tabletId, Replica replica) { - if (Env.isCheckpointThread()) { - return; - } long stamp = writeLock(); try { Preconditions.checkState(tabletMetaMap.containsKey(tabletId)); @@ -572,9 +563,6 @@ public class TabletInvertedIndex { } public void deleteReplica(long tabletId, long backendId) { - if (Env.isCheckpointThread()) { - return; - } long stamp = writeLock(); try { Preconditions.checkState(tabletMetaMap.containsKey(tabletId)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java index f64d5bed0c3..9cd2d61bf91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TempPartitions.java @@ -77,7 +77,7 @@ public class TempPartitions implements Writable, GsonPostProcessable { if (partition != null) { idToPartition.remove(partition.getId()); nameToPartition.remove(partitionName); - if (!Env.isCheckpointThread() && needDropTablet) { + if (needDropTablet) { TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { for (Tablet tablet : index.getTablets()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 65c8f885604..283ae2dcc67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -361,10 +361,6 @@ public class InternalCatalog implements CatalogIf<Database> { * create the tablet inverted index from metadata. */ public void recreateTabletInvertIndex() { - if (Env.isCheckpointThread()) { - return; - } - // create inverted index TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Database db : this.fullNameToDb.values()) { @@ -1296,31 +1292,31 @@ public class InternalCatalog implements CatalogIf<Database> { } catch (DdlException e) { throw new MetaNotFoundException(e.getMessage()); } - if (!Env.isCheckpointThread()) { - // add to inverted index - if (table.isManagedTable()) { - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - OlapTable olapTable = (OlapTable) table; - long dbId = db.getId(); - long tableId = table.getId(); - for (Partition partition : olapTable.getAllPartitions()) { - long partitionId = partition.getId(); - TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) - .getStorageMedium(); - for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) { - long indexId = mIndex.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - for (Tablet tablet : mIndex.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, - medium); - long tabletId = tablet.getId(); - invertedIndex.addTablet(tabletId, tabletMeta); - for (Replica replica : tablet.getReplicas()) { - invertedIndex.addReplica(tabletId, replica); - } + // add to inverted index + if (table.isManagedTable()) { + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + OlapTable olapTable = (OlapTable) table; + long dbId = db.getId(); + long tableId = table.getId(); + for (Partition partition : olapTable.getAllPartitions()) { + long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) + .getStorageMedium(); + for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) { + long indexId = mIndex.getId(); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); + for (Tablet tablet : mIndex.getTablets()) { + TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, + medium); + long tabletId = tablet.getId(); + invertedIndex.addTablet(tabletId, tabletMeta); + for (Replica replica : tablet.getReplicas()) { + invertedIndex.addReplica(tabletId, replica); } } - } // end for partitions + } + } // end for partitions + if (!Env.isCheckpointThread()) { DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable, true); } } @@ -1695,20 +1691,18 @@ public class InternalCatalog implements CatalogIf<Database> { partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(), partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory(), info.isMutable()); - if (!Env.isCheckpointThread()) { - // add to inverted index - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { - long indexId = index.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - for (Tablet tablet : index.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(), - index.getId(), schemaHash, info.getDataProperty().getStorageMedium()); - long tabletId = tablet.getId(); - invertedIndex.addTablet(tabletId, tabletMeta); - for (Replica replica : tablet.getReplicas()) { - invertedIndex.addReplica(tabletId, replica); - } + // add to inverted index + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) { + long indexId = index.getId(); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); + for (Tablet tablet : index.getTablets()) { + TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(), + index.getId(), schemaHash, info.getDataProperty().getStorageMedium()); + long tabletId = tablet.getId(); + invertedIndex.addTablet(tabletId, tabletMeta); + for (Replica replica : tablet.getReplicas()) { + invertedIndex.addReplica(tabletId, replica); } } } @@ -3054,24 +3048,22 @@ public class InternalCatalog implements CatalogIf<Database> { try { truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable()); - if (!Env.isCheckpointThread()) { - // add tablet to inverted index - TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); - for (Partition partition : info.getPartitions()) { - long partitionId = partition.getId(); - TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) - .getStorageMedium(); - for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) { - long indexId = mIndex.getId(); - int schemaHash = olapTable.getSchemaHashByIndexId(indexId); - for (Tablet tablet : mIndex.getTablets()) { - TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId, - schemaHash, medium); - long tabletId = tablet.getId(); - invertedIndex.addTablet(tabletId, tabletMeta); - for (Replica replica : tablet.getReplicas()) { - invertedIndex.addReplica(tabletId, replica); - } + // add tablet to inverted index + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + for (Partition partition : info.getPartitions()) { + long partitionId = partition.getId(); + TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId) + .getStorageMedium(); + for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) { + long indexId = mIndex.getId(); + int schemaHash = olapTable.getSchemaHashByIndexId(indexId); + for (Tablet tablet : mIndex.getTablets()) { + TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId, + schemaHash, medium); + long tabletId = tablet.getId(); + invertedIndex.addTablet(tabletId, tabletMeta); + for (Replica replica : tablet.getReplicas()) { + invertedIndex.addReplica(tabletId, replica); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org