zhangstar333 commented on code in PR #12730: URL: https://github.com/apache/doris/pull/12730#discussion_r976118119
########## be/src/vec/runtime/vparquet_writer.cpp: ########## @@ -35,207 +36,54 @@ namespace doris::vectorized { -VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer, - const std::vector<VExprContext*>& output_vexpr_ctxs, - const std::map<std::string, std::string>& properties, - const std::vector<std::vector<std::string>>& schema, - bool output_object_data) +VParquetWriterWrapper::VParquetWriterWrapper( + doris::FileWriter* file_writer, const std::vector<VExprContext*>& output_vexpr_ctxs, + const std::vector<TParquetRepetitionType::type>& schemas_repetition_type, + const std::vector<TParquetDataType::type>& schemas_data_type, + const std::vector<std::string>& schemas_column_name, + const TParquetCompressionType::type& compression_type, + const bool& parquet_disable_dictionary, const TParquetVersion::type& parquet_version, + bool output_object_data) : _output_vexpr_ctxs(output_vexpr_ctxs), - _str_schema(schema), _cur_written_rows(0), _rg_writer(nullptr), _output_object_data(output_object_data) { _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer)); - parse_properties(properties); + parse_properties(compression_type, parquet_disable_dictionary, parquet_version); + parse_schema(schemas_repetition_type, schemas_data_type, schemas_column_name); } -void VParquetWriterWrapper::parse_properties( - const std::map<std::string, std::string>& propertie_map) { +void VParquetWriterWrapper::parse_properties(const TParquetCompressionType::type& compression_type, + const bool& parquet_disable_dictionary, + const TParquetVersion::type& parquet_version) { parquet::WriterProperties::Builder builder; - for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) { - std::string property_name = it->first; - std::string property_value = it->second; - if (property_name == "compression") { - // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 - if (property_value == "snappy") { - 
builder.compression(parquet::Compression::SNAPPY); - } else if (property_value == "gzip") { - builder.compression(parquet::Compression::GZIP); - } else if (property_value == "brotli") { - builder.compression(parquet::Compression::BROTLI); - } else if (property_value == "zstd") { - builder.compression(parquet::Compression::ZSTD); - } else if (property_value == "lz4") { - builder.compression(parquet::Compression::LZ4); - } else if (property_value == "lzo") { - builder.compression(parquet::Compression::LZO); - } else if (property_value == "bz2") { - builder.compression(parquet::Compression::BZ2); - } else { - builder.compression(parquet::Compression::UNCOMPRESSED); - } - } else if (property_name == "disable_dictionary") { - if (property_value == "true") { - builder.enable_dictionary(); - } else { - builder.disable_dictionary(); - } - } else if (property_name == "version") { - if (property_value == "v1") { - builder.version(parquet::ParquetVersion::PARQUET_1_0); - } else { - builder.version(parquet::ParquetVersion::PARQUET_2_LATEST); - } - } + ParquetBuildHelper::build_compression_type(builder, compression_type); + ParquetBuildHelper::build_version(builder, parquet_version); + if (parquet_disable_dictionary) { + builder.disable_dictionary(); + } else { + builder.enable_dictionary(); } _properties = builder.build(); } -Status VParquetWriterWrapper::parse_schema(const std::vector<std::vector<std::string>>& schema) { +void VParquetWriterWrapper::parse_schema( + const std::vector<TParquetRepetitionType::type>& schemas_repetition_type, + const std::vector<TParquetDataType::type>& schemas_data_type, + const std::vector<std::string>& schemas_column_name) { parquet::schema::NodeVector fields; - for (auto column = schema.begin(); column != schema.end(); column++) { - std::string repetition_type = (*column)[0]; - parquet::Repetition::type parquet_repetition_type = parquet::Repetition::REQUIRED; - if (repetition_type.find("required") != std::string::npos) { - 
parquet_repetition_type = parquet::Repetition::REQUIRED; - } else if (repetition_type.find("repeated") != std::string::npos) { - parquet_repetition_type = parquet::Repetition::REPEATED; - } else if (repetition_type.find("optional") != std::string::npos) { - parquet_repetition_type = parquet::Repetition::OPTIONAL; - } else { - parquet_repetition_type = parquet::Repetition::UNDEFINED; - } - - std::string data_type = (*column)[1]; - parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY; - if (data_type == "boolean") { - parquet_data_type = parquet::Type::BOOLEAN; - } else if (data_type.find("int32") != std::string::npos) { - parquet_data_type = parquet::Type::INT32; - } else if (data_type.find("int64") != std::string::npos) { - parquet_data_type = parquet::Type::INT64; - } else if (data_type.find("int96") != std::string::npos) { - parquet_data_type = parquet::Type::INT96; - } else if (data_type.find("float") != std::string::npos) { - parquet_data_type = parquet::Type::FLOAT; - } else if (data_type.find("double") != std::string::npos) { - parquet_data_type = parquet::Type::DOUBLE; - } else if (data_type.find("byte_array") != std::string::npos) { - parquet_data_type = parquet::Type::BYTE_ARRAY; - } else if (data_type.find("fixed_len_byte_array") != std::string::npos) { - parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY; - } else { - parquet_data_type = parquet::Type::UNDEFINED; - } - - std::string column_name = (*column)[2]; - fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, parquet_repetition_type, - parquet::LogicalType::None(), - parquet_data_type)); + parquet::Repetition::type parquet_repetition_type; + parquet::Type::type parquet_data_type; + for (int idx = 0; idx < schemas_column_name.size(); ++idx) { + ParquetBuildHelper::build_schema_repetition_type(parquet_repetition_type, + schemas_repetition_type[idx]); + ParquetBuildHelper::build_schema_data_type(parquet_data_type, schemas_data_type[idx]); + 
fields.push_back(parquet::schema::PrimitiveNode::Make( + schemas_column_name[idx], parquet_repetition_type, parquet::LogicalType::None(), + parquet_data_type)); _schema = std::static_pointer_cast<parquet::schema::GroupNode>( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields)); } - return Status::OK(); -} - -Status VParquetWriterWrapper::init() { - RETURN_IF_ERROR(parse_schema(_str_schema)); - RETURN_IF_ERROR(init_parquet_writer()); Review Comment: I already call `_vparquet_writer->init_parquet_writer()` directly after creating the new object. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org