This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ae23a32063 [Fix](group commit) Fix wal mem back pressure fault injection case (#29493)
1ae23a32063 is described below

commit 1ae23a32063818212cd59fe05cfc62b2ea36c898
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Sat Jan 6 18:39:26 2024 +0800

    [Fix](group commit) Fix wal mem back pressure fault injection case (#29493)
---
 be/src/runtime/group_commit_mgr.cpp                |   2 +-
 ...st_wal_mem_back_pressure_fault_injection.groovy | 112 +++++++++------------
 2 files changed, 49 insertions(+), 65 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 1bf189c384d..2838ebbed51 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -34,7 +34,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
     RETURN_IF_ERROR(status);
     auto start = std::chrono::steady_clock::now();
     while (!runtime_state->is_cancelled() && status.ok() &&
-           _all_block_queues_bytes->load(std::memory_order_relaxed) >
+           _all_block_queues_bytes->load(std::memory_order_relaxed) >=
                    config::group_commit_queue_mem_limit) {
         _put_cond.wait_for(l,
                            std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
index 473abb6bb2a..c3cfe5fa0c7 100644
--- a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy
@@ -20,53 +20,17 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
 
     def tableName = "wal_test"
     sql """ DROP TABLE IF EXISTS ${tableName} """
-    sql """
-        CREATE TABLE IF NOT EXISTS `wal_baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-        """
 
     sql """
-        CREATE TABLE IF NOT EXISTS `wal_test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
         ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 5 
+        properties("replication_num" = "1")
         """
 
-    streamLoad {
-        table "wal_baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
-
     def enable_back_pressure = {
         try {
             def fes = sql_return_maparray "show frontends"
@@ -76,14 +40,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
                 def be = bes[0]
                     def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
                     logger.info("observer url: " + url)
-                        StringBuilder sb = new StringBuilder();
-                        sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
-                        sb.append("/rest/v2/manager/node/set_config/be")
-                        sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
-                        sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
-                        String command = sb.toString()
-                        logger.info(command)
-                        def process = command.execute()
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+                    sb.append("/rest/v2/manager/node/set_config/be")
+                    sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
+                    sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
+                    String command = sb.toString()
+                    logger.info(command)
+                    def process = command.execute()
+
+                    sb = new StringBuilder();
+                    sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+                    sb.append("/rest/v2/manager/node/set_config/be")
+                    sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
+                    sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
+                    command = sb.toString()
+                    logger.info(command)
+                    process = command.execute()
         } finally {
         }
     }
@@ -97,14 +70,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
                 def be = bes[0]
                     def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
                     logger.info("observer url: " + url)
-                        StringBuilder sb = new StringBuilder();
-                        sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
-                        sb.append("/rest/v2/manager/node/set_config/be")
-                        sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
-                        sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""")
-                        String command = sb.toString()
-                        logger.info(command)
-                        def process = command.execute()
+                    StringBuilder sb = new StringBuilder();
+                    sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+                    sb.append("/rest/v2/manager/node/set_config/be")
+                    sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
+                    sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""")
+                    String command = sb.toString()
+                    logger.info(command)
+                    def process = command.execute()
+
+                    sb = new StringBuilder();
+                    sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
+                    sb.append("/rest/v2/manager/node/set_config/be")
+                    sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
+                    sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"10000\\",\\"persist\\": \\"false\\"}}\"""")
+                    command = sb.toString()
+                    logger.info(command)
+                    process = command.execute()
         } finally {
         }
     }
@@ -114,28 +96,30 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
     def thread1 = new Thread({
     sql """ set group_commit = async_mode; """
         try {
-            sql """insert into ${tableName} select * from wal_baseall where k1 <= 3"""
+            sql """insert into ${tableName} values(1,1)"""
         } catch (Exception e) {
             logger.info(e.getMessage())
             assertTrue(e.getMessage().contains('Communications link failure'))
-        } 
-    disable_back_pressure()
-    finish  = true
+        } finally {
+            finish  = true
+        }
     })
     thread1.start()
 
-    for(int i = 0;i<10;i++){
+    while(!finish){
         def processList = sql "show processlist"
         logger.info(processList.toString())
         processList.each { item ->
             logger.info(item[1].toString())
             logger.info(item[11].toString())
-            if (item[11].toString() == "insert into ${tableName} select * from wal_baseall where k1 <= 3".toString()){
+            if (item[11].toString() == "".toString()){
                 def res = sql "kill ${item[1]}"
                 logger.info(res.toString())
             }
         }
+        sleep(1000)
     }
+    disable_back_pressure()
 
     thread1.join()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to