Gabriel39 commented on code in PR #12730:
URL: https://github.com/apache/doris/pull/12730#discussion_r975329125


##########
be/src/vec/runtime/vparquet_writer.cpp:
##########
@@ -35,207 +36,54 @@
 
 namespace doris::vectorized {
 
-VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer,
-                                             const std::vector<VExprContext*>& 
output_vexpr_ctxs,
-                                             const std::map<std::string, 
std::string>& properties,
-                                             const 
std::vector<std::vector<std::string>>& schema,
-                                             bool output_object_data)
+VParquetWriterWrapper::VParquetWriterWrapper(
+        doris::FileWriter* file_writer, const std::vector<VExprContext*>& 
output_vexpr_ctxs,
+        const std::vector<TParquetRepetitionType::type>& 
schemas_repetition_type,
+        const std::vector<TParquetDataType::type>& schemas_data_type,
+        const std::vector<std::string>& schemas_column_name,
+        const TParquetCompressionType::type& compression_type,
+        const bool& parquet_disable_dictionary, const TParquetVersion::type& 
parquet_version,
+        bool output_object_data)
         : _output_vexpr_ctxs(output_vexpr_ctxs),
-          _str_schema(schema),
           _cur_written_rows(0),
           _rg_writer(nullptr),
           _output_object_data(output_object_data) {
     _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
-    parse_properties(properties);
+    parse_properties(compression_type, parquet_disable_dictionary, 
parquet_version);
+    parse_schema(schemas_repetition_type, schemas_data_type, 
schemas_column_name);
 }
 
-void VParquetWriterWrapper::parse_properties(
-        const std::map<std::string, std::string>& propertie_map) {
+void VParquetWriterWrapper::parse_properties(const 
TParquetCompressionType::type& compression_type,
+                                             const bool& 
parquet_disable_dictionary,
+                                             const TParquetVersion::type& 
parquet_version) {
     parquet::WriterProperties::Builder builder;
-    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
-        std::string property_name = it->first;
-        std::string property_value = it->second;
-        if (property_name == "compression") {
-            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
-            if (property_value == "snappy") {
-                builder.compression(parquet::Compression::SNAPPY);
-            } else if (property_value == "gzip") {
-                builder.compression(parquet::Compression::GZIP);
-            } else if (property_value == "brotli") {
-                builder.compression(parquet::Compression::BROTLI);
-            } else if (property_value == "zstd") {
-                builder.compression(parquet::Compression::ZSTD);
-            } else if (property_value == "lz4") {
-                builder.compression(parquet::Compression::LZ4);
-            } else if (property_value == "lzo") {
-                builder.compression(parquet::Compression::LZO);
-            } else if (property_value == "bz2") {
-                builder.compression(parquet::Compression::BZ2);
-            } else {
-                builder.compression(parquet::Compression::UNCOMPRESSED);
-            }
-        } else if (property_name == "disable_dictionary") {
-            if (property_value == "true") {
-                builder.enable_dictionary();
-            } else {
-                builder.disable_dictionary();
-            }
-        } else if (property_name == "version") {
-            if (property_value == "v1") {
-                builder.version(parquet::ParquetVersion::PARQUET_1_0);
-            } else {
-                builder.version(parquet::ParquetVersion::PARQUET_2_LATEST);
-            }
-        }
+    ParquetBuildHelper::build_compression_type(builder, compression_type);
+    ParquetBuildHelper::build_version(builder, parquet_version);
+    if (parquet_disable_dictionary) {
+        builder.disable_dictionary();
+    } else {
+        builder.enable_dictionary();
     }
     _properties = builder.build();
 }
 
-Status VParquetWriterWrapper::parse_schema(const 
std::vector<std::vector<std::string>>& schema) {
+void VParquetWriterWrapper::parse_schema(
+        const std::vector<TParquetRepetitionType::type>& 
schemas_repetition_type,
+        const std::vector<TParquetDataType::type>& schemas_data_type,
+        const std::vector<std::string>& schemas_column_name) {
     parquet::schema::NodeVector fields;
-    for (auto column = schema.begin(); column != schema.end(); column++) {
-        std::string repetition_type = (*column)[0];
-        parquet::Repetition::type parquet_repetition_type = 
parquet::Repetition::REQUIRED;
-        if (repetition_type.find("required") != std::string::npos) {
-            parquet_repetition_type = parquet::Repetition::REQUIRED;
-        } else if (repetition_type.find("repeated") != std::string::npos) {
-            parquet_repetition_type = parquet::Repetition::REPEATED;
-        } else if (repetition_type.find("optional") != std::string::npos) {
-            parquet_repetition_type = parquet::Repetition::OPTIONAL;
-        } else {
-            parquet_repetition_type = parquet::Repetition::UNDEFINED;
-        }
-
-        std::string data_type = (*column)[1];
-        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
-        if (data_type == "boolean") {
-            parquet_data_type = parquet::Type::BOOLEAN;
-        } else if (data_type.find("int32") != std::string::npos) {
-            parquet_data_type = parquet::Type::INT32;
-        } else if (data_type.find("int64") != std::string::npos) {
-            parquet_data_type = parquet::Type::INT64;
-        } else if (data_type.find("int96") != std::string::npos) {
-            parquet_data_type = parquet::Type::INT96;
-        } else if (data_type.find("float") != std::string::npos) {
-            parquet_data_type = parquet::Type::FLOAT;
-        } else if (data_type.find("double") != std::string::npos) {
-            parquet_data_type = parquet::Type::DOUBLE;
-        } else if (data_type.find("byte_array") != std::string::npos) {
-            parquet_data_type = parquet::Type::BYTE_ARRAY;
-        } else if (data_type.find("fixed_len_byte_array") != 
std::string::npos) {
-            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
-        } else {
-            parquet_data_type = parquet::Type::UNDEFINED;
-        }
-
-        std::string column_name = (*column)[2];
-        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, 
parquet_repetition_type,
-                                                              
parquet::LogicalType::None(),
-                                                              
parquet_data_type));
+    parquet::Repetition::type parquet_repetition_type;
+    parquet::Type::type parquet_data_type;
+    for (int idx = 0; idx < schemas_column_name.size(); ++idx) {
+        
ParquetBuildHelper::build_schema_repetition_type(parquet_repetition_type,
+                                                         
schemas_repetition_type[idx]);
+        ParquetBuildHelper::build_schema_data_type(parquet_data_type, 
schemas_data_type[idx]);
+        fields.push_back(parquet::schema::PrimitiveNode::Make(
+                schemas_column_name[idx], parquet_repetition_type, 
parquet::LogicalType::None(),
+                parquet_data_type));
         _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
                 parquet::schema::GroupNode::Make("schema", 
parquet::Repetition::REQUIRED, fields));
     }
-    return Status::OK();
-}
-
-Status VParquetWriterWrapper::init() {
-    RETURN_IF_ERROR(parse_schema(_str_schema));
-    RETURN_IF_ERROR(init_parquet_writer());

Review Comment:
   After removing `init_parquet_writer`, where is `_writer` initialized? It looks like it will never be initialized now.



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java:
##########
@@ -71,18 +75,30 @@ public class OutFileClause {
         RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
         RESULT_COL_TYPES.add(PrimitiveType.VARCHAR);
 
-        PARQUET_REPETITION_TYPES.add("required");
-        PARQUET_REPETITION_TYPES.add("repeated");
-        PARQUET_REPETITION_TYPES.add("optional");
-
-        PARQUET_DATA_TYPES.add("boolean");
-        PARQUET_DATA_TYPES.add("int32");
-        PARQUET_DATA_TYPES.add("int64");
-        PARQUET_DATA_TYPES.add("int96");
-        PARQUET_DATA_TYPES.add("byte_array");
-        PARQUET_DATA_TYPES.add("float");
-        PARQUET_DATA_TYPES.add("double");
-        PARQUET_DATA_TYPES.add("fixed_len_byte_array");
+        PARQUET_REPETITION_TYPE_MAP.put("required", 
TParquetRepetitionType.REQUIRED);
+        PARQUET_REPETITION_TYPE_MAP.put("repeated", 
TParquetRepetitionType.REPEATED);
+        PARQUET_REPETITION_TYPE_MAP.put("optional", 
TParquetRepetitionType.OPTIONAL);
+
+        PARQUET_DATA_TYPE_MAP.put("boolean", TParquetDataType.BOOLEAN);
+        PARQUET_DATA_TYPE_MAP.put("int32", TParquetDataType.INT32);
+        PARQUET_DATA_TYPE_MAP.put("int64", TParquetDataType.INT64);
+        PARQUET_DATA_TYPE_MAP.put("int96", TParquetDataType.INT96);
+        PARQUET_DATA_TYPE_MAP.put("byte_array", TParquetDataType.BYTE_ARRAY);
+        PARQUET_DATA_TYPE_MAP.put("float", TParquetDataType.FLOAT);
+        PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
+        PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", 
TParquetDataType.FIXED_LEN_BYTE_ARRAY);
+
+        PARQUET_COMPRESSION_TYPE_MAP.put("snappy", 
TParquetCompressionType.SNAPPY);
+        PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
+        PARQUET_COMPRESSION_TYPE_MAP.put("brotli", 
TParquetCompressionType.BROTLI);
+        PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
+        PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
+        PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
+        PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
+        PARQUET_COMPRESSION_TYPE_MAP.put("INVALID", 
TParquetCompressionType.UNCOMPRESSED);
+
+        PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
+        PARQUET_VERSION_MAP.put("LATEST", TParquetVersion.PARQUET_2_LATEST);

Review Comment:
   Why use upper case here? The other keys in these maps (e.g. `"v1"`) are lower case.



##########
be/src/exec/parquet_writer.cpp:
##########
@@ -40,6 +40,124 @@ ParquetOutputStream::~ParquetOutputStream() {
     }
 }
 
+void ParquetBuildHelper::build_schema_repetition_type(
+        parquet::Repetition::type& parquet_repetition_type,
+        const TParquetRepetitionType::type& column_repetition_type) {
+    switch (column_repetition_type) {
+    case TParquetRepetitionType::REQUIRED: {
+        parquet_repetition_type = parquet::Repetition::REQUIRED;
+        break;
+    }
+    case TParquetRepetitionType::REPEATED: {
+        parquet_repetition_type = parquet::Repetition::REPEATED;
+        break;
+    }
+    case TParquetRepetitionType::OPTIONAL: {
+        parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        break;
+    }
+    default:
+        parquet_repetition_type = parquet::Repetition::UNDEFINED;
+    }
+}
+
+void ParquetBuildHelper::build_schema_data_type(parquet::Type::type& 
parquet_data_type,
+                                                const TParquetDataType::type& 
column_data_type) {
+    switch (column_data_type) {
+    case TParquetDataType::BOOLEAN: {
+        parquet_data_type = parquet::Type::BOOLEAN;
+        break;
+    }
+    case TParquetDataType::INT32: {
+        parquet_data_type = parquet::Type::INT32;
+        break;
+    }
+    case TParquetDataType::INT64: {
+        parquet_data_type = parquet::Type::INT64;
+        break;
+    }
+    case TParquetDataType::INT96: {
+        parquet_data_type = parquet::Type::INT96;
+        break;
+    }
+    case TParquetDataType::BYTE_ARRAY: {
+        parquet_data_type = parquet::Type::BYTE_ARRAY;
+        break;
+    }
+    case TParquetDataType::FLOAT: {
+        parquet_data_type = parquet::Type::FLOAT;
+        break;
+    }
+    case TParquetDataType::DOUBLE: {
+        parquet_data_type = parquet::Type::DOUBLE;
+        break;
+    }
+    case TParquetDataType::FIXED_LEN_BYTE_ARRAY: {
+        parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        break;
+    }
+    default:
+        parquet_data_type = parquet::Type::UNDEFINED;
+    }
+}
+
+void ParquetBuildHelper::build_compression_type(
+        parquet::WriterProperties::Builder& builder,
+        const TParquetCompressionType::type& compression_type) {
+    switch (compression_type) {
+    case TParquetCompressionType::SNAPPY: {
+        builder.compression(parquet::Compression::SNAPPY);
+        break;
+    }
+    case TParquetCompressionType::GZIP: {
+        builder.compression(parquet::Compression::GZIP);
+        break;
+    }
+    case TParquetCompressionType::BROTLI: {
+        builder.compression(parquet::Compression::BROTLI);
+        break;
+    }
+    case TParquetCompressionType::ZSTD: {
+        builder.compression(parquet::Compression::ZSTD);
+        break;
+    }
+    case TParquetCompressionType::LZ4: {
+        builder.compression(parquet::Compression::LZ4);
+        break;
+    }
+    case TParquetCompressionType::LZO: {
+        builder.compression(parquet::Compression::LZO);
+        break;
+    }
+    case TParquetCompressionType::BZ2: {
+        builder.compression(parquet::Compression::BZ2);
+        break;
+    }
+    case TParquetCompressionType::UNCOMPRESSED: {
+        builder.compression(parquet::Compression::UNCOMPRESSED);
+        break;
+    }
+    default:
+        builder.compression(parquet::Compression::UNCOMPRESSED);
+    }
+}
+
+void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& 
builder,
+                                       const TParquetVersion::type& 
parquet_version) {
+    switch (parquet_version) {
+    case TParquetVersion::PARQUET_1_0: {
+        builder.version(parquet::ParquetVersion::PARQUET_1_0);
+        break;
+    }
+    case TParquetVersion::PARQUET_2_LATEST: {
+        builder.version(parquet::ParquetVersion::PARQUET_2_LATEST);
+        break;
+    }
+    default:
+        builder.version(parquet::ParquetVersion::PARQUET_1_0);

Review Comment:
   Should the default value be `PARQUET_2_LATEST`?



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java:
##########
@@ -71,18 +75,30 @@ public class OutFileClause {
         RESULT_COL_TYPES.add(PrimitiveType.BIGINT);
         RESULT_COL_TYPES.add(PrimitiveType.VARCHAR);
 
-        PARQUET_REPETITION_TYPES.add("required");
-        PARQUET_REPETITION_TYPES.add("repeated");
-        PARQUET_REPETITION_TYPES.add("optional");
-
-        PARQUET_DATA_TYPES.add("boolean");
-        PARQUET_DATA_TYPES.add("int32");
-        PARQUET_DATA_TYPES.add("int64");
-        PARQUET_DATA_TYPES.add("int96");
-        PARQUET_DATA_TYPES.add("byte_array");
-        PARQUET_DATA_TYPES.add("float");
-        PARQUET_DATA_TYPES.add("double");
-        PARQUET_DATA_TYPES.add("fixed_len_byte_array");
+        PARQUET_REPETITION_TYPE_MAP.put("required", 
TParquetRepetitionType.REQUIRED);
+        PARQUET_REPETITION_TYPE_MAP.put("repeated", 
TParquetRepetitionType.REPEATED);
+        PARQUET_REPETITION_TYPE_MAP.put("optional", 
TParquetRepetitionType.OPTIONAL);
+
+        PARQUET_DATA_TYPE_MAP.put("boolean", TParquetDataType.BOOLEAN);
+        PARQUET_DATA_TYPE_MAP.put("int32", TParquetDataType.INT32);
+        PARQUET_DATA_TYPE_MAP.put("int64", TParquetDataType.INT64);
+        PARQUET_DATA_TYPE_MAP.put("int96", TParquetDataType.INT96);
+        PARQUET_DATA_TYPE_MAP.put("byte_array", TParquetDataType.BYTE_ARRAY);
+        PARQUET_DATA_TYPE_MAP.put("float", TParquetDataType.FLOAT);
+        PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
+        PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", 
TParquetDataType.FIXED_LEN_BYTE_ARRAY);
+
+        PARQUET_COMPRESSION_TYPE_MAP.put("snappy", 
TParquetCompressionType.SNAPPY);
+        PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
+        PARQUET_COMPRESSION_TYPE_MAP.put("brotli", 
TParquetCompressionType.BROTLI);
+        PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
+        PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
+        PARQUET_COMPRESSION_TYPE_MAP.put("lzo", TParquetCompressionType.LZO);
+        PARQUET_COMPRESSION_TYPE_MAP.put("bz2", TParquetCompressionType.BZ2);
+        PARQUET_COMPRESSION_TYPE_MAP.put("INVALID", 
TParquetCompressionType.UNCOMPRESSED);

Review Comment:
   Why use upper case here? The other compression keys (e.g. `"snappy"`, `"gzip"`) are lower case.



##########
be/src/vec/runtime/vparquet_writer.h:
##########
@@ -45,36 +46,40 @@ class VParquetWriterWrapper {
 public:
     VParquetWriterWrapper(doris::FileWriter* file_writer,
                           const std::vector<VExprContext*>& output_vexpr_ctxs,
-                          const std::map<std::string, std::string>& properties,
-                          const std::vector<std::vector<std::string>>& schema,
-                          bool output_object_data);
-    virtual ~VParquetWriterWrapper();
+                          const std::vector<TParquetRepetitionType::type>& 
schemas_repetition_type,
+                          const std::vector<TParquetDataType::type>& 
schemas_data_type,
+                          const std::vector<std::string>& schemas_column_name,
+                          const TParquetCompressionType::type& 
compression_type,
+                          const bool& parquet_disable_dictionary,
+                          const TParquetVersion::type& parquet_version, bool 
output_object_data);
 
-    Status init();
+    virtual ~VParquetWriterWrapper() = default;

Review Comment:
   ```suggestion
       ~VParquetWriterWrapper() = default;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java:
##########
@@ -118,12 +134,21 @@ public class OutFileClause {
     // If set to true, the brokerDesc must be null.
     private boolean isLocalOutput = false;
     private String successFileName = "";
-    private List<List<String>> schema = new ArrayList<>();
-    private Map<String, String> fileProperties = new HashMap<>();
+
+    private List<TParquetRepetitionType> schemasRepetitionType = new 
ArrayList<>();
+    private List<TParquetDataType> schemasDataType = new ArrayList<>();
+    private List<String> schemasColumnName = new ArrayList<>();
 
     private boolean isAnalyzed = false;
     private String headerType = "";
 
+    private static final String PARQUET_COMPRESSION = "compression";
+    private TParquetCompressionType parquetCompressionType = 
TParquetCompressionType.UNCOMPRESSED;
+    private static final String PARQUET_DISABLE_DICTIONARY = 
"disable_dictionary";
+    private boolean parquetDisableDictionary = true;
+    private static final String PARQUET_VERSION = "version";
+    private static TParquetVersion parquetVersion = 
TParquetVersion.PARQUET_1_0;

Review Comment:
   Should the default value be `PARQUET_2_LATEST`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to