This is an automated email from the ASF dual-hosted git repository. panxiaolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new d0329a6f922 [Feature](agg-state) add be_exec_version to agg state type meta (#36607) d0329a6f922 is described below commit d0329a6f922ba5dfb3a335414fcf23854245040d Author: Pxl <pxl...@qq.com> AuthorDate: Mon Jun 24 16:46:15 2024 +0800 [Feature](agg-state) add be_exec_version to agg state type meta (#36607) ## Proposed changes add be_exec_version to agg state type meta --- be/src/olap/rowset/segment_v2/segment_writer.cpp | 1 + be/src/olap/schema_change.cpp | 4 +-- be/src/olap/tablet_meta.cpp | 4 +++ be/src/olap/tablet_schema.cpp | 34 +++++++++++++--------- be/src/olap/tablet_schema.h | 2 ++ be/src/runtime/types.h | 4 ++- .../aggregate_function_state_union.h | 8 +++-- be/src/vec/data_types/data_type_agg_state.h | 18 ++++++++---- be/src/vec/data_types/data_type_factory.cpp | 22 ++++++-------- .../org/apache/doris/catalog/AggStateType.java | 2 ++ .../main/java/org/apache/doris/catalog/Column.java | 2 ++ gensrc/proto/data.proto | 1 + gensrc/proto/olap_file.proto | 1 + gensrc/proto/segment_v2.proto | 1 + gensrc/thrift/Descriptors.thrift | 1 + gensrc/thrift/Types.thrift | 1 + .../diffrent_serialize/diffrent_serialize.out | 7 +++++ .../diffrent_serialize/diffrent_serialize.groovy | 7 +++++ .../suites/mv_p0/test_28741/test_28741.groovy | 1 + 19 files changed, 84 insertions(+), 37 deletions(-) diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 6c398f5813a..0d888f3ba87 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -174,6 +174,7 @@ void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id, } meta->set_result_is_nullable(column.get_result_is_nullable()); meta->set_function_name(column.get_aggregation_name()); + meta->set_be_exec_version(column.get_be_exec_version()); } Status SegmentWriter::init() { diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index f5d9137f663..599d9c1d142 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -153,13 +153,12 @@ public: for (int i = 0; i < rows; i++) { auto row_ref = row_refs[i]; - for (int j = key_number; j < columns; j++) { const auto* column_ptr = row_ref.get_column(j).get(); agg_functions[j - key_number]->add( agg_places[j - key_number], const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position, - nullptr); + &_arena); } if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) { @@ -245,6 +244,7 @@ private: BaseTabletSPtr _tablet; RowRefComparator _cmp; + vectorized::Arena _arena; }; BlockChanger::BlockChanger(TabletSchemaSPtr tablet_schema, DescriptorTbl desc_tbl) diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 84c09fa942e..26ed8d3ee57 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -371,6 +371,10 @@ void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tco column->set_result_is_nullable(tcolumn.result_is_nullable); } + if (tcolumn.__isset.be_exec_version) { + column->set_be_exec_version(tcolumn.be_exec_version); + } + if (tcolumn.column_type.type == TPrimitiveType::VARCHAR || tcolumn.column_type.type == TPrimitiveType::STRING) { if (!tcolumn.column_type.__isset.index_len) { diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index d8bf40a1171..cee6f3e2d2e 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -540,9 +540,12 @@ void TabletColumn::init_from_pb(const ColumnPB& column) { _aggregation = get_aggregation_type_by_string(column.aggregation()); _aggregation_name = column.aggregation(); } - if (column.has_result_is_nullable()) { + + if (_type == FieldType::OLAP_FIELD_TYPE_AGG_STATE) { _result_is_nullable = column.result_is_nullable(); + _be_exec_version = column.be_exec_version(); } + if (column.has_visible()) { _visible = column.visible(); } @@ -610,6 +613,7 @@ void TabletColumn::to_schema_pb(ColumnPB* column) const { column->set_aggregation(_aggregation_name); } column->set_result_is_nullable(_result_is_nullable); + column->set_be_exec_version(_be_exec_version); if (_has_bitmap_index) { column->set_has_bitmap_index(_has_bitmap_index); } @@ -657,28 +661,32 @@ bool TabletColumn::is_row_store_column() const { vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function_union( vectorized::DataTypePtr type) const { - auto state_type = assert_cast<const vectorized::DataTypeAggState*>(type.get()); + const auto* state_type = assert_cast<const vectorized::DataTypeAggState*>(type.get()); return vectorized::AggregateStateUnion::create(state_type->get_nested_function(), {type}, type); } vectorized::AggregateFunctionPtr TabletColumn::get_aggregate_function(std::string suffix) const { + vectorized::AggregateFunctionPtr function = nullptr; + auto type = vectorized::DataTypeFactory::instance().create_data_type(*this); if (type && type->get_type_as_type_descriptor().type == PrimitiveType::TYPE_AGG_STATE) { - return get_aggregate_function_union(type); + function = get_aggregate_function_union(type); + } else { + std::string origin_name = TabletColumn::get_string_by_aggregation_type(_aggregation); + std::string agg_name = origin_name + suffix; + std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), + [](unsigned char c) { return std::tolower(c); }); + function = vectorized::AggregateFunctionSimpleFactory::instance().get(agg_name, {type}, + type->is_nullable()); + if (!function) { + LOG(WARNING) << "get column aggregate function failed, aggregation_name=" << origin_name + << ", column_type=" << type->get_name(); + } } - - std::string origin_name = TabletColumn::get_string_by_aggregation_type(_aggregation); - std::string agg_name = origin_name + suffix; - std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), - [](unsigned char c) { return std::tolower(c); }); - - auto function = vectorized::AggregateFunctionSimpleFactory::instance().get(agg_name, {type}, - type->is_nullable()); if (function) { + function->set_version(_be_exec_version); return function; } - LOG(WARNING) << "get column aggregate function failed, aggregation_name=" << origin_name - << ", column_type=" << type->get_name(); return nullptr; } diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index ebfe06f3dc4..3a78f2e4748 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -153,6 +153,7 @@ public: bool is_row_store_column() const; std::string get_aggregation_name() const { return _aggregation_name; } bool get_result_is_nullable() const { return _result_is_nullable; } + int get_be_exec_version() const { return _be_exec_version; } bool has_path_info() const { return _column_path != nullptr && !_column_path->empty(); } const vectorized::PathInDataPtr& path_info_ptr() const { return _column_path; } // If it is an extracted column from variant column @@ -221,6 +222,7 @@ private: uint32_t _sub_column_count = 0; bool _result_is_nullable = false; + int _be_exec_version = -1; vectorized::PathInDataPtr _column_path; // Record information about columns merged into a sparse column within a variant diff --git a/be/src/runtime/types.h b/be/src/runtime/types.h index 4cb7d51e4b5..5d2f83c1bd6 100644 --- a/be/src/runtime/types.h +++ b/be/src/runtime/types.h @@ -38,7 +38,6 @@ extern const int HLL_COLUMN_DEFAULT_LEN; // Describes a type. Includes the enum, children types, and any type-specific metadata // (e.g. precision and scale for decimals). -// TODO for 2.3: rename to TypeDescriptor struct TypeDescriptor { PrimitiveType type; /// Only set if type == TYPE_CHAR or type == TYPE_VARCHAR @@ -57,6 +56,8 @@ struct TypeDescriptor { std::string function_name; + int be_exec_version = -1; + // Only set if type == TYPE_STRUCT. The field name of each child. std::vector<std::string> field_names; @@ -155,6 +156,7 @@ struct TypeDescriptor { result.result_is_nullable = t.result_is_nullable; DCHECK(t.__isset.function_name); result.function_name = t.function_name; + result.be_exec_version = t.be_exec_version; } return result; } diff --git a/be/src/vec/aggregate_functions/aggregate_function_state_union.h b/be/src/vec/aggregate_functions/aggregate_function_state_union.h index 3c9e2ed3767..6ff900c90d3 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_state_union.h +++ b/be/src/vec/aggregate_functions/aggregate_function_state_union.h @@ -17,6 +17,8 @@ #pragma once +#include <utility> + #include "vec/aggregate_functions/aggregate_function.h" #include "vec/data_types/data_type_agg_state.h" @@ -26,10 +28,10 @@ const static std::string AGG_UNION_SUFFIX = "_union"; class AggregateStateUnion : public IAggregateFunctionHelper<AggregateStateUnion> { public: AggregateStateUnion(AggregateFunctionPtr function, const DataTypes& argument_types_, - const DataTypePtr& return_type) + DataTypePtr return_type) : IAggregateFunctionHelper(argument_types_), - _function(function), - _return_type(return_type) {} + _function(std::move(function)), + _return_type(std::move(return_type)) {} ~AggregateStateUnion() override = default; static AggregateFunctionPtr create(AggregateFunctionPtr function, diff --git a/be/src/vec/data_types/data_type_agg_state.h b/be/src/vec/data_types/data_type_agg_state.h index ff6f1975e58..6b4fdc64562 100644 --- a/be/src/vec/data_types/data_type_agg_state.h +++ b/be/src/vec/data_types/data_type_agg_state.h @@ -35,24 +35,30 @@ namespace doris::vectorized { class DataTypeAggState : public DataTypeString { public: - DataTypeAggState(DataTypes sub_types, bool result_is_nullable, std::string function_name) + DataTypeAggState(DataTypes sub_types, bool result_is_nullable, std::string function_name, + int be_exec_version) : _result_is_nullable(result_is_nullable), _sub_types(std::move(sub_types)), - _function_name(std::move(function_name)) { + _function_name(std::move(function_name)), + _be_exec_version(be_exec_version) { _agg_function = AggregateFunctionSimpleFactory::instance().get(_function_name, _sub_types, _result_is_nullable); - if (_agg_function == nullptr) { + if (_agg_function == nullptr || + !BeExecVersionManager::check_be_exec_version(be_exec_version)) { throw Exception(ErrorCode::INVALID_ARGUMENT, "DataTypeAggState function get failed, type={}", do_get_name()); } + _agg_function->set_version(be_exec_version); _agg_serialized_type = _agg_function->get_serialized_type(); } const char* get_family_name() const override { return "AggState"; } std::string do_get_name() const override { - return fmt::format("AggState(function_name={},result_is_nullable={},arguments=[{}])", - _function_name, _result_is_nullable, get_types_string()); + return fmt::format( + "AggState(function_name={},result_is_nullable={},arguments=[{}],be_exec_version={}" + ")", + _function_name, _result_is_nullable, get_types_string(), _be_exec_version); } std::string get_function_name() const { return _function_name; } @@ -87,6 +93,7 @@ public: } col_meta->set_function_name(_function_name); col_meta->set_result_is_nullable(_result_is_nullable); + col_meta->set_be_exec_version(_be_exec_version); } AggregateFunctionPtr get_nested_function() const { return _agg_function; } @@ -133,6 +140,7 @@ private: AggregateFunctionPtr _agg_function; DataTypes _sub_types; std::string _function_name; + int _be_exec_version; }; } // namespace doris::vectorized diff --git a/be/src/vec/data_types/data_type_factory.cpp b/be/src/vec/data_types/data_type_factory.cpp index 8246977c6b0..17e6e06091e 100644 --- a/be/src/vec/data_types/data_type_factory.cpp +++ b/be/src/vec/data_types/data_type_factory.cpp @@ -76,11 +76,11 @@ DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc, bool if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_AGG_STATE) { DataTypes dataTypes; for (size_t i = 0; i < col_desc.get_subtype_count(); i++) { - dataTypes.push_back( - DataTypeFactory::instance().create_data_type(col_desc.get_sub_column(i))); + dataTypes.push_back(create_data_type(col_desc.get_sub_column(i))); } nested = std::make_shared<vectorized::DataTypeAggState>( - dataTypes, col_desc.get_result_is_nullable(), col_desc.get_aggregation_name()); + dataTypes, col_desc.get_result_is_nullable(), col_desc.get_aggregation_name(), + col_desc.get_be_exec_version()); } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_ARRAY) { DCHECK(col_desc.get_subtype_count() == 1); nested = std::make_shared<DataTypeArray>(create_data_type(col_desc.get_sub_column(0))); @@ -101,13 +101,6 @@ DataTypePtr DataTypeFactory::create_data_type(const TabletColumn& col_desc, bool names.push_back(col_desc.get_sub_column(i).name()); } nested = std::make_shared<DataTypeStruct>(dataTypes, names); - } else if (col_desc.type() == FieldType::OLAP_FIELD_TYPE_AGG_STATE) { - DataTypes dataTypes; - for (size_t i = 0; i < col_desc.get_subtype_count(); i++) { - dataTypes.push_back(create_data_type(col_desc.get_sub_column(i))); - } - nested = std::make_shared<vectorized::DataTypeAggState>( - dataTypes, col_desc.get_result_is_nullable(), col_desc.get_aggregation_name()); } else { nested = _create_primitive_data_type(col_desc.type(), col_desc.precision(), col_desc.frac()); @@ -184,7 +177,8 @@ DataTypePtr DataTypeFactory::create_data_type(const TypeDescriptor& col_desc, bo subTypes.push_back(create_data_type(col_desc.children[i], col_desc.contains_nulls[i])); } nested = std::make_shared<vectorized::DataTypeAggState>( - subTypes, col_desc.result_is_nullable, col_desc.function_name); + subTypes, col_desc.result_is_nullable, col_desc.function_name, + col_desc.be_exec_version); break; case TYPE_JSONB: nested = std::make_shared<vectorized::DataTypeJsonb>(); @@ -577,7 +571,8 @@ DataTypePtr DataTypeFactory::create_data_type(const PColumnMeta& pcolumn) { sub_types.push_back(create_data_type(child)); } nested = std::make_shared<DataTypeAggState>(sub_types, pcolumn.result_is_nullable(), - pcolumn.function_name()); + pcolumn.function_name(), + pcolumn.be_exec_version()); break; } default: { @@ -600,7 +595,8 @@ DataTypePtr DataTypeFactory::create_data_type(const segment_v2::ColumnMetaPB& pc data_types.push_back(DataTypeFactory::instance().create_data_type(child)); } nested = std::make_shared<vectorized::DataTypeAggState>( - data_types, pcolumn.result_is_nullable(), pcolumn.function_name()); + data_types, pcolumn.result_is_nullable(), pcolumn.function_name(), + pcolumn.be_exec_version()); } else if (pcolumn.type() == static_cast<int>(FieldType::OLAP_FIELD_TYPE_ARRAY)) { // Item subcolumn and length subcolumn, for sparse columns only subcolumn DCHECK_GE(pcolumn.children_columns().size(), 1) << pcolumn.DebugString(); diff --git a/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java b/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java index d2c5b625ca6..296677da836 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java +++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/AggStateType.java @@ -17,6 +17,7 @@ package org.apache.doris.catalog; +import org.apache.doris.common.Config; import org.apache.doris.thrift.TScalarType; import org.apache.doris.thrift.TTypeDesc; import org.apache.doris.thrift.TTypeNode; @@ -120,6 +121,7 @@ public class AggStateType extends Type { container.setSubTypes(types); container.setResultIsNullable(resultIsNullable); container.setFunctionName(functionName); + container.setBeExecVersion(Config.be_exec_version); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index d31ab73d83c..ca710188f5f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -587,6 +588,7 @@ public class Column implements Writable, GsonPostProcessable { for (Column column : children) { tColumn.addToChildrenColumn(column.toThrift()); } + tColumn.setBeExecVersion(Config.be_exec_version); } tColumn.setClusterKeyId(this.clusterKeyId); // ATTN: diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto index 755a3a042db..9b3824db3dc 100644 --- a/gensrc/proto/data.proto +++ b/gensrc/proto/data.proto @@ -62,6 +62,7 @@ message PColumnMeta { repeated PColumnMeta children = 5; optional bool result_is_nullable = 6; optional string function_name = 7; + optional int32 be_exec_version = 8; } message PBlock { diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index c4aa7654366..1f8f88801be 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -314,6 +314,7 @@ message ColumnPB { optional bool is_auto_increment = 22; // only reference by variant sparse columns optional int32 parent_unique_id = 23; + optional int32 be_exec_version = 24; } // Dictionary of Schema info, to reduce TabletSchemaCloudPB fdb kv size diff --git a/gensrc/proto/segment_v2.proto b/gensrc/proto/segment_v2.proto index ee82a5b5f1a..4c7183bae9a 100644 --- a/gensrc/proto/segment_v2.proto +++ b/gensrc/proto/segment_v2.proto @@ -196,6 +196,7 @@ message ColumnMetaPB { optional bool result_is_nullable = 18; // used on agg_state type optional string function_name = 19; // used on agg_state type + optional int32 be_exec_version = 20; // used on agg_state type } message PrimaryKeyIndexMetaPB { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 6c66c56e87c..2b8a74afd66 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -42,6 +42,7 @@ struct TColumn { 17: optional bool result_is_nullable 18: optional bool is_auto_increment = false; 19: optional i32 cluster_key_id = -1 + 20: optional i32 be_exec_version = -1 } struct TSlotDescriptor { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 925f7178cd4..01b7029f61d 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -168,6 +168,7 @@ struct TTypeDesc { 4: optional list<TTypeDesc> sub_types 5: optional bool result_is_nullable 6: optional string function_name + 7: optional i32 be_exec_version } enum TAggregationType { diff --git a/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out b/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out index 4e5d112ee95..87d4d391e55 100644 --- a/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out +++ b/regression-test/data/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.out @@ -13,6 +13,13 @@ 2 2 3 3 +-- !select_mv -- +\N 4 +-4 4 +1 1 +2 2 +3 3 + -- !select_mv -- \N 1 -4 1 diff --git a/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy b/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy index 9f1f26ae9af..9117615e28e 100644 --- a/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy +++ b/regression-test/suites/mv_p0/agg_state/diffrent_serialize/diffrent_serialize.groovy @@ -37,6 +37,7 @@ suite ("diffrent_serialize") { sql "insert into d_table select 2,2,2,'b';" sql "insert into d_table select 3,3,null,'c';" + createMV("create materialized view mv1_1 as select k1,bitmap_intersect(to_bitmap(k2)) from d_table group by k1;") createMV("create materialized view mv1 as select k1,bitmap_agg(k2) from d_table group by k1;") /* createMV("create materialized view mv2 as select k1,map_agg(k2,k3) from d_table group by k1;") @@ -56,6 +57,12 @@ suite ("diffrent_serialize") { } qt_select_mv "select k1,bitmap_to_string(bitmap_agg(k2)) from d_table group by k1 order by 1;" + explain { + sql("select k1,bitmap_to_string(bitmap_intersect(to_bitmap(k2))) from d_table group by k1 order by 1;") + contains "(mv1_1)" + } + qt_select_mv "select k1,bitmap_to_string(bitmap_intersect(to_bitmap(k2))) from d_table group by k1 order by 1;" + sql "insert into d_table select 1,1,1,'a';" sql "insert into d_table select 1,2,1,'a';" diff --git a/regression-test/suites/mv_p0/test_28741/test_28741.groovy b/regression-test/suites/mv_p0/test_28741/test_28741.groovy index a6c2c878a91..9a3f0c1539c 100644 --- a/regression-test/suites/mv_p0/test_28741/test_28741.groovy +++ b/regression-test/suites/mv_p0/test_28741/test_28741.groovy @@ -37,6 +37,7 @@ suite ("test_28741") { "replication_num" = "1" ); """ + createMV ("CREATE MATERIALIZED VIEW mv_test AS SELECT a,b,t,SUM(d) FROM test GROUP BY 1,2,3") sql "INSERT INTO test(a,b,c,t,d,e) VALUES (1,2,3,'2023-12-19 18:21:00', 56, 78)" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org