This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e74161706a [fix](group concat) Fix be oom caused by group concat (#42334)
1e74161706a is described below

commit 1e74161706a38650d0e57f8d27fee3b537579d80
Author: TengJianPing <18241664+jackte...@users.noreply.github.com>
AuthorDate: Tue Oct 29 10:01:40 2024 +0800

    [fix](group concat) Fix be oom caused by group concat (#42334)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    Memory usage of `std::string` is not tracked by the BE memory tracker,
    which may cause the BE process to run out of memory (OOM).
---
 .../aggregate_function_group_concat.h              | 39 ++++++++++++++--------
 be/src/vec/io/io_helper.h                          | 11 ++++--
 2 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
index a62ffb8da61..a0cac9ab780 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
@@ -43,20 +43,27 @@ class IColumn;
 namespace doris::vectorized {
 
 struct AggregateFunctionGroupConcatData {
-    std::string data;
+    ColumnString::Chars data;
     std::string separator;
     bool inited = false;
 
     void add(StringRef ref, StringRef sep) {
+        auto delta_size = ref.size;
         if (!inited) {
-            inited = true;
             separator.assign(sep.data, sep.data + sep.size);
         } else {
-            data += separator;
+            delta_size += separator.size();
         }
+        auto offset = data.size();
+        data.resize(data.size() + delta_size);
 
-        data.resize(data.length() + ref.size);
-        memcpy(data.data() + data.length() - ref.size, ref.data, ref.size);
+        if (!inited) {
+            inited = true;
+        } else {
+            memcpy(data.data() + offset, separator.data(), separator.size());
+            offset += separator.size();
+        }
+        memcpy(data.data() + offset, ref.data, ref.size);
     }
 
     void merge(const AggregateFunctionGroupConcatData& rhs) {
@@ -67,17 +74,23 @@ struct AggregateFunctionGroupConcatData {
         if (!inited) {
             inited = true;
             separator = rhs.separator;
-            data = rhs.data;
+            data.assign(rhs.data);
         } else {
-            data += separator;
-            data += rhs.data;
+            auto offset = data.size();
+
+            auto delta_size = separator.size() + rhs.data.size();
+            data.resize(data.size() + delta_size);
+
+            memcpy(data.data() + offset, separator.data(), separator.size());
+            offset += separator.size();
+            memcpy(data.data() + offset, rhs.data.data(), rhs.data.size());
         }
     }
 
-    const std::string& get() const { return data; }
+    StringRef get() const { return StringRef {data.data(), data.size()}; }
 
     void write(BufferWritable& buf) const {
-        write_binary(data, buf);
+        write_binary(StringRef {data.data(), data.size()}, buf);
         write_binary(separator, buf);
         write_binary(inited, buf);
     }
@@ -89,7 +102,7 @@ struct AggregateFunctionGroupConcatData {
     }
 
     void reset() {
-        data = "";
+        data.clear();
         separator = "";
         inited = false;
     }
@@ -150,8 +163,8 @@ public:
     }
 
     void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& 
to) const override {
-        const std::string& result = this->data(place).get();
-        assert_cast<ColumnString&>(to).insert_data(result.c_str(), 
result.length());
+        const auto result = this->data(place).get();
+        assert_cast<ColumnString&>(to).insert_data(result.data, result.size);
     }
 };
 
diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h
index d5ca522146a..221beeccbb3 100644
--- a/be/src/vec/io/io_helper.h
+++ b/be/src/vec/io/io_helper.h
@@ -22,6 +22,7 @@
 #include <snappy/snappy.h>
 
 #include <iostream>
+#include <type_traits>
 
 #include "common/exception.h"
 #include "util/binary_cast.hpp"
@@ -168,7 +169,9 @@ void read_float_binary(Type& x, BufferReadable& buf) {
     read_pod_binary(x, buf);
 }
 
-inline void read_string_binary(std::string& s, BufferReadable& buf,
+template <typename Type>
+    requires(std::is_same_v<Type, String> || std::is_same_v<Type, 
PaddedPODArray<UInt8>>)
+inline void read_string_binary(Type& s, BufferReadable& buf,
                                size_t MAX_STRING_SIZE = 
DEFAULT_MAX_STRING_SIZE) {
     UInt64 size = 0;
     read_var_uint(size, buf);
@@ -178,7 +181,7 @@ inline void read_string_binary(std::string& s, 
BufferReadable& buf,
     }
 
     s.resize(size);
-    buf.read(s.data(), size);
+    buf.read((char*)s.data(), size);
 }
 
 inline void read_string_binary(StringRef& s, BufferReadable& buf,
@@ -225,7 +228,9 @@ void read_vector_binary(std::vector<Type>& v, 
BufferReadable& buf,
     }
 }
 
-inline void read_binary(String& x, BufferReadable& buf) {
+template <typename Type>
+    requires(std::is_same_v<Type, String> || std::is_same_v<Type, 
PaddedPODArray<UInt8>>)
+inline void read_binary(Type& x, BufferReadable& buf) {
     read_string_binary(x, buf);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to