This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 012f26a6cfc [fix](spill) Duplicate calls to Dependency::set_ready() in 
hash join (#37461)
012f26a6cfc is described below

commit 012f26a6cfcbfb27e420ae71e16795b4c6312a67
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed Jul 10 14:21:36 2024 +0800

    [fix](spill) Duplicate calls to Dependency::set_ready() in hash join 
(#37461)
    
    ## Proposed changes
    
    Duplicate calling the function `Dependency::set_ready()` will cause
    pipeline tasks to be scheduled incorrectly.
---
 be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 09976b3060e..8118b669ef8 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -204,7 +204,6 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         VLOG_DEBUG << "query: " << print_id(query_id)
                    << " hash probe revoke done, node: " << p.node_id()
                    << ", task: " << state->task_id();
-        _dependency->set_ready();
         return Status::OK();
     };
 
@@ -340,7 +339,6 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
 
         
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
         shared_state_sptr->spilled_streams[partition_index].reset();
-        _dependency->set_ready();
         VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
                    << ", task id: " << state->task_id() << ", partition: " << 
partition_index
                    << ", recovery build data done";
@@ -363,6 +361,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
+        _dependency->set_ready();
     };
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -459,8 +458,6 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             spilled_stream.reset();
         }
-
-        _dependency->set_ready();
     };
 
     auto exception_catch_func = [read_func, query_id, this]() {
@@ -480,6 +477,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             _spill_status_ok = false;
             _spill_status = std::move(status);
         }
+        _dependency->set_ready();
     };
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to