This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit d494674ff494214da6902ec7969f35626b973c1d
Author: Qi Chen <kaka11.c...@gmail.com>
AuthorDate: Thu Jan 11 20:45:53 2024 +0800

    [opt](parquet-reader) Opt parquet decimal type reading. (#29825)
---
 .../exec/format/parquet/fix_length_plain_decoder.h |  29 +++--
 .../exec/format/parquet/parquet_column_convert.h   | 138 ++++++++++++++++-----
 .../exec/format/parquet/vparquet_column_reader.cpp |  12 +-
 .../exec/format/parquet/vparquet_column_reader.h   |   4 +-
 be/test/vec/exec/parquet/parquet_thrift_test.cpp   |   4 +-
 5 files changed, 137 insertions(+), 50 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h 
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
index b21f58601d3..c06e33c550c 100644
--- a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
+++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
@@ -103,14 +103,29 @@ Status 
FixLengthPlainDecoder<PhysicalType>::_decode_string(MutableColumnPtr& dor
     while (size_t run_length = 
select_vector.get_next_run<has_filter>(&read_type)) {
         switch (read_type) {
         case ColumnSelectVector::CONTENT: {
-            std::vector<StringRef> string_values;
-            string_values.reserve(run_length);
-            for (size_t i = 0; i < run_length; ++i) {
-                char* buf_start = _data->data + _offset;
-                string_values.emplace_back(buf_start, _type_length);
-                _offset += _type_length;
+            auto* column_string = assert_cast<ColumnString*>(doris_column.get());
+            auto& chars = column_string->get_chars();
+            auto& offsets = column_string->get_offsets();
+            size_t bytes_size = chars.size();
+
+            // copy chars: the run starts at the current read position (_data->data + _offset)
+            size_t data_size = run_length * _type_length;
+            size_t old_size = chars.size();
+            chars.resize(old_size + data_size);
+            memcpy(chars.data() + old_size, _data->data + _offset, data_size);
+
+            // copy offsets: every value is _type_length bytes, so end offsets grow by a fixed step
+            offsets.resize(offsets.size() + run_length);
+            auto* offsets_data = offsets.data() + offsets.size() - run_length;
+
+            // unsigned index matches run_length's type (size_t)
+            for (size_t i = 0; i < run_length; ++i) {
+                bytes_size += _type_length;
+                *(offsets_data++) = bytes_size;
             }
-            doris_column->insert_many_strings(&string_values[0], run_length);
+
+            
//doris_column->insert_many_strings_fixed_length<_type_length>(&string_values[0],
 run_length);
+            _offset += data_size;
             break;
         }
         case ColumnSelectVector::NULL_DATA: {
diff --git a/be/src/vec/exec/format/parquet/parquet_column_convert.h 
b/be/src/vec/exec/format/parquet/parquet_column_convert.h
index e5ee2104176..b5c3ffb7c88 100644
--- a/be/src/vec/exec/format/parquet/parquet_column_convert.h
+++ b/be/src/vec/exec/format/parquet/parquet_column_convert.h
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#pragma once
+
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/parquet_types.h>
@@ -219,7 +221,7 @@ struct ColumnConvert {
     }
 
 public:
-    ConvertParams* _convert_params = nullptr;
+    std::unique_ptr<ConvertParams> _convert_params;
 };
 
 template <tparquet::Type::type parquet_physical_type, typename dst_type>
@@ -349,6 +351,91 @@ public:
     }
 };
 
+// Converts decimals that were read as fixed-length byte strings (staged in a ColumnString)
+// into a ColumnDecimal<DecimalType>. Each value is a big-endian two's complement unscaled
+// integer of `type_length` bytes; widths 1..16 are supported. ScaleType selects, at compile
+// time, whether values are multiplied by, divided by, or left untouched by the scale factor
+// held in the converter's ConvertParams.
+template <typename DecimalType, DecimalScaleParams::ScaleType ScaleType>
+class FixedStringToDecimal : public ColumnConvert {
+public:
+    explicit FixedStringToDecimal(int32_t type_length) : _type_length(type_length) {}
+
+    // Dispatches on the runtime byte width so the per-row loop is instantiated with a
+    // compile-time width. The width originates from the Parquet schema, so an unsupported
+    // value is reported as an error status instead of aborting the process.
+    Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override {
+        convert_null(src_col, dst_col);
+
+#define M(FixedTypeLength, ValueCopyType) \
+    case FixedTypeLength:                 \
+        return _convert_internal<FixedTypeLength, ValueCopyType>(src_col, dst_col);
+
+#define APPLY_FOR_DECIMALS() \
+    M(1, int64_t)            \
+    M(2, int64_t)            \
+    M(3, int64_t)            \
+    M(4, int64_t)            \
+    M(5, int64_t)            \
+    M(6, int64_t)            \
+    M(7, int64_t)            \
+    M(8, int64_t)            \
+    M(9, int128_t)           \
+    M(10, int128_t)          \
+    M(11, int128_t)          \
+    M(12, int128_t)          \
+    M(13, int128_t)          \
+    M(14, int128_t)          \
+    M(15, int128_t)          \
+    M(16, int128_t)
+
+        switch (_type_length) {
+            APPLY_FOR_DECIMALS()
+        default:
+            return Status::InternalError("Unsupported fixed-length decimal byte width: {}",
+                                         _type_length);
+        }
+#undef APPLY_FOR_DECIMALS
+#undef M
+    }
+
+    // Decodes every row of src_col using a compile-time byte width and writes the results
+    // into rows appended to dst_col. ValueCopyType is the integer type used to stage a value.
+    template <int fixed_type_length, typename ValueCopyType>
+    Status _convert_internal(ColumnPtr& src_col, MutableColumnPtr& dst_col) {
+        static_assert(fixed_type_length > 0 &&
+                              static_cast<size_t>(fixed_type_length) <= sizeof(ValueCopyType),
+                      "fixed_type_length must fit into ValueCopyType");
+        size_t rows = src_col->size();
+        DecimalScaleParams& scale_params = _convert_params->decimal_scale;
+        const auto* src_string = static_cast<const ColumnString*>(src_col.get());
+        auto buf = src_string->get_chars().data();
+        auto& offset = src_string->get_offsets();
+        size_t start_idx = dst_col->size();
+        dst_col->resize(start_idx + rows);
+
+        auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data();
+        // The index stays signed on purpose: row 0 uses offset[-1] as its start position.
+        // NOTE(review): presumes the offsets container accepts index -1 and yields 0 - confirm.
+        for (int64_t i = 0; i < static_cast<int64_t>(rows); ++i) {
+            size_t len = offset[i] - offset[i - 1];
+            if (len == 0) {
+                // Empty entry: the destination row keeps the value left by resize().
+                continue;
+            }
+            // When Decimal in parquet is stored in byte arrays, binary and fixed,
+            // the unscaled number must be encoded as two's complement using big-endian byte order.
+            ValueCopyType value = 0;
+            // For performance, always copy sizeof(value) bytes rather than fixed_type_length,
+            // which reads up to sizeof(value) - fixed_type_length bytes past the value.
+            // NOTE(review): the bytes come from the ColumnString chars buffer, so this relies on
+            // that buffer being right-padded by at least that many bytes - confirm.
+            memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], sizeof(value));
+            // After the byte swap the value sits in the high-order bytes; shifting right moves
+            // it down and relies on an arithmetic shift of the signed type to extend the sign.
+            value = BitUtil::big_endian_to_host(value);
+            value = value >> ((sizeof(value) - fixed_type_length) * 8);
+            if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) {
+                value *= scale_params.scale_factor;
+            } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) {
+                value /= scale_params.scale_factor;
+            } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) {
+                // do nothing
+            } else {
+                LOG(FATAL) << "__builtin_unreachable";
+                __builtin_unreachable();
+            }
+            auto& v = reinterpret_cast<DecimalType&>(data[start_idx + i]);
+            v = static_cast<DecimalType>(value);
+        }
+
+        return Status::OK();
+    }
+
+private:
+    int32_t _type_length;
+};
+
 template <typename DecimalType, typename ValueCopyType, 
DecimalScaleParams::ScaleType ScaleType>
 class StringToDecimal : public ColumnConvert {
 public:
@@ -497,8 +584,11 @@ public:
 
 inline Status get_converter(tparquet::Type::type parquet_physical_type, 
PrimitiveType show_type,
                             std::shared_ptr<const IDataType> dst_data_type,
-                            std::unique_ptr<ColumnConvert>* converter,
-                            ConvertParams* convert_params) {
+                            std::unique_ptr<ColumnConvert>* converter, 
FieldSchema* field_schema,
+                            cctz::time_zone* ctz) {
+    std::unique_ptr<ParquetConvert::ConvertParams> convert_params =
+            std::make_unique<ParquetConvert::ConvertParams>();
+    convert_params->init(field_schema, ctz);
     auto dst_type = remove_nullable(dst_data_type)->get_type_id();
     switch (dst_type) {
 #define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE)                
          \
@@ -592,34 +682,18 @@ inline Status get_converter(tparquet::Type::type 
parquet_physical_type, Primitiv
         DecimalScaleParams& scale_params = convert_params->decimal_scale;      
                   \
         if (tparquet::Type::FIXED_LEN_BYTE_ARRAY == parquet_physical_type) {   
                   \
             size_t string_length = 
convert_params->field_schema->parquet_schema.type_length;      \
-            if (string_length <= 8) {                                          
                   \
-                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { 
                   \
-                    *converter =                                               
                   \
-                            std::make_unique<StringToDecimal<DECIMAL_TYPE, 
int64_t,               \
-                                                             
DecimalScaleParams::SCALE_UP>>();    \
-                } else if (scale_params.scale_type == 
DecimalScaleParams::SCALE_DOWN) {           \
-                    *converter =                                               
                   \
-                            std::make_unique<StringToDecimal<DECIMAL_TYPE, 
int64_t,               \
-                                                             
DecimalScaleParams::SCALE_DOWN>>();  \
-                } else {                                                       
                   \
-                    *converter =                                               
                   \
-                            std::make_unique<StringToDecimal<DECIMAL_TYPE, 
int64_t,               \
-                                                             
DecimalScaleParams::NO_SCALE>>();    \
-                }                                                              
                   \
-            } else if (string_length <= 16) {                                  
                   \
-                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { 
                   \
-                    *converter =                                               
                   \
-                            std::make_unique<StringToDecimal<DECIMAL_TYPE, 
int128_t,              \
-                                                             
DecimalScaleParams::SCALE_UP>>();    \
-                } else if (scale_params.scale_type == 
DecimalScaleParams::SCALE_DOWN) {           \
-                    *converter =                                               
                   \
-                            std::make_unique<StringToDecimal<DECIMAL_TYPE, 
int128_t,              \
-                                                             
DecimalScaleParams::SCALE_DOWN>>();  \
-                } else {                                                       
                   \
-                    *converter =                                               
                   \
-                            std::make_unique<StringToDecimal<DECIMAL_TYPE, 
int128_t,              \
-                                                             
DecimalScaleParams::NO_SCALE>>();    \
-                }                                                              
                   \
+            if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {     
                   \
+                *converter = std::make_unique<                                 
                   \
+                        FixedStringToDecimal<DECIMAL_TYPE, 
DecimalScaleParams::SCALE_UP>>(        \
+                        string_length);                                        
                   \
+            } else if (scale_params.scale_type == 
DecimalScaleParams::SCALE_DOWN) {               \
+                *converter = std::make_unique<                                 
                   \
+                        FixedStringToDecimal<DECIMAL_TYPE, 
DecimalScaleParams::SCALE_DOWN>>(      \
+                        string_length);                                        
                   \
+            } else {                                                           
                   \
+                *converter = std::make_unique<                                 
                   \
+                        FixedStringToDecimal<DECIMAL_TYPE, 
DecimalScaleParams::NO_SCALE>>(        \
+                        string_length);                                        
                   \
             }                                                                  
                   \
         } else if (tparquet::Type::BYTE_ARRAY == parquet_physical_type) {      
                   \
             
convert_params->init_decimal_converter<PRIMARY_TYPE>(dst_data_type);            
      \
@@ -674,7 +748,7 @@ inline Status get_converter(tparquet::Type::type 
parquet_physical_type, Primitiv
                                     tparquet::to_string(parquet_physical_type),
                                     getTypeName(dst_type));
     }
-    (*converter)->_convert_params = convert_params;
+    (*converter)->_convert_params = std::move(convert_params);
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 7c088394131..cfddb5346e8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -25,7 +25,6 @@
 #include <algorithm>
 #include <utility>
 
-#include "parquet_column_convert.h"
 #include "runtime/define_primitive_type.h"
 #include "schema_desc.h"
 #include "util/runtime_profile.h"
@@ -575,13 +574,12 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& 
doris_column, DataTypePtr
     } while (false);
 
     if (need_convert) {
-        std::unique_ptr<ParquetConvert::ColumnConvert> converter;
-        ParquetConvert::ConvertParams convert_params;
-        convert_params.init(_field_schema, _ctz);
-        RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, 
show_type, type,
-                                                      &converter, 
&convert_params));
+        if (_converter == nullptr) {
+            
RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, 
type,
+                                                          &_converter, 
_field_schema, _ctz));
+        }
         auto x = doris_column->assume_mutable();
-        RETURN_IF_ERROR(converter->convert(src_column, x));
+        RETURN_IF_ERROR(_converter->convert(src_column, x));
     }
 
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index 994908711b2..d15d6d5efa1 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -29,6 +29,7 @@
 
 #include "io/fs/buffered_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "parquet_column_convert.h"
 #include "vec/columns/columns_number.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exec/format/parquet/parquet_common.h"
@@ -183,6 +184,7 @@ private:
     std::unique_ptr<ColumnChunkReader> _chunk_reader;
     std::vector<level_t> _rep_levels;
     std::vector<level_t> _def_levels;
+    std::unique_ptr<ParquetConvert::ColumnConvert> _converter = nullptr;
 
     Status _skip_values(size_t num_values);
     Status _read_values(size_t num_values, ColumnPtr& doris_column, 
DataTypePtr& type,
@@ -288,4 +290,4 @@ private:
     std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers;
 };
 
-}; // namespace doris::vectorized
\ No newline at end of file
+}; // namespace doris::vectorized
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp 
b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index cb850cf5d57..03a6ac5eed1 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -266,10 +266,8 @@ static Status get_column_values(io::FileReaderSPtr 
file_reader, tparquet::Column
     }
     if (need_convert) {
         std::unique_ptr<ParquetConvert::ColumnConvert> converter;
-        ParquetConvert::ConvertParams convert_params;
-        convert_params.init(field_schema, &ctz);
         RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, 
show_type, data_type,
-                                                      &converter, 
&convert_params));
+                                                      &converter, 
field_schema, &ctz));
         auto x = doris_column->assume_mutable();
         RETURN_IF_ERROR(converter->convert(src_column, x));
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to