This is an automated email from the ASF dual-hosted git repository.

kangkaisen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push: new 6c33f80 Add disable_storage_page_cache config (#2890) 6c33f80 is described below commit 6c33f805449d9a7bc8809a575407acfa87bb061e Author: kangkaisen <kangkai...@apache.org> AuthorDate: Sun Feb 16 19:13:30 2020 +0800 Add disable_storage_page_cache config (#2890) 1. when read column data page: for compaction, schema_change, check_sum: we don't use page cache for query and config::disable_storage_page_cache is false, we use page cache 2. when read column index page if config::disable_storage_page_cache is false, we use page cache --- be/src/common/config.h | 2 + be/src/exec/olap_scanner.cpp | 16 ++++--- be/src/olap/iterators.h | 1 + be/src/olap/reader.cpp | 19 ++++---- be/src/olap/reader.h | 8 +++- be/src/olap/rowset/beta_rowset_reader.cpp | 1 + be/src/olap/rowset/rowset_reader_context.h | 1 + be/src/olap/rowset/segment_v2/column_reader.cpp | 53 ++++++++++++++-------- be/src/olap/rowset/segment_v2/column_reader.h | 5 +- .../rowset/segment_v2/indexed_column_reader.cpp | 15 ++++-- be/src/olap/rowset/segment_v2/segment_iterator.cpp | 3 +- 11 files changed, 81 insertions(+), 43 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 9bbcf51..5ce57d6 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -230,6 +230,8 @@ namespace config { // Cache for stoage page size CONF_String(storage_page_cache_limit, "20G"); + // whether to disable page cache feature in storage + CONF_Bool(disable_storage_page_cache, "false"); // be policy CONF_Int64(base_compaction_start_hour, "20"); diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index ce6b50b..fbf3bf3 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -99,7 +99,7 @@ Status OlapScanner::_prepare( return Status::InternalError(ss.str()); } - // acquire tablet rowset readers at the beginning of the scan node + // acquire tablet rowset readers at the beginning of the scan 
node // to prevent this case: when there are lots of olap scanners to run for example 10000 // the rowsets maybe compacted when the last olap scanner starts Version rd_version(0, _version); @@ -113,7 +113,7 @@ Status OlapScanner::_prepare( } } } - + { // Initialize _params RETURN_IF_ERROR(_init_params(key_ranges, filters, is_nulls)); @@ -141,7 +141,7 @@ Status OlapScanner::open() { return Status::OK(); } -// it will be called under tablet read lock because capture rs readers need +// it will be called under tablet read lock because capture rs readers need Status OlapScanner::_init_params( const std::vector<OlapScanRange*>& key_ranges, const std::vector<TCondition>& filters, @@ -206,6 +206,10 @@ Status OlapScanner::_init_params( // to avoid the unnecessary SerDe and improve query performance _params.need_agg_finalize = _need_agg_finalize; + if (!config::disable_storage_page_cache) { + _params.use_page_cache = true; + } + return Status::OK(); } @@ -483,7 +487,7 @@ void OlapScanner::_update_realtime_counter() { COUNTER_UPDATE(_parent->_read_compressed_counter, _reader->stats().compressed_bytes_read); _compressed_bytes_read += _reader->stats().compressed_bytes_read; _reader->mutable_stats()->compressed_bytes_read = 0; - + COUNTER_UPDATE(_parent->_raw_rows_counter, _reader->stats().raw_rows_read); // if raw_rows_read is reset, scanNode will scan all table rows which may cause BE crash _raw_rows_read += _reader->stats().raw_rows_read; @@ -497,8 +501,8 @@ Status OlapScanner::close(RuntimeState* state) { // olap scan node will call scanner.close() when finished // will release resources here // if not clear rowset readers in read_params here - // readers will be release when runtime state deconstructed but - // deconstructor in reader references runtime state + // readers will be release when runtime state deconstructed but + // deconstructor in reader references runtime state // so that it will core _params.rs_readers.clear(); update_counter(); diff --git 
a/be/src/olap/iterators.h b/be/src/olap/iterators.h index ff378de..35e2687 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -79,6 +79,7 @@ public: // REQUIRED (null is not allowed) OlapReaderStatistics* stats = nullptr; + bool use_page_cache = false; }; // Used to read data in RowBlockV2 one by one diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index d88471e..a19dd03 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -405,7 +405,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool *eof = true; return OLAP_SUCCESS; } - + cur_delete_flag = _next_delete_flag; init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool); @@ -418,7 +418,7 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool } break; } - + // we will not do aggregation in two case: // 1. DUP_KEYS keys type has no semantic to aggregate, // 2. to make cost of each scan round reasonable, we will control merged_count. 
@@ -436,13 +436,13 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool agg_update_row(_value_cids, row_cursor, *_next_key); ++merged_count; } - + _merged_rows += merged_count; - + if (!cur_delete_flag) { return OLAP_SUCCESS; } - + _stats.rows_del_filtered++; } while (cur_delete_flag); @@ -547,6 +547,7 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.delete_handler = &_delete_handler; _reader_context.stats = &_stats; _reader_context.runtime_state = read_params.runtime_state; + _reader_context.use_page_cache = read_params.use_page_cache; for (auto& rs_reader : *rs_readers) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); _rs_readers.push_back(rs_reader); @@ -711,14 +712,14 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { OLAP_LOG_WARNING("fail to new RowCursor!"); return OLAP_ERR_MALLOC_ERROR; } - + res = _keys_param.start_keys[i]->init_scan_key(_tablet->tablet_schema(), read_params.start_key[i].values()); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); return res; } - + res = _keys_param.start_keys[i]->from_tuple(read_params.start_key[i]); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor from Keys. [res=%d key_index=%ld]", res, i); @@ -733,14 +734,14 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) { OLAP_LOG_WARNING("fail to new RowCursor!"); return OLAP_ERR_MALLOC_ERROR; } - + res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(), read_params.end_key[i].values()); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res); return res; } - + res = _keys_param.end_keys[i]->from_tuple(read_params.end_key[i]); if (res != OLAP_SUCCESS) { OLAP_LOG_WARNING("fail to init row cursor from Keys. 
[res=%d key_index=%ld]", res, i); diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index d07fbce..0041c62 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -54,6 +54,12 @@ struct ReaderParams { ReaderType reader_type; bool aggregation; bool need_agg_finalize = true; + // 1. when read column data page: + // for compaction, schema_change, check_sum: we don't use page cache + // for query and config::disable_storage_page_cache is false, we use page cache + // 2. when read column index page + // if config::disable_storage_page_cache is false, we use page cache + bool use_page_cache = false; Version version; // possible values are "gt", "ge", "eq" std::string range; @@ -107,7 +113,7 @@ struct ReaderParams { for (int i = 0, size = conditions.size(); i < size; ++i) { ss << " conditions=" << apache::thrift::ThriftDebugString(conditions[i]); } - + return ss.str(); } }; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 05a69dc..9098500 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -62,6 +62,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { &read_options.delete_conditions); } read_options.column_predicates = read_context->predicates; + read_options.use_page_cache = read_context->use_page_cache; // create iterator for each segment std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators; diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 748b8dc..0fb18f4 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -54,6 +54,7 @@ struct RowsetReaderContext { const DeleteHandler* delete_handler = nullptr; OlapReaderStatistics* stats = nullptr; RuntimeState* runtime_state = nullptr; + bool use_page_cache = false; }; } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp index 61fb2fd..5bfd6c6 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -84,23 +84,23 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) { return Status::OK(); } -Status ColumnReader::read_page(const PagePointer& pp, OlapReaderStatistics* stats, PageHandle* handle) { +Status ColumnReader::read_page(const PagePointer& pp, const ColumnIteratorOptions& opts, PageHandle* handle) { OpenedFileHandle<RandomAccessFile> file_handle; RETURN_IF_ERROR(FileManager::instance()->open_file(_file_name, &file_handle)); RandomAccessFile* input_file = file_handle.file(); - return read_page(input_file, pp, stats, handle); + return read_page(input_file, pp, opts, handle); } Status ColumnReader::read_page(RandomAccessFile* file, const PagePointer& pp, - OlapReaderStatistics* stats, PageHandle* handle) { - stats->total_pages_num++; + const ColumnIteratorOptions& iter_opts, PageHandle* handle) { + iter_opts.stats->total_pages_num++; auto cache = StoragePageCache::instance(); PageCacheHandle cache_handle; StoragePageCache::CacheKey cache_key(file->file_name(), pp.offset); - if (cache->lookup(cache_key, &cache_handle)) { + if (iter_opts.use_page_cache && cache->lookup(cache_key, &cache_handle)) { // we find page in cache, use it *handle = PageHandle(std::move(cache_handle)); - stats->cached_pages_num++; + iter_opts.stats->cached_pages_num++; return Status::OK(); } // Now we read this from file. 
@@ -114,9 +114,9 @@ Status ColumnReader::read_page(RandomAccessFile* file, const PagePointer& pp, std::unique_ptr<uint8_t[]> page(new uint8_t[page_size]); Slice page_slice(page.get(), page_size); { - SCOPED_RAW_TIMER(&stats->io_ns); + SCOPED_RAW_TIMER(&iter_opts.stats->io_ns); RETURN_IF_ERROR(file->read_at(pp.offset, page_slice)); - stats->compressed_bytes_read += page_size; + iter_opts.stats->compressed_bytes_read += page_size; } size_t data_size = page_size - 4; @@ -137,7 +137,7 @@ Status ColumnReader::read_page(RandomAccessFile* file, const PagePointer& pp, Slice uncompressed_page; { - SCOPED_RAW_TIMER(&stats->decompress_ns); + SCOPED_RAW_TIMER(&iter_opts.stats->decompress_ns); RETURN_IF_ERROR(decompressor.decompress_to(&uncompressed_page)); } @@ -147,13 +147,17 @@ Status ColumnReader::read_page(RandomAccessFile* file, const PagePointer& pp, page.reset((uint8_t*)uncompressed_page.data); } page_slice = uncompressed_page; - stats->uncompressed_bytes_read += page_slice.size; + iter_opts.stats->uncompressed_bytes_read += page_slice.size; + } + if (iter_opts.use_page_cache) { + // insert this into cache and return the cache handle + cache->insert(cache_key, page_slice, &cache_handle); + *handle = PageHandle(std::move(cache_handle)); + } else { + *handle = PageHandle(page_slice); } - // insert this into cache and return the cache handle - cache->insert(cache_key, page_slice, &cache_handle); - page.release(); - *handle = PageHandle(std::move(cache_handle)); + page.release(); return Status::OK(); } @@ -259,7 +263,13 @@ Status ColumnReader::_load_ordinal_index() { PagePointer pp = _meta.ordinal_index_page(); PageHandle ph; OlapReaderStatistics stats; - RETURN_IF_ERROR(read_page(pp, &stats, &ph)); + ColumnIteratorOptions opts; + // column index only load once, so we use global config to decide + if (!config::disable_storage_page_cache) { + opts.use_page_cache = true; + } + opts.stats = &stats; + RETURN_IF_ERROR(read_page(pp, opts, &ph)); _ordinal_index.reset(new 
OrdinalPageIndex(ph.data(), _num_rows)); RETURN_IF_ERROR(_ordinal_index->load()); @@ -270,9 +280,14 @@ Status ColumnReader::_load_zone_map_index() { if (_meta.has_zone_map_page()) { PagePointer pp = _meta.zone_map_page(); PageHandle ph; - // tmp statistics OlapReaderStatistics stats; - RETURN_IF_ERROR(read_page(pp, &stats, &ph)); + ColumnIteratorOptions opts; + // column index only load once, so we use global config to decide + if (!config::disable_storage_page_cache) { + opts.use_page_cache = true; + } + opts.stats = &stats; + RETURN_IF_ERROR(read_page(pp, opts, &ph)); _column_zone_map.reset(new ColumnZoneMap(ph.data())); RETURN_IF_ERROR(_column_zone_map->load()); } else { @@ -463,7 +478,7 @@ Status FileColumnIterator::_load_next_page(bool* eos) { // it ready to read Status FileColumnIterator::_read_page(const OrdinalPageIndexIterator& iter, ParsedPage* page) { page->page_pointer = iter.page(); - RETURN_IF_ERROR(_reader->read_page(_file, page->page_pointer, _opts.stats, &page->page_handle)); + RETURN_IF_ERROR(_reader->read_page(_file, page->page_pointer, _opts, &page->page_handle)); // TODO(zc): read page from file Slice data = page->page_handle.data(); @@ -505,7 +520,7 @@ Status FileColumnIterator::_read_page(const OrdinalPageIndexIterator& iter, Pars if (binary_dict_page_decoder->is_dict_encoding()) { if (_dict_decoder == nullptr) { PagePointer pp = _reader->get_dict_page_pointer(); - RETURN_IF_ERROR(_reader->read_page(_file, pp, _opts.stats, &_dict_page_handle)); + RETURN_IF_ERROR(_reader->read_page(_file, pp, _opts, &_dict_page_handle)); _dict_decoder.reset(new BinaryPlainPageDecoder(_dict_page_handle.data())); RETURN_IF_ERROR(_dict_decoder->init()); diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h index 1590005..cce8dcd 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.h +++ b/be/src/olap/rowset/segment_v2/column_reader.h @@ -59,6 +59,7 @@ struct ColumnIteratorOptions { // reader statistics 
OlapReaderStatistics* stats = nullptr; RandomAccessFile* file = nullptr; + bool use_page_cache = false; }; // There will be concurrent users to read the same column. So @@ -88,11 +89,11 @@ public: // read a page from file into a page handle // use reader owned _file(usually is Descriptor<RandomAccessFile>*) to read page - Status read_page(const PagePointer& pp, OlapReaderStatistics* stats, PageHandle* handle); + Status read_page(const PagePointer& pp, const ColumnIteratorOptions& opts, PageHandle* handle); // read a page from file into a page handle // use file(usually is RandomAccessFile*) to read page - Status read_page(RandomAccessFile* file, const PagePointer& pp, OlapReaderStatistics* stats, PageHandle* handle); + Status read_page(RandomAccessFile* file, const PagePointer& pp, const ColumnIteratorOptions& opts, PageHandle* handle); bool is_nullable() const { return _meta.is_nullable(); } diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index d40182a..f2c572d 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -75,7 +75,8 @@ Status IndexedColumnReader::read_page(RandomAccessFile* file, const PagePointer& auto cache = StoragePageCache::instance(); PageCacheHandle cache_handle; StoragePageCache::CacheKey cache_key(file->file_name(), pp.offset); - if (cache->lookup(cache_key, &cache_handle)) { + // column index only load once, so we use global config to decide + if (!config::disable_storage_page_cache && cache->lookup(cache_key, &cache_handle)) { // we find page in cache, use it *handle = PageHandle(std::move(cache_handle)); return Status::OK(); @@ -117,11 +118,15 @@ Status IndexedColumnReader::read_page(RandomAccessFile* file, const PagePointer& } page_slice = uncompressed_page; } - // insert this into cache and return the cache handle - cache->insert(cache_key, page_slice, &cache_handle); - 
page.release(); - *handle = PageHandle(std::move(cache_handle)); + if (!config::disable_storage_page_cache) { + // insert this into cache and return the cache handle + cache->insert(cache_key, page_slice, &cache_handle); + *handle = PageHandle(std::move(cache_handle)); + } else { + *handle = PageHandle(page_slice); + } + page.release(); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index 6f0f347..83ecfb0 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -315,6 +315,7 @@ Status SegmentIterator::_init_return_column_iterators() { RETURN_IF_ERROR(_segment->new_column_iterator(cid, &_column_iterators[cid])); ColumnIteratorOptions iter_opts; iter_opts.stats = _opts.stats; + iter_opts.use_page_cache = _opts.use_page_cache; iter_opts.file = _file_handle.file(); RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts)); } @@ -347,7 +348,7 @@ int compare_row_with_lhs_columns(const LhsRowType& lhs, const RhsRowType& rhs) { return 0; } -// look up one key to get its ordinal at which can get data. +// look up one key to get its ordinal at which can get data. // 'upper_bound' is defined the max ordinal the function will search. // We use upper_bound to reduce search times. // If we find a valid ordinal, it will be set in rowid and with Status::OK() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org