This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 9986fa747ad [Chore](compatible) adjust register_alternative_function (#40941) 9986fa747ad is described below commit 9986fa747ad6f44bd855e2a997bfabbe2e55fa9e Author: Pxl <pxl...@qq.com> AuthorDate: Thu Sep 19 16:24:40 2024 +0800 [Chore](compatible) adjust register_alternative_function (#40941) ## Proposed changes adjust register_alternative_function --- be/src/agent/be_exec_version_manager.cpp | 49 ++++++++++++++-------- be/src/agent/be_exec_version_manager.h | 21 ++++++++-- be/src/olap/rowset/segment_v2/column_reader.cpp | 2 +- be/src/olap/tablet_schema.cpp | 2 +- .../aggregate_function_covar.cpp | 8 ++-- .../aggregate_function_percentile.cpp | 16 ++++--- .../aggregate_function_percentile_approx.cpp | 11 +++-- .../aggregate_function_simple_factory.h | 23 ++++------ .../aggregate_function_stddev.cpp | 14 ++++--- .../aggregate_function_window_funnel.cpp | 6 ++- be/src/vec/data_types/data_type_agg_state.h | 6 +-- be/src/vec/functions/simple_function_factory.h | 8 ---- 12 files changed, 97 insertions(+), 69 deletions(-) diff --git a/be/src/agent/be_exec_version_manager.cpp b/be/src/agent/be_exec_version_manager.cpp index 32cbe569892..e44829ae39b 100644 --- a/be/src/agent/be_exec_version_manager.cpp +++ b/be/src/agent/be_exec_version_manager.cpp @@ -17,12 +17,9 @@ #include "agent/be_exec_version_manager.h" -namespace doris { +#include "common/exception.h" -const std::map<int, const std::set<std::string>> AGGREGATION_CHANGE_MAP = { - {AGGREGATION_2_1_VERSION, - {"window_funnel", "stddev_samp", "variance_samp", "percentile_approx_weighted", - "percentile_approx", "covar_samp", "percentile", "percentile_array"}}}; +namespace doris { Status BeExecVersionManager::check_be_exec_version(int be_exec_version) { if (be_exec_version > max_be_exec_version || be_exec_version < min_be_exec_version) { @@ -35,19 +32,35 @@ Status BeExecVersionManager::check_be_exec_version(int be_exec_version) { return Status::OK(); } -void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_version, - int data_be_exec_version, - std::string function_name) { - if (current_be_exec_version > AGGREGATION_2_1_VERSION && - data_be_exec_version <= AGGREGATION_2_1_VERSION && - AGGREGATION_CHANGE_MAP.find(AGGREGATION_2_1_VERSION)->second.contains(function_name)) { - throw Exception(Status::InternalError( - "agg state data with {} is not supported, " - "current_be_exec_version={}, data_be_exec_version={}, need to rebuild the data " - "or set the be_exec_version={} in fe.conf", - function_name, current_be_exec_version, data_be_exec_version, - AGGREGATION_2_1_VERSION)); +int BeExecVersionManager::get_function_compatibility(int be_exec_version, + std::string function_name) { + auto it = _function_change_map.find(function_name); + if (it == _function_change_map.end()) { + // 0 means no compatibility issues need to be dealt with + return 0; + } + + auto version_it = it->second.lower_bound(be_exec_version); + if (version_it == it->second.end()) { + return 0; + } + + return *version_it; +} + +void BeExecVersionManager::check_function_compatibility(int current_be_exec_version, + int data_be_exec_version, + std::string function_name) { + if (get_function_compatibility(current_be_exec_version, function_name) == + get_function_compatibility(data_be_exec_version, function_name)) { + return; } + + throw Exception(Status::InternalError( + "agg state data with {} is not supported, " + "current_be_exec_version={}, data_be_exec_version={}, need to rebuild the data " + "or set the be_exec_version={} in fe.conf temporary", + function_name, current_be_exec_version, data_be_exec_version, data_be_exec_version)); } /** @@ -88,5 +101,5 @@ void BeExecVersionManager::check_agg_state_compatibility(int current_be_exec_ver */ const int BeExecVersionManager::max_be_exec_version = 7; const int BeExecVersionManager::min_be_exec_version = 0; - +std::map<std::string, std::set<int>> BeExecVersionManager::_function_change_map {}; } // namespace doris diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index 16092197a3a..7ab3c7de23a 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -25,13 +25,14 @@ namespace doris { +constexpr static int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6; constexpr inline int BITMAP_SERDE = 3; constexpr inline int USE_NEW_SERDE = 4; // release on DORIS version 2.1 constexpr inline int OLD_WAL_SERDE = 3; // use to solve compatibility issues, see pr #32299 constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable property: PR #37215 constexpr inline int VARIANT_SERDE = 6; // change variant serde to fix PR #38413 constexpr inline int AGGREGATION_2_1_VERSION = - 5; // some aggregation changed the data format after this version + 6; // some aggregation changed the data format after this version class BeExecVersionManager { public: @@ -39,14 +40,28 @@ public: static Status check_be_exec_version(int be_exec_version); - static void check_agg_state_compatibility(int current_be_exec_version, int data_be_exec_version, - std::string function_name); + static int get_function_compatibility(int be_exec_version, std::string function_name); + + static void check_function_compatibility(int current_be_exec_version, int data_be_exec_version, + std::string function_name); static int get_newest_version() { return max_be_exec_version; } + static std::string get_function_suffix(int be_exec_version) { + return "_for_old_version_" + std::to_string(be_exec_version); + } + + // For example, there are incompatible changes between version=7 and version=6, at this time breaking_old_version is 6. + static void registe_old_function_compatibility(int breaking_old_version, + std::string function_name) { + _function_change_map[function_name].insert(breaking_old_version); + } + private: static const int max_be_exec_version; static const int min_be_exec_version; + // [function name] -> [breaking change start version] + static std::map<std::string, std::set<int>> _function_change_map; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index 7b5b39a4c39..3c9b5b7ce7e 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -201,7 +201,7 @@ Status ColumnReader::create_agg_state(const ColumnReaderOptions& opts, const Col auto data_type = vectorized::DataTypeFactory::instance().create_data_type(meta); const auto* agg_state_type = assert_cast<const vectorized::DataTypeAggState*>(data_type.get()); - agg_state_type->check_agg_state_compatibility(opts.be_exec_version); + agg_state_type->check_function_compatibility(opts.be_exec_version); auto type = agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type; if (read_as_string(type)) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 813a5e5519f..83b2bd4f702 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -675,7 +675,7 @@ bool TabletColumn::is_row_store_column() const { vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function_union( vectorized::DataTypePtr type, int current_be_exec_version) const { const auto* state_type = assert_cast<const vectorized::DataTypeAggState*>(type.get()); - BeExecVersionManager::check_agg_state_compatibility( + BeExecVersionManager::check_function_compatibility( current_be_exec_version, _be_exec_version, state_type->get_nested_function()->get_name()); return vectorized::AggregateStateUnion::create(state_type->get_nested_function(), {type}, type); diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp index 790d0270aa3..76a2881dd78 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp @@ -81,9 +81,11 @@ void register_aggregate_function_covar_pop(AggregateFunctionSimpleFactory& facto void register_aggregate_function_covar_samp_old(AggregateFunctionSimpleFactory& factory) { factory.register_alternative_function( - "covar_samp", create_aggregate_function_covariance_samp_old<NOTNULLABLE>); - factory.register_alternative_function( - "covar_samp", create_aggregate_function_covariance_samp_old<NULLABLE>, NULLABLE); + "covar_samp", create_aggregate_function_covariance_samp_old<NOTNULLABLE>, false, + AGG_FUNCTION_NULLABLE); + factory.register_alternative_function("covar_samp", + create_aggregate_function_covariance_samp_old<NULLABLE>, + true, AGG_FUNCTION_NULLABLE); } void register_aggregate_function_covar_samp(AggregateFunctionSimpleFactory& factory) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp index a8767e6fae7..00034776607 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp @@ -111,16 +111,20 @@ void register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact } void register_percentile_approx_old_function(AggregateFunctionSimpleFactory& factory) { - factory.register_alternative_function( - "percentile_approx", create_aggregate_function_percentile_approx_older<false>, false); - factory.register_alternative_function( - "percentile_approx", create_aggregate_function_percentile_approx_older<true>, true); + factory.register_alternative_function("percentile_approx", + create_aggregate_function_percentile_approx_older<false>, + false, AGG_FUNCTION_NULLABLE); + factory.register_alternative_function("percentile_approx", + create_aggregate_function_percentile_approx_older<true>, + true, AGG_FUNCTION_NULLABLE); factory.register_alternative_function( "percentile_approx_weighted", - create_aggregate_function_percentile_approx_weighted_older<false>, false); + create_aggregate_function_percentile_approx_weighted_older<false>, false, + AGG_FUNCTION_NULLABLE); factory.register_alternative_function( "percentile_approx_weighted", - create_aggregate_function_percentile_approx_weighted_older<true>, true); + create_aggregate_function_percentile_approx_weighted_older<true>, true, + AGG_FUNCTION_NULLABLE); } void register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& factory) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp index 01fdddf6074..5ad1ea8f2d3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp @@ -24,13 +24,16 @@ namespace doris::vectorized { void register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& factory) { factory.register_alternative_function( - "percentile", creator_without_type::creator<AggregateFunctionPercentileOld>); + "percentile", creator_without_type::creator<AggregateFunctionPercentileOld>, false, + AGG_FUNCTION_NULLABLE); factory.register_alternative_function( - "percentile", creator_without_type::creator<AggregateFunctionPercentileOld>, true); + "percentile", creator_without_type::creator<AggregateFunctionPercentileOld>, true, + AGG_FUNCTION_NULLABLE); factory.register_alternative_function( - "percentile_array", creator_without_type::creator<AggregateFunctionPercentileArrayOld>); + "percentile_array", creator_without_type::creator<AggregateFunctionPercentileArrayOld>, + false, AGG_FUNCTION_NULLABLE); factory.register_alternative_function( "percentile_array", creator_without_type::creator<AggregateFunctionPercentileArrayOld>, - true); + true, AGG_FUNCTION_NULLABLE); } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h index cc504b9f996..b22504dda9c 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h @@ -59,11 +59,6 @@ private: AggregateFunctions aggregate_functions; AggregateFunctions nullable_aggregate_functions; std::unordered_map<std::string, std::string> function_alias; - /// @TEMPORARY: for be_exec_version=4 - /// in order to solve agg of sum/count is not compatibility during the upgrade process - constexpr static int AGG_FUNCTION_NEW = 7; - /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW. replace function to old version. - std::unordered_map<std::string, std::string> function_to_replace; public: void register_nullable_function_combinator(const Creator& creator) { @@ -177,21 +172,19 @@ public: } } - /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW void register_alternative_function(const std::string& name, const Creator& creator, - bool nullable = false) { - static std::string suffix {"_old_for_version_before_2_0"}; - register_function(name + suffix, creator, nullable); - function_to_replace[name] = name + suffix; + bool nullable, int old_be_exec_version) { + auto new_name = name + BeExecVersionManager::get_function_suffix(old_be_exec_version); + register_function(new_name, creator, nullable); + BeExecVersionManager::registe_old_function_compatibility(old_be_exec_version, name); } - /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW void temporary_function_update(int fe_version_now, std::string& name) { - // replace if fe is old version. - if (fe_version_now < AGG_FUNCTION_NEW && - function_to_replace.find(name) != function_to_replace.end()) { - name = function_to_replace[name]; + int old_version = BeExecVersionManager::get_function_compatibility(fe_version_now, name); + if (!old_version) { + return; } + name = name + BeExecVersionManager::get_function_suffix(old_version); } static AggregateFunctionSimpleFactory& instance(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp index 1d977c1c528..b9e39552395 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp @@ -109,13 +109,17 @@ void register_aggregate_function_stddev_variance_pop(AggregateFunctionSimpleFact void register_aggregate_function_stddev_variance_samp_old(AggregateFunctionSimpleFactory& factory) { factory.register_alternative_function( - "variance_samp", create_aggregate_function_variance_samp_older<false, false>); + "variance_samp", create_aggregate_function_variance_samp_older<false, false>, false, + AGG_FUNCTION_NULLABLE); factory.register_alternative_function( - "variance_samp", create_aggregate_function_variance_samp_older<false, true>, true); + "variance_samp", create_aggregate_function_variance_samp_older<false, true>, true, + AGG_FUNCTION_NULLABLE); factory.register_alternative_function("stddev_samp", - create_aggregate_function_stddev_samp_older<true, false>); - factory.register_alternative_function( - "stddev_samp", create_aggregate_function_stddev_samp_older<true, true>, true); + create_aggregate_function_stddev_samp_older<true, false>, + false, AGG_FUNCTION_NULLABLE); + factory.register_alternative_function("stddev_samp", + create_aggregate_function_stddev_samp_older<true, true>, + true, AGG_FUNCTION_NULLABLE); } void register_aggregate_function_stddev_variance_samp(AggregateFunctionSimpleFactory& factory) { diff --git a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp index 8bfdcc26f43..598c23eb147 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp @@ -77,8 +77,10 @@ void register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& f } void register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& factory) { factory.register_alternative_function("window_funnel", - create_aggregate_function_window_funnel_old, true); + create_aggregate_function_window_funnel_old, true, + AGG_FUNCTION_NEW_WINDOW_FUNNEL); factory.register_alternative_function("window_funnel", - create_aggregate_function_window_funnel_old, false); + create_aggregate_function_window_funnel_old, false, + AGG_FUNCTION_NEW_WINDOW_FUNNEL); } } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_agg_state.h b/be/src/vec/data_types/data_type_agg_state.h index d7089503b01..35f86f23b2b 100644 --- a/be/src/vec/data_types/data_type_agg_state.h +++ b/be/src/vec/data_types/data_type_agg_state.h @@ -122,9 +122,9 @@ public: DataTypePtr get_serialized_type() const { return _agg_serialized_type; } - void check_agg_state_compatibility(int read_be_exec_version) const { - BeExecVersionManager::check_agg_state_compatibility(read_be_exec_version, _be_exec_version, - get_nested_function()->get_name()); + void check_function_compatibility(int read_be_exec_version) const { + BeExecVersionManager::check_function_compatibility(read_be_exec_version, _be_exec_version, + get_nested_function()->get_name()); } private: diff --git a/be/src/vec/functions/simple_function_factory.h b/be/src/vec/functions/simple_function_factory.h index 33a3202c18e..d8b544d5bfd 100644 --- a/be/src/vec/functions/simple_function_factory.h +++ b/be/src/vec/functions/simple_function_factory.h @@ -151,14 +151,6 @@ public: function_alias[alias] = name; } - /// @TEMPORARY: for be_exec_version=4 - template <class Function> - void register_alternative_function() { - static std::string suffix {"_old_for_version_before_5_0"}; - function_to_replace[Function::name] = Function::name + suffix; - register_function(Function::name + suffix, &createDefaultFunction<Function>); - } - FunctionBasePtr get_function(const std::string& name, const ColumnsWithTypeAndName& arguments, const DataTypePtr& return_type, int be_version = BeExecVersionManager::get_newest_version()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org