This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new a36c387a2b [Refactor](transformer) convert to file format writer to transformer (#23888) a36c387a2b is described below commit a36c387a2b14f843ac56ef4a0332d00bbc7cc5f7 Author: HappenLee <happen...@hotmail.com> AuthorDate: Tue Sep 5 10:50:10 2023 +0800 [Refactor](transformer) convert to file format writer to transformer (#23888) --- be/src/vec/runtime/vcsv_transformer.cpp | 262 +++++++++++++++++++++ be/src/vec/runtime/vcsv_transformer.h | 75 ++++++ be/src/vec/runtime/vfile_writer_wrapper.h | 8 +- .../{vorc_writer.cpp => vorc_transformer.cpp} | 20 +- .../runtime/{vorc_writer.h => vorc_transformer.h} | 13 +- ...parquet_writer.cpp => vparquet_transformer.cpp} | 32 +-- .../{vparquet_writer.h => vparquet_transformer.h} | 18 +- be/src/vec/sink/writer/vfile_result_writer.cpp | 243 ++----------------- be/src/vec/sink/writer/vfile_result_writer.h | 16 +- 9 files changed, 402 insertions(+), 285 deletions(-) diff --git a/be/src/vec/runtime/vcsv_transformer.cpp b/be/src/vec/runtime/vcsv_transformer.cpp new file mode 100644 index 0000000000..da5a697460 --- /dev/null +++ b/be/src/vec/runtime/vcsv_transformer.cpp @@ -0,0 +1,262 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/runtime/vcsv_transformer.h" + +#include <glog/logging.h> +#include <stdlib.h> +#include <string.h> + +#include <exception> +#include <ostream> + +#include "gutil/strings/numbers.h" +#include "io/fs/file_writer.h" +#include "runtime/define_primitive_type.h" +#include "runtime/large_int_value.h" +#include "runtime/primitive_type.h" +#include "runtime/types.h" +#include "util/binary_cast.hpp" +#include "util/mysql_global.h" +#include "vec/columns/column.h" +#include "vec/columns/column_complex.h" +#include "vec/columns/column_decimal.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/common/pod_array.h" +#include "vec/common/string_ref.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/core/types.h" +#include "vec/exprs/vexpr.h" +#include "vec/exprs/vexpr_context.h" +#include "vec/runtime/vdatetime_value.h" + +namespace doris::vectorized { + +VCSVTransformer::VCSVTransformer(doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + bool output_object_data, std::string_view header_type, + std::string_view header, std::string_view column_separator, + std::string_view line_delimiter) + : VFileFormatTransformer(output_vexpr_ctxs, output_object_data), + _column_separator(column_separator), + _line_delimiter(line_delimiter), + _file_writer(file_writer) { + if (header.size() > 0) { + _csv_header = header; + if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { + _csv_header += _gen_csv_header_types(); + } + } else { + _csv_header = ""; + } +} + +Status VCSVTransformer::open() { + if (!_csv_header.empty()) { + return _file_writer->append(Slice(_csv_header.data(), _csv_header.size())); + } + return Status::OK(); +} + +int64_t VCSVTransformer::written_len() { + return _file_writer->bytes_appended(); +} + +Status VCSVTransformer::close() { + return _file_writer->close(); +} + +Status VCSVTransformer::write(const Block& block) { + using doris::operator<<; + for (size_t i = 0; i < block.rows(); i++) { + for (size_t col_id = 0; col_id < block.columns(); col_id++) { + auto col = block.get_by_position(col_id); + if (col.column->is_null_at(i)) { + _plain_text_outstream << NULL_IN_CSV; + } else { + switch (_output_vexpr_ctxs[col_id]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>( + col.column->get_data_at(i).data); + break; + case TYPE_SMALLINT: + _plain_text_outstream + << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data); + break; + case TYPE_INT: + _plain_text_outstream + << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data); + break; + case TYPE_BIGINT: + _plain_text_outstream + << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data); + break; + case TYPE_LARGEINT: + _plain_text_outstream + << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data); + break; + case TYPE_FLOAT: { + char buffer[MAX_FLOAT_STR_LENGTH + 2]; + float float_value = + *reinterpret_cast<const float*>(col.column->get_data_at(i).data); + buffer[0] = '\0'; + int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DOUBLE: { + // To prevent loss of precision on float and double types, + // they are converted to strings before output. + // For example: For a double value 27361919854.929001, + // the direct output of using std::stringstream is 2.73619e+10, + // and after conversion to a string, it outputs 27361919854.929001 + char buffer[MAX_DOUBLE_STR_LENGTH + 2]; + double double_value = + *reinterpret_cast<const double*>(col.column->get_data_at(i).data); + buffer[0] = '\0'; + int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DATEV2: { + char buf[64]; + const DateV2Value<DateV2ValueType>* time_val = + (const DateV2Value<DateV2ValueType>*)(col.column->get_data_at(i).data); + time_val->to_string(buf); + _plain_text_outstream << buf; + break; + } + case TYPE_DATETIMEV2: { + char buf[64]; + const DateV2Value<DateTimeV2ValueType>* time_val = + (const DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i) + .data); + time_val->to_string(buf, _output_vexpr_ctxs[col_id]->root()->type().scale); + _plain_text_outstream << buf; + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const VecDateTimeValue* time_val = + (const VecDateTimeValue*)(col.column->get_data_at(i).data); + time_val->to_string(buf); + _plain_text_outstream << buf; + break; + } + case TYPE_OBJECT: + case TYPE_HLL: { + if (!_output_object_data) { + _plain_text_outstream << NULL_IN_CSV; + break; + } + [[fallthrough]]; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + auto value = col.column->get_data_at(i); + _plain_text_outstream << value; + break; + } + case TYPE_DECIMALV2: { + const DecimalV2Value decimal_val( + reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data) + ->value); + std::string decimal_str; + decimal_str = decimal_val.to_string(); + _plain_text_outstream << decimal_str; + break; + } + case TYPE_DECIMAL32: { + _plain_text_outstream << col.type->to_string(*col.column, i); + break; + } + case TYPE_DECIMAL64: { + _plain_text_outstream << col.type->to_string(*col.column, i); + break; + } + case TYPE_DECIMAL128I: { + _plain_text_outstream << col.type->to_string(*col.column, i); + break; + } + case TYPE_ARRAY: { + _plain_text_outstream << col.type->to_string(*col.column, i); + break; + } + case TYPE_MAP: { + _plain_text_outstream << col.type->to_string(*col.column, i); + break; + } + case TYPE_STRUCT: { + _plain_text_outstream << col.type->to_string(*col.column, i); + break; + } + default: { + // not supported type, like BITMAP, just export null + _plain_text_outstream << NULL_IN_CSV; + } + } + } + if (col_id < block.columns() - 1) { + _plain_text_outstream << _column_separator; + } + } + _plain_text_outstream << _line_delimiter; + } + + return _flush_plain_text_outstream(); +} + +Status VCSVTransformer::_flush_plain_text_outstream() { + size_t pos = _plain_text_outstream.tellp(); + if (pos == 0) { + return Status::OK(); + } + + const std::string& buf = _plain_text_outstream.str(); + RETURN_IF_ERROR(_file_writer->append(buf)); + + // clear the stream + _plain_text_outstream.str(""); + _plain_text_outstream.clear(); + + return Status::OK(); +} + +std::string VCSVTransformer::_gen_csv_header_types() { + std::string types; + int num_columns = _output_vexpr_ctxs.size(); + for (int i = 0; i < num_columns; ++i) { + types += type_to_string(_output_vexpr_ctxs[i]->root()->type().type); + if (i < num_columns - 1) { + types += _column_separator; + } + } + types += _line_delimiter; + return types; +} + +const std::string VCSVTransformer::NULL_IN_CSV = "\\N"; +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vcsv_transformer.h b/be/src/vec/runtime/vcsv_transformer.h new file mode 100644 index 0000000000..fb3232ac93 --- /dev/null +++ b/be/src/vec/runtime/vcsv_transformer.h @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <arrow/io/interfaces.h> +#include <arrow/result.h> +#include <arrow/status.h> +#include <gen_cpp/DataSinks_types.h> +#include <parquet/file_writer.h> +#include <parquet/properties.h> +#include <parquet/types.h> +#include <stdint.h> + +#include "vfile_writer_wrapper.h" + +namespace doris { +namespace io { +class FileWriter; +} // namespace io +} // namespace doris + +namespace doris::vectorized { + +class VCSVTransformer final : public VFileFormatTransformer { +public: + VCSVTransformer(doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, + bool output_object_data, std::string_view header_type, std::string_view header, + std::string_view column_separator, std::string_view line_delimiter); + + ~VCSVTransformer() = default; + + Status open() override; + + Status write(const Block& block) override; + + Status close() override; + + int64_t written_len() override; + +private: + Status _flush_plain_text_outstream(); + std::string _gen_csv_header_types(); + + static const std::string NULL_IN_CSV; + std::string _csv_header; + std::string_view _column_separator; + std::string_view _line_delimiter; + + doris::io::FileWriter* _file_writer; + // Used to buffer the export data of plain text + // TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling + // file writer's write() for every single row. + // But this cannot solve the problem of a row of data that is too large. + // For example: bitmap_to_string() may return large volume of data. + // And the speed is relative low, in my test, is about 6.5MB/s. + std::stringstream _plain_text_outstream; + static const size_t OUTSTREAM_BUFFER_SIZE_BYTES; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vfile_writer_wrapper.h b/be/src/vec/runtime/vfile_writer_wrapper.h index e418a14ffa..e94f6b0ead 100644 --- a/be/src/vec/runtime/vfile_writer_wrapper.h +++ b/be/src/vec/runtime/vfile_writer_wrapper.h @@ -26,16 +26,16 @@ namespace doris::vectorized { -class VFileWriterWrapper { +class VFileFormatTransformer { public: - VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data) + VFileFormatTransformer(const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data) : _output_vexpr_ctxs(output_vexpr_ctxs), _cur_written_rows(0), _output_object_data(output_object_data) {} - virtual ~VFileWriterWrapper() = default; + virtual ~VFileFormatTransformer() = default; - virtual Status prepare() = 0; + virtual Status open() = 0; virtual Status write(const Block& block) = 0; diff --git a/be/src/vec/runtime/vorc_writer.cpp b/be/src/vec/runtime/vorc_transformer.cpp similarity index 98% rename from be/src/vec/runtime/vorc_writer.cpp rename to be/src/vec/runtime/vorc_transformer.cpp index df9615d668..fa83e83e3a 100644 --- a/be/src/vec/runtime/vorc_writer.cpp +++ b/be/src/vec/runtime/vorc_transformer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "vec/runtime/vorc_writer.h" +#include "vec/runtime/vorc_transformer.h" #include <glog/logging.h> #include <stdlib.h> @@ -85,15 +85,15 @@ void VOrcOutputStream::set_written_len(int64_t written_len) { _written_len = written_len; } -VOrcWriterWrapper::VOrcWriterWrapper(doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, - const std::string& schema, bool output_object_data) - : VFileWriterWrapper(output_vexpr_ctxs, output_object_data), +VOrcTransformer::VOrcTransformer(doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + const std::string& schema, bool output_object_data) + : VFileFormatTransformer(output_vexpr_ctxs, output_object_data), _file_writer(file_writer), _write_options(new orc::WriterOptions()), _schema_str(schema) {} -Status VOrcWriterWrapper::prepare() { +Status VOrcTransformer::open() { try { _schema = orc::Type::buildTypeFromString(_schema_str); } catch (const std::exception& e) { @@ -108,15 +108,15 @@ Status VOrcWriterWrapper::prepare() { return Status::OK(); } -std::unique_ptr<orc::ColumnVectorBatch> VOrcWriterWrapper::_create_row_batch(size_t sz) { +std::unique_ptr<orc::ColumnVectorBatch> VOrcTransformer::_create_row_batch(size_t sz) { return _writer->createRowBatch(sz); } -int64_t VOrcWriterWrapper::written_len() { +int64_t VOrcTransformer::written_len() { return _output_stream->getLength(); } -Status VOrcWriterWrapper::close() { +Status VOrcTransformer::close() { if (_writer != nullptr) { try { _writer->close(); @@ -398,7 +398,7 @@ Status VOrcWriterWrapper::close() { #define SET_NUM_ELEMENTS cur_batch->numElements = sz; -Status VOrcWriterWrapper::write(const Block& block) { +Status VOrcTransformer::write(const Block& block) { if (block.rows() == 0) { return Status::OK(); } diff --git a/be/src/vec/runtime/vorc_writer.h b/be/src/vec/runtime/vorc_transformer.h similarity index 88% rename from be/src/vec/runtime/vorc_writer.h rename to be/src/vec/runtime/vorc_transformer.h index 9afed17e20..06a42361fb 100644 --- a/be/src/vec/runtime/vorc_writer.h +++ b/be/src/vec/runtime/vorc_transformer.h @@ -29,7 +29,7 @@ #include "orc/Type.hh" #include "orc/Writer.hh" #include "vec/core/block.h" -#include "vec/runtime/vparquet_writer.h" +#include "vec/runtime/vparquet_transformer.h" namespace doris { namespace io { @@ -72,15 +72,14 @@ private: }; // a wrapper of parquet output stream -class VOrcWriterWrapper final : public VFileWriterWrapper { +class VOrcTransformer final : public VFileFormatTransformer { public: - VOrcWriterWrapper(doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, const std::string& schema, - bool output_object_data); + VOrcTransformer(doris::io::FileWriter* file_writer, const VExprContextSPtrs& output_vexpr_ctxs, + const std::string& schema, bool output_object_data); - ~VOrcWriterWrapper() = default; + ~VOrcTransformer() = default; - Status prepare() override; + Status open() override; Status write(const Block& block) override; diff --git a/be/src/vec/runtime/vparquet_writer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp similarity index 97% rename from be/src/vec/runtime/vparquet_writer.cpp rename to be/src/vec/runtime/vparquet_transformer.cpp index 0a6392319b..7d1ceed404 100644 --- a/be/src/vec/runtime/vparquet_writer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "vec/runtime/vparquet_writer.h" +#include "vec/runtime/vparquet_transformer.h" #include <arrow/io/type_fwd.h> #include <glog/logging.h> @@ -278,14 +278,14 @@ void ParquetBuildHelper::build_version(parquet::WriterProperties::Builder& build } } -VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, - const std::vector<TParquetSchema>& parquet_schemas, - const TParquetCompressionType::type& compression_type, - const bool& parquet_disable_dictionary, - const TParquetVersion::type& parquet_version, - bool output_object_data) - : VFileWriterWrapper(output_vexpr_ctxs, output_object_data), +VParquetTransformer::VParquetTransformer(doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + const std::vector<TParquetSchema>& parquet_schemas, + const TParquetCompressionType::type& compression_type, + const bool& parquet_disable_dictionary, + const TParquetVersion::type& parquet_version, + bool output_object_data) + : VFileFormatTransformer(output_vexpr_ctxs, output_object_data), _rg_writer(nullptr), _parquet_schemas(parquet_schemas), _compression_type(compression_type), @@ -294,7 +294,7 @@ VParquetWriterWrapper::VParquetWriterWrapper(doris::io::FileWriter* file_writer, _outstream = std::shared_ptr<ParquetOutputStream>(new ParquetOutputStream(file_writer)); } -Status VParquetWriterWrapper::parse_properties() { +Status VParquetTransformer::parse_properties() { try { parquet::WriterProperties::Builder builder; ParquetBuildHelper::build_compression_type(builder, _compression_type); @@ -311,7 +311,7 @@ Status VParquetWriterWrapper::parse_properties() { return Status::OK(); } -Status VParquetWriterWrapper::parse_schema() { +Status VParquetTransformer::parse_schema() { parquet::schema::NodeVector fields; parquet::Repetition::type parquet_repetition_type; parquet::Type::type parquet_physical_type; @@ -394,7 +394,7 @@ Status VParquetWriterWrapper::parse_schema() { RETURN_WRONG_TYPE \ } -Status VParquetWriterWrapper::write(const Block& block) { +Status VParquetTransformer::write(const Block& block) { if (block.rows() == 0) { return Status::OK(); } @@ -906,7 +906,7 @@ Status VParquetWriterWrapper::write(const Block& block) { return Status::OK(); } -Status VParquetWriterWrapper::prepare() { +Status VParquetTransformer::open() { RETURN_IF_ERROR(parse_properties()); RETURN_IF_ERROR(parse_schema()); try { @@ -921,7 +921,7 @@ Status VParquetWriterWrapper::prepare() { return Status::OK(); } -parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() { +parquet::RowGroupWriter* VParquetTransformer::get_rg_writer() { if (_rg_writer == nullptr) { _rg_writer = _writer->AppendBufferedRowGroup(); } @@ -933,11 +933,11 @@ parquet::RowGroupWriter* VParquetWriterWrapper::get_rg_writer() { return _rg_writer; } -int64_t VParquetWriterWrapper::written_len() { +int64_t VParquetTransformer::written_len() { return _outstream->get_written_len(); } -Status VParquetWriterWrapper::close() { +Status VParquetTransformer::close() { try { if (_rg_writer != nullptr) { _rg_writer->Close(); diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_transformer.h similarity index 87% rename from be/src/vec/runtime/vparquet_writer.h rename to be/src/vec/runtime/vparquet_transformer.h index 36514c9fe8..1fedc3f44f 100644 --- a/be/src/vec/runtime/vparquet_writer.h +++ b/be/src/vec/runtime/vparquet_transformer.h @@ -86,18 +86,18 @@ public: }; // a wrapper of parquet output stream -class VParquetWriterWrapper final : public VFileWriterWrapper { +class VParquetTransformer final : public VFileFormatTransformer { public: - VParquetWriterWrapper(doris::io::FileWriter* file_writer, - const VExprContextSPtrs& output_vexpr_ctxs, - const std::vector<TParquetSchema>& parquet_schemas, - const TParquetCompressionType::type& compression_type, - const bool& parquet_disable_dictionary, - const TParquetVersion::type& parquet_version, bool output_object_data); + VParquetTransformer(doris::io::FileWriter* file_writer, + const VExprContextSPtrs& output_vexpr_ctxs, + const std::vector<TParquetSchema>& parquet_schemas, + const TParquetCompressionType::type& compression_type, + const bool& parquet_disable_dictionary, + const TParquetVersion::type& parquet_version, bool output_object_data); - ~VParquetWriterWrapper() = default; + ~VParquetTransformer() = default; - Status prepare() override; + Status open() override; Status write(const Block& block) override; diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index e54c426f74..0523f0ac08 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -30,25 +30,19 @@ #include "common/compiler_util.h" // IWYU pragma: keep #include "common/consts.h" #include "common/status.h" -#include "gutil/strings/numbers.h" #include "io/file_factory.h" #include "io/fs/broker_file_system.h" -#include "io/fs/file_writer.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" #include "io/fs/s3_file_system.h" #include "io/hdfs_builder.h" #include "runtime/buffer_control_block.h" -#include "runtime/decimalv2_value.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/large_int_value.h" #include "runtime/primitive_type.h" #include "runtime/runtime_state.h" -#include "runtime/types.h" #include "service/backend_options.h" -#include "util/metrics.h" -#include "util/mysql_global.h" #include "util/mysql_row_buffer.h" #include "util/s3_uri.h" #include "util/s3_util.h" @@ -57,20 +51,15 @@ #include "vec/columns/column.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" -#include "vec/common/string_ref.h" #include "vec/core/block.h" -#include "vec/core/column_with_type_and_name.h" -#include "vec/data_types/data_type.h" #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" -#include "vec/runtime/vdatetime_value.h" -#include "vec/runtime/vorc_writer.h" -#include "vec/runtime/vparquet_writer.h" +#include "vec/runtime/vcsv_transformer.h" +#include "vec/runtime/vorc_transformer.h" +#include "vec/runtime/vparquet_transformer.h" #include "vec/sink/vresult_sink.h" namespace doris::vectorized { -const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; -using doris::operator<<; VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) : AsyncResultWriter(output_exprs) {} @@ -88,8 +77,7 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, _fragment_instance_id(fragment_instance_id), _sinker(sinker), _output_block(output_block), - _output_row_descriptor(output_row_descriptor), - _vfile_writer(nullptr) { + _output_row_descriptor(output_row_descriptor) { _output_object_data = output_object_data; } @@ -158,19 +146,19 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { _file_writer_impl)); switch (_file_opts->file_format) { case TFileFormatType::FORMAT_CSV_PLAIN: - // just use file writer is enough + _vfile_writer.reset(new VCSVTransformer( + _file_writer_impl.get(), _vec_output_expr_ctxs, _output_object_data, _header_type, + _header, _file_opts->column_separator, _file_opts->line_delimiter)); break; case TFileFormatType::FORMAT_PARQUET: - _vfile_writer.reset(new VParquetWriterWrapper( + _vfile_writer.reset(new VParquetTransformer( _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->parquet_schemas, _file_opts->parquet_commpression_type, _file_opts->parquert_disable_dictionary, _file_opts->parquet_version, _output_object_data)); - RETURN_IF_ERROR(_vfile_writer->prepare()); break; case TFileFormatType::FORMAT_ORC: - _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), _vec_output_expr_ctxs, - _file_opts->orc_schema, _output_object_data)); - RETURN_IF_ERROR(_vfile_writer->prepare()); + _vfile_writer.reset(new VOrcTransformer(_file_writer_impl.get(), _vec_output_expr_ctxs, + _file_opts->orc_schema, _output_object_data)); break; default: return Status::InternalError("unsupported file format: {}", _file_opts->file_format); @@ -178,7 +166,8 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { LOG(INFO) << "create file for exporting query result. file name: " << file_name << ". query id: " << print_id(_state->query_id()) << " format:" << _file_opts->file_format; - return Status::OK(); + + return _vfile_writer->open(); } // file name format as: my_prefix_{fragment_instance_id}_0.csv @@ -187,7 +176,6 @@ Status VFileResultWriter::_get_next_file_name(std::string* file_name) { ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "." << _file_format_to_name(); *file_name = ss.str(); - _header_sent = false; if (_storage_type == TStorageBackendType::LOCAL) { // For local file writer, the file_path is a local dir. // Here we do a simple security verification by checking whether the file exists. @@ -237,215 +225,22 @@ Status VFileResultWriter::append_block(Block& block) { if (block.rows() == 0) { return Status::OK(); } - RETURN_IF_ERROR(write_csv_header()); SCOPED_TIMER(_append_row_batch_timer); Block output_block; RETURN_IF_ERROR(_projection_block(block, &output_block)); - - if (_vfile_writer) { - RETURN_IF_ERROR(_write_file(output_block)); - } else { - RETURN_IF_ERROR(_write_csv_file(output_block)); - } + RETURN_IF_ERROR(_write_file(output_block)); _written_rows += block.rows(); return Status::OK(); } Status VFileResultWriter::_write_file(const Block& block) { - RETURN_IF_ERROR(_vfile_writer->write(block)); - // split file if exceed limit - _current_written_bytes = _vfile_writer->written_len(); - return _create_new_file_if_exceed_size(); -} - -Status VFileResultWriter::_write_csv_file(const Block& block) { - for (size_t i = 0; i < block.rows(); i++) { - for (size_t col_id = 0; col_id < block.columns(); col_id++) { - auto col = block.get_by_position(col_id); - if (col.column->is_null_at(i)) { - _plain_text_outstream << NULL_IN_CSV; - } else { - switch (_vec_output_expr_ctxs[col_id]->root()->type().type) { - case TYPE_BOOLEAN: - case TYPE_TINYINT: - _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>( - col.column->get_data_at(i).data); - break; - case TYPE_SMALLINT: - _plain_text_outstream - << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data); - break; - case TYPE_INT: - _plain_text_outstream - << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data); - break; - case TYPE_BIGINT: - _plain_text_outstream - << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data); - break; - case TYPE_LARGEINT: - _plain_text_outstream - << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data); - break; - case TYPE_FLOAT: { - char buffer[MAX_FLOAT_STR_LENGTH + 2]; - float float_value = - *reinterpret_cast<const float*>(col.column->get_data_at(i).data); - buffer[0] = '\0'; - int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); - DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; - _plain_text_outstream << buffer; - break; - } - case TYPE_DOUBLE: { - // To prevent loss of precision on float and double types, - // they are converted to strings before output. - // For example: For a double value 27361919854.929001, - // the direct output of using std::stringstream is 2.73619e+10, - // and after conversion to a string, it outputs 27361919854.929001 - char buffer[MAX_DOUBLE_STR_LENGTH + 2]; - double double_value = - *reinterpret_cast<const double*>(col.column->get_data_at(i).data); - buffer[0] = '\0'; - int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); - DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; - _plain_text_outstream << buffer; - break; - } - case TYPE_DATEV2: { - char buf[64]; - const DateV2Value<DateV2ValueType>* time_val = - (const DateV2Value<DateV2ValueType>*)(col.column->get_data_at(i).data); - time_val->to_string(buf); - _plain_text_outstream << buf; - break; - } - case TYPE_DATETIMEV2: { - char buf[64]; - const DateV2Value<DateTimeV2ValueType>* time_val = - (const DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i) - .data); - time_val->to_string(buf, _vec_output_expr_ctxs[col_id]->root()->type().scale); - _plain_text_outstream << buf; - break; - } - case TYPE_DATE: - case TYPE_DATETIME: { - char buf[64]; - const VecDateTimeValue* time_val = - (const VecDateTimeValue*)(col.column->get_data_at(i).data); - time_val->to_string(buf); - _plain_text_outstream << buf; - break; - } - case TYPE_OBJECT: - case TYPE_HLL: { - if (!_output_object_data) { - _plain_text_outstream << NULL_IN_CSV; - break; - } - [[fallthrough]]; - } - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_STRING: { - auto value = col.column->get_data_at(i); - _plain_text_outstream << value; - break; - } - case TYPE_DECIMALV2: { - const DecimalV2Value decimal_val( - reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data) - ->value); - std::string decimal_str; - decimal_str = decimal_val.to_string(); - _plain_text_outstream << decimal_str; - break; - } - case TYPE_DECIMAL32: { - _plain_text_outstream << col.type->to_string(*col.column, i); - break; - } - case TYPE_DECIMAL64: { - _plain_text_outstream << col.type->to_string(*col.column, i); - break; - } - case TYPE_DECIMAL128I: { - _plain_text_outstream << col.type->to_string(*col.column, i); - break; - } - case TYPE_ARRAY: { - _plain_text_outstream << col.type->to_string(*col.column, i); - break; - } - case TYPE_MAP: { - _plain_text_outstream << col.type->to_string(*col.column, i); - break; - } - case TYPE_STRUCT: { - _plain_text_outstream << col.type->to_string(*col.column, i); - break; - } - default: { - // not supported type, like BITMAP, just export null - _plain_text_outstream << NULL_IN_CSV; - } - } - } - if (col_id < block.columns() - 1) { - _plain_text_outstream << _file_opts->column_separator; - } - } - _plain_text_outstream << _file_opts->line_delimiter; - } - - return _flush_plain_text_outstream(true); -} - -std::string VFileResultWriter::gen_types() { - std::string types; - int num_columns = _vec_output_expr_ctxs.size(); - for (int i = 0; i < num_columns; ++i) { - types += type_to_string(_vec_output_expr_ctxs[i]->root()->type().type); - if (i < num_columns - 1) { - types += _file_opts->column_separator; - } - } - types += _file_opts->line_delimiter; - return types; -} - -Status VFileResultWriter::write_csv_header() { - if (!_header_sent && _header.size() > 0) { - std::string tmp_header(_header); - if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { - tmp_header += gen_types(); - } - RETURN_IF_ERROR(_file_writer_impl->append(tmp_header)); - _header_sent = true; - } - return Status::OK(); -} - -Status VFileResultWriter::_flush_plain_text_outstream(bool eos) { - SCOPED_TIMER(_file_write_timer); - size_t pos = _plain_text_outstream.tellp(); - if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) { - return Status::OK(); + { + SCOPED_TIMER(_file_write_timer); + RETURN_IF_ERROR(_vfile_writer->write(block)); } - - const std::string& buf = _plain_text_outstream.str(); - size_t written_len = buf.size(); - RETURN_IF_ERROR(_file_writer_impl->append(buf)); - COUNTER_UPDATE(_written_data_bytes, written_len); - _current_written_bytes += written_len; - - // clear the stream - _plain_text_outstream.str(""); - _plain_text_outstream.clear(); - // split file if exceed limit + _current_written_bytes = _vfile_writer->written_len(); return _create_new_file_if_exceed_size(); } @@ -516,7 +311,7 @@ Status VFileResultWriter::_send_result() { result->result_batch.rows.resize(1); result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length()); - std::map<std::string, string> attach_infos; + std::map<std::string, std::string> attach_infos; attach_infos.insert(std::make_pair("FileNumber", std::to_string(_file_idx))); attach_infos.insert( std::make_pair("TotalRows", std::to_string(_written_rows_counter->value()))); @@ -634,6 +429,4 @@ Status VFileResultWriter::close() { return _close_file_writer(true); } -const string VFileResultWriter::NULL_IN_CSV = "\\N"; - } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index b56e41c377..69a26714dc 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -69,9 +69,6 @@ public: // file result writer always return statistic result in one row int64_t get_written_rows() const override { return 1; } - std::string gen_types(); - Status write_csv_header(); - void set_header_info(const std::string& header_type, const std::string& header) { _header_type = header_type; _header = header; @@ -80,11 +77,7 @@ public: private: Status _init(RuntimeState* state, RuntimeProfile*); Status _write_file(const Block& block); - Status _write_csv_file(const Block& block); - // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer - // if eos, write the data even if buffer is not full. - Status _flush_plain_text_outstream(bool eos); void _init_profile(RuntimeProfile*); Status _create_file_writer(const std::string& file_name); @@ -106,8 +99,6 @@ private: // delete the dir of file_path Status _delete_dir(); - static const std::string NULL_IN_CSV; - RuntimeState* _state; // not owned, set when init const ResultFileOptions* _file_opts; TStorageBackendType::type _storage_type; @@ -123,7 +114,6 @@ private: // For example: bitmap_to_string() may return large volume of data. // And the speed is relative low, in my test, is about 6.5MB/s. std::stringstream _plain_text_outstream; - static const size_t OUTSTREAM_BUFFER_SIZE_BYTES; // current written bytes, used for split data int64_t _current_written_bytes = 0; @@ -148,13 +138,11 @@ private: Block* _output_block = nullptr; // set to true if the final statistic result is sent bool _is_result_sent = false; - bool _header_sent = false; RowDescriptor _output_row_descriptor; - // parquet/orc file writer - std::unique_ptr<VFileWriterWrapper> _vfile_writer; + // convert block to parquet/orc/csv fomrat + std::unique_ptr<VFileFormatTransformer> _vfile_writer; std::string_view _header_type; std::string_view _header; - std::unique_ptr<VFileResultWriter> _writer; }; } // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org