This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8524079c3d3 [opt](txn) limit publishing txns on a table (#54230)
8524079c3d3 is described below
commit 8524079c3d3d40d08b72932402b29ed54314fa63
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 18 23:20:17 2025 +0800
[opt](txn) limit publishing txns on a table (#54230)
If there are too many publishing txns on a table, something is
wrong.
Co-authored-by: Yongqiang YANG <[email protected]>
---
.../main/java/org/apache/doris/common/Config.java | 6 ++
.../doris/transaction/DatabaseTransactionMgr.java | 59 ++++++++++++--
.../insert_p0/transaction/test_txn_limit.groovy | 91 ++++++++++++++++++++++
3 files changed, 149 insertions(+), 7 deletions(-)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index b170230aa89..22630badc8b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -572,6 +572,12 @@ public class Config extends ConfigBase {
"The interval of publish task trigger thread, in milliseconds"})
public static int publish_version_interval_ms = 10;
+ @ConfField(mutable = true, masterOnly = true, description = {
+ "If the number of publishing transactions of a table exceeds this
value, new transactions will "
+ + "be rejected. Set to -1 to disable this limit.",
+ "当一个表的发布事务数量超过该值时,新的事务将被拒绝。设置为-1表示不限制。"})
+ public static long max_publishing_txn_num_per_table = 500;
+
@ConfField(description = {"thrift server 的最大 worker 线程数", "The max worker
threads of thrift server"})
public static int thrift_server_max_worker_threads = 4096;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 6263b48fabe..610f62be204 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -145,6 +145,10 @@ public class DatabaseTransactionMgr {
// it must exists in dbIdToTxnLabels, and vice versa
private final Map<String, Set<Long>> labelToTxnIds = Maps.newHashMap();
+ private final Map<Long, Long> tableCommittedTxnCount =
Maps.newConcurrentMap();
+
+ private Long lastCommittedTxnCountUpdateTime = 0L;
+
// count the number of running txns of database
private volatile int runningTxnNums = 0;
@@ -361,7 +365,7 @@ public class DatabaseTransactionMgr {
}
}
- checkRunningTxnExceedLimit();
+ checkRunningTxnExceedLimit(tableIdList);
tid = idGenerator.getNextTransactionId();
TransactionState transactionState = new TransactionState(dbId,
tableIdList,
@@ -1062,17 +1066,45 @@ public class DatabaseTransactionMgr {
}
protected List<TransactionState> getCommittedTxnList() {
+ List<TransactionState> committedTxnList = null;
readLock();
try {
// only send task to committed transaction
- return idToRunningTransactionState.values().stream()
- .filter(transactionState ->
- (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED))
-
.sorted(Comparator.comparing(TransactionState::getCommitTime))
- .collect(Collectors.toList());
+ committedTxnList = idToRunningTransactionState.values().stream()
+ .filter(transactionState ->
+
(transactionState.getTransactionStatus() == TransactionStatus.COMMITTED))
+
.sorted(Comparator.comparing(TransactionState::getCommitTime))
+ .collect(Collectors.toList());
} finally {
readUnlock();
}
+
+ updateCommittedTxnCountPerTable(committedTxnList);
+
+ return committedTxnList;
+ }
+
+ // Given a list of TransactionState, count the committed transactions for
each table.
+ protected void updateCommittedTxnCountPerTable(List<TransactionState>
txnList) {
+ // Control frequency by recording last update time
+ long now = System.currentTimeMillis();
+ // Only update if enough time has passed (e.g., 60 seconds)
+ if (now - this.lastCommittedTxnCountUpdateTime < 60000 ||
Config.max_publishing_txn_num_per_table < 0) {
+ return;
+ }
+
+ this.lastCommittedTxnCountUpdateTime = now;
+ tableCommittedTxnCount.clear();
+ for (TransactionState txn : txnList) {
+ if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+ List<Long> tableIds = txn.getTableIdList();
+ if (tableIds != null) {
+ for (Long tableId : tableIds) {
+ tableCommittedTxnCount.put(tableId,
tableCommittedTxnCount.getOrDefault(tableId, 0L) + 1);
+ }
+ }
+ }
+ }
}
public void finishTransaction(long transactionId, Map<Long, Long>
partitionVisibleVersions,
@@ -2068,13 +2100,26 @@ public class DatabaseTransactionMgr {
return infos;
}
- protected void checkRunningTxnExceedLimit()
+ protected void checkRunningTxnExceedLimit(List<Long> tableIdList)
throws BeginTransactionException, MetaNotFoundException {
long txnQuota =
env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize();
+
if (runningTxnNums >= txnQuota) {
throw new BeginTransactionException("current running txns on db "
+ dbId + " is "
+ runningTxnNums + ", larger than limit " + txnQuota);
}
+
+ // Check if committed txn count on any table reaches or exceeds the configured
limit
+ if (Config.max_publishing_txn_num_per_table >= 0) {
+ for (Long tableId : tableIdList) {
+ long committedTxnCount =
tableCommittedTxnCount.getOrDefault(tableId, 0L);
+ if (committedTxnCount >=
Config.max_publishing_txn_num_per_table) {
+ throw new BeginTransactionException("current committed
txns on table " + tableId
+ + " is " + committedTxnCount + ", larger than
limit "
+ + Config.max_publishing_txn_num_per_table);
+ }
+ }
+ }
}
private void updateCatalogAfterCommitted(TransactionState
transactionState, Database db, boolean isReplay) {
diff --git a/regression-test/suites/insert_p0/transaction/test_txn_limit.groovy
b/regression-test/suites/insert_p0/transaction/test_txn_limit.groovy
new file mode 100644
index 00000000000..7ac1ffe481e
--- /dev/null
+++ b/regression-test/suites/insert_p0/transaction/test_txn_limit.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_txn_limit", 'nonConcurrent') {
+ if (isCloudMode()) {
+ return
+ }
+
+ def configResult = sql "SHOW FRONTEND CONFIG LIKE
'max_running_txn_num_per_db'"
+ logger.info("configResult: ${configResult}")
+ assert configResult.size() == 1
+
+
+ def originTxnNum = configResult[0][1] as long
+ logger.info("max_running_txn_num_per_db: $originTxnNum")
+
+ if (originTxnNum == 0) {
+ originTxnNum = 1000 // default value is 1000, if it is set to 0, we
will use 1000
+ }
+
+ sql "ADMIN SET FRONTEND CONFIG ('max_running_txn_num_per_db' = '0')"
+ configResult = sql "SHOW FRONTEND CONFIG LIKE 'max_running_txn_num_per_db'"
+ logger.info("configResult: ${configResult}")
+
+ def create_db_and_table = { dbName, tableName ->
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql "DROP DATABASE IF EXISTS ${dbName}"
+ sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `user_id` bigint default 999,
+ `group_id` bigint,
+ `id` bigint,
+ `vv` variant,
+ INDEX idx_col1 (user_id) USING INVERTED
+ ) ENGINE=OLAP
+ UNIQUE KEY(user_id, group_id)
+ DISTRIBUTED BY HASH (user_id) BUCKETS 1
+ PROPERTIES(
+ "store_row_column" = "true",
+ "replication_num" = "1"
+ );
+ """
+ }
+
+ def dbName = "test_txn_limit_db"
+ def tableName = "${dbName}.test_txn_limit"
+
+ create_db_and_table("${dbName}", "${tableName}")
+
+ test {
+ sql """insert into ${tableName}
values(1,1,5,'{"b":"b"}'),(1,1,4,'{"b":"b"}'),(1,1,3,'{"b":"b"}')"""
+ exception "current running txns on db"
+ }
+
+ sql "ADMIN SET FRONTEND CONFIG ('max_running_txn_num_per_db' =
'${originTxnNum}')"
+ logger.info("reset max_running_txn_num_per_db to ${originTxnNum}")
+
+ // test max_publishing_txn_num_per_table
+ // this config is used to limit the number of publishing txns on a table
+ dbName = "test_txn_limit_db1"
+ tableName = "${dbName}.test_txn_limit"
+ create_db_and_table("${dbName}", "${tableName}")
+
+ def maxPublishingTxnConfig = sql "SHOW FRONTEND CONFIG LIKE
'max_publishing_txn_num_per_table'"
+ assert maxPublishingTxnConfig.size() == 1
+ def maxPublishingTxnNum = maxPublishingTxnConfig[0][1] as long
+ logger.info("max_publishing_txn_num_per_table:
${maxPublishingTxnConfig[0][1]}")
+
+ sql "ADMIN SET FRONTEND CONFIG ('max_publishing_txn_num_per_table' = '0')"
+ test {
+ sql """insert into ${tableName}
values(1,1,5,'{"b":"b"}'),(1,1,4,'{"b":"b"}'),(1,1,3,'{"b":"b"}')"""
+ exception "current committed txns on table"
+ }
+ sql "ADMIN SET FRONTEND CONFIG ('max_publishing_txn_num_per_table' =
'${maxPublishingTxnNum}')"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]