This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c9bdd64fa [fix](memory) arena support memory reuse after clear() (#21033)
2c9bdd64fa is described below

commit 2c9bdd64fa40c2e81b8092fb7bfeb9a193f9d1b8
Author: Xinyi Zou <zouxiny...@gmail.com>
AuthorDate: Wed Jun 21 23:27:21 2023 +0800

    [fix](memory) arena support memory reuse after clear() (#21033)
---
 be/src/common/config.cpp                           |  2 ++
 be/src/common/config.h                             |  3 +++
 be/src/vec/columns/predicate_column.h              |  2 +-
 be/src/vec/common/arena.h                          | 23 ++++++++++++++++++++++
 .../vec/exec/join/process_hash_table_probe_impl.h  |  2 +-
 be/src/vec/exec/join/vhash_join_node.h             |  4 +++-
 be/src/vec/exec/vaggregation_node.h                | 17 ++++++++--------
 be/src/vec/exec/vpartition_sort_node.h             | 14 +++++++------
 8 files changed, 50 insertions(+), 17 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c27fbad769..2ea50568fc 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -129,6 +129,8 @@ DEFINE_mBool(enable_query_memory_overcommit, "true");
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
 DEFINE_mInt32(thread_wait_gc_max_milliseconds, "1000");
 
+DEFINE_mInt64(pre_serialize_keys_limit_bytes, "16777216");
+
 // the port heartbeat service used
 DEFINE_Int32(heartbeat_service_port, "9050");
 // the count of heart beat service
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 983c525df0..0cd4e86302 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -169,6 +169,9 @@ DECLARE_mBool(enable_query_memory_overcommit);
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
 DECLARE_mInt32(thread_wait_gc_max_milliseconds);
 
+// reach mem limit, don't serialize in batch
+DECLARE_mInt64(pre_serialize_keys_limit_bytes);
+
 // the port heartbeat service used
 DECLARE_Int32(heartbeat_service_port);
 // the count of heart beat service
diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h
index 28cb16e949..2cbf871804 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -343,7 +343,7 @@ public:
     void clear() override {
         data.clear();
         if (_arena != nullptr) {
-            _arena.reset(new Arena());
+            _arena->clear();
         }
     }
 
diff --git a/be/src/vec/common/arena.h b/be/src/vec/common/arena.h
index d98f4eeaa7..9fad70d845 100644
--- a/be/src/vec/common/arena.h
+++ b/be/src/vec/common/arena.h
@@ -277,6 +277,29 @@ public:
         return res;
     }
 
+    /**
+    * Delete all the chunks before the head, usually the head is the largest chunk in the arena.
+    * considering the scenario of memory reuse:
+    * 1. first time, use arena alloc 64K memory, 4K each time, at this time, there are 4 chunks of 4k 8k 16k 32k in arena.
+    * 2. then, clear arena, only one 32k chunk left in the arena.
+    * 3. second time, same alloc 64K memory, there are 4 chunks of 4k 8k 16k 32k in arena.
+    * 4. then, clear arena, only one 64k chunk left in the arena.
+    * 5. third time, same alloc 64K memory, there is still only one 64K chunk in the arena, and the memory is fully reused.
+    *
+    * special case: if the chunk is larger than 128M, it will no longer be expanded by a multiple of 2.
+    * If alloc 4G memory, 128M each time, then only one 128M chunk will be 
reserved after clearing,
+    * and only 128M can be reused when you apply for 4G memory again.
+    */
+    void clear() {
+        if (head->prev) {
+            delete head->prev;
+            head->prev = nullptr;
+        }
+        head->pos = head->begin;
+        size_in_bytes = head->size();
+        _used_size_no_head = 0;
+    }
+
     /// Size of chunks in bytes.
     size_t size() const { return size_in_bytes; }
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index e595796bb5..5923dbf1c5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -181,7 +181,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
         if (_arena) {
             old_probe_keys_memory_usage = _arena->size();
         }
-        _arena.reset(new Arena());
+        _arena.reset(new Arena()); // TODO arena reuse by clear()?
         if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<KeyGetter>::value) {
             if (_probe_keys.size() < probe_rows) {
                 _probe_keys.resize(probe_rows);
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 02927f9724..671d0d4170 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -72,6 +72,8 @@ struct SerializedHashTableContext {
             ColumnsHashing::HashMethodSerialized<typename HashTable::value_type, Mapped, true>;
     using Iter = typename HashTable::iterator;
 
+    SerializedHashTableContext() { _arena.reset(new Arena()); }
+
     HashTable hash_table;
     Iter iter;
     bool inited = false;
@@ -83,7 +85,7 @@ struct SerializedHashTableContext {
             keys.resize(num_rows);
         }
 
-        _arena.reset(new Arena());
+        _arena->clear();
         keys_memory_usage = 0;
         size_t keys_size = key_columns.size();
         for (size_t i = 0; i < num_rows; ++i) {
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 8b3bb0e82d..ce7070b817 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -102,7 +102,10 @@ struct AggregationMethodSerialized {
     std::vector<StringRef> keys;
     size_t keys_memory_usage = 0;
     AggregationMethodSerialized()
-            : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) {}
+            : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) {
+        _arena.reset(new Arena());
+        _serialize_key_arena.reset(new Arena());
+    }
 
     using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;
 
@@ -120,20 +123,19 @@ struct AggregationMethodSerialized {
         }
         size_t total_bytes = max_one_row_byte_size * num_rows;
 
-        if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) {
+        if (total_bytes > config::pre_serialize_keys_limit_bytes) {
             // reach mem limit, don't serialize in batch
-            // for simplicity, we just create a new arena here.
-            _arena.reset(new Arena());
+            _arena->clear();
             size_t keys_size = key_columns.size();
             for (size_t i = 0; i < num_rows; ++i) {
                 keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
             }
             keys_memory_usage = _arena->size();
         } else {
-            _arena.reset();
+            _arena->clear();
             if (total_bytes > _serialized_key_buffer_size) {
                 _serialized_key_buffer_size = total_bytes;
-                _serialize_key_arena.reset(new Arena());
+                _serialize_key_arena->clear();
                 _serialized_key_buffer = reinterpret_cast<uint8_t*>(
                         _serialize_key_arena->alloc(_serialized_key_buffer_size));
             }
@@ -175,7 +177,7 @@ struct AggregationMethodSerialized {
     }
 
     void reset() {
-        _arena.reset();
+        _arena.reset(new Arena());
         keys_memory_usage = 0;
         _serialized_key_buffer_size = 0;
     }
@@ -185,7 +187,6 @@ private:
     uint8_t* _serialized_key_buffer;
     std::unique_ptr<Arena> _serialize_key_arena;
     std::unique_ptr<Arena> _arena;
-    static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M
 };
 
 using AggregatedDataWithoutKey = AggregateDataPtr;
diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h
index 4aae4a7acb..45b44da81e 100644
--- a/be/src/vec/exec/vpartition_sort_node.h
+++ b/be/src/vec/exec/vpartition_sort_node.h
@@ -102,7 +102,10 @@ struct PartitionMethodSerialized {
     using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;
 
     template <typename Other>
-    explicit PartitionMethodSerialized(const Other& other) : data(other.data) {}
+    explicit PartitionMethodSerialized(const Other& other) : data(other.data) {
+        _arena.reset(new Arena());
+        _serialize_key_arena.reset(new Arena());
+    }
 
     size_t serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) {
         if (keys.size() < num_rows) {
@@ -115,20 +118,20 @@ struct PartitionMethodSerialized {
         }
         size_t total_bytes = max_one_row_byte_size * num_rows;
 
-        if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) {
+        if (total_bytes > config::pre_serialize_keys_limit_bytes) {
             // reach mem limit, don't serialize in batch
             // for simplicity, we just create a new arena here.
-            _arena.reset(new Arena());
+            _arena->clear();
             size_t keys_size = key_columns.size();
             for (size_t i = 0; i < num_rows; ++i) {
                 keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
             }
             keys_memory_usage = _arena->size();
         } else {
-            _arena.reset();
+            _arena->clear();
             if (total_bytes > _serialized_key_buffer_size) {
                 _serialized_key_buffer_size = total_bytes;
-                _serialize_key_arena.reset(new Arena());
+                _serialize_key_arena->clear();
                 _serialized_key_buffer = reinterpret_cast<uint8_t*>(
                         _serialize_key_arena->alloc(_serialized_key_buffer_size));
             }
@@ -152,7 +155,6 @@ private:
     uint8_t* _serialized_key_buffer;
     std::unique_ptr<Arena> _serialize_key_arena;
     std::unique_ptr<Arena> _arena;
-    static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M
 };
 
 //for string


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to