This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a36c387a2b [Refactor](transformer) convert to file format writer to 
transformer (#23888)
a36c387a2b is described below

commit a36c387a2b14f843ac56ef4a0332d00bbc7cc5f7
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Tue Sep 5 10:50:10 2023 +0800

    [Refactor](transformer) convert to file format writer to transformer 
(#23888)
---
 be/src/vec/runtime/vcsv_transformer.cpp            | 262 +++++++++++++++++++++
 be/src/vec/runtime/vcsv_transformer.h              |  75 ++++++
 be/src/vec/runtime/vfile_writer_wrapper.h          |   8 +-
 .../{vorc_writer.cpp => vorc_transformer.cpp}      |  20 +-
 .../runtime/{vorc_writer.h => vorc_transformer.h}  |  13 +-
 ...parquet_writer.cpp => vparquet_transformer.cpp} |  32 +--
 .../{vparquet_writer.h => vparquet_transformer.h}  |  18 +-
 be/src/vec/sink/writer/vfile_result_writer.cpp     | 243 ++-----------------
 be/src/vec/sink/writer/vfile_result_writer.h       |  16 +-
 9 files changed, 402 insertions(+), 285 deletions(-)

diff --git a/be/src/vec/runtime/vcsv_transformer.cpp 
b/be/src/vec/runtime/vcsv_transformer.cpp
new file mode 100644
index 0000000000..da5a697460
--- /dev/null
+++ b/be/src/vec/runtime/vcsv_transformer.cpp
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vcsv_transformer.h"
+
+#include <glog/logging.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <exception>
+#include <ostream>
+
+#include "gutil/strings/numbers.h"
+#include "io/fs/file_writer.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/large_int_value.h"
+#include "runtime/primitive_type.h"
+#include "runtime/types.h"
+#include "util/binary_cast.hpp"
+#include "util/mysql_global.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_complex.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
+#include "vec/columns/columns_number.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/pod_array.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/core/types.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/runtime/vdatetime_value.h"
+
+namespace doris::vectorized {
+
+// Builds a CSV transformer that writes through `file_writer`.
+//
+// When `header` is non-empty it becomes the text that open() emits; if `header_type`
+// equals BeConsts::CSV_WITH_NAMES_AND_TYPES, the line produced by _gen_csv_header_types()
+// is appended to it. With an empty `header` no header text is kept.
+VCSVTransformer::VCSVTransformer(doris::io::FileWriter* file_writer,
+                                 const VExprContextSPtrs& output_vexpr_ctxs,
+                                 bool output_object_data, std::string_view header_type,
+                                 std::string_view header, std::string_view column_separator,
+                                 std::string_view line_delimiter)
+        : VFileFormatTransformer(output_vexpr_ctxs, output_object_data),
+          _column_separator(column_separator),
+          _line_delimiter(line_delimiter),
+          _file_writer(file_writer) {
+    // _csv_header is default-constructed empty, so an empty header needs no further work.
+    if (header.empty()) {
+        return;
+    }
+
+    _csv_header = header;
+    const bool with_types = (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES);
+    if (with_types) {
+        _csv_header += _gen_csv_header_types();
+    }
+}
+
+// Appends the prepared CSV header (if any) to the file writer.
+// Returns OK without touching the writer when there is no header text.
+Status VCSVTransformer::open() {
+    if (_csv_header.empty()) {
+        return Status::OK();
+    }
+    const Slice header_slice(_csv_header.data(), _csv_header.size());
+    return _file_writer->append(header_slice);
+}
+
+// Reports the byte count returned by the file writer's bytes_appended().
+int64_t VCSVTransformer::written_len() {
+    const auto appended = _file_writer->bytes_appended();
+    return appended;
+}
+
+// Closes the underlying file writer and hands its status back to the caller.
+Status VCSVTransformer::close() {
+    Status close_status = _file_writer->close();
+    return close_status;
+}
+
+// Serializes every row of `block` as one CSV line into the in-memory text buffer, then
+// flushes that buffer to the file writer.
+//
+// For each cell: a null cell is written as NULL_IN_CSV; any other cell is formatted
+// according to the primitive type of the output expression at the same column index.
+// Cells of a row are joined with _column_separator and every row ends with
+// _line_delimiter.
+//
+// Returns the status of _flush_plain_text_outstream().
+Status VCSVTransformer::write(const Block& block) {
+    using doris::operator<<;
+    const size_t num_rows = block.rows();
+    const size_t num_columns = block.columns();
+    for (size_t i = 0; i < num_rows; i++) {
+        for (size_t col_id = 0; col_id < num_columns; col_id++) {
+            // Bind by reference: a by-value copy here would be repeated for every cell.
+            const auto& col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_vexpr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    // Widen to int so the value is printed as a number, not as a character.
+                    _plain_text_outstream << static_cast<int>(
+                            *reinterpret_cast<const int8_t*>(col.column->get_data_at(i).data));
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 2.73619e+10,
+                    // and after conversion to a string, it outputs 27361919854.929001
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATEV2: {
+                    char buf[64];
+                    const auto* time_val = reinterpret_cast<const DateV2Value<DateV2ValueType>*>(
+                            col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_DATETIMEV2: {
+                    char buf[64];
+                    const auto* time_val =
+                            reinterpret_cast<const DateV2Value<DateTimeV2ValueType>*>(
+                                    col.column->get_data_at(i).data);
+                    // The scale of the output expression's type is passed to to_string().
+                    time_val->to_string(buf, _output_vexpr_ctxs[col_id]->root()->type().scale);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const auto* time_val = reinterpret_cast<const VecDateTimeValue*>(
+                            col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL: {
+                    // Object payloads are written only when explicitly requested;
+                    // otherwise they are exported as null.
+                    if (!_output_object_data) {
+                        _plain_text_outstream << NULL_IN_CSV;
+                        break;
+                    }
+                    [[fallthrough]];
+                }
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    _plain_text_outstream << decimal_val.to_string();
+                    break;
+                }
+                // These types are all rendered through the column's data type.
+                case TYPE_DECIMAL32:
+                case TYPE_DECIMAL64:
+                case TYPE_DECIMAL128I:
+                case TYPE_ARRAY:
+                case TYPE_MAP:
+                case TYPE_STRUCT: {
+                    _plain_text_outstream << col.type->to_string(*col.column, i);
+                    break;
+                }
+                default: {
+                    // not supported type, like BITMAP, just export null
+                    _plain_text_outstream << NULL_IN_CSV;
+                    break;
+                }
+                }
+            }
+            if (col_id < num_columns - 1) {
+                _plain_text_outstream << _column_separator;
+            }
+        }
+        _plain_text_outstream << _line_delimiter;
+    }
+
+    return _flush_plain_text_outstream();
+}
+
+// Hands the buffered CSV text to the file writer and empties the buffer.
+// Does nothing when the stream's write position is 0. If the append fails, the error
+// is returned and the buffer is left untouched.
+Status VCSVTransformer::_flush_plain_text_outstream() {
+    const size_t buffered_bytes = _plain_text_outstream.tellp();
+    if (buffered_bytes != 0) {
+        const std::string pending = _plain_text_outstream.str();
+        RETURN_IF_ERROR(_file_writer->append(pending));
+
+        // Drop the buffered text and reset the stream's state flags.
+        _plain_text_outstream.str("");
+        _plain_text_outstream.clear();
+    }
+    return Status::OK();
+}
+
+// Builds the header line listing the type name of every output expression, joined by
+// _column_separator and terminated by _line_delimiter.
+std::string VCSVTransformer::_gen_csv_header_types() {
+    const int column_count = _output_vexpr_ctxs.size();
+    std::string type_line;
+    for (int idx = 0; idx < column_count; ++idx) {
+        // The separator goes between entries, so it precedes all but the first.
+        if (idx > 0) {
+            type_line += _column_separator;
+        }
+        type_line += type_to_string(_output_vexpr_ctxs[idx]->root()->type().type);
+    }
+    type_line += _line_delimiter;
+    return type_line;
+}
+
+// Text written for a null cell; write() also emits it for unsupported types and for
+// OBJECT/HLL cells when object output is disabled.
+const std::string VCSVTransformer::NULL_IN_CSV = "\\N";
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vcsv_transformer.h 
b/be/src/vec/runtime/vcsv_transformer.h
new file mode 100644
index 0000000000..fb3232ac93
--- /dev/null
+++ b/be/src/vec/runtime/vcsv_transformer.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <arrow/io/interfaces.h>
+#include <arrow/result.h>
+#include <arrow/status.h>
+#include <gen_cpp/DataSinks_types.h>
+#include <parquet/file_writer.h>
+#include <parquet/properties.h>
+#include <parquet/types.h>
+#include <stdint.h>
+
+#include "vfile_writer_wrapper.h"
+
+namespace doris {
+namespace io {
+class FileWriter;
+} // namespace io
+} // namespace doris
+
+namespace doris::vectorized {
+
+// File format transformer that renders blocks as plain-text CSV and appends the text
+// to a FileWriter.
+//
+// The FileWriter is held as a raw pointer and is never deleted by this class; close()
+// closes it.
+class VCSVTransformer final : public VFileFormatTransformer {
+public:
+    // `header` is the header text to emit in open() (may be empty for no header);
+    // `header_type` selects whether a line of column types is appended to it.
+    // `column_separator` and `line_delimiter` are copied, so the caller's strings need
+    // not outlive this object.
+    VCSVTransformer(doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs,
+                    bool output_object_data, std::string_view header_type, std::string_view header,
+                    std::string_view column_separator, std::string_view line_delimiter);
+
+    ~VCSVTransformer() override = default;
+
+    // Writes the CSV header, if one was configured.
+    Status open() override;
+
+    // Formats all rows of `block` as CSV and appends them to the file writer.
+    Status write(const Block& block) override;
+
+    // Closes the underlying file writer.
+    Status close() override;
+
+    // Number of bytes the file writer reports as appended.
+    int64_t written_len() override;
+
+private:
+    Status _flush_plain_text_outstream();
+    std::string _gen_csv_header_types();
+
+    static const std::string NULL_IN_CSV;
+    std::string _csv_header;
+    // Owned copies: a std::string_view member would dangle if the constructor
+    // arguments referred to strings that are destroyed before this object.
+    std::string _column_separator;
+    std::string _line_delimiter;
+
+    doris::io::FileWriter* _file_writer;
+    // Used to buffer the export data of plain text
+    // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling
+    // file writer's write() for every single row.
+    // But this cannot solve the problem of a row of data that is too large.
+    // For example: bitmap_to_string() may return large volume of data.
+    // And the speed is relatively low; in my test, it is about 6.5MB/s.
+    std::stringstream _plain_text_outstream;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vfile_writer_wrapper.h 
b/be/src/vec/runtime/vfile_writer_wrapper.h
index e418a14ffa..e94f6b0ead 100644
--- a/be/src/vec/runtime/vfile_writer_wrapper.h
+++ b/be/src/vec/runtime/vfile_writer_wrapper.h
@@ -26,16 +26,16 @@
 
 namespace doris::vectorized {
 
-class VFileWriterWrapper {
+class VFileFormatTransformer {
 public:
-    VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool 
output_object_data)
+    VFileFormatTransformer(const VExprContextSPtrs& output_vexpr_ctxs, bool 
output_object_data)
             : _output_vexpr_ctxs(output_vexpr_ctxs),
               _cur_written_rows(0),
               _output_object_data(output_object_data) {}
 
-    virtual ~VFileWriterWrapper() = default;
+    virtual ~VFileFormatTransformer() = default;
 
-    virtual Status prepare() = 0;
+    virtual Status open() = 0;
 
     virtual Status write(const Block& block) = 0;
 
diff --git a/be/src/vec/runtime/vorc_writer.cpp 
b/be/src/vec/runtime/vorc_transformer.cpp
similarity index 98%
rename from be/src/vec/runtime/vorc_writer.cpp
rename to be/src/vec/runtime/vorc_transformer.cpp
index df9615d668..fa83e83e3a 100644
--- a/be/src/vec/runtime/vorc_writer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/runtime/vorc_writer.h"
+#include "vec/runtime/vorc_transformer.h"
 
 #include <glog/logging.h>
 #include <stdlib.h>
@@ -85,15 +85,15 @@ void VOrcOutputStream::set_written_len(int64_t written_len) 
{
     _written_len = written_len;
 }
 
-VOrcWriterWrapper::VOrcWriterWrapper(doris::io::FileWriter* file_writer,
-                                     const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                     const std::string& schema, bool 
output_object_data)
-        : VFileWriterWrapper(output_vexpr_ctxs, output_object_data),
+VOrcTransformer::VOrcTransformer(doris::io::FileWriter* file_writer,
+                                 const VExprContextSPtrs& output_vexpr_ctxs,
+                                 const std::string& schema, bool 
output_object_data)
+        : VFileFormatTransformer(output_vexpr_ctxs, output_object_data),
           _file_writer(file_writer),
           _write_options(new orc::WriterOptions()),
           _schema_str(schema) {}
 
-Status VOrcWriterWrapper::prepare() {
+Status VOrcTransformer::open() {
     try {
         _schema = orc::Type::buildTypeFromString(_schema_str);
     } catch (const std::exception& e) {
@@ -108,15 +108,15 @@ Status VOrcWriterWrapper::prepare() {
     return Status::OK();
 }
 
-std::unique_ptr<orc::ColumnVectorBatch> 
VOrcWriterWrapper::_create_row_batch(size_t sz) {
+std::unique_ptr<orc::ColumnVectorBatch> 
VOrcTransformer::_create_row_batch(size_t sz) {
     return _writer->createRowBatch(sz);
 }
 
-int64_t VOrcWriterWrapper::written_len() {
+int64_t VOrcTransformer::written_len() {
     return _output_stream->getLength();
 }
 
-Status VOrcWriterWrapper::close() {
+Status VOrcTransformer::close() {
     if (_writer != nullptr) {
         try {
             _writer->close();
@@ -398,7 +398,7 @@ Status VOrcWriterWrapper::close() {
 
 #define SET_NUM_ELEMENTS cur_batch->numElements = sz;
 
-Status VOrcWriterWrapper::write(const Block& block) {
+Status VOrcTransformer::write(const Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
     }
diff --git a/be/src/vec/runtime/vorc_writer.h 
b/be/src/vec/runtime/vorc_transformer.h
similarity index 88%
rename from be/src/vec/runtime/vorc_writer.h
rename to be/src/vec/runtime/vorc_transformer.h
index 9afed17e20..06a42361fb 100644
--- a/be/src/vec/runtime/vorc_writer.h
+++ b/be/src/vec/runtime/vorc_transformer.h
@@ -29,7 +29,7 @@
 #include "orc/Type.hh"
 #include "orc/Writer.hh"
 #include "vec/core/block.h"
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vparquet_transformer.h"
 
 namespace doris {
 namespace io {
@@ -72,15 +72,14 @@ private:
 };
 
 // a wrapper of parquet output stream
-class VOrcWriterWrapper final : public VFileWriterWrapper {
+class VOrcTransformer final : public VFileFormatTransformer {
 public:
-    VOrcWriterWrapper(doris::io::FileWriter* file_writer,
-                      const VExprContextSPtrs& output_vexpr_ctxs, const 
std::string& schema,
-                      bool output_object_data);
+    VOrcTransformer(doris::io::FileWriter* file_writer, const 
VExprContextSPtrs& output_vexpr_ctxs,
+                    const std::string& schema, bool output_object_data);
 
-    ~VOrcWriterWrapper() = default;
+    ~VOrcTransformer() = default;
 
-    Status prepare() override;
+    Status open() override;
 
     Status write(const Block& block) override;
 
diff --git a/be/src/vec/runtime/vparquet_writer.cpp 
b/be/src/vec/runtime/vparquet_transformer.cpp
similarity index 97%
rename from be/src/vec/runtime/vparquet_writer.cpp
rename to be/src/vec/runtime/vparquet_transformer.cpp
index 0a6392319b..7d1ceed404 100644
--- a/be/src/vec/runtime/vparquet_writer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vparquet_transformer.h"
 
 #include <arrow/io/type_fwd.h>
 #include <glog/logging.h>
@@ -278,14 +278,14 @@ void 
ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build
     }
 }
 
-VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* 
file_writer,
-                                             const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                             const 
std::vector<TParquetSchema>& parquet_schemas,
-                                             const 
TParquetCompressionType::type& compression_type,
-                                             const bool& 
parquet_disable_dictionary,
-                                             const TParquetVersion::type& 
parquet_version,
-                                             bool output_object_data)
-        : VFileWriterWrapper(output_vexpr_ctxs, output_object_data),
+VParquetTransformer::VParquetTransformer(doris::io::FileWriter* file_writer,
+                                         const VExprContextSPtrs& 
output_vexpr_ctxs,
+                                         const std::vector<TParquetSchema>& 
parquet_schemas,
+                                         const TParquetCompressionType::type& 
compression_type,
+                                         const bool& 
parquet_disable_dictionary,
+                                         const TParquetVersion::type& 
parquet_version,
+                                         bool output_object_data)
+        : VFileFormatTransformer(output_vexpr_ctxs, output_object_data),
           _rg_writer(nullptr),
           _parquet_schemas(parquet_schemas),
           _compression_type(compression_type),
@@ -294,7 +294,7 @@ 
VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* file_writer,
     _outstream = std::shared_ptr<ParquetOutputStream>(new 
ParquetOutputStream(file_writer));
 }
 
-Status VParquetWriterWrapper::parse_properties() {
+Status VParquetTransformer::parse_properties() {
     try {
         parquet::WriterProperties::Builder builder;
         ParquetBuildHelper::build_compression_type(builder, _compression_type);
@@ -311,7 +311,7 @@ Status VParquetWriterWrapper::parse_properties() {
     return Status::OK();
 }
 
-Status VParquetWriterWrapper::parse_schema() {
+Status VParquetTransformer::parse_schema() {
     parquet::schema::NodeVector fields;
     parquet::Repetition::type parquet_repetition_type;
     parquet::Type::type parquet_physical_type;
@@ -394,7 +394,7 @@ Status VParquetWriterWrapper::parse_schema() {
         RETURN_WRONG_TYPE                                                      
                  \
     }
 
-Status VParquetWriterWrapper::write(const Block& block) {
+Status VParquetTransformer::write(const Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
     }
@@ -906,7 +906,7 @@ Status VParquetWriterWrapper::write(const Block& block) {
     return Status::OK();
 }
 
-Status VParquetWriterWrapper::prepare() {
+Status VParquetTransformer::open() {
     RETURN_IF_ERROR(parse_properties());
     RETURN_IF_ERROR(parse_schema());
     try {
@@ -921,7 +921,7 @@ Status VParquetWriterWrapper::prepare() {
     return Status::OK();
 }
 
-parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() {
+parquet::RowGroupWriter* VParquetTransformer::get_rg_writer() {
     if (_rg_writer == nullptr) {
         _rg_writer = _writer->AppendBufferedRowGroup();
     }
@@ -933,11 +933,11 @@ parquet::RowGroupWriter* 
VParquetWriterWrapper::get_rg_writer() {
     return _rg_writer;
 }
 
-int64_t VParquetWriterWrapper::written_len() {
+int64_t VParquetTransformer::written_len() {
     return _outstream->get_written_len();
 }
 
-Status VParquetWriterWrapper::close() {
+Status VParquetTransformer::close() {
     try {
         if (_rg_writer != nullptr) {
             _rg_writer->Close();
diff --git a/be/src/vec/runtime/vparquet_writer.h 
b/be/src/vec/runtime/vparquet_transformer.h
similarity index 87%
rename from be/src/vec/runtime/vparquet_writer.h
rename to be/src/vec/runtime/vparquet_transformer.h
index 36514c9fe8..1fedc3f44f 100644
--- a/be/src/vec/runtime/vparquet_writer.h
+++ b/be/src/vec/runtime/vparquet_transformer.h
@@ -86,18 +86,18 @@ public:
 };
 
 // a wrapper of parquet output stream
-class VParquetWriterWrapper final : public VFileWriterWrapper {
+class VParquetTransformer final : public VFileFormatTransformer {
 public:
-    VParquetWriterWrapper(doris::io::FileWriter* file_writer,
-                          const VExprContextSPtrs& output_vexpr_ctxs,
-                          const std::vector<TParquetSchema>& parquet_schemas,
-                          const TParquetCompressionType::type& 
compression_type,
-                          const bool& parquet_disable_dictionary,
-                          const TParquetVersion::type& parquet_version, bool 
output_object_data);
+    VParquetTransformer(doris::io::FileWriter* file_writer,
+                        const VExprContextSPtrs& output_vexpr_ctxs,
+                        const std::vector<TParquetSchema>& parquet_schemas,
+                        const TParquetCompressionType::type& compression_type,
+                        const bool& parquet_disable_dictionary,
+                        const TParquetVersion::type& parquet_version, bool 
output_object_data);
 
-    ~VParquetWriterWrapper() = default;
+    ~VParquetTransformer() = default;
 
-    Status prepare() override;
+    Status open() override;
 
     Status write(const Block& block) override;
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp 
b/be/src/vec/sink/writer/vfile_result_writer.cpp
index e54c426f74..0523f0ac08 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -30,25 +30,19 @@
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/consts.h"
 #include "common/status.h"
-#include "gutil/strings/numbers.h"
 #include "io/file_factory.h"
 #include "io/fs/broker_file_system.h"
-#include "io/fs/file_writer.h"
 #include "io/fs/hdfs_file_system.h"
 #include "io/fs/local_file_system.h"
 #include "io/fs/s3_file_system.h"
 #include "io/hdfs_builder.h"
 #include "runtime/buffer_control_block.h"
-#include "runtime/decimalv2_value.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/descriptors.h"
 #include "runtime/large_int_value.h"
 #include "runtime/primitive_type.h"
 #include "runtime/runtime_state.h"
-#include "runtime/types.h"
 #include "service/backend_options.h"
-#include "util/metrics.h"
-#include "util/mysql_global.h"
 #include "util/mysql_row_buffer.h"
 #include "util/s3_uri.h"
 #include "util/s3_util.h"
@@ -57,20 +51,15 @@
 #include "vec/columns/column.h"
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_vector.h"
-#include "vec/common/string_ref.h"
 #include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vec/runtime/vdatetime_value.h"
-#include "vec/runtime/vorc_writer.h"
-#include "vec/runtime/vparquet_writer.h"
+#include "vec/runtime/vcsv_transformer.h"
+#include "vec/runtime/vorc_transformer.h"
+#include "vec/runtime/vparquet_transformer.h"
 #include "vec/sink/vresult_sink.h"
 
 namespace doris::vectorized {
-const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
-using doris::operator<<;
 
 VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs)
         : AsyncResultWriter(output_exprs) {}
@@ -88,8 +77,7 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions* 
file_opts,
           _fragment_instance_id(fragment_instance_id),
           _sinker(sinker),
           _output_block(output_block),
-          _output_row_descriptor(output_row_descriptor),
-          _vfile_writer(nullptr) {
+          _output_row_descriptor(output_row_descriptor) {
     _output_object_data = output_object_data;
 }
 
@@ -158,19 +146,19 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
             _file_writer_impl));
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
-        // just use file writer is enough
+        _vfile_writer.reset(new VCSVTransformer(
+                _file_writer_impl.get(), _vec_output_expr_ctxs, 
_output_object_data, _header_type,
+                _header, _file_opts->column_separator, 
_file_opts->line_delimiter));
         break;
     case TFileFormatType::FORMAT_PARQUET:
-        _vfile_writer.reset(new VParquetWriterWrapper(
+        _vfile_writer.reset(new VParquetTransformer(
                 _file_writer_impl.get(), _vec_output_expr_ctxs, 
_file_opts->parquet_schemas,
                 _file_opts->parquet_commpression_type, 
_file_opts->parquert_disable_dictionary,
                 _file_opts->parquet_version, _output_object_data));
-        RETURN_IF_ERROR(_vfile_writer->prepare());
         break;
     case TFileFormatType::FORMAT_ORC:
-        _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), 
_vec_output_expr_ctxs,
-                                                  _file_opts->orc_schema, 
_output_object_data));
-        RETURN_IF_ERROR(_vfile_writer->prepare());
+        _vfile_writer.reset(new VOrcTransformer(_file_writer_impl.get(), 
_vec_output_expr_ctxs,
+                                                _file_opts->orc_schema, 
_output_object_data));
         break;
     default:
         return Status::InternalError("unsupported file format: {}", 
_file_opts->file_format);
@@ -178,7 +166,8 @@ Status VFileResultWriter::_create_file_writer(const 
std::string& file_name) {
     LOG(INFO) << "create file for exporting query result. file name: " << 
file_name
               << ". query id: " << print_id(_state->query_id())
               << " format:" << _file_opts->file_format;
-    return Status::OK();
+
+    return _vfile_writer->open();
 }
 
 // file name format as: my_prefix_{fragment_instance_id}_0.csv
@@ -187,7 +176,6 @@ Status VFileResultWriter::_get_next_file_name(std::string* 
file_name) {
     ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << 
(_file_idx++) << "."
        << _file_format_to_name();
     *file_name = ss.str();
-    _header_sent = false;
     if (_storage_type == TStorageBackendType::LOCAL) {
         // For local file writer, the file_path is a local dir.
         // Here we do a simple security verification by checking whether the 
file exists.
@@ -237,215 +225,22 @@ Status VFileResultWriter::append_block(Block& block) {
     if (block.rows() == 0) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(write_csv_header());
     SCOPED_TIMER(_append_row_batch_timer);
     Block output_block;
     RETURN_IF_ERROR(_projection_block(block, &output_block));
-
-    if (_vfile_writer) {
-        RETURN_IF_ERROR(_write_file(output_block));
-    } else {
-        RETURN_IF_ERROR(_write_csv_file(output_block));
-    }
+    RETURN_IF_ERROR(_write_file(output_block));
 
     _written_rows += block.rows();
     return Status::OK();
 }
 
 Status VFileResultWriter::_write_file(const Block& block) {
-    RETURN_IF_ERROR(_vfile_writer->write(block));
-    // split file if exceed limit
-    _current_written_bytes = _vfile_writer->written_len();
-    return _create_new_file_if_exceed_size();
-}
-
-Status VFileResultWriter::_write_csv_file(const Block& block) {
-    for (size_t i = 0; i < block.rows(); i++) {
-        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
-            auto col = block.get_by_position(col_id);
-            if (col.column->is_null_at(i)) {
-                _plain_text_outstream << NULL_IN_CSV;
-            } else {
-                switch (_vec_output_expr_ctxs[col_id]->root()->type().type) {
-                case TYPE_BOOLEAN:
-                case TYPE_TINYINT:
-                    _plain_text_outstream << (int)*reinterpret_cast<const 
int8_t*>(
-                            col.column->get_data_at(i).data);
-                    break;
-                case TYPE_SMALLINT:
-                    _plain_text_outstream
-                            << *reinterpret_cast<const 
int16_t*>(col.column->get_data_at(i).data);
-                    break;
-                case TYPE_INT:
-                    _plain_text_outstream
-                            << *reinterpret_cast<const 
int32_t*>(col.column->get_data_at(i).data);
-                    break;
-                case TYPE_BIGINT:
-                    _plain_text_outstream
-                            << *reinterpret_cast<const 
int64_t*>(col.column->get_data_at(i).data);
-                    break;
-                case TYPE_LARGEINT:
-                    _plain_text_outstream
-                            << *reinterpret_cast<const 
__int128*>(col.column->get_data_at(i).data);
-                    break;
-                case TYPE_FLOAT: {
-                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
-                    float float_value =
-                            *reinterpret_cast<const 
float*>(col.column->get_data_at(i).data);
-                    buffer[0] = '\0';
-                    int length = FloatToBuffer(float_value, 
MAX_FLOAT_STR_LENGTH, buffer);
-                    DCHECK(length >= 0) << "gcvt float failed, float value=" 
<< float_value;
-                    _plain_text_outstream << buffer;
-                    break;
-                }
-                case TYPE_DOUBLE: {
-                    // To prevent loss of precision on float and double types,
-                    // they are converted to strings before output.
-                    // For example: For a double value 27361919854.929001,
-                    // the direct output of using std::stringstream is 
2.73619e+10,
-                    // and after conversion to a string, it outputs 
27361919854.929001
-                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
-                    double double_value =
-                            *reinterpret_cast<const 
double*>(col.column->get_data_at(i).data);
-                    buffer[0] = '\0';
-                    int length = DoubleToBuffer(double_value, 
MAX_DOUBLE_STR_LENGTH, buffer);
-                    DCHECK(length >= 0) << "gcvt double failed, double value=" 
<< double_value;
-                    _plain_text_outstream << buffer;
-                    break;
-                }
-                case TYPE_DATEV2: {
-                    char buf[64];
-                    const DateV2Value<DateV2ValueType>* time_val =
-                            (const 
DateV2Value<DateV2ValueType>*)(col.column->get_data_at(i).data);
-                    time_val->to_string(buf);
-                    _plain_text_outstream << buf;
-                    break;
-                }
-                case TYPE_DATETIMEV2: {
-                    char buf[64];
-                    const DateV2Value<DateTimeV2ValueType>* time_val =
-                            (const 
DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i)
-                                                                              
.data);
-                    time_val->to_string(buf, 
_vec_output_expr_ctxs[col_id]->root()->type().scale);
-                    _plain_text_outstream << buf;
-                    break;
-                }
-                case TYPE_DATE:
-                case TYPE_DATETIME: {
-                    char buf[64];
-                    const VecDateTimeValue* time_val =
-                            (const 
VecDateTimeValue*)(col.column->get_data_at(i).data);
-                    time_val->to_string(buf);
-                    _plain_text_outstream << buf;
-                    break;
-                }
-                case TYPE_OBJECT:
-                case TYPE_HLL: {
-                    if (!_output_object_data) {
-                        _plain_text_outstream << NULL_IN_CSV;
-                        break;
-                    }
-                    [[fallthrough]];
-                }
-                case TYPE_VARCHAR:
-                case TYPE_CHAR:
-                case TYPE_STRING: {
-                    auto value = col.column->get_data_at(i);
-                    _plain_text_outstream << value;
-                    break;
-                }
-                case TYPE_DECIMALV2: {
-                    const DecimalV2Value decimal_val(
-                            reinterpret_cast<const 
PackedInt128*>(col.column->get_data_at(i).data)
-                                    ->value);
-                    std::string decimal_str;
-                    decimal_str = decimal_val.to_string();
-                    _plain_text_outstream << decimal_str;
-                    break;
-                }
-                case TYPE_DECIMAL32: {
-                    _plain_text_outstream << col.type->to_string(*col.column, 
i);
-                    break;
-                }
-                case TYPE_DECIMAL64: {
-                    _plain_text_outstream << col.type->to_string(*col.column, 
i);
-                    break;
-                }
-                case TYPE_DECIMAL128I: {
-                    _plain_text_outstream << col.type->to_string(*col.column, 
i);
-                    break;
-                }
-                case TYPE_ARRAY: {
-                    _plain_text_outstream << col.type->to_string(*col.column, 
i);
-                    break;
-                }
-                case TYPE_MAP: {
-                    _plain_text_outstream << col.type->to_string(*col.column, 
i);
-                    break;
-                }
-                case TYPE_STRUCT: {
-                    _plain_text_outstream << col.type->to_string(*col.column, 
i);
-                    break;
-                }
-                default: {
-                    // not supported type, like BITMAP, just export null
-                    _plain_text_outstream << NULL_IN_CSV;
-                }
-                }
-            }
-            if (col_id < block.columns() - 1) {
-                _plain_text_outstream << _file_opts->column_separator;
-            }
-        }
-        _plain_text_outstream << _file_opts->line_delimiter;
-    }
-
-    return _flush_plain_text_outstream(true);
-}
-
-std::string VFileResultWriter::gen_types() {
-    std::string types;
-    int num_columns = _vec_output_expr_ctxs.size();
-    for (int i = 0; i < num_columns; ++i) {
-        types += type_to_string(_vec_output_expr_ctxs[i]->root()->type().type);
-        if (i < num_columns - 1) {
-            types += _file_opts->column_separator;
-        }
-    }
-    types += _file_opts->line_delimiter;
-    return types;
-}
-
-Status VFileResultWriter::write_csv_header() {
-    if (!_header_sent && _header.size() > 0) {
-        std::string tmp_header(_header);
-        if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
-            tmp_header += gen_types();
-        }
-        RETURN_IF_ERROR(_file_writer_impl->append(tmp_header));
-        _header_sent = true;
-    }
-    return Status::OK();
-}
-
-Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
-    SCOPED_TIMER(_file_write_timer);
-    size_t pos = _plain_text_outstream.tellp();
-    if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
-        return Status::OK();
+    {
+        SCOPED_TIMER(_file_write_timer);
+        RETURN_IF_ERROR(_vfile_writer->write(block));
     }
-
-    const std::string& buf = _plain_text_outstream.str();
-    size_t written_len = buf.size();
-    RETURN_IF_ERROR(_file_writer_impl->append(buf));
-    COUNTER_UPDATE(_written_data_bytes, written_len);
-    _current_written_bytes += written_len;
-
-    // clear the stream
-    _plain_text_outstream.str("");
-    _plain_text_outstream.clear();
-
     // split file if exceed limit
+    _current_written_bytes = _vfile_writer->written_len();
     return _create_new_file_if_exceed_size();
 }
 
@@ -516,7 +311,7 @@ Status VFileResultWriter::_send_result() {
     result->result_batch.rows.resize(1);
     result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
 
-    std::map<std::string, string> attach_infos;
+    std::map<std::string, std::string> attach_infos;
     attach_infos.insert(std::make_pair("FileNumber", 
std::to_string(_file_idx)));
     attach_infos.insert(
             std::make_pair("TotalRows", 
std::to_string(_written_rows_counter->value())));
@@ -634,6 +429,4 @@ Status VFileResultWriter::close() {
     return _close_file_writer(true);
 }
 
-const string VFileResultWriter::NULL_IN_CSV = "\\N";
-
 } // namespace doris::vectorized
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h 
b/be/src/vec/sink/writer/vfile_result_writer.h
index b56e41c377..69a26714dc 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -69,9 +69,6 @@ public:
     // file result writer always return statistic result in one row
     int64_t get_written_rows() const override { return 1; }
 
-    std::string gen_types();
-    Status write_csv_header();
-
     void set_header_info(const std::string& header_type, const std::string& 
header) {
         _header_type = header_type;
         _header = header;
@@ -80,11 +77,7 @@ public:
 private:
     Status _init(RuntimeState* state, RuntimeProfile*);
     Status _write_file(const Block& block);
-    Status _write_csv_file(const Block& block);
 
-    // if buffer exceed the limit, write the data buffered in 
_plain_text_outstream via file_writer
-    // if eos, write the data even if buffer is not full.
-    Status _flush_plain_text_outstream(bool eos);
     void _init_profile(RuntimeProfile*);
 
     Status _create_file_writer(const std::string& file_name);
@@ -106,8 +99,6 @@ private:
     // delete the dir of file_path
     Status _delete_dir();
 
-    static const std::string NULL_IN_CSV;
-
     RuntimeState* _state; // not owned, set when init
     const ResultFileOptions* _file_opts;
     TStorageBackendType::type _storage_type;
@@ -123,7 +114,6 @@ private:
     // For example: bitmap_to_string() may return large volume of data.
     // And the speed is relative low, in my test, is about 6.5MB/s.
     std::stringstream _plain_text_outstream;
-    static const size_t OUTSTREAM_BUFFER_SIZE_BYTES;
 
     // current written bytes, used for split data
     int64_t _current_written_bytes = 0;
@@ -148,13 +138,11 @@ private:
     Block* _output_block = nullptr;
     // set to true if the final statistic result is sent
     bool _is_result_sent = false;
-    bool _header_sent = false;
     RowDescriptor _output_row_descriptor;
-    // parquet/orc file writer
-    std::unique_ptr<VFileWriterWrapper> _vfile_writer;
+    // convert block to parquet/orc/csv format
+    std::unique_ptr<VFileFormatTransformer> _vfile_writer;
 
     std::string_view _header_type;
     std::string_view _header;
-    std::unique_ptr<VFileResultWriter> _writer;
 };
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to