This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 346ccae56ec [opt](catalog) add some profile for parquet reader and change meta cache config (#37040)
346ccae56ec is described below

commit 346ccae56ec009355cd5298e482cd05f7f4214c4
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Mon Jul 1 15:32:31 2024 +0800

    [opt](catalog) add some profile for parquet reader and change meta cache config (#37040)

    ## Proposed changes

    This PR mainly changes:

    1. Add a new BE config `enable_parquet_page_index`.
       The default is true. If set to false, the parquet reader will not use the
       page index to filter data. This is only for debugging, in case the page
       index ever filters out the wrong data.

    2. Add a new FE config `max_hive_partition_table_cache_num`,
       separated from `max_hive_table_cache_num`. This config sets the max cache
       number of `partitionValuesCache`, which caches the partition value lists
       of partitioned Hive tables.

    3. Reduce the default expire time of the meta cache from 86400 to 28800
       seconds (24 hours to 8 hours). For a Hive table partitioned by day, an
       86400-second expire time means a stale cached value may keep being served
       for up to a day after new partitions land.
---
 be/src/common/config.cpp                           |  5 +++++
 be/src/common/config.h                             |  2 ++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 23 +++++++++++++++++-----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  4 ++++
 .../main/java/org/apache/doris/common/Config.java  | 13 ++++++++----
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  6 +++---
 6 files changed, 41 insertions(+), 12 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c2274fd169b..7166b39dda8 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1313,6 +1313,11 @@ DEFINE_Bool(enable_file_logger, "true");
 // The minimum row group size when exporting Parquet files. default 128MB
 DEFINE_Int64(min_row_group_size, "134217728");
 
+// If set to false, the parquet reader will not use page index to filter data.
+// This is only for debug purpose, in case sometimes the page index
+// filter wrong data.
+DEFINE_mBool(enable_parquet_page_index, "true");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9920b65fe52..dbb5b716b78 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1399,6 +1399,8 @@ DECLARE_Bool(enable_file_logger);
 // The minimum row group size when exporting Parquet files.
 DECLARE_Int64(min_row_group_size);
 
+DECLARE_mBool(enable_parquet_page_index);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
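[Editor's note, not part of the commit: since the new switch is declared with
DEFINE_mBool it is a mutable BE config, so besides the static form sketched
below it should also be adjustable at runtime without a restart. A minimal
sketch, assuming a standard BE deployment; the value shown is for debugging
only, since the shipped default is true.]

    # be.conf (illustrative sketch) -- only needed when debugging suspected
    # page-index mis-filtering; the default "true" keeps filtering enabled.
    enable_parquet_page_index = false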
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 629f272ef72..25421d80b0e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -148,6 +148,10 @@ void ParquetReader::_init_profile() {
                 ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "FileNum", TUnit::UNIT, parquet_profile, 1);
         _parquet_profile.page_index_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexFilterTime", parquet_profile, 1);
+        _parquet_profile.read_page_index_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexReadTime", parquet_profile, 1);
+        _parquet_profile.parse_page_index_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PageIndexParseTime", parquet_profile, 1);
         _parquet_profile.row_group_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", parquet_profile, 1);
 
@@ -747,25 +751,32 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
         return Status::OK();
     }
     PageIndex page_index;
-    if (!_has_page_index(row_group.columns, page_index)) {
+    if (!config::enable_parquet_page_index || !_has_page_index(row_group.columns, page_index)) {
         read_whole_row_group();
         return Status::OK();
     }
     std::vector<uint8_t> col_index_buff(page_index._column_index_size);
     size_t bytes_read = 0;
     Slice result(col_index_buff.data(), page_index._column_index_size);
-    RETURN_IF_ERROR(
-            _file_reader->read_at(page_index._column_index_start, result, &bytes_read, _io_ctx));
+    {
+        SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+        RETURN_IF_ERROR(_file_reader->read_at(page_index._column_index_start, result, &bytes_read,
+                                              _io_ctx));
+    }
     _column_statistics.read_bytes += bytes_read;
     auto& schema_desc = _file_metadata->schema();
     std::vector<RowRange> skipped_row_ranges;
     std::vector<uint8_t> off_index_buff(page_index._offset_index_size);
     Slice res(off_index_buff.data(), page_index._offset_index_size);
-    RETURN_IF_ERROR(
-            _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+    {
+        SCOPED_RAW_TIMER(&_statistics.read_page_index_time);
+        RETURN_IF_ERROR(
+                _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+    }
     _column_statistics.read_bytes += bytes_read;
     // read twice: parse column index & parse offset index
     _column_statistics.meta_read_calls += 2;
+    SCOPED_RAW_TIMER(&_statistics.parse_page_index_time);
     for (auto& read_col : _read_columns) {
         auto conjunct_iter = _colname_to_value_range->find(read_col);
         if (_colname_to_value_range->end() == conjunct_iter) {
@@ -935,6 +946,8 @@ void ParquetReader::_collect_profile() {
     COUNTER_UPDATE(_parquet_profile.open_file_time, _statistics.open_file_time);
     COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
     COUNTER_UPDATE(_parquet_profile.page_index_filter_time, _statistics.page_index_filter_time);
+    COUNTER_UPDATE(_parquet_profile.read_page_index_time, _statistics.read_page_index_time);
+    COUNTER_UPDATE(_parquet_profile.parse_page_index_time, _statistics.parse_page_index_time);
     COUNTER_UPDATE(_parquet_profile.row_group_filter_time, _statistics.row_group_filter_time);
     COUNTER_UPDATE(_parquet_profile.skip_page_header_num, _column_statistics.skip_page_header_num);
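[Editor's note: for readers unfamiliar with the pattern above, SCOPED_RAW_TIMER
is an RAII macro that adds the enclosing scope's elapsed nanoseconds to a plain
int64_t, and _collect_profile() later flushes those raw values into
RuntimeProfile counters. Below is a minimal self-contained sketch of that
accumulate-then-flush idea; ScopedRawTimer here is a stand-in written for
illustration, not the actual Doris implementation.]

    #include <chrono>
    #include <cstdint>
    #include <iostream>

    // RAII stand-in for Doris' SCOPED_RAW_TIMER macro: on destruction it adds
    // the scope's elapsed nanoseconds to a caller-owned int64_t.
    class ScopedRawTimer {
    public:
        explicit ScopedRawTimer(int64_t* target)
                : _target(target), _start(std::chrono::steady_clock::now()) {}
        ~ScopedRawTimer() {
            *_target += std::chrono::duration_cast<std::chrono::nanoseconds>(
                                std::chrono::steady_clock::now() - _start)
                                .count();
        }

    private:
        int64_t* _target;
        std::chrono::steady_clock::time_point _start;
    };

    // Mirrors the Statistics struct in vparquet_reader.h: raw int64_t
    // accumulators, cheap to update on the hot path.
    struct Statistics {
        int64_t read_page_index_time = 0;   // ns spent reading index bytes
        int64_t parse_page_index_time = 0;  // ns spent parsing them
    };

    int main() {
        Statistics stats;
        // Two separate scopes add into the same counter, just as the two
        // read_at() blocks in _process_page_index() both feed
        // _statistics.read_page_index_time.
        {
            ScopedRawTimer t(&stats.read_page_index_time);
            // ... read the column index ...
        }
        {
            ScopedRawTimer t(&stats.read_page_index_time);
            // ... read the offset index ...
        }
        {
            ScopedRawTimer t(&stats.parse_page_index_time);
            // ... parse both indexes, compute skipped row ranges ...
        }
        // Later, the equivalent of _collect_profile() copies the raw values
        // into profile counters (PageIndexReadTime / PageIndexParseTime).
        std::cout << "read:  " << stats.read_page_index_time << " ns\n"
                  << "parse: " << stats.parse_page_index_time << " ns\n";
        return 0;
    }

Accumulating into a raw integer keeps the per-scope cost trivial; the profile
counter is touched once per reader in _collect_profile() rather than once per
timed scope.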
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 0eac1d65118..38b3d71a466 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -89,6 +89,8 @@ public:
         int64_t open_file_num = 0;
         int64_t row_group_filter_time = 0;
         int64_t page_index_filter_time = 0;
+        int64_t read_page_index_time = 0;
+        int64_t parse_page_index_time = 0;
     };
 
     ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
@@ -170,6 +172,8 @@ private:
         RuntimeProfile::Counter* open_file_num = nullptr;
         RuntimeProfile::Counter* row_group_filter_time = nullptr;
         RuntimeProfile::Counter* page_index_filter_time = nullptr;
+        RuntimeProfile::Counter* read_page_index_time = nullptr;
+        RuntimeProfile::Counter* parse_page_index_time = nullptr;
 
         RuntimeProfile::Counter* file_read_time = nullptr;
         RuntimeProfile::Counter* file_read_calls = nullptr;
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index c4340d783ef..109a0376a31 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1948,16 +1948,21 @@ public class Config extends ConfigBase {
     @ConfField(mutable = false, masterOnly = false)
     public static long max_hive_partition_cache_num = 100000;
 
-    @ConfField(mutable = false, masterOnly = false, description = {"Hive表到分区名列表缓存的最大数量。",
-            "Max cache number of hive table to partition names list."})
+    @ConfField(mutable = false, masterOnly = false, description = {"Hive表名缓存的最大数量。",
+            "Max cache number of hive table name list."})
     public static long max_hive_table_cache_num = 1000;
 
+    @ConfField(mutable = false, masterOnly = false, description = {
+            "Hive分区表缓存的最大数量", "Max cache number of hive partition table"
+    })
+    public static long max_hive_partition_table_cache_num = 1000;
+
     @ConfField(mutable = false, masterOnly = false, description = {"获取Hive分区值时候的最大返回数量,-1代表没有限制。",
-            "Max number of hive partition values to return while list partitions, -1 means no limitation."})
+            "Max number of hive partition values to return while list partitions, -1 means no limitation."})
     public static short max_hive_list_partition_num = -1;
 
     @ConfField(mutable = false, masterOnly = false, description = {"远程文件系统缓存的最大数量",
-            "Max cache number of remote file system."})
+            "Max cache number of remote file system."})
     public static long max_remote_file_system_cache_num = 100;
 
     @ConfField(mutable = false, masterOnly = false, description = {"外表行数缓存最大数量",
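[Editor's note, a usage sketch rather than part of the commit: with the split,
partition-value lists of partitioned tables are bounded separately from the
table-name cache. Assuming the shipped defaults from the diff above, the
relevant fe.conf lines might look like this; the values are the defaults, not
tuning advice. Note both fields are declared mutable = false, so changing them
requires an FE restart.]

    # fe.conf (illustrative sketch) -- defaults as of this commit
    # Bounds the Hive table-name list cache:
    max_hive_table_cache_num = 1000
    # New: bounds partitionValuesCache (partition value lists of partitioned
    # Hive tables), previously governed by max_hive_table_cache_num:
    max_hive_partition_table_cache_num = 1000

The HiveMetaStoreCache diff below wires the new config into
partitionValuesCache and applies the shortened 28800-second (8-hour)
write-expiry to both it and partitionCache.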
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 5e3366e37c4..7f23385d847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -136,16 +136,16 @@ public class HiveMetaStoreCache {
      **/
     private void init() {
         CacheFactory partitionValuesCacheFactory = new CacheFactory(
-                OptionalLong.of(86400L),
+                OptionalLong.of(28800L),
                 OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
-                Config.max_hive_table_cache_num,
+                Config.max_hive_partition_table_cache_num,
                 false,
                 null);
         partitionValuesCache = partitionValuesCacheFactory.buildCache(key -> loadPartitionValues(key), null,
                 refreshExecutor);
 
         CacheFactory partitionCacheFactory = new CacheFactory(
-                OptionalLong.of(86400L),
+                OptionalLong.of(28800L),
                 OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60L),
                 Config.max_hive_partition_cache_num,
                 false,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org