This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new bb17194077b improve spill FE variables (#45934)
bb17194077b is described below

commit bb17194077bdd00ccbc59862b97781f3bab3b466
Author: TengJianPing <tengjianp...@selectdb.com>
AuthorDate: Wed Dec 25 16:46:33 2024 +0800

    improve spill FE variables (#45934)
---
 be/src/pipeline/dependency.cpp                     |   5 +-
 be/src/pipeline/dependency.h                       |  14 +-
 .../exec/partitioned_aggregation_sink_operator.cpp |   6 +-
 .../exec/partitioned_aggregation_sink_operator.h   |   2 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  30 ++---
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   5 +-
 .../pipeline/exec/spill_sort_source_operator.cpp   |  17 +--
 be/src/pipeline/exec/spill_sort_source_operator.h  |   3 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   2 +-
 be/src/runtime/runtime_state.h                     |  51 ++++---
 .../java/org/apache/doris/qe/SessionVariable.java  | 146 +++++++++------------
 gensrc/thrift/PaloInternalService.thrift           |  10 +-
 .../query/test_nested_type_with_resize.groovy      |   8 +-
 .../nereids_rules_p0/mv/variant/variant_mv.groovy  |   2 +-
 regression-test/suites/variant_p0/load.groovy      |   2 +-
 regression-test/suites/variant_p0/nested.groovy    |   4 +-
 .../suites/variant_p0/test_sub_path_pruning.groovy |   2 +-
 17 files changed, 151 insertions(+), 158 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index a7198a97da4..aee19ff58df 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -308,9 +308,8 @@ Status AggSharedState::reset_hash_table() {
             agg_data->method_variant);
 }
 
-void PartitionedAggSharedState::init_spill_params(size_t 
spill_partition_count_bits) {
-    partition_count_bits = spill_partition_count_bits;
-    partition_count = (1 << spill_partition_count_bits);
+void PartitionedAggSharedState::init_spill_params(size_t 
spill_partition_count) {
+    partition_count = spill_partition_count;
     max_partition_index = partition_count - 1;
 
     for (int i = 0; i < partition_count; ++i) {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 0f1539cadf2..13f983db3dd 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -453,24 +453,20 @@ struct PartitionedAggSharedState : public 
BasicSharedState,
 
     void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
 
-    void init_spill_params(size_t spill_partition_count_bits);
+    void init_spill_params(size_t spill_partition_count);
 
     void close();
 
     AggSharedState* in_mem_shared_state = nullptr;
     std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
 
-    size_t partition_count_bits;
     size_t partition_count;
     size_t max_partition_index;
     bool is_spilled = false;
     std::atomic_bool is_closed = false;
     std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
 
-    size_t get_partition_index(size_t hash_value) const {
-        // return (hash_value >> (32 - partition_count_bits)) & 
max_partition_index;
-        return hash_value % partition_count;
-    }
+    size_t get_partition_index(size_t hash_value) const { return hash_value % 
partition_count; }
 };
 
 struct AggSpillPartition {
@@ -523,14 +519,12 @@ struct SpillSortSharedState : public BasicSharedState,
     SpillSortSharedState() = default;
     ~SpillSortSharedState() override = default;
 
-    // This number specifies the maximum size of sub blocks
-    static constexpr size_t SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024;
-    void update_spill_block_batch_row_count(const vectorized::Block* block) {
+    void update_spill_block_batch_row_count(RuntimeState* state, const 
vectorized::Block* block) {
         auto rows = block->rows();
         if (rows > 0 && 0 == avg_row_bytes) {
             avg_row_bytes = std::max((std::size_t)1, block->bytes() / rows);
             spill_block_batch_row_count =
-                    (SORT_BLOCK_SPILL_BATCH_BYTES + avg_row_bytes - 1) / 
avg_row_bytes;
+                    (state->spill_sort_batch_bytes() + avg_row_bytes - 1) / 
avg_row_bytes;
             LOG(INFO) << "spill sort block batch row count: " << 
spill_block_batch_row_count;
         }
     }
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 58b272b3ac8..8cc6ae58a4f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -47,7 +47,7 @@ Status 
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
     _init_counters();
 
     auto& parent = Base::_parent->template cast<Parent>();
-    Base::_shared_state->init_spill_params(parent._spill_partition_count_bits);
+    Base::_shared_state->init_spill_params(parent._spill_partition_count);
 
     RETURN_IF_ERROR(setup_in_memory_agg_op(state));
 
@@ -155,9 +155,7 @@ 
PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int o
 Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode, 
state));
     _name = "PARTITIONED_AGGREGATION_SINK_OPERATOR";
-    if (state->query_options().__isset.external_agg_partition_bits) {
-        _spill_partition_count_bits = 
state->query_options().external_agg_partition_bits;
-    }
+    _spill_partition_count = state->spill_aggregation_partition_count();
 
     
_agg_sink_operator->set_dests_id(DataSinkOperatorX<PartitionedAggSinkLocalState>::dests_id());
     RETURN_IF_ERROR(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 499db4919e7..dae3ee4f4b3 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -337,6 +337,6 @@ private:
     friend class PartitionedAggSinkLocalState;
     std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
 
-    size_t _spill_partition_count_bits = 5;
+    size_t _spill_partition_count = 32;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 95675004c70..2e2c38f04c3 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -534,6 +534,19 @@ Status 
PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState*
     return Status::OK();
 }
 
+// After building hash table it will not be able to spill later
+// even if memory is low, and will cause cancel of queries.
+// So make a check here, if build blocks mem usage is too high,
+// then trigger revoke memory.
+static bool is_revocable_mem_high_watermark(RuntimeState* state, size_t 
revocable_size,
+                                            int64_t query_mem_limit) {
+    auto revocable_memory_high_watermark_percent =
+            state->spill_revocable_memory_high_watermark_percent();
+    return revocable_memory_high_watermark_percent > 0 &&
+           revocable_size >=
+                   (double)query_mem_limit / 100.0 * 
revocable_memory_high_watermark_percent;
+}
+
 Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                               bool eos) {
     auto& local_state = get_local_state(state);
@@ -575,16 +588,7 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
                             "sink_eos failed");
                 });
 
-                // TODO: consider parallel?
-                // After building hash table it will not be able to spill later
-                // even if memory is low, and will cause cancel of queries.
-                // So make a check here, if build blocks mem usage is too high,
-                // then trigger revoke memory.
-                auto revocable_memory_high_watermark_percent =
-                        state->revocable_memory_high_watermark_percent();
-                if (revocable_memory_high_watermark_percent > 0 &&
-                    revocable_size >= (double)query_mem_limit / 100.0 *
-                                              
revocable_memory_high_watermark_percent) {
+                if (is_revocable_mem_high_watermark(state, revocable_size, 
query_mem_limit)) {
                     LOG(INFO) << fmt::format(
                             "Query: {}, task {}, hash join sink {} eos, 
revoke_memory "
                             "because revocable memory is high",
@@ -636,11 +640,7 @@ Status 
PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
         });
 
         if (eos) {
-            auto revocable_memory_high_watermark_percent =
-                    state->revocable_memory_high_watermark_percent();
-            if (revocable_memory_high_watermark_percent > 0 &&
-                revocable_size >=
-                        (double)query_mem_limit / 100.0 * 
revocable_memory_high_watermark_percent) {
+            if (is_revocable_mem_high_watermark(state, revocable_size, 
query_mem_limit)) {
                 LOG(INFO) << fmt::format(
                         "Query: {}, task {}, hash join sink {} eos, 
revoke_memory "
                         "because revocable memory is high",
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 2fa0c0ce8e1..03c4072f7de 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -155,7 +155,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* 
state, vectorized::Bloc
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (in_block->rows() > 0) {
-        
local_state._shared_state->update_spill_block_batch_row_count(in_block);
+        local_state._shared_state->update_spill_block_batch_row_count(state, 
in_block);
     }
     local_state._eos = eos;
     DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink",
@@ -201,8 +201,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* 
state,
 
     auto status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
             state, _spilling_stream, print_id(state->query_id()), "sort", 
_parent->node_id(),
-            _shared_state->spill_block_batch_row_count,
-            SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
+            _shared_state->spill_block_batch_row_count, 
state->spill_sort_batch_bytes(), profile());
     RETURN_IF_ERROR(status);
 
     _shared_state->sorted_streams.emplace_back(_spilling_stream);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp 
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 3464ecd847f..8a58d0b1504 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -29,11 +29,8 @@
 
 namespace doris::pipeline {
 SpillSortLocalState::SpillSortLocalState(RuntimeState* state, OperatorXBase* 
parent)
-        : Base(state, parent) {
-    if (state->external_sort_bytes_threshold() > 0) {
-        _external_sort_bytes_threshold = 
state->external_sort_bytes_threshold();
-    }
-}
+        : Base(state, parent) {}
+
 Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     init_spill_write_counters();
@@ -67,8 +64,8 @@ Status SpillSortLocalState::close(RuntimeState* state) {
     dec_running_big_mem_op_num(state);
     return Base::close(state);
 }
-int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
-    int count = _external_sort_bytes_threshold / 
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES;
+int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) 
const {
+    int count = state->spill_sort_mem_limit() / 
state->spill_sort_batch_bytes();
     return std::max(2, count);
 }
 Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* 
state) {
@@ -101,7 +98,7 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
         vectorized::Block merge_sorted_block;
         vectorized::SpillStreamSPtr tmp_stream;
         while (!state->is_cancelled()) {
-            int max_stream_count = _calc_spill_blocks_to_merge();
+            int max_stream_count = _calc_spill_blocks_to_merge(state);
             VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << 
_parent->node_id()
                        << " merge spill streams, streams count: "
                        << _shared_state->sorted_streams.size()
@@ -122,8 +119,8 @@ Status 
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
             {
                 status = 
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
                         state, tmp_stream, print_id(state->query_id()), 
"sort", _parent->node_id(),
-                        _shared_state->spill_block_batch_row_count,
-                        SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, 
profile());
+                        _shared_state->spill_block_batch_row_count, 
state->spill_sort_batch_bytes(),
+                        profile());
                 RETURN_IF_ERROR(status);
 
                 _shared_state->sorted_streams.emplace_back(tmp_stream);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h 
b/be/src/pipeline/exec/spill_sort_source_operator.h
index a7b8e8efde8..fae64e051f4 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -47,7 +47,7 @@ public:
     Status initiate_merge_sort_spill_streams(RuntimeState* state);
 
 protected:
-    int _calc_spill_blocks_to_merge() const;
+    int _calc_spill_blocks_to_merge(RuntimeState* state) const;
     Status _create_intermediate_merger(int num_blocks,
                                        const vectorized::SortDescription& 
sort_description);
     friend class SpillSortSourceOperatorX;
@@ -55,7 +55,6 @@ protected:
 
     bool _opened = false;
 
-    int64_t _external_sort_bytes_threshold = 134217728; // 128M
     std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index f86fb491d71..9acd38af7b0 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1365,7 +1365,7 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             auto tnode_ = tnode;
             /// TODO: support rf in partitioned hash join
             tnode_.runtime_filters.clear();
-            const uint32_t partition_count = 32;
+            uint32_t partition_count = 
_runtime_state->spill_hash_join_partition_count();
             auto inner_probe_operator =
                     std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, 
descs);
             auto inner_sink_operator =
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 879bd647d96..2220c1fc41e 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -504,13 +504,6 @@ public:
                        : 0;
     }
 
-    int64_t external_sort_bytes_threshold() const {
-        if (_query_options.__isset.external_sort_bytes_threshold) {
-            return _query_options.external_sort_bytes_threshold;
-        }
-        return 0;
-    }
-
     void set_be_exec_version(int32_t version) noexcept { 
_query_options.be_exec_version = version; }
 
     inline bool enable_delete_sub_pred_v2() const {
@@ -558,10 +551,6 @@ public:
                                             std::shared_ptr<IRuntimeFilter>* 
producer_filter);
     bool is_nereids() const;
 
-    bool enable_reserve_memory() const {
-        return _query_options.__isset.enable_reserve_memory && 
_query_options.enable_reserve_memory;
-    }
-
     bool enable_spill() const {
         return (_query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill) ||
                (_query_options.__isset.enable_spill && 
_query_options.enable_spill);
@@ -571,9 +560,8 @@ public:
         return _query_options.__isset.enable_force_spill && 
_query_options.enable_force_spill;
     }
 
-    bool enable_local_merge_sort() const {
-        return _query_options.__isset.enable_local_merge_sort &&
-               _query_options.enable_local_merge_sort;
+    bool enable_reserve_memory() const {
+        return _query_options.__isset.enable_reserve_memory && 
_query_options.enable_reserve_memory;
     }
 
     int64_t min_revocable_mem() const {
@@ -583,13 +571,46 @@ public:
         return 1;
     }
 
-    int revocable_memory_high_watermark_percent() const {
+    int64_t spill_sort_mem_limit() const {
+        if (_query_options.__isset.spill_sort_mem_limit) {
+            return std::max(_query_options.spill_sort_mem_limit, 
(int64_t)16777216);
+        }
+        return 134217728;
+    }
+
+    int64_t spill_sort_batch_bytes() const {
+        if (_query_options.__isset.spill_sort_batch_bytes) {
+            return std::max(_query_options.spill_sort_batch_bytes, 
(int64_t)8388608);
+        }
+        return 8388608;
+    }
+
+    int spill_aggregation_partition_count() const {
+        if (_query_options.__isset.spill_aggregation_partition_count) {
+            return 
std::min(std::max(_query_options.spill_aggregation_partition_count, 16), 8192);
+        }
+        return 32;
+    }
+
+    int spill_hash_join_partition_count() const {
+        if (_query_options.__isset.spill_hash_join_partition_count) {
+            return 
std::min(std::max(_query_options.spill_hash_join_partition_count, 16), 8192);
+        }
+        return 32;
+    }
+
+    int spill_revocable_memory_high_watermark_percent() const {
         if (_query_options.__isset.revocable_memory_high_watermark_percent) {
             return _query_options.revocable_memory_high_watermark_percent;
         }
         return -1;
     }
 
+    bool enable_local_merge_sort() const {
+        return _query_options.__isset.enable_local_merge_sort &&
+               _query_options.enable_local_merge_sort;
+    }
+
     size_t minimum_operator_memory_required_bytes() const {
         if (_query_options.__isset.minimum_operator_memory_required_kb) {
             return _query_options.minimum_operator_memory_required_kb * 1024;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8de6f379ecd..bab6a34528e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -397,8 +397,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String INTERNAL_SESSION = "internal_session";
 
-    public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD = 
"partitioned_hash_agg_rows_threshold";
-
     public static final String PARTITION_PRUNING_EXPAND_THRESHOLD = 
"partition_pruning_expand_threshold";
 
     public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
@@ -560,14 +558,17 @@ public class SessionVariable implements Serializable, 
Writable {
     public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = 
"huge_table_lower_bound_size_in_bytes";
 
     // for spill to disk
-    public static final String EXTERNAL_SORT_BYTES_THRESHOLD = 
"external_sort_bytes_threshold";
-    public static final String EXTERNAL_AGG_PARTITION_BITS = 
"external_agg_partition_bits";
-    public static final String SPILL_STREAMING_AGG_MEM_LIMIT = 
"spill_streaming_agg_mem_limit";
-    public static final String MIN_REVOCABLE_MEM = "min_revocable_mem";
     public static final String ENABLE_SPILL = "enable_spill";
-    public static final String REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = 
"revocable_memory_high_watermark_percent";
-    public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
     public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
+    public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory";
+    public static final String SPILL_MIN_REVOCABLE_MEM = 
"spill_min_revocable_mem";
+    public static final String SPILL_SORT_MEM_LIMIT = "spill_sort_mem_limit";
+    // spill_sort_batch_bytes controls the memory size of a single block data 
of spill sort.
+    public static final String SPILL_SORT_BATCH_BYTES = 
"spill_sort_batch_bytes";
+    public static final String SPILL_AGGREGATION_PARTITION_COUNT = 
"spill_aggregation_partition_count";
+    public static final String SPILL_STREAMING_AGG_MEM_LIMIT = 
"spill_streaming_agg_mem_limit";
+    public static final String SPILL_HASH_JOIN_PARTITION_COUNT = 
"spill_hash_join_partition_count";
+    public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = 
"spill_revocable_memory_high_watermark_percent";
     public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
 
     public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
@@ -1586,10 +1587,6 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = INTERNAL_SESSION)
     public boolean internalSession = false;
 
-    // Use partitioned hash join if build side row count >= the threshold . 0 
- the threshold is not set.
-    @VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy = 
true)
-    public int partitionedHashAggRowsThreshold = 0;
-
     @VariableMgr.VarAttr(name = PARTITION_PRUNING_EXPAND_THRESHOLD, fuzzy = 
true)
     public int partitionPruningExpandThreshold = 10;
 
@@ -2187,10 +2184,6 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean disableEmptyPartitionPrune = false;
     // CLOUD_VARIABLES_END
 
-    // for spill to disk
-    @VariableMgr.VarAttr(name = MIN_REVOCABLE_MEM, fuzzy = true)
-    public long minRevocableMem = 32 * 1024 * 1024;
-
     // fetch remote schema rpc timeout
     @VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = 
true)
     public long fetchRemoteSchemaTimeoutSeconds = 120;
@@ -2198,14 +2191,6 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true)
     public int maxFetchRemoteTabletCount = 512;
 
-    @VariableMgr.VarAttr(
-            name = ENABLE_RESERVE_MEMORY,
-            description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。",
-                    "Controls whether to enable reserve memory before 
allocating memory. "
-                            + "The default value is true."},
-            needForward = true, fuzzy = true)
-    public boolean enableReserveMemory = true;
-
     @VariableMgr.VarAttr(
             name = "ENABLE_COMPRESS_MATERIALIZE",
             description = {"控制是否启用compress materialize。",
@@ -2215,6 +2200,14 @@ public class SessionVariable implements Serializable, 
Writable {
     )
     public boolean enableCompressMaterialize = false;
 
+    @VariableMgr.VarAttr(
+            name = DATA_QUEUE_MAX_BLOCKS,
+            description = {"DataQueue 中每个子队列允许最大的 block 个数",
+                    "Max blocks in DataQueue."},
+            needForward = true, fuzzy = true)
+    public long dataQueueMaxBlocks = 1;
+
+    // for spill to disk
     @VariableMgr.VarAttr(
             name = ENABLE_SPILL,
             description = {"控制是否启用查询算子落盘。默认为 false。",
@@ -2233,32 +2226,39 @@ public class SessionVariable implements Serializable, 
Writable {
     public boolean enableForceSpill = false;
 
     @VariableMgr.VarAttr(
-            name = DATA_QUEUE_MAX_BLOCKS,
-            description = {"DataQueue 中每个子队列允许最大的 block 个数",
-                    "Max blocks in DataQueue."},
+            name = ENABLE_RESERVE_MEMORY,
+            description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。",
+                    "Controls whether to enable reserve memory before 
allocating memory. "
+                            + "The default value is true."},
             needForward = true, fuzzy = true)
-    public long dataQueueMaxBlocks = 1;
+    public boolean enableReserveMemory = true;
 
-    @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy 
= true)
-    public int revocableMemoryHighWatermarkPercent = -1;
+    @VariableMgr.VarAttr(name = SPILL_MIN_REVOCABLE_MEM, fuzzy = true)
+    public long spillMinRevocableMem = 32 * 1024 * 1024;
 
-    // If the memory consumption of sort node exceed this limit, will trigger 
spill to disk;
-    // Set to 0 to disable; min: 128M
-    public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
-    @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD,
-            checker = "checkExternalSortBytesThreshold", varType = 
VariableAnnotation.DEPRECATED)
-    public long externalSortBytesThreshold = 0;
+    // spill_sort_mem_limit controls the memory usage during merge sort phase 
of spill sort.
+    // During merge sort phase, multiple sorted blocks will be read into memory 
and do merge sort,
+    // the count of blocks should be controlled or else will cause OOM, it's 
calculated as
+    // std::max(spill_sort_mem_limit / spill_sort_batch_bytes, 2)
+    @VariableMgr.VarAttr(name = SPILL_SORT_MEM_LIMIT)
+    public long spillSortMemLimit = 134217728; // 128M
+
+    @VariableMgr.VarAttr(name = SPILL_SORT_BATCH_BYTES)
+    public long spillSortBatchBytes = 8388608; // 8M
+
+    @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy = 
true)
+    public int spillAggregationPartitionCount = 32;
 
     // The memory limit of streaming agg when spilling is enabled
     // NOTE: streaming agg operator will not spill to disk.
     @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true)
     public long spillStreamingAggMemLimit = 268435456; //256MB
 
-    public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
-    public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20;
-    @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS,
-            checker = "checkExternalAggPartitionBits", fuzzy = true)
-    public int externalAggPartitionBits = 5; // means that the hash table will 
be partitioned into 32 blocks.
+    @VariableMgr.VarAttr(name = SPILL_HASH_JOIN_PARTITION_COUNT, fuzzy = true)
+    public int spillHashJoinPartitionCount = 32;
+
+    @VariableMgr.VarAttr(name = SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, 
fuzzy = true)
+    public int spillRevocableMemoryHighWatermarkPercent = -1;
 
     @VariableMgr.VarAttr(name = USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS, needForward 
= true, description = {
             "在CTAS中,如果 CHAR / VARCHAR 列不来自于源表,是否是将这一列的长度设置为 MAX,即65533。默认为 
true。",
@@ -2372,7 +2372,6 @@ public class SessionVariable implements Serializable, 
Writable {
         // this.disableJoinReorder = random.nextBoolean();
         this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
-        this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 
1048576;
         this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
         // this.enableHashJoinEarlyStartProbe = random.nextBoolean();
         this.enableParallelResultSink = random.nextBoolean();
@@ -2394,23 +2393,23 @@ public class SessionVariable implements Serializable, 
Writable {
         /*
         switch (randomInt) {
             case 0:
-                this.externalSortBytesThreshold = 0;
+                this.spillSortBytesThreshold = 0;
                 this.externalAggBytesThreshold = 0;
                 break;
             case 1:
-                this.externalSortBytesThreshold = 1;
+                this.spillSortBytesThreshold = 1;
                 this.externalAggBytesThreshold = 1;
-                this.externalAggPartitionBits = 6;
+                this.spillAggregationPartitionCount = 6;
                 break;
             case 2:
-                this.externalSortBytesThreshold = 1024 * 1024;
+                this.spillSortBytesThreshold = 1024 * 1024;
                 this.externalAggBytesThreshold = 1024 * 1024;
-                this.externalAggPartitionBits = 8;
+                this.spillAggregationPartitionCount = 8;
                 break;
             default:
-                this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024;
+                this.spillSortBytesThreshold = 100 * 1024 * 1024 * 1024;
                 this.externalAggBytesThreshold = 100 * 1024 * 1024 * 1024;
-                this.externalAggPartitionBits = 4;
+                this.spillAggregationPartitionCount = 4;
                 break;
         }
         */
@@ -2493,16 +2492,16 @@ public class SessionVariable implements Serializable, 
Writable {
                 randomInt = random.nextInt(4);
                 switch (randomInt) {
                     case 0:
-                        this.minRevocableMem = 0;
+                        this.spillMinRevocableMem = 0;
                         break;
                     case 1:
-                        this.minRevocableMem = 1;
+                        this.spillMinRevocableMem = 1;
                         break;
                     case 2:
-                        this.minRevocableMem = 1024 * 1024;
+                        this.spillMinRevocableMem = 1024 * 1024;
                         break;
                     default:
-                        this.minRevocableMem = 100L * 1024 * 1024 * 1024;
+                        this.spillMinRevocableMem = 100L * 1024 * 1024 * 1024;
                         break;
                 }
             } else {
@@ -3659,24 +3658,6 @@ public class SessionVariable implements Serializable, 
Writable {
         return dropTableIfCtasFailed;
     }
 
-    public void checkExternalSortBytesThreshold(String 
externalSortBytesThreshold) {
-        long value = Long.valueOf(externalSortBytesThreshold);
-        if (value > 0 && value < MIN_EXTERNAL_SORT_BYTES_THRESHOLD) {
-            LOG.warn("external sort bytes threshold: {}, min: {}", value, 
MIN_EXTERNAL_SORT_BYTES_THRESHOLD);
-            throw new UnsupportedOperationException("minimum value is " + 
MIN_EXTERNAL_SORT_BYTES_THRESHOLD);
-        }
-    }
-
-    public void checkExternalAggPartitionBits(String externalAggPartitionBits) 
{
-        int value = Integer.valueOf(externalAggPartitionBits);
-        if (value < MIN_EXTERNAL_AGG_PARTITION_BITS || value > 
MAX_EXTERNAL_AGG_PARTITION_BITS) {
-            LOG.warn("external agg bytes threshold: {}, min: {}, max: {}",
-                    value, MIN_EXTERNAL_AGG_PARTITION_BITS, 
MAX_EXTERNAL_AGG_PARTITION_BITS);
-            throw new UnsupportedOperationException("min value is " + 
MIN_EXTERNAL_AGG_PARTITION_BITS + " max value is "
-                    + MAX_EXTERNAL_AGG_PARTITION_BITS);
-        }
-    }
-
     public void checkQueryTimeoutValid(String newQueryTimeout) {
         int value = Integer.valueOf(newQueryTimeout);
         if (value <= 0) {
@@ -3896,14 +3877,6 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setSkipDeleteBitmap(skipDeleteBitmap);
 
-        
tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);
-
-        tResult.setExternalSortBytesThreshold(externalSortBytesThreshold);
-
-        tResult.setExternalAggBytesThreshold(0); // disable for now
-
-        tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit);
-
         tResult.setEnableFileCache(enableFileCache);
 
         tResult.setEnablePageCache(enablePageCache);
@@ -3941,12 +3914,19 @@ public class SessionVariable implements Serializable, 
Writable {
         
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
         tResult.setSkipBadTablet(skipBadTablet);
         tResult.setDisableFileCache(disableFileCache);
-        tResult.setEnableReserveMemory(enableReserveMemory);
+        
+        // for spill
         tResult.setEnableSpill(enableSpill);
         tResult.setEnableForceSpill(enableForceSpill);
-        tResult.setExternalAggPartitionBits(externalAggPartitionBits);
-        tResult.setMinRevocableMem(minRevocableMem);
-        
tResult.setRevocableMemoryHighWatermarkPercent(revocableMemoryHighWatermarkPercent);
+        tResult.setEnableReserveMemory(enableReserveMemory);
+        tResult.setMinRevocableMem(spillMinRevocableMem);
+        tResult.setSpillSortMemLimit(spillSortMemLimit);
+        tResult.setSpillSortBatchBytes(spillSortBatchBytes);
+        
tResult.setSpillAggregationPartitionCount(spillAggregationPartitionCount);
+        tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit);
+        tResult.setSpillHashJoinPartitionCount(spillHashJoinPartitionCount);
+        
tResult.setRevocableMemoryHighWatermarkPercent(spillRevocableMemoryHighWatermarkPercent);
+
         tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
 
         tResult.setEnableLocalMergeSort(enableLocalMergeSort);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 8cf33a6218b..f99a88e55b6 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -196,9 +196,10 @@ struct TQueryOptions {
 
   58: optional i32 repeat_max_num = 0 // Deprecated
 
+  // deprecated, use spill_sort_mem_limit
   59: optional i64 external_sort_bytes_threshold = 0
 
-  // deprecated
+  // Not used anymore
   60: optional i32 partitioned_hash_agg_rows_threshold = 0
 
   61: optional bool enable_file_cache = false
@@ -214,9 +215,10 @@ struct TQueryOptions {
   66: optional i32 parallel_instance = 1
   // Indicate where useServerPrepStmts enabled
   67: optional bool mysql_row_binary_format = false;
+  // Not used anymore
   68: optional i64 external_agg_bytes_threshold = 0
 
-  // partition count(1 << external_agg_partition_bits) when spill aggregation 
data into disk
+  // Not used anymore; superseded by spill_aggregation_partition_count
   69: optional i32 external_agg_partition_bits = 4
 
   // Specify base path for file cache
@@ -369,6 +371,10 @@ struct TQueryOptions {
   145: optional bool enable_spill = false
   146: optional bool enable_reserve_memory = true
   147: optional i32 revocable_memory_high_watermark_percent = -1
+  148: optional i64 spill_sort_mem_limit = 134217728
+  149: optional i64 spill_sort_batch_bytes = 8388608
+  150: optional i32 spill_aggregation_partition_count = 32
+  151: optional i32 spill_hash_join_partition_count = 32
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
diff --git 
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
 
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
index eaf8c7e996b..2918d2ffbcd 100644
--- 
a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
+++ 
b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy
@@ -65,9 +65,9 @@ suite("test_nested_type_with_resize") {
         }
     }
 
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
-    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
+    order_qt_sql """ /*set 
ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re
 [...]
 
 }
diff --git 
a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy 
b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
index 493873fd5c8..d81f01aeeaa 100644
--- a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
+++ b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy
@@ -574,7 +574,7 @@ suite("variant_mv") {
     where g2.actor['id'] > 34259289;
     """
     def query3_6 = """
-    SELECT  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
 [...]
+    SELECT  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena
 [...]
     g1.id,
     g2.type,
     floor(cast(g1.actor['id'] as int) + 100.5),
diff --git a/regression-test/suites/variant_p0/load.groovy 
b/regression-test/suites/variant_p0/load.groovy
index 5abc3346f4d..1b93ecc9747 100644
--- a/regression-test/suites/variant_p0/load.groovy
+++ b/regression-test/suites/variant_p0/load.groovy
@@ -290,7 +290,7 @@ suite("regression_test_variant", "p0"){
         sql """insert into ${table_name} values (5, '{"i" : 1}'), (1, '{"a" : 
1}')"""
         sql """insert into ${table_name} values (6, '{"j" : 1}'), (1, '{"a" : 
1}')"""
         sql """insert into ${table_name} values (6, '{"k" : 1}'), (1, '{"a" : 
1}')"""
-        sql "select 
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
 [...]
+        sql "select 
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=
 [...]
         qt_sql_36_1 "select cast(v['a'] as int), cast(v['b'] as int), 
cast(v['c'] as int) from ${table_name} order by k limit 10"
         sql "DELETE FROM ${table_name} WHERE k=1"
         sql "select * from ${table_name}"
diff --git a/regression-test/suites/variant_p0/nested.groovy 
b/regression-test/suites/variant_p0/nested.groovy
index 584ba0e336d..6ae7ee2c3a7 100644
--- a/regression-test/suites/variant_p0/nested.groovy
+++ b/regression-test/suites/variant_p0/nested.groovy
@@ -133,7 +133,7 @@ suite("regression_test_variant_nested", "p0"){
 
         qt_sql """select  
/*+SET_VAR(batch_size=1024,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_
 
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16
-,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
 [...]
+,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca
 [...]
         qt_sql """select * from var_nested where v['k2'] = 'some'  and 
array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order 
by k limit 1;"""
 
         sql """INSERT INTO var_nested SELECT *, '{"k1":1, "k2": "some", "k3" : 
[1234], "k4" : 1.10000, "k5" : [[123]], "nested1" : {"nested2" : [{"a" : 10, 
"b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "4096") where number > 
1024 limit 1024;"""
@@ -162,7 +162,7 @@ 
parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level
                 properties("replication_num" = "1", "disable_auto_compaction" 
= "false", "enable_unique_key_merge_on_write" = "true", 
"variant_enable_flatten_nested" = "true");
             """
         sql """insert into var_nested2 select * from var_nested order by k 
limit 1024"""
-        qt_sql """select  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
 [...]
+        qt_sql """select  
/*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res
 [...]
         qt_sql """select v['nested'] from var_nested2 where k < 10 order by k 
limit 10;"""
         // 0. nomal explode variant array
         order_qt_explode_sql """select count(),cast(vv['xx'] as int) from 
var_nested lateral view explode_variant_array(v['nested']) tmp as vv where 
vv['xx'] = 10 group by cast(vv['xx'] as int)"""
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy 
b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index 4226de14d81..4b9cc317318 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -139,7 +139,7 @@ suite("variant_sub_path_pruning", "variant_type"){
 
     // two children
     order_qt_sql """
-        select  
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
 [...]
+        select  
/*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh
 [...]
         """
     order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test 
union all select dt as c1 from pruning_test) v1;"""
     order_qt_sql """select c1['b'] from (select dt['a'] as c1 from 
pruning_test union all select dt['a'] as c1 from pruning_test) v1;"""


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to