This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 0bcb99710b0 [branch-2.1]Add customStdAllocator for vector/map (#41193) (#45124)
0bcb99710b0 is described below

commit 0bcb99710b0954f1c2aa8562b5d1beade7e31347
Author: wangbo <wan...@selectdb.com>
AuthorDate: Mon Dec 9 18:33:51 2024 +0800

    [branch-2.1]Add customStdAllocator for vector/map (#41193) (#45124)
---
 be/src/olap/compaction.cpp                         | 11 +--
 be/src/olap/compaction.h                           |  2 +-
 be/src/olap/rowid_conversion.h                     | 29 +++++++-
 be/src/olap/rowset/beta_rowset_writer.cpp          |  8 +++
 be/src/olap/rowset/beta_rowset_writer.h            |  4 +-
 be/src/olap/rowset/segcompaction.cpp               |  8 ++-
 be/src/olap/rowset/segcompaction.h                 | 13 +++-
 be/src/olap/simple_rowid_conversion.h              |  5 +-
 be/src/runtime/thread_context.h                    |  4 +-
 be/src/vec/common/custom_allocator.h               | 82 ++++++++++++++++++++++
 .../index_compaction_with_deleted_term.cpp         |  2 +-
 11 files changed, 152 insertions(+), 16 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 80239476e48..4b2ac6df119 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -81,6 +81,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const 
std::string& label)
           _state(CompactionState::INITED) {
     _mem_tracker = 
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label);
     init_profile(label);
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+    _rowid_conversion = std::make_unique<RowIdConversion>();
 }
 
 Compaction::~Compaction() {
@@ -90,6 +92,7 @@ Compaction::~Compaction() {
     _input_rowsets.clear();
     _output_rowset.reset();
     _cur_tablet_schema.reset();
+    _rowid_conversion.reset();
 }
 
 void Compaction::init_profile(const std::string& label) {
@@ -378,7 +381,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     if (!ctx.columns_to_do_index_compaction.empty() ||
         (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
          _tablet->enable_unique_key_merge_on_write())) {
-        stats.rowid_conversion = &_rowid_conversion;
+        stats.rowid_conversion = _rowid_conversion.get();
     }
     int64_t way_num = merge_way_num();
 
@@ -964,7 +967,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* 
stats) {
         // TODO(LiaoXin): check if there are duplicate keys
         std::size_t missed_rows_size = 0;
         _tablet->calc_compaction_output_rowset_delete_bitmap(
-                _input_rowsets, _rowid_conversion, 0, version.second + 1, 
missed_rows.get(),
+                _input_rowsets, *_rowid_conversion, 0, version.second + 1, 
missed_rows.get(),
                 location_map.get(), _tablet->tablet_meta()->delete_bitmap(),
                 &output_rowset_delete_bitmap);
         if (missed_rows) {
@@ -1024,7 +1027,7 @@ Status Compaction::modify_rowsets(const 
Merger::Statistics* stats) {
                 }
                 DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id());
                 _tablet->calc_compaction_output_rowset_delete_bitmap(
-                        _input_rowsets, _rowid_conversion, 0, UINT64_MAX, 
missed_rows.get(),
+                        _input_rowsets, *_rowid_conversion, 0, UINT64_MAX, 
missed_rows.get(),
                         location_map.get(), *it.delete_bitmap.get(), 
&txn_output_delete_bitmap);
                 if (config::enable_merge_on_write_correctness_check) {
                     RowsetIdUnorderedSet rowsetids;
@@ -1044,7 +1047,7 @@ Status Compaction::modify_rowsets(const 
Merger::Statistics* stats) {
             // Convert the delete bitmap of the input rowsets to output rowset 
for
             // incremental data.
             _tablet->calc_compaction_output_rowset_delete_bitmap(
-                    _input_rowsets, _rowid_conversion, version.second, 
UINT64_MAX,
+                    _input_rowsets, *_rowid_conversion, version.second, 
UINT64_MAX,
                     missed_rows.get(), location_map.get(), 
_tablet->tablet_meta()->delete_bitmap(),
                     &output_rowset_delete_bitmap);
 
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 5aa3e260194..0c8eb211eda 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -130,7 +130,7 @@ protected:
     Version _output_version;
 
     int64_t _newest_write_timestamp;
-    RowIdConversion _rowid_conversion;
+    std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr;
     TabletSchemaSPtr _cur_tablet_schema;
 
     std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/olap/rowid_conversion.h b/be/src/olap/rowid_conversion.h
index d8e2a058d81..01a2cea0d5e 100644
--- a/be/src/olap/rowid_conversion.h
+++ b/be/src/olap/rowid_conversion.h
@@ -22,6 +22,7 @@
 
 #include "olap/olap_common.h"
 #include "olap/utils.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -33,17 +34,24 @@ namespace doris {
 class RowIdConversion {
 public:
     RowIdConversion() = default;
-    ~RowIdConversion() = default;
+    ~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); }
 
     // resize segment rowid map to its rows num
     void init_segment_map(const RowsetId& src_rowset_id, const 
std::vector<uint32_t>& num_rows) {
+        size_t delta_std_pair_cap = 0;
         for (size_t i = 0; i < num_rows.size(); i++) {
             uint32_t id = _segments_rowid_map.size();
             _segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> 
{src_rowset_id, i}, id);
             _id_to_segment_map.emplace_back(src_rowset_id, i);
-            _segments_rowid_map.emplace_back(std::vector<std::pair<uint32_t, 
uint32_t>>(
-                    num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, 
UINT32_MAX)));
+            std::vector<std::pair<uint32_t, uint32_t>> vec(
+                    num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, 
UINT32_MAX));
+            delta_std_pair_cap += vec.capacity();
+            _segments_rowid_map.emplace_back(std::move(vec));
         }
+        //NOTE: manually count _segments_rowid_map's memory here, because 
_segments_rowid_map could be used by indexCompaction.
+        // indexCompaction is a thridparty code, it's too complex to modify it.
+        // refer compact_column.
+        track_mem_usage(delta_std_pair_cap);
     }
 
     // set dst rowset id
@@ -109,12 +117,27 @@ public:
         return _segment_to_id_map.at(segment);
     }
 
+private:
+    void track_mem_usage(size_t delta_std_pair_cap) {
+        _std_pair_cap += delta_std_pair_cap;
+
+        size_t new_size =
+                _std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) +
+                _segments_rowid_map.capacity() * 
sizeof(std::vector<std::pair<uint32_t, uint32_t>>);
+
+        RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used);
+        CONSUME_THREAD_MEM_TRACKER(new_size);
+        _seg_rowid_map_mem_used = new_size;
+    }
+
 private:
     // the first level vector: index indicates src segment.
     // the second level vector: index indicates row id of source segment,
     // value indicates row id of destination segment.
     // <UINT32_MAX, UINT32_MAX> indicates current row not exist.
     std::vector<std::vector<std::pair<uint32_t, uint32_t>>> 
_segments_rowid_map;
+    size_t _seg_rowid_map_mem_used {0};
+    size_t _std_pair_cap {0};
 
     // Map source segment to 0 to n
     std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 65c17eaee7e..e2e0255d9a1 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -197,6 +197,14 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t 
segment_id) {
     return Status::OK();
 }
 
+Status BetaRowsetWriter::init(const RowsetWriterContext& 
rowset_writer_context) {
+    RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
+    if (_segcompaction_worker) {
+        _segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id);
+    }
+    return Status::OK();
+}
+
 Status 
BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& 
segment,
                                                     int32_t segment_id) {
     DCHECK(_rowset_meta->is_local());
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 5a34371e656..6e33654753a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -199,6 +199,8 @@ public:
 
     Status build(RowsetSharedPtr& rowset) override;
 
+    Status init(const RowsetWriterContext& rowset_writer_context) override;
+
     Status flush_segment_writer_for_segcompaction(
             std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
index_size,
             KeyBoundsPB& key_bounds);
@@ -231,7 +233,7 @@ private:
                                                   // already been segment 
compacted
     std::atomic<int32_t> _num_segcompacted {0};   // index for segment 
compaction
 
-    std::shared_ptr<SegcompactionWorker> _segcompaction_worker;
+    std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;
 
     // ensure only one inflight segcompaction task for each rowset
     std::atomic<bool> _is_doing_segcompaction {false};
diff --git a/be/src/olap/rowset/segcompaction.cpp 
b/be/src/olap/rowset/segcompaction.cpp
index cd76409c822..1675b44ee82 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -69,6 +69,11 @@ using namespace ErrorCode;
 
 SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : 
_writer(writer) {}
 
+void SegcompactionWorker::init_mem_tracker(int64_t txn_id) {
+    _seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::COMPACTION, "segcompaction-" + 
std::to_string(txn_id));
+}
+
 Status SegcompactionWorker::_get_segcompaction_reader(
         SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet,
         std::shared_ptr<Schema> schema, OlapReaderStatistics* stat,
@@ -220,7 +225,8 @@ Status 
SegcompactionWorker::_create_segment_writer_for_segcompaction(
 }
 
 Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
-    
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
+    DCHECK(_seg_compact_mem_tracker != nullptr);
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
     /* throttle segcompaction task if memory depleted */
     if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
         return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to 
memory shortage");
diff --git a/be/src/olap/rowset/segcompaction.h 
b/be/src/olap/rowset/segcompaction.h
index 67dd6889aad..d498a5b8e33 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -50,6 +50,14 @@ class SegcompactionWorker {
 public:
     explicit SegcompactionWorker(BetaRowsetWriter* writer);
 
+    ~SegcompactionWorker() {
+        // Set only by init_mem_tracker(), which is skipped if BetaRowsetWriter::init() fails early.
+        if (_seg_compact_mem_tracker != nullptr) {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker);
+            _rowid_conversion.reset();
+        }
+    }
+
     void compact_segments(SegCompactionCandidatesSharedPtr segments);
 
     bool need_convert_delete_bitmap();
@@ -65,6 +73,8 @@ public:
     // set the cancel flag, tasks already started will not be cancelled.
     bool cancel();
 
+    void init_mem_tracker(int64_t txn_id);
+
 private:
     Status _create_segment_writer_for_segcompaction(
             std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t 
begin, uint32_t end);
@@ -88,8 +98,9 @@ private:
     io::FileWriterPtr _file_writer;
 
     // for unique key mow table
-    std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
+    std::unique_ptr<SimpleRowIdConversion> _rowid_conversion = nullptr;
     DeleteBitmapPtr _converted_delete_bitmap;
+    std::shared_ptr<MemTrackerLimiter> _seg_compact_mem_tracker = nullptr;
 
     // the state is not mutable when 1)actual compaction operation started or 
2) cancelled
     std::atomic<bool> _is_compacting_state_mutable = true;
diff --git a/be/src/olap/simple_rowid_conversion.h 
b/be/src/olap/simple_rowid_conversion.h
index 1a89b01838f..e4aa2eeec3b 100644
--- a/be/src/olap/simple_rowid_conversion.h
+++ b/be/src/olap/simple_rowid_conversion.h
@@ -22,6 +22,7 @@
 
 #include "olap/olap_common.h"
 #include "olap/utils.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 
@@ -37,7 +38,7 @@ public:
         _cur_dst_segment_rowid = 0;
         for (auto seg_rows : num_rows) {
             _segments_rowid_map.emplace(seg_rows.first,
-                                        std::vector<uint32_t>(seg_rows.second, 
UINT32_MAX));
+                                        DorisVector<uint32_t>(seg_rows.second, 
UINT32_MAX));
         }
     }
 
@@ -72,7 +73,7 @@ private:
     // key:   index indicates src segment.
     // value: index indicates row id of source segment, value indicates row id 
of destination
     //        segment. UINT32_MAX indicates current row not exist.
-    std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
+    DorisMap<uint32_t, DorisVector<uint32_t>> _segments_rowid_map;
 
     // dst rowset id
     RowsetId _rowst_id;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 885d616eb06..f3a7476a962 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -225,8 +225,8 @@ public:
     // to nullptr, but the object it points to is not initialized. At this 
time, when the memory
     // is released somewhere, the hook is triggered to cause the crash.
     std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
-    [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const {
-        return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
+    [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() 
const {
+        return thread_mem_tracker_mgr->limiter_mem_tracker();
     }
 
     QueryThreadContext query_thread_context();
diff --git a/be/src/vec/common/custom_allocator.h b/be/src/vec/common/custom_allocator.h
new file mode 100644
index 00000000000..eee800a059d
--- /dev/null
+++ b/be/src/vec/common/custom_allocator.h
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <functional>
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "vec/common/allocator.h"
+#include "vec/common/allocator_fwd.h"
+
+template <class T, typename MemoryAllocator = Allocator<true>>
+class CustomStdAllocator;
+
+// std::vector whose buffer is allocated through CustomStdAllocator.
+template <typename T>
+using DorisVector = std::vector<T, CustomStdAllocator<T>>;
+
+// std::map whose nodes are allocated through CustomStdAllocator.
+template <class Key, class T, class Compare = std::less<Key>,
+          class Allocator = CustomStdAllocator<std::pair<const Key, T>>>
+using DorisMap = std::map<Key, T, Compare, Allocator>;
+
+// STL-compatible allocator that forwards allocate/deallocate to MemoryAllocator::alloc/free.
+// NOTE: Even though allocate/deallocate may update a memory tracker, this allocator is
+// stateless: the mem tracker is owned by the thread context, not by CustomStdAllocator.
+// Hence any two instances that share the same MemoryAllocator compare equal.
+template <class T, typename MemoryAllocator>
+class CustomStdAllocator : private MemoryAllocator {
+public:
+    using value_type = T;
+    using pointer = T*;
+    using const_pointer = const T*;
+    using size_type = std::size_t;
+    using difference_type = std::ptrdiff_t;
+
+    CustomStdAllocator() noexcept = default;
+
+    // Rebinding must keep MemoryAllocator; otherwise e.g. std::map's node allocator would
+    // silently fall back to the default Allocator<true>.
+    template <class U>
+    struct rebind {
+        using other = CustomStdAllocator<U, MemoryAllocator>;
+    };
+
+    template <class Up>
+    CustomStdAllocator(const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {}
+
+    T* allocate(size_t n) { return static_cast<T*>(MemoryAllocator::alloc(n * sizeof(T))); }
+
+    void deallocate(T* ptr, size_t n) noexcept {
+        MemoryAllocator::free(static_cast<void*>(ptr), n * sizeof(T));
+    }
+
+    size_t max_size() const noexcept { return size_t(~0) / sizeof(T); }
+
+    T* allocate(size_t n, const void*) { return allocate(n); }
+
+    template <class Up, class... Args>
+    void construct(Up* p, Args&&... args) {
+        ::new (static_cast<void*>(p)) Up(std::forward<Args>(args)...);
+    }
+
+    void destroy(T* p) { p->~T(); }
+
+    T* address(T& t) const noexcept { return std::addressof(t); }
+
+    // std::addressof on a const reference yields const T*, so the return type must be const T*.
+    const T* address(const T& t) const noexcept { return std::addressof(t); }
+};
+
+template <class T, class Up, typename MemoryAllocator>
+bool operator==(const CustomStdAllocator<T, MemoryAllocator>&,
+                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
+    return true;
+}
+
+template <class T, class Up, typename MemoryAllocator>
+bool operator!=(const CustomStdAllocator<T, MemoryAllocator>&,
+                const CustomStdAllocator<Up, MemoryAllocator>&) noexcept {
+    return false;
+}
diff --git 
a/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp
 
b/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp
index 98ebd63bdf7..bb5cfc929b3 100644
--- 
a/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp
+++ 
b/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp
@@ -244,7 +244,7 @@ static RowsetSharedPtr 
do_compaction(std::vector<RowsetSharedPtr> rowsets,
     }
 
     Merger::Statistics stats;
-    stats.rowid_conversion = &compaction._rowid_conversion;
+    stats.rowid_conversion = compaction._rowid_conversion.get();
     Status st = Merger::vertical_merge_rowsets(
             tablet, compaction.compaction_type(), 
compaction._cur_tablet_schema, input_rs_readers,
             compaction._output_rs_writer.get(), 100000, 5, &stats);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to