This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4fc7a048d2 [feature-wip](parquet-reader) fix string test and support decimal64 (#13184) 4fc7a048d2 is described below commit 4fc7a048d210ac1396b92ea70f962e989cb95c97 Author: slothever <18522955+w...@users.noreply.github.com> AuthorDate: Wed Oct 12 16:52:28 2022 +0800 [feature-wip](parquet-reader) fix string test and support decimal64 (#13184) 1. Refactor arguments list of parquet min max filter, pass parquet type for min max value parsing 2. Fix the filter of string min max Co-authored-by: jinzhe <jin...@selectdb.com> --- be/src/vec/exec/format/parquet/parquet_pred_cmp.h | 391 +++++++++++++-------- .../exec/format/parquet/vparquet_page_index.cpp | 3 +- .../vec/exec/format/parquet/vparquet_page_index.h | 1 + be/src/vec/exec/format/parquet/vparquet_reader.cpp | 13 +- 4 files changed, 259 insertions(+), 149 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h index 89792eaec1..5f33880c2a 100644 --- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h +++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h @@ -38,26 +38,6 @@ namespace doris::vectorized { return true; \ } -#define _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) \ - if (max <= conjunct_value) { \ - return true; \ - } - -#define _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) \ - if (max < conjunct_value) { \ - return true; \ - } - -#define _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) \ - if (min >= conjunct_value) { \ - return true; \ - } - -#define _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) \ - if (min > conjunct_value) { \ - return true; \ - } - #define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \ std::vector<T> in_values; \ for (auto val : in_pred_values) { \ @@ -76,32 +56,80 @@ namespace doris::vectorized { return true; \ } -static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred_values, - const char* min_bytes, 
const char* max_bytes) { - switch (conjunct_type) { +struct ColumnMinMaxParams { + PrimitiveType conjunct_type; + tparquet::Type::type parquet_type; + void* value; + // Use for decimal type + int32_t parquet_precision; + int32_t parquet_scale; + int32_t parquet_type_length; + // Use for in predicate + std::vector<void*> in_pred_values; + const char* min_bytes; + const char* max_bytes; +}; + +template <typename T> +static void _align_decimal_v2_scale(T* conjunct_value, int32_t value_scale, T* parquet_value, + int32_t parquet_scale) { + if (value_scale > parquet_scale) { + *parquet_value = *parquet_value * common::exp10_i32(value_scale - parquet_scale); + } else if (value_scale < parquet_scale) { + *conjunct_value = *conjunct_value * common::exp10_i32(parquet_scale - value_scale); + } +} + +template <typename T> +static void _decode_decimal_v2_to_primary(const ColumnMinMaxParams& params, + const char* raw_parquet_val, T* out_value, + T* parquet_val) { + *parquet_val = reinterpret_cast<const T*>(raw_parquet_val)[0]; + DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); + *out_value = conjunct_value.value(); + _align_decimal_v2_scale(out_value, conjunct_value.scale(), parquet_val, params.parquet_scale); +} + +// todo: support decimal128 after the test passes +//static Int128 _decode_value_to_int128(const ColumnMinMaxParams& params, +// const char* raw_parquet_val) { +// const uint8_t* buf = reinterpret_cast<const uint8_t*>(raw_parquet_val); +// int32_t length = params.parquet_type_length; +// Int128 value = buf[0] & 0x80 ? 
-1 : 0; +// memcpy(reinterpret_cast<uint8_t*>(&value) + sizeof(value) - length, buf, length); +// return BigEndian::ToHost128(value); +//} + +static bool _eval_in_val(const ColumnMinMaxParams& params) { + switch (params.conjunct_type) { case TYPE_TINYINT: { - _FILTER_GROUP_BY_IN(int8_t, in_pred_values, min_bytes, max_bytes) + _FILTER_GROUP_BY_IN(int8_t, params.in_pred_values, params.min_bytes, params.max_bytes) break; } case TYPE_SMALLINT: { - _FILTER_GROUP_BY_IN(int16_t, in_pred_values, min_bytes, max_bytes) + _FILTER_GROUP_BY_IN(int16_t, params.in_pred_values, params.min_bytes, params.max_bytes) break; } + case TYPE_DECIMAL32: case TYPE_INT: { - _FILTER_GROUP_BY_IN(int32_t, in_pred_values, min_bytes, max_bytes) + _FILTER_GROUP_BY_IN(int32_t, params.in_pred_values, params.min_bytes, params.max_bytes) break; } + case TYPE_DECIMAL64: case TYPE_BIGINT: { - _FILTER_GROUP_BY_IN(int64_t, in_pred_values, min_bytes, max_bytes) + _FILTER_GROUP_BY_IN(int64_t, params.in_pred_values, params.min_bytes, params.max_bytes) + break; + } + case TYPE_DECIMALV2: { break; } case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: { std::vector<const char*> in_values; - for (auto val : in_pred_values) { - const char* value = ((std::string*)val)->data(); - in_values.emplace_back(value); + for (auto val : params.in_pred_values) { + std::string value = ((StringValue*)val)->to_string(); + in_values.emplace_back(value.data()); } if (in_values.empty()) { return false; @@ -109,7 +137,7 @@ static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred auto result = std::minmax_element(in_values.begin(), in_values.end()); const char* in_min = *result.first; const char* in_max = *result.second; - if (strcmp(in_max, min_bytes) < 0 || strcmp(in_min, max_bytes) > 0) { + if (strcmp(in_max, params.min_bytes) < 0 || strcmp(in_min, params.max_bytes) > 0) { return true; } break; @@ -120,34 +148,77 @@ static bool _eval_in_val(PrimitiveType conjunct_type, std::vector<void*> in_pred return 
false; } -static bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_bytes, - const char* max_bytes) { - switch (conjunct_type) { +static bool _eval_eq(const ColumnMinMaxParams& params) { + switch (params.conjunct_type) { case TYPE_TINYINT: { - _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, + min, max) _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) break; } case TYPE_SMALLINT: { - _PLAIN_DECODE(int16_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, + min, max) _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) break; } + case TYPE_DECIMAL32: case TYPE_INT: { - _PLAIN_DECODE(int32_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _PLAIN_DECODE(int32_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, + min, max) _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) break; } + case TYPE_DECIMAL64: case TYPE_BIGINT: { - _PLAIN_DECODE(int64_t, value, min_bytes, max_bytes, conjunct_value, min, max) + _PLAIN_DECODE(int64_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, + min, max) _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) break; } + case TYPE_DECIMALV2: { + if (params.parquet_type == tparquet::Type::INT32) { + int32_t min_value = reinterpret_cast<const int32_t*>(params.min_bytes)[0]; + int32_t max_value = reinterpret_cast<const int32_t*>(params.max_bytes)[0]; + DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); + int32_t conjunct_int_value = conjunct_value.value(); + _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value, + params.parquet_scale); + _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value, + params.parquet_scale); + _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value) + } else if 
(params.parquet_type == tparquet::Type::INT64) { + int64_t min_value = reinterpret_cast<const int64_t*>(params.min_bytes)[0]; + int64_t max_value = reinterpret_cast<const int64_t*>(params.max_bytes)[0]; + DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); + int64_t conjunct_int_value = conjunct_value.value(); + _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value, + params.parquet_scale); + _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value, + params.parquet_scale); + _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value) + } + break; + // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode + // todo: support decimal128 after the test passes + // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); + // Int128 conjunct_int_value = conjunct_value.value(); + // Int128 max = _decode_value_to_int128(params, params.max_bytes); + // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max, + // params.parquet_scale); + // Int128 min = _decode_value_to_int128(params, params.min_bytes); + // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min, + // params.parquet_scale); + // _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min, max) + // } + } case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: { - const char* conjunct_value = ((std::string*)value)->data(); - if (strcmp(conjunct_value, min_bytes) < 0 || strcmp(conjunct_value, max_bytes) > 0) { + std::string conjunct_value = ((StringValue*)params.value)->to_string(); + if (strcmp(conjunct_value.data(), params.min_bytes) < 0 || + strcmp(conjunct_value.data(), params.max_bytes) > 0) { return true; } break; @@ -158,70 +229,73 @@ static bool _eval_eq(PrimitiveType conjunct_type, void* value, const char* min_b return false; } -static bool _eval_gt(PrimitiveType conjunct_type, void* 
value, const char* max_bytes) { - switch (conjunct_type) { - case TYPE_TINYINT: { - _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) - break; - } - case TYPE_SMALLINT: { - _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) - break; - } - case TYPE_INT: { - _PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) - break; - } - case TYPE_BIGINT: { - _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GT_PRED(conjunct_value, max) - break; - } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - const char* conjunct_value = ((std::string*)value)->data(); - if (strcmp(max_bytes, conjunct_value) <= 0) { +template <typename T> +static bool _filter_group_by_gt_or_ge(T conjunct_value, T max, bool is_ge) { + if (!is_ge) { + if (max <= conjunct_value) { + return true; + } + } else { + if (max < conjunct_value) { return true; } - break; - } - default: - return false; } return false; } -static bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_bytes) { - switch (conjunct_type) { +static bool _eval_gt(const ColumnMinMaxParams& params, bool is_eq) { + switch (params.conjunct_type) { case TYPE_TINYINT: { - _PLAIN_DECODE_SINGLE(int8_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) - break; + _PLAIN_DECODE_SINGLE(int8_t, params.value, params.max_bytes, conjunct_value, max) + return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); } case TYPE_SMALLINT: { - _PLAIN_DECODE_SINGLE(int16_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) - break; + _PLAIN_DECODE_SINGLE(int16_t, params.value, params.max_bytes, conjunct_value, max) + return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); } + case TYPE_DECIMAL32: case TYPE_INT: { - 
_PLAIN_DECODE_SINGLE(int32_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) - break; + _PLAIN_DECODE_SINGLE(int32_t, params.value, params.max_bytes, conjunct_value, max) + return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); } + case TYPE_DECIMAL64: case TYPE_BIGINT: { - _PLAIN_DECODE_SINGLE(int64_t, value, max_bytes, conjunct_value, max) - _FILTER_GROUP_BY_GE_PRED(conjunct_value, max) + _PLAIN_DECODE_SINGLE(int64_t, params.value, params.max_bytes, conjunct_value, max) + return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); + } + case TYPE_DECIMALV2: { + if (params.parquet_type == tparquet::Type::INT32) { + int32_t conjunct_int_value = 0; + int32_t parquet_value = 0; + _decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value, + &parquet_value); + return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq); + } else if (params.parquet_type == tparquet::Type::INT64) { + int64_t conjunct_int_value = 0; + int64_t parquet_value = 0; + _decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value, + &parquet_value); + return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq); + } break; + // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode + // todo: support decimal128 after the test passes + // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); + // Int128 conjunct_int_value = conjunct_value.value(); + // Int128 max = _decode_value_to_int128(params, params.max_bytes); + // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max, + // params.parquet_scale); + // return _filter_group_by_gt_or_ge(conjunct_int_value, max, is_eq); + // } } case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: { - const char* conjunct_value = ((std::string*)value)->data(); - if (strcmp(max_bytes, conjunct_value) < 0) { + 
std::string conjunct_value = ((StringValue*)params.value)->to_string(); + if (!is_eq && strcmp(params.max_bytes, conjunct_value.data()) <= 0) { + return true; + } else if (strcmp(params.max_bytes, conjunct_value.data()) < 0) { return true; } break; @@ -232,74 +306,82 @@ static bool _eval_ge(PrimitiveType conjunct_type, void* value, const char* max_b return false; } -static bool _eval_lt(PrimitiveType conjunct_type, void* value, const char* min_bytes) { - switch (conjunct_type) { - case TYPE_TINYINT: { - _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) - break; - } - case TYPE_SMALLINT: { - _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) - break; - } - case TYPE_INT: { - _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) - break; - } - case TYPE_BIGINT: { - _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LT_PRED(conjunct_value, min) - break; - } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - const char* conjunct_value = ((std::string*)value)->data(); - if (strcmp(min_bytes, conjunct_value) >= 0) { +template <typename T> +static bool _filter_group_by_lt_or_le(T conjunct_value, T min, bool is_le) { + if (!is_le) { + if (min >= conjunct_value) { + return true; + } + } else { + if (min > conjunct_value) { return true; } - break; - } - default: - return false; } return false; } -static bool _eval_le(PrimitiveType conjunct_type, void* value, const char* min_bytes) { - switch (conjunct_type) { +static bool _eval_lt(const ColumnMinMaxParams& params, bool is_eq) { + switch (params.conjunct_type) { case TYPE_TINYINT: { - _PLAIN_DECODE_SINGLE(int8_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) - break; + _PLAIN_DECODE_SINGLE(int8_t, params.value, params.min_bytes, conjunct_value, 
min) + return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); } case TYPE_SMALLINT: { - _PLAIN_DECODE_SINGLE(int16_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) - break; + _PLAIN_DECODE_SINGLE(int16_t, params.value, params.min_bytes, conjunct_value, min) + return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); } + case TYPE_DECIMAL32: case TYPE_INT: { - _PLAIN_DECODE_SINGLE(int32_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) - break; + _PLAIN_DECODE_SINGLE(int32_t, params.value, params.min_bytes, conjunct_value, min) + return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); } + case TYPE_DECIMAL64: case TYPE_BIGINT: { - _PLAIN_DECODE_SINGLE(int64_t, value, min_bytes, conjunct_value, min) - _FILTER_GROUP_BY_LE_PRED(conjunct_value, min) - break; + _PLAIN_DECODE_SINGLE(int64_t, params.value, params.min_bytes, conjunct_value, min) + return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); } case TYPE_STRING: case TYPE_VARCHAR: case TYPE_CHAR: { - const char* conjunct_value = ((std::string*)value)->data(); - if (strcmp(min_bytes, conjunct_value) > 0) { + std::string conjunct_value = ((StringValue*)params.value)->to_string(); + if (!is_eq && strcmp(params.min_bytes, conjunct_value.data()) >= 0) { + return true; + } else if (strcmp(params.min_bytes, conjunct_value.data()) > 0) { return true; } break; } + case TYPE_DECIMALV2: { + if (params.parquet_type == tparquet::Type::INT32) { + int32_t conjunct_int_value = 0; + int32_t parquet_value = 0; + _decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value, + &parquet_value); + return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq); + } else if (params.parquet_type == tparquet::Type::INT64) { + int64_t conjunct_int_value = 0; + int64_t parquet_value = 0; + _decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value, + &parquet_value); + return 
_filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq); + } + break; + // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode + // todo: support decimal128 after the test passes + // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); + // Int128 conjunct_int_value = conjunct_value.value(); + // Int128 min = _decode_value_to_int128(params, params.min_bytes); + // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min, + // params.parquet_scale); + // return _filter_group_by_lt_or_le(conjunct_int_value, min, is_eq); + // } + } + case TYPE_DATE: { + // doris::DateTimeValue* min_date = (doris::DateTimeValue*)params.value; + // LOG(INFO) << min_date->debug_string(); + return false; + } default: return false; } @@ -386,42 +468,48 @@ static void to_filter(const ColumnValueRange<primitive_type>& col_val_range, } } -static void _eval_predicate(ScanPredicate filter, PrimitiveType col_type, const char* min_bytes, - const char* max_bytes, bool& need_filter) { +static void _eval_predicate(const ScanPredicate& filter, ColumnMinMaxParams* params, + bool* need_filter) { if (filter._values.empty()) { return; } if (filter._op == TExprOpcode::FILTER_NEW_IN) { - need_filter = _eval_in_val(col_type, filter._values, min_bytes, max_bytes); + if (filter._values.size() == 1) { + params->value = filter._values[0]; + *need_filter = _eval_eq(*params); + return; + } + params->in_pred_values = filter._values; + *need_filter = _eval_in_val(*params); return; } // preserve TExprOpcode::FILTER_NEW_NOT_IN - auto& value = filter._values[0]; + params->value = filter._values[0]; switch (filter._op) { case TExprOpcode::EQ: - need_filter = _eval_eq(col_type, value, min_bytes, max_bytes); + *need_filter = _eval_eq(*params); break; case TExprOpcode::NE: break; case TExprOpcode::GT: - need_filter = _eval_gt(col_type, value, max_bytes); + 
*need_filter = _eval_gt(*params, false); break; case TExprOpcode::GE: - need_filter = _eval_ge(col_type, value, max_bytes); + *need_filter = _eval_gt(*params, true); break; case TExprOpcode::LT: - need_filter = _eval_lt(col_type, value, min_bytes); + *need_filter = _eval_lt(*params, false); break; case TExprOpcode::LE: - need_filter = _eval_le(col_type, value, min_bytes); + *need_filter = _eval_lt(*params, true); break; default: break; } } -static bool determine_filter_min_max(ColumnValueRangeType& col_val_range, - const std::string& encoded_min, +static bool determine_filter_min_max(const ColumnValueRangeType& col_val_range, + const FieldSchema* col_schema, const std::string& encoded_min, const std::string& encoded_max) { const char* min_bytes = encoded_min.data(); const char* max_bytes = encoded_max.data(); @@ -434,10 +522,21 @@ static bool determine_filter_min_max(ColumnValueRangeType& col_val_range, to_filter(range, filters); }, col_val_range); + if (filters.empty()) { + return false; + } + + ColumnMinMaxParams params; + params.conjunct_type = col_type; + params.parquet_type = col_schema->physical_type; + params.parquet_precision = col_schema->parquet_schema.precision; + params.parquet_scale = col_schema->parquet_schema.scale; + params.parquet_type_length = col_schema->parquet_schema.type_length; + params.min_bytes = min_bytes; + params.max_bytes = max_bytes; for (int i = 0; i < filters.size(); i++) { - ScanPredicate filter = filters[i]; - _eval_predicate(filter, col_type, min_bytes, max_bytes, need_filter); + _eval_predicate(filters[i], &params, &need_filter); if (need_filter) { break; } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp index acc076ff7c..a5673833a8 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp @@ -39,6 +39,7 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index, 
Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index, ColumnValueRangeType& col_val_range, + const FieldSchema* col_schema, std::vector<int>& skipped_ranges) { const std::vector<std::string>& encoded_min_vals = column_index->min_values; const std::vector<std::string>& encoded_max_vals = column_index->max_values; @@ -46,7 +47,7 @@ Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index const int num_of_pages = column_index->null_pages.size(); for (int page_id = 0; page_id < num_of_pages; page_id++) { - if (determine_filter_min_max(col_val_range, encoded_min_vals[page_id], + if (determine_filter_min_max(col_val_range, col_schema, encoded_min_vals[page_id], encoded_max_vals[page_id])) { skipped_ranges.emplace_back(page_id); } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h index cfbe97ded4..c5f8183e35 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h @@ -32,6 +32,7 @@ public: int page_idx, RowRange* row_range); Status collect_skipped_page_range(tparquet::ColumnIndex* column_index, ColumnValueRangeType& col_val_range, + const FieldSchema* col_schema, std::vector<int>& skipped_ranges); bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns); Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff, diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index c8ea5da343..8d5042af09 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -312,6 +312,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, RETURN_IF_ERROR( _file_reader->readat(page_index._column_index_start, buffer_size, &bytes_read, buff)); + auto& schema_desc = _file_metadata->schema(); 
std::vector<RowRange> skipped_row_ranges; for (auto& read_col : _read_columns) { auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name); @@ -320,6 +321,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, } auto& chunk = row_group.columns[read_col._parquet_col_id]; tparquet::ColumnIndex column_index; + if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) { + return Status::OK(); + } RETURN_IF_ERROR(page_index.parse_column_index(chunk, buff, &column_index)); const int num_of_pages = column_index.null_pages.size(); if (num_of_pages <= 0) { @@ -327,7 +331,9 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, } auto& conjuncts = conjunct_iter->second; std::vector<int> skipped_page_range; - page_index.collect_skipped_page_range(&column_index, conjuncts, skipped_page_range); + const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name); + page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema, + skipped_page_range); if (skipped_page_range.empty()) { return Status::OK(); } @@ -387,6 +393,7 @@ Status ParquetReader::_process_row_group_filter(const tparquet::RowGroup& row_gr Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::ColumnChunk>& columns, bool* filter_group) { + auto& schema_desc = _file_metadata->schema(); for (auto& col_name : _column_names) { auto col_iter = _map_column.find(col_name); if (col_iter == _map_column.end()) { @@ -401,8 +408,10 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co if (!statistic.__isset.max || !statistic.__isset.min) { continue; } + const FieldSchema* col_schema = schema_desc.get_column(col_name); // Min-max of statistic is plain-encoded value - *filter_group = determine_filter_min_max(slot_iter->second, statistic.min, statistic.max); + *filter_group = determine_filter_min_max(slot_iter->second, col_schema, statistic.min, + statistic.max); 
if (*filter_group) { break; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org