This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de96162ab37 [Chore](hash) catch error when hash method variant meet valueless_by_exception #34956
de96162ab37 is described below

commit de96162ab3729569acf7e01cdc5d7f260eede39d
Author: Pxl <pxl...@qq.com>
AuthorDate: Fri May 17 11:33:41 2024 +0800

    [Chore](hash) catch error when hash method variant meet valueless_by_exception #34956
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp | 12 ++-
 be/src/pipeline/exec/aggregation_sink_operator.h   |  2 +-
 .../distinct_streaming_aggregation_operator.cpp    | 13 +--
 .../exec/distinct_streaming_aggregation_operator.h |  2 +-
 .../pipeline/exec/partition_sort_sink_operator.cpp |  9 ++-
 .../pipeline/exec/partition_sort_sink_operator.h   |  2 +-
 .../exec/streaming_aggregation_operator.cpp        | 12 ++-
 .../pipeline/exec/streaming_aggregation_operator.h |  2 +-
 .../common/hash_table/hash_map_context_creator.h   |  8 +-
 be/src/vec/exec/vaggregation_node.cpp              |  9 ++-
 be/src/vec/exec/vaggregation_node.h                |  2 +-
 be/src/vec/exec/vpartition_sort_node.cpp           | 92 ++++++++++++----------
 be/src/vec/exec/vpartition_sort_node.h             |  4 +-
 13 files changed, 100 insertions(+), 69 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index c36427b9519..dcdff106b90 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -20,6 +20,7 @@
 #include <memory>
 #include <string>
 
+#include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "runtime/primitive_type.h"
 #include "vec/common/hash_table/hash.h"
@@ -100,7 +101,7 @@ Status AggSinkLocalState::open(RuntimeState* state) {
             _executor = std::make_unique<Executor<true, false>>();
         }
     } else {
-        _init_hash_method(Base::_shared_state->probe_expr_ctxs);
+        
RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs));
 
         std::visit(vectorized::Overload {
                            [&](std::monostate& arg) {
@@ -565,9 +566,12 @@ void 
AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places
                _agg_data->method_variant);
 }
 
-void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& 
probe_exprs) {
-    init_agg_hash_method(_agg_data, probe_exprs,
-                         Base::_parent->template 
cast<AggSinkOperatorX>()._is_first_phase);
+Status AggSinkLocalState::_init_hash_method(const 
vectorized::VExprContextSPtrs& probe_exprs) {
+    if (!init_agg_hash_method(_agg_data, probe_exprs,
+                              Base::_parent->template 
cast<AggSinkOperatorX>()._is_first_phase)) {
+        return Status::InternalError("init hash method failed");
+    }
+    return Status::OK();
 }
 
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index d32e5f616c2..036cbbf3162 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -75,7 +75,7 @@ protected:
     Status _execute_without_key(vectorized::Block* block);
     Status _merge_without_key(vectorized::Block* block);
     void _update_memusage_without_key();
-    void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
+    Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
     Status _execute_with_serialized_key(vectorized::Block* block);
     Status _merge_with_serialized_key(vectorized::Block* block);
     void _update_memusage_with_serialized_key();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 82b29b2afdb..c4c1ba370b0 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -98,7 +98,7 @@ Status DistinctStreamingAggLocalState::open(RuntimeState* 
state) {
         _agg_data->without_key = 
reinterpret_cast<vectorized::AggregateDataPtr>(
                 _agg_profile_arena->alloc(p._total_size_of_aggregate_states));
     } else {
-        _init_hash_method(_probe_expr_ctxs);
+        RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
     }
     return Status::OK();
 }
@@ -168,11 +168,14 @@ bool 
DistinctStreamingAggLocalState::_should_expand_preagg_hash_tables() {
             _agg_data->method_variant);
 }
 
-void DistinctStreamingAggLocalState::_init_hash_method(
+Status DistinctStreamingAggLocalState::_init_hash_method(
         const vectorized::VExprContextSPtrs& probe_exprs) {
-    init_agg_hash_method(
-            _agg_data.get(), probe_exprs,
-            Base::_parent->template 
cast<DistinctStreamingAggOperatorX>()._is_first_phase);
+    if (!init_agg_hash_method(
+                _agg_data.get(), probe_exprs,
+                Base::_parent->template 
cast<DistinctStreamingAggOperatorX>()._is_first_phase)) {
+        return Status::InternalError("init agg hash method failed");
+    }
+    return Status::OK();
 }
 
 Status DistinctStreamingAggLocalState::_distinct_pre_agg_with_serialized_key(
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index db951b44142..0a3af64ed46 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -52,7 +52,7 @@ private:
     friend class StatefulOperatorX;
     Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
                                                  vectorized::Block* out_block);
-    void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
+    Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
     void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& 
distinct_row,
                                               vectorized::ColumnRawPtrs& 
key_columns,
                                               const size_t num_rows);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 3ec8b9e66cf..9d6cbd29b2f 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -46,7 +46,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo
             &_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, 
p._nulls_first,
             p._child_x->row_desc(), state, _profile, p._has_global_limit, 
p._partition_inner_limit,
             p._top_n_algorithm, p._topn_phase);
-    _init_hash_method();
+    RETURN_IF_ERROR(_init_hash_method());
     return Status::OK();
 }
 
@@ -223,8 +223,11 @@ Status 
PartitionSortSinkOperatorX::_emplace_into_hash_table(
             local_state._partitioned_data->method_variant);
 }
 
-void PartitionSortSinkLocalState::_init_hash_method() {
-    init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs, 
true);
+Status PartitionSortSinkLocalState::_init_hash_method() {
+    if (!init_partition_hash_method(_partitioned_data.get(), 
_partition_expr_ctxs, true)) {
+        return Status::InternalError("init hash method failed");
+    }
+    return Status::OK();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h 
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index 43829f95a8f..f4d88204c0d 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -56,7 +56,7 @@ private:
     RuntimeProfile::Counter* _selector_block_timer = nullptr;
     RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
     RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
-    void _init_hash_method();
+    Status _init_hash_method();
 };
 
 class PartitionSortSinkOperatorX final : public 
DataSinkOperatorX<PartitionSortSinkLocalState> {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 10bd37d8c44..ba66d01a0f8 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -152,7 +152,7 @@ Status StreamingAggLocalState::open(RuntimeState* state) {
             }
         }
     } else {
-        _init_hash_method(_probe_expr_ctxs);
+        RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
 
         std::visit(vectorized::Overload {
                            [&](std::monostate& arg) -> void {
@@ -503,9 +503,13 @@ Status 
StreamingAggLocalState::_merge_with_serialized_key(vectorized::Block* blo
     }
 }
 
-void StreamingAggLocalState::_init_hash_method(const 
vectorized::VExprContextSPtrs& probe_exprs) {
-    init_agg_hash_method(_agg_data.get(), probe_exprs,
-                         Base::_parent->template 
cast<StreamingAggOperatorX>()._is_first_phase);
+Status StreamingAggLocalState::_init_hash_method(const 
vectorized::VExprContextSPtrs& probe_exprs) {
+    if (!init_agg_hash_method(
+                _agg_data.get(), probe_exprs,
+                Base::_parent->template 
cast<StreamingAggOperatorX>()._is_first_phase)) {
+        return Status::InternalError("init hash method failed");
+    }
+    return Status::OK();
 }
 
 Status StreamingAggLocalState::do_pre_agg(vectorized::Block* input_block,
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 17040ca59ff..227536170ea 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -63,7 +63,7 @@ private:
     Status _execute_with_serialized_key(vectorized::Block* block);
     Status _merge_with_serialized_key(vectorized::Block* block);
     void _update_memusage_with_serialized_key();
-    void _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
+    Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);
     Status _get_without_key_result(RuntimeState* state, vectorized::Block* 
block, bool* eos);
     Status _serialize_without_key(RuntimeState* state, vectorized::Block* 
block, bool* eos);
     Status _get_with_serialized_key_result(RuntimeState* state, 
vectorized::Block* block,
diff --git a/be/src/vec/common/hash_table/hash_map_context_creator.h 
b/be/src/vec/common/hash_table/hash_map_context_creator.h
index 22760ddfcb4..426892d3442 100644
--- a/be/src/vec/common/hash_table/hash_map_context_creator.h
+++ b/be/src/vec/common/hash_table/hash_map_context_creator.h
@@ -104,7 +104,7 @@ bool try_get_hash_map_context_fixed(Variant& variant, const 
std::vector<DataType
 }
 
 template <typename DataVariants, typename Data>
-void init_hash_method(DataVariants* agg_data, const 
vectorized::VExprContextSPtrs& probe_exprs,
+bool init_hash_method(DataVariants* agg_data, const 
vectorized::VExprContextSPtrs& probe_exprs,
                       bool is_first_phase) {
     using Type = DataVariants::Type;
     Type t(Type::serialized);
@@ -164,5 +164,11 @@ void init_hash_method(DataVariants* agg_data, const 
vectorized::VExprContextSPtr
             agg_data->init(Type::serialized);
         }
     }
+
+    if (agg_data->method_variant.valueless_by_exception()) {
+        agg_data->method_variant.template emplace<std::monostate>();
+        return false;
+    }
+    return true;
 }
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index 77a883558de..3a66773cf03 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -161,8 +161,11 @@ Status AggregationNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
     return Status::OK();
 }
 
-void AggregationNode::_init_hash_method(const VExprContextSPtrs& probe_exprs) {
-    init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase);
+Status AggregationNode::_init_hash_method(const VExprContextSPtrs& 
probe_exprs) {
+    if (!init_agg_hash_method(_agg_data.get(), probe_exprs, _is_first_phase)) {
+        return Status::InternalError("init hash method failed");
+    }
+    return Status::OK();
 }
 
 Status AggregationNode::prepare_profile(RuntimeState* state) {
@@ -269,7 +272,7 @@ Status AggregationNode::prepare_profile(RuntimeState* 
state) {
                 
std::bind<void>(&AggregationNode::_update_memusage_without_key, this);
         _executor.close = 
std::bind<void>(&AggregationNode::_close_without_key, this);
     } else {
-        _init_hash_method(_probe_expr_ctxs);
+        RETURN_IF_ERROR(_init_hash_method(_probe_expr_ctxs));
 
         std::visit(Overload {[&](std::monostate& arg) {
                                  throw 
doris::Exception(ErrorCode::INTERNAL_ERROR,
diff --git a/be/src/vec/exec/vaggregation_node.h 
b/be/src/vec/exec/vaggregation_node.h
index 5815f02e5bb..de94cd6d59b 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -533,7 +533,7 @@ private:
     Status _merge_with_serialized_key(Block* block);
     void _update_memusage_with_serialized_key();
     void _close_with_serialized_key();
-    void _init_hash_method(const VExprContextSPtrs& probe_exprs);
+    Status _init_hash_method(const VExprContextSPtrs& probe_exprs);
 
     template <bool limit>
     Status _execute_with_serialized_key_helper(Block* block) {
diff --git a/be/src/vec/exec/vpartition_sort_node.cpp 
b/be/src/vec/exec/vpartition_sort_node.cpp
index d7bae4bd35d..7788d58955e 100644
--- a/be/src/vec/exec/vpartition_sort_node.cpp
+++ b/be/src/vec/exec/vpartition_sort_node.cpp
@@ -158,7 +158,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_exec_timer);
     RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), 
_row_descriptor));
     RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, 
child(0)->row_desc()));
-    _init_hash_method();
+    RETURN_IF_ERROR(_init_hash_method());
 
     _partition_sort_info = std::make_shared<PartitionSortInfo>(
             &_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, 
child(0)->row_desc(),
@@ -182,45 +182,50 @@ Status VPartitionSortNode::_emplace_into_hash_table(const 
ColumnRawPtrs& key_col
                                                     const vectorized::Block* 
input_block,
                                                     bool eos) {
     return std::visit(
-            [&](auto&& agg_method) -> Status {
-                SCOPED_TIMER(_build_timer);
-                using HashMethodType = std::decay_t<decltype(agg_method)>;
-                using AggState = typename HashMethodType::State;
-
-                AggState state(key_columns);
-                size_t num_rows = input_block->rows();
-                agg_method.init_serialized_keys(key_columns, num_rows);
-
-                auto creator = [&](const auto& ctor, auto& key, auto& origin) {
-                    HashMethodType::try_presis_key(key, origin, 
*_agg_arena_pool);
-                    auto* aggregate_data = _pool->add(
-                            new PartitionBlocks(_partition_sort_info, 
_value_places.empty()));
-                    _value_places.push_back(aggregate_data);
-                    ctor(key, aggregate_data);
-                    _num_partition++;
-                };
-                auto creator_for_null_key = [&](auto& mapped) {
-                    mapped = _pool->add(
-                            new PartitionBlocks(_partition_sort_info, 
_value_places.empty()));
-                    _value_places.push_back(mapped);
-                    _num_partition++;
-                };
-                {
-                    SCOPED_TIMER(_emplace_key_timer);
-                    for (size_t row = 0; row < num_rows; ++row) {
-                        auto& mapped =
-                                agg_method.lazy_emplace(state, row, creator, 
creator_for_null_key);
-                        mapped->add_row_idx(row);
-                    }
-                }
-                {
-                    SCOPED_TIMER(_selector_block_timer);
-                    for (auto* place : _value_places) {
-                        
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
-                    }
-                }
-                return Status::OK();
-            },
+            vectorized::Overload {
+                    [&](std::monostate& arg) -> Status {
+                        throw doris::Exception(ErrorCode::INTERNAL_ERROR, 
"uninited hash table");
+                        return Status::InternalError("Unit hash table");
+                    },
+                    [&](auto&& agg_method) -> Status {
+                        SCOPED_TIMER(_build_timer);
+                        using HashMethodType = 
std::decay_t<decltype(agg_method)>;
+                        using AggState = typename HashMethodType::State;
+
+                        AggState state(key_columns);
+                        size_t num_rows = input_block->rows();
+                        agg_method.init_serialized_keys(key_columns, num_rows);
+
+                        auto creator = [&](const auto& ctor, auto& key, auto& 
origin) {
+                            HashMethodType::try_presis_key(key, origin, 
*_agg_arena_pool);
+                            auto* aggregate_data = _pool->add(new 
PartitionBlocks(
+                                    _partition_sort_info, 
_value_places.empty()));
+                            _value_places.push_back(aggregate_data);
+                            ctor(key, aggregate_data);
+                            _num_partition++;
+                        };
+                        auto creator_for_null_key = [&](auto& mapped) {
+                            mapped = _pool->add(new 
PartitionBlocks(_partition_sort_info,
+                                                                    
_value_places.empty()));
+                            _value_places.push_back(mapped);
+                            _num_partition++;
+                        };
+                        {
+                            SCOPED_TIMER(_emplace_key_timer);
+                            for (size_t row = 0; row < num_rows; ++row) {
+                                auto& mapped = agg_method.lazy_emplace(state, 
row, creator,
+                                                                       
creator_for_null_key);
+                                mapped->add_row_idx(row);
+                            }
+                        }
+                        {
+                            SCOPED_TIMER(_selector_block_timer);
+                            for (auto* place : _value_places) {
+                                
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
+                            }
+                        }
+                        return Status::OK();
+                    }},
             _partitioned_data->method_variant);
 }
 
@@ -397,8 +402,11 @@ void VPartitionSortNode::release_resource(RuntimeState* 
state) {
     ExecNode::release_resource(state);
 }
 
-void VPartitionSortNode::_init_hash_method() {
-    init_partition_hash_method(_partitioned_data.get(), _partition_expr_ctxs, 
true);
+Status VPartitionSortNode::_init_hash_method() {
+    if (!init_partition_hash_method(_partitioned_data.get(), 
_partition_expr_ctxs, true)) {
+        return Status::InternalError("init hash method failed");
+    }
+    return Status::OK();
 }
 
 void VPartitionSortNode::debug_profile() {
diff --git a/be/src/vec/exec/vpartition_sort_node.h 
b/be/src/vec/exec/vpartition_sort_node.h
index 10135bd5107..481a99719fb 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -133,7 +133,7 @@ using PartitionDataWithUInt256Key = PHHashMap<UInt256, 
PartitionDataPtr, HashCRC
 using PartitionDataWithUInt136Key = PHHashMap<UInt136, PartitionDataPtr, 
HashCRC32<UInt136>>;
 
 using PartitionedMethodVariants = std::variant<
-        MethodSerialized<PartitionDataWithStringKey>,
+        std::monostate, MethodSerialized<PartitionDataWithStringKey>,
         MethodOneNumber<UInt8, PartitionDataWithUInt8Key>,
         MethodOneNumber<UInt16, PartitionDataWithUInt16Key>,
         MethodOneNumber<UInt32, PartitionDataWithUInt32Key>,
@@ -236,7 +236,7 @@ public:
     bool can_read();
 
 private:
-    void _init_hash_method();
+    Status _init_hash_method();
     Status _split_block_by_partition(vectorized::Block* input_block, bool eos);
     Status _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
                                     const vectorized::Block* input_block, bool 
eos);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to