This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new fedbf161cd0 [Bug](join) return eof when join build sink awakend by downstream source #47380 #48247 (#47791) fedbf161cd0 is described below commit fedbf161cd0e470ceec6307f0e282f7c00c6b5bc Author: Pxl <x...@selectdb.com> AuthorDate: Tue Feb 25 19:07:05 2025 +0800 [Bug](join) return eof when join build sink awakend by downstream source #47380 #48247 (#47791) pick from #47380 pick case part from #48247 --- be/src/exprs/runtime_filter_slots.h | 3 + be/src/pipeline/exec/hashjoin_build_sink.cpp | 11 +-- be/src/pipeline/pipeline.cpp | 9 +++ be/src/pipeline/pipeline_task.cpp | 25 ++++++ be/src/vec/runtime/shared_hash_table_controller.h | 1 - .../join/test_slow_close/test_slow_close.out | Bin 0 -> 133 bytes .../join/test_slow_close/test_slow_close.groovy | 88 +++++++++++++++++++++ 7 files changed, 128 insertions(+), 9 deletions(-) diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 160664a45f6..8337a5e9945 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -125,6 +125,9 @@ public: Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) { // process IN_OR_BLOOM_FILTER's real type for (auto filter : _runtime_filters) { + if (filter->get_ignored() || filter->get_disabled()) { + continue; + } if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER && get_real_size(filter.get(), local_hash_table_size) > state->runtime_filter_max_in_num()) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index bcad867495f..b25ea8a37c3 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -160,20 +160,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(state, 
!_should_build_hash_table)); } catch (Exception& e) { - bool blocked_by_complete_build_stage = p._shared_hashtable_controller && - !p._shared_hash_table_context->complete_build_stage; bool blocked_by_shared_hash_table_signal = !_should_build_hash_table && p._shared_hashtable_controller && !p._shared_hash_table_context->signaled; return Status::InternalError( "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: " - "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, " + "{}, _finish_dependency: {}," "blocked_by_shared_hash_table_signal: " "{}", e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table, - _finish_dependency->debug_string(), blocked_by_complete_build_stage, - blocked_by_shared_hash_table_signal); + _finish_dependency->debug_string(), blocked_by_shared_hash_table_signal); } return Base::close(state, exec_status); @@ -618,7 +615,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state.process_build_block(state, (*local_state._shared_state->build_block))); if (_shared_hashtable_controller) { _shared_hash_table_context->status = Status::OK(); - _shared_hash_table_context->complete_build_stage = true; // arena will be shared with other instances. _shared_hash_table_context->arena = local_state._shared_state->arena; _shared_hash_table_context->hash_table_variants = @@ -631,8 +627,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context); _shared_hashtable_controller->signal(node_id()); } - } else if (!local_state._should_build_hash_table && - _shared_hash_table_context->complete_build_stage) { + } else if (!local_state._should_build_hash_table) { DCHECK(_shared_hashtable_controller != nullptr); DCHECK(_shared_hash_table_context != nullptr); // the instance which is not build hash table, it's should wait the signal of hash table build finished. 
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 6c39d361e59..7397c3221d5 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -109,6 +109,15 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) { } void Pipeline::make_all_runnable() { + DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", { + auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "Pipeline::make_all_runnable.sleep", "pipeline_id", -1); + if (pipeline_id == id()) { + LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s"; + sleep(10); + } + }); + if (_sink->count_down_destination()) { for (auto* task : _tasks) { if (task) { diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 5b5698936d7..6bd95ff3d18 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -319,6 +319,18 @@ Status PipelineTask::execute(bool* eos) { // The status must be runnable if (!_opened && !_fragment_context->is_canceled()) { + DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", { + auto required_pipeline_id = + DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.open_sleep", "pipeline_id", -1); + auto required_task_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.open_sleep", "task_id", -1); + if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) { + LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s"; + sleep(5); + } + }); + if (_wake_up_early) { *eos = true; _eos = true; @@ -385,6 +397,19 @@ Status PipelineTask::execute(bool* eos) { if (_block->rows() != 0 || *eos) { SCOPED_TIMER(_sink_timer); + DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", { + auto required_pipeline_id = + DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.sink_eos_sleep", "pipeline_id", -1); + auto required_task_id = + 
DebugPoints::instance()->get_debug_param_or_default<int32_t>( + "PipelineTask::execute.sink_eos_sleep", "task_id", -1); + if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) { + LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep sleep 10s"; + sleep(10); + } + }); + Status status = _sink->sink(_state, block, *eos); if (status.is<ErrorCode::END_OF_FILE>()) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index 421faa4c1d9..b5be516dd53 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -68,7 +68,6 @@ struct SharedHashTableContext { std::map<int, RuntimeFilterContextSPtr> runtime_filters; std::atomic<bool> signaled = false; bool short_circuit_for_null_in_probe_side = false; - std::atomic<bool> complete_build_stage = false; }; using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>; diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out new file mode 100644 index 00000000000..5e4d8ec9448 Binary files /dev/null and b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy new file mode 100644 index 00000000000..0b36d2da5ab --- /dev/null +++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_slow_close") { + sql "set disable_join_reorder=true;" + sql "set runtime_filter_type='bloom_filter';" + sql "set parallel_pipeline_task_num=3" + sql "set ignore_runtime_filter_ids='1,2';" + sql "set enable_runtime_filter_prune=false;" + + sql """ drop table if exists t1; """ + sql """ drop table if exists t3; """ + sql """ drop table if exists t5; """ + + sql """ + create table t1 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + """ + + sql """ + create table t3 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + + """ + + sql """ + create table t5 ( + k1 int null, + k2 int null + ) + duplicate key (k1) + distributed BY hash(k1) buckets 16 + properties("replication_num" = "1"); + """ + + sql """ + insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1; + """ + + sql """ + insert into t3 values(1,1),(2,2),(3,3); + """ + + sql """ + insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5); + """ + + try { + GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4]) + qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and 
t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep") + } + + sql "set ignore_runtime_filter_ids='0';" + try { + GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id: 4, task_id: 7]) + GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id: 4, task_id: 15]) + qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;" + } finally { + GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep") + GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org