This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0bcb99710b0 [branch-2.1]Add customStdAllocator for vector/map (#41193) (#45124) 0bcb99710b0 is described below commit 0bcb99710b0954f1c2aa8562b5d1beade7e31347 Author: wangbo <wan...@selectdb.com> AuthorDate: Mon Dec 9 18:33:51 2024 +0800 [branch-2.1]Add customStdAllocator for vector/map (#41193) (#45124) --- be/src/olap/compaction.cpp | 11 +-- be/src/olap/compaction.h | 2 +- be/src/olap/rowid_conversion.h | 29 +++++++- be/src/olap/rowset/beta_rowset_writer.cpp | 8 +++ be/src/olap/rowset/beta_rowset_writer.h | 4 +- be/src/olap/rowset/segcompaction.cpp | 8 ++- be/src/olap/rowset/segcompaction.h | 13 +++- be/src/olap/simple_rowid_conversion.h | 5 +- be/src/runtime/thread_context.h | 4 +- be/src/vec/common/custom_allocator.h | 82 ++++++++++++++++++++++ .../index_compaction_with_deleted_term.cpp | 2 +- 11 files changed, 152 insertions(+), 16 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 80239476e48..4b2ac6df119 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -81,6 +81,8 @@ Compaction::Compaction(const TabletSharedPtr& tablet, const std::string& label) _state(CompactionState::INITED) { _mem_tracker = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, label); init_profile(label); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); + _rowid_conversion = std::make_unique<RowIdConversion>(); } Compaction::~Compaction() { @@ -90,6 +92,7 @@ Compaction::~Compaction() { _input_rowsets.clear(); _output_rowset.reset(); _cur_tablet_schema.reset(); + _rowid_conversion.reset(); } void Compaction::init_profile(const std::string& label) { @@ -378,7 +381,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { if (!ctx.columns_to_do_index_compaction.empty() || (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write())) { - stats.rowid_conversion = &_rowid_conversion; + 
stats.rowid_conversion = _rowid_conversion.get(); } int64_t way_num = merge_way_num(); @@ -964,7 +967,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { // TODO(LiaoXin): check if there are duplicate keys std::size_t missed_rows_size = 0; _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, version.second + 1, missed_rows.get(), + _input_rowsets, *_rowid_conversion, 0, version.second + 1, missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); if (missed_rows) { @@ -1024,7 +1027,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { } DeleteBitmap txn_output_delete_bitmap(_tablet->tablet_id()); _tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, 0, UINT64_MAX, missed_rows.get(), + _input_rowsets, *_rowid_conversion, 0, UINT64_MAX, missed_rows.get(), location_map.get(), *it.delete_bitmap.get(), &txn_output_delete_bitmap); if (config::enable_merge_on_write_correctness_check) { RowsetIdUnorderedSet rowsetids; @@ -1044,7 +1047,7 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { // Convert the delete bitmap of the input rowsets to output rowset for // incremental data. 
_tablet->calc_compaction_output_rowset_delete_bitmap( - _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, + _input_rowsets, *_rowid_conversion, version.second, UINT64_MAX, missed_rows.get(), location_map.get(), _tablet->tablet_meta()->delete_bitmap(), &output_rowset_delete_bitmap); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 5aa3e260194..0c8eb211eda 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -130,7 +130,7 @@ protected: Version _output_version; int64_t _newest_write_timestamp; - RowIdConversion _rowid_conversion; + std::unique_ptr<RowIdConversion> _rowid_conversion = nullptr; TabletSchemaSPtr _cur_tablet_schema; std::unique_ptr<RuntimeProfile> _profile; diff --git a/be/src/olap/rowid_conversion.h b/be/src/olap/rowid_conversion.h index d8e2a058d81..01a2cea0d5e 100644 --- a/be/src/olap/rowid_conversion.h +++ b/be/src/olap/rowid_conversion.h @@ -22,6 +22,7 @@ #include "olap/olap_common.h" #include "olap/utils.h" +#include "runtime/thread_context.h" namespace doris { @@ -33,17 +34,24 @@ namespace doris { class RowIdConversion { public: RowIdConversion() = default; - ~RowIdConversion() = default; + ~RowIdConversion() { RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); } // resize segment rowid map to its rows num void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) { + size_t delta_std_pair_cap = 0; for (size_t i = 0; i < num_rows.size(); i++) { uint32_t id = _segments_rowid_map.size(); _segment_to_id_map.emplace(std::pair<RowsetId, uint32_t> {src_rowset_id, i}, id); _id_to_segment_map.emplace_back(src_rowset_id, i); - _segments_rowid_map.emplace_back(std::vector<std::pair<uint32_t, uint32_t>>( - num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX))); + std::vector<std::pair<uint32_t, uint32_t>> vec( + num_rows[i], std::pair<uint32_t, uint32_t>(UINT32_MAX, UINT32_MAX)); + delta_std_pair_cap += vec.capacity(); + 
_segments_rowid_map.emplace_back(std::move(vec)); } + //NOTE: manually count _segments_rowid_map's memory here, because _segments_rowid_map could be used by indexCompaction. + // indexCompaction is third-party code, it's too complex to modify it. + // refer to compact_column. + track_mem_usage(delta_std_pair_cap); } // set dst rowset id @@ -109,12 +117,27 @@ public: return _segment_to_id_map.at(segment); } +private: + void track_mem_usage(size_t delta_std_pair_cap) { + _std_pair_cap += delta_std_pair_cap; + + size_t new_size = + _std_pair_cap * sizeof(std::pair<uint32_t, uint32_t>) + + _segments_rowid_map.capacity() * sizeof(std::vector<std::pair<uint32_t, uint32_t>>); + + RELEASE_THREAD_MEM_TRACKER(_seg_rowid_map_mem_used); + CONSUME_THREAD_MEM_TRACKER(new_size); + _seg_rowid_map_mem_used = new_size; + } + private: // the first level vector: index indicates src segment. // the second level vector: index indicates row id of source segment, // value indicates row id of destination segment. // <UINT32_MAX, UINT32_MAX> indicates current row not exist. 
std::vector<std::vector<std::pair<uint32_t, uint32_t>>> _segments_rowid_map; + size_t _seg_rowid_map_mem_used {0}; + size_t _std_pair_cap {0}; // Map source segment to 0 to n std::map<std::pair<RowsetId, uint32_t>, uint32_t> _segment_to_id_map; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 65c17eaee7e..e2e0255d9a1 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -197,6 +197,14 @@ Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { return Status::OK(); } +Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { + RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); + if (_segcompaction_worker) { + _segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id); + } + return Status::OK(); +} + Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id) { DCHECK(_rowset_meta->is_local()); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 5a34371e656..6e33654753a 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -199,6 +199,8 @@ public: Status build(RowsetSharedPtr& rowset) override; + Status init(const RowsetWriterContext& rowset_writer_context) override; + Status flush_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, KeyBoundsPB& key_bounds); @@ -231,7 +233,7 @@ private: // already been segment compacted std::atomic<int32_t> _num_segcompacted {0}; // index for segment compaction - std::shared_ptr<SegcompactionWorker> _segcompaction_worker; + std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr; // ensure only one inflight segcompaction task for each rowset std::atomic<bool> _is_doing_segcompaction {false}; diff --git a/be/src/olap/rowset/segcompaction.cpp 
b/be/src/olap/rowset/segcompaction.cpp index cd76409c822..1675b44ee82 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -69,6 +69,11 @@ using namespace ErrorCode; SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) : _writer(writer) {} +void SegcompactionWorker::init_mem_tracker(int64_t txn_id) { + _seg_compact_mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::COMPACTION, "segcompaction-" + std::to_string(txn_id)); +} + Status SegcompactionWorker::_get_segcompaction_reader( SegCompactionCandidatesSharedPtr segments, TabletSharedPtr tablet, std::shared_ptr<Schema> schema, OlapReaderStatistics* stat, @@ -220,7 +225,8 @@ Status SegcompactionWorker::_create_segment_writer_for_segcompaction( } Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr segments) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker()); + DCHECK(_seg_compact_mem_tracker != nullptr); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker); /* throttle segcompaction task if memory depleted */ if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to memory shortage"); diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 67dd6889aad..d498a5b8e33 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -50,6 +50,14 @@ class SegcompactionWorker { public: explicit SegcompactionWorker(BetaRowsetWriter* writer); + ~SegcompactionWorker() { + DCHECK(_seg_compact_mem_tracker != nullptr); + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_seg_compact_mem_tracker); + if (_rowid_conversion) { + _rowid_conversion.reset(); + } + } + void compact_segments(SegCompactionCandidatesSharedPtr segments); bool need_convert_delete_bitmap(); @@ -65,6 +73,8 @@ public: // set the cancel flag, tasks already 
started will not be cancelled. bool cancel(); + void init_mem_tracker(int64_t txn_id); + private: Status _create_segment_writer_for_segcompaction( std::unique_ptr<segment_v2::SegmentWriter>* writer, uint32_t begin, uint32_t end); @@ -88,8 +98,9 @@ private: io::FileWriterPtr _file_writer; // for unique key mow table - std::unique_ptr<SimpleRowIdConversion> _rowid_conversion; + std::unique_ptr<SimpleRowIdConversion> _rowid_conversion = nullptr; DeleteBitmapPtr _converted_delete_bitmap; + std::shared_ptr<MemTrackerLimiter> _seg_compact_mem_tracker = nullptr; // the state is not mutable when 1)actual compaction operation started or 2) cancelled std::atomic<bool> _is_compacting_state_mutable = true; diff --git a/be/src/olap/simple_rowid_conversion.h b/be/src/olap/simple_rowid_conversion.h index 1a89b01838f..e4aa2eeec3b 100644 --- a/be/src/olap/simple_rowid_conversion.h +++ b/be/src/olap/simple_rowid_conversion.h @@ -22,6 +22,7 @@ #include "olap/olap_common.h" #include "olap/utils.h" +#include "vec/common/custom_allocator.h" namespace doris { @@ -37,7 +38,7 @@ public: _cur_dst_segment_rowid = 0; for (auto seg_rows : num_rows) { _segments_rowid_map.emplace(seg_rows.first, - std::vector<uint32_t>(seg_rows.second, UINT32_MAX)); + DorisVector<uint32_t>(seg_rows.second, UINT32_MAX)); } } @@ -72,7 +73,7 @@ private: // key: index indicates src segment. // value: index indicates row id of source segment, value indicates row id of destination // segment. UINT32_MAX indicates current row not exist. - std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map; + DorisMap<uint32_t, DorisVector<uint32_t>> _segments_rowid_map; // dst rowset id RowsetId _rowst_id; diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 885d616eb06..f3a7476a962 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -225,8 +225,8 @@ public: // to nullptr, but the object it points to is not initialized. 
At this time, when the memory // is released somewhere, the hook is triggered to cause the crash. std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr; - [[nodiscard]] MemTrackerLimiter* thread_mem_tracker() const { - return thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + [[nodiscard]] std::shared_ptr<MemTrackerLimiter> thread_mem_tracker() const { + return thread_mem_tracker_mgr->limiter_mem_tracker(); } QueryThreadContext query_thread_context(); diff --git a/be/src/vec/common/custom_allocator.h b/be/src/vec/common/custom_allocator.h new file mode 100644 index 00000000000..eee800a059d --- /dev/null +++ b/be/src/vec/common/custom_allocator.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "vec/common/allocator.h" +#include "vec/common/allocator_fwd.h" + +template <class T, typename MemoryAllocator = Allocator<true>> +class CustomStdAllocator; + +template <typename T> +using DorisVector = std::vector<T, CustomStdAllocator<T>>; + +template <class Key, class T, class Compare = std::less<Key>, + class Allocator = CustomStdAllocator<std::pair<const Key, T>>> +using DorisMap = std::map<Key, T, Compare, Allocator>; + +// NOTE: Even though CustomStdAllocator's allocate/deallocate could modify the memory tracker, it's still stateless, +// because threadcontext owns the memtracker, not CustomStdAllocator. +template <class T, typename MemoryAllocator> +class CustomStdAllocator : private MemoryAllocator { +public: + using value_type = T; + using pointer = T*; + using const_pointer = const T*; + using size_type = std::size_t; + using difference_type = std::ptrdiff_t; + + CustomStdAllocator() noexcept = default; + + template <class U> + struct rebind { + typedef CustomStdAllocator<U> other; + }; + + template <class Up> + CustomStdAllocator(const CustomStdAllocator<Up>&) noexcept {} + + T* allocate(size_t n) { return static_cast<T*>(MemoryAllocator::alloc(n * sizeof(T))); } + + void deallocate(T* ptr, size_t n) noexcept { MemoryAllocator::free((void*)ptr, n * sizeof(T)); } + + size_t max_size() const noexcept { return size_t(~0) / sizeof(T); } + + T* allocate(size_t n, const void*) { return allocate(n); } + + template <class Up, class... Args> + void construct(Up* p, Args&&... 
args) { + ::new ((void*)p) Up(std::forward<Args>(args)...); + } + + void destroy(T* p) { p->~T(); } + + T* address(T& t) const noexcept { return std::addressof(t); } + + T* address(const T& t) const noexcept { return std::addressof(t); } +}; + +template <class T, class Up> +bool operator==(const CustomStdAllocator<T>&, const CustomStdAllocator<Up>&) { + return true; +} + +template <class T, class Up> +bool operator!=(const CustomStdAllocator<T>&, const CustomStdAllocator<Up>&) { + return false; +} \ No newline at end of file diff --git a/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp b/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp index 98ebd63bdf7..bb5cfc929b3 100644 --- a/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp +++ b/be/test/olap/rowset/segment_v2/inverted_index/index_compaction_with_deleted_term.cpp @@ -244,7 +244,7 @@ static RowsetSharedPtr do_compaction(std::vector<RowsetSharedPtr> rowsets, } Merger::Statistics stats; - stats.rowid_conversion = &compaction._rowid_conversion; + stats.rowid_conversion = compaction._rowid_conversion.get(); Status st = Merger::vertical_merge_rowsets( tablet, compaction.compaction_type(), compaction._cur_tablet_schema, input_rs_readers, compaction._output_rs_writer.get(), 100000, 5, &stats); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org