This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 27216dc7e0 [improvement](multi-catalog) push down all predicates into 
rowgroup/page filtering for ParquetReader (#16388)
27216dc7e0 is described below

commit 27216dc7e0af8d5613249d01f3dd63088db5fb62
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Tue Feb 7 11:32:57 2023 +0800

    [improvement](multi-catalog) push down all predicates into rowgroup/page 
filtering for ParquetReader (#16388)
    
    Two improvements:
    1. Refactor row-group and page filtering in `ParquetReader`, and use the 
overloaded comparison operators of Doris's native C++ types to perform comparisons.
    2. Support decimal/decimal v3/date/datev2/datetime/datetimev2
---
 be/src/vec/exec/format/parquet/parquet_common.cpp  |   4 -
 be/src/vec/exec/format/parquet/parquet_common.h    |   8 +-
 be/src/vec/exec/format/parquet/parquet_pred_cmp.h  | 823 ++++++++-------------
 .../exec/format/parquet/vparquet_page_index.cpp    |   8 +-
 .../vec/exec/format/parquet/vparquet_page_index.h  |   2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |   6 +-
 6 files changed, 336 insertions(+), 515 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp 
b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 5f8656fcd9..1449d1bdf3 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -29,10 +29,6 @@ const uint32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 
2440588;
 const uint64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
 const uint64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;
 
-inline uint64_t ParquetInt96::to_timestamp_micros() const {
-    return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo / 
NANOS_PER_MICROSECOND;
-}
-
 #define FOR_LOGICAL_NUMERIC_TYPES(M) \
     M(TypeIndex::Int8, Int8)         \
     M(TypeIndex::UInt8, UInt8)       \
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h 
b/be/src/vec/exec/format/parquet/parquet_common.h
index c06d8690ac..95727ed139 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -68,7 +68,9 @@ struct ParquetInt96 {
     uint64_t lo; // time of nanoseconds in a day
     uint32_t hi; // days from julian epoch
 
-    inline uint64_t to_timestamp_micros() const;
+    inline uint64_t to_timestamp_micros() const {
+        return (hi - JULIAN_EPOCH_OFFSET_DAYS) * MICROS_IN_DAY + lo / 
NANOS_PER_MICROSECOND;
+    }
 
     static const uint32_t JULIAN_EPOCH_OFFSET_DAYS;
     static const uint64_t MICROS_IN_DAY;
@@ -361,7 +363,6 @@ Status 
FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
     size_t data_index = column_data.size();
     column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered());
     size_t dict_index = 0;
-    int64_t scale_to_micro = _decode_params->scale_to_nano_factor / 1000;
     ColumnSelectVector::DataReadType read_type;
     while (size_t run_length = select_vector.get_next_run(&read_type)) {
         switch (read_type) {
@@ -373,7 +374,8 @@ Status 
FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
                 v.from_unixtime(date_value / _decode_params->second_mask, 
*_decode_params->ctz);
                 if constexpr (std::is_same_v<CppType, 
DateV2Value<DateTimeV2ValueType>>) {
                     // nanoseconds will be ignored.
-                    v.set_microsecond((date_value % 
_decode_params->second_mask) * scale_to_micro);
+                    v.set_microsecond((date_value % 
_decode_params->second_mask) *
+                                      _decode_params->scale_to_nano_factor / 
1000);
                     // TODO: the precision of datetime v1
                 }
                 _FIXED_SHIFT_DATA_OFFSET();
diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h 
b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
index 49016b5266..adc71dcedd 100644
--- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
+++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h
@@ -21,541 +21,362 @@
 #include <vector>
 
 #include "exec/olap_common.h"
+#include "parquet_common.h"
 
 namespace doris::vectorized {
 
-#define _PLAIN_DECODE(T, value, min_bytes, max_bytes, out_value, out_min, 
out_max) \
-    const T out_min = reinterpret_cast<const T*>(min_bytes)[0];                
    \
-    const T out_max = reinterpret_cast<const T*>(max_bytes)[0];                
    \
-    T out_value = *((T*)value);
+class ParquetPredicate {
+#define FOR_REINTERPRET_TYPES(M)             \
+    M(TYPE_BOOLEAN, tparquet::Type::BOOLEAN) \
+    M(TYPE_TINYINT, tparquet::Type::INT32)   \
+    M(TYPE_SMALLINT, tparquet::Type::INT32)  \
+    M(TYPE_INT, tparquet::Type::INT32)       \
+    M(TYPE_BIGINT, tparquet::Type::INT64)    \
+    M(TYPE_FLOAT, tparquet::Type::FLOAT)     \
+    M(TYPE_DOUBLE, tparquet::Type::DOUBLE)
 
-#define _PLAIN_DECODE_SINGLE(T, value, bytes, conjunct_value, out) \
-    const T out = reinterpret_cast<const T*>(bytes)[0];            \
-    T conjunct_value = *((T*)value);
+private:
+    struct ScanPredicate {
+        ScanPredicate() = default;
+        ~ScanPredicate() = default;
+        SQLFilterOp op;
+        std::vector<const void*> values;
+        int scale;
 
-#define _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max) \
-    if (conjunct_value < min || conjunct_value > max) {    \
-        return true;                                       \
-    }
-
-#define _FILTER_GROUP_BY_IN(T, in_pred_values, min_bytes, max_bytes)       \
-    std::vector<T> in_values;                                              \
-    for (auto val : in_pred_values) {                                      \
-        T value = reinterpret_cast<T*>(val)[0];                            \
-        in_values.emplace_back(value);                                     \
-    }                                                                      \
-    if (in_values.empty()) {                                               \
-        return false;                                                      \
-    }                                                                      \
-    auto result = std::minmax_element(in_values.begin(), in_values.end()); \
-    T in_min = *result.first;                                              \
-    T in_max = *result.second;                                             \
-    const T group_min = reinterpret_cast<const T*>(min_bytes)[0];          \
-    const T group_max = reinterpret_cast<const T*>(max_bytes)[0];          \
-    if (in_max < group_min || in_min > group_max) {                        \
-        return true;                                                       \
-    }
-
-struct ColumnMinMaxParams {
-    PrimitiveType conjunct_type;
-    tparquet::Type::type parquet_type;
-    void* value;
-    // Use for decimal type
-    int32_t parquet_precision;
-    int32_t parquet_scale;
-    int32_t parquet_type_length;
-    // Use for in predicate
-    std::vector<void*> in_pred_values;
-    const char* min_bytes;
-    const char* max_bytes;
-};
-
-template <typename T>
-static void _align_decimal_v2_scale(T* conjunct_value, int32_t value_scale, T* 
parquet_value,
-                                    int32_t parquet_scale) {
-    if (value_scale > parquet_scale) {
-        *parquet_value = *parquet_value * common::exp10_i32(value_scale - 
parquet_scale);
-    } else if (value_scale < parquet_scale) {
-        *conjunct_value = *conjunct_value * common::exp10_i32(parquet_scale - 
value_scale);
-    }
-}
-
-template <typename T>
-static void _decode_decimal_v2_to_primary(const ColumnMinMaxParams& params,
-                                          const char* raw_parquet_val, T* 
out_value,
-                                          T* parquet_val) {
-    *parquet_val = reinterpret_cast<const T*>(raw_parquet_val)[0];
-    DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
-    *out_value = conjunct_value.value();
-    _align_decimal_v2_scale(out_value, conjunct_value.scale(), parquet_val, 
params.parquet_scale);
-}
-
-//  todo: support decimal128 after the test passes
-//static Int128 _decode_value_to_int128(const ColumnMinMaxParams& params,
-//                                      const char* raw_parquet_val) {
-//    const uint8_t* buf = reinterpret_cast<const uint8_t*>(raw_parquet_val);
-//    int32_t length = params.parquet_type_length;
-//    Int128 value = buf[0] & 0x80 ? -1 : 0;
-//    memcpy(reinterpret_cast<uint8_t*>(&value) + sizeof(value) - length, buf, 
length);
-//    return BigEndian::ToHost128(value);
-//}
-
-static bool _eval_in_val(const ColumnMinMaxParams& params) {
-    switch (params.conjunct_type) {
-    case TYPE_TINYINT: {
-        _FILTER_GROUP_BY_IN(int8_t, params.in_pred_values, params.min_bytes, 
params.max_bytes)
-        break;
-    }
-    case TYPE_SMALLINT: {
-        _FILTER_GROUP_BY_IN(int16_t, params.in_pred_values, params.min_bytes, 
params.max_bytes)
-        break;
-    }
-    case TYPE_DECIMAL32:
-    case TYPE_INT: {
-        _FILTER_GROUP_BY_IN(int32_t, params.in_pred_values, params.min_bytes, 
params.max_bytes)
-        break;
-    }
-    case TYPE_DECIMAL64:
-    case TYPE_BIGINT: {
-        _FILTER_GROUP_BY_IN(int64_t, params.in_pred_values, params.min_bytes, 
params.max_bytes)
-        break;
-    }
-    case TYPE_DECIMALV2: {
-        break;
-    }
-    case TYPE_STRING:
-    case TYPE_VARCHAR:
-    case TYPE_CHAR: {
-        std::vector<std::string> in_values;
-        for (auto val : params.in_pred_values) {
-            in_values.emplace_back(((StringRef*)val)->to_string());
+        ScanPredicate(const ScanPredicate& other) {
+            op = other.op;
+            for (auto v : other.values) {
+                values.emplace_back(v);
+            }
+            scale = other.scale;
         }
-        if (in_values.empty()) {
-            return false;
-        }
-        auto result = std::minmax_element(in_values.begin(), in_values.end());
-        std::string& in_min = *result.first;
-        std::string& in_max = *result.second;
-        if (strcmp(in_max.data(), params.min_bytes) < 0 ||
-            strcmp(in_min.data(), params.max_bytes) > 0) {
-            return true;
-        }
-        break;
-    }
-    default:
-        return false;
-    }
-    return false;
-}
+    };
 
-static bool _eval_eq(const ColumnMinMaxParams& params) {
-    switch (params.conjunct_type) {
-    case TYPE_TINYINT: {
-        _PLAIN_DECODE(int16_t, params.value, params.min_bytes, 
params.max_bytes, conjunct_value,
-                      min, max)
-        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
-        break;
-    }
-    case TYPE_SMALLINT: {
-        _PLAIN_DECODE(int16_t, params.value, params.min_bytes, 
params.max_bytes, conjunct_value,
-                      min, max)
-        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
-        break;
-    }
-    case TYPE_DECIMAL32:
-    case TYPE_INT: {
-        _PLAIN_DECODE(int32_t, params.value, params.min_bytes, 
params.max_bytes, conjunct_value,
-                      min, max)
-        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
-        break;
-    }
-    case TYPE_DECIMAL64:
-    case TYPE_BIGINT: {
-        _PLAIN_DECODE(int64_t, params.value, params.min_bytes, 
params.max_bytes, conjunct_value,
-                      min, max)
-        _FILTER_GROUP_BY_EQ_PRED(conjunct_value, min, max)
-        break;
-    }
-    case TYPE_DECIMALV2: {
-        if (params.parquet_type == tparquet::Type::INT32) {
-            int32_t min_value = reinterpret_cast<const 
int32_t*>(params.min_bytes)[0];
-            int32_t max_value = reinterpret_cast<const 
int32_t*>(params.max_bytes)[0];
-            DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
-            int32_t conjunct_int_value = conjunct_value.value();
-            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &min_value,
-                                    params.parquet_scale);
-            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &max_value,
-                                    params.parquet_scale);
-            _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
-        } else if (params.parquet_type == tparquet::Type::INT64) {
-            int64_t min_value = reinterpret_cast<const 
int64_t*>(params.min_bytes)[0];
-            int64_t max_value = reinterpret_cast<const 
int64_t*>(params.max_bytes)[0];
-            DecimalV2Value conjunct_value = *((DecimalV2Value*)params.value);
-            int64_t conjunct_int_value = conjunct_value.value();
-            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &min_value,
-                                    params.parquet_scale);
-            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &max_value,
-                                    params.parquet_scale);
-            _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min_value, max_value)
+    template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
+    static DecimalPrimitiveType _decode_primitive_decimal(const FieldSchema* 
col_schema,
+                                                          const std::string& 
encoded_data,
+                                                          int dest_scale) {
+        int scale = col_schema->parquet_schema.scale;
+        Int128 value = *reinterpret_cast<const 
DecimalPhysicalType*>(encoded_data.data());
+        if (dest_scale > scale) {
+            value *= 
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale);
+        } else if (dest_scale < scale) {
+            value /= 
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale);
         }
-        break;
-        //  When precision exceeds 18, decimal will use 
tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
-        //  todo: support decimal128 after the test passes
-        //        else if (params.parquet_type == 
tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
-        //            DecimalV2Value conjunct_value = 
*((DecimalV2Value*)params.value);
-        //            Int128 conjunct_int_value = conjunct_value.value();
-        //            Int128 max = _decode_value_to_int128(params, 
params.max_bytes);
-        //            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &max,
-        //                                    params.parquet_scale);
-        //            Int128 min = _decode_value_to_int128(params, 
params.min_bytes);
-        //            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &min,
-        //                                    params.parquet_scale);
-        //            _FILTER_GROUP_BY_EQ_PRED(conjunct_int_value, min, max)
-        //        }
+        return (DecimalPrimitiveType)value;
     }
-    case TYPE_STRING:
-    case TYPE_VARCHAR:
-    case TYPE_CHAR: {
-        std::string conjunct_value = ((StringRef*)params.value)->to_string();
-        if (strcmp(conjunct_value.data(), params.min_bytes) < 0 ||
-            strcmp(conjunct_value.data(), params.max_bytes) > 0) {
-            return true;
-        }
-        break;
-    }
-    default:
-        return false;
-    }
-    return false;
-}
 
-template <typename T>
-static bool _filter_group_by_gt_or_ge(T conjunct_value, T max, bool is_ge) {
-    if (!is_ge) {
-        if (max <= conjunct_value) {
-            return true;
-        }
-    } else {
-        if (max < conjunct_value) {
-            return true;
+    template <typename DecimalPrimitiveType>
+    static DecimalPrimitiveType _decode_binary_decimal(const FieldSchema* 
col_schema,
+                                                       const std::string& 
encoded_data,
+                                                       int dest_scale) {
+        int scale = col_schema->parquet_schema.scale;
+        const char* buf_start = encoded_data.data();
+        Int128 value = buf_start[0] & 0x80 ? -1 : 0;
+        memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - 
encoded_data.size(), buf_start,
+               encoded_data.size());
+        value = BigEndian::ToHost128(value);
+        if (dest_scale > scale) {
+            value *= 
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale);
+        } else if (dest_scale < scale) {
+            value /= 
DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale);
         }
+        return (DecimalPrimitiveType)value;
     }
-    return false;
-}
 
-static bool _eval_gt(const ColumnMinMaxParams& params, bool is_eq) {
-    switch (params.conjunct_type) {
-    case TYPE_TINYINT: {
-        _PLAIN_DECODE_SINGLE(int8_t, params.value, params.max_bytes, 
conjunct_value, max)
-        return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
-    }
-    case TYPE_SMALLINT: {
-        _PLAIN_DECODE_SINGLE(int16_t, params.value, params.max_bytes, 
conjunct_value, max)
-        return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
-    }
-    case TYPE_DECIMAL32:
-    case TYPE_INT: {
-        _PLAIN_DECODE_SINGLE(int32_t, params.value, params.max_bytes, 
conjunct_value, max)
-        return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
-    }
-    case TYPE_DECIMAL64:
-    case TYPE_BIGINT: {
-        _PLAIN_DECODE_SINGLE(int64_t, params.value, params.max_bytes, 
conjunct_value, max)
-        return _filter_group_by_gt_or_ge(conjunct_value, max, is_eq);
-    }
-    case TYPE_DECIMALV2: {
-        if (params.parquet_type == tparquet::Type::INT32) {
-            int32_t conjunct_int_value = 0;
-            int32_t parquet_value = 0;
-            _decode_decimal_v2_to_primary(params, params.max_bytes, 
&conjunct_int_value,
-                                          &parquet_value);
-            return _filter_group_by_gt_or_ge(conjunct_int_value, 
parquet_value, is_eq);
-        } else if (params.parquet_type == tparquet::Type::INT64) {
-            int64_t conjunct_int_value = 0;
-            int64_t parquet_value = 0;
-            _decode_decimal_v2_to_primary(params, params.max_bytes, 
&conjunct_int_value,
-                                          &parquet_value);
-            return _filter_group_by_gt_or_ge(conjunct_int_value, 
parquet_value, is_eq);
+    template <typename CppType>
+    static bool _filter_by_min_max(const SQLFilterOp op,
+                                   const std::vector<CppType>& 
predicate_values, CppType& min_value,
+                                   CppType& max_value) {
+        if (predicate_values.empty()) {
+            return false;
         }
-        break;
-        //  When precision exceeds 18, decimal will use 
tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
-        //  todo: support decimal128 after the test passes
-        //        else if (params.parquet_type == 
tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
-        //            DecimalV2Value conjunct_value = 
*((DecimalV2Value*)params.value);
-        //            Int128 conjunct_int_value = conjunct_value.value();
-        //            Int128 max = _decode_value_to_int128(params, 
params.max_bytes);
-        //            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &max,
-        //                                    params.parquet_scale);
-        //            return _filter_group_by_gt_or_ge(conjunct_int_value, 
max, is_eq);
-        //        }
-    }
-    case TYPE_STRING:
-    case TYPE_VARCHAR:
-    case TYPE_CHAR: {
-        std::string conjunct_value = ((StringRef*)params.value)->to_string();
-        if (!is_eq && strcmp(params.max_bytes, conjunct_value.data()) <= 0) {
-            return true;
-        } else if (strcmp(params.max_bytes, conjunct_value.data()) < 0) {
+        switch (op) {
+        case FILTER_IN:
+            for (const CppType& in_value : predicate_values) {
+                if (in_value >= min_value && in_value <= max_value) {
+                    return false;
+                }
+            }
             return true;
+        case FILTER_LESS:
+            return min_value >= predicate_values[0];
+        case FILTER_LESS_OR_EQUAL:
+            return min_value > predicate_values[0];
+        case FILTER_LARGER:
+            return max_value <= predicate_values[0];
+        case FILTER_LARGER_OR_EQUAL:
+            return max_value < predicate_values[0];
+        default:
+            return false;
         }
-        break;
-    }
-    default:
-        return false;
     }
-    return false;
-}
 
-template <typename T>
-static bool _filter_group_by_lt_or_le(T conjunct_value, T min, bool is_le) {
-    if (!is_le) {
-        if (min >= conjunct_value) {
-            return true;
-        }
-    } else {
-        if (min > conjunct_value) {
-            return true;
+    template <PrimitiveType primitive_type>
+    static bool _filter_by_min_max(const ColumnValueRange<primitive_type>& 
col_val_range,
+                                   const ScanPredicate& predicate, const 
FieldSchema* col_schema,
+                                   const std::string& encoded_min, const 
std::string& encoded_max,
+                                   const cctz::time_zone& ctz) {
+        using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
+        std::vector<CppType> predicate_values;
+        for (const void* v : predicate.values) {
+            predicate_values.emplace_back(*reinterpret_cast<const 
CppType*>(v));
         }
-    }
-    return false;
-}
 
-static bool _eval_lt(const ColumnMinMaxParams& params, bool is_eq) {
-    switch (params.conjunct_type) {
-    case TYPE_TINYINT: {
-        _PLAIN_DECODE_SINGLE(int8_t, params.value, params.min_bytes, 
conjunct_value, min)
-        return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
-    }
-    case TYPE_SMALLINT: {
-        _PLAIN_DECODE_SINGLE(int16_t, params.value, params.min_bytes, 
conjunct_value, min)
-        return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
-    }
-    case TYPE_DECIMAL32:
-    case TYPE_INT: {
-        _PLAIN_DECODE_SINGLE(int32_t, params.value, params.min_bytes, 
conjunct_value, min)
-        return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
-    }
-    case TYPE_DECIMAL64:
-    case TYPE_BIGINT: {
-        _PLAIN_DECODE_SINGLE(int64_t, params.value, params.min_bytes, 
conjunct_value, min)
-        return _filter_group_by_lt_or_le(conjunct_value, min, is_eq);
-    }
-    case TYPE_STRING:
-    case TYPE_VARCHAR:
-    case TYPE_CHAR: {
-        std::string conjunct_value = ((StringRef*)params.value)->to_string();
-        if (!is_eq && strcmp(params.min_bytes, conjunct_value.data()) >= 0) {
-            return true;
-        } else if (strcmp(params.min_bytes, conjunct_value.data()) > 0) {
-            return true;
-        }
-        break;
-    }
-    case TYPE_DECIMALV2: {
-        if (params.parquet_type == tparquet::Type::INT32) {
-            int32_t conjunct_int_value = 0;
-            int32_t parquet_value = 0;
-            _decode_decimal_v2_to_primary(params, params.min_bytes, 
&conjunct_int_value,
-                                          &parquet_value);
-            return _filter_group_by_lt_or_le(conjunct_int_value, 
parquet_value, is_eq);
-        } else if (params.parquet_type == tparquet::Type::INT64) {
-            int64_t conjunct_int_value = 0;
-            int64_t parquet_value = 0;
-            _decode_decimal_v2_to_primary(params, params.min_bytes, 
&conjunct_int_value,
-                                          &parquet_value);
-            return _filter_group_by_lt_or_le(conjunct_int_value, 
parquet_value, is_eq);
-        }
+        CppType min_value;
+        CppType max_value;
+        tparquet::Type::type physical_type = col_schema->physical_type;
+        switch (col_val_range.type()) {
+#define DISPATCH(REINTERPRET_TYPE, PARQUET_TYPE)                           \
+    case REINTERPRET_TYPE:                                                 \
+        if (col_schema->physical_type != PARQUET_TYPE) return false;       \
+        min_value = *reinterpret_cast<const CppType*>(encoded_min.data()); \
+        max_value = *reinterpret_cast<const CppType*>(encoded_max.data()); \
         break;
-        //  When precision exceeds 18, decimal will use 
tparquet::Type::FIXED_LEN_BYTE_ARRAY to encode
-        //  todo: support decimal128 after the test passes
-        //        else if (params.parquet_type == 
tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
-        //            DecimalV2Value conjunct_value = 
*((DecimalV2Value*)params.value);
-        //            Int128 conjunct_int_value = conjunct_value.value();
-        //            Int128 min = _decode_value_to_int128(params, 
params.min_bytes);
-        //            _align_decimal_v2_scale(&conjunct_int_value, 
conjunct_value.scale(), &min,
-        //                                    params.parquet_scale);
-        //            return _filter_group_by_lt_or_le(conjunct_int_value, 
min, is_eq);
-        //        }
-    }
-    case TYPE_DATE: {
-        //        doris::DateTimeValue* min_date = 
(doris::DateTimeValue*)params.value;
-        //        LOG(INFO) << min_date->debug_string();
-        return false;
-    }
-    default:
-        return false;
-    }
-    return false;
-}
+            FOR_REINTERPRET_TYPES(DISPATCH)
+#undef DISPATCH
+        case TYPE_VARCHAR:
+        case TYPE_CHAR:
+        case TYPE_STRING:
+            if constexpr (std::is_same_v<CppType, StringRef>) {
+                min_value = StringRef(encoded_min);
+                max_value = StringRef(encoded_max);
+            } else {
+                return false;
+            };
+            break;
+        case TYPE_DECIMALV2:
+            if constexpr (std::is_same_v<CppType, DecimalV2Value>) {
+                size_t max_precision = 
max_decimal_precision<Decimal<__int128_t>>();
+                if (col_schema->parquet_schema.precision < 1 ||
+                    col_schema->parquet_schema.precision > max_precision ||
+                    col_schema->parquet_schema.scale > max_precision) {
+                    return false;
+                }
+                int v2_scale = DecimalV2Value::SCALE;
+                if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+                    min_value = DecimalV2Value(
+                            _decode_binary_decimal<Int128>(col_schema, 
encoded_min, v2_scale));
+                    max_value = DecimalV2Value(
+                            _decode_binary_decimal<Int128>(col_schema, 
encoded_max, v2_scale));
+                } else if (physical_type == tparquet::Type::INT32) {
+                    min_value = 
DecimalV2Value(_decode_primitive_decimal<Int128, Int32>(
+                            col_schema, encoded_min, v2_scale));
+                    max_value = 
DecimalV2Value(_decode_primitive_decimal<Int128, Int32>(
+                            col_schema, encoded_max, v2_scale));
+                } else if (physical_type == tparquet::Type::INT64) {
+                    min_value = 
DecimalV2Value(_decode_primitive_decimal<Int128, Int64>(
+                            col_schema, encoded_min, v2_scale));
+                    max_value = 
DecimalV2Value(_decode_primitive_decimal<Int128, Int64>(
+                            col_schema, encoded_max, v2_scale));
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+            break;
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128I:
+            if constexpr (std::is_same_v<CppType, int32_t> || 
std::is_same_v<CppType, int64_t> ||
+                          std::is_same_v<CppType, __int128_t>) {
+                size_t max_precision = 
max_decimal_precision<Decimal<CppType>>();
+                if (col_schema->parquet_schema.precision < 1 ||
+                    col_schema->parquet_schema.precision > max_precision ||
+                    col_schema->parquet_schema.scale > max_precision) {
+                    return false;
+                }
+                if (physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+                    min_value = _decode_binary_decimal<CppType>(col_schema, 
encoded_min,
+                                                                
predicate.scale);
+                    max_value = _decode_binary_decimal<CppType>(col_schema, 
encoded_max,
+                                                                
predicate.scale);
+                } else if (physical_type == tparquet::Type::INT32) {
+                    min_value = _decode_primitive_decimal<CppType, 
Int32>(col_schema, encoded_min,
+                                                                          
predicate.scale);
+                    max_value = _decode_primitive_decimal<CppType, 
Int32>(col_schema, encoded_max,
+                                                                          
predicate.scale);
+                } else if (physical_type == tparquet::Type::INT64) {
+                    min_value = _decode_primitive_decimal<CppType, 
Int64>(col_schema, encoded_min,
+                                                                          
predicate.scale);
+                    max_value = _decode_primitive_decimal<CppType, 
Int64>(col_schema, encoded_max,
+                                                                          
predicate.scale);
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+            break;
+        case TYPE_DATE:
+        case TYPE_DATEV2:
+            if (physical_type == tparquet::Type::INT32) {
+                int64_t min_date_value =
+                        static_cast<int64_t>(*reinterpret_cast<const 
int32_t*>(encoded_min.data()));
+                int64_t max_date_value =
+                        static_cast<int64_t>(*reinterpret_cast<const 
int32_t*>(encoded_max.data()));
+                if constexpr (std::is_same_v<CppType, DateTimeValue> ||
+                              std::is_same_v<CppType, 
DateV2Value<DateV2ValueType>>) {
+                    min_value.from_unixtime(min_date_value * 24 * 60 * 60, 
ctz);
+                    max_value.from_unixtime(max_date_value * 24 * 60 * 60, 
ctz);
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+            break;
+        case TYPE_DATETIME:
+        case TYPE_DATETIMEV2:
+            if (physical_type == tparquet::Type::INT96) {
+                ParquetInt96 datetime96_min =
+                        *reinterpret_cast<const 
ParquetInt96*>(encoded_min.data());
+                int64_t micros_min = datetime96_min.to_timestamp_micros();
+                ParquetInt96 datetime96_max =
+                        *reinterpret_cast<const 
ParquetInt96*>(encoded_max.data());
+                int64_t micros_max = datetime96_max.to_timestamp_micros();
+                if constexpr (std::is_same_v<CppType, DateTimeValue> ||
+                              std::is_same_v<CppType, 
DateV2Value<DateTimeV2ValueType>>) {
+                    min_value.from_unixtime(micros_min / 1000000, ctz);
+                    max_value.from_unixtime(micros_max / 1000000, ctz);
+                    if constexpr (std::is_same_v<CppType, 
DateV2Value<DateTimeV2ValueType>>) {
+                        min_value.set_microsecond(micros_min % 1000000);
+                        max_value.set_microsecond(micros_max % 1000000);
+                    }
+                } else {
+                    return false;
+                }
+            } else if (physical_type == tparquet::Type::INT64) {
+                int64_t date_value_min = *reinterpret_cast<const 
int64_t*>(encoded_min.data());
+                int64_t date_value_max = *reinterpret_cast<const 
int64_t*>(encoded_max.data());
 
-struct ScanPredicate {
-    ScanPredicate() = default;
-    ~ScanPredicate() = default;
-    std::string _col_name;
-    TExprOpcode::type _op;
-    std::vector<void*> _values;
-    bool _null_op = false;
-    bool _is_null = false;
-    int _scale;
+                int64_t second_mask = 1;
+                int64_t scale_to_nano_factor = 1;
+                cctz::time_zone resolved_ctz = ctz;
+                const auto& schema = col_schema->parquet_schema;
+                if (schema.__isset.logicalType && 
schema.logicalType.__isset.TIMESTAMP) {
+                    const auto& timestamp_info = schema.logicalType.TIMESTAMP;
+                    if (!timestamp_info.isAdjustedToUTC) {
+                        // not adjusted to UTC: interpret the value in UTC+0
+                        resolved_ctz = cctz::utc_time_zone();
+                    }
+                    const auto& time_unit = timestamp_info.unit;
+                    if (time_unit.__isset.MILLIS) {
+                        second_mask = 1000;
+                        scale_to_nano_factor = 1000000;
+                    } else if (time_unit.__isset.MICROS) {
+                        second_mask = 1000000;
+                        scale_to_nano_factor = 1000;
+                    } else if (time_unit.__isset.NANOS) {
+                        second_mask = 1000000000;
+                        scale_to_nano_factor = 1;
+                    }
+                } else if (schema.__isset.converted_type) {
+                    const auto& converted_type = schema.converted_type;
+                    if (converted_type == 
tparquet::ConvertedType::TIMESTAMP_MILLIS) {
+                        second_mask = 1000;
+                        scale_to_nano_factor = 1000000;
+                    } else if (converted_type == 
tparquet::ConvertedType::TIMESTAMP_MICROS) {
+                        second_mask = 1000000;
+                        scale_to_nano_factor = 1000;
+                    }
+                }
 
-    ScanPredicate(const ScanPredicate& other) {
-        _col_name = other._col_name;
-        _op = other._op;
-        for (void* v : other._values) {
-            _values.push_back(v);
+                if constexpr (std::is_same_v<CppType, DateTimeValue> ||
+                              std::is_same_v<CppType, 
DateV2Value<DateTimeV2ValueType>>) {
+                    min_value.from_unixtime(date_value_min / second_mask, 
resolved_ctz);
+                    max_value.from_unixtime(date_value_max / second_mask, 
resolved_ctz);
+                    if constexpr (std::is_same_v<CppType, 
DateV2Value<DateTimeV2ValueType>>) {
+                        min_value.set_microsecond((date_value_min % 
second_mask) *
+                                                  scale_to_nano_factor / 1000);
+                        max_value.set_microsecond((date_value_max % 
second_mask) *
+                                                  scale_to_nano_factor / 1000);
+                    }
+                } else {
+                    return false;
+                }
+            } else {
+                return false;
+            }
+            break;
+        default:
+            return false;
         }
-        _null_op = other._null_op;
-        _is_null = other._is_null;
-        _scale = other._scale;
+        return _filter_by_min_max(predicate.op, predicate_values, min_value, 
max_value);
     }
-};
 
-template <PrimitiveType primitive_type>
-static void to_filter(const ColumnValueRange<primitive_type>& col_val_range,
-                      std::vector<ScanPredicate>& filters) {
-    using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
-    const auto& high_value = col_val_range.get_range_max_value();
-    const auto& low_value = col_val_range.get_range_min_value();
-    const auto& high_op = col_val_range.get_range_high_op();
-    const auto& low_op = col_val_range.get_range_low_op();
+    template <PrimitiveType primitive_type>
+    static std::vector<ScanPredicate> _value_range_to_predicate(
+            const ColumnValueRange<primitive_type>& col_val_range) {
+        using CppType = typename PrimitiveTypeTraits<primitive_type>::CppType;
+        std::vector<ScanPredicate> predicates;
 
-    // todo: process equals
-    if (col_val_range.is_fixed_value_range()) {
-        // 1. convert to in filter condition
-        ScanPredicate condition;
-        condition._col_name = col_val_range.column_name();
-        condition._op = TExprOpcode::FILTER_NEW_IN;
-        condition._scale = col_val_range.scale();
-        if (col_val_range.get_fixed_value_set().empty()) {
-            return;
-        }
-        for (const auto& value : col_val_range.get_fixed_value_set()) {
-            condition._values.push_back(const_cast<CppType*>(&value));
-        }
-        filters.push_back(condition);
-    } else if (low_value < high_value) {
-        // 2. convert to min max filter condition
-        ScanPredicate null_pred;
-        if (col_val_range.is_high_value_maximum() && high_op == 
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
-            col_val_range.is_low_value_mininum() && low_op == 
SQLFilterOp::FILTER_LARGER_OR_EQUAL &&
-            !col_val_range.contain_null()) {
-            null_pred._col_name = col_val_range.column_name();
-            null_pred._null_op = true;
-            null_pred._is_null = false;
-            filters.push_back(null_pred);
-            return;
-        }
-        ScanPredicate low;
-        if (!col_val_range.is_low_value_mininum() ||
-            SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
-            low._col_name = col_val_range.column_name();
-            low._op = (low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL ? 
TExprOpcode::GE
-                                                                     : 
TExprOpcode::GT);
-            // NOTICE: use get_range_min_value_ptr, not "low_value"'s addr,
-            // to avoid stack-use-after-return bug
-            
low._values.push_back(const_cast<CppType*>(col_val_range.get_range_min_value_ptr()));
-            low._scale = col_val_range.scale();
-            filters.push_back(low);
+        if (col_val_range.is_fixed_value_range()) {
+            ScanPredicate in_predicate;
+            in_predicate.op = SQLFilterOp::FILTER_IN;
+            in_predicate.scale = col_val_range.scale();
+            for (const auto& value : col_val_range.get_fixed_value_set()) {
+                in_predicate.values.emplace_back(&value);
+            }
+            if (!in_predicate.values.empty()) {
+                predicates.emplace_back(in_predicate);
+            }
+            return predicates;
         }
 
-        ScanPredicate high;
-        if (!col_val_range.is_high_value_maximum() ||
-            SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
-            high._col_name = col_val_range.column_name();
-            high._op = (high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL ? 
TExprOpcode::LE
-                                                                     : 
TExprOpcode::LT);
-            // NOTICE: use get_range_max_value_ptr, not "high_value"'s addr,
-            // to avoid stack-use-after-return bug
-            
high._values.push_back(const_cast<CppType*>(col_val_range.get_range_max_value_ptr()));
-            high._scale = col_val_range.scale();
-            filters.push_back(high);
-        }
-    } else {
-        // 3. convert to is null and is not null filter condition
-        ScanPredicate null_pred;
-        if (col_val_range.is_low_value_maximum() && 
col_val_range.is_high_value_mininum() &&
-            col_val_range.contain_null()) {
-            null_pred._col_name = col_val_range.column_name();
-            null_pred._null_op = true;
-            null_pred._is_null = true;
-            filters.push_back(null_pred);
-        }
-    }
-}
+        const CppType high_value = col_val_range.get_range_max_value();
+        const CppType low_value = col_val_range.get_range_min_value();
+        const SQLFilterOp high_op = col_val_range.get_range_high_op();
+        const SQLFilterOp low_op = col_val_range.get_range_low_op();
 
-static void _eval_predicate(const ScanPredicate& filter, ColumnMinMaxParams* 
params,
-                            bool* need_filter) {
-    if (filter._values.empty()) {
-        return;
-    }
-    if (filter._op == TExprOpcode::FILTER_NEW_IN) {
-        if (filter._values.size() == 1) {
-            params->value = filter._values[0];
-            *need_filter = _eval_eq(*params);
-            return;
+        // orc can only push down is_null. _contain_null == true only indicates that the value
+        // may be null, not that it equals null, so _contain_null in col_value_range is ignored.
+        if (col_val_range.is_high_value_maximum() && high_op == 
SQLFilterOp::FILTER_LESS_OR_EQUAL &&
+            col_val_range.is_low_value_mininum() && low_op == 
SQLFilterOp::FILTER_LARGER_OR_EQUAL) {
+            return predicates;
         }
-        params->in_pred_values = filter._values;
-        *need_filter = _eval_in_val(*params);
-        return;
-    }
-    // preserve TExprOpcode::FILTER_NEW_NOT_IN
-    params->value = filter._values[0];
-    switch (filter._op) {
-    case TExprOpcode::EQ:
-        *need_filter = _eval_eq(*params);
-        break;
-    case TExprOpcode::NE:
-        break;
-    case TExprOpcode::GT:
-        *need_filter = _eval_gt(*params, false);
-        break;
-    case TExprOpcode::GE:
-        *need_filter = _eval_gt(*params, true);
-        break;
-    case TExprOpcode::LT:
-        *need_filter = _eval_lt(*params, false);
-        break;
-    case TExprOpcode::LE:
-        *need_filter = _eval_lt(*params, true);
-        break;
-    default:
-        break;
-    }
-}
 
-static bool determine_filter_min_max(const ColumnValueRangeType& col_val_range,
-                                     const FieldSchema* col_schema, const 
std::string& encoded_min,
-                                     const std::string& encoded_max) {
-    const char* min_bytes = encoded_min.data();
-    const char* max_bytes = encoded_max.data();
-    bool need_filter = false;
-    std::vector<ScanPredicate> filters;
-    PrimitiveType col_type;
-    std::visit(
-            [&](auto&& range) {
-                col_type = range.type();
-                to_filter(range, filters);
-            },
-            col_val_range);
-    if (filters.empty()) {
-        return false;
+        if (low_value < high_value) {
+            if (!col_val_range.is_low_value_mininum() ||
+                SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) {
+                ScanPredicate low_predicate;
+                low_predicate.scale = col_val_range.scale();
+                low_predicate.op = low_op;
+                
low_predicate.values.emplace_back(col_val_range.get_range_min_value_ptr());
+                predicates.emplace_back(low_predicate);
+            }
+            if (!col_val_range.is_high_value_maximum() ||
+                SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) {
+                ScanPredicate high_predicate;
+                high_predicate.scale = col_val_range.scale();
+                high_predicate.op = high_op;
+                
high_predicate.values.emplace_back(col_val_range.get_range_max_value_ptr());
+                predicates.emplace_back(high_predicate);
+            }
+        }
+        return predicates;
     }
 
-    ColumnMinMaxParams params;
-    params.conjunct_type = col_type;
-    params.parquet_type = col_schema->physical_type;
-    params.parquet_precision = col_schema->parquet_schema.precision;
-    params.parquet_scale = col_schema->parquet_schema.scale;
-    params.parquet_type_length = col_schema->parquet_schema.type_length;
-    params.min_bytes = min_bytes;
-    params.max_bytes = max_bytes;
-    for (int i = 0; i < filters.size(); i++) {
-        _eval_predicate(filters[i], &params, &need_filter);
-        if (need_filter) {
-            break;
-        }
+public:
+    static bool filter_by_min_max(const ColumnValueRangeType& col_val_range,
+                                  const FieldSchema* col_schema, const 
std::string& encoded_min,
+                                  const std::string& encoded_max, const 
cctz::time_zone& ctz) {
+        bool need_filter = false;
+        std::visit(
+                [&](auto&& range) {
+                    std::vector<ScanPredicate> filters = 
_value_range_to_predicate(range);
+                    for (auto& filter : filters) {
+                        need_filter |= _filter_by_min_max(range, filter, 
col_schema, encoded_min,
+                                                          encoded_max, ctz);
+                        if (need_filter) {
+                            break;
+                        }
+                    }
+                },
+                col_val_range);
+        return need_filter;
     }
-    return need_filter;
-}
+};
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp 
b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index acb56d0c2a..ad70d8f8bc 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -40,15 +40,17 @@ Status 
PageIndex::create_skipped_row_range(tparquet::OffsetIndex& offset_index,
 Status PageIndex::collect_skipped_page_range(tparquet::ColumnIndex* 
column_index,
                                              ColumnValueRangeType& 
col_val_range,
                                              const FieldSchema* col_schema,
-                                             std::vector<int>& skipped_ranges) 
{
+                                             std::vector<int>& skipped_ranges,
+                                             const cctz::time_zone& ctz) {
     const std::vector<std::string>& encoded_min_vals = 
column_index->min_values;
     const std::vector<std::string>& encoded_max_vals = 
column_index->max_values;
     DCHECK_EQ(encoded_min_vals.size(), encoded_max_vals.size());
 
     const int num_of_pages = column_index->null_pages.size();
     for (int page_id = 0; page_id < num_of_pages; page_id++) {
-        if (determine_filter_min_max(col_val_range, col_schema, 
encoded_min_vals[page_id],
-                                     encoded_max_vals[page_id])) {
+        if (ParquetPredicate::filter_by_min_max(col_val_range, col_schema,
+                                                encoded_min_vals[page_id],
+                                                encoded_max_vals[page_id], 
ctz)) {
             skipped_ranges.emplace_back(page_id);
         }
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h 
b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 4a27593abf..00cbbdcfdd 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -33,7 +33,7 @@ public:
     Status collect_skipped_page_range(tparquet::ColumnIndex* column_index,
                                       ColumnValueRangeType& col_val_range,
                                       const FieldSchema* col_schema,
-                                      std::vector<int>& skipped_ranges);
+                                      std::vector<int>& skipped_ranges, const 
cctz::time_zone& ctz);
     bool check_and_get_page_index_ranges(const 
std::vector<tparquet::ColumnChunk>& columns);
     Status parse_column_index(const tparquet::ColumnChunk& chunk, const 
uint8_t* buff,
                               tparquet::ColumnIndex* column_index);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 1b7653433a..2d149d7ba5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -583,7 +583,7 @@ Status ParquetReader::_process_page_index(const 
tparquet::RowGroup& row_group,
         std::vector<int> skipped_page_range;
         const FieldSchema* col_schema = 
schema_desc.get_column(read_col._file_slot_name);
         page_index.collect_skipped_page_range(&column_index, conjuncts, 
col_schema,
-                                              skipped_page_range);
+                                              skipped_page_range, *_ctz);
         if (skipped_page_range.empty()) {
             continue;
         }
@@ -664,8 +664,8 @@ Status ParquetReader::_process_column_stat_filter(const 
std::vector<tparquet::Co
         }
         const FieldSchema* col_schema = schema_desc.get_column(col_name);
         // Min-max of statistic is plain-encoded value
-        *filter_group = determine_filter_min_max(slot_iter->second, 
col_schema, statistic.min,
-                                                 statistic.max);
+        *filter_group = ParquetPredicate::filter_by_min_max(slot_iter->second, 
col_schema,
+                                                            statistic.min, 
statistic.max, *_ctz);
         if (*filter_group) {
             break;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to