This is an automated email from the ASF dual-hosted git repository.

jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 085897a6857 improve logs
085897a6857 is described below

commit 085897a6857ac71fc6da7848409471c8119163a4
Author: jacktengg <tengjianp...@selectdb.com>
AuthorDate: Tue Dec 17 11:34:02 2024 +0800

    improve logs
---
 be/src/common/config.cpp                           |   2 -
 be/src/common/config.h                             |   2 -
 be/src/pipeline/exec/operator.h                    |   2 -
 .../pipeline/exec/partition_sort_sink_operator.cpp |   4 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |  37 +++++---
 .../exec/partitioned_aggregation_sink_operator.h   |   6 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  14 +--
 .../exec/partitioned_hash_join_sink_operator.cpp   | 100 ++++++++++++---------
 .../exec/partitioned_hash_join_sink_operator.h     |   2 -
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   6 +-
 be/src/pipeline/exec/spill_sort_sink_operator.h    |   1 -
 be/src/pipeline/pipeline_task.cpp                  |   1 -
 be/src/runtime/memory/global_memory_arbitrator.h   |   2 +-
 be/src/runtime/memory/memory_profile.cpp           |   9 +-
 be/src/runtime/query_context.h                     |   3 -
 be/src/runtime/runtime_state.h                     |   2 +-
 be/src/vec/common/allocator.cpp                    |   7 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +-
 19 files changed, 102 insertions(+), 102 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f8b7b390ee9..f0184db7067 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1400,8 +1400,6 @@ DEFINE_mBool(enable_delete_bitmap_merge_on_compaction, "false");
 DEFINE_Bool(enable_table_size_correctness_check, "false");
 DEFINE_Bool(force_regenerate_rowsetid_on_start_error, "false");
 
-DEFINE_mInt32(revocable_memory_bytes_high_watermark, "5");
-
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 1b9b8a3d531..d935f5278f2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1483,8 +1483,6 @@ DECLARE_mBool(enable_delete_bitmap_merge_on_compaction);
 // Enable validation to check the correctness of table size.
 DECLARE_Bool(enable_table_size_correctness_check);
 
-DECLARE_mInt32(revocable_memory_bytes_high_watermark);
-
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 723e14f4bc7..af13ded196e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -589,8 +589,6 @@ public:
         return state->minimum_operator_memory_required_bytes();
     }
 
-    [[nodiscard]] virtual bool is_spilled(RuntimeState* state) const { return 
false; }
-
     [[nodiscard]] bool is_spillable() const { return _spillable; }
 
     template <class TARGET>
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 759c7ea2bcc..b90af92f6b1 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -70,9 +70,7 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
           _topn_phase(tnode.partition_sort_node.ptopn_phase),
           _has_global_limit(tnode.partition_sort_node.has_global_limit),
           _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
-          
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {
-    _spillable = true;
-}
+          
_partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {}
 
 Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 6f008a3b1f2..58b272b3ac8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -27,6 +27,7 @@
 #include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
 #include "runtime/fragment_mgr.h"
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "vec/spill/spill_stream.h"
 #include "vec/spill/spill_stream_manager.h"
@@ -180,7 +181,19 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
         return Status::Error<INTERNAL_ERROR>("fault_inject 
partitioned_agg_sink sink failed");
     });
     RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false));
+
+    size_t revocable_size = 0;
+    int64_t query_mem_limit = 0;
     if (eos) {
+        revocable_size = revocable_mem_size(state);
+        query_mem_limit = state->get_query_ctx()->get_mem_limit();
+        LOG(INFO) << fmt::format(
+                "Query: {}, task {}, agg sink {} eos, need spill: {}, query 
mem limit: {}, "
+                "revocable memory: {}",
+                print_id(state->query_id()), state->task_id(), node_id(),
+                local_state._shared_state->is_spilled, 
PrettyPrinter::print_bytes(query_mem_limit),
+                PrettyPrinter::print_bytes(revocable_size));
+
         if (local_state._shared_state->is_spilled) {
             if (revocable_mem_size(state) > 0) {
                 RETURN_IF_ERROR(revoke_memory(state, nullptr));
@@ -256,10 +269,12 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo
 Status PartitionedAggSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     const auto size_to_revoke = _parent->revocable_mem_size(state);
-    VLOG_DEBUG << "Query " << print_id(state->query_id()) << " agg node "
-               << Base::_parent->node_id()
-               << " revoke_memory, size: " << 
_parent->revocable_mem_size(state)
-               << ", eos: " << _eos;
+    LOG(INFO) << fmt::format(
+            "Query: {}, task {}, agg sink {} revoke_memory, eos: {}, need 
spill: {}, revocable "
+            "memory: {}",
+            print_id(state->query_id()), state->task_id(), _parent->node_id(), 
_eos,
+            _shared_state->is_spilled,
+            PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
     if (!_shared_state->is_spilled) {
         _shared_state->is_spilled = true;
         profile()->add_info_string("Spilled", "true");
@@ -309,9 +324,12 @@ Status PartitionedAggSinkLocalState::revoke_memory(
                         }
                         _shared_state->close();
                     } else {
-                        VLOG_DEBUG << "Query " << print_id(query_id) << " agg 
node "
-                                   << Base::_parent->node_id() << " 
revoke_memory finish, size: "
-                                   << _parent->revocable_mem_size(state) << ", 
eos: " << _eos;
+                        LOG(INFO) << fmt::format(
+                                "Query: {}, task {}, agg sink {} revoke_memory 
finish, eos: {}, "
+                                "revocable memory: {}",
+                                print_id(state->query_id()), state->task_id(), 
_parent->node_id(),
+                                _eos,
+                                
PrettyPrinter::print_bytes(_parent->revocable_mem_size(state)));
                     }
 
                     if (_eos) {
@@ -341,9 +359,4 @@ Status PartitionedAggSinkLocalState::revoke_memory(
             std::move(spill_runnable));
 }
 
-bool PartitionedAggSinkOperatorX::is_spilled(RuntimeState* state) const {
-    auto& local_state = get_local_state(state);
-    return local_state._shared_state->is_spilled;
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 922798707d0..499db4919e7 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -22,6 +22,7 @@
 #include "aggregation_sink_operator.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
+#include "util/pretty_printer.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/spill/spill_stream.h"
@@ -83,8 +84,7 @@ public:
                                                total_rows / size_to_revoke_));
 
         VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " 
<< _parent->node_id()
-                   << ", spill_batch_rows: " << spill_batch_rows << ", total 
rows: " << total_rows
-                   << ", size_to_revoke: " << size_to_revoke;
+                   << ", spill_batch_rows: " << spill_batch_rows << ", total 
rows: " << total_rows;
         size_t row_count = 0;
 
         std::vector<TmpSpillInfo<typename HashTableType::key_type>> 
spill_infos(
@@ -333,8 +333,6 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
-    bool is_spilled(RuntimeState* state) const override;
-
 private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bbbcb9b9d5e..f6cea157cd5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -339,9 +339,8 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
                 }
             }
 
-            auto block_bytes = _recovered_build_block->allocated_bytes();
-            COUNTER_UPDATE(_memory_used_counter, block_bytes);
-            if (block_bytes >= 
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+            if (_recovered_build_block->allocated_bytes() >=
+                vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
                 break;
             }
         }
@@ -608,10 +607,7 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
         }
     }
 
-    auto old_probe_blocks_bytes = local_state._probe_blocks_bytes->value();
     COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks);
-    COUNTER_UPDATE(local_state._memory_used_counter,
-                   local_state._probe_blocks_bytes->value() - 
old_probe_blocks_bytes);
 
     return Status::OK();
 }
@@ -933,13 +929,8 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         COUNTER_SET(local_state._memory_usage_reserved,
                     int64_t(local_state.estimate_memory_usage()));
     });
-    LOG(INFO) << "Query: " << print_id(state->query_id()) << ", hash probe 
node: " << node_id()
-              << ", task: " << state->task_id()
-              << " get_block, child eos: " << local_state._child_eos
-              << ", need spill: " << need_to_spill;
 
     if (need_more_input_data(state)) {
-        LOG(INFO) << "need more input data";
         {
             SCOPED_TIMER(local_state._get_child_next_timer);
             RETURN_IF_ERROR(_child->get_block_after_projects(state, 
local_state._child_block.get(),
@@ -969,7 +960,6 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     }
 
     if (!need_more_input_data(state)) {
-        LOG(INFO) << "not need more input data";
         SCOPED_TIMER(local_state.exec_time_counter());
         if (need_to_spill) {
             RETURN_IF_ERROR(pull(state, block, eos));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index dad82b6cc8a..6cf9e658a60 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -270,7 +270,8 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
                           });
             status = _finish_spilling();
             VLOG_DEBUG << fmt::format(
-                    "Query: {}, task {}, sink {} _revoke_unpartitioned_block 
set_ready_to_read",
+                    "Query: {}, task {}, hash join sink {} 
_revoke_unpartitioned_block "
+                    "set_ready_to_read",
                     print_id(state->query_id()), state->task_id(), 
_parent->node_id());
             _dependency->set_ready_to_read();
         }
@@ -303,7 +304,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         RuntimeState* state, const std::shared_ptr<SpillContext>& 
spill_context) {
     SCOPED_TIMER(_spill_total_timer);
     VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << 
state->task_id()
-               << " sink " << _parent->node_id() << " revoke_memory"
+               << " hash join sink " << _parent->node_id() << " revoke_memory"
                << ", eos: " << _child_eos;
     CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
 
@@ -321,9 +322,9 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     auto spill_fin_cb = [this, state, query_id, spill_context]() {
         Status status;
         if (_child_eos) {
-            VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << 
", task "
-                       << state->task_id() << " sink " << _parent->node_id()
-                       << " set_ready_to_read";
+            LOG(INFO) << "Query:" << print_id(this->state()->query_id()) << ", 
task "
+                      << state->task_id() << " hash join sink " << 
_parent->node_id()
+                      << " finish spilling, set_ready_to_read";
             std::for_each(_shared_state->partitioned_build_blocks.begin(),
                           _shared_state->partitioned_build_blocks.end(), 
[&](auto& block) {
                               if (block) {
@@ -542,43 +543,28 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
 
     local_state._child_eos = eos;
 
-    Defer defer_dgb {[&]() {
-        if (local_state.revocable_mem_size(state) > 128 * 1024 * 1024) {
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task 
" << state->task_id()
-                       << " sink " << node_id() << " _child_eos: " << 
local_state._child_eos
-                       << ", revocable memory: "
-                       << 
PrettyPrinter::print_bytes(local_state.revocable_mem_size(state));
-        }
-    }};
     const auto rows = in_block->rows();
 
     const auto need_to_spill = local_state._shared_state->need_to_spill;
+    size_t revocable_size = 0;
+    int64_t query_mem_limit = 0;
+    if (eos) {
+        revocable_size = revocable_mem_size(state);
+        query_mem_limit = state->get_query_ctx()->get_mem_limit();
+        LOG(INFO) << fmt::format(
+                "Query: {}, task {}, hash join sink {} eos, need spill: {}, 
query mem limit: {}, "
+                "revocable "
+                "memory: {}",
+                print_id(state->query_id()), state->task_id(), node_id(), 
need_to_spill,
+                PrettyPrinter::print_bytes(query_mem_limit),
+                PrettyPrinter::print_bytes(revocable_size));
+    }
+
     if (rows == 0) {
         if (eos) {
-            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task 
" << state->task_id()
-                       << " sink " << node_id() << " eos, need spill: " << 
need_to_spill;
-
             if (need_to_spill) {
                 return revoke_memory(state, nullptr);
             } else {
-                const auto revocable_size = revocable_mem_size(state);
-                // TODO: consider parallel?
-                // After building hash table it will not be able to spill later
-                // even if memory is low, and will cause cancel of queries.
-                // So make a check here, if build blocks mem usage is too high,
-                // then trigger revoke memory.
-                auto query_mem_limit = state->get_query_ctx()->mem_limit();
-                if (revocable_size >= (double)query_mem_limit / 100.0 *
-                                              
state->revocable_memory_high_watermark_percent()) {
-                    VLOG_DEBUG << fmt::format(
-                            "Query: {}, task {}, sink {}, query mem limit: {}, 
revoke_memory "
-                            "because revocable memory is high: {}",
-                            print_id(state->query_id()), state->task_id(), 
node_id(),
-                            PrettyPrinter::print_bytes(query_mem_limit),
-                            PrettyPrinter::print_bytes(revocable_size));
-                    return revoke_memory(state, nullptr);
-                }
-
                 if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) 
{
                     RETURN_IF_ERROR(_setup_internal_operator(state));
                 }
@@ -587,12 +573,31 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                             "fault_inject partitioned_hash_join_sink "
                             "sink_eos failed");
                 });
+
+                // TODO: consider parallel?
+                // After building hash table it will not be able to spill later
+                // even if memory is low, and will cause cancel of queries.
+                // So make a check here, if build blocks mem usage is too high,
+                // then trigger revoke memory.
+                auto revocable_memory_high_watermark_percent =
+                        state->revocable_memory_high_watermark_percent();
+                if (revocable_memory_high_watermark_percent > 0 &&
+                    revocable_size >= (double)query_mem_limit / 100.0 *
+                                              
revocable_memory_high_watermark_percent) {
+                    LOG(INFO) << fmt::format(
+                            "Query: {}, task {}, hash join sink {} eos, 
revoke_memory "
+                            "because revocable memory is high",
+                            print_id(state->query_id()), state->task_id(), 
node_id());
+                    return revoke_memory(state, nullptr);
+                }
+
                 Defer defer {[&]() { local_state.update_memory_usage(); }};
                 RETURN_IF_ERROR(_inner_sink_operator->sink(
                         local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
 
-                VLOG_DEBUG << fmt::format(
-                        "Query: {}, task {}, sink {} eos, set_ready_to_read, 
nonspill memory "
+                LOG(INFO) << fmt::format(
+                        "Query: {}, task {}, hash join sink {} eos, 
set_ready_to_read, nonspill "
+                        "memory "
                         "usage: {}",
                         print_id(state->query_id()), state->task_id(), 
node_id(),
                         _inner_sink_operator->get_memory_usage_debug_str(
@@ -628,12 +633,26 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                     "fault_inject partitioned_hash_join_sink "
                     "sink failed");
         });
+
+        if (eos) {
+            auto revocable_memory_high_watermark_percent =
+                    state->revocable_memory_high_watermark_percent();
+            if (revocable_memory_high_watermark_percent > 0 &&
+                revocable_size >=
+                        (double)query_mem_limit / 100.0 * 
revocable_memory_high_watermark_percent) {
+                LOG(INFO) << fmt::format(
+                        "Query: {}, task {}, hash join sink {} eos, 
revoke_memory "
+                        "because revocable memory is high",
+                        print_id(state->query_id()), state->task_id(), 
node_id());
+                return revoke_memory(state, nullptr);
+            }
+        }
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), 
in_block, eos));
         local_state.update_memory_usage();
         if (eos) {
-            VLOG_DEBUG << fmt::format(
-                    "Query: {}, task {}, sink {} eos, set_ready_to_read, 
nonspill memory "
+            LOG(INFO) << fmt::format(
+                    "Query: {}, task {}, hash join sink {} eos, 
set_ready_to_read, nonspill memory "
                     "usage: {}",
                     print_id(state->query_id()), state->task_id(), node_id(),
                     _inner_sink_operator->get_memory_usage_debug_str(
@@ -663,9 +682,4 @@ size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* stat
     return local_state.get_reserve_mem_size(state, eos);
 }
 
-bool PartitionedHashJoinSinkOperatorX::is_spilled(RuntimeState* state) const {
-    auto& local_state = get_local_state(state);
-    return local_state._shared_state->need_to_spill;
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 9e253ce3fca..b5e28f8b244 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -134,8 +134,6 @@ public:
         return _inner_probe_operator->require_data_distribution();
     }
 
-    bool is_spilled(RuntimeState* state) const override;
-
 private:
     friend class PartitionedHashJoinSinkLocalState;
 
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 6a472a09cfd..2fa0c0ce8e1 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -114,6 +114,7 @@ SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id
                                                const TPlanNode& tnode, const 
DescriptorTbl& descs,
                                                bool 
require_bucket_distribution)
         : DataSinkOperatorX(operator_id, tnode.node_id) {
+    _spillable = true;
     _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, 
operator_id, tnode, descs,
                                                               
require_bucket_distribution);
 }
@@ -299,9 +300,4 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     return status;
 }
 
-bool SpillSortSinkOperatorX::is_spilled(RuntimeState* state) const {
-    auto& local_state = get_local_state(state);
-    return local_state._shared_state->is_spilled;
-}
-
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 226fe61d386..3d6ccdcc4ce 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -91,7 +91,6 @@ public:
 
     Status revoke_memory(RuntimeState* state,
                          const std::shared_ptr<SpillContext>& spill_context) 
override;
-    bool is_spilled(RuntimeState* state) const override;
 
     using DataSinkOperatorX<LocalStateType>::node_id;
     using DataSinkOperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index d4ed0790942..9d284b31861 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -43,7 +43,6 @@
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
 #include "util/mem_info.h"
-#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h
index abf3a51c9f3..05963132cb1 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -76,7 +76,7 @@ public:
     static inline int64_t sys_mem_available() {
         return MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed) -
                refresh_interval_memory_growth.load(std::memory_order_relaxed) -
-               process_reserved_memory();
+               process_reserved_memory() + MemInfo::allocator_cache_mem();
     }
 
     static inline std::string sys_mem_available_str() {
diff --git a/be/src/runtime/memory/memory_profile.cpp b/be/src/runtime/memory/memory_profile.cpp
index 8dbdcbdd3af..c7421236c42 100644
--- a/be/src/runtime/memory/memory_profile.cpp
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -343,10 +343,11 @@ int64_t MemoryProfile::other_current_usage() {
 void MemoryProfile::print_log_process_usage() {
     if (_enable_print_log_process_usage) {
         _enable_print_log_process_usage = false;
-        LOG(WARNING) << "Process Memory Summary: " + 
GlobalMemoryArbitrator::process_mem_log_str();
-        LOG(WARNING) << "\n" << print_memory_overview_profile();
-        LOG(WARNING) << "\n" << print_global_memory_profile();
-        LOG(WARNING) << "\n" << print_top_memory_tasks_profile();
+        LOG(WARNING) << "Process Memory Summary: " + 
GlobalMemoryArbitrator::process_mem_log_str()
+                     << "\n"
+                     << print_memory_overview_profile() << "\n"
+                     << print_global_memory_profile() << "\n"
+                     << print_top_memory_tasks_profile();
     }
 }
 
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 55b51088c50..bee6ad549e9 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -213,8 +213,6 @@ public:
 
     ThreadPool* get_memtable_flush_pool();
 
-    int64_t mem_limit() const { return _bytes_limit; }
-
     void set_merge_controller_handler(
             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
         _merge_controller_handler = handler;
@@ -348,7 +346,6 @@ private:
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
     MonotonicStopWatch _query_watcher;
-    int64_t _bytes_limit = 0;
     bool _is_nereids = false;
     std::atomic<int> _running_big_mem_op_num = 0;
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 7318c93f15a..16f500b2fcc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -601,7 +601,7 @@ public:
         if (_query_options.__isset.revocable_memory_high_watermark_percent) {
             return _query_options.revocable_memory_high_watermark_percent;
         }
-        return 10;
+        return -1;
     }
 
     size_t minimum_operator_memory_required_bytes() const {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index c8f0a7397d7..c59ad83c98d 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -35,6 +35,7 @@
 #include "runtime/process_profile.h"
 #include "runtime/thread_context.h"
 #include "util/mem_info.h"
+#include "util/pretty_printer.h"
 #include "util/stack_util.h"
 #include "util/uid_util.h"
 
@@ -86,8 +87,10 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::sys_mem
                 "Allocator sys memory check failed: Cannot alloc:{}, consuming 
"
                 "tracker:<{}>, peak used {}, current used {}, exec node:<{}>, 
{}.",
                 size, doris::thread_context()->thread_mem_tracker()->label(),
-                
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
-                doris::thread_context()->thread_mem_tracker()->consumption(),
+                doris::PrettyPrinter::print_bytes(
+                        
doris::thread_context()->thread_mem_tracker()->peak_consumption()),
+                doris::PrettyPrinter::print_bytes(
+                        
doris::thread_context()->thread_mem_tracker()->consumption()),
                 
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker_label(),
                 
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 16b214b1536..f0380efd7b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2245,7 +2245,7 @@ public class SessionVariable implements Serializable, Writable {
     public long dataQueueMaxBlocks = 1;
 
     @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy 
= true)
-    public int revocableMemoryHighWatermarkPercent = 10;
+    public int revocableMemoryHighWatermarkPercent = -1;
 
     // If the memory consumption of sort node exceed this limit, will trigger 
spill to disk;
     // Set to 0 to disable; min: 128M
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 7ebe16583d8..b44196d3df2 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -367,7 +367,7 @@ struct TQueryOptions {
   144: optional i32 query_slot_count = 0;
   145: optional bool enable_spill = false
   146: optional bool enable_reserve_memory = true
-  147: optional i32 revocable_memory_high_watermark_percent = 10
+  147: optional i32 revocable_memory_high_watermark_percent = -1
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to