This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5571b1f93fb [improvement](spill) optimize the spilling logic of hash join operator (#32202) 5571b1f93fb is described below commit 5571b1f93fbd342bc8fd3accda3f18048efaf4d2 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Wed Mar 20 08:20:40 2024 +0800 [improvement](spill) optimize the spilling logic of hash join operator (#32202) --- .../exec/partitioned_hash_join_probe_operator.cpp | 86 +++++++++++++++++++--- .../exec/partitioned_hash_join_probe_operator.h | 8 ++ .../exec/partitioned_hash_join_sink_operator.cpp | 33 ++++++++- .../exec/partitioned_hash_join_sink_operator.h | 6 ++ be/src/pipeline/pipeline_x/dependency.h | 1 + .../pipeline_x/pipeline_x_fragment_context.cpp | 2 +- 6 files changed, 121 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index f37f28d59c7..93ce24e8e72 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -58,6 +58,21 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI _recovery_probe_blocks = ADD_CHILD_COUNTER(profile(), "RecoveryProbeBlocks", TUnit::UNIT, "SpillAndPartition"); + _spill_serialize_block_timer = ADD_CHILD_TIMER_WITH_LEVEL( + Base::profile(), "SpillSerializeBlockTime", "SpillAndPartition", 1); + _spill_write_disk_timer = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteDiskTime", + "SpillAndPartition", 1); + _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteDataSize", + TUnit::BYTES, "SpillAndPartition", 1); + _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", + TUnit::UNIT, "SpillAndPartition", 1); + _spill_read_data_time = ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDataTime", + "SpillAndPartition", 1); + _spill_deserialize_time = 
ADD_CHILD_TIMER_WITH_LEVEL(Base::profile(), "SpillDeserializeTime", + "SpillAndPartition", 1); + _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadDataSize", + TUnit::BYTES, "SpillAndPartition", 1); + // Build phase _build_phase_label = ADD_LABEL_COUNTER(profile(), "BuildPhase"); _build_rows_counter = ADD_CHILD_COUNTER(profile(), "BuildRows", TUnit::UNIT, "BuildPhase"); @@ -141,7 +156,7 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state uint32_t partition_index) { auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks; auto& mutable_block = partitioned_build_blocks[partition_index]; - if (!mutable_block || mutable_block->rows() == 0) { + if (!mutable_block || mutable_block->rows() < state->batch_size()) { --_spilling_task_count; return Status::OK(); } @@ -153,6 +168,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state _parent->id(), std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _runtime_profile.get())); RETURN_IF_ERROR(build_spilling_stream->prepare_spill()); + build_spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, + _spill_data_size, _spill_write_disk_timer); } auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( @@ -191,18 +208,28 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _runtime_profile.get())); RETURN_IF_ERROR(spilling_stream->prepare_spill()); + spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, + _spill_data_size, _spill_write_disk_timer); } auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool( spilling_stream->get_spill_root_dir()); auto& blocks = _probe_blocks[partition_index]; + auto& partitioned_block = _partitioned_blocks[partition_index]; + if 
(partitioned_block && partitioned_block->rows() >= state->batch_size()) { + blocks.emplace_back(partitioned_block->to_block()); + partitioned_block.reset(); + } if (!blocks.empty()) { return spill_io_pool->submit_func([state, &blocks, &spilling_stream, this] { (void)state; // avoid ut compile error SCOPED_ATTACH_TASK(state); - for (auto& block : blocks) { + COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); + while (!blocks.empty()) { + auto block = std::move(blocks.back()); + blocks.pop_back(); if (_spill_status_ok) { auto st = spilling_stream->spill_block(block, false); if (!st.ok()) { @@ -217,8 +244,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat } } - COUNTER_UPDATE(_spill_probe_blocks, blocks.size()); - blocks.clear(); --_spilling_task_count; if (_spilling_task_count == 0) { @@ -241,6 +266,8 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in if (build_spilling_stream) { build_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(build_spilling_stream->spill_eof()); + build_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, + _spill_read_bytes); } auto& probe_spilling_stream = _probe_spilling_streams[partition_index]; @@ -248,6 +275,8 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in if (probe_spilling_stream) { probe_spilling_stream->end_spill(Status::OK()); RETURN_IF_ERROR(probe_spilling_stream->spill_eof()); + probe_spilling_stream->set_read_counters(_spill_read_data_time, _spill_deserialize_time, + _spill_read_bytes); } return Status::OK(); @@ -259,6 +288,8 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti auto& spilled_stream = _shared_state->spilled_streams[partition_index]; has_data = false; if (!spilled_stream) { + LOG(INFO) << "no data need to recovery for partition: " << partition_index + << ", node id: " << _parent->id() << ", task id: " << state->task_id(); return 
Status::OK(); } @@ -288,6 +319,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti continue; } + DCHECK_EQ(mutable_block->columns(), block.columns()); if (mutable_block->empty()) { *mutable_block = std::move(block); } else { @@ -301,6 +333,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti } } + LOG(INFO) << "recovery data done for partition: " << spilled_stream->get_spill_dir(); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); _dependency->set_ready(); @@ -350,6 +383,7 @@ Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti } if (eos) { + LOG(INFO) << "recovery probe data done: " << spilled_stream->get_spill_dir(); ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream); spilled_stream.reset(); } @@ -401,10 +435,17 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt return _probe_operator->init(tnode_, state); } Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) { - RETURN_IF_ERROR(OperatorXBase::prepare(state)); + // here do NOT call `OperatorXBase::prepare(state)` + // RETURN_IF_ERROR(OperatorXBase::prepare(state)); + for (auto& conjunct : _conjuncts) { + RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); + } + + RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc())); RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); RETURN_IF_ERROR(_probe_operator->set_child(_child_x)); - RETURN_IF_ERROR(_probe_operator->set_child(_build_side_child)); + DCHECK(_build_side_child != nullptr); + _probe_operator->set_build_side_child(_build_side_child); RETURN_IF_ERROR(_sink_operator->set_child(_build_side_child)); RETURN_IF_ERROR(_probe_operator->prepare(state)); RETURN_IF_ERROR(_sink_operator->prepare(state)); @@ -524,6 +565,9 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators( partitioned_block.reset(); } RETURN_IF_ERROR(_sink_operator->sink(local_state._runtime_state.get(), &block, true)); + LOG(INFO) << "internal build operator finished, node id: " << id() + << ", task id: " << state->task_id() + << ", partition: " << local_state._partition_cursor; return Status::OK(); } @@ -615,18 +659,25 @@ bool PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state) const { auto& local_state = get_local_state(state); size_t mem_size = 0; + uint32_t spilling_start = local_state._child_eos ? local_state._partition_cursor + 1 : 0; + DCHECK_GE(spilling_start, local_state._partition_cursor); auto& partitioned_build_blocks = local_state._shared_state->partitioned_build_blocks; auto& probe_blocks = local_state._probe_blocks; - for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count; ++i) { + for (uint32_t i = spilling_start; i < _partition_count; ++i) { auto& build_block = partitioned_build_blocks[i]; - if (build_block && build_block->rows() > 0) { + if (build_block && build_block->rows() >= state->batch_size()) { mem_size += build_block->allocated_bytes(); } for (auto& block : probe_blocks[i]) { mem_size += block.allocated_bytes(); } + + auto& partitioned_block = local_state._partitioned_blocks[i]; + if (partitioned_block && partitioned_block->rows() >= state->batch_size()) { + mem_size += partitioned_block->allocated_bytes(); + } } return mem_size; } @@ -634,14 +685,16 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bool& wait_for_io) { auto& local_state = get_local_state(state); wait_for_io = false; - if (_partition_count > (local_state._partition_cursor + 1)) { - local_state._spilling_task_count = - (_partition_count - local_state._partition_cursor - 1) * 2; + 
uint32_t spilling_start = local_state._child_eos ? local_state._partition_cursor + 1 : 0; + DCHECK_GE(spilling_start, local_state._partition_cursor); + + if (_partition_count > spilling_start) { + local_state._spilling_task_count = (_partition_count - spilling_start) * 2; } else { return Status::OK(); } - for (uint32_t i = local_state._partition_cursor + 1; i < _partition_count; ++i) { + for (uint32_t i = spilling_start; i < _partition_count; ++i) { RETURN_IF_ERROR(local_state.spill_build_block(state, i)); RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i)); } @@ -657,6 +710,14 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo } bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const { + auto& local_state = get_local_state(state); + + if (local_state._shared_state->need_to_spill) { + const auto revocable_size = revocable_mem_size(state); + const auto min_revocable_size = state->min_revocable_mem(); + return revocable_size > min_revocable_size; + } + auto sys_mem_available = MemInfo::sys_mem_available(); auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark(); @@ -692,6 +753,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori bool wait_for_io = false; RETURN_IF_ERROR(_revoke_memory(state, wait_for_io)); if (wait_for_io) { + local_state._shared_state->need_to_spill = true; return Status::OK(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 27c4a9e0bb2..adbaf19314f 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -100,6 +100,14 @@ private: RuntimeProfile::Counter* _recovery_probe_rows = nullptr; RuntimeProfile::Counter* _recovery_probe_blocks = nullptr; + RuntimeProfile::Counter* _spill_read_data_time = nullptr; + RuntimeProfile::Counter* 
_spill_deserialize_time = nullptr; + RuntimeProfile::Counter* _spill_read_bytes = nullptr; + RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; + RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; + RuntimeProfile::Counter* _spill_data_size = nullptr; + RuntimeProfile::Counter* _spill_block_count = nullptr; + RuntimeProfile::Counter* _build_phase_label = nullptr; RuntimeProfile::Counter* _build_rows_counter = nullptr; RuntimeProfile::Counter* _publish_runtime_filter_timer = nullptr; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 5cbf2b15ec1..20b3531c0cd 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -36,6 +36,11 @@ Status PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state, _partition_timer = ADD_TIMER(profile(), "PartitionTime"); _partition_shuffle_timer = ADD_TIMER(profile(), "PartitionShuffleTime"); + _spill_serialize_block_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillSerializeBlockTime", 1); + _spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(profile(), "SpillWriteDiskTime", 1); + _spill_data_size = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteDataSize", TUnit::BYTES, 1); + _spill_block_count = ADD_COUNTER_WITH_LEVEL(profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); + return _partitioner->prepare(state, p._child_x->row_desc()); } @@ -51,7 +56,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; auto& mutable_block = _shared_state->partitioned_build_blocks[i]; - if (!mutable_block || mutable_block->rows() == 0) { + if (!mutable_block || mutable_block->rows() < state->batch_size()) { + --_spilling_streams_count; continue; } @@ -61,6 +67,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { 
_parent->id(), std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(), _profile)); RETURN_IF_ERROR(spilling_stream->prepare_spill()); + spilling_stream->set_write_counters(_spill_serialize_block_timer, _spill_block_count, + _spill_data_size, _spill_write_disk_timer); } auto* spill_io_pool = @@ -79,9 +87,14 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { } if (_spilling_streams_count > 0) { + _shared_state->need_to_spill = true; std::unique_lock<std::mutex> lock(_spill_lock); if (_spilling_streams_count > 0) { _dependency->block(); + } else if (_child_eos) { + LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id() + << ", task id: " << state->task_id(); + _dependency->set_ready_to_read(); } } return Status::OK(); @@ -108,6 +121,11 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk( if (_spilling_streams_count == 0) { std::unique_lock<std::mutex> lock(_spill_lock); _dependency->set_ready(); + if (_child_eos) { + LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id() + << ", task id: " << state()->task_id(); + _dependency->set_ready_to_read(); + } } } @@ -157,6 +175,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B return local_state._spill_status; } + local_state._child_eos = eos; + const auto rows = in_block->rows(); if (rows > 0) { @@ -190,9 +210,18 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B partitioned_blocks[i]->add_rows(in_block, &(partition_indexes[i][0]), &(partition_indexes[i][count])); } + + if (local_state._shared_state->need_to_spill) { + const auto revocable_size = revocable_mem_size(state); + if (revocable_size > state->min_revocable_mem()) { + return local_state.revoke_memory(state); + } + } } if (eos) { + LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id() + << ", task id: " << state->task_id(); local_state._dependency->set_ready_to_read(); } @@ -207,7 +236,7 @@ size_t 
PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state) size_t mem_size = 0; for (uint32_t i = 0; i != _partition_count; ++i) { auto& block = partitioned_blocks[i]; - if (block) { + if (block && block->rows() >= state->batch_size()) { mem_size += block->allocated_bytes(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 152b36459b3..f2d5ca3e140 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -61,6 +61,8 @@ protected: std::atomic<bool> _spill_status_ok {true}; std::mutex _spill_lock; + bool _child_eos {false}; + Status _spill_status; std::mutex _spill_status_lock; @@ -68,6 +70,10 @@ protected: RuntimeProfile::Counter* _partition_timer = nullptr; RuntimeProfile::Counter* _partition_shuffle_timer = nullptr; + RuntimeProfile::Counter* _spill_serialize_block_timer = nullptr; + RuntimeProfile::Counter* _spill_write_disk_timer = nullptr; + RuntimeProfile::Counter* _spill_data_size = nullptr; + RuntimeProfile::Counter* _spill_block_count = nullptr; }; class PartitionedHashJoinSinkOperatorX diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index db2b0341fe4..7815d4a9ce0 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -578,6 +578,7 @@ struct HashJoinSharedState : public JoinSharedState { struct PartitionedHashJoinSharedState : public HashJoinSharedState { std::vector<std::unique_ptr<vectorized::MutableBlock>> partitioned_build_blocks; std::vector<vectorized::SpillStreamSPtr> spilled_streams; + bool need_to_spill = false; }; struct NestedLoopJoinSharedState : public JoinSharedState { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 1df715176ac..ecb7023be87 100644 --- 
a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1031,7 +1031,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN tnode.hash_join_node.is_broadcast_join; const auto enable_join_spill = _runtime_state->enable_join_spill(); if (enable_join_spill && !is_broadcast_join) { - const uint32_t partition_count = 16; + const uint32_t partition_count = 32; op.reset(new PartitionedHashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs, partition_count)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org