This is an automated email from the ASF dual-hosted git repository.
suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 8882d08a6a0 [feature](be) Support nested projection in new parquet
reader
8882d08a6a0 is described below
commit 8882d08a6a0fd2dad2b8aa9e6e543c521103db50
Author: Socrates <[email protected]>
AuthorDate: Wed May 27 16:47:19 2026 +0800
[feature](be) Support nested projection in new parquet reader
Issue Number: None
Related PR: None
Problem Summary: Add file-local nested schema metadata and projection
plumbing to the new Arrow Parquet reader. Struct child projection is now
pushed down into the Parquet column reader factory, the table scan schema is
rebuilt from the projected complex types, and the column mapper preserves
path metadata for future complex schema-change handling, while unsupported
child schema evolution is explicitly rejected for now.
Release note: None
- Test: Unit Test
- Added BE unit coverage for struct projection, nested schema path
metadata, and table mapper complex projection generation.
- Ran clang-format 16 dry-run on modified C++ files.
- Ran git diff --check.
- Attempted ./run-be-ut.sh --run
'--filter=ParquetColumnReaderTest.*:TableColumnMapperTest.*:NewParquetReaderTest.*:FileReaderTest.*',
but the local CMake compiler sanity check failed before any Doris code was
compiled because ld could not find the library 'c++'.
- Behavior changed: No
- Does this need documentation: Yes (added in
docs/doris-arrow-parquet-complex-types-implementation.md)
---
be/src/format/new_parquet/column_reader.cpp | 48 +-
be/src/format/new_parquet/column_reader.h | 15 +-
.../format/new_parquet/parquet_column_schema.cpp | 103 +++-
be/src/format/new_parquet/parquet_column_schema.h | 10 +
be/src/format/new_parquet/parquet_reader.cpp | 142 +++++-
be/src/format/new_parquet/parquet_reader.h | 6 +
be/src/format/reader/column_mapper.cpp | 206 +++++++-
be/src/format/reader/column_mapper.h | 21 +-
be/src/format/reader/file_reader.h | 14 +
be/src/format/reader/table_reader.h | 97 +++-
.../new_parquet/parquet_column_reader_test.cpp | 297 +++++++----
be/test/format/new_parquet/parquet_reader_test.cpp | 71 ++-
...s-arrow-parquet-complex-types-implementation.md | 559 +++++++++++++++++++++
13 files changed, 1436 insertions(+), 153 deletions(-)
diff --git a/be/src/format/new_parquet/column_reader.cpp
b/be/src/format/new_parquet/column_reader.cpp
index bb101b5c5fc..f1674b767b0 100644
--- a/be/src/format/new_parquet/column_reader.cpp
+++ b/be/src/format/new_parquet/column_reader.cpp
@@ -33,8 +33,10 @@
#include "core/column/column.h"
#include "core/column/column_struct.h"
#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_struct.h"
#include "core/data_type_serde/decoded_column_view.h"
#include "format/new_parquet/parquet_column_schema.h"
+#include "format/reader/file_reader.h"
namespace doris::parquet {
namespace {
@@ -77,10 +79,10 @@ private:
class StructColumnReader final : public ParquetColumnReader {
public:
- StructColumnReader(const ParquetColumnSchema& schema,
+ StructColumnReader(const ParquetColumnSchema& schema, DataTypePtr type,
std::vector<std::unique_ptr<ParquetColumnReader>>
children)
- : _field_id(schema.field_id),
- _type(schema.type),
+ : _field_id(schema.top_level_field_id),
+ _type(std::move(type)),
_name(schema.name),
_children(std::move(children)) {}
@@ -364,6 +366,7 @@ Status StructColumnReader::read(int64_t rows,
MutableColumnPtr& column, int64_t*
size_t child_idx = 0;
DCHECK_EQ(assert_cast<ColumnStruct&>(*column).get_columns().size(),
_children.size());
for (auto& child_reader : _children) {
+ DORIS_CHECK(child_reader != nullptr);
int64_t child_rows = 0;
auto child_column =
assert_cast<ColumnStruct&>(*column).get_column_ptr(child_idx)->assume_mutable();
@@ -517,7 +520,7 @@ Status ParquetColumnReaderFactory::get_record_reader(
}
Status ParquetColumnReaderFactory::create_struct_column_reader(
- const ParquetColumnSchema& column_schema,
+ const ParquetColumnSchema& column_schema, const
reader::FieldProjection* projection,
std::unique_ptr<ParquetColumnReader>* reader) const {
if (reader == nullptr) {
return Status::InvalidArgument("reader is null");
@@ -529,16 +532,45 @@ Status
ParquetColumnReaderFactory::create_struct_column_reader(
}
std::vector<std::unique_ptr<ParquetColumnReader>> child_readers;
child_readers.reserve(column_schema.children.size());
- for (const auto& child_schema : column_schema.children) {
+ DataTypes projected_child_types;
+ Strings projected_child_names;
+ for (size_t child_idx = 0; child_idx < column_schema.children.size();
++child_idx) {
+ const auto& child_schema = column_schema.children[child_idx];
+ const reader::FieldProjection* child_projection = nullptr;
+ if (projection != nullptr && !projection->project_all_children) {
+ auto it = std::find_if(projection->children.begin(),
projection->children.end(),
+ [&](const reader::FieldProjection& child) {
+ return child.file_path ==
child_schema->file_path;
+ });
+ if (it == projection->children.end()) {
+ continue;
+ }
+ child_projection = &*it;
+ }
std::unique_ptr<ParquetColumnReader> child_reader;
- RETURN_IF_ERROR(create(*child_schema, &child_reader));
+ RETURN_IF_ERROR(create(*child_schema, child_projection,
&child_reader));
+ projected_child_types.push_back(child_reader->type());
+ projected_child_names.push_back(child_reader->name());
child_readers.push_back(std::move(child_reader));
}
- *reader = std::make_unique<StructColumnReader>(column_schema,
std::move(child_readers));
+ if (child_readers.empty() && !column_schema.children.empty()) {
+ return Status::NotSupported("Parquet STRUCT projection for column {}
contains no children",
+ column_schema.name);
+ }
+ DataTypePtr type = column_schema.type;
+ if (projection != nullptr && !projection->project_all_children) {
+ type = std::make_shared<DataTypeStruct>(projected_child_types,
projected_child_names);
+ if (column_schema.type != nullptr &&
column_schema.type->is_nullable()) {
+ type = make_nullable(type);
+ }
+ }
+ *reader = std::make_unique<StructColumnReader>(column_schema,
std::move(type),
+ std::move(child_readers));
return Status::OK();
}
Status ParquetColumnReaderFactory::create(const ParquetColumnSchema&
column_schema,
+ const reader::FieldProjection*
projection,
std::unique_ptr<ParquetColumnReader>* reader) const {
if (reader == nullptr) {
return Status::InvalidArgument("reader is null");
@@ -547,7 +579,7 @@ Status ParquetColumnReaderFactory::create(const
ParquetColumnSchema& column_sche
case ParquetColumnSchemaKind::PRIMITIVE:
return create_scalar_column_reader(column_schema, reader);
case ParquetColumnSchemaKind::STRUCT:
- return create_struct_column_reader(column_schema, reader);
+ return create_struct_column_reader(column_schema, projection, reader);
case ParquetColumnSchemaKind::LIST:
return Status::NotSupported("Parquet LIST reader is not implemented
for column {}",
column_schema.name);
diff --git a/be/src/format/new_parquet/column_reader.h
b/be/src/format/new_parquet/column_reader.h
index cd59fc3960a..93881ac8c48 100644
--- a/be/src/format/new_parquet/column_reader.h
+++ b/be/src/format/new_parquet/column_reader.h
@@ -39,6 +39,10 @@ class RecordReader;
namespace doris {
class IColumn;
+namespace reader {
+struct FieldProjection;
+} // namespace reader
+
namespace parquet {
struct ParquetColumnSchema;
@@ -88,14 +92,21 @@ public:
// 根据 file-local schema tree 创建 column reader。复杂类型会在这里递归创建
// children。该入口只理解 Parquet file schema,不处理 table/global schema。
Status create(const ParquetColumnSchema& column_schema,
+ const reader::FieldProjection* projection,
std::unique_ptr<ParquetColumnReader>* reader) const;
+ Status create(const ParquetColumnSchema& column_schema,
+ std::unique_ptr<ParquetColumnReader>* reader) const {
+ return create(column_schema, nullptr, reader);
+ }
+
private:
Status create_scalar_column_reader(const ParquetColumnSchema&
column_schema,
- std::unique_ptr<ParquetColumnReader>* reader) const;
+ std::unique_ptr<ParquetColumnReader>*
reader) const;
Status create_struct_column_reader(const ParquetColumnSchema&
column_schema,
- std::unique_ptr<ParquetColumnReader>* reader) const;
+ const reader::FieldProjection*
projection,
+ std::unique_ptr<ParquetColumnReader>*
reader) const;
Status get_record_reader(int leaf_column_id, const
::parquet::ColumnDescriptor* descriptor,
const std::string& name,
diff --git a/be/src/format/new_parquet/parquet_column_schema.cpp
b/be/src/format/new_parquet/parquet_column_schema.cpp
index 131bf9f22c0..3235ea38a06 100644
--- a/be/src/format/new_parquet/parquet_column_schema.cpp
+++ b/be/src/format/new_parquet/parquet_column_schema.cpp
@@ -19,6 +19,7 @@
#include <parquet/api/schema.h>
+#include <algorithm>
#include <memory>
#include <string>
#include <vector>
@@ -32,6 +33,19 @@
namespace doris::parquet {
namespace {
+struct SchemaBuildContext {
+ int32_t top_level_field_id = -1;
+ int32_t parent_schema_node_id = -1;
+ int16_t definition_level = 0;
+ int16_t repetition_level = 0;
+ int16_t nullable_definition_level = 0;
+ int16_t repeated_repetition_level = 0;
+ std::vector<int32_t> file_path;
+ std::vector<int32_t> field_id_path;
+ std::vector<std::string> name_path;
+ int* next_schema_node_id = nullptr;
+};
+
bool is_list_node(const ::parquet::schema::Node& node) {
const auto& logical_type = node.logical_type();
return node.converted_type() == ::parquet::ConvertedType::LIST ||
@@ -49,16 +63,63 @@ DataTypePtr nullable_if_needed(DataTypePtr type, const
::parquet::schema::Node&
return node.is_optional() ? make_nullable(type) : type;
}
+void inherit_common_schema_state(const ::parquet::schema::Node& node,
+ const SchemaBuildContext& context,
+ ParquetColumnSchema* column_schema) {
+ DORIS_CHECK(column_schema != nullptr);
+ DORIS_CHECK(context.next_schema_node_id != nullptr);
+ column_schema->field_id = node.field_id();
+ column_schema->top_level_field_id = context.top_level_field_id;
+ column_schema->schema_node_id = (*context.next_schema_node_id)++;
+ column_schema->parent_schema_node_id = context.parent_schema_node_id;
+ column_schema->file_path = context.file_path;
+ column_schema->field_id_path = context.field_id_path;
+ column_schema->name_path = context.name_path;
+ column_schema->name = node.name();
+ column_schema->node = &node;
+ column_schema->max_definition_level = context.definition_level;
+ column_schema->max_repetition_level = context.repetition_level;
+ column_schema->nullable_definition_level =
context.nullable_definition_level;
+ column_schema->repeated_repetition_level =
context.repeated_repetition_level;
+}
+
+SchemaBuildContext child_context(const SchemaBuildContext& parent,
+ const ::parquet::schema::Node& child_node,
int32_t child_idx,
+ int32_t parent_schema_node_id) {
+ SchemaBuildContext result = parent;
+ result.parent_schema_node_id = parent_schema_node_id;
+ result.file_path.push_back(child_idx);
+ result.field_id_path.push_back(child_node.field_id());
+ result.name_path.push_back(child_node.name());
+ if (child_node.repetition() != ::parquet::Repetition::REQUIRED) {
+ result.definition_level++;
+ result.nullable_definition_level = result.definition_level;
+ }
+ if (child_node.is_repeated()) {
+ result.repetition_level++;
+ result.repeated_repetition_level = result.repetition_level;
+ }
+ return result;
+}
+
+void propagate_child_levels(ParquetColumnSchema* column_schema) {
+ DORIS_CHECK(column_schema != nullptr);
+ for (const auto& child : column_schema->children) {
+ column_schema->max_definition_level =
+ std::max(column_schema->max_definition_level,
child->max_definition_level);
+ column_schema->max_repetition_level =
+ std::max(column_schema->max_repetition_level,
child->max_repetition_level);
+ }
+}
+
Status build_node_schema(const ::parquet::SchemaDescriptor& schema,
- const ::parquet::schema::Node& node,
+ const ::parquet::schema::Node& node, const
SchemaBuildContext& context,
std::unique_ptr<ParquetColumnSchema>* result) {
if (result == nullptr) {
return Status::InvalidArgument("result is null");
}
auto column_schema = std::make_unique<ParquetColumnSchema>();
- column_schema->field_id = node.field_id();
- column_schema->name = node.name();
- column_schema->node = &node;
+ inherit_common_schema_state(node, context, column_schema.get());
if (node.is_primitive()) {
const int leaf_column_id = schema.ColumnIndex(node);
@@ -69,6 +130,10 @@ Status build_node_schema(const ::parquet::SchemaDescriptor&
schema,
column_schema->kind = ParquetColumnSchemaKind::PRIMITIVE;
column_schema->leaf_column_id = leaf_column_id;
column_schema->descriptor = schema.Column(leaf_column_id);
+ if (column_schema->descriptor != nullptr) {
+ column_schema->max_definition_level =
column_schema->descriptor->max_definition_level();
+ column_schema->max_repetition_level =
column_schema->descriptor->max_repetition_level();
+ }
column_schema->type_descriptor =
resolve_parquet_type(column_schema->descriptor);
column_schema->type = column_schema->type_descriptor.doris_type;
if (column_schema->type == nullptr) {
@@ -87,10 +152,13 @@ Status build_node_schema(const
::parquet::SchemaDescriptor& schema,
node.name());
}
std::unique_ptr<ParquetColumnSchema> child;
- RETURN_IF_ERROR(build_node_schema(schema, *group.field(0), &child));
+ RETURN_IF_ERROR(build_node_schema(
+ schema, *group.field(0),
+ child_context(context, *group.field(0), 0,
column_schema->schema_node_id), &child));
column_schema->type =
nullable_if_needed(std::make_shared<DataTypeArray>(child->type), node);
column_schema->children.push_back(std::move(child));
+ propagate_child_levels(column_schema.get());
*result = std::move(column_schema);
return Status::OK();
}
@@ -102,16 +170,25 @@ Status build_node_schema(const
::parquet::SchemaDescriptor& schema,
node.name());
}
std::unique_ptr<ParquetColumnSchema> key_value;
- RETURN_IF_ERROR(build_node_schema(schema, *group.field(0),
&key_value));
+ RETURN_IF_ERROR(build_node_schema(
+ schema, *group.field(0),
+ child_context(context, *group.field(0), 0,
column_schema->schema_node_id),
+ &key_value));
if (key_value->children.size() != 2) {
return Status::NotSupported("Unsupported parquet MAP key_value
layout for column {}",
node.name());
}
+ if (key_value->children[0]->node == nullptr ||
+ key_value->children[0]->node->repetition() !=
::parquet::Repetition::REQUIRED) {
+ return Status::NotSupported("Unsupported nullable parquet MAP key
for column {}",
+ node.name());
+ }
auto key_type = key_value->children[0]->type;
auto value_type = key_value->children[1]->type;
column_schema->type =
nullable_if_needed(std::make_shared<DataTypeMap>(key_type,
value_type), node);
column_schema->children.push_back(std::move(key_value));
+ propagate_child_levels(column_schema.get());
*result = std::move(column_schema);
return Status::OK();
}
@@ -123,13 +200,17 @@ Status build_node_schema(const
::parquet::SchemaDescriptor& schema,
child_names.reserve(group.field_count());
for (int child_idx = 0; child_idx < group.field_count(); ++child_idx) {
std::unique_ptr<ParquetColumnSchema> child;
- RETURN_IF_ERROR(build_node_schema(schema, *group.field(child_idx),
&child));
+ RETURN_IF_ERROR(build_node_schema(schema, *group.field(child_idx),
+ child_context(context,
*group.field(child_idx), child_idx,
+
column_schema->schema_node_id),
+ &child));
child_types.push_back(child->type);
child_names.push_back(child->name);
column_schema->children.push_back(std::move(child));
}
column_schema->type =
nullable_if_needed(std::make_shared<DataTypeStruct>(child_types,
child_names), node);
+ propagate_child_levels(column_schema.get());
*result = std::move(column_schema);
return Status::OK();
}
@@ -146,10 +227,16 @@ Status build_parquet_column_schema(const
::parquet::SchemaDescriptor& schema,
if (root == nullptr) {
return Status::InvalidArgument("Parquet schema root is null");
}
+ int next_schema_node_id = 0;
fields->reserve(root->field_count());
for (int field_idx = 0; field_idx < root->field_count(); ++field_idx) {
std::unique_ptr<ParquetColumnSchema> field;
- RETURN_IF_ERROR(build_node_schema(schema, *root->field(field_idx),
&field));
+ SchemaBuildContext context;
+ context.top_level_field_id = field_idx;
+ context.next_schema_node_id = &next_schema_node_id;
+ RETURN_IF_ERROR(build_node_schema(
+ schema, *root->field(field_idx),
+ child_context(context, *root->field(field_idx), field_idx,
-1), &field));
fields->push_back(std::move(field));
}
return Status::OK();
diff --git a/be/src/format/new_parquet/parquet_column_schema.h
b/be/src/format/new_parquet/parquet_column_schema.h
index 0d089a0f9cd..81f9536243e 100644
--- a/be/src/format/new_parquet/parquet_column_schema.h
+++ b/be/src/format/new_parquet/parquet_column_schema.h
@@ -47,16 +47,26 @@ enum class ParquetColumnSchemaKind {
// 它描述 Parquet 逻辑字段到 leaf column ordinal 的关系,不包含 table/global schema 语义。
struct ParquetColumnSchema {
int field_id = -1;
+ int top_level_field_id = -1;
// Parquet schema 中的 primitive leaf column ordinal。
// 该 id 用于访问 ColumnDescriptor、RowGroupReader::RecordReader、ColumnChunk
// metadata 和 statistics。复杂类型节点本身没有单一 leaf column,因此为 -1。
int leaf_column_id = -1;
+ int schema_node_id = -1;
+ int parent_schema_node_id = -1;
+ std::vector<int32_t> file_path;
+ std::vector<int32_t> field_id_path;
+ std::vector<std::string> name_path;
std::string name;
DataTypePtr type;
ParquetTypeDescriptor type_descriptor;
ParquetColumnSchemaKind kind = ParquetColumnSchemaKind::PRIMITIVE;
const ::parquet::schema::Node* node = nullptr;
const ::parquet::ColumnDescriptor* descriptor = nullptr;
+ int16_t max_definition_level = 0;
+ int16_t max_repetition_level = 0;
+ int16_t nullable_definition_level = 0;
+ int16_t repeated_repetition_level = 0;
std::vector<std::unique_ptr<ParquetColumnSchema>> children;
};
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index ff9d939b4d0..677a596debf 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -23,14 +23,19 @@
#include <parquet/api/reader.h>
#include <algorithm>
+#include <map>
#include <memory>
#include <string_view>
#include <utility>
#include <vector>
#include "common/exception.h"
+#include "core/assert_cast.h"
#include "core/block/block.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_struct.h"
#include "exprs/vexpr_context.h"
#include "format/new_parquet/column_reader.h"
#include "format/new_parquet/parquet_column_schema.h"
@@ -181,10 +186,12 @@ void ParquetReader::_reset_current_row_group() {
void ParquetReader::_fill_schema_field(const ParquetColumnSchema&
column_schema,
reader::SchemaField* field) const {
- field->id = column_schema.leaf_column_id >= 0 ?
column_schema.leaf_column_id
- : column_schema.field_id;
+ field->id = column_schema.top_level_field_id;
field->name = column_schema.name;
field->type = column_schema.type;
+ field->file_path = column_schema.file_path;
+ field->field_id_path = column_schema.field_id_path;
+ field->name_path = column_schema.name_path;
field->children.clear();
field->children.reserve(column_schema.children.size());
for (const auto& child : column_schema.children) {
@@ -194,6 +201,95 @@ void ParquetReader::_fill_schema_field(const
ParquetColumnSchema& column_schema,
}
}
+Status ParquetReader::_fill_projected_schema_field(const ParquetColumnSchema&
column_schema,
+ const
reader::FieldProjection* projection,
+ reader::SchemaField* field)
const {
+ if (field == nullptr) {
+ return Status::InvalidArgument("projected schema field is null");
+ }
+ _fill_schema_field(column_schema, field);
+ if (projection == nullptr || projection->project_all_children ||
+ column_schema.children.empty()) {
+ return Status::OK();
+ }
+
+ field->children.clear();
+ std::map<int32_t, const reader::FieldProjection*> child_projection_by_idx;
+ for (const auto& child_projection : projection->children) {
+ if (child_projection.file_path.empty()) {
+ return Status::InvalidArgument("Empty parquet projection path for
column {}",
+ column_schema.name);
+ }
+ child_projection_by_idx.emplace(child_projection.file_path.back(),
&child_projection);
+ }
+
+ DataTypes child_types;
+ Strings child_names;
+ for (size_t child_idx = 0; child_idx < column_schema.children.size();
++child_idx) {
+ auto it =
child_projection_by_idx.find(static_cast<int32_t>(child_idx));
+ if (it == child_projection_by_idx.end()) {
+ continue;
+ }
+ if (it->second->file_path !=
column_schema.children[child_idx]->file_path) {
+ return Status::InvalidArgument("Invalid parquet projection path
for column {}",
+
column_schema.children[child_idx]->name);
+ }
+ reader::SchemaField child_field;
+
RETURN_IF_ERROR(_fill_projected_schema_field(*column_schema.children[child_idx],
it->second,
+ &child_field));
+ child_types.push_back(child_field.type);
+ child_names.push_back(child_field.name);
+ field->children.push_back(std::move(child_field));
+ }
+
+ if (field->children.empty()) {
+ return Status::NotSupported("Parquet projection for column {} contains
no children",
+ column_schema.name);
+ }
+
+ const auto primitive_type =
remove_nullable(column_schema.type)->get_primitive_type();
+ DataTypePtr projected_type;
+ switch (primitive_type) {
+ case TYPE_STRUCT:
+ projected_type = std::make_shared<DataTypeStruct>(child_types,
child_names);
+ break;
+ case TYPE_ARRAY:
+ DORIS_CHECK(child_types.size() == 1);
+ projected_type = std::make_shared<DataTypeArray>(child_types[0]);
+ break;
+ case TYPE_MAP:
+ DORIS_CHECK(child_types.size() == 1);
+ DORIS_CHECK(remove_nullable(child_types[0])->get_primitive_type() ==
TYPE_STRUCT);
+ {
+ const auto* entry_type =
+ assert_cast<const
DataTypeStruct*>(remove_nullable(child_types[0]).get());
+ DORIS_CHECK(entry_type->get_elements().size() == 2);
+ projected_type =
std::make_shared<DataTypeMap>(entry_type->get_element(0),
+
entry_type->get_element(1));
+ }
+ break;
+ default:
+ return Status::InvalidArgument("Cannot project children from
non-complex parquet column {}",
+ column_schema.name);
+ }
+ field->type =
+ column_schema.type->is_nullable() ? make_nullable(projected_type)
: projected_type;
+ return Status::OK();
+}
+
+Status ParquetReader::_get_projected_schema_field(reader::ColumnId
file_column_id,
+ const
reader::FieldProjection* projection,
+ reader::SchemaField* field)
const {
+ if (file_column_id < 0 ||
+ file_column_id >=
static_cast<reader::ColumnId>(_state->file_schema.size())) {
+ return Status::InvalidArgument("Invalid parquet field id {}",
file_column_id);
+ }
+ RETURN_IF_ERROR(
+ _fill_projected_schema_field(*_state->file_schema[file_column_id],
projection, field));
+ field->id = file_column_id;
+ return Status::OK();
+}
+
bool ParquetReader::_has_expression_filter(const reader::FileLocalFilter&
local_filter) {
return local_filter.conjunct != nullptr;
}
@@ -228,13 +324,12 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
}
IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
bool can_filter_all = false;
- RETURN_IF_ERROR(local_filter.conjunct->execute_filter(
- file_block, filter.data(),
static_cast<size_t>(batch_rows), false,
- &can_filter_all));
+ RETURN_IF_ERROR(local_filter.conjunct->execute_filter(file_block,
filter.data(),
+
static_cast<size_t>(batch_rows),
+ false,
&can_filter_all));
*selected_rows =
- can_filter_all
- ? 0
- : _apply_filter_to_selection(filter, selection,
*selected_rows);
+ can_filter_all ? 0
+ : _apply_filter_to_selection(filter,
selection, *selected_rows);
break;
}
if (*selected_rows == 0) {
@@ -298,14 +393,24 @@ Status ParquetReader::_open_next_row_group(bool*
has_row_group) {
_state->schema->num_columns());
for (const auto file_field_id : _request->predicate_columns) {
const auto& column_schema = _state->file_schema[file_field_id];
+ const auto projection_it =
_request->complex_projections.find(file_field_id);
+ const auto* projection = projection_it ==
_request->complex_projections.end()
+ ? nullptr
+ : &projection_it->second;
std::unique_ptr<ParquetColumnReader> column_reader;
- RETURN_IF_ERROR(column_reader_factory.create(*column_schema,
&column_reader));
+ RETURN_IF_ERROR(
+ column_reader_factory.create(*column_schema, projection,
&column_reader));
_state->current_predicate_columns.push_back(std::move(column_reader));
}
for (const auto file_field_id : _request->non_predicate_columns) {
const auto& column_schema = _state->file_schema[file_field_id];
+ const auto projection_it =
_request->complex_projections.find(file_field_id);
+ const auto* projection = projection_it ==
_request->complex_projections.end()
+ ? nullptr
+ : &projection_it->second;
std::unique_ptr<ParquetColumnReader> column_reader;
- RETURN_IF_ERROR(column_reader_factory.create(*column_schema,
&column_reader));
+ RETURN_IF_ERROR(
+ column_reader_factory.create(*column_schema, projection,
&column_reader));
_state->current_non_predicate_columns.push_back(std::move(column_reader));
}
*has_row_group = true;
@@ -456,6 +561,23 @@ Status
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
local_filter.file_column_id);
}
}
+ for (const auto& [file_column_id, projection] :
_request->complex_projections) {
+ if (file_column_id < 0 || file_column_id >= num_fields) {
+ return Status::InvalidArgument("Invalid parquet projection
top-level field id {}",
+ file_column_id);
+ }
+ if (projection.file_column_id != file_column_id) {
+ return Status::InvalidArgument(
+ "Parquet projection column id mismatch: key={}, value={}",
file_column_id,
+ projection.file_column_id);
+ }
+ if (!projection.file_path.empty() && projection.file_path.front() !=
file_column_id) {
+ return Status::InvalidArgument("Invalid parquet projection root
path for column {}",
+ file_column_id);
+ }
+ reader::SchemaField projected_field;
+ RETURN_IF_ERROR(_get_projected_schema_field(file_column_id,
&projection, &projected_field));
+ }
RETURN_IF_ERROR(select_row_groups_by_statistics(*_state->metadata,
_state->file_schema,
*_request,
&_state->selected_row_groups));
RETURN_IF_ERROR(_reset_reader_position());
diff --git a/be/src/format/new_parquet/parquet_reader.h
b/be/src/format/new_parquet/parquet_reader.h
index d7a9dc19a59..f6d47f46134 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -121,6 +121,12 @@ private:
void _reset_current_row_group();
void _fill_schema_field(const ParquetColumnSchema& column_schema,
reader::SchemaField* field) const;
+ Status _fill_projected_schema_field(const ParquetColumnSchema&
column_schema,
+ const reader::FieldProjection*
projection,
+ reader::SchemaField* field) const;
+ Status _get_projected_schema_field(reader::ColumnId file_column_id,
+ const reader::FieldProjection*
projection,
+ reader::SchemaField* field) const;
bool _has_expression_filter(const reader::FileLocalFilter& local_filter);
Status _read_filter_columns(int64_t batch_rows, Block* file_block,
SelectionVector* selection,
uint16_t* selected_rows);
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index 5790517f7bb..4d9afdeff32 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -18,10 +18,16 @@
#include "format/reader/column_mapper.h"
#include <cstddef>
+#include <memory>
+#include <utility>
#include <vector>
#include "common/status.h"
#include "core/assert_cast.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_struct.h"
#include "format/reader/expr/cast.h"
#include "format/reader/expr/slot_ref.h"
#include "format/reader/file_reader.h"
@@ -86,8 +92,8 @@ static void rebuild_projection(ColumnMapping* mapping, size_t
block_position) {
mapping->projection = VExprContext::create_shared(expr);
}
-static std::map<int32_t, size_t> build_file_position_map(
- const std::vector<ColumnMapping>& mappings, const FileScanRequest&
file_request) {
+static std::map<int32_t, size_t> build_file_position_map(const
std::vector<ColumnMapping>& mappings,
+ const
FileScanRequest& file_request) {
std::map<int32_t, size_t> table_column_to_file_position;
for (const auto& mapping : mappings) {
if (!mapping.file_column_id.has_value()) {
@@ -101,6 +107,109 @@ static std::map<int32_t, size_t> build_file_position_map(
return table_column_to_file_position;
}
+static bool is_complex_type(const DataTypePtr& type) {
+ DORIS_CHECK(type != nullptr);
+ const auto primitive_type = remove_nullable(type)->get_primitive_type();
+ return primitive_type == TYPE_STRUCT || primitive_type == TYPE_ARRAY ||
+ primitive_type == TYPE_MAP;
+}
+
+static const SchemaField* find_file_child_by_table_column(
+ const TableColumn& table_column, const std::vector<SchemaField>&
file_children,
+ TableColumnMappingMode mode) {
+ for (const auto& field : file_children) {
+ if (mode == TableColumnMappingMode::BY_FIELD_ID &&
!field.field_id_path.empty() &&
+ field.field_id_path.back() != -1 && field.field_id_path.back() ==
table_column.id) {
+ return &field;
+ }
+ if (field.name == table_column.name) {
+ return &field;
+ }
+ }
+ return nullptr;
+}
+
+static bool complex_projection_has_pruned_children(const ColumnMapping&
mapping) {
+ if (!is_complex_type(mapping.file_type)) {
+ return false;
+ }
+ if (mapping.child_mappings.empty()) {
+ return false;
+ }
+ DORIS_CHECK(mapping.file_type != nullptr);
+ DORIS_CHECK(mapping.table_type != nullptr);
+ if (remove_nullable(mapping.file_type)->get_primitive_type() !=
+ remove_nullable(mapping.table_type)->get_primitive_type()) {
+ return true;
+ }
+ if (!mapping.table_type->equals(*mapping.file_type)) {
+ return true;
+ }
+ for (const auto& child_mapping : mapping.child_mappings) {
+ if (!child_mapping.file_column_id.has_value() ||
+ complex_projection_has_pruned_children(child_mapping)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+static Status rebuild_projected_file_type(ColumnMapping* mapping) {
+ if (mapping == nullptr) {
+ return Status::InvalidArgument("mapping is null");
+ }
+ DORIS_CHECK(is_complex_type(mapping->file_type));
+ DataTypes child_types;
+ Strings child_names;
+ child_types.reserve(mapping->child_mappings.size());
+ child_names.reserve(mapping->child_mappings.size());
+ for (auto& child_mapping : mapping->child_mappings) {
+ if (!child_mapping.file_column_id.has_value()) {
+ continue;
+ }
+ if (complex_projection_has_pruned_children(child_mapping)) {
+ RETURN_IF_ERROR(rebuild_projected_file_type(&child_mapping));
+ }
+ child_types.push_back(child_mapping.file_type);
+ child_names.push_back(child_mapping.file_column_name);
+ }
+ if (child_types.empty()) {
+ return Status::NotSupported("Projection for complex column {} contains
no file children",
+ mapping->file_column_name);
+ }
+ DataTypePtr projected_type;
+ const auto primitive_type =
remove_nullable(mapping->file_type)->get_primitive_type();
+ switch (primitive_type) {
+ case TYPE_STRUCT:
+ projected_type = std::make_shared<DataTypeStruct>(child_types,
child_names);
+ break;
+ case TYPE_ARRAY:
+ DORIS_CHECK(child_types.size() == 1);
+ projected_type = std::make_shared<DataTypeArray>(child_types[0]);
+ break;
+ case TYPE_MAP:
+ DORIS_CHECK(child_types.size() == 1);
+ DORIS_CHECK(remove_nullable(child_types[0])->get_primitive_type() ==
TYPE_STRUCT);
+ {
+ const auto* entry_type =
+ assert_cast<const
DataTypeStruct*>(remove_nullable(child_types[0]).get());
+ DORIS_CHECK(entry_type->get_elements().size() == 2);
+ projected_type =
std::make_shared<DataTypeMap>(entry_type->get_element(0),
+
entry_type->get_element(1));
+ }
+ break;
+ default:
+ return Status::InvalidArgument("Cannot project children from
non-complex column {}",
+ mapping->file_column_name);
+ }
+ mapping->file_type =
+ mapping->file_type->is_nullable() ? make_nullable(projected_type)
: projected_type;
+ mapping->is_trivial =
+ mapping->table_type != nullptr &&
mapping->table_type->equals(*mapping->file_type);
+ mapping->has_complex_projection = true;
+ return Status::OK();
+}
+
Status TableColumnMapper::create_mapping(const std::vector<TableColumn>&
projected_columns,
const std::map<std::string, Field>&
partition_values,
const std::vector<SchemaField>&
file_schema) {
@@ -110,10 +219,7 @@ Status TableColumnMapper::create_mapping(const
std::vector<TableColumn>& project
mapping.table_column_id = table_column.id;
mapping.table_type = table_column.type;
if (const auto* file_field = _find_file_field(table_column,
file_schema)) {
- mapping.file_column_id = file_field->id;
- mapping.file_column_name = file_field->name;
- mapping.file_type = file_field->type;
- mapping.is_trivial = _is_same_type(mapping.table_type,
mapping.file_type);
+ RETURN_IF_ERROR(_create_direct_mapping(table_column, *file_field,
&mapping));
} else if (table_column.is_partition_key &&
partition_values.count(table_column.name) > 0) {
// 3. Partition column, use partition value as a constant mapping.
Note that partition column may also have default expression, but partition
value should take precedence if it exists.
mapping.default_expr =
VExprContext::create_shared(TableLiteral::create_shared(
@@ -130,12 +236,12 @@ Status TableColumnMapper::create_mapping(const
std::vector<TableColumn>& project
} else {
if (table_column.is_partition_key) {
return Status::InvalidArgument(
- "Table column '%s' (id=%d) does not have a matching
partition value",
- table_column.name);
+ "Table column '{}' (id={}) does not have a matching
partition value",
+ table_column.name, table_column.id);
}
if (!_options.allow_missing_columns) {
return Status::InvalidArgument(
- "Table column '%s' (id=%d) does not have a matching
file column",
+ "Table column '{}' (id={}) does not have a matching
file column",
table_column.name, table_column.id);
}
}
@@ -152,15 +258,26 @@ Status TableColumnMapper::create_scan_request(const
std::map<int32_t, TableFilte
file_request->predicate_columns.clear();
file_request->non_predicate_columns.clear();
file_request->column_positions.clear();
+ file_request->complex_projections.clear();
file_request->local_filters.clear();
file_request->reader_expression_map.clear();
for (const auto& table_column : projected_columns) {
- const auto* mapping = _find_mapping(table_column.id);
+ auto* mapping = _find_mapping(table_column.id);
if (mapping != nullptr && mapping->file_column_id.has_value()) {
if (table_filters.count(table_column.id) == 0) {
add_scan_column(file_request, *mapping->file_column_id,
&file_request->non_predicate_columns);
}
+ if (mapping->has_complex_projection ||
+ complex_projection_has_pruned_children(*mapping)) {
+ if (!mapping->has_complex_projection) {
+ RETURN_IF_ERROR(rebuild_projected_file_type(mapping));
+ }
+ FieldProjection projection;
+ RETURN_IF_ERROR(_build_complex_projection(*mapping,
&projection));
+
file_request->complex_projections.emplace(*mapping->file_column_id,
+
std::move(projection));
+ }
}
}
RETURN_IF_ERROR(localize_filters(table_filters, file_request));
@@ -216,8 +333,15 @@ Status TableColumnMapper::localize_filters(const
std::map<int32_t, TableFilter>&
const SchemaField* TableColumnMapper::_find_file_field(
const TableColumn& table_column, const std::vector<SchemaField>&
file_schema) const {
for (const auto& field : file_schema) {
- if (_options.mode == TableColumnMappingMode::BY_FIELD_ID && field.id
== table_column.id) {
- return &field;
+ if (_options.mode == TableColumnMappingMode::BY_FIELD_ID) {
+ if (!field.field_id_path.empty() && field.field_id_path.back() !=
-1 &&
+ field.field_id_path.back() == table_column.id) {
+ return &field;
+ }
+ if ((field.field_id_path.empty() || field.field_id_path.back() ==
-1) &&
+ field.id == table_column.id) {
+ return &field;
+ }
}
if (field.name == table_column.name) {
return &field;
@@ -226,4 +350,62 @@ const SchemaField* TableColumnMapper::_find_file_field(
return nullptr;
}
+// Populates `mapping` from a file field that matched `table_column` directly.
+// If the table column declares children and the file field is a complex type, every
+// table child is resolved against the file field's children (using the mapper's
+// matching mode) and mapped recursively into mapping->child_mappings, in table order.
+// A table child with no file counterpart yields NotSupported: nested schema evolution
+// is not implemented yet.
+Status TableColumnMapper::_create_direct_mapping(const TableColumn& table_column,
+                                                 const SchemaField& file_field,
+                                                 ColumnMapping* mapping) const {
+    if (mapping == nullptr) {
+        return Status::InvalidArgument("mapping is null");
+    }
+    mapping->file_column_id = file_field.id;
+    mapping->file_column_name = file_field.name;
+    mapping->file_path = file_field.file_path;
+    mapping->file_type = file_field.type;
+    mapping->is_trivial = _is_same_type(mapping->table_type, mapping->file_type);
+    mapping->child_mappings.clear();
+
+    // Child-level mapping only makes sense when the table lists children and the file
+    // side is complex; everything else is a plain top-level mapping.
+    const bool descend = !table_column.children.empty() && is_complex_type(file_field.type);
+    if (!descend) {
+        return Status::OK();
+    }
+
+    mapping->child_mappings.reserve(table_column.children.size());
+    for (const auto& child_column : table_column.children) {
+        const auto* matched_file_child = find_file_child_by_table_column(
+                child_column, file_field.children, _options.mode);
+        if (matched_file_child == nullptr) {
+            return Status::NotSupported(
+                    "Complex schema change is not implemented: table child column '{}' "
+                    "(id={}) does not have a matching file child under column '{}'",
+                    child_column.name, child_column.id, table_column.name);
+        }
+        ColumnMapping nested;
+        nested.table_column_id = child_column.id;
+        nested.table_type = child_column.type;
+        RETURN_IF_ERROR(_create_direct_mapping(child_column, *matched_file_child, &nested));
+        mapping->child_mappings.push_back(std::move(nested));
+    }
+    return Status::OK();
+}
+
+// Translates a (possibly nested) ColumnMapping into the file-local FieldProjection
+// tree handed to the file reader.
+// - A mapping with no child mappings projects its whole subtree.
+// - Otherwise only children that resolved to a file column are listed, in mapping order.
+// - If that leaves nothing, the request is rejected with NotSupported.
+Status TableColumnMapper::_build_complex_projection(const ColumnMapping& mapping,
+                                                    FieldProjection* projection) const {
+    if (projection == nullptr) {
+        return Status::InvalidArgument("projection is null");
+    }
+    DORIS_CHECK(mapping.file_column_id.has_value());
+
+    const bool whole_subtree = mapping.child_mappings.empty();
+    projection->file_column_id = *mapping.file_column_id;
+    projection->file_path = mapping.file_path;
+    projection->project_all_children = whole_subtree;
+    projection->children.clear();
+
+    for (const auto& nested : mapping.child_mappings) {
+        if (nested.file_column_id.has_value()) {
+            FieldProjection nested_projection;
+            RETURN_IF_ERROR(_build_complex_projection(nested, &nested_projection));
+            projection->children.push_back(std::move(nested_projection));
+        }
+    }
+
+    if (!whole_subtree && projection->children.empty()) {
+        return Status::NotSupported("Projection for complex column {} contains no file children",
+                                    mapping.file_column_name);
+    }
+    return Status::OK();
+}
+
} // namespace doris::reader
diff --git a/be/src/format/reader/column_mapper.h
b/be/src/format/reader/column_mapper.h
index 4360b23e7de..0c6ac9c8e6c 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -19,7 +19,9 @@
#include <cstddef>
#include <cstdint>
+#include <map>
#include <memory>
+#include <optional>
#include <string>
#include <utility>
#include <vector>
@@ -28,12 +30,14 @@
#include "core/data_type/data_type.h"
#include "exprs/vexpr_fwd.h"
#include "format/reader/expr/literal.h"
+
namespace doris::reader {
struct TableColumn;
struct TableFilter;
struct SchemaField;
struct FileScanRequest;
+struct FieldProjection;
enum class TableColumnMappingMode {
BY_FIELD_ID,
@@ -52,6 +56,7 @@ struct ColumnMapping {
int32_t table_column_id = -1;
std::optional<int32_t> file_column_id;
std::string file_column_name;
+ std::vector<int32_t> file_path;
DataTypePtr file_type;
DataTypePtr table_type;
@@ -66,6 +71,7 @@ struct ColumnMapping {
std::vector<ColumnMapping> child_mappings;
bool is_trivial = false;
bool is_constant = false;
+ bool has_complex_projection = false;
TableVirtualColumnType virtual_column_type =
TableVirtualColumnType::INVALID;
VExprContextSPtr default_expr;
};
@@ -110,8 +116,21 @@ public:
private:
const SchemaField* _find_file_field(const TableColumn& table_column,
const std::vector<SchemaField>&
file_schema) const;
+ Status _create_direct_mapping(const TableColumn& table_column, const
SchemaField& file_field,
+ ColumnMapping* mapping) const;
+ Status _build_complex_projection(const ColumnMapping& mapping,
+ FieldProjection* projection) const;
+
+    // Mutable lookup of the mapping built for `table_column_id`: returns the first entry
+    // of _mappings with that table column id, or nullptr when the column was not mapped.
+    // create_scan_request() needs the non-const form to rebuild a mapping's projected
+    // file type in place.
+    ColumnMapping* _find_mapping(int32_t table_column_id) {
+        for (auto it = _mappings.begin(); it != _mappings.end(); ++it) {
+            if (it->table_column_id == table_column_id) {
+                return &*it;
+            }
+        }
+        return nullptr;
+    }
- const ColumnMapping* _find_mapping(ColumnId table_column_id) const {
+ const ColumnMapping* _find_mapping(int32_t table_column_id) const {
for (const auto& mapping : _mappings) {
if (mapping.table_column_id == table_column_id) {
return &mapping;
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index cb2096fd80a..918e2b4bd35 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -59,9 +59,22 @@ struct SchemaField {
std::string name;
DataTypePtr type;
std::vector<SchemaField> children;
+ std::vector<int32_t> file_path;
+ std::vector<int32_t> field_id_path;
+ std::vector<std::string> name_path;
ColumnType column_type = ColumnType::DATA_COLUMN;
};
+// File-local nested projection. The top-level scan column is still represented
+// by FileScanRequest::predicate_columns/non_predicate_columns; this tree only
+// describes which child paths are needed inside a complex top-level field.
+// Member order is significant for aggregate initialization; append new members at the end.
+struct FieldProjection {
+    // File-local column id of the node. -1 is the "not set" default.
+    // NOTE(review): if ColumnId is an unsigned type, -1 wraps to its maximum value;
+    // confirm any sentinel comparison uses the same expression.
+    ColumnId file_column_id = -1;
+    // Index path of this node in the file schema. When this node is a child projection,
+    // the table reader uses the last element as the index into the parent's
+    // SchemaField::children and requires the full path to equal that child's file_path.
+    std::vector<int32_t> file_path;
+    // true (default): read the whole subtree and ignore `children`.
+    // false: keep only the nodes listed in `children`.
+    bool project_all_children = true;
+    // Projected child nodes; the projected type is rebuilt from them in this order.
+    std::vector<FieldProjection> children;
+};
+
// 已经 localize 到文件 schema 的过滤条件。
// TableColumnMapper 负责把 table-level filter 转成这个结构;FileReader 只消费
// file-local column id、表达式和结构化谓词。
@@ -96,6 +109,7 @@ struct FileScanRequest {
std::vector<ColumnId> predicate_columns;
std::vector<ColumnId> non_predicate_columns;
std::map<ColumnId, size_t> column_positions;
+ std::map<ColumnId, FieldProjection> complex_projections;
std::vector<FileLocalFilter> local_filters;
// fallback path if filters cannot be localized to file-local predicates.
The expression can reference projected_file_columns and partition columns.
std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 4f28c4e1aaa..c9589af8017 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -26,8 +26,13 @@
#include <vector>
#include "common/status.h"
+#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/data_type/data_type.h"
+#include "core/data_type/data_type_array.h"
+#include "core/data_type/data_type_map.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_struct.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "format/reader/column_mapper.h"
@@ -180,8 +185,8 @@ public:
size_t idx = 0;
for (const auto& mapping : _data_reader.column_mapper.mappings()) {
ColumnPtr column;
- RETURN_IF_ERROR(_materialize_mapping_column(
- mapping, &_data_reader.block_template, current_rows,
&column));
+ RETURN_IF_ERROR(_materialize_mapping_column(mapping,
&_data_reader.block_template,
+ current_rows,
&column));
block->replace_by_position(idx, std::move(column));
idx++;
}
@@ -233,7 +238,13 @@ protected:
DORIS_CHECK(block_position < _data_reader.scan_schema.size());
const auto* field = _find_schema_field(_data_reader.block_schema,
file_column_id);
DORIS_CHECK(field != nullptr);
- _data_reader.scan_schema[block_position] = *field;
+ auto projection_it =
file_request->complex_projections.find(file_column_id);
+ if (projection_it == file_request->complex_projections.end()) {
+ _data_reader.scan_schema[block_position] = *field;
+ } else {
+ RETURN_IF_ERROR(_project_schema_field(*field,
projection_it->second,
+
&_data_reader.scan_schema[block_position]));
+ }
}
_data_reader.block_template.reserve(_data_reader.scan_schema.size());
for (const auto& field : _data_reader.scan_schema) {
@@ -342,6 +353,86 @@ private:
return nullptr;
}
+    // Writes the projection of `field` into `projected_field`.
+    // - It starts from a full copy of `field`.
+    // - If the projection keeps all children (or lists none), that copy is the result.
+    // - Otherwise the children are replaced by the recursively projected children, in
+    //   projection order, and the type is rebuilt to match.
+    // Each child projection is located by the last element of its file_path. Its full
+    // file_path must equal the located child's SchemaField::file_path.
+    static Status _project_schema_field(const SchemaField& field, const FieldProjection& projection,
+                                        SchemaField* projected_field) {
+        if (projected_field == nullptr) {
+            return Status::InvalidArgument("projected_field is null");
+        }
+        *projected_field = field;
+        const bool keep_everything =
+                projection.project_all_children || projection.children.empty();
+        if (keep_everything) {
+            return Status::OK();
+        }
+
+        projected_field->children.clear();
+        const auto child_count = static_cast<int32_t>(field.children.size());
+        for (const auto& sub : projection.children) {
+            if (sub.file_path.empty()) {
+                return Status::InvalidArgument("Empty projection path for field {}", field.name);
+            }
+            const int32_t idx = sub.file_path.back();
+            const bool in_range = idx >= 0 && idx < child_count;
+            if (!in_range) {
+                return Status::InvalidArgument("Invalid projection child index {} for field {}",
+                                               idx, field.name);
+            }
+            const SchemaField& source_child = field.children[idx];
+            if (sub.file_path != source_child.file_path) {
+                return Status::InvalidArgument("Invalid projection path for field {}",
+                                               source_child.name);
+            }
+            SchemaField projected_child;
+            RETURN_IF_ERROR(_project_schema_field(source_child, sub, &projected_child));
+            projected_field->children.push_back(std::move(projected_child));
+        }
+
+        // Defensive guard: each loop iteration above either appends a child or returns.
+        if (projected_field->children.empty()) {
+            return Status::NotSupported("Projection for field {} contains no children",
+                                        field.name);
+        }
+        RETURN_IF_ERROR(_rebuild_projected_type(field.type, projected_field));
+        return Status::OK();
+    }
+
+    // Recomputes projected_field->type from projected_field->children after projection,
+    // keeping the outer nullability of original_type:
+    //   STRUCT -> one element per remaining child (child name + child type);
+    //   ARRAY  -> exactly one child, used as the element type;
+    //   MAP    -> exactly one child, a two-element STRUCT whose elements become the
+    //             key and value types.
+    // Shape violations come from the projection request, not from an internal invariant.
+    // They are therefore returned as InvalidArgument so the scan fails with a Status,
+    // like every other error path here, instead of hitting DORIS_CHECK.
+    //
+    // @param original_type   type of the unprojected field; must be non-null.
+    // @param projected_field field whose children are already projected; its `type` is
+    //                        overwritten on success.
+    static Status _rebuild_projected_type(const DataTypePtr& original_type,
+                                          SchemaField* projected_field) {
+        if (projected_field == nullptr) {
+            return Status::InvalidArgument("projected_field is null");
+        }
+        if (original_type == nullptr) {
+            return Status::InvalidArgument("Cannot rebuild projected type for field {}",
+                                           projected_field->name);
+        }
+        const auto& children = projected_field->children;
+        DataTypes child_types;
+        Strings child_names;
+        child_types.reserve(children.size());
+        child_names.reserve(children.size());
+        for (const auto& child : children) {
+            child_types.push_back(child.type);
+            child_names.push_back(child.name);
+        }
+
+        DataTypePtr projected_type;
+        switch (remove_nullable(original_type)->get_primitive_type()) {
+        case TYPE_STRUCT:
+            projected_type = std::make_shared<DataTypeStruct>(child_types, child_names);
+            break;
+        case TYPE_ARRAY:
+            if (child_types.size() != 1) {
+                return Status::InvalidArgument(
+                        "Projected array field {} must have exactly one child, got {}",
+                        projected_field->name, child_types.size());
+            }
+            projected_type = std::make_shared<DataTypeArray>(child_types[0]);
+            break;
+        case TYPE_MAP: {
+            if (child_types.size() != 1) {
+                return Status::InvalidArgument(
+                        "Projected map field {} must have exactly one entry child, got {}",
+                        projected_field->name, child_types.size());
+            }
+            // Hold the non-nullable entry type in a named local so the raw pointer taken
+            // below never refers to a temporary.
+            const DataTypePtr entry = remove_nullable(child_types[0]);
+            if (entry->get_primitive_type() != TYPE_STRUCT) {
+                return Status::InvalidArgument(
+                        "Projected map field {} must have a struct entry child",
+                        projected_field->name);
+            }
+            const auto* entry_type = assert_cast<const DataTypeStruct*>(entry.get());
+            if (entry_type->get_elements().size() != 2) {
+                return Status::InvalidArgument(
+                        "Projected map field {} entry must have key and value, got {} elements",
+                        projected_field->name, entry_type->get_elements().size());
+            }
+            projected_type = std::make_shared<DataTypeMap>(entry_type->get_element(0),
+                                                           entry_type->get_element(1));
+            break;
+        }
+        default:
+            return Status::InvalidArgument("Cannot project children from non-complex field {}",
+                                           projected_field->name);
+        }
+        projected_field->type =
+                original_type->is_nullable() ? make_nullable(projected_type) : projected_type;
+        return Status::OK();
+    }
+
Status _parse_delete_predicates(const SplitReadOptions& options);
};
diff --git a/be/test/format/new_parquet/parquet_column_reader_test.cpp
b/be/test/format/new_parquet/parquet_column_reader_test.cpp
index e4a0841f5af..97773a5bada 100644
--- a/be/test/format/new_parquet/parquet_column_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_column_reader_test.cpp
@@ -25,19 +25,23 @@
#include <functional>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "core/assert_cast.h"
#include "core/column/column_decimal.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
+#include "core/column/column_struct.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_struct.h"
#include "core/types.h"
#include "format/new_parquet/column_reader.h"
#include "format/new_parquet/parquet_column_schema.h"
#include "format/new_parquet/selection_vector.h"
+#include "format/reader/file_reader.h"
namespace doris::parquet {
namespace {
@@ -100,8 +104,7 @@ protected:
}
std::shared_ptr<arrow::Array> build_fixed_binary_array(
- const std::shared_ptr<arrow::DataType>& type,
- const std::vector<std::string>& values) {
+ const std::shared_ptr<arrow::DataType>& type, const
std::vector<std::string>& values) {
arrow::FixedSizeBinaryBuilder builder(type,
arrow::default_memory_pool());
for (const auto& value : values) {
EXPECT_TRUE(builder.Append(reinterpret_cast<const
uint8_t*>(value.data())).ok());
@@ -119,6 +122,28 @@ protected:
return finish_array(&builder);
}
+    // Returns a non-null STRUCT<a INT32 NOT NULL, b UTF8 NOT NULL> array with five rows:
+    // a = 101..105 and b = "sa".."se". Typed pointers to the child builders are captured
+    // before the builders are handed to the StructBuilder, so values are appended directly
+    // without casting StructBuilder::field_builder() results.
+    std::shared_ptr<arrow::Array> build_required_struct_array() {
+        const std::vector<int32_t> a_values = {101, 102, 103, 104, 105};
+        const std::vector<std::string> b_values = {"sa", "sb", "sc", "sd", "se"};
+
+        auto a_child = std::make_shared<arrow::Int32Builder>();
+        auto b_child = std::make_shared<arrow::StringBuilder>();
+        arrow::Int32Builder* a_builder = a_child.get();
+        arrow::StringBuilder* b_builder = b_child.get();
+        std::vector<std::shared_ptr<arrow::ArrayBuilder>> children {a_child, b_child};
+
+        auto struct_type = arrow::struct_({arrow::field("a", arrow::int32(), false),
+                                          arrow::field("b", arrow::utf8(), false)});
+        arrow::StructBuilder builder(struct_type, arrow::default_memory_pool(),
+                                     std::move(children));
+        for (size_t i = 0; i < a_values.size(); ++i) {
+            EXPECT_TRUE(builder.Append().ok());
+            EXPECT_TRUE(a_builder->Append(a_values[i]).ok());
+            EXPECT_TRUE(b_builder->Append(b_values[i]).ok());
+        }
+        return finish_array(&builder);
+    }
+
std::shared_ptr<arrow::Array> build_time32_array(const
std::shared_ptr<arrow::DataType>& type,
const
std::vector<int32_t>& values) {
arrow::Time32Builder builder(type, arrow::default_memory_pool());
@@ -138,8 +163,7 @@ protected:
}
std::shared_ptr<arrow::Array> build_timestamp_array(
- const std::shared_ptr<arrow::DataType>& type,
- const std::vector<int64_t>& values) {
+ const std::shared_ptr<arrow::DataType>& type, const
std::vector<int64_t>& values) {
arrow::TimestampBuilder builder(type, arrow::default_memory_pool());
for (const auto value : values) {
EXPECT_TRUE(builder.Append(value).ok());
@@ -147,9 +171,8 @@ protected:
return finish_array(&builder);
}
- std::shared_ptr<arrow::Array> build_decimal_array(
- const std::shared_ptr<arrow::DataType>& type,
- const std::vector<int64_t>& values) {
+ std::shared_ptr<arrow::Array> build_decimal_array(const
std::shared_ptr<arrow::DataType>& type,
+ const
std::vector<int64_t>& values) {
arrow::Decimal128Builder builder(type, arrow::default_memory_pool());
for (const auto value : values) {
EXPECT_TRUE(builder.Append(arrow::Decimal128(value)).ok());
@@ -165,16 +188,16 @@ protected:
}
void write_parquet_file() {
- add_field(arrow::field("bool_col", arrow::boolean(), false),
- build_required_array<arrow::BooleanBuilder, bool>(
- {true, false, true, false, true}),
- [](const ParquetColumnSchema& schema, const IColumn& column)
{
- EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::BOOLEAN);
- const auto& values = assert_cast<const
ColumnBool&>(column);
- EXPECT_EQ(values.get_element(0), 1);
- EXPECT_EQ(values.get_element(1), 0);
- EXPECT_EQ(values.get_element(4), 1);
- });
+ add_field(
+ arrow::field("bool_col", arrow::boolean(), false),
+ build_required_array<arrow::BooleanBuilder, bool>({true,
false, true, false, true}),
+ [](const ParquetColumnSchema& schema, const IColumn& column) {
+ EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::BOOLEAN);
+ const auto& values = assert_cast<const
ColumnBool&>(column);
+ EXPECT_EQ(values.get_element(0), 1);
+ EXPECT_EQ(values.get_element(1), 0);
+ EXPECT_EQ(values.get_element(4), 1);
+ });
add_field(arrow::field("int32_col", arrow::int32(), false),
build_required_array<arrow::Int32Builder, int32_t>({10, 20,
30, 40, 50}),
[](const ParquetColumnSchema& schema, const IColumn& column)
{
@@ -192,18 +215,17 @@ protected:
EXPECT_EQ(values.get_element(0), 10000000000L);
EXPECT_EQ(values.get_element(1), -9L);
});
- add_field(arrow::field("float_col", arrow::float32(), false),
- build_required_array<arrow::FloatBuilder, float>(
- {1.5F, -2.25F, 3.0F, 4.5F, 5.75F}),
- [](const ParquetColumnSchema& schema, const IColumn& column)
{
- EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::FLOAT);
- const auto& values = assert_cast<const
ColumnFloat32&>(column);
- EXPECT_FLOAT_EQ(values.get_element(0), 1.5F);
- EXPECT_FLOAT_EQ(values.get_element(1), -2.25F);
- });
+ add_field(
+ arrow::field("float_col", arrow::float32(), false),
+ build_required_array<arrow::FloatBuilder, float>({1.5F,
-2.25F, 3.0F, 4.5F, 5.75F}),
+ [](const ParquetColumnSchema& schema, const IColumn& column) {
+ EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::FLOAT);
+ const auto& values = assert_cast<const
ColumnFloat32&>(column);
+ EXPECT_FLOAT_EQ(values.get_element(0), 1.5F);
+ EXPECT_FLOAT_EQ(values.get_element(1), -2.25F);
+ });
add_field(arrow::field("double_col", arrow::float64(), false),
- build_required_array<arrow::DoubleBuilder, double>(
- {3.5, -4.75, 6.0, 7.25, 8.5}),
+ build_required_array<arrow::DoubleBuilder, double>({3.5,
-4.75, 6.0, 7.25, 8.5}),
[](const ParquetColumnSchema& schema, const IColumn& column)
{
EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::DOUBLE);
const auto& values = assert_cast<const
ColumnFloat64&>(column);
@@ -263,31 +285,27 @@ protected:
EXPECT_EQ(schema.type->to_string(column, 1),
"00:00:01.000000");
EXPECT_EQ(schema.type->to_string(column, 2),
"01:02:03.004567");
});
- add_field(arrow::field("timestamp_millis_col",
- arrow::timestamp(arrow::TimeUnit::MILLI),
false),
+ add_field(arrow::field("timestamp_millis_col",
arrow::timestamp(arrow::TimeUnit::MILLI),
+ false),
build_timestamp_array(arrow::timestamp(arrow::TimeUnit::MILLI),
{0, 1234, 1609459200000,
1609459201000, -1}),
[](const ParquetColumnSchema& schema, const IColumn& column)
{
EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::INT64);
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(),
TYPE_DATETIMEV2);
- EXPECT_EQ(schema.type->to_string(column, 1),
- "1970-01-01 00:00:01.234");
- EXPECT_EQ(schema.type->to_string(column, 4),
- "1969-12-31 23:59:59.999");
+ EXPECT_EQ(schema.type->to_string(column, 1), "1970-01-01
00:00:01.234");
+ EXPECT_EQ(schema.type->to_string(column, 4), "1969-12-31
23:59:59.999");
});
- add_field(arrow::field("timestamp_micros_col",
- arrow::timestamp(arrow::TimeUnit::MICRO),
false),
+ add_field(arrow::field("timestamp_micros_col",
arrow::timestamp(arrow::TimeUnit::MICRO),
+ false),
build_timestamp_array(arrow::timestamp(arrow::TimeUnit::MICRO),
{0, 1234567, 1609459200000000,
1609459201000000, -1}),
[](const ParquetColumnSchema& schema, const IColumn& column)
{
EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::INT64);
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(),
TYPE_DATETIMEV2);
- EXPECT_EQ(schema.type->to_string(column, 1),
- "1970-01-01 00:00:01.234567");
- EXPECT_EQ(schema.type->to_string(column, 4),
- "1969-12-31 23:59:59.999999");
+ EXPECT_EQ(schema.type->to_string(column, 1), "1970-01-01
00:00:01.234567");
+ EXPECT_EQ(schema.type->to_string(column, 4), "1969-12-31
23:59:59.999999");
});
add_field(arrow::field("decimal_fixed_binary_9_2_col",
arrow::decimal128(9, 2), false),
build_decimal_array(arrow::decimal128(9, 2), {12345, -67, 0,
987, 1000}),
@@ -295,8 +313,7 @@ protected:
EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::FIXED_LEN_BYTE_ARRAY);
EXPECT_TRUE(schema.type_descriptor.is_decimal);
-
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(),
- TYPE_DECIMAL32);
+
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_DECIMAL32);
const auto& values = assert_cast<const
ColumnDecimal32&>(column);
EXPECT_EQ(values.get_element(0), Decimal32(12345));
EXPECT_EQ(schema.type->to_string(column, 0), "123.45");
@@ -308,8 +325,7 @@ protected:
EXPECT_EQ(schema.type_descriptor.physical_type,
::parquet::Type::FIXED_LEN_BYTE_ARRAY);
EXPECT_TRUE(schema.type_descriptor.is_decimal);
-
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(),
- TYPE_DECIMAL64);
+
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_DECIMAL64);
const auto& values = assert_cast<const
ColumnDecimal64&>(column);
EXPECT_EQ(values.get_element(0), Decimal64(1234567));
EXPECT_EQ(schema.type->to_string(column, 0), "1.234567");
@@ -329,6 +345,26 @@ protected:
EXPECT_EQ(nested_column.get_element(0), 1);
EXPECT_EQ(nested_column.get_element(2), 3);
});
+ add_field(arrow::field("struct_col",
+ arrow::struct_({
+ arrow::field("a", arrow::int32(),
false),
+ arrow::field("b", arrow::utf8(), false),
+ }),
+ false),
+ build_required_struct_array(),
+ [](const ParquetColumnSchema& schema, const IColumn& column)
{
+
EXPECT_EQ(remove_nullable(schema.type)->get_primitive_type(), TYPE_STRUCT);
+ const auto& struct_column = assert_cast<const
ColumnStruct&>(column);
+ ASSERT_EQ(struct_column.get_columns().size(), 2);
+ const auto& a_values =
+ assert_cast<const
ColumnInt32&>(struct_column.get_column(0));
+ const auto& b_values =
+ assert_cast<const
ColumnString&>(struct_column.get_column(1));
+ EXPECT_EQ(a_values.get_element(0), 101);
+ EXPECT_EQ(a_values.get_element(4), 105);
+ EXPECT_EQ(b_values.get_data_at(1).to_string(), "sb");
+ EXPECT_EQ(b_values.get_data_at(4).to_string(), "se");
+ });
auto schema = arrow::schema(_arrow_fields);
auto table = arrow::Table::Make(schema, _arrays);
@@ -341,9 +377,8 @@ protected:
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
- PARQUET_THROW_NOT_OK(
- ::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out, ROW_COUNT,
- builder.build()));
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out,
+ ROW_COUNT,
builder.build()));
}
std::unique_ptr<ParquetColumnReader> create_reader(size_t field_idx) const
{
@@ -378,6 +413,9 @@ protected:
TEST_F(ParquetColumnReaderTest, ReadAllSupportedPhysicalAndLogicalTypes) {
for (size_t field_idx = 0; field_idx < _fields.size(); ++field_idx) {
SCOPED_TRACE(_fields[field_idx]->name);
+ if (_fields[field_idx]->kind != ParquetColumnSchemaKind::PRIMITIVE) {
+ continue;
+ }
ASSERT_TRUE(supports_record_reader(_fields[field_idx]->type_descriptor));
read_and_validate(field_idx);
}
@@ -418,10 +456,64 @@ TEST_F(ParquetColumnReaderTest,
SelectReadsOnlySelectedRanges) {
EXPECT_EQ(int_values.get_element(2), 50);
}
+// Projects only child "b" of struct_col (the last field written by write_parquet_file)
+// and checks that both the reader's output type and the data it reads contain just
+// that child.
+TEST_F(ParquetColumnReaderTest, ReadProjectedStructChildren) {
+    const auto field_idx = _fields.size() - 1;
+    const auto& struct_schema = *_fields[field_idx];
+    ASSERT_EQ(struct_schema.name, "struct_col");
+    ASSERT_EQ(struct_schema.children.size(), 2);
+
+    // Leaf projection for child #1 ("b"); project_all_children keeps its default.
+    reader::FieldProjection only_b;
+    only_b.file_column_id = struct_schema.top_level_field_id;
+    only_b.file_path = struct_schema.children[1]->file_path;
+
+    reader::FieldProjection projection;
+    projection.file_column_id = struct_schema.top_level_field_id;
+    projection.file_path = struct_schema.file_path;
+    projection.project_all_children = false;
+    projection.children.push_back(std::move(only_b));
+
+    ParquetColumnReaderFactory factory(_row_group, _file_reader->metadata()->num_columns());
+    std::unique_ptr<ParquetColumnReader> reader;
+    auto status = factory.create(struct_schema, &projection, &reader);
+    ASSERT_TRUE(status.ok()) << status;
+
+    const auto& reader_type = reader->type();
+    const auto projected = remove_nullable(reader_type);
+    ASSERT_EQ(projected->get_primitive_type(), TYPE_STRUCT);
+    const auto* struct_type = assert_cast<const DataTypeStruct*>(projected.get());
+    ASSERT_EQ(struct_type->get_elements().size(), 1);
+    EXPECT_EQ(struct_type->get_element_name(0), "b");
+
+    MutableColumnPtr column = reader_type->create_column();
+    int64_t rows_read = 0;
+    status = reader->read(ROW_COUNT, column, &rows_read);
+    ASSERT_TRUE(status.ok()) << status;
+    ASSERT_EQ(rows_read, ROW_COUNT);
+
+    const auto& struct_column = assert_cast<const ColumnStruct&>(*column);
+    ASSERT_EQ(struct_column.get_columns().size(), 1);
+    const auto& b_values = assert_cast<const ColumnString&>(struct_column.get_column(0));
+    EXPECT_EQ(b_values.get_data_at(0).to_string(), "sa");
+    EXPECT_EQ(b_values.get_data_at(4).to_string(), "se");
+}
+
+// Checks the path metadata built for struct_col and its two leaf children.
+// The file index paths and name paths must be rooted at the struct's position in the
+// file schema.
+TEST_F(ParquetColumnReaderTest, BuildComplexSchemaPathMetadata) {
+    const auto field_idx = _fields.size() - 1;
+    const auto& struct_schema = *_fields[field_idx];
+    ASSERT_EQ(struct_schema.name, "struct_col");
+    ASSERT_EQ(struct_schema.children.size(), 2);
+
+    const auto root = static_cast<int32_t>(field_idx);
+    EXPECT_EQ(struct_schema.file_path, std::vector<int32_t>({root}));
+    EXPECT_EQ(struct_schema.name_path, std::vector<std::string>({"struct_col"}));
+
+    const std::vector<std::string> child_names = {"a", "b"};
+    for (size_t i = 0; i < child_names.size(); ++i) {
+        SCOPED_TRACE(child_names[i]);
+        EXPECT_EQ(struct_schema.children[i]->file_path,
+                  std::vector<int32_t>({root, static_cast<int32_t>(i)}));
+        EXPECT_EQ(struct_schema.children[i]->name_path,
+                  std::vector<std::string>({"struct_col", child_names[i]}));
+    }
+
+    // struct_col is declared non-nullable at top level; the test expects no levels.
+    EXPECT_EQ(struct_schema.max_definition_level, 0);
+    EXPECT_EQ(struct_schema.max_repetition_level, 0);
+}
+
TEST_F(ParquetColumnReaderTest, ResolveSupportedPhysicalAndLogicalSchemas) {
std::vector<::parquet::schema::NodePtr> nodes = {
- ::parquet::schema::PrimitiveNode::Make(
- "required_bool", ::parquet::Repetition::REQUIRED,
::parquet::Type::BOOLEAN),
+ ::parquet::schema::PrimitiveNode::Make("required_bool",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::BOOLEAN),
::parquet::schema::PrimitiveNode::Make(
"required_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32),
::parquet::schema::PrimitiveNode::Make(
@@ -430,41 +522,42 @@ TEST_F(ParquetColumnReaderTest,
ResolveSupportedPhysicalAndLogicalSchemas) {
"required_float", ::parquet::Repetition::REQUIRED,
::parquet::Type::FLOAT),
::parquet::schema::PrimitiveNode::Make(
"required_double", ::parquet::Repetition::REQUIRED,
::parquet::Type::DOUBLE),
- ::parquet::schema::PrimitiveNode::Make(
- "required_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::BYTE_ARRAY),
+ ::parquet::schema::PrimitiveNode::Make("required_binary",
+
::parquet::Repetition::REQUIRED,
+
::parquet::Type::BYTE_ARRAY),
::parquet::schema::PrimitiveNode::Make(
"required_fixed_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::FIXED_LEN_BYTE_ARRAY,
::parquet::ConvertedType::NONE, 4),
::parquet::schema::PrimitiveNode::Make(
"optional_int32", ::parquet::Repetition::OPTIONAL,
::parquet::Type::INT32),
- ::parquet::schema::PrimitiveNode::Make(
- "utf8_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::BYTE_ARRAY,
- ::parquet::ConvertedType::UTF8),
- ::parquet::schema::PrimitiveNode::Make(
- "enum_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::BYTE_ARRAY,
- ::parquet::ConvertedType::ENUM),
- ::parquet::schema::PrimitiveNode::Make(
- "json_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::BYTE_ARRAY,
- ::parquet::ConvertedType::JSON),
- ::parquet::schema::PrimitiveNode::Make(
- "bson_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::BYTE_ARRAY,
- ::parquet::ConvertedType::BSON),
- ::parquet::schema::PrimitiveNode::Make(
- "decimal_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::DECIMAL, -1, 9, 2),
- ::parquet::schema::PrimitiveNode::Make(
- "decimal_int64", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT64,
- ::parquet::ConvertedType::DECIMAL, -1, 18, 6),
+ ::parquet::schema::PrimitiveNode::Make("utf8_binary",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::BYTE_ARRAY,
+
::parquet::ConvertedType::UTF8),
+ ::parquet::schema::PrimitiveNode::Make("enum_binary",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::BYTE_ARRAY,
+
::parquet::ConvertedType::ENUM),
+ ::parquet::schema::PrimitiveNode::Make("json_binary",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::BYTE_ARRAY,
+
::parquet::ConvertedType::JSON),
+ ::parquet::schema::PrimitiveNode::Make("bson_binary",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::BYTE_ARRAY,
+
::parquet::ConvertedType::BSON),
+ ::parquet::schema::PrimitiveNode::Make("decimal_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::DECIMAL, -1, 9, 2),
+ ::parquet::schema::PrimitiveNode::Make("decimal_int64",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT64,
+
::parquet::ConvertedType::DECIMAL, -1, 18, 6),
::parquet::schema::PrimitiveNode::Make(
"decimal_binary", ::parquet::Repetition::REQUIRED,
::parquet::Type::BYTE_ARRAY,
::parquet::ConvertedType::DECIMAL, -1, 18, 6),
- ::parquet::schema::PrimitiveNode::Make(
- "decimal_fixed_binary", ::parquet::Repetition::REQUIRED,
- ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
::parquet::ConvertedType::DECIMAL, 8,
- 18, 6),
- ::parquet::schema::PrimitiveNode::Make(
- "date_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::DATE),
+ ::parquet::schema::PrimitiveNode::Make("decimal_fixed_binary",
+
::parquet::Repetition::REQUIRED,
+
::parquet::Type::FIXED_LEN_BYTE_ARRAY,
+
::parquet::ConvertedType::DECIMAL, 8, 18, 6),
+ ::parquet::schema::PrimitiveNode::Make("date_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::DATE),
::parquet::schema::PrimitiveNode::Make(
"time_millis_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
::parquet::ConvertedType::TIME_MILLIS),
@@ -477,27 +570,27 @@ TEST_F(ParquetColumnReaderTest,
ResolveSupportedPhysicalAndLogicalSchemas) {
::parquet::schema::PrimitiveNode::Make(
"timestamp_micros_int64", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT64,
::parquet::ConvertedType::TIMESTAMP_MICROS),
- ::parquet::schema::PrimitiveNode::Make(
- "int8_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::INT_8),
- ::parquet::schema::PrimitiveNode::Make(
- "uint8_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::UINT_8),
- ::parquet::schema::PrimitiveNode::Make(
- "int16_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::INT_16),
- ::parquet::schema::PrimitiveNode::Make(
- "uint16_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::UINT_16),
- ::parquet::schema::PrimitiveNode::Make(
- "int32_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::INT_32),
- ::parquet::schema::PrimitiveNode::Make(
- "uint32_int32", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT32,
- ::parquet::ConvertedType::UINT_32),
- ::parquet::schema::PrimitiveNode::Make(
- "int64_int64", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT64,
- ::parquet::ConvertedType::INT_64),
+ ::parquet::schema::PrimitiveNode::Make("int8_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::INT_8),
+ ::parquet::schema::PrimitiveNode::Make("uint8_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::UINT_8),
+ ::parquet::schema::PrimitiveNode::Make("int16_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::INT_16),
+ ::parquet::schema::PrimitiveNode::Make("uint16_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::UINT_16),
+ ::parquet::schema::PrimitiveNode::Make("int32_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::INT_32),
+ ::parquet::schema::PrimitiveNode::Make("uint32_int32",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT32,
+
::parquet::ConvertedType::UINT_32),
+ ::parquet::schema::PrimitiveNode::Make("int64_int64",
::parquet::Repetition::REQUIRED,
+ ::parquet::Type::INT64,
+
::parquet::ConvertedType::INT_64),
};
auto schema =
@@ -523,13 +616,13 @@ TEST_F(ParquetColumnReaderTest,
RejectUnsupportedPhysicalAndLogicalTypes) {
{
::parquet::schema::PrimitiveNode::Make(
"int96_col", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT96),
- ::parquet::schema::PrimitiveNode::Make(
- "repeated_int32_col",
::parquet::Repetition::REPEATED,
- ::parquet::Type::INT32),
+
::parquet::schema::PrimitiveNode::Make("repeated_int32_col",
+
::parquet::Repetition::REPEATED,
+
::parquet::Type::INT32),
::parquet::schema::PrimitiveNode::Make(
"decimal256_fixed_col",
::parquet::Repetition::REQUIRED,
- ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
::parquet::ConvertedType::DECIMAL,
- 20, 39, 6),
+ ::parquet::Type::FIXED_LEN_BYTE_ARRAY,
+ ::parquet::ConvertedType::DECIMAL, 20, 39, 6),
::parquet::schema::PrimitiveNode::Make(
"uint64_col", ::parquet::Repetition::REQUIRED,
::parquet::Type::INT64,
::parquet::ConvertedType::UINT_64),
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 7e28b7fce5b..5341b4060b5 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "format/new_parquet/parquet_reader.h"
+
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <gtest/gtest.h>
@@ -24,6 +26,7 @@
#include <filesystem>
#include <memory>
#include <string>
+#include <utility>
#include <vector>
#include "core/assert_cast.h"
@@ -31,11 +34,13 @@
#include "core/column/column_string.h"
#include "core/column/column_vector.h"
#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "core/data_type/data_type_struct.h"
#include "core/data_type/primitive_type.h"
#include "core/field.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
-#include "format/new_parquet/parquet_reader.h"
+#include "format/reader/column_mapper.h"
#include "format/reader/file_reader.h"
#include "gen_cpp/Types_types.h"
#include "io/io_common.h"
@@ -78,7 +83,8 @@ private:
};
VExprContextSPtr create_int32_greater_than_conjunct(int column_id, int32_t
value) {
- auto ctx =
VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id,
value));
+ auto ctx =
+
VExprContext::create_shared(std::make_shared<Int32GreaterThanExpr>(column_id,
value));
ctx->_prepared = true;
ctx->_opened = true;
return ctx;
@@ -111,9 +117,9 @@ void write_parquet_file(const std::string& file_path,
int64_t row_group_size = R
arrow::field("id", arrow::int32(), false),
arrow::field("value", arrow::utf8(), false),
});
- auto table = arrow::Table::Make(
- schema, {build_int32_array({1, 2, 3, 4, 5}),
- build_string_array({"one", "two", "three", "four",
"five"})});
+ auto table = arrow::Table::Make(schema,
+ {build_int32_array({1, 2, 3, 4, 5}),
+ build_string_array({"one", "two",
"three", "four", "five"})});
auto file_result = arrow::io::FileOutputStream::Open(file_path);
ASSERT_TRUE(file_result.ok()) << file_result.status();
@@ -123,8 +129,8 @@ void write_parquet_file(const std::string& file_path,
int64_t row_group_size = R
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
- PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
- *table, arrow::default_memory_pool(), out, row_group_size,
builder.build()));
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out,
+ row_group_size,
builder.build()));
}
Block build_file_block(const std::vector<reader::SchemaField>& schema) {
@@ -175,6 +181,57 @@ TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState)
{
EXPECT_TRUE(reader.eof());
}
+// Verifies that a table column requesting only child `b` of the file struct
+// `s` produces a complex projection for that single child, and that the
+// mapper records the pruned file type Struct<b> instead of Struct<a, b>.
+TEST(TableColumnMapperTest, CreatesComplexProjectionForStructChildren) {
+ // File-local schema: top-level struct `s` (file_path {0}) with children
+ // `a` Int32 (file_path {0, 0}) and `b` String (file_path {0, 1}).
+ // NOTE(review): child ids are left at 0 like the parent; presumably the
+ // children are identified by name/file_path here -- confirm.
+ reader::SchemaField struct_field;
+ struct_field.id = 0;
+ struct_field.name = "s";
+ struct_field.file_path = {0};
+ reader::SchemaField a_field;
+ a_field.id = 0;
+ a_field.name = "a";
+ a_field.type = std::make_shared<DataTypeInt32>();
+ a_field.file_path = {0, 0};
+ reader::SchemaField b_field;
+ b_field.id = 0;
+ b_field.name = "b";
+ b_field.type = std::make_shared<DataTypeString>();
+ b_field.file_path = {0, 1};
+ struct_field.children = {a_field, b_field};
+ struct_field.type = std::make_shared<DataTypeStruct>(DataTypes
{a_field.type, b_field.type},
+ Strings {"a", "b"});
+
+ // Table schema: column `s` (id 100) whose type only contains child `b`
+ // (id 101), i.e. the query needs `s.b` but not `s.a`.
+ reader::TableColumn table_child;
+ table_child.id = 101;
+ table_child.name = "b";
+ table_child.type = b_field.type;
+ reader::TableColumn table_column;
+ table_column.id = 100;
+ table_column.name = "s";
+ table_column.type = std::make_shared<DataTypeStruct>(DataTypes
{b_field.type}, Strings {"b"});
+ table_column.children = {table_child};
+
+ // Map table columns to file fields by name.
+ reader::TableColumnMapperOptions options;
+ options.mode = reader::TableColumnMappingMode::BY_NAME;
+ reader::TableColumnMapper mapper(options);
+ ASSERT_TRUE(mapper.create_mapping({table_column}, {},
{struct_field}).ok());
+
+ // `s` must be requested as top-level file column 0, with one complex
+ // projection that keeps only child `b` (file_path {0, 1}) rather than
+ // projecting all children.
+ auto request = std::make_unique<reader::FileScanRequest>();
+ ASSERT_TRUE(mapper.create_scan_request({}, {table_column},
request.get()).ok());
+ ASSERT_EQ(request->non_predicate_columns,
std::vector<reader::ColumnId>({0}));
+ ASSERT_EQ(request->complex_projections.size(), 1);
+ const auto& projection = request->complex_projections.at(0);
+ EXPECT_EQ(projection.file_path, std::vector<int32_t>({0}));
+ ASSERT_FALSE(projection.project_all_children);
+ ASSERT_EQ(projection.children.size(), 1);
+ EXPECT_EQ(projection.children[0].file_path, std::vector<int32_t>({0, 1}));
+
+ // The mapping's file type must be the projected Struct<b>, not the full
+ // file struct.
+ ASSERT_EQ(mapper.mappings().size(), 1);
+ const auto* projected_type =
+ assert_cast<const
DataTypeStruct*>(mapper.mappings()[0].file_type.get());
+ ASSERT_EQ(projected_type->get_elements().size(), 1);
+ EXPECT_EQ(projected_type->get_element_name(0), "b");
+}
+
class NewParquetReaderTest : public testing::Test {
protected:
void SetUp() override {
diff --git a/docs/doris-arrow-parquet-complex-types-implementation.md
b/docs/doris-arrow-parquet-complex-types-implementation.md
new file mode 100644
index 00000000000..1ee0dabc944
--- /dev/null
+++ b/docs/doris-arrow-parquet-complex-types-implementation.md
@@ -0,0 +1,559 @@
+# Doris Arrow Parquet Reader 复杂类型完整支持方案
+
+本文档描述 `be/src/format/new_parquet/` 新 Parquet reader 对 `STRUCT`、`LIST`、`MAP` 复杂类型的完整支持方案。
+
+目标是在现有 file-local reader 边界内补齐复杂类型读取能力:
+
+- 继续复用 Arrow C++ Parquet core API 解析文件、row group、column chunk 和 leaf value。
+- 输出仍然是 Doris `Block` / `Column`,不引入
`parquet::arrow::FileReader`、`arrow::RecordBatch` 或 `arrow::Table` 作为 scan 输出路径。
+- `ParquetReader` 仍只理解 Parquet file-local schema,不处理 Iceberg/global schema
evolution。
+- schema change、default/generated/partition column、delete、virtual column 仍由
`TableReader` / `TableColumnMapper` 负责。
+- 复杂类型读取必须以 Parquet definition level / repetition level 为准,不能依赖简单 row count 拼接。
+- 复杂类型列裁剪是本轮实现目标:读取 top-level complex column 时,只读取被请求的 child subtree。
+- 复杂类型 schema change 不在本轮实现,但本轮设计必须保留 field id、path、level 和 projection tree
边界,保证后续可以在 `TableColumnMapper` 中补齐 child-level mapping。
+
+## 参考实现:DuckDB Parquet Reader
+
+参考目录(DuckDB 源码仓库 https://github.com/duckdb/duckdb 中的相对路径):
+
+```text
+extension/parquet/
+```
+
+重点参考文件:
+
+```text
+extension/parquet/include/parquet_column_schema.hpp
+extension/parquet/include/column_reader.hpp
+extension/parquet/parquet_reader.cpp
+extension/parquet/column_reader.cpp
+extension/parquet/reader/struct_column_reader.cpp
+extension/parquet/reader/list_column_reader.cpp
+```
+
+DuckDB 中值得借鉴的核心结构:
+
+- `ParquetColumnSchema` 保存
`max_define`、`max_repeat`、`schema_index`、`column_index`,schema tree 本身携带 Dremel
level 信息。
+- `ParseSchemaRecursive()` 在解析 schema 时递增 definition/repetition level,并把
legacy repeated field、3-level LIST、MAP/MAP_KEY_VALUE 统一成 reader 可消费的 schema
tree。
+- primitive reader 读取 leaf value 的同时输出 definition/repetition level。
+- struct reader 递归读取 children,并用 child 输出的 definition level 设置 struct null。
+- list/map reader 不直接按 row 数读取 child;它读取 child leaf stream,根据当前 list/map 层的 repetition level 折叠出 parent rows 和 offsets,并根据 definition level 区分 null / empty / 非空,生成 null map。
+- skip/select 是 reader 级语义,不是 column filter fallback;复杂类型 skip 也必须消费对应的 level
stream,保证所有 child reader 游标一致。
+
+Doris 不需要照搬 DuckDB 的 thrift/page decoder;当前方案仍优先封装 Arrow internal
`RecordReader`。但 DuckDB 的 reader 分层和 level 组装模型应作为 Doris 复杂类型支持的主参考。
+
+## 当前 Doris 状态
+
+现有文件:
+
+```text
+be/src/format/new_parquet/parquet_reader.*
+be/src/format/new_parquet/column_reader.*
+be/src/format/new_parquet/parquet_column_schema.*
+be/src/format/new_parquet/parquet_type.*
+be/src/format/new_parquet/selection_vector.h
+```
+
+已有能力:
+
+- schema builder 可以识别 `STRUCT`、`LIST`、`MAP`,并生成
`DataTypeStruct`、`DataTypeArray`、`DataTypeMap`。
+- `ScalarColumnReader` 支持 flat primitive/string/decimal/date/time/timestamp。
+- `StructColumnReader` 递归读取 children,支持非常基础的非 nullable struct。
+- `ColumnReader::select()` 已经定义为 `skip + read` 的 selected read,不退化为整批读取后过滤。
+
+主要缺口:
+
+- `ParquetColumnSchema` 没有保存完整 `max_definition_level` / `max_repetition_level`
和各复杂节点的 level 边界。
+- `ScalarColumnReader` 当前只支持 `max_repetition_level == 0 &&
max_definition_level <= 1`。
+- primitive reader 没有向 parent reader 暴露 leaf definition/repetition level
stream。
+- nullable struct、list、map 没有 assembler。
+- repeated primitive、legacy repeated group、嵌套 list/map/struct 没有统一 schema 规约。
+- `skip(rows)` 对复杂类型还不是 parent-row 语义。
+
+## 总体设计
+
+复杂类型读取分为两层:
+
+```text
+ParquetReader
+ -> ParquetColumnReader public API
+ read(parent_rows, output_column, rows_read)
+ skip(parent_rows)
+ select(selection, selected_rows, batch_rows, output_column)
+ -> Nested read API
+ read_nested(parent_rows, level_state, output_column, rows_read)
+ skip_nested(parent_rows, level_state)
+ -> Leaf RecordReader adapter
+ read leaf values + definition levels + repetition levels
+ -> Dremel assembler
+ Struct / List / Map build Doris columns
+```
+
+对 `ParquetReader` 来说,接口仍然是 top-level file-local row batch;复杂类型细节只存在于
`column_reader.*` 内部。
+
+### 关键原则
+
+- public `read(rows)` 和 `skip(rows)` 的 `rows` 始终表示当前 reader 对外暴露的 parent rows。
+- leaf reader 内部可以读取更多 physical records,但不能把 physical value count 泄露给
`ParquetReader`。
+- list/map 的 offsets 只能由 repetition level 生成,不能用 child column size 推断。
+- nullable 信息只能由 definition level 生成,不能通过 value 缺失猜测。
+- 所有复杂类型 reader 必须保持 child reader 游标严格同步;遇到不一致 level stream 应返回 `Corruption`。
+- 复杂类型 reader 不处理 table/global schema change;child-level schema evolution 后续在
`TableColumnMapper` 处理。
+- 复杂类型 reader 必须支持 file-local child projection。未投影 child 不创建 leaf reader,不读取对应
column chunk,不参与 value materialization。
+- 即使 child 被裁剪,也必须保留足够的 schema/path/level 元数据,使后续 schema change 可以把 table
child 映射到 file child、default child 或 cast projection。
+
+## Schema 扩展
+
+扩展 `ParquetColumnSchema`:
+
+```text
+struct ParquetColumnSchema {
+ int field_id;
+ int top_level_field_id;
+ int leaf_column_id;
+ int schema_node_id;
+ int parent_schema_node_id;
+ std::vector<int> file_path;
+ std::vector<int32_t> field_id_path;
+ std::vector<std::string> name_path;
+ std::string name;
+ DataTypePtr type;
+ ParquetColumnSchemaKind kind;
+ const parquet::schema::Node* node;
+ const parquet::ColumnDescriptor* descriptor;
+ ParquetTypeDescriptor type_descriptor;
+ int16_t max_definition_level;
+ int16_t max_repetition_level;
+ int16_t nullable_definition_level;
+ int16_t repeated_repetition_level;
+ std::vector<std::unique_ptr<ParquetColumnSchema>> children;
+};
+```
+
+字段含义:
+
+- `schema_node_id`:Parquet schema tree 中的 node ordinal,用于 debug、error
message、field id tracing。
+- `top_level_field_id`:FileScanRequest 使用的 file-local top-level id。
+- `leaf_column_id`:Parquet physical leaf column ordinal。复杂节点为 `-1`。
+- `file_path`:从 top-level field 到当前节点的 file-local child ordinal path,例如
`profile.address.city` 可以表示为 `[3, 0, 1]`。
+- `field_id_path`:从 top-level field 到当前节点的 Parquet field id path。缺失 field id
时使用 `-1` 占位,不在 file reader 层解释 Iceberg 语义。
+- `name_path`:从 top-level field 到当前节点的 Parquet node name path,用于 by-name
fallback、error message 和后续 schema change。
+- `max_definition_level` / `max_repetition_level`:该节点下 leaf stream 的最大
level。复杂节点取其 subtree leaf 的约束值。
+- `nullable_definition_level`:该节点自身从 null 变成 defined 所需的 definition
level。required 节点为 parent level,不额外增加。
+- `repeated_repetition_level`:该 repeated/list/map 层对应的 repetition level。非
repeated 节点为 parent level。
+
+Schema builder 改造:
+
+- 从 root 递归解析,每进入 optional 节点 `definition_level + 1`。
+- 每进入 repeated 节点 `definition_level + 1` 且 `repetition_level + 1`;进入 required 节点时两者都不增加。
+- 识别标准 3-level LIST:
+
+```text
+optional group a (LIST) {
+ repeated group list {
+ optional <element_type> element;
+ }
+}
+```
+
+- 识别 legacy repeated primitive/group:
+
+```text
+repeated int32 a;
+repeated group a { ... }
+```
+
+并规约为 Doris `Array(element_type)`。
+
+- 识别 MAP/MAP_KEY_VALUE:
+
+```text
+optional group m (MAP) {
+ repeated group key_value {
+ required key_type key;
+ optional value_type value;
+ }
+}
+```
+
+并规约为 Doris `Map(key_type, value_type)`。
+
+- MAP key 按 Parquet 规范应为 required。若文件声明 nullable key,应在 schema 阶段返回
`NotSupported` 或 `Corruption`,不生成可继续执行的 reader。
+
+## 复杂类型列裁剪
+
+复杂类型列裁剪应在 file-local 层实现,语义是“只读取投影需要的 child subtree”,不是 table schema evolution。
+
+建议扩展 `reader::FileScanRequest`,增加嵌套 projection tree:
+
+```text
+struct FieldProjection {
+ ColumnId file_column_id;
+ std::vector<int> file_path;
+ bool project_all_children;
+ std::vector<FieldProjection> children;
+};
+
+struct FileScanRequest {
+ std::vector<ColumnId> predicate_columns;
+ std::vector<ColumnId> non_predicate_columns;
+ std::map<ColumnId, size_t> column_positions;
+ std::map<ColumnId, FieldProjection> complex_projections;
+ ...
+};
+```
+
+约束:
+
+- `predicate_columns` / `non_predicate_columns` 仍表示 top-level file-local
fields。
+- `complex_projections` 只描述 top-level complex field 内部需要读取哪些 child。
+- 没有出现在 `complex_projections` 的 top-level complex field 默认
`project_all_children = true`,保持兼容。
+- 对 `STRUCT`,允许只投影部分 children,输出 `DataTypeStruct` 只包含被投影 children,child 顺序保持
file schema 顺序。
+- 对 `LIST`,允许裁剪 element subtree。例如 `Array(Struct<a,b,c>)` 投影 `a,c` 时,输出
`Array(Struct<a,c>)`。
+- 对 `MAP`,key 永远需要读取并输出;value subtree 可以裁剪。例如 `Map<String, Struct<a,b>>` 投影
value.a 时,输出 `Map<String, Struct<a>>`。
+- 对 nullable parent,parent null map 和 offsets 必须完整生成;裁剪只影响 child value
materialization,不能影响 parent row shape。
+- 如果上层没有请求 `STRUCT` 的任何 child(例如只需要 struct 本身是否为 null),reader 仍必须读取至少一个 child leaf 作为 level-driving child,才能生成 parent row/null 形态。第一版可以要求 projection 至少保留一个 child(否则返回 `NotSupported`);如果上层真的只需要 parent 存在性,后续补充 `NullShapeColumnReader`。
+
+`ParquetColumnReaderFactory` 应接收 projection tree:
+
+```text
+Status create(const ParquetColumnSchema& column_schema,
+ const FieldProjection* projection,
+ std::unique_ptr<ParquetColumnReader>* reader) const;
+```
+
+实现要求:
+
+- factory 只为投影中的 leaf 创建 `ScalarColumnReader`。
+- struct/list/map reader 保存 child reader slot;未投影 child 用 `nullptr` 表示,参考
DuckDB `StructColumnReader` 的 child reader 布局。
+- `TotalCompressedSize`、prefetch、statistics 等后续能力只能统计已投影 leaf。
+- 对 top-level output block,`TableReader` 需要使用 projection 后的 `SchemaField` /
`DataTypePtr` 构建 block template,而不是原始完整 file schema。
+
+列裁剪与延时物化的关系:
+
+- predicate complex child projection 和 output complex child projection
需要合并,避免同一 leaf 重复读取。
+- 如果 predicate 只依赖 complex child,FileScanRequest 应能表达该 child path 是 predicate
projection。
+- 本轮可以先支持 output child pruning;predicate child pruning 可在 batch 内 complex
predicate 接入时补齐,但 projection tree 的结构必须现在预留。
+
+## Schema Change 兼容边界
+
+复杂类型 schema change 不在本轮实现,原因是它涉及 table/global schema、Iceberg field id、default
value、cast、generated column 和 filter fallback,属于 `TableColumnMapper` /
`TableReader` 范围。
+
+但本轮实现必须保证后续可扩展:
+
+- file schema 中每个 node 都必须导出
`file_path`、`field_id_path`、`name_path`、file-local type 和 child schema。
+- reader 内部不得把 `SchemaField::id` 同时当作 Iceberg field id 和 file-local column
id。top-level scan id 只表示 file-local top-level ordinal。
+- `TableColumnMapper` 后续可以根据 table child field id/name path 生成
`FieldProjection`,也可以为缺失 child 生成 default/constant/finalize projection。
+- file reader 输出的 pruned complex type 是 file-local projected type;table reader
负责把它 finalize 成 table/global type。
+- filter localization 后续可以定位到 complex child path。无法安全定位或需要 cast 的 filter 进入
`reader_expression_map` 或 table-level finalize filter。
+- 不在 `ParquetReader` 中补缺失 child,不在 `ParquetReader` 中做 child cast,不在
`ParquetReader` 中解释 Iceberg field id。
+
+后续 schema change 的目标形态:
+
+```text
+table projection/filter
+ -> TableColumnMapper child-level mapping
+ -> FieldProjection(file-local child paths)
+ -> ParquetReader reads projected file-local complex block
+ -> TableReader fills default/generated/partition children
+ -> TableReader applies child cast/finalize/delete/virtual semantics
+```
+
+因此,本轮列裁剪实现时不能把 output type 和 original file type 强绑定。所有 `ColumnReader` 创建和
block template 构造都应基于 projected schema view。
+
+## Level 读取抽象
+
+新增内部结构,位置建议:
+
+```text
+be/src/format/new_parquet/level.h
+be/src/format/new_parquet/level.cpp
+```
+
+核心结构:
+
+```text
+struct LevelBatch {
+ int64_t record_count;
+ int64_t value_count;
+ std::vector<int16_t> definition_levels;
+ std::vector<int16_t> repetition_levels;
+};
+
+struct NestedReadResult {
+ int64_t parent_rows;
+ int64_t physical_records;
+};
+```
+
+`ScalarColumnReader` 内部新增 leaf read 路径:
+
+```text
+read_leaf_records(max_records, decoded_values, level_batch)
+skip_leaf_records(max_records, level_batch)
+```
+
+要求:
+
+- Arrow internal `RecordReader` 的创建和调用继续封装在 `column_reader.*`,不能泄露到
`ParquetReader`。
+- flat primitive 保持当前 `read()` 快路径。
+- nested primitive 必须允许 `max_repetition_level > 0` 或 `max_definition_level >
1`,并输出 definition/repetition levels。
+- `DecodedColumnView::row_count` 对 nested leaf 应表示 value slots 数量,null slot 由
definition level 决定。
+
+如果 Arrow internal `RecordReader` 无法稳定提供 Doris 需要的 level/value 对齐语义,则新增 Doris
自己的 leaf page decoder,范围仍限制在 `format/new_parquet/`,不要把 page decoder 细节扩散到
`ParquetReader` 主流程。
+
+## Reader 分层
+
+建议拆分 `column_reader.cpp`,避免复杂类型 assembler 混在 scalar 读值热路径:
+
+```text
+be/src/format/new_parquet/column_reader.h
+be/src/format/new_parquet/column_reader.cpp
+be/src/format/new_parquet/scalar_column_reader.cpp
+be/src/format/new_parquet/struct_column_reader.cpp
+be/src/format/new_parquet/list_column_reader.cpp
+be/src/format/new_parquet/map_column_reader.cpp
+be/src/format/new_parquet/level.h
+be/src/format/new_parquet/level.cpp
+```
+
+### ScalarColumnReader
+
+职责:
+
+- 读取 primitive leaf values。
+- 生成 leaf-level definition/repetition level。
+- 对 flat column 直接写 Doris scalar/nullable column。
+- 对 nested leaf 只作为 child reader 被复杂类型 assembler 调用。
+
+flat path:
+
+```text
+read(rows)
+ -> RecordReader::ReadRecords(rows)
+ -> DecodedColumnView
+ -> DataTypeSerDe::read_column_from_decoded_values
+```
+
+nested path:
+
+```text
+read_nested(parent_rows, level_state)
+ -> read leaf records until parent_rows complete
+ -> append valid leaf values into child column
+ -> expose level_batch to parent assembler
+```
+
+### StructColumnReader
+
+输出:
+
+- non-nullable struct:`ColumnStruct`。
+- nullable struct:`ColumnNullable(ColumnStruct, null_map)`。
+
+算法:
+
+1. 对每个 child reader 读取同样的 parent row count。
+2. child reader 返回的 parent rows 必须一致。
+3. struct 自身 nullable 时,根据 definition level 判断 struct row 是否 null。
+4. 对 null struct row,每个 child column 仍必须补一个 default/null slot,保证
`ColumnStruct` 所有 child size 等于 struct row count。
+5. child 本身的 null 由 child reader 自己根据更深层 definition level 处理。
+
+注意:
+
+- 当前实现仅递归读取 children,没有处理 nullable struct;应改为显式处理 struct-level null map。
+- 对未投影 children 不创建 reader、不写入 output `ColumnStruct`。
+- 对所有 children 都未投影的 struct,第一版可以返回 `NotSupported`,后续用 shape-only reader 支持
parent 存在性读取。
+
+### ListColumnReader
+
+输出:
+
+- non-nullable array:`ColumnArray(element_column, offsets)`。
+- nullable array:`ColumnNullable(ColumnArray, null_map)`。
+
+核心算法参考 DuckDB list reader:
+
+1. 从 child reader 读取 leaf stream,获得 child values、definition levels、repetition
levels。
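+2. 根据当前 list 层的 `repeated_repetition_level`(下文记为 `list_repetition_level`)判断一个 child record 是否属于当前 list:
+ - `rep == list_repetition_level`:当前 list 的后续 element。
+ - `rep < list_repetition_level`:当前 list 结束,新的 parent row 开始(对嵌套 list 而言是外层的新 element)。
+ - `rep > list_repetition_level`:属于 element 内部更深层的 repeated 字段,仍属于当前 element,由 child reader 继续组装。
+3. 根据 definition level 判断 parent row 状态(`list_defined_level` 为 list 自身 defined 所需的 level;`empty_list_level` 等于 `list_defined_level`;`element_defined_level` 为 repeated 层的 level,即 `list_defined_level + 1`):
+ - `def < list_defined_level`:null list。
+ - `def == empty_list_level`:empty list。
+ - `def >= element_defined_level`:有 element(element 自身是否为 null 由更深层 definition level 决定)。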
+4. 对每个 parent row 写一个 offset。
+5. 只有 element defined 时向 element column append value;empty/null list 不 append
element。
+
+需要维护 overflow:
+
+- child reader 一次读取可能跨过本次 `parent_rows` 的边界。
+- list reader 必须缓存未消费的 child values 和 levels,下一次 `read()` 继续使用。
+- 该缓存是 reader 游标状态的一部分,`skip()` 和 `read()` 都必须共享。
+
+### MapColumnReader
+
+输出:
+
+- non-nullable map:`ColumnMap(key_column, value_column, offsets)`。
+- nullable map:`ColumnNullable(ColumnMap, null_map)`。
+
+实现方式:
+
+- 按 Parquet schema 将 map 规约为 `LIST<STRUCT<key, value>>` 的 level stream。
+- 复用 list assembler 的 parent row 边界判断。
+- 对每个 entry:
+ - key 必须 defined;key 缺失是文件格式错误。
+ - value 可 nullable;由 value child definition level 生成 value null map。
+- append entry 时分别写 key column 和 value column。
+- offsets 与 `ColumnArray` 相同,是累计的结束位置(prefix sum),不是每行的 entry 数;第 i 个 map row 的 entry 数为 `offsets[i] - offsets[i - 1]`。
+
+不要把 `MAP` 先 materialize 成 `Array(Struct(key,value))` 再转换为
`ColumnMap`,否则会产生额外内存和拷贝。可以在内部复用 list 的边界识别逻辑,但直接写 `ColumnMap` 的
keys/values/offsets。
+
+## Skip 和 Select
+
+public 语义保持不变:
+
+```text
+skip(parent_rows)
+select(selection, selected_rows, batch_rows, column)
+```
+
+复杂类型要求:
+
+- `skip()` 必须消费 parent rows 对应的所有 child physical records 和 level stream。
+- `select()` 继续使用现有 range 合并策略,即按 selected row ranges 调用 `skip()` + `read()`。
+- list/map 的 `skip()` 不能只跳过 child value count;必须按 repetition level 找到 parent
row 边界。
+- empty selection 时必须跳过整个 batch 的 parent rows,保证 reader 游标推进。
+
+第一阶段不实现 page-level row range selection;只保证 `skip + read` 的 selected read 正确。
+
+## 与 ParquetReader Scan Loop 的关系
+
+`ParquetReader::_read_current_row_group_batch()` 不需要理解复杂类型:
+
+- predicate columns 仍先读。
+- non-predicate columns 仍根据 selection 调用 `read()` 或 `select()`。
+- column reader 自己负责 complex column 的 parent-row 语义。
+
+限制:
+
+- 初期不支持复杂类型直接作为 filter column 执行 batch predicate。
+- row group statistics 仍只对 primitive leaf 做保守裁剪。
+- complex child-level projection 是本轮 reader 实现目标;但 complex child predicate 执行和
schema change finalize 不在本轮完成。
+
+## 错误处理
+
+遇到明确违反 Parquet spec 或 reader invariant 的情况,应返回错误或触发检查,不能静默修复:
+
+- MAP key nullable 或 key definition level 缺失。
+- 同一 struct 的 children parent row count 不一致。
+- list/map repetition level 非法回退或超过当前 schema 最大值。
+- leaf reader 返回的 value count、definition/repetition level 数量不一致。
+- child reader overflow 状态与下一次 read/skip 请求冲突。
+
+对合法但暂未支持的编码形态返回 `NotSupported`,例如后续若发现 Arrow internal `RecordReader` 无法支持某类
nested level 输出。
+
+## 测试计划
+
+新增或扩展 BE UT:
+
+```text
+be/test/format/new_parquet/parquet_complex_reader_test.cpp
+```
+
+优先用 Arrow writer 生成小 Parquet 文件,覆盖:
+
+- required struct。
+- optional struct。
+- struct child nullable。
+- array of primitive:null array、empty array、array with null element。
+- array of struct。
+- nested array:`Array(Array(String))`。
+- map:empty map、null map、nullable value。
+- struct containing array/map。
+- multiple row groups。
+- child projection:struct child 裁剪、array element struct child 裁剪、map value
struct child 裁剪。
+- selected read:复杂列作为 non-predicate column,predicate column 过滤出稀疏 selection。
+- skip then read:直接验证复杂列 reader 游标。
+
+后续回归测试:
+
+```text
+regression-test/suites/external_table_p0/parquet_complex_types.groovy
+```
+
+要求:
+
+- 结果排序稳定,使用 `order_qt` 或显式 `order by`。
+- 错误场景使用 `test { sql; exception }`。
+- 测试前 drop table,不在测试末尾 drop,便于失败后排查。
+
+## 分阶段落地
+
+### 阶段 1:Schema level 信息补齐
+
+- 扩展 `ParquetColumnSchema`,保存 definition/repetition level。
+- 增加 `file_path`、`field_id_path`、`name_path`,并明确 top-level file-local id 与
table field id 的边界。
+- 重写 `build_parquet_column_schema()` 的复杂类型规约逻辑。
+- 增加 schema-only UT,覆盖 LIST/MAP legacy 和 standard encodings。
+
+### 阶段 1.5:Projection tree 和 projected schema view
+
+- 扩展 `FileScanRequest`,表达 top-level complex field 的 child projection tree。
+- 增加 projected `SchemaField` / `DataTypePtr` 构造逻辑。
+- `ParquetColumnReaderFactory` 接收 projection tree,只创建被投影 child reader。
+- 增加 child pruning UT,验证未投影 leaf 不创建 reader、不读取 column chunk。
+
+### 阶段 2:Leaf level reader
+
+- 为 `ScalarColumnReader` 增加 nested leaf read API。
+- 去掉 `max_repetition_level == 0 && max_definition_level <= 1` 的硬限制,改成 flat
path 和 nested path 分支。
+- 验证 nullable primitive 在 nested path 下的 value/null 对齐。
+
+### 阶段 3:Struct reader 完整化
+
+- 实现 nullable struct。
+- 保证 null struct row 对所有 children 插入 default/null slot。
+- 增加 required/optional struct UT。
+
+### 阶段 4:List reader
+
+- 实现 list assembler、offset 写入、null/empty list 区分。
+- 实现 overflow child buffer。
+- 实现 list `skip()`。
+- 增加 array、nested array、array of struct UT。
+
+### 阶段 5:Map reader
+
+- 实现 map schema 规约到 key/value children。
+- 直接写 `ColumnMap` keys、values、offsets。
+- 校验 required key。
+- 增加 map UT。
+
+### 阶段 6:Selected read 和集成测试
+
+- 验证 complex non-predicate column 在 lazy materialization 下正确。
+- 验证 complex projected child 在 lazy materialization 下正确。
+- 增加 sparse selection、empty selection、multi-row-group 测试。
+- 将复杂类型 reader 接入 `ParquetReader` 现有 scan loop,不改 table/global schema 边界。
+
+### 阶段 7:优化和扩展
+
+- complex child predicate execution。
+- complex column statistics 和 page index 支持。
+- complex predicate fallback。
+- 复杂列 schema change child-level mapping。
+
+## 验收标准
+
+完成“复杂类型完整支持”至少需要满足:
+
+- `STRUCT`、nullable `STRUCT`、`LIST`、nested `LIST`、`MAP` 可以正确读入 Doris complex
columns。
+- 复杂类型 child projection 可以裁剪未请求 leaf,并输出 projected complex type。
+- null、empty、missing element/value 的语义与 Parquet definition/repetition level 一致。
+- `read()`、`skip()`、`select()` 在复杂类型上均保持 parent-row 语义。
+- flat primitive 现有测试不退化。
+- 新增 BE UT 覆盖复杂类型基础、嵌套、selected read 和 multi-row-group。
+- `ParquetReader` 不引入 table/global schema 语义。
+- schema/path/level 元数据足够后续 `TableColumnMapper` 实现 child-level schema
change,不需要重写复杂类型 reader 主体。
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]