This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fedbf161cd0  [Bug](join) return eof when join build sink awakened by downstream source #47380 #48247 (#47791)
fedbf161cd0 is described below

commit fedbf161cd0e470ceec6307f0e282f7c00c6b5bc
Author: Pxl <x...@selectdb.com>
AuthorDate: Tue Feb 25 19:07:05 2025 +0800

     [Bug](join) return eof when join build sink awakened by downstream source #47380 #48247 (#47791)
    
    pick from #47380
    pick case part from #48247
---
 be/src/exprs/runtime_filter_slots.h                |   3 +
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  11 +--
 be/src/pipeline/pipeline.cpp                       |   9 +++
 be/src/pipeline/pipeline_task.cpp                  |  25 ++++++
 be/src/vec/runtime/shared_hash_table_controller.h  |   1 -
 .../join/test_slow_close/test_slow_close.out       | Bin 0 -> 133 bytes
 .../join/test_slow_close/test_slow_close.groovy    |  88 +++++++++++++++++++++
 7 files changed, 128 insertions(+), 9 deletions(-)
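
Per the commit title, the intent is that a join build sink woken up early by a downstream source reports end-of-stream rather than erroring out; the close() error message and the complete_build_stage flag are simplified accordingly, and new debug points plus a regression test exercise the slow-close ordering. A minimal standalone sketch of the early-wakeup/eos pattern the fix relies on (illustrative names only, not the actual Doris classes) is:

    // Sketch only: a simplified task that, when woken early by its consumer,
    // reports end-of-stream instead of continuing to build or raising an error.
    #include <atomic>
    #include <cstdio>

    struct SketchTask {
        std::atomic<bool> wake_up_early{false};  // set by the downstream source
        bool eos = false;

        // Mirrors the shape of the existing check in pipeline_task.cpp:
        //     if (_wake_up_early) { *eos = true; _eos = true; }
        void execute(bool* out_eos) {
            if (wake_up_early.load()) {
                *out_eos = true;   // nothing more to build; finish cleanly
                eos = true;
                return;
            }
            // ... normal build work would happen here ...
            *out_eos = false;
        }
    };

    int main() {
        SketchTask task;
        task.wake_up_early = true;
        bool eos = false;
        task.execute(&eos);
        std::printf("eos=%d\n", eos);  // prints eos=1
        return 0;
    }
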

diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 160664a45f6..8337a5e9945 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -125,6 +125,9 @@ public:
     Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
         // process IN_OR_BLOOM_FILTER's real type
         for (auto filter : _runtime_filters) {
+            if (filter->get_ignored() || filter->get_disabled()) {
+                continue;
+            }
             if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                 get_real_size(filter.get(), local_hash_table_size) >
                         state->runtime_filter_max_in_num()) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index bcad867495f..b25ea8a37c3 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -160,20 +160,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
         SCOPED_TIMER(_publish_runtime_filter_timer);
        RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
     } catch (Exception& e) {
-        bool blocked_by_complete_build_stage = p._shared_hashtable_controller &&
-                                               !p._shared_hash_table_context->complete_build_stage;
        bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
                                                   p._shared_hashtable_controller &&
                                                   !p._shared_hash_table_context->signaled;
 
        return Status::InternalError(
                "rf process meet error: {}, wake_up_early: {}, should_build_hash_table: "
-                "{}, _finish_dependency: {}, blocked_by_complete_build_stage: {}, "
+                "{}, _finish_dependency: {},"
                "blocked_by_shared_hash_table_signal: "
                "{}",
                e.to_string(), state->get_task()->wake_up_early(), _should_build_hash_table,
-                _finish_dependency->debug_string(), blocked_by_complete_build_stage,
-                blocked_by_shared_hash_table_signal);
+                _finish_dependency->debug_string(), blocked_by_shared_hash_table_signal);
     }
 
     return Base::close(state, exec_status);
@@ -618,7 +615,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
                local_state.process_build_block(state, (*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
-            _shared_hash_table_context->complete_build_stage = true;
             // arena will be shared with other instances.
            _shared_hash_table_context->arena = local_state._shared_state->arena;
             _shared_hash_table_context->hash_table_variants =
@@ -631,8 +627,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
            local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
             _shared_hashtable_controller->signal(node_id());
         }
-    } else if (!local_state._should_build_hash_table &&
-               _shared_hash_table_context->complete_build_stage) {
+    } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
        // the instance which is not build hash table, it's should wait the signal of hash table build finished.
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6c39d361e59..7397c3221d5 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -109,6 +109,15 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
 }
 
 void Pipeline::make_all_runnable() {
+    DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
+        auto pipeline_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
+        if (pipeline_id == id()) {
+            LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s";
+            sleep(10);
+        }
+    });
+
     if (_sink->count_down_destination()) {
         for (auto* task : _tasks) {
             if (task) {
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 5b5698936d7..6bd95ff3d18 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -319,6 +319,18 @@ Status PipelineTask::execute(bool* eos) {
 
     // The status must be runnable
     if (!_opened && !_fragment_context->is_canceled()) {
+        DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
+            auto required_pipeline_id =
+                    DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                            "PipelineTask::execute.open_sleep", "pipeline_id", -1);
+            auto required_task_id = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                    "PipelineTask::execute.open_sleep", "task_id", -1);
+            if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
+                LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
+                sleep(5);
+            }
+        });
+
         if (_wake_up_early) {
             *eos = true;
             _eos = true;
@@ -385,6 +397,19 @@ Status PipelineTask::execute(bool* eos) {
 
         if (_block->rows() != 0 || *eos) {
             SCOPED_TIMER(_sink_timer);
+            DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
+                auto required_pipeline_id =
+                        DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", "pipeline_id", -1);
+                auto required_task_id =
+                        DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                                "PipelineTask::execute.sink_eos_sleep", "task_id", -1);
+                if (required_pipeline_id == pipeline_id() && required_task_id == task_id()) {
+                    LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep sleep 10s";
+                    sleep(10);
+                }
+            });
+
             Status status = _sink->sink(_state, block, *eos);
 
             if (status.is<ErrorCode::END_OF_FILE>()) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index 421faa4c1d9..b5be516dd53 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -68,7 +68,6 @@ struct SharedHashTableContext {
     std::map<int, RuntimeFilterContextSPtr> runtime_filters;
     std::atomic<bool> signaled = false;
     bool short_circuit_for_null_in_probe_side = false;
-    std::atomic<bool> complete_build_stage = false;
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
diff --git a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
new file mode 100644
index 00000000000..5e4d8ec9448
Binary files /dev/null and b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
new file mode 100644
index 00000000000..0b36d2da5ab
--- /dev/null
+++ b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_slow_close") {
+    sql "set disable_join_reorder=true;"
+    sql "set runtime_filter_type='bloom_filter';"
+    sql "set parallel_pipeline_task_num=3"
+    sql "set ignore_runtime_filter_ids='1,2';"
+    sql "set enable_runtime_filter_prune=false;"
+
+    sql """ drop table if exists t1; """
+    sql """ drop table if exists t3; """
+    sql """ drop table if exists t5; """
+
+    sql """
+        create table t1 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+        """
+
+    sql """
+        create table t3 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+
+    """
+
+    sql """
+        create table t5 (
+            k1 int null,
+            k2 int null
+        )
+        duplicate key (k1)
+        distributed BY hash(k1) buckets 16
+        properties("replication_num" = "1");
+    """
+
+    sql """
+    insert into t1 select e1,e1 from (select 1 k1) as t lateral view explode_numbers(100000) tmp1 as e1;
+    """
+    
+    sql """
+    insert into t3 values(1,1),(2,2),(3,3);
+    """
+
+    sql """
+    insert into t5 values(1,1),(2,2),(3,3),(4,4),(5,5);
+    """
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep",[pipeline_id: 4])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
+    }
+
+    sql "set ignore_runtime_filter_ids='0';"
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id: 4, task_id: 7])
+        GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id: 4, task_id: 15])
+        qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join [broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast] on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep")
+        GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep")
+    }
+}

