This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit ed2d9786a106da804b918a2be5e3c2e0e81cbd56 Author: jacktengg <tengjianp...@selectdb.com> AuthorDate: Sun Dec 15 20:30:22 2024 +0800 test --- .../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 14 ++++++++++++-- .../pipeline/exec/partitioned_hash_join_sink_operator.cpp | 14 +++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 1cdb5ce7be0..e3f990c1667 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -339,8 +339,9 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim } } - if (_recovered_build_block->allocated_bytes() >= - vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { + auto block_bytes = _recovered_build_block->allocated_bytes(); + COUNTER_UPDATE(_memory_used_counter, block_bytes); + if (block_bytes >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { break; } } @@ -606,7 +607,10 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: } } + auto old_probe_blocks_bytes = local_state._probe_blocks_bytes->value(); COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks); + COUNTER_UPDATE(local_state._memory_used_counter, + local_state._probe_blocks_bytes->value() - old_probe_blocks_bytes); return Status::OK(); } @@ -924,8 +928,13 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori COUNTER_SET(local_state._memory_usage_reserved, int64_t(local_state.estimate_memory_usage())); }); + LOG(INFO) << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id() + << ", task: " << state->task_id() + << " get_block, child eos: " << local_state._child_eos + << ", need spill: " << need_to_spill; if (need_more_input_data(state)) { + LOG(INFO) << "need more input data"; { 
SCOPED_TIMER(local_state._get_child_next_timer); RETURN_IF_ERROR(_child->get_block_after_projects(state, local_state._child_block.get(), @@ -955,6 +964,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori } if (!need_more_input_data(state)) { + LOG(INFO) << "not need more input data"; SCOPED_TIMER(local_state.exec_time_counter()); if (need_to_spill) { RETURN_IF_ERROR(pull(state, block, eos)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index fe78489cb98..220115c085e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -586,7 +586,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) { VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << state->task_id() << " sink " << node_id() << " _child_eos: " << local_state._child_eos - << ", revocable memory: " << local_state.revocable_mem_size(state); + << ", revocable memory: " + << PrettyPrinter::print_bytes(local_state.revocable_mem_size(state)); } }}; const auto rows = in_block->rows(); @@ -600,6 +601,17 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B if (need_to_spill) { return revoke_memory(state, nullptr); } else { + const auto revocable_size = revocable_mem_size(state); + if (revocable_size >= config::revocable_memory_bytes_high_watermark) { + LOG(INFO) << fmt::format( + "Query: {}, sink name: {}, node id: {}, task id: {}, " + "revoke_memory " + "because revocable memory is high: {}", + print_id(state->query_id()), get_name(), node_id(), state->task_id(), + PrettyPrinter::print_bytes(revocable_size)); + return revoke_memory(state, nullptr); + } + if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { RETURN_IF_ERROR(_setup_internal_operator(state)); 
} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org