This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1ae23a32063 [Fix](group commit) Fix wal mem back pressure fault injection case (#29493) 1ae23a32063 is described below commit 1ae23a32063818212cd59fe05cfc62b2ea36c898 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Sat Jan 6 18:39:26 2024 +0800 [Fix](group commit) Fix wal mem back pressure fault injection case (#29493) --- be/src/runtime/group_commit_mgr.cpp | 2 +- ...st_wal_mem_back_pressure_fault_injection.groovy | 112 +++++++++------------ 2 files changed, 49 insertions(+), 65 deletions(-) diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 1bf189c384d..2838ebbed51 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -34,7 +34,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state, RETURN_IF_ERROR(status); auto start = std::chrono::steady_clock::now(); while (!runtime_state->is_cancelled() && status.ok() && - _all_block_queues_bytes->load(std::memory_order_relaxed) > + _all_block_queues_bytes->load(std::memory_order_relaxed) >= config::group_commit_queue_mem_limit) { _put_cond.wait_for(l, std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME)); diff --git a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy index 473abb6bb2a..c3cfe5fa0c7 100644 --- a/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_wal_mem_back_pressure_fault_injection.groovy @@ -20,53 +20,17 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") { def tableName = "wal_test" sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE IF NOT EXISTS `wal_baseall` ( - `k0` boolean null comment "", - `k1` tinyint(4) null comment "", - `k2` smallint(6) null comment "", - `k3` int(11) null comment "", - `k4` bigint(20) null comment "", - `k5` decimal(9, 3) null comment "", - `k6` char(5) null comment "", - `k10` date null comment "", - `k11` datetime null comment "", - `k7` varchar(20) null comment "", - `k8` double max null comment "", - `k9` float sum null comment "", - `k12` string replace null comment "", - `k13` largeint(40) replace null comment "" - ) engine=olap - DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") - """ sql """ - CREATE TABLE IF NOT EXISTS `wal_test` ( - `k0` boolean null comment "", - `k1` tinyint(4) null comment "", - `k2` smallint(6) null comment "", - `k3` int(11) null comment "", - `k4` bigint(20) null comment "", - `k5` decimal(9, 3) null comment "", - `k6` char(5) null comment "", - `k10` date null comment "", - `k11` datetime null comment "", - `k7` varchar(20) null comment "", - `k8` double max null comment "", - `k9` float sum null comment "", - `k12` string replace_if_not_null null comment "", - `k13` largeint(40) replace null comment "" + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , ) engine=olap - DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1") + DISTRIBUTED BY HASH(`k`) + BUCKETS 5 + properties("replication_num" = "1") """ - streamLoad { - table "wal_baseall" - db "regression_test_fault_injection_p0" - set 'column_separator', ',' - file "baseall.txt" - } - def enable_back_pressure = { try { def fes = sql_return_maparray "show frontends" @@ -76,14 +40,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") { def be = bes[0] def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/" logger.info("observer url: " + url) - StringBuilder sb = new StringBuilder(); - sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") - sb.append("/rest/v2/manager/node/set_config/be") - sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") - sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""") - String command = sb.toString() - logger.info(command) - def process = command.execute() + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""") + String command = sb.toString() + logger.info(command) + def process = command.execute() + + sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""") + command = sb.toString() + logger.info(command) + process = command.execute() } finally { } } @@ -97,14 +70,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") { def be = bes[0] def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/" logger.info("observer url: " + url) - StringBuilder sb = new StringBuilder(); - sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") - sb.append("/rest/v2/manager/node/set_config/be") - sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") - sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""") - String command = sb.toString() - logger.info(command) - def process = command.execute() + StringBuilder sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""") + String command = sb.toString() + logger.info(command) + def process = command.execute() + + sb = new StringBuilder(); + sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}") + sb.append("/rest/v2/manager/node/set_config/be") + sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"") + sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"10000\\",\\"persist\\": \\"false\\"}}\"""") + command = sb.toString() + logger.info(command) + process = command.execute() } finally { } } @@ -114,28 +96,30 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") { def thread1 = new Thread({ sql """ set group_commit = async_mode; """ try { - sql """insert into ${tableName} select * from wal_baseall where k1 <= 3""" + sql """insert into ${tableName} values(1,1)""" } catch (Exception e) { logger.info(e.getMessage()) assertTrue(e.getMessage().contains('Communications link failure')) - } - disable_back_pressure() - finish = true + } finally { + finish = true + } }) thread1.start() - for(int i = 0;i<10;i++){ + while(!finish){ def processList = sql "show processlist" logger.info(processList.toString()) processList.each { item -> logger.info(item[1].toString()) logger.info(item[11].toString()) - if (item[11].toString() == "insert into ${tableName} select * from wal_baseall where k1 <= 3".toString()){ + if (item[11].toString() == "".toString()){ def res = sql "kill ${item[1]}" logger.info(res.toString()) } } + sleep(1000) } + disable_back_pressure() thread1.join() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org