This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3c26b5f218aad22c1eb2465244ba67dbfaecfba2
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Apr 5 15:51:47 2023 +0800

    [fix](file_cache) turn on file cache by FE session variable (#18340)

    Fix two bugs:
    1. Enabling the file cache requires both the `FE session` variable and the
       `BE` configuration (`enable_file_cache=true`) to be enabled.
    2. `ParquetReader` did not previously use `IOContext`, but
       `CachedRemoteFileReader::read_at` requires an `IOContext` after PR #17586.
---
 be/src/io/file_factory.cpp                         | 22 +++++++++-------
 be/src/io/file_factory.h                           | 12 +++++----
 be/src/io/fs/buffered_reader.cpp                   | 10 ++++----
 be/src/io/fs/buffered_reader.h                     | 10 +++++---
 be/src/io/io_common.h                              |  4 +--
 be/src/vec/exec/format/csv/csv_reader.cpp          | 10 +++++---
 be/src/vec/exec/format/json/new_json_reader.cpp    |  6 +++--
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 14 +++++++----
 be/src/vec/exec/format/orc/vorc_reader.h           |  3 ++-
 .../vec/exec/format/parquet/parquet_thrift_util.h  | 10 +++++---
 .../parquet/vparquet_column_chunk_reader.cpp       |  7 +++---
 .../format/parquet/vparquet_column_chunk_reader.h  |  3 ++-
 .../exec/format/parquet/vparquet_column_reader.cpp | 21 ++++++++--------
 .../exec/format/parquet/vparquet_column_reader.h   | 29 ++++++++++++++--------
 .../exec/format/parquet/vparquet_group_reader.cpp  |  6 +++--
 .../exec/format/parquet/vparquet_group_reader.h    |  3 ++-
 .../exec/format/parquet/vparquet_page_reader.cpp   |  9 ++++---
 .../vec/exec/format/parquet/vparquet_page_reader.h |  4 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 23 +++++++++++------
 be/src/vec/exec/scan/vfile_scanner.cpp             |  5 ++--
 be/src/vec/exec/varrow_scanner.cpp                 |  7 +++---
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   | 17 ++++++++-----
 22 files changed, 141 insertions(+), 94 deletions(-)
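
The heart of the fix is the new FileFactory::get_cache_policy() shown in the first
hunk below: the file block cache is used only when the BE config and the FE session
variable are both enabled. A minimal, self-contained sketch of that gating logic
(the types here are simplified stand-ins for illustration, not the real Doris
definitions):

    #include <cstdio>

    // Simplified stand-ins for the Doris types involved (assumptions for
    // illustration only; the real definitions live under be/src).
    namespace config { bool enable_file_cache = true; }       // BE config flag
    struct TQueryOptions { bool enable_file_cache = false; }; // FE session variable
    struct RuntimeState {
        TQueryOptions opts;
        const TQueryOptions& query_options() const { return opts; }
    };
    enum class FileCachePolicy { NO_CACHE, FILE_BLOCK_CACHE };

    // Mirrors the patch's FileFactory::get_cache_policy(): cache only when a
    // RuntimeState is available and both gates agree.
    FileCachePolicy get_cache_policy(RuntimeState* state) {
        if (state != nullptr && config::enable_file_cache &&
            state->query_options().enable_file_cache) {
            return FileCachePolicy::FILE_BLOCK_CACHE;
        }
        return FileCachePolicy::NO_CACHE;
    }

    int main() {
        RuntimeState state;
        state.opts.enable_file_cache = true; // FE session variable on
        bool cached = get_cache_policy(&state) == FileCachePolicy::FILE_BLOCK_CACHE;
        std::printf("file block cache enabled: %d\n", cached);
    }

Callers that lack a RuntimeState simply get NO_CACHE, which also keeps the new
defaulted cache_policy parameter of create_file_reader backward compatible.
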
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 64bbcfd1d9..c539f9ae50 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -26,16 +26,23 @@
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/hdfs_file_writer.h"
 #include "io/fs/local_file_system.h"
-#include "io/fs/local_file_writer.h"
-#include "io/fs/remote_file_system.h"
 #include "io/fs/s3_file_system.h"
-#include "io/fs/s3_file_writer.h"
 #include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_context.h"
+#include "util/runtime_profile.h"
 #include "util/s3_uri.h"
 
 namespace doris {
+io::FileCachePolicy FileFactory::get_cache_policy(RuntimeState* state) {
+    if (state != nullptr) {
+        if (config::enable_file_cache && state->query_options().enable_file_cache) {
+            return io::FileCachePolicy::FILE_BLOCK_CACHE;
+        }
+    }
+    return io::FileCachePolicy::NO_CACHE;
+}
 
 Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
                                        const std::vector<TNetworkAddress>& broker_addresses,
@@ -78,16 +85,13 @@ Status FileFactory::create_file_writer(TFileType::type type, ExecEnv* env,
     return Status::OK();
 }
 
-Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
+Status FileFactory::create_file_reader(RuntimeProfile* profile,
                                        const FileSystemProperties& system_properties,
                                        const FileDescription& file_description,
                                        std::shared_ptr<io::FileSystem>* file_system,
-                                       io::FileReaderSPtr* file_reader) {
+                                       io::FileReaderSPtr* file_reader,
+                                       io::FileCachePolicy cache_policy) {
     TFileType::type type = system_properties.system_type;
-    auto cache_policy = io::FileCachePolicy::NO_CACHE;
-    if (config::enable_file_cache) {
-        cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
-    }
     io::FileBlockCachePathPolicy file_block_cache;
     io::FileReaderOptions reader_options(cache_policy, file_block_cache);
     reader_options.file_size = file_description.file_size;
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index 978241f8d6..01b13cc521 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -45,6 +45,8 @@ struct FileDescription {
 
 class FileFactory {
 public:
+    static io::FileCachePolicy get_cache_policy(RuntimeState* state);
+
     /// Create FileWriter
     static Status create_file_writer(TFileType::type type, ExecEnv* env,
                                      const std::vector<TNetworkAddress>& broker_addresses,
@@ -53,11 +55,11 @@ public:
                                      std::unique_ptr<io::FileWriter>& file_writer);
 
     /// Create FileReader
-    static Status create_file_reader(RuntimeProfile* profile,
-                                     const FileSystemProperties& system_properties,
-                                     const FileDescription& file_description,
-                                     std::shared_ptr<io::FileSystem>* file_system,
-                                     io::FileReaderSPtr* file_reader);
+    static Status create_file_reader(
+            RuntimeProfile* profile, const FileSystemProperties& system_properties,
+            const FileDescription& file_description, std::shared_ptr<io::FileSystem>* file_system,
+            io::FileReaderSPtr* file_reader,
+            io::FileCachePolicy cache_policy = io::FileCachePolicy::NO_CACHE);
 
     // Create FileReader for stream load pipe
     static Status create_pipe_reader(const TUniqueId& load_id, io::FileReaderSPtr* file_reader);
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 4f47ff5671..d29ba2fe0d 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,7 +23,6 @@
 #include "common/config.h"
 #include "olap/iterators.h"
 #include "olap/olap_define.h"
-#include "util/bit_util.h"
 
 namespace doris {
 namespace io {
@@ -191,7 +190,7 @@ BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint
           _max_buf_size(max_buf_size) {}
 
 Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
-                                            const size_t bytes_to_read) {
+                                            const size_t bytes_to_read, const IOContext* io_ctx) {
     if (offset < _file_start_offset || offset >= _file_end_offset) {
         return Status::IOError("Out-of-bounds Access");
     }
@@ -223,7 +222,7 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
     while (has_read < to_read) {
         size_t loop_read = 0;
         Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
-        RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read));
+        RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
         _statistics.read_calls++;
         if (loop_read == 0) {
             break;
@@ -239,8 +238,9 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
     return Status::OK();
 }
 
-Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
-    return read_bytes((const uint8_t**)&slice.data, offset, slice.size);
+Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset,
+                                            const IOContext* io_ctx) {
+    return read_bytes((const uint8_t**)&slice.data, offset, slice.size, io_ctx);
 }
 
 } // namespace io
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 2b3b6054b9..434058e32e 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -156,11 +156,12 @@ public:
      * @param offset start offset ot read in stream
      * @param bytes_to_read bytes to read
      */
-    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) = 0;
+    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
+                              const IOContext* io_ctx) = 0;
     /**
     * Save the data address to slice.data, and the slice.size is the bytes to read.
    */
-    virtual Status read_bytes(Slice& slice, uint64_t offset) = 0;
+    virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
     Statistics& statistics() { return _statistics; }
     virtual ~BufferedStreamReader() = default;
     // return the file path
@@ -176,8 +177,9 @@ public:
                              size_t max_buf_size);
     ~BufferedFileStreamReader() override = default;
 
-    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read) override;
-    Status read_bytes(Slice& slice, uint64_t offset) override;
+    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
+                      const IOContext* io_ctx) override;
+    Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
     std::string path() override { return _file->path(); }
 
 private:
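
The second bug is fixed by threading the per-query IOContext through every
buffered-read call, as the buffered_reader changes above show, so that
CachedRemoteFileReader::read_at can attribute cache statistics to the query.
A runnable sketch of the pattern (Reader, IOContext, and the statistics struct
are illustrative stand-ins, not the real Doris classes):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    // Stand-ins: the point is the convention of passing a (possibly null)
    // IOContext through every read so the cached layer can account for it.
    struct FileCacheStatistics { std::int64_t read_bytes = 0; };
    struct IOContext { FileCacheStatistics* file_cache_stats = nullptr; };

    struct Reader {
        const char* data;
        std::size_t size;
        // Mirrors the shape of the new read_bytes/read_at signatures.
        std::size_t read_at(std::size_t offset, char* out, std::size_t n,
                            const IOContext* io_ctx) const {
            if (offset >= size) return 0;
            std::size_t got = n < size - offset ? n : size - offset;
            std::memcpy(out, data + offset, got);
            if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
                io_ctx->file_cache_stats->read_bytes += got; // per-query accounting
            }
            return got;
        }
    };

    int main() {
        FileCacheStatistics stats;
        IOContext ctx{&stats};
        Reader r{"hello parquet", 13};
        char buf[8];
        std::size_t got = r.read_at(6, buf, sizeof(buf), &ctx);
        std::printf("read %zu bytes, accounted %lld\n", got,
                    (long long)stats.read_bytes);
    }
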
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 39a059b7cb..d99b4bb1fb 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -55,15 +55,13 @@ public:
               is_persistent(is_presistent_),
               use_disposable_cache(use_disposable_cache_),
               read_segment_index(read_segment_index_),
-              file_cache_stats(stats_),
-              enable_file_cache(enable_file_cache) {}
+              file_cache_stats(stats_) {}
 
     ReaderType reader_type;
     const TUniqueId* query_id = nullptr;
     bool is_persistent = false;
     bool use_disposable_cache = false;
     bool read_segment_index = false;
     FileCacheStatistics* file_cache_stats = nullptr;
-    bool enable_file_cache = true;
 };
 
 } // namespace io
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index f11a5c2273..7f114167ea 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -142,8 +142,10 @@ Status CsvReader::init_reader(bool is_load) {
     if (_params.file_type == TFileType::FILE_STREAM) {
         RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &csv_file_reader));
     } else {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &csv_file_reader));
+        io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+                                                        _file_description, &_file_system,
+                                                        &_file_reader, cache_policy));
     }
     if (typeid_cast<io::S3FileReader*>(csv_file_reader.get()) != nullptr ||
         typeid_cast<io::BrokerFileReader*>(csv_file_reader.get()) != nullptr) {
@@ -634,9 +636,9 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* is_parse_name) {
     }
 
     _file_description.start_offset = start_offset;
-
+    io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
     RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties, _file_description,
-                                                    &_file_system, &_file_reader));
+                                                    &_file_system, &_file_reader, cache_policy));
     if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
         return Status::EndOfFile("get parsed schema failed, empty csv file: " + _range.path);
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 6ae0c74e85..cea79dad75 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -339,8 +339,10 @@ Status NewJsonReader::_open_file_reader() {
     if (_params.file_type == TFileType::FILE_STREAM) {
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &json_file_reader));
     } else {
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &json_file_reader));
+        io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+                                                        _file_description, &_file_system,
+                                                        &_file_reader, cache_policy));
     }
     if (typeid_cast<io::S3FileReader*>(json_file_reader.get()) != nullptr ||
         typeid_cast<io::BrokerFileReader*>(json_file_reader.get()) != nullptr) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 293e1233d1..c4fd3c2409 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -72,10 +72,12 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
     }
 }
 
-OrcReader::OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
-                     const TFileRangeDesc& range, const std::vector<std::string>& column_names,
-                     size_t batch_size, const std::string& ctz, io::IOContext* io_ctx)
+OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
+                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                     const std::vector<std::string>& column_names, size_t batch_size,
+                     const std::string& ctz, io::IOContext* io_ctx)
         : _profile(profile),
+          _state(state),
          _scan_params(params),
          _scan_range(range),
          _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
@@ -153,8 +155,10 @@ void OrcReader::_init_profile() {
 Status OrcReader::_create_file_reader() {
     if (_file_input_stream == nullptr) {
         io::FileReaderSPtr inner_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &inner_reader));
+        io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+                                                        _file_description, &_file_system,
+                                                        &inner_reader, cache_policy));
         _file_input_stream.reset(
                 new ORCFileInputStream(_scan_range.path, inner_reader, &_statistics, _io_ctx));
     }
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 4fc2fd5ec1..5430922833 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -45,7 +45,7 @@ public:
         int64_t decode_null_map_time = 0;
     };
 
-    OrcReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+    OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
              const TFileRangeDesc& range, const std::vector<std::string>& column_names,
              size_t batch_size, const std::string& ctz, io::IOContext* io_ctx);
 
@@ -248,6 +248,7 @@ private:
 
 private:
     RuntimeProfile* _profile;
+    RuntimeState* _state;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
     FileSystemProperties _system_properties;
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index cccbe0f9c2..98b02609db 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -35,12 +35,13 @@ namespace doris::vectorized {
 constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
 
-static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_metadata) {
+static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_metadata,
+                                  size_t* meta_size, io::IOContext* io_ctx) {
     uint8_t footer[PARQUET_FOOTER_SIZE];
     int64_t file_size = file->size();
     size_t bytes_read = 0;
     Slice result(footer, PARQUET_FOOTER_SIZE);
-    RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result, &bytes_read));
+    RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE, result, &bytes_read, io_ctx));
     DCHECK_EQ(bytes_read, PARQUET_FOOTER_SIZE);
 
     // validate magic
@@ -60,12 +61,13 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** file_m
     // deserialize footer
     std::unique_ptr<uint8_t[]> meta_buff(new uint8_t[metadata_size]);
     Slice res(meta_buff.get(), metadata_size);
-    RETURN_IF_ERROR(
-            file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, res, &bytes_read));
+    RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, res, &bytes_read,
+                                  io_ctx));
     DCHECK_EQ(bytes_read, metadata_size);
     RETURN_IF_ERROR(deserialize_thrift_msg(meta_buff.get(), &metadata_size, true, &t_metadata));
     *file_metadata = new FileMetaData(t_metadata);
     RETURN_IF_ERROR((*file_metadata)->init_schema());
+    *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
     return Status::OK();
 }
 } // namespace doris::vectorized
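
For reference, parse_thrift_footer relies on the standard Parquet tail layout:
the thrift-encoded metadata, then a 4-byte little-endian length, then the 4-byte
magic "PAR1". That is why PARQUET_FOOTER_SIZE is 8 and why the new *meta_size
out-parameter reports PARQUET_FOOTER_SIZE + metadata_size, which the reader later
adds to its read-bytes statistics. A small sketch of the offset arithmetic (the
concrete sizes are made-up example values):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    int main() {
        const std::int64_t file_size = 1000000;   // hypothetical Parquet file size
        const std::uint32_t metadata_size = 4096; // decoded from the 4 length bytes
        const std::uint32_t footer_size = 8;      // 4-byte length + "PAR1" magic

        // First read: the trailing 8 bytes (length + magic).
        std::int64_t footer_offset = file_size - footer_size;
        // Second read: the metadata block immediately before them.
        std::int64_t metadata_offset = footer_offset - metadata_size;
        // What *meta_size reports, i.e. total footer bytes fetched (two reads,
        // matching the read_calls += 2 accounting later in vparquet_reader.cpp).
        std::size_t meta_size = footer_size + metadata_size;

        std::printf("footer @%lld, metadata @%lld, accounted bytes=%zu\n",
                    (long long)footer_offset, (long long)metadata_offset, meta_size);
    }
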
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index b08e316c22..d0e5f25d71 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -21,20 +21,21 @@ namespace doris::vectorized {
 
 ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader,
                                      tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema,
-                                     cctz::time_zone* ctz)
+                                     cctz::time_zone* ctz, io::IOContext* io_ctx)
         : _field_schema(field_schema),
          _max_rep_level(field_schema->repetition_level),
          _max_def_level(field_schema->definition_level),
          _stream_reader(reader),
          _metadata(column_chunk->meta_data),
-          _ctz(ctz) {}
+          _ctz(ctz),
+          _io_ctx(io_ctx) {}
 
 Status ColumnChunkReader::init() {
     size_t start_offset = _metadata.__isset.dictionary_page_offset
                                  ? _metadata.dictionary_page_offset
                                  : _metadata.data_page_offset;
     size_t chunk_size = _metadata.total_compressed_size;
-    _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
+    _page_reader = std::make_unique<PageReader>(_stream_reader, _io_ctx, start_offset, chunk_size);
 
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
     if (_metadata.__isset.dictionary_page_offset) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 6075275a7e..ad37e13815 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -65,7 +65,7 @@ public:
     };
 
     ColumnChunkReader(io::BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
-                      FieldSchema* field_schema, cctz::time_zone* ctz);
+                      FieldSchema* field_schema, cctz::time_zone* ctz, io::IOContext* io_ctx);
     ~ColumnChunkReader() = default;
 
     // Initialize chunk reader, will generate the decoder and codec.
@@ -175,6 +175,7 @@ private:
     io::BufferedStreamReader* _stream_reader;
     tparquet::ColumnMetaData _metadata;
     cctz::time_zone* _ctz;
+    io::IOContext* _io_ctx;
 
     std::unique_ptr<PageReader> _page_reader = nullptr;
     BlockCompressionCodec* _block_compress_codec = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 02bb4e1d70..1657f5ab3a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -97,43 +97,44 @@ static void fill_array_offset(FieldSchema* field, ColumnArray::Offsets64& offset
 Status ParquetColumnReader::create(io::FileReaderSPtr file, FieldSchema* field,
                                    const tparquet::RowGroup& row_group,
                                    const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+                                   io::IOContext* io_ctx,
                                    std::unique_ptr<ParquetColumnReader>& reader,
                                    size_t max_buf_size) {
     if (field->type.type == TYPE_ARRAY) {
         std::unique_ptr<ParquetColumnReader> element_reader;
-        RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz,
+        RETURN_IF_ERROR(create(file, &field->children[0], row_group, row_ranges, ctz, io_ctx,
                                element_reader, max_buf_size));
         element_reader->set_nested_column();
-        ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz);
+        ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz, io_ctx);
         RETURN_IF_ERROR(array_reader->init(std::move(element_reader), field));
         reader.reset(array_reader);
     } else if (field->type.type == TYPE_MAP) {
         std::unique_ptr<ParquetColumnReader> key_reader;
         std::unique_ptr<ParquetColumnReader> value_reader;
         RETURN_IF_ERROR(create(file, &field->children[0].children[0], row_group, row_ranges, ctz,
-                               key_reader, max_buf_size));
+                               io_ctx, key_reader, max_buf_size));
         RETURN_IF_ERROR(create(file, &field->children[0].children[1], row_group, row_ranges, ctz,
-                               value_reader, max_buf_size));
+                               io_ctx, value_reader, max_buf_size));
         key_reader->set_nested_column();
         value_reader->set_nested_column();
-        MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz);
+        MapColumnReader* map_reader = new MapColumnReader(row_ranges, ctz, io_ctx);
         RETURN_IF_ERROR(map_reader->init(std::move(key_reader), std::move(value_reader), field));
         reader.reset(map_reader);
     } else if (field->type.type == TYPE_STRUCT) {
         std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
         for (int i = 0; i < field->children.size(); ++i) {
             std::unique_ptr<ParquetColumnReader> child_reader;
-            RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz,
+            RETURN_IF_ERROR(create(file, &field->children[i], row_group, row_ranges, ctz, io_ctx,
                                    child_reader, max_buf_size));
             child_reader->set_nested_column();
             child_readers.emplace_back(std::move(child_reader));
         }
-        StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz);
+        StructColumnReader* struct_reader = new StructColumnReader(row_ranges, ctz, io_ctx);
         RETURN_IF_ERROR(struct_reader->init(std::move(child_readers), field));
         reader.reset(struct_reader);
     } else {
         const tparquet::ColumnChunk& chunk = row_group.columns[field->physical_column_index];
-        ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz);
+        ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, chunk, ctz, io_ctx);
         RETURN_IF_ERROR(scalar_reader->init(file, field, max_buf_size));
         reader.reset(scalar_reader);
     }
@@ -173,8 +174,8 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, siz
     size_t chunk_len = chunk_meta.total_compressed_size;
     _stream_reader = std::make_unique<io::BufferedFileStreamReader>(
             file, chunk_start, chunk_len, std::min(chunk_len, max_buf_size));
-    _chunk_reader =
-            std::make_unique<ColumnChunkReader>(_stream_reader.get(), &_chunk_meta, field, _ctz);
+    _chunk_reader = std::make_unique<ColumnChunkReader>(_stream_reader.get(), &_chunk_meta, field,
+                                                        _ctz, _io_ctx);
     RETURN_IF_ERROR(_chunk_reader->init());
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 181f93d974..26cdbd9dd3 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -77,8 +77,9 @@ public:
         }
     };
 
-    ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
-            : _row_ranges(row_ranges), _ctz(ctz) {}
+    ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+                        io::IOContext* io_ctx)
+            : _row_ranges(row_ranges), _ctz(ctz), _io_ctx(io_ctx) {}
     virtual ~ParquetColumnReader() = default;
     virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
                                     ColumnSelectVector& select_vector, size_t batch_size,
@@ -100,7 +101,8 @@ public:
     static Status create(io::FileReaderSPtr file, FieldSchema* field,
                          const tparquet::RowGroup& row_group,
                          const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
-                         std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size);
+                         io::IOContext* io_ctx, std::unique_ptr<ParquetColumnReader>& reader,
+                         size_t max_buf_size);
     void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
     void set_nested_column() { _nested_column = true; }
     virtual const std::vector<level_t>& get_rep_level() const = 0;
@@ -117,6 +119,7 @@ protected:
     bool _nested_column = false;
     const std::vector<RowRange>& _row_ranges;
     cctz::time_zone* _ctz;
+    io::IOContext* _io_ctx;
     tparquet::OffsetIndex* _offset_index;
     int64_t _current_row_index = 0;
     int _row_range_index = 0;
@@ -126,8 +129,9 @@ protected:
 class ScalarColumnReader : public ParquetColumnReader {
 public:
     ScalarColumnReader(const std::vector<RowRange>& row_ranges,
-                       const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz)
-            : ParquetColumnReader(row_ranges, ctz), _chunk_meta(chunk_meta) {}
+                       const tparquet::ColumnChunk& chunk_meta, cctz::time_zone* ctz,
+                       io::IOContext* io_ctx)
+            : ParquetColumnReader(row_ranges, ctz, io_ctx), _chunk_meta(chunk_meta) {}
     ~ScalarColumnReader() override { close(); }
     Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
@@ -163,8 +167,9 @@ private:
 
 class ArrayColumnReader : public ParquetColumnReader {
 public:
-    ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
-            : ParquetColumnReader(row_ranges, ctz) {}
+    ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+                      io::IOContext* io_ctx)
+            : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
     ~ArrayColumnReader() override { close(); }
     Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
     Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
@@ -185,8 +190,9 @@ private:
 
 class MapColumnReader : public ParquetColumnReader {
 public:
-    MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
-            : ParquetColumnReader(row_ranges, ctz) {}
+    MapColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+                    io::IOContext* io_ctx)
+            : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
     ~MapColumnReader() override { close(); }
 
     Status init(std::unique_ptr<ParquetColumnReader> key_reader,
@@ -218,8 +224,9 @@ private:
 
 class StructColumnReader : public ParquetColumnReader {
 public:
-    StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
-            : ParquetColumnReader(row_ranges, ctz) {}
+    StructColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+                       io::IOContext* io_ctx)
+            : ParquetColumnReader(row_ranges, ctz, io_ctx) {}
     ~StructColumnReader() override { close(); }
 
     Status init(std::vector<std::unique_ptr<ParquetColumnReader>>&& child_readers,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index a25cce7171..0604106b03 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -34,7 +34,7 @@ const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
 RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
                                const std::vector<ParquetReadColumn>& read_columns,
                                const int32_t row_group_id, const tparquet::RowGroup& row_group,
-                               cctz::time_zone* ctz,
+                               cctz::time_zone* ctz, io::IOContext* io_ctx,
                                const PositionDeleteContext& position_delete_ctx,
                                const LazyReadContext& lazy_read_ctx, RuntimeState* state)
         : _file_reader(file_reader),
@@ -43,6 +43,7 @@ RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
          _row_group_meta(row_group),
          _remaining_rows(row_group.num_rows),
          _ctz(ctz),
+          _io_ctx(io_ctx),
          _position_delete_ctx(position_delete_ctx),
          _lazy_read_ctx(lazy_read_ctx),
          _state(state),
@@ -85,7 +86,8 @@ Status RowGroupReader::init(
         auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
         std::unique_ptr<ParquetColumnReader> reader;
         RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta,
-                                                    _read_ranges, _ctz, reader, max_buf_size));
+                                                    _read_ranges, _ctz, _io_ctx, reader,
+                                                    max_buf_size));
         auto col_iter = col_offsets.find(read_col._parquet_col_id);
         if (col_iter != col_offsets.end()) {
             tparquet::OffsetIndex oi = col_iter->second;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 49545e1051..33f8371699 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -107,7 +107,7 @@ public:
     RowGroupReader(io::FileReaderSPtr file_reader,
                    const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id,
-                   const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+                   const tparquet::RowGroup& row_group, cctz::time_zone* ctz, io::IOContext* io_ctx,
                    const PositionDeleteContext& position_delete_ctx,
                    const LazyReadContext& lazy_read_ctx, RuntimeState* state);
 
@@ -167,6 +167,7 @@ private:
     const tparquet::RowGroup& _row_group_meta;
     int64_t _remaining_rows;
     cctz::time_zone* _ctz;
+    io::IOContext* _io_ctx;
     PositionDeleteContext _position_delete_ctx;
     // merge the row ranges generated from page index and position delete.
     std::vector<RowRange> _read_ranges;
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index a74b328dee..12ef8d0427 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -26,8 +26,9 @@ namespace doris::vectorized {
 
 static constexpr size_t INIT_PAGE_HEADER_SIZE = 128;
 
-PageReader::PageReader(io::BufferedStreamReader* reader, uint64_t offset, uint64_t length)
-        : _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
+PageReader::PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset,
+                       uint64_t length)
+        : _reader(reader), _io_ctx(io_ctx), _start_offset(offset), _end_offset(offset + length) {}
 
 Status PageReader::next_page_header() {
     if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
@@ -47,7 +48,7 @@ Status PageReader::next_page_header() {
     uint32_t real_header_size = 0;
     while (true) {
         header_size = std::min(header_size, max_size);
-        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size));
+        RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size, _io_ctx));
         real_header_size = header_size;
         SCOPED_RAW_TIMER(&_statistics.decode_header_time);
         auto st =
@@ -87,7 +88,7 @@ Status PageReader::get_page_data(Slice& slice) {
     } else {
         slice.size = _cur_page_header.compressed_page_size;
     }
-    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
+    RETURN_IF_ERROR(_reader->read_bytes(slice, _offset, _io_ctx));
     _offset += slice.size;
     _state = INITIALIZED;
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index ab42b45d6b..846ab96f36 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -32,7 +32,8 @@ public:
         int64_t decode_header_time = 0;
     };
 
-    PageReader(io::BufferedStreamReader* reader, uint64_t offset, uint64_t length);
+    PageReader(io::BufferedStreamReader* reader, io::IOContext* io_ctx, uint64_t offset,
+               uint64_t length);
     ~PageReader() = default;
 
     bool has_next_page() const { return _offset < _end_offset; }
@@ -57,6 +58,7 @@ private:
     enum PageReaderState { INITIALIZED, HEADER_PARSED };
 
     io::BufferedStreamReader* _reader;
+    io::IOContext* _io_ctx;
     tparquet::PageHeader _cur_page_header;
     Statistics _statistics;
     PageReaderState _state = INITIALIZED;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 50f01b6b86..1c34ea8412 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -170,23 +170,27 @@ Status ParquetReader::_open_file() {
     if (_file_reader == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.open_file_time);
         ++_statistics.open_file_num;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &_file_reader));
+        io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+                                                        _file_description, &_file_system,
+                                                        &_file_reader, cache_policy));
     }
     if (_file_metadata == nullptr) {
         SCOPED_RAW_TIMER(&_statistics.parse_footer_time);
         if (_file_reader->size() == 0) {
             return Status::EndOfFile("open file failed, empty parquet file: " + _scan_range.path);
         }
+        size_t meta_size = 0;
         if (_kv_cache == nullptr) {
             _is_file_metadata_owned = true;
-            RETURN_IF_ERROR(parse_thrift_footer(_file_reader, &_file_metadata));
+            RETURN_IF_ERROR(
+                    parse_thrift_footer(_file_reader, &_file_metadata, &meta_size, _io_ctx));
         } else {
             _is_file_metadata_owned = false;
             _file_metadata = _kv_cache->get<FileMetaData>(
                     _meta_cache_key(_file_reader->path()), [&]() -> FileMetaData* {
                         FileMetaData* meta;
-                        Status st = parse_thrift_footer(_file_reader, &meta);
+                        Status st = parse_thrift_footer(_file_reader, &meta, &meta_size, _io_ctx);
                         if (!st) {
                             LOG(INFO) << "failed to parse parquet footer for "
                                       << _file_description.path << ", err: " << st;
@@ -200,6 +204,8 @@ Status ParquetReader::_open_file() {
             return Status::InternalError("failed to get file meta data: {}",
                                          _file_description.path);
         }
+        _column_statistics.read_bytes += meta_size;
+        _column_statistics.read_calls += 2;
     }
     return Status::OK();
 }
@@ -519,9 +525,9 @@ Status ParquetReader::_next_row_group_reader() {
     RowGroupReader::PositionDeleteContext position_delete_ctx =
             _get_position_delete_ctx(row_group, row_group_index);
 
-    _current_group_reader.reset(new RowGroupReader(_file_reader, _read_columns,
-                                                   row_group_index.row_group_id, row_group, _ctz,
-                                                   position_delete_ctx, _lazy_read_ctx, _state));
+    _current_group_reader.reset(
+            new RowGroupReader(_file_reader, _read_columns, row_group_index.row_group_id, row_group,
+                               _ctz, _io_ctx, position_delete_ctx, _lazy_read_ctx, _state));
     _row_group_eof = false;
     return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges, _col_offsets,
                                        _tuple_descriptor, _row_descriptor, _colname_to_slot_id,
@@ -617,12 +623,15 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
     Slice result(col_index_buff, page_index._column_index_size);
     RETURN_IF_ERROR(
             _file_reader->read_at(page_index._column_index_start, result, &bytes_read, _io_ctx));
+    _column_statistics.read_bytes += bytes_read;
     auto& schema_desc = _file_metadata->schema();
     std::vector<RowRange> skipped_row_ranges;
     uint8_t off_index_buff[page_index._offset_index_size];
     Slice res(off_index_buff, page_index._offset_index_size);
     RETURN_IF_ERROR(
             _file_reader->read_at(page_index._offset_index_start, res, &bytes_read, _io_ctx));
+    _column_statistics.read_bytes += bytes_read;
+    _column_statistics.read_calls += 2;
     for (auto& read_col : _read_columns) {
         auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
         if (_colname_to_value_range->end() == conjunct_iter) {
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 298fad4d11..44c567ec3d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -81,7 +81,6 @@ Status VFileScanner::prepare(
     _io_ctx.reset(new io::IOContext());
     _io_ctx->file_cache_stats = _file_cache_statistics.get();
     _io_ctx->query_id = &_state->query_id();
-    _io_ctx->enable_file_cache = _state->query_options().enable_file_cache;
 
     if (_is_load) {
         _src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
@@ -566,7 +565,7 @@ Status VFileScanner::_get_next_reader() {
             break;
         }
         case TFileFormatType::FORMAT_ORC: {
-            _cur_reader.reset(new OrcReader(_profile, _params, range, _file_col_names,
+            _cur_reader.reset(new OrcReader(_profile, _state, _params, range, _file_col_names,
                                             _state->query_options().batch_size,
                                             _state->timezone(), _io_ctx.get()));
             init_status = ((OrcReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range);
@@ -832,7 +831,7 @@ Status VFileScanner::close(RuntimeState* state) {
         }
     }
 
-    if (config::enable_file_cache) {
+    if (config::enable_file_cache && _state->query_options().enable_file_cache) {
         io::FileCacheProfileReporter cache_profile(_profile);
         cache_profile.update(_file_cache_statistics.get());
     }
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 388a0e78ce..9e3137f7c6 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -80,9 +80,10 @@ Status VArrowScanner::_open_next_reader() {
         io::FileReaderSPtr file_reader;
         _init_system_properties(range);
         _init_file_description(range);
-        // no use
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, &_file_system, &file_reader));
+        io::FileCachePolicy cache_policy = FileFactory::get_cache_policy(_state);
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _system_properties,
+                                                        _file_description, &_file_system,
+                                                        &file_reader, cache_policy));
         if (file_reader->size() == 0) {
             continue;
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 033606f310..3db30dfc2b 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -55,7 +55,8 @@ TEST_F(ParquetThriftReaderTest, normal) {
     EXPECT_TRUE(st.ok());
 
     FileMetaData* meta_data;
-    parse_thrift_footer(reader, &meta_data);
+    size_t meta_size;
+    parse_thrift_footer(reader, &meta_data, &meta_size, nullptr);
     tparquet::FileMetaData t_metadata = meta_data->to_thrift();
 
     LOG(WARNING) << "=====================================";
@@ -88,7 +89,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
     EXPECT_TRUE(st.ok());
 
     FileMetaData* metadata;
-    parse_thrift_footer(reader, &metadata);
+    size_t meta_size;
+    parse_thrift_footer(reader, &metadata, &meta_size, nullptr);
     tparquet::FileMetaData t_metadata = metadata->to_thrift();
     FieldDescriptor schemaDescriptor;
     schemaDescriptor.parse_from_thrift(t_metadata.schema);
@@ -164,7 +166,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
     cctz::time_zone ctz;
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
-    ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema, &ctz, nullptr);
+    ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema, &ctz, nullptr);
     // initialize chunk reader
     chunk_reader.init();
     // seek to next page header
@@ -361,7 +363,8 @@ static void read_parquet_data_and_check(const std::string& parquet_file,
     std::unique_ptr<vectorized::Block> block;
     create_block(block);
     FileMetaData* metadata;
-    parse_thrift_footer(reader, &metadata);
+    size_t meta_size;
+    parse_thrift_footer(reader, &metadata, &meta_size, nullptr);
     tparquet::FileMetaData t_metadata = metadata->to_thrift();
     FieldDescriptor schema_descriptor;
     schema_descriptor.parse_from_thrift(t_metadata.schema);
@@ -482,7 +485,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
 
     // prepare metadata
     FileMetaData* meta_data;
-    parse_thrift_footer(file_reader, &meta_data);
+    size_t meta_size;
+    parse_thrift_footer(file_reader, &meta_data, &meta_size, nullptr);
     tparquet::FileMetaData t_metadata = meta_data->to_thrift();
 
     cctz::time_zone ctz;
@@ -491,7 +495,8 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     std::shared_ptr<RowGroupReader> row_group_reader;
     RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
     row_group_reader.reset(new RowGroupReader(file_reader, read_columns, 0, row_group, &ctz,
-                                              position_delete_ctx, lazy_read_ctx, nullptr));
+                                              nullptr, position_delete_ctx, lazy_read_ctx,
+                                              nullptr));
     std::vector<RowRange> row_ranges;
     row_ranges.emplace_back(0, row_group.num_rows);
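
The tests above can pass nullptr for the new io_ctx arguments because every use
of the context in this patch is guarded before dereference, so callers with no
query context stay valid. A minimal sketch of that convention (the structs are
illustrative stand-ins, not the real Doris types):

    #include <cstdio>

    struct Stats { long long read_bytes = 0; };
    struct IOContext { Stats* file_cache_stats = nullptr; };

    // Null context (unit tests) and null stats are both legal no-ops.
    void account(const IOContext* io_ctx, long long n) {
        if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
            io_ctx->file_cache_stats->read_bytes += n;
        }
    }

    int main() {
        account(nullptr, 128); // test-style call: no query context, no crash
        Stats s;
        IOContext ctx{&s};
        account(&ctx, 128);    // scanner-style call: statistics accumulate
        std::printf("accounted=%lld\n", s.read_bytes);
    }
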
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org