This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7464f461bc6 [Bug](runtime-filter) avoid ignore rf multiple times 
(#44408) (#44625)
7464f461bc6 is described below

commit 7464f461bc6234ecc8106d9ef71de8b4bf3f7282
Author: Pxl <x...@selectdb.com>
AuthorDate: Wed Nov 27 10:51:02 2024 +0800

    [Bug](runtime-filter) avoid ignore rf multiple times (#44408) (#44625)
    
    Avoid ignoring the runtime filter (rf) multiple times.
    
    When an instance is woken up by downstream (`wake_up_by_downstream`), it
    breaks the ordering of `first closing the instance that should build
    hash table and then closing other instances`. As a result, while the
    instance that should build the hash table is inserting data into the
    runtime filter (rf), it may find that the rf has already been ignored,
    which causes the DCHECK to fail.
    
    ```cpp
     F20241121 12:01:02.245405 9832 runtime_filter.cpp:395] Check failed: !is_ignored()
    
    3# raise at ../sysdeps/posix/raise.c:27
    4# abort at ./stdlib/abort.c:81
    5# 0x00005603CFDF778D in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    6# 0x00005603CFDE9DCA in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    7# google::LogMessage::SendToLog() in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    8# google::LogMessage::Flush() in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    9# google::LogMessageFatal::~LogMessageFatal() in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    10# doris::RuntimePredicateWrapper::insert_fixed_len(COW<doris::vectorized::IColumn>::immutable_ptr<doris::vectorized::IColumn> const&, unsigned long) at /home/zcp/repo_center/doris_branch-3.0/doris/be/src/exprs/runtime_filter.cpp:395
    11# doris::RuntimePredicateWrapper::insert_batch(COW<doris::vectorized::IColumn>::immutable_ptr<doris::vectorized::IColumn> const&, unsigned long) at /home/zcp/repo_center/doris_branch-3.0/doris/be/src/exprs/runtime_filter.cpp:431
    12# doris::IRuntimeFilter::insert_batch(COW<doris::vectorized::IColumn>::immutable_ptr<doris::vectorized::IColumn>, unsigned long) at /home/zcp/repo_center/doris_branch-3.0/doris/be/src/exprs/runtime_filter.cpp:992
    13# doris::VRuntimeFilterSlots::insert(doris::vectorized::Block const*) at /home/zcp/repo_center/doris_branch-3.0/doris/be/src/exprs/runtime_filter_slots.h:146
    14# doris::pipeline::HashJoinBuildSinkLocalState::close(doris::RuntimeState*, doris::Status) in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    15# doris::pipeline::DataSinkOperatorXBase::close(doris::RuntimeState*, doris::Status) in /mnt/ssd01/doris-branch40preview/NEREIDS_ASAN/be/lib/doris_be
    16# doris::pipeline::PipelineTask::close(doris::Status) at /home/zcp/repo_center/doris_branch-3.0/doris/be/src/pipeline/pipeline_task.cpp:487
    ```
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp | 34 ++++++++++++++++------------
 1 file changed, 19 insertions(+), 15 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a177c1b59e1..3e364d78594 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -128,7 +128,11 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
     }
     auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
     Defer defer {[&]() {
-        if (_should_build_hash_table && p._shared_hashtable_controller) {
+        if (!_should_build_hash_table) {
+            return;
+        }
+
+        if (p._shared_hashtable_controller) {
             p._shared_hashtable_controller->signal_finish(p.node_id());
         }
     }};
@@ -137,22 +141,22 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         return Base::close(state, exec_status);
     }
 
-    if (state->get_task()->wake_up_by_downstream()) {
-        RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, 
_finish_dependency));
-        RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
-    } else {
-        auto* block = _shared_state->build_block.get();
-        uint64_t hash_table_size = block ? block->rows() : 0;
-        {
-            SCOPED_TIMER(_runtime_filter_init_timer);
-            if (_should_build_hash_table) {
+    if (_should_build_hash_table) {
+        if (state->get_task()->wake_up_by_downstream()) {
+            RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, 
_finish_dependency));
+            RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
+        } else {
+            auto* block = _shared_state->build_block.get();
+            uint64_t hash_table_size = block ? block->rows() : 0;
+            {
+                SCOPED_TIMER(_runtime_filter_init_timer);
                 RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
+                RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+            }
+            if (hash_table_size > 1) {
+                SCOPED_TIMER(_runtime_filter_compute_timer);
+                _runtime_filter_slots->insert(block);
             }
-            RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
-        }
-        if (_should_build_hash_table && hash_table_size > 1) {
-            SCOPED_TIMER(_runtime_filter_compute_timer);
-            _runtime_filter_slots->insert(block);
         }
     }
     SCOPED_TIMER(_publish_runtime_filter_timer);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to