This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 517c12478f21a0388afc716ae61ea1ebbcc6a9c6
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Fri Mar 29 16:56:57 2024 +0800

    [improvement](spill) spill trigger improvement (#32641)
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   3 +
 be/src/common/daemon.cpp                           |  13 +++
 be/src/common/daemon.h                             |   1 +
 .../exec/partitioned_aggregation_sink_operator.cpp |   3 +
 .../partitioned_aggregation_source_operator.cpp    |   2 +
 .../exec/partitioned_hash_join_probe_operator.cpp  |  50 +++++----
 .../exec/partitioned_hash_join_sink_operator.cpp   |  28 ++++-
 .../exec/partitioned_hash_join_sink_operator.h     |   1 +
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   8 ++
 .../pipeline/exec/spill_sort_source_operator.cpp   |   6 +
 be/src/pipeline/pipeline_x/operator.h              |  36 ++++++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  73 +++++++-----
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |   2 +
 be/src/runtime/fragment_mgr.cpp                    |   1 +
 be/src/runtime/query_context.cpp                   |   1 -
 be/src/runtime/query_context.h                     |  28 +++++
 be/src/runtime/workload_group/workload_group.cpp   |   4 +
 be/src/runtime/workload_group/workload_group.h     |  38 ++++++-
 .../workload_group/workload_group_manager.cpp      | 123 +++++++++++++++++++++
 .../workload_group/workload_group_manager.h        |   2 +
 be/src/vec/spill/spill_stream.h                    |   2 +
 be/src/vec/spill/spill_writer.cpp                  |  23 ++--
 be/src/vec/spill/spill_writer.h                    |   5 +-
 24 files changed, 382 insertions(+), 74 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9364303a3ea..f3bde7dd36b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -565,6 +565,9 @@ DEFINE_mInt32(memory_gc_sleep_time_ms, "1000");
 // Sleep time in milliseconds between memtable flush mgr refresh iterations
 DEFINE_mInt64(memtable_mem_tracker_refresh_interval_ms, "5");
 
+// Sleep time in milliseconds between refresh iterations of workload group memory statistics
+DEFINE_mInt64(wg_mem_refresh_interval_ms, "50");
+
 // percent of (active memtables size / all memtables size) when reach hard limit
 DEFINE_mInt32(memtable_hard_limit_active_percent, "50");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 01d8a123a45..a0845d043de 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -616,6 +616,9 @@ DECLARE_mInt32(memory_gc_sleep_time_ms);
 // Sleep time in milliseconds between memtable flush mgr memory refresh iterations
 DECLARE_mInt64(memtable_mem_tracker_refresh_interval_ms);
 
+// Sleep time in milliseconds between refresh iterations of workload group memory statistics
+DECLARE_mInt64(wg_mem_refresh_interval_ms);
+
 // percent of (active memtables size / all memtables size) when reach hard limit
 DECLARE_mInt32(memtable_hard_limit_active_percent);
 
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 4735c41c724..582319befb2 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -369,6 +369,14 @@ void Daemon::je_purge_dirty_pages_thread() const {
     } while (true);
 }
 
+void Daemon::wg_mem_used_refresh_thread() {
+    // Refresh memory usage and limit of workload groups
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::milliseconds(config::wg_mem_refresh_interval_ms))) {
+        doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_memory_info();
+    }
+}
+
 void Daemon::start() {
     Status st;
     st = Thread::create(
@@ -402,6 +410,11 @@ void Daemon::start() {
             "Daemon", "query_runtime_statistics_thread",
             [this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
     CHECK(st.ok()) << st;
+
+    st = Thread::create(
+            "Daemon", "wg_mem_refresh_thread", [this]() { 
this->wg_mem_used_refresh_thread(); },
+            &_threads.emplace_back());
+    CHECK(st.ok()) << st;
 }
 
 void Daemon::stop() {
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 5d54ba5b49d..28f63067896 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -44,6 +44,7 @@ private:
     void calculate_metrics_thread();
     void je_purge_dirty_pages_thread() const;
     void report_runtime_query_statistics_thread();
+    void wg_mem_used_refresh_thread();
 
     CountDownLatch _stop_background_threads_latch;
     std::vector<scoped_refptr<Thread>> _threads;
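
Note: the refresh thread added above follows the daemon's usual pattern: it
sleeps on the stop latch for wg_mem_refresh_interval_ms (default 50 ms), so it
refreshes on schedule and still exits promptly on shutdown. A minimal
self-contained sketch of that loop, assuming a simple stop latch in place of
Doris's CountDownLatch (illustrative names, not the Doris API):

    #include <chrono>
    #include <condition_variable>
    #include <functional>
    #include <mutex>

    class StopLatch {
    public:
        // Returns true once stop() has been called, false when the wait timed out.
        bool wait_for(std::chrono::milliseconds timeout) {
            std::unique_lock<std::mutex> lk(_mu);
            return _cv.wait_for(lk, timeout, [this] { return _stopped; });
        }
        void stop() {
            std::lock_guard<std::mutex> lk(_mu);
            _stopped = true;
            _cv.notify_all();
        }

    private:
        std::mutex _mu;
        std::condition_variable _cv;
        bool _stopped = false;
    };

    // Mirrors the shape of Daemon::wg_mem_used_refresh_thread().
    void refresh_loop(StopLatch& latch, std::chrono::milliseconds interval,
                      const std::function<void()>& refresh) {
        while (!latch.wait_for(interval)) {
            refresh();  // e.g. workload_group_mgr()->refresh_wg_memory_info()
        }
    }
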
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 3207c109589..074565b4027 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -70,6 +70,7 @@ Status PartitionedAggSinkLocalState::close(RuntimeState* state, Status exec_stat
     if (Base::_closed) {
         return Status::OK();
     }
+    dec_running_big_mem_op_num(state);
     {
         std::unique_lock<std::mutex> lk(_spill_lock);
         if (_is_spilling) {
@@ -159,6 +160,7 @@ Status PartitionedAggSinkOperatorX::close(RuntimeState* state) {
 Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
                                          bool eos) {
     auto& local_state = get_local_state(state);
+    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
@@ -174,6 +176,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
                 RETURN_IF_ERROR(partition->finish_current_spilling(eos));
             }
             local_state._dependency->set_ready_to_read();
+            local_state._finish_dependency->set_ready();
         }
     }
     if (local_state._runtime_state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 960decdb951..b6620458c06 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -88,6 +88,7 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
     if (_closed) {
         return Status::OK();
     }
+    dec_running_big_mem_op_num(state);
     {
         std::unique_lock<std::mutex> lk(_merge_spill_lock);
         if (_is_merging) {
@@ -128,6 +129,7 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
+    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index c0f5e3b65b8..8f859820252 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -17,6 +17,7 @@
 
 #include "partitioned_hash_join_probe_operator.h"
 
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "util/mem_info.h"
 #include "vec/spill/spill_stream_manager.h"
 
@@ -148,6 +149,10 @@ Status PartitionedHashJoinProbeLocalState::open(RuntimeState* state) {
     return _partitioner->open(state);
 }
 Status PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+    dec_running_big_mem_op_num(state);
     RETURN_IF_ERROR(JoinProbeLocalState::close(state));
     return Status::OK();
 }
@@ -156,7 +161,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                                                              uint32_t partition_index) {
     auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
     auto& mutable_block = partitioned_build_blocks[partition_index];
-    if (!mutable_block || mutable_block->rows() < state->batch_size()) {
+    if (!mutable_block ||
+        mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
         --_spilling_task_count;
         return Status::OK();
     }
@@ -201,6 +207,8 @@ Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
                 --_spilling_task_count;
 
                 if (_spilling_task_count == 0) {
+                    LOG(INFO) << "hash probe " << _parent->id()
+                              << " revoke memory spill_build_block finish";
                     std::unique_lock<std::mutex> lock(_spill_lock);
                     _dependency->set_ready();
                 }
@@ -225,7 +233,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
 
     auto& blocks = _probe_blocks[partition_index];
     auto& partitioned_block = _partitioned_blocks[partition_index];
-    if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
+    if (partitioned_block && partitioned_block->allocated_bytes() >=
+                                     vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
         blocks.emplace_back(partitioned_block->to_block());
         partitioned_block.reset();
     }
@@ -263,6 +272,8 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                     --_spilling_task_count;
 
                     if (_spilling_task_count == 0) {
+                        LOG(INFO) << "hash probe " << _parent->id()
+                                  << " revoke memory spill_probe_blocks finish";
                         std::unique_lock<std::mutex> lock(_spill_lock);
                         _dependency->set_ready();
                     }
@@ -304,8 +315,6 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
-        LOG(INFO) << "no data need to recovery for partition: " << partition_index
-                  << ", node id: " << _parent->id() << ", task id: " << state->task_id();
         return Status::OK();
     }
 
@@ -492,6 +501,7 @@ Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
 Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* input_block,
                                                bool eos) const {
     auto& local_state = get_local_state(state);
+    local_state.inc_running_big_mem_op_num(state);
     const auto rows = input_block->rows();
     auto& partitioned_blocks = local_state._partitioned_blocks;
     if (rows == 0) {
@@ -694,8 +704,11 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
     auto& probe_blocks = local_state._probe_blocks;
     for (uint32_t i = spilling_start; i < _partition_count; ++i) {
         auto& build_block = partitioned_build_blocks[i];
-        if (build_block && build_block->rows() >= state->batch_size()) {
-            mem_size += build_block->allocated_bytes();
+        if (build_block) {
+            auto block_bytes = build_block->allocated_bytes();
+            if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                mem_size += build_block->allocated_bytes();
+            }
         }
 
         for (auto& block : probe_blocks[i]) {
@@ -703,8 +716,11 @@ size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
         }
 
         auto& partitioned_block = local_state._partitioned_blocks[i];
-        if (partitioned_block && partitioned_block->rows() >= state->batch_size()) {
-            mem_size += partitioned_block->allocated_bytes();
+        if (partitioned_block) {
+            auto block_bytes = partitioned_block->allocated_bytes();
+            if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                mem_size += block_bytes;
+            }
         }
     }
     return mem_size;
@@ -722,6 +738,8 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
         return Status::OK();
     }
 
+    LOG(INFO) << "hash probe " << id()
+              << " revoke memory, spill task count: " << local_state._spilling_task_count;
     for (uint32_t i = spilling_start; i < _partition_count; ++i) {
         RETURN_IF_ERROR(local_state.spill_build_block(state, i));
         RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
@@ -739,22 +757,14 @@ Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state, bo
 
 bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* state) const {
     auto& local_state = get_local_state(state);
-
+    const auto revocable_size = revocable_mem_size(state);
+    if (PipelineXTask::should_revoke_memory(state, revocable_size)) {
+        return true;
+    }
     if (local_state._shared_state->need_to_spill) {
-        const auto revocable_size = revocable_mem_size(state);
         const auto min_revocable_size = state->min_revocable_mem();
         return revocable_size > min_revocable_size;
     }
-
-    auto sys_mem_available = MemInfo::sys_mem_available();
-    auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();
-
-    if (sys_mem_available <
-        sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier) {
-        const auto revocable_size = revocable_mem_size(state);
-        const auto min_revocable_size = state->min_revocable_mem();
-        return min_revocable_size > 0 && revocable_size > min_revocable_size;
-    }
     return false;
 }
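
Note: after this change the probe side first defers to the shared
PipelineXTask::should_revoke_memory() (added below in pipeline_x_task.cpp) and
keeps a single operator-specific fast path: once the operator has already
decided to spill, any revocable batch above min_revocable_mem is flushed. A
condensed restatement of the resulting predicate, with the state objects
folded into plain parameters (hypothetical names, for illustration only):

    #include <cstdint>

    // task_says_revoke stands in for PipelineXTask::should_revoke_memory().
    bool should_revoke(bool task_says_revoke, bool need_to_spill,
                       int64_t revocable_bytes, int64_t min_revocable_bytes) {
        if (task_says_revoke) {
            return true;  // workload-group watermark pressure
        }
        if (need_to_spill) {
            // already spilling: keep draining once a worthwhile batch exists
            return revocable_bytes > min_revocable_bytes;
        }
        return false;
    }
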
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a63f22c1329..dd119ade14b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -48,15 +48,27 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
     RETURN_IF_ERROR(PipelineXSinkLocalState::open(state));
     return _partitioner->open(state);
 }
+Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec_status) {
+    SCOPED_TIMER(PipelineXSinkLocalState::exec_time_counter());
+    SCOPED_TIMER(PipelineXSinkLocalState::_close_timer);
+    if (PipelineXSinkLocalState::_closed) {
+        return Status::OK();
+    }
+    dec_running_big_mem_op_num(state);
+    return PipelineXSinkLocalState::close(state, exec_status);
+}
 
 Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
+    LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
+              << ", eos: " << _child_eos;
     DCHECK_EQ(_spilling_streams_count, 0);
     _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
         vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
         auto& mutable_block = _shared_state->partitioned_build_blocks[i];
 
-        if (!mutable_block || mutable_block->rows() < state->batch_size()) {
+        if (!mutable_block ||
+            mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
             --_spilling_streams_count;
             continue;
         }
@@ -99,7 +111,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
         if (_spilling_streams_count > 0) {
             _dependency->block();
         } else if (_child_eos) {
-            LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
+            LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read"
                       << ", task id: " << state->task_id();
             _dependency->set_ready_to_read();
         }
@@ -129,7 +141,7 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
         std::unique_lock<std::mutex> lock(_spill_lock);
         _dependency->set_ready();
         if (_child_eos) {
-            LOG(INFO) << "sink eos, set_ready_to_read, node id: " << _parent->id()
+            LOG(INFO) << "hash join sink " << _parent->id() << " set_ready_to_read"
                       << ", task id: " << state()->task_id();
             _dependency->set_ready_to_read();
         }
@@ -176,6 +188,7 @@ Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
 Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
                                               bool eos) {
     auto& local_state = get_local_state(state);
+    local_state.inc_running_big_mem_op_num(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     if (!local_state._spill_status_ok) {
         DCHECK_NE(local_state._spill_status.code(), 0);
@@ -227,7 +240,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
     }
 
     if (eos) {
-        LOG(INFO) << "sink eos, set_ready_to_read, node id: " << id()
+        LOG(INFO) << "hash join sink " << id() << " sink eos, set_ready_to_read"
                   << ", task id: " << state->task_id();
         local_state._dependency->set_ready_to_read();
     }
@@ -243,8 +256,11 @@ size_t PartitionedHashJoinSinkOperatorX::revocable_mem_size(RuntimeState* state)
     size_t mem_size = 0;
     for (uint32_t i = 0; i != _partition_count; ++i) {
         auto& block = partitioned_blocks[i];
-        if (block && block->rows() >= state->batch_size()) {
-            mem_size += block->allocated_bytes();
+        if (block) {
+            auto block_bytes = block->allocated_bytes();
+            if (block_bytes >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                mem_size += block_bytes;
+            }
         }
     }
     return mem_size;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 44081bb0caa..4d25acd1b20 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -46,6 +46,7 @@ public:
     ~PartitionedHashJoinSinkLocalState() override = default;
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
+    Status close(RuntimeState* state, Status exec_status) override;
     Status revoke_memory(RuntimeState* state);
 
 protected:
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index c586a8e5e56..0ddc6daa79c 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -80,6 +80,10 @@ Status SpillSortSinkLocalState::close(RuntimeState* state, Status execsink_statu
             _spill_cv.wait(lk);
         }
     }
+    auto& parent = Base::_parent->template cast<Parent>();
+    if (parent._enable_spill) {
+        dec_running_big_mem_op_num(state);
+    }
     return Status::OK();
 }
 Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
@@ -160,6 +164,9 @@ size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
 Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block,
                                     bool eos) {
     auto& local_state = get_local_state(state);
+    if (_enable_spill) {
+        local_state.inc_running_big_mem_op_num(state);
+    }
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
@@ -176,6 +183,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc
                 RETURN_IF_ERROR(revoke_memory(state));
             } else {
                 local_state._dependency->set_ready_to_read();
+                local_state._finish_dependency->set_ready();
             }
         } else {
             RETURN_IF_ERROR(
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index d249b3be56e..56c20c853be 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -64,6 +64,9 @@ Status SpillSortLocalState::close(RuntimeState* state) {
             _merge_spill_cv.wait(lk);
         }
     }
+    if (Base::_shared_state->enable_spill) {
+        dec_running_big_mem_op_num(state);
+    }
     RETURN_IF_ERROR(Base::close(state));
     for (auto& stream : _current_merging_streams) {
         (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
@@ -249,6 +252,9 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state) {
 Status SpillSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
                                            bool* eos) {
     auto& local_state = get_local_state(state);
+    if (local_state.Base::_shared_state->enable_spill) {
+        local_state.inc_running_big_mem_op_num(state);
+    }
     SCOPED_TIMER(local_state.exec_time_counter());
     RETURN_IF_ERROR(local_state._status);
 
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 94e3df7c947..4ca8022d163 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -413,9 +413,27 @@ public:
         return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
     }
 
+    void inc_running_big_mem_op_num(RuntimeState* state) {
+        if (!_big_mem_op_num_added) {
+            state->get_query_ctx()->inc_running_big_mem_op_num();
+            _big_mem_op_num_added = true;
+        }
+    }
+
+    void dec_running_big_mem_op_num(RuntimeState* state) {
+        if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
+            state->get_query_ctx()->dec_running_big_mem_op_num();
+            _big_mem_op_num_deced = true;
+        }
+    }
+
 protected:
     Dependency* _dependency = nullptr;
     SharedStateArg* _shared_state = nullptr;
+
+private:
+    bool _big_mem_op_num_added = false;
+    bool _big_mem_op_num_deced = false;
 };
 
 template <typename SharedStateArg>
@@ -708,9 +726,27 @@ public:
         return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
     }
 
+    void inc_running_big_mem_op_num(RuntimeState* state) {
+        if (!_big_mem_op_num_added) {
+            state->get_query_ctx()->inc_running_big_mem_op_num();
+            _big_mem_op_num_added = true;
+        }
+    }
+
+    void dec_running_big_mem_op_num(RuntimeState* state) {
+        if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
+            state->get_query_ctx()->dec_running_big_mem_op_num();
+            _big_mem_op_num_deced = true;
+        }
+    }
+
 protected:
     Dependency* _dependency = nullptr;
     SharedStateType* _shared_state = nullptr;
+
+private:
+    bool _big_mem_op_num_added = false;
+    bool _big_mem_op_num_deced = false;
 };
 
 template <typename SharedStateArg>
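
Note: the inc/dec pair above is deliberately idempotent per local state:
_big_mem_op_num_added makes repeated sink()/get_block() calls count the
operator only once, and _big_mem_op_num_deced keeps close() from
double-decrementing. The same guard, reduced to a standalone sketch with a
plain atomic counter standing in for QueryContext (illustrative, not the
Doris API):

    #include <atomic>

    struct BigMemOpGuard {
        // counter lives on the query; one guard lives on each operator's local state
        void inc(std::atomic<int>& counter) {
            if (!_added) {
                counter.fetch_add(1, std::memory_order_relaxed);
                _added = true;
            }
        }
        void dec(std::atomic<int>& counter) {
            if (_added && !_deced) {
                counter.fetch_sub(1, std::memory_order_relaxed);
                _deced = true;
            }
        }
        bool _added = false;
        bool _deced = false;
    };
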
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 94cd106bbce..9222c482381 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -263,32 +263,10 @@ Status PipelineXTask::execute(bool* eos) {
         _block->clear_column_data(_root->row_desc().num_materialized_slots());
         auto* block = _block.get();
 
-        auto sys_mem_available = doris::MemInfo::sys_mem_available();
-        auto sys_mem_warning_water_mark = doris::MemInfo::sys_mem_available_warning_water_mark();
-        auto query_mem = query_context()->query_mem_tracker->consumption();
         auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
-        if (sink_revocable_mem_size > 0) {
-            VLOG_ROW << "sys mem available: "
-                     << PrettyPrinter::print(sys_mem_available, TUnit::BYTES)
-                     << ",\nsys_mem_available_warning_water_mark: "
-                     << PrettyPrinter::print(sys_mem_warning_water_mark, TUnit::BYTES)
-                     << ",\nquery mem limit: "
-                     << PrettyPrinter::print(_state->query_mem_limit(), TUnit::BYTES)
-                     << ",\nquery mem: " << PrettyPrinter::print(query_mem, TUnit::BYTES)
-                     << ",\nmin revocable mem: "
-                     << PrettyPrinter::print(_state->min_revocable_mem(), TUnit::BYTES)
-                     << ",\nrevocable mem: "
-                     << PrettyPrinter::print(
-                                static_cast<uint64_t>(_sink->revocable_mem_size(_state)),
-                                TUnit::BYTES);
-        }
-        if (sys_mem_available < sys_mem_warning_water_mark * config::spill_mem_warning_water_mark_multiplier /*&&
-            (double)query_mem >= (double)_state->query_mem_limit() * 0.8*/) {
-            if (_state->min_revocable_mem() > 0 &&
-                sink_revocable_mem_size >= _state->min_revocable_mem()) {
-                RETURN_IF_ERROR(_sink->revoke_memory(_state));
-                continue;
-            }
+        if (should_revoke_memory(_state, sink_revocable_mem_size)) {
+            RETURN_IF_ERROR(_sink->revoke_memory(_state));
+            continue;
         }
 
         // Pull block from operator chain
@@ -321,6 +299,50 @@ Status PipelineXTask::execute(bool* eos) {
     return status;
 }
 
+bool PipelineXTask::should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes) {
+    auto* query_ctx = state->get_query_ctx();
+    auto wg = query_ctx->workload_group();
+    if (!wg) {
+        LOG_ONCE(INFO) << "no workload group for query " << print_id(state->query_id());
+        return false;
+    }
+    bool is_wg_mem_low_water_mark = false;
+    bool is_wg_mem_high_water_mark = false;
+    wg->check_mem_used(&is_wg_mem_low_water_mark, &is_wg_mem_high_water_mark);
+    if (is_wg_mem_high_water_mark) {
+        if (revocable_mem_bytes > 0) {
+            LOG_EVERY_N(INFO, 5) << "revoke memory, high water mark";
+            return true;
+        }
+        return false;
+    } else if (is_wg_mem_low_water_mark) {
+        int64_t query_weighted_limit = 0;
+        int64_t query_weighted_consumption = 0;
+        query_ctx->get_weighted_mem_info(query_weighted_limit, query_weighted_consumption);
+        if (query_weighted_consumption < query_weighted_limit) {
+            return false;
+        }
+        auto big_memory_operator_num = query_ctx->get_running_big_mem_op_num();
+        DCHECK(big_memory_operator_num >= 0);
+        int64_t mem_limit_of_op;
+        if (0 == big_memory_operator_num) {
+            mem_limit_of_op = (double)query_weighted_limit * 0.8;
+        } else {
+            mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
+        }
+
+        const auto min_revocable_mem_bytes = state->min_revocable_mem();
+        LOG_EVERY_N(INFO, 5) << "revoke memory, low water mark, revocable_mem_bytes: "
+                             << PrettyPrinter::print_bytes(revocable_mem_bytes)
+                             << ", mem_limit_of_op: " << PrettyPrinter::print_bytes(mem_limit_of_op)
+                             << ", min_revocable_mem_bytes: "
+                             << PrettyPrinter::print_bytes(min_revocable_mem_bytes);
+        return (revocable_mem_bytes > mem_limit_of_op ||
+                revocable_mem_bytes > min_revocable_mem_bytes);
+    } else {
+        return false;
+    }
+}
 void PipelineXTask::finalize() {
     PipelineTask::finalize();
     std::unique_lock<std::mutex> lc(_release_lock);
@@ -417,5 +439,4 @@ void PipelineXTask::wake_up() {
     // call by dependency
     static_cast<void>(get_task_queue()->push_back(this));
 }
-
 } // namespace doris::pipeline
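
Note: in the low-watermark branch above, a query only starts to spill once its
weighted consumption has reached its weighted limit, and the per-operator
budget is that limit divided by the number of running big-memory operators (or
80% of the limit when none have registered yet). A minimal sketch of just that
branch, with the query context flattened into parameters (hypothetical
signature, not the Doris API):

    #include <cstdint>

    bool should_revoke_low_watermark(int64_t query_weighted_limit,
                                     int64_t query_weighted_consumption,
                                     int32_t big_mem_op_num,
                                     int64_t revocable_bytes,
                                     int64_t min_revocable_bytes) {
        if (query_weighted_consumption < query_weighted_limit) {
            return false;  // the query is still under its per-query share
        }
        const int64_t mem_limit_of_op =
                big_mem_op_num == 0
                        ? static_cast<int64_t>((double)query_weighted_limit * 0.8)
                        : query_weighted_limit / big_mem_op_num;
        return revocable_bytes > mem_limit_of_op ||
               revocable_bytes > min_revocable_bytes;
    }
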
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index a89df75fc9b..1f3dd9c3b71 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -154,6 +154,8 @@ public:
         return false;
     }
 
+    static bool should_revoke_memory(RuntimeState* state, int64_t revocable_mem_bytes);
+
 private:
     friend class RuntimeFilterDependency;
     Dependency* _write_blocked_dependency() {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0ec721c1255..7e15477a641 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -651,6 +651,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
             WorkloadGroupPtr workload_group_ptr =
                     _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
             if (workload_group_ptr != nullptr) {
+                RETURN_IF_ERROR(workload_group_ptr->add_query(query_id, query_ctx));
                 RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
                 _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
                                                                                   tg_id);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 76f262df0af..10f54255741 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -297,7 +297,6 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
     _workload_group = tg;
     // Should add query first, then the workload group will not be deleted.
     // see task_group_manager::delete_workload_group_by_ids
-    RETURN_IF_ERROR(_workload_group->add_query(_query_id));
     _workload_group->add_mem_tracker_limiter(query_mem_tracker);
     _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
                                          &_non_pipe_thread_pool, &_remote_scan_task_scheduler);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b4120b6942e..c78886997d0 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -247,6 +247,29 @@ public:
 
     bool is_nereids() const { return _is_nereids; }
 
+    WorkloadGroupPtr workload_group() const { return _workload_group; }
+
+    void inc_running_big_mem_op_num() {
+        _running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed);
+    }
+    void dec_running_big_mem_op_num() {
+        _running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed);
+    }
+    int32_t get_running_big_mem_op_num() {
+        return _running_big_mem_op_num.load(std::memory_order_relaxed);
+    }
+
+    void set_weighted_mem(int64_t weighted_limit, int64_t weighted_consumption) {
+        std::lock_guard<std::mutex> l(_weighted_mem_lock);
+        _weighted_consumption = weighted_consumption;
+        _weighted_limit = weighted_limit;
+    }
+    void get_weighted_mem_info(int64_t& weighted_limit, int64_t& weighted_consumption) {
+        std::lock_guard<std::mutex> l(_weighted_mem_lock);
+        weighted_limit = _weighted_limit;
+        weighted_consumption = _weighted_consumption;
+    }
+
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
     std::string user;
@@ -280,6 +303,7 @@ private:
     int64_t _bytes_limit = 0;
     bool _is_pipeline = false;
     bool _is_nereids = false;
+    std::atomic<int> _running_big_mem_op_num = 0;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
     // This thread pool token is created from "_limited_scan_thread_pool" from exec env.
@@ -323,6 +347,10 @@ private:
 
     std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
     std::mutex _pipeline_map_write_lock;
+
+    std::mutex _weighted_mem_lock;
+    int64_t _weighted_consumption = 0;
+    int64_t _weighted_limit = 0;
 };
 
 } // namespace doris
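
Note: the new accessors form a small producer/consumer pair: the daemon's
refresh thread publishes each query's weighted limit and consumption through
set_weighted_mem(), and the execution path reads them back in
should_revoke_memory() through get_weighted_mem_info(); the mutex only
guarantees the two values are observed as a consistent pair. The exchange,
reduced to a sketch (hypothetical type, not the Doris API):

    #include <cstdint>
    #include <mutex>

    struct WeightedMem {
        void set(int64_t limit, int64_t consumption) {  // writer: refresh thread
            std::lock_guard<std::mutex> l(_lock);
            _limit = limit;
            _consumption = consumption;
        }
        void get(int64_t& limit, int64_t& consumption) {  // reader: exec path
            std::lock_guard<std::mutex> l(_lock);
            limit = _limit;
            consumption = _consumption;
        }

    private:
        std::mutex _lock;
        int64_t _limit = 0;
        int64_t _consumption = 0;
    };
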
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 36d2a1be310..1b0430d64fb 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -118,6 +118,10 @@ int64_t WorkloadGroup::memory_used() {
     return used_memory;
 }
 
+void WorkloadGroup::set_weighted_memory_used(int64_t wg_total_mem_used, double ratio) {
+    _weighted_mem_used.store(wg_total_mem_used * ratio, std::memory_order_relaxed);
+}
+
 void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
     auto group_num = mem_tracker_ptr->group_num();
     std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 4df33b349eb..49bcd841a0f 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -39,6 +39,7 @@ class RuntimeProfile;
 class ThreadPool;
 class ExecEnv;
 class CgroupCpuCtl;
+class QueryContext;
 
 namespace vectorized {
 class SimplifiedScanScheduler;
@@ -78,6 +79,25 @@ public:
 
     int64_t memory_used();
 
+    int spill_threshold_low_water_mark() const {
+        return _spill_low_watermark.load(std::memory_order_relaxed);
+    }
+    int spill_threashold_high_water_mark() const {
+        return _spill_high_watermark.load(std::memory_order_relaxed);
+    }
+
+    void set_weighted_memory_used(int64_t wg_total_mem_used, double ratio);
+
+    void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) const {
+        auto weighted_mem_used = _weighted_mem_used.load(std::memory_order_relaxed);
+        *is_low_wartermark =
+                (weighted_mem_used > ((double)_memory_limit *
+                                      _spill_low_watermark.load(std::memory_order_relaxed) / 100));
+        *is_high_wartermark =
+                (weighted_mem_used > ((double)_memory_limit *
+                                      _spill_high_watermark.load(std::memory_order_relaxed) / 100));
+    }
+
     std::string debug_string() const;
 
     void check_and_update(const WorkloadGroupInfo& tg_info);
@@ -93,7 +113,7 @@ public:
         return _memory_limit > 0;
     }
 
-    Status add_query(TUniqueId query_id) {
+    Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext> query_ctx) {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
         if (_is_shutdown) {
             // If the workload group is set shutdown, then should not run any more,
@@ -102,13 +122,13 @@ public:
                     "Failed add query to workload group, the workload group is 
shutdown. host: {}",
                     BackendOptions::get_localhost());
         }
-        _query_id_set.insert(query_id);
+        _query_ctxs.insert({query_id, query_ctx});
         return Status::OK();
     }
 
     void remove_query(TUniqueId query_id) {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
-        _query_id_set.erase(query_id);
+        _query_ctxs.erase(query_id);
     }
 
     void shutdown() {
@@ -118,7 +138,7 @@ public:
 
     int query_num() {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
-        return _query_id_set.size();
+        return _query_ctxs.size();
     }
 
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
@@ -132,12 +152,18 @@ public:
 
     void try_stop_schedulers();
 
+    std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> queries() {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        return _query_ctxs;
+    }
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
     std::string _name;
     int64_t _version;
-    int64_t _memory_limit; // bytes
+    int64_t _memory_limit;                      // bytes
+    std::atomic_int64_t _weighted_mem_used = 0; // bytes
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
     std::vector<TrackerLimiterGroup> _mem_tracker_limiter_pool;
@@ -152,7 +178,7 @@ private:
     // new query can not submit
     // waiting running query to be cancelled or finish
     bool _is_shutdown = false;
-    std::unordered_set<TUniqueId> _query_id_set;
+    std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>> _query_ctxs;
 
     std::shared_mutex _task_sched_lock;
     std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
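
Note: a worked example of check_mem_used() (illustrative numbers, not values
from this patch): with a 10 GiB group limit, a low watermark of 50 and a high
watermark of 80, a weighted use of 6 GiB raises only the low-watermark flag:

    #include <cstdint>

    void watermark_example() {
        const int64_t memory_limit = 10LL << 30;      // 10 GiB group limit
        const int64_t weighted_mem_used = 6LL << 30;  // 6 GiB weighted use
        bool is_low = weighted_mem_used > (double)memory_limit * 50 / 100;   // true
        bool is_high = weighted_mem_used > (double)memory_limit * 80 / 100;  // false
        (void)is_low;
        (void)is_high;
    }
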
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 7ec08387543..027cfb2b2dd 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -19,10 +19,12 @@
 
 #include <memory>
 #include <mutex>
+#include <unordered_map>
 
 #include "pipeline/task_scheduler.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/workload_group/workload_group.h"
+#include "util/mem_info.h"
 #include "util/threadpool.h"
 #include "util/time.h"
 #include "vec/exec/scan/scanner_scheduler.h"
@@ -135,6 +137,127 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
               << "ms, deleted group size:" << deleted_task_groups.size();
 }
 
+struct WorkloadGroupMemInfo {
+    int64_t total_mem_used = 0;
+    int64_t weighted_mem_used = 0;
+    bool is_low_wartermark = false;
+    bool is_high_wartermark = false;
+    double mem_used_ratio = 0;
+};
+void WorkloadGroupMgr::refresh_wg_memory_info() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    // workload group id -> workload group queries
+    std::unordered_map<uint64_t, std::unordered_map<TUniqueId, std::weak_ptr<QueryContext>>>
+            all_wg_queries;
+    for (auto& [wg_id, wg] : _workload_groups) {
+        all_wg_queries.insert({wg_id, wg->queries()});
+    }
+
+    int64_t all_queries_mem_used = 0;
+
+    // calculate total memory used of each workload group and total memory used of all queries
+    std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
+    for (auto& [wg_id, wg_queries] : all_wg_queries) {
+        int64_t wg_total_mem_used = 0;
+        for (const auto& [query_id, query_ctx_ptr] : wg_queries) {
+            if (auto query_ctx = query_ctx_ptr.lock()) {
+                wg_total_mem_used += query_ctx->query_mem_tracker->consumption();
+            }
+        }
+        all_queries_mem_used += wg_total_mem_used;
+        wgs_mem_info[wg_id] = {wg_total_mem_used};
+    }
+
+    auto proc_vm_rss = PerfCounters::get_vm_rss();
+    if (all_queries_mem_used <= 0) {
+        return;
+    }
+
+    auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache();
+    auto sys_mem_available = doris::MemInfo::sys_mem_available();
+    if (proc_vm_rss < all_queries_mem_used) {
+        all_queries_mem_used = proc_vm_rss;
+    }
+
+    // process memory used is actually bigger than all_queries_mem_used,
+    // because memory of page cache, allocator cache, segment cache etc. are included
+    // in process_mem_used.
+    // we count these cache memories equally on workload groups.
+    double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
+    if (ratio >= 1.25) {
+        std::string debug_msg = fmt::format(
+                "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: "
+                "{}, all queries mem: {}",
+                PrettyPrinter::print(proc_vm_rss, TUnit::BYTES),
+                PrettyPrinter::print(process_mem_used, TUnit::BYTES),
+                PrettyPrinter::print(sys_mem_available, TUnit::BYTES),
+                PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
+        LOG_EVERY_N(INFO, 10) << debug_msg;
+    }
+
+    for (auto& wg : _workload_groups) {
+        auto wg_mem_limit = wg.second->memory_limit();
+        auto& wg_mem_info = wgs_mem_info[wg.first];
+        wg_mem_info.weighted_mem_used = wg_mem_info.total_mem_used * ratio;
+        wg_mem_info.mem_used_ratio = (double)wg_mem_info.weighted_mem_used / wg_mem_limit;
+
+        wg.second->set_weighted_memory_used(wg_mem_info.total_mem_used, ratio);
+
+        auto spill_low_water_mark = wg.second->spill_threshold_low_water_mark();
+        auto spill_high_water_mark = wg.second->spill_threashold_high_water_mark();
+        wg_mem_info.is_high_wartermark = (wg_mem_info.weighted_mem_used >
+                                          ((double)wg_mem_limit * spill_high_water_mark / 100));
+        wg_mem_info.is_low_wartermark = (wg_mem_info.weighted_mem_used >
+                                         ((double)wg_mem_limit * spill_low_water_mark / 100));
+
+        // calculate query weighted memory limit of task group
+        const auto& wg_queries = all_wg_queries[wg.first];
+        auto wg_query_count = wg_queries.size();
+        int64_t query_weighted_mem_limit =
+                wg_query_count ? (wg_mem_limit + wg_query_count) / wg_query_count : wg_mem_limit;
+
+        std::string debug_msg;
+        if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
+            debug_msg = fmt::format(
+                    "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted mem used: {}, used "
+                    "ratio: {}, query "
+                    "count: {}, query_weighted_mem_limit: {}",
+                    wg.second->name(), PrettyPrinter::print(wg_mem_limit, TUnit::BYTES),
+                    PrettyPrinter::print(wg_mem_info.total_mem_used, TUnit::BYTES),
+                    PrettyPrinter::print(wg_mem_info.weighted_mem_used, TUnit::BYTES),
+                    wg_mem_info.mem_used_ratio, wg_query_count,
+                    PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES));
+
+            debug_msg += "\n  Query Memory Summary:";
+        }
+        // check which queries need to revoke memory for the workload group
+        for (const auto& query : wg_queries) {
+            auto query_ctx = query.second.lock();
+            if (!query_ctx) {
+                continue;
+            }
+            auto query_consumption = query_ctx->query_mem_tracker->consumption();
+            int64_t query_weighted_consumption = query_consumption * ratio;
+            query_ctx->set_weighted_mem(query_weighted_mem_limit, query_weighted_consumption);
+
+            if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
+                debug_msg += fmt::format(
+                        "\n    MemTracker Label={}, Parent Label={}, Used={}, WeightedUsed={}, "
+                        "Peak={}",
+                        query_ctx->query_mem_tracker->label(),
+                        query_ctx->query_mem_tracker->parent_label(),
+                        PrettyPrinter::print(query_consumption, TUnit::BYTES),
+                        PrettyPrinter::print(query_weighted_consumption, TUnit::BYTES),
+                        PrettyPrinter::print(query_ctx->query_mem_tracker->peak_consumption(),
+                                             TUnit::BYTES));
+            }
+        }
+        if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
+            LOG_EVERY_N(INFO, 10) << debug_msg;
+        }
+    }
+}
+
 void WorkloadGroupMgr::stop() {
     for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
         iter->second->try_stop_schedulers();
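
Note: the ratio in refresh_wg_memory_info() spreads memory the query trackers
cannot see (page cache, allocator caches, segment cache) over queries in
proportion to their tracked use, and the per-query limit is a rounded-up even
split of the group limit. Illustrative arithmetic with made-up sizes:

    #include <cstdint>

    void weighting_example() {
        const int64_t proc_vm_rss = 12LL << 30;          // process RSS: 12 GiB
        const int64_t all_queries_mem_used = 8LL << 30;  // tracked by queries: 8 GiB
        const double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;  // 1.5

        // a group whose queries hold 2 GiB tracked is charged 3 GiB weighted
        const auto weighted = static_cast<int64_t>((2LL << 30) * ratio);

        // per-query share of a 10 GiB group with 4 queries
        const int64_t query_weighted_mem_limit = ((10LL << 30) + 4) / 4;
        (void)weighted;
        (void)query_weighted_mem_limit;
    }
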
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 1f680eb17c8..8aeb8f988a3 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -54,6 +54,8 @@ public:
 
     bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
 
+    void refresh_wg_memory_info();
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 9f328240d75..4d53b439712 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -34,6 +34,8 @@ class SpillDataDir;
 
 class SpillStream {
 public:
+    // to avoid too many small file writes
+    static constexpr int MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
     SpillStream(RuntimeState* state, int64_t stream_id, SpillDataDir* data_dir,
                 std::string spill_dir, size_t batch_rows, size_t batch_bytes,
                 RuntimeProfile* profile);
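
Note: the spill call sites in this commit now gate on allocated bytes rather
than row count, so a block is only spilled once it holds at least
MIN_SPILL_WRITE_BATCH_MEM (32 KiB), per the comment above. The guard, reduced
to its essentials (hypothetical helper, for illustration):

    #include <cstdint>

    constexpr int64_t kMinSpillWriteBatchMem = 32 * 1024;  // 32 KiB, as above

    bool worth_spilling(int64_t allocated_bytes) {
        return allocated_bytes >= kMinSpillWriteBatchMem;
    }
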
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index d51dcf8f1ec..6c25c40e858 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -18,6 +18,7 @@
 #include "vec/spill/spill_writer.h"
 
 #include "agent/be_exec_version_manager.h"
+#include "common/status.h"
 #include "io/file_factory.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/local_file_writer.h"
@@ -35,14 +36,18 @@ Status SpillWriter::open() {
     return Status::OK();
 }
 
+SpillWriter::~SpillWriter() {
+    if (!closed_) {
+        (void)Status::Error<ErrorCode::INTERNAL_ERROR>("spill writer not closed correctly");
+    }
+}
+
 Status SpillWriter::close() {
     if (closed_ || !file_writer_) {
         return Status::OK();
     }
     closed_ = true;
 
-    tmp_block_.clear_column_data();
-
     meta_.append((const char*)&max_sub_block_size_, sizeof(max_sub_block_size_));
     meta_.append((const char*)&written_blocks_, sizeof(written_blocks_));
 
@@ -71,17 +76,13 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) {
     if (rows <= batch_size_) {
         return _write_internal(block, written_bytes);
     } else {
-        if (is_first_write_) {
-            is_first_write_ = false;
-            tmp_block_ = block.clone_empty();
-        }
-
+        auto tmp_block = block.clone_empty();
         const auto& src_data = block.get_columns_with_type_and_name();
 
         for (size_t row_idx = 0; row_idx < rows;) {
-            tmp_block_.clear_column_data();
+            tmp_block.clear_column_data();
 
-            auto& dst_data = tmp_block_.get_columns_with_type_and_name();
+            auto& dst_data = tmp_block.get_columns_with_type_and_name();
 
             size_t block_rows = std::min(rows - row_idx, batch_size_);
             RETURN_IF_CATCH_EXCEPTION({
@@ -91,7 +92,7 @@ Status SpillWriter::write(const Block& block, size_t& written_bytes) {
                 }
             });
 
-            RETURN_IF_ERROR(_write_internal(tmp_block_, written_bytes));
+            RETURN_IF_ERROR(_write_internal(tmp_block, written_bytes));
 
             row_idx += block_rows;
         }
@@ -106,8 +107,8 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
     std::string buff;
 
     if (block.rows() > 0) {
-        PBlock pblock;
         {
+            PBlock pblock;
             SCOPED_TIMER(serialize_timer_);
             status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
                                      &uncompressed_bytes, &compressed_bytes,
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 14e8120f775..45317e8b8cf 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -35,7 +35,7 @@ public:
         file_path_ = dir + "/" + std::to_string(file_index_);
     }
 
-    ~SpillWriter() { (void)close(); }
+    ~SpillWriter();
 
     Status open();
 
@@ -79,9 +79,6 @@ private:
     size_t total_written_bytes_ = 0;
     std::string meta_;
 
-    bool is_first_write_ = true;
-    Block tmp_block_;
-
     RuntimeProfile::Counter* write_bytes_counter_;
     RuntimeProfile::Counter* serialize_timer_;
     RuntimeProfile::Counter* write_timer_;
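
Note: the destructor change above inverts the old behavior: instead of
silently calling close() (which does I/O and can fail where the error would
be lost), the destructor now only reports a writer that was never closed, and
callers are expected to close explicitly and check the returned status. A
simplified sketch of the pattern (hypothetical class, not the Doris API):

    #include <cstdio>

    class Writer {
    public:
        ~Writer() {
            if (!_closed) {
                // the real code surfaces an INTERNAL_ERROR status here
                std::fprintf(stderr, "spill writer not closed correctly\n");
            }
        }
        // explicit close: the caller can observe and handle the result
        bool close() {
            _closed = true;
            return true;  // flushing the meta footer would happen here
        }

    private:
        bool _closed = false;
    };
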

