This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e7f378fec6 [Enhancement](IdGenerator) Use IdGeneratorBuffer to get better performance for getNextId operation when create table, truncate table, add partition and so on (#11479) e7f378fec6 is described below commit e7f378fec6d1fc8201a9d714bee18e9624d2d2e2 Author: caiconghui <55968745+caicong...@users.noreply.github.com> AuthorDate: Thu Aug 4 11:21:35 2022 +0800 [Enhancement](IdGenerator) Use IdGeneratorBuffer to get better performance for getNextId operation when create table, truncate table, add partition and so on (#11479) Co-authored-by: caiconghui1 <caicongh...@jd.com> --- .../main/java/org/apache/doris/catalog/Env.java | 7 ++- ...atalogIdGenerator.java => MetaIdGenerator.java} | 40 +++++++++++--- .../apache/doris/common/util/IdGeneratorUtil.java | 61 +++++++++++++++++++++ .../doris/datasource/InternalDataSource.java | 62 ++++++++++++---------- .../doris/transaction/DatabaseTransactionMgr.java | 2 +- 5 files changed, 137 insertions(+), 35 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 55b1b5e142..c6bd37d08a 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -83,6 +83,7 @@ import org.apache.doris.blockrule.SqlBlockRuleMgr; import org.apache.doris.catalog.ColocateTableIndex.GroupId; import org.apache.doris.catalog.DistributionInfo.DistributionInfoType; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; +import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer; import org.apache.doris.catalog.OlapTable.OlapTableState; import org.apache.doris.catalog.Replica.ReplicaStatus; import org.apache.doris.catalog.TableIf.TableType; @@ -344,7 +345,7 @@ public class Env { private int masterHttpPort; private String masterIp; - private CatalogIdGenerator idGenerator = new 
CatalogIdGenerator(NEXT_ID_INIT_VALUE); + private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE); private EditLog editLog; private int clusterId; @@ -3205,6 +3206,10 @@ public class Env { return idGenerator.getNextId(); } + public IdGeneratorBuffer getIdGeneratorBuffer(long bufferSize) { + return idGenerator.getIdGeneratorBuffer(bufferSize); + } + public HashMap<Long, TStorageMedium> getPartitionIdToStorageMediumMap() { HashMap<Long, TStorageMedium> storageMediumMap = new HashMap<Long, TStorageMedium>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetaIdGenerator.java similarity index 64% rename from fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java rename to fe/fe-core/src/main/java/org/apache/doris/catalog/MetaIdGenerator.java index 70c94c728b..d2f16a6fc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogIdGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetaIdGenerator.java @@ -19,10 +19,12 @@ package org.apache.doris.catalog; import org.apache.doris.persist.EditLog; +import com.google.common.base.Preconditions; + // This new Id generator is just same as TransactionIdGenerator. // But we can't just use TransactionIdGenerator to replace the old catalog's 'nextId' for compatibility reason. // cause they are using different edit log operation type. 
-public class CatalogIdGenerator { +public class MetaIdGenerator { private static final int BATCH_ID_INTERVAL = 1000; private long nextId; @@ -30,7 +32,7 @@ public class CatalogIdGenerator { private EditLog editLog; - public CatalogIdGenerator(long initValue) { + public MetaIdGenerator(long initValue) { nextId = initValue + 1; batchEndId = initValue; } @@ -41,16 +43,27 @@ public class CatalogIdGenerator { // performance is more quickly public synchronized long getNextId() { - if (nextId < batchEndId) { - return nextId++; - } else { + if (nextId >= batchEndId) { batchEndId = batchEndId + BATCH_ID_INTERVAL; if (editLog != null) { // add this check just for unit test editLog.logSaveNextId(batchEndId); } - return nextId++; } + return nextId++; + } + + public synchronized IdGeneratorBuffer getIdGeneratorBuffer(long bufferSize) { + Preconditions.checkState(bufferSize > 0); + IdGeneratorBuffer idGeneratorBuffer = new IdGeneratorBuffer(nextId, nextId + bufferSize - 1); + nextId = nextId + bufferSize; + if (nextId > batchEndId) { + batchEndId = batchEndId + (bufferSize / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL; + if (editLog != null) { + editLog.logSaveNextId(batchEndId); + } + } + return idGeneratorBuffer; } public synchronized void setId(long id) { @@ -64,4 +77,19 @@ public class CatalogIdGenerator { public long getBatchEndId() { return batchEndId; } + + public class IdGeneratorBuffer { + private long nextId; + private long batchEndId; + + private IdGeneratorBuffer(long nextId, long batchEndId) { + this.nextId = nextId; + this.batchEndId = batchEndId; + } + + public long getNextId() { + Preconditions.checkState(nextId <= batchEndId); + return nextId++; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java new file mode 100644 index 0000000000..b9609d409a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/IdGeneratorUtil.java @@ 
-0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.SinglePartitionDesc; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.ReplicaAllocation; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; + +import java.util.Collection; + +public class IdGeneratorUtil { + + public static long getBufferSize(CreateTableStmt stmt, ReplicaAllocation replicaAlloc) throws DdlException, + AnalysisException { + long bufferSize = 1; + long partitionNum = stmt.getPartitionDesc() == null ? 
1 : + stmt.getPartitionDesc().getSinglePartitionDescs().size(); + long indexNum = stmt.getRollupAlterClauseList().size() + 1; + long bucketNum = stmt.getDistributionDesc().toDistributionInfo(stmt.getColumns()).getBucketNum(); + bufferSize = bufferSize + partitionNum + indexNum; + if (stmt.getPartitionDesc() == null) { + bufferSize = bufferSize + (replicaAlloc.getTotalReplicaNum() + 1) * indexNum * bucketNum; + } else { + for (SinglePartitionDesc partitionDesc : stmt.getPartitionDesc().getSinglePartitionDescs()) { + long replicaNum = partitionDesc.getReplicaAlloc().getTotalReplicaNum(); + bufferSize = bufferSize + (replicaNum + 1) * indexNum * bucketNum; + } + } + return bufferSize; + } + + public static long getBufferSize(OlapTable table, Collection<Long> partitionIds) { + long bufferSize = 0; + for (Long partitionId : partitionIds) { + bufferSize = bufferSize + 1; + long replicaNum = table.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum(); + long indexNum = table.getIndexIdToMeta().size(); + long bucketNum = table.getPartition(partitionId).getDistributionInfo().getBucketNum(); + bufferSize = bufferSize + (replicaNum + 1) * indexNum * bucketNum; + } + return bufferSize; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java index cb484d2820..027c7dd9c4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java @@ -85,6 +85,7 @@ import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; import org.apache.doris.catalog.MaterializedIndex.IndexState; import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer; import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.OdbcTable; 
import org.apache.doris.catalog.OlapTable; @@ -125,6 +126,7 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.util.DynamicPartitionUtil; +import org.apache.doris.common.util.IdGeneratorUtil; import org.apache.doris.common.util.MetaLockUtils; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.QueryableReentrantLock; @@ -1312,16 +1314,18 @@ public class InternalDataSource implements DataSourceIf<Database> { throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing " + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]"); } - Set<Long> tabletIdSet = new HashSet<Long>(); + Set<Long> tabletIdSet = new HashSet<>(); + long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum; + IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); try { - long partitionId = Env.getCurrentEnv().getNextId(); + long partitionId = idGeneratorBuffer.getNextId(); Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(), olapTable.getBaseIndexId(), partitionId, partitionName, indexIdToMeta, distributionInfo, dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(), singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet, olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(), singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(), - olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy()); + olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(), idGeneratorBuffer); // check again table = db.getOlapTableOrDdlException(tableName); @@ -1542,7 +1546,8 @@ public class InternalDataSource implements DataSourceIf<Database> { 
DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc, Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes, boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType, - DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy) throws DdlException { + DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy, + IdGeneratorBuffer idGeneratorBuffer) throws DdlException { // create base index first. Preconditions.checkArgument(baseIndexId != -1); MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL); @@ -1580,7 +1585,7 @@ public class InternalDataSource implements DataSourceIf<Database> { int schemaHash = indexMeta.getSchemaHash(); TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium); createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta, - tabletIdSet); + tabletIdSet, idGeneratorBuffer); boolean ok = false; String errMsg = null; @@ -1659,15 +1664,23 @@ public class InternalDataSource implements DataSourceIf<Database> { List<Column> baseSchema = stmt.getColumns(); validateColumns(baseSchema); + // analyze replica allocation + ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(stmt.getProperties(), ""); + if (replicaAlloc.isNotSet()) { + replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; + } + + long bufferSize = IdGeneratorUtil.getBufferSize(stmt, replicaAlloc); + IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); + // create partition info PartitionDesc partitionDesc = stmt.getPartitionDesc(); PartitionInfo partitionInfo = null; Map<String, Long> partitionNameToId = Maps.newHashMap(); if (partitionDesc != null) { - // gen partition id first PartitionDesc partDesc = partitionDesc; for 
(SinglePartitionDesc desc : partDesc.getSinglePartitionDescs()) { - long partitionId = Env.getCurrentEnv().getNextId(); + long partitionId = idGeneratorBuffer.getNextId(); partitionNameToId.put(desc.getPartitionName(), partitionId); } partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false); @@ -1675,7 +1688,7 @@ public class InternalDataSource implements DataSourceIf<Database> { if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) { throw new DdlException("Only support dynamic partition properties on range partition table"); } - long partitionId = Env.getCurrentEnv().getNextId(); + long partitionId = idGeneratorBuffer.getNextId(); // use table name as single partition name partitionNameToId.put(tableName, partitionId); partitionInfo = new SinglePartitionInfo(); @@ -1699,7 +1712,7 @@ public class InternalDataSource implements DataSourceIf<Database> { TableIndexes indexes = new TableIndexes(stmt.getIndexes()); // create table - long tableId = Env.getCurrentEnv().getNextId(); + long tableId = idGeneratorBuffer.getNextId(); OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo, defaultDistributionInfo, indexes); @@ -1711,7 +1724,7 @@ public class InternalDataSource implements DataSourceIf<Database> { olapTable.setComment(stmt.getComment()); // set base index id - long baseIndexId = Env.getCurrentEnv().getNextId(); + long baseIndexId = idGeneratorBuffer.getNextId(); olapTable.setBaseIndexId(baseIndexId); // set base index info to table @@ -1770,11 +1783,6 @@ public class InternalDataSource implements DataSourceIf<Database> { throw new DdlException(e.getMessage()); } - // analyze replica allocation - ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, ""); - if (replicaAlloc.isNotSet()) { - replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION; - } olapTable.setReplicationAllocation(replicaAlloc); // set in memory @@ -1880,7 +1888,7 @@ public 
class InternalDataSource implements DataSourceIf<Database> { .checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false); short rollupShortKeyColumnCount = Env.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties()); int rollupSchemaHash = Util.generateSchemaHash(); - long rollupIndexId = Env.getCurrentEnv().getNextId(); + long rollupIndexId = idGeneratorBuffer.getNextId(); olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion, rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType); } @@ -1938,7 +1946,8 @@ public class InternalDataSource implements DataSourceIf<Database> { partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(), partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType, - olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy); + olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, + idGeneratorBuffer); olapTable.addPartition(partition); } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) { @@ -1995,7 +2004,8 @@ public class InternalDataSource implements DataSourceIf<Database> { partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat, partitionInfo.getTabletType(entry.getValue()), compressionType, - olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy); + olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, + idGeneratorBuffer); olapTable.addPartition(partition); } } else { @@ -2059,7 +2069,6 @@ public class InternalDataSource implements DataSourceIf<Database> { 
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } LOG.info("successfully create table[{}-{}]", tableName, tableId); - return; } private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException { @@ -2073,7 +2082,6 @@ public class InternalDataSource implements DataSourceIf<Database> { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } LOG.info("successfully create table[{}-{}]", tableName, tableId); - return; } private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException { @@ -2130,8 +2138,6 @@ public class InternalDataSource implements DataSourceIf<Database> { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } LOG.info("successfully create table[{}-{}]", tableName, tableId); - - return; } private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException { @@ -2200,7 +2206,7 @@ public class InternalDataSource implements DataSourceIf<Database> { private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, - Set<Long> tabletIdSet) throws DdlException { + Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException { ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex(); Map<Tag, List<List<Long>>> backendsPerBucketSeq = null; GroupId groupId = null; @@ -2222,7 +2228,7 @@ public class InternalDataSource implements DataSourceIf<Database> { } for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends - Tablet tablet = new Tablet(Env.getCurrentEnv().getNextId()); + Tablet tablet = new Tablet(idGeneratorBuffer.getNextId()); // add tablet to inverted index first index.addTablet(tablet, tabletMeta); @@ -2257,7 +2263,7 @@ public class InternalDataSource implements DataSourceIf<Database> { short totalReplicaNum = 
(short) 0; for (List<Long> backendIds : chosenBackendIds.values()) { for (long backendId : backendIds) { - long replicaId = Env.getCurrentEnv().getNextId(); + long replicaId = idGeneratorBuffer.getNextId(); Replica replica = new Replica(replicaId, backendId, replicaState, version, tabletMeta.getOldSchemaHash()); tablet.addReplica(replica); @@ -2356,6 +2362,8 @@ public class InternalDataSource implements DataSourceIf<Database> { List<Partition> newPartitions = Lists.newArrayList(); // tabletIdSet to save all newly created tablet ids. Set<Long> tabletIdSet = Sets.newHashSet(); + long bufferSize = IdGeneratorUtil.getBufferSize(copiedTbl, origPartitions.values()); + IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize); try { for (Map.Entry<String, Long> entry : origPartitions.entrySet()) { // the new partition must use new id @@ -2364,7 +2372,7 @@ public class InternalDataSource implements DataSourceIf<Database> { // By using a new id, load job will be aborted(just like partition is dropped), // which is the right behavior. 
long oldPartitionId = entry.getValue(); - long newPartitionId = Env.getCurrentEnv().getNextId(); + long newPartitionId = idGeneratorBuffer.getNextId(); Partition newPartition = createPartitionWithIndices(db.getClusterName(), db.getId(), copiedTbl.getId(), copiedTbl.getBaseIndexId(), newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(), partitionsDistributionInfo.get(oldPartitionId), @@ -2374,7 +2382,7 @@ public class InternalDataSource implements DataSourceIf<Database> { copiedTbl.isInMemory(), copiedTbl.getStorageFormat(), copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(), copiedTbl.getDataSortInfo(), copiedTbl.getEnableUniqueKeyMergeOnWrite(), - olapTable.getStoragePolicy()); + olapTable.getStoragePolicy(), idGeneratorBuffer); newPartitions.add(newPartition); } } catch (DdlException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index c4b4e3954c..99fd47b159 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -326,7 +326,7 @@ public class DatabaseTransactionMgr { checkRunningTxnExceedLimit(sourceType); long tid = idGenerator.getNextTransactionId(); - LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listner id: {}", + LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}", tid, label, coordinator, listenerId); TransactionState transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org