This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push: new 536de88fd22 Fix bugs and undefined behaviors (#45410) 536de88fd22 is described below commit 536de88fd228356ff1a626d0b2045b3468232580 Author: Jerry Hu <hushengg...@selectdb.com> AuthorDate: Tue Dec 17 10:58:58 2024 +0800 Fix bugs and undefined behaviors (#45410) --- be/src/common/config.cpp | 4 + be/src/common/config.h | 1 + be/src/pipeline/exec/hashjoin_build_sink.cpp | 9 +- be/src/pipeline/exec/multi_cast_data_streamer.cpp | 49 +++++--- be/src/pipeline/exec/operator.h | 5 + .../pipeline/exec/partition_sort_sink_operator.cpp | 4 +- .../exec/partitioned_aggregation_sink_operator.cpp | 6 + .../exec/partitioned_aggregation_sink_operator.h | 2 + .../partitioned_aggregation_source_operator.cpp | 25 ++++- .../exec/partitioned_hash_join_probe_operator.cpp | 85 +++++++------- .../exec/partitioned_hash_join_probe_operator.h | 1 - .../exec/partitioned_hash_join_sink_operator.cpp | 125 ++++++--------------- .../exec/partitioned_hash_join_sink_operator.h | 6 +- be/src/pipeline/exec/scan_operator.cpp | 14 ++- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 5 + be/src/pipeline/exec/spill_sort_sink_operator.h | 1 + be/src/pipeline/exec/spill_utils.h | 6 + be/src/pipeline/pipeline_fragment_context.cpp | 3 +- be/src/pipeline/pipeline_task.cpp | 26 ++++- be/src/pipeline/pipeline_task.h | 1 + be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/query_context.h | 4 + .../workload_group/workload_group_manager.cpp | 13 +-- be/src/vec/common/columns_hashing.h | 8 +- be/src/vec/core/block.cpp | 3 +- be/src/vec/functions/function_string.h | 2 + .../correctness_p0/test_mask_function.groovy | 21 ++++ 27 files changed, 254 insertions(+), 177 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 89fcea3372d..f8b7b390ee9 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -16,6 +16,7 @@ // under the License. 
#include <fmt/core.h> +#include <gflags/gflags.h> #include <stdint.h> #include <algorithm> @@ -1263,6 +1264,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool { }); DEFINE_Int32(spill_io_thread_pool_queue_size, "102400"); +// paused query in queue timeout(ms) will be resumed or canceled +DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000"); + DEFINE_mBool(check_segment_when_build_rowset_meta, "false"); DEFINE_mInt32(max_s3_client_retry, "10"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 8e308c1794d..1b9b8a3d531 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1344,6 +1344,7 @@ DECLARE_mInt32(spill_gc_interval_ms); DECLARE_mInt32(spill_gc_work_time_ms); DECLARE_Int32(spill_io_thread_pool_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); +DECLARE_Int64(spill_in_paused_queue_timeout_ms); DECLARE_mBool(check_segment_when_build_rowset_meta); diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index a8122dd11ed..02bc358e958 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -26,6 +26,7 @@ #include "pipeline/exec/operator.h" #include "pipeline/pipeline_task.h" #include "util/pretty_printer.h" +#include "vec/columns/column_nullable.h" #include "vec/core/block.h" #include "vec/data_types/data_type_nullable.h" #include "vec/utils/template_helpers.hpp" @@ -153,12 +154,18 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state, bo if (build_block_rows > 0) { auto block = _build_side_mutable_block.to_block(); + std::vector<uint16_t> converted_columns; Defer defer([&]() { + for (auto i : converted_columns) { + auto& data = block.get_by_position(i); + data.column = vectorized::remove_nullable(data.column); + data.type = vectorized::remove_nullable(data.type); + } _build_side_mutable_block = vectorized::MutableBlock(std::move(block)); }); 
vectorized::ColumnUInt8::MutablePtr null_map_val; if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == TJoinOp::FULL_OUTER_JOIN) { - _convert_block_to_null(block); + converted_columns = _convert_block_to_null(block); // first row is mocked for (int i = 0; i < block.columns(); i++) { auto [column, is_const] = unpack_if_const(block.safe_get_by_position(i).column); diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 8fc3bc6015c..9f0c653b1b7 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -128,6 +128,8 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz return Status::OK(); } + DCHECK_GT(pos_to_pull->_un_finish_copy, 0); + DCHECK_LE(pos_to_pull->_un_finish_copy, _cast_sender_count); *block = *pos_to_pull->_block; multi_cast_block = &(*pos_to_pull); @@ -156,6 +158,8 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id multi_cast_block._un_finish_copy--; auto copying_count = _copying_count.fetch_sub(1) - 1; if (multi_cast_block._un_finish_copy == 0) { + DCHECK_EQ(_multi_cast_blocks.front()._un_finish_copy, 0); + DCHECK_EQ(&(_multi_cast_blocks.front()), &multi_cast_block); _multi_cast_blocks.pop_front(); _write_dependency->set_ready(); } else if (copying_count == 0) { @@ -167,7 +171,7 @@ Status MultiCastDataStreamer::_copy_block(RuntimeState* state, int32_t sender_id } Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool* triggered) { - if (!state->enable_spill() && !state->enable_force_spill()) { + if (!state->enable_spill()) { *triggered = false; return Status::OK(); } @@ -232,6 +236,7 @@ Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state, vectorized::SpillStreamSPtr spill_stream) { std::vector<vectorized::Block> blocks; for (auto& block : _multi_cast_blocks) { + DCHECK_GT(block._block->rows(), 0); 
blocks.emplace_back(std::move(*block._block)); } @@ -288,6 +293,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block std::lock_guard l(_mutex); if (_pending_block) { + DCHECK_GT(_pending_block->rows(), 0); const auto pending_size = _pending_block->allocated_bytes(); _cumulative_mem_size += pending_size; _multi_cast_blocks.emplace_back(_pending_block.get(), _cast_sender_count, pending_size); @@ -306,24 +312,33 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size.load(), _peak_mem_usage->value())); - if (!eos) { - bool spilled = false; - RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled)); - if (spilled) { - _pending_block = - vectorized::Block::create_unique(block->get_columns_with_type_and_name()); - block->clear(); - return Status::OK(); + if (rows > 0) { + if (!eos) { + bool spilled = false; + RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled)); + if (spilled) { + _pending_block = vectorized::Block::create_unique( + block->get_columns_with_type_and_name()); + block->clear(); + return Status::OK(); + } } - } - _multi_cast_blocks.emplace_back(block, _cast_sender_count, block_mem_size); - // last elem - auto end = std::prev(_multi_cast_blocks.end()); - for (int i = 0; i < _sender_pos_to_read.size(); ++i) { - if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) { - _sender_pos_to_read[i] = end; - _set_ready_for_read(i); + _multi_cast_blocks.emplace_back(block, _cast_sender_count, block_mem_size); + + // last elem + auto end = std::prev(_multi_cast_blocks.end()); + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) { + _sender_pos_to_read[i] = end; + _set_ready_for_read(i); + } + } + } else if (eos) { + for (int i = 0; i < _sender_pos_to_read.size(); ++i) { + if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) { + _set_ready_for_read(i); + } } } diff --git 
a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 590e04a2445..723e14f4bc7 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -589,6 +589,10 @@ public: return state->minimum_operator_memory_required_bytes(); } + [[nodiscard]] virtual bool is_spilled(RuntimeState* state) const { return false; } + + [[nodiscard]] bool is_spillable() const { return _spillable; } + template <class TARGET> TARGET& cast() { DCHECK(dynamic_cast<TARGET*>(this)) @@ -651,6 +655,7 @@ protected: const int _operator_id; const int _node_id; int _nereids_id = -1; + bool _spillable = false; std::vector<int> _dests_id; std::string _name; }; diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index b90af92f6b1..759c7ea2bcc 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -70,7 +70,9 @@ PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, int ope _topn_phase(tnode.partition_sort_node.ptopn_phase), _has_global_limit(tnode.partition_sort_node.has_global_limit), _top_n_algorithm(tnode.partition_sort_node.top_n_algorithm), - _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) {} + _partition_inner_limit(tnode.partition_sort_node.partition_inner_limit) { + _spillable = true; +} Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 8a29a88c82c..6f008a3b1f2 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -148,6 +148,7 @@ PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int o : 
DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) { _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs, require_bucket_distribution); + _spillable = true; } Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { @@ -340,4 +341,9 @@ Status PartitionedAggSinkLocalState::revoke_memory( std::move(spill_runnable)); } +bool PartitionedAggSinkOperatorX::is_spilled(RuntimeState* state) const { + auto& local_state = get_local_state(state); + return local_state._shared_state->is_spilled; +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 4b022dd1c5a..922798707d0 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -333,6 +333,8 @@ public: size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; + bool is_spilled(RuntimeState* state) const override; + private: friend class PartitionedAggSinkLocalState; std::unique_ptr<AggSinkOperatorX> _agg_sink_operator; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 6bd601383f7..57ebfd24558 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -17,10 +17,13 @@ #include "partitioned_aggregation_source_operator.h" +#include <glog/logging.h> + #include <string> #include "aggregation_source_operator.h" #include "common/exception.h" +#include "common/logging.h" #include "common/status.h" #include "pipeline/exec/operator.h" #include "pipeline/exec/spill_utils.h" @@ -257,7 +260,9 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b size_t accumulated_blocks_size = 0; while (!state->is_cancelled() 
&& !has_agg_data && !_shared_state->spill_partitions.empty()) { - for (auto& stream : _shared_state->spill_partitions[0]->spill_streams_) { + while (!_shared_state->spill_partitions[0]->spill_streams_.empty() && + !state->is_cancelled()) { + auto& stream = _shared_state->spill_partitions[0]->spill_streams_[0]; stream->set_read_counters(profile()); vectorized::Block block; bool eos = false; @@ -291,10 +296,20 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b if (_current_partition_eos) { (void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream); - _shared_state->spill_partitions.pop_front(); + _shared_state->spill_partitions[0]->spill_streams_.pop_front(); } } + + if (_shared_state->spill_partitions[0]->spill_streams_.empty()) { + _shared_state->spill_partitions.pop_front(); + } } + + VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id() + << ", task id: " << state->task_id() << " recover partitioned finished, " + << _shared_state->spill_partitions.size() << " partitions left, " + << accumulated_blocks_size + << " bytes read, spill dep: " << (void*)(_spill_dependency.get()); return status; }; @@ -308,6 +323,8 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); }(); + LOG_IF(INFO, !status.ok()) << "Query : " << print_id(query_id) + << " recover exception : " << status.to_string(); return status; }; @@ -317,6 +334,10 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b }); _spill_dependency->block(); + VLOG_DEBUG << "Query: " << print_id(query_id) << " agg node " << _parent->node_id() + << ", task id: " << state->task_id() << " begin to recover, " + << _shared_state->spill_partitions.size() + << " partitions left, _spill_dependency: " << (void*)(_spill_dependency.get()); return 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit( std::make_shared<SpillRecoverRunnable>(state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(), diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index e3f990c1667..bbbcb9b9d5e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -408,20 +408,21 @@ std::string PartitionedHashJoinProbeLocalState::debug_string(int indentation_lev bool need_more_input_data; if (_shared_state->need_to_spill) { need_more_input_data = !_child_eos; - } else if (_runtime_state) { - need_more_input_data = p._inner_probe_operator->need_more_input_data(_runtime_state.get()); + } else if (_shared_state->inner_runtime_state) { + need_more_input_data = p._inner_probe_operator->need_more_input_data( + _shared_state->inner_runtime_state.get()); } else { need_more_input_data = true; } fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}, need_to_spill: {}, child_eos: {}, " - "_runtime_state: {}, need_more_input_data: {}", + "_shared_state->inner_runtime_state: {}, need_more_input_data: {}", PipelineXSpillLocalState<PartitionedHashJoinSharedState>::debug_string( indentation_level), _shared_state ? 
std::to_string(_shared_state->short_circuit_for_probe) : "NULL", - _shared_state->need_to_spill, _child_eos, _runtime_state != nullptr, - need_more_input_data); + _shared_state->need_to_spill, _child_eos, + _shared_state->inner_runtime_state != nullptr, need_more_input_data); return fmt::to_string(debug_string_buffer); } @@ -528,7 +529,7 @@ Status PartitionedHashJoinProbeOperatorX::init(const TPlanNode& tnode, RuntimeSt auto tnode_ = _tnode; tnode_.runtime_filters.clear(); - for (auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) { + for (const auto& conjunct : tnode.hash_join_node.eq_join_conjuncts) { _probe_exprs.emplace_back(conjunct.left); } _partitioner = std::make_unique<SpillPartitionerType>(_partition_count); @@ -618,11 +619,10 @@ Status PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized:: Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill( PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) { DCHECK(local_state._shared_state->inner_runtime_state); - local_state._runtime_state = std::move(local_state._shared_state->inner_runtime_state); local_state._in_mem_shared_state_sptr = std::move(local_state._shared_state->inner_shared_state); - auto* sink_state = local_state._runtime_state->get_sink_local_state(); + auto* sink_state = local_state._shared_state->inner_runtime_state->get_sink_local_state(); if (sink_state != nullptr) { COUNTER_SET(local_state._hash_table_memory_usage, sink_state->profile()->get_counter("MemoryUsageHashTable")->value()); @@ -632,21 +632,22 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( PartitionedHashJoinProbeLocalState& local_state, RuntimeState* state) const { - if (local_state._runtime_state) { + if (local_state._shared_state->inner_runtime_state) { _update_profile_from_internal_states(local_state); } - local_state._runtime_state = 
RuntimeState::create_unique( + local_state._shared_state->inner_runtime_state = RuntimeState::create_unique( nullptr, state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); - local_state._runtime_state->set_task_execution_context( + local_state._shared_state->inner_runtime_state->set_task_execution_context( state->get_task_execution_context().lock()); - local_state._runtime_state->set_be_number(state->be_number()); + local_state._shared_state->inner_runtime_state->set_be_number(state->be_number()); - local_state._runtime_state->set_desc_tbl(&state->desc_tbl()); - local_state._runtime_state->resize_op_id_to_local_state(-1); - local_state._runtime_state->set_runtime_filter_mgr(state->local_runtime_filter_mgr()); + local_state._shared_state->inner_runtime_state->set_desc_tbl(&state->desc_tbl()); + local_state._shared_state->inner_runtime_state->resize_op_id_to_local_state(-1); + local_state._shared_state->inner_runtime_state->set_runtime_filter_mgr( + state->local_runtime_filter_mgr()); local_state._in_mem_shared_state_sptr = _inner_sink_operator->create_shared_state(); @@ -654,23 +655,23 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( LocalSinkStateInfo info {0, local_state._internal_runtime_profile.get(), -1, local_state._in_mem_shared_state_sptr.get(), {}, {}}; - RETURN_IF_ERROR( - _inner_sink_operator->setup_local_state(local_state._runtime_state.get(), info)); + RETURN_IF_ERROR(_inner_sink_operator->setup_local_state( + local_state._shared_state->inner_runtime_state.get(), info)); LocalStateInfo state_info {local_state._internal_runtime_profile.get(), {}, local_state._in_mem_shared_state_sptr.get(), {}, 0}; - RETURN_IF_ERROR( - _inner_probe_operator->setup_local_state(local_state._runtime_state.get(), state_info)); + RETURN_IF_ERROR(_inner_probe_operator->setup_local_state( + local_state._shared_state->inner_runtime_state.get(), 
state_info)); - auto* sink_local_state = local_state._runtime_state->get_sink_local_state(); + auto* sink_local_state = local_state._shared_state->inner_runtime_state->get_sink_local_state(); DCHECK(sink_local_state != nullptr); RETURN_IF_ERROR(sink_local_state->open(state)); - auto* probe_local_state = - local_state._runtime_state->get_local_state(_inner_probe_operator->operator_id()); + auto* probe_local_state = local_state._shared_state->inner_runtime_state->get_local_state( + _inner_probe_operator->operator_id()); DCHECK(probe_local_state != nullptr); RETURN_IF_ERROR(probe_local_state->open(state)); @@ -686,13 +687,15 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators( "fault_inject partitioned_hash_join_probe sink failed"); }); - RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true)); + RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._shared_state->inner_runtime_state.get(), + &block, true)); VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", internal build operator finished, node id: " << node_id() << ", task id: " << state->task_id() << ", partition: " << local_state._partition_cursor << "rows: " << block.rows() << ", usage: " - << _inner_sink_operator->get_memory_usage(local_state._runtime_state.get()); + << _inner_sink_operator->get_memory_usage( + local_state._shared_state->inner_runtime_state.get()); COUNTER_SET(local_state._hash_table_memory_usage, sink_local_state->profile()->get_counter("MemoryUsageHashTable")->value()); @@ -735,7 +738,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, } } bool in_mem_eos = false; - auto* runtime_state = local_state._runtime_state.get(); + auto* runtime_state = local_state._shared_state->inner_runtime_state.get(); while (_inner_probe_operator->need_more_input_data(runtime_state)) { if (probe_blocks.empty()) { *eos = false; @@ -761,8 +764,8 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, 
} } - RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), output_block, - &in_mem_eos)); + RETURN_IF_ERROR(_inner_probe_operator->pull( + local_state._shared_state->inner_runtime_state.get(), output_block, &in_mem_eos)); *eos = false; if (in_mem_eos) { @@ -784,8 +787,9 @@ bool PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state auto& local_state = get_local_state(state); if (local_state._shared_state->need_to_spill) { return !local_state._child_eos; - } else if (local_state._runtime_state) { - return _inner_probe_operator->need_more_input_data(local_state._runtime_state.get()); + } else if (local_state._shared_state->inner_runtime_state) { + return _inner_probe_operator->need_more_input_data( + local_state._shared_state->inner_runtime_state.get()); } else { return true; } @@ -898,11 +902,12 @@ bool PartitionedHashJoinProbeOperatorX::_should_revoke_memory(RuntimeState* stat void PartitionedHashJoinProbeOperatorX::_update_profile_from_internal_states( PartitionedHashJoinProbeLocalState& local_state) const { - if (local_state._runtime_state) { - auto* sink_local_state = local_state._runtime_state->get_sink_local_state(); + if (local_state._shared_state->inner_runtime_state) { + auto* sink_local_state = + local_state._shared_state->inner_runtime_state->get_sink_local_state(); local_state.update_build_profile(sink_local_state->profile()); - auto* probe_local_state = - local_state._runtime_state->get_local_state(_inner_probe_operator->operator_id()); + auto* probe_local_state = local_state._shared_state->inner_runtime_state->get_local_state( + _inner_probe_operator->operator_id()); local_state.update_probe_profile(probe_local_state->profile()); } } @@ -953,13 +958,13 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori return _revoke_memory(state); } } else { - if (UNLIKELY(!local_state._runtime_state)) { + if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) { 
RETURN_IF_ERROR(_setup_internal_operator_for_non_spill(local_state, state)); } - RETURN_IF_ERROR(_inner_probe_operator->push(local_state._runtime_state.get(), - local_state._child_block.get(), - local_state._child_eos)); + RETURN_IF_ERROR(_inner_probe_operator->push( + local_state._shared_state->inner_runtime_state.get(), + local_state._child_block.get(), local_state._child_eos)); } } @@ -969,11 +974,11 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori if (need_to_spill) { RETURN_IF_ERROR(pull(state, block, eos)); } else { - RETURN_IF_ERROR( - _inner_probe_operator->pull(local_state._runtime_state.get(), block, eos)); + RETURN_IF_ERROR(_inner_probe_operator->pull( + local_state._shared_state->inner_runtime_state.get(), block, eos)); if (*eos) { _update_profile_from_internal_states(local_state); - local_state._runtime_state.reset(); + local_state._shared_state->inner_runtime_state.reset(); } } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index f020c0a832a..7b77e1e6e3f 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -81,7 +81,6 @@ private: std::vector<vectorized::SpillStreamSPtr> _probe_spilling_streams; std::unique_ptr<vectorized::PartitionerBase> _partitioner; - std::unique_ptr<RuntimeState> _runtime_state; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; bool _need_to_setup_internal_operators {true}; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 220115c085e..672eb36a907 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -290,7 +290,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", 
task " << state->task_id() << " sink " << _parent->node_id() << " revoke_memory" << ", eos: " << _child_eos; - DCHECK_EQ(_spilling_task_count, 0); CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr); if (!_shared_state->need_to_spill) { @@ -299,18 +298,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( return _revoke_unpartitioned_block(state, spill_context); } - _spilling_task_count = _shared_state->partitioned_build_blocks.size(); - auto query_id = state->query_id(); auto* spill_io_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool(); DCHECK(spill_io_pool != nullptr); auto spill_fin_cb = [this, state, query_id, spill_context]() { - if (_spilling_task_count.fetch_sub(1) != 1) { - return Status::OK(); - } - Status status; if (_child_eos) { VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << ", task " @@ -330,99 +323,49 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory( spill_context->on_task_finished(); } - std::lock_guard<std::mutex> lock(_spill_mutex); _spill_dependency->set_ready(); return status; }; - for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { - vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i]; - auto& mutable_block = _shared_state->partitioned_build_blocks[i]; - - if (!mutable_block || - mutable_block->allocated_bytes() < vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { - --_spilling_task_count; - continue; - } - - DCHECK(spilling_stream != nullptr); + auto spill_runnable = std::make_shared<SpillSinkRunnable>( + state, nullptr, nullptr, _profile, _shared_state->shared_from_this(), + [this, query_id] { + DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { + auto status = Status::InternalError( + "fault_inject partitioned_hash_join_sink " + "revoke_memory canceled"); + ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status); + return status; + }); + 
SCOPED_TIMER(_spill_build_timer); - MonotonicStopWatch submit_timer; - submit_timer.start(); + for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) { + vectorized::SpillStreamSPtr& spilling_stream = + _shared_state->spilled_streams[i]; + DCHECK(spilling_stream != nullptr); + auto& mutable_block = _shared_state->partitioned_build_blocks[i]; - Status st; - DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_sink::revoke_memory_submit_func", { - st = Status::Error<INTERNAL_ERROR>( - "fault_inject partitioned_hash_join_sink revoke_memory submit_func failed"); - }); - // For every stream, the task counter is increased +1 - // so that when a stream finished, it should desc -1 - state->get_query_ctx()->increase_revoking_tasks_count(); - auto spill_runnable = std::make_shared<SpillSinkRunnable>( - state, nullptr, nullptr, _profile, _shared_state->shared_from_this(), - [this, query_id, spilling_stream, i] { - DBUG_EXECUTE_IF( - "fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", { - auto status = Status::InternalError( - "fault_inject partitioned_hash_join_sink " - "revoke_memory canceled"); - ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, - status); - return status; - }); - SCOPED_TIMER(_spill_build_timer); + if (!mutable_block || + mutable_block->allocated_bytes() < + vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + continue; + } auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return _spill_to_disk(i, spilling_stream)); }(); - _state->get_query_ctx()->decrease_revoking_tasks_count(); - return status; - }, - spill_fin_cb); - if (st.ok()) { - st = spill_io_pool->submit(std::move(spill_runnable)); - } - - if (!st.ok()) { - --_spilling_task_count; - return st; - } - } - - if (_spilling_task_count.load() > 0) { - std::lock_guard<std::mutex> lock(_spill_mutex); - if (_spilling_task_count.load() > 0) { - _spill_dependency->block(); - return Status::OK(); - } - } - - if (_child_eos) { - VLOG_DEBUG << "Query:" << 
print_id(state->query_id()) << ", task " << state->task_id() - << " sink " << _parent->node_id() << " set_ready_to_read"; - std::for_each(_shared_state->partitioned_build_blocks.begin(), - _shared_state->partitioned_build_blocks.end(), [&](auto& block) { - if (block) { - COUNTER_UPDATE(_in_mem_rows_counter, block->rows()); - } - }); - RETURN_IF_ERROR(_finish_spilling()); - _dependency->set_ready_to_read(); + RETURN_IF_ERROR(status); + } + return Status::OK(); + }, + spill_fin_cb); - if (spill_context) { - spill_context->on_task_finished(); - } - } - return Status::OK(); + _spill_dependency->block(); + return spill_io_pool->submit(std::move(spill_runnable)); } Status PartitionedHashJoinSinkLocalState::_finish_spilling() { - bool expected = false; - if (!_spilling_finished.compare_exchange_strong(expected, true)) { - return Status::OK(); - } - for (auto& stream : _shared_state->spilled_streams) { if (stream) { RETURN_IF_ERROR(stream->spill_eof()); @@ -504,7 +447,9 @@ PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(ObjectPool* p : std::vector<TExpr> {}), _tnode(tnode), _descriptor_tbl(descs), - _partition_count(partition_count) {} + _partition_count(partition_count) { + _spillable = true; +} Status PartitionedHashJoinSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(JoinBuildSinkOperatorX::init(tnode, state)); @@ -661,10 +606,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B "fault_inject partitioned_hash_join_sink " "sink failed"); }); - Defer defer {[&]() { local_state.update_memory_usage(); }}; RETURN_IF_ERROR(_inner_sink_operator->sink( local_state._shared_state->inner_runtime_state.get(), in_block, eos)); - + local_state.update_memory_usage(); if (eos) { VLOG_DEBUG << fmt::format( "Query: {}, task {}, sink {} eos, set_ready_to_read, nonspill memory " @@ -697,4 +641,9 @@ size_t PartitionedHashJoinSinkOperatorX::get_reserve_mem_size(RuntimeState* stat return 
local_state.get_reserve_mem_size(state, eos); } +bool PartitionedHashJoinSinkOperatorX::is_spilled(RuntimeState* state) const { + auto& local_state = get_local_state(state); + return local_state._shared_state->need_to_spill; +} + } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 8e3c53ffd38..9e253ce3fca 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -68,10 +68,6 @@ protected: friend class PartitionedHashJoinSinkOperatorX; - std::mutex _spill_mutex; - std::atomic<bool> _spilling_finished {false}; - std::atomic_int32_t _spilling_task_count {0}; - bool _child_eos {false}; std::unique_ptr<vectorized::PartitionerBase> _partitioner; @@ -138,6 +134,8 @@ public: return _inner_probe_operator->require_data_distribution(); } + bool is_spilled(RuntimeState* state) const override; + private: friend class PartitionedHashJoinSinkLocalState; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 7173601595b..767a1f168da 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -1289,16 +1289,20 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized: template <typename LocalStateType> size_t ScanOperatorX<LocalStateType>::get_reserve_mem_size(RuntimeState* state) { auto& local_state = get_local_state(state); + if (!local_state._opened || local_state._closed || !local_state._scanner_ctx) { + return config::doris_scanner_row_bytes; + } + if (local_state.low_memory_mode()) { return local_state._scanner_ctx->low_memory_mode_scan_bytes_per_scanner() * local_state._scanner_ctx->low_memory_mode_scanners(); } else { - if (local_state._memory_used_counter->value() > 0) { + const auto peak_usage = local_state._memory_used_counter->value(); + const auto block_usage = 
local_state._scanner_ctx->block_memory_usage(); + if (peak_usage > 0) { // It is only a safty check, to avoid some counter not right. - if (local_state._memory_used_counter->value() > - local_state._scanner_ctx->block_memory_usage()) { - return local_state._memory_used_counter->value() - - local_state._scanner_ctx->block_memory_usage(); + if (peak_usage > block_usage) { + return peak_usage - block_usage; } else { return config::doris_scanner_row_bytes; } diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 25205ab09fe..6a472a09cfd 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -299,4 +299,9 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, return status; } +bool SpillSortSinkOperatorX::is_spilled(RuntimeState* state) const { + auto& local_state = get_local_state(state); + return local_state._shared_state->is_spilled; +} + } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 3d6ccdcc4ce..226fe61d386 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -91,6 +91,7 @@ public: Status revoke_memory(RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) override; + bool is_spilled(RuntimeState* state) const override; using DataSinkOperatorX<LocalStateType>::node_id; using DataSinkOperatorX<LocalStateType>::operator_id; diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index e669c0e343c..84a3f8c2e29 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -187,6 +187,9 @@ protected: } virtual void _on_task_started(uint64_t submit_elapsed_time) { + VLOG_DEBUG << "Query: " << print_id(_state->query_id()) + << " spill task started, pipeline 
task id: " << _state->task_id() + << ", spill dep: " << (void*)(_spill_dependency.get()); if (_is_write_task) { COUNTER_UPDATE(_spill_write_wait_in_queue_timer, submit_elapsed_time); COUNTER_UPDATE(_write_wait_in_queue_task_count, -1); @@ -269,6 +272,9 @@ protected: } void _on_task_started(uint64_t submit_elapsed_time) override { + LOG(INFO) << "SpillRecoverRunnable, Query: " << print_id(_state->query_id()) + << " spill task started, pipeline task id: " << _state->task_id() + << ", spill dep: " << (void*)(_spill_dependency.get()); COUNTER_UPDATE(_spill_read_wait_in_queue_timer, submit_elapsed_time); COUNTER_UPDATE(_read_wait_in_queue_task_count, -1); COUNTER_UPDATE(_reading_task_count, 1); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 2986b4b96ed..42d2640441e 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1841,7 +1841,8 @@ std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const for (const auto& task_instances : _tasks) { for (const auto& task : task_instances) { size_t revocable_size_ = task->get_revocable_size(); - if (revocable_size_ > _runtime_state->min_revocable_mem()) { + if (revocable_size_ > _runtime_state->min_revocable_mem() || + (revocable_size_ > 0 && _query_ctx->enable_force_spill())) { revocable_tasks.emplace_back(task.get()); } } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 6bf4cb39fd1..cd822ef15e2 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -26,6 +26,7 @@ #include <ostream> #include <vector> +#include "common/logging.h" #include "common/status.h" #include "pipeline/dependency.h" #include "pipeline/exec/operator.h" @@ -403,6 +404,7 @@ Status PipelineTask::execute(bool* eos) { << " has pending block, size: " << _pending_block->allocated_bytes(); _block = std::move(_pending_block); block = _block.get(); + 
*eos = _pending_eos; } // `_dry_run` means sink operator need no more data // `_sink->is_finished(_state)` means sink operator should be finished @@ -420,7 +422,7 @@ Status PipelineTask::execute(bool* eos) { auto st = thread_context()->try_reserve_memory(reserve_size); COUNTER_UPDATE(_memory_reserve_times, 1); - if (!st.ok()) { + if (!st.ok() && !_state->enable_force_spill()) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); auto debug_msg = fmt::format( "Query: {} , try to reserve: {}, operator name: {}, operator id: {}, " @@ -455,9 +457,16 @@ Status PipelineTask::execute(bool* eos) { DEFER_RELEASE_RESERVED(); COUNTER_UPDATE(_memory_reserve_times, 1); const auto sink_reserve_size = _sink->get_reserve_mem_size(_state, *eos); - if (_state->enable_reserve_memory()) { + auto workload_group = _state->get_query_ctx()->workload_group(); + if (_state->enable_reserve_memory() && workload_group) { status = thread_context()->try_reserve_memory(sink_reserve_size); + if (status.ok() && _state->enable_force_spill() && _sink->is_spillable() && + _sink->revocable_mem_size(_state) >= + vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) { + status = Status(ErrorCode::QUERY_MEMORY_EXCEEDED, "Force Spill"); + } + if (!status.ok()) { COUNTER_UPDATE(_memory_reserve_failed_times, 1); auto debug_msg = fmt::format( @@ -481,7 +490,7 @@ Status PipelineTask::execute(bool* eos) { _state->get_query_ctx()->set_memory_sufficient(false); ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query( _state->get_query_ctx()->shared_from_this(), sink_reserve_size); - _eos = *eos; + _pending_eos = *eos; *eos = false; continue; } @@ -644,7 +653,7 @@ std::string PipelineTask::debug_string() { } size_t PipelineTask::get_revocable_size() const { - if (_running || (_eos && !_pending_block)) { + if (_finalized || _running || (_eos && !_pending_block)) { return 0; } @@ -652,6 +661,15 @@ size_t PipelineTask::get_revocable_size() const { } Status PipelineTask::revoke_memory(const 
std::shared_ptr<SpillContext>& spill_context) { + if (_finalized) { + if (spill_context) { + spill_context->on_task_finished(); + VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this) + << " finalized"; + } + return Status::OK(); + } + RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context)); const auto revocable_size = _sink->revocable_mem_size(_state); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index e10d4217464..99decc05a9d 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -270,6 +270,7 @@ private: uint32_t _schedule_time = 0; std::unique_ptr<vectorized::Block> _block; std::unique_ptr<vectorized::Block> _pending_block; + bool _pending_eos = false; PipelineFragmentContext* _fragment_context = nullptr; MultiCoreTaskQueue* _task_queue = nullptr; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index ece9a843d33..96deed90fbf 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -704,7 +704,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo tg_id); } else { LOG(WARNING) << "Query/load id: " << print_id(query_ctx->query_id()) - << "can't find its workload group " << tg_id; + << " can't find its workload group " << tg_id; } } // There is some logic in query ctx's dctor, we could not check if exists and delete the diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index b011e8d850f..55b51088c50 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -176,6 +176,10 @@ public: : false; } + bool enable_force_spill() const { + return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; + } + // global runtime filter mgr, the runtime filter have remote target or // need local merge should regist here. 
before publish() or push_to_remote() // the runtime filter should do the local merge work diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 86c33f5a334..4b61cd7892d 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -287,11 +287,6 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que } } -#ifdef BE_TEST -constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 10L; -#else -constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60; -#endif /** * Strategy 1: A revocable query should not have any running task(PipelineTask). * strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit @@ -425,7 +420,7 @@ void WorkloadGroupMgr::handle_paused_queries() { } else { // Should not put the query back to task scheduler immediately, because when wg's memory not sufficient, // and then set wg's flag, other query may not free memory very quickly. - if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) { + if (query_it->elapsed_time() > config::spill_in_paused_queue_timeout_ms) { // set wg's memory to insufficent, then add it back to task scheduler to run. 
LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " will be resume."; @@ -676,7 +671,7 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& << "), resume it."; query_ctx->set_memory_sufficient(true); return true; - } else if (time_in_queue >= TIMEOUT_IN_QUEUE_LIMIT) { + } else if (time_in_queue >= config::spill_in_paused_queue_timeout_ms) { // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic auto msg1 = fmt::format( "Query {} reserve memory failed, but could not find memory that could " @@ -707,7 +702,7 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it."; query_ctx->set_memory_sufficient(true); return true; - } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) { + } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); @@ -731,7 +726,7 @@ bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); query_ctx->set_memory_sufficient(true); return true; - } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) { + } else if (time_in_queue > config::spill_in_paused_queue_timeout_ms) { LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: " << GlobalMemoryArbitrator::process_memory_used_details_str() << ", wg info: " << query_ctx->workload_group()->memory_debug_string(); diff --git a/be/src/vec/common/columns_hashing.h b/be/src/vec/common/columns_hashing.h index f0a365cfc09..60c2370bc60 100644 --- a/be/src/vec/common/columns_hashing.h +++ b/be/src/vec/common/columns_hashing.h @@ -152,8 +152,12 @@ struct HashMethodSingleLowNullableColumn : public SingleColumnMethod { template 
<typename Data, typename Key> ALWAYS_INLINE FindResult find_key_with_hash(Data& data, size_t i, Key key, size_t hash_value) { - if (key_column->is_null_at(i) && data.has_null_key_data()) { - return FindResult {&data.template get_null_key_data<Mapped>(), true}; + if (key_column->is_null_at(i)) { + if (data.has_null_key_data()) { + return FindResult {&data.template get_null_key_data<Mapped>(), true}; + } else { + return FindResult {nullptr, false}; + } } return Base::find_key_impl(key, hash_value, data); } diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 4dc553b1a57..38185ded5fb 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -734,7 +734,8 @@ void Block::clear_column_data(int64_t column_size) noexcept { } for (auto& d : data) { if (d.column) { - DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count(); + // Temporarily disable reference count check because a column might be referenced multiple times within a block. + // DCHECK_EQ(d.column->use_count(), 1) << " " << print_use_count(); (*std::move(d.column)).assume_mutable()->clear(); } } diff --git a/be/src/vec/functions/function_string.h b/be/src/vec/functions/function_string.h index 6e4a18fdd31..7626f86b6a9 100644 --- a/be/src/vec/functions/function_string.h +++ b/be/src/vec/functions/function_string.h @@ -702,6 +702,8 @@ public: size_t get_number_of_arguments() const override { return 0; } + ColumnNumbers get_arguments_that_are_always_constant() const override { return {1, 2, 3}; } + bool is_variadic() const override { return true; } Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, diff --git a/regression-test/suites/correctness_p0/test_mask_function.groovy b/regression-test/suites/correctness_p0/test_mask_function.groovy index b7717ab183c..c1116e633d2 100644 --- a/regression-test/suites/correctness_p0/test_mask_function.groovy +++ b/regression-test/suites/correctness_p0/test_mask_function.groovy @@ -76,6 +76,27 @@ 
suite("test_mask_function") { select digital_masking(13812345678); """ + test { + sql """ + select mask('abcd', name) from table_mask_test order by id; + """ + exception "Argument at index 1 for function mask must be constant" + } + + test { + sql """ + select mask('abcd', '>', name) from table_mask_test order by id; + """ + exception "Argument at index 2 for function mask must be constant" + } + + test { + sql """ + select mask('abcd', '>', '<', `name`) from table_mask_test order by id; + """ + exception "Argument at index 3 for function mask must be constant" + } + test { sql """ select mask_last_n("12345", -100); """ exception "function mask_last_n only accept non-negative input for 2nd argument but got -100" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org