This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0967d7ec04 [improvement](agg) Do not serialize bitmap to string 
(#23172)
0967d7ec04 is described below

commit 0967d7ec04cfd79c710e3e655e4225d394333780
Author: Jerry Hu <mrh...@gmail.com>
AuthorDate: Mon Aug 21 10:10:15 2023 +0800

    [improvement](agg) Do not serialize bitmap to string (#23172)
---
 be/src/agent/be_exec_version_manager.h             |   6 +-
 be/src/util/bitmap_value.h                         |   4 +
 .../vec/aggregate_functions/aggregate_function.h   |  11 +-
 .../aggregate_function_bitmap.h                    | 148 ++++++++++++++++++++-
 .../aggregate_function_bitmap_agg.h                |  22 ++-
 be/src/vec/exec/vaggregation_node.cpp              |   1 +
 be/src/vec/exprs/vectorized_agg_fn.h               |   2 +
 .../main/java/org/apache/doris/common/Config.java  |   2 +-
 8 files changed, 181 insertions(+), 15 deletions(-)

diff --git a/be/src/agent/be_exec_version_manager.h 
b/be/src/agent/be_exec_version_manager.h
index 0ecc868651..25de399df3 100644
--- a/be/src/agent/be_exec_version_manager.h
+++ b/be/src/agent/be_exec_version_manager.h
@@ -56,10 +56,12 @@ private:
  *    a. function month/day/hour/minute/second's return type is changed to 
smaller type.
  *    b. in order to solve agg of sum/count is not compatibility during the 
upgrade process
  *    c. change the string hash method in runtime filter
- *    d. elt funciton return type change to nullable(string)
+ *    d. elt function return type change to nullable(string)
  *    e. add repeat_max_num in repeat function
+ * 3: start from doris 2.1
+ *    a. aggregation functions no longer serialize bitmap to string
 */
-inline const int BeExecVersionManager::max_be_exec_version = 2;
+inline const int BeExecVersionManager::max_be_exec_version = 3;
 inline const int BeExecVersionManager::min_be_exec_version = 0;
 
 } // namespace doris
diff --git a/be/src/util/bitmap_value.h b/be/src/util/bitmap_value.h
index ec75c4141c..96510bdde3 100644
--- a/be/src/util/bitmap_value.h
+++ b/be/src/util/bitmap_value.h
@@ -1191,6 +1191,10 @@ public:
         _is_shared = other._is_shared;
         _bitmap = std::move(other._bitmap);
         _set = std::move(other._set);
+
+        other._type = EMPTY;
+        other._is_shared = false;
+        other._bitmap = nullptr;
     }
 
     BitmapValue& operator=(const BitmapValue& other) {
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h 
b/be/src/vec/aggregate_functions/aggregate_function.h
index 4b2118fc51..cc1b7d88f5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -220,8 +220,11 @@ public:
 
     virtual DataTypePtr get_serialized_type() const { return 
std::make_shared<DataTypeString>(); }
 
+    /// Stores `version_` in the `version` member so that overrides in derived
+    /// aggregate functions can select behavior based on it.
+    virtual void set_version(const int version_) {
+        this->version = version_;
+    }
+
 protected:
     DataTypes argument_types;
+    int version {};
 };
 
 /// Implement method to obtain an address of 'add' function.
@@ -323,8 +326,8 @@ public:
 
     void serialize_to_column(const std::vector<AggregateDataPtr>& places, 
size_t offset,
                              MutableColumnPtr& dst, const size_t num_rows) 
const override {
-        VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
-        serialize_vec(places, offset, writter, num_rows);
+        VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
+        serialize_vec(places, offset, writer, num_rows);
     }
 
     void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
@@ -341,8 +344,8 @@ public:
 
     void streaming_agg_serialize_to_column(const IColumn** columns, 
MutableColumnPtr& dst,
                                            const size_t num_rows, Arena* 
arena) const override {
-        VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
-        streaming_agg_serialize(columns, writter, num_rows, arena);
+        VectorBufferWriter writer(assert_cast<ColumnString&>(*dst));
+        streaming_agg_serialize(columns, writer, num_rows, arena);
     }
 
     void serialize_without_key_to_column(ConstAggregateDataPtr __restrict 
place,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h 
b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
index 00d3517fa0..7d2634a8dc 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap.h
@@ -146,10 +146,145 @@ struct AggregateFunctionBitmapData {
     BitmapValue& get() { return value; }
 };
 
+/// Serialization helper shared by the bitmap aggregate functions.
+///
+/// Every override below branches on `version`: when it is at least
+/// BITMAP_SERDE_VERSION the intermediate states are exchanged as a
+/// ColumnBitmap, otherwise the call is forwarded to the inherited
+/// implementation (whose serialized type is a string column).
+///
+/// `Data` is the aggregate state type; it must expose a `value` member that can
+/// be assigned into a ColumnBitmap element and a `merge()` that accepts such an
+/// element. `Derived` is the concrete aggregate function (CRTP); it is used to
+/// reach its create/add/destroy.
+template <typename Data, typename Derived>
+class AggregateFunctionBitmapSerializationHelper
+        : public IAggregateFunctionDataHelper<Data, Derived> {
+public:
+    using BaseHelper = IAggregateFunctionHelper<Derived>;
+
+    AggregateFunctionBitmapSerializationHelper(const DataTypes& argument_types_)
+            : IAggregateFunctionDataHelper<Data, Derived>(argument_types_) {}
+
+    /// Streaming path: for every input row, builds a temporary single-row state
+    /// and stores its bitmap in `dst[i]`. `dst` is resized to `num_rows`.
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::streaming_agg_serialize_to_column(columns, dst, num_rows, arena);
+            return;
+        }
+
+        auto& col = assert_cast<ColumnBitmap&>(*dst);
+        // Scratch storage for the temporary state. A plain char array only
+        // guarantees alignment 1, so request the alignment of Data explicitly.
+        alignas(Data) char place[sizeof(Data)];
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        const auto* derived = assert_cast<const Derived*>(this);
+        for (size_t i = 0; i != num_rows; ++i) {
+            derived->create(place);
+            // Destroy the temporary state at the end of each iteration, also on early exit.
+            DEFER({ derived->destroy(place); });
+            derived->add(place, columns, i, arena);
+            data[i] = std::move(this->data(place).value);
+        }
+    }
+
+    /// Writes the bitmap of the state at `places[i] + offset` into `dst[i]` for
+    /// the first `num_rows` places. The bitmaps are handed over with std::move,
+    /// not copied. `dst` is resized to `num_rows`.
+    void serialize_to_column(const std::vector<AggregateDataPtr>& places, size_t offset,
+                             MutableColumnPtr& dst, const size_t num_rows) const override {
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::serialize_to_column(places, offset, dst, num_rows);
+            return;
+        }
+
+        auto& col = assert_cast<ColumnBitmap&>(*dst);
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            data[i] = std::move(this->data(places[i] + offset).value);
+        }
+    }
+
+    /// Merges every row of `column` into the single state at `place`.
+    void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
+                                           Arena* arena) const override {
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::deserialize_and_merge_from_column(place, column, arena);
+            return;
+        }
+
+        const auto& col = assert_cast<const ColumnBitmap&>(column);
+        const size_t num_rows = column.size();
+        const auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            this->data(place).merge(data[i]);
+        }
+    }
+
+    /// Merges rows `begin` through `end` of `column` into the state at `place`.
+    void deserialize_and_merge_from_column_range(AggregateDataPtr __restrict place,
+                                                 const IColumn& column, size_t begin, size_t end,
+                                                 Arena* arena) const override {
+        DCHECK(end <= column.size() && begin <= end)
+                << ", begin:" << begin << ", end:" << end << ", column.size():" << column.size();
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::deserialize_and_merge_from_column_range(place, column, begin, end, arena);
+            return;
+        }
+
+        const auto& col = assert_cast<const ColumnBitmap&>(column);
+        const auto* data = col.get_data().data();
+        // NOTE(review): `end` is treated as inclusive here, while the DCHECK above
+        // still permits end == column.size(), which would read one element past
+        // the column. Confirm the intended contract against the base class.
+        for (size_t i = begin; i <= end; ++i) {
+            this->data(place).merge(data[i]);
+        }
+    }
+
+    /// Merges row `i` of `column` into the state at `places[i] + offset` for
+    /// the first `num_rows` rows.
+    void deserialize_and_merge_vec(const AggregateDataPtr* places, size_t offset,
+                                   AggregateDataPtr rhs, const ColumnString* column, Arena* arena,
+                                   const size_t num_rows) const override {
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::deserialize_and_merge_vec(places, offset, rhs, column, arena, num_rows);
+            return;
+        }
+
+        // The interface types `column` as ColumnString; on this path it is
+        // treated as a ColumnBitmap, hence the cast through IColumn.
+        const auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
+        const auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            // `offset` locates this function's state inside each place, matching
+            // the addressing used by serialize_to_column.
+            this->data(places[i] + offset).merge(data[i]);
+        }
+    }
+
+    /// Same as deserialize_and_merge_vec, but rows whose `places[i]` is null
+    /// are skipped.
+    void deserialize_and_merge_vec_selected(const AggregateDataPtr* places, size_t offset,
+                                            AggregateDataPtr rhs, const ColumnString* column,
+                                            Arena* arena, const size_t num_rows) const override {
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::deserialize_and_merge_vec_selected(places, offset, rhs, column, arena,
+                                                           num_rows);
+            return;
+        }
+
+        // The interface types `column` as ColumnString; on this path it is
+        // treated as a ColumnBitmap, hence the cast through IColumn.
+        const auto& col = assert_cast<const ColumnBitmap&>(*assert_cast<const IColumn*>(column));
+        const auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            if (places[i]) {
+                // `offset` locates this function's state inside each place, matching
+                // the addressing used by serialize_to_column.
+                this->data(places[i] + offset).merge(data[i]);
+            }
+        }
+    }
+
+    /// Appends the bitmap of the state at `place` as one new row of `to`.
+    void serialize_without_key_to_column(ConstAggregateDataPtr __restrict place,
+                                         IColumn& to) const override {
+        if (version < BITMAP_SERDE_VERSION) {
+            BaseHelper::serialize_without_key_to_column(place, to);
+            return;
+        }
+
+        auto& col = assert_cast<ColumnBitmap&>(to);
+        const size_t old_size = col.size();
+        col.resize(old_size + 1);
+        // NOTE(review): `place` is a ConstAggregateDataPtr; if data() yields a
+        // const reference for it, this std::move degrades to a copy. Confirm
+        // whether a copy or a move is intended.
+        col.get_data()[old_size] = std::move(this->data(place).value);
+    }
+
+    /// Creates the column that holds serialized states: ColumnBitmap from
+    /// BITMAP_SERDE_VERSION on, ColumnString before.
+    [[nodiscard]] MutableColumnPtr create_serialize_column() const override {
+        if (version >= BITMAP_SERDE_VERSION) {
+            return ColumnBitmap::create();
+        }
+        return ColumnString::create();
+    }
+
+    /// Data type matching create_serialize_column(): DataTypeBitMap from
+    /// BITMAP_SERDE_VERSION on, the IAggregateFunction default before.
+    [[nodiscard]] DataTypePtr get_serialized_type() const override {
+        if (version >= BITMAP_SERDE_VERSION) {
+            return std::make_shared<DataTypeBitMap>();
+        }
+        return IAggregateFunction::get_serialized_type();
+    }
+
+protected:
+    using IAggregateFunction::version;
+
+    /// First version at which states are exchanged as ColumnBitmap instead of
+    /// being serialized to a string.
+    static constexpr int BITMAP_SERDE_VERSION = 3;
+};
+
 template <typename Op>
 class AggregateFunctionBitmapOp final
-        : public IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
-                                              AggregateFunctionBitmapOp<Op>> {
+        : public 
AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>,
+                                                            
AggregateFunctionBitmapOp<Op>> {
 public:
     using ResultDataType = BitmapValue;
     using ColVecType = ColumnBitmap;
@@ -158,8 +293,9 @@ public:
     String get_name() const override { return Op::name; }
 
     AggregateFunctionBitmapOp(const DataTypes& argument_types_)
-            : IAggregateFunctionDataHelper<AggregateFunctionBitmapData<Op>,
-                                           
AggregateFunctionBitmapOp<Op>>(argument_types_) {}
+            : 
AggregateFunctionBitmapSerializationHelper<AggregateFunctionBitmapData<Op>,
+                                                         
AggregateFunctionBitmapOp<Op>>(
+                      argument_types_) {}
 
     DataTypePtr get_return_type() const override { return 
std::make_shared<DataTypeBitMap>(); }
 
@@ -207,7 +343,7 @@ public:
 
 template <bool arg_is_nullable, typename ColVecType>
 class AggregateFunctionBitmapCount final
-        : public IAggregateFunctionDataHelper<
+        : public AggregateFunctionBitmapSerializationHelper<
                   AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
                   AggregateFunctionBitmapCount<arg_is_nullable, ColVecType>> {
 public:
@@ -216,7 +352,7 @@ public:
     using AggFunctionData = 
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>;
 
     AggregateFunctionBitmapCount(const DataTypes& argument_types_)
-            : IAggregateFunctionDataHelper<
+            : AggregateFunctionBitmapSerializationHelper<
                       
AggregateFunctionBitmapData<AggregateFunctionBitmapUnionOp>,
                       AggregateFunctionBitmapCount<arg_is_nullable, 
ColVecType>>(argument_types_) {}
 
diff --git a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h 
b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
index 02e3b8f28e..d7b1fe72b9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_bitmap_agg.h
@@ -49,6 +49,10 @@ struct AggregateFunctionBitmapAggData {
     void reset() { value.clear(); }
 
     void merge(const AggregateFunctionBitmapAggData& other) { value |= 
other.value; }
+
+    /// Writes `value` to `buf` through DataTypeBitMap::serialize_as_stream.
+    void write(BufferWritable& buf) const {
+        DataTypeBitMap::serialize_as_stream(value, buf);
+    }
+
+    /// Fills `value` from `buf` through DataTypeBitMap::deserialize_as_stream.
+    void read(BufferReadable& buf) {
+        DataTypeBitMap::deserialize_as_stream(value, buf);
+    }
 };
 
 template <bool arg_nullable, typename T>
@@ -114,12 +118,26 @@ public:
     }
 
     void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& 
buf) const override {
-        __builtin_unreachable();
+        this->data(place).write(buf);
     }
 
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                      Arena*) const override {
-        __builtin_unreachable();
+        this->data(place).read(buf);
+    }
+
+    /// Streaming path: for every input row, builds a temporary single-row state
+    /// and stores its bitmap in `dst[i]`. `dst` is resized to `num_rows`.
+    ///
+    /// NOTE(review): `dst` is cast to ColumnBitmap unconditionally; no
+    /// be_exec_version check is made in this function. Confirm this is intended.
+    void streaming_agg_serialize_to_column(const IColumn** columns, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena) const override {
+        auto& col = assert_cast<ColumnBitmap&>(*dst);
+        // Scratch storage for the temporary state. A plain char array only
+        // guarantees alignment 1, so request the alignment of Data explicitly.
+        alignas(Data) char place[sizeof(Data)];
+        col.resize(num_rows);
+        auto* data = col.get_data().data();
+        for (size_t i = 0; i != num_rows; ++i) {
+            this->create(place);
+            // Destroy the temporary state at the end of each iteration, also on early exit.
+            DEFER({ this->destroy(place); });
+            this->add(place, columns, i, arena);
+            data[i] = std::move(this->data(place).value);
+        }
+    }
 
     void deserialize_from_column(AggregateDataPtr places, const IColumn& 
column, Arena* arena,
diff --git a/be/src/vec/exec/vaggregation_node.cpp 
b/be/src/vec/exec/vaggregation_node.cpp
index c127c754f1..cb83d6fcab 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -465,6 +465,7 @@ Status AggregationNode::alloc_resource(doris::RuntimeState* 
state) {
 
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
         RETURN_IF_ERROR(_aggregate_evaluators[i]->open(state));
+        _aggregate_evaluators[i]->set_version(state->be_exec_version());
     }
 
     // move _create_agg_status to open not in during prepare,
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h 
b/be/src/vec/exprs/vectorized_agg_fn.h
index 97d13b1658..2688fae260 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -101,6 +101,8 @@ public:
     bool is_merge() const { return _is_merge; }
     const VExprContextSPtrs& input_exprs_ctxs() const { return 
_input_exprs_ctxs; }
 
+    /// Forwards `version` to the wrapped aggregate function (`_function`).
+    void set_version(const int version) {
+        _function->set_version(version);
+    }
+
 private:
     const TFunction _fn;
 
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c2dc625829..aa415da212 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1619,7 +1619,7 @@ public class Config extends ConfigBase {
      * Max data version of backends serialize block.
      */
     @ConfField(mutable = false)
-    public static int max_be_exec_version = 2;
+    public static int max_be_exec_version = 3;
 
     /**
      * Min data version of backends serialize block.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to