This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 98d87b21655 [fix](load) fix the error msg of task submission failure for memory back pressure (#51078)
98d87b21655 is described below

commit 98d87b216557031a35dde63609015373eee0d309
Author: hui lai <lai...@selectdb.com>
AuthorDate: Thu May 22 10:23:20 2025 +0800

    [fix](load) fix the error msg of task submission failure for memory back pressure (#51078)

    If the backend node's memory reaches its limit, task submission fails due to
    memory back pressure, but the error message is confusing:
    ```
    failed to send task: errCode = 2, detailMessage = failed to submit task. error code: TOO_MANY_TASKS, msg: (127.0.0.1)[TOO_MANY_TASKS]...
    ```
    Change the error message to:
    ```
    failed to submit task. error code: MEM_LIMIT_EXCEEDED, msg: (127.0.0.1)[MEM_LIMIT_EXCEEDED]...
    ```
---
 .../routine_load/routine_load_task_executor.cpp |  29 +++-
 .../routine_load/routine_load_task_executor.h   |   2 +-
 .../test_routine_load_error_info.groovy         | 174 ++++++++-------------
 3 files changed, 90 insertions(+), 115 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index da9654557f2..e51a91913a6 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -209,13 +209,14 @@ Status RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(
 
 Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     std::unique_lock<std::mutex> l(_lock);
+    // check if already submitted
     if (_task_map.find(task.id) != _task_map.end()) {
-        // already submitted
         LOG(INFO) << "routine load task " << UniqueId(task.id) << " has already been submitted";
         return Status::OK();
     }
 
-    if (_task_map.size() >= config::max_routine_load_thread_pool_size || _reach_memory_limit()) {
+    // check task num limit
+    if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
         LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
                   << ", job id: " << task.job_id
                   << ", queue size: " << _thread_pool->get_queue_size()
@@ -224,6 +225,18 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
                                      BackendOptions::get_localhost());
     }
 
+    // check memory limit
+    std::string reason;
+    DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+        _reach_memory_limit(reason);
+        return Status::MemoryLimitExceeded("fake reason: " + reason);
+    });
+    if (_reach_memory_limit(reason)) {
+        LOG(INFO) << "reach memory limit. reject task: " << UniqueId(task.id)
reject task: " << UniqueId(task.id) + << ", job id: " << task.job_id << ", reason: " << reason; + return Status::MemoryLimitExceeded(reason); + } + // create the context std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); ctx->load_type = TLoadType::ROUTINE_LOAD; @@ -318,13 +331,17 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) { } } -bool RoutineLoadTaskExecutor::_reach_memory_limit() { +bool RoutineLoadTaskExecutor::_reach_memory_limit(std::string& reason) { + DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", { + reason = "reach memory limit"; + return true; + }); bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit(); auto current_load_mem_value = MemoryProfile::load_current_usage(); if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) { - LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit - << " current_load_mem_value: " << current_load_mem_value - << " _load_mem_limit: " << _load_mem_limit; + reason = "is_exceed_soft_mem_limit: " + std::to_string(is_exceed_soft_mem_limit) + + " current_load_mem_value: " + std::to_string(current_load_mem_value) + + " _load_mem_limit: " + std::to_string(_load_mem_limit); return true; } return false; diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h index b1196f7824a..eae3f9c4073 100644 --- a/be/src/runtime/routine_load/routine_load_task_executor.h +++ b/be/src/runtime/routine_load/routine_load_task_executor.h @@ -88,7 +88,7 @@ private: // create a dummy StreamLoadContext for PKafkaMetaProxyRequest Status _prepare_ctx(const PKafkaMetaProxyRequest& request, std::shared_ptr<StreamLoadContext> ctx); - bool _reach_memory_limit(); + bool _reach_memory_limit(std::string& reason); private: ExecEnv* _exec_env = nullptr; diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy index f05b8af79ee..2f018a93729 100644 --- a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy @@ -48,13 +48,11 @@ suite("test_routine_load_error_info","nonConcurrent") { } } - // case 1: task failed - if (enabled != null && enabled.equalsIgnoreCase("true")) { - // create table - def jobName = "test_error_info" - def tableName = "test_routine_error_info" - try { - sql """ + def createTable = {tableName -> + sql """ + DROP TABLE IF EXISTS ${tableName} + """ + sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( k00 INT NOT NULL, @@ -120,13 +118,12 @@ suite("test_routine_load_error_info","nonConcurrent") { "bloom_filter_columns"="k05", "replication_num" = "1" ); - """ - sql "sync" + """ + } - // create job - GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments") - sql """ - CREATE ROUTINE LOAD ${jobName} on ${tableName} + def createJob = {jobName, tableName, kafkaTopic -> + sql """ + CREATE ROUTINE LOAD ${jobName} on ${tableName} COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), COLUMNS TERMINATED BY "|" PROPERTIES @@ -138,10 +135,22 @@ suite("test_routine_load_error_info","nonConcurrent") { FROM KAFKA ( "kafka_broker_list" = "${externalEnvIp}:${kafka_port}", - "kafka_topic" = "${kafkaCsvTpoics[0]}", + "kafka_topic" = "${kafkaTopic}", 
"property.kafka_default_offsets" = "OFFSET_BEGINNING" ); - """ + """ + } + + // case 1: task failed + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // create table + def jobName = "test_error_info" + def tableName = "test_routine_error_info" + try { + createTable(tableName) + sql "sync" + GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments") + createJob(jobName, tableName, kafkaCsvTpoics[0]) sql "sync" // check error info @@ -158,10 +167,8 @@ suite("test_routine_load_error_info","nonConcurrent") { if (count > 60) { assertEquals(1, 2) break; - } else { - sleep(1000) - continue; } + sleep(1000) } } finally { GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments") @@ -175,93 +182,45 @@ suite("test_routine_load_error_info","nonConcurrent") { def jobName = "test_error_info" def tableName = "test_routine_error_info" try { - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} - ( - k00 INT NOT NULL, - k01 DATE NOT NULL, - k02 BOOLEAN NULL, - k03 TINYINT NULL, - k04 SMALLINT NULL, - k05 INT NULL, - k06 BIGINT NULL, - k07 LARGEINT NULL, - k08 FLOAT NULL, - k09 DOUBLE NULL, - k10 DECIMAL(9,1) NULL, - k11 DECIMALV3(9,1) NULL, - k12 DATETIME NULL, - k13 DATEV2 NULL, - k14 DATETIMEV2 NULL, - k15 CHAR NULL, - k16 VARCHAR NULL, - k17 STRING NULL, - k18 JSON NULL, - kd01 BOOLEAN NOT NULL DEFAULT "TRUE", - kd02 TINYINT NOT NULL DEFAULT "1", - kd03 SMALLINT NOT NULL DEFAULT "2", - kd04 INT NOT NULL DEFAULT "3", - kd05 BIGINT NOT NULL DEFAULT "4", - kd06 LARGEINT NOT NULL DEFAULT "5", - kd07 FLOAT NOT NULL DEFAULT "6.0", - kd08 DOUBLE NOT NULL DEFAULT "7.0", - kd09 DECIMAL NOT NULL DEFAULT "888888888", - kd10 DECIMALV3 NOT NULL DEFAULT "999999999", - kd11 DATE NOT NULL DEFAULT "2023-08-24", - kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00", - kd13 DATEV2 NOT NULL DEFAULT "2023-08-24", - kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00", - kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", - kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体", - kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体", - kd18 JSON NULL, - - INDEX idx_inverted_k104 (`k05`) USING INVERTED, - INDEX idx_inverted_k110 (`k11`) USING INVERTED, - INDEX idx_inverted_k113 (`k13`) USING INVERTED, - INDEX idx_inverted_k114 (`k14`) USING INVERTED, - INDEX idx_inverted_k117 (`k17`) USING INVERTED PROPERTIES("parser" = "english"), - INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), - INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), - INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF PROPERTIES("gram_size"="3", "bf_size"="256"), - - INDEX idx_bitmap_k104 (`k02`) USING BITMAP, - INDEX idx_bitmap_k110 (`kd01`) USING BITMAP - - ) - DUPLICATE KEY(k00) - PARTITION BY RANGE(k01) - ( - PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')), - PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')), - PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01')) - ) - DISTRIBUTED BY HASH(k00) BUCKETS 32 - PROPERTIES ( - "bloom_filter_columns"="k05", - "replication_num" = "1" - ); - """ + createTable(tableName) + sql "sync" + createJob(jobName, tableName, "invalid_job") sql "sync" - // create job - sql """ - CREATE ROUTINE LOAD ${jobName} on ${tableName} - COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18), - COLUMNS TERMINATED BY "|" - PROPERTIES - ( - "max_batch_interval" = "5", - "max_batch_rows" = "300000", - "max_batch_size" = "209715200" 
-                )
-                FROM KAFKA
-                (
-                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
-                    "kafka_topic" = "invalid_job",
-                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
-                );
-            """
-            sql "sync"
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("reason: ${res[0][17].toString()}".toString())
+                if (res[0][17].toString() != "") {
+                    assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                }
+                sleep(1000)
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+
+    // case 3: memory limit
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_memory_limit_error_info"
+        def tableName = "test_routine_memory_limit_error_info"
+
+        try {
+            createTable(tableName)
+            sql "sync"
+            GetDebugPoint().enableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
+            createJob(jobName, tableName, kafkaCsvTpoics[0])
+            sql "sync"
 
             // check error info
             def count = 0
@@ -269,20 +228,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 def res = sql "show routine load for ${jobName}"
                 log.info("show routine load: ${res[0].toString()}".toString())
                 log.info("other msg: ${res[0][19].toString()}".toString())
-                if (res[0][19].toString() != "" && res[0][8].toString() == "NEED_SCHEDULE") {
-                    assertTrue(res[0][19].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("reach memory limit"))
                     break;
                 }
                 count++
                 if (count > 60) {
                     assertEquals(1, 2)
                     break;
-                } else {
-                    sleep(1000)
-                    continue;
                 }
+                sleep(1000)
             }
         } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
             sql "stop routine load for ${jobName}"
             sql "DROP TABLE IF EXISTS ${tableName}"
         }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
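The behavioral change is small but easy to miss across the hunks above: submit_task() now checks the task-count limit and the memory limit separately, returns MEM_LIMIT_EXCEEDED instead of TOO_MANY_TASKS when back pressure is the cause, and carries a human-readable reason string into the status message. The standalone sketch below illustrates that pattern only; the Status type, the TaskSubmitter class, and the limit values are simplified stand-ins for this example and are not Doris source code.
```
// Illustrative sketch only -- not Doris source. It mirrors the pattern the patch
// applies in RoutineLoadTaskExecutor::submit_task(): check the task-count limit and
// the memory limit separately, and return a distinct, descriptive error for each,
// so memory back pressure is no longer reported as TOO_MANY_TASKS.
#include <cstdint>
#include <iostream>
#include <string>

struct Status {
    std::string code;    // e.g. "OK", "TOO_MANY_TASKS", "MEM_LIMIT_EXCEEDED"
    std::string message;
    static Status OK() { return {"OK", ""}; }
    static Status TooManyTasks(std::string msg) { return {"TOO_MANY_TASKS", std::move(msg)}; }
    static Status MemoryLimitExceeded(std::string msg) {
        return {"MEM_LIMIT_EXCEEDED", std::move(msg)};
    }
    bool ok() const { return code == "OK"; }
};

class TaskSubmitter {
public:
    TaskSubmitter(size_t max_tasks, int64_t load_mem_limit)
            : _max_tasks(max_tasks), _load_mem_limit(load_mem_limit) {}

    Status submit_task(int64_t current_load_mem) {
        // check task num limit
        if (_running_tasks >= _max_tasks) {
            return Status::TooManyTasks("thread pool is full, running tasks: " +
                                        std::to_string(_running_tasks));
        }
        // check memory limit, keeping the human-readable reason for the error message
        std::string reason;
        if (_reach_memory_limit(current_load_mem, reason)) {
            return Status::MemoryLimitExceeded(reason);
        }
        ++_running_tasks;
        return Status::OK();
    }

private:
    bool _reach_memory_limit(int64_t current_load_mem, std::string& reason) {
        if (current_load_mem > _load_mem_limit) {
            // Build the reason string the same way the patch does, so the caller sees
            // the observed load memory and the configured limit in the error message.
            reason = "current_load_mem_value: " + std::to_string(current_load_mem) +
                     " _load_mem_limit: " + std::to_string(_load_mem_limit);
            return true;
        }
        return false;
    }

    size_t _running_tasks = 0;
    size_t _max_tasks;
    int64_t _load_mem_limit;
};

int main() {
    TaskSubmitter submitter(/*max_tasks=*/4, /*load_mem_limit=*/1024);
    Status st = submitter.submit_task(/*current_load_mem=*/4096);
    if (!st.ok()) {
        // Prints: failed to submit task. error code: MEM_LIMIT_EXCEEDED, msg: ...
        std::cout << "failed to submit task. error code: " << st.code
                  << ", msg: " << st.message << std::endl;
    }
    return 0;
}
```
Run with a load-memory value above the configured limit, the sketch prints the MEM_LIMIT_EXCEEDED form of the message that the fix introduces, while a full task queue still reports TOO_MANY_TASKS.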