This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 1acd8e9fcb5 [fix](spill) incorrect result of hash join (#34450)
1acd8e9fcb5 is described below

commit 1acd8e9fcb553536780e0328abb62dcdd1fe290c
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Wed May 8 10:05:30 2024 +0800

    [fix](spill) incorrect result of hash join (#34450)
---
 .../exec/partitioned_hash_join_probe_operator.cpp  | 63 +++++++++++-----------
 .../exec/partitioned_hash_join_sink_operator.cpp   |  6 +--
 be/src/vec/core/block.cpp                          |  1 +
 3 files changed, 37 insertions(+), 33 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index f617ab21b1d..21134487c2e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -197,7 +197,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
             execution_context_lock = execution_context.lock();
         }
         if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query " << print_id(query_id)
+            LOG(INFO) << "query: " << print_id(query_id)
                       << " execution_context released, maybe query was 
cancelled.";
             return;
         }
@@ -216,12 +216,11 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                 COUNTER_UPDATE(_spill_build_blocks, 1);
             }
         }
-        --_spilling_task_count;
 
-        if (_spilling_task_count == 0) {
+        std::unique_lock<std::mutex> lock(_spill_lock);
+        if (_spilling_task_count.fetch_sub(1) == 1) {
             LOG(INFO) << "hash probe " << _parent->id()
                       << " revoke memory spill_build_block finish";
-            std::unique_lock<std::mutex> lock(_spill_lock);
             _dependency->set_ready();
         }
     });
@@ -274,7 +273,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 execution_context_lock = execution_context.lock();
             }
             if (!shared_state_sptr || !execution_context_lock) {
-                LOG(INFO) << "query " << print_id(query_id)
+                LOG(INFO) << "query: " << print_id(query_id)
                           << " execution_context released, maybe query was 
cancelled.";
                 return;
             }
@@ -298,19 +297,16 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 }
             }
 
-            --_spilling_task_count;
-
-            if (_spilling_task_count == 0) {
+            std::unique_lock<std::mutex> lock(_spill_lock);
+            if (_spilling_task_count.fetch_sub(1) == 1) {
                 LOG(INFO) << "hash probe " << _parent->id()
                           << " revoke memory spill_probe_blocks finish";
-                std::unique_lock<std::mutex> lock(_spill_lock);
                 _dependency->set_ready();
             }
         });
     } else {
-        --_spilling_task_count;
-        if (_spilling_task_count == 0) {
-            std::unique_lock<std::mutex> lock(_spill_lock);
+        std::unique_lock<std::mutex> lock(_spill_lock);
+        if (_spilling_task_count.fetch_sub(1) == 1) {
             _dependency->set_ready();
         }
     }
@@ -365,8 +361,9 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     MonotonicStopWatch submit_timer;
     submit_timer.start();
 
-    auto read_func = [this, query_id, mem_tracker, state, &spilled_stream, 
&mutable_block,
-                      shared_state_holder, execution_context, submit_timer] {
+    auto read_func = [this, query_id, mem_tracker, state, spilled_stream = 
spilled_stream,
+                      &mutable_block, shared_state_holder, execution_context, 
submit_timer,
+                      partition_index] {
         SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
         std::shared_ptr<TaskExecutionContext> execution_context_lock;
         auto shared_state_sptr = shared_state_holder.lock();
@@ -374,7 +371,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             execution_context_lock = execution_context.lock();
         }
         if (!shared_state_sptr || !execution_context_lock || 
state->is_cancelled()) {
-            LOG(INFO) << "query " << print_id(query_id)
+            LOG(INFO) << "query: " << print_id(query_id)
                       << " execution_context released, maybe query was 
cancelled.";
             return;
         }
@@ -420,9 +417,11 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
             }
         }
 
-        LOG(INFO) << "recovery data done for partition: " << 
spilled_stream->get_spill_dir();
+        VLOG_DEBUG << "query: " << print_id(state->query_id())
+                   << ", recovery data done for partition: " << 
spilled_stream->get_spill_dir()
+                   << ", task id: " << state->task_id();
         
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
-        spilled_stream.reset();
+        shared_state_sptr->spilled_streams[partition_index].reset();
         _dependency->set_ready();
     };
 
@@ -469,7 +468,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
             execution_context_lock = execution_context.lock();
         }
         if (!shared_state_sptr || !execution_context_lock) {
-            LOG(INFO) << "query " << print_id(query_id)
+            LOG(INFO) << "query: " << print_id(query_id)
                       << " execution_context released, maybe query was 
cancelled.";
             return;
         }
@@ -493,7 +492,8 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
         }
 
         if (eos) {
-            LOG(INFO) << "recovery probe data done: " << 
spilled_stream->get_spill_dir();
+            VLOG_DEBUG << "query: " << print_id(query_id)
+                       << ", recovery probe data done: " << 
spilled_stream->get_spill_dir();
             
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
             spilled_stream.reset();
         }
@@ -677,9 +677,10 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
         partitioned_block.reset();
     }
     
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), 
&block, true));
-    LOG(INFO) << "internal build operator finished, node id: " << id()
-              << ", task id: " << state->task_id()
-              << ", partition: " << local_state._partition_cursor;
+    VLOG_DEBUG << "query: " << print_id(state->query_id())
+               << ", internal build operator finished, node id: " << id()
+               << ", task id: " << state->task_id()
+               << ", partition: " << local_state._partition_cursor;
     return Status::OK();
 }
 
@@ -728,6 +729,9 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
             if (!has_data) {
                 vectorized::Block block;
                 RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, true));
+                VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", 
node: " << node_id()
+                           << ", task: " << state->task_id() << "partition: " 
<< partition_index
+                           << " has no data to recovery";
                 break;
             } else {
                 return Status::OK();
@@ -746,6 +750,9 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
 
     *eos = false;
     if (in_mem_eos) {
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " 
<< node_id()
+                   << ", task: " << state->task_id()
+                   << ", partition: " << local_state._partition_cursor;
         local_state._partition_cursor++;
         if (local_state._partition_cursor == _partition_count) {
             *eos = true;
@@ -770,12 +777,7 @@ bool 
PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state
 }
 
 bool PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* 
state) const {
-    auto& local_state = get_local_state(state);
-    if (local_state._spilling_task_count != 0) {
-        return true;
-    }
-
-    return JoinProbeOperatorX::need_data_from_children(state);
+    return true;
 }
 
 size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* 
state) const {
@@ -822,8 +824,9 @@ Status 
PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
         return Status::OK();
     }
 
-    LOG(INFO) << "hash probe " << id()
-              << " revoke memory, spill task count: " << 
local_state._spilling_task_count;
+    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe 
node: " << id()
+               << ", task: " << state->task_id()
+               << ", revoke memory, spill task count: " << 
local_state._spilling_task_count;
     for (uint32_t i = spilling_start; i < _partition_count; ++i) {
         RETURN_IF_ERROR(local_state.spill_build_block(state, i));
         RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8924ee6f773..97d5d145604 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -171,7 +171,7 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
 
         for (size_t block_idx = 0; block_idx != build_blocks.size(); 
++block_idx) {
             auto& build_block = build_blocks[block_idx];
-            const auto is_last_block = block_idx == build_blocks.size() - 1;
+            const auto is_last_block = (block_idx == (build_blocks.size() - 
1));
             if (UNLIKELY(build_block.empty())) {
                 continue;
             }
@@ -207,8 +207,6 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                     partitions_indexes[partition_idx].clear();
                 }
 
-                build_block.clear();
-
                 if (partition_block->rows() >= reserved_size || is_last_block) 
{
                     if (!flush_rows(partition_block, spilling_stream)) {
                         return;
@@ -217,6 +215,8 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
                             
vectorized::MutableBlock::create_unique(build_block.clone_empty());
                 }
             }
+
+            build_block.clear();
         }
 
         _dependency->set_ready();
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index d30eee8fcef..83ecf568d6f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -978,6 +978,7 @@ void MutableBlock::add_rows(const Block* block, const 
uint32_t* row_begin,
         DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name());
         auto& dst = _columns[i];
         const auto& src = *block_data[i].column.get();
+        DCHECK_GE(src.size(), row_end - row_begin);
         dst->insert_indices_from(src, row_begin, row_end);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to