This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7a98ac434f7 [Chore](case) pick some broadcast join case from #48247 
#47380 (#48310)
7a98ac434f7 is described below

commit 7a98ac434f79a2060e2bac623da39701b7a9e3b4
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Feb 25 20:32:12 2025 +0800

    [Chore](case) pick some broadcast join case from #48247 #47380 (#48310)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/pipeline.cpp                       |   9 +++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  25 ++++++
 .../join/test_slow_close/test_slow_close.out       | Bin 0 -> 133 bytes
 .../join/test_slow_close/test_slow_close.groovy    |  88 +++++++++++++++++++++
 4 files changed, 122 insertions(+)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 450cb0c123d..7f7c086f38a 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -101,6 +101,15 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
 }
 
 void Pipeline::make_all_runnable() {
+    DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
+        auto pipeline_id = 
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
+        if (pipeline_id == id()) {
+            LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s";
+            sleep(10);
+        }
+    });
+
     if (_sink_x->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index bea8d52200d..d158057173e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -263,6 +263,17 @@ Status PipelineXTask::execute(bool* eos) {
 
     // The status must be runnable
     if (!_opened) {
+        DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
+            auto required_pipeline_id =
+                    
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                            "PipelineTask::execute.open_sleep", "pipeline_id", 
-1);
+            auto required_task_id = 
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                    "PipelineTask::execute.open_sleep", "task_id", -1);
+            if (required_pipeline_id == pipeline_id() && required_task_id == 
task_id()) {
+                LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
+                sleep(5);
+            }
+        });
         if (_wake_up_early) {
             *eos = true;
             _eos = true;
@@ -352,6 +363,20 @@ Status PipelineXTask::execute(bool* eos) {
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
+
+            DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
+                auto required_pipeline_id =
+                        
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", 
"pipeline_id", -1);
+                auto required_task_id =
+                        
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", 
"task_id", -1);
+                if (required_pipeline_id == pipeline_id() && required_task_id 
== task_id()) {
+                    LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep 
sleep 10s";
+                    sleep(10);
+                }
+            });
+
             status = _sink->sink(_state, block, *eos);
             if (status.is<ErrorCode::END_OF_FILE>()) {
                 set_wake_up_and_dep_ready();
diff --git 
a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out 
b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
new file mode 100644
index 00000000000..5e4d8ec9448
Binary files /dev/null and 
b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git 
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy 
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
new file mode 100644
index 00000000000..0b36d2da5ab
--- /dev/null
+++ 
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_slow_close") {
+    sql "set disable_join_reorder=true;"
+    sql "set runtime_filter_type='bloom_filter';"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set ignore_runtime_filter_ids='1,2';"
+    sql "set enable_runtime_filter_prune=false;"
+
+    sql """ drop table if exists t1; """
+    sql """ drop table if exists t3; """
+    sql """ drop table if exists t5; """
+
+    sql """
+        create table t1 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+        """
+
+    sql """
+        create table t3 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+
+    """
+
+    sql """
+        create table t5 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+    insert into t1 select e1,e1 from (select 1 k1) as t lateral view 
explode_numbers(100000) tmp1 as e1;
+    """
+    
+    sql """
+    insert into t3 values(1,1),(2,2),(3,3);
+    """
+
+    sql """
+    insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+    """
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id:
 4])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join 
[broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] 
on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
+    }
+
+    sql "set ignore_runtime_filter_ids='0';"
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id:
 4, task_id: 7])
+        
GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id:
 4, task_id: 15])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join 
[broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] 
on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep")
+        
GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to