This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new bb17194077b improve spill FE variables (#45934) bb17194077b is described below commit bb17194077bdd00ccbc59862b97781f3bab3b466 Author: TengJianPing <tengjianp...@selectdb.com> AuthorDate: Wed Dec 25 16:46:33 2024 +0800 improve spill FE variables (#45934) --- be/src/pipeline/dependency.cpp | 5 +- be/src/pipeline/dependency.h | 14 +- .../exec/partitioned_aggregation_sink_operator.cpp | 6 +- .../exec/partitioned_aggregation_sink_operator.h | 2 +- .../exec/partitioned_hash_join_sink_operator.cpp | 30 ++--- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 5 +- .../pipeline/exec/spill_sort_source_operator.cpp | 17 +-- be/src/pipeline/exec/spill_sort_source_operator.h | 3 +- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- be/src/runtime/runtime_state.h | 51 ++++--- .../java/org/apache/doris/qe/SessionVariable.java | 146 +++++++++------------ gensrc/thrift/PaloInternalService.thrift | 10 +- .../query/test_nested_type_with_resize.groovy | 8 +- .../nereids_rules_p0/mv/variant/variant_mv.groovy | 2 +- regression-test/suites/variant_p0/load.groovy | 2 +- regression-test/suites/variant_p0/nested.groovy | 4 +- .../suites/variant_p0/test_sub_path_pruning.groovy | 2 +- 17 files changed, 151 insertions(+), 158 deletions(-) diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp index a7198a97da4..aee19ff58df 100644 --- a/be/src/pipeline/dependency.cpp +++ b/be/src/pipeline/dependency.cpp @@ -308,9 +308,8 @@ Status AggSharedState::reset_hash_table() { agg_data->method_variant); } -void PartitionedAggSharedState::init_spill_params(size_t spill_partition_count_bits) { - partition_count_bits = spill_partition_count_bits; - partition_count = (1 << spill_partition_count_bits); +void PartitionedAggSharedState::init_spill_params(size_t spill_partition_count) { + partition_count = spill_partition_count; max_partition_index = partition_count - 1; for (int i = 0; i < partition_count; ++i) { diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 0f1539cadf2..13f983db3dd 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -453,24 +453,20 @@ struct PartitionedAggSharedState : public BasicSharedState, void update_spill_stream_profiles(RuntimeProfile* source_profile) override; - void init_spill_params(size_t spill_partition_count_bits); + void init_spill_params(size_t spill_partition_count); void close(); AggSharedState* in_mem_shared_state = nullptr; std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr; - size_t partition_count_bits; size_t partition_count; size_t max_partition_index; bool is_spilled = false; std::atomic_bool is_closed = false; std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions; - size_t get_partition_index(size_t hash_value) const { - // return (hash_value >> (32 - partition_count_bits)) & max_partition_index; - return hash_value % partition_count; - } + size_t get_partition_index(size_t hash_value) const { return hash_value % partition_count; } }; struct AggSpillPartition { @@ -523,14 +519,12 @@ struct SpillSortSharedState : public BasicSharedState, SpillSortSharedState() = default; ~SpillSortSharedState() override = default; - // This number specifies the maximum size of sub blocks - static constexpr size_t SORT_BLOCK_SPILL_BATCH_BYTES = 8 * 1024 * 1024; - void update_spill_block_batch_row_count(const vectorized::Block* block) { + void update_spill_block_batch_row_count(RuntimeState* state, const vectorized::Block* 
block) { auto rows = block->rows(); if (rows > 0 && 0 == avg_row_bytes) { avg_row_bytes = std::max((std::size_t)1, block->bytes() / rows); spill_block_batch_row_count = - (SORT_BLOCK_SPILL_BATCH_BYTES + avg_row_bytes - 1) / avg_row_bytes; + (state->spill_sort_batch_bytes() + avg_row_bytes - 1) / avg_row_bytes; LOG(INFO) << "spill sort block batch row count: " << spill_block_batch_row_count; } } diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 58b272b3ac8..8cc6ae58a4f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -47,7 +47,7 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, _init_counters(); auto& parent = Base::_parent->template cast<Parent>(); - Base::_shared_state->init_spill_params(parent._spill_partition_count_bits); + Base::_shared_state->init_spill_params(parent._spill_partition_count); RETURN_IF_ERROR(setup_in_memory_agg_op(state)); @@ -155,9 +155,7 @@ PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int o Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::init(tnode, state)); _name = "PARTITIONED_AGGREGATION_SINK_OPERATOR"; - if (state->query_options().__isset.external_agg_partition_bits) { - _spill_partition_count_bits = state->query_options().external_agg_partition_bits; - } + _spill_partition_count = state->spill_aggregation_partition_count(); _agg_sink_operator->set_dests_id(DataSinkOperatorX<PartitionedAggSinkLocalState>::dests_id()); RETURN_IF_ERROR( diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 499db4919e7..dae3ee4f4b3 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -337,6 +337,6 @@ private: friend class PartitionedAggSinkLocalState; std::unique_ptr<AggSinkOperatorX> _agg_sink_operator; - size_t _spill_partition_count_bits = 5; + size_t _spill_partition_count = 32; }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 95675004c70..2e2c38f04c3 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -534,6 +534,19 @@ Status PartitionedHashJoinSinkOperatorX::_setup_internal_operator(RuntimeState* return Status::OK(); } +// After building hash table it will not be able to spill later +// even if memory is low, and will cause cancel of queries. +// So make a check here, if build blocks mem usage is too high, +// then trigger revoke memory. 
+static bool is_revocable_mem_high_watermark(RuntimeState* state, size_t revocable_size, + int64_t query_mem_limit) { + auto revocable_memory_high_watermark_percent = + state->spill_revocable_memory_high_watermark_percent(); + return revocable_memory_high_watermark_percent > 0 && + revocable_size >= + (double)query_mem_limit / 100.0 * revocable_memory_high_watermark_percent; +} + Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) { auto& local_state = get_local_state(state); @@ -575,16 +588,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B "sink_eos failed"); }); - // TODO: consider parallel? - // After building hash table it will not be able to spill later - // even if memory is low, and will cause cancel of queries. - // So make a check here, if build blocks mem usage is too high, - // then trigger revoke memory. - auto revocable_memory_high_watermark_percent = - state->revocable_memory_high_watermark_percent(); - if (revocable_memory_high_watermark_percent > 0 && - revocable_size >= (double)query_mem_limit / 100.0 * - revocable_memory_high_watermark_percent) { + if (is_revocable_mem_high_watermark(state, revocable_size, query_mem_limit)) { LOG(INFO) << fmt::format( "Query: {}, task {}, hash join sink {} eos, revoke_memory " "because revocable memory is high", @@ -636,11 +640,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B }); if (eos) { - auto revocable_memory_high_watermark_percent = - state->revocable_memory_high_watermark_percent(); - if (revocable_memory_high_watermark_percent > 0 && - revocable_size >= - (double)query_mem_limit / 100.0 * revocable_memory_high_watermark_percent) { + if (is_revocable_mem_high_watermark(state, revocable_size, query_mem_limit)) { LOG(INFO) << fmt::format( "Query: {}, task {}, hash join sink {} eos, revoke_memory " "because revocable memory is high", diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 2fa0c0ce8e1..03c4072f7de 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -155,7 +155,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); if (in_block->rows() > 0) { - local_state._shared_state->update_spill_block_batch_row_count(in_block); + local_state._shared_state->update_spill_block_batch_row_count(state, in_block); } local_state._eos = eos; DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::sink", @@ -201,8 +201,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, auto status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, _spilling_stream, print_id(state->query_id()), "sort", _parent->node_id(), - _shared_state->spill_block_batch_row_count, - SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); + _shared_state->spill_block_batch_row_count, state->spill_sort_batch_bytes(), profile()); RETURN_IF_ERROR(status); _shared_state->sorted_streams.emplace_back(_spilling_stream); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 3464ecd847f..8a58d0b1504 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -29,11 +29,8 @@ namespace 
doris::pipeline { SpillSortLocalState::SpillSortLocalState(RuntimeState* state, OperatorXBase* parent) - : Base(state, parent) { - if (state->external_sort_bytes_threshold() > 0) { - _external_sort_bytes_threshold = state->external_sort_bytes_threshold(); - } -} + : Base(state, parent) {} + Status SpillSortLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); init_spill_write_counters(); @@ -67,8 +64,8 @@ Status SpillSortLocalState::close(RuntimeState* state) { dec_running_big_mem_op_num(state); return Base::close(state); } -int SpillSortLocalState::_calc_spill_blocks_to_merge() const { - int count = _external_sort_bytes_threshold / SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES; +int SpillSortLocalState::_calc_spill_blocks_to_merge(RuntimeState* state) const { + int count = state->spill_sort_mem_limit() / state->spill_sort_batch_bytes(); return std::max(2, count); } Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) { @@ -101,7 +98,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat vectorized::Block merge_sorted_block; vectorized::SpillStreamSPtr tmp_stream; while (!state->is_cancelled()) { - int max_stream_count = _calc_spill_blocks_to_merge(); + int max_stream_count = _calc_spill_blocks_to_merge(state); VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill streams, streams count: " << _shared_state->sorted_streams.size() @@ -122,8 +119,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat { status = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( state, tmp_stream, print_id(state->query_id()), "sort", _parent->node_id(), - _shared_state->spill_block_batch_row_count, - SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile()); + _shared_state->spill_block_batch_row_count, state->spill_sort_batch_bytes(), + profile()); RETURN_IF_ERROR(status); _shared_state->sorted_streams.emplace_back(tmp_stream); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h b/be/src/pipeline/exec/spill_sort_source_operator.h index a7b8e8efde8..fae64e051f4 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.h +++ b/be/src/pipeline/exec/spill_sort_source_operator.h @@ -47,7 +47,7 @@ public: Status initiate_merge_sort_spill_streams(RuntimeState* state); protected: - int _calc_spill_blocks_to_merge() const; + int _calc_spill_blocks_to_merge(RuntimeState* state) const; Status _create_intermediate_merger(int num_blocks, const vectorized::SortDescription& sort_description); friend class SpillSortSourceOperatorX; @@ -55,7 +55,6 @@ protected: bool _opened = false; - int64_t _external_sort_bytes_threshold = 134217728; // 128M std::vector<vectorized::SpillStreamSPtr> _current_merging_streams; std::unique_ptr<vectorized::VSortedRunMerger> _merger; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index f86fb491d71..9acd38af7b0 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1365,7 +1365,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo auto tnode_ = tnode; /// TODO: support rf in partitioned hash join tnode_.runtime_filters.clear(); - const uint32_t partition_count = 32; + uint32_t partition_count = _runtime_state->spill_hash_join_partition_count(); auto inner_probe_operator = std::make_shared<HashJoinProbeOperatorX>(pool, 
tnode_, 0, descs); auto inner_sink_operator = diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 879bd647d96..2220c1fc41e 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -504,13 +504,6 @@ public: : 0; } - int64_t external_sort_bytes_threshold() const { - if (_query_options.__isset.external_sort_bytes_threshold) { - return _query_options.external_sort_bytes_threshold; - } - return 0; - } - void set_be_exec_version(int32_t version) noexcept { _query_options.be_exec_version = version; } inline bool enable_delete_sub_pred_v2() const { @@ -558,10 +551,6 @@ public: std::shared_ptr<IRuntimeFilter>* producer_filter); bool is_nereids() const; - bool enable_reserve_memory() const { - return _query_options.__isset.enable_reserve_memory && _query_options.enable_reserve_memory; - } - bool enable_spill() const { return (_query_options.__isset.enable_force_spill && _query_options.enable_force_spill) || (_query_options.__isset.enable_spill && _query_options.enable_spill); @@ -571,9 +560,8 @@ public: return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; } - bool enable_local_merge_sort() const { - return _query_options.__isset.enable_local_merge_sort && - _query_options.enable_local_merge_sort; + bool enable_reserve_memory() const { + return _query_options.__isset.enable_reserve_memory && _query_options.enable_reserve_memory; } int64_t min_revocable_mem() const { @@ -583,13 +571,46 @@ public: return 1; } - int revocable_memory_high_watermark_percent() const { + int64_t spill_sort_mem_limit() const { + if (_query_options.__isset.spill_sort_mem_limit) { + return std::max(_query_options.spill_sort_mem_limit, (int64_t)16777216); + } + return 134217728; + } + + int64_t spill_sort_batch_bytes() const { + if (_query_options.__isset.spill_sort_batch_bytes) { + return std::max(_query_options.spill_sort_batch_bytes, (int64_t)8388608); + } + return 8388608; + } + + int spill_aggregation_partition_count() const { + if (_query_options.__isset.spill_aggregation_partition_count) { + return std::min(std::max(_query_options.spill_aggregation_partition_count, 16), 8192); + } + return 32; + } + + int spill_hash_join_partition_count() const { + if (_query_options.__isset.spill_hash_join_partition_count) { + return std::min(std::max(_query_options.spill_hash_join_partition_count, 16), 8192); + } + return 32; + } + + int spill_revocable_memory_high_watermark_percent() const { if (_query_options.__isset.revocable_memory_high_watermark_percent) { return _query_options.revocable_memory_high_watermark_percent; } return -1; } + bool enable_local_merge_sort() const { + return _query_options.__isset.enable_local_merge_sort && + _query_options.enable_local_merge_sort; + } + size_t minimum_operator_memory_required_bytes() const { if (_query_options.__isset.minimum_operator_memory_required_kb) { return _query_options.minimum_operator_memory_required_kb * 1024; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8de6f379ecd..bab6a34528e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -397,8 +397,6 @@ public class SessionVariable implements Serializable, Writable { public static final String INTERNAL_SESSION = "internal_session"; - public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD = "partitioned_hash_agg_rows_threshold"; - 
public static final String PARTITION_PRUNING_EXPAND_THRESHOLD = "partition_pruning_expand_threshold"; public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN @@ -560,14 +558,17 @@ public class SessionVariable implements Serializable, Writable { public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = "huge_table_lower_bound_size_in_bytes"; // for spill to disk - public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; - public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits"; - public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit"; - public static final String MIN_REVOCABLE_MEM = "min_revocable_mem"; public static final String ENABLE_SPILL = "enable_spill"; - public static final String REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = "revocable_memory_high_watermark_percent"; - public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory"; public static final String ENABLE_FORCE_SPILL = "enable_force_spill"; + public static final String ENABLE_RESERVE_MEMORY = "enable_reserve_memory"; + public static final String SPILL_MIN_REVOCABLE_MEM = "spill_min_revocable_mem"; + public static final String SPILL_SORT_MEM_LIMIT = "spill_sort_mem_limit"; + // spill_sort_batch_bytes controls the memory size of a sindle block data of spill sort. + public static final String SPILL_SORT_BATCH_BYTES = "spill_sort_batch_bytes"; + public static final String SPILL_AGGREGATION_PARTITION_COUNT = "spill_aggregation_partition_count"; + public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit"; + public static final String SPILL_HASH_JOIN_PARTITION_COUNT = "spill_hash_join_partition_count"; + public static final String SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT = "spill_revocable_memory_high_watermark_percent"; public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks"; public static final String GENERATE_STATS_FACTOR = "generate_stats_factor"; @@ -1586,10 +1587,6 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = INTERNAL_SESSION) public boolean internalSession = false; - // Use partitioned hash join if build side row count >= the threshold . 0 - the threshold is not set. - @VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy = true) - public int partitionedHashAggRowsThreshold = 0; - @VariableMgr.VarAttr(name = PARTITION_PRUNING_EXPAND_THRESHOLD, fuzzy = true) public int partitionPruningExpandThreshold = 10; @@ -2187,10 +2184,6 @@ public class SessionVariable implements Serializable, Writable { public boolean disableEmptyPartitionPrune = false; // CLOUD_VARIABLES_END - // for spill to disk - @VariableMgr.VarAttr(name = MIN_REVOCABLE_MEM, fuzzy = true) - public long minRevocableMem = 32 * 1024 * 1024; - // fetch remote schema rpc timeout @VariableMgr.VarAttr(name = FETCH_REMOTE_SCHEMA_TIMEOUT_SECONDS, fuzzy = true) public long fetchRemoteSchemaTimeoutSeconds = 120; @@ -2198,14 +2191,6 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = MAX_FETCH_REMOTE_TABLET_COUNT, fuzzy = true) public int maxFetchRemoteTabletCount = 512; - @VariableMgr.VarAttr( - name = ENABLE_RESERVE_MEMORY, - description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。", - "Controls whether to enable reserve memory before allocating memory. 
" - + "The default value is true."}, - needForward = true, fuzzy = true) - public boolean enableReserveMemory = true; - @VariableMgr.VarAttr( name = "ENABLE_COMPRESS_MATERIALIZE", description = {"控制是否启用compress materialize。", @@ -2215,6 +2200,14 @@ public class SessionVariable implements Serializable, Writable { ) public boolean enableCompressMaterialize = false; + @VariableMgr.VarAttr( + name = DATA_QUEUE_MAX_BLOCKS, + description = {"DataQueue 中每个子队列允许最大的 block 个数", + "Max blocks in DataQueue."}, + needForward = true, fuzzy = true) + public long dataQueueMaxBlocks = 1; + + // for spill to disk @VariableMgr.VarAttr( name = ENABLE_SPILL, description = {"控制是否启用查询算子落盘。默认为 false。", @@ -2233,32 +2226,39 @@ public class SessionVariable implements Serializable, Writable { public boolean enableForceSpill = false; @VariableMgr.VarAttr( - name = DATA_QUEUE_MAX_BLOCKS, - description = {"DataQueue 中每个子队列允许最大的 block 个数", - "Max blocks in DataQueue."}, + name = ENABLE_RESERVE_MEMORY, + description = {"控制是否启用分配内存前先reverve memory的功能。默认为 true。", + "Controls whether to enable reserve memory before allocating memory. " + + "The default value is true."}, needForward = true, fuzzy = true) - public long dataQueueMaxBlocks = 1; + public boolean enableReserveMemory = true; - @VariableMgr.VarAttr(name = REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy = true) - public int revocableMemoryHighWatermarkPercent = -1; + @VariableMgr.VarAttr(name = SPILL_MIN_REVOCABLE_MEM, fuzzy = true) + public long spillMinRevocableMem = 32 * 1024 * 1024; - // If the memory consumption of sort node exceed this limit, will trigger spill to disk; - // Set to 0 to disable; min: 128M - public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; - @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD, - checker = "checkExternalSortBytesThreshold", varType = VariableAnnotation.DEPRECATED) - public long externalSortBytesThreshold = 0; + // spill_sort_mem_limit controls the memory usage during merge sort phase of spill sort. + // During merge sort phase, mutiple sorted blocks will be read into memory and do merge sort, + // the count of blocks should be controlled or else will cause OOM, it's calculated as + // std::max(spill_sort_mem_limit / spill_sort_batch_bytes, 2) + @VariableMgr.VarAttr(name = SPILL_SORT_MEM_LIMIT) + public long spillSortMemLimit = 134217728; // 128M + + @VariableMgr.VarAttr(name = SPILL_SORT_BATCH_BYTES) + public long spillSortBatchBytes = 8388608; // 8M + + @VariableMgr.VarAttr(name = SPILL_AGGREGATION_PARTITION_COUNT, fuzzy = true) + public int spillAggregationPartitionCount = 32; // The memory limit of streaming agg when spilling is enabled // NOTE: streaming agg operator will not spill to disk. @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true) public long spillStreamingAggMemLimit = 268435456; //256MB - public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4; - public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20; - @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS, - checker = "checkExternalAggPartitionBits", fuzzy = true) - public int externalAggPartitionBits = 5; // means that the hash table will be partitioned into 32 blocks. 
+ @VariableMgr.VarAttr(name = SPILL_HASH_JOIN_PARTITION_COUNT, fuzzy = true) + public int spillHashJoinPartitionCount = 32; + + @VariableMgr.VarAttr(name = SPILL_REVOCABLE_MEMORY_HIGH_WATERMARK_PERCENT, fuzzy = true) + public int spillRevocableMemoryHighWatermarkPercent = -1; @VariableMgr.VarAttr(name = USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS, needForward = true, description = { "在CTAS中,如果 CHAR / VARCHAR 列不来自于源表,是否是将这一列的长度设置为 MAX,即65533。默认为 true。", @@ -2372,7 +2372,6 @@ public class SessionVariable implements Serializable, Writable { // this.disableJoinReorder = random.nextBoolean(); this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean(); this.disableStreamPreaggregations = random.nextBoolean(); - this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 : 1048576; this.enableShareHashTableForBroadcastJoin = random.nextBoolean(); // this.enableHashJoinEarlyStartProbe = random.nextBoolean(); this.enableParallelResultSink = random.nextBoolean(); @@ -2394,23 +2393,23 @@ public class SessionVariable implements Serializable, Writable { /* switch (randomInt) { case 0: - this.externalSortBytesThreshold = 0; + this.spillSortBytesThreshold = 0; this.externalAggBytesThreshold = 0; break; case 1: - this.externalSortBytesThreshold = 1; + this.spillSortBytesThreshold = 1; this.externalAggBytesThreshold = 1; - this.externalAggPartitionBits = 6; + this.spillAggregationPartitionCount = 6; break; case 2: - this.externalSortBytesThreshold = 1024 * 1024; + this.spillSortBytesThreshold = 1024 * 1024; this.externalAggBytesThreshold = 1024 * 1024; - this.externalAggPartitionBits = 8; + this.spillAggregationPartitionCount = 8; break; default: - this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024; + this.spillSortBytesThreshold = 100 * 1024 * 1024 * 1024; this.externalAggBytesThreshold = 100 * 1024 * 1024 * 1024; - this.externalAggPartitionBits = 4; + this.spillAggregationPartitionCount = 4; break; } */ @@ -2493,16 +2492,16 @@ public class SessionVariable implements Serializable, Writable { randomInt = random.nextInt(4); switch (randomInt) { case 0: - this.minRevocableMem = 0; + this.spillMinRevocableMem = 0; break; case 1: - this.minRevocableMem = 1; + this.spillMinRevocableMem = 1; break; case 2: - this.minRevocableMem = 1024 * 1024; + this.spillMinRevocableMem = 1024 * 1024; break; default: - this.minRevocableMem = 100L * 1024 * 1024 * 1024; + this.spillMinRevocableMem = 100L * 1024 * 1024 * 1024; break; } } else { @@ -3659,24 +3658,6 @@ public class SessionVariable implements Serializable, Writable { return dropTableIfCtasFailed; } - public void checkExternalSortBytesThreshold(String externalSortBytesThreshold) { - long value = Long.valueOf(externalSortBytesThreshold); - if (value > 0 && value < MIN_EXTERNAL_SORT_BYTES_THRESHOLD) { - LOG.warn("external sort bytes threshold: {}, min: {}", value, MIN_EXTERNAL_SORT_BYTES_THRESHOLD); - throw new UnsupportedOperationException("minimum value is " + MIN_EXTERNAL_SORT_BYTES_THRESHOLD); - } - } - - public void checkExternalAggPartitionBits(String externalAggPartitionBits) { - int value = Integer.valueOf(externalAggPartitionBits); - if (value < MIN_EXTERNAL_AGG_PARTITION_BITS || value > MAX_EXTERNAL_AGG_PARTITION_BITS) { - LOG.warn("external agg bytes threshold: {}, min: {}, max: {}", - value, MIN_EXTERNAL_AGG_PARTITION_BITS, MAX_EXTERNAL_AGG_PARTITION_BITS); - throw new UnsupportedOperationException("min value is " + MIN_EXTERNAL_AGG_PARTITION_BITS + " max value is " - + MAX_EXTERNAL_AGG_PARTITION_BITS); - } - } - public void 
checkQueryTimeoutValid(String newQueryTimeout) { int value = Integer.valueOf(newQueryTimeout); if (value <= 0) { @@ -3896,14 +3877,6 @@ public class SessionVariable implements Serializable, Writable { tResult.setSkipDeleteBitmap(skipDeleteBitmap); - tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold); - - tResult.setExternalSortBytesThreshold(externalSortBytesThreshold); - - tResult.setExternalAggBytesThreshold(0); // disable for now - - tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit); - tResult.setEnableFileCache(enableFileCache); tResult.setEnablePageCache(enablePageCache); @@ -3941,12 +3914,19 @@ public class SessionVariable implements Serializable, Writable { tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner); tResult.setSkipBadTablet(skipBadTablet); tResult.setDisableFileCache(disableFileCache); - tResult.setEnableReserveMemory(enableReserveMemory); + + // for spill tResult.setEnableSpill(enableSpill); tResult.setEnableForceSpill(enableForceSpill); - tResult.setExternalAggPartitionBits(externalAggPartitionBits); - tResult.setMinRevocableMem(minRevocableMem); - tResult.setRevocableMemoryHighWatermarkPercent(revocableMemoryHighWatermarkPercent); + tResult.setEnableReserveMemory(enableReserveMemory); + tResult.setMinRevocableMem(spillMinRevocableMem); + tResult.setSpillSortMemLimit(spillSortMemLimit); + tResult.setSpillSortBatchBytes(spillSortBatchBytes); + tResult.setSpillAggregationPartitionCount(spillAggregationPartitionCount); + tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit); + tResult.setSpillHashJoinPartitionCount(spillHashJoinPartitionCount); + tResult.setRevocableMemoryHighWatermarkPercent(spillRevocableMemoryHighWatermarkPercent); + tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks); tResult.setEnableLocalMergeSort(enableLocalMergeSort); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 8cf33a6218b..f99a88e55b6 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -196,9 +196,10 @@ struct TQueryOptions { 58: optional i32 repeat_max_num = 0 // Deprecated + // deprecated, use spill_sort_mem_limit 59: optional i64 external_sort_bytes_threshold = 0 - // deprecated + // Not used any more 60: optional i32 partitioned_hash_agg_rows_threshold = 0 61: optional bool enable_file_cache = false @@ -214,9 +215,10 @@ struct TQueryOptions { 66: optional i32 parallel_instance = 1 // Indicate where useServerPrepStmts enabled 67: optional bool mysql_row_binary_format = false; + // Not used anymore 68: optional i64 external_agg_bytes_threshold = 0 - // partition count(1 << external_agg_partition_bits) when spill aggregation data into disk + // Not used anymore, use spill_aggregation_partition_count 69: optional i32 external_agg_partition_bits = 4 // Specify base path for file cache @@ -369,6 +371,10 @@ struct TQueryOptions { 145: optional bool enable_spill = false 146: optional bool enable_reserve_memory = true 147: optional i32 revocable_memory_high_watermark_percent = -1 + 148: optional i64 spill_sort_mem_limit = 134217728 + 149: optional i64 spill_sort_batch_bytes = 8388608 + 150: optional i32 spill_aggregation_partition_count = 32 + 151: optional i32 spill_hash_join_partition_count = 32 // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. 
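
Before the regression-test updates below, a minimal standalone sketch of the arithmetic these new options drive, assuming the thrift defaults added above (spill_sort_mem_limit = 134217728, spill_sort_batch_bytes = 8388608). This is plain C++ with hypothetical helper names (clamp_sort_mem_limit and clamp_sort_batch_bytes are not part of the patch): the runtime state clamps the session values to their minimums, the sort sink sizes each spilled block as roughly spill_sort_batch_bytes / avg_row_bytes rows, and _calc_spill_blocks_to_merge caps the merge fan-in at std::max(2, spill_sort_mem_limit / spill_sort_batch_bytes), i.e. 16 streams per pass with the defaults.

    // Standalone sketch (not from the patch): how the new spill sort options combine.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Hypothetical helpers mirroring RuntimeState::spill_sort_mem_limit() /
    // spill_sort_batch_bytes(), which clamp the session values to 16 MB / 8 MB minimums.
    static int64_t clamp_sort_mem_limit(int64_t v) { return std::max<int64_t>(v, 16777216); }
    static int64_t clamp_sort_batch_bytes(int64_t v) { return std::max<int64_t>(v, 8388608); }

    int main() {
        const int64_t sort_mem_limit = clamp_sort_mem_limit(134217728);    // default 128 MB
        const int64_t sort_batch_bytes = clamp_sort_batch_bytes(8388608);  // default 8 MB

        // Rows per spilled sort block: ceil(batch_bytes / avg_row_bytes); 200-byte rows assumed.
        const std::size_t avg_row_bytes = 200;
        const std::size_t batch_row_count =
                (static_cast<std::size_t>(sort_batch_bytes) + avg_row_bytes - 1) / avg_row_bytes;

        // Streams merged per pass: std::max(2, mem_limit / batch_bytes) -> 16 with the defaults.
        const int max_merge_streams =
                std::max(2, static_cast<int>(sort_mem_limit / sort_batch_bytes));

        std::printf("rows per spill block = %zu, merge fan-in = %d\n", batch_row_count,
                    max_merge_streams);
        return 0;
    }
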
diff --git a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy index eaf8c7e996b..2918d2ffbcd 100644 --- a/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy +++ b/regression-test/suites/datatype_p0/nested_types/query/test_nested_type_with_resize.groovy @@ -65,9 +65,9 @@ suite("test_nested_type_with_resize") { } } - order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] - order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] - order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] - order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] + order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] + order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] 
+ order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] + order_qt_sql """ /*set ShuffleSendBytes=0|ShuffleSendRows=0|FuzzyVariables=batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=5,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,parallel_prepare_threshold=13,enable_fold_constant_by_be=true,enable_re [...] } diff --git a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy index 493873fd5c8..d81f01aeeaa 100644 --- a/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/variant/variant_mv.groovy @@ -574,7 +574,7 @@ suite("variant_mv") { where g2.actor['id'] > 34259289; """ def query3_6 = """ - SELECT /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena [...] + SELECT /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=3,parallel_pipeline_task_num=0,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=32,parallel_scan_min_rows_per_scanner=64,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=1,enable_parallel_result_sink=false,ena [...] g1.id, g2.type, floor(cast(g1.actor['id'] as int) + 100.5), diff --git a/regression-test/suites/variant_p0/load.groovy b/regression-test/suites/variant_p0/load.groovy index 5abc3346f4d..1b93ecc9747 100644 --- a/regression-test/suites/variant_p0/load.groovy +++ b/regression-test/suites/variant_p0/load.groovy @@ -290,7 +290,7 @@ suite("regression_test_variant", "p0"){ sql """insert into ${table_name} values (5, '{"i" : 1}'), (1, '{"a" : 1}')""" sql """insert into ${table_name} values (6, '{"j" : 1}'), (1, '{"a" : 1}')""" sql """insert into ${table_name} values (6, '{"k" : 1}'), (1, '{"a" : 1}')""" - sql "select /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink= [...] 
+ sql "select /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=1,parallel_pipeline_task_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=true,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink= [...] qt_sql_36_1 "select cast(v['a'] as int), cast(v['b'] as int), cast(v['c'] as int) from ${table_name} order by k limit 10" sql "DELETE FROM ${table_name} WHERE k=1" sql "select * from ${table_name}" diff --git a/regression-test/suites/variant_p0/nested.groovy b/regression-test/suites/variant_p0/nested.groovy index 584ba0e336d..6ae7ee2c3a7 100644 --- a/regression-test/suites/variant_p0/nested.groovy +++ b/regression-test/suites/variant_p0/nested.groovy @@ -133,7 +133,7 @@ suite("regression_test_variant_nested", "p0"){ qt_sql """select /*+SET_VAR(batch_size=1024,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_ parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16 -,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca [...] +,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_parallel_result_sink=true,sort_phase_num=0,enable_nereids_planner=true,rewrite_or_to_in_predicate_threshold=2,enable_function_pushdown=true,enable_common_expr_pushdown=false,enable_local_exchange=true,partitioned_hash_join_rows_threshold=8,partitioned_hash_agg_rows_threshold=8,partition_pruning_expand_threshold=10,enable_share_hash_table_for_broadca [...] 
qt_sql """select * from var_nested where v['k2'] = 'some' and array_contains(cast(v['nested1']['nested2']['a'] as array<tinyint>), 10) order by k limit 1;""" sql """INSERT INTO var_nested SELECT *, '{"k1":1, "k2": "some", "k3" : [1234], "k4" : 1.10000, "k5" : [[123]], "nested1" : {"nested2" : [{"a" : 10, "b" : 1.1, "c" : "1111"}]}}' FROM numbers("number" = "4096") where number > 1024 limit 1024;""" @@ -162,7 +162,7 @@ parallel_pipeline_task_num=7,parallel_fragment_exec_instance_num=4,profile_level properties("replication_num" = "1", "disable_auto_compaction" = "false", "enable_unique_key_merge_on_write" = "true", "variant_enable_flatten_nested" = "true"); """ sql """insert into var_nested2 select * from var_nested order by k limit 1024""" - qt_sql """select /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res [...] + qt_sql """select /*+SET_VAR(batch_size=4064,broker_load_batch_size=16352,disable_streaming_preaggregations=true,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=5,parallel_pipeline_task_num=1,profile_level=1,enable_pipeline_engine=false,enable_parallel_scan=true,parallel_scan_max_scanners_count=48,parallel_scan_min_rows_per_scanner=16384,enable_fold_constant_by_be=true,enable_rewrite_element_at_to_slot=true,runtime_filter_type=12,enable_parallel_res [...] qt_sql """select v['nested'] from var_nested2 where k < 10 order by k limit 10;""" // 0. nomal explode variant array order_qt_explode_sql """select count(),cast(vv['xx'] as int) from var_nested lateral view explode_variant_array(v['nested']) tmp as vv where vv['xx'] = 10 group by cast(vv['xx'] as int)""" diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy index 4226de14d81..4b9cc317318 100644 --- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy +++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy @@ -139,7 +139,7 @@ suite("variant_sub_path_pruning", "variant_type"){ // two children order_qt_sql """ - select /*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh [...] + select /*+SET_VAR(batch_size=50,disable_streaming_preaggregations=false,enable_distinct_streaming_aggregation=true,parallel_fragment_exec_instance_num=6,parallel_pipeline_task_num=2,profile_level=1,enable_pipeline_engine=true,enable_parallel_scan=false,parallel_scan_max_scanners_count=16,parallel_scan_min_rows_per_scanner=128,enable_fold_constant_by_be=false,enable_rewrite_element_at_to_slot=true,runtime_filter_type=2,enable_nereids_planner=true,rewrite_or_to_in_predicate_thresh [...] 
""" order_qt_sql """select c1['a'] from (select dt as c1 from pruning_test union all select dt as c1 from pruning_test) v1;""" order_qt_sql """select c1['b'] from (select dt['a'] as c1 from pruning_test union all select dt['a'] as c1 from pruning_test) v1;""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org