xinyiZzz commented on code in PR #20037: URL: https://github.com/apache/doris/pull/20037#discussion_r1205692196
########## be/src/olap/resource_pool.cpp: ########## @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/resource_pool.h" + +#include <butil/logging.h> +#include <fmt/core.h> +#include <gen_cpp/Descriptors_types.h> +#include <glog/logging.h> + +#include <memory> +#include <mutex> +#include <vector> + +#include "common/config.h" +#include "olap/iterators.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +ResourcePool* ResourcePool::_s_instance = nullptr; + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const Schema& schema) { + std::string key = fmt::format("{}-", tablet_id); + const auto& column_ids = schema.column_ids(); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema.column(cid)->unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const 
TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema->column(cid).unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(columns.begin(), columns.end(), [&](const TColumn& col) { + key.append(fmt::format("{}", col.col_unique_id)); + key.append("-"); + }); + return key; +} + +void ResourcePool::create_global_instance(size_t capacity) { + DCHECK(_s_instance == nullptr); + static ResourcePool instance(capacity); + _s_instance = &instance; +} + +ResourcePool::ResourcePool(size_t capacity) { + _seg_iter_cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentIteratorPool", capacity, LRUCacheType::NUMBER)); +} + +bool ResourcePool::SegmentIterCacheKey::operator==(const SegmentIterCacheKey& rhs) const { + return rowset_id == rhs.rowset_id && segment_id == rhs.segment_id && + schema_key == rhs.schema_key; +} + +SegmentIteratorUPtr ResourcePool::SegmentIterCacheValue::pop_iter() { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.empty()) { + return nullptr; + } + SegmentIteratorUPtr it = std::move(_iters.back()); + _iters.pop_back(); + return it; +} + +void ResourcePool::SegmentIterCacheValue::append_iter(SegmentIteratorUPtr&& it) { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.size() + 1 > capacity) { + return; + } + _iters.push_back(std::move(it)); +} + +void ResourcePool::SegmentIterCacheValue::clear() { + _iters.clear(); +} + +size_t ResourcePool::HashOfCacheKey::operator()(const SegmentIterCacheKey& cache_key) const { + size_t seed = 0; + uint32_t segment_id = cache_key.segment_id; + seed = HashUtil::hash64(&cache_key.rowset_id.hi, 
sizeof(cache_key.rowset_id.hi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.mi, sizeof(cache_key.rowset_id.mi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.lo, sizeof(cache_key.rowset_id.lo), seed); + seed = HashUtil::hash64(&segment_id, sizeof(segment_id), seed); + seed = HashUtil::hash64(cache_key.schema_key.data(), cache_key.schema_key.size(), seed); + return seed; +} + +SegmentIteratorUPtr ResourcePool::borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + SegmentSharedPtr segment) { + if (!config::enable_resource_pool) { + return nullptr; + } + ResourcePool::SegmentIterCacheKey key; + key.schema_key = ResourcePool::get_schema_key(tablet_id, *schema); + key.rowset_id = segment->rowset_id(); + key.segment_id = segment->id(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + if (lru_handle) { + Defer release([cache = _seg_iter_cache.get(), lru_handle] { cache->release(lru_handle); }); + auto value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->last_visit_time = UnixMillis(); + auto it = value->pop_iter(); + if (it) { + return it; + } + } + // Construct a new segment iterator, note that the shared_ptr of SegmentIterator can only + // processed in a single thread not shared among threads + auto new_seg_iter = std::make_unique<segment_v2::SegmentIterator>(segment, schema, + config::enable_resource_pool); + return new_seg_iter; +} + +bool ResourcePool::return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter) { + iter->clear(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + Defer release([&] { _seg_iter_cache->release(lru_handle); }); + if (lru_handle) { + SegmentIterCacheValue* value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->append_iter(std::move(iter)); + return true; + } + auto deleter = [](const doris::CacheKey& key, void* value) { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + delete 
cache_value; + }; + SegmentIterCacheValue* value = new SegmentIterCacheValue(); + value->append_iter(std::move(iter)); + std::string encoded_key = key.encode(); + CHECK(!encoded_key.empty()); + lru_handle = _seg_iter_cache->insert( + encoded_key, value, sizeof(SegmentIterCacheValue), deleter, CachePriority::NORMAL, + sizeof(SegmentIterCacheValue) * SegmentIterCacheValue::capacity); + return true; +} + +void ResourcePool::return_iterator(RowwiseIteratorUPtr&& iter) { + CHECK(iter->is_borrowed()); + RowwiseIterator* iter_ptr = iter.release(); + SegmentIteratorUPtr segment_iter = SegmentIteratorUPtr(static_cast<SegmentIterator*>(iter_ptr)); + CHECK(segment_iter); + SegmentIterCacheKey key; + key.segment_id = segment_iter->data_id(); + key.rowset_id = segment_iter->rowset_id(); + key.schema_key = get_schema_key(segment_iter->tablet_id(), segment_iter->schema()); + ResourcePool::instance()->return_segment_iter(key, std::move(segment_iter)); +} + +template <typename MapType> +void remove_half(MapType& map) { + std::vector<std::string> keys(map.size() / 2); + for (auto it : map) { + if (keys.size() >= map.size() / 2) { + break; + } + keys.push_back(it.first); + } + for (const auto& key : keys) { + map.erase(key); + } +} + +Status ResourcePool::prune() { + const int64_t curtime = UnixMillis(); + auto pred = [curtime](const void* value) -> bool { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + return (cache_value->last_visit_time + config::tablet_rowset_stale_sweep_time_sec * 1000) < + curtime; + }; + + MonotonicStopWatch watch; + watch.start(); + // Prune cache in lazy mode to save cpu and minimize the time holding write lock + int64_t prune_num = _seg_iter_cache->prune_if(pred, true); + LOG(INFO) << "prune " << prune_num + << " entries in ResourcePool cache. 
cost(ms): " << watch.elapsed_time() / 1000 / 1000; + // TODO prune schemas more smart, here we just prune half of schemas + { + std::lock_guard<std::mutex> lock(_schema_lock); + remove_half(_schema_cache); + } + { + std::lock_guard<std::mutex> lock(_tablet_schema_lock); + remove_half(_tablet_schema_cache); + } + return Status::OK(); +} + +// Get a shared cached Schema from resource pool +SchemaSPtr ResourcePool::get_shared_schema(const std::string& schema_key) { + if (!config::enable_resource_pool) { + return nullptr; + } + std::lock_guard<std::mutex> lock(_schema_lock); + auto it = _schema_cache.find(schema_key); + if (it != _schema_cache.end()) { + return it->second; + } + return nullptr; +} + +TabletSchemaSPtr ResourcePool::get_shared_tablet_schema(const std::string& schema_key) { + if (!config::enable_resource_pool) { + return nullptr; + } + std::lock_guard<std::mutex> lock(_tablet_schema_lock); + auto it = _tablet_schema_cache.find(schema_key); + if (it != _tablet_schema_cache.end()) { + return it->second; + } + return nullptr; +} + +void ResourcePool::insert_schema(const std::string& key, SchemaSPtr schema) { + if (!config::enable_resource_pool) { + return; + } + std::lock_guard<std::mutex> lock(_schema_lock); + if (_schema_cache.size() + 1 > SCHEMA_CAPACITY) { + return; + } + auto it = _schema_cache.find(key); + if (it == _schema_cache.end()) { + _schema_cache.emplace_hint(it, key, schema); Review Comment: The number of entries and the memory size of `_schema_cache` should be recorded; use metrics for this. ########## be/src/olap/resource_pool.cpp: ########## @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/resource_pool.h" + +#include <butil/logging.h> +#include <fmt/core.h> +#include <gen_cpp/Descriptors_types.h> +#include <glog/logging.h> + +#include <memory> +#include <mutex> +#include <vector> + +#include "common/config.h" +#include "olap/iterators.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +ResourcePool* ResourcePool::_s_instance = nullptr; + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const Schema& schema) { + std::string key = fmt::format("{}-", tablet_id); + const auto& column_ids = schema.column_ids(); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema.column(cid)->unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema->column(cid).unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns) { + 
std::string key = fmt::format("{}-", tablet_id); + std::for_each(columns.begin(), columns.end(), [&](const TColumn& col) { + key.append(fmt::format("{}", col.col_unique_id)); + key.append("-"); + }); + return key; +} + +void ResourcePool::create_global_instance(size_t capacity) { + DCHECK(_s_instance == nullptr); + static ResourcePool instance(capacity); + _s_instance = &instance; +} + +ResourcePool::ResourcePool(size_t capacity) { + _seg_iter_cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentIteratorPool", capacity, LRUCacheType::NUMBER)); +} + +bool ResourcePool::SegmentIterCacheKey::operator==(const SegmentIterCacheKey& rhs) const { + return rowset_id == rhs.rowset_id && segment_id == rhs.segment_id && + schema_key == rhs.schema_key; +} + +SegmentIteratorUPtr ResourcePool::SegmentIterCacheValue::pop_iter() { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.empty()) { + return nullptr; + } + SegmentIteratorUPtr it = std::move(_iters.back()); + _iters.pop_back(); + return it; +} + +void ResourcePool::SegmentIterCacheValue::append_iter(SegmentIteratorUPtr&& it) { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.size() + 1 > capacity) { + return; + } + _iters.push_back(std::move(it)); +} + +void ResourcePool::SegmentIterCacheValue::clear() { + _iters.clear(); +} + +size_t ResourcePool::HashOfCacheKey::operator()(const SegmentIterCacheKey& cache_key) const { + size_t seed = 0; + uint32_t segment_id = cache_key.segment_id; + seed = HashUtil::hash64(&cache_key.rowset_id.hi, sizeof(cache_key.rowset_id.hi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.mi, sizeof(cache_key.rowset_id.mi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.lo, sizeof(cache_key.rowset_id.lo), seed); + seed = HashUtil::hash64(&segment_id, sizeof(segment_id), seed); + seed = HashUtil::hash64(cache_key.schema_key.data(), cache_key.schema_key.size(), seed); + return seed; +} + +SegmentIteratorUPtr ResourcePool::borrow_segment_iter(int32_t tablet_id, SchemaSPtr 
schema, + SegmentSharedPtr segment) { + if (!config::enable_resource_pool) { + return nullptr; + } + ResourcePool::SegmentIterCacheKey key; + key.schema_key = ResourcePool::get_schema_key(tablet_id, *schema); + key.rowset_id = segment->rowset_id(); + key.segment_id = segment->id(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + if (lru_handle) { + Defer release([cache = _seg_iter_cache.get(), lru_handle] { cache->release(lru_handle); }); + auto value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->last_visit_time = UnixMillis(); + auto it = value->pop_iter(); + if (it) { + return it; + } + } + // Construct a new segment iterator, note that the shared_ptr of SegmentIterator can only + // processed in a single thread not shared among threads + auto new_seg_iter = std::make_unique<segment_v2::SegmentIterator>(segment, schema, + config::enable_resource_pool); + return new_seg_iter; +} + +bool ResourcePool::return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter) { + iter->clear(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + Defer release([&] { _seg_iter_cache->release(lru_handle); }); + if (lru_handle) { + SegmentIterCacheValue* value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->append_iter(std::move(iter)); + return true; + } + auto deleter = [](const doris::CacheKey& key, void* value) { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + delete cache_value; + }; + SegmentIterCacheValue* value = new SegmentIterCacheValue(); + value->append_iter(std::move(iter)); + std::string encoded_key = key.encode(); + CHECK(!encoded_key.empty()); + lru_handle = _seg_iter_cache->insert( + encoded_key, value, sizeof(SegmentIterCacheValue), deleter, CachePriority::NORMAL, + sizeof(SegmentIterCacheValue) * SegmentIterCacheValue::capacity); + return true; +} + +void ResourcePool::return_iterator(RowwiseIteratorUPtr&& iter) { + 
CHECK(iter->is_borrowed()); + RowwiseIterator* iter_ptr = iter.release(); + SegmentIteratorUPtr segment_iter = SegmentIteratorUPtr(static_cast<SegmentIterator*>(iter_ptr)); + CHECK(segment_iter); + SegmentIterCacheKey key; + key.segment_id = segment_iter->data_id(); + key.rowset_id = segment_iter->rowset_id(); + key.schema_key = get_schema_key(segment_iter->tablet_id(), segment_iter->schema()); + ResourcePool::instance()->return_segment_iter(key, std::move(segment_iter)); +} + +template <typename MapType> +void remove_half(MapType& map) { + std::vector<std::string> keys(map.size() / 2); + for (auto it : map) { + if (keys.size() >= map.size() / 2) { + break; + } + keys.push_back(it.first); + } + for (const auto& key : keys) { + map.erase(key); + } +} + +Status ResourcePool::prune() { + const int64_t curtime = UnixMillis(); + auto pred = [curtime](const void* value) -> bool { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + return (cache_value->last_visit_time + config::tablet_rowset_stale_sweep_time_sec * 1000) < Review Comment: Call `clear()` in `~SegmentIterCacheValue`, so that the cache value is cleared when it is evicted due to timeout. ########## be/src/runtime/exec_env_init.cpp: ########## @@ -279,6 +280,13 @@ Status ExecEnv::_init_mem_env() { << " segment_cache_capacity: " << segment_cache_capacity; SegmentLoader::create_global_instance(segment_cache_capacity); + uint64_t segment_iter_pool_capacity = segment_cache_capacity / 10; Review Comment: Why not add a config `segment_iter_pool_capacity`? Is there any basis for using segment cache capacity / 10? ########## be/src/olap/resource_pool.cpp: ########## @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/resource_pool.h" + +#include <butil/logging.h> +#include <fmt/core.h> +#include <gen_cpp/Descriptors_types.h> +#include <glog/logging.h> + +#include <memory> +#include <mutex> +#include <vector> + +#include "common/config.h" +#include "olap/iterators.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +ResourcePool* ResourcePool::_s_instance = nullptr; + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const Schema& schema) { + std::string key = fmt::format("{}-", tablet_id); + const auto& column_ids = schema.column_ids(); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema.column(cid)->unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema->column(cid).unique_id(); + key.append(fmt::format("{}", 
col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(columns.begin(), columns.end(), [&](const TColumn& col) { + key.append(fmt::format("{}", col.col_unique_id)); + key.append("-"); + }); + return key; +} + +void ResourcePool::create_global_instance(size_t capacity) { + DCHECK(_s_instance == nullptr); + static ResourcePool instance(capacity); + _s_instance = &instance; +} + +ResourcePool::ResourcePool(size_t capacity) { + _seg_iter_cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentIteratorPool", capacity, LRUCacheType::NUMBER)); +} + +bool ResourcePool::SegmentIterCacheKey::operator==(const SegmentIterCacheKey& rhs) const { + return rowset_id == rhs.rowset_id && segment_id == rhs.segment_id && + schema_key == rhs.schema_key; +} + +SegmentIteratorUPtr ResourcePool::SegmentIterCacheValue::pop_iter() { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.empty()) { + return nullptr; + } + SegmentIteratorUPtr it = std::move(_iters.back()); + _iters.pop_back(); + return it; +} + +void ResourcePool::SegmentIterCacheValue::append_iter(SegmentIteratorUPtr&& it) { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.size() + 1 > capacity) { + return; + } + _iters.push_back(std::move(it)); +} + +void ResourcePool::SegmentIterCacheValue::clear() { + _iters.clear(); +} + +size_t ResourcePool::HashOfCacheKey::operator()(const SegmentIterCacheKey& cache_key) const { + size_t seed = 0; + uint32_t segment_id = cache_key.segment_id; + seed = HashUtil::hash64(&cache_key.rowset_id.hi, sizeof(cache_key.rowset_id.hi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.mi, sizeof(cache_key.rowset_id.mi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.lo, sizeof(cache_key.rowset_id.lo), seed); + seed = HashUtil::hash64(&segment_id, sizeof(segment_id), seed); + seed = 
HashUtil::hash64(cache_key.schema_key.data(), cache_key.schema_key.size(), seed); + return seed; +} + +SegmentIteratorUPtr ResourcePool::borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + SegmentSharedPtr segment) { + if (!config::enable_resource_pool) { + return nullptr; + } + ResourcePool::SegmentIterCacheKey key; + key.schema_key = ResourcePool::get_schema_key(tablet_id, *schema); + key.rowset_id = segment->rowset_id(); + key.segment_id = segment->id(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + if (lru_handle) { + Defer release([cache = _seg_iter_cache.get(), lru_handle] { cache->release(lru_handle); }); + auto value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->last_visit_time = UnixMillis(); + auto it = value->pop_iter(); + if (it) { + return it; + } + } + // Construct a new segment iterator, note that the shared_ptr of SegmentIterator can only + // processed in a single thread not shared among threads + auto new_seg_iter = std::make_unique<segment_v2::SegmentIterator>(segment, schema, + config::enable_resource_pool); + return new_seg_iter; +} + +bool ResourcePool::return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter) { + iter->clear(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + Defer release([&] { _seg_iter_cache->release(lru_handle); }); + if (lru_handle) { + SegmentIterCacheValue* value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->append_iter(std::move(iter)); + return true; + } + auto deleter = [](const doris::CacheKey& key, void* value) { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + delete cache_value; + }; + SegmentIterCacheValue* value = new SegmentIterCacheValue(); + value->append_iter(std::move(iter)); + std::string encoded_key = key.encode(); + CHECK(!encoded_key.empty()); + lru_handle = _seg_iter_cache->insert( + encoded_key, value, sizeof(SegmentIterCacheValue), deleter, 
CachePriority::NORMAL, + sizeof(SegmentIterCacheValue) * SegmentIterCacheValue::capacity); Review Comment: The last parameter of `insert` (bytes) should be the sum of the memory of all `SegmentIterator`s, so `SegmentIterator` should have a method that returns its memory usage, similar to `Segment::_meta_mem_usage`. ########## be/src/olap/resource_pool.cpp: ########## @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/resource_pool.h" + +#include <butil/logging.h> +#include <fmt/core.h> +#include <gen_cpp/Descriptors_types.h> +#include <glog/logging.h> + +#include <memory> +#include <mutex> +#include <vector> + +#include "common/config.h" +#include "olap/iterators.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +ResourcePool* ResourcePool::_s_instance = nullptr; + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const Schema& schema) { + std::string key = fmt::format("{}-", tablet_id); + const auto& column_ids = schema.column_ids(); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema.column(cid)->unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema->column(cid).unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(columns.begin(), columns.end(), [&](const TColumn& col) { + key.append(fmt::format("{}", col.col_unique_id)); + key.append("-"); + }); + return key; +} + +void ResourcePool::create_global_instance(size_t capacity) { + DCHECK(_s_instance == nullptr); + static ResourcePool instance(capacity); + _s_instance = &instance; +} + +ResourcePool::ResourcePool(size_t 
capacity) { + _seg_iter_cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentIteratorPool", capacity, LRUCacheType::NUMBER)); +} + +bool ResourcePool::SegmentIterCacheKey::operator==(const SegmentIterCacheKey& rhs) const { + return rowset_id == rhs.rowset_id && segment_id == rhs.segment_id && + schema_key == rhs.schema_key; +} + +SegmentIteratorUPtr ResourcePool::SegmentIterCacheValue::pop_iter() { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.empty()) { + return nullptr; + } + SegmentIteratorUPtr it = std::move(_iters.back()); + _iters.pop_back(); + return it; +} + +void ResourcePool::SegmentIterCacheValue::append_iter(SegmentIteratorUPtr&& it) { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.size() + 1 > capacity) { + return; + } + _iters.push_back(std::move(it)); +} + +void ResourcePool::SegmentIterCacheValue::clear() { + _iters.clear(); +} + +size_t ResourcePool::HashOfCacheKey::operator()(const SegmentIterCacheKey& cache_key) const { + size_t seed = 0; + uint32_t segment_id = cache_key.segment_id; + seed = HashUtil::hash64(&cache_key.rowset_id.hi, sizeof(cache_key.rowset_id.hi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.mi, sizeof(cache_key.rowset_id.mi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.lo, sizeof(cache_key.rowset_id.lo), seed); + seed = HashUtil::hash64(&segment_id, sizeof(segment_id), seed); + seed = HashUtil::hash64(cache_key.schema_key.data(), cache_key.schema_key.size(), seed); + return seed; +} + +SegmentIteratorUPtr ResourcePool::borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + SegmentSharedPtr segment) { + if (!config::enable_resource_pool) { + return nullptr; + } + ResourcePool::SegmentIterCacheKey key; + key.schema_key = ResourcePool::get_schema_key(tablet_id, *schema); + key.rowset_id = segment->rowset_id(); + key.segment_id = segment->id(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + if (lru_handle) { + Defer release([cache = _seg_iter_cache.get(), lru_handle] { 
cache->release(lru_handle); }); + auto value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->last_visit_time = UnixMillis(); + auto it = value->pop_iter(); + if (it) { + return it; + } + } + // Construct a new segment iterator, note that the shared_ptr of SegmentIterator can only + // processed in a single thread not shared among threads + auto new_seg_iter = std::make_unique<segment_v2::SegmentIterator>(segment, schema, + config::enable_resource_pool); + return new_seg_iter; +} + +bool ResourcePool::return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter) { + iter->clear(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + Defer release([&] { _seg_iter_cache->release(lru_handle); }); + if (lru_handle) { + SegmentIterCacheValue* value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->append_iter(std::move(iter)); + return true; + } + auto deleter = [](const doris::CacheKey& key, void* value) { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + delete cache_value; + }; + SegmentIterCacheValue* value = new SegmentIterCacheValue(); + value->append_iter(std::move(iter)); + std::string encoded_key = key.encode(); + CHECK(!encoded_key.empty()); + lru_handle = _seg_iter_cache->insert( + encoded_key, value, sizeof(SegmentIterCacheValue), deleter, CachePriority::NORMAL, + sizeof(SegmentIterCacheValue) * SegmentIterCacheValue::capacity); + return true; +} + +void ResourcePool::return_iterator(RowwiseIteratorUPtr&& iter) { + CHECK(iter->is_borrowed()); + RowwiseIterator* iter_ptr = iter.release(); + SegmentIteratorUPtr segment_iter = SegmentIteratorUPtr(static_cast<SegmentIterator*>(iter_ptr)); + CHECK(segment_iter); + SegmentIterCacheKey key; + key.segment_id = segment_iter->data_id(); + key.rowset_id = segment_iter->rowset_id(); + key.schema_key = get_schema_key(segment_iter->tablet_id(), segment_iter->schema()); + 
ResourcePool::instance()->return_segment_iter(key, std::move(segment_iter)); +} + +template <typename MapType> +void remove_half(MapType& map) { + std::vector<std::string> keys(map.size() / 2); + for (auto it : map) { + if (keys.size() >= map.size() / 2) { + break; + } + keys.push_back(it.first); + } + for (const auto& key : keys) { + map.erase(key); + } +} + +Status ResourcePool::prune() { + const int64_t curtime = UnixMillis(); + auto pred = [curtime](const void* value) -> bool { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + return (cache_value->last_visit_time + config::tablet_rowset_stale_sweep_time_sec * 1000) < + curtime; + }; + + MonotonicStopWatch watch; + watch.start(); + // Prune cache in lazy mode to save cpu and minimize the time holding write lock + int64_t prune_num = _seg_iter_cache->prune_if(pred, true); + LOG(INFO) << "prune " << prune_num + << " entries in ResourcePool cache. cost(ms): " << watch.elapsed_time() / 1000 / 1000; + // TODO prune schemas more smart, here we just prune half of schemas + { + std::lock_guard<std::mutex> lock(_schema_lock); + remove_half(_schema_cache); + } + { + std::lock_guard<std::mutex> lock(_tablet_schema_lock); + remove_half(_tablet_schema_cache); + } + return Status::OK(); +} + +// Get a shared cached Schema from resource pool +SchemaSPtr ResourcePool::get_shared_schema(const std::string& schema_key) { + if (!config::enable_resource_pool) { + return nullptr; + } + std::lock_guard<std::mutex> lock(_schema_lock); + auto it = _schema_cache.find(schema_key); + if (it != _schema_cache.end()) { + return it->second; + } + return nullptr; +} + +TabletSchemaSPtr ResourcePool::get_shared_tablet_schema(const std::string& schema_key) { + if (!config::enable_resource_pool) { + return nullptr; + } + std::lock_guard<std::mutex> lock(_tablet_schema_lock); + auto it = _tablet_schema_cache.find(schema_key); + if (it != _tablet_schema_cache.end()) { + return it->second; + } + return 
nullptr; +} + +void ResourcePool::insert_schema(const std::string& key, SchemaSPtr schema) { + if (!config::enable_resource_pool) { + return; + } + std::lock_guard<std::mutex> lock(_schema_lock); + if (_schema_cache.size() + 1 > SCHEMA_CAPACITY) { + return; + } + auto it = _schema_cache.find(key); + if (it == _schema_cache.end()) { + _schema_cache.emplace_hint(it, key, schema); + } +} + +void ResourcePool::insert_tablet_schema(const std::string& key, TabletSchemaSPtr schema) { + if (!config::enable_resource_pool) { + return; + } + std::lock_guard<std::mutex> lock(_tablet_schema_lock); + if (_tablet_schema_cache.size() + 1 > SCHEMA_CAPACITY) { + return; + } + auto it = _tablet_schema_cache.find(key); + if (it == _tablet_schema_cache.end()) { + _tablet_schema_cache.emplace_hint(it, key, schema); Review Comment: also should be recorded ########## be/src/olap/resource_pool.cpp: ########## @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/resource_pool.h" + +#include <butil/logging.h> +#include <fmt/core.h> +#include <gen_cpp/Descriptors_types.h> +#include <glog/logging.h> + +#include <memory> +#include <mutex> +#include <vector> + +#include "common/config.h" +#include "olap/iterators.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/rowset/segment_v2/segment_iterator.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" +#include "util/defer_op.h" +#include "util/time.h" + +namespace doris { + +ResourcePool* ResourcePool::_s_instance = nullptr; + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const Schema& schema) { + std::string key = fmt::format("{}-", tablet_id); + const auto& column_ids = schema.column_ids(); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema.column(cid)->unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(column_ids.begin(), column_ids.end(), [&](const ColumnId& cid) { + uint32_t col_unique_id = schema->column(cid).unique_id(); + key.append(fmt::format("{}", col_unique_id)); + key.append("-"); + }); + return key; +} + +std::string ResourcePool::get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns) { + std::string key = fmt::format("{}-", tablet_id); + std::for_each(columns.begin(), columns.end(), [&](const TColumn& col) { + key.append(fmt::format("{}", col.col_unique_id)); + key.append("-"); + }); + return key; +} + +void ResourcePool::create_global_instance(size_t capacity) { + DCHECK(_s_instance == nullptr); + static ResourcePool instance(capacity); + _s_instance = &instance; +} + +ResourcePool::ResourcePool(size_t 
capacity) { + _seg_iter_cache = std::unique_ptr<Cache>( + new_lru_cache("SegmentIteratorPool", capacity, LRUCacheType::NUMBER)); +} + +bool ResourcePool::SegmentIterCacheKey::operator==(const SegmentIterCacheKey& rhs) const { + return rowset_id == rhs.rowset_id && segment_id == rhs.segment_id && + schema_key == rhs.schema_key; +} + +SegmentIteratorUPtr ResourcePool::SegmentIterCacheValue::pop_iter() { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.empty()) { + return nullptr; + } + SegmentIteratorUPtr it = std::move(_iters.back()); + _iters.pop_back(); + return it; +} + +void ResourcePool::SegmentIterCacheValue::append_iter(SegmentIteratorUPtr&& it) { + std::lock_guard<std::mutex> lock(_mtx); + if (_iters.size() + 1 > capacity) { + return; + } + _iters.push_back(std::move(it)); +} + +void ResourcePool::SegmentIterCacheValue::clear() { + _iters.clear(); +} + +size_t ResourcePool::HashOfCacheKey::operator()(const SegmentIterCacheKey& cache_key) const { + size_t seed = 0; + uint32_t segment_id = cache_key.segment_id; + seed = HashUtil::hash64(&cache_key.rowset_id.hi, sizeof(cache_key.rowset_id.hi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.mi, sizeof(cache_key.rowset_id.mi), seed); + seed = HashUtil::hash64(&cache_key.rowset_id.lo, sizeof(cache_key.rowset_id.lo), seed); + seed = HashUtil::hash64(&segment_id, sizeof(segment_id), seed); + seed = HashUtil::hash64(cache_key.schema_key.data(), cache_key.schema_key.size(), seed); + return seed; +} + +SegmentIteratorUPtr ResourcePool::borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + SegmentSharedPtr segment) { + if (!config::enable_resource_pool) { + return nullptr; + } + ResourcePool::SegmentIterCacheKey key; + key.schema_key = ResourcePool::get_schema_key(tablet_id, *schema); + key.rowset_id = segment->rowset_id(); + key.segment_id = segment->id(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + if (lru_handle) { + Defer release([cache = _seg_iter_cache.get(), lru_handle] { 
cache->release(lru_handle); }); + auto value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->last_visit_time = UnixMillis(); + auto it = value->pop_iter(); + if (it) { + return it; + } + } + // Construct a new segment iterator, note that the shared_ptr of SegmentIterator can only + // processed in a single thread not shared among threads + auto new_seg_iter = std::make_unique<segment_v2::SegmentIterator>(segment, schema, + config::enable_resource_pool); + return new_seg_iter; +} + +bool ResourcePool::return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter) { + iter->clear(); + auto lru_handle = _seg_iter_cache->lookup(key.encode()); + Defer release([&] { _seg_iter_cache->release(lru_handle); }); + if (lru_handle) { + SegmentIterCacheValue* value = (SegmentIterCacheValue*)_seg_iter_cache->value(lru_handle); + value->append_iter(std::move(iter)); + return true; + } + auto deleter = [](const doris::CacheKey& key, void* value) { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); + delete cache_value; + }; + SegmentIterCacheValue* value = new SegmentIterCacheValue(); + value->append_iter(std::move(iter)); + std::string encoded_key = key.encode(); + CHECK(!encoded_key.empty()); + lru_handle = _seg_iter_cache->insert( + encoded_key, value, sizeof(SegmentIterCacheValue), deleter, CachePriority::NORMAL, + sizeof(SegmentIterCacheValue) * SegmentIterCacheValue::capacity); + return true; +} + +void ResourcePool::return_iterator(RowwiseIteratorUPtr&& iter) { + CHECK(iter->is_borrowed()); + RowwiseIterator* iter_ptr = iter.release(); + SegmentIteratorUPtr segment_iter = SegmentIteratorUPtr(static_cast<SegmentIterator*>(iter_ptr)); + CHECK(segment_iter); + SegmentIterCacheKey key; + key.segment_id = segment_iter->data_id(); + key.rowset_id = segment_iter->rowset_id(); + key.schema_key = get_schema_key(segment_iter->tablet_id(), segment_iter->schema()); + 
ResourcePool::instance()->return_segment_iter(key, std::move(segment_iter)); +} + +template <typename MapType> +void remove_half(MapType& map) { + std::vector<std::string> keys(map.size() / 2); + for (auto it : map) { + if (keys.size() >= map.size() / 2) { + break; + } + keys.push_back(it.first); + } + for (const auto& key : keys) { + map.erase(key); + } +} + +Status ResourcePool::prune() { + const int64_t curtime = UnixMillis(); + auto pred = [curtime](const void* value) -> bool { + SegmentIterCacheValue* cache_value = (SegmentIterCacheValue*)value; + cache_value->clear(); Review Comment: Why call `clear()` here? This seems to clear every cached SegmentIterator, even in entries that have not timed out. ########## be/src/olap/resource_pool.h: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <fmt/core.h> +#include <parallel_hashmap/phmap.h> + +#include <algorithm> +#include <memory> +#include <mutex> +#include <vector> + +#include "olap/iterators.h" +#include "olap/olap_common.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" + +namespace doris { + +namespace segment_v2 { +class Segment; +class SegmentIterator; +using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>; +} // namespace segment_v2 + +// The ResourcePool is utilized to cache pre-allocated data structures, +// eliminating the need for frequent allocation and deallocation during usage. +// This caching mechanism proves immensely advantageous, particularly in scenarios +// with high concurrency, where queries are executed simultaneously. +class ResourcePool { +public: + enum class Type { + SEGMENT_READ_SCHEMA = 0, + READ_SCHEMA = 1, + SEGMENT_ITERATOR = 2, + }; + + static ResourcePool* instance() { return _s_instance; } + + static void create_global_instance(size_t capacity); + + // Return the borrowed object back to cache, if reached capacity, return false. + // Do not touch iter if returned success. + static void return_iterator(RowwiseIteratorUPtr&& iter); + + // get cache schema key, delimiter with SCHEMA_DELIMITER + static std::string get_schema_key(int32_t tablet_id, const Schema& schema); + static std::string get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids); + static std::string get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns); + + // Get a cached segment iter, if not exist, then a new iterator will be constructed + // and put into cache when returned.Otherwise get the target iter and remove from map to prevent from + // multi threads access, since SegmentIterator is stateless and can't be shared. 
+ SegmentIteratorUPtr borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + std::shared_ptr<Segment> segment); + + // Get a shared cached Schema from resource pool, schema_key is a subset of column unique ids + SchemaSPtr get_shared_schema(const std::string& schema_key); + + // Get a shared cached tablet Schema from resource pool, schema_key is full column unique ids + TabletSchemaSPtr get_shared_tablet_schema(const std::string& schema_key); + + void insert_schema(const std::string& key, SchemaSPtr schema); + void insert_tablet_schema(const std::string& key, TabletSchemaSPtr schema); + + // Try to prune the cache if expired. + Status prune(); + +private: + ResourcePool(size_t capacity); + static constexpr char SCHEMA_DELIMITER = '-'; + static constexpr int SCHEMA_CAPACITY = 1024; + std::unique_ptr<Cache> _seg_iter_cache = nullptr; + + std::mutex _schema_lock; + phmap::flat_hash_map<std::string, SchemaSPtr> _schema_cache; + std::mutex _tablet_schema_lock; + phmap::flat_hash_map<std::string, TabletSchemaSPtr> _tablet_schema_cache; + + static ResourcePool* _s_instance; + + struct SegmentIterCacheValue { + static constexpr int32_t capacity = 8; + // Keep _iters thread safe + // TODO make it lock free + std::mutex _mtx; + std::vector<SegmentIteratorUPtr> _iters; + std::atomic<int64_t> last_visit_time = 0; + + SegmentIteratorUPtr pop_iter(); + void append_iter(SegmentIteratorUPtr&& it); + void clear(); + }; + + struct SegmentIterCacheKey { + RowsetId rowset_id; + uint32_t segment_id; + std::string schema_key; + // TODO add read option as part of key + + bool operator==(const SegmentIterCacheKey& rhs) const; + std::string encode() const { + return fmt::format("{}-{}-{}", rowset_id.to_string(), segment_id, schema_key); + } + }; + + bool return_segment_iter(const SegmentIterCacheKey& key, SegmentIteratorUPtr&& iter); + + struct HashOfCacheKey { Review Comment: `HashOfCacheKey` doesn't seem to be used anywhere. ########## be/src/olap/resource_pool.h: ########## @@ -0,0 +1,129 @@ +// Licensed to 
the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <fmt/core.h> +#include <parallel_hashmap/phmap.h> + +#include <algorithm> +#include <memory> +#include <mutex> +#include <vector> + +#include "olap/iterators.h" +#include "olap/olap_common.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" + +namespace doris { + +namespace segment_v2 { +class Segment; +class SegmentIterator; +using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>; +} // namespace segment_v2 + +// The ResourcePool is utilized to cache pre-allocated data structures, +// eliminating the need for frequent allocation and deallocation during usage. +// This caching mechanism proves immensely advantageous, particularly in scenarios +// with high concurrency, where queries are executed simultaneously. +class ResourcePool { +public: + enum class Type { + SEGMENT_READ_SCHEMA = 0, + READ_SCHEMA = 1, + SEGMENT_ITERATOR = 2, + }; + + static ResourcePool* instance() { return _s_instance; } + + static void create_global_instance(size_t capacity); + + // Return the borrowed object back to cache, if reached capacity, return false. + // Do not touch iter if returned success. 
+ static void return_iterator(RowwiseIteratorUPtr&& iter); + + // get cache schema key, delimiter with SCHEMA_DELIMITER + static std::string get_schema_key(int32_t tablet_id, const Schema& schema); + static std::string get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids); + static std::string get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns); + + // Get a cached segment iter, if not exist, then a new iterator will be constructed + // and put into cache when returned.Otherwise get the target iter and remove from map to prevent from + // multi threads access, since SegmentIterator is stateless and can't be shared. + SegmentIteratorUPtr borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + std::shared_ptr<Segment> segment); + + // Get a shared cached Schema from resource pool, schema_key is a subset of column unique ids + SchemaSPtr get_shared_schema(const std::string& schema_key); + + // Get a shared cached tablet Schema from resource pool, schema_key is full column unique ids + TabletSchemaSPtr get_shared_tablet_schema(const std::string& schema_key); + + void insert_schema(const std::string& key, SchemaSPtr schema); + void insert_tablet_schema(const std::string& key, TabletSchemaSPtr schema); + + // Try to prune the cache if expired. + Status prune(); + +private: + ResourcePool(size_t capacity); + static constexpr char SCHEMA_DELIMITER = '-'; + static constexpr int SCHEMA_CAPACITY = 1024; + std::unique_ptr<Cache> _seg_iter_cache = nullptr; + + std::mutex _schema_lock; + phmap::flat_hash_map<std::string, SchemaSPtr> _schema_cache; + std::mutex _tablet_schema_lock; + phmap::flat_hash_map<std::string, TabletSchemaSPtr> _tablet_schema_cache; + + static ResourcePool* _s_instance; + + struct SegmentIterCacheValue { + static constexpr int32_t capacity = 8; Review Comment: Up to 8 segment iterators can be cached for the same key. Has the performance with different values been tested? 
If it has been tested, it would be best to add a comment. ########## be/src/olap/rowset/beta_rowset_reader.cpp: ########## @@ -269,6 +279,10 @@ Status BetaRowsetReader::next_block(vectorized::Block* block) { if (!s.ok()) { if (!s.is<END_OF_FILE>()) { LOG(WARNING) << "failed to read next block: " << s.to_string(); + } else { + if (_iterator->is_borrowed()) { Review Comment: `_iterator->is_borrowed()` seems to always be false, because `_iterator` is a `VMergeIterator` or `VUnionIterator`. Is it supposed to be like this instead? ``` for (RowwiseIteratorUPtr& it : _origin_iters) { if (it && it->is_borrowed()) { ResourcePool::return_iterator(std::move(it)); } } ``` ########## be/src/vec/olap/vgeneric_iterators.h: ########## @@ -99,7 +99,7 @@ class VMergeIteratorContext { VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete; VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete; - ~VMergeIteratorContext() {} + ~VMergeIteratorContext(); Review Comment: Why doesn't `~VMergeIterator()` call `return_iterator`? ########## be/src/olap/resource_pool.h: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <fmt/core.h> +#include <parallel_hashmap/phmap.h> + +#include <algorithm> +#include <memory> +#include <mutex> +#include <vector> + +#include "olap/iterators.h" +#include "olap/olap_common.h" +#include "olap/schema.h" +#include "olap/tablet.h" +#include "olap/tablet_schema.h" + +namespace doris { + +namespace segment_v2 { +class Segment; +class SegmentIterator; +using SegmentIteratorUPtr = std::unique_ptr<SegmentIterator>; +} // namespace segment_v2 + +// The ResourcePool is utilized to cache pre-allocated data structures, +// eliminating the need for frequent allocation and deallocation during usage. +// This caching mechanism proves immensely advantageous, particularly in scenarios +// with high concurrency, where queries are executed simultaneously. +class ResourcePool { +public: + enum class Type { + SEGMENT_READ_SCHEMA = 0, + READ_SCHEMA = 1, + SEGMENT_ITERATOR = 2, + }; + + static ResourcePool* instance() { return _s_instance; } + + static void create_global_instance(size_t capacity); + + // Return the borrowed object back to cache, if reached capacity, return false. + // Do not touch iter if returned success. + static void return_iterator(RowwiseIteratorUPtr&& iter); + + // get cache schema key, delimiter with SCHEMA_DELIMITER + static std::string get_schema_key(int32_t tablet_id, const Schema& schema); + static std::string get_schema_key(int32_t tablet_id, const TabletSchemaSPtr& schema, + const std::vector<uint32_t>& column_ids); + static std::string get_schema_key(int32_t tablet_id, const std::vector<TColumn>& columns); + + // Get a cached segment iter, if not exist, then a new iterator will be constructed + // and put into cache when returned.Otherwise get the target iter and remove from map to prevent from + // multi threads access, since SegmentIterator is stateless and can't be shared. 
+ SegmentIteratorUPtr borrow_segment_iter(int32_t tablet_id, SchemaSPtr schema, + std::shared_ptr<Segment> segment); + + // Get a shared cached Schema from resource pool, schema_key is a subset of column unique ids + SchemaSPtr get_shared_schema(const std::string& schema_key); + + // Get a shared cached tablet Schema from resource pool, schema_key is full column unique ids + TabletSchemaSPtr get_shared_tablet_schema(const std::string& schema_key); + + void insert_schema(const std::string& key, SchemaSPtr schema); + void insert_tablet_schema(const std::string& key, TabletSchemaSPtr schema); + + // Try to prune the cache if expired. + Status prune(); + +private: + ResourcePool(size_t capacity); + static constexpr char SCHEMA_DELIMITER = '-'; + static constexpr int SCHEMA_CAPACITY = 1024; Review Comment: 1024 seems small; consider adding a config option for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org