This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 0967d7ec04 [improvement](agg) Do not serialize bitmap to string (#23172) 0967d7ec04 is described below commit 0967d7ec04cfd79c710e3e655e4225d394333780 Author: Jerry Hu <mrh...@gmail.com> AuthorDate: Mon Aug 21 10:10:15 2023 +0800 [improvement](agg) Do not serialize bitmap to string (#23172) --- be/src/agent/be_exec_version_manager.h | 6 +- be/src/util/bitmap_value.h | 4 + .../vec/aggregate_functions/aggregate_function.h | 11 +- .../aggregate_function_bitmap.h | 148 ++++++++++++++++++++- .../aggregate_function_bitmap_agg.h | 22 ++- be/src/vec/exec/vaggregation_node.cpp | 1 + be/src/vec/exprs/vectorized_agg_fn.h | 2 + .../main/java/org/apache/doris/common/Config.java | 2 +- 8 files changed, 181 insertions(+), 15 deletions(-) diff --git a/be/src/agent/be_exec_version_manager.h b/be/src/agent/be_exec_version_manager.h index 0ecc868651..25de399df3 100644 --- a/be/src/agent/be_exec_version_manager.h +++ b/be/src/agent/be_exec_version_manager.h @@ -56,10 +56,12 @@ private: * a. function month/day/hour/minute/second's return type is changed to smaller type. * b. in order to solve agg of sum/count is not compatibility during the upgrade process * c. change the string hash method in runtime filter - * d. elt funciton return type change to nullable(string) + * d. elt function return type change to nullable(string) * e. add repeat_max_num in repeat function + * 3: start from doris 2.1 + * a. aggregation function do not serialize bitmap to string */ -inline const int BeExecVersionManager::max_be_exec_version = 2; +inline const int BeExecVersionManager::max_be_exec_version = 3; inline const int BeExecVersionManager::min_be_exec_version = 0; } // namespace doris diff --git a/be/src/util/bitmap_value.h b/be/src/util/bitmap_value.h index ec75c4141c..96510bdde3 100644 --- a/be/src/util/bitmap_value.h +++ b/be/src/util/bitmap_value.h @@ -1191,6 +1191,10 @@ public: _is_shared = other._is_shared; _bitmap = std::move(other._bitmap); _set = std::move(other._set); + + other._type = EMPTY; + other._is_shared = false; + other._bitmap = nullptr; } BitmapValue& operator=(const BitmapValue& other) { diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h index 4b2118fc51..cc1b7d88f5 100644 --- a/be/src/vec/aggregate_functions/aggregate_function.h +++ b/be/src/vec/aggregate_functions/aggregate_function.h @@ -220,8 +220,11 @@ public: virtual DataTypePtr get_serialized_type() const { return std::make_shared<DataTypeString>(); } + virtual void set_version(const int version_) { version = version_; } + protected: DataTypes argument_types; + int version {}; }; /// Implement method to obtain an address of 'add' function. @@ -323,8 +326,8 @@ public: void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, MutableColumnPtr& dst, const size_t num_rows) const override { - VectorBufferWriter writter(assert_cast<ColumnString&>(*dst)); - serialize_vec(places, offset, writter, num_rows); + VectorBufferWriter writer(assert_cast<ColumnString&>(*dst)); + serialize_vec(places, offset, writer, num_rows); } void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf, @@ -341,8 +344,8 @@ public: void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, const size_t num_rows, Arena* arena) const override { - VectorBufferWriter writter(assert_cast<ColumnString&>(*dst)); - streaming_agg_serialize(columns, writter, num_rows, arena); + VectorBufferWriter writer(assert_cast<ColumnString&>(*dst)); + streaming_agg_serialize(columns, writer, num_rows, arena); } void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h index 00d3517fa0..7d2634a8dc 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h @@ -146,10 +146,145 @@ struct AggregateFunctionBitmapData { BitmapValue& get() { return value; } }; +template <typename Data, typename Derived> +class AggregateFunctionBitmapSerializationHelper + : public IAggregateFunctionDataHelper<Data, Derived> { +public: + using BaseHelper = IAggregateFunctionHelper<Derived>; + + AggregateFunctionBitmapSerializationHelper(const DataTypes& argument_types_) + : IAggregateFunctionDataHelper<Data, Derived>(argument_types_) {} + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + if (version >= 3) { + auto& col = assert_cast<ColumnBitmap&>(*dst); + char place[sizeof(Data)]; + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + assert_cast<const Derived*>(this)->create(place); + DEFER({ assert_cast<const Derived*>(this)->destroy(place); }); + assert_cast<const Derived*>(this)->add(place, columns, i, arena); + data[i] = std::move(this->data(place).value); + } + } else { + BaseHelper::streaming_agg_serialize_to_column(columns, dst, num_rows, arena); + } + } + + void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset, + MutableColumnPtr& dst, const size_t num_rows) const override { + if (version >= 3) { + auto& col = assert_cast<ColumnBitmap&>(*dst); + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + data[i] = std::move(this->data(places[i] + offset).value); + } + } else { + BaseHelper::serialize_to_column(places, offset, dst, num_rows); + } + } + + void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column, + Arena* arena) const override { + if (version >= 3) { + auto& col = assert_cast<const ColumnBitmap&>(column); + const size_t num_rows = column.size(); + auto* data = col.get_data().data(); + + for (size_t i = 0; i != num_rows; ++i) { + this->data(place).merge(data[i]); + } + } else { + BaseHelper::deserialize_and_merge_from_column(place, column, arena); + } + } + + void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place, + const IColumn& column, size_t begin, size_t end, + Arena* arena) const override { + DCHECK(end <= column.size() && begin <= end) + << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size(); + if (version >= 3) { + auto& col = assert_cast<const ColumnBitmap&>(column); + auto* data = col.get_data().data(); + for (size_t i = begin; i <= end; ++i) { + this->data(place).merge(data[i]); + } + } else { + BaseHelper::deserialize_and_merge_from_column_range(place, column, begin, end, arena); + } + } + + void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, Arena* arena, + const size_t num_rows) const override { + if (version >= 3) { + auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column)); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + this->data(places[i]).merge(data[i]); + } + } else { + BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena, num_rows); + } + } + + void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset, + AggregateDataPtr rhs, const ColumnString* column, + Arena* arena, const size_t num_rows) const override { + if (version >= 3) { + auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column)); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + if (places[i]) { + this->data(places[i]).merge(data[i]); + } + } + } else { + BaseHelper::deserialize_and_merge_vec_selected(places, offset, rhs, column, arena, + num_rows); + } + } + + void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place, + IColumn& to) const override { + if (version >= 3) { + auto& col = assert_cast<ColumnBitmap&>(to); + size_t old_size = col.size(); + col.resize(old_size + 1); + col.get_data()[old_size] = std::move(this->data(place).value); + } else { + BaseHelper::serialize_without_key_to_column(place, to); + } + } + + [[nodiscard]] MutableColumnPtr create_serialize_column() const override { + if (version >= 3) { + return ColumnBitmap::create(); + } else { + return ColumnString::create(); + } + } + + [[nodiscard]] DataTypePtr get_serialized_type() const override { + if (version >= 3) { + return std::make_shared<DataTypeBitMap>(); + } else { + return IAggregateFunction::get_serialized_type(); + } + } + +protected: + using IAggregateFunction::version; +}; + template <typename Op> class AggregateFunctionBitmapOp final - : public IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>, - AggregateFunctionBitmapOp<Op>> { + : public AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>, + AggregateFunctionBitmapOp<Op>> { public: using ResultDataType = BitmapValue; using ColVecType = ColumnBitmap; @@ -158,8 +293,9 @@ public: String get_name() const override { return Op::name; } AggregateFunctionBitmapOp(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>, - AggregateFunctionBitmapOp<Op>>(argument_types_) {} + : AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>, + AggregateFunctionBitmapOp<Op>>( + argument_types_) {} DataTypePtr get_return_type() const override { return std::make_shared<DataTypeBitMap>(); } @@ -207,7 +343,7 @@ public: template <bool arg_is_nullable, typename ColVecType> class AggregateFunctionBitmapCount final - : public IAggregateFunctionDataHelper< + : public AggregateFunctionBitmapSerializationHelper< AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>, AggregateFunctionBitmapCount<arg_is_nullable, ColVecType>> { public: @@ -216,7 +352,7 @@ public: using AggFunctionData = AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>; AggregateFunctionBitmapCount(const DataTypes& argument_types_) - : IAggregateFunctionDataHelper< + : AggregateFunctionBitmapSerializationHelper< AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>, AggregateFunctionBitmapCount<arg_is_nullable, ColVecType>>(argument_types_) {} diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h index 02e3b8f28e..d7b1fe72b9 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h +++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h @@ -49,6 +49,10 @@ struct AggregateFunctionBitmapAggData { void reset() { value.clear(); } void merge(const AggregateFunctionBitmapAggData& other) { value |= other.value; } + + void write(BufferWritable& buf) const { DataTypeBitMap::serialize_as_stream(value, buf); } + + void read(BufferReadable& buf) { DataTypeBitMap::deserialize_as_stream(value, buf); } }; template <bool arg_nullable, typename T> @@ -114,12 +118,26 @@ public: } void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { - __builtin_unreachable(); + this->data(place).write(buf); } void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena*) const override { - __builtin_unreachable(); + this->data(place).read(buf); + } + + void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst, + const size_t num_rows, Arena* arena) const override { + auto& col = assert_cast<ColumnBitmap&>(*dst); + char place[sizeof(Data)]; + col.resize(num_rows); + auto* data = col.get_data().data(); + for (size_t i = 0; i != num_rows; ++i) { + this->create(place); + DEFER({ this->destroy(place); }); + this->add(place, columns, i, arena); + data[i] = std::move(this->data(place).value); + } } void deserialize_from_column(AggregateDataPtr places, const IColumn& column, Arena* arena, diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index c127c754f1..cb83d6fcab 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -465,6 +465,7 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* state) { for (int i = 0; i < _aggregate_evaluators.size(); ++i) { RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state)); + _aggregate_evaluators[i]->set_version(state->be_exec_version()); } // move _create_agg_status to open not in during prepare, diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index 97d13b1658..2688fae260 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -101,6 +101,8 @@ public: bool is_merge() const { return _is_merge; } const VExprContextSPtrs& input_exprs_ctxs() const { return _input_exprs_ctxs; } + void set_version(const int version) { _function->set_version(version); } + private: const TFunction _fn; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index c2dc625829..aa415da212 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1619,7 +1619,7 @@ public class Config extends ConfigBase { * Max data version of backends serialize block. */ @ConfField(mutable = false) - public static int max_be_exec_version = 2; + public static int max_be_exec_version = 3; /** * Min data version of backends serialize block. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org