This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 20634ab7e3 [feature-wip](multi-catalog) support partition&missing columns in parquet lazy read (#14264)
20634ab7e3 is described below

commit 20634ab7e3d90de41395543e1f4cdb5569bca6f2
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Wed Nov 16 08:43:11 2022 +0800

    [feature-wip](multi-catalog) support partition&missing columns in parquet lazy read (#14264)
    
    PR https://github.com/apache/doris/pull/13917 added lazy read for non-predicate columns in ParquetReader,
    but lazy read could not be triggered when the predicate columns are partition or missing columns.
    This PR supports such cases, and fills the partition and missing columns in `FileReader`.
---
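
Note: the scanner/reader handshake introduced by this patch can be
summarized with the following sketch. This is illustrative only, not
code from the patch itself: `reader` stands in for VFileScanner's
`_cur_reader`, and the batch-reading call is elided.

    // Collected by the scanner: column name -> (value parsed from the
    // file path, slot descriptor).
    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
            partition_columns;
    // Column name -> default-value expr context; nullptr means fill with null.
    std::unordered_map<std::string, VExprContext*> missing_columns;

    // Hand both maps to the reader. ParquetReader::set_fill_columns() splits
    // them into predicate vs. lazy-read columns and sets _fill_all_columns.
    RETURN_IF_ERROR(reader->set_fill_columns(partition_columns, missing_columns));

    // ... read a batch into `block` ...

    if (!reader->fill_all_columns()) {
        // The reader has not filled these columns itself, so the scanner
        // falls back to _fill_columns_from_path() / _fill_missing_columns().
    }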
 be/src/exec/text_converter.cpp                     | 164 ++++++++++++++++++++-
 be/src/exec/text_converter.h                       |   5 +
 be/src/vec/exec/format/generic_reader.h            |  19 +++
 .../exec/format/parquet/vparquet_group_reader.cpp  | 135 ++++++++++++-----
 .../exec/format/parquet/vparquet_group_reader.h    |  35 +++--
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  59 ++++++--
 be/src/vec/exec/format/parquet/vparquet_reader.h   |   6 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |  66 +++++++--
 be/src/vec/exec/scan/vfile_scanner.h               |   1 +
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |   4 +
 10 files changed, 425 insertions(+), 69 deletions(-)

diff --git a/be/src/exec/text_converter.cpp b/be/src/exec/text_converter.cpp
index 5fac00569c..5888eefc43 100644
--- a/be/src/exec/text_converter.cpp
+++ b/be/src/exec/text_converter.cpp
@@ -17,15 +17,177 @@
 
 #include "text_converter.h"
 
-#include <boost/algorithm/string.hpp>
+#include <sql.h>
 
+#include "runtime/decimalv2_value.h"
+#include "runtime/descriptors.h"
 #include "runtime/mem_pool.h"
 #include "runtime/string_value.h"
+#include "runtime/tuple.h"
+#include "util/string_parser.hpp"
+#include "util/types.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/runtime/vdatetime_value.h"
 
 namespace doris {
 
 TextConverter::TextConverter(char escape_char) : _escape_char(escape_char) {}
 
+bool TextConverter::write_vec_column(const SlotDescriptor* slot_desc,
+                                     vectorized::IColumn* nullable_col_ptr, const char* data,
+                                     size_t len, bool copy_string, bool need_escape, size_t rows) {
+    vectorized::IColumn* col_ptr = nullable_col_ptr;
+    // \N means it's NULL
+    if (slot_desc->is_nullable()) {
+        auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
+        if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == SQL_NULL_DATA) {
+            nullable_column->insert_many_defaults(rows);
+            return true;
+        } else {
+            auto& null_map = nullable_column->get_null_map_data();
+            null_map.resize_fill(null_map.size() + rows, 0);
+            col_ptr = &nullable_column->get_nested_column();
+        }
+    }
+
+    StringParser::ParseResult parse_result = StringParser::PARSE_SUCCESS;
+    size_t origin_size = col_ptr->size();
+    // Parse the raw-text data. Translate the text string to internal format.
+    switch (slot_desc->type().type) {
+    case TYPE_HLL: {
+        HyperLogLog hyper_log_log(Slice(data, len));
+        auto& hyper_data = reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data();
+        for (size_t i = 0; i < rows; ++i) {
+            hyper_data.emplace_back(hyper_log_log);
+        }
+        break;
+    }
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+    case TYPE_CHAR: {
+        if (need_escape) {
+            unescape_string_on_spot(data, &len);
+        }
+        reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_many_data(data, len, rows);
+        break;
+    }
+
+    case TYPE_BOOLEAN: {
+        bool num = StringParser::string_to_bool(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, (uint8_t)num);
+        break;
+    }
+    case TYPE_TINYINT: {
+        int8_t num = StringParser::string_to_int<int8_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_SMALLINT: {
+        int16_t num = StringParser::string_to_int<int16_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_INT: {
+        int32_t num = StringParser::string_to_int<int32_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_BIGINT: {
+        int64_t num = StringParser::string_to_int<int64_t>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_LARGEINT: {
+        __int128 num = StringParser::string_to_int<__int128>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+
+    case TYPE_FLOAT: {
+        float num = StringParser::string_to_float<float>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_DOUBLE: {
+        double num = StringParser::string_to_float<double>(data, len, &parse_result);
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, num);
+        break;
+    }
+    case TYPE_DATE: {
+        vectorized::VecDateTimeValue ts_slot;
+        if (!ts_slot.from_date_str(data, len)) {
+            parse_result = StringParser::PARSE_FAILURE;
+            break;
+        }
+        ts_slot.cast_to_date();
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
+        break;
+    }
+
+    case TYPE_DATETIME: {
+        vectorized::VecDateTimeValue ts_slot;
+        if (!ts_slot.from_date_str(data, len)) {
+            parse_result = StringParser::PARSE_FAILURE;
+            break;
+        }
+        ts_slot.to_datetime();
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, *reinterpret_cast<int64_t*>(&ts_slot));
+        break;
+    }
+
+    case TYPE_DECIMALV2: {
+        DecimalV2Value decimal_slot;
+        if (decimal_slot.parse_from_str(data, len)) {
+            parse_result = StringParser::PARSE_FAILURE;
+            break;
+        }
+        reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)
+                ->get_data()
+                .resize_fill(origin_size + rows, decimal_slot.value());
+        break;
+    }
+
+    default:
+        DCHECK(false) << "bad slot type: " << slot_desc->type();
+        break;
+    }
+
+    if (UNLIKELY(parse_result == StringParser::PARSE_FAILURE)) {
+        if (true == slot_desc->is_nullable()) {
+            auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(nullable_col_ptr);
+            size_t size = nullable_column->get_null_map_data().size();
+            doris::vectorized::NullMap& null_map_data = nullable_column->get_null_map_data();
+            for (int i = 1; i <= rows; ++i) {
+                null_map_data[size - i] = 1;
+            }
+            nullable_column->get_nested_column().insert_many_defaults(rows);
+        }
+        return false;
+    }
+    return true;
+}
+
 void TextConverter::unescape_string(StringValue* value, MemPool* pool) {
     char* new_data = reinterpret_cast<char*>(pool->allocate(value->len));
     unescape_string(value->ptr, new_data, &value->len);
diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index a3d3fe5ff6..79deac95ff 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -56,6 +56,11 @@ public:
     bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr,
                           const char* data, size_t len, bool copy_string, bool need_escape);
 
+    /// Write consecutive rows of the same data.
+    bool write_vec_column(const SlotDescriptor* slot_desc, vectorized::IColumn* nullable_col_ptr,
+                          const char* data, size_t len, bool copy_string, bool need_escape,
+                          size_t rows);
+
     // Removes escape characters from len characters of the null-terminated string src,
     // and copies the unescaped string into dest, changing *len to the unescaped length.
     // No null-terminator is added to dest.
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index e098557b82..dd2bdd249c 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -19,6 +19,7 @@
 
 #include "common/status.h"
 #include "runtime/types.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
 
@@ -43,6 +44,24 @@ public:
         return Status::NotSupported("get_parser_schema is not implemented for this reader.");
     }
     virtual ~GenericReader() = default;
+
+    /// If the underlying FileReader has filled the partition&missing columns,
+    /// the FileScanner does not need to fill them again.
+    bool fill_all_columns() const { return _fill_all_columns; }
+
+    /// Tell the underlying FileReader about the partition&missing columns;
+    /// the FileReader decides whether to fill the columns itself.
+    /// If it fills them, it should set _fill_all_columns = true.
+    virtual Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+        return Status::OK();
+    }
+
+protected:
+    /// Whether the underlying FileReader has filled the partition&missing columns
+    bool _fill_all_columns = false;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0f71990b2b..e607cd510d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -33,13 +33,7 @@ RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
           _row_group_meta(row_group),
           _remaining_rows(row_group.num_rows),
           _ctz(ctz),
-          _vconjunct_ctx(lazy_read_ctx.vconjunct_ctx),
-          _can_lazy_read(lazy_read_ctx.can_lazy_read),
-          _resize_first_column(lazy_read_ctx.resize_first_column),
-          _all_read_columns(lazy_read_ctx.all_read_columns),
-          _predicate_columns(lazy_read_ctx.predicate_columns),
-          _predicate_col_ids(lazy_read_ctx.predicate_col_ids),
-          _lazy_read_columns(lazy_read_ctx.lazy_read_columns) {}
+          _lazy_read_ctx(lazy_read_ctx) {}
 
 RowGroupReader::~RowGroupReader() {
     _column_readers.clear();
@@ -54,7 +48,6 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
     const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
     const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
     size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
-    std::set<std::string> predicate_columns(_predicate_columns.begin(), _predicate_columns.end());
     for (auto& read_col : _read_columns) {
         auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
         std::unique_ptr<ParquetColumnReader> reader;
@@ -70,13 +63,6 @@ Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>
             return Status::Corruption("Init row group reader failed");
         }
         _column_readers[read_col._file_slot_name] = std::move(reader);
-        PrimitiveType column_type = field->type.type;
-        if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
-            _can_lazy_read = false;
-        }
-    }
-    if (_vconjunct_ctx == nullptr) {
-        _can_lazy_read = false;
     }
     return Status::OK();
 }
@@ -85,16 +71,29 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
                                   bool* _batch_eof) {
     // Process external table query task that select columns are all from path.
     if (_read_columns.empty()) {
-        return _read_empty_batch(batch_size, read_rows, _batch_eof);
+        RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, _batch_eof));
+        RETURN_IF_ERROR(
+                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+
+        Status st =
+                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
+        *read_rows = block->rows();
+        return st;
     }
-    if (_can_lazy_read) {
+    if (_lazy_read_ctx.can_lazy_read) {
         // call _do_lazy_read recursively when current batch is skipped
         return _do_lazy_read(block, batch_size, read_rows, _batch_eof);
     } else {
         ColumnSelectVector run_length_vector;
-        RETURN_IF_ERROR(_read_column_data(block, _all_read_columns, batch_size, read_rows,
-                                          _batch_eof, run_length_vector));
-        Status st = VExprContext::filter_block(_vconjunct_ctx, block, block->columns());
+        RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
+                                          read_rows, _batch_eof, run_length_vector));
+        RETURN_IF_ERROR(
+                _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
+        RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+
+        Status st =
+                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
         *read_rows = block->rows();
         return st;
     }
@@ -132,7 +131,6 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
         col_idx++;
     }
     *read_rows = batch_read_rows;
-    _read_rows += batch_read_rows;
     *_batch_eof = has_eof;
     return Status::OK();
 }
@@ -143,19 +141,23 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     size_t pre_read_rows;
     bool pre_eof;
     ColumnSelectVector run_length_vector;
-    RETURN_IF_ERROR(_read_column_data(block, _predicate_columns, batch_size, &pre_read_rows,
-                                      &pre_eof, run_length_vector));
+    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
+                                      &pre_read_rows, &pre_eof, run_length_vector));
+    RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
+                                            _lazy_read_ctx.predicate_partition_columns));
+    RETURN_IF_ERROR(
+            _fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns));
     // generate filter vector
-    if (_resize_first_column) {
+    if (_lazy_read_ctx.resize_first_column) {
         // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
         // The following process may be tricky and time-consuming, but we have no other way.
         block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
     }
     size_t origin_column_num = block->columns();
     int filter_column_id = -1;
-    RETURN_IF_ERROR(_vconjunct_ctx->execute(block, &filter_column_id));
+    RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
     ColumnPtr& sv = block->get_by_position(filter_column_id).column;
-    if (_resize_first_column) {
+    if (_lazy_read_ctx.resize_first_column) {
         // We have to clean the first column to insert right data.
         block->get_by_position(0).column->assume_mutable()->clear();
     }
@@ -167,10 +169,16 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     if (select_vector.filter_all() && !pre_eof) {
         // If continuous batches are skipped, we can cache them to skip a whole page
         _cached_filtered_rows += pre_read_rows;
-        for (auto& col : _predicate_columns) {
+        for (auto& col : _lazy_read_ctx.predicate_columns) {
             // clean block to read predicate columns
             block->get_by_name(col).column->assume_mutable()->clear();
         }
+        for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
+            block->get_by_name(col.first).column->assume_mutable()->clear();
+        }
+        for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
+            block->get_by_name(col.first).column->assume_mutable()->clear();
+        }
         Block::erase_useless_column(block, origin_column_num);
         return _do_lazy_read(block, batch_size, read_rows, batch_eof);
     }
@@ -184,8 +192,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     // lazy read columns
     size_t lazy_read_rows;
     bool lazy_eof;
-    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_columns, pre_read_rows, &lazy_read_rows,
-                                      &lazy_eof, select_vector));
+    RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
+                                      &lazy_read_rows, &lazy_eof, select_vector));
     if (pre_read_rows != lazy_read_rows) {
         return Status::Corruption("Can't read the same number of rows when doing lazy read");
     }
@@ -194,24 +202,29 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
 
     // filter data in predicate columns, and remove filter column
     if (select_vector.has_filter()) {
-        Block::filter_block(block, _predicate_col_ids, filter_column_id, origin_column_num);
+        Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
+                            origin_column_num);
     } else {
         Block::erase_useless_column(block, origin_column_num);
     }
+
     size_t column_num = block->columns();
-    size_t column_size = -1;
+    size_t column_size = 0;
     for (int i = 0; i < column_num; ++i) {
         size_t cz = block->get_by_position(i).column->size();
-        if (column_size != -1) {
+        if (column_size != 0 && cz != 0) {
             DCHECK_EQ(column_size, cz);
         }
-        column_size = cz;
+        if (cz != 0) {
+            column_size = cz;
+        }
     }
     _lazy_read_filtered_rows += pre_read_rows - column_size;
     *read_rows = column_size;
 
     *batch_eof = pre_eof;
-    return Status::OK();
+    RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
+    return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns);
 }
 
 const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
@@ -271,6 +284,60 @@ void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
     select_vector.build(map, total_rows, false);
 }
 
+Status RowGroupReader::_fill_partition_columns(
+        Block* block, size_t rows,
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns) {
+    for (auto& kv : partition_columns) {
+        auto doris_column = block->get_by_name(kv.first).column;
+        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
+        auto& [value, slot_desc] = kv.second;
+        if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
+                                               value.size(), true, false, rows)) {
+            return Status::InternalError("Failed to fill partition column: {}={}",
+                                         slot_desc->col_name(), value);
+        }
+    }
+    return Status::OK();
+}
+
+Status RowGroupReader::_fill_missing_columns(
+        Block* block, size_t rows,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+    for (auto& kv : missing_columns) {
+        if (kv.second == nullptr) {
+            // no default column, fill with null
+            auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
+                    (*std::move(block->get_by_name(kv.first).column)).mutate().get());
+            nullable_column->insert_many_defaults(rows);
+        } else {
+            // fill with default value
+            auto* ctx = kv.second;
+            auto origin_column_num = block->columns();
+            int result_column_id = -1;
+            // PT1 => dest primitive type
+            RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
+            bool is_origin_column = result_column_id < origin_column_num;
+            if (!is_origin_column) {
+                // call resize because the first column of _src_block_ptr may not be filled by reader,
+                // so _src_block_ptr->rows() may return a wrong result, and the column created by
+                // `ctx->execute()` would have only one row.
+                std::move(*block->get_by_position(result_column_id).column).mutate()->resize(rows);
+                auto result_column_ptr = block->get_by_position(result_column_id).column;
+                // result_column_ptr may be a ColumnConst; convert it to a normal column
+                result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
+                auto origin_column_type = block->get_by_name(kv.first).type;
+                bool is_nullable = origin_column_type->is_nullable();
+                block->replace_by_position(
+                        block->get_position_by_name(kv.first),
+                        is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
+                block->erase(result_column_id);
+            }
+        }
+    }
+    return Status::OK();
+}
+
 Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
     if (batch_size < _remaining_rows) {
         *read_rows = batch_size;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index d1e0315c68..661d248f12 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -17,6 +17,7 @@
 #pragma once
 #include <common/status.h>
 
+#include "exec/text_converter.h"
 #include "io/file_reader.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr_context.h"
@@ -29,11 +30,22 @@ public:
     struct LazyReadContext {
         VExprContext* vconjunct_ctx = nullptr;
         bool can_lazy_read = false;
+        // block->rows() returns the number of rows of the first column,
+        // so we should check and resize the first column
         bool resize_first_column = true;
         std::vector<std::string> all_read_columns;
+        // include predicate_partition_columns & predicate_missing_columns
+        std::vector<uint32_t> all_predicate_col_ids;
         std::vector<std::string> predicate_columns;
-        std::vector<uint32_t> predicate_col_ids;
         std::vector<std::string> lazy_read_columns;
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                predicate_partition_columns;
+        // lazy read partition columns or all partition columns
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                partition_columns;
+        std::unordered_map<std::string, VExprContext*> predicate_missing_columns;
+        // lazy read missing columns or all missing columns
+        std::unordered_map<std::string, VExprContext*> missing_columns;
     };
 
     RowGroupReader(doris::FileReader* file_reader,
@@ -45,7 +57,7 @@ public:
     Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
                 std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
     Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
-    int64_t lazy_read_filtered_rows() { return _lazy_read_filtered_rows; }
+    int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
 
     ParquetColumnReader::Statistics statistics();
 
@@ -58,6 +70,13 @@ private:
     const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all);
     void _rebuild_select_vector(ColumnSelectVector& select_vector,
                                 std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows);
+    Status _fill_partition_columns(
+            Block* block, size_t rows,
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns);
+    Status _fill_missing_columns(
+            Block* block, size_t rows,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns);
 
     doris::FileReader* _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
@@ -65,20 +84,12 @@ private:
     const int32_t _row_group_id;
     const tparquet::RowGroup& _row_group_meta;
     int64_t _remaining_rows;
-    int64_t _read_rows = 0;
     cctz::time_zone* _ctz;
 
-    VExprContext* _vconjunct_ctx;
-    bool _can_lazy_read;
-    // block->rows() returns the number of rows of the first column,
-    // so we should check and resize the first column
-    const bool _resize_first_column;
-    const std::vector<std::string>& _all_read_columns;
-    const std::vector<std::string>& _predicate_columns;
-    const std::vector<uint32_t>& _predicate_col_ids;
-    const std::vector<std::string>& _lazy_read_columns;
+    const LazyReadContext& _lazy_read_ctx;
     int64_t _lazy_read_filtered_rows = 0;
     // If continuous batches are skipped, we can cache them to skip a whole page
     size_t _cached_filtered_rows = 0;
+    std::unique_ptr<TextConverter> _text_converter = nullptr;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 2ab2a1dfdc..63efce2882 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -160,13 +160,15 @@ Status ParquetReader::init_reader(
     RETURN_IF_ERROR(_init_read_columns());
     // build column predicates for column lazy read
     _lazy_read_ctx.vconjunct_ctx = vconjunct_ctx;
-    _init_lazy_read();
-    RETURN_IF_ERROR(_init_row_group_readers());
 
-    return Status::OK();
+    return _init_row_group_readers();
 }
 
-void ParquetReader::_init_lazy_read() {
+Status ParquetReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
     std::unordered_map<std::string, uint32_t> predicate_columns;
     std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
         if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) {
@@ -200,27 +202,62 @@ void ParquetReader::_init_lazy_read() {
     if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
         visit_slot(_lazy_read_ctx.vconjunct_ctx->root());
     }
+
+    bool has_complex_type = false;
+    const FieldDescriptor& schema = _file_metadata->schema();
     for (auto& read_col : _read_columns) {
         _lazy_read_ctx.all_read_columns.emplace_back(read_col._file_slot_name);
+        PrimitiveType column_type = schema.get_column(read_col._file_slot_name)->type.type;
+        if (column_type == TYPE_ARRAY || column_type == TYPE_MAP || column_type == TYPE_STRUCT) {
+            has_complex_type = true;
+        }
         if (predicate_columns.size() > 0) {
             auto iter = predicate_columns.find(read_col._file_slot_name);
             if (iter == predicate_columns.end()) {
                 _lazy_read_ctx.lazy_read_columns.emplace_back(read_col._file_slot_name);
             } else {
                 _lazy_read_ctx.predicate_columns.emplace_back(iter->first);
-                _lazy_read_ctx.predicate_col_ids.emplace_back(iter->second);
+                _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
             }
         }
     }
-    if (_lazy_read_ctx.predicate_columns.size() > 0 &&
+
+    for (auto& kv : partition_columns) {
+        auto iter = predicate_columns.find(kv.first);
+        if (iter == predicate_columns.end()) {
+            _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
+        } else {
+            _lazy_read_ctx.predicate_partition_columns.emplace(kv.first, kv.second);
+            _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
+        }
+    }
+
+    for (auto& kv : missing_columns) {
+        auto iter = predicate_columns.find(kv.first);
+        if (iter == predicate_columns.end()) {
+            _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
+        } else {
+            _lazy_read_ctx.predicate_missing_columns.emplace(kv.first, kv.second);
+            _lazy_read_ctx.all_predicate_col_ids.emplace_back(iter->second);
+        }
+    }
+
+    if (!has_complex_type && _lazy_read_ctx.predicate_columns.size() > 0 &&
         _lazy_read_ctx.lazy_read_columns.size() > 0) {
-        if (predicate_columns.size() == _lazy_read_ctx.predicate_columns.size()) {
-            // TODO: support partition columns
-            // _vconjunct_ctx has partition columns, and will push down to row group reader.
-            // However, row group reader can't get partition column values now.
-            _lazy_read_ctx.can_lazy_read = true;
+        _lazy_read_ctx.can_lazy_read = true;
+    }
+
+    if (!_lazy_read_ctx.can_lazy_read) {
+        for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
+            _lazy_read_ctx.partition_columns.emplace(kv.first, kv.second);
+        }
+        for (auto& kv : _lazy_read_ctx.predicate_missing_columns) {
+            _lazy_read_ctx.missing_columns.emplace(kv.first, kv.second);
         }
     }
+
+    _fill_all_columns = true;
+    return Status::OK();
 }
 
 Status ParquetReader::_init_read_columns() {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 9f602e6901..b726a401ad 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -82,6 +82,11 @@ public:
 
     Statistics& statistics() { return _statistics; }
 
+    Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
+
 private:
     struct ParquetProfile {
         RuntimeProfile::Counter* filtered_row_groups;
@@ -108,7 +113,6 @@ private:
 
     void _init_profile();
     bool _next_row_group_reader();
-    void _init_lazy_read();
     Status _init_read_columns();
     Status _init_row_group_readers();
     // Page Index Filter
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index a2400b1dad..8e6aabbd47 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -151,10 +151,13 @@ Status VFileScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eo
         if (read_rows > 0) {
             // Convert the src block columns type to string in-place.
             RETURN_IF_ERROR(_cast_to_input_block(block));
-            // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
-            RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
-            // Fill columns not exist in file with null or default value
-            RETURN_IF_ERROR(_fill_missing_columns(read_rows));
+            // FileReader can fill partition and missing columns itself
+            if (!_cur_reader->fill_all_columns()) {
+                // Fill rows in src block with partition columns from path. (e.g. Hive partition columns)
+                RETURN_IF_ERROR(_fill_columns_from_path(read_rows));
+                // Fill columns that do not exist in the file with null or default values
+                RETURN_IF_ERROR(_fill_missing_columns(read_rows));
+            }
             // Apply _pre_conjunct_ctx_ptr to filter src block.
             RETURN_IF_ERROR(_pre_filter_src_block());
             // Convert src block to output block (dest block), string to dest data type and apply filters.
@@ -261,10 +264,11 @@ Status VFileScanner::_fill_columns_from_path(size_t rows) {
             auto doris_column = _src_block_ptr->get_by_name(slot_desc->col_name()).column;
             IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
 
-            for (size_t j = 0; j < rows; ++j) {
-                _text_converter->write_vec_column(slot_desc, col_ptr,
-                                                  const_cast<char*>(column_from_path.c_str()),
-                                                  column_from_path.size(), true, false);
+            if (!_text_converter->write_vec_column(slot_desc, col_ptr,
+                                                   const_cast<char*>(column_from_path.c_str()),
+                                                   column_from_path.size(), true, false, rows)) {
+                return Status::InternalError("Failed to fill partition column: {}={}",
+                                             slot_desc->col_name(), column_from_path);
             }
         }
     }
@@ -473,8 +477,7 @@ Status VFileScanner::_get_next_reader() {
             _cur_reader.reset(new ParquetReader(
                     _profile, _params, range, _file_col_names, _state->query_options().batch_size,
                     const_cast<cctz::time_zone*>(&_state->timezone_obj())));
-            if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr &&
-                _partition_slot_descs.empty()) { // TODO: support partition columns
+            if (_push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
                 _discard_conjuncts();
             }
@@ -521,6 +524,7 @@ Status VFileScanner::_get_next_reader() {
         _name_to_col_type.clear();
         _missing_cols.clear();
         _cur_reader->get_columns(&_name_to_col_type, &_missing_cols);
+        RETURN_IF_ERROR(_generate_fill_columns());
         if (VLOG_NOTICE_IS_ON && !_missing_cols.empty() && _is_load) {
             fmt::memory_buffer col_buf;
             for (auto& col : _missing_cols) {
@@ -535,6 +539,48 @@ Status VFileScanner::_get_next_reader() {
     return Status::OK();
 }
 
+Status VFileScanner::_generate_fill_columns() {
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            partition_columns;
+    std::unordered_map<std::string, VExprContext*> missing_columns;
+
+    const TFileRangeDesc& range = _ranges.at(_next_range - 1);
+    if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
+        for (const auto& slot_desc : _partition_slot_descs) {
+            if (slot_desc) {
+                auto it = _partition_slot_index_map.find(slot_desc->id());
+                if (it == std::end(_partition_slot_index_map)) {
+                    return Status::InternalError("Unknown source slot descriptor, slot_id={}",
+                                                 slot_desc->id());
+                }
+                const std::string& column_from_path = range.columns_from_path[it->second];
+                partition_columns.emplace(slot_desc->col_name(),
+                                          std::make_tuple(column_from_path, slot_desc));
+            }
+        }
+    }
+
+    if (!_missing_cols.empty()) {
+        for (auto slot_desc : _real_tuple_desc->slots()) {
+            if (!slot_desc->is_materialized()) {
+                continue;
+            }
+            if (_missing_cols.find(slot_desc->col_name()) == _missing_cols.end()) {
+                continue;
+            }
+
+            auto it = _col_default_value_ctx.find(slot_desc->col_name());
+            if (it == _col_default_value_ctx.end()) {
+                return Status::InternalError("failed to find default value expr for slot: {}",
+                                             slot_desc->col_name());
+            }
+            missing_columns.emplace(slot_desc->col_name(), it->second);
+        }
+    }
+
+    return _cur_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
 Status VFileScanner::_init_expr_ctxes() {
     DCHECK(!_ranges.empty());
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 3edd75a5ac..cfe26d9753 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -135,6 +135,7 @@ private:
     Status _fill_missing_columns(size_t rows);
     Status _pre_filter_src_block();
     Status _convert_to_output_block(Block* block);
+    Status _generate_fill_columns();
 
     void _reset_counter() {
         _counter.num_rows_unselected = 0;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 4b99ccae0b..743b2291cf 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -113,6 +113,10 @@ TEST_F(ParquetReaderTest, normal) {
 
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
     p_reader->init_reader(&colname_to_value_range, nullptr);
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            partition_columns;
+    std::unordered_map<std::string, VExprContext*> missing_columns;
+    p_reader->set_fill_columns(partition_columns, missing_columns);
     Block* block = new Block();
     for (const auto& slot_desc : tuple_desc->slots()) {
         auto data_type =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

