This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c2ff940947 [refactor](parquet)change decimal type export as fixed-len-byte on parquet write (#22792)
c2ff940947 is described below

commit c2ff940947d27e14dcfaad915b39a982512068e9
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Tue Aug 15 13:17:50 2023 +0800

    [refactor](parquet)change decimal type export as fixed-len-byte on parquet write (#22792)
    
    Previously, the parquet writer exported decimal values as byte-array (binary),
    but such fields could not be imported into Hive. Now decimal is exported as
    fixed-len-byte-array so the output can be imported into Hive directly.
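    
    For background: Parquet's DECIMAL logical type over FIXED_LEN_BYTE_ARRAY
    stores the unscaled integer as big-endian two's-complement bytes. A minimal
    standalone sketch of the conversion (not the committed code; a little-endian
    host is assumed, and __builtin_bswap32 stands in for the gutil bswap_32
    helper the patch uses):
    
        #include <cstdint>
        #include <cstring>
    
        // Sketch: turn a DECIMAL32 unscaled value into the 4 big-endian
        // two's-complement bytes that a FIXED_LEN_BYTE_ARRAY(4) DECIMAL
        // column expects.
        void encode_decimal32_be(int32_t unscaled, uint8_t out[4]) {
            uint32_t swapped = __builtin_bswap32(static_cast<uint32_t>(unscaled));
            std::memcpy(out, &swapped, sizeof(swapped));
        }
    
    The 64-bit and 128-bit cases in the diff below follow the same pattern
    with 8- and 16-byte swaps (bswap_64 and gbswap_128).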
---
 be/src/vec/runtime/vparquet_writer.cpp             | 167 ++++++++++++++++-----
 be/src/vec/runtime/vparquet_writer.h               |   3 +-
 .../org/apache/doris/analysis/OutFileClause.java   |  38 +++--
 3 files changed, 156 insertions(+), 52 deletions(-)

diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
index 13fd6d8b85..230abce0de 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -34,6 +34,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "gutil/endian.h"
 #include "io/fs/file_writer.h"
 #include "olap/olap_common.h"
 #include "runtime/decimalv2_value.h"
@@ -180,8 +181,31 @@ void ParquetBuildHelper::build_schema_data_type(parquet::Type::type& parquet_dat
 
 void ParquetBuildHelper::build_schema_data_logical_type(
         std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr,
-        const TParquetDataLogicalType::type& column_data_logical_type) {
+        const TParquetDataLogicalType::type& column_data_logical_type, int* primitive_length,
+        const TypeDescriptor& type_desc) {
     switch (column_data_logical_type) {
+    case TParquetDataLogicalType::DECIMAL: {
+        DCHECK(type_desc.precision != -1 && type_desc.scale != -1)
+                << "precision and scale: " << type_desc.precision << " " << type_desc.scale;
+        if (type_desc.type == TYPE_DECIMAL32) {
+            *primitive_length = 4;
+        } else if (type_desc.type == TYPE_DECIMAL64) {
+            *primitive_length = 8;
+        } else if (type_desc.type == TYPE_DECIMAL128I) {
+            *primitive_length = 16;
+        } else {
+            throw parquet::ParquetException(
+                    "the logical decimal now only support in decimalv3, maybe error of " +
+                    type_desc.debug_string());
+        }
+        parquet_data_logical_type_ptr =
+                parquet::LogicalType::Decimal(type_desc.precision, type_desc.scale);
+        break;
+    }
+    case TParquetDataLogicalType::STRING: {
+        parquet_data_logical_type_ptr = parquet::LogicalType::String();
+        break;
+    }
     case TParquetDataLogicalType::DATE: {
         parquet_data_logical_type_ptr = parquet::LogicalType::Date();
         break;
@@ -290,19 +314,22 @@ Status VParquetWriterWrapper::parse_properties() {
 Status VParquetWriterWrapper::parse_schema() {
     parquet::schema::NodeVector fields;
     parquet::Repetition::type parquet_repetition_type;
-    parquet::Type::type parquet_data_type;
+    parquet::Type::type parquet_physical_type;
     std::shared_ptr<const parquet::LogicalType> parquet_data_logical_type;
+    int primitive_length = -1;
     for (int idx = 0; idx < _parquet_schemas.size(); ++idx) {
+        primitive_length = -1;
         ParquetBuildHelper::build_schema_repetition_type(
                parquet_repetition_type, _parquet_schemas[idx].schema_repetition_type);
-        ParquetBuildHelper::build_schema_data_type(parquet_data_type,
+        ParquetBuildHelper::build_schema_data_type(parquet_physical_type,
                                                   _parquet_schemas[idx].schema_data_type);
         ParquetBuildHelper::build_schema_data_logical_type(
-                parquet_data_logical_type, _parquet_schemas[idx].schema_data_logical_type);
+                parquet_data_logical_type, _parquet_schemas[idx].schema_data_logical_type,
+                &primitive_length, _output_vexpr_ctxs[idx]->root()->type());
        try {
            fields.push_back(parquet::schema::PrimitiveNode::Make(
                    _parquet_schemas[idx].schema_column_name, parquet_repetition_type,
-                    parquet_data_logical_type, parquet_data_type));
+                    parquet_data_logical_type, parquet_physical_type, primitive_length));
        } catch (const parquet::ParquetException& e) {
            LOG(WARNING) << "parquet writer parse schema error: " << e.what();
            return Status::InternalError("parquet writer parse schema error: {}", e.what());
@@ -335,37 +362,6 @@ Status VParquetWriterWrapper::parse_schema() {
        RETURN_WRONG_TYPE                                                                        \
     }
 
-#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE)                                           \
-    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                        \
-    parquet::ByteArrayWriter* col_writer =                                                      \
-            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));                        \
-    parquet::ByteArray value;                                                                   \
-    auto decimal_type =                                                                         \
-            check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get());\
-    DCHECK(decimal_type);                                                                       \
-    if (null_map != nullptr) {                                                                  \
-        auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();                \
-        for (size_t row_id = 0; row_id < sz; row_id++) {                                        \
-            if (null_data[row_id] != 0) {                                                       \
-                single_def_level = 0;                                                           \
-                col_writer->WriteBatch(1, &single_def_level, nullptr, &value);                  \
-                single_def_level = 1;                                                           \
-            } else {                                                                            \
-                auto s = decimal_type->to_string(*col, row_id);                                 \
-                value.ptr = reinterpret_cast<const uint8_t*>(s.data());                         \
-                value.len = s.size();                                                           \
-                col_writer->WriteBatch(1, &single_def_level, nullptr, &value);                  \
-            }                                                                                   \
-        }                                                                                       \
-    } else {                                                                                    \
-        for (size_t row_id = 0; row_id < sz; row_id++) {                                        \
-            auto s = decimal_type->to_string(*col, row_id);                                     \
-            value.ptr = reinterpret_cast<const uint8_t*>(s.data());                             \
-            value.len = s.size();                                                               \
-            col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr, &value);  \
-        }                                                                                       \
-    }
-
 #define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE)                                            \
     parquet::RowGroupWriter* rgWriter = get_rg_writer();                                        \
     parquet::ByteArrayWriter* col_writer =                                                      \
@@ -791,15 +787,108 @@ Status VParquetWriterWrapper::write(const Block& block) {
                 break;
             }
             case TYPE_DECIMAL32: {
-                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::FixedLenByteArrayWriter* col_writer =
+                        static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+                parquet::FixedLenByteArray value;
+                auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal32>>(
+                        remove_nullable(type).get());
+                DCHECK(decimal_type);
+                if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                    const auto& data_column = assert_cast<const ColumnDecimal32&>(*col);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if (null_data[row_id] != 0) {
+                            single_def_level = 0;
+                            col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                            single_def_level = 1;
+                        } else {
+                            auto data = data_column.get_element(row_id);
+                            auto big_endian = bswap_32(data);
+                            value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                            col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        }
+                    }
+                } else {
+                    const auto& data_column = assert_cast<const ColumnDecimal32&>(*col);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        auto data = data_column.get_element(row_id);
+                        auto big_endian = bswap_32(data);
+                        value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                        col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                               &value);
+                    }
+                }
                 break;
             }
             case TYPE_DECIMAL64: {
-                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::FixedLenByteArrayWriter* col_writer =
+                        static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+                parquet::FixedLenByteArray value;
+                auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal64>>(
+                        remove_nullable(type).get());
+                DCHECK(decimal_type);
+                if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                    const auto& data_column = assert_cast<const ColumnDecimal64&>(*col);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if (null_data[row_id] != 0) {
+                            single_def_level = 0;
+                            col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                            single_def_level = 1;
+                        } else {
+                            auto data = data_column.get_element(row_id);
+                            auto big_endian = bswap_64(data);
+                            value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                            col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        }
+                    }
+                } else {
+                    const auto& data_column = assert_cast<const ColumnDecimal64&>(*col);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        auto data = data_column.get_element(row_id);
+                        auto big_endian = bswap_64(data);
+                        value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                        col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                               &value);
+                    }
+                }
                 break;
             }
             case TYPE_DECIMAL128I: {
-                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128I)
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::FixedLenByteArrayWriter* col_writer =
+                        static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+                parquet::FixedLenByteArray value;
+                auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal128I>>(
+                        remove_nullable(type).get());
+                DCHECK(decimal_type);
+                if (null_map != nullptr) {
+                    auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                    const auto& data_column = assert_cast<const ColumnDecimal128I&>(*col);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if (null_data[row_id] != 0) {
+                            single_def_level = 0;
+                            col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                            single_def_level = 1;
+                        } else {
+                            auto data = data_column.get_element(row_id);
+                            auto big_endian = gbswap_128(data);
+                            value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                            col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        }
+                    }
+                } else {
+                    const auto& data_column = assert_cast<const ColumnDecimal128I&>(*col);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        auto data = data_column.get_element(row_id);
+                        auto big_endian = gbswap_128(data);
+                        value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                        col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                               &value);
+                    }
+                }
                 break;
             }
             default: {
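
For reference, with primitive_length flowing through, the schema node that the
patched parse_schema() builds for a decimal column can be pictured as follows
(a sketch against the parquet-cpp API; the column name and the DECIMAL64(18, 4)
precision/scale are made up, and primitive_length is 8 because the 64-bit
unscaled value occupies 8 bytes):

    #include <parquet/schema.h>
    #include <parquet/types.h>

    // Illustrative FLBA decimal node for a hypothetical DECIMAL64(18, 4) column.
    auto node = parquet::schema::PrimitiveNode::Make(
            "price", parquet::Repetition::OPTIONAL,
            parquet::LogicalType::Decimal(/*precision=*/18, /*scale=*/4),
            parquet::Type::FIXED_LEN_BYTE_ARRAY, /*primitive_length=*/8);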
diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h
index 6e07aa0e44..a79ab6ebc4 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -86,7 +86,8 @@ public:
                               const TParquetVersion::type& parquet_version);
     static void build_schema_data_logical_type(
             std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr,
-            const TParquetDataLogicalType::type& column_data_logical_type);
+            const TParquetDataLogicalType::type& column_data_logical_type, int* primitive_length,
+            const TypeDescriptor& type_desc);
 };
 
 class VFileWriterWrapper {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 85c40d79bc..24ed977a7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -99,6 +99,7 @@ public class OutFileClause {
         PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
        PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);
 
+        PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("decimal", TParquetDataLogicalType.DECIMAL);
         PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("date", TParquetDataLogicalType.DATE);
         PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("datetime", TParquetDataLogicalType.TIMESTAMP);
         // TODO(ftw): add other logical type
@@ -145,7 +146,6 @@ public class OutFileClause {
     private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
    private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
 
-
     private String filePath;
     private String format;
     private Map<String, String> properties;
@@ -358,7 +358,7 @@ public class OutFileClause {
                 case STRING:
                    if (!schema.second.equals(resultType.getPrimitiveType().toString().toLowerCase())) {
                        throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString()
-                                + ", should use "  + resultType.getPrimitiveType().toString() +  ","
+                                + ", should use " + resultType.getPrimitiveType().toString() + ","
                                + " but the type of column " + i + " is " + schema.second);
                     }
                     break;
@@ -454,13 +454,20 @@ public class OutFileClause {
                                 + " but the definition type of column " + i + 
" is " + type);
                     }
                     break;
-                case CHAR:
-                case VARCHAR:
-                case STRING:
                 case DECIMAL32:
                 case DECIMAL64:
-                case DECIMAL128:
+                case DECIMAL128: {
+                    if (!PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array").equals(type)) {
+                        throw new AnalysisException("project field type is DECIMAL"
+                                + ", should use fixed_len_byte_array, but the definition type of column "
+                                + i + " is " + type);
+                    }
+                    break;
+                }
                 case DECIMALV2:
+                case CHAR:
+                case VARCHAR:
+                case STRING:
                 case DATETIMEV2:
                 case DATEV2:
                 case LARGEINT:
@@ -520,13 +527,16 @@ public class OutFileClause {
                 case DOUBLE:
                    parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("double");
                     break;
+                case DECIMAL32:
+                case DECIMAL64:
+                case DECIMAL128: {
+                    parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array");
+                    break;
+                }
+                case DECIMALV2:
                 case CHAR:
                 case VARCHAR:
                 case STRING:
-                case DECIMALV2:
-                case DECIMAL32:
-                case DECIMAL64:
-                case DECIMAL128:
                 case DATETIMEV2:
                 case DATEV2:
                 case LARGEINT:
@@ -545,6 +555,12 @@ public class OutFileClause {
             }
 
             switch (expr.getType().getPrimitiveType()) {
+                case DECIMAL32:
+                case DECIMAL64:
+                case DECIMAL128: {
+                    parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("decimal");
+                    break;
+                }
                 case DATE:
                    parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("date");
                     break;
@@ -884,5 +900,3 @@ public class OutFileClause {
         return sinkOptions;
     }
 }
-
-
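
Taken together, the FE now declares decimal columns as fixed_len_byte_array
with a DECIMAL logical type and the BE writes them in that layout, so an
exported decimal column should appear in the file schema roughly as
(illustrative shape only, not actual tool output; the name, precision, and
scale are made up):

    optional fixed_len_byte_array(8) col_name (DECIMAL(18,4));

which Hive can map onto its own decimal type directly on import.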


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
