This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dec576a991 [feature-wip](parquet-reader) generate null values and NullMap for parquet column (#12115) dec576a991 is described below commit dec576a9918436ec8e80e899651e0f7db879885a Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Mon Aug 29 09:30:32 2022 +0800 [feature-wip](parquet-reader) generate null values and NullMap for parquet column (#12115) Generate null values and NullMap for the nullable column by analyzing the definition levels. --- be/src/common/config.h | 1 + be/src/exec/schema_scanner.cpp | 1 + be/src/vec/exec/file_scan_node.cpp | 11 +- be/src/vec/exec/format/parquet/parquet_common.cpp | 5 +- .../format/parquet/vparquet_column_chunk_reader.h | 7 +- .../exec/format/parquet/vparquet_column_reader.cpp | 62 ++++++- .../exec/format/parquet/vparquet_group_reader.cpp | 4 - .../exec/format/parquet/vparquet_group_reader.h | 1 - be/test/vec/exec/parquet/parquet_thrift_test.cpp | 188 ++++----------------- 9 files changed, 102 insertions(+), 178 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index a293dc64a4..fa84f5e11a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -805,6 +805,7 @@ CONF_Int32(object_pool_buffer_size, "100"); CONF_Int32(parquet_reader_max_buffer_size, "50"); CONF_Bool(parquet_predicate_push_down, "true"); CONF_Int32(parquet_header_max_size, "8388608"); +CONF_Bool(parquet_reader_using_internal, "false"); // When the rows number reached this limit, will check the filter rate the of bloomfilter // if it is lower than a specific threshold, the predicate will be disabled. 
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp index 9071293ff1..fb4623114c 100644 --- a/be/src/exec/schema_scanner.cpp +++ b/be/src/exec/schema_scanner.cpp @@ -162,6 +162,7 @@ Status SchemaScanner::create_tuple_desc(ObjectPool* pool) { t_slot_desc.__set_nullIndicatorBit(-1); } + t_slot_desc.id = i; t_slot_desc.__set_slotIdx(i); t_slot_desc.__set_isMaterialized(true); diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp index e84c0e9371..8dc3cc5222 100644 --- a/be/src/vec/exec/file_scan_node.cpp +++ b/be/src/vec/exec/file_scan_node.cpp @@ -466,10 +466,13 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& FileScanner* scan = nullptr; switch (scan_range.params.format_type) { case TFileFormatType::FORMAT_PARQUET: - scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, _pre_filter_texprs, counter); - // scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params, - // scan_range.ranges, _pre_filter_texprs, counter); + if (config::parquet_reader_using_internal) { + scan = new ParquetFileHdfsScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + } else { + scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, _pre_filter_texprs, counter); + } break; case TFileFormatType::FORMAT_ORC: scan = new VFileORCScanner(_runtime_state, runtime_profile(), scan_range.params, diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 347db41d86..26c1ab7735 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -258,8 +258,7 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP } return Status::InvalidArgument("Can't decode parquet physical type {} 
to doris logical type {}", - tparquet::to_string(_physical_type), - getTypeName(data_type->get_type_id())); + tparquet::to_string(_physical_type), getTypeName(logical_type)); } Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) { @@ -351,7 +350,7 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP } return Status::InvalidArgument( "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}", - getTypeName(data_type->get_type_id())); + getTypeName(logical_type)); } Status BoolPlainDecoder::skip_values(size_t num_values) { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 79fdc204dc..4319ba6689 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -105,6 +105,9 @@ public: // Get the definition level decoder of current page. LevelDecoder& def_level_decoder() { return _def_level_decoder; } + level_t max_rep_level() const { return _max_rep_level; } + level_t max_def_level() const { return _max_def_level; } + // Get page decoder Decoder* get_page_decoder() { return _page_decoder; } @@ -119,9 +122,7 @@ private: tparquet::LogicalType _parquet_logical_type; BufferedStreamReader* _stream_reader; - // tparquet::ColumnChunk* _column_chunk; - tparquet::ColumnMetaData& _metadata; - // FieldSchema* _field_schema; + tparquet::ColumnMetaData _metadata; cctz::time_zone* _ctz; std::unique_ptr<PageReader> _page_reader = nullptr; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 66d9793ab5..9cdf60f378 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -63,18 +63,17 @@ Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet:: new 
BufferedFileStreamReader(file, _metadata->start_offset(), _metadata->size()); _row_ranges = &row_ranges; _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field, _ctz)); - RETURN_IF_ERROR(_chunk_reader->init()); - RETURN_IF_ERROR(_chunk_reader->next_page()); - if (_row_ranges->size() != 0) { - _skipped_pages(); - } - RETURN_IF_ERROR(_chunk_reader->load_page_data()); - return Status::OK(); + return _chunk_reader->init(); } Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) { - if (_chunk_reader->remaining_num_values() <= 0) { + if (_chunk_reader->remaining_num_values() == 0) { + if (!_chunk_reader->has_next_page()) { + *eof = true; + *read_rows = 0; + return Status::OK(); + } RETURN_IF_ERROR(_chunk_reader->next_page()); if (_row_ranges->size() != 0) { _skipped_pages(); @@ -84,8 +83,53 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr size_t read_values = _chunk_reader->remaining_num_values() < batch_size ? 
_chunk_reader->remaining_num_values() : batch_size; - RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, read_values)); + // get definition levels, and generate null values + level_t definitions[read_values]; + if (_chunk_reader->max_def_level() == 0) { // required field + std::fill(definitions, definitions + read_values, 1); + } else { + _chunk_reader->get_def_levels(definitions, read_values); + } + // fill NullMap + CHECK(doris_column->is_nullable()); + auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>( + (*std::move(doris_column)).mutate().get()); + NullMap& map_data = nullable_column->get_null_map_data(); + for (int i = 0; i < read_values; ++i) { + map_data.emplace_back(definitions[i] == 0); + } + // decode data + if (_chunk_reader->max_def_level() == 0) { + RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, read_values)); + } else if (read_values > 0) { + // column with null values + level_t level_type = definitions[0]; + int num_values = 1; + for (int i = 1; i < read_values; ++i) { + if (definitions[i] != level_type) { + if (level_type == 0) { + // null values + _chunk_reader->insert_null_values(doris_column, num_values); + } else { + RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, num_values)); + } + level_type = definitions[i]; + num_values = 1; + } else { + num_values++; + } + } + if (level_type == 0) { + // null values + _chunk_reader->insert_null_values(doris_column, num_values); + } else { + RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, num_values)); + } + } *read_rows = read_values; + if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { + *eof = true; + } return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 7443434cfb..0f6b0d084b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -31,7 +31,6 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader, _read_columns(read_columns), _row_group_id(row_group_id), _row_group_meta(row_group), - _total_rows(row_group.num_rows), _ctz(ctz) {} RowGroupReader::~RowGroupReader() { @@ -72,9 +71,6 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, bool* _batch_ RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data( column_ptr, column_type, batch_size, &batch_read_rows, _batch_eof)); _read_rows += batch_read_rows; - if (_read_rows >= _total_rows) { - *_batch_eof = true; - } } // use data fill utils read column data to column ptr return Status::OK(); diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 5ed99cd4e3..57c72d4863 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -48,7 +48,6 @@ private: const int32_t _row_group_id; tparquet::RowGroup& _row_group_meta; int64_t _read_rows = 0; - int64_t _total_rows; cctz::time_zone* _ctz; }; } // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 75cc087d12..3bcc125178 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -329,158 +329,31 @@ TEST_F(ParquetThriftReaderTest, dict_decoder) { "./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12); } -TEST_F(ParquetThriftReaderTest, column_reader) { - LocalFileReader file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0); - auto st = file_reader.open(); - EXPECT_TRUE(st.ok()); - - // prepare metadata - std::shared_ptr<FileMetaData> meta_data; - parse_thrift_footer(&file_reader, meta_data); - tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata(); - - 
FieldDescriptor schema_descriptor; - // todo use schema of meta_data - schema_descriptor.parse_from_thrift(t_metadata.schema); - // create scalar column reader - std::unique_ptr<ParquetColumnReader> reader; - auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0)); - // create read model - TDescriptorTable t_desc_table; - // table descriptors - TTableDescriptor t_table_desc; - cctz::time_zone ctz; - TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); - - t_table_desc.id = 0; - t_table_desc.tableType = TTableType::OLAP_TABLE; - t_table_desc.numCols = 0; - t_table_desc.numClusteringCols = 0; - t_desc_table.tableDescriptors.push_back(t_table_desc); - t_desc_table.__isset.tableDescriptors = true; - TSlotDescriptor tslot_desc; - { - tslot_desc.id = 0; - tslot_desc.parent = 0; - TTypeDesc type; - { - TTypeNode node; - node.__set_type(TTypeNodeType::SCALAR); - TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::TINYINT); - node.__set_scalar_type(scalar_type); - type.types.push_back(node); - } - tslot_desc.slotType = type; - tslot_desc.columnPos = 0; - tslot_desc.byteOffset = 0; - tslot_desc.nullIndicatorByte = 0; - tslot_desc.nullIndicatorBit = -1; - tslot_desc.colName = "tinyint_col"; - tslot_desc.slotIdx = 0; - tslot_desc.isMaterialized = true; - t_desc_table.slotDescriptors.push_back(tslot_desc); - } - t_desc_table.__isset.slotDescriptors = true; - { - // TTupleDescriptor dest - TTupleDescriptor t_tuple_desc; - t_tuple_desc.id = 0; - t_tuple_desc.byteSize = 16; - t_tuple_desc.numNullBytes = 0; - t_tuple_desc.tableId = 0; - t_tuple_desc.__isset.tableId = true; - t_desc_table.tupleDescriptors.push_back(t_tuple_desc); - } - DescriptorTbl* desc_tbl; - ObjectPool obj_pool; - DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl); - auto slot_desc = desc_tbl->get_slot_descriptor(0); - ParquetReadColumn column(slot_desc); - std::vector<RowRange> row_ranges = std::vector<RowRange>(); - 
ParquetColumnReader::create(&file_reader, field, column, t_metadata.row_groups[0], row_ranges, - &ctz, reader); - std::unique_ptr<vectorized::Block> block; - create_block(block); - auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); - auto& column_ptr = column_with_type_and_name.column; - auto& column_type = column_with_type_and_name.type; - size_t batch_read_rows = 0; - bool batch_eof = false; - ASSERT_EQ(column_ptr->size(), 0); - - reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows, &batch_eof); - EXPECT_TRUE(!batch_eof); - ASSERT_EQ(batch_read_rows, 10); - ASSERT_EQ(column_ptr->size(), 10); - - auto* nullable_column = - reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get()); - MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr(); - int int_sum = 0; - for (int i = 0; i < column_ptr->size(); i++) { - int_sum += (int8_t)column_ptr->get64(i); - } - ASSERT_EQ(int_sum, 5); -} - TEST_F(ParquetThriftReaderTest, group_reader) { - TDescriptorTable t_desc_table; - TTableDescriptor t_table_desc; - std::vector<std::string> int_types = {"boolean_col", "tinyint_col", "smallint_col", "int_col", - "bigint_col", "float_col", "double_col"}; - // "string_col" - t_table_desc.id = 0; - t_table_desc.tableType = TTableType::OLAP_TABLE; - t_table_desc.numCols = 0; - t_table_desc.numClusteringCols = 0; - t_desc_table.tableDescriptors.push_back(t_table_desc); - t_desc_table.__isset.tableDescriptors = true; - - for (int i = 0; i < int_types.size(); i++) { - TSlotDescriptor tslot_desc; - { - tslot_desc.id = i; - tslot_desc.parent = 0; - TTypeDesc type; - { - TTypeNode node; - node.__set_type(TTypeNodeType::SCALAR); - TScalarType scalar_type; - scalar_type.__set_type(TPrimitiveType::type(i + 2)); - node.__set_scalar_type(scalar_type); - type.types.push_back(node); - } - tslot_desc.slotType = type; - tslot_desc.columnPos = 0; - tslot_desc.byteOffset = 0; - tslot_desc.nullIndicatorByte = 0; - 
tslot_desc.nullIndicatorBit = -1; - tslot_desc.colName = int_types[i]; - tslot_desc.slotIdx = 0; - tslot_desc.isMaterialized = true; - t_desc_table.slotDescriptors.push_back(tslot_desc); - } - } - - t_desc_table.__isset.slotDescriptors = true; - { - // TTupleDescriptor dest - TTupleDescriptor t_tuple_desc; - t_tuple_desc.id = 0; - t_tuple_desc.byteSize = 16; - t_tuple_desc.numNullBytes = 0; - t_tuple_desc.tableId = 0; - t_tuple_desc.__isset.tableId = true; - t_desc_table.tupleDescriptors.push_back(t_tuple_desc); - } - DescriptorTbl* desc_tbl; - ObjectPool obj_pool; - DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl); + SchemaScanner::ColumnDesc column_descs[] = { + {"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true}, + {"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true}, + {"int_col", TYPE_INT, sizeof(int32_t), true}, + {"bigint_col", TYPE_BIGINT, sizeof(int64_t), true}, + {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true}, + {"float_col", TYPE_FLOAT, sizeof(float_t), true}, + {"double_col", TYPE_DOUBLE, sizeof(double_t), true}, + {"string_col", TYPE_STRING, sizeof(StringValue), true}, + {"binary_col", TYPE_STRING, sizeof(StringValue), true}, + {"timestamp_col", TYPE_DATETIME, sizeof(DateTimeValue), true}, + {"decimal_col", TYPE_DECIMALV2, sizeof(DecimalV2Value), true}, + {"char_col", TYPE_CHAR, sizeof(StringValue), true}, + {"varchar_col", TYPE_VARCHAR, sizeof(StringValue), true}, + {"date_col", TYPE_DATE, sizeof(DateTimeValue), true}}; + int num_cols = sizeof(column_descs) / sizeof(SchemaScanner::ColumnDesc); + SchemaScanner schema_scanner(column_descs, num_cols); + ObjectPool object_pool; + SchemaScannerParam param; + schema_scanner.init(&param, &object_pool); + auto tuple_slots = const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots(); std::vector<ParquetReadColumn> read_columns; - for (int i = 0; i < int_types.size(); i++) { - auto slot_desc = desc_tbl->get_slot_descriptor(i); - ParquetReadColumn column(slot_desc); + for (const auto& slot : 
tuple_slots) { + ParquetReadColumn column(slot); read_columns.emplace_back(column); } @@ -502,12 +375,19 @@ TEST_F(ParquetThriftReaderTest, group_reader) { auto stg = row_group_reader->init(meta_data->schema(), row_ranges); EXPECT_TRUE(stg.ok()); - std::unique_ptr<vectorized::Block> block; - create_block(block); + vectorized::Block block; + for (const auto& slot_desc : tuple_slots) { + auto is_nullable = slot_desc->is_nullable(); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(), + is_nullable); + MutableColumnPtr data_column = data_type->create_column(); + block.insert( + ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); + } bool batch_eof = false; - auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof); + auto stb = row_group_reader->next_batch(&block, 1024, &batch_eof); EXPECT_TRUE(stb.ok()); - LOG(WARNING) << "block data: " << block->dump_structure(); + LOG(WARNING) << "block data: " << block.dump_data(0, 10); } } // namespace vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org