This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new f39f57636b [feature-wip](parquet-reader) update column read model and add page index (#11601)
f39f57636b is described below

commit f39f57636bec019c646b5ed53aa80dff65e59360
Author: slothever <18522955+w...@users.noreply.github.com>
AuthorDate: Tue Aug 16 15:04:07 2022 +0800

    [feature-wip](parquet-reader) update column read model and add page index (#11601)
---
 be/src/exprs/expr_context.h                        |   6 +-
 be/src/vec/exec/file_hdfs_scanner.cpp              |  55 +++--
 be/src/vec/exec/file_hdfs_scanner.h                |  19 +-
 be/src/vec/exec/file_scan_node.cpp                 |   3 +
 be/src/vec/exec/format/parquet/parquet_pred_cmp.h  |  30 +--
 .../vec/exec/format/parquet/parquet_thrift_util.h  |   5 -
 .../parquet/vparquet_column_chunk_reader.cpp       |  29 ++-
 .../format/parquet/vparquet_column_chunk_reader.h  |   7 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  85 +++++--
 .../exec/format/parquet/vparquet_column_reader.h   |  64 ++++-
 .../exec/format/parquet/vparquet_file_metadata.cpp |   2 +-
 .../exec/format/parquet/vparquet_file_metadata.h   |   2 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 234 +++----------------
 .../exec/format/parquet/vparquet_group_reader.h    |  81 +------
 .../exec/format/parquet/vparquet_page_index.cpp    |  35 +--
 .../vec/exec/format/parquet/vparquet_page_index.h  |  22 +-
 .../exec/format/parquet/vparquet_page_reader.cpp   |   2 +-
 .../vec/exec/format/parquet/vparquet_page_reader.h |   2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 263 +++++++++++++++++----
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  64 +++--
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |   2 +-
 21 files changed, 548 insertions(+), 464 deletions(-)

diff --git a/be/src/exprs/expr_context.h b/be/src/exprs/expr_context.h index 26d655a2e2..b1df684a9d 100644 --- a/be/src/exprs/expr_context.h +++ b/be/src/exprs/expr_context.h @@ -27,7 +27,7 @@ #include "exprs/expr_value.h" #include "exprs/slot_ref.h" #include "udf/udf.h" -#include "vec/exec/format/parquet/vparquet_group_reader.h" +#include "vec/exec/format/parquet/vparquet_reader.h" #undef USING_DORIS_UDF #define USING_DORIS_UDF using namespace doris_udf @@ -38,7 +38,7 @@ namespace doris { namespace vectorized { class VOlapScanNode; -class RowGroupReader; +class ParquetReader; } // namespace vectorized class Expr; @@ -166,7 +166,7 @@ private: friend class OlapScanNode; friend class EsPredicate; friend class RowGroupReader; - friend class vectorized::RowGroupReader; + friend class vectorized::ParquetReader; friend class vectorized::VOlapScanNode; /// FunctionContexts for each registered expression.
The FunctionContexts are created diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp index 08c5084b6d..ab6401de8b 100644 --- a/be/src/vec/exec/file_hdfs_scanner.cpp +++ b/be/src/vec/exec/file_hdfs_scanner.cpp @@ -21,43 +21,62 @@ namespace doris::vectorized { +ParquetFileHdfsScanner::ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, + ScannerCounter* counter) + : HdfsFileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} + Status ParquetFileHdfsScanner::open() { + RETURN_IF_ERROR(FileScanner::open()); + if (_ranges.empty()) { + return Status::OK(); + } + RETURN_IF_ERROR(_get_next_reader(_next_range)); return Status(); } +void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {} + Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) { - // todo: get block from queue - auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - if (_next_range >= _ranges.size()) { - _scanner_eof = true; + if (_next_range >= _ranges.size() || _scanner_eof) { + *eof = true; return Status::OK(); } - const TFileRangeDesc& range = _ranges[_next_range++]; + RETURN_IF_ERROR(init_block(block)); + bool range_eof = false; + RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof)); + if (range_eof) { + RETURN_IF_ERROR(_get_next_reader(_next_range++)); + } + return Status::OK(); +} + +Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) { + const TFileRangeDesc& range = _ranges[_next_range]; + _current_range_offset = range.start_offset; std::unique_ptr<FileReader> file_reader; RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range, file_reader)); _reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(), - range.start_offset, range.size)); + _state->query_options().batch_size, range.start_offset, + range.size)); + auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); Status status = _reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone()); if (!status.ok()) { - _scanner_eof = true; - return Status::OK(); - } - while (_reader->has_next()) { - Status st = _reader->read_next_batch(block); - if (st.is_end_of_file()) { - break; + if (status.is_end_of_file()) { + _scanner_eof = true; + return Status::OK(); } + return status; } return Status::OK(); } -void ParquetFileHdfsScanner::close() {} - -void ParquetFileHdfsScanner::_prefetch_batch() { - // 1. call file reader next batch - // 2. 
push batch to queue, when get_next is called, pop batch +void ParquetFileHdfsScanner::close() { + FileScanner::close(); } } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h index 3db196a83c..e24063c89b 100644 --- a/be/src/vec/exec/file_hdfs_scanner.h +++ b/be/src/vec/exec/file_hdfs_scanner.h @@ -24,21 +24,34 @@ namespace doris::vectorized { -class HdfsFileScanner : public FileScanner {}; +class HdfsFileScanner : public FileScanner { +public: + HdfsFileScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter) + : FileScanner(state, profile, params, ranges, pre_filter_texprs, counter) {}; +}; class ParquetFileHdfsScanner : public HdfsFileScanner { public: + ParquetFileHdfsScanner(RuntimeState* state, RuntimeProfile* profile, + const TFileScanRangeParams& params, + const std::vector<TFileRangeDesc>& ranges, + const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter); Status open() override; Status get_next(vectorized::Block* block, bool* eof) override; - void close() override; +protected: + void _init_profiles(RuntimeProfile* profile) override; + private: - void _prefetch_batch(); + Status _get_next_reader(int _next_range); private: std::shared_ptr<ParquetReader> _reader; + int64_t _current_range_offset; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index c653c84a15..ec8a5165ce 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -30,6 +30,7 @@ #include "util/thread.h" #include "util/types.h" #include "vec/exec/file_arrow_scanner.h" +#include "vec/exec/file_hdfs_scanner.h" #include "vec/exec/file_text_scanner.h" #include "vec/exprs/vcompound_pred.h" #include "vec/exprs/vexpr.h" @@ -471,6 +472,8 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& case TFileFormatType::FORMAT_PARQUET: scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, _pre_filter_texprs, counter); + // scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params, + // scan_range.ranges, _pre_filter_texprs, counter); break; case TFileFormatType::FORMAT_ORC: scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params, diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h index b07f9afbe9..b58701418e 100644 --- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h +++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h @@ -79,8 +79,8 @@ namespace doris::vectorized { return true; \ } -bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values, - const char* min_bytes, const char* max_bytes) { +bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values, + const char* min_bytes, const char* max_bytes) { switch (conjunct_type) { case TYPE_TINYINT: { _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes) @@ -125,8 +125,8 @@ bool RowGroupReader::_eval_in_val(PrimitiveType conjunct_type, std::vector<void* return false; } -void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes, - const char* max_bytes, bool& need_filter) { +void 
ParquetReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes, + const char* max_bytes, bool& need_filter) { Expr* conjunct = ctx->root(); std::vector<void*> in_pred_values; const InPredicate* pred = static_cast<const InPredicate*>(conjunct); @@ -150,8 +150,8 @@ void RowGroupReader::_eval_in_predicate(ExprContext* ctx, const char* min_bytes, } } -bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes, - const char* max_bytes) { +bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes, + const char* max_bytes) { switch (conjunct_type) { case TYPE_TINYINT: { _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max) @@ -200,7 +200,7 @@ bool RowGroupReader::_eval_eq(PrimitiveType conjunct_type, void* value, const ch return false; } -bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) { +bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes) { switch (conjunct_type) { case TYPE_TINYINT: { _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max) @@ -250,7 +250,7 @@ bool RowGroupReader::_eval_gt(PrimitiveType conjunct_type, void* value, const ch return false; } -bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) { +bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) { switch (conjunct_type) { case TYPE_TINYINT: { _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max) @@ -300,7 +300,7 @@ bool RowGroupReader::_eval_ge(PrimitiveType conjunct_type, void* value, const ch return false; } -bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) { +bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) { switch (conjunct_type) { case TYPE_TINYINT: { _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min) @@ -350,7 +350,7 @@ bool RowGroupReader::_eval_lt(PrimitiveType conjunct_type, void* value, const ch return false; } -bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) { +bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) { switch (conjunct_type) { case TYPE_TINYINT: { _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min) @@ -400,8 +400,8 @@ bool RowGroupReader::_eval_le(PrimitiveType conjunct_type, void* value, const ch return false; } -void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes, - const char* max_bytes, bool& need_filter) { +void ParquetReader::_eval_binary_predicate(ExprContext* ctx, const char* min_bytes, + const char* max_bytes, bool& need_filter) { Expr* conjunct = ctx->root(); Expr* expr = conjunct->get_child(1); if (expr == nullptr) { @@ -433,9 +433,9 @@ void RowGroupReader::_eval_binary_predicate(ExprContext* ctx, const char* min_by } } -bool RowGroupReader::_determine_filter_row_group(const std::vector<ExprContext*>& conjuncts, - const std::string& encoded_min, - const std::string& encoded_max) { +bool ParquetReader::_determine_filter_min_max(const std::vector<ExprContext*>& conjuncts, + const std::string& encoded_min, + const std::string& encoded_max) { const char* min_bytes = encoded_min.data(); const char* max_bytes = encoded_max.data(); bool need_filter = false; diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h index 939500ce97..cb5dc1558b 100644 --- 
a/be/src/vec/exec/format/parquet/parquet_thrift_util.h +++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h @@ -67,9 +67,4 @@ Status parse_thrift_footer(FileReader* file, std::shared_ptr<FileMetaData>& file RETURN_IF_ERROR(file_metadata->init_schema()); return Status::OK(); } - -// Status parse_page_header() { -// uint8_t* page_buf; -// -// } } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index a0a21b00ca..751780fbae 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -32,6 +32,7 @@ Status ColumnChunkReader::init() { ? _metadata.dictionary_page_offset : _metadata.data_page_offset; size_t chunk_size = _metadata.total_compressed_size; + VLOG_DEBUG << "create _page_reader"; _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size); if (_metadata.__isset.dictionary_page_offset) { @@ -43,12 +44,13 @@ Status ColumnChunkReader::init() { // get the block compression codec RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec)); + VLOG_DEBUG << "initColumnChunkReader finish"; return Status::OK(); } Status ColumnChunkReader::next_page() { - RETURN_IF_ERROR(_page_reader->next_page()); - _num_values = _page_reader->get_page_header()->data_page_header.num_values; + RETURN_IF_ERROR(_page_reader->next_page_header()); + _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values; return Status::OK(); } @@ -72,12 +74,12 @@ Status ColumnChunkReader::load_page_data() { if (_max_rep_level > 0) { RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data, header.data_page_header.repetition_level_encoding, - _max_rep_level, _num_values)); + _max_rep_level, _remaining_num_values)); } if (_max_def_level > 0) { RETURN_IF_ERROR(_def_level_decoder.init(&_page_data, header.data_page_header.definition_level_encoding, - _max_def_level, _num_values)); + _max_def_level, _remaining_num_values)); } auto encoding = header.data_page_header.encoding; @@ -85,6 +87,7 @@ Status ColumnChunkReader::load_page_data() { if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) { encoding = tparquet::Encoding::RLE_DICTIONARY; } + // Reuse page decoder if (_decoders.find(static_cast<int>(encoding)) != _decoders.end()) { _page_decoder = _decoders[static_cast<int>(encoding)].get(); @@ -104,7 +107,7 @@ Status ColumnChunkReader::load_page_data() { Status ColumnChunkReader::_decode_dict_page() { int64_t dict_offset = _metadata.dictionary_page_offset; _page_reader->seek_to_page(dict_offset); - _page_reader->next_page(); + _page_reader->next_page_header(); const tparquet::PageHeader& header = *_page_reader->get_page_header(); DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type); // TODO(gaoxin): decode dictionary page @@ -119,10 +122,10 @@ void ColumnChunkReader::_reserve_decompress_buf(size_t size) { } Status ColumnChunkReader::skip_values(size_t num_values) { - if (UNLIKELY(_num_values < num_values)) { + if (UNLIKELY(_remaining_num_values < num_values)) { return Status::IOError("Skip too many values in current page"); } - _num_values -= num_values; + _remaining_num_values -= num_values; return _page_decoder->skip_values(num_values); } @@ -138,27 +141,27 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) { Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) { - 
if (UNLIKELY(_num_values < num_values)) { + if (UNLIKELY(_remaining_num_values < num_values)) { return Status::IOError("Decode too many values in current page"); } - _num_values -= num_values; + _remaining_num_values -= num_values; return _page_decoder->decode_values(doris_column, data_type, num_values); } Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) { - if (UNLIKELY(_num_values < num_values)) { + if (UNLIKELY(_remaining_num_values < num_values)) { return Status::IOError("Decode too many values in current page"); } - _num_values -= num_values; + _remaining_num_values -= num_values; return _page_decoder->decode_values(doris_column, data_type, num_values); } Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) { - if (UNLIKELY(_num_values < num_values)) { + if (UNLIKELY(_remaining_num_values < num_values)) { return Status::IOError("Decode too many values in current page"); } - _num_values -= num_values; + _remaining_num_values -= num_values; return _page_decoder->decode_values(slice, num_values); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index f8510d4b37..b248ba0a51 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -85,10 +85,10 @@ public: // and initialize the repetition and definition level decoder for current page data. Status load_page_data(); // The remaining number of values in current page(including null values). Decreased when reading or skipping. - uint32_t num_values() const { return _num_values; }; + uint32_t remaining_num_values() const { return _remaining_num_values; }; // null values are not analyzing from definition levels // the caller should maintain the consistency after analyzing null values from definition levels. - void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; }; + void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; }; // Get the raw data of current page. 
Slice& get_page_data() { return _page_data; } @@ -116,6 +116,7 @@ private: FieldSchema* _field_schema; level_t _max_rep_level; level_t _max_def_level; + tparquet::LogicalType _parquet_logical_type; BufferedStreamReader* _stream_reader; // tparquet::ColumnChunk* _column_chunk; @@ -127,7 +128,7 @@ private: LevelDecoder _rep_level_decoder; LevelDecoder _def_level_decoder; - uint32_t _num_values = 0; + uint32_t _remaining_num_values = 0; Slice _page_data; std::unique_ptr<uint8_t[]> _decompress_buf; size_t _decompress_buf_size = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 547dfba3bd..e7b189e40c 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -19,50 +19,83 @@ #include <common/status.h> #include <gen_cpp/parquet_types.h> +#include <vec/columns/columns_number.h> #include "schema_desc.h" #include "vparquet_column_chunk_reader.h" namespace doris::vectorized { -Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field, - const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type, - int64_t chunk_size) { - // todo1: init column chunk reader - // BufferedFileStreamReader stream_reader(reader, 0, chunk_size); - // _chunk_reader(&stream_reader, chunk, field); - // _chunk_reader.init(); - return Status(); -} - -Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size, - const FieldSchema* field, const ParquetReadColumn& column, - const TypeDescriptor& col_type, +Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, + const ParquetReadColumn& column, const tparquet::RowGroup& row_group, - const ParquetColumnReader* reader) { + std::vector<RowRange>& row_ranges, + std::unique_ptr<ParquetColumnReader>& reader) { if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) { return Status::Corruption("not supported type"); } if (field->type.type == TYPE_ARRAY) { return Status::Corruption("not supported array type yet"); } else { + VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index; + tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index]; ScalarColumnReader* scalar_reader = new ScalarColumnReader(column); - RETURN_IF_ERROR(scalar_reader->init(file, field, - &row_group.columns[field->physical_column_index], - col_type, chunk_size)); - reader = scalar_reader; + scalar_reader->init_column_metadata(chunk); + RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges)); + reader.reset(scalar_reader); } return Status::OK(); } -Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta, - ColumnPtr* data) { - // todo2: read data with chunk reader to load page data - // while (_chunk_reader.has_next) { - // _chunk_reader.next_page(); - // _chunk_reader.load_page_data(); - // } - return Status(); +void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chunk) { + auto chunk_meta = chunk.meta_data; + int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset + ? 
chunk_meta.dictionary_page_offset + : chunk_meta.data_page_offset; + size_t chunk_len = chunk_meta.total_compressed_size; + _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta)); +} + +void ParquetColumnReader::_skipped_pages() {} + +Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, + std::vector<RowRange>& row_ranges) { + BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size()); + _row_ranges.reset(&row_ranges); + _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field)); + _chunk_reader->init(); + return Status::OK(); +} + +Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, + size_t batch_size, size_t* read_rows, bool* eof) { + if (_chunk_reader->remaining_num_values() <= 0) { + // seek to next page header + _chunk_reader->next_page(); + if (_row_ranges->size() != 0) { + _skipped_pages(); + } + // load data to decoder + _chunk_reader->load_page_data(); + } + size_t read_values = _chunk_reader->remaining_num_values() < batch_size + ? _chunk_reader->remaining_num_values() + : batch_size; + *read_rows = read_values; + WhichDataType which_type(type); + switch (_metadata->t_metadata().type) { + case tparquet::Type::INT32: { + _chunk_reader->decode_values(doris_column, type, read_values); + return Status::OK(); + } + case tparquet::Type::INT64: { + // todo: test int64 + return Status::OK(); + } + default: + return Status::Corruption("unsupported parquet data type"); + } + return Status::OK(); } void ScalarColumnReader::close() {} diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 68a38607e6..696fbe5db0 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -20,36 +20,78 @@ #include <gen_cpp/parquet_types.h> #include "schema_desc.h" +#include "vparquet_column_chunk_reader.h" #include "vparquet_reader.h" -//#include "vparquet_column_chunk_reader.h" namespace doris::vectorized { +struct RowRange; class ParquetReadColumn; +class ParquetColumnMetadata { +public: + ParquetColumnMetadata(int64_t chunk_start_offset, int64_t chunk_length, + tparquet::ColumnMetaData metadata) + : _chunk_start_offset(chunk_start_offset), + _chunk_length(chunk_length), + _metadata(metadata) {}; + + ~ParquetColumnMetadata() = default; + int64_t start_offset() const { return _chunk_start_offset; }; + int64_t size() const { return _chunk_length; }; + tparquet::ColumnMetaData t_metadata() { return _metadata; }; + +private: + int64_t _chunk_start_offset; + int64_t _chunk_length; + tparquet::ColumnMetaData _metadata; +}; + class ParquetColumnReader { public: ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {}; - virtual ~ParquetColumnReader() = 0; - virtual Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) = 0; - static Status create(const FileReader* file, int64_t chunk_size, const FieldSchema* field, - const ParquetReadColumn& column, const TypeDescriptor& col_type, - const tparquet::RowGroup& row_group, const ParquetColumnReader* reader); + virtual ~ParquetColumnReader() = default; + virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, + size_t* read_rows, bool* eof) = 0; + static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column, + const tparquet::RowGroup& row_group, 
std::vector<RowRange>& row_ranges, + std::unique_ptr<ParquetColumnReader>& reader); + void init_column_metadata(const tparquet::ColumnChunk& chunk); virtual void close() = 0; +protected: + void _skipped_pages(); + protected: const ParquetReadColumn& _column; - // const ColumnChunkReader& _chunk_reader; + std::unique_ptr<ParquetColumnMetadata> _metadata; + std::unique_ptr<std::vector<RowRange>> _row_ranges; }; class ScalarColumnReader : public ParquetColumnReader { public: ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {}; ~ScalarColumnReader() override = default; - Status init(const FileReader* file, const FieldSchema* field, - const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type, - int64_t chunk_size); - Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) override; + Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, + std::vector<RowRange>& row_ranges); + Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, + size_t* read_rows, bool* eof) override; void close() override; + +private: + std::unique_ptr<ColumnChunkReader> _chunk_reader; }; + +//class ArrayColumnReader : public ParquetColumnReader { +//public: +// ArrayColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {}; +// ~ArrayColumnReader() override = default; +// Status init(FileReader* file, FieldSchema* field, +// tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type, +// int64_t chunk_size); +// Status read_column_data(ColumnPtr* data) override; +// void close() override; +//private: +// std::unique_ptr<ColumnChunkReader> _chunk_reader; +//}; }; // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp index 445ce76318..4e413ec9e9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.cpp @@ -39,7 +39,7 @@ Status FileMetaData::init_schema() { return Status(); } -const tparquet::FileMetaData& FileMetaData::to_thrift_metadata() { +tparquet::FileMetaData& FileMetaData::to_thrift_metadata() { return _metadata; } diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h index 53d08fa855..1f4727242d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h +++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h @@ -27,7 +27,7 @@ public: FileMetaData(tparquet::FileMetaData& metadata); ~FileMetaData() = default; Status init_schema(); - const tparquet::FileMetaData& to_thrift_metadata(); + tparquet::FileMetaData& to_thrift_metadata(); int32_t num_row_groups() const { return _num_groups; } int32_t num_columns() const { return _num_columns; }; int32_t num_rows() const { return _num_rows; }; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 25b7819e8f..751e43863a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -24,235 +24,61 @@ namespace doris::vectorized { RowGroupReader::RowGroupReader(doris::FileReader* file_reader, - const std::shared_ptr<FileMetaData>& file_metadata, const std::vector<ParquetReadColumn>& read_columns, - const std::map<std::string, int>& map_column, - const std::vector<ExprContext*>& 
conjunct_ctxs) + const int32_t row_group_id, tparquet::RowGroup& row_group) : _file_reader(file_reader), - _file_metadata(file_metadata), _read_columns(read_columns), - _map_column(map_column), - _conjunct_ctxs(conjunct_ctxs), - _current_row_group(-1) {} + _row_group_id(row_group_id), + _row_group_meta(row_group), + _total_rows(row_group.num_rows) {} RowGroupReader::~RowGroupReader() { - for (auto& column_reader : _column_readers) { - auto reader = column_reader.second; - reader->close(); - delete reader; - reader = nullptr; - } _column_readers.clear(); } -Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, - int64_t split_size) { - _tuple_desc = tuple_desc; - _split_start_offset = split_start_offset; - _split_size = split_size; - _init_conjuncts(tuple_desc, _conjunct_ctxs); - RETURN_IF_ERROR(_init_column_readers()); +Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges) { + VLOG_DEBUG << "Row group id: " << _row_group_id; + RETURN_IF_ERROR(_init_column_readers(schema, row_ranges)); return Status::OK(); } -void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc, - const std::vector<ExprContext*>& conjunct_ctxs) { - if (tuple_desc->slots().empty()) { - return; - } - for (auto& read_col : _read_columns) { - _parquet_column_ids.emplace(read_col.parquet_column_id); - } - - for (int i = 0; i < tuple_desc->slots().size(); i++) { - auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name()); - if (col_iter == _map_column.end()) { - continue; - } - int parquet_col_id = col_iter->second; - if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) { - continue; - } - for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) { - Expr* conjunct = conjunct_ctxs[conj_idx]->root(); - if (conjunct->get_num_children() == 0) { - continue; - } - Expr* raw_slot = conjunct->get_child(0); - if (TExprNodeType::SLOT_REF != raw_slot->node_type()) { - continue; - } - SlotRef* slot_ref = (SlotRef*)raw_slot; - SlotId conjunct_slot_id = slot_ref->slot_id(); - if (conjunct_slot_id == tuple_desc->slots()[i]->id()) { - // Get conjuncts by conjunct_slot_id - auto iter = _slot_conjuncts.find(conjunct_slot_id); - if (_slot_conjuncts.end() == iter) { - std::vector<ExprContext*> conjuncts; - conjuncts.emplace_back(conjunct_ctxs[conj_idx]); - _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts)); - } else { - std::vector<ExprContext*> conjuncts = iter->second; - conjuncts.emplace_back(conjunct_ctxs[conj_idx]); - } - } - } - } -} - -Status RowGroupReader::_init_column_readers() { +Status RowGroupReader::_init_column_readers(const FieldDescriptor& schema, + std::vector<RowRange>& row_ranges) { for (auto& read_col : _read_columns) { SlotDescriptor* slot_desc = read_col.slot_desc; - FieldDescriptor schema = _file_metadata->schema(); TypeDescriptor col_type = slot_desc->type(); - const auto& field = schema.get_column(slot_desc->col_name()); - const tparquet::RowGroup row_group = - _file_metadata->to_thrift_metadata().row_groups[_current_row_group]; - ParquetColumnReader* reader = nullptr; - RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, MAX_PARQUET_BLOCK_SIZE, field, - read_col, slot_desc->type(), row_group, - reader)); + auto field = const_cast<FieldSchema*>(schema.get_column(slot_desc->col_name())); + VLOG_DEBUG << "field: " << field->debug_string(); + std::unique_ptr<ParquetColumnReader> reader; + RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, 
_row_group_meta, + row_ranges, reader)); if (reader == nullptr) { + VLOG_DEBUG << "Init row group reader failed"; return Status::Corruption("Init row group reader failed"); } - _column_readers[slot_desc->id()] = reader; + _column_readers[slot_desc->id()] = std::move(reader); } return Status::OK(); } -Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) { - // get ColumnWithTypeAndName from src_block +Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_eof) { + if (_read_rows >= _total_rows) { + *_batch_eof = true; + } for (auto& read_col : _read_columns) { - const tparquet::RowGroup row_group = - _file_metadata->to_thrift_metadata().row_groups[_current_row_group]; - auto& column_with_type_and_name = block->get_by_name(read_col.slot_desc->col_name()); - RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data( - row_group, &column_with_type_and_name.column)); - VLOG_DEBUG << column_with_type_and_name.name; + auto slot_desc = read_col.slot_desc; + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + auto column_ptr = column_with_type_and_name.column; + auto column_type = column_with_type_and_name.type; + size_t batch_read_rows = 0; + RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data( + column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof)); + _read_rows += batch_read_rows; + VLOG_DEBUG << "read column: " << column_with_type_and_name.name; + VLOG_DEBUG << "read rows in column: " << batch_read_rows; } // use data fill utils read column data to column ptr return Status::OK(); } -Status RowGroupReader::get_next_row_group(const int32_t* group_id) { - int32_t total_group = _file_metadata->num_row_groups(); - if (total_group == 0 || _file_metadata->num_rows() == 0 || _split_size < 0) { - return Status::EndOfFile("No row group need read"); - } - while (_current_row_group < total_group) { - _current_row_group++; - const tparquet::RowGroup& row_group = - _file_metadata->to_thrift_metadata().row_groups[_current_row_group]; - if (!_is_misaligned_range_group(row_group)) { - continue; - } - bool filter_group = false; - RETURN_IF_ERROR(_process_row_group_filter(row_group, _conjunct_ctxs, &filter_group)); - if (!filter_group) { - group_id = &_current_row_group; - break; - } - } - return Status::OK(); -} - -bool RowGroupReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) { - int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data); - - auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data; - int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size; - - int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2; - if (!(row_group_mid >= _split_start_offset && - row_group_mid < _split_start_offset + _split_size)) { - return true; - } - return false; -} - -Status RowGroupReader::_process_row_group_filter(const tparquet::RowGroup& row_group, - const std::vector<ExprContext*>& conjunct_ctxs, - bool* filter_group) { - _process_column_stat_filter(row_group, conjunct_ctxs, filter_group); - _init_chunk_dicts(); - RETURN_IF_ERROR(_process_dict_filter(filter_group)); - _init_bloom_filter(); - RETURN_IF_ERROR(_process_bloom_filter(filter_group)); - return Status::OK(); -} - -Status RowGroupReader::_process_column_stat_filter(const tparquet::RowGroup& row_group, - const std::vector<ExprContext*>& conjunct_ctxs, - bool* filter_group) { - int total_group = 
_file_metadata->num_row_groups(); - // It will not filter if head_group_offset equals tail_group_offset - int64_t total_rows = 0; - int64_t total_bytes = 0; - for (int row_group_id = 0; row_group_id < total_group; row_group_id++) { - total_rows += row_group.num_rows; - total_bytes += row_group.total_byte_size; - for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) { - const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name(); - auto col_iter = _map_column.find(col_name); - if (col_iter == _map_column.end()) { - continue; - } - int parquet_col_id = col_iter->second; - if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) { - // Column not exist in parquet file - continue; - } - auto slot_iter = _slot_conjuncts.find(slot_id); - if (slot_iter == _slot_conjuncts.end()) { - continue; - } - auto statistic = row_group.columns[parquet_col_id].meta_data.statistics; - if (!statistic.__isset.max || !statistic.__isset.min) { - continue; - } - // Min-max of statistic is plain-encoded value - *filter_group = - _determine_filter_row_group(slot_iter->second, statistic.min, statistic.max); - if (*filter_group) { - _filtered_num_row_groups++; - VLOG_DEBUG << "Filter row group id: " << row_group_id; - break; - } - } - } - VLOG_DEBUG << "DEBUG total_rows: " << total_rows; - VLOG_DEBUG << "DEBUG total_bytes: " << total_bytes; - VLOG_DEBUG << "Parquet file: " << _file_metadata->schema().debug_string() - << ", Num of read row group: " << total_group - << ", and num of skip row group: " << _filtered_num_row_groups; - return Status::OK(); -} - -void RowGroupReader::_init_chunk_dicts() {} - -Status RowGroupReader::_process_dict_filter(bool* filter_group) { - return Status(); -} - -void RowGroupReader::_init_bloom_filter() {} - -Status RowGroupReader::_process_bloom_filter(bool* filter_group) { - RETURN_IF_ERROR(_file_reader->seek(0)); - return Status(); -} - -int64_t RowGroupReader::_get_row_group_start_offset(const tparquet::RowGroup& row_group) { - if (row_group.__isset.file_offset) { - return row_group.file_offset; - } - return row_group.columns[0].meta_data.data_page_offset; -} - -int64_t RowGroupReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) { - if (column.__isset.dictionary_page_offset) { - DCHECK_LT(column.dictionary_page_offset, column.data_page_offset); - return column.dictionary_page_offset; - } - return column.data_page_offset; -} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index b69852f124..ea9eeed342 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -24,87 +24,30 @@ #include "vparquet_file_metadata.h" #include "vparquet_reader.h" -#define MAX_PARQUET_BLOCK_SIZE 1024 - namespace doris::vectorized { class ParquetReadColumn; class ParquetColumnReader; +struct RowRange; + class RowGroupReader { public: RowGroupReader(doris::FileReader* file_reader, - const std::shared_ptr<FileMetaData>& file_metadata, - const std::vector<ParquetReadColumn>& read_columns, - const std::map<std::string, int>& map_column, - const std::vector<ExprContext*>& conjunct_ctxs); + const std::vector<ParquetReadColumn>& read_columns, const int32_t _row_group_id, + tparquet::RowGroup& row_group); ~RowGroupReader(); - Status init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, int64_t split_size); - Status get_next_row_group(const int32_t* 
group_id); - Status fill_columns_data(Block* block, const int32_t group_id); + Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges); + Status next_batch(Block* block, size_t batch_size, bool* _batch_eof); private: - bool _is_misaligned_range_group(const tparquet::RowGroup& row_group); - - Status _process_column_stat_filter(const tparquet::RowGroup& row_group, - const std::vector<ExprContext*>& conjunct_ctxs, - bool* filter_group); - - void _init_conjuncts(const TupleDescriptor* tuple_desc, - const std::vector<ExprContext*>& conjunct_ctxs); - - Status _init_column_readers(); - - Status _process_row_group_filter(const tparquet::RowGroup& row_group, - const std::vector<ExprContext*>& conjunct_ctxs, - bool* filter_group); - - void _init_chunk_dicts(); - - Status _process_dict_filter(bool* filter_group); - - void _init_bloom_filter(); - - Status _process_bloom_filter(bool* filter_group); - - int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group); - int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers); - - bool _determine_filter_row_group(const std::vector<ExprContext*>& conjuncts, - const std::string& encoded_min, - const std::string& encoded_max); - - void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes, - bool& need_filter); - - void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes, - bool& need_filter); - - bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values, - const char* min_bytes, const char* max_bytes); - - bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes, - const char* max_bytes); - - bool _eval_gt(PrimitiveType conjunct_type, void* value, const char* max_bytes); - - bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes); - - bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes); - - bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes); + Status _init_column_readers(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges); private: doris::FileReader* _file_reader; - const std::shared_ptr<FileMetaData>& _file_metadata; - std::unordered_map<int32_t, ParquetColumnReader*> _column_readers; - const TupleDescriptor* _tuple_desc; // get all slot info + std::unordered_map<int32_t, std::unique_ptr<ParquetColumnReader>> _column_readers; const std::vector<ParquetReadColumn>& _read_columns; - const std::map<std::string, int>& _map_column; - std::unordered_set<int> _parquet_column_ids; - const std::vector<ExprContext*>& _conjunct_ctxs; - std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts; - int64_t _split_start_offset; - int64_t _split_size; - int32_t _current_row_group; - int32_t _filtered_num_row_groups = 0; + const int32_t _row_group_id; + tparquet::RowGroup& _row_group_meta; + int64_t _read_rows = 0; + int64_t _total_rows; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp index 6365ec2163..40df65ace6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp @@ -21,22 +21,22 @@ namespace doris::vectorized { -PageIndex::~PageIndex() { - if (_column_index != nullptr) { - delete _column_index; - _column_index = nullptr; +Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index, + int 
total_rows_of_group, int page_idx, + RowRange* row_range) { + const auto& page_locations = offset_index.page_locations; + DCHECK_LT(page_idx, page_locations.size()); + row_range->first_row = page_locations[page_idx].first_row_index; + if (page_idx == page_locations.size() - 1) { + row_range->last_row = total_rows_of_group - 1; + } else { + row_range->last_row = page_locations[page_idx + 1].first_row_index - 1; } - if (_offset_index != nullptr) { - delete _offset_index; - _offset_index = nullptr; - } -} - -Status PageIndex::get_row_range_for_page() { - return Status(); + return Status::OK(); } -Status PageIndex::collect_skipped_page_range() { +Status PageIndex::collect_skipped_page_range(std::vector<ExprContext*> conjuncts, + std::vector<int> page_range) { return Status(); } @@ -67,20 +67,21 @@ bool PageIndex::check_and_get_page_index_ranges(const std::vector<tparquet::Colu return has_page_index; } -Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff) { +Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff, + tparquet::ColumnIndex* column_index) { int64_t buffer_offset = chunk.column_index_offset - _column_index_start; uint32_t length = chunk.column_index_length; DCHECK_LE(buffer_offset + length, _column_index_size); - RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, _column_index)); + RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, column_index)); return Status::OK(); } Status PageIndex::parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff, - int64_t buffer_size) { + int64_t buffer_size, tparquet::OffsetIndex* offset_index) { int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start + _column_index_size; uint32_t length = chunk.offset_index_length; DCHECK_LE(buffer_offset + length, buffer_size); - RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, _offset_index)); + RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, offset_index)); return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h index a26074ab0d..5894a4e8d6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h @@ -19,30 +19,32 @@ #include <common/status.h> #include <gen_cpp/parquet_types.h> +#include "exprs/expr_context.h" + namespace doris::vectorized { +class ParquetReader; +struct RowRange; class PageIndex { public: PageIndex() = default; - ~PageIndex(); - Status get_row_range_for_page(); - Status collect_skipped_page_range(); + ~PageIndex() = default; + Status create_skipped_row_range(tparquet::OffsetIndex& offset_index, int total_rows_of_group, + int page_idx, RowRange* row_range); + Status collect_skipped_page_range(std::vector<ExprContext*> conjuncts, + std::vector<int> page_range); bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns); - Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff); + Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff, + tparquet::ColumnIndex* _column_index); Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff, - int64_t buffer_size); + int64_t buffer_size, tparquet::OffsetIndex* _offset_index); -private: private: friend class ParquetReader; int64_t _column_index_start; int64_t _column_index_size; 
int64_t _offset_index_start; int64_t _offset_index_size; - - tparquet::OffsetIndex* _offset_index; - tparquet::ColumnIndex* _column_index; - // row range define }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index f554be169e..94a291f40e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -28,7 +28,7 @@ static constexpr size_t initPageHeaderSize = 1024; PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length) : _reader(reader), _start_offset(offset), _end_offset(offset + length) {} -Status PageReader::next_page() { +Status PageReader::next_page_header() { if (_offset < _start_offset || _offset >= _end_offset) { return Status::IOError("Out-of-bounds Access"); } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index cf95812ead..256ddd13d1 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -34,7 +34,7 @@ public: bool has_next_page() const { return _offset < _end_offset; } - Status next_page(); + Status next_page_header(); Status skip_page(); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 815cba3c15..b16df6b557 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -21,13 +21,14 @@ namespace doris::vectorized { ParquetReader::ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, - int64_t range_start_offset, int64_t range_size) + size_t batch_size, int64_t range_start_offset, int64_t range_size) : _num_of_columns_from_file(num_of_columns_from_file), + _batch_size(batch_size), _range_start_offset(range_start_offset), _range_size(range_size) { _file_reader = file_reader; _total_groups = 0; - // _current_group = 0; + _current_row_group_id = 0; // _statistics = std::make_shared<Statistics>(); } @@ -36,6 +37,10 @@ ParquetReader::~ParquetReader() { } void ParquetReader::close() { + for (auto& conjuncts : _slot_conjuncts) { + conjuncts.second.clear(); + } + _slot_conjuncts.clear(); if (_file_reader != nullptr) { _file_reader->close(); delete _file_reader; @@ -45,26 +50,26 @@ void ParquetReader::close() { Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc, const std::vector<SlotDescriptor*>& tuple_slot_descs, - const std::vector<ExprContext*>& conjunct_ctxs, + std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone) { _file_reader->open(); + _conjunct_ctxs.reset(&conjunct_ctxs); RETURN_IF_ERROR(parse_thrift_footer(_file_reader, _file_metadata)); - auto metadata = _file_metadata->to_thrift_metadata(); - - _total_groups = metadata.row_groups.size(); + _t_metadata.reset(&_file_metadata->to_thrift_metadata()); + _total_groups = _file_metadata->num_row_groups(); if (_total_groups == 0) { return Status::EndOfFile("Empty Parquet File"); } auto schema_desc = _file_metadata->schema(); for (int i = 0; i < _file_metadata->num_columns(); ++i) { - LOG(WARNING) << schema_desc.debug_string(); + // for test + VLOG_DEBUG << schema_desc.debug_string(); // Get the Column Reader for the boolean column _map_column.emplace(schema_desc.get_column(i)->name, i); } - LOG(WARNING) << ""; RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs)); RETURN_IF_ERROR( - 
_init_row_group_reader(tuple_desc, _range_start_offset, _range_size, conjunct_ctxs)); + _init_row_group_readers(tuple_desc, _range_start_offset, _range_size, conjunct_ctxs)); return Status::OK(); } @@ -81,7 +86,7 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup } else { std::stringstream str_error; str_error << "Invalid Column Name:" << slot_desc->col_name(); - LOG(WARNING) << str_error.str(); + VLOG_DEBUG << str_error.str(); return Status::InvalidArgument(str_error.str()); } ParquetReadColumn column; @@ -90,63 +95,231 @@ Status ParquetReader::_init_read_columns(const std::vector<SlotDescriptor*>& tup auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type; column.parquet_type = physical_type; _read_columns.emplace_back(column); + VLOG_DEBUG << "slot_desc " << slot_desc->debug_string(); } return Status::OK(); } -Status ParquetReader::read_next_batch(Block* block) { - int32_t group_id = 0; - RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id)); - auto metadata = _file_metadata->to_thrift_metadata(); - auto column_chunks = metadata.row_groups[group_id].columns; - if (_has_page_index(column_chunks)) { - Status st = _process_page_index(column_chunks); - if (st.ok()) { - // todo: process filter page - return Status::OK(); - } else { - // todo: record profile - LOG(WARNING) << ""; +Status ParquetReader::read_next_batch(Block* block, bool* eof) { + DCHECK(_total_groups == _row_group_readers.size()); + if (_total_groups == 0) { + *eof = true; + } + bool _batch_eof = false; + auto row_group_reader = _row_group_readers[_current_row_group_id]; + RETURN_IF_ERROR(row_group_reader->next_batch(block, _batch_size, &_batch_eof)); + if (_batch_eof) { + _current_row_group_id++; + if (_current_row_group_id > _total_groups) { + *eof = true; + } + } + return Status::OK(); +} + +Status ParquetReader::_init_row_group_readers(const TupleDescriptor* tuple_desc, + int64_t range_start_offset, int64_t range_size, + const std::vector<ExprContext*>& conjunct_ctxs) { + std::vector<int32_t> read_row_groups; + RETURN_IF_ERROR(_filter_row_groups(&read_row_groups)); + _init_conjuncts(tuple_desc, conjunct_ctxs); + for (auto row_group_id : read_row_groups) { + VLOG_DEBUG << "_has_page_index"; + auto row_group = _t_metadata->row_groups[row_group_id]; + auto column_chunks = row_group.columns; + std::vector<RowRange> skipped_row_ranges; + if (_has_page_index(column_chunks)) { + VLOG_DEBUG << "_process_page_index"; + RETURN_IF_ERROR(_process_page_index(row_group, skipped_row_ranges)); } + std::shared_ptr<RowGroupReader> row_group_reader; + row_group_reader.reset( + new RowGroupReader(_file_reader, _read_columns, row_group_id, row_group)); + // todo: can filter row with candidate ranges rather than skipped ranges + RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), skipped_row_ranges)); + _row_group_readers.emplace_back(row_group_reader); } - // metadata has been processed, fill parquet data to block - // block is the batch data of a row group. 
a row group has N batch - // push to scanner queue - _fill_block_data(block, group_id); + VLOG_DEBUG << "_init_row_group_reader finished"; return Status::OK(); } -void ParquetReader::_fill_block_data(Block* block, int group_id) { - // make and init src block here - // read column chunk - _row_group_reader->fill_columns_data(block, group_id); +void ParquetReader::_init_conjuncts(const TupleDescriptor* tuple_desc, + const std::vector<ExprContext*>& conjunct_ctxs) { + if (tuple_desc->slots().empty()) { + return; + } + std::unordered_set<int> parquet_col_ids(_include_column_ids.begin(), _include_column_ids.end()); + for (int i = 0; i < tuple_desc->slots().size(); i++) { + auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name()); + if (col_iter == _map_column.end()) { + continue; + } + int parquet_col_id = col_iter->second; + if (parquet_col_ids.end() == parquet_col_ids.find(parquet_col_id)) { + continue; + } + for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) { + Expr* conjunct = conjunct_ctxs[conj_idx]->root(); + if (conjunct->get_num_children() == 0) { + continue; + } + Expr* raw_slot = conjunct->get_child(0); + if (TExprNodeType::SLOT_REF != raw_slot->node_type()) { + continue; + } + SlotRef* slot_ref = (SlotRef*)raw_slot; + SlotId conjunct_slot_id = slot_ref->slot_id(); + if (conjunct_slot_id == tuple_desc->slots()[i]->id()) { + // Get conjuncts by conjunct_slot_id + auto iter = _slot_conjuncts.find(conjunct_slot_id); + if (_slot_conjuncts.end() == iter) { + std::vector<ExprContext*> conjuncts; + conjuncts.emplace_back(conjunct_ctxs[conj_idx]); + _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts)); + } else { + std::vector<ExprContext*> conjuncts = iter->second; + conjuncts.emplace_back(conjunct_ctxs[conj_idx]); + } + } + } + } } -Status ParquetReader::_init_row_group_reader(const TupleDescriptor* tuple_desc, - int64_t range_start_offset, int64_t range_size, - const std::vector<ExprContext*>& conjunct_ctxs) { - // todo: extract as create() - _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, _read_columns, - _map_column, conjunct_ctxs)); - RETURN_IF_ERROR(_row_group_reader->init(tuple_desc, range_start_offset, range_size)); +Status ParquetReader::_filter_row_groups(std::vector<int32_t>* read_row_group_ids) { + if (_total_groups == 0 || _file_metadata->num_rows() == 0 || _range_size < 0) { + return Status::EndOfFile("No row group need read"); + } + int32_t row_group_idx = -1; + while (row_group_idx < _total_groups) { + row_group_idx++; + const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx]; + if (_is_misaligned_range_group(row_group)) { + continue; + } + bool filter_group = false; + RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group)); + if (!filter_group) { + read_row_group_ids->emplace_back(row_group_idx); + break; + } + } return Status::OK(); } +bool ParquetReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) { + int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data); + + auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data; + int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size; + + int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2; + if (!(row_group_mid >= _range_start_offset && + row_group_mid < _range_start_offset + _range_size)) { + return true; + } + return false; +} + bool ParquetReader::_has_page_index(std::vector<tparquet::ColumnChunk> 
columns) { _page_index.reset(new PageIndex()); return _page_index->check_and_get_page_index_ranges(columns); } -Status ParquetReader::_process_page_index(std::vector<tparquet::ColumnChunk> columns) { +Status ParquetReader::_process_page_index(tparquet::RowGroup& row_group, + std::vector<RowRange>& skipped_row_ranges) { int64_t buffer_size = _page_index->_column_index_size + _page_index->_offset_index_size; - uint8_t buff[buffer_size]; for (auto col_id : _include_column_ids) { - auto chunk = columns[col_id]; - RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff)); - // todo: use page index filter min/max val - RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size)); - // todo: calculate row range + uint8_t buff[buffer_size]; + auto chunk = row_group.columns[col_id]; + tparquet::ColumnIndex column_index; + RETURN_IF_ERROR(_page_index->parse_column_index(chunk, buff, &column_index)); + VLOG_DEBUG << "_column_index_size : " << _page_index->_column_index_size; + VLOG_DEBUG << "_page_index 0 max_values : " << column_index.max_values[0]; + const int num_of_page = column_index.null_pages.size(); + if (num_of_page <= 1) { + break; + } + auto conjunct_iter = _slot_conjuncts.find(col_id); + if (_slot_conjuncts.end() == conjunct_iter) { + continue; + } + auto conjuncts = conjunct_iter->second; + std::vector<int> candidate_page_range; + _page_index->collect_skipped_page_range(conjuncts, candidate_page_range); + tparquet::OffsetIndex offset_index; + RETURN_IF_ERROR(_page_index->parse_offset_index(chunk, buff, buffer_size, &offset_index)); + VLOG_DEBUG << "page_locations size : " << offset_index.page_locations.size(); + for (int page_id : candidate_page_range) { + RowRange skipped_row_range; + _page_index->create_skipped_row_range(offset_index, row_group.num_rows, page_id, + &skipped_row_range); + skipped_row_ranges.emplace_back(skipped_row_range); + } + } + return Status::OK(); +} + +Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_group, + bool* filter_group) { + _process_column_stat_filter(row_group.columns, filter_group); + _init_chunk_dicts(); + RETURN_IF_ERROR(_process_dict_filter(filter_group)); + _init_bloom_filter(); + RETURN_IF_ERROR(_process_bloom_filter(filter_group)); + return Status::OK(); +} + +Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& columns, + bool* filter_group) { + // It will not filter if head_group_offset equals tail_group_offset + std::unordered_set<int> _parquet_column_ids(_include_column_ids.begin(), + _include_column_ids.end()); + for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) { + auto slot_iter = _slot_conjuncts.find(slot_id); + if (slot_iter == _slot_conjuncts.end()) { + continue; + } + const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name(); + auto col_iter = _map_column.find(col_name); + if (col_iter == _map_column.end()) { + continue; + } + int parquet_col_id = col_iter->second; + if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) { + // Column not exist in parquet file + continue; + } + auto statistic = columns[parquet_col_id].meta_data.statistics; + if (!statistic.__isset.max || !statistic.__isset.min) { + continue; + } + // Min-max of statistic is plain-encoded value + *filter_group = _determine_filter_min_max(slot_iter->second, statistic.min, statistic.max); + if (*filter_group) { + break; + } } return Status::OK(); } + +void ParquetReader::_init_chunk_dicts() {} + +Status 
+Status ParquetReader::_process_dict_filter(bool* filter_group) {
+    return Status();
+}
+
+void ParquetReader::_init_bloom_filter() {}
+
+Status ParquetReader::_process_bloom_filter(bool* filter_group) {
+    RETURN_IF_ERROR(_file_reader->seek(0));
+    return Status();
+}
+
+int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
+    if (column.__isset.dictionary_page_offset) {
+        DCHECK_LT(column.dictionary_page_offset, column.data_page_offset);
+        return column.dictionary_page_offset;
+    }
+    return column.data_page_offset;
+}
 } // namespace doris::vectorized
\ No newline at end of file
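Before the header changes below, it is worth spelling out where the skipped row ranges come from. The offset index records a first_row_index per page, so page N of a chunk covers rows [first_row_index(N), first_row_index(N + 1) - 1], and the row group's num_rows closes the last page. A sketch of that arithmetic, assuming create_skipped_row_range computes the same span; RowRange is the struct added in vparquet_reader.h below:

    #include <gen_cpp/parquet_types.h> // thrift-generated tparquet::OffsetIndex

    struct RowRange {
        int64_t first_row;
        int64_t last_row;
    };

    // Row span of page `page_id` in a row group holding `num_rows` rows.
    static RowRange page_row_range(const tparquet::OffsetIndex& offset_index,
                                   int64_t num_rows, int page_id) {
        const auto& locs = offset_index.page_locations;
        int64_t first = locs[page_id].first_row_index;
        int64_t last = (page_id + 1 < static_cast<int>(locs.size()))
                               ? locs[page_id + 1].first_row_index - 1
                               : num_rows - 1;
        return {first, last};
    }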
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index a4cf4e70e2..c1d0ec4247 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -42,6 +42,12 @@ namespace doris::vectorized {
 //     int64_t total_bytes = 0;
 // };
 class RowGroupReader;
+class PageIndex;
+
+struct RowRange {
+    int64_t first_row;
+    int64_t last_row;
+};
 
 class ParquetReadColumn {
 public:
@@ -58,49 +64,73 @@ private:
 
 class ParquetReader {
 public:
-    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file,
+    ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, size_t batch_size,
                   int64_t range_start_offset, int64_t range_size);
 
     ~ParquetReader();
 
     Status init_reader(const TupleDescriptor* tuple_desc,
                        const std::vector<SlotDescriptor*>& tuple_slot_descs,
-                       const std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone);
-
-    Status read_next_batch(Block* block);
+                       std::vector<ExprContext*>& conjunct_ctxs, const std::string& timezone);
 
-    bool has_next() const { return !_batch_eof; };
+    Status read_next_batch(Block* block, bool* eof);
 
-    // std::shared_ptr<Statistics>& statistics() { return _statistics; }
+    // std::shared_ptr<Statistics>& statistics() { return _statistics; }
 
     void close();
 
     int64_t size() const { return _file_reader->size(); }
 
 private:
     Status _init_read_columns(const std::vector<SlotDescriptor*>& tuple_slot_descs);
-    Status _init_row_group_reader(const TupleDescriptor* tuple_desc, int64_t range_start_offset,
-                                  int64_t range_size,
-                                  const std::vector<ExprContext*>& conjunct_ctxs);
-    void _fill_block_data(Block* block, int group_id);
+    Status _init_row_group_readers(const TupleDescriptor* tuple_desc, int64_t range_start_offset,
+                                   int64_t range_size,
+                                   const std::vector<ExprContext*>& conjunct_ctxs);
+    void _init_conjuncts(const TupleDescriptor* tuple_desc,
+                         const std::vector<ExprContext*>& conjunct_ctxs);
+    // Page Index Filter
     bool _has_page_index(std::vector<tparquet::ColumnChunk> columns);
-    Status _process_page_index(std::vector<tparquet::ColumnChunk> columns);
+    Status _process_page_index(tparquet::RowGroup& row_group,
+                               std::vector<RowRange>& skipped_row_ranges);
+
+    // Row Group Filter
+    bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
+    Status _process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& column_meta,
+                                       bool* filter_group);
+    Status _process_row_group_filter(const tparquet::RowGroup& row_group, bool* filter_group);
+    void _init_chunk_dicts();
+    Status _process_dict_filter(bool* filter_group);
+    void _init_bloom_filter();
+    Status _process_bloom_filter(bool* filter_group);
+    Status _filter_row_groups(std::vector<int32_t>* read_row_group_ids);
+    int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column);
+    bool _determine_filter_min_max(const std::vector<ExprContext*>& conjuncts,
+                                   const std::string& encoded_min, const std::string& encoded_max);
+    void _eval_binary_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
+                                bool& need_filter);
+    void _eval_in_predicate(ExprContext* ctx, const char* min_bytes, const char* max_bytes,
+                            bool& need_filter);
 
 private:
     FileReader* _file_reader;
     std::shared_ptr<FileMetaData> _file_metadata;
-    std::shared_ptr<RowGroupReader> _row_group_reader;
+    std::unique_ptr<tparquet::FileMetaData> _t_metadata;
     std::shared_ptr<PageIndex> _page_index;
-    int _total_groups; // num of groups(stripes) of a parquet(orc) file
-    // int _current_group; // current group(stripe)
+    std::vector<std::shared_ptr<RowGroupReader>> _row_group_readers;
+    int32_t _total_groups;         // num of groups(stripes) of a parquet(orc) file
+    int32_t _current_row_group_id;
     // std::shared_ptr<Statistics> _statistics;
     const int32_t _num_of_columns_from_file;
 
-    std::map<std::string, int> _map_column; // column-name <---> column-index
-    std::vector<int> _include_column_ids;   // columns that need to get from file
+    std::shared_ptr<std::vector<ExprContext*>> _conjunct_ctxs;
+    std::unordered_map<int, std::vector<ExprContext*>> _slot_conjuncts;
+    std::vector<int> _include_column_ids; // columns that need to get from file
     std::vector<ParquetReadColumn> _read_columns;
+    bool* _file_eof;
 
     // parquet file reader object
-    bool* _batch_eof;
+    size_t _batch_size;
     int64_t _range_start_offset;
     int64_t _range_size;
+
+    const TupleDescriptor* _tuple_desc; // get all slot info
 };
 
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 95df8bd9a2..c334b105ed 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -147,7 +147,7 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
     // load page data into underlying container
     chunk_reader.load_page_data();
     // decode page data
-    return chunk_reader.decode_values(doris_column, data_type, chunk_reader.num_values());
+    return chunk_reader.decode_values(doris_column, data_type, chunk_reader.remaining_num_values());
 }
 
 static void create_block(std::unique_ptr<vectorized::Block>& block) {
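On the test change above: remaining_num_values() is the number of values in the current page that are still undecoded, which is the right count to request when draining a page in one call. A hedged sketch of a page-by-page read loop built on the chunk-reader calls the test exercises; has_next_page() and next_page() are assumptions for illustration, not verbatim Doris API:

    // Drain one column chunk into `doris_column`, one page per iteration.
    // load_page_data(), decode_values() and remaining_num_values() appear in
    // the test above; the paging calls are assumed.
    Status read_whole_chunk(ColumnChunkReader& chunk_reader,
                            vectorized::MutableColumnPtr& doris_column,
                            DataTypePtr& data_type) {
        while (chunk_reader.has_next_page()) {
            RETURN_IF_ERROR(chunk_reader.next_page());      // parse the page header
            RETURN_IF_ERROR(chunk_reader.load_page_data()); // decompress page body
            RETURN_IF_ERROR(chunk_reader.decode_values(
                    doris_column, data_type, chunk_reader.remaining_num_values()));
        }
        return Status::OK();
    }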