This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new d86a9dc481 [feature](outfile) support parquet writer (#12492) (#12873)
d86a9dc481 is described below

commit d86a9dc481b9f9ac5903641b84d44670463d4f2d
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Sep 23 08:52:17 2022 +0800

    [feature](outfile) support parquet writer (#12492) (#12873)
---
 be/src/vec/CMakeLists.txt                  |   3 +-
 be/src/vec/runtime/vfile_result_writer.cpp |  43 +-
 be/src/vec/runtime/vfile_result_writer.h   |   8 +-
 be/src/vec/runtime/vparquet_writer.cpp     | 654 +++++++++++++++++++++++++++++
 be/src/vec/runtime/vparquet_writer.h       |  84 ++++
 5 files changed, 773 insertions(+), 19 deletions(-)
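
For readers skimming the patch: the new VParquetWriterWrapper is driven by
VFileResultWriter in an init/write/close cycle. A minimal sketch of that
lifecycle, using only names that appear in the diff below (error handling
abbreviated; the file writer, expr contexts and options are assumed to come
from the result sink, as in _create_file_writer):

    // Sketch only, not part of the patch -- see vfile_result_writer.cpp below.
    auto writer = std::make_unique<VParquetWriterWrapper>(
            file_writer, output_vexpr_ctxs, file_opts->file_properties,
            file_opts->schema, output_object_data);
    RETURN_IF_ERROR(writer->init());       // parse schema/properties, open writer, validate types
    RETURN_IF_ERROR(writer->write(block)); // append one vectorized Block
    writer->close();                       // flush the last row group and the file footer
    int64_t bytes = writer->written_len(); // bytes pushed to the output stream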

diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 56cfdfcc14..a8355d8a0d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -182,7 +182,8 @@ set(VEC_FILES
   runtime/vdata_stream_mgr.cpp
   runtime/vfile_result_writer.cpp
   runtime/vpartition_info.cpp
-  runtime/vsorted_run_merger.cpp)
+  runtime/vsorted_run_merger.cpp
+  runtime/vparquet_writer.cpp)
 
 add_library(Vec STATIC
     ${VEC_FILES}
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 6d4ecb8db1..7bc56607b8 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -56,7 +56,8 @@ VFileResultWriter::VFileResultWriter(
           _parent_profile(parent_profile),
           _sinker(sinker),
           _output_block(output_block),
-          _output_row_descriptor(output_row_descriptor) {
+          _output_row_descriptor(output_row_descriptor),
+          _vparquet_writer(nullptr) {
     _output_object_data = output_object_data;
 }
 
@@ -129,7 +130,10 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
         // just use file writer is enough
         break;
     case TFileFormatType::FORMAT_PARQUET:
-        return Status::NotSupported("Parquet Writer is not supported yet!");
+        _vparquet_writer.reset(new VParquetWriterWrapper(
+                _file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->file_properties,
+                _file_opts->schema, _output_object_data));
+        RETURN_IF_ERROR(_vparquet_writer->init());
         break;
     default:
         return Status::InternalError(
@@ -195,18 +199,18 @@ Status VFileResultWriter::append_block(Block& block) {
         return Status::OK();
     }
     SCOPED_TIMER(_append_row_batch_timer);
-    if (_parquet_writer != nullptr) {
-        return Status::NotSupported("Parquet Writer is not supported yet!");
+    Status status = Status::OK();
+    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
+    // failed, just return the error status
+    auto output_block =
+            VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, block, status);
+    auto num_rows = output_block.rows();
+    if (UNLIKELY(num_rows == 0)) {
+        return status;
+    }
+    if (_vparquet_writer) {
+        RETURN_IF_ERROR(_write_parquet_file(output_block));
     } else {
-        Status status = Status::OK();
-        // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
-        // failed, just return the error status
-        auto output_block = VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
-                                                                               block, status);
-        auto num_rows = output_block.rows();
-        if (UNLIKELY(num_rows == 0)) {
-            return status;
-        }
         RETURN_IF_ERROR(_write_csv_file(output_block));
     }
 
@@ -214,6 +218,12 @@ Status VFileResultWriter::append_block(Block& block) {
     return Status::OK();
 }
 
+Status VFileResultWriter::_write_parquet_file(const Block& block) {
+    RETURN_IF_ERROR(_vparquet_writer->write(block));
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+}
+
 Status VFileResultWriter::_write_csv_file(const Block& block) {
     for (size_t i = 0; i < block.rows(); i++) {
         for (size_t col_id = 0; col_id < block.columns(); col_id++) {
@@ -348,8 +358,11 @@ Status VFileResultWriter::_create_new_file_if_exceed_size() {
 }
 
 Status VFileResultWriter::_close_file_writer(bool done) {
-    if (_parquet_writer != nullptr) {
-        return Status::NotSupported("Parquet Writer is not supported yet!");
+    if (_vparquet_writer) {
+        _vparquet_writer->close();
+        _current_written_bytes = _vparquet_writer->written_len();
+        COUNTER_UPDATE(_written_data_bytes, _current_written_bytes);
+        _vparquet_writer.reset(nullptr);
     } else if (_file_writer_impl) {
         _file_writer_impl->close();
     }
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
index 5f0bb7971e..a3a4e59a17 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -20,6 +20,7 @@
 #include "exec/file_writer.h"
 #include "runtime/file_result_writer.h"
 #include "vec/sink/result_sink.h"
+#include "vec/runtime/vparquet_writer.h"
 
 namespace doris {
 
@@ -48,6 +49,7 @@ public:
     int64_t get_written_rows() const override { return 1; }
 
 private:
+    Status _write_parquet_file(const Block& block);
     Status _write_csv_file(const Block& block);
 
     // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
@@ -81,9 +83,7 @@ private:
 
     // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
     // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
-    std::unique_ptr<FileWriter> _file_writer_impl;
-    // parquet file writer
-    ParquetWriterWrapper* _parquet_writer = nullptr;
+    std::unique_ptr<doris::FileWriter> _file_writer_impl;
     // Used to buffer the export data of plain text
     // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling
     // file writer's write() for every single row.
@@ -119,6 +119,8 @@ private:
     bool _is_result_sent = false;
     bool _header_sent = false;
     RowDescriptor _output_row_descriptor;
+    // parquet file writer
+    std::unique_ptr<VParquetWriterWrapper> _vparquet_writer;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_writer.cpp
new file mode 100644
index 0000000000..8e376a8149
--- /dev/null
+++ b/be/src/vec/runtime/vparquet_writer.cpp
@@ -0,0 +1,654 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vparquet_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/status.h>
+#include <time.h>
+
+#include "io/file_writer.h"
+#include "util/mysql_global.h"
+#include "util/types.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/functions/function_helpers.h"
+
+namespace doris::vectorized {
+
+VParquetWriterWrapper::VParquetWriterWrapper(doris::FileWriter* file_writer,
+                                             const std::vector<VExprContext*>& output_vexpr_ctxs,
+                                             const std::map<std::string, std::string>& properties,
+                                             const std::vector<std::vector<std::string>>& schema,
+                                             bool output_object_data)
+        : _output_vexpr_ctxs(output_vexpr_ctxs),
+          _str_schema(schema),
+          _cur_written_rows(0),
+          _rg_writer(nullptr),
+          _output_object_data(output_object_data) {
+    _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer));
+    parse_properties(properties);
+}
+
+void VParquetWriterWrapper::parse_properties(
+        const std::map<std::string, std::string>& propertie_map) {
+    parquet::WriterProperties::Builder builder;
+    for (auto it = propertie_map.begin(); it != propertie_map.end(); it++) {
+        std::string property_name = it->first;
+        std::string property_value = it->second;
+        if (property_name == "compression") {
+            // UNCOMPRESSED, SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2
+            if (property_value == "snappy") {
+                builder.compression(parquet::Compression::SNAPPY);
+            } else if (property_value == "gzip") {
+                builder.compression(parquet::Compression::GZIP);
+            } else if (property_value == "brotli") {
+                builder.compression(parquet::Compression::BROTLI);
+            } else if (property_value == "zstd") {
+                builder.compression(parquet::Compression::ZSTD);
+            } else if (property_value == "lz4") {
+                builder.compression(parquet::Compression::LZ4);
+            } else if (property_value == "lzo") {
+                builder.compression(parquet::Compression::LZO);
+            } else if (property_value == "bz2") {
+                builder.compression(parquet::Compression::BZ2);
+            } else {
+                builder.compression(parquet::Compression::UNCOMPRESSED);
+            }
+        } else if (property_name == "disable_dictionary") {
+            if (property_value == "true") {
+                builder.disable_dictionary();
+            } else {
+                builder.enable_dictionary();
+            }
+        } else if (property_name == "version") {
+            if (property_value == "v1") {
+                builder.version(parquet::ParquetVersion::PARQUET_1_0);
+            } else {
+                builder.version(parquet::ParquetVersion::PARQUET_2_LATEST);
+            }
+        }
+    }
+    _properties = builder.build();
+}
+
+Status VParquetWriterWrapper::parse_schema(const std::vector<std::vector<std::string>>& schema) {
+    parquet::schema::NodeVector fields;
+    for (auto column = schema.begin(); column != schema.end(); column++) {
+        std::string repetition_type = (*column)[0];
+        parquet::Repetition::type parquet_repetition_type = parquet::Repetition::REQUIRED;
+        if (repetition_type.find("required") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REQUIRED;
+        } else if (repetition_type.find("repeated") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::REPEATED;
+        } else if (repetition_type.find("optional") != std::string::npos) {
+            parquet_repetition_type = parquet::Repetition::OPTIONAL;
+        } else {
+            parquet_repetition_type = parquet::Repetition::UNDEFINED;
+        }
+
+        std::string data_type = (*column)[1];
+        parquet::Type::type parquet_data_type = parquet::Type::BYTE_ARRAY;
+        if (data_type == "boolean") {
+            parquet_data_type = parquet::Type::BOOLEAN;
+        } else if (data_type.find("int32") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT32;
+        } else if (data_type.find("int64") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT64;
+        } else if (data_type.find("int96") != std::string::npos) {
+            parquet_data_type = parquet::Type::INT96;
+        } else if (data_type.find("float") != std::string::npos) {
+            parquet_data_type = parquet::Type::FLOAT;
+        } else if (data_type.find("double") != std::string::npos) {
+            parquet_data_type = parquet::Type::DOUBLE;
+        } else if (data_type.find("byte_array") != std::string::npos) {
+            parquet_data_type = parquet::Type::BYTE_ARRAY;
+        } else if (data_type.find("fixed_len_byte_array") != 
std::string::npos) {
+            parquet_data_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
+        } else {
+            parquet_data_type = parquet::Type::UNDEFINED;
+        }
+
+        std::string column_name = (*column)[2];
+        fields.push_back(parquet::schema::PrimitiveNode::Make(column_name, parquet_repetition_type,
+                                                              parquet::LogicalType::None(),
+                                                              parquet_data_type));
+        _schema = std::static_pointer_cast<parquet::schema::GroupNode>(
+                parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));
+    }
+    return Status::OK();
+}
+
+Status VParquetWriterWrapper::init() {
+    RETURN_IF_ERROR(parse_schema(_str_schema));
+    RETURN_IF_ERROR(init_parquet_writer());
+    RETURN_IF_ERROR(validate_schema());
+    return Status::OK();
+}
+
+Status VParquetWriterWrapper::validate_schema() {
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        switch (_output_vexpr_ctxs[i]->root()->type().type) {
+        case TYPE_BOOLEAN: {
+            if (_str_schema[i][1] != "boolean") {
+                return Status::InvalidArgument(
+                        "project field type is boolean, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_TINYINT:
+        case TYPE_SMALLINT:
+        case TYPE_INT: {
+            if (_str_schema[i][1] != "int32") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use int32,"
+                        " but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_LARGEINT: {
+            return Status::InvalidArgument("do not support large int type.");
+        }
+        case TYPE_FLOAT: {
+            if (_str_schema[i][1] != "float") {
+                return Status::InvalidArgument(
+                        "project field type is float, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_DOUBLE: {
+            if (_str_schema[i][1] != "double") {
+                return Status::InvalidArgument(
+                        "project field type is double, "
+                        "but the definition type of column {} is {}",
+                        _str_schema[i][2], _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_BIGINT:
+        case TYPE_DATETIME:
+        case TYPE_DATE:
+        case TYPE_DATEV2:
+        case TYPE_DATETIMEV2: {
+            if (_str_schema[i][1] != "int64") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use int64, "
+                        "but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        case TYPE_HLL:
+        case TYPE_OBJECT: {
+            if (!_output_object_data) {
+                return Status::InvalidArgument(
+                        "Invalid expression type: {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string());
+            }
+            [[fallthrough]];
+        }
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING:
+        case TYPE_DECIMALV2:
+        case TYPE_DECIMAL32:
+        case TYPE_DECIMAL64:
+        case TYPE_DECIMAL128: {
+            if (_str_schema[i][1] != "byte_array") {
+                return Status::InvalidArgument(
+                        "project field type is {}, should use byte_array, "
+                        "but the definition type of column {} is {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string(), _str_schema[i][2],
+                        _str_schema[i][1]);
+            }
+            break;
+        }
+        default: {
+            return Status::InvalidArgument("Invalid expression type: {}",
+                                           _output_vexpr_ctxs[i]->root()->type().debug_string());
+        }
+        }
+    }
+    return Status::OK();
+}
+
+#define RETURN_WRONG_TYPE \
+    return Status::InvalidArgument("Invalid column type: {}", 
raw_column->get_name());
+
+#define DISPATCH_PARQUET_NUMERIC_WRITER(WRITER, COLUMN_TYPE, NATIVE_TYPE)                        \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
+    parquet::WRITER* col_writer = static_cast<parquet::WRITER*>(rgWriter->column(i));            \
+    __int128 default_value = 0;                                                                  \
+    if (null_map != nullptr) {                                                                   \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            col_writer->WriteBatch(1, nullptr, nullptr,                                          \
+                                   (*null_map)[row_id] != 0                                      \
+                                           ? reinterpret_cast<const NATIVE_TYPE*>(&default_value) \
+                                           : reinterpret_cast<const NATIVE_TYPE*>(               \
+                                                     assert_cast<const COLUMN_TYPE&>(*col)       \
+                                                             .get_data_at(row_id)                \
+                                                             .data));                            \
+        }                                                                                        \
+    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) {  \
+        col_writer->WriteBatch(                                                                  \
+                sz, nullptr, nullptr,                                                            \
+                reinterpret_cast<const NATIVE_TYPE*>(not_nullable_column->get_data().data()));    \
+    } else {                                                                                     \
+        RETURN_WRONG_TYPE                                                                        \
+    }
+
+#define DISPATCH_PARQUET_DECIMAL_WRITER(DECIMAL_TYPE)                                            \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
+    parquet::ByteArrayWriter* col_writer =                                                       \
+            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));                         \
+    parquet::ByteArray value;                                                                    \
+    auto decimal_type =                                                                          \
+            check_and_get_data_type<DataTypeDecimal<DECIMAL_TYPE>>(remove_nullable(type).get()); \
+    DCHECK(decimal_type);                                                                        \
+    if (null_map != nullptr) {                                                                   \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            if ((*null_map)[row_id] != 0) {                                                      \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value);                             \
+            } else {                                                                             \
+                auto s = decimal_type->to_string(*col, row_id);                                  \
+                value.ptr = reinterpret_cast<const uint8_t*>(s.data());                          \
+                value.len = s.size();                                                            \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value);                             \
+            }                                                                                    \
+        }                                                                                        \
+    } else {                                                                                     \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            auto s = decimal_type->to_string(*col, row_id);                                      \
+            value.ptr = reinterpret_cast<const uint8_t*>(s.data());                              \
+            value.len = s.size();                                                                \
+            col_writer->WriteBatch(1, nullptr, nullptr, &value);                                 \
+        }                                                                                        \
+    }
+
+#define DISPATCH_PARQUET_COMPLEX_WRITER(COLUMN_TYPE)                                             \
+    parquet::RowGroupWriter* rgWriter = get_rg_writer();                                         \
+    parquet::ByteArrayWriter* col_writer =                                                       \
+            static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));                         \
+    if (null_map != nullptr) {                                                                   \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            if ((*null_map)[row_id] != 0) {                                                      \
+                parquet::ByteArray value;                                                        \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value);                             \
+            } else {                                                                             \
+                const auto& tmp = col->get_data_at(row_id);                                      \
+                parquet::ByteArray value;                                                        \
+                value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);                          \
+                value.len = tmp.size;                                                            \
+                col_writer->WriteBatch(1, nullptr, nullptr, &value);                             \
+            }                                                                                    \
+        }                                                                                        \
+    } else if (const auto* not_nullable_column = check_and_get_column<const COLUMN_TYPE>(col)) { \
+        for (size_t row_id = 0; row_id < sz; row_id++) {                                         \
+            const auto& tmp = not_nullable_column->get_data_at(row_id);                          \
+            parquet::ByteArray value;                                                            \
+            value.ptr = reinterpret_cast<const uint8_t*>(tmp.data);                              \
+            value.len = tmp.size;                                                                \
+            col_writer->WriteBatch(1, nullptr, nullptr, &value);                                 \
+        }                                                                                        \
+    } else {                                                                                     \
+        RETURN_WRONG_TYPE                                                                        \
+    }
+
+Status VParquetWriterWrapper::write(const Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    size_t sz = block.rows();
+    try {
+        for (size_t i = 0; i < block.columns(); i++) {
+            auto& raw_column = block.get_by_position(i).column;
+            const auto col = raw_column->is_nullable()
+                                     ? reinterpret_cast<const ColumnNullable*>(
+                                               block.get_by_position(i).column.get())
+                                               ->get_nested_column_ptr()
+                                               .get()
+                                     : block.get_by_position(i).column.get();
+            auto null_map =
+                    raw_column->is_nullable() && reinterpret_cast<const ColumnNullable*>(
+                                                         block.get_by_position(i).column.get())
+                                                         ->get_null_map_column_ptr()
+                                                         ->has_null()
+                            ? reinterpret_cast<const ColumnNullable*>(
+                                      block.get_by_position(i).column.get())
+                                      ->get_null_map_column_ptr()
+                            : nullptr;
+            auto& type = block.get_by_position(i).type;
+            switch (_output_vexpr_ctxs[i]->root()->type().type) {
+            case TYPE_BOOLEAN: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(BoolWriter, ColumnVector<UInt8>, bool)
+                break;
+            }
+            case TYPE_BIGINT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(Int64Writer, ColumnVector<Int64>, int64_t)
+                break;
+            }
+            case TYPE_LARGEINT: {
+                return Status::InvalidArgument("do not support large int type.");
+            }
+            case TYPE_FLOAT: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(FloatWriter, ColumnVector<Float32>, float_t)
+                break;
+            }
+            case TYPE_DOUBLE: {
+                DISPATCH_PARQUET_NUMERIC_WRITER(DoubleWriter, ColumnVector<Float64>, double_t)
+                break;
+            }
+            case TYPE_TINYINT:
+            case TYPE_SMALLINT:
+            case TYPE_INT: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int32Writer* col_writer =
+                        static_cast<parquet::Int32Writer*>(rgWriter->column(i));
+                int32_t default_int32 = 0;
+                if (null_map != nullptr) {
+                    if (const auto* nested_column =
+                                check_and_get_column<const ColumnVector<Int32>>(col)) {
+                        for (size_t row_id = 0; row_id < sz; row_id++) {
+                            col_writer->WriteBatch(
+                                    1, nullptr, nullptr,
+                                    (*null_map)[row_id] != 0
+                                            ? &default_int32
+                                            : reinterpret_cast<const int32_t*>(
+                                                      nested_column->get_data_at(row_id).data));
+                        }
+                    } else if (const auto* int16_column =
+                                       check_and_get_column<const ColumnVector<Int16>>(col)) {
+                        for (size_t row_id = 0; row_id < sz; row_id++) {
+                            const int32_t tmp = int16_column->get_data()[row_id];
+                            col_writer->WriteBatch(
+                                    1, nullptr, nullptr,
+                                    (*null_map)[row_id] != 0
+                                            ? &default_int32
+                                            : reinterpret_cast<const int32_t*>(&tmp));
+                        }
+                    } else if (const auto* int8_column =
+                                       check_and_get_column<const ColumnVector<Int8>>(col)) {
+                        for (size_t row_id = 0; row_id < sz; row_id++) {
+                            const int32_t tmp = int8_column->get_data()[row_id];
+                            col_writer->WriteBatch(
+                                    1, nullptr, nullptr,
+                                    (*null_map)[row_id] != 0
+                                            ? &default_int32
+                                            : reinterpret_cast<const int32_t*>(&tmp));
+                        }
+                    } else {
+                        RETURN_WRONG_TYPE
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<Int32>>(col)) {
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int32_t*>(
+                                                   not_nullable_column->get_data().data()));
+                } else if (const auto* int16_column =
+                                   check_and_get_column<const ColumnVector<Int16>>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        const int32_t tmp = int16_column->get_data()[row_id];
+                        col_writer->WriteBatch(1, nullptr, nullptr,
+                                               reinterpret_cast<const int32_t*>(&tmp));
+                    }
+                } else if (const auto* int8_column =
+                                   check_and_get_column<const ColumnVector<Int8>>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        const int32_t tmp = int8_column->get_data()[row_id];
+                        col_writer->WriteBatch(1, nullptr, nullptr,
+                                               reinterpret_cast<const int32_t*>(&tmp));
+                    }
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATETIME:
+            case TYPE_DATE: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int64Writer* col_writer =
+                        static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                int64_t default_int64 = 0;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                        } else {
+                            const auto tmp = binary_cast<Int64, VecDateTimeValue>(
+                                                     assert_cast<const ColumnVector<Int64>&>(*col)
+                                                             .get_data()[row_id])
+                                                     .to_olap_datetime();
+                            col_writer->WriteBatch(1, nullptr, nullptr,
+                                                   reinterpret_cast<const int64_t*>(&tmp));
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<Int64>>(col)) {
+                    std::vector<uint64_t> res(sz);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        res[row_id] = binary_cast<Int64, VecDateTimeValue>(
+                                              not_nullable_column->get_data()[row_id])
+                                              .to_olap_datetime();
+                    }
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int64_t*>(res.data()));
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATEV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int64Writer* col_writer =
+                        static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                int64_t default_int64 = 0;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                        } else {
+                            uint64_t tmp = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
+                                                   assert_cast<const ColumnVector<UInt32>&>(*col)
+                                                           .get_data()[row_id])
+                                                   .to_olap_datetime();
+                            col_writer->WriteBatch(1, nullptr, nullptr,
+                                                   reinterpret_cast<const int64_t*>(&tmp));
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<UInt32>>(col)) {
+                    std::vector<uint64_t> res(sz);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        res[row_id] = binary_cast<UInt32, DateV2Value<DateV2ValueType>>(
+                                              not_nullable_column->get_data()[row_id])
+                                              .to_olap_datetime();
+                    }
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int64_t*>(res.data()));
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DATETIMEV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::Int64Writer* col_writer =
+                        static_cast<parquet::Int64Writer*>(rgWriter->column(i));
+                int64_t default_int64 = 0;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &default_int64);
+                        } else {
+                            uint64_t tmp = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
+                                                   assert_cast<const ColumnVector<UInt64>&>(*col)
+                                                           .get_data()[row_id])
+                                                   .to_olap_datetime();
+                            col_writer->WriteBatch(1, nullptr, nullptr,
+                                                   reinterpret_cast<const int64_t*>(&tmp));
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnVector<UInt64>>(col)) {
+                    std::vector<uint64_t> res(sz);
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        res[row_id] = binary_cast<UInt64, DateV2Value<DateTimeV2ValueType>>(
+                                              not_nullable_column->get_data()[row_id])
+                                              .to_olap_datetime();
+                    }
+                    col_writer->WriteBatch(sz, nullptr, nullptr,
+                                           reinterpret_cast<const int64_t*>(res.data()));
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_OBJECT: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnBitmap)
+                break;
+            }
+            case TYPE_HLL: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnHLL)
+                break;
+            }
+            case TYPE_CHAR:
+            case TYPE_VARCHAR:
+            case TYPE_STRING: {
+                DISPATCH_PARQUET_COMPLEX_WRITER(ColumnString)
+                break;
+            }
+            case TYPE_DECIMALV2: {
+                parquet::RowGroupWriter* rgWriter = get_rg_writer();
+                parquet::ByteArrayWriter* col_writer =
+                        static_cast<parquet::ByteArrayWriter*>(rgWriter->column(i));
+                parquet::ByteArray value;
+                if (null_map != nullptr) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        if ((*null_map)[row_id] != 0) {
+                            col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                        } else {
+                            const DecimalV2Value decimal_val(reinterpret_cast<const PackedInt128*>(
+                                                                     col->get_data_at(row_id).data)
+                                                                     ->value);
+                            char decimal_buffer[MAX_DECIMAL_WIDTH];
+                            int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
+                            value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
+                            value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
+                            col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                        }
+                    }
+                } else if (const auto* not_nullable_column =
+                                   check_and_get_column<const ColumnDecimal128>(col)) {
+                    for (size_t row_id = 0; row_id < sz; row_id++) {
+                        const DecimalV2Value decimal_val(
+                                reinterpret_cast<const PackedInt128*>(
+                                        not_nullable_column->get_data_at(row_id).data)
+                                        ->value);
+                        char decimal_buffer[MAX_DECIMAL_WIDTH];
+                        int output_scale = _output_vexpr_ctxs[i]->root()->type().scale;
+                        value.ptr = reinterpret_cast<const uint8_t*>(decimal_buffer);
+                        value.len = decimal_val.to_buffer(decimal_buffer, output_scale);
+                        col_writer->WriteBatch(1, nullptr, nullptr, &value);
+                    }
+                } else {
+                    RETURN_WRONG_TYPE
+                }
+                break;
+            }
+            case TYPE_DECIMAL32: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal32)
+                break;
+            }
+            case TYPE_DECIMAL64: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal64)
+                break;
+            }
+            case TYPE_DECIMAL128: {
+                DISPATCH_PARQUET_DECIMAL_WRITER(Decimal128)
+                break;
+            }
+            default: {
+                return Status::InvalidArgument(
+                        "Invalid expression type: {}",
+                        _output_vexpr_ctxs[i]->root()->type().debug_string());
+            }
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "Parquet write error: " << e.what();
+        return Status::InternalError(e.what());
+    }
+    _cur_written_rows += sz;
+    return Status::OK();
+}
+
+Status VParquetWriterWrapper::init_parquet_writer() {
+    _writer = parquet::ParquetFileWriter::Open(_outstream, _schema, _properties);
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to create file writer");
+    }
+    return Status::OK();
+}
+
+parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() {
+    if (_rg_writer == nullptr) {
+        _rg_writer = _writer->AppendBufferedRowGroup();
+    }
+    if (_cur_written_rows > _max_row_per_group) {
+        _rg_writer->Close();
+        _rg_writer = _writer->AppendBufferedRowGroup();
+        _cur_written_rows = 0;
+    }
+    return _rg_writer;
+}
+
+int64_t VParquetWriterWrapper::written_len() {
+    return _outstream->get_written_len();
+}
+
+void VParquetWriterWrapper::close() {
+    try {
+        if (_rg_writer != nullptr) {
+            _rg_writer->Close();
+            _rg_writer = nullptr;
+        }
+        _writer->Close();
+        arrow::Status st = _outstream->Close();
+        if (!st.ok()) {
+            LOG(WARNING) << "close parquet file error: " << st.ToString();
+        }
+    } catch (const std::exception& e) {
+        _rg_writer = nullptr;
+        LOG(WARNING) << "Parquet writer close error: " << e.what();
+    }
+}
+
+VParquetWriterWrapper::~VParquetWriterWrapper() {}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h
new file mode 100644
index 0000000000..111ad6d689
--- /dev/null
+++ b/be/src/vec/runtime/vparquet_writer.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/api.h>
+#include <arrow/buffer.h>
+#include <arrow/io/api.h>
+#include <arrow/io/file.h>
+#include <arrow/io/interfaces.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/arrow/reader.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/exception.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+#include "common/status.h"
+#include "exec/parquet_writer.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::vectorized {
+class FileWriter;
+
+// a wrapper of the parquet file writer
+class VParquetWriterWrapper {
+public:
+    VParquetWriterWrapper(doris::FileWriter* file_writer,
+                          const std::vector<VExprContext*>& output_vexpr_ctxs,
+                          const std::map<std::string, std::string>& properties,
+                          const std::vector<std::vector<std::string>>& schema,
+                          bool output_object_data);
+    virtual ~VParquetWriterWrapper();
+
+    Status init();
+
+    Status validate_schema();
+
+    Status write(const Block& block);
+
+    Status init_parquet_writer();
+
+    void close();
+
+    void parse_properties(const std::map<std::string, std::string>& propertie_map);
+
+    Status parse_schema(const std::vector<std::vector<std::string>>& schema);
+
+    parquet::RowGroupWriter* get_rg_writer();
+
+    int64_t written_len();
+
+private:
+    std::shared_ptr<ParquetOutputStream> _outstream;
+    std::shared_ptr<parquet::WriterProperties> _properties;
+    std::shared_ptr<parquet::schema::GroupNode> _schema;
+    std::unique_ptr<parquet::ParquetFileWriter> _writer;
+    const std::vector<VExprContext*>& _output_vexpr_ctxs;
+    std::vector<std::vector<std::string>> _str_schema;
+    int64_t _cur_written_rows = 0;
+    parquet::RowGroupWriter* _rg_writer;
+    const int64_t _max_row_per_group = 10;
+    bool _output_object_data;
+};
+
+} // namespace doris::vectorized
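
For reference, the schema/properties arguments consumed by parse_properties()
and parse_schema() above are shaped as in the following hypothetical example
(in practice the FE derives these values from the OUTFILE clause; the column
names here are made up):

    // Hypothetical inputs. parse_properties() recognizes "compression",
    // "disable_dictionary" and "version"; other keys are ignored.
    std::map<std::string, std::string> properties = {
            {"compression", "snappy"}, // snappy/gzip/brotli/zstd/lz4/lzo/bz2, else UNCOMPRESSED
            {"disable_dictionary", "false"},
            {"version", "v1"}, // "v1" -> PARQUET_1_0, anything else -> PARQUET_2_LATEST
    };
    // parse_schema() expects one {repetition, physical type, column name} triple per column.
    std::vector<std::vector<std::string>> schema = {
            {"required", "int32", "user_id"},
            {"optional", "byte_array", "user_name"},
    };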

