This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1e74161706a [fix](group concat) Fix be oom caused by group concat (#42334) 1e74161706a is described below commit 1e74161706a38650d0e57f8d27fee3b537579d80 Author: TengJianPing <18241664+jackte...@users.noreply.github.com> AuthorDate: Tue Oct 29 10:01:40 2024 +0800 [fix](group concat) Fix be oom caused by group concat (#42334) ## Proposed changes Issue Number: close #xxx Memory usage of ```std::string``` is not recorded by BE memtrack, which may cause BE process OOM. --- .../aggregate_function_group_concat.h | 39 ++++++++++++++-------- be/src/vec/io/io_helper.h | 11 ++++-- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h index a62ffb8da61..a0cac9ab780 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h +++ b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h @@ -43,20 +43,27 @@ class IColumn; namespace doris::vectorized { struct AggregateFunctionGroupConcatData { - std::string data; + ColumnString::Chars data; std::string separator; bool inited = false; void add(StringRef ref, StringRef sep) { + auto delta_size = ref.size; if (!inited) { - inited = true; separator.assign(sep.data, sep.data + sep.size); } else { - data += separator; + delta_size += separator.size(); } + auto offset = data.size(); + data.resize(data.size() + delta_size); - data.resize(data.length() + ref.size); - memcpy(data.data() + data.length() - ref.size, ref.data, ref.size); + if (!inited) { + inited = true; + } else { + memcpy(data.data() + offset, separator.data(), separator.size()); + offset += separator.size(); + } + memcpy(data.data() + offset, ref.data, ref.size); } void merge(const AggregateFunctionGroupConcatData& rhs) { @@ -67,17 +74,23 @@ struct AggregateFunctionGroupConcatData { if (!inited) { inited = true; separator = rhs.separator; - data = rhs.data; + data.assign(rhs.data); } else { - data += separator; - data += rhs.data; + auto offset = data.size(); + + auto delta_size = separator.size() + rhs.data.size(); + data.resize(data.size() + delta_size); + + memcpy(data.data() + offset, separator.data(), separator.size()); + offset += separator.size(); + memcpy(data.data() + offset, rhs.data.data(), rhs.data.size()); } } - const std::string& get() const { return data; } + StringRef get() const { return StringRef {data.data(), data.size()}; } void write(BufferWritable& buf) const { - write_binary(data, buf); + write_binary(StringRef {data.data(), data.size()}, buf); write_binary(separator, buf); write_binary(inited, buf); } @@ -89,7 +102,7 @@ struct AggregateFunctionGroupConcatData { } void reset() { - data = ""; + data.clear(); separator = ""; inited = false; } @@ -150,8 +163,8 @@ public: } void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn& to) const override { - const std::string& result = this->data(place).get(); - assert_cast<ColumnString&>(to).insert_data(result.c_str(), result.length()); + const auto result = this->data(place).get(); + assert_cast<ColumnString&>(to).insert_data(result.data, result.size); } }; diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h index d5ca522146a..221beeccbb3 100644 --- a/be/src/vec/io/io_helper.h +++ b/be/src/vec/io/io_helper.h @@ -22,6 +22,7 @@ #include <snappy/snappy.h> #include <iostream> +#include <type_traits> #include "common/exception.h" #include "util/binary_cast.hpp" @@ -168,7 +169,9 @@ void read_float_binary(Type& x, BufferReadable& buf) { read_pod_binary(x, buf); } -inline void read_string_binary(std::string& s, BufferReadable& buf, +template <typename Type> + requires(std::is_same_v<Type, String> || std::is_same_v<Type, PaddedPODArray<UInt8>>) +inline void read_string_binary(Type& s, BufferReadable& buf, size_t MAX_STRING_SIZE = DEFAULT_MAX_STRING_SIZE) { UInt64 size = 0; read_var_uint(size, buf); @@ -178,7 +181,7 @@ inline void read_string_binary(std::string& s, BufferReadable& buf, } s.resize(size); - buf.read(s.data(), size); + buf.read((char*)s.data(), size); } inline void read_string_binary(StringRef& s, BufferReadable& buf, @@ -225,7 +228,9 @@ void read_vector_binary(std::vector<Type>& v, BufferReadable& buf, } } -inline void read_binary(String& x, BufferReadable& buf) { +template <typename Type> + requires(std::is_same_v<Type, String> || std::is_same_v<Type, PaddedPODArray<UInt8>>) +inline void read_binary(Type& x, BufferReadable& buf) { read_string_binary(x, buf); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org