This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 44a1a20e65 [feature-wip](parquet-reader)parse parquet schema (#11381) 44a1a20e65 is described below commit 44a1a20e65ad2b5e1ae05cefd90ced9c2a26a516 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Tue Aug 2 10:56:13 2022 +0800 [feature-wip](parquet-reader)parse parquet schema (#11381) Analyze schema elements in parquet FileMetaData, and generate the hierarchy of nested fields. For example: 1. primitive type ``` // thrift: optional int32 <column-name>; // sql definition: <column-name> int32; ``` 2. nested type ``` // thrift: optional group <column-name> (LIST) { repeated group bag { optional group array_element (LIST) { repeated group bag { optional int32 array_element } } } } // sql definition: <column-name> array<array<int32>> ``` --- be/src/util/slice.h | 4 + be/src/vec/exec/format/parquet/schema_desc.cpp | 363 ++++++++++++++++++++- be/src/vec/exec/format/parquet/schema_desc.h | 91 +++++- .../exec/format/parquet/vparquet_file_metadata.h | 4 +- .../test_data/parquet_scanner/hive-complex.parquet | Bin 0 -> 1721 bytes be/test/vec/exec/parquet/parquet_thrift_test.cpp | 62 ++++ 6 files changed, 503 insertions(+), 21 deletions(-) diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 90f48224ea..1a7ee78a01 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -141,6 +141,10 @@ public: return ((size >= x.size) && (mem_equal(data, x.data, x.size))); } + bool ends_with(const Slice& x) const { + return ((size >= x.size) && mem_equal(data + (size - x.size), x.data, x.size)); + } + /// @brief Comparator struct, useful for ordered collections (like STL maps). 
struct Comparator { /// Compare two slices using Slice::compare() diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp index 21275d2bb4..88b3210341 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.cpp +++ b/be/src/vec/exec/format/parquet/schema_desc.cpp @@ -17,17 +17,370 @@ #include "schema_desc.h" +#include "gutil/strings/substitute.h" + namespace doris::vectorized { -SchemaDescriptor::~SchemaDescriptor() { - // fields.clear(); +static bool is_group_node(const tparquet::SchemaElement& schema) { + return schema.num_children > 0; +} + +static bool is_list_node(const tparquet::SchemaElement& schema) { + return schema.__isset.converted_type && schema.converted_type == tparquet::ConvertedType::LIST; +} + +static bool is_map_node(const tparquet::SchemaElement& schema) { + return schema.__isset.converted_type && + (schema.converted_type == tparquet::ConvertedType::MAP || + schema.converted_type == tparquet::ConvertedType::MAP_KEY_VALUE); +} + +static bool is_repeated_node(const tparquet::SchemaElement& schema) { + return schema.__isset.repetition_type && + schema.repetition_type == tparquet::FieldRepetitionType::REPEATED; +} + +static bool is_required_node(const tparquet::SchemaElement& schema) { + return schema.__isset.repetition_type && + schema.repetition_type == tparquet::FieldRepetitionType::REQUIRED; +} + +static bool is_optional_node(const tparquet::SchemaElement& schema) { + return schema.__isset.repetition_type && + schema.repetition_type == tparquet::FieldRepetitionType::OPTIONAL; +} + +static int num_children_node(const tparquet::SchemaElement& schema) { + return schema.__isset.num_children ? 
schema.num_children : 0; +} + +static void set_child_node_level(FieldSchema* parent, size_t rep_inc = 0, size_t def_inc = 0) { + for (auto& child : parent->children) { + child.repetition_level = parent->repetition_level + rep_inc; + child.definition_level = parent->definition_level + def_inc; + } } -std::string SchemaDescriptor::debug_string() const { - return std::string(); +static bool is_struct_list_node(const tparquet::SchemaElement& schema) { + const std::string& name = schema.name; + static const Slice array_slice("array", 5); + static const Slice tuple_slice("_tuple", 6); + Slice slice(name); + return slice == array_slice || slice.ends_with(tuple_slice); } std::string FieldSchema::debug_string() const { - return std::string(); + std::stringstream ss; + ss << "FieldSchema(name=" << name << ", R=" << repetition_level << ", D=" << definition_level; + if (children.size() > 0) { + ss << ", type=" << type.type << ", children=["; + for (int i = 0; i < children.size(); ++i) { + if (i != 0) { + ss << ", "; + } + ss << children[i].debug_string(); + } + ss << "]"; + } else { + ss << ", physical_type=" << physical_type; + } + ss << ")"; + return ss.str(); +} + +Status FieldDescriptor::parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas) { + if (t_schemas.size() == 0 || !is_group_node(t_schemas[0])) { + return Status::InvalidArgument("Wrong parquet root schema element"); + } + auto& root_schema = t_schemas[0]; + _fields.resize(root_schema.num_children); + _next_schema_pos = 1; + for (int i = 0; i < root_schema.num_children; ++i) { + RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &_fields[i])); + if (_name_to_field.find(_fields[i].name) != _name_to_field.end()) { + return Status::InvalidArgument( + strings::Substitute("Duplicated field name: $0", _fields[i].name)); + } + _name_to_field.emplace(_fields[i].name, &_fields[i]); + } + + if (_next_schema_pos != t_schemas.size()) { + return Status::InvalidArgument(strings::Substitute("Remaining 
$0 unparsed schema elements", + t_schemas.size() - _next_schema_pos)); + } + + return Status::OK(); +} + +Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* node_field) { + if (curr_pos >= t_schemas.size()) { + return Status::InvalidArgument("Out-of-bounds index of schema elements"); + } + auto& t_schema = t_schemas[curr_pos]; + if (is_group_node(t_schema)) { + // nested structure or nullable list + return parse_group_field(t_schemas, curr_pos, node_field); + } + if (is_repeated_node(t_schema)) { + // repeated <primitive-type> <name> (LIST) + // produce required list<element> + node_field->children.resize(1); + set_child_node_level(node_field); + auto child = &node_field->children[0]; + parse_physical_field(t_schema, false, child); + + node_field->name = t_schema.name; + node_field->type.type = TYPE_ARRAY; + node_field->is_nullable = false; + _next_schema_pos = curr_pos + 1; + } else { + bool is_optional = is_optional_node(t_schema); + if (is_optional) { + node_field->definition_level++; + } + parse_physical_field(t_schema, is_optional, node_field); + _next_schema_pos = curr_pos + 1; + } + return Status::OK(); +} + +void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema, + bool is_nullable, FieldSchema* physical_field) { + physical_field->name = physical_schema.name; + physical_field->parquet_schema = physical_schema; + physical_field->is_nullable = is_nullable; + physical_field->physical_type = physical_schema.type; + _physical_fields.push_back(physical_field); + physical_field->physical_column_index = _physical_fields.size() - 1; } + +Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* group_field) { + auto& group_schema = t_schemas[curr_pos]; + if (is_map_node(group_schema)) { + // the map definition: + // optional group <name> (MAP) { + // repeated group map (MAP_KEY_VALUE) { 
+ // required <type> key; + // optional <type> value; + // } + // } + return parse_map_field(t_schemas, curr_pos, group_field); + } + if (is_list_node(group_schema)) { + // the list definition: + // optional group <name> (LIST) { + // repeated group [bag | list] { // hive or spark + // optional <type> [array_element | element]; // hive or spark + // } + // } + return parse_list_field(t_schemas, curr_pos, group_field); + } + + if (is_repeated_node(group_schema)) { + group_field->children.resize(1); + set_child_node_level(group_field); + auto struct_field = &group_field->children[0]; + // the list of struct: + // repeated group <name> (LIST) { + // optional/required <type> <name>; + // ... + // } + // produce a non-null list<struct> + RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, struct_field)); + + group_field->name = group_schema.name; + group_field->type.type = TYPE_ARRAY; + group_field->is_nullable = false; + } else { + RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos, group_field)); + } + + return Status::OK(); +} + +Status FieldDescriptor::parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* list_field) { + // the list definition: + // spark and hive have three level schemas but with different schema name + // spark: <column-name> - "list" - "list_child" + // hive: <column-name> - "bag" - "array_element" + // parse three level schemas to two level primitive like: LIST<INT>, + // or nested structure like: LIST<MAP<INT, INT>> + auto& first_level = t_schemas[curr_pos]; + if (first_level.num_children != 1) { + return Status::InvalidArgument("List list_child should have only one child"); + } + + if (curr_pos + 1 >= t_schemas.size()) { + return Status::InvalidArgument("List list_child should have the second level schema"); + } + + if (first_level.repetition_type == tparquet::FieldRepetitionType::REPEATED) { + return Status::InvalidArgument("List list_child can't be a repeated schema"); + } + + // the 
repeated schema list_child + auto& second_level = t_schemas[curr_pos + 1]; + if (second_level.repetition_type != tparquet::FieldRepetitionType::REPEATED) { + return Status::InvalidArgument("The second level of list list_child should be repeated"); + } + + // This indicates if this list is nullable. + bool is_optional = is_optional_node(first_level); + if (is_optional) { + list_field->definition_level++; + } + list_field->children.resize(1); + FieldSchema* list_child = &list_field->children[0]; + + size_t num_children = num_children_node(second_level); + if (num_children > 0) { + if (num_children == 1 && !is_struct_list_node(second_level)) { + // optional field, and the third level element is the nested structure in list + // produce nested structure like: LIST<INT>, LIST<MAP>, LIST<LIST<...>> + // skip bag/list, but it's a repeated element, so increase repetition and definition level + set_child_node_level(list_field, 1, 1); + RETURN_IF_ERROR(parse_node_field(t_schemas, curr_pos + 2, list_child)); + } else { + // required field, produce the list of struct + set_child_node_level(list_field); + RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, list_child)); + } + } else if (num_children == 0) { + // required two level list, for compatibility reason. 
+ set_child_node_level(list_field); + parse_physical_field(second_level, false, list_child); + _next_schema_pos = curr_pos + 2; + } + + list_field->name = first_level.name; + list_field->type.type = TYPE_ARRAY; + list_field->type.children.push_back(list_field->children[0].type); + list_field->is_nullable = is_optional; + + return Status::OK(); +} + +Status FieldDescriptor::parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* map_field) { + // the map definition in parquet: + // optional group <name> (MAP) { + // repeated group map (MAP_KEY_VALUE) { + // required <type> key; + // optional <type> value; + // } + // } + // Map value can be optional, the map without values is a SET + if (curr_pos + 2 >= t_schemas.size()) { + return Status::InvalidArgument("Map element should have at least three levels"); + } + auto& map_schema = t_schemas[curr_pos]; + if (map_schema.num_children != 1) { + return Status::InvalidArgument( + "Map element should have only one child(name='map', type='MAP_KEY_VALUE')"); + } + if (is_repeated_node(map_schema)) { + return Status::InvalidArgument("Map element can't be a repeated schema"); + } + auto& map_key_value = t_schemas[curr_pos + 1]; + if (!is_group_node(map_key_value) || !is_repeated_node(map_key_value)) { + return Status::InvalidArgument( + "the second level in map must be a repeated group(key and value)"); + } + auto& map_key = t_schemas[curr_pos + 2]; + if (!is_required_node(map_key)) { + return Status::InvalidArgument("the third level(map key) in map group must be required"); + } + + if (map_key_value.num_children == 1) { + // The map with three levels is a SET + return parse_list_field(t_schemas, curr_pos, map_field); + } + if (map_key_value.num_children != 2) { + // A standard map should have four levels + return Status::InvalidArgument( + "the second level in map(MAP_KEY_VALUE) should have two children"); + } + // standard map + bool is_optional = is_optional_node(map_schema); + 
if (is_optional) { + map_field->definition_level++; + } + + map_field->children.resize(1); + set_child_node_level(map_field); + auto map_kv_field = &map_field->children[0]; + // produce MAP<STRUCT<KEY, VALUE>> + RETURN_IF_ERROR(parse_struct_field(t_schemas, curr_pos + 1, map_kv_field)); + + map_field->name = map_schema.name; + map_field->type.type = TYPE_MAP; + map_field->is_nullable = is_optional; + + return Status::OK(); +} + +Status FieldDescriptor::parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* struct_field) { + // the nested column in parquet, parse group to struct. + auto& struct_schema = t_schemas[curr_pos]; + bool is_optional = is_optional_node(struct_schema); + if (is_optional) { + struct_field->definition_level++; + } else if (is_repeated_node(struct_schema)) { + struct_field->repetition_level++; + struct_field->definition_level++; + } + auto num_children = struct_schema.num_children; + struct_field->children.resize(num_children); + set_child_node_level(struct_field); + _next_schema_pos = curr_pos + 1; + for (int i = 0; i < num_children; ++i) { + RETURN_IF_ERROR(parse_node_field(t_schemas, _next_schema_pos, &struct_field->children[i])); + } + struct_field->name = struct_schema.name; + struct_field->is_nullable = is_optional; + struct_field->type.type = TYPE_STRUCT; + return Status::OK(); +} + +int FieldDescriptor::get_column_index(const std::string& column) const { + for (size_t i = 0; i < _fields.size(); i++) { + if (_fields[i].name == column) { + return i; + } + } + return -1; +} + +const FieldSchema* FieldDescriptor::get_column(const string& name) const { + auto it = _name_to_field.find(name); + if (it != _name_to_field.end()) { + return it->second; + } + return nullptr; +} + +void FieldDescriptor::get_column_names(std::unordered_set<std::string>* names) const { + names->clear(); + for (const FieldSchema& f : _fields) { + names->emplace(f.name); + } +} + +std::string 
FieldDescriptor::debug_string() const { + std::stringstream ss; + ss << "fields=["; + for (int i = 0; i < _fields.size(); ++i) { + if (i != 0) { + ss << ", "; + } + ss << _fields[i].debug_string(); + } + ss << "]"; + return ss.str(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h index 678f633e29..7296762788 100644 --- a/be/src/vec/exec/format/parquet/schema_desc.h +++ b/be/src/vec/exec/format/parquet/schema_desc.h @@ -17,30 +17,93 @@ #pragma once +#include <unordered_map> +#include <vector> + #include "common/status.h" +#include "gen_cpp/parquet_types.h" +#include "runtime/types.h" namespace doris::vectorized { -class FieldSchema { -public: - int16_t max_def_level() const { return _max_def_level; } - int16_t max_rep_level() const { return _max_rep_level; } + +struct FieldSchema { + std::string name; + // the referenced parquet schema element + tparquet::SchemaElement parquet_schema; + + // Used to identify whether this field is a nested field. 
+ TypeDescriptor type; + bool is_nullable; + + // Only valid when this field is a leaf node + tparquet::Type::type physical_type; + // The index order in FieldDescriptor._physical_fields + int physical_column_index = -1; + + int16_t definition_level = 0; + int16_t repetition_level = 0; + std::vector<FieldSchema> children; + std::string debug_string() const; +}; +class FieldDescriptor { private: - int16_t _max_def_level; - int16_t _max_rep_level; - // std::vector<FieldSchema> children; -}; + // Only the schema elements at the first level + std::vector<FieldSchema> _fields; + // The leaf node of schema elements + std::vector<FieldSchema*> _physical_fields; + // Name to _fields, not all schema elements + std::unordered_map<std::string, const FieldSchema*> _name_to_field; + // Used in from_thrift, marking the next schema position that should be parsed + size_t _next_schema_pos; + + void parse_physical_field(const tparquet::SchemaElement& physical_schema, bool is_nullable, + FieldSchema* physical_field); + + Status parse_list_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos, + FieldSchema* list_field); + + Status parse_map_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos, + FieldSchema* map_field); + + Status parse_struct_field(const std::vector<tparquet::SchemaElement>& t_schemas, + size_t curr_pos, FieldSchema* struct_field); + + Status parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos, + FieldSchema* group_field); + + Status parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos, + FieldSchema* node_field); -class SchemaDescriptor { public: - SchemaDescriptor() = default; - ~SchemaDescriptor(); + FieldDescriptor() = default; + ~FieldDescriptor() = default; - std::string debug_string() const; + /** + * Parse FieldDescriptor from parquet thrift FileMetaData. 
+ * @param t_schemas list of schema elements + */ + Status parse_from_thrift(const std::vector<tparquet::SchemaElement>& t_schemas); -private: - // std::vector<FieldSchema> fields; + int get_column_index(const std::string& column) const; + + /** + * Get the column(the first level schema element, maybe nested field) by index. + * @param index Column index in _fields + */ + const FieldSchema* get_column(int index) const { return &_fields[index]; } + + /** + * Get the column(the first level schema element, maybe nested field) by name. + * @param name Column name + * @return FieldSchema or nullptr if not exists + */ + const FieldSchema* get_column(const std::string& name) const; + + void get_column_names(std::unordered_set<std::string>* names) const; + + std::string debug_string() const; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h index 1ad15edf7f..53d08fa855 100644 --- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h +++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h @@ -31,7 +31,7 @@ public: int32_t num_row_groups() const { return _num_groups; } int32_t num_columns() const { return _num_columns; }; int32_t num_rows() const { return _num_rows; }; - SchemaDescriptor schema() const { return _schema; }; + FieldDescriptor schema() const { return _schema; }; std::string debug_string() const; private: @@ -39,7 +39,7 @@ private: int32_t _num_groups = 0; int32_t _num_columns = 0; int64_t _num_rows = 0; - SchemaDescriptor _schema; + FieldDescriptor _schema; }; } // namespace doris::vectorized diff --git a/be/test/exec/test_data/parquet_scanner/hive-complex.parquet b/be/test/exec/test_data/parquet_scanner/hive-complex.parquet new file mode 100644 index 0000000000..2b626b895a Binary files /dev/null and b/be/test/exec/test_data/parquet_scanner/hive-complex.parquet differ diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index d5ac78264b..5100ea32f3 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -46,6 +46,7 @@ TEST_F(ParquetThriftReaderTest, normal) { std::shared_ptr<FileMetaData> metaData; parse_thrift_footer(&reader, metaData); tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata(); + LOG(WARNING) << "num row groups: " << metaData->num_row_groups(); LOG(WARNING) << "num columns: " << metaData->num_columns(); LOG(WARNING) << "====================================="; @@ -61,6 +62,67 @@ TEST_F(ParquetThriftReaderTest, normal) { } } +TEST_F(ParquetThriftReaderTest, complex_nested_file) { + // hive-complex.parquet is the part of following table: + // complex_nested_table( + // `name` string, + // `income` array<array<int>>, + // `hobby` array<map<string,string>>, + // `friend` map<string,string>, + // `mark` struct<math:int,english:int>) + LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/hive-complex.parquet", 0); + + auto st = reader.open(); + EXPECT_TRUE(st.ok()); + + std::shared_ptr<FileMetaData> metaData; + parse_thrift_footer(&reader, metaData); + tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata(); + FieldDescriptor schemaDescriptor; + schemaDescriptor.parse_from_thrift(t_metadata.schema); + + // table columns + ASSERT_EQ(schemaDescriptor.get_column_index("name"), 0); + auto name = schemaDescriptor.get_column("name"); + ASSERT_TRUE(name->children.size() == 0 && name->physical_column_index >= 0); + ASSERT_TRUE(name->repetition_level == 0 && name->definition_level == 1); + + ASSERT_EQ(schemaDescriptor.get_column_index("income"), 1); + auto income = schemaDescriptor.get_column("income"); + // should be parsed as ARRAY<ARRAY<INT32>> + ASSERT_TRUE(income->type.type == TYPE_ARRAY); + ASSERT_TRUE(income->children.size() == 1); + ASSERT_TRUE(income->children[0].type.type == TYPE_ARRAY); + 
ASSERT_TRUE(income->children[0].children.size() == 1); + auto i_physical = income->children[0].children[0]; + // five levels for ARRAY<ARRAY<INT32>> + // income --- bag --- array_element --- bag --- array_element + // opt rep opt rep opt + // R=0,D=1 R=1,D=2 R=1,D=3 R=2,D=4 R=2,D=5 + ASSERT_TRUE(i_physical.repetition_level == 2 && i_physical.definition_level == 5); + + ASSERT_EQ(schemaDescriptor.get_column_index("hobby"), 2); + auto hobby = schemaDescriptor.get_column("hobby"); + // should be parsed as ARRAY<MAP<STRUCT<STRING,STRING>>> + ASSERT_TRUE(hobby->children.size() == 1 && hobby->children[0].children.size() == 1 && + hobby->children[0].children[0].children.size() == 2); + ASSERT_TRUE(hobby->type.type == TYPE_ARRAY && hobby->children[0].type.type == TYPE_MAP && + hobby->children[0].children[0].type.type == TYPE_STRUCT); + // hobby(opt) --- bag(rep) --- array_element(opt) --- map(rep) + // \------- key(req) + // \------- value(opt) + // R=0,D=1 R=1,D=2 R=1,D=3 R=2,D=4 + // \------ R=2,D=4 + // \------ R=2,D=5 + auto h_key = hobby->children[0].children[0].children[0]; + auto h_value = hobby->children[0].children[0].children[1]; + ASSERT_TRUE(h_key.repetition_level == 2 && h_key.definition_level == 4); + ASSERT_TRUE(h_value.repetition_level == 2 && h_value.definition_level == 5); + + ASSERT_EQ(schemaDescriptor.get_column_index("friend"), 3); + ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4); +} + } // namespace vectorized } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org