eldenmoon commented on code in PR #20037: URL: https://github.com/apache/doris/pull/20037#discussion_r1206149773
########## be/src/olap/resource_pool.cpp: ########## @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/resource_pool.h" + +#include <butil/logging.h> +#include <fmt/core.h> +#include <gen_cpp/Descriptors_types.h> +#include <glog/logging.h> + +#include <memory> +#include <mutex> +#include <vector> + +#include "common/config.h" +#include "olap/iterators.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +ResourcePool* ResourcePool::_s_instance = nullptr; + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const Schema& schema) { + std::string key = fmt::format("{}-", tablet_id); + const auto& column_ids = schema.column_ids(); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema.column(cid)->unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema->column(cid).unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(columns.begin(), columns.end(), [&](const TColumn& col) { + key.append(fmt::format("{}", col.col_unique_id)); + key.append("-"); + }); + return key; +} + +void ResourcePool::create_global_instance(size_t capacity) { + DCHECK(_s_instance == nullptr); + static ResourcePool instance(capacity); + _s_instance = &instance; +} + +ResourcePool::ResourcePool(size_t capacity) { + _seg_iter_cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentIteratorPool", capacity, LRUCacheType::NUMBER)); +} + +bool ResourcePool::SegmentIterCacheKey::operator==(const SegmentIterCacheKey& rhs) const { + return rowset_id == rhs.rowset_id && segment_id == rhs.segment_id && + schema_key == rhs.schema_key; +} + +SegmentIteratorUPtr ResourcePool::SegmentIterCacheValue::pop_iter() { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.empty()) { + return nullptr; + } + SegmentIteratorUPtr it = std::move(_iters.back()); + _iters.pop_back(); + return it; +} + +void ResourcePool::SegmentIterCacheValue::append_iter(SegmentIteratorUPtr&& it) { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.size() + 1 > capacity) { + return; + } + _iters.push_back(std::move(it)); +} + +void ResourcePool::SegmentIterCacheValue::clear() { + _iters.clear(); +} + +size_t ResourcePool::HashOfCacheKey::operator()(const SegmentIterCacheKey& cache_key) const { + size_t seed = 0; + uint32_t segment_id = cache_key.segment_id; + seed = HashUtil::hash64(&cache_key.rowset_id.hi, sizeof(cache_key.rowset_id.hi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.mi, sizeof(cache_key.rowset_id.mi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.lo, sizeof(cache_key.rowset_id.lo), seed); + seed = HashUtil::hash64(&segment_id, sizeof(segment_id), seed); + seed = HashUtil::hash64(cache_key.schema_key.data(), cache_key.schema_key.size(), seed); + return seed; +} + +SegmentIteratorUPtr ResourcePool::borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + SegmentSharedPtr segment) { + if (!config::enable_resource_pool) { + return nullptr; + } + ResourcePool::SegmentIterCacheKey key; + key.schema_key = ResourcePool::get_schema_key(tablet_id, *schema); + key.rowset_id = segment->rowset_id(); + key.segment_id = segment->id(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + if (lru_handle) { + Defer release([cache = _seg_iter_cache.get(), lru_handle] { cache->release(lru_handle); }); + auto value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->last_visit_time = UnixMillis(); + auto it = value->pop_iter(); + if (it) { + return it; + } + } + // Construct a new segment iterator, note that the shared_ptr of SegmentIterator can only + // processed in a single thread not shared among threads + auto new_seg_iter = std::make_unique<segment_v2::SegmentIterator>(segment, schema, + config::enable_resource_pool); + return new_seg_iter; +} + +bool ResourcePool::return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter) { + iter->clear(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + Defer release([&] { _seg_iter_cache->release(lru_handle); }); + if (lru_handle) { + SegmentIterCacheValue* value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->append_iter(std::move(iter)); + return true; + } + auto deleter = [](const doris::CacheKey& key, void* value) { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + delete cache_value; + }; + SegmentIterCacheValue* value = new SegmentIterCacheValue(); + value->append_iter(std::move(iter)); + std::string encoded_key = key.encode(); + CHECK(!encoded_key.empty()); + lru_handle = _seg_iter_cache->insert( + encoded_key, value, sizeof(SegmentIterCacheValue), deleter, CachePriority::NORMAL, + sizeof(SegmentIterCacheValue) * SegmentIterCacheValue::capacity); + return true; +} + +void ResourcePool::return_iterator(RowwiseIteratorUPtr&& iter) { + CHECK(iter->is_borrowed()); + RowwiseIterator* iter_ptr = iter.release(); + SegmentIteratorUPtr segment_iter = SegmentIteratorUPtr(static_cast<SegmentIterator*>(iter_ptr)); + CHECK(segment_iter); + SegmentIterCacheKey key; + key.segment_id = segment_iter->data_id(); + key.rowset_id = segment_iter->rowset_id(); + key.schema_key = get_schema_key(segment_iter->tablet_id(), segment_iter->schema()); + ResourcePool::instance()->return_segment_iter(key, std::move(segment_iter)); +} + +template <typename MapType> +void remove_half(MapType& map) { + std::vector<std::string> keys(map.size() / 2); + for (auto it : map) { + if (keys.size() >= map.size() / 2) { + break; + } + keys.push_back(it.first); + } + for (const auto& key : keys) { + map.erase(key); + } +} + +Status ResourcePool::prune() { + const int64_t curtime = UnixMillis(); + auto pred = [curtime](const void* value) -> bool { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); Review Comment: yes, clear here is meaningless -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org