This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9986fa747ad [Chore](compatible) adjust register_alternative_function (#40941)
9986fa747ad is described below

commit 9986fa747ad6f44bd855e2a997bfabbe2e55fa9e
Author: Pxl <pxl...@qq.com>
AuthorDate: Thu Sep 19 16:24:40 2024 +0800

    [Chore](compatible) adjust register_alternative_function (#40941)
    
    ## Proposed changes
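    adjust register_alternative_function: each alternative (old-format)
    aggregate function is now registered together with the last
    be_exec_version that still uses the old format, and
    BeExecVersionManager keeps a per-function map of these versions in place
    of the fixed AGGREGATION_CHANGE_MAP.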
---
 be/src/agent/be_exec_version_manager.cpp           | 49 ++++++++++++++--------
 be/src/agent/be_exec_version_manager.h             | 21 ++++++++--
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  2 +-
 be/src/olap/tablet_schema.cpp                      |  2 +-
 .../aggregate_function_covar.cpp                   |  8 ++--
 .../aggregate_function_percentile.cpp              | 16 ++++---
 .../aggregate_function_percentile_approx.cpp       | 11 +++--
 .../aggregate_function_simple_factory.h            | 23 ++++------
 .../aggregate_function_stddev.cpp                  | 14 ++++---
 .../aggregate_function_window_funnel.cpp           |  6 ++-
 be/src/vec/data_types/data_type_agg_state.h        |  6 +--
 be/src/vec/functions/simple_function_factory.h     |  8 ----
 12 files changed, 97 insertions(+), 69 deletions(-)

diff --git a/be/src/agent/be_exec_version_manager.cpp 
b/be/src/agent/be_exec_version_manager.cpp
index 32cbe569892..e44829ae39b 100644
--- a/be/src/agent/be_exec_version_manager.cpp
+++ b/be/src/agent/be_exec_version_manager.cpp
@@ -17,12 +17,9 @@
 
 #include "agent/be_exec_version_manager.h"
 
-namespace doris {
+#include "common/exception.h"
 
-const std::map<int, const std::set<std::string>> AGGREGATION_CHANGE_MAP = {
-        {AGGREGATION_2_1_VERSION,
-         {"window_funnel", "stddev_samp", "variance_samp", 
"percentile_approx_weighted",
-          "percentile_approx", "covar_samp", "percentile", 
"percentile_array"}}};
+namespace doris {
 
 Status BeExecVersionManager::check_be_exec_version(int be_exec_version) {
     if (be_exec_version > max_be_exec_version || be_exec_version < 
min_be_exec_version) {
@@ -35,19 +32,35 @@ Status BeExecVersionManager::check_be_exec_version(int 
be_exec_version) {
     return Status::OK();
 }
 
-void BeExecVersionManager::check_agg_state_compatibility(int 
current_be_exec_version,
-                                                         int 
data_be_exec_version,
-                                                         std::string 
function_name) {
-    if (current_be_exec_version > AGGREGATION_2_1_VERSION &&
-        data_be_exec_version <= AGGREGATION_2_1_VERSION &&
-        
AGGREGATION_CHANGE_MAP.find(AGGREGATION_2_1_VERSION)->second.contains(function_name))
 {
-        throw Exception(Status::InternalError(
-                "agg state data with {} is not supported, "
-                "current_be_exec_version={}, data_be_exec_version={}, need to 
rebuild the data "
-                "or set the be_exec_version={} in fe.conf",
-                function_name, current_be_exec_version, data_be_exec_version,
-                AGGREGATION_2_1_VERSION));
+int BeExecVersionManager::get_function_compatibility(int be_exec_version,
+                                                     std::string 
function_name) {
+    auto it = _function_change_map.find(function_name);
+    if (it == _function_change_map.end()) {
+        // 0 means no compatibility issues need to be dealt with
+        return 0;
+    }
+
+    auto version_it = it->second.lower_bound(be_exec_version);
+    if (version_it == it->second.end()) {
+        return 0;
+    }
+
+    return *version_it;
+}
+
+void BeExecVersionManager::check_function_compatibility(int 
current_be_exec_version,
+                                                        int 
data_be_exec_version,
+                                                        std::string 
function_name) {
+    if (get_function_compatibility(current_be_exec_version, function_name) ==
+        get_function_compatibility(data_be_exec_version, function_name)) {
+        return;
     }
+
+    throw Exception(Status::InternalError(
+            "agg state data with {} is not supported, "
+            "current_be_exec_version={}, data_be_exec_version={}, need to 
rebuild the data "
+            "or set the be_exec_version={} in fe.conf temporarily",
+            function_name, current_be_exec_version, data_be_exec_version, 
data_be_exec_version));
 }
 
 /**
@@ -88,5 +101,5 @@ void BeExecVersionManager::check_agg_state_compatibility(int 
current_be_exec_ver
  */
 const int BeExecVersionManager::max_be_exec_version = 7;
 const int BeExecVersionManager::min_be_exec_version = 0;
-
+std::map<std::string, std::set<int>> 
BeExecVersionManager::_function_change_map {};
 } // namespace doris
diff --git a/be/src/agent/be_exec_version_manager.h 
b/be/src/agent/be_exec_version_manager.h
index 16092197a3a..7ab3c7de23a 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -25,13 +25,14 @@
 
 namespace doris {
 
+constexpr inline int AGG_FUNCTION_NEW_WINDOW_FUNNEL = 6; // window_funnel changed its data format after this version
 constexpr inline int BITMAP_SERDE = 3;
 constexpr inline int USE_NEW_SERDE = 4;         // release on DORIS version 2.1
 constexpr inline int OLD_WAL_SERDE = 3;         // use to solve compatibility 
issues, see pr #32299
 constexpr inline int AGG_FUNCTION_NULLABLE = 5; // change some agg nullable 
property: PR #37215
 constexpr inline int VARIANT_SERDE = 6;         // change variant serde to fix 
PR #38413
 constexpr inline int AGGREGATION_2_1_VERSION =
-        5; // some aggregation changed the data format after this version
+        6; // some aggregation changed the data format after this version
 
 class BeExecVersionManager {
 public:
@@ -39,14 +40,28 @@ public:
 
     static Status check_be_exec_version(int be_exec_version);
 
-    static void check_agg_state_compatibility(int current_be_exec_version, int 
data_be_exec_version,
-                                              std::string function_name);
+    static int get_function_compatibility(int be_exec_version, std::string 
function_name);
+
+    static void check_function_compatibility(int current_be_exec_version, int 
data_be_exec_version,
+                                             std::string function_name);
 
     static int get_newest_version() { return max_be_exec_version; }
 
+    static std::string get_function_suffix(int be_exec_version) {
+        return "_for_old_version_" + std::to_string(be_exec_version);
+    }
+
+    // For example, there are incompatible changes between version=7 and 
version=6, at this time breaking_old_version is 6.
+    static void registe_old_function_compatibility(int breaking_old_version,
+                                                   std::string function_name) {
+        _function_change_map[function_name].insert(breaking_old_version);
+    }
+
 private:
     static const int max_be_exec_version;
     static const int min_be_exec_version;
+    // [function name] -> {breaking_old_version}; any be_exec_version <= v uses the variant registered for v
+    static std::map<std::string, std::set<int>> _function_change_map;
 };
 
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 7b5b39a4c39..3c9b5b7ce7e 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -201,7 +201,7 @@ Status ColumnReader::create_agg_state(const 
ColumnReaderOptions& opts, const Col
 
     auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(meta);
     const auto* agg_state_type = assert_cast<const 
vectorized::DataTypeAggState*>(data_type.get());
-    agg_state_type->check_agg_state_compatibility(opts.be_exec_version);
+    agg_state_type->check_function_compatibility(opts.be_exec_version);
     auto type = 
agg_state_type->get_serialized_type()->get_type_as_type_descriptor().type;
 
     if (read_as_string(type)) {
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 813a5e5519f..83b2bd4f702 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -675,7 +675,7 @@ bool TabletColumn::is_row_store_column() const {
 vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function_union(
         vectorized::DataTypePtr type, int current_be_exec_version) const {
     const auto* state_type = assert_cast<const 
vectorized::DataTypeAggState*>(type.get());
-    BeExecVersionManager::check_agg_state_compatibility(
+    BeExecVersionManager::check_function_compatibility(
             current_be_exec_version, _be_exec_version,
             state_type->get_nested_function()->get_name());
     return 
vectorized::AggregateStateUnion::create(state_type->get_nested_function(), 
{type}, type);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
index 790d0270aa3..76a2881dd78 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_covar.cpp
@@ -81,9 +81,11 @@ void 
register_aggregate_function_covar_pop(AggregateFunctionSimpleFactory& facto
 
 void 
register_aggregate_function_covar_samp_old(AggregateFunctionSimpleFactory& 
factory) {
     factory.register_alternative_function(
-            "covar_samp", 
create_aggregate_function_covariance_samp_old<NOTNULLABLE>);
-    factory.register_alternative_function(
-            "covar_samp", 
create_aggregate_function_covariance_samp_old<NULLABLE>, NULLABLE);
+            "covar_samp", 
create_aggregate_function_covariance_samp_old<NOTNULLABLE>, false,
+            AGG_FUNCTION_NULLABLE);
+    factory.register_alternative_function("covar_samp",
+                                          
create_aggregate_function_covariance_samp_old<NULLABLE>,
+                                          true, AGG_FUNCTION_NULLABLE);
 }
 
 void register_aggregate_function_covar_samp(AggregateFunctionSimpleFactory& 
factory) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
index a8767e6fae7..00034776607 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile.cpp
@@ -111,16 +111,20 @@ void 
register_aggregate_function_percentile(AggregateFunctionSimpleFactory& fact
 }
 
 void register_percentile_approx_old_function(AggregateFunctionSimpleFactory& 
factory) {
-    factory.register_alternative_function(
-            "percentile_approx", 
create_aggregate_function_percentile_approx_older<false>, false);
-    factory.register_alternative_function(
-            "percentile_approx", 
create_aggregate_function_percentile_approx_older<true>, true);
+    factory.register_alternative_function("percentile_approx",
+                                          
create_aggregate_function_percentile_approx_older<false>,
+                                          false, AGG_FUNCTION_NULLABLE);
+    factory.register_alternative_function("percentile_approx",
+                                          
create_aggregate_function_percentile_approx_older<true>,
+                                          true, AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function(
             "percentile_approx_weighted",
-            create_aggregate_function_percentile_approx_weighted_older<false>, 
false);
+            create_aggregate_function_percentile_approx_weighted_older<false>, 
false,
+            AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function(
             "percentile_approx_weighted",
-            create_aggregate_function_percentile_approx_weighted_older<true>, 
true);
+            create_aggregate_function_percentile_approx_weighted_older<true>, 
true,
+            AGG_FUNCTION_NULLABLE);
 }
 
 void 
register_aggregate_function_percentile_approx(AggregateFunctionSimpleFactory& 
factory) {
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
index 01fdddf6074..5ad1ea8f2d3 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_percentile_approx.cpp
@@ -24,13 +24,16 @@ namespace doris::vectorized {
 
 void 
register_aggregate_function_percentile_old(AggregateFunctionSimpleFactory& 
factory) {
     factory.register_alternative_function(
-            "percentile", 
creator_without_type::creator<AggregateFunctionPercentileOld>);
+            "percentile", 
creator_without_type::creator<AggregateFunctionPercentileOld>, false,
+            AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function(
-            "percentile", 
creator_without_type::creator<AggregateFunctionPercentileOld>, true);
+            "percentile", 
creator_without_type::creator<AggregateFunctionPercentileOld>, true,
+            AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function(
-            "percentile_array", 
creator_without_type::creator<AggregateFunctionPercentileArrayOld>);
+            "percentile_array", 
creator_without_type::creator<AggregateFunctionPercentileArrayOld>,
+            false, AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function(
             "percentile_array", 
creator_without_type::creator<AggregateFunctionPercentileArrayOld>,
-            true);
+            true, AGG_FUNCTION_NULLABLE);
 }
 } // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h 
b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
index cc504b9f996..b22504dda9c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.h
@@ -59,11 +59,6 @@ private:
     AggregateFunctions aggregate_functions;
     AggregateFunctions nullable_aggregate_functions;
     std::unordered_map<std::string, std::string> function_alias;
-    /// @TEMPORARY: for be_exec_version=4
-    /// in order to solve agg of sum/count is not compatibility during the 
upgrade process
-    constexpr static int AGG_FUNCTION_NEW = 7;
-    /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW. replace function 
to old version.
-    std::unordered_map<std::string, std::string> function_to_replace;
 
 public:
     void register_nullable_function_combinator(const Creator& creator) {
@@ -177,21 +172,19 @@ public:
         }
     }
 
-    /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW
     void register_alternative_function(const std::string& name, const Creator& 
creator,
-                                       bool nullable = false) {
-        static std::string suffix {"_old_for_version_before_2_0"};
-        register_function(name + suffix, creator, nullable);
-        function_to_replace[name] = name + suffix;
+                                       bool nullable, int old_be_exec_version) 
{
+        auto new_name = name + 
BeExecVersionManager::get_function_suffix(old_be_exec_version);
+        register_function(new_name, creator, nullable);
+        
BeExecVersionManager::registe_old_function_compatibility(old_be_exec_version, 
name);
     }
 
-    /// @TEMPORARY: for be_exec_version < AGG_FUNCTION_NEW
     void temporary_function_update(int fe_version_now, std::string& name) {
-        // replace if fe is old version.
-        if (fe_version_now < AGG_FUNCTION_NEW &&
-            function_to_replace.find(name) != function_to_replace.end()) {
-            name = function_to_replace[name];
+        int old_version = 
BeExecVersionManager::get_function_compatibility(fe_version_now, name);
+        if (!old_version) {
+            return;
         }
+        name = name + BeExecVersionManager::get_function_suffix(old_version);
     }
 
     static AggregateFunctionSimpleFactory& instance();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
index 1d977c1c528..b9e39552395 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_stddev.cpp
@@ -109,13 +109,17 @@ void 
register_aggregate_function_stddev_variance_pop(AggregateFunctionSimpleFact
 
 void 
register_aggregate_function_stddev_variance_samp_old(AggregateFunctionSimpleFactory&
 factory) {
     factory.register_alternative_function(
-            "variance_samp", 
create_aggregate_function_variance_samp_older<false, false>);
+            "variance_samp", 
create_aggregate_function_variance_samp_older<false, false>, false,
+            AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function(
-            "variance_samp", 
create_aggregate_function_variance_samp_older<false, true>, true);
+            "variance_samp", 
create_aggregate_function_variance_samp_older<false, true>, true,
+            AGG_FUNCTION_NULLABLE);
     factory.register_alternative_function("stddev_samp",
-                                          
create_aggregate_function_stddev_samp_older<true, false>);
-    factory.register_alternative_function(
-            "stddev_samp", create_aggregate_function_stddev_samp_older<true, 
true>, true);
+                                          
create_aggregate_function_stddev_samp_older<true, false>,
+                                          false, AGG_FUNCTION_NULLABLE);
+    factory.register_alternative_function("stddev_samp",
+                                          
create_aggregate_function_stddev_samp_older<true, true>,
+                                          true, AGG_FUNCTION_NULLABLE);
 }
 
 void 
register_aggregate_function_stddev_variance_samp(AggregateFunctionSimpleFactory&
 factory) {
diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp 
b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
index 8bfdcc26f43..598c23eb147 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
+++ b/be/src/vec/aggregate_functions/aggregate_function_window_funnel.cpp
@@ -77,8 +77,10 @@ void 
register_aggregate_function_window_funnel(AggregateFunctionSimpleFactory& f
 }
 void 
register_aggregate_function_window_funnel_old(AggregateFunctionSimpleFactory& 
factory) {
     factory.register_alternative_function("window_funnel",
-                                          
create_aggregate_function_window_funnel_old, true);
+                                          
create_aggregate_function_window_funnel_old, true,
+                                          AGG_FUNCTION_NEW_WINDOW_FUNNEL);
     factory.register_alternative_function("window_funnel",
-                                          
create_aggregate_function_window_funnel_old, false);
+                                          
create_aggregate_function_window_funnel_old, false,
+                                          AGG_FUNCTION_NEW_WINDOW_FUNNEL);
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/data_type_agg_state.h 
b/be/src/vec/data_types/data_type_agg_state.h
index d7089503b01..35f86f23b2b 100644
--- a/be/src/vec/data_types/data_type_agg_state.h
+++ b/be/src/vec/data_types/data_type_agg_state.h
@@ -122,9 +122,9 @@ public:
 
     DataTypePtr get_serialized_type() const { return _agg_serialized_type; }
 
-    void check_agg_state_compatibility(int read_be_exec_version) const {
-        
BeExecVersionManager::check_agg_state_compatibility(read_be_exec_version, 
_be_exec_version,
-                                                            
get_nested_function()->get_name());
+    void check_function_compatibility(int read_be_exec_version) const {
+        
BeExecVersionManager::check_function_compatibility(read_be_exec_version, 
_be_exec_version,
+                                                           
get_nested_function()->get_name());
     }
 
 private:
diff --git a/be/src/vec/functions/simple_function_factory.h 
b/be/src/vec/functions/simple_function_factory.h
index 33a3202c18e..d8b544d5bfd 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -151,14 +151,6 @@ public:
         function_alias[alias] = name;
     }
 
-    /// @TEMPORARY: for be_exec_version=4
-    template <class Function>
-    void register_alternative_function() {
-        static std::string suffix {"_old_for_version_before_5_0"};
-        function_to_replace[Function::name] = Function::name + suffix;
-        register_function(Function::name + suffix, 
&createDefaultFunction<Function>);
-    }
-
     FunctionBasePtr get_function(const std::string& name, const 
ColumnsWithTypeAndName& arguments,
                                  const DataTypePtr& return_type,
                                  int be_version = 
BeExecVersionManager::get_newest_version()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to