This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a35de811139 [branch-2.0][fix](multi-table) fix unknown source slot descriptor when load multi table (#25762) (#26223)
a35de811139 is described below

commit a35de81113906f199cfbebe4ae48aad36513f0b9
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed Nov 1 19:48:09 2023 +0800

    [branch-2.0][fix](multi-table) fix unknown source slot descriptor when load multi table (#25762) (#26223)
---
 be/src/pipeline/pipeline_fragment_context.cpp      | 14 +++-
 be/src/pipeline/pipeline_fragment_context.h        |  2 +
 be/src/runtime/plan_fragment_executor.cpp          | 14 ++--
 be/src/runtime/plan_fragment_executor.h            |  2 +
 .../kafka/scripts/multi_table_csv1.csv             |  3 +
 .../kafka/scripts/multi_table_json1.json           |  3 +
 docker/thirdparties/run-thirdparties-docker.sh     |  4 +-
 .../load_p0/routine_load/test_routine_load.out     | 14 +++-
 .../load_p0/routine_load/test_routine_load.groovy  | 89 ++++++++++++++++++++++
 9 files changed, 131 insertions(+), 14 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index cd529ee835d..87578fbc665 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -246,13 +246,19 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
         
fragment_context->set_is_report_success(request.query_options.is_report_success);
     }
 
-    auto* desc_tbl = _query_ctx->desc_tbl;
-    _runtime_state->set_desc_tbl(desc_tbl);
+    if (request.is_simplified_param) {
+        _desc_tbl = _query_ctx->desc_tbl;
+    } else {
+        DCHECK(request.__isset.desc_tbl);
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), 
request.desc_tbl, &_desc_tbl));
+    }
+    _runtime_state->set_desc_tbl(_desc_tbl);
 
     // 2. Create ExecNode to build pipeline with PipelineFragmentContext
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
             ExecNode::create_tree(_runtime_state.get(), 
_runtime_state->obj_pool(),
-                                  request.fragment.plan, *desc_tbl, 
&_root_plan));
+                                  request.fragment.plan, *_desc_tbl, 
&_root_plan));
 
     // Set senders of exchange nodes before pipeline build
     std::vector<ExecNode*> exch_nodes;
@@ -308,7 +314,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, 
_root_plan->row_desc(),
-                _runtime_state.get(), &_sink, *desc_tbl));
+                _runtime_state.get(), &_sink, *_desc_tbl));
     }
 
     _root_pipeline = fragment_context->add_pipeline();
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 287fac9e40a..ed79800ec60 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -200,6 +200,8 @@ private:
     // If this is set to false, and '_is_report_success' is false as well,
     // This executor will not report status to FE on being cancelled.
     bool _is_report_on_cancel;
+
+    DescriptorTbl* _desc_tbl;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index de6f5f55810..df5f4b7d3e4 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -149,19 +149,19 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
     }
 
     // set up desc tbl
-    DescriptorTbl* desc_tbl = nullptr;
-    if (query_ctx != nullptr) {
-        desc_tbl = query_ctx->desc_tbl;
+    if (request.is_simplified_param) {
+        _desc_tbl = query_ctx->desc_tbl;
     } else {
         DCHECK(request.__isset.desc_tbl);
-        RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, 
&desc_tbl));
+        RETURN_IF_ERROR(
+                DescriptorTbl::create(_runtime_state->obj_pool(), 
request.desc_tbl, &_desc_tbl));
     }
-    _runtime_state->set_desc_tbl(desc_tbl);
+    _runtime_state->set_desc_tbl(_desc_tbl);
 
     // set up plan
     DCHECK(request.__isset.fragment);
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
-            _runtime_state.get(), obj_pool(), request.fragment.plan, 
*desc_tbl, &_plan));
+            _runtime_state.get(), obj_pool(), request.fragment.plan, 
*_desc_tbl, &_plan));
 
     // set #senders of exchange nodes before calling Prepare()
     std::vector<ExecNode*> exch_nodes;
@@ -212,7 +212,7 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request,
     if (request.fragment.__isset.output_sink) {
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 obj_pool(), request.fragment.output_sink, 
request.fragment.output_exprs, params,
-                row_desc(), runtime_state(), &_sink, *desc_tbl));
+                row_desc(), runtime_state(), &_sink, *_desc_tbl));
         RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));
 
         RuntimeProfile* sink_profile = _sink->profile();
diff --git a/be/src/runtime/plan_fragment_executor.h 
b/be/src/runtime/plan_fragment_executor.h
index c95ddc75c17..f2a9f10d9a3 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -208,6 +208,8 @@ private:
 
     OpentelemetrySpan _span;
 
+    DescriptorTbl* _desc_tbl;
+
     ObjectPool* obj_pool() { return _runtime_state->obj_pool(); }
 
     // typedef for TPlanFragmentExecParams.per_node_scan_ranges
diff --git 
a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv 
b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
new file mode 100644
index 00000000000..1df0d787733
--- /dev/null
+++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_csv1.csv
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, 
"name": "Alice", "grade": 9, "subjects": ["math", "science", 
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_uniq_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, 
"name": "Alice", "grade": 9, "subjects": ["math", "science", 
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
+routine_load_mow_tbl_basic|49|2023-08-08|FALSE|\N|16275|-2144851675|-2303421957908954634|-46526938720058765|-13141.142578|-686632233.230200|229942298.0|-152553823.0|2022-09-01
 00:16:01|2023-03-25|2022-09-07 14:59:03|s||yvuILR2iNxfe8RRml|{"student": true, 
"name": "Alice", "grade": 9, "subjects": ["math", "science", 
"history"]}|true|1|2|3|4|5|6.0|7.0|888888888|999999999|2023-08-24|2023-08-24 
12:00:00|2023-08-24|2023-08-24 12:00:00|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|我能吞下玻璃而不伤身体|{}
\ No newline at end of file
diff --git 
a/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json 
b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
new file mode 100644
index 00000000000..0099b0b5432
--- /dev/null
+++ b/docker/thirdparties/docker-compose/kafka/scripts/multi_table_json1.json
@@ -0,0 +1,3 @@
+routine_load_dup_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", 
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": 
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", 
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", 
"k16": "", "k17": 
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", 
"k18": "\\N"}
+routine_load_uniq_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", 
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": 
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", 
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", 
"k16": "", "k17": 
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", 
"k18": "\\N"}
+routine_load_mow_tbl_basic|{"k00": "8", "k01": "2023-08-14", "k02": "1", 
"k03": "109", "k04": "-31573", "k05": "-1362465190", "k06": 
"3990845741226497177", "k07": "2732763251146840270", "k08": "-25698.553", 
"k09": "1312831962.5678179", "k10": "\\N", "k11": "\\N", "k12": "2023-03-07 
14:13:19", "k13": "2022-10-18", "k14": "2023-07-16 05:03:13", "k15": "D", 
"k16": "", "k17": 
"PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme", 
"k18": "\\N"}
\ No newline at end of file
diff --git a/docker/thirdparties/run-thirdparties-docker.sh 
b/docker/thirdparties/run-thirdparties-docker.sh
index 241ac6acc7b..e94ddbca9f8 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -257,7 +257,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
         local container_id="$1"
         local ip_host="$2"
 
-        declare -a topics=("basic_data" "basic_array_data" 
"basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" 
"basic_array_data_timezone" "multi_table_csv")
+        declare -a topics=("basic_data" "basic_array_data" 
"basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" 
"basic_array_data_timezone" "multi_table_csv" "multi_table_csv1")
 
         for topic in "${topics[@]}"; do
             while IFS= read -r line; do
@@ -265,7 +265,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
             done < "${ROOT}/docker-compose/kafka/scripts/${topic}.csv"
         done
 
-        declare -a json_topics=("basic_data_json" "basic_array_data_json" 
"basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json")
+        declare -a json_topics=("basic_data_json" "basic_array_data_json" 
"basic_array_data_json_by_line" "basic_data_json_by_line" "multi_table_json" 
"multi_table_json1")
         
         for json_topic in "${json_topics[@]}"; do
             echo ${json_topics}
diff --git a/regression-test/data/load_p0/routine_load/test_routine_load.out 
b/regression-test/data/load_p0/routine_load/test_routine_load.out
index 161af660b47..4288223ca02 100644
--- a/regression-test/data/load_p0/routine_load/test_routine_load.out
+++ b/regression-test/data/load_p0/routine_load/test_routine_load.out
@@ -986,4 +986,16 @@
 49     2023-08-08      false   \N      16275   -2144851675     
-2303421957908954634    -46526938720058765      -13141.143      
-6.866322332302E8       99999999.9      -99999999.9     2022-09-01T00:16:01     
2023-03-25      2022-09-07T14:59:03     s               yvuILR2iNxfe8RRml       
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
 
 -- !sql_multi_table_one_data --
-8      2023-08-14      true    109     -31573  -1362465190     
3990845741226497177     2732763251146840270     -25698.553      
1.312831962567818E9     \N      \N      2023-03-07T14:13:19     2022-10-18      
2023-07-16T05:03:13     D               
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme        
\N
\ No newline at end of file
+8      2023-08-14      true    109     -31573  -1362465190     
3990845741226497177     2732763251146840270     -25698.553      
1.312831962567818E9     \N      \N      2023-03-07T14:13:19     2022-10-18      
2023-07-16T05:03:13     D               
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme        
\N
+
+-- !sql_multi_table --
+49     2023-08-08      false   \N      16275   -2144851675     
-2303421957908954634    -46526938720058765      -13141.143      
-6.866322332302E8       99999999.9      -99999999.9     2022-09-01T00:16:01     
2023-03-25      2022-09-07T14:59:03     s               yvuILR2iNxfe8RRml       
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
       true    1       2       3       4       5       6.0     7.0     
888888888       999999999       2023-08-24      2023-08-24T12:00        
2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     {}
+
+-- !sql_multi_table --
+49     2023-08-08      false   \N      16275   -2144851675     
-2303421957908954634    -46526938720058765      -13141.143      
-6.866322332302E8       99999999.9      -99999999.9     2022-09-01T00:16:01     
2023-03-25      2022-09-07T14:59:03     s               yvuILR2iNxfe8RRml       
{"student":true,"name":"Alice","grade":9,"subjects":["math","science","history"]}
       true    1       2       3       4       5       6.0     7.0     
888888888       999999999       2023-08-24      2023-08-24T12:00        
2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     {}
+
+-- !sql_multi_table --
+8      2023-08-14      true    109     -31573  -1362465190     
3990845741226497177     2732763251146840270     -25698.553      
1.312831962567818E9     \N      \N      2023-03-07T14:13:19     2022-10-18      
2023-07-16T05:03:13     D               
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme        
\N      true    1       2       3       4       5       6.0     7.0     
888888888       999999999       2023-08-24      2023-08-24T12:00        
2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     \N
+
+-- !sql_multi_table --
+8      2023-08-14      true    109     -31573  -1362465190     
3990845741226497177     2732763251146840270     -25698.553      
1.312831962567818E9     \N      \N      2023-03-07T14:13:19     2022-10-18      
2023-07-16T05:03:13     D               
PBn1wa6X8WneZYLMac11zzyhGl7tPXB5XgjmOV8L6uav9ja5oY433ktb2yhyQQIqBveZPkme        
\N      true    1       2       3       4       5       6.0     7.0     
888888888       999999999       2023-08-24      2023-08-24T12:00        
2023-08-24      2023-08-24T12:00        我能吞下玻璃而不伤身体     我能吞下玻璃而不伤身体     
我能吞下玻璃而不伤身体     \N
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy 
b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index 4b659df4eff..d9560e312c5 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -31,6 +31,11 @@ suite("test_routine_load","p0") {
                   "dup_tbl_basic_multi_table",
                  ]
 
+    def multiTables1 = [
+                  "dup_tbl_basic",
+                  "uniq_tbl_basic",
+                 ]
+
     def jobs =   [
                   "dup_tbl_basic_job",
                   "uniq_tbl_basic_job",
@@ -127,6 +132,11 @@ suite("test_routine_load","p0") {
                     "multi_table_json",
                   ]
 
+    def multiTableJobName1 = [
+                    "multi_table_csv1",
+                    "multi_table_json1",
+                  ]
+
     def formats = [
                     "csv",
                     "json",
@@ -980,4 +990,83 @@ suite("test_routine_load","p0") {
             j++
         }
     }
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        def j = 0
+        for (String jobName in multiTableJobName1) {
+            try {
+                for (String tableName in multiTables1) {
+                    sql new 
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                    sql new 
File("""${context.file.parent}/ddl/${tableName}_create.sql""").text
+                }
+
+                sql """
+                    CREATE ROUTINE LOAD ${jobName}
+                    COLUMNS TERMINATED BY "|"
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "5",
+                        "format" = "${formats[j]}",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                        "kafka_topic" = "${jobName}",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+                sql "sync"
+
+                i = 0
+                for (String tableName in multiTables1) {
+                    while (true) {
+                        sleep(1000)
+                        def res = sql "show routine load for ${jobName}"
+                        def state = res[0][8].toString()
+                        if (state == "NEED_SCHEDULE") {
+                            continue;
+                        }
+                        assertEquals(res[0][8].toString(), "RUNNING")
+                        break;
+                    }
+
+                    def count = 0
+                    def tableName1 =  "routine_load_" + tableName
+                    while (true) {
+                        def res = sql "select count(*) from ${tableName1}"
+                        def state = sql "show routine load for ${jobName}"
+                        log.info("routine load state: 
${state[0][8].toString()}".toString())
+                        log.info("routine load statistic: 
${state[0][14].toString()}".toString())
+                        log.info("reason of state changed: 
${state[0][17].toString()}".toString())
+                        if (res[0][0] > 0) {
+                            break
+                        }
+                        if (count >= 120) {
+                            log.error("routine load can not visible for long 
time")
+                            assertEquals(20, res[0][0])
+                            break
+                        }
+                        sleep(5000)
+                        count++
+                    }
+                    
+                    if (i <= 3) {
+                        qt_sql_multi_table "select * from ${tableName1} order 
by k00,k01"
+                    } else {
+                        qt_sql_multi_table "select * from ${tableName1} order 
by k00"
+                    }
+                    
+                    i++
+                }
+            } finally {
+                sql "stop routine load for ${jobName}"
+                for (String tableName in multiTables1) {
+                    sql new 
File("""${context.file.parent}/ddl/${tableName}_drop.sql""").text
+                }
+            }
+            j++
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to