This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ed2d9786a106da804b918a2be5e3c2e0e81cbd56
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Sun Dec 15 20:30:22 2024 +0800

    test
---
 .../pipeline/exec/partitioned_hash_join_probe_operator.cpp | 14 ++++++++++++--
 .../pipeline/exec/partitioned_hash_join_sink_operator.cpp  | 14 +++++++++++++-
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1cdb5ce7be0..e3f990c1667 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -339,8 +339,9 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
                 }
             }
 
-            if (_recovered_build_block->allocated_bytes() >=
-                vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            auto block_bytes = _recovered_build_block->allocated_bytes();
+            COUNTER_UPDATE(_memory_used_counter, block_bytes);
+            if (block_bytes >= 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
                 break;
             }
         }
@@ -606,7 +607,10 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
         }
     }
 
+    auto old_probe_blocks_bytes = local_state._probe_blocks_bytes->value();
     COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks);
+    COUNTER_UPDATE(local_state._memory_used_counter,
+                   local_state._probe_blocks_bytes->value() - 
old_probe_blocks_bytes);
 
     return Status::OK();
 }
@@ -924,8 +928,13 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         COUNTER_SET(local_state._memory_usage_reserved,
                     int64_t(local_state.estimate_memory_usage()));
     });
+    LOG(INFO) << "Query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
+              << ", task: " << state->task_id()
+              << " get_block, child eos: " << local_state._child_eos
+              << ", need spill: " << need_to_spill;
 
     if (need_more_input_data(state)) {
+        LOG(INFO) << "need more input data";
         {
             SCOPED_TIMER(local_state._get_child_next_timer);
             RETURN_IF_ERROR(_child->get_block_after_projects(state, 
local_state._child_block.get(),
@@ -955,6 +964,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     }
 
     if (!need_more_input_data(state)) {
+        LOG(INFO) << "not need more input data";
         SCOPED_TIMER(local_state.exec_time_counter());
         if (need_to_spill) {
             RETURN_IF_ERROR(pull(state, block, eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index fe78489cb98..220115c085e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -586,7 +586,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) {
             VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task 
" << state->task_id()
                        << " sink " << node_id() << " _child_eos: " << 
local_state._child_eos
-                       << ", revocable memory: " << 
local_state.revocable_mem_size(state);
+                       << ", revocable memory: "
+                       << 
PrettyPrinter::print_bytes(local_state.revocable_mem_size(state));
         }
     }};
     const auto rows = in_block->rows();
@@ -600,6 +601,17 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
             if (need_to_spill) {
                 return revoke_memory(state, nullptr);
             } else {
+                const auto revocable_size = revocable_mem_size(state);
+                if (revocable_size >= 
config::revocable_memory_bytes_high_watermark) {
+                    LOG(INFO) << fmt::format(
+                            "Query: {}, sink name: {}, node id: {}, task id: 
{}, "
+                            "revoke_memory "
+                            "because revocable memory is high: {}",
+                            print_id(state->query_id()), get_name(), 
node_id(), state->task_id(),
+                            PrettyPrinter::print_bytes(revocable_size));
+                    return revoke_memory(state, nullptr);
+                }
+
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) 
{
                     RETURN_IF_ERROR(_setup_internal_operator(state));
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to