This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 2c9bdd64fa [fix](memory) arena support memory reuse after clear() (#21033) 2c9bdd64fa is described below commit 2c9bdd64fa40c2e81b8092fb7bfeb9a193f9d1b8 Author: Xinyi Zou <zouxiny...@gmail.com> AuthorDate: Wed Jun 21 23:27:21 2023 +0800 [fix](memory) arena support memory reuse after clear() (#21033) --- be/src/common/config.cpp | 2 ++ be/src/common/config.h | 3 +++ be/src/vec/columns/predicate_column.h | 2 +- be/src/vec/common/arena.h | 23 ++++++++++++++++++++++ .../vec/exec/join/process_hash_table_probe_impl.h | 2 +- be/src/vec/exec/join/vhash_join_node.h | 4 +++- be/src/vec/exec/vaggregation_node.h | 17 ++++++++-------- be/src/vec/exec/vpartition_sort_node.h | 14 +++++++------ 8 files changed, 50 insertions(+), 17 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c27fbad769..2ea50568fc 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -129,6 +129,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true"); // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000"); +DEFINE_mInt64(pre_serialize_keys_limit_bytes, "16777216"); + // the port heartbeat service used DEFINE_Int32(heartbeat_service_port, "9050"); // the count of heart beat service diff --git a/be/src/common/config.h b/be/src/common/config.h index 983c525df0..0cd4e86302 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -169,6 +169,9 @@ DECLARE_mBool(enable_query_memory_overcommit); // The maximum time a thread waits for a full GC. Currently only query will wait for full gc. 
DECLARE_mInt32(thread_wait_gc_max_milliseconds); +// reach mem limit, don't serialize in batch +DECLARE_mInt64(pre_serialize_keys_limit_bytes); + // the port heartbeat service used DECLARE_Int32(heartbeat_service_port); // the count of heart beat service diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h index 28cb16e949..2cbf871804 100644 --- a/be/src/vec/columns/predicate_column.h +++ b/be/src/vec/columns/predicate_column.h @@ -343,7 +343,7 @@ public: void clear() override { data.clear(); if (_arena != nullptr) { - _arena.reset(new Arena()); + _arena->clear(); } } diff --git a/be/src/vec/common/arena.h b/be/src/vec/common/arena.h index d98f4eeaa7..9fad70d845 100644 --- a/be/src/vec/common/arena.h +++ b/be/src/vec/common/arena.h @@ -277,6 +277,29 @@ public: return res; } + /** + * Delete all the chunks before the head, usually the head is the largest chunk in the arena. + * Considering the scenario of memory reuse: + * 1. first time, use arena alloc 64K memory, 4K each time, at this time, there are 4 chunks of 4k 8k 16k 32k in arena. + * 2. then, clear arena, only one 32k chunk left in the arena. + * 3. second time, same alloc 64K memory, there are 2 chunks of 32k 64k in arena (the new chunk grows from the retained 32k head). + * 4. then, clear arena, only one 64k chunk left in the arena. + * 5. third time, same alloc 64K memory, there is still only one 64K chunk in the arena, and the memory is fully reused. + * + * special case: if the chunk is larger than 128M, it will no longer be expanded by a multiple of 2. + * If alloc 4G memory, 128M each time, then only one 128M chunk will be reserved after clearing, + * and only 128M can be reused when you apply for 4G memory again. + */ + void clear() { + if (head->prev) { + delete head->prev; + head->prev = nullptr; + } + head->pos = head->begin; + size_in_bytes = head->size(); + _used_size_no_head = 0; + } + /// Size of chunks in bytes.
size_t size() const { return size_in_bytes; } diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index e595796bb5..5923dbf1c5 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -181,7 +181,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c if (_arena) { old_probe_keys_memory_usage = _arena->size(); } - _arena.reset(new Arena()); + _arena.reset(new Arena()); // TODO arena reuse by clear()? if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) { if (_probe_keys.size() < probe_rows) { _probe_keys.resize(probe_rows); diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h index 02927f9724..671d0d4170 100644 --- a/be/src/vec/exec/join/vhash_join_node.h +++ b/be/src/vec/exec/join/vhash_join_node.h @@ -72,6 +72,8 @@ struct SerializedHashTableContext { ColumnsHashing::HashMethodSerialized<typename HashTable::value_type, Mapped, true>; using Iter = typename HashTable::iterator; + SerializedHashTableContext() { _arena.reset(new Arena()); } + HashTable hash_table; Iter iter; bool inited = false; @@ -83,7 +85,7 @@ struct SerializedHashTableContext { keys.resize(num_rows); } - _arena.reset(new Arena()); + _arena->clear(); keys_memory_usage = 0; size_t keys_size = key_columns.size(); for (size_t i = 0; i < num_rows; ++i) { diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index 8b3bb0e82d..ce7070b817 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -102,7 +102,10 @@ struct AggregationMethodSerialized { std::vector<StringRef> keys; size_t keys_memory_usage = 0; AggregationMethodSerialized() - : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) {} + : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) { + _arena.reset(new 
Arena()); + _serialize_key_arena.reset(new Arena()); + } using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>; @@ -120,20 +123,19 @@ struct AggregationMethodSerialized { } size_t total_bytes = max_one_row_byte_size * num_rows; - if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) { + if (total_bytes > config::pre_serialize_keys_limit_bytes) { // reach mem limit, don't serialize in batch - // for simplicity, we just create a new arena here. - _arena.reset(new Arena()); + _arena->clear(); size_t keys_size = key_columns.size(); for (size_t i = 0; i < num_rows; ++i) { keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena); } keys_memory_usage = _arena->size(); } else { - _arena.reset(); + _arena->clear(); if (total_bytes > _serialized_key_buffer_size) { _serialized_key_buffer_size = total_bytes; - _serialize_key_arena.reset(new Arena()); + _serialize_key_arena->clear(); _serialized_key_buffer = reinterpret_cast<uint8_t*>( _serialize_key_arena->alloc(_serialized_key_buffer_size)); } @@ -175,7 +177,7 @@ struct AggregationMethodSerialized { } void reset() { - _arena.reset(); + _arena.reset(new Arena()); keys_memory_usage = 0; _serialized_key_buffer_size = 0; } @@ -185,7 +187,6 @@ private: uint8_t* _serialized_key_buffer; std::unique_ptr<Arena> _serialize_key_arena; std::unique_ptr<Arena> _arena; - static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M }; using AggregatedDataWithoutKey = AggregateDataPtr; diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 4aae4a7acb..45b44da81e 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -102,7 +102,10 @@ struct PartitionMethodSerialized { using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>; template <typename Other> - explicit PartitionMethodSerialized(const Other& other) : data(other.data) {} + explicit 
PartitionMethodSerialized(const Other& other) : data(other.data) { + _arena.reset(new Arena()); + _serialize_key_arena.reset(new Arena()); + } size_t serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) { if (keys.size() < num_rows) { @@ -115,20 +118,20 @@ struct PartitionMethodSerialized { } size_t total_bytes = max_one_row_byte_size * num_rows; - if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) { + if (total_bytes > config::pre_serialize_keys_limit_bytes) { // reach mem limit, don't serialize in batch // for simplicity, we just create a new arena here. - _arena.reset(new Arena()); + _arena->clear(); size_t keys_size = key_columns.size(); for (size_t i = 0; i < num_rows; ++i) { keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena); } keys_memory_usage = _arena->size(); } else { - _arena.reset(); + _arena->clear(); if (total_bytes > _serialized_key_buffer_size) { _serialized_key_buffer_size = total_bytes; - _serialize_key_arena.reset(new Arena()); + _serialize_key_arena->clear(); _serialized_key_buffer = reinterpret_cast<uint8_t*>( _serialize_key_arena->alloc(_serialized_key_buffer_size)); } @@ -152,7 +155,6 @@ private: uint8_t* _serialized_key_buffer; std::unique_ptr<Arena> _serialize_key_arena; std::unique_ptr<Arena> _arena; - static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M }; //for string --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org