This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8524079c3d3 [opt](txn) limit publishing txns on a table (#54230)
8524079c3d3 is described below

commit 8524079c3d3d40d08b72932402b29ed54314fa63
Author: Yongqiang YANG <[email protected]>
AuthorDate: Thu Sep 18 23:20:17 2025 +0800

    [opt](txn) limit publishing txns on a table (#54230)
    
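    If there are too many publishing txns on a table, something is wrong,
    so new transactions on that table are rejected once the configurable
    limit (max_publishing_txn_num_per_table) is reached.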
    Co-authored-by: Yongqiang YANG <[email protected]>
---
 .../main/java/org/apache/doris/common/Config.java  |  6 ++
 .../doris/transaction/DatabaseTransactionMgr.java  | 59 ++++++++++++--
 .../insert_p0/transaction/test_txn_limit.groovy    | 91 ++++++++++++++++++++++
 3 files changed, 149 insertions(+), 7 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index b170230aa89..22630badc8b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -572,6 +572,12 @@ public class Config extends ConfigBase {
             "The interval of publish task trigger thread, in milliseconds"})
     public static int publish_version_interval_ms = 10;
 
+    @ConfField(mutable = true, masterOnly = true, description = {
+            "If the number of publishing transactions of a table exceeds this 
value, new transactions will "
+            + "be rejected. Set to -1 to disable this limit.",
+            "当一个表的发布事务数量超过该值时,新的事务将被拒绝。设置为-1表示不限制。"})
+    public static long max_publishing_txn_num_per_table = 500;
+
     @ConfField(description = {"thrift server 的最大 worker 线程数", "The max worker 
threads of thrift server"})
     public static int thrift_server_max_worker_threads = 4096;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 6263b48fabe..610f62be204 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -145,6 +145,10 @@ public class DatabaseTransactionMgr {
     // it must exists in dbIdToTxnLabels, and vice versa
     private final Map<String, Set<Long>> labelToTxnIds = Maps.newHashMap();
 
+    private final Map<Long, Long> tableCommittedTxnCount = 
Maps.newConcurrentMap();
+
+    private Long lastCommittedTxnCountUpdateTime = 0L;
+
     // count the number of running txns of database
     private volatile int runningTxnNums = 0;
 
@@ -361,7 +365,7 @@ public class DatabaseTransactionMgr {
                 }
             }
 
-            checkRunningTxnExceedLimit();
+            checkRunningTxnExceedLimit(tableIdList);
 
             tid = idGenerator.getNextTransactionId();
             TransactionState transactionState = new TransactionState(dbId, 
tableIdList,
@@ -1062,17 +1066,45 @@ public class DatabaseTransactionMgr {
     }
 
     protected List<TransactionState> getCommittedTxnList() {
+        List<TransactionState> committedTxnList = null;
         readLock();
         try {
             // only send task to committed transaction
-            return idToRunningTransactionState.values().stream()
-                    .filter(transactionState ->
-                            (transactionState.getTransactionStatus() == 
TransactionStatus.COMMITTED))
-                    
.sorted(Comparator.comparing(TransactionState::getCommitTime))
-                    .collect(Collectors.toList());
+            committedTxnList = idToRunningTransactionState.values().stream()
+                                    .filter(transactionState ->
+                                            
(transactionState.getTransactionStatus() == TransactionStatus.COMMITTED))
+                                    
.sorted(Comparator.comparing(TransactionState::getCommitTime))
+                                    .collect(Collectors.toList());
         } finally {
             readUnlock();
         }
+
+        updateCommittedTxnCountPerTable(committedTxnList);
+
+        return committedTxnList;
+    }
+
+    // given list of transactionstate, calculate committed trasactions for 
each table.
+    protected void updateCommittedTxnCountPerTable(List<TransactionState> 
txnList) {
+        // Control frequency by recording last update time
+        long now = System.currentTimeMillis();
+        // Only update if enough time has passed (e.g., 60 seconds)
+        if (now - this.lastCommittedTxnCountUpdateTime < 60000 || 
Config.max_publishing_txn_num_per_table < 0) {
+            return;
+        }
+
+        this.lastCommittedTxnCountUpdateTime = now;
+        tableCommittedTxnCount.clear();
+        for (TransactionState txn : txnList) {
+            if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
+                List<Long> tableIds = txn.getTableIdList();
+                if (tableIds != null) {
+                    for (Long tableId : tableIds) {
+                        tableCommittedTxnCount.put(tableId, 
tableCommittedTxnCount.getOrDefault(tableId, 0L) + 1);
+                    }
+                }
+            }
+        }
     }
 
     public void finishTransaction(long transactionId, Map<Long, Long> 
partitionVisibleVersions,
@@ -2068,13 +2100,26 @@ public class DatabaseTransactionMgr {
         return infos;
     }
 
-    protected void checkRunningTxnExceedLimit()
+    protected void checkRunningTxnExceedLimit(List<Long> tableIdList)
             throws BeginTransactionException, MetaNotFoundException {
         long txnQuota = 
env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize();
+
         if (runningTxnNums >= txnQuota) {
             throw new BeginTransactionException("current running txns on db " 
+ dbId + " is "
                     + runningTxnNums + ", larger than limit " + txnQuota);
         }
+
+        // Check if committed txn count on any table exceeds the configured 
limit
+        if (Config.max_publishing_txn_num_per_table >= 0) {
+            for (Long tableId : tableIdList) {
+                long committedTxnCount = 
tableCommittedTxnCount.getOrDefault(tableId, 0L);
+                if (committedTxnCount >= 
Config.max_publishing_txn_num_per_table) {
+                    throw new BeginTransactionException("current committed 
txns on table " + tableId
+                            + " is " + committedTxnCount + ", larger than 
limit "
+                            + Config.max_publishing_txn_num_per_table);
+                }
+            }
+        }
     }
 
     private void updateCatalogAfterCommitted(TransactionState 
transactionState, Database db, boolean isReplay) {
diff --git a/regression-test/suites/insert_p0/transaction/test_txn_limit.groovy 
b/regression-test/suites/insert_p0/transaction/test_txn_limit.groovy
new file mode 100644
index 00000000000..7ac1ffe481e
--- /dev/null
+++ b/regression-test/suites/insert_p0/transaction/test_txn_limit.groovy
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_txn_limit", 'nonConcurrent') {
+    if (isCloudMode()) {
+        return
+    }
+
+    def configResult = sql "SHOW FRONTEND CONFIG LIKE 
'max_running_txn_num_per_db'"
+    logger.info("configResult: ${configResult}")
+    assert configResult.size() == 1
+
+
+    def originTxnNum = configResult[0][1] as long
+    logger.info("max_running_txn_num_per_db: $originTxnNum")
+
+    if (originTxnNum == 0) {
+        originTxnNum = 1000 // default value is 1000, if it is set to 0, we 
will use 1000
+    }
+
+    sql "ADMIN SET FRONTEND CONFIG ('max_running_txn_num_per_db' = '0')"
+    configResult = sql "SHOW FRONTEND CONFIG LIKE 'max_running_txn_num_per_db'"
+    logger.info("configResult: ${configResult}")
+
+    def create_db_and_table = { dbName, tableName ->
+        sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+        sql "DROP TABLE IF EXISTS ${tableName}"
+        sql "DROP DATABASE IF EXISTS ${dbName}"
+        sql "CREATE DATABASE IF NOT EXISTS ${dbName}"
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `user_id` bigint default 999,
+                `group_id` bigint,
+                `id` bigint,
+                `vv` variant,
+                INDEX idx_col1 (user_id) USING INVERTED
+                ) ENGINE=OLAP
+            UNIQUE KEY(user_id, group_id)
+            DISTRIBUTED BY HASH (user_id) BUCKETS 1
+            PROPERTIES(
+                    "store_row_column" = "true",
+                    "replication_num" = "1"
+                    );
+        """
+    }
+
+    def dbName = "test_txn_limit_db"
+    def tableName = "${dbName}.test_txn_limit"
+
+    create_db_and_table("${dbName}", "${tableName}")
+
+    test {
+        sql """insert into ${tableName} 
values(1,1,5,'{"b":"b"}'),(1,1,4,'{"b":"b"}'),(1,1,3,'{"b":"b"}')"""
+        exception "current running txns on db"
+    }
+
+    sql "ADMIN SET FRONTEND CONFIG ('max_running_txn_num_per_db' = 
'${originTxnNum}')"
+    logger.info("reset max_running_txn_num_per_db to ${originTxnNum}")
+
+    // test max_publishing_txn_num_per_table
+    // this config is used to limit the number of publishing txns on a table
+    dbName = "test_txn_limit_db1"
+    tableName = "${dbName}.test_txn_limit"
+    create_db_and_table("${dbName}", "${tableName}")
+
+    def maxPublishingTxnConfig = sql "SHOW FRONTEND CONFIG LIKE 
'max_publishing_txn_num_per_table'"
+    assert maxPublishingTxnConfig.size() == 1
+    def maxPublishingTxnNum = maxPublishingTxnConfig[0][1] as long
+    logger.info("max_publishing_txn_num_per_table: 
${maxPublishingTxnConfig[0][1]}")
+
+    sql "ADMIN SET FRONTEND CONFIG ('max_publishing_txn_num_per_table' = '0')"
+        test {
+        sql """insert into ${tableName} 
values(1,1,5,'{"b":"b"}'),(1,1,4,'{"b":"b"}'),(1,1,3,'{"b":"b"}')"""
+        exception "current committed txns on table"
+    }
+    sql "ADMIN SET FRONTEND CONFIG ('max_publishing_txn_num_per_table' = 
'${maxPublishingTxnNum}')"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to