This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 27216dc7e0 [improvement](multi-catalog) push down all predicates into rowgroup/page filtering for ParquetReader (#16388) 27216dc7e0 is described below commit 27216dc7e0af8d5613249d01f3dd63088db5fb62 Author: Ashin Gau <ashin...@users.noreply.github.com> AuthorDate: Tue Feb 7 11:32:57 2023 +0800 [improvement](multi-catalog) push down all predicates into rowgroup/page filtering for ParquetReader (#16388) Two improvements: 1. Refactor rowgroup&page filtering in `ParquetReader`, and use the operator overloading of Doris native c++ type to process comparison. 2. Support decimal/decimal v3/date/datev2/datetime/datetimev2 --- be/src/vec/exec/format/parquet/parquet_common.cpp | 4 - be/src/vec/exec/format/parquet/parquet_common.h | 8 +- be/src/vec/exec/format/parquet/parquet_pred_cmp.h | 823 ++++++++------------- .../exec/format/parquet/vparquet_page_index.cpp | 8 +- .../vec/exec/format/parquet/vparquet_page_index.h | 2 +- be/src/vec/exec/format/parquet/vparquet_reader.cpp | 6 +- 6 files changed, 336 insertions(+), 515 deletions(-) diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 5f8656fcd9..1449d1bdf3 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -29,10 +29,6 @@ const uint32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588; const uint64_t ParquetInt96::MICROS_IN_DAY = 86400000000; const uint64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000; -inline uint64_t ParquetInt96::to_timestamp_micros() const { - return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo / NANOS_PER_MICROSECOND; -} - #define FOR_LOGICAL_NUMERIC_TYPES(M) \ M(TypeIndex::Int8, Int8) \ M(TypeIndex::UInt8, UInt8) \ diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index c06d8690ac..95727ed139 100644 --- 
a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -68,7 +68,9 @@ struct ParquetInt96 { uint64_t lo; // time of nanoseconds in a day uint32_t hi; // days from julian epoch - inline uint64_t to_timestamp_micros() const; + inline uint64_t to_timestamp_micros() const { + return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo / NANOS_PER_MICROSECOND; + } static const uint32_t JULIAN_EPOCH_OFFSET_DAYS; static const uint64_t MICROS_IN_DAY; @@ -361,7 +363,6 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, size_t data_index = column_data.size(); column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); size_t dict_index = 0; - int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000; ColumnSelectVector::DataReadType read_type; while (size_t run_length = select_vector.get_next_run(&read_type)) { switch (read_type) { @@ -373,7 +374,8 @@ Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { // nanoseconds will be ignored. 
- v.set_microsecond((date_value % _decode_params->second_mask) * scale_to_micro); + v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); // TODO: the precision of datetime v1 } _FIXED_SHIFT_DATA_OFFSET(); diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h index 49016b5266..adc71dcedd 100644 --- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h +++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h @@ -21,541 +21,362 @@ #include <vector> #include "exec/olap_common.h" +#include "parquet_common.h" namespace doris::vectorized { -#define _PLAIN_DECODE(T, value, min_bytes, max_bytes, out_value, out_min, out_max) \ - const T out_min = reinterpret_cast<const T*>(min_bytes)[0]; \ - const T out_max = reinterpret_cast<const T*>(max_bytes)[0]; \ - T out_value = *((T*)value); +class ParquetPredicate { +#define FOR_REINTERPRET_TYPES(M) \ + M(TYPE_BOOLEAN, tparquet::Type::BOOLEAN) \ + M(TYPE_TINYINT, tparquet::Type::INT32) \ + M(TYPE_SMALLINT, tparquet::Type::INT32) \ + M(TYPE_INT, tparquet::Type::INT32) \ + M(TYPE_BIGINT, tparquet::Type::INT64) \ + M(TYPE_FLOAT, tparquet::Type::FLOAT) \ + M(TYPE_DOUBLE, tparquet::Type::DOUBLE) -#define _PLAIN_DECODE_SINGLE(T, value, bytes, conjunct_value, out) \ - const T out = reinterpret_cast<const T*>(bytes)[0]; \ - T conjunct_value = *((T*)value); +private: + struct ScanPredicate { + ScanPredicate() = default; + ~ScanPredicate() = default; + SQLFilterOp op; + std::vector<const void*> values; + int scale; -#define _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) \ - if (conjunct_value < min || conjunct_value > max) { \ - return true; \ - } - -#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes) \ - std::vector<T> in_values; \ - for (auto val : in_pred_values) { \ - T value = reinterpret_cast<T*>(val)[0]; \ - in_values.emplace_back(value); \ - } \ - if (in_values.empty()) { \ - return false; \ - } \ - 
auto result = std::minmax_element(in_values.begin(), in_values.end()); \ - T in_min = *result.first; \ - T in_max = *result.second; \ - const T group_min = reinterpret_cast<const T*>(min_bytes)[0]; \ - const T group_max = reinterpret_cast<const T*>(max_bytes)[0]; \ - if (in_max < group_min || in_min > group_max) { \ - return true; \ - } - -struct ColumnMinMaxParams { - PrimitiveType conjunct_type; - tparquet::Type::type parquet_type; - void* value; - // Use for decimal type - int32_t parquet_precision; - int32_t parquet_scale; - int32_t parquet_type_length; - // Use for in predicate - std::vector<void*> in_pred_values; - const char* min_bytes; - const char* max_bytes; -}; - -template <typename T> -static void _align_decimal_v2_scale(T* conjunct_value, int32_t value_scale, T* parquet_value, - int32_t parquet_scale) { - if (value_scale > parquet_scale) { - *parquet_value = *parquet_value * common::exp10_i32(value_scale - parquet_scale); - } else if (value_scale < parquet_scale) { - *conjunct_value = *conjunct_value * common::exp10_i32(parquet_scale - value_scale); - } -} - -template <typename T> -static void _decode_decimal_v2_to_primary(const ColumnMinMaxParams& params, - const char* raw_parquet_val, T* out_value, - T* parquet_val) { - *parquet_val = reinterpret_cast<const T*>(raw_parquet_val)[0]; - DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); - *out_value = conjunct_value.value(); - _align_decimal_v2_scale(out_value, conjunct_value.scale(), parquet_val, params.parquet_scale); -} - -// todo: support decimal128 after the test passes -//static Int128 _decode_value_to_int128(const ColumnMinMaxParams& params, -// const char* raw_parquet_val) { -// const uint8_t* buf = reinterpret_cast<const uint8_t*>(raw_parquet_val); -// int32_t length = params.parquet_type_length; -// Int128 value = buf[0] & 0x80 ? 
-1 : 0; -// memcpy(reinterpret_cast<uint8_t*>(&value) + sizeof(value) - length, buf, length); -// return BigEndian::ToHost128(value); -//} - -static bool _eval_in_val(const ColumnMinMaxParams& params) { - switch (params.conjunct_type) { - case TYPE_TINYINT: { - _FILTER_GROUP_BY_IN(int8_t, params.in_pred_values, params.min_bytes, params.max_bytes) - break; - } - case TYPE_SMALLINT: { - _FILTER_GROUP_BY_IN(int16_t, params.in_pred_values, params.min_bytes, params.max_bytes) - break; - } - case TYPE_DECIMAL32: - case TYPE_INT: { - _FILTER_GROUP_BY_IN(int32_t, params.in_pred_values, params.min_bytes, params.max_bytes) - break; - } - case TYPE_DECIMAL64: - case TYPE_BIGINT: { - _FILTER_GROUP_BY_IN(int64_t, params.in_pred_values, params.min_bytes, params.max_bytes) - break; - } - case TYPE_DECIMALV2: { - break; - } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - std::vector<std::string> in_values; - for (auto val : params.in_pred_values) { - in_values.emplace_back(((StringRef*)val)->to_string()); + ScanPredicate(const ScanPredicate& other) { + op = other.op; + for (auto v : other.values) { + values.emplace_back(v); + } + scale = other.scale; } - if (in_values.empty()) { - return false; - } - auto result = std::minmax_element(in_values.begin(), in_values.end()); - std::string& in_min = *result.first; - std::string& in_max = *result.second; - if (strcmp(in_max.data(), params.min_bytes) < 0 || - strcmp(in_min.data(), params.max_bytes) > 0) { - return true; - } - break; - } - default: - return false; - } - return false; -} + }; -static bool _eval_eq(const ColumnMinMaxParams& params) { - switch (params.conjunct_type) { - case TYPE_TINYINT: { - _PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, - min, max) - _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) - break; - } - case TYPE_SMALLINT: { - _PLAIN_DECODE(int16_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, - min, max) - 
_FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) - break; - } - case TYPE_DECIMAL32: - case TYPE_INT: { - _PLAIN_DECODE(int32_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, - min, max) - _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) - break; - } - case TYPE_DECIMAL64: - case TYPE_BIGINT: { - _PLAIN_DECODE(int64_t, params.value, params.min_bytes, params.max_bytes, conjunct_value, - min, max) - _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) - break; - } - case TYPE_DECIMALV2: { - if (params.parquet_type == tparquet::Type::INT32) { - int32_t min_value = reinterpret_cast<const int32_t*>(params.min_bytes)[0]; - int32_t max_value = reinterpret_cast<const int32_t*>(params.max_bytes)[0]; - DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); - int32_t conjunct_int_value = conjunct_value.value(); - _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value, - params.parquet_scale); - _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value, - params.parquet_scale); - _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value) - } else if (params.parquet_type == tparquet::Type::INT64) { - int64_t min_value = reinterpret_cast<const int64_t*>(params.min_bytes)[0]; - int64_t max_value = reinterpret_cast<const int64_t*>(params.max_bytes)[0]; - DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); - int64_t conjunct_int_value = conjunct_value.value(); - _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min_value, - params.parquet_scale); - _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max_value, - params.parquet_scale); - _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value) + template <typename DecimalPrimitiveType, typename DecimalPhysicalType> + static DecimalPrimitiveType _decode_primitive_decimal(const FieldSchema* col_schema, + const std::string& encoded_data, + int dest_scale) { + int scale = 
col_schema->parquet_schema.scale; + Int128 value = *reinterpret_cast<const DecimalPhysicalType*>(encoded_data.data()); + if (dest_scale > scale) { + value *= DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + value /= DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); } - break; - // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode - // todo: support decimal128 after the test passes - // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); - // Int128 conjunct_int_value = conjunct_value.value(); - // Int128 max = _decode_value_to_int128(params, params.max_bytes); - // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max, - // params.parquet_scale); - // Int128 min = _decode_value_to_int128(params, params.min_bytes); - // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min, - // params.parquet_scale); - // _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min, max) - // } + return (DecimalPrimitiveType)value; } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - std::string conjunct_value = ((StringRef*)params.value)->to_string(); - if (strcmp(conjunct_value.data(), params.min_bytes) < 0 || - strcmp(conjunct_value.data(), params.max_bytes) > 0) { - return true; - } - break; - } - default: - return false; - } - return false; -} -template <typename T> -static bool _filter_group_by_gt_or_ge(T conjunct_value, T max, bool is_ge) { - if (!is_ge) { - if (max <= conjunct_value) { - return true; - } - } else { - if (max < conjunct_value) { - return true; + template <typename DecimalPrimitiveType> + static DecimalPrimitiveType _decode_binary_decimal(const FieldSchema* col_schema, + const std::string& encoded_data, + int dest_scale) { + int scale = col_schema->parquet_schema.scale; + const char* buf_start 
= encoded_data.data(); + Int128 value = buf_start[0] & 0x80 ? -1 : 0; + memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - encoded_data.size(), buf_start, + encoded_data.size()); + value = BigEndian::ToHost128(value); + if (dest_scale > scale) { + value *= DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale); + } else if (dest_scale < scale) { + value /= DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale); } + return (DecimalPrimitiveType)value; } - return false; -} -static bool _eval_gt(const ColumnMinMaxParams& params, bool is_eq) { - switch (params.conjunct_type) { - case TYPE_TINYINT: { - _PLAIN_DECODE_SINGLE(int8_t, params.value, params.max_bytes, conjunct_value, max) - return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); - } - case TYPE_SMALLINT: { - _PLAIN_DECODE_SINGLE(int16_t, params.value, params.max_bytes, conjunct_value, max) - return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); - } - case TYPE_DECIMAL32: - case TYPE_INT: { - _PLAIN_DECODE_SINGLE(int32_t, params.value, params.max_bytes, conjunct_value, max) - return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); - } - case TYPE_DECIMAL64: - case TYPE_BIGINT: { - _PLAIN_DECODE_SINGLE(int64_t, params.value, params.max_bytes, conjunct_value, max) - return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq); - } - case TYPE_DECIMALV2: { - if (params.parquet_type == tparquet::Type::INT32) { - int32_t conjunct_int_value = 0; - int32_t parquet_value = 0; - _decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value, - &parquet_value); - return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq); - } else if (params.parquet_type == tparquet::Type::INT64) { - int64_t conjunct_int_value = 0; - int64_t parquet_value = 0; - _decode_decimal_v2_to_primary(params, params.max_bytes, &conjunct_int_value, - &parquet_value); - return _filter_group_by_gt_or_ge(conjunct_int_value, parquet_value, is_eq); + 
template <typename CppType> + static bool _filter_by_min_max(const SQLFilterOp op, + const std::vector<CppType>& predicate_values, CppType& min_value, + CppType& max_value) { + if (predicate_values.empty()) { + return false; } - break; - // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode - // todo: support decimal128 after the test passes - // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); - // Int128 conjunct_int_value = conjunct_value.value(); - // Int128 max = _decode_value_to_int128(params, params.max_bytes); - // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &max, - // params.parquet_scale); - // return _filter_group_by_gt_or_ge(conjunct_int_value, max, is_eq); - // } - } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - std::string conjunct_value = ((StringRef*)params.value)->to_string(); - if (!is_eq && strcmp(params.max_bytes, conjunct_value.data()) <= 0) { - return true; - } else if (strcmp(params.max_bytes, conjunct_value.data()) < 0) { + switch (op) { + case FILTER_IN: + for (const CppType& in_value : predicate_values) { + if (in_value >= min_value && in_value <= max_value) { + return false; + } + } return true; + case FILTER_LESS: + return min_value >= predicate_values[0]; + case FILTER_LESS_OR_EQUAL: + return min_value > predicate_values[0]; + case FILTER_LARGER: + return max_value <= predicate_values[0]; + case FILTER_LARGER_OR_EQUAL: + return max_value < predicate_values[0]; + default: + return false; } - break; - } - default: - return false; } - return false; -} -template <typename T> -static bool _filter_group_by_lt_or_le(T conjunct_value, T min, bool is_le) { - if (!is_le) { - if (min >= conjunct_value) { - return true; - } - } else { - if (min > conjunct_value) { - return true; + template <PrimitiveType primitive_type> + static bool _filter_by_min_max(const 
ColumnValueRange<primitive_type>& col_val_range, + const ScanPredicate& predicate, const FieldSchema* col_schema, + const std::string& encoded_min, const std::string& encoded_max, + const cctz::time_zone& ctz) { + using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType; + std::vector<CppType> predicate_values; + for (const void* v : predicate.values) { + predicate_values.emplace_back(*reinterpret_cast<const CppType*>(v)); } - } - return false; -} -static bool _eval_lt(const ColumnMinMaxParams& params, bool is_eq) { - switch (params.conjunct_type) { - case TYPE_TINYINT: { - _PLAIN_DECODE_SINGLE(int8_t, params.value, params.min_bytes, conjunct_value, min) - return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); - } - case TYPE_SMALLINT: { - _PLAIN_DECODE_SINGLE(int16_t, params.value, params.min_bytes, conjunct_value, min) - return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); - } - case TYPE_DECIMAL32: - case TYPE_INT: { - _PLAIN_DECODE_SINGLE(int32_t, params.value, params.min_bytes, conjunct_value, min) - return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); - } - case TYPE_DECIMAL64: - case TYPE_BIGINT: { - _PLAIN_DECODE_SINGLE(int64_t, params.value, params.min_bytes, conjunct_value, min) - return _filter_group_by_lt_or_le(conjunct_value, min, is_eq); - } - case TYPE_STRING: - case TYPE_VARCHAR: - case TYPE_CHAR: { - std::string conjunct_value = ((StringRef*)params.value)->to_string(); - if (!is_eq && strcmp(params.min_bytes, conjunct_value.data()) >= 0) { - return true; - } else if (strcmp(params.min_bytes, conjunct_value.data()) > 0) { - return true; - } - break; - } - case TYPE_DECIMALV2: { - if (params.parquet_type == tparquet::Type::INT32) { - int32_t conjunct_int_value = 0; - int32_t parquet_value = 0; - _decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value, - &parquet_value); - return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq); - } else if (params.parquet_type == 
tparquet::Type::INT64) { - int64_t conjunct_int_value = 0; - int64_t parquet_value = 0; - _decode_decimal_v2_to_primary(params, params.min_bytes, &conjunct_int_value, - &parquet_value); - return _filter_group_by_lt_or_le(conjunct_int_value, parquet_value, is_eq); - } + CppType min_value; + CppType max_value; + tparquet::Type::type physical_type = col_schema->physical_type; + switch (col_val_range.type()) { +#define DISPATCH(REINTERPRET_TYPE, PARQUET_TYPE) \ + case REINTERPRET_TYPE: \ + if (col_schema->physical_type != PARQUET_TYPE) return false; \ + min_value = *reinterpret_cast<const CppType*>(encoded_min.data()); \ + max_value = *reinterpret_cast<const CppType*>(encoded_max.data()); \ break; - // When precision exceeds 18, decimal will use tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode - // todo: support decimal128 after the test passes - // else if (params.parquet_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - // DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value); - // Int128 conjunct_int_value = conjunct_value.value(); - // Int128 min = _decode_value_to_int128(params, params.min_bytes); - // _align_decimal_v2_scale(&conjunct_int_value, conjunct_value.scale(), &min, - // params.parquet_scale); - // return _filter_group_by_lt_or_le(conjunct_int_value, min, is_eq); - // } - } - case TYPE_DATE: { - // doris::DateTimeValue* min_date = (doris::DateTimeValue*)params.value; - // LOG(INFO) << min_date->debug_string(); - return false; - } - default: - return false; - } - return false; -} + FOR_REINTERPRET_TYPES(DISPATCH) +#undef DISPATCH + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: + if constexpr (std::is_same_v<CppType, StringRef>) { + min_value = StringRef(encoded_min); + max_value = StringRef(encoded_max); + } else { + return false; + }; + break; + case TYPE_DECIMALV2: + if constexpr (std::is_same_v<CppType, DecimalV2Value>) { + size_t max_precision = max_decimal_precision<Decimal<__int128_t>>(); + if 
(col_schema->parquet_schema.precision < 1 || + col_schema->parquet_schema.precision > max_precision || + col_schema->parquet_schema.scale > max_precision) { + return false; + } + int v2_scale = DecimalV2Value::SCALE; + if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + min_value = DecimalV2Value( + _decode_binary_decimal<Int128>(col_schema, encoded_min, v2_scale)); + max_value = DecimalV2Value( + _decode_binary_decimal<Int128>(col_schema, encoded_max, v2_scale)); + } else if (physical_type == tparquet::Type::INT32) { + min_value = DecimalV2Value(_decode_primitive_decimal<Int128, Int32>( + col_schema, encoded_min, v2_scale)); + max_value = DecimalV2Value(_decode_primitive_decimal<Int128, Int32>( + col_schema, encoded_max, v2_scale)); + } else if (physical_type == tparquet::Type::INT64) { + min_value = DecimalV2Value(_decode_primitive_decimal<Int128, Int64>( + col_schema, encoded_min, v2_scale)); + max_value = DecimalV2Value(_decode_primitive_decimal<Int128, Int64>( + col_schema, encoded_max, v2_scale)); + } else { + return false; + } + } else { + return false; + } + break; + case TYPE_DECIMAL32: + case TYPE_DECIMAL64: + case TYPE_DECIMAL128I: + if constexpr (std::is_same_v<CppType, int32_t> || std::is_same_v<CppType, int64_t> || + std::is_same_v<CppType, __int128_t>) { + size_t max_precision = max_decimal_precision<Decimal<CppType>>(); + if (col_schema->parquet_schema.precision < 1 || + col_schema->parquet_schema.precision > max_precision || + col_schema->parquet_schema.scale > max_precision) { + return false; + } + if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + min_value = _decode_binary_decimal<CppType>(col_schema, encoded_min, + predicate.scale); + max_value = _decode_binary_decimal<CppType>(col_schema, encoded_max, + predicate.scale); + } else if (physical_type == tparquet::Type::INT32) { + min_value = _decode_primitive_decimal<CppType, Int32>(col_schema, encoded_min, + predicate.scale); + max_value = 
_decode_primitive_decimal<CppType, Int32>(col_schema, encoded_max, + predicate.scale); + } else if (physical_type == tparquet::Type::INT64) { + min_value = _decode_primitive_decimal<CppType, Int64>(col_schema, encoded_min, + predicate.scale); + max_value = _decode_primitive_decimal<CppType, Int64>(col_schema, encoded_max, + predicate.scale); + } else { + return false; + } + } else { + return false; + } + break; + case TYPE_DATE: + case TYPE_DATEV2: + if (physical_type == tparquet::Type::INT32) { + int64_t min_date_value = + static_cast<int64_t>(*reinterpret_cast<const int32_t*>(encoded_min.data())); + int64_t max_date_value = + static_cast<int64_t>(*reinterpret_cast<const int32_t*>(encoded_max.data())); + if constexpr (std::is_same_v<CppType, DateTimeValue> || + std::is_same_v<CppType, DateV2Value<DateV2ValueType>>) { + min_value.from_unixtime(min_date_value * 24 * 60 * 60, ctz); + max_value.from_unixtime(max_date_value * 24 * 60 * 60, ctz); + } else { + return false; + } + } else { + return false; + } + break; + case TYPE_DATETIME: + case TYPE_DATETIMEV2: + if (physical_type == tparquet::Type::INT96) { + ParquetInt96 datetime96_min = + *reinterpret_cast<const ParquetInt96*>(encoded_min.data()); + int64_t micros_min = datetime96_min.to_timestamp_micros(); + ParquetInt96 datetime96_max = + *reinterpret_cast<const ParquetInt96*>(encoded_max.data()); + int64_t micros_max = datetime96_max.to_timestamp_micros(); + if constexpr (std::is_same_v<CppType, DateTimeValue> || + std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + min_value.from_unixtime(micros_min / 1000000, ctz); + max_value.from_unixtime(micros_max / 1000000, ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + min_value.set_microsecond(micros_min % 1000000); + max_value.set_microsecond(micros_max % 1000000); + } + } else { + return false; + } + } else if (physical_type == tparquet::Type::INT64) { + int64_t date_value_min = *reinterpret_cast<const 
int64_t*>(encoded_min.data()); + int64_t date_value_max = *reinterpret_cast<const int64_t*>(encoded_max.data()); -struct ScanPredicate { - ScanPredicate() = default; - ~ScanPredicate() = default; - std::string _col_name; - TExprOpcode::type _op; - std::vector<void*> _values; - bool _null_op = false; - bool _is_null = false; - int _scale; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + cctz::time_zone resolved_ctz = ctz; + const auto& schema = col_schema->parquet_schema; + if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { + const auto& timestamp_info = schema.logicalType.TIMESTAMP; + if (!timestamp_info.isAdjustedToUTC) { + // should set timezone to utc+0 + resolved_ctz = cctz::utc_time_zone(); + } + const auto& time_unit = timestamp_info.unit; + if (time_unit.__isset.MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (time_unit.__isset.MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } else if (time_unit.__isset.NANOS) { + second_mask = 1000000000; + scale_to_nano_factor = 1; + } + } else if (schema.__isset.converted_type) { + const auto& converted_type = schema.converted_type; + if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { + second_mask = 1000; + scale_to_nano_factor = 1000000; + } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { + second_mask = 1000000; + scale_to_nano_factor = 1000; + } + } - ScanPredicate(const ScanPredicate& other) { - _col_name = other._col_name; - _op = other._op; - for (void* v : other._values) { - _values.push_back(v); + if constexpr (std::is_same_v<CppType, DateTimeValue> || + std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + min_value.from_unixtime(date_value_min / second_mask, resolved_ctz); + max_value.from_unixtime(date_value_max / second_mask, resolved_ctz); + if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) { + min_value.set_microsecond((date_value_min % second_mask) 
* + scale_to_nano_factor / 1000); + max_value.set_microsecond((date_value_max % second_mask) * + scale_to_nano_factor / 1000); + } + } else { + return false; + } + } else { + return false; + } + break; + default: + return false; } - _null_op = other._null_op; - _is_null = other._is_null; - _scale = other._scale; + return _filter_by_min_max(predicate.op, predicate_values, min_value, max_value); } -}; -template <PrimitiveType primitive_type> -static void to_filter(const ColumnValueRange<primitive_type>& col_val_range, - std::vector<ScanPredicate>& filters) { - using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType; - const auto& high_value = col_val_range.get_range_max_value(); - const auto& low_value = col_val_range.get_range_min_value(); - const auto& high_op = col_val_range.get_range_high_op(); - const auto& low_op = col_val_range.get_range_low_op(); + template <PrimitiveType primitive_type> + static std::vector<ScanPredicate> _value_range_to_predicate( + const ColumnValueRange<primitive_type>& col_val_range) { + using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType; + std::vector<ScanPredicate> predicates; - // todo: process equals - if (col_val_range.is_fixed_value_range()) { - // 1. convert to in filter condition - ScanPredicate condition; - condition._col_name = col_val_range.column_name(); - condition._op = TExprOpcode::FILTER_NEW_IN; - condition._scale = col_val_range.scale(); - if (col_val_range.get_fixed_value_set().empty()) { - return; - } - for (const auto& value : col_val_range.get_fixed_value_set()) { - condition._values.push_back(const_cast<CppType*>(&value)); - } - filters.push_back(condition); - } else if (low_value < high_value) { - // 2. 
convert to min max filter condition - ScanPredicate null_pred; - if (col_val_range.is_high_value_maximum() && high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL && - col_val_range.is_low_value_mininum() && low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL && - !col_val_range.contain_null()) { - null_pred._col_name = col_val_range.column_name(); - null_pred._null_op = true; - null_pred._is_null = false; - filters.push_back(null_pred); - return; - } - ScanPredicate low; - if (!col_val_range.is_low_value_mininum() || - SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) { - low._col_name = col_val_range.column_name(); - low._op = (low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL ? TExprOpcode::GE - : TExprOpcode::GT); - // NOTICE: use get_range_min_value_ptr, not "low_value"'s addr, - // to avoid stack-use-after-return bug - low._values.push_back(const_cast<CppType*>(col_val_range.get_range_min_value_ptr())); - low._scale = col_val_range.scale(); - filters.push_back(low); + if (col_val_range.is_fixed_value_range()) { + ScanPredicate in_predicate; + in_predicate.op = SQLFilterOp::FILTER_IN; + in_predicate.scale = col_val_range.scale(); + for (const auto& value : col_val_range.get_fixed_value_set()) { + in_predicate.values.emplace_back(&value); + } + if (!in_predicate.values.empty()) { + predicates.emplace_back(in_predicate); + } + return predicates; } - ScanPredicate high; - if (!col_val_range.is_high_value_maximum() || - SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) { - high._col_name = col_val_range.column_name(); - high._op = (high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL ? TExprOpcode::LE - : TExprOpcode::LT); - // NOTICE: use get_range_max_value_ptr, not "high_value"'s addr, - // to avoid stack-use-after-return bug - high._values.push_back(const_cast<CppType*>(col_val_range.get_range_max_value_ptr())); - high._scale = col_val_range.scale(); - filters.push_back(high); - } - } else { - // 3. 
convert to is null and is not null filter condition - ScanPredicate null_pred; - if (col_val_range.is_low_value_maximum() && col_val_range.is_high_value_mininum() && - col_val_range.contain_null()) { - null_pred._col_name = col_val_range.column_name(); - null_pred._null_op = true; - null_pred._is_null = true; - filters.push_back(null_pred); - } - } -} + const CppType high_value = col_val_range.get_range_max_value(); + const CppType low_value = col_val_range.get_range_min_value(); + const SQLFilterOp high_op = col_val_range.get_range_high_op(); + const SQLFilterOp low_op = col_val_range.get_range_low_op(); -static void _eval_predicate(const ScanPredicate& filter, ColumnMinMaxParams* params, - bool* need_filter) { - if (filter._values.empty()) { - return; - } - if (filter._op == TExprOpcode::FILTER_NEW_IN) { - if (filter._values.size() == 1) { - params->value = filter._values[0]; - *need_filter = _eval_eq(*params); - return; + // orc can only push down is_null. When col_value_range._contain_null = true, only indicating that + // value can be null, not equals null, so ignore _contain_null in col_value_range + if (col_val_range.is_high_value_maximum() && high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL && + col_val_range.is_low_value_mininum() && low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL) { + return predicates; } - params->in_pred_values = filter._values; - *need_filter = _eval_in_val(*params); - return; - } - // preserve TExprOpcode::FILTER_NEW_NOT_IN - params->value = filter._values[0]; - switch (filter._op) { - case TExprOpcode::EQ: - *need_filter = _eval_eq(*params); - break; - case TExprOpcode::NE: - break; - case TExprOpcode::GT: - *need_filter = _eval_gt(*params, false); - break; - case TExprOpcode::GE: - *need_filter = _eval_gt(*params, true); - break; - case TExprOpcode::LT: - *need_filter = _eval_lt(*params, false); - break; - case TExprOpcode::LE: - *need_filter = _eval_lt(*params, true); - break; - default: - break; - } -} -static bool 
determine_filter_min_max(const ColumnValueRangeType& col_val_range, - const FieldSchema* col_schema, const std::string& encoded_min, - const std::string& encoded_max) { - const char* min_bytes = encoded_min.data(); - const char* max_bytes = encoded_max.data(); - bool need_filter = false; - std::vector<ScanPredicate> filters; - PrimitiveType col_type; - std::visit( - [&](auto&& range) { - col_type = range.type(); - to_filter(range, filters); - }, - col_val_range); - if (filters.empty()) { - return false; + if (low_value < high_value) { + if (!col_val_range.is_low_value_mininum() || + SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) { + ScanPredicate low_predicate; + low_predicate.scale = col_val_range.scale(); + low_predicate.op = low_op; + low_predicate.values.emplace_back(col_val_range.get_range_min_value_ptr()); + predicates.emplace_back(low_predicate); + } + if (!col_val_range.is_high_value_maximum() || + SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) { + ScanPredicate high_predicate; + high_predicate.scale = col_val_range.scale(); + high_predicate.op = high_op; + high_predicate.values.emplace_back(col_val_range.get_range_max_value_ptr()); + predicates.emplace_back(high_predicate); + } + } + return predicates; } - ColumnMinMaxParams params; - params.conjunct_type = col_type; - params.parquet_type = col_schema->physical_type; - params.parquet_precision = col_schema->parquet_schema.precision; - params.parquet_scale = col_schema->parquet_schema.scale; - params.parquet_type_length = col_schema->parquet_schema.type_length; - params.min_bytes = min_bytes; - params.max_bytes = max_bytes; - for (int i = 0; i < filters.size(); i++) { - _eval_predicate(filters[i], ¶ms, &need_filter); - if (need_filter) { - break; - } +public: + static bool filter_by_min_max(const ColumnValueRangeType& col_val_range, + const FieldSchema* col_schema, const std::string& encoded_min, + const std::string& encoded_max, const cctz::time_zone& ctz) { + bool need_filter = false; + std::visit( + 
[&](auto&& range) { + std::vector<ScanPredicate> filters = _value_range_to_predicate(range); + for (auto& filter : filters) { + need_filter |= _filter_by_min_max(range, filter, col_schema, encoded_min, + encoded_max, ctz); + if (need_filter) { + break; + } + } + }, + col_val_range); + return need_filter; } - return need_filter; -} +}; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp index acb56d0c2a..ad70d8f8bc 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp @@ -40,15 +40,17 @@ Status PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index, Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* column_index, ColumnValueRangeType& col_val_range, const FieldSchema* col_schema, - std::vector<int>& skipped_ranges) { + std::vector<int>& skipped_ranges, + const cctz::time_zone& ctz) { const std::vector<std::string>& encoded_min_vals = column_index->min_values; const std::vector<std::string>& encoded_max_vals = column_index->max_values; DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size()); const int num_of_pages = column_index->null_pages.size(); for (int page_id = 0; page_id < num_of_pages; page_id++) { - if (determine_filter_min_max(col_val_range, col_schema, encoded_min_vals[page_id], - encoded_max_vals[page_id])) { + if (ParquetPredicate::filter_by_min_max(col_val_range, col_schema, + encoded_min_vals[page_id], + encoded_max_vals[page_id], ctz)) { skipped_ranges.emplace_back(page_id); } } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h index 4a27593abf..00cbbdcfdd 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h @@ -33,7 +33,7 @@ public: Status collect_skipped_page_range(tparquet::ColumnIndex* 
column_index, ColumnValueRangeType& col_val_range, const FieldSchema* col_schema, - std::vector<int>& skipped_ranges); + std::vector<int>& skipped_ranges, const cctz::time_zone& ctz); bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns); Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff, tparquet::ColumnIndex* column_index); diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 1b7653433a..2d149d7ba5 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -583,7 +583,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, std::vector<int> skipped_page_range; const FieldSchema* col_schema = schema_desc.get_column(read_col._file_slot_name); page_index.collect_skipped_page_range(&column_index, conjuncts, col_schema, - skipped_page_range); + skipped_page_range, *_ctz); if (skipped_page_range.empty()) { continue; } @@ -664,8 +664,8 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co } const FieldSchema* col_schema = schema_desc.get_column(col_name); // Min-max of statistic is plain-encoded value - *filter_group = determine_filter_min_max(slot_iter->second, col_schema, statistic.min, - statistic.max); + *filter_group = ParquetPredicate::filter_by_min_max(slot_iter->second, col_schema, + statistic.min, statistic.max, *_ctz); if (*filter_group) { break; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org