This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7a98ac434f7  [Chore](case) pick some broadcast join case from #48247 #47380 (#48310)
7a98ac434f7 is described below

commit 7a98ac434f79a2060e2bac623da39701b7a9e3b4
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Feb 25 20:32:12 2025 +0800

    [Chore](case) pick some broadcast join case from #48247 #47380 (#48310)

    ### What problem does this PR solve?

    Issue Number: close #xxx

    Related PR: #xxx

    Problem Summary:

    ### Release note

    None

    ### Check List (For Author)

    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason? -->

    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->

    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->

    ### Check List (For Reviewer who merge this PR)

    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
 be/src/pipeline/pipeline.cpp                     |   9 +++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp   |  25 ++++++
 .../join/test_slow_close/test_slow_close.out     | Bin 0 -> 133 bytes
 .../join/test_slow_close/test_slow_close.groovy  |  88 +++++++++++++++++++++
 4 files changed, 122 insertions(+)

diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 450cb0c123d..7f7c086f38a 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -101,6 +101,15 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
 }
 
 void Pipeline::make_all_runnable() {
+    DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
+        auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
+        if (pipeline_id == id()) {
+            LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s";
+            sleep(10);
+        }
+    });
+
     if (_sink_x->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index bea8d52200d..d158057173e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -263,6 +263,17 @@ Status PipelineXTask::execute(bool* eos) {
 
     // The status must be runnable
     if (!_opened) {
+        DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
+            auto required_pipeline_id =
+                    DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                            "PipelineTask::execute.open_sleep", "pipeline_id", -1);
+            auto required_task_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                    "PipelineTask::execute.open_sleep", "task_id", -1);
+            if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
+                LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
+                sleep(5);
+            }
+        });
         if (_wake_up_early) {
             *eos = true;
             _eos = true;
@@ -352,6 +363,20 @@ Status PipelineXTask::execute(bool* eos) {
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
+
+            DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
+                auto required_pipeline_id =
+                        DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", "pipeline_id", -1);
+                auto required_task_id =
+                        DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", "task_id", -1);
+                if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
+                    LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep sleep 10s";
+                    sleep(10);
+                }
+            });
+
             status = _sink->sink(_state, block, *eos);
             if (status.is<ErrorCode::END_OF_FILE>()) {
                 set_wake_up_and_dep_ready();
diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
new file mode 100644
index 00000000000..5e4d8ec9448
Binary files /dev/null and b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
new file mode 100644
index 00000000000..0b36d2da5ab
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_slow_close") {
+    sql "set disable_join_reorder=true;"
+    sql "set runtime_filter_type='bloom_filter';"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set ignore_runtime_filter_ids='1,2';"
+    sql "set enable_runtime_filter_prune=false;"
+
+    sql """ drop table if exists t1; """
+    sql """ drop table if exists t3; """
+    sql """ drop table if exists t5; """
+
+    sql """
+        create table t1 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        create table t3 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+
+    """
+
+    sql """
+        create table t5 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+        insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
+    """
+
+    sql """
+        insert into t3 values(1,1),(2,2),(3,3);
+    """
+
+    sql """
+        insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+    """
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
+    }
+
+    sql "set ignore_runtime_filter_ids='0';"
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id: 4, task_id: 7])
+        GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id: 4, task_id: 15])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep")
+        GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep")
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
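As a usage note on the new suite: the regression framework arms a named BE debug point (optionally with parameters that the C++ side reads via get_debug_param_or_default) before running the query, and disarms it in a finally block so a failed assertion cannot leave the point active for later suites. The following is a minimal sketch of that pattern, not part of the commit; the suite name and the query are illustrative placeholders, while the debug point name, its pipeline_id parameter, and the GetDebugPoint() calls are the ones used in test_slow_close.groovy above.

// Minimal sketch of the enable/try/finally/disable debug-point pattern.
// The suite name and query are hypothetical; the debug point name and its
// pipeline_id parameter are taken from the patch above.
suite("debug_point_pattern_sketch") {
    try {
        // Arm the named debug point on all BEs; the map supplies the parameters
        // that the C++ side reads with get_debug_param_or_default.
        GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep", [pipeline_id: 4])

        // Run the query whose pipeline the armed debug point slows down.
        // (test_slow_close.groovy uses qt_sql here so the result is also
        // checked against the .out file.)
        sql "select count(*) from t1;"
    } finally {
        // Always disarm, even if the query or an assertion fails.
        GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
    }
}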