This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 026ffaf10d [feature-wip](parquet-reader) add detail profile for parquet reader (#13095) 026ffaf10d is described below commit 026ffaf10db3069270568c051eff0474ed8d4b0c Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Sun Oct 2 15:11:48 2022 +0800 [feature-wip](parquet-reader) add detail profile for parquet reader (#13095) Add more detailed profile counters for ParquetReader: ParquetColumnReadTime: total time spent reading column data (row-group batches) ParquetDecodeDictTime: time to decode dictionary pages ParquetDecodeHeaderTime: time to decode page headers ParquetDecodeLevelTime: time to decode the definition/repetition levels of data pages ParquetDecodeValueTime: time to decode page data into Doris columns ParquetDecompressCount: number of page decompressions ParquetDecompressTime: time to decompress page data ParquetParseMetaTime: time spent initializing the reader, including opening the file and parsing the parquet footer metadata FileReadTime / FileReadCalls / FileReadBytes: time, number of read calls, and bytes read by the underlying buffered file stream --- be/src/io/buffered_reader.cpp | 6 ++ be/src/io/buffered_reader.h | 10 ++ be/src/io/hdfs_file_reader.cpp | 21 +++-- be/src/vec/exec/format/parquet/parquet_common.h | 16 ++++ .../parquet/vparquet_column_chunk_reader.cpp | 10 ++ .../format/parquet/vparquet_column_chunk_reader.h | 19 +++- .../exec/format/parquet/vparquet_column_reader.cpp | 2 +- .../exec/format/parquet/vparquet_column_reader.h | 53 ++++++++++- .../exec/format/parquet/vparquet_group_reader.cpp | 19 ++-- .../exec/format/parquet/vparquet_group_reader.h | 10 +- .../vec/exec/format/parquet/vparquet_page_index.h | 5 +- .../exec/format/parquet/vparquet_page_reader.cpp | 1 + .../vec/exec/format/parquet/vparquet_page_reader.h | 7 ++ be/src/vec/exec/format/parquet/vparquet_reader.cpp | 103 +++++++++++++++------ be/src/vec/exec/format/parquet/vparquet_reader.h | 98 +++++++++----------- be/src/vec/exec/scan/vfile_scanner.cpp | 23 +++-- be/test/vec/exec/parquet/parquet_reader_test.cpp | 47 +++++----- be/test/vec/exec/parquet/parquet_thrift_test.cpp | 1 + 18 files changed, 301 insertions(+), 150 
deletions(-) diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp index c98b365235..021f0b9d23 100644 --- a/be/src/io/buffered_reader.cpp +++ b/be/src/io/buffered_reader.cpp @@ -221,15 +221,21 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset int64_t buf_remaining = _buf_end_offset - _buf_start_offset; int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset); int64_t has_read = 0; + SCOPED_RAW_TIMER(&_statistics.read_time); while (has_read < to_read) { int64_t loop_read = 0; RETURN_IF_ERROR(_file->readat(_buf_end_offset + has_read, to_read - has_read, &loop_read, _buf.get() + buf_remaining + has_read)); + _statistics.read_calls++; + if (loop_read <= 0) { + break; + } has_read += loop_read; } if (has_read != to_read) { return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read); } + _statistics.read_bytes += to_read; _buf_end_offset += to_read; *buf = _buf.get(); return Status::OK(); diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h index 97ec01cc7d..abcf24916e 100644 --- a/be/src/io/buffered_reader.h +++ b/be/src/io/buffered_reader.h @@ -87,6 +87,12 @@ private: */ class BufferedStreamReader { public: + struct Statistics { + int64_t read_time = 0; + int64_t read_calls = 0; + int64_t read_bytes = 0; + }; + /** * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read) * @param buf the buffer address to save the start address of data @@ -98,7 +104,11 @@ public: * Save the data address to slice.data, and the slice.size is the bytes to read. 
*/ virtual Status read_bytes(Slice& slice, uint64_t offset) = 0; + Statistics& statistics() { return _statistics; } virtual ~BufferedStreamReader() = default; + +protected: + Statistics _statistics; }; class BufferedFileStreamReader : public BufferedStreamReader { diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp index 37b2d73bba..de5d7ee8c7 100644 --- a/be/src/io/hdfs_file_reader.cpp +++ b/be/src/io/hdfs_file_reader.cpp @@ -144,13 +144,22 @@ Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_r seek(position); } - *bytes_read = hdfsRead(_hdfs_fs, _hdfs_file, out, nbytes); - if (*bytes_read < 0) { - return Status::InternalError( - "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", - BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError()); + int64_t has_read = 0; + char* cast_out = reinterpret_cast<char*>(out); + while (has_read < nbytes) { + int64_t loop_read = hdfsRead(_hdfs_fs, _hdfs_file, cast_out + has_read, nbytes - has_read); + if (loop_read < 0) { + return Status::InternalError( + "Read hdfs file failed. 
(BE: {}) namenode:{}, path:{}, err: {}", + BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError()); + } + if (loop_read == 0) { + break; + } + has_read += loop_read; } - _current_offset += *bytes_read; // save offset with file + *bytes_read = has_read; + _current_offset += has_read; // save offset with file return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index a56fdb6476..e08027a137 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -38,6 +38,22 @@ namespace doris::vectorized { using level_t = int16_t; +struct RowRange { + RowRange() {} + RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {} + + int64_t first_row; + int64_t last_row; +}; + +struct ParquetReadColumn { + ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name) + : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {}; + + int _parquet_col_id; + const std::string& _file_slot_name; +}; + struct ParquetInt96 { uint64_t lo; // time of nanoseconds in a day uint32_t hi; // days from julian epoch diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 193200f28d..fc8b8cfdee 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -86,6 +86,8 @@ Status ColumnChunkReader::load_page_data() { // check decompressed buffer size _reserve_decompress_buf(uncompressed_size); _page_data = Slice(_decompress_buf.get(), uncompressed_size); + SCOPED_RAW_TIMER(&_statistics.decompress_time); + _statistics.decompress_cnt++; RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); } else { RETURN_IF_ERROR(_page_reader->get_page_data(_page_data)); @@ -93,11 +95,13 @@ Status 
ColumnChunkReader::load_page_data() { // Initialize repetition level and definition level. Skip when level = 0, which means required field. if (_max_rep_level > 0) { + SCOPED_RAW_TIMER(&_statistics.decode_level_time); RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data, header.data_page_header.repetition_level_encoding, _max_rep_level, _remaining_num_values)); } if (_max_def_level > 0) { + SCOPED_RAW_TIMER(&_statistics.decode_level_time); RETURN_IF_ERROR(_def_level_decoder.init(&_page_data, header.data_page_header.definition_level_encoding, _max_def_level, _remaining_num_values)); @@ -132,6 +136,7 @@ Status ColumnChunkReader::load_page_data() { Status ColumnChunkReader::_decode_dict_page() { const tparquet::PageHeader& header = *_page_reader->get_page_header(); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type); + SCOPED_RAW_TIMER(&_statistics.decode_dict_time); // Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification. // Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files. 
@@ -187,6 +192,7 @@ Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) { } _remaining_num_values -= num_values; if (skip_data) { + SCOPED_RAW_TIMER(&_statistics.decode_value_time); return _page_decoder->skip_values(num_values); } else { return Status::OK(); @@ -194,6 +200,7 @@ Status ColumnChunkReader::skip_values(size_t num_values, bool skip_data) { } void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_values) { + SCOPED_RAW_TIMER(&_statistics.decode_value_time); DCHECK_GE(_remaining_num_values, num_values); CHECK(doris_column->is_nullable()); auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( @@ -206,6 +213,7 @@ void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_v } void ColumnChunkReader::insert_null_values(MutableColumnPtr& doris_column, size_t num_values) { + SCOPED_RAW_TIMER(&_statistics.decode_value_time); for (int i = 0; i < num_values; ++i) { doris_column->insert_default(); } @@ -227,6 +235,7 @@ Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& da if (UNLIKELY(_remaining_num_values < num_values)) { return Status::IOError("Decode too many values in current page"); } + SCOPED_RAW_TIMER(&_statistics.decode_value_time); _remaining_num_values -= num_values; return _page_decoder->decode_values(doris_column, data_type, num_values); } @@ -236,6 +245,7 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataType if (UNLIKELY(_remaining_num_values < num_values)) { return Status::IOError("Decode too many values in current page"); } + SCOPED_RAW_TIMER(&_statistics.decode_value_time); _remaining_num_values -= num_values; return _page_decoder->decode_values(doris_column, data_type, num_values); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 44f5b56ff2..1599762152 100644 --- 
a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -29,8 +29,6 @@ #include "parquet_common.h" #include "schema_desc.h" #include "util/block_compression.h" -#include "vec/columns/column_array.h" -#include "vec/columns/column_nullable.h" #include "vparquet_page_reader.h" namespace doris::vectorized { @@ -57,6 +55,15 @@ namespace doris::vectorized { */ class ColumnChunkReader { public: + struct Statistics { + int64_t decompress_time = 0; + int64_t decompress_cnt = 0; + int64_t decode_header_time = 0; + int64_t decode_value_time = 0; + int64_t decode_dict_time = 0; + int64_t decode_level_time = 0; + }; + ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema, cctz::time_zone* ctz); ~ColumnChunkReader() = default; @@ -96,7 +103,7 @@ public: // Load page data into the underlying container, // and initialize the repetition and definition level decoder for current page data. Status load_page_data(); - Status load_page_date_idempotent() { + Status load_page_data_idempotent() { if (_state == DATA_LOADED) { return Status::OK(); } @@ -131,6 +138,11 @@ public: // Get page decoder Decoder* get_page_decoder() { return _page_decoder; } + Statistics& statistics() { + _statistics.decode_header_time = _page_reader->statistics().decode_header_time; + return _statistics; + } + private: enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED }; @@ -161,6 +173,7 @@ private: // Map: encoding -> Decoder // Plain or Dictionary encoding. 
If the dictionary grows too big, the encoding will fall back to the plain encoding std::unordered_map<int, std::unique_ptr<Decoder>> _decoders; + Statistics _statistics; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 3074705ffa..1bcb640865 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -176,7 +176,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr *read_rows = 0; } else { // load page data to decode or skip values - RETURN_IF_ERROR(_chunk_reader->load_page_date_idempotent()); + RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent()); size_t has_read = 0; for (auto& range : read_ranges) { // generate the skipped values diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index b245ff6aa4..0a5d51e2e0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -21,13 +21,9 @@ #include "schema_desc.h" #include "vparquet_column_chunk_reader.h" -#include "vparquet_reader.h" namespace doris::vectorized { -struct RowRange; -class ParquetReadColumn; - class ParquetColumnMetadata { public: ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length, @@ -49,6 +45,52 @@ private: class ParquetColumnReader { public: + struct Statistics { + Statistics() + : read_time(0), + read_calls(0), + read_bytes(0), + decompress_time(0), + decompress_cnt(0), + decode_header_time(0), + decode_value_time(0), + decode_dict_time(0), + decode_level_time(0) {} + + Statistics(BufferedStreamReader::Statistics& fs, ColumnChunkReader::Statistics& cs) + : read_time(fs.read_time), + read_calls(fs.read_calls), + read_bytes(fs.read_bytes), + decompress_time(cs.decompress_time), + 
decompress_cnt(cs.decompress_cnt), + decode_header_time(cs.decode_header_time), + decode_value_time(cs.decode_value_time), + decode_dict_time(cs.decode_dict_time), + decode_level_time(cs.decode_level_time) {} + + int64_t read_time; + int64_t read_calls; + int64_t read_bytes; + int64_t decompress_time; + int64_t decompress_cnt; + int64_t decode_header_time; + int64_t decode_value_time; + int64_t decode_dict_time; + int64_t decode_level_time; + + void merge(Statistics& statistics) { + read_time += statistics.read_time; + read_calls += statistics.read_calls; + read_bytes += statistics.read_bytes; + decompress_time += statistics.decompress_time; + decompress_cnt += statistics.decompress_cnt; + decode_header_time += statistics.decode_header_time; + decode_value_time += statistics.decode_value_time; + decode_dict_time += statistics.decode_dict_time; + decode_level_time += statistics.decode_level_time; + } + }; + ParquetColumnReader(cctz::time_zone* ctz) : _ctz(ctz) {}; virtual ~ParquetColumnReader() { if (_stream_reader != nullptr) { @@ -64,6 +106,9 @@ public: size_t max_buf_size); void init_column_metadata(const tparquet::ColumnChunk& chunk); void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; } + Statistics statistics() { + return Statistics(_stream_reader->statistics(), _chunk_reader->statistics()); + } virtual void close() = 0; protected: diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 0168a97a43..ddcc6494d0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -38,14 +38,6 @@ RowGroupReader::~RowGroupReader() { Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) { - VLOG_DEBUG << "Row group id: " << _row_group_id; - RETURN_IF_ERROR(_init_column_readers(schema, 
row_ranges, col_offsets)); - return Status::OK(); -} - -Status RowGroupReader::_init_column_readers( - const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, - std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) { const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20; const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20; size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size()); @@ -60,7 +52,7 @@ Status RowGroupReader::_init_column_readers( reader->add_offset_index(&oi); } if (reader == nullptr) { - VLOG_DEBUG << "Init row group reader failed"; + VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed"; return Status::Corruption("Init row group reader failed"); } _column_readers[read_col._file_slot_name] = std::move(reader); @@ -100,4 +92,13 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_ return Status::OK(); } +ParquetColumnReader::Statistics RowGroupReader::statistics() { + ParquetColumnReader::Statistics st; + for (auto& reader : _column_readers) { + auto ost = reader.second->statistics(); + st.merge(ost); + } + return st; +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 2e72d42805..27daffe6f7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -17,17 +17,11 @@ #pragma once #include <common/status.h> -#include "exprs/expr_context.h" #include "io/file_reader.h" #include "vec/core/block.h" #include "vparquet_column_reader.h" -#include "vparquet_file_metadata.h" -#include "vparquet_reader.h" namespace doris::vectorized { -class ParquetReadColumn; -class ParquetColumnReader; -struct RowRange; class RowGroupReader { public: @@ -39,9 +33,7 @@ public: std::unordered_map<int, tparquet::OffsetIndex>& col_offsets); Status 
next_batch(Block* block, size_t batch_size, bool* _batch_eof); -private: - Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges, - std::unordered_map<int, tparquet::OffsetIndex>& col_offsets); + ParquetColumnReader::Statistics statistics(); private: doris::FileReader* _file_reader; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h index 2f4b0974b8..cfbe97ded4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h @@ -19,11 +19,10 @@ #include <common/status.h> #include <gen_cpp/parquet_types.h> -#include "vparquet_reader.h" +#include "exec/olap_common.h" +#include "parquet_common.h" namespace doris::vectorized { -class ParquetReader; -struct RowRange; class PageIndex { public: diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index baa88036c2..00e2ef0926 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -48,6 +48,7 @@ Status PageReader::next_page_header() { header_size = std::min(header_size, max_size); RETURN_IF_ERROR(_reader->read_bytes(&page_header_buf, _offset, header_size)); real_header_size = header_size; + SCOPED_RAW_TIMER(&_statistics.decode_header_time); auto st = deserialize_thrift_msg(page_header_buf, &real_header_size, true, &_cur_page_header); if (st.ok()) { diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 0d83c81650..5563f97409 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -28,6 +28,10 @@ namespace doris::vectorized { */ class PageReader { public: + struct Statistics { + int64_t decode_header_time; + }; + PageReader(BufferedStreamReader* reader, uint64_t 
offset, uint64_t length); ~PageReader() = default; @@ -41,6 +45,8 @@ public: Status get_page_data(Slice& slice); + Statistics& statistics() { return _statistics; } + void seek_to_page(int64_t page_header_offset) { _offset = page_header_offset; _next_header_offset = page_header_offset; @@ -52,6 +58,7 @@ private: BufferedStreamReader* _reader; tparquet::PageHeader _cur_page_header; + Statistics _statistics; PageReaderState _state = INITIALIZED; uint64_t _offset = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 85e19425c0..6b8a01c83c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -24,47 +24,79 @@ #include "parquet_thrift_util.h" namespace doris::vectorized { -ParquetReader::ParquetReader(RuntimeProfile* profile, FileReader* file_reader, - const TFileScanRangeParams& params, const TFileRangeDesc& range, + +ParquetReader::ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, const std::vector<std::string>& column_names, size_t batch_size, cctz::time_zone* ctz) : _profile(profile), - _file_reader(file_reader), - // _scan_params(params), - // _scan_range(range), + _scan_params(params), + _scan_range(range), _batch_size(batch_size), _range_start_offset(range.start_offset), _range_size(range.size), _ctz(ctz), _column_names(column_names) { - if (profile != nullptr) { - _filtered_row_groups = ADD_COUNTER(profile, "ParquetFilteredGroups", TUnit::UNIT); - _to_read_row_groups = ADD_COUNTER(profile, "ParquetReadGroups", TUnit::UNIT); - _filtered_group_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByGroup", TUnit::UNIT); - _filtered_page_rows = ADD_COUNTER(profile, "ParquetFilteredRowsByPage", TUnit::UNIT); - _filtered_bytes = ADD_COUNTER(profile, "ParquetFilteredBytes", TUnit::BYTES); - _to_read_bytes = ADD_COUNTER(profile, "ParquetReadBytes", TUnit::BYTES); - } + 
_init_profile(); } ParquetReader::~ParquetReader() { close(); } +void ParquetReader::_init_profile() { + if (_profile != nullptr) { + _parquet_profile.filtered_row_groups = + ADD_COUNTER(_profile, "ParquetFilteredGroups", TUnit::UNIT); + _parquet_profile.to_read_row_groups = + ADD_COUNTER(_profile, "ParquetReadGroups", TUnit::UNIT); + _parquet_profile.filtered_group_rows = + ADD_COUNTER(_profile, "ParquetFilteredRowsByGroup", TUnit::UNIT); + _parquet_profile.filtered_page_rows = + ADD_COUNTER(_profile, "ParquetFilteredRowsByPage", TUnit::UNIT); + _parquet_profile.filtered_bytes = + ADD_COUNTER(_profile, "ParquetFilteredBytes", TUnit::BYTES); + _parquet_profile.to_read_bytes = ADD_COUNTER(_profile, "ParquetReadBytes", TUnit::BYTES); + _parquet_profile.column_read_time = ADD_TIMER(_profile, "ParquetColumnReadTime"); + _parquet_profile.parse_meta_time = ADD_TIMER(_profile, "ParquetParseMetaTime"); + + _parquet_profile.file_read_time = ADD_TIMER(_profile, "FileReadTime"); + _parquet_profile.file_read_calls = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT); + _parquet_profile.file_read_bytes = ADD_COUNTER(_profile, "FileReadBytes", TUnit::BYTES); + _parquet_profile.decompress_time = ADD_TIMER(_profile, "ParquetDecompressTime"); + _parquet_profile.decompress_cnt = + ADD_COUNTER(_profile, "ParquetDecompressCount", TUnit::UNIT); + _parquet_profile.decode_header_time = ADD_TIMER(_profile, "ParquetDecodeHeaderTime"); + _parquet_profile.decode_value_time = ADD_TIMER(_profile, "ParquetDecodeValueTime"); + _parquet_profile.decode_dict_time = ADD_TIMER(_profile, "ParquetDecodeDictTime"); + _parquet_profile.decode_level_time = ADD_TIMER(_profile, "ParquetDecodeLevelTime"); + } +} + void ParquetReader::close() { if (!_closed) { - if (_file_reader != nullptr) { - _file_reader->close(); - delete _file_reader; - } - if (_profile != nullptr) { - COUNTER_UPDATE(_filtered_row_groups, _statistics.filtered_row_groups); - COUNTER_UPDATE(_to_read_row_groups, 
_statistics.read_row_groups); - COUNTER_UPDATE(_filtered_group_rows, _statistics.filtered_group_rows); - COUNTER_UPDATE(_filtered_page_rows, _statistics.filtered_page_rows); - COUNTER_UPDATE(_filtered_bytes, _statistics.filtered_bytes); - COUNTER_UPDATE(_to_read_bytes, _statistics.read_bytes); + COUNTER_UPDATE(_parquet_profile.filtered_row_groups, _statistics.filtered_row_groups); + COUNTER_UPDATE(_parquet_profile.to_read_row_groups, _statistics.read_row_groups); + COUNTER_UPDATE(_parquet_profile.filtered_group_rows, _statistics.filtered_group_rows); + COUNTER_UPDATE(_parquet_profile.filtered_page_rows, _statistics.filtered_page_rows); + COUNTER_UPDATE(_parquet_profile.filtered_bytes, _statistics.filtered_bytes); + COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes); + COUNTER_UPDATE(_parquet_profile.column_read_time, _statistics.column_read_time); + COUNTER_UPDATE(_parquet_profile.parse_meta_time, _statistics.parse_meta_time); + + COUNTER_UPDATE(_parquet_profile.file_read_time, _column_statistics.read_time); + COUNTER_UPDATE(_parquet_profile.file_read_calls, _column_statistics.read_calls); + COUNTER_UPDATE(_parquet_profile.file_read_bytes, _column_statistics.read_bytes); + COUNTER_UPDATE(_parquet_profile.decompress_time, _column_statistics.decompress_time); + COUNTER_UPDATE(_parquet_profile.decompress_cnt, _column_statistics.decompress_cnt); + COUNTER_UPDATE(_parquet_profile.decode_header_time, + _column_statistics.decode_header_time); + COUNTER_UPDATE(_parquet_profile.decode_value_time, + _column_statistics.decode_value_time); + COUNTER_UPDATE(_parquet_profile.decode_dict_time, _column_statistics.decode_dict_time); + COUNTER_UPDATE(_parquet_profile.decode_level_time, + _column_statistics.decode_level_time); } _closed = true; } @@ -72,8 +104,16 @@ void ParquetReader::close() { Status ParquetReader::init_reader( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { - CHECK(_file_reader != nullptr); - 
RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata)); + SCOPED_RAW_TIMER(&_statistics.parse_meta_time); + if (_file_reader == nullptr) { + RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range, + _file_reader, 0)); + } + RETURN_IF_ERROR(_file_reader->open()); + if (_file_reader->size() == 0) { + return Status::EndOfFile("Empty Parquet File"); + } + RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata)); _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { @@ -145,8 +185,13 @@ Status ParquetReader::get_next_block(Block* block, bool* eof) { return Status::OK(); } bool _batch_eof = false; - RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof)); + { + SCOPED_RAW_TIMER(&_statistics.column_read_time); + RETURN_IF_ERROR(_current_group_reader->next_batch(block, _batch_size, &_batch_eof)); + } if (_batch_eof) { + auto column_st = _current_group_reader->statistics(); + _column_statistics.merge(column_st); if (!_next_row_group_reader()) { *eof = true; } @@ -169,8 +214,8 @@ Status ParquetReader::_init_row_group_readers() { for (auto row_group_id : _read_row_groups) { auto& row_group = _t_metadata->row_groups[row_group_id]; std::shared_ptr<RowGroupReader> row_group_reader; - row_group_reader.reset( - new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group, _ctz)); + row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, row_group_id, + row_group, _ctz)); std::vector<RowRange> candidate_row_ranges; RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges)); if (candidate_row_ranges.empty()) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 9eea2ddb61..ab44c31517 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -29,55 +29,34 @@ #include 
"io/file_reader.h" #include "vec/core/block.h" #include "vec/exec/format/generic_reader.h" +#include "vparquet_column_reader.h" #include "vparquet_file_metadata.h" #include "vparquet_group_reader.h" #include "vparquet_page_index.h" namespace doris::vectorized { -struct ParquetStatistics { - int32_t filtered_row_groups = 0; - int32_t read_row_groups = 0; - int64_t filtered_group_rows = 0; - int64_t filtered_page_rows = 0; - int64_t read_rows = 0; - int64_t filtered_bytes = 0; - int64_t read_bytes = 0; -}; - -class RowGroupReader; -class PageIndex; - -struct RowRange { - RowRange() {} - RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {} - int64_t first_row; - int64_t last_row; -}; - -class ParquetReadColumn { -public: - ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name) - : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {}; - ~ParquetReadColumn() = default; - -private: - friend class ParquetReader; - friend class RowGroupReader; - int _parquet_col_id; - const std::string& _file_slot_name; -}; - class ParquetReader : public GenericReader { public: - ParquetReader(RuntimeProfile* profile, FileReader* file_reader, - const TFileScanRangeParams& params, const TFileRangeDesc& range, - const std::vector<std::string>& column_names, size_t batch_size, - cctz::time_zone* ctz); + struct Statistics { + int32_t filtered_row_groups = 0; + int32_t read_row_groups = 0; + int64_t filtered_group_rows = 0; + int64_t filtered_page_rows = 0; + int64_t read_rows = 0; + int64_t filtered_bytes = 0; + int64_t read_bytes = 0; + int64_t column_read_time = 0; + int64_t parse_meta_time = 0; + }; + + ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params, + const TFileRangeDesc& range, const std::vector<std::string>& column_names, + size_t batch_size, cctz::time_zone* ctz); virtual ~ParquetReader(); // for test - void set_file_reader(FileReader* file_reader) { _file_reader = file_reader; } + void 
set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); } Status init_reader( std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); @@ -92,9 +71,31 @@ public: Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, std::unordered_set<std::string>* missing_cols) override; - ParquetStatistics& statistics() { return _statistics; } + Statistics& statistics() { return _statistics; } private: + struct ParquetProfile { + RuntimeProfile::Counter* filtered_row_groups; + RuntimeProfile::Counter* to_read_row_groups; + RuntimeProfile::Counter* filtered_group_rows; + RuntimeProfile::Counter* filtered_page_rows; + RuntimeProfile::Counter* filtered_bytes; + RuntimeProfile::Counter* to_read_bytes; + RuntimeProfile::Counter* column_read_time; + RuntimeProfile::Counter* parse_meta_time; + + RuntimeProfile::Counter* file_read_time; + RuntimeProfile::Counter* file_read_calls; + RuntimeProfile::Counter* file_read_bytes; + RuntimeProfile::Counter* decompress_time; + RuntimeProfile::Counter* decompress_cnt; + RuntimeProfile::Counter* decode_header_time; + RuntimeProfile::Counter* decode_value_time; + RuntimeProfile::Counter* decode_dict_time; + RuntimeProfile::Counter* decode_level_time; + }; + + void _init_profile(); bool _next_row_group_reader(); Status _init_read_columns(); Status _init_row_group_readers(); @@ -117,10 +118,9 @@ private: private: RuntimeProfile* _profile; - // file reader is passed from file scanner, and owned by this parquet reader. 
- FileReader* _file_reader = nullptr; - // const TFileScanRangeParams& _scan_params; - // const TFileRangeDesc& _scan_range; + const TFileScanRangeParams& _scan_params; + const TFileRangeDesc& _scan_range; + std::unique_ptr<FileReader> _file_reader = nullptr; std::shared_ptr<FileMetaData> _file_metadata; const tparquet::FileMetaData* _t_metadata; @@ -141,15 +141,9 @@ private: const std::vector<std::string> _column_names; std::vector<std::string> _missing_cols; - ParquetStatistics _statistics; + Statistics _statistics; + ParquetColumnReader::Statistics _column_statistics; + ParquetProfile _parquet_profile; bool _closed = false; - - // parquet profile - RuntimeProfile::Counter* _filtered_row_groups; - RuntimeProfile::Counter* _to_read_row_groups; - RuntimeProfile::Counter* _filtered_group_rows; - RuntimeProfile::Counter* _filtered_page_rows; - RuntimeProfile::Counter* _filtered_bytes; - RuntimeProfile::Counter* _to_read_bytes; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index 34b67dd1e7..f6f8127146 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -454,13 +454,17 @@ Status VFileScanner::_get_next_reader() { const TFileRangeDesc& range = _ranges[_next_range++]; // 1. create file reader + // TODO: Each format requires its own FileReader to achieve a special access mode, + // so create the FileReader inner the format. 
std::unique_ptr<FileReader> file_reader; - RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, - range, file_reader)); - RETURN_IF_ERROR(file_reader->open()); - if (file_reader->size() == 0) { - file_reader->close(); - continue; + if (_params.format_type != TFileFormatType::FORMAT_PARQUET) { + RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, + range, file_reader)); + RETURN_IF_ERROR(file_reader->open()); + if (file_reader->size() == 0) { + file_reader->close(); + continue; + } } // 2. create reader for specific format @@ -468,10 +472,9 @@ Status VFileScanner::_get_next_reader() { Status init_status; switch (_params.format_type) { case TFileFormatType::FORMAT_PARQUET: { - _cur_reader.reset( - new ParquetReader(_profile, file_reader.release(), _params, range, - _file_col_names, _state->query_options().batch_size, - const_cast<cctz::time_zone*>(&_state->timezone_obj()))); + _cur_reader.reset(new ParquetReader( + _profile, _params, range, _file_col_names, _state->query_options().batch_size, + const_cast<cctz::time_zone*>(&_state->timezone_obj()))); init_status = ((ParquetReader*)(_cur_reader.get()))->init_reader(_colname_to_value_range); break; diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index 42b15196b7..68a2043d66 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -91,46 +91,45 @@ TEST_F(ParquetReaderTest, normal) { auto slot_descs = desc_tbl->get_tuple_descriptor(0)->slots(); LocalFileReader* reader = new LocalFileReader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); - reader->open(); cctz::time_zone ctz; TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); - // auto tuple_desc = desc_tbl->get_tuple_descriptor(0); + auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector<std::string> column_names; for (int 
i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); } - // TFileScanRangeParams scan_params; + TFileScanRangeParams scan_params; TFileRangeDesc scan_range; { scan_range.start_offset = 0; scan_range.size = 1000; } - // auto p_reader = - // new ParquetReader(nullptr, reader, scan_params, scan_range, column_names, 992, &ctz); + auto p_reader = new ParquetReader(nullptr, scan_params, scan_range, column_names, 992, &ctz); + p_reader->set_file_reader(reader); RuntimeState runtime_state((TQueryGlobals())); runtime_state.set_desc_tbl(desc_tbl); runtime_state.init_instance_mem_tracker(); - // std::vector<ExprContext*> conjunct_ctxs = std::vector<ExprContext*>(); - // p_reader->init_reader(conjunct_ctxs); - // Block* block = new Block(); - // for (const auto& slot_desc : tuple_desc->slots()) { - // auto data_type = - // vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true); - // MutableColumnPtr data_column = data_type->create_column(); - // block->insert( - // ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); - // } - // bool eof = false; - // p_reader->get_next_block(block, &eof); - // for (auto& col : block->get_columns_with_type_and_name()) { - // ASSERT_EQ(col.column->size(), 10); - // } - // EXPECT_TRUE(eof); - // delete block; - // delete p_reader; - delete reader; + std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range; + p_reader->init_reader(&colname_to_value_range); + Block* block = new Block(); + for (const auto& slot_desc : tuple_desc->slots()) { + auto data_type = + vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), true); + MutableColumnPtr data_column = data_type->create_column(); + block->insert( + ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } + bool eof = false; + p_reader->get_next_block(block, &eof); + for (auto& col : block->get_columns_with_type_and_name()) { + 
ASSERT_EQ(col.column->size(), 10); + } + EXPECT_TRUE(eof); + delete block; + delete p_reader; } + } // namespace vectorized } // namespace doris diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 4272954214..c18d3099d7 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -36,6 +36,7 @@ #include "vec/exec/format/parquet/vparquet_column_chunk_reader.h" #include "vec/exec/format/parquet/vparquet_column_reader.h" #include "vec/exec/format/parquet/vparquet_file_metadata.h" +#include "vec/exec/format/parquet/vparquet_group_reader.h" namespace doris { namespace vectorized { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org