wsjz commented on code in PR #11601: URL: https://github.com/apache/doris/pull/11601#discussion_r945666374
########## be/src/vec/exec/format/parquet/vparquet_column_reader.cpp: ########## @@ -19,50 +19,82 @@ #include <common/status.h> #include <gen_cpp/parquet_types.h> +#include <vec/columns/columns_number.h> #include "schema_desc.h" #include "vparquet_column_chunk_reader.h" namespace doris::vectorized { -Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field, - const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type, - int64_t chunk_size) { - // todo1: init column chunk reader - // BufferedFileStreamReader stream_reader(reader, 0, chunk_size); - // _chunk_reader(&stream_reader, chunk, field); - // _chunk_reader.init(); - return Status(); -} - -Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size, - const FieldSchema* field, const ParquetReadColumn& column, - const TypeDescriptor& col_type, +Status ParquetColumnReader::create(FileReader* file, FieldSchema* field, + const ParquetReadColumn& column, const tparquet::RowGroup& row_group, - const ParquetColumnReader* reader) { + std::vector<RowRange>& row_ranges, + std::unique_ptr<ParquetColumnReader>& reader) { if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) { return Status::Corruption("not supported type"); } if (field->type.type == TYPE_ARRAY) { return Status::Corruption("not supported array type yet"); } else { + VLOG_DEBUG << "field->physical_column_index: " << field->physical_column_index; + tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index]; ScalarColumnReader* scalar_reader = new ScalarColumnReader(column); - RETURN_IF_ERROR(scalar_reader->init(file, field, - &row_group.columns[field->physical_column_index], - col_type, chunk_size)); - reader = scalar_reader; + scalar_reader->init_column_metadata(chunk); + RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, row_ranges)); + reader.reset(scalar_reader); } return Status::OK(); } -Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& 
row_group_meta, - ColumnPtr* data) { - // todo2: read data with chunk reader to load page data - // while (_chunk_reader.has_next) { - // _chunk_reader.next_page(); - // _chunk_reader.load_page_data(); - // } - return Status(); +void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chunk) { + auto chunk_meta = chunk.meta_data; + int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset + ? chunk_meta.dictionary_page_offset + : chunk_meta.data_page_offset; + size_t chunk_len = chunk_meta.total_compressed_size; + _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta)); +} + +void ParquetColumnReader::_skipped_pages() {} + +Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, + std::vector<RowRange>& row_ranges) { + BufferedFileStreamReader stream_reader(file, _metadata->start_offset(), _metadata->size()); + _row_ranges.reset(&row_ranges); + _chunk_reader.reset(new ColumnChunkReader(&stream_reader, chunk, field)); + _chunk_reader->init(); + return Status::OK(); +} + +Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, + size_t batch_size, int64_t* read_rows, bool* eof) { + if (_chunk_reader->num_values() <= 0) { + // seek to next page header + _chunk_reader->next_page(); + if (_row_ranges->size() != 0) { + _skipped_pages(); + } + // load data to decoder + _chunk_reader->load_page_data(); + } + size_t read_values = + _chunk_reader->num_values() < batch_size ? _chunk_reader->num_values() : batch_size; Review Comment: default size of a block is `batch_size`, I think we should limit it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org