This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 536de88fd22 Fix bugs and undefined behaviors (#45410)
536de88fd22 is described below

commit 536de88fd228356ff1a626d0b2045b3468232580
Author: Jerry Hu <hushengg...@selectdb.com>
AuthorDate: Tue Dec 17 10:58:58 2024 +0800

    Fix bugs and undefined behaviors (#45410)
---
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   1 +
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   9 +-
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  |  49 +++++---
 be/src/pipeline/exec/operator.h                    |   5 +
 .../pipeline/exec/partition_sort_sink_operator.cpp |   4 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |   6 +
 .../exec/partitioned_aggregation_sink_operator.h   |   2 +
 .../partitioned_aggregation_source_operator.cpp    |  25 ++++-
 .../exec/partitioned_hash_join_probe_operator.cpp  |  85 +++++++-------
 .../exec/partitioned_hash_join_probe_operator.h    |   1 -
 .../exec/partitioned_hash_join_sink_operator.cpp   | 125 ++++++---------------
 .../exec/partitioned_hash_join_sink_operator.h     |   6 +-
 be/src/pipeline/exec/scan_operator.cpp             |  14 ++-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   5 +
 be/src/pipeline/exec/spill_sort_sink_operator.h    |   1 +
 be/src/pipeline/exec/spill_utils.h                 |   6 +
 be/src/pipeline/pipeline_fragment_context.cpp      |   3 +-
 be/src/pipeline/pipeline_task.cpp                  |  26 ++++-
 be/src/pipeline/pipeline_task.h                    |   1 +
 be/src/runtime/fragment_mgr.cpp                    |   2 +-
 be/src/runtime/query_context.h                     |   4 +
 .../workload_group/workload_group_manager.cpp      |  13 +--
 be/src/vec/common/columns_hashing.h                |   8 +-
 be/src/vec/core/block.cpp                          |   3 +-
 be/src/vec/functions/function_string.h             |   2 +
 .../correctness_p0/test_mask_function.groovy       |  21 ++++
 27 files changed, 254 insertions(+), 177 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 89fcea3372d..f8b7b390ee9 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <fmt/core.h>
+#include <gflags/gflags.h>
 #include <stdint.h>
 
 #include <algorithm>
@@ -1263,6 +1264,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
 });
 DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
 
+// paused query in queue timeout(ms) will be resumed or canceled
+DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");
+
 DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
 
 DEFINE_mInt32(max_s3_client_retry, "10");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8e308c1794d..1b9b8a3d531 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1344,6 +1344,7 @@ DECLARE_mInt32(spill_gc_interval_ms);
 DECLARE_mInt32(spill_gc_work_time_ms);
 DECLARE_Int32(spill_io_thread_pool_thread_num);
 DECLARE_Int32(spill_io_thread_pool_queue_size);
+DECLARE_Int64(spill_in_paused_queue_timeout_ms);
 
 DECLARE_mBool(check_segment_when_build_rowset_meta);
 
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index a8122dd11ed..02bc358e958 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -26,6 +26,7 @@
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline_task.h"
 #include "util/pretty_printer.h"
+#include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/utils/template_helpers.hpp"
@@ -153,12 +154,18 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo
 
         if (build_block_rows > 0) {
             auto block = _build_side_mutable_block.to_block();
+            std::vector<uint16_t> converted_columns;
             Defer defer([&]() {
+                for (auto i : converted_columns) {
+                    auto& data = block.get_by_position(i);
+                    data.column = vectorized::remove_nullable(data.column);
+                    data.type = vectorized::remove_nullable(data.type);
+                }
                 _build_side_mutable_block = vectorized::MutableBlock(std::move(block));
             });
             vectorized::ColumnUInt8::MutablePtr null_map_val;
             if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN) {
-                _convert_block_to_null(block);
+                converted_columns = _convert_block_to_null(block);
                 // first row is mocked
                 for (int i = 0; i < block.columns(); i++) {
                     auto [column, is_const] = unpack_if_const(block.safe_get_by_position(i).column);
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 8fc3bc6015c..9f0c653b1b7 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -128,6 +128,8 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
             return Status::OK();
         }
 
+        DCHECK_GT(pos_to_pull->_un_finish_copy, 0);
+        DCHECK_LE(pos_to_pull->_un_finish_copy, _cast_sender_count);
         *block = *pos_to_pull->_block;
 
         multi_cast_block = &(*pos_to_pull);
@@ -156,6 +158,8 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id
     multi_cast_block._un_finish_copy--;
     auto copying_count = _copying_count.fetch_sub(1) - 1;
     if (multi_cast_block._un_finish_copy == 0) {
+        DCHECK_EQ(_multi_cast_blocks.front()._un_finish_copy, 0);
+        DCHECK_EQ(&(_multi_cast_blocks.front()), &multi_cast_block);
         _multi_cast_blocks.pop_front();
         _write_dependency->set_ready();
     } else if (copying_count == 0) {
@@ -167,7 +171,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id
 }
 
 Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool* triggered) {
-    if (!state->enable_spill() && !state->enable_force_spill()) {
+    if (!state->enable_spill()) {
         *triggered = false;
         return Status::OK();
     }
@@ -232,6 +236,7 @@ Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state,
                                                  vectorized::SpillStreamSPtr spill_stream) {
     std::vector<vectorized::Block> blocks;
     for (auto& block : _multi_cast_blocks) {
+        DCHECK_GT(block._block->rows(), 0);
         blocks.emplace_back(std::move(*block._block));
     }
 
@@ -288,6 +293,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
         std::lock_guard l(_mutex);
 
         if (_pending_block) {
+            DCHECK_GT(_pending_block->rows(), 0);
             const auto pending_size = _pending_block->allocated_bytes();
             _cumulative_mem_size += pending_size;
             _multi_cast_blocks.emplace_back(_pending_block.get(), _cast_sender_count, pending_size);
@@ -306,24 +312,33 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
         COUNTER_SET(_peak_mem_usage,
                     std::max(_cumulative_mem_size.load(), _peak_mem_usage->value()));
 
-        if (!eos) {
-            bool spilled = false;
-            RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
-            if (spilled) {
-                _pending_block =
-                        vectorized::Block::create_unique(block->get_columns_with_type_and_name());
-                block->clear();
-                return Status::OK();
+        if (rows > 0) {
+            if (!eos) {
+                bool spilled = false;
+                RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
+                if (spilled) {
+                    _pending_block = vectorized::Block::create_unique(
+                            block->get_columns_with_type_and_name());
+                    block->clear();
+                    return Status::OK();
+                }
             }
-        }
 
-        _multi_cast_blocks.emplace_back(block, _cast_sender_count, block_mem_size);
-        // last elem
-        auto end = std::prev(_multi_cast_blocks.end());
-        for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
-            if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
-                _sender_pos_to_read[i] = end;
-                _set_ready_for_read(i);
+            _multi_cast_blocks.emplace_back(block, _cast_sender_count, block_mem_size);
+
+            // last elem
+            auto end = std::prev(_multi_cast_blocks.end());
+            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+                if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+                    _sender_pos_to_read[i] = end;
+                    _set_ready_for_read(i);
+                }
+            }
+        } else if (eos) {
+            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+                if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+                    _set_ready_for_read(i);
+                }
             }
         }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 590e04a2445..723e14f4bc7 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -589,6 +589,10 @@ public:
         return state->minimum_operator_memory_required_bytes();
     }
 
+    [[nodiscard]] virtual bool is_spilled(RuntimeState* state) const { return false; }
+
+    [[nodiscard]] bool is_spillable() const { return _spillable; }
+
     template <class TARGET>
     TARGET& cast() {
         DCHECK(dynamic_cast<TARGET*>(this))
@@ -651,6 +655,7 @@ protected:
     const int _operator_id;
     const int _node_id;
     int _nereids_id = -1;
+    bool _spillable = false;
     std::vector<int> _dests_id;
     std::string _name;
 };
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index b90af92f6b1..759c7ea2bcc 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -70,7 +70,9 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope
           _topn_phase(tnode.partition_sort_node.ptopn_phase),
           _has_global_limit(tnode.partition_sort_node.has_global_limit),
           _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm),
-          _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {}
+          _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {
+    _spillable = true;
+}
 
 Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 8a29a88c82c..6f008a3b1f2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -148,6 +148,7 @@ PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int o
         : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
     _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
                                                             require_bucket_distribution);
+    _spillable = true;
 }
 
 Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -340,4 +341,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
             std::move(spill_runnable));
 }
 
+bool PartitionedAggSinkOperatorX::is_spilled(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return local_state._shared_state->is_spilled;
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 4b022dd1c5a..922798707d0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -333,6 +333,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
+    bool is_spilled(RuntimeState* state) const override;
+
 private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 6bd601383f7..57ebfd24558 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -17,10 +17,13 @@
 
 #include "partitioned_aggregation_source_operator.h"
 
+#include <glog/logging.h>
+
 #include <string>
 
 #include "aggregation_source_operator.h"
 #include "common/exception.h"
+#include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/spill_utils.h"
@@ -257,7 +260,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
         size_t accumulated_blocks_size = 0;
         while (!state->is_cancelled() && !has_agg_data &&
                !_shared_state->spill_partitions.empty()) {
-            for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) {
+            while (!_shared_state->spill_partitions[0]->spill_streams_.empty() &&
+                   !state->is_cancelled()) {
+                auto& stream = _shared_state->spill_partitions[0]->spill_streams_[0];
                 stream->set_read_counters(profile());
                 vectorized::Block block;
                 bool eos = false;
@@ -291,10 +296,20 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
 
                 if (_current_partition_eos) {
                     (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
-                    _shared_state->spill_partitions.pop_front();
+                    _shared_state->spill_partitions[0]->spill_streams_.pop_front();
                 }
             }
+
+            if (_shared_state->spill_partitions[0]->spill_streams_.empty()) {
+                _shared_state->spill_partitions.pop_front();
+            }
         }
+
+        VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id()
+                   << ", task id: " << state->task_id() << " recover partitioned finished, "
+                   << _shared_state->spill_partitions.size() << " partitions left, "
+                   << accumulated_blocks_size
+                   << " bytes read, spill dep: " << (void*)(_spill_dependency.get());
         return status;
     };
 
@@ -308,6 +323,8 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
         });
 
         auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }();
+        LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id)
+                                   << " recover exception : " << status.to_string();
         return status;
     };
 
@@ -317,6 +334,10 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
     });
     _spill_dependency->block();
 
+    VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id()
+               << ", task id: " << state->task_id() << " begin to recover, "
+               << _shared_state->spill_partitions.size()
+               << " partitions left, _spill_dependency: " << (void*)(_spill_dependency.get());
     return ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
             std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, _runtime_profile.get(),
                                                    _shared_state->shared_from_this(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index e3f990c1667..bbbcb9b9d5e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -408,20 +408,21 @@ std::string PartitionedHashJoinProbeLocalState::debug_string(int indentation_lev
     bool need_more_input_data;
     if (_shared_state->need_to_spill) {
         need_more_input_data = !_child_eos;
-    } else if (_runtime_state) {
-        need_more_input_data = p._inner_probe_operator->need_more_input_data(_runtime_state.get());
+    } else if (_shared_state->inner_runtime_state) {
+        need_more_input_data = p._inner_probe_operator->need_more_input_data(
+                _shared_state->inner_runtime_state.get());
     } else {
         need_more_input_data = true;
     }
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, short_circuit_for_probe: {}, need_to_spill: {}, 
child_eos: {}, "
-                   "_runtime_state: {}, need_more_input_data: {}",
+                   "_shared_state->inner_runtime_state: {}, 
need_more_input_data: {}",
                    
PipelineXSpillLocalState<PartitionedHashJoinSharedState>::debug_string(
                            indentation_level),
                    _shared_state ? 
std::to_string(_shared_state->short_circuit_for_probe) : "NULL",
-                   _shared_state->need_to_spill, _child_eos, _runtime_state != 
nullptr,
-                   need_more_input_data);
+                   _shared_state->need_to_spill, _child_eos,
+                   _shared_state->inner_runtime_state != nullptr, 
need_more_input_data);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -528,7 +529,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt
     auto tnode_ = _tnode;
     tnode_.runtime_filters.clear();
 
-    for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) {
+    for (const auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) {
         _probe_exprs.emplace_back(conjunct.left);
     }
     _partitioner = std::make_unique<SpillPartitionerType>(_partition_count);
@@ -618,11 +619,10 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
 Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) {
     DCHECK(local_state._shared_state->inner_runtime_state);
-    local_state._runtime_state = std::move(local_state._shared_state->inner_runtime_state);
     local_state._in_mem_shared_state_sptr =
             std::move(local_state._shared_state->inner_shared_state);
 
-    auto* sink_state = local_state._runtime_state->get_sink_local_state();
+    auto* sink_state = local_state._shared_state->inner_runtime_state->get_sink_local_state();
     if (sink_state != nullptr) {
         COUNTER_SET(local_state._hash_table_memory_usage,
                     sink_state->profile()->get_counter("MemoryUsageHashTable")->value());
@@ -632,21 +632,22 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill
 
 Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
         PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) const {
-    if (local_state._runtime_state) {
+    if (local_state._shared_state->inner_runtime_state) {
         _update_profile_from_internal_states(local_state);
     }
 
-    local_state._runtime_state = RuntimeState::create_unique(
+    local_state._shared_state->inner_runtime_state = RuntimeState::create_unique(
             nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(),
             state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx());
 
-    local_state._runtime_state->set_task_execution_context(
+    local_state._shared_state->inner_runtime_state->set_task_execution_context(
             state->get_task_execution_context().lock());
-    local_state._runtime_state->set_be_number(state->be_number());
+    local_state._shared_state->inner_runtime_state->set_be_number(state->be_number());
 
-    local_state._runtime_state->set_desc_tbl(&state->desc_tbl());
-    local_state._runtime_state->resize_op_id_to_local_state(-1);
-    local_state._runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr());
+    local_state._shared_state->inner_runtime_state->set_desc_tbl(&state->desc_tbl());
+    local_state._shared_state->inner_runtime_state->resize_op_id_to_local_state(-1);
+    local_state._shared_state->inner_runtime_state->set_runtime_filter_mgr(
+            state->local_runtime_filter_mgr());
 
     local_state._in_mem_shared_state_sptr = _inner_sink_operator->create_shared_state();
 
@@ -654,23 +655,23 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
     LocalSinkStateInfo info {0,  local_state._internal_runtime_profile.get(),
                              -1, local_state._in_mem_shared_state_sptr.get(),
                              {}, {}};
-    RETURN_IF_ERROR(
-            _inner_sink_operator->setup_local_state(local_state._runtime_state.get(), info));
+    RETURN_IF_ERROR(_inner_sink_operator->setup_local_state(
+            local_state._shared_state->inner_runtime_state.get(), info));
 
     LocalStateInfo state_info {local_state._internal_runtime_profile.get(),
                                {},
                                local_state._in_mem_shared_state_sptr.get(),
                                {},
                                0};
-    RETURN_IF_ERROR(
-            _inner_probe_operator->setup_local_state(local_state._runtime_state.get(), state_info));
+    RETURN_IF_ERROR(_inner_probe_operator->setup_local_state(
+            local_state._shared_state->inner_runtime_state.get(), state_info));
 
-    auto* sink_local_state = local_state._runtime_state->get_sink_local_state();
+    auto* sink_local_state = local_state._shared_state->inner_runtime_state->get_sink_local_state();
     DCHECK(sink_local_state != nullptr);
     RETURN_IF_ERROR(sink_local_state->open(state));
 
-    auto* probe_local_state =
-            local_state._runtime_state->get_local_state(_inner_probe_operator->operator_id());
+    auto* probe_local_state = local_state._shared_state->inner_runtime_state->get_local_state(
+            _inner_probe_operator->operator_id());
     DCHECK(probe_local_state != nullptr);
     RETURN_IF_ERROR(probe_local_state->open(state));
 
@@ -686,13 +687,15 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
                 "fault_inject partitioned_hash_join_probe sink failed");
     });
 
-    RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true));
+    RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(),
+                                               &block, true));
     VLOG_DEBUG << "Query: " << print_id(state->query_id())
                << ", internal build operator finished, node id: " << node_id()
                << ", task id: " << state->task_id()
                << ", partition: " << local_state._partition_cursor << "rows: " << block.rows()
                << ", usage: "
-               << _inner_sink_operator->get_memory_usage(local_state._runtime_state.get());
+               << _inner_sink_operator->get_memory_usage(
+                          local_state._shared_state->inner_runtime_state.get());
 
     COUNTER_SET(local_state._hash_table_memory_usage,
                 sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value());
@@ -735,7 +738,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
         }
     }
     bool in_mem_eos = false;
-    auto* runtime_state = local_state._runtime_state.get();
+    auto* runtime_state = local_state._shared_state->inner_runtime_state.get();
     while (_inner_probe_operator->need_more_input_data(runtime_state)) {
         if (probe_blocks.empty()) {
             *eos = false;
@@ -761,8 +764,8 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
         }
     }
 
-    RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), output_block,
-                                                &in_mem_eos));
+    RETURN_IF_ERROR(_inner_probe_operator->pull(
+            local_state._shared_state->inner_runtime_state.get(), output_block, &in_mem_eos));
 
     *eos = false;
     if (in_mem_eos) {
@@ -784,8 +787,9 @@ bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state
     auto& local_state = get_local_state(state);
     if (local_state._shared_state->need_to_spill) {
         return !local_state._child_eos;
-    } else if (local_state._runtime_state) {
-        return _inner_probe_operator->need_more_input_data(local_state._runtime_state.get());
+    } else if (local_state._shared_state->inner_runtime_state) {
+        return _inner_probe_operator->need_more_input_data(
+                local_state._shared_state->inner_runtime_state.get());
     } else {
         return true;
     }
@@ -898,11 +902,12 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat
 
 void PartitionedHashJoinProbeOperatorX::_update_profile_from_internal_states(
         PartitionedHashJoinProbeLocalState& local_state) const {
-    if (local_state._runtime_state) {
-        auto* sink_local_state = local_state._runtime_state->get_sink_local_state();
+    if (local_state._shared_state->inner_runtime_state) {
+        auto* sink_local_state =
+                local_state._shared_state->inner_runtime_state->get_sink_local_state();
         local_state.update_build_profile(sink_local_state->profile());
-        auto* probe_local_state =
-                local_state._runtime_state->get_local_state(_inner_probe_operator->operator_id());
+        auto* probe_local_state = local_state._shared_state->inner_runtime_state->get_local_state(
+                _inner_probe_operator->operator_id());
         local_state.update_probe_profile(probe_local_state->profile());
     }
 }
@@ -953,13 +958,13 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
                 return _revoke_memory(state);
             }
         } else {
-            if (UNLIKELY(!local_state._runtime_state)) {
+            if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
                 RETURN_IF_ERROR(_setup_internal_operator_for_non_spill(local_state, state));
             }
 
-            RETURN_IF_ERROR(_inner_probe_operator->push(local_state._runtime_state.get(),
-                                                        local_state._child_block.get(),
-                                                        local_state._child_eos));
+            RETURN_IF_ERROR(_inner_probe_operator->push(
+                    local_state._shared_state->inner_runtime_state.get(),
+                    local_state._child_block.get(), local_state._child_eos));
         }
     }
 
@@ -969,11 +974,11 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
         if (need_to_spill) {
             RETURN_IF_ERROR(pull(state, block, eos));
         } else {
-            RETURN_IF_ERROR(
-                    _inner_probe_operator->pull(local_state._runtime_state.get(), block, eos));
+            RETURN_IF_ERROR(_inner_probe_operator->pull(
+                    local_state._shared_state->inner_runtime_state.get(), block, eos));
             if (*eos) {
                 _update_profile_from_internal_states(local_state);
-                local_state._runtime_state.reset();
+                local_state._shared_state->inner_runtime_state.reset();
             }
         }
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index f020c0a832a..7b77e1e6e3f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -81,7 +81,6 @@ private:
     std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams;
 
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
-    std::unique_ptr<RuntimeState> _runtime_state;
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
 
     bool _need_to_setup_internal_operators {true};
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 220115c085e..672eb36a907 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -290,7 +290,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
     VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task " << 
state->task_id()
                << " sink " << _parent->node_id() << " revoke_memory"
                << ", eos: " << _child_eos;
-    DCHECK_EQ(_spilling_task_count, 0);
     CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
 
     if (!_shared_state->need_to_spill) {
@@ -299,18 +298,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         return _revoke_unpartitioned_block(state, spill_context);
     }
 
-    _spilling_task_count = _shared_state->partitioned_build_blocks.size();
-
     auto query_id = state->query_id();
 
     auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     DCHECK(spill_io_pool != nullptr);
 
     auto spill_fin_cb = [this, state, query_id, spill_context]() {
-        if (_spilling_task_count.fetch_sub(1) != 1) {
-            return Status::OK();
-        }
-
         Status status;
         if (_child_eos) {
             VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << 
", task "
@@ -330,99 +323,49 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
             spill_context->on_task_finished();
         }
 
-        std::lock_guard<std::mutex> lock(_spill_mutex);
         _spill_dependency->set_ready();
         return status;
     };
 
-    for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
-        vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
-        auto& mutable_block = _shared_state->partitioned_build_blocks[i];
-
-        if (!mutable_block ||
-            mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-            --_spilling_task_count;
-            continue;
-        }
-
-        DCHECK(spilling_stream != nullptr);
+    auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+            state, nullptr, nullptr, _profile, _shared_state->shared_from_this(),
+            [this, query_id] {
+                DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
+                    auto status = Status::InternalError(
+                            "fault_inject partitioned_hash_join_sink "
+                            "revoke_memory canceled");
+                    ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+                    return status;
+                });
+                SCOPED_TIMER(_spill_build_timer);
 
-        MonotonicStopWatch submit_timer;
-        submit_timer.start();
+                for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
+                    vectorized::SpillStreamSPtr& spilling_stream =
+                            _shared_state->spilled_streams[i];
+                    DCHECK(spilling_stream != nullptr);
+                    auto& mutable_block = _shared_state->partitioned_build_blocks[i];
 
-        Status st;
-        DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_submit_func", {
-            st = Status::Error<INTERNAL_ERROR>(
-                    "fault_inject partitioned_hash_join_sink revoke_memory submit_func failed");
-        });
-        // For every stream, the task counter is increased +1
-        // so that when a stream finished, it should desc -1
-        state->get_query_ctx()->increase_revoking_tasks_count();
-        auto spill_runnable = std::make_shared<SpillSinkRunnable>(
-                state, nullptr, nullptr, _profile, _shared_state->shared_from_this(),
-                [this, query_id, spilling_stream, i] {
-                    DBUG_EXECUTE_IF(
-                            "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
-                                auto status = Status::InternalError(
-                                        "fault_inject partitioned_hash_join_sink "
-                                        "revoke_memory canceled");
-                                ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
-                                                                                     status);
-                                return status;
-                            });
-                    SCOPED_TIMER(_spill_build_timer);
+                    if (!mutable_block ||
+                        mutable_block->allocated_bytes() <
+                                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                        continue;
+                    }
 
                     auto status = [&]() {
                         RETURN_IF_CATCH_EXCEPTION(return _spill_to_disk(i, spilling_stream));
                     }();
 
-                    _state->get_query_ctx()->decrease_revoking_tasks_count();
-                    return status;
-                },
-                spill_fin_cb);
-        if (st.ok()) {
-            st = spill_io_pool->submit(std::move(spill_runnable));
-        }
-
-        if (!st.ok()) {
-            --_spilling_task_count;
-            return st;
-        }
-    }
-
-    if (_spilling_task_count.load() > 0) {
-        std::lock_guard<std::mutex> lock(_spill_mutex);
-        if (_spilling_task_count.load() > 0) {
-            _spill_dependency->block();
-            return Status::OK();
-        }
-    }
-
-    if (_child_eos) {
-        VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", task " << state->task_id()
-                   << " sink " << _parent->node_id() << " set_ready_to_read";
-        std::for_each(_shared_state->partitioned_build_blocks.begin(),
-                      _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
-                          if (block) {
-                              COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
-                          }
-                      });
-        RETURN_IF_ERROR(_finish_spilling());
-        _dependency->set_ready_to_read();
+                    RETURN_IF_ERROR(status);
+                }
+                return Status::OK();
+            },
+            spill_fin_cb);
 
-        if (spill_context) {
-            spill_context->on_task_finished();
-        }
-    }
-    return Status::OK();
+    _spill_dependency->block();
+    return spill_io_pool->submit(std::move(spill_runnable));
 }
 
 Status PartitionedHashJoinSinkLocalState::_finish_spilling() {
-    bool expected = false;
-    if (!_spilling_finished.compare_exchange_strong(expected, true)) {
-        return Status::OK();
-    }
-
     for (auto& stream : _shared_state->spilled_streams) {
         if (stream) {
             RETURN_IF_ERROR(stream->spill_eof());
@@ -504,7 +447,9 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p
                                                 : std::vector<TExpr> {}),
           _tnode(tnode),
           _descriptor_tbl(descs),
-          _partition_count(partition_count) {}
+          _partition_count(partition_count) {
+    _spillable = true;
+}
 
 Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state));
@@ -661,10 +606,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                     "fault_inject partitioned_hash_join_sink "
                     "sink failed");
         });
-        Defer defer {[&]() { local_state.update_memory_usage(); }};
         RETURN_IF_ERROR(_inner_sink_operator->sink(
                 local_state._shared_state->inner_runtime_state.get(), in_block, eos));
-
+        local_state.update_memory_usage();
         if (eos) {
             VLOG_DEBUG << fmt::format(
                     "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory "
@@ -697,4 +641,9 @@ size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* stat
     return local_state.get_reserve_mem_size(state, eos);
 }
 
+bool PartitionedHashJoinSinkOperatorX::is_spilled(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return local_state._shared_state->need_to_spill;
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 8e3c53ffd38..9e253ce3fca 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -68,10 +68,6 @@ protected:
 
     friend class PartitionedHashJoinSinkOperatorX;
 
-    std::mutex _spill_mutex;
-    std::atomic<bool> _spilling_finished {false};
-    std::atomic_int32_t _spilling_task_count {0};
-
     bool _child_eos {false};
 
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
@@ -138,6 +134,8 @@ public:
         return _inner_probe_operator->require_data_distribution();
     }
 
+    bool is_spilled(RuntimeState* state) const override;
+
 private:
     friend class PartitionedHashJoinSinkLocalState;
 
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index 7173601595b..767a1f168da 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1289,16 +1289,20 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
 template <typename LocalStateType>
 size_t ScanOperatorX<LocalStateType>::get_reserve_mem_size(RuntimeState* state) {
     auto& local_state = get_local_state(state);
+    if (!local_state._opened || local_state._closed || !local_state._scanner_ctx) {
+        return config::doris_scanner_row_bytes;
+    }
+
     if (local_state.low_memory_mode()) {
         return local_state._scanner_ctx->low_memory_mode_scan_bytes_per_scanner() *
                local_state._scanner_ctx->low_memory_mode_scanners();
     } else {
-        if (local_state._memory_used_counter->value() > 0) {
+        const auto peak_usage = local_state._memory_used_counter->value();
+        const auto block_usage = local_state._scanner_ctx->block_memory_usage();
+        if (peak_usage > 0) {
             // It is only a safty check, to avoid some counter not right.
-            if (local_state._memory_used_counter->value() >
-                local_state._scanner_ctx->block_memory_usage()) {
-                return local_state._memory_used_counter->value() -
-                       local_state._scanner_ctx->block_memory_usage();
+            if (peak_usage > block_usage) {
+                return peak_usage - block_usage;
             } else {
                 return config::doris_scanner_row_bytes;
             }
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 25205ab09fe..6a472a09cfd 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -299,4 +299,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
     return status;
 }
 
+bool SpillSortSinkOperatorX::is_spilled(RuntimeState* state) const {
+    auto& local_state = get_local_state(state);
+    return local_state._shared_state->is_spilled;
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 3d6ccdcc4ce..226fe61d386 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -91,6 +91,7 @@ public:
 
     Status revoke_memory(RuntimeState* state,
                          const std::shared_ptr<SpillContext>& spill_context) override;
+    bool is_spilled(RuntimeState* state) const override;
 
     using DataSinkOperatorX<LocalStateType>::node_id;
     using DataSinkOperatorX<LocalStateType>::operator_id;
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index e669c0e343c..84a3f8c2e29 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -187,6 +187,9 @@ protected:
     }
 
     virtual void _on_task_started(uint64_t submit_elapsed_time) {
+        VLOG_DEBUG << "Query: " << print_id(_state->query_id())
+                   << " spill task started, pipeline task id: " << 
_state->task_id()
+                   << ", spill dep: " << (void*)(_spill_dependency.get());
         if (_is_write_task) {
             COUNTER_UPDATE(_spill_write_wait_in_queue_timer, 
submit_elapsed_time);
             COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
@@ -269,6 +272,9 @@ protected:
     }
 
     void _on_task_started(uint64_t submit_elapsed_time) override {
+        LOG(INFO) << "SpillRecoverRunnable, Query: " << 
print_id(_state->query_id())
+                  << " spill task started, pipeline task id: " << 
_state->task_id()
+                  << ", spill dep: " << (void*)(_spill_dependency.get());
         COUNTER_UPDATE(_spill_read_wait_in_queue_timer, submit_elapsed_time);
         COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
         COUNTER_UPDATE(_reading_task_count, 1);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 2986b4b96ed..42d2640441e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1841,7 +1841,8 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const
     for (const auto& task_instances : _tasks) {
         for (const auto& task : task_instances) {
             size_t revocable_size_ = task->get_revocable_size();
-            if (revocable_size_ > _runtime_state->min_revocable_mem()) {
+            if (revocable_size_ > _runtime_state->min_revocable_mem() ||
+                (revocable_size_ > 0 && _query_ctx->enable_force_spill())) {
                 revocable_tasks.emplace_back(task.get());
             }
         }
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6bf4cb39fd1..cd822ef15e2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -26,6 +26,7 @@
 #include <ostream>
 #include <vector>
 
+#include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/operator.h"
@@ -403,6 +404,7 @@ Status PipelineTask::execute(bool* eos) {
                       << " has pending block, size: " << 
_pending_block->allocated_bytes();
             _block = std::move(_pending_block);
             block = _block.get();
+            *eos = _pending_eos;
         }
         // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
@@ -420,7 +422,7 @@ Status PipelineTask::execute(bool* eos) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
 
                 COUNTER_UPDATE(_memory_reserve_times, 1);
-                if (!st.ok()) {
+                if (!st.ok() && !_state->enable_force_spill()) {
                     COUNTER_UPDATE(_memory_reserve_failed_times, 1);
                     auto debug_msg = fmt::format(
                             "Query: {} , try to reserve: {}, operator name: 
{}, operator id: {}, "
@@ -455,9 +457,16 @@ Status PipelineTask::execute(bool* eos) {
             DEFER_RELEASE_RESERVED();
             COUNTER_UPDATE(_memory_reserve_times, 1);
             const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos);
-            if (_state->enable_reserve_memory()) {
+            auto workload_group = _state->get_query_ctx()->workload_group();
+            if (_state->enable_reserve_memory() && workload_group) {
                 status = thread_context()->try_reserve_memory(sink_reserve_size);
 
+                if (status.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
+                    _sink->revocable_mem_size(_state) >=
+                            vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+                    status = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force Spill");
+                }
+
                 if (!status.ok()) {
                     COUNTER_UPDATE(_memory_reserve_failed_times, 1);
                     auto debug_msg = fmt::format(
@@ -481,7 +490,7 @@ Status PipelineTask::execute(bool* eos) {
                     _state->get_query_ctx()->set_memory_sufficient(false);
                     ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
                             _state->get_query_ctx()->shared_from_this(), sink_reserve_size);
-                    _eos = *eos;
+                    _pending_eos = *eos;
                     *eos = false;
                     continue;
                 }
@@ -644,7 +653,7 @@ std::string PipelineTask::debug_string() {
 }
 
 size_t PipelineTask::get_revocable_size() const {
-    if (_running || (_eos && !_pending_block)) {
+    if (_finalized || _running || (_eos && !_pending_block)) {
         return 0;
     }
 
@@ -652,6 +661,15 @@ size_t PipelineTask::get_revocable_size() const {
 }
 
 Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_context) {
+    if (_finalized) {
+        if (spill_context) {
+            spill_context->on_task_finished();
+            VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
+                       << " finalized";
+        }
+        return Status::OK();
+    }
+
     RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context));
 
     const auto revocable_size = _sink->revocable_mem_size(_state);
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index e10d4217464..99decc05a9d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -270,6 +270,7 @@ private:
     uint32_t _schedule_time = 0;
     std::unique_ptr<vectorized::Block> _block;
     std::unique_ptr<vectorized::Block> _pending_block;
+    bool _pending_eos = false;
 
     PipelineFragmentContext* _fragment_context = nullptr;
     MultiCoreTaskQueue* _task_queue = nullptr;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ece9a843d33..96deed90fbf 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -704,7 +704,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                                                                                  tg_id);
             } else {
                 LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id())
-                             << "can't find its workload group " << tg_id;
+                             << " can't find its workload group " << tg_id;
             }
         }
         // There is some logic in query ctx's dctor, we could not check if exists and delete the
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b011e8d850f..55b51088c50 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -176,6 +176,10 @@ public:
                        : false;
     }
 
+    bool enable_force_spill() const {
+        return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill;
+    }
+
     // global runtime filter mgr, the runtime filter have remote target or
     // need local merge should regist here. before publish() or push_to_remote()
     // the runtime filter should do the local merge work
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 86c33f5a334..4b61cd7892d 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -287,11 +287,6 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
     }
 }
 
-#ifdef BE_TEST
-constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 10L;
-#else
-constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60;
-#endif
 /**
  * Strategy 1: A revocable query should not have any running task(PipelineTask).
  * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit
@@ -425,7 +420,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 } else {
                     // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient,
                     // and then set wg's flag, other query may not free memory very quickly.
-                    if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) {
+                    if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms) {
                         // set wg's memory to insufficent, then add it back to task scheduler to run.
                         LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
                                   << " will be resume.";
@@ -676,7 +671,7 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                           << "), resume it.";
                 query_ctx->set_memory_sufficient(true);
                 return true;
-            } else if (time_in_queue >= TIMEOUT_IN_QUEUE_LIMIT) {
+            } else if (time_in_queue >= config::spill_in_paused_queue_timeout_ms) {
                 // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
                 auto msg1 = fmt::format(
                         "Query {} reserve memory failed, but could not find memory that could "
@@ -707,7 +702,7 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                           << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
                 query_ctx->set_memory_sufficient(true);
                 return true;
-            } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) {
+            } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) {
                 LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: "
                           << GlobalMemoryArbitrator::process_memory_used_details_str()
                           << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
@@ -731,7 +726,7 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>&
                           << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
                 query_ctx->set_memory_sufficient(true);
                 return true;
-            } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) {
+            } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) {
                 LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: "
                           << GlobalMemoryArbitrator::process_memory_used_details_str()
                           << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h
index f0a365cfc09..60c2370bc60 100644
--- a/be/src/vec/common/columns_hashing.h
+++ b/be/src/vec/common/columns_hashing.h
@@ -152,8 +152,12 @@ struct HashMethodSingleLowNullableColumn : public SingleColumnMethod {
 
     template <typename Data, typename Key>
     ALWAYS_INLINE FindResult find_key_with_hash(Data& data, size_t i, Key key, size_t hash_value) {
-        if (key_column->is_null_at(i) && data.has_null_key_data()) {
-            return FindResult {&data.template get_null_key_data<Mapped>(), true};
+        if (key_column->is_null_at(i)) {
+            if (data.has_null_key_data()) {
+                return FindResult {&data.template get_null_key_data<Mapped>(), true};
+            } else {
+                return FindResult {nullptr, false};
+            }
         }
         return Base::find_key_impl(key, hash_value, data);
     }
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 4dc553b1a57..38185ded5fb 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -734,7 +734,8 @@ void Block::clear_column_data(int64_t column_size) noexcept {
     }
     for (auto& d : data) {
         if (d.column) {
-            DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count();
+            // Temporarily disable reference count check because a column might be referenced multiple times within a block.
+            // DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count();
             (*std::move(d.column)).assume_mutable()->clear();
         }
     }
diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h
index 6e4a18fdd31..7626f86b6a9 100644
--- a/be/src/vec/functions/function_string.h
+++ b/be/src/vec/functions/function_string.h
@@ -702,6 +702,8 @@ public:
 
     size_t get_number_of_arguments() const override { return 0; }
 
+    ColumnNumbers get_arguments_that_are_always_constant() const override { return {1, 2, 3}; }
+
     bool is_variadic() const override { return true; }
 
     Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
diff --git a/regression-test/suites/correctness_p0/test_mask_function.groovy b/regression-test/suites/correctness_p0/test_mask_function.groovy
index b7717ab183c..c1116e633d2 100644
--- a/regression-test/suites/correctness_p0/test_mask_function.groovy
+++ b/regression-test/suites/correctness_p0/test_mask_function.groovy
@@ -76,6 +76,27 @@ suite("test_mask_function") {
         select digital_masking(13812345678);
     """
 
+    test {
+        sql """
+            select mask('abcd', name) from table_mask_test order by id;
+        """
+        exception "Argument at index 1 for function mask must be constant"
+    }
+
+    test {
+        sql """
+            select mask('abcd', '>', name) from table_mask_test order by id;
+        """
+        exception "Argument at index 2 for function mask must be constant"
+    }
+
+    test {
+        sql """
+            select mask('abcd', '>', '<', `name`) from table_mask_test order by id;
+        """
+        exception "Argument at index 3 for function mask must be constant"
+    }
+
     test {
         sql """ select mask_last_n("12345", -100); """
         exception "function mask_last_n only accept non-negative input for 2nd 
argument but got -100"

