This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 98d87b21655 [fix](load) fix the error msg of task submission failure 
for memory back pressure (#51078)
98d87b21655 is described below

commit 98d87b216557031a35dde63609015373eee0d309
Author: hui lai <lai...@selectdb.com>
AuthorDate: Thu May 22 10:23:20 2025 +0800

    [fix](load) fix the error msg of task submission failure for memory back 
pressure (#51078)
    
    If a backend node's memory reaches its limit, task submission fails due to
    memory back pressure. However, the error message is confusing because it
    reports TOO_MANY_TASKS rather than the actual cause:
    ```
    failed to send task: errCode = 2, detailMessage = failed to submit task. 
error code: TOO_MANY_TASKS, msg:
    (127.0.0.1)[TOO_MANY_TASKS]...
    ```
    
    This commit changes the error message to:
    ```
    failed to submit task. error code: MEM_LIMIT_EXCEEDED, msg: 
(127.0.0.1)[MEM_LIMIT_EXCEEDED]...
    ```
---
 .../routine_load/routine_load_task_executor.cpp    |  29 +++-
 .../routine_load/routine_load_task_executor.h      |   2 +-
 .../test_routine_load_error_info.groovy            | 174 ++++++++-------------
 3 files changed, 90 insertions(+), 115 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index da9654557f2..e51a91913a6 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -209,13 +209,14 @@ Status 
RoutineLoadTaskExecutor::get_kafka_real_offsets_for_partitions(
 
 Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
     std::unique_lock<std::mutex> l(_lock);
+    // check if already submitted
     if (_task_map.find(task.id) != _task_map.end()) {
-        // already submitted
         LOG(INFO) << "routine load task " << UniqueId(task.id) << " has 
already been submitted";
         return Status::OK();
     }
 
-    if (_task_map.size() >= config::max_routine_load_thread_pool_size || 
_reach_memory_limit()) {
+    // check task num limit
+    if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
         LOG(INFO) << "too many tasks in thread pool. reject task: " << 
UniqueId(task.id)
                   << ", job id: " << task.job_id
                   << ", queue size: " << _thread_pool->get_queue_size()
@@ -224,6 +225,18 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
                                     BackendOptions::get_localhost());
     }
 
+    // check memory limit
+    std::string reason;
+    DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+        _reach_memory_limit(reason);
+        return Status::MemoryLimitExceeded("fake reason: " + reason);
+    });
+    if (_reach_memory_limit(reason)) {
+        LOG(INFO) << "reach memory limit. reject task: " << UniqueId(task.id)
+                  << ", job id: " << task.job_id << ", reason: " << reason;
+        return Status::MemoryLimitExceeded(reason);
+    }
+
     // create the context
     std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
     ctx->load_type = TLoadType::ROUTINE_LOAD;
@@ -318,13 +331,17 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
     }
 }
 
-bool RoutineLoadTaskExecutor::_reach_memory_limit() {
+bool RoutineLoadTaskExecutor::_reach_memory_limit(std::string& reason) {
+    DBUG_EXECUTE_IF("RoutineLoadTaskExecutor.submit_task.memory_limit", {
+        reason = "reach memory limit";
+        return true;
+    });
     bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
     auto current_load_mem_value = MemoryProfile::load_current_usage();
     if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
-        LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
-                  << " current_load_mem_value: " << current_load_mem_value
-                  << " _load_mem_limit: " << _load_mem_limit;
+        reason = "is_exceed_soft_mem_limit: " + 
std::to_string(is_exceed_soft_mem_limit) +
+                 " current_load_mem_value: " + 
std::to_string(current_load_mem_value) +
+                 " _load_mem_limit: " + std::to_string(_load_mem_limit);
         return true;
     }
     return false;
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index b1196f7824a..eae3f9c4073 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -88,7 +88,7 @@ private:
     // create a dummy StreamLoadContext for PKafkaMetaProxyRequest
     Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
                         std::shared_ptr<StreamLoadContext> ctx);
-    bool _reach_memory_limit();
+    bool _reach_memory_limit(std::string& reason);
 
 private:
     ExecEnv* _exec_env = nullptr;
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index f05b8af79ee..2f018a93729 100644
--- 
a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ 
b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -48,13 +48,11 @@ suite("test_routine_load_error_info","nonConcurrent") {
         }
     }
 
-    // case 1: task failed
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        // create table
-        def jobName = "test_error_info"
-        def tableName = "test_routine_error_info"
-        try {
-            sql """
+    def createTable = {tableName ->
+        sql """
+            DROP TABLE IF EXISTS ${tableName}
+        """
+        sql """
             CREATE TABLE IF NOT EXISTS ${tableName}
             (
                 k00 INT             NOT NULL,
@@ -120,13 +118,12 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 "bloom_filter_columns"="k05",
                 "replication_num" = "1"
             );
-            """
-            sql "sync"
+        """
+    }
 
-            // create job
-            
GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
-            sql """
-                CREATE ROUTINE LOAD ${jobName} on ${tableName}
+    def createJob = {jobName, tableName, kafkaTopic ->
+        sql """
+        CREATE ROUTINE LOAD ${jobName} on ${tableName}
                 
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
                 COLUMNS TERMINATED BY "|"
                 PROPERTIES
@@ -138,10 +135,22 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 FROM KAFKA
                 (
                     "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
-                    "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                    "kafka_topic" = "${kafkaTopic}",
                     "property.kafka_default_offsets" = "OFFSET_BEGINNING"
                 );
-            """
+        """
+    }
+
+    // case 1: task failed
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // create table
+        def jobName = "test_error_info"
+        def tableName = "test_routine_error_info"
+        try {
+            createTable(tableName)
+            sql "sync"
+            
GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
+            createJob(jobName, tableName, kafkaCsvTpoics[0])
             sql "sync"
 
             // check error info
@@ -158,10 +167,8 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 if (count > 60) {
                     assertEquals(1, 2)
                     break;
-                } else {
-                    sleep(1000)
-                    continue;
                 }
+                sleep(1000)
             }
         } finally {
             
GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments")
@@ -175,93 +182,45 @@ suite("test_routine_load_error_info","nonConcurrent") {
         def jobName = "test_error_info"
         def tableName = "test_routine_error_info"
         try {
-            sql """
-            CREATE TABLE IF NOT EXISTS ${tableName}
-            (
-                k00 INT             NOT NULL,
-                k01 DATE            NOT NULL,
-                k02 BOOLEAN         NULL,
-                k03 TINYINT         NULL,
-                k04 SMALLINT        NULL,
-                k05 INT             NULL,
-                k06 BIGINT          NULL,
-                k07 LARGEINT        NULL,
-                k08 FLOAT           NULL,
-                k09 DOUBLE          NULL,
-                k10 DECIMAL(9,1)    NULL,
-                k11 DECIMALV3(9,1)  NULL,
-                k12 DATETIME        NULL,
-                k13 DATEV2          NULL,
-                k14 DATETIMEV2      NULL,
-                k15 CHAR            NULL,
-                k16 VARCHAR         NULL,
-                k17 STRING          NULL,
-                k18 JSON            NULL,
-                kd01 BOOLEAN         NOT NULL DEFAULT "TRUE",
-                kd02 TINYINT         NOT NULL DEFAULT "1",
-                kd03 SMALLINT        NOT NULL DEFAULT "2",
-                kd04 INT             NOT NULL DEFAULT "3",
-                kd05 BIGINT          NOT NULL DEFAULT "4",
-                kd06 LARGEINT        NOT NULL DEFAULT "5",
-                kd07 FLOAT           NOT NULL DEFAULT "6.0",
-                kd08 DOUBLE          NOT NULL DEFAULT "7.0",
-                kd09 DECIMAL         NOT NULL DEFAULT "888888888",
-                kd10 DECIMALV3       NOT NULL DEFAULT "999999999",
-                kd11 DATE            NOT NULL DEFAULT "2023-08-24",
-                kd12 DATETIME        NOT NULL DEFAULT "2023-08-24 12:00:00",
-                kd13 DATEV2          NOT NULL DEFAULT "2023-08-24",
-                kd14 DATETIMEV2      NOT NULL DEFAULT "2023-08-24 12:00:00",
-                kd15 CHAR(255)       NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
-                kd16 VARCHAR(300)    NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
-                kd17 STRING          NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
-                kd18 JSON            NULL,
-                
-                INDEX idx_inverted_k104 (`k05`) USING INVERTED,
-                INDEX idx_inverted_k110 (`k11`) USING INVERTED,
-                INDEX idx_inverted_k113 (`k13`) USING INVERTED,
-                INDEX idx_inverted_k114 (`k14`) USING INVERTED,
-                INDEX idx_inverted_k117 (`k17`) USING INVERTED 
PROPERTIES("parser" = "english"),
-                INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
-                INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
-                INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF 
PROPERTIES("gram_size"="3", "bf_size"="256"),
-
-                INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
-                INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
-                
-            )
-            DUPLICATE KEY(k00)
-            PARTITION BY RANGE(k01)
-            (
-                PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
-                PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
-                PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
-            )
-            DISTRIBUTED BY HASH(k00) BUCKETS 32
-            PROPERTIES (
-                "bloom_filter_columns"="k05",
-                "replication_num" = "1"
-            );
-            """
+            createTable(tableName)
+            sql "sync"
+            createJob(jobName, tableName, "invalid_job")
             sql "sync"
 
-            // create job
-            sql """
-                CREATE ROUTINE LOAD ${jobName} on ${tableName}
-                
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
-                COLUMNS TERMINATED BY "|"
-                PROPERTIES
-                (
-                    "max_batch_interval" = "5",
-                    "max_batch_rows" = "300000",
-                    "max_batch_size" = "209715200"
-                )
-                FROM KAFKA
-                (
-                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
-                    "kafka_topic" = "invalid_job",
-                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
-                );
-            """
+            // check error info
+            def count = 0
+            while (true) {
+                def res = sql "show routine load for ${jobName}"
+                log.info("show routine load: ${res[0].toString()}".toString())
+                log.info("reason: ${res[0][17].toString()}".toString())
+                if (res[0][17].toString() != "") {
+                    assertTrue(res[0][17].toString().contains("may be Kafka 
properties set in job is error or no partition in this topic that should check 
Kafka"))
+                    break;
+                }
+                count++
+                if (count > 60) {
+                    assertEquals(1, 2)
+                    break;
+                }
+                sleep(1000)
+            }
+        } finally {
+            sql "stop routine load for ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tableName}"
+        }
+    }
+
+    // case 3: memory limit
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def jobName = "test_memory_limit_error_info"
+        def tableName = "test_routine_memory_limit_error_info"
+        
+        try {
+            createTable(tableName)
+            sql "sync"
+            
GetDebugPoint().enableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
+            createJob(jobName, tableName, kafkaCsvTpoics[0])
+            sql "sync"
 
             // check error info
             def count = 0
@@ -269,20 +228,19 @@ suite("test_routine_load_error_info","nonConcurrent") {
                 def res = sql "show routine load for ${jobName}"
                 log.info("show routine load: ${res[0].toString()}".toString())
                 log.info("other msg: ${res[0][19].toString()}".toString())
-                if (res[0][19].toString() != "" && res[0][8].toString() == 
"NEED_SCHEDULE") {
-                    assertTrue(res[0][19].toString().contains("may be Kafka 
properties set in job is error or no partition in this topic that should check 
Kafka"))
+                if (res[0][19].toString() != "") {
+                    assertTrue(res[0][19].toString().contains("reach memory 
limit"))
                     break;
                 }
                 count++
                 if (count > 60) {
                     assertEquals(1, 2)
                     break;
-                } else {
-                    sleep(1000)
-                    continue;
                 }
+                sleep(1000)
             }
         } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("RoutineLoadTaskExecutor.submit_task.memory_limit")
             sql "stop routine load for ${jobName}"
             sql "DROP TABLE IF EXISTS ${tableName}"
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to