This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1-lakehouse by this push: new cf6eadb4c28 [opt](parquet-reader)Implement late materialization of parquet complex types. (#44098) cf6eadb4c28 is described below commit cf6eadb4c28a1343b69915df69c4ac91d5d966c1 Author: Qi Chen <che...@selectdb.com> AuthorDate: Thu Dec 26 11:23:18 2024 +0800 [opt](parquet-reader)Implement late materialization of parquet complex types. (#44098) ### What problem does this PR solve? Problem Summary: Late materialization is not supported when querying fields with complex types. ### Release note [opt](parquet-reader)Implement late materialization of parquet complex types. --- be/src/vec/exec/format/parquet/parquet_common.cpp | 152 +++---- be/src/vec/exec/format/parquet/parquet_common.h | 156 +++++-- .../exec/format/parquet/vparquet_column_reader.cpp | 170 ++++++-- .../exec/format/parquet/vparquet_column_reader.h | 60 ++- .../exec/format/parquet/vparquet_group_reader.cpp | 58 +-- .../exec/format/parquet/vparquet_group_reader.h | 6 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 3 +- be/test/vec/exec/parquet/parquet_common_test.cpp | 457 +++++++++++++++++++++ .../parquet_nested_type_cross_page_test.cpp | 179 ++++++++ be/test/vec/exec/parquet/parquet_thrift_test.cpp | 8 +- .../parquet_nested_types/create_table.hql | 58 +++ .../multi_catalog/parquet_nested_types/data.tar.gz | Bin 0 -> 36976 bytes .../data_gen_scripts/nested_cross_page_test1.py | 192 +++++++++ .../data_gen_scripts/nested_cross_page_test2.py | 287 +++++++++++++ .../data_gen_scripts/nested_cross_page_test3.py | 196 +++++++++ .../data/multi_catalog/parquet_nested_types/run.sh | 12 + .../hive/test_parquet_nested_types.out | Bin 0 -> 181799 bytes .../hive/test_parquet_nested_types.groovy | 209 ++++++++++ 18 files changed, 1980 insertions(+), 223 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 59e12fcc71a..f71f511edd3 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -28,24 +28,19 @@ const int32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588; const int64_t ParquetInt96::MICROS_IN_DAY = 86400000000; const int64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000; -ColumnSelectVector::ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size, - bool filter_all) { - build(filter_map, filter_map_size, filter_all); -} - -void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all) { +Status FilterMap::init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all) { _filter_all = filter_all; - _filter_map = filter_map; + _filter_map_data = filter_map_data; _filter_map_size = filter_map_size; if (filter_all) { _has_filter = true; _filter_ratio = 1; - } else if (filter_map == nullptr) { + } else if (filter_map_data == nullptr) { _has_filter = false; _filter_ratio = 0; } else { - size_t filter_count = - simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map), filter_map_size); + size_t filter_count = simd::count_zero_num(reinterpret_cast<const int8_t*>(filter_map_data), + filter_map_size); if (filter_count == filter_map_size) { _has_filter = true; _filter_all = true; @@ -58,109 +53,68 @@ void ColumnSelectVector::build(const uint8_t* filter_map, size_t filter_map_size _filter_ratio = 0; } } + return Status::OK(); } -void ColumnSelectVector::set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map, - size_t 
num_values, NullMap* null_map) { - _num_values = num_values; - _num_nulls = 0; - _read_index = 0; - size_t map_index = 0; - bool is_null = false; - if (_has_filter) { - // No run length null map is generated when _filter_all = true - DCHECK(!_filter_all); - _data_map.resize(num_values); - for (auto& run_length : run_length_null_map) { - if (is_null) { - _num_nulls += run_length; - for (int i = 0; i < run_length; ++i) { - _data_map[map_index++] = FILTERED_NULL; - } - } else { - for (int i = 0; i < run_length; ++i) { - _data_map[map_index++] = FILTERED_CONTENT; - } - } - is_null = !is_null; - } - size_t num_read = 0; - DCHECK_LE(_filter_map_index + num_values, _filter_map_size); - for (size_t i = 0; i < num_values; ++i) { - if (_filter_map[_filter_map_index++]) { - _data_map[i] = _data_map[i] == FILTERED_NULL ? NULL_DATA : CONTENT; - num_read++; - } - } - _num_filtered = num_values - num_read; - if (null_map != nullptr && num_read > 0) { - NullMap& map_data_column = *null_map; - auto null_map_index = map_data_column.size(); - map_data_column.resize(null_map_index + num_read); - if (_num_nulls == 0) { - memset(map_data_column.data() + null_map_index, 0, num_read); - } else if (_num_nulls == num_values) { - memset(map_data_column.data() + null_map_index, 1, num_read); - } else { - for (size_t i = 0; i < num_values; ++i) { - if (_data_map[i] == CONTENT) { - map_data_column[null_map_index++] = (UInt8) false; - } else if (_data_map[i] == NULL_DATA) { - map_data_column[null_map_index++] = (UInt8) true; - } - } - } - } - } else { - _num_filtered = 0; - _run_length_null_map = &run_length_null_map; - if (null_map != nullptr) { - NullMap& map_data_column = *null_map; - auto null_map_index = map_data_column.size(); - map_data_column.resize(null_map_index + num_values); - - for (auto& run_length : run_length_null_map) { - if (is_null) { - memset(map_data_column.data() + null_map_index, 1, run_length); - null_map_index += run_length; - _num_nulls += run_length; - } else { - memset(map_data_column.data() + null_map_index, 0, run_length); - null_map_index += run_length; - } - is_null = !is_null; - } - } else { - for (auto& run_length : run_length_null_map) { - if (is_null) { - _num_nulls += run_length; - } - is_null = !is_null; - } - } - } -} - -bool ColumnSelectVector::can_filter_all(size_t remaining_num_values) { +bool FilterMap::can_filter_all(size_t remaining_num_values, size_t filter_map_index) { if (!_has_filter) { return false; } if (_filter_all) { // all data in normal columns can be skipped when _filter_all = true, // so the remaining_num_values should be less than the remaining filter map size. - DCHECK_LE(remaining_num_values + _filter_map_index, _filter_map_size); + DCHECK_LE(remaining_num_values + filter_map_index, _filter_map_size); // return true always, to make sure that the data in normal columns can be skipped. 
return true; } - if (remaining_num_values + _filter_map_index > _filter_map_size) { + if (remaining_num_values + filter_map_index > _filter_map_size) { return false; } - return simd::count_zero_num(reinterpret_cast<const int8_t*>(_filter_map + _filter_map_index), - remaining_num_values) == remaining_num_values; -} + return simd::count_zero_num( + reinterpret_cast<const int8_t*>(_filter_map_data + filter_map_index), + remaining_num_values) == remaining_num_values; +} + +Status FilterMap::generate_nested_filter_map(const std::vector<level_t>& rep_levels, + std::vector<uint8_t>& nested_filter_map_data, + std::unique_ptr<FilterMap>* nested_filter_map, + size_t* current_row_ptr, size_t start_index) const { + if (!has_filter() || filter_all()) { + return Status::InternalError(fmt::format( + "FilterMap::generate_nested_filter_map failed: has_filter={}, filter_all={}", + has_filter(), filter_all())); + } + + if (rep_levels.empty()) { + return Status::OK(); + } + + nested_filter_map_data.resize(rep_levels.size()); -void ColumnSelectVector::skip(size_t num_values) { - _filter_map_index += num_values; + size_t current_row = current_row_ptr ? *current_row_ptr : 0; + + for (size_t i = start_index; i < rep_levels.size(); i++) { + if (i != start_index && rep_levels[i] == 0) { + current_row++; + if (current_row >= _filter_map_size) { + return Status::InvalidArgument(fmt::format( + "current_row >= _filter_map_size. current_row: {}, _filter_map_size: {}", + current_row, _filter_map_size)); + } + } + nested_filter_map_data[i] = _filter_map_data[current_row]; + } + + if (current_row_ptr) { + *current_row_ptr = current_row; + } + + auto new_filter = std::make_unique<FilterMap>(); + RETURN_IF_ERROR( + new_filter->init(nested_filter_map_data.data(), nested_filter_map_data.size(), false)); + *nested_filter_map = std::move(new_filter); + + return Status::OK(); } ParsedVersion::ParsedVersion(std::string application, std::optional<std::string> version, diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index da374d5fe79..e4c394c05d2 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -24,6 +24,7 @@ #include <ostream> #include <regex> #include <string> +#include <unordered_set> #include <vector> #include "vec/columns/column_nullable.h" @@ -69,42 +70,148 @@ struct ParquetInt96 { #pragma pack() static_assert(sizeof(ParquetInt96) == 12, "The size of ParquetInt96 is not 12."); -class ColumnSelectVector { +class FilterMap { public: - enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL }; + FilterMap() = default; + Status init(const uint8_t* filter_map_data, size_t filter_map_size, bool filter_all); - ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size, bool filter_all); + Status generate_nested_filter_map(const std::vector<level_t>& rep_levels, + std::vector<uint8_t>& nested_filter_map_data, + std::unique_ptr<FilterMap>* nested_filter_map, + size_t* current_row_ptr, size_t start_index = 0) const; - ColumnSelectVector() = default; + const uint8_t* filter_map_data() const { return _filter_map_data; } + size_t filter_map_size() const { return _filter_map_size; } + bool has_filter() const { return _has_filter; } + bool filter_all() const { return _filter_all; } + double filter_ratio() const { return _has_filter ? 
_filter_ratio : 0; } - void build(const uint8_t* filter_map, size_t filter_map_size, bool filter_all); + bool can_filter_all(size_t remaining_num_values, size_t filter_map_index); - const uint8_t* filter_map() { return _filter_map; } +private: + bool _has_filter = false; + bool _filter_all = false; + const uint8_t* _filter_map_data = nullptr; + size_t _filter_map_size = 0; + double _filter_ratio = 0; +}; - size_t num_values() const { return _num_values; } +class ColumnSelectVector { +public: + enum DataReadType : uint8_t { CONTENT = 0, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL }; - size_t num_nulls() const { return _num_nulls; } + ColumnSelectVector() = default; - size_t num_filtered() const { return _num_filtered; } + Status init(const std::vector<uint16_t>& run_length_null_map, size_t num_values, + NullMap* null_map, FilterMap* filter_map, size_t filter_map_index, + const std::unordered_set<size_t>* skipped_indices = nullptr) { + _num_values = num_values; + _num_nulls = 0; + _read_index = 0; + size_t map_index = 0; + bool is_null = false; + _has_filter = filter_map->has_filter(); + + if (filter_map->has_filter()) { + // No run length null map is generated when _filter_all = true + DCHECK(!filter_map->filter_all()); + _data_map.resize(num_values); + for (auto& run_length : run_length_null_map) { + if (is_null) { + _num_nulls += run_length; + for (int i = 0; i < run_length; ++i) { + _data_map[map_index++] = FILTERED_NULL; + } + } else { + for (int i = 0; i < run_length; ++i) { + _data_map[map_index++] = FILTERED_CONTENT; + } + } + is_null = !is_null; + } - double filter_ratio() const { return _has_filter ? _filter_ratio : 0; } + size_t num_read = 0; + size_t i = 0; + size_t valid_count = 0; - void fallback_filter() { _has_filter = false; } + while (valid_count < num_values) { + DCHECK_LT(filter_map_index + i, filter_map->filter_map_size()); - bool has_filter() const { return _has_filter; } + if (skipped_indices != nullptr && + skipped_indices->count(filter_map_index + i) > 0) { + ++i; + continue; + } - bool can_filter_all(size_t remaining_num_values); + if (filter_map->filter_map_data()[filter_map_index + i]) { + _data_map[valid_count] = + _data_map[valid_count] == FILTERED_NULL ? 
NULL_DATA : CONTENT; + num_read++; + } + ++valid_count; + ++i; + } - bool filter_all() const { return _filter_all; } + _num_filtered = num_values - num_read; - void skip(size_t num_values); + if (null_map != nullptr && num_read > 0) { + NullMap& map_data_column = *null_map; + auto null_map_index = map_data_column.size(); + map_data_column.resize(null_map_index + num_read); - void reset() { - if (_has_filter) { - _filter_map_index = 0; + if (_num_nulls == 0) { + memset(map_data_column.data() + null_map_index, 0, num_read); + } else if (_num_nulls == num_values) { + memset(map_data_column.data() + null_map_index, 1, num_read); + } else { + for (size_t i = 0; i < num_values; ++i) { + if (_data_map[i] == CONTENT) { + map_data_column[null_map_index++] = (UInt8) false; + } else if (_data_map[i] == NULL_DATA) { + map_data_column[null_map_index++] = (UInt8) true; + } + } + } + } + } else { + _num_filtered = 0; + _run_length_null_map = &run_length_null_map; + if (null_map != nullptr) { + NullMap& map_data_column = *null_map; + auto null_map_index = map_data_column.size(); + map_data_column.resize(null_map_index + num_values); + + for (auto& run_length : run_length_null_map) { + if (is_null) { + memset(map_data_column.data() + null_map_index, 1, run_length); + null_map_index += run_length; + _num_nulls += run_length; + } else { + memset(map_data_column.data() + null_map_index, 0, run_length); + null_map_index += run_length; + } + is_null = !is_null; + } + } else { + for (auto& run_length : run_length_null_map) { + if (is_null) { + _num_nulls += run_length; + } + is_null = !is_null; + } + } } + return Status::OK(); } + size_t num_values() const { return _num_values; } + + size_t num_nulls() const { return _num_nulls; } + + size_t num_filtered() const { return _num_filtered; } + + bool has_filter() const { return _has_filter; } + template <bool has_filter> size_t get_next_run(DataReadType* data_read_type) { DCHECK_EQ(_has_filter, has_filter); @@ -137,22 +244,11 @@ public: } } - void set_run_length_null_map(const std::vector<uint16_t>& run_length_null_map, - size_t num_values, NullMap* null_map = nullptr); - private: std::vector<DataReadType> _data_map; // the length of non-null values and null values are arranged in turn. 
const std::vector<uint16_t>* _run_length_null_map; - bool _has_filter = false; - // only used when the whole batch is skipped - bool _filter_all = false; - const uint8_t* _filter_map = nullptr; - size_t _filter_map_size = 0; - double _filter_ratio = 0; - size_t _filter_map_index = 0; - - // generated in set_run_length_null_map + bool _has_filter; size_t _num_values; size_t _num_nulls; size_t _num_filtered; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index fd3200b3640..207b917666b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -238,7 +238,7 @@ Status ScalarColumnReader::_skip_values(size_t num_values) { } Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column, - DataTypePtr& type, ColumnSelectVector& select_vector, + DataTypePtr& type, FilterMap& filter_map, bool is_dict_filter) { if (num_values == 0) { return Status::OK(); @@ -301,9 +301,12 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu } null_map.emplace_back((u_short)remaining); } + ColumnSelectVector select_vector; { SCOPED_RAW_TIMER(&_decode_null_map_time); - select_vector.set_run_length_null_map(null_map, num_values, map_data_column); + RETURN_IF_ERROR(select_vector.init(null_map, num_values, map_data_column, &filter_map, + _filter_map_index)); + _filter_map_index += num_values; } return _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter); } @@ -314,9 +317,12 @@ Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_colu * whether the reader should read the remaining value of the last row in previous page. */ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, + FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter, - bool align_rows = false) { + bool align_rows) { + std::unique_ptr<FilterMap> nested_filter_map; + + FilterMap* current_filter_map = &filter_map; size_t origin_size = 0; if (align_rows) { origin_size = _rep_levels.size(); @@ -326,17 +332,22 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType } else { _rep_levels.resize(0); _def_levels.resize(0); + if (_nested_filter_map_data) { + _nested_filter_map_data->resize(0); + } } size_t parsed_rows = 0; size_t remaining_values = _chunk_reader->remaining_num_values(); bool has_rep_level = _chunk_reader->max_rep_level() > 0; bool has_def_level = _chunk_reader->max_def_level() > 0; + // Handle repetition levels (indicates nesting structure) if (has_rep_level) { LevelDecoder& rep_decoder = _chunk_reader->rep_level_decoder(); + // Read repetition levels until batch is full or no more values while (parsed_rows <= batch_size && remaining_values > 0) { level_t rep_level = rep_decoder.get_next(); - if (rep_level == 0) { + if (rep_level == 0) { // rep_level 0 indicates start of new row if (parsed_rows == batch_size) { rep_decoder.rewind_one(); break; @@ -346,12 +357,26 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType _rep_levels.emplace_back(rep_level); remaining_values--; } + + // Generate nested filter map + if (filter_map.has_filter() && (!filter_map.filter_all())) { + if (_nested_filter_map_data == nullptr) { + _nested_filter_map_data.reset(new std::vector<uint8_t>()); + } + 
RETURN_IF_ERROR(filter_map.generate_nested_filter_map( + _rep_levels, *_nested_filter_map_data, &nested_filter_map, + &_orig_filter_map_index, origin_size)); + // Update current_filter_map to nested_filter_map + current_filter_map = nested_filter_map.get(); + } } else if (!align_rows) { // case : required child columns in struct type parsed_rows = std::min(remaining_values, batch_size); remaining_values -= parsed_rows; _rep_levels.resize(parsed_rows, 0); } + + // Process definition levels (indicates null values) size_t parsed_values = _chunk_reader->remaining_num_values() - remaining_values; _def_levels.resize(origin_size + parsed_values); if (has_def_level) { @@ -360,6 +385,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType std::fill(_def_levels.begin() + origin_size, _def_levels.end(), 0); } + // Handle nullable columns MutableColumnPtr data_column; std::vector<uint16_t> null_map; NullMap* map_data_column = nullptr; @@ -375,10 +401,16 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType } data_column = doris_column->assume_mutable(); } + + // Process definition levels to build null map size_t has_read = origin_size; size_t ancestor_nulls = 0; + size_t null_size = 0; + size_t nonnull_size = 0; null_map.emplace_back(0); bool prev_is_null = false; + std::unordered_set<size_t> ancestor_null_indices; + while (has_read < origin_size + parsed_values) { level_t def_level = _def_levels[has_read++]; size_t loop_read = 1; @@ -386,16 +418,23 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType has_read++; loop_read++; } + if (def_level < _field_schema->repeated_parent_def_level) { - // when def_level is less than repeated_parent_def_level, it means that level - // will affect its ancestor. 
+ for (size_t i = 0; i < loop_read; i++) { + ancestor_null_indices.insert(has_read - loop_read + i); + } ancestor_nulls += loop_read; continue; } + bool is_null = def_level < _field_schema->definition_level; + if (is_null) { + null_size += loop_read; + } else { + nonnull_size += loop_read; + } + if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { - // If whether the values are nullable in current loop is the same the previous values, - // we can save the memory usage in null map null_map.back() += loop_read; } else { if (!(prev_is_null ^ is_null)) { @@ -413,29 +452,78 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType } size_t num_values = parsed_values - ancestor_nulls; - { - SCOPED_RAW_TIMER(&_decode_null_map_time); - select_vector.set_run_length_null_map(null_map, num_values, map_data_column); - } - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter)); - if (ancestor_nulls != 0) { - RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false)); - } - if (!align_rows) { - *read_rows = parsed_rows; + // Handle filtered values + if (current_filter_map->filter_all()) { + // Skip all values if everything is filtered + if (null_size > 0) { + RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false)); + } + if (nonnull_size > 0) { + RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true)); + } + if (ancestor_nulls != 0) { + RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false)); + } + } else { + ColumnSelectVector select_vector; + { + SCOPED_RAW_TIMER(&_decode_null_map_time); + RETURN_IF_ERROR( + select_vector.init(null_map, num_values, map_data_column, current_filter_map, + _nested_filter_map_data ? origin_size : _filter_map_index, + &ancestor_null_indices)); + } + + RETURN_IF_ERROR( + _chunk_reader->decode_values(data_column, type, select_vector, is_dict_filter)); + if (ancestor_nulls != 0) { + RETURN_IF_ERROR(_chunk_reader->skip_values(ancestor_nulls, false)); + } } + *read_rows += parsed_rows; + _filter_map_index += parsed_values; + + // Handle cross-page reading if (_chunk_reader->remaining_num_values() == 0) { if (_chunk_reader->has_next_page()) { RETURN_IF_ERROR(_chunk_reader->next_page()); RETURN_IF_ERROR(_chunk_reader->load_page_data()); - select_vector.reset(); - return _read_nested_column(doris_column, type, select_vector, 0, read_rows, eof, + return _read_nested_column(doris_column, type, filter_map, 0, read_rows, eof, is_dict_filter, true); } else { *eof = true; } } + + // Apply filtering to repetition and definition levels + if (current_filter_map->has_filter()) { + if (current_filter_map->filter_all()) { + _rep_levels.resize(0); + _def_levels.resize(0); + } else { + std::vector<level_t> filtered_rep_levels; + std::vector<level_t> filtered_def_levels; + filtered_rep_levels.reserve(_rep_levels.size()); + filtered_def_levels.reserve(_def_levels.size()); + + const uint8_t* filter_map_data = current_filter_map->filter_map_data(); + + for (size_t i = 0; i < _rep_levels.size(); i++) { + if (filter_map_data[i]) { + filtered_rep_levels.push_back(_rep_levels[i]); + filtered_def_levels.push_back(_def_levels[i]); + } + } + + _rep_levels = std::move(filtered_rep_levels); + _def_levels = std::move(filtered_def_levels); + } + } + + // Prepare for next row + ++_orig_filter_map_index; + if (_rep_levels.size() > 0) { // make sure the rows of complex type are aligned correctly, // so the repetition level of first element should be 0, meaning a new row is started. 
@@ -443,6 +531,7 @@ Status ScalarColumnReader::_read_nested_column(ColumnPtr& doris_column, DataType } return Status::OK(); } + Status ScalarColumnReader::read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { bool loaded; @@ -474,7 +563,7 @@ Status ScalarColumnReader::_try_load_dict_page(bool* loaded, bool* has_dict) { } Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, + FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) { if (_converter == nullptr) { _converter = parquet::PhysicalToLogicalConverter::get_converter( @@ -499,8 +588,8 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr } if (_nested_column) { RETURN_IF_ERROR(_chunk_reader->load_page_data_idempotent()); - RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, select_vector, - batch_size, read_rows, eof, is_dict_filter)); + RETURN_IF_ERROR(_read_nested_column(resolved_column, resolved_type, filter_map, + batch_size, read_rows, eof, is_dict_filter, false)); break; } @@ -518,16 +607,16 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr bool skip_whole_batch = false; // Determining whether to skip page or batch will increase the calculation time. // When the filtering effect is greater than 60%, it is possible to skip the page or batch. - if (select_vector.has_filter() && select_vector.filter_ratio() > 0.6) { + if (filter_map.has_filter() && filter_map.filter_ratio() > 0.6) { // lazy read size_t remaining_num_values = 0; for (auto& range : read_ranges) { remaining_num_values += range.last_row - range.first_row; } if (batch_size >= remaining_num_values && - select_vector.can_filter_all(remaining_num_values)) { + filter_map.can_filter_all(remaining_num_values, _filter_map_index)) { // We can skip the whole page if the remaining values is filtered by predicate columns - select_vector.skip(remaining_num_values); + _filter_map_index += remaining_num_values; _current_row_index += _chunk_reader->remaining_num_values(); RETURN_IF_ERROR(_chunk_reader->skip_page()); *read_rows = remaining_num_values; @@ -537,9 +626,9 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr break; } skip_whole_batch = batch_size <= remaining_num_values && - select_vector.can_filter_all(batch_size); + filter_map.can_filter_all(batch_size, _filter_map_index); if (skip_whole_batch) { - select_vector.skip(batch_size); + _filter_map_index += batch_size; } } // load page data to decode or skip values @@ -557,7 +646,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr RETURN_IF_ERROR(_skip_values(read_values)); } else { RETURN_IF_ERROR(_read_values(read_values, resolved_column, resolved_type, - select_vector, is_dict_filter)); + filter_map, is_dict_filter)); } has_read += read_values; _current_row_index += read_values; @@ -585,7 +674,7 @@ Status ArrayColumnReader::init(std::unique_ptr<ParquetColumnReader> element_read } Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, + FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) { MutableColumnPtr data_column; NullMap* null_map_ptr = nullptr; @@ -611,7 +700,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& (reinterpret_cast<const 
DataTypeArray*>(remove_nullable(type).get())) ->get_nested_type()); // read nested column - RETURN_IF_ERROR(_element_reader->read_column_data(element_column, element_type, select_vector, + RETURN_IF_ERROR(_element_reader->read_column_data(element_column, element_type, filter_map, batch_size, read_rows, eof, is_dict_filter)); if (*read_rows == 0) { return Status::OK(); @@ -636,7 +725,7 @@ Status MapColumnReader::init(std::unique_ptr<ParquetColumnReader> key_reader, } Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, + FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) { MutableColumnPtr data_column; NullMap* null_map_ptr = nullptr; @@ -669,12 +758,12 @@ Status MapColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& t size_t value_rows = 0; bool key_eof = false; bool value_eof = false; - RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, select_vector, batch_size, + RETURN_IF_ERROR(_key_reader->read_column_data(key_column, key_type, filter_map, batch_size, &key_rows, &key_eof, is_dict_filter)); + while (value_rows < key_rows && !value_eof) { size_t loop_rows = 0; - select_vector.reset(); - RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, select_vector, + RETURN_IF_ERROR(_value_reader->read_column_data(value_column, value_type, filter_map, key_rows - value_rows, &loop_rows, &value_eof, is_dict_filter)); value_rows += loop_rows; @@ -705,7 +794,7 @@ Status StructColumnReader::init( return Status::OK(); } Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, + FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter) { MutableColumnPtr data_column; NullMap* null_map_ptr = nullptr; @@ -748,22 +837,21 @@ Status StructColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr _read_column_names.insert(doris_name); - select_vector.reset(); + // select_vector.reset(); size_t field_rows = 0; bool field_eof = false; if (not_missing_column_id == -1) { not_missing_column_id = i; RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data( - doris_field, doris_type, select_vector, batch_size, &field_rows, &field_eof, + doris_field, doris_type, filter_map, batch_size, &field_rows, &field_eof, is_dict_filter)); *read_rows = field_rows; *eof = field_eof; } else { while (field_rows < *read_rows && !field_eof) { size_t loop_rows = 0; - select_vector.reset(); RETURN_IF_ERROR(_child_readers[doris_name]->read_column_data( - doris_field, doris_type, select_vector, *read_rows - field_rows, &loop_rows, + doris_field, doris_type, filter_map, *read_rows - field_rows, &loop_rows, &field_eof, is_dict_filter)); field_rows += loop_rows; } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 4c6e5b1eac9..51cdc3dbbd7 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -121,8 +121,8 @@ public: : _row_ranges(row_ranges), _ctz(ctz), _io_ctx(io_ctx) {} virtual ~ParquetColumnReader() = default; virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof, bool is_dict_filter) = 0; + FilterMap& filter_map, size_t batch_size, size_t* read_rows, + bool* 
eof, bool is_dict_filter) = 0; virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { return Status::NotSupported("read_dict_values_to_column is not supported"); @@ -144,6 +144,8 @@ public: virtual Statistics statistics() = 0; virtual void close() = 0; + virtual void reset_filter_map_index() = 0; + protected: void _generate_read_ranges(int64_t start_index, int64_t end_index, std::list<RowRange>& read_ranges); @@ -157,6 +159,8 @@ protected: int64_t _current_row_index = 0; int _row_range_index = 0; int64_t _decode_null_map_time = 0; + + size_t _filter_map_index = 0; }; class ScalarColumnReader : public ParquetColumnReader { @@ -171,9 +175,9 @@ public: _offset_index(offset_index) {} ~ScalarColumnReader() override { close(); } Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size); - Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof, bool is_dict_filter) override; + Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, + size_t batch_size, size_t* read_rows, bool* eof, + bool is_dict_filter) override; Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override; MutableColumnPtr convert_dict_column_to_string_column(const ColumnInt32* dict_column) override; const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } @@ -184,6 +188,11 @@ public: } void close() override {} + void reset_filter_map_index() override { + _filter_map_index = 0; // nested + _orig_filter_map_index = 0; + } + private: tparquet::ColumnChunk _chunk_meta; const tparquet::OffsetIndex* _offset_index; @@ -192,13 +201,15 @@ private: std::vector<level_t> _rep_levels; std::vector<level_t> _def_levels; std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr; + std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr; + size_t _orig_filter_map_index = 0; Status _skip_values(size_t num_values); Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, bool is_dict_filter); - Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, - size_t* read_rows, bool* eof, bool is_dict_filter, bool align_rows); + FilterMap& filter_map, bool is_dict_filter); + Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, + size_t batch_size, size_t* read_rows, bool* eof, bool is_dict_filter, + bool align_rows); Status _try_load_dict_page(bool* loaded, bool* has_dict); }; @@ -210,9 +221,9 @@ public: : ParquetColumnReader(row_ranges, ctz, io_ctx) {} ~ArrayColumnReader() override { close(); } Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field); - Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof, bool is_dict_filter) override; + Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, + size_t batch_size, size_t* read_rows, bool* eof, + bool is_dict_filter) override; const std::vector<level_t>& get_rep_level() const override { return _element_reader->get_rep_level(); } @@ -222,6 +233,8 @@ public: Statistics statistics() override { return _element_reader->statistics(); } void close() override {} + void reset_filter_map_index() override 
{ _element_reader->reset_filter_map_index(); } + private: std::unique_ptr<ParquetColumnReader> _element_reader; }; @@ -236,9 +249,9 @@ public: Status init(std::unique_ptr<ParquetColumnReader> key_reader, std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field); - Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof, bool is_dict_filter) override; + Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, + size_t batch_size, size_t* read_rows, bool* eof, + bool is_dict_filter) override; const std::vector<level_t>& get_rep_level() const override { return _key_reader->get_rep_level(); @@ -256,6 +269,11 @@ public: void close() override {} + void reset_filter_map_index() override { + _key_reader->reset_filter_map_index(); + _value_reader->reset_filter_map_index(); + } + private: std::unique_ptr<ParquetColumnReader> _key_reader; std::unique_ptr<ParquetColumnReader> _value_reader; @@ -272,9 +290,9 @@ public: Status init( std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers, FieldSchema* field); - Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, - ColumnSelectVector& select_vector, size_t batch_size, size_t* read_rows, - bool* eof, bool is_dict_filter) override; + Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, + size_t batch_size, size_t* read_rows, bool* eof, + bool is_dict_filter) override; const std::vector<level_t>& get_rep_level() const override { if (!_read_column_names.empty()) { @@ -306,6 +324,12 @@ public: void close() override {} + void reset_filter_map_index() override { + for (const auto& reader : _child_readers) { + reader.second->reset_filter_map_index(); + } + } + private: std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers; std::set<std::string> _read_column_names; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 14dd64097e9..c7ab73e1266 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -317,9 +317,9 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_ // call _do_lazy_read recursively when current batch is skipped return _do_lazy_read(block, batch_size, read_rows, batch_eof); } else { - ColumnSelectVector run_length_vector; + FilterMap filter_map; RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size, - read_rows, batch_eof, run_length_vector)); + read_rows, batch_eof, filter_map)); RETURN_IF_ERROR( _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns)); RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns)); @@ -389,7 +389,7 @@ void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) { Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns, size_t batch_size, size_t* read_rows, bool* batch_eof, - ColumnSelectVector& select_vector) { + FilterMap& filter_map) { size_t batch_read_rows = 0; bool has_eof = false; for (auto& read_col_name : columns) { @@ -420,11 +420,12 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st size_t col_read_rows = 0; bool col_eof = false; // Should reset _filter_map_index to 0 when reading next column. 
- select_vector.reset(); + // select_vector.reset(); + _column_readers[read_col_name]->reset_filter_map_index(); while (!col_eof && col_read_rows < batch_size) { size_t loop_rows = 0; RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data( - column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows, + column_ptr, column_type, filter_map, batch_size - col_read_rows, &loop_rows, &col_eof, is_dict_filter)); col_read_rows += loop_rows; } @@ -445,7 +446,7 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof) { - std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr; + std::unique_ptr<FilterMap> filter_map_ptr = nullptr; size_t pre_read_rows; bool pre_eof; std::vector<uint32_t> columns_to_filter; @@ -460,9 +461,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // read predicate columns pre_read_rows = 0; pre_eof = false; - ColumnSelectVector run_length_vector; + FilterMap filter_map; RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size, - &pre_read_rows, &pre_eof, run_length_vector)); + &pre_read_rows, &pre_eof, filter_map)); if (pre_read_rows == 0) { DCHECK_EQ(pre_eof, true); break; @@ -504,9 +505,10 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re block->get_by_position(0).column->assume_mutable()->clear(); } - const uint8_t* __restrict filter_map = result_filter.data(); - select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all)); - if (select_vector_ptr->filter_all()) { + const uint8_t* __restrict filter_map_data = result_filter.data(); + filter_map_ptr.reset(new FilterMap()); + RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, can_filter_all)); + if (filter_map_ptr->filter_all()) { for (auto& col : _lazy_read_ctx.predicate_columns.first) { // clean block to read predicate columns block->get_by_name(col).column->assume_mutable()->clear(); @@ -528,7 +530,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re return Status::OK(); } } else { // pre_eof - // If select_vector_ptr->filter_all() and pre_eof, we can skip whole row group. + // If filter_map_ptr->filter_all() and pre_eof, we can skip whole row group. 
*read_rows = 0; *batch_eof = true; _lazy_read_filtered_rows += (pre_read_rows + _cached_filtered_rows); @@ -543,17 +545,17 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re return Status::Cancelled("cancelled"); } - if (select_vector_ptr == nullptr) { + if (filter_map_ptr == nullptr) { DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0); *read_rows = 0; *batch_eof = true; return Status::OK(); } - ColumnSelectVector& select_vector = *select_vector_ptr; + FilterMap& filter_map = *filter_map_ptr; std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr; if (_cached_filtered_rows != 0) { - _rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows); + RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows)); pre_read_rows += _cached_filtered_rows; _cached_filtered_rows = 0; } @@ -562,7 +564,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re size_t lazy_read_rows; bool lazy_eof; RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows, - &lazy_read_rows, &lazy_eof, select_vector)); + &lazy_read_rows, &lazy_eof, filter_map)); + if (pre_read_rows != lazy_read_rows) { return Status::Corruption("Can't read the same number of rows when doing lazy read"); } @@ -570,7 +573,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re // we set pre_read_rows as batch_size for lazy read columns, so pre_eof != lazy_eof // filter data in predicate columns, and remove filter column - if (select_vector.has_filter()) { + if (filter_map.has_filter()) { if (block->columns() == origin_column_num) { // the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is // generated from next batch, so the filter column is removed ahead. @@ -615,24 +618,24 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re return Status::OK(); } -void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector, - std::unique_ptr<uint8_t[]>& filter_map, - size_t pre_read_rows) const { +Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map, + std::unique_ptr<uint8_t[]>& filter_map_data, + size_t pre_read_rows) const { if (_cached_filtered_rows == 0) { - return; + return Status::OK(); } size_t total_rows = _cached_filtered_rows + pre_read_rows; - if (select_vector.filter_all()) { - select_vector.build(nullptr, total_rows, true); - return; + if (filter_map.filter_all()) { + RETURN_IF_ERROR(filter_map.init(nullptr, total_rows, true)); + return Status::OK(); } uint8_t* map = new uint8_t[total_rows]; - filter_map.reset(map); + filter_map_data.reset(map); for (size_t i = 0; i < _cached_filtered_rows; ++i) { map[i] = 0; } - const uint8_t* old_map = select_vector.filter_map(); + const uint8_t* old_map = filter_map.filter_map_data(); if (old_map == nullptr) { // select_vector.filter_all() == true is already built. 
for (size_t i = _cached_filtered_rows; i < total_rows; ++i) { @@ -641,7 +644,8 @@ void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector, } else { memcpy(map + _cached_filtered_rows, old_map, pre_read_rows); } - select_vector.build(map, total_rows, false); + RETURN_IF_ERROR(filter_map.init(map, total_rows, false)); + return Status::OK(); } Status RowGroupReader::_fill_partition_columns( diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index a889c1774ea..f73e9ebe09e 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -175,10 +175,10 @@ private: Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof); Status _read_column_data(Block* block, const std::vector<std::string>& columns, size_t batch_size, size_t* read_rows, bool* batch_eof, - ColumnSelectVector& select_vector); + FilterMap& filter_map); Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); - void _rebuild_select_vector(ColumnSelectVector& select_vector, - std::unique_ptr<uint8_t[]>& filter_map, size_t pre_read_rows) const; + Status _rebuild_filter_map(FilterMap& filter_map, std::unique_ptr<uint8_t[]>& filter_map_data, + size_t pre_read_rows) const; Status _fill_partition_columns( Block* block, size_t rows, const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>& diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index ce3a1bbea84..f5e3e6fdcd5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -477,8 +477,7 @@ Status ParquetReader::set_fill_columns( } } - if (!_lazy_read_ctx.has_complex_type && _enable_lazy_mat && - _lazy_read_ctx.predicate_columns.first.size() > 0 && + if (_enable_lazy_mat && _lazy_read_ctx.predicate_columns.first.size() > 0 && _lazy_read_ctx.lazy_read_columns.size() > 0) { _lazy_read_ctx.can_lazy_read = true; } diff --git a/be/test/vec/exec/parquet/parquet_common_test.cpp b/be/test/vec/exec/parquet/parquet_common_test.cpp new file mode 100644 index 00000000000..06715362937 --- /dev/null +++ b/be/test/vec/exec/parquet/parquet_common_test.cpp @@ -0,0 +1,457 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/format/parquet/parquet_common.h" + +#include <gtest/gtest.h> + +namespace doris::vectorized { + +// ============= FilterMap Tests ============= +class FilterMapTest : public testing::Test { +protected: + void SetUp() override {} + void TearDown() override {} +}; + +// Basic initialization test +TEST_F(FilterMapTest, test_basic_init) { + std::vector<uint8_t> filter_data = {1, 0, 1, 0}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + EXPECT_TRUE(filter_map.has_filter()); + EXPECT_FALSE(filter_map.filter_all()); + EXPECT_EQ(filter_map.filter_map_size(), 4); + EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 0.5); +} + +// Empty filter test +TEST_F(FilterMapTest, test_empty_filter) { + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(nullptr, 0, false).ok()); + + EXPECT_FALSE(filter_map.has_filter()); + EXPECT_FALSE(filter_map.filter_all()); + EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 0.0); +} + +// Test filter all +TEST_F(FilterMapTest, test_filter_all) { + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(nullptr, 0, true).ok()); + + EXPECT_TRUE(filter_map.has_filter()); + EXPECT_TRUE(filter_map.filter_all()); + EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 1.0); +} + +// Test all zero filter +TEST_F(FilterMapTest, test_all_zero_filter) { + std::vector<uint8_t> filter_data(100, 0); // Large data test + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + EXPECT_TRUE(filter_map.has_filter()); + EXPECT_TRUE(filter_map.filter_all()); + EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 1.0); +} + +// Test all one filter +TEST_F(FilterMapTest, test_all_one_filter) { + std::vector<uint8_t> filter_data(100, 1); + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + EXPECT_FALSE(filter_map.has_filter()); + EXPECT_FALSE(filter_map.filter_all()); + EXPECT_DOUBLE_EQ(filter_map.filter_ratio(), 0.0); +} + +// Basic nested filter map generation test +TEST_F(FilterMapTest, test_generate_nested_filter_map_basic) { + std::vector<uint8_t> filter_data = {1, 0, 1}; + std::vector<level_t> rep_levels = {0, 1, 1, 0, 1, 0}; + + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> nested_filter_map_data; + std::unique_ptr<FilterMap> nested_filter_map; + size_t current_row = 0; + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, 0) + .ok()); + + std::vector<uint8_t> expected = {1, 1, 1, 0, 0, 1}; + EXPECT_EQ(nested_filter_map_data, expected); + EXPECT_EQ(current_row, 2); +} + +// Empty rep_levels test +TEST_F(FilterMapTest, test_generate_nested_filter_map_empty_rep_levels) { + std::vector<uint8_t> filter_data = {1, 0, 1}; + std::vector<level_t> rep_levels; + + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> nested_filter_map_data; + std::unique_ptr<FilterMap> nested_filter_map; + size_t current_row = 0; + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, 0) + .ok()); + + EXPECT_TRUE(nested_filter_map_data.empty()); + EXPECT_EQ(current_row, 0); +} + +// Test nested filter map generation with start index +TEST_F(FilterMapTest, test_generate_nested_filter_map_with_start_index) { + std::vector<uint8_t> filter_data = {1, 0, 1}; + 
std::vector<level_t> rep_levels = {0, 1, 1, 0, 1, 0}; + // 011, 01, 0 + // 111, 00, 1 + + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> nested_filter_map_data; + std::unique_ptr<FilterMap> nested_filter_map; + size_t current_row = 1; + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, 3) + .ok()); + + std::vector<uint8_t> expected(6); // Initialize with zeros + expected[5] = 1; // Last value should be 1 + EXPECT_EQ(nested_filter_map_data, expected); + EXPECT_EQ(current_row, 2); +} + +// Test filter map boundary check +TEST_F(FilterMapTest, test_generate_nested_filter_map_boundary) { + std::vector<uint8_t> filter_data = {1}; + std::vector<level_t> rep_levels = {0, 1, 1, 0}; // Needs 2 rows but filter_data only has 1 + + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> nested_filter_map_data; + std::unique_ptr<FilterMap> nested_filter_map; + size_t current_row = 0; + + // Should return error + auto status = filter_map.generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, 0); + EXPECT_FALSE(status.ok()); +} + +// Test can_filter_all functionality +TEST_F(FilterMapTest, test_can_filter_all) { + std::vector<uint8_t> filter_data = {0, 0, 1, 0, 0, 1, 0}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + EXPECT_TRUE(filter_map.can_filter_all(2, 0)); // First two are 0 + EXPECT_FALSE(filter_map.can_filter_all(3, 0)); // First three include 1 + EXPECT_TRUE(filter_map.can_filter_all(2, 3)); // Two values starting at index 3 are 0 + EXPECT_FALSE(filter_map.can_filter_all(2, 5)); // Index 5 contains 1 + EXPECT_TRUE(filter_map.can_filter_all(1, 6)); // Last value is 0 +} + +// Test can_filter_all when filter_all is true +TEST_F(FilterMapTest, test_can_filter_all_when_filter_all) { + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(nullptr, 100, true).ok()); + + EXPECT_TRUE(filter_map.can_filter_all(50, 0)); + EXPECT_TRUE(filter_map.can_filter_all(100, 0)); +} + +class CrossPageTest : public testing::Test { +protected: + void SetUp() override { + filter_data = {1, 0, 1, 0, 1}; + + // 1111 00 + page1_rep_levels = {0, 1, 1, 1, 0, 1}; + // 00 11 000 1 + page2_rep_levels = {1, 1, 0, 1, 0, 1, 1, 0}; + } + + std::vector<uint8_t> filter_data; + std::vector<level_t> page1_rep_levels; + std::vector<level_t> page2_rep_levels; +}; + +TEST_F(CrossPageTest, test_basic1) { + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> nested_filter_map_data; + std::unique_ptr<FilterMap> nested_filter_map; + size_t current_row = 0; + std::vector<level_t> rep_levels; + rep_levels.insert(rep_levels.end(), page1_rep_levels.begin(), page1_rep_levels.end()); + rep_levels.insert(rep_levels.end(), page2_rep_levels.begin(), page2_rep_levels.end()); + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, 0) + .ok()); + + std::vector<uint8_t> expected = {1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1}; + + EXPECT_EQ(nested_filter_map_data, expected); + + EXPECT_EQ(current_row, 4); +} + +TEST_F(CrossPageTest, test_basic2) { + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> 
nested_filter_map_data; + std::unique_ptr<FilterMap> nested_filter_map; + + size_t current_row = 0; + std::vector<level_t> rep_levels; + rep_levels.insert(rep_levels.end(), page1_rep_levels.begin(), page1_rep_levels.end()); + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, 0) + .ok()); + std::vector<uint8_t> expected1 = {1, 1, 1, 1, 0, 0}; + + EXPECT_EQ(nested_filter_map_data, expected1); + EXPECT_EQ(current_row, 1); + + rep_levels.insert(rep_levels.end(), page2_rep_levels.begin(), page2_rep_levels.end()); + + size_t start_index = page1_rep_levels.size(); + ASSERT_TRUE(filter_map + .generate_nested_filter_map(rep_levels, nested_filter_map_data, + &nested_filter_map, ¤t_row, start_index) + .ok()); + + std::vector<uint8_t> expected2 = {1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1}; + + EXPECT_EQ(nested_filter_map_data, expected2); + EXPECT_EQ(current_row, 4); +} + +// ============= ColumnSelectVector Tests ============= +class ColumnSelectVectorTest : public testing::Test { +protected: + void SetUp() override {} + void TearDown() override {} +}; + +// Basic initialization test +TEST_F(ColumnSelectVectorTest, test_basic_init) { + std::vector<uint16_t> run_length_null_map = {2, 1, 3}; // 2 non-null, 1 null, 3 non-null + std::vector<uint8_t> filter_data = {1, 0, 1, 0, 1, 0}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, &filter_map, 0).ok()); + + EXPECT_TRUE(select_vector.has_filter()); + EXPECT_EQ(select_vector.num_values(), 6); + EXPECT_EQ(select_vector.num_nulls(), 1); + EXPECT_EQ(select_vector.num_filtered(), 3); +} + +// Test initialization without null map +TEST_F(ColumnSelectVectorTest, test_init_without_null_map) { + std::vector<uint16_t> run_length_null_map = {2, 1, 3}; + std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, nullptr, &filter_map, 0).ok()); + + EXPECT_EQ(select_vector.num_nulls(), 1); + EXPECT_EQ(select_vector.num_filtered(), 0); +} + +// Test all null values +TEST_F(ColumnSelectVectorTest, test_all_null) { + std::vector<uint16_t> run_length_null_map = {0, 6}; // All null + std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, &filter_map, 0).ok()); + + EXPECT_EQ(select_vector.num_nulls(), 6); + EXPECT_EQ(select_vector.num_filtered(), 0); + + // Verify null_map + EXPECT_EQ(null_map.size(), 6); + for (size_t i = 0; i < 6; i++) { + EXPECT_EQ(null_map[i], 1); + } +} + +// Test no null values +TEST_F(ColumnSelectVectorTest, test_no_null) { + std::vector<uint16_t> run_length_null_map = {6}; // All non-null + std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, &filter_map, 0).ok()); + + EXPECT_EQ(select_vector.num_nulls(), 0); + 
EXPECT_EQ(select_vector.num_filtered(), 0); + + // Verify null_map + EXPECT_EQ(null_map.size(), 6); + for (size_t i = 0; i < 6; i++) { + EXPECT_EQ(null_map[i], 0); + } +} + +// Test get_next_run with filter +TEST_F(ColumnSelectVectorTest, test_get_next_run_with_filter) { + std::vector<uint16_t> run_length_null_map = {2, 1, 3}; // 1, 1, 0, 1, 1, 1 + std::vector<uint8_t> filter_data = {1, 1, 0, 1, 1, 0}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, &filter_map, 0).ok()); + + ColumnSelectVector::DataReadType type; + + // Verify read sequence + EXPECT_EQ(select_vector.get_next_run<true>(&type), 2); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 2); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::FILTERED_CONTENT); +} + +// Test get_next_run without filter +TEST_F(ColumnSelectVectorTest, test_get_next_run_without_filter) { + std::vector<uint16_t> run_length_null_map = {2, 1, 3}; + std::vector<uint8_t> filter_data = {1, 1, 1, 1, 1, 1}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(nullptr, 0, false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, &filter_map, 0).ok()); + + ColumnSelectVector::DataReadType type; + + // Verify read sequence + EXPECT_EQ(select_vector.get_next_run<false>(&type), 2); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<false>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::NULL_DATA); + + EXPECT_EQ(select_vector.get_next_run<false>(&type), 3); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<false>(&type), 0); +} + +// Test complex null pattern +TEST_F(ColumnSelectVectorTest, test_complex_null_pattern) { + // Alternating null and non-null values + std::vector<uint16_t> run_length_null_map = {1, 1, 1, 1, 1, 1}; // 1, 0, 1, 0, 1, 0 + std::vector<uint8_t> filter_data = {1, 0, 1, 0, 1, 0}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 6, &null_map, &filter_map, 0).ok()); + + EXPECT_EQ(select_vector.num_nulls(), 3); + EXPECT_EQ(select_vector.num_filtered(), 3); + + ColumnSelectVector::DataReadType type; + + // Verify alternating read pattern + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::FILTERED_NULL); +} + +// Test filter_map_index +TEST_F(ColumnSelectVectorTest, test_filter_map_index) { + 
std::vector<uint16_t> run_length_null_map = {0, 1, 3}; // 0, 1, 1, 1 + std::vector<uint8_t> filter_data = {0, 0, 1, 1, 1, 1}; + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + ColumnSelectVector select_vector; + NullMap null_map; + ASSERT_TRUE(select_vector.init(run_length_null_map, 4, &null_map, &filter_map, 2).ok()); + + EXPECT_EQ(select_vector.num_filtered(), 0); + + ColumnSelectVector::DataReadType type; + EXPECT_EQ(select_vector.get_next_run<true>(&type), 1); + EXPECT_EQ(type, ColumnSelectVector::NULL_DATA); + + EXPECT_EQ(select_vector.get_next_run<true>(&type), 3); + EXPECT_EQ(type, ColumnSelectVector::CONTENT); +} + +} // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_nested_type_cross_page_test.cpp b/be/test/vec/exec/parquet/parquet_nested_type_cross_page_test.cpp new file mode 100644 index 00000000000..cff0937910e --- /dev/null +++ b/be/test/vec/exec/parquet/parquet_nested_type_cross_page_test.cpp @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gtest/gtest.h> + +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { +class ParquetCrossPageTest : public testing::Test { +protected: + void SetUp() override { + // filter_data is row-level, corresponding to 8 rows + filter_data = {1, 0, 0, 1, 0, 0, 1, 1}; // filter conditions for 8 rows + + // Page 1: 2 complete rows + 1 incomplete row + page1_rep_levels = { + 0, 1, 1, // Row 1: [1,1,1] + 0, 1, // Row 2: [1,1] + 0, 1, 1 // Row 3: [1,1,to be continued...] + }; + + // Page 2: continue Row 3 + 2 complete rows + 1 incomplete row + page2_rep_levels = { + 1, 1, // Continue Row 3: [...1,1] + 0, 1, // Row 4: [1] + 0, 1, 1, // Row 5: [1,1,1] + 0, 1 // Row 6: [1,to be continued...] 
+ }; + + // Page 3: continue Row 6 + 2 complete rows + page3_rep_levels = { + 1, 1, // Continue Row 6: [...1,1] + 0, 1, 1, // Row 7: [1,1] + 0, 1 // Row 8: [1] + }; + } + + std::vector<uint8_t> filter_data; // Row-level filter conditions for all data + std::vector<level_t> page1_rep_levels; + std::vector<level_t> page2_rep_levels; + std::vector<level_t> page3_rep_levels; +}; + +// Test complete processing of three pages +TEST_F(ParquetCrossPageTest, test_three_pages_complete) { + size_t current_row = 0; + + // Process first page + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(filter_data.data(), filter_data.size(), false).ok()); + + std::vector<uint8_t> nested_data1; + std::unique_ptr<FilterMap> nested_map1; + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(page1_rep_levels, nested_data1, &nested_map1, + ¤t_row, 0) + .ok()); + + // Verify first page results - using corresponding rows from overall filter_data + std::vector<uint8_t> expected1 = {1, 1, 1, 0, 0, 0, 0, 0}; + EXPECT_EQ(nested_data1, expected1); + EXPECT_EQ(current_row, 2); // Processed up to Row 3 + + // Process second page - continue with same filter_map + std::vector<uint8_t> nested_data2; + std::unique_ptr<FilterMap> nested_map2; + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(page2_rep_levels, nested_data2, &nested_map2, + ¤t_row, 0) + .ok()); + + // Verify second page results + std::vector<uint8_t> expected2 = {0, 0, 1, 1, 0, 0, 0, 0, 0}; + EXPECT_EQ(nested_data2, expected2); + EXPECT_EQ(current_row, 5); // Processed up to Row 6 + + // Process third page - continue with same filter_map + std::vector<uint8_t> nested_data3; + std::unique_ptr<FilterMap> nested_map3; + + ASSERT_TRUE(filter_map + .generate_nested_filter_map(page3_rep_levels, nested_data3, &nested_map3, + ¤t_row, 0) + .ok()); + + // Verify third page results + std::vector<uint8_t> expected3 = {0, 0, 1, 1, 1, 1, 1}; + EXPECT_EQ(nested_data3, expected3); + EXPECT_EQ(current_row, 7); // Processed all 8 rows +} + +// Test case where a single row spans three pages +TEST_F(ParquetCrossPageTest, test_single_row_across_three_pages) { + // Filter for one long array row + std::vector<uint8_t> row_filter = {1, 0}; // Only 2 rows of data + + // First page + std::vector<level_t> rep1 = { + 0, // Start of first row + 1, 1, 1, // 3 array elements + 1, 1 // To be continued... + }; + + // Second page + std::vector<level_t> rep2 = { + 1, 1, 1, // Continue with 3 array elements + 1, 1, 1 // To be continued... 
+ }; + + // Third page + std::vector<level_t> rep3 = { + 1, 1, 1, // Last 3 array elements + 0 // Start of second row + }; + + size_t current_row = 0; + + // Use same filter_map for all pages + FilterMap filter_map; + ASSERT_TRUE(filter_map.init(row_filter.data(), row_filter.size(), false).ok()); + + // Process first page + std::vector<uint8_t> nested_data1; + std::unique_ptr<FilterMap> nested_map1; + + ASSERT_TRUE( + filter_map.generate_nested_filter_map(rep1, nested_data1, &nested_map1, ¤t_row, 0) + .ok()); + + // Verify first page results + std::vector<uint8_t> expected1(rep1.size(), 1); // All use first row's filter value + EXPECT_EQ(nested_data1, expected1); + EXPECT_EQ(current_row, 0); // Still in first row + + // Process second page + std::vector<uint8_t> nested_data2; + std::unique_ptr<FilterMap> nested_map2; + + ASSERT_TRUE( + filter_map.generate_nested_filter_map(rep2, nested_data2, &nested_map2, ¤t_row, 0) + .ok()); + + // Verify second page results + std::vector<uint8_t> expected2(rep2.size(), 1); // Still using first row's filter value + EXPECT_EQ(nested_data2, expected2); + EXPECT_EQ(current_row, 0); // Still in first row + + // Process third page + std::vector<uint8_t> nested_data3; + std::unique_ptr<FilterMap> nested_map3; + + ASSERT_TRUE( + filter_map.generate_nested_filter_map(rep3, nested_data3, &nested_map3, ¤t_row, 0) + .ok()); + + // Verify third page results + std::vector<uint8_t> expected3(rep3.size(), 1); + expected3.back() = 0; // Last element uses second row's filter value + EXPECT_EQ(nested_data3, expected3); + EXPECT_EQ(current_row, 1); // Moved to second row +} + +} // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp index 19b21c16a45..b792704bd0e 100644 --- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp +++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp @@ -229,12 +229,14 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column } else { data_column = src_column->assume_mutable(); } + FilterMap filter_map; + RETURN_IF_ERROR(filter_map.init(nullptr, 0, false)); ColumnSelectVector run_length_map; // decode page data if (field_schema->definition_level == 0) { // required column std::vector<u_short> null_map = {(u_short)rows}; - run_length_map.set_run_length_null_map(null_map, rows, nullptr); + RETURN_IF_ERROR(run_length_map.init(null_map, rows, nullptr, &filter_map, 0)); RETURN_IF_ERROR( chunk_reader.decode_values(data_column, resolved_type, run_length_map, false)); } else { @@ -248,7 +250,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column chunk_reader.insert_null_values(data_column, num_values); } else { std::vector<u_short> null_map = {(u_short)num_values}; - run_length_map.set_run_length_null_map(null_map, rows, nullptr); + RETURN_IF_ERROR(run_length_map.init(null_map, rows, nullptr, &filter_map, 0)); RETURN_IF_ERROR(chunk_reader.decode_values(data_column, resolved_type, run_length_map, false)); } @@ -263,7 +265,7 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column chunk_reader.insert_null_values(data_column, num_values); } else { std::vector<u_short> null_map = {(u_short)num_values}; - run_length_map.set_run_length_null_map(null_map, rows, nullptr); + RETURN_IF_ERROR(run_length_map.init(null_map, rows, nullptr, &filter_map, 0)); RETURN_IF_ERROR( chunk_reader.decode_values(data_column, resolved_type, run_length_map, false)); } diff --git 
a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/create_table.hql b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/create_table.hql new file mode 100644 index 00000000000..863595278f3 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/create_table.hql @@ -0,0 +1,58 @@ +CREATE DATABASE IF NOT EXISTS multi_catalog; +USE multi_catalog; + +CREATE TABLE `nested_cross_page1_parquet`( + `id` int, + `array_col` array<int>, + `description` string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' +LOCATION + '/user/doris/suites/multi_catalog/nested_cross_page1_parquet'; + +msck repair table nested_cross_page1_parquet; + +CREATE TABLE `nested_cross_page2_parquet`( + id INT, + nested_array_col ARRAY<ARRAY<INT>>, + array_struct_col ARRAY<STRUCT<x:INT, y:STRING>>, + map_array_col MAP<STRING, ARRAY<INT>>, + complex_struct_col STRUCT< + a: ARRAY<INT>, + b: MAP<STRING, ARRAY<INT>>, + c: STRUCT< + x: ARRAY<INT>, + y: STRING + > + >, + description STRING) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' +LOCATION + '/user/doris/suites/multi_catalog/nested_cross_page2_parquet'; + +msck repair table nested_cross_page2_parquet; + +CREATE TABLE `nested_cross_page3_parquet`( + `id` int, + `array_col` array<int>, + `description` string) +ROW FORMAT SERDE + 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' +STORED AS INPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' +OUTPUTFORMAT + 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' +LOCATION + '/user/doris/suites/multi_catalog/nested_cross_page3_parquet'; + +msck repair table nested_cross_page3_parquet; + diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data.tar.gz b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data.tar.gz new file mode 100644 index 00000000000..ce0d98f7bfc Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data.tar.gz differ diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test1.py b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test1.py new file mode 100644 index 00000000000..9fff6d362cd --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test1.py @@ -0,0 +1,192 @@ +import pyarrow as pa +import pyarrow.parquet as pq +import subprocess +import argparse +import json + +# Define the output file path as a constant +OUTPUT_PARQUET_FILE = 'nested_cross_page_test1.parquet' + +def generate_cross_page_test_data(output_file): + # Create test data + data = { + # id column (INT32) + 'id': [1, 2, 3], + + # array column (ARRAY<INT>) + 'array_col': [ + # Row 1 - Large array to force cross-page + [1, 2, 3, 4, 5] * 200, # 1000 elements + + # Row 2 - Normal sized array 
+ [1, 2, 3], + + # Row 3 - Another large array + [6, 7, 8, 9, 10] * 200 # 1000 elements + ], + + # description column (STRING) + 'description': [ + 'This is a large array with repeated sequence [1,2,3,4,5]', + 'This is a small array with just three elements', + 'This is another large array with repeated sequence [6,7,8,9,10]' + ] + } + + # Create table structure + table = pa.Table.from_pydict({ + 'id': pa.array(data['id'], type=pa.int32()), + 'array_col': pa.array(data['array_col'], type=pa.list_(pa.int32())), + 'description': pa.array(data['description'], type=pa.string()) + }) + + # Write to parquet file + pq.write_table( + table, + output_file, + compression=None, # No compression for predictable size + version='2.6', + write_statistics=True, + row_group_size=3, # All data in one row group + data_page_size=100, # Very small page size + write_batch_size=10 # Small batch size + ) + +def inspect_parquet_file(file_path): + """Inspect the structure of generated parquet file""" + pf = pq.ParquetFile(file_path) + print(f"\nFile: {file_path}") + print(f"Number of row groups: {pf.num_row_groups}") + + metadata = pf.metadata + schema = pf.schema + print(f"\nSchema: {schema}") + print(f"\nDetailed schema:") + for i in range(len(schema)): + print(f"Column {i}: {schema[i]}") + + for i in range(metadata.num_row_groups): + rg = metadata.row_group(i) + print(f"\nRow Group {i}:") + print(f"Num rows: {rg.num_rows}") + + for j in range(rg.num_columns): + col = rg.column(j) + print(f"\nColumn {j}:") + print(f"Path: {schema[j].name}") + print(f"Type: {col.physical_type}") + print(f"Encodings: {col.encodings}") + print(f"Total compressed size: {col.total_compressed_size}") + print(f"Total uncompressed size: {col.total_uncompressed_size}") + print(f"Number of values: {col.num_values}") + print(f"Data page offset: {col.data_page_offset}") + if col.dictionary_page_offset is not None: + print(f"Dictionary page offset: {col.dictionary_page_offset}") + +def read_and_print_file(file_path): + """Read and print file content""" + table = pq.read_table(file_path) + df = table.to_pandas() + print("\nFile content:") + for i in range(len(df)): + print(f"\nRow {i}:") + print(f"ID: {df.iloc[i]['id']}") + arr = df.iloc[i]['array_col'] + print(f"Array length: {len(arr)}") + print(f"First few elements: {arr[:5]}...") + print(f"Last few elements: ...{arr[-5:]}") + print(f"Description: {df.iloc[i]['description']}") + +def inspect_pages_with_cli(file_path, parquet_cli_path=None): + """ + Inspect page information using parquet-cli + + Args: + file_path: Path to the parquet file + parquet_cli_path: Optional path to parquet-cli jar file + """ + if not parquet_cli_path: + print("\nSkipping parquet-cli inspection: No parquet-cli path provided") + return + + print("\nParquet CLI Output:") + try: + cmd = f"java -jar {parquet_cli_path} pages {file_path}" + result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) + print(result.stdout) + except subprocess.CalledProcessError as e: + print(f"Error running parquet-cli: {e}") + if e.output: + print(f"Error output: {e.output}") + except Exception as e: + print(f"Unexpected error running parquet-cli: {e}") + +def save_test_data_info(output_file): + """Save detailed test data information to text file""" + info = { + "file_format": "Parquet", + "version": "2.6", + "compression": "None", + "row_group_size": 3, + "data_page_size": 100, + "write_batch_size": 10, + "output_file": output_file, + "schema": { + "id": "INT32", + "array_col": "ARRAY<INT32>", + "description": 
"STRING" + }, + "test_cases": [ + { + "row": 1, + "description": "Large array", + "characteristics": [ + "1000 elements", + "Repeated sequence [1,2,3,4,5]", + "Forces cross-page scenario" + ] + }, + { + "row": 2, + "description": "Small array", + "characteristics": [ + "3 elements", + "Simple sequence [1,2,3]", + "Fits in single page" + ] + }, + { + "row": 3, + "description": "Another large array", + "characteristics": [ + "1000 elements", + "Repeated sequence [6,7,8,9,10]", + "Forces cross-page scenario" + ] + } + ] + } + + info_file = output_file.replace('.parquet', '_info.json') + with open(info_file, 'w') as f: + json.dump(info, f, indent=2) + +if __name__ == '__main__': + # Add command line argument parsing + parser = argparse.ArgumentParser(description='Generate and inspect parquet test data') + parser.add_argument('--parquet-cli', + help='Path to parquet-cli jar file', + default=None) + parser.add_argument('--output', + help='Output parquet file path', + default=OUTPUT_PARQUET_FILE) + args = parser.parse_args() + + # Use the output file path from command line or default + output_file = args.output + + generate_cross_page_test_data(output_file) + inspect_parquet_file(output_file) + read_and_print_file(output_file) + inspect_pages_with_cli(output_file, args.parquet_cli) + save_test_data_info(output_file) diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test2.py b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test2.py new file mode 100644 index 00000000000..2e67e962131 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test2.py @@ -0,0 +1,287 @@ +import pyarrow as pa +import pyarrow.parquet as pq +import subprocess +import json +import argparse + +# Define the output file path as a constant +OUTPUT_PARQUET_FILE = 'nested_cross_page_test2.parquet' + +def generate_cross_page_test_data(output_file): + # Create test data + data = { + # id column (INT32) + 'id': [1, 2, 3], + + # Multi-level array column (ARRAY<ARRAY<INT>>) + 'nested_array_col': [ + # Row 1 - Large array + [[i for i in range(10)] for _ in range(100)], # 100 sub-arrays, each with 10 elements + + # Row 2 - Small array + [[1, 2], [3, 4], [5, 6]], + + # Row 3 - Another large array + [[i for i in range(10, 20)] for _ in range(100)] + ], + + # Struct array column (ARRAY<STRUCT<x: int, y: string>>) + 'array_struct_col': [ + # Row 1 + [{'x': i, 'y': f'value_{i}'} for i in range(500)], + + # Row 2 + [{'x': 1, 'y': 'small'}, {'x': 2, 'y': 'array'}], + + # Row 3 + [{'x': i, 'y': f'big_{i}'} for i in range(500)] + ], + + # Map column (MAP<STRING, ARRAY<INT>>) + 'map_array_col': [ + # Row 1 + {f'key_{i}': list(range(i, i+10)) for i in range(50)}, + + # Row 2 + {'small_key': [1, 2, 3]}, + + # Row 3 + {f'big_key_{i}': list(range(i*10, (i+1)*10)) for i in range(50)} + ], + + # Complex nested structure (STRUCT< + # a: ARRAY<INT>, + # b: MAP<STRING, ARRAY<INT>>, + # c: STRUCT<x: ARRAY<INT>, y: STRING> + # >) + 'complex_struct_col': [ + # Row 1 + { + 'a': list(range(100)), + 'b': {f'key_{i}': list(range(i, i+5)) for i in range(20)}, + 'c': {'x': list(range(50)), 'y': 'nested_struct_1'} + }, + + # Row 2 + { + 'a': [1, 2, 3], + 'b': {'small': [1, 2]}, + 'c': {'x': [1], 'y': 'small_struct'} + }, + + # Row 3 + { + 'a': list(range(100, 200)), + 'b': {f'big_{i}': list(range(i*5, (i+1)*5)) for i in 
range(20)}, + 'c': {'x': list(range(50)), 'y': 'nested_struct_2'} + } + ], + + # Description column (STRING) + 'description': [ + 'Row with large nested arrays and structures', + 'Row with small nested data', + 'Row with another set of large nested arrays and structures' + ] + } + + # Create complex table structure + table = pa.Table.from_pydict({ + 'id': pa.array(data['id'], type=pa.int32()), + + # Multi-level array type + 'nested_array_col': pa.array(data['nested_array_col'], + type=pa.list_(pa.list_(pa.int32()))), + + # Struct array type + 'array_struct_col': pa.array(data['array_struct_col'], + type=pa.list_(pa.struct([ + ('x', pa.int32()), + ('y', pa.string()) + ]))), + + # Map type + 'map_array_col': pa.array(data['map_array_col'], + type=pa.map_(pa.string(), pa.list_(pa.int32()))), + + # Complex nested structure type + 'complex_struct_col': pa.array(data['complex_struct_col'], + type=pa.struct([ + ('a', pa.list_(pa.int32())), + ('b', pa.map_(pa.string(), pa.list_(pa.int32()))), + ('c', pa.struct([ + ('x', pa.list_(pa.int32())), + ('y', pa.string()) + ])) + ])), + + 'description': pa.array(data['description'], type=pa.string()) + }) + + # Write to parquet file + pq.write_table( + table, + output_file, + compression=None, # No compression + version='2.6', + write_statistics=True, + row_group_size=3, # All data in one row group + data_page_size=100, # Very small page size + write_batch_size=1 # Minimum batch size + ) + +def inspect_parquet_file(file_path): + """Inspect the structure of generated parquet file""" + pf = pq.ParquetFile(file_path) + print(f"\nFile: {file_path}") + print(f"Number of row groups: {pf.num_row_groups}") + + metadata = pf.metadata + schema = pf.schema + print(f"\nSchema: {schema}") + print(f"\nDetailed schema:") + for i in range(len(schema)): + print(f"Column {i}: {schema[i]}") + + for i in range(metadata.num_row_groups): + rg = metadata.row_group(i) + print(f"\nRow Group {i}:") + print(f"Num rows: {rg.num_rows}") + + for j in range(rg.num_columns): + col = rg.column(j) + print(f"\nColumn {j}:") + print(f"Path: {schema[j].name}") + print(f"Type: {col.physical_type}") + print(f"Encodings: {col.encodings}") + print(f"Total compressed size: {col.total_compressed_size}") + print(f"Total uncompressed size: {col.total_uncompressed_size}") + print(f"Number of values: {col.num_values}") + print(f"Data page offset: {col.data_page_offset}") + if col.dictionary_page_offset is not None: + print(f"Dictionary page offset: {col.dictionary_page_offset}") + +def format_value(value): + """Format value for printing""" + if isinstance(value, (list, dict)): + return f"{str(value)[:100]}... 
(length: {len(str(value))})" + return str(value) + +def read_and_print_file(file_path): + """Read and print file content""" + table = pq.read_table(file_path) + df = table.to_pandas() + print("\nFile content:") + + for i in range(len(df)): + print(f"\nRow {i}:") + for column in df.columns: + value = df.iloc[i][column] + print(f"{column}: {format_value(value)}") + +def inspect_pages_with_cli(file_path, parquet_cli_path=None): + """ + Inspect page information using parquet-cli + + Args: + file_path: Path to the parquet file + parquet_cli_path: Optional path to parquet-cli jar file + """ + if not parquet_cli_path: + print("\nSkipping parquet-cli inspection: No parquet-cli path provided") + return + + print("\nParquet CLI Output:") + try: + cmd = f"java -jar {parquet_cli_path} pages {file_path}" + result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) + print(result.stdout) + except subprocess.CalledProcessError as e: + print(f"Error running parquet-cli: {e}") + if e.output: + print(f"Error output: {e.output}") + except Exception as e: + print(f"Unexpected error running parquet-cli: {e}") + +def save_test_data_info(output_file): + """Save detailed test data information to text file""" + info = { + "file_format": "Parquet", + "version": "2.6", + "compression": "None", + "row_group_size": 3, + "data_page_size": 100, + "write_batch_size": 1, + "output_file": output_file, + "schema": { + "id": "INT32", + "nested_array_col": "ARRAY<ARRAY<INT32>>", + "array_struct_col": "ARRAY<STRUCT<x: INT32, y: STRING>>", + "map_array_col": "MAP<STRING, ARRAY<INT32>>", + "complex_struct_col": """STRUCT< + a: ARRAY<INT32>, + b: MAP<STRING, ARRAY<INT32>>, + c: STRUCT< + x: ARRAY<INT32>, + y: STRING + > + >""", + "description": "STRING" + }, + "test_cases": [ + { + "row": 1, + "description": "Large nested data", + "characteristics": [ + "Large nested arrays (100 arrays of 10 elements each)", + "Large struct array (500 elements)", + "Large map (50 key-value pairs)", + "Complex nested structure with large arrays" + ] + }, + { + "row": 2, + "description": "Small nested data", + "characteristics": [ + "Small nested arrays (3 arrays of 2 elements each)", + "Small struct array (2 elements)", + "Small map (1 key-value pair)", + "Complex nested structure with small arrays" + ] + }, + { + "row": 3, + "description": "Another large nested data", + "characteristics": [ + "Large nested arrays (100 arrays of 10 elements each)", + "Large struct array (500 elements)", + "Large map (50 key-value pairs)", + "Complex nested structure with large arrays" + ] + } + ] + } + + info_file = output_file.replace('.parquet', '_info.json') + with open(info_file, 'w') as f: + json.dump(info, f, indent=2) + +if __name__ == '__main__': + # Add command line argument parsing + parser = argparse.ArgumentParser(description='Generate and inspect parquet test data') + parser.add_argument('--parquet-cli', + help='Path to parquet-cli jar file', + default=None) + parser.add_argument('--output', + help='Output parquet file path', + default=OUTPUT_PARQUET_FILE) + args = parser.parse_args() + + # Use the output file path from command line or default + output_file = args.output + + generate_cross_page_test_data(output_file) + inspect_parquet_file(output_file) + read_and_print_file(output_file) + inspect_pages_with_cli(output_file, args.parquet_cli) + save_test_data_info(output_file) diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test3.py 
b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test3.py new file mode 100644 index 00000000000..75ef7c7e755 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/data_gen_scripts/nested_cross_page_test3.py @@ -0,0 +1,196 @@ +import pyarrow as pa +import pyarrow.parquet as pq +import subprocess +import argparse +import json + +# Define the output file path as a constant +OUTPUT_PARQUET_FILE = 'nested_cross_page_test3.parquet' + +def generate_cross_page_test_data(output_file): + # Create test data + data = { + # id column (INT32) + 'id': [1, None, 3], + + # array column (ARRAY<INT>) + 'array_col': [ + # Row 1 - Large array to force cross-page + [1, None, 3, 4, 5] * 200, # 1000 elements + + # Row 2 - Null array + None, + + # Row 3 - Another large array with nulls + [6, None, 8, None, 10] * 200 # 1000 elements + ], + + # description column (STRING) + 'description': [ + 'This is a large array with repeated sequence [1,null,3,4,5]', + None, + 'This is another large array with repeated sequence [6,null,8,null,10]' + ] + } + + # Create table structure + table = pa.Table.from_pydict({ + 'id': pa.array(data['id'], type=pa.int32()), + 'array_col': pa.array(data['array_col'], type=pa.list_(pa.int32())), + 'description': pa.array(data['description'], type=pa.string()) + }) + + # Write to parquet file + pq.write_table( + table, + output_file, + compression=None, # No compression for predictable size + version='2.6', + write_statistics=True, + row_group_size=3, # All data in one row group + data_page_size=100, # Very small page size + write_batch_size=10 # Small batch size + ) + +def inspect_parquet_file(file_path): + """Inspect the structure of generated parquet file""" + pf = pq.ParquetFile(file_path) + print(f"\nFile: {file_path}") + print(f"Number of row groups: {pf.num_row_groups}") + + metadata = pf.metadata + schema = pf.schema + print(f"\nSchema: {schema}") + print(f"\nDetailed schema:") + for i in range(len(schema)): + print(f"Column {i}: {schema[i]}") + + for i in range(metadata.num_row_groups): + rg = metadata.row_group(i) + print(f"\nRow Group {i}:") + print(f"Num rows: {rg.num_rows}") + + for j in range(rg.num_columns): + col = rg.column(j) + print(f"\nColumn {j}:") + print(f"Path: {schema[j].name}") + print(f"Type: {col.physical_type}") + print(f"Encodings: {col.encodings}") + print(f"Total compressed size: {col.total_compressed_size}") + print(f"Total uncompressed size: {col.total_uncompressed_size}") + print(f"Number of values: {col.num_values}") + print(f"Data page offset: {col.data_page_offset}") + if col.dictionary_page_offset is not None: + print(f"Dictionary page offset: {col.dictionary_page_offset}") + +def read_and_print_file(file_path): + """Read and print file content""" + table = pq.read_table(file_path) + df = table.to_pandas() + print("\nFile content:") + for i in range(len(df)): + print(f"\nRow {i}:") + print(f"ID: {df.iloc[i]['id']}") + arr = df.iloc[i]['array_col'] + if arr is not None: + print(f"Array length: {len(arr)}") + print(f"First few elements: {arr[:5]}...") + print(f"Last few elements: ...{arr[-5:]}") + else: + print("Array: None") + print(f"Description: {df.iloc[i]['description']}") + +def inspect_pages_with_cli(file_path, parquet_cli_path=None): + """ + Inspect page information using parquet-cli + + Args: + file_path: Path to the parquet file + parquet_cli_path: Optional path to parquet-cli jar file + """ + if not parquet_cli_path: 
+ print("\nSkipping parquet-cli inspection: No parquet-cli path provided") + return + + print("\nParquet CLI Output:") + try: + cmd = f"java -jar {parquet_cli_path} pages {file_path}" + result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True) + print(result.stdout) + except subprocess.CalledProcessError as e: + print(f"Error running parquet-cli: {e}") + if e.output: + print(f"Error output: {e.output}") + except Exception as e: + print(f"Unexpected error running parquet-cli: {e}") + +def save_test_data_info(output_file): + """Save detailed test data information to text file""" + info = { + "file_format": "Parquet", + "version": "2.6", + "compression": "None", + "row_group_size": 3, + "data_page_size": 100, + "write_batch_size": 10, + "output_file": output_file, + "schema": { + "id": "INT32", + "array_col": "ARRAY<INT32>", + "description": "STRING" + }, + "test_cases": [ + { + "row": 1, + "description": "Large array with nulls", + "characteristics": [ + "1000 elements", + "Repeated sequence [1,null,3,4,5]", + "Forces cross-page scenario" + ] + }, + { + "row": 2, + "description": "Null array and values", + "characteristics": [ + "Entire array is null", + "ID is null", + "Description is null" + ] + }, + { + "row": 3, + "description": "Another large array with nulls", + "characteristics": [ + "1000 elements", + "Repeated sequence [6,null,8,null,10]", + "Forces cross-page scenario" + ] + } + ] + } + + info_file = output_file.replace('.parquet', '_info.json') + with open(info_file, 'w') as f: + json.dump(info, f, indent=2) + +if __name__ == '__main__': + # Add command line argument parsing + parser = argparse.ArgumentParser(description='Generate and inspect parquet test data') + parser.add_argument('--parquet-cli', + help='Path to parquet-cli jar file', + default=None) + parser.add_argument('--output', + help='Output parquet file path', + default=OUTPUT_PARQUET_FILE) + args = parser.parse_args() + + # Use the output file path from command line or default + output_file = args.output + + generate_cross_page_test_data(output_file) + inspect_parquet_file(output_file) + read_and_print_file(output_file) + inspect_pages_with_cli(output_file, args.parquet_cli) + save_test_data_info(output_file) + diff --git a/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/run.sh b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/run.sh new file mode 100755 index 00000000000..f3136eaa200 --- /dev/null +++ b/docker/thirdparties/docker-compose/hive/scripts/data/multi_catalog/parquet_nested_types/run.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -x + +CUR_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + +## mkdir and put data to hdfs +cd "${CUR_DIR}" && rm -rf data/ && tar xzf data.tar.gz +hadoop fs -mkdir -p /user/doris/suites/multi_catalog/ +hadoop fs -put "${CUR_DIR}"/data/* /user/doris/suites/multi_catalog/ + +# create table +hive -f "${CUR_DIR}/create_table.hql" diff --git a/regression-test/data/external_table_p0/hive/test_parquet_nested_types.out b/regression-test/data/external_table_p0/hive/test_parquet_nested_types.out new file mode 100644 index 00000000000..73049b7866f Binary files /dev/null and b/regression-test/data/external_table_p0/hive/test_parquet_nested_types.out differ diff --git a/regression-test/suites/external_table_p0/hive/test_parquet_nested_types.groovy b/regression-test/suites/external_table_p0/hive/test_parquet_nested_types.groovy new file mode 100644 index 00000000000..d94f909142e --- 
/dev/null +++ b/regression-test/suites/external_table_p0/hive/test_parquet_nested_types.groovy @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_parquet_nested_types", "p0,external,hive,external_docker,external_docker_hive") { + String enabled = context.config.otherConfigs.get("enableHiveTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable Hive test.") + return; + } + + for (String hivePrefix : ["hive2", "hive3"]) { + String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort") + String catalog_name = "${hivePrefix}_test_parquet_nested_types" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}' + );""" + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + + sql """ use multi_catalog """ + + order_qt_nested_cross_page1_parquet_q1 """select array_col from nested_cross_page1_parquet where id = 1""" + + order_qt_nested_cross_page1_parquet_q2 """select array_col from nested_cross_page1_parquet where id = 2""" + + order_qt_nested_cross_page1_parquet_q3 """select array_col from nested_cross_page1_parquet where id = 3""" + + order_qt_nested_cross_page1_parquet_q4 """ + SELECT id, array_size(array_col) as arr_size + FROM nested_cross_page1_parquet + ORDER BY id + """ + + order_qt_nested_cross_page1_parquet_q5 """ + SELECT id, array_col[1] as first_elem, array_col[3] as third_elem + FROM nested_cross_page1_parquet + ORDER BY id + """ + + order_qt_nested_cross_page1_parquet_q6 """ + SELECT id, array_col + FROM nested_cross_page1_parquet + WHERE array_size(array_col) > 100 + ORDER BY id + """ + + order_qt_nested_cross_page1_parquet_q7 """ + SELECT id, array_col + FROM nested_cross_page1_parquet + WHERE array_contains(array_col, 1) + ORDER BY id + """ + + order_qt_nested_cross_page1_parquet_q8 """ + SELECT id, array_col, description + FROM nested_cross_page1_parquet + WHERE id > 1 AND array_size(array_col) < 100 + ORDER BY id + """ + + order_qt_nested_cross_page1_parquet_q9 """ + SELECT + id, + array_min(array_col) as min_val, + array_max(array_col) as max_val + FROM nested_cross_page1_parquet + ORDER BY id + """ + + order_qt_nested_cross_page1_parquet_q10 """ + SELECT id, array_col + FROM nested_cross_page1_parquet + WHERE description LIKE '%large array%' + ORDER BY id + """ + + order_qt_nested_cross_page2_parquet_q1 """ + SELECT + id, + nested_array_col, + array_size(nested_array_col) as outer_size + FROM nested_cross_page2_parquet + WHERE id = 1 + """ + + 
order_qt_nested_cross_page2_parquet_q2 """ + SELECT + id, + nested_array_col, + array_size(nested_array_col) as outer_size + FROM nested_cross_page2_parquet + WHERE id = 2 + """ + + order_qt_nested_cross_page2_parquet_q3 """ + SELECT + id, + nested_array_col, + array_size(nested_array_col) as outer_size + FROM nested_cross_page2_parquet + WHERE id = 3 + """ + + order_qt_nested_cross_page2_parquet_q4 """ + SELECT + id, + array_struct_col, + array_size(array_struct_col) as struct_arr_size + FROM nested_cross_page2_parquet + WHERE description LIKE '%large%' + """ + + order_qt_nested_cross_page2_parquet_q5 """ + SELECT + id, + item_x as x_value, + item_y as y_value + FROM nested_cross_page2_parquet + LATERAL VIEW EXPLODE(array_struct_col) tmp AS item_x, item_y + WHERE id = 1 AND item_x > 100 + """ + + order_qt_nested_cross_page2_parquet_q6 """ + SELECT + id, + map_array_col, + array_size(map_array_col) as map_size + FROM nested_cross_page2_parquet + WHERE id = 2 + """ + + order_qt_nested_cross_page3_parquet_q1 """select array_col from nested_cross_page3_parquet where id = 1""" + + order_qt_nested_cross_page3_parquet_q2 """select array_col from nested_cross_page3_parquet where id = 2""" + + order_qt_nested_cross_page3_parquet_q3 """select array_col from nested_cross_page3_parquet where id = 3""" + + order_qt_nested_cross_page3_parquet_q4 """ + SELECT id, array_size(array_col) as arr_size + FROM nested_cross_page3_parquet + ORDER BY id + """ + + order_qt_nested_cross_page3_parquet_q5 """ + SELECT id, array_col[1] as first_elem, array_col[3] as third_elem + FROM nested_cross_page3_parquet + ORDER BY id + """ + + order_qt_nested_cross_page3_parquet_q6 """ + SELECT id, array_col + FROM nested_cross_page3_parquet + WHERE array_size(array_col) > 100 + ORDER BY id + """ + + order_qt_nested_cross_page3_parquet_q7 """ + SELECT id, array_col + FROM nested_cross_page3_parquet + WHERE array_contains(array_col, 1) + ORDER BY id + """ + + order_qt_nested_cross_page3_parquet_q8 """ + SELECT id, array_col, description + FROM nested_cross_page3_parquet + WHERE id > 1 AND array_size(array_col) < 100 + ORDER BY id + """ + + order_qt_nested_cross_page3_parquet_q9 """ + SELECT + id, + array_min(array_col) as min_val, + array_max(array_col) as max_val + FROM nested_cross_page3_parquet + ORDER BY id + """ + + order_qt_nested_cross_page3_parquet_q10 """ + SELECT id, array_col + FROM nested_cross_page3_parquet + WHERE description LIKE '%large array%' + ORDER BY id + """ + + sql """drop catalog ${catalog_name};""" + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org