This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new c2ff940947 [refactor](parquet)change decimal type export as fixed-len-byte on parquet write (#22792)
c2ff940947 is described below

commit c2ff940947d27e14dcfaad915b39a982512068e9
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Tue Aug 15 13:17:50 2023 +0800

    [refactor](parquet)change decimal type export as fixed-len-byte on parquet write (#22792)

    Previously, the parquet writer exported decimal values as byte-array
    (the decimal's string form), but Hive could not import those fields.
    Now decimals are exported as fixed-len-byte-array so that Hive can
    import them directly.
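For context: Parquet stores a DECIMAL backed by the FIXED_LEN_BYTE_ARRAY physical type as the unscaled integer in big-endian two's-complement, which is why the diff below byte-swaps every value before handing it to WriteBatch. A minimal sketch of that encoding, assuming a little-endian host and GCC/Clang builtins (the helper name is illustrative; the patch itself uses gutil's bswap_32/bswap_64/gbswap_128):

#include <array>
#include <cstdint>
#include <cstring>

// Sketch only: encode a DECIMAL64 unscaled value into the 8 big-endian
// bytes a FIXED_LEN_BYTE_ARRAY(8) DECIMAL(<=18, s) column expects. The
// unconditional byte swap assumes a little-endian host, as the patch does.
std::array<uint8_t, 8> encode_decimal64_be(int64_t unscaled) {
    uint64_t big_endian = __builtin_bswap64(static_cast<uint64_t>(unscaled));
    std::array<uint8_t, 8> buf;
    std::memcpy(buf.data(), &big_endian, sizeof(big_endian));
    return buf; // e.g. unscaled 123456789 at scale 2 represents 1234567.89
}

A parquet::FixedLenByteArray then only carries a pointer to such a buffer; unlike the ByteArray used before this change, it has no len field, because the length is fixed by the schema.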
---
 be/src/vec/runtime/vparquet_writer.cpp             | 167 ++++++++++++++++-----
 be/src/vec/runtime/vparquet_writer.h               |   3 +-
 .../org/apache/doris/analysis/OutFileClause.java   |  38 +++--
 3 files changed, 156 insertions(+), 52 deletions(-)

diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
index 13fd6d8b85..230abce0de 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -34,6 +34,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "gutil/endian.h"
 #include "io/fs/file_writer.h"
 #include "olap/olap_common.h"
 #include "runtime/decimalv2_value.h"
@@ -180,8 +181,31 @@ void ParquetBuildHelper::build_schema_data_type(parquet::Type::type& parquet_dat
 
 void ParquetBuildHelper::build_schema_data_logical_type(
         std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr,
-        const TParquetDataLogicalType::type& column_data_logical_type) {
+        const TParquetDataLogicalType::type& column_data_logical_type, int* primitive_length,
+        const TypeDescriptor& type_desc) {
     switch (column_data_logical_type) {
+    case TParquetDataLogicalType::DECIMAL: {
+        DCHECK(type_desc.precision != -1 && type_desc.scale != -1)
+                << "precision and scale: " << type_desc.precision << " " << type_desc.scale;
+        if (type_desc.type == TYPE_DECIMAL32) {
+            *primitive_length = 4;
+        } else if (type_desc.type == TYPE_DECIMAL64) {
+            *primitive_length = 8;
+        } else if (type_desc.type == TYPE_DECIMAL128I) {
+            *primitive_length = 16;
+        } else {
+            throw parquet::ParquetException(
+                    "the logical decimal now only support in decimalv3, maybe error of " +
+                    type_desc.debug_string());
+        }
+        parquet_data_logical_type_ptr =
+                parquet::LogicalType::Decimal(type_desc.precision, type_desc.scale);
+        break;
+    }
+    case TParquetDataLogicalType::STRING: {
+        parquet_data_logical_type_ptr = parquet::LogicalType::String();
+        break;
+    }
     case TParquetDataLogicalType::DATE: {
         parquet_data_logical_type_ptr = parquet::LogicalType::Date();
         break;
@@ -290,19 +314,22 @@ Status VParquetWriterWrapper::parse_properties() {
 Status VParquetWriterWrapper::parse_schema() {
     parquet::schema::NodeVector fields;
     parquet::Repetition::type parquet_repetition_type;
-    parquet::Type::type parquet_data_type;
+    parquet::Type::type parquet_physical_type;
     std::shared_ptr<const parquet::LogicalType> parquet_data_logical_type;
+    int primitive_length = -1;
     for (int idx = 0; idx < _parquet_schemas.size(); ++idx) {
+        primitive_length = -1;
         ParquetBuildHelper::build_schema_repetition_type(
                 parquet_repetition_type, _parquet_schemas[idx].schema_repetition_type);
-        ParquetBuildHelper::build_schema_data_type(parquet_data_type,
+        ParquetBuildHelper::build_schema_data_type(parquet_physical_type,
                                                    _parquet_schemas[idx].schema_data_type);
         ParquetBuildHelper::build_schema_data_logical_type(
-                parquet_data_logical_type, _parquet_schemas[idx].schema_data_logical_type);
+                parquet_data_logical_type, _parquet_schemas[idx].schema_data_logical_type,
+                &primitive_length, _output_vexpr_ctxs[idx]->root()->type());
         try {
             fields.push_back(parquet::schema::PrimitiveNode::Make(
                     _parquet_schemas[idx].schema_column_name, parquet_repetition_type,
-                    parquet_data_logical_type, parquet_data_type));
+                    parquet_data_logical_type, parquet_physical_type, primitive_length));
         } catch (const parquet::ParquetException& e) {
             LOG(WARNING) << "parquet writer parse schema error: " << e.what();
             return Status::InternalError("parquet writer parse schema error: {}", e.what());
@@ -335,37 +362,6 @@ Status VParquetWriterWrapper::parse_schema() {
         RETURN_WRONG_TYPE \
     }
 
-#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE) \
-    parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
-    parquet::ByteArrayWriter* col_writer = \
-            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i)); \
-    parquet::ByteArray value; \
-    auto decimal_type = \
-            check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \
-    DCHECK(decimal_type); \
-    if (null_map != nullptr) { \
-        auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data(); \
-        for (size_t row_id = 0; row_id < sz; row_id++) { \
-            if (null_data[row_id] != 0) { \
-                single_def_level = 0; \
-                col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \
-                single_def_level = 1; \
-            } else { \
-                auto s = decimal_type->to_string(*col, row_id); \
-                value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
-                value.len = s.size(); \
-                col_writer->WriteBatch(1, &single_def_level, nullptr, &value); \
-            } \
-        } \
-    } else { \
-        for (size_t row_id = 0; row_id < sz; row_id++) { \
-            auto s = decimal_type->to_string(*col, row_id); \
-            value.ptr = reinterpret_cast<const uint8_t*>(s.data()); \
-            value.len = s.size(); \
-            col_writer->WriteBatch(1, nullable ? def_level.data() : nullptr, nullptr, &value); \
-        } \
-    }
-
 #define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE) \
     parquet::RowGroupWriter* rgWriter = get_rg_writer(); \
     parquet::ByteArrayWriter* col_writer = \
@@ -791,15 +787,108 @@ Status VParquetWriterWrapper::write(const Block& block) {
             break;
         }
         case TYPE_DECIMAL32: {
-            DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
+            parquet::RowGroupWriter* rgWriter = get_rg_writer();
+            parquet::FixedLenByteArrayWriter* col_writer =
+                    static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+            parquet::FixedLenByteArray value;
+            auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal32>>(
+                    remove_nullable(type).get());
+            DCHECK(decimal_type);
+            if (null_map != nullptr) {
+                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                const auto& data_column = assert_cast<const ColumnDecimal32&>(*col);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    if (null_data[row_id] != 0) {
+                        single_def_level = 0;
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        single_def_level = 1;
+                    } else {
+                        auto data = data_column.get_element(row_id);
+                        auto big_endian = bswap_32(data);
+                        value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                    }
+                }
+            } else {
+                const auto& data_column = assert_cast<const ColumnDecimal32&>(*col);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    auto data = data_column.get_element(row_id);
+                    auto big_endian = bswap_32(data);
+                    value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                    col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                           &value);
+                }
+            }
             break;
         }
         case TYPE_DECIMAL64: {
-            DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
+            parquet::RowGroupWriter* rgWriter = get_rg_writer();
+            parquet::FixedLenByteArrayWriter* col_writer =
+                    static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+            parquet::FixedLenByteArray value;
+            auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal64>>(
+                    remove_nullable(type).get());
+            DCHECK(decimal_type);
+            if (null_map != nullptr) {
+                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                const auto& data_column = assert_cast<const ColumnDecimal64&>(*col);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    if (null_data[row_id] != 0) {
+                        single_def_level = 0;
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        single_def_level = 1;
+                    } else {
+                        auto data = data_column.get_element(row_id);
+                        auto big_endian = bswap_64(data);
+                        value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                    }
+                }
+            } else {
+                const auto& data_column = assert_cast<const ColumnDecimal64&>(*col);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    auto data = data_column.get_element(row_id);
+                    auto big_endian = bswap_64(data);
+                    value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                    col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                           &value);
+                }
+            }
             break;
         }
         case TYPE_DECIMAL128I: {
-            DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128I)
+            parquet::RowGroupWriter* rgWriter = get_rg_writer();
+            parquet::FixedLenByteArrayWriter* col_writer =
+                    static_cast<parquet::FixedLenByteArrayWriter*>(rgWriter->column(i));
+            parquet::FixedLenByteArray value;
+            auto decimal_type = check_and_get_data_type<DataTypeDecimal<Decimal128I>>(
+                    remove_nullable(type).get());
+            DCHECK(decimal_type);
+            if (null_map != nullptr) {
+                auto& null_data = assert_cast<const ColumnUInt8&>(*null_map).get_data();
+                const auto& data_column = assert_cast<const ColumnDecimal128I&>(*col);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    if (null_data[row_id] != 0) {
+                        single_def_level = 0;
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                        single_def_level = 1;
+                    } else {
+                        auto data = data_column.get_element(row_id);
+                        auto big_endian = gbswap_128(data);
+                        value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                        col_writer->WriteBatch(1, &single_def_level, nullptr, &value);
+                    }
+                }
+            } else {
+                const auto& data_column = assert_cast<const ColumnDecimal128I&>(*col);
+                for (size_t row_id = 0; row_id < sz; row_id++) {
+                    auto data = data_column.get_element(row_id);
+                    auto big_endian = gbswap_128(data);
+                    value.ptr = reinterpret_cast<const uint8_t*>(&big_endian);
+                    col_writer->WriteBatch(1, nullable ? &single_def_level : nullptr, nullptr,
+                                           &value);
+                }
+            }
             break;
         }
         default: {
diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h
index 6e07aa0e44..a79ab6ebc4 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -86,7 +86,8 @@ public:
                                            const TParquetVersion::type& parquet_version);
     static void build_schema_data_logical_type(
             std::shared_ptr<const parquet::LogicalType>& parquet_data_logical_type_ptr,
-            const TParquetDataLogicalType::type& column_data_logical_type);
+            const TParquetDataLogicalType::type& column_data_logical_type, int* primitive_length,
+            const TypeDescriptor& type_desc);
 };
 
 class VFileWriterWrapper {
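On the schema side, parse_schema() now pairs the DECIMAL logical type with the FIXED_LEN_BYTE_ARRAY physical type and the matching primitive length (4, 8, or 16 bytes). A minimal sketch with the parquet-cpp API of the node this produces for a DECIMAL128 column (the column name and precision/scale are illustrative, not taken from the patch):

#include <memory>
#include <parquet/schema.h>
#include <parquet/types.h>

// Sketch only: a nullable decimal(38, 9) column stored as 16 fixed bytes
// per value, mirroring what the new parse_schema() path builds.
std::shared_ptr<parquet::schema::Node> make_decimal128_node() {
    return parquet::schema::PrimitiveNode::Make(
            "price", parquet::Repetition::OPTIONAL,
            parquet::LogicalType::Decimal(/*precision=*/38, /*scale=*/9),
            parquet::Type::FIXED_LEN_BYTE_ARRAY, /*primitive_length=*/16);
}

Hive expects exactly this fixed-len-byte-array layout for decimals, which the old byte-array (string) encoding did not provide. The FE-side changes below teach OutFileClause the same mapping.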
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 85c40d79bc..24ed977a7d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -99,6 +99,7 @@ public class OutFileClause {
         PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
         PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", TParquetDataType.FIXED_LEN_BYTE_ARRAY);
 
+        PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("decimal", TParquetDataLogicalType.DECIMAL);
         PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("date", TParquetDataLogicalType.DATE);
         PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.put("datetime", TParquetDataLogicalType.TIMESTAMP);
         // TODO(ftw): add other logical type
@@ -145,7 +146,6 @@ public class OutFileClause {
     private static final long MIN_FILE_SIZE_BYTES = 5 * 1024 * 1024L; // 5MB
     private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
 
-
     private String filePath;
     private String format;
     private Map<String, String> properties;
@@ -358,7 +358,7 @@ public class OutFileClause {
             case STRING:
                 if (!schema.second.equals(resultType.getPrimitiveType().toString().toLowerCase())) {
                     throw new AnalysisException("project field type is " + resultType.getPrimitiveType().toString()
-                            + ", should use " + resultType.getPrimitiveType().toString() + ","
+                            + ", should use " + resultType.getPrimitiveType().toString() + ","
                             + " but the type of column " + i + " is " + schema.second);
                 }
                 break;
@@ -454,13 +454,20 @@ public class OutFileClause {
                             + " but the definition type of column " + i + " is " + type);
                 }
                 break;
-            case CHAR:
-            case VARCHAR:
-            case STRING:
             case DECIMAL32:
             case DECIMAL64:
-            case DECIMAL128:
+            case DECIMAL128: {
+                if (!PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array").equals(type)) {
+                    throw new AnalysisException("project field type is DECIMAL"
+                            + ", should use fixed_len_byte_array, but the definition type of column "
+                            + i + " is " + type);
+                }
+                break;
+            }
             case DECIMALV2:
+            case CHAR:
+            case VARCHAR:
+            case STRING:
             case DATETIMEV2:
             case DATEV2:
             case LARGEINT:
@@ -520,13 +527,16 @@ public class OutFileClause {
             case DOUBLE:
                 parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("double");
                 break;
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128: {
+                parquetSchema.schema_data_type = PARQUET_DATA_TYPE_MAP.get("fixed_len_byte_array");
+                break;
+            }
+            case DECIMALV2:
             case CHAR:
             case VARCHAR:
             case STRING:
-            case DECIMALV2:
-            case DECIMAL32:
-            case DECIMAL64:
-            case DECIMAL128:
             case DATETIMEV2:
             case DATEV2:
             case LARGEINT:
@@ -545,6 +555,12 @@ public class OutFileClause {
         }
 
         switch (expr.getType().getPrimitiveType()) {
+            case DECIMAL32:
+            case DECIMAL64:
+            case DECIMAL128: {
+                parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("decimal");
+                break;
+            }
             case DATE:
                 parquetSchema.schema_data_logical_type = PARQUET_DATA_LOGICAL_TYPE_TYPE_MAP.get("date");
                 break;
@@ -884,5 +900,3 @@ public class OutFileClause {
         return sinkOptions;
     }
 }
-
-

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org