This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 1acd8e9fcb5 [fix](spill) incorrect result of hash join (#34450) 1acd8e9fcb5 is described below commit 1acd8e9fcb553536780e0328abb62dcdd1fe290c Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed May 8 10:05:30 2024 +0800 [fix](spill) incorrect result of hash join (#34450) --- .../exec/partitioned_hash_join_probe_operator.cpp | 63 +++++++++++----------- .../exec/partitioned_hash_join_sink_operator.cpp | 6 +-- be/src/vec/core/block.cpp | 1 + 3 files changed, 37 insertions(+), 33 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index f617ab21b1d..21134487c2e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -197,7 +197,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state execution_context_lock = execution_context.lock(); } if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) + LOG(INFO) << "query: " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return; } @@ -216,12 +216,11 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state COUNTER_UPDATE(_spill_build_blocks, 1); } } - --_spilling_task_count; - if (_spilling_task_count == 0) { + std::unique_lock<std::mutex> lock(_spill_lock); + if (_spilling_task_count.fetch_sub(1) == 1) { LOG(INFO) << "hash probe " << _parent->id() << " revoke memory spill_build_block finish"; - std::unique_lock<std::mutex> lock(_spill_lock); _dependency->set_ready(); } }); @@ -274,7 +273,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat execution_context_lock = execution_context.lock(); } if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) + LOG(INFO) << "query: " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return; } @@ -298,19 +297,16 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat } } - --_spilling_task_count; - - if (_spilling_task_count == 0) { + std::unique_lock<std::mutex> lock(_spill_lock); + if (_spilling_task_count.fetch_sub(1) == 1) { LOG(INFO) << "hash probe " << _parent->id() << " revoke memory spill_probe_blocks finish"; - std::unique_lock<std::mutex> lock(_spill_lock); _dependency->set_ready(); } }); } else { - --_spilling_task_count; - if (_spilling_task_count == 0) { - std::unique_lock<std::mutex> lock(_spill_lock); + std::unique_lock<std::mutex> lock(_spill_lock); + if (_spilling_task_count.fetch_sub(1) == 1) { _dependency->set_ready(); } } @@ -365,8 +361,9 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti MonotonicStopWatch submit_timer; submit_timer.start(); - auto read_func = [this, query_id, mem_tracker, state, &spilled_stream, &mutable_block, - shared_state_holder, execution_context, submit_timer] { + auto read_func = [this, query_id, mem_tracker, state, spilled_stream = spilled_stream, + &mutable_block, shared_state_holder, execution_context, submit_timer, + partition_index] { SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); std::shared_ptr<TaskExecutionContext> execution_context_lock; auto shared_state_sptr = shared_state_holder.lock(); @@ -374,7 +371,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti execution_context_lock = execution_context.lock(); } if (!shared_state_sptr || !execution_context_lock || state->is_cancelled()) { - LOG(INFO) << "query " << print_id(query_id) + LOG(INFO) << "query: " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return; } @@ -420,9 +417,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti } } - LOG(INFO) << "recovery data done for partition: " << spilled_stream->get_spill_dir(); + VLOG_DEBUG << "query: " << print_id(state->query_id()) + << ", recovery data done for partition: " << spilled_stream->get_spill_dir() + << ", task id: " << state->task_id(); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); - spilled_stream.reset(); + shared_state_sptr->spilled_streams[partition_index].reset(); _dependency->set_ready(); }; @@ -469,7 +468,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti execution_context_lock = execution_context.lock(); } if (!shared_state_sptr || !execution_context_lock) { - LOG(INFO) << "query " << print_id(query_id) + LOG(INFO) << "query: " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return; } @@ -493,7 +492,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti } if (eos) { - LOG(INFO) << "recovery probe data done: " << spilled_stream->get_spill_dir(); + VLOG_DEBUG << "query: " << print_id(query_id) + << ", recovery probe data done: " << spilled_stream->get_spill_dir(); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); } @@ -677,9 +677,10 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( partitioned_block.reset(); } RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true)); - LOG(INFO) << "internal build operator finished, node id: " << id() - << ", task id: " << state->task_id() - << ", partition: " << local_state._partition_cursor; + VLOG_DEBUG << "query: " << print_id(state->query_id()) + << ", internal build operator finished, node id: " << id() + << ", task id: " << state->task_id() + << ", partition: " << local_state._partition_cursor; return Status::OK(); } @@ -728,6 +729,9 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, if (!has_data) { vectorized::Block block; RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true)); + VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id() + << ", task: " << state->task_id() << "partition: " << partition_index + << " has no data to recovery"; break; } else { return Status::OK(); @@ -746,6 +750,9 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, *eos = false; if (in_mem_eos) { + VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id() + << ", task: " << state->task_id() + << ", partition: " << local_state._partition_cursor; local_state._partition_cursor++; if (local_state._partition_cursor == _partition_count) { *eos = true; @@ -770,12 +777,7 @@ bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state } bool PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* state) const { - auto& local_state = get_local_state(state); - if (local_state._spilling_task_count != 0) { - return true; - } - - return JoinProbeOperatorX::need_data_from_children(state); + return true; } size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const { @@ -822,8 +824,9 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo return Status::OK(); } - LOG(INFO) << "hash probe " << id() - << " revoke memory, spill task count: " << local_state._spilling_task_count; + VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << id() + << ", task: " << state->task_id() + << ", revoke memory, spill task count: " << local_state._spilling_task_count; for (uint32_t i = spilling_start; i < _partition_count; ++i) { RETURN_IF_ERROR(local_state.spill_build_block(state, i)); RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 8924ee6f773..97d5d145604 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -171,7 +171,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta for (size_t block_idx = 0; block_idx != build_blocks.size(); ++block_idx) { auto& build_block = build_blocks[block_idx]; - const auto is_last_block = block_idx == build_blocks.size() - 1; + const auto is_last_block = (block_idx == (build_blocks.size() - 1)); if (UNLIKELY(build_block.empty())) { continue; } @@ -207,8 +207,6 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta partitions_indexes[partition_idx].clear(); } - build_block.clear(); - if (partition_block->rows() >= reserved_size || is_last_block) { if (!flush_rows(partition_block, spilling_stream)) { return; @@ -217,6 +215,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta vectorized::MutableBlock::create_unique(build_block.clone_empty()); } } + + build_block.clear(); } _dependency->set_ready(); diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index d30eee8fcef..83ecf568d6f 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -978,6 +978,7 @@ void MutableBlock::add_rows(const Block* block, const uint32_t* row_begin, DCHECK_EQ(_data_types[i]->get_name(), block_data[i].type->get_name()); auto& dst = _columns[i]; const auto& src = *block_data[i].column.get(); + DCHECK_GE(src.size(), row_end - row_begin); dst->insert_indices_from(src, row_begin, row_end); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org