This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dec576a991 [feature-wip](parquet-reader) generate null values and NullMap for parquet column (#12115)
dec576a991 is described below

commit dec576a9918436ec8e80e899651e0f7db879885a
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Aug 29 09:30:32 2022 +0800

    [feature-wip](parquet-reader) generate null values and NullMap for parquet column (#12115)
    
    Generate null values and NullMap for the nullable column by analyzing the definition levels.
---
 be/src/common/config.h                             |   1 +
 be/src/exec/schema_scanner.cpp                     |   1 +
 be/src/vec/exec/file_scan_node.cpp                 |  11 +-
 be/src/vec/exec/format/parquet/parquet_common.cpp  |   5 +-
 .../format/parquet/vparquet_column_chunk_reader.h  |   7 +-
 .../exec/format/parquet/vparquet_column_reader.cpp |  62 ++++++-
 .../exec/format/parquet/vparquet_group_reader.cpp  |   4 -
 .../exec/format/parquet/vparquet_group_reader.h    |   1 -
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   | 188 ++++-----------------
 9 files changed, 102 insertions(+), 178 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index a293dc64a4..fa84f5e11a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -805,6 +805,7 @@ CONF_Int32(object_pool_buffer_size, "100");
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 CONF_Bool(parquet_predicate_push_down, "true");
 CONF_Int32(parquet_header_max_size, "8388608");
+CONF_Bool(parquet_reader_using_internal, "false");
 
 // When the rows number reached this limit, will check the filter rate of the bloomfilter
 // if it is lower than a specific threshold, the predicate will be disabled.
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 9071293ff1..fb4623114c 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -162,6 +162,7 @@ Status SchemaScanner::create_tuple_desc(ObjectPool* pool) {
             t_slot_desc.__set_nullIndicatorBit(-1);
         }
 
+        t_slot_desc.id = i;
         t_slot_desc.__set_slotIdx(i);
         t_slot_desc.__set_isMaterialized(true);
 
diff --git a/be/src/vec/exec/file_scan_node.cpp 
b/be/src/vec/exec/file_scan_node.cpp
index e84c0e9371..8dc3cc5222 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -466,10 +466,13 @@ std::unique_ptr<FileScanner> 
FileScanNode::create_scanner(const TFileScanRange&
     FileScanner* scan = nullptr;
     switch (scan_range.params.format_type) {
     case TFileFormatType::FORMAT_PARQUET:
-        scan = new VFileParquetScanner(_runtime_state, runtime_profile(), 
scan_range.params,
-                                       scan_range.ranges, _pre_filter_texprs, 
counter);
-        //        scan = new ParquetFileHdfsScanner(_runtime_state, 
runtime_profile(), scan_range.params,
-        //                                       scan_range.ranges, 
_pre_filter_texprs, counter);
+        if (config::parquet_reader_using_internal) {
+            scan = new ParquetFileHdfsScanner(_runtime_state, 
runtime_profile(), scan_range.params,
+                                              scan_range.ranges, 
_pre_filter_texprs, counter);
+        } else {
+            scan = new VFileParquetScanner(_runtime_state, runtime_profile(), 
scan_range.params,
+                                           scan_range.ranges, 
_pre_filter_texprs, counter);
+        }
         break;
     case TFileFormatType::FORMAT_ORC:
         scan = new VFileORCScanner(_runtime_state, runtime_profile(), 
scan_range.params,
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 347db41d86..26c1ab7735 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -258,8 +258,7 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr& 
doris_column, DataTypeP
     }
 
     return Status::InvalidArgument("Can't decode parquet physical type {} to 
doris logical type {}",
-                                   tparquet::to_string(_physical_type),
-                                   getTypeName(data_type->get_type_id()));
+                                   tparquet::to_string(_physical_type), 
getTypeName(logical_type));
 }
 
 Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t 
dict_size) {
@@ -351,7 +350,7 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& 
doris_column, DataTypeP
     }
     return Status::InvalidArgument(
             "Can't decode parquet physical type BYTE_ARRAY to doris logical 
type {}",
-            getTypeName(data_type->get_type_id()));
+            getTypeName(logical_type));
 }
 
 Status BoolPlainDecoder::skip_values(size_t num_values) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index 79fdc204dc..4319ba6689 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -105,6 +105,9 @@ public:
     // Get the definition level decoder of current page.
     LevelDecoder& def_level_decoder() { return _def_level_decoder; }
 
+    level_t max_rep_level() const { return _max_rep_level; }
+    level_t max_def_level() const { return _max_def_level; }
+
     // Get page decoder
     Decoder* get_page_decoder() { return _page_decoder; }
 
@@ -119,9 +122,7 @@ private:
     tparquet::LogicalType _parquet_logical_type;
 
     BufferedStreamReader* _stream_reader;
-    // tparquet::ColumnChunk* _column_chunk;
-    tparquet::ColumnMetaData& _metadata;
-    // FieldSchema* _field_schema;
+    tparquet::ColumnMetaData _metadata;
     cctz::time_zone* _ctz;
 
     std::unique_ptr<PageReader> _page_reader = nullptr;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 66d9793ab5..9cdf60f378 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -63,18 +63,17 @@ Status ScalarColumnReader::init(FileReader* file, 
FieldSchema* field, tparquet::
             new BufferedFileStreamReader(file, _metadata->start_offset(), 
_metadata->size());
     _row_ranges = &row_ranges;
     _chunk_reader.reset(new ColumnChunkReader(_stream_reader, chunk, field, 
_ctz));
-    RETURN_IF_ERROR(_chunk_reader->init());
-    RETURN_IF_ERROR(_chunk_reader->next_page());
-    if (_row_ranges->size() != 0) {
-        _skipped_pages();
-    }
-    RETURN_IF_ERROR(_chunk_reader->load_page_data());
-    return Status::OK();
+    return _chunk_reader->init();
 }
 
 Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, 
DataTypePtr& type,
                                             size_t batch_size, size_t* 
read_rows, bool* eof) {
-    if (_chunk_reader->remaining_num_values() <= 0) {
+    if (_chunk_reader->remaining_num_values() == 0) {
+        if (!_chunk_reader->has_next_page()) {
+            *eof = true;
+            *read_rows = 0;
+            return Status::OK();
+        }
         RETURN_IF_ERROR(_chunk_reader->next_page());
         if (_row_ranges->size() != 0) {
             _skipped_pages();
@@ -84,8 +83,53 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
     size_t read_values = _chunk_reader->remaining_num_values() < batch_size
                                  ? _chunk_reader->remaining_num_values()
                                  : batch_size;
-    RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, 
read_values));
+    // get definition levels, and generate null values
+    level_t definitions[read_values];
+    if (_chunk_reader->max_def_level() == 0) { // required field
+        std::fill(definitions, definitions + read_values, 1);
+    } else {
+        _chunk_reader->get_def_levels(definitions, read_values);
+    }
+    // fill NullMap
+    CHECK(doris_column->is_nullable());
+    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+            (*std::move(doris_column)).mutate().get());
+    NullMap& map_data = nullable_column->get_null_map_data();
+    for (int i = 0; i < read_values; ++i) {
+        map_data.emplace_back(definitions[i] == 0);
+    }
+    // decode data
+    if (_chunk_reader->max_def_level() == 0) {
+        RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, 
read_values));
+    } else if (read_values > 0) {
+        // column with null values
+        level_t level_type = definitions[0];
+        int num_values = 1;
+        for (int i = 1; i < read_values; ++i) {
+            if (definitions[i] != level_type) {
+                if (level_type == 0) {
+                    // null values
+                    _chunk_reader->insert_null_values(doris_column, 
num_values);
+                } else {
+                    RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, 
type, num_values));
+                }
+                level_type = definitions[i];
+                num_values = 1;
+            } else {
+                num_values++;
+            }
+        }
+        if (level_type == 0) {
+            // null values
+            _chunk_reader->insert_null_values(doris_column, num_values);
+        } else {
+            RETURN_IF_ERROR(_chunk_reader->decode_values(doris_column, type, 
num_values));
+        }
+    }
     *read_rows = read_values;
+    if (_chunk_reader->remaining_num_values() == 0 && 
!_chunk_reader->has_next_page()) {
+        *eof = true;
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 7443434cfb..0f6b0d084b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -31,7 +31,6 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
           _read_columns(read_columns),
           _row_group_id(row_group_id),
           _row_group_meta(row_group),
-          _total_rows(row_group.num_rows),
           _ctz(ctz) {}
 
 RowGroupReader::~RowGroupReader() {
@@ -72,9 +71,6 @@ Status RowGroupReader::next_batch(Block* block, size_t 
batch_size, bool* _batch_
         RETURN_IF_ERROR(_column_readers[slot_desc->id()]->read_column_data(
                 column_ptr, column_type, batch_size, &batch_read_rows, 
_batch_eof));
         _read_rows += batch_read_rows;
-        if (_read_rows >= _total_rows) {
-            *_batch_eof = true;
-        }
     }
     // use data fill utils read column data to column ptr
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 5ed99cd4e3..57c72d4863 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -48,7 +48,6 @@ private:
     const int32_t _row_group_id;
     tparquet::RowGroup& _row_group_meta;
     int64_t _read_rows = 0;
-    int64_t _total_rows;
     cctz::time_zone* _ctz;
 };
 } // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index 75cc087d12..3bcc125178 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -329,158 +329,31 @@ TEST_F(ParquetThriftReaderTest, dict_decoder) {
                                 
"./be/test/exec/test_data/parquet_scanner/dict-decoder.txt", 12);
 }
 
-TEST_F(ParquetThriftReaderTest, column_reader) {
-    LocalFileReader 
file_reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
-    auto st = file_reader.open();
-    EXPECT_TRUE(st.ok());
-
-    // prepare metadata
-    std::shared_ptr<FileMetaData> meta_data;
-    parse_thrift_footer(&file_reader, meta_data);
-    tparquet::FileMetaData t_metadata = meta_data->to_thrift_metadata();
-
-    FieldDescriptor schema_descriptor;
-    // todo use schema of meta_data
-    schema_descriptor.parse_from_thrift(t_metadata.schema);
-    // create scalar column reader
-    std::unique_ptr<ParquetColumnReader> reader;
-    auto field = const_cast<FieldSchema*>(schema_descriptor.get_column(0));
-    // create read model
-    TDescriptorTable t_desc_table;
-    // table descriptors
-    TTableDescriptor t_table_desc;
-    cctz::time_zone ctz;
-    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
-
-    t_table_desc.id = 0;
-    t_table_desc.tableType = TTableType::OLAP_TABLE;
-    t_table_desc.numCols = 0;
-    t_table_desc.numClusteringCols = 0;
-    t_desc_table.tableDescriptors.push_back(t_table_desc);
-    t_desc_table.__isset.tableDescriptors = true;
-    TSlotDescriptor tslot_desc;
-    {
-        tslot_desc.id = 0;
-        tslot_desc.parent = 0;
-        TTypeDesc type;
-        {
-            TTypeNode node;
-            node.__set_type(TTypeNodeType::SCALAR);
-            TScalarType scalar_type;
-            scalar_type.__set_type(TPrimitiveType::TINYINT);
-            node.__set_scalar_type(scalar_type);
-            type.types.push_back(node);
-        }
-        tslot_desc.slotType = type;
-        tslot_desc.columnPos = 0;
-        tslot_desc.byteOffset = 0;
-        tslot_desc.nullIndicatorByte = 0;
-        tslot_desc.nullIndicatorBit = -1;
-        tslot_desc.colName = "tinyint_col";
-        tslot_desc.slotIdx = 0;
-        tslot_desc.isMaterialized = true;
-        t_desc_table.slotDescriptors.push_back(tslot_desc);
-    }
-    t_desc_table.__isset.slotDescriptors = true;
-    {
-        // TTupleDescriptor dest
-        TTupleDescriptor t_tuple_desc;
-        t_tuple_desc.id = 0;
-        t_tuple_desc.byteSize = 16;
-        t_tuple_desc.numNullBytes = 0;
-        t_tuple_desc.tableId = 0;
-        t_tuple_desc.__isset.tableId = true;
-        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
-    }
-    DescriptorTbl* desc_tbl;
-    ObjectPool obj_pool;
-    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
-    auto slot_desc = desc_tbl->get_slot_descriptor(0);
-    ParquetReadColumn column(slot_desc);
-    std::vector<RowRange> row_ranges = std::vector<RowRange>();
-    ParquetColumnReader::create(&file_reader, field, column, 
t_metadata.row_groups[0], row_ranges,
-                                &ctz, reader);
-    std::unique_ptr<vectorized::Block> block;
-    create_block(block);
-    auto& column_with_type_and_name = 
block->get_by_name(slot_desc->col_name());
-    auto& column_ptr = column_with_type_and_name.column;
-    auto& column_type = column_with_type_and_name.type;
-    size_t batch_read_rows = 0;
-    bool batch_eof = false;
-    ASSERT_EQ(column_ptr->size(), 0);
-
-    reader->read_column_data(column_ptr, column_type, 1024, &batch_read_rows, 
&batch_eof);
-    EXPECT_TRUE(!batch_eof);
-    ASSERT_EQ(batch_read_rows, 10);
-    ASSERT_EQ(column_ptr->size(), 10);
-
-    auto* nullable_column =
-            
reinterpret_cast<vectorized::ColumnNullable*>((*std::move(column_ptr)).mutate().get());
-    MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
-    int int_sum = 0;
-    for (int i = 0; i < column_ptr->size(); i++) {
-        int_sum += (int8_t)column_ptr->get64(i);
-    }
-    ASSERT_EQ(int_sum, 5);
-}
-
 TEST_F(ParquetThriftReaderTest, group_reader) {
-    TDescriptorTable t_desc_table;
-    TTableDescriptor t_table_desc;
-    std::vector<std::string> int_types = {"boolean_col", "tinyint_col", 
"smallint_col", "int_col",
-                                          "bigint_col",  "float_col",   
"double_col"};
-    //        "string_col"
-    t_table_desc.id = 0;
-    t_table_desc.tableType = TTableType::OLAP_TABLE;
-    t_table_desc.numCols = 0;
-    t_table_desc.numClusteringCols = 0;
-    t_desc_table.tableDescriptors.push_back(t_table_desc);
-    t_desc_table.__isset.tableDescriptors = true;
-
-    for (int i = 0; i < int_types.size(); i++) {
-        TSlotDescriptor tslot_desc;
-        {
-            tslot_desc.id = i;
-            tslot_desc.parent = 0;
-            TTypeDesc type;
-            {
-                TTypeNode node;
-                node.__set_type(TTypeNodeType::SCALAR);
-                TScalarType scalar_type;
-                scalar_type.__set_type(TPrimitiveType::type(i + 2));
-                node.__set_scalar_type(scalar_type);
-                type.types.push_back(node);
-            }
-            tslot_desc.slotType = type;
-            tslot_desc.columnPos = 0;
-            tslot_desc.byteOffset = 0;
-            tslot_desc.nullIndicatorByte = 0;
-            tslot_desc.nullIndicatorBit = -1;
-            tslot_desc.colName = int_types[i];
-            tslot_desc.slotIdx = 0;
-            tslot_desc.isMaterialized = true;
-            t_desc_table.slotDescriptors.push_back(tslot_desc);
-        }
-    }
-
-    t_desc_table.__isset.slotDescriptors = true;
-    {
-        // TTupleDescriptor dest
-        TTupleDescriptor t_tuple_desc;
-        t_tuple_desc.id = 0;
-        t_tuple_desc.byteSize = 16;
-        t_tuple_desc.numNullBytes = 0;
-        t_tuple_desc.tableId = 0;
-        t_tuple_desc.__isset.tableId = true;
-        t_desc_table.tupleDescriptors.push_back(t_tuple_desc);
-    }
-    DescriptorTbl* desc_tbl;
-    ObjectPool obj_pool;
-    DescriptorTbl::create(&obj_pool, t_desc_table, &desc_tbl);
+    SchemaScanner::ColumnDesc column_descs[] = {
+            {"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true},
+            {"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true},
+            {"int_col", TYPE_INT, sizeof(int32_t), true},
+            {"bigint_col", TYPE_BIGINT, sizeof(int64_t), true},
+            {"boolean_col", TYPE_BOOLEAN, sizeof(bool), true},
+            {"float_col", TYPE_FLOAT, sizeof(float_t), true},
+            {"double_col", TYPE_DOUBLE, sizeof(double_t), true},
+            {"string_col", TYPE_STRING, sizeof(StringValue), true},
+            {"binary_col", TYPE_STRING, sizeof(StringValue), true},
+            {"timestamp_col", TYPE_DATETIME, sizeof(DateTimeValue), true},
+            {"decimal_col", TYPE_DECIMALV2, sizeof(DecimalV2Value), true},
+            {"char_col", TYPE_CHAR, sizeof(StringValue), true},
+            {"varchar_col", TYPE_VARCHAR, sizeof(StringValue), true},
+            {"date_col", TYPE_DATE, sizeof(DateTimeValue), true}};
+    int num_cols = sizeof(column_descs) / sizeof(SchemaScanner::ColumnDesc);
+    SchemaScanner schema_scanner(column_descs, num_cols);
+    ObjectPool object_pool;
+    SchemaScannerParam param;
+    schema_scanner.init(&param, &object_pool);
+    auto tuple_slots = 
const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots();
     std::vector<ParquetReadColumn> read_columns;
-    for (int i = 0; i < int_types.size(); i++) {
-        auto slot_desc = desc_tbl->get_slot_descriptor(i);
-        ParquetReadColumn column(slot_desc);
+    for (const auto& slot : tuple_slots) {
+        ParquetReadColumn column(slot);
         read_columns.emplace_back(column);
     }
 
@@ -502,12 +375,19 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
     auto stg = row_group_reader->init(meta_data->schema(), row_ranges);
     EXPECT_TRUE(stg.ok());
 
-    std::unique_ptr<vectorized::Block> block;
-    create_block(block);
+    vectorized::Block block;
+    for (const auto& slot_desc : tuple_slots) {
+        auto is_nullable = slot_desc->is_nullable();
+        auto data_type = 
vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
+                                                                               
   is_nullable);
+        MutableColumnPtr data_column = data_type->create_column();
+        block.insert(
+                ColumnWithTypeAndName(std::move(data_column), data_type, 
slot_desc->col_name()));
+    }
     bool batch_eof = false;
-    auto stb = row_group_reader->next_batch(block.get(), 1024, &batch_eof);
+    auto stb = row_group_reader->next_batch(&block, 1024, &batch_eof);
     EXPECT_TRUE(stb.ok());
-    LOG(WARNING) << "block data: " << block->dump_structure();
+    LOG(WARNING) << "block data: " << block.dump_data(0, 10);
 }
 } // namespace vectorized
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to