This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new de96162ab37 [Chore](hash) catch error when hash method variant meet valueless_by_exception #34956 de96162ab37 is described below commit de96162ab3729569acf7e01cdc5d7f260eede39d Author: Pxl <pxl...@qq.com> AuthorDate: Fri May 17 11:33:41 2024 +0800 [Chore](hash) catch error when hash method variant meet valueless_by_exception #34956 --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 12 ++- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- .../distinct_streaming_aggregation_operator.cpp | 13 +-- .../exec/distinct_streaming_aggregation_operator.h | 2 +- .../pipeline/exec/partition_sort_sink_operator.cpp | 9 ++- .../pipeline/exec/partition_sort_sink_operator.h | 2 +- .../exec/streaming_aggregation_operator.cpp | 12 ++- .../pipeline/exec/streaming_aggregation_operator.h | 2 +- .../common/hash_table/hash_map_context_creator.h | 8 +- be/src/vec/exec/vaggregation_node.cpp | 9 ++- be/src/vec/exec/vaggregation_node.h | 2 +- be/src/vec/exec/vpartition_sort_node.cpp | 92 ++++++++++++---------- be/src/vec/exec/vpartition_sort_node.h | 4 +- 13 files changed, 100 insertions(+), 69 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index c36427b9519..dcdff106b90 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -20,6 +20,7 @@ #include <memory> #include <string> +#include "common/status.h" #include "pipeline/exec/operator.h" #include "runtime/primitive_type.h" #include "vec/common/hash_table/hash.h" @@ -100,7 +101,7 @@ Status AggSinkLocalState::open(RuntimeState* state) { _executor = std::make_unique<Executor<true, false>>(); } } else { - _init_hash_method(Base::_shared_state->probe_expr_ctxs); + RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs)); std::visit(vectorized::Overload { [&](std::monostate& arg) { @@ -565,9 +566,12 @@ void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places _agg_data->method_variant); } -void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) { - init_agg_hash_method(_agg_data, probe_exprs, - Base::_parent->template cast<AggSinkOperatorX>()._is_first_phase); +Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) { + if (!init_agg_hash_method(_agg_data, probe_exprs, + Base::_parent->template cast<AggSinkOperatorX>()._is_first_phase)) { + return Status::InternalError("init hash method failed"); + } + return Status::OK(); } AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index d32e5f616c2..036cbbf3162 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -75,7 +75,7 @@ protected: Status _execute_without_key(vectorized::Block* block); Status _merge_without_key(vectorized::Block* block); void _update_memusage_without_key(); - void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); + Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); Status _execute_with_serialized_key(vectorized::Block* block); Status _merge_with_serialized_key(vectorized::Block* block); void _update_memusage_with_serialized_key(); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 82b29b2afdb..c4c1ba370b0 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -98,7 +98,7 @@ Status DistinctStreamingAggLocalState::open(RuntimeState* state) { _agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>( _agg_profile_arena->alloc(p._total_size_of_aggregate_states)); } else { - _init_hash_method(_probe_expr_ctxs); + RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs)); } return Status::OK(); } @@ -168,11 +168,14 @@ bool DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() { _agg_data->method_variant); } -void DistinctStreamingAggLocalState::_init_hash_method( +Status DistinctStreamingAggLocalState::_init_hash_method( const vectorized::VExprContextSPtrs& probe_exprs) { - init_agg_hash_method( - _agg_data.get(), probe_exprs, - Base::_parent->template cast<DistinctStreamingAggOperatorX>()._is_first_phase); + if (!init_agg_hash_method( + _agg_data.get(), probe_exprs, + Base::_parent->template cast<DistinctStreamingAggOperatorX>()._is_first_phase)) { + return Status::InternalError("init agg hash method failed"); + } + return Status::OK(); } Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key( diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index db951b44142..0a3af64ed46 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -52,7 +52,7 @@ private: friend class StatefulOperatorX; Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block, vectorized::Block* out_block); - void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); + Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns, const size_t num_rows); diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp b/be/src/pipeline/exec/partition_sort_sink_operator.cpp index 3ec8b9e66cf..9d6cbd29b2f 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp @@ -46,7 +46,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first, p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit, p._top_n_algorithm, p._topn_phase); - _init_hash_method(); + RETURN_IF_ERROR(_init_hash_method()); return Status::OK(); } @@ -223,8 +223,11 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table( local_state._partitioned_data->method_variant); } -void PartitionSortSinkLocalState::_init_hash_method() { - init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs, true); +Status PartitionSortSinkLocalState::_init_hash_method() { + if (!init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs, true)) { + return Status::InternalError("init hash method failed"); + } + return Status::OK(); } } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h b/be/src/pipeline/exec/partition_sort_sink_operator.h index 43829f95a8f..f4d88204c0d 100644 --- a/be/src/pipeline/exec/partition_sort_sink_operator.h +++ b/be/src/pipeline/exec/partition_sort_sink_operator.h @@ -56,7 +56,7 @@ private: RuntimeProfile::Counter* _selector_block_timer = nullptr; RuntimeProfile::Counter* _hash_table_size_counter = nullptr; RuntimeProfile::Counter* _passthrough_rows_counter = nullptr; - void _init_hash_method(); + Status _init_hash_method(); }; class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortSinkLocalState> { diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index 10bd37d8c44..ba66d01a0f8 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -152,7 +152,7 @@ Status StreamingAggLocalState::open(RuntimeState* state) { } } } else { - _init_hash_method(_probe_expr_ctxs); + RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs)); std::visit(vectorized::Overload { [&](std::monostate& arg) -> void { @@ -503,9 +503,13 @@ Status StreamingAggLocalState::_merge_with_serialized_key(vectorized::Block* blo } } -void StreamingAggLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) { - init_agg_hash_method(_agg_data.get(), probe_exprs, - Base::_parent->template cast<StreamingAggOperatorX>()._is_first_phase); +Status StreamingAggLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) { + if (!init_agg_hash_method( + _agg_data.get(), probe_exprs, + Base::_parent->template cast<StreamingAggOperatorX>()._is_first_phase)) { + return Status::InternalError("init hash method failed"); + } + return Status::OK(); } Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block, diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index 17040ca59ff..227536170ea 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -63,7 +63,7 @@ private: Status _execute_with_serialized_key(vectorized::Block* block); Status _merge_with_serialized_key(vectorized::Block* block); void _update_memusage_with_serialized_key(); - void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); + Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs); Status _get_without_key_result(RuntimeState* state, vectorized::Block* block, bool* eos); Status _serialize_without_key(RuntimeState* state, vectorized::Block* block, bool* eos); Status _get_with_serialized_key_result(RuntimeState* state, vectorized::Block* block, diff --git a/be/src/vec/common/hash_table/hash_map_context_creator.h b/be/src/vec/common/hash_table/hash_map_context_creator.h index 22760ddfcb4..426892d3442 100644 --- a/be/src/vec/common/hash_table/hash_map_context_creator.h +++ b/be/src/vec/common/hash_table/hash_map_context_creator.h @@ -104,7 +104,7 @@ bool try_get_hash_map_context_fixed(Variant& variant, const std::vector<DataType } template <typename DataVariants, typename Data> -void init_hash_method(DataVariants* agg_data, const vectorized::VExprContextSPtrs& probe_exprs, +bool init_hash_method(DataVariants* agg_data, const vectorized::VExprContextSPtrs& probe_exprs, bool is_first_phase) { using Type = DataVariants::Type; Type t(Type::serialized); @@ -164,5 +164,11 @@ void init_hash_method(DataVariants* agg_data, const vectorized::VExprContextSPtr agg_data->init(Type::serialized); } } + + if (agg_data->method_variant.valueless_by_exception()) { + agg_data->method_variant.template emplace<std::monostate>(); + return false; + } + return true; } } // namespace doris \ No newline at end of file diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 77a883558de..3a66773cf03 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -161,8 +161,11 @@ Status AggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { return Status::OK(); } -void AggregationNode::_init_hash_method(const VExprContextSPtrs& probe_exprs) { - init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase); +Status AggregationNode::_init_hash_method(const VExprContextSPtrs& probe_exprs) { + if (!init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase)) { + return Status::InternalError("init hash method failed"); + } + return Status::OK(); } Status AggregationNode::prepare_profile(RuntimeState* state) { @@ -269,7 +272,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { std::bind<void>(&AggregationNode::_update_memusage_without_key, this); _executor.close = std::bind<void>(&AggregationNode::_close_without_key, this); } else { - _init_hash_method(_probe_expr_ctxs); + RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs)); std::visit(Overload {[&](std::monostate& arg) { throw doris::Exception(ErrorCode::INTERNAL_ERROR, diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 5815f02e5bb..de94cd6d59b 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -533,7 +533,7 @@ private: Status _merge_with_serialized_key(Block* block); void _update_memusage_with_serialized_key(); void _close_with_serialized_key(); - void _init_hash_method(const VExprContextSPtrs& probe_exprs); + Status _init_hash_method(const VExprContextSPtrs& probe_exprs); template <bool limit> Status _execute_with_serialized_key_helper(Block* block) { diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index d7bae4bd35d..7788d58955e 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -158,7 +158,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) { SCOPED_TIMER(_exec_timer); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child(0)->row_desc())); - _init_hash_method(); + RETURN_IF_ERROR(_init_hash_method()); _partition_sort_info = std::make_shared<PartitionSortInfo>( &_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, child(0)->row_desc(), @@ -182,45 +182,50 @@ Status VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_col const vectorized::Block* input_block, bool eos) { return std::visit( - [&](auto&& agg_method) -> Status { - SCOPED_TIMER(_build_timer); - using HashMethodType = std::decay_t<decltype(agg_method)>; - using AggState = typename HashMethodType::State; - - AggState state(key_columns); - size_t num_rows = input_block->rows(); - agg_method.init_serialized_keys(key_columns, num_rows); - - auto creator = [&](const auto& ctor, auto& key, auto& origin) { - HashMethodType::try_presis_key(key, origin, *_agg_arena_pool); - auto* aggregate_data = _pool->add( - new PartitionBlocks(_partition_sort_info, _value_places.empty())); - _value_places.push_back(aggregate_data); - ctor(key, aggregate_data); - _num_partition++; - }; - auto creator_for_null_key = [&](auto& mapped) { - mapped = _pool->add( - new PartitionBlocks(_partition_sort_info, _value_places.empty())); - _value_places.push_back(mapped); - _num_partition++; - }; - { - SCOPED_TIMER(_emplace_key_timer); - for (size_t row = 0; row < num_rows; ++row) { - auto& mapped = - agg_method.lazy_emplace(state, row, creator, creator_for_null_key); - mapped->add_row_idx(row); - } - } - { - SCOPED_TIMER(_selector_block_timer); - for (auto* place : _value_places) { - RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); - } - } - return Status::OK(); - }, + vectorized::Overload { + [&](std::monostate& arg) -> Status { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); + return Status::InternalError("Unit hash table"); + }, + [&](auto&& agg_method) -> Status { + SCOPED_TIMER(_build_timer); + using HashMethodType = std::decay_t<decltype(agg_method)>; + using AggState = typename HashMethodType::State; + + AggState state(key_columns); + size_t num_rows = input_block->rows(); + agg_method.init_serialized_keys(key_columns, num_rows); + + auto creator = [&](const auto& ctor, auto& key, auto& origin) { + HashMethodType::try_presis_key(key, origin, *_agg_arena_pool); + auto* aggregate_data = _pool->add(new PartitionBlocks( + _partition_sort_info, _value_places.empty())); + _value_places.push_back(aggregate_data); + ctor(key, aggregate_data); + _num_partition++; + }; + auto creator_for_null_key = [&](auto& mapped) { + mapped = _pool->add(new PartitionBlocks(_partition_sort_info, + _value_places.empty())); + _value_places.push_back(mapped); + _num_partition++; + }; + { + SCOPED_TIMER(_emplace_key_timer); + for (size_t row = 0; row < num_rows; ++row) { + auto& mapped = agg_method.lazy_emplace(state, row, creator, + creator_for_null_key); + mapped->add_row_idx(row); + } + } + { + SCOPED_TIMER(_selector_block_timer); + for (auto* place : _value_places) { + RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos)); + } + } + return Status::OK(); + }}, _partitioned_data->method_variant); } @@ -397,8 +402,11 @@ void VPartitionSortNode::release_resource(RuntimeState* state) { ExecNode::release_resource(state); } -void VPartitionSortNode::_init_hash_method() { - init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs, true); +Status VPartitionSortNode::_init_hash_method() { + if (!init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs, true)) { + return Status::InternalError("init hash method failed"); + } + return Status::OK(); } void VPartitionSortNode::debug_profile() { diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 10135bd5107..481a99719fb 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -133,7 +133,7 @@ using PartitionDataWithUInt256Key = PHHashMap<UInt256, PartitionDataPtr, HashCRC using PartitionDataWithUInt136Key = PHHashMap<UInt136, PartitionDataPtr, HashCRC32<UInt136>>; using PartitionedMethodVariants = std::variant< - MethodSerialized<PartitionDataWithStringKey>, + std::monostate, MethodSerialized<PartitionDataWithStringKey>, MethodOneNumber<UInt8, PartitionDataWithUInt8Key>, MethodOneNumber<UInt16, PartitionDataWithUInt16Key>, MethodOneNumber<UInt32, PartitionDataWithUInt32Key>, @@ -236,7 +236,7 @@ public: bool can_read(); private: - void _init_hash_method(); + Status _init_hash_method(); Status _split_block_by_partition(vectorized::Block* input_block, bool eos); Status _emplace_into_hash_table(const ColumnRawPtrs& key_columns, const vectorized::Block* input_block, bool eos); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org