This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 8cdccfb01a5ddd12d8ecef6f4870e8bb6235257d Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Fri Jun 10 09:15:53 2022 +0800 [feature](vectorized) Support outfile on vectorized engine (#10013) This PR supports output csv format file on vectorized engine. ** Parquet is still not supported. ** --- be/src/exec/data_sink.cpp | 33 +- be/src/vec/CMakeLists.txt | 2 + be/src/vec/runtime/vfile_result_writer.cpp | 468 +++++++++++++++++++++++++++++ be/src/vec/runtime/vfile_result_writer.h | 124 ++++++++ be/src/vec/sink/vdata_stream_sender.cpp | 35 +++ be/src/vec/sink/vdata_stream_sender.h | 13 +- be/src/vec/sink/vresult_file_sink.cpp | 203 +++++++++++++ be/src/vec/sink/vresult_file_sink.h | 70 +++++ 8 files changed, 939 insertions(+), 9 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index a0ef6336e5..a11556310a 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -38,6 +38,7 @@ #include "vec/sink/result_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vmysql_table_writer.h" +#include "vec/sink/vresult_file_sink.h" #include "vec/sink/vtablet_sink.h" #include "vec/sink/vmysql_table_sink.h" @@ -91,13 +92,35 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.result_file_sink) { return Status::InternalError("Missing result file sink."); } - // Result file sink is not the top sink - if (params.__isset.destinations && params.destinations.size() > 0) { - tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink, - params.destinations, pool, params.sender_id, desc_tbl); + + // TODO: figure out good buffer size based on size of output row + if (is_vec) { + bool send_query_statistics_with_every_batch = + params.__isset.send_query_statistics_with_every_batch + ? 
params.send_query_statistics_with_every_batch + : false; + // Result file sink is not the top sink + if (params.__isset.destinations && params.destinations.size() > 0) { + tmp_sink = new doris::vectorized::VResultFileSink( + pool, params.sender_id, row_desc, thrift_sink.result_file_sink, + params.destinations, 16 * 1024, send_query_statistics_with_every_batch, + output_exprs, desc_tbl); + } else { + tmp_sink = new doris::vectorized::VResultFileSink( + pool, row_desc, thrift_sink.result_file_sink, 16 * 1024, + send_query_statistics_with_every_batch, output_exprs); + } } else { - tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink); + // Result file sink is not the top sink + if (params.__isset.destinations && params.destinations.size() > 0) { + tmp_sink = + new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink, + params.destinations, pool, params.sender_id, desc_tbl); + } else { + tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink); + } } + sink->reset(tmp_sink); break; } diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 2d30b33f50..afc95e77a9 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -173,9 +173,11 @@ set(VEC_FILES sink/vtablet_sink.cpp sink/vmysql_table_writer.cpp sink/vmysql_table_sink.cpp + sink/vresult_file_sink.cpp runtime/vdatetime_value.cpp runtime/vdata_stream_recvr.cpp runtime/vdata_stream_mgr.cpp + runtime/vfile_result_writer.cpp runtime/vpartition_info.cpp runtime/vsorted_run_merger.cpp) diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp new file mode 100644 index 0000000000..8d70baa854 --- /dev/null +++ b/be/src/vec/runtime/vfile_result_writer.cpp @@ -0,0 +1,468 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/runtime/vfile_result_writer.h" + +#include "exprs/expr_context.h" +#include "gutil/strings/numbers.h" +#include "gutil/strings/substitute.h" +#include "exec/file_writer.h" +#include "exec/broker_writer.h" +#include "exec/hdfs_reader_writer.h" +#include "exec/local_file_writer.h" +#include "exec/s3_writer.h" +#include "runtime/buffer_control_block.h" +#include "runtime/descriptors.h" +#include "runtime/large_int_value.h" +#include "runtime/runtime_state.h" +#include "runtime/string_value.h" +#include "service/backend_options.h" +#include "util/file_utils.h" +#include "util/mysql_global.h" +#include "util/mysql_row_buffer.h" +#include "vec/core/block.h" + +namespace doris::vectorized { +const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; +using doris::operator<<; + +VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, + const TStorageBackendType::type storage_type, + const TUniqueId fragment_instance_id, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, + Block* output_block, bool output_object_data, + const RowDescriptor& output_row_descriptor) + : _file_opts(file_opts), + _storage_type(storage_type), + _fragment_instance_id(fragment_instance_id), + 
_output_expr_ctxs(output_expr_ctxs), + _parent_profile(parent_profile), + _sinker(sinker), + _output_block(output_block), + _output_row_descriptor(output_row_descriptor) { + _output_object_data = output_object_data; +} + +Status VFileResultWriter::init(RuntimeState* state) { + _state = state; + _init_profile(); + return _create_next_file_writer(); +} + +void VFileResultWriter::_init_profile() { + RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true); + _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); + _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); + _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); + _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); + _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); + _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); +} + +Status VFileResultWriter::_create_success_file() { + std::string file_name; + RETURN_IF_ERROR(_get_success_file_name(&file_name)); + RETURN_IF_ERROR(_create_file_writer(file_name)); + return _close_file_writer(true); +} + +Status VFileResultWriter::_get_success_file_name(std::string* file_name) { + std::stringstream ss; + ss << _file_opts->file_path << _file_opts->success_file_name; + *file_name = ss.str(); + if (_storage_type == TStorageBackendType::LOCAL) { + // For local file writer, the file_path is a local dir. + // Here we do a simple security verification by checking whether the file exists. + // Because the file path is currently arbitrarily specified by the user, + // Doris is not responsible for ensuring the correctness of the path. + // This is just to prevent overwriting the existing file. + if (FileUtils::check_exist(*file_name)) { + return Status::InternalError("File already exists: " + *file_name + + ". 
Host: " + BackendOptions::get_localhost()); + } + } + + return Status::OK(); +} + +Status VFileResultWriter::_create_next_file_writer() { + std::string file_name; + RETURN_IF_ERROR(_get_next_file_name(&file_name)); + return _create_file_writer(file_name); +} + +Status VFileResultWriter::_create_file_writer(const std::string& file_name) { + if (_storage_type == TStorageBackendType::LOCAL) { + _file_writer_impl.reset(new LocalFileWriter(file_name, 0 /* start offset */)); + } else if (_storage_type == TStorageBackendType::BROKER) { + _file_writer_impl.reset(new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0 /*start offset*/)); + } else if (_storage_type == TStorageBackendType::S3) { + _file_writer_impl.reset(new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */)); + } else if (_storage_type == TStorageBackendType::HDFS) { + FileWriter* tmp_writer = nullptr; + RETURN_IF_ERROR(HdfsReaderWriter::create_writer( + const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties), + file_name, &tmp_writer)); + _file_writer_impl.reset(tmp_writer); + } + + RETURN_IF_ERROR(_file_writer_impl->open()); + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + // just use file writer is enough + break; + case TFileFormatType::FORMAT_PARQUET: + return Status::NotSupported("Parquet Writer is not supported yet!"); + break; + default: + return Status::InternalError( + strings::Substitute("unsupported file format: $0", _file_opts->file_format)); + } + LOG(INFO) << "create file for exporting query result. file name: " << file_name + << ". 
query id: " << print_id(_state->query_id()) + << " format:" << _file_opts->file_format; + return Status::OK(); +} + +// file name format as: my_prefix_{fragment_instance_id}_0.csv +Status VFileResultWriter::_get_next_file_name(std::string* file_name) { + std::stringstream ss; + ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "." + << _file_format_to_name(); + *file_name = ss.str(); + _header_sent = false; + if (_storage_type == TStorageBackendType::LOCAL) { + // For local file writer, the file_path is a local dir. + // Here we do a simple security verification by checking whether the file exists. + // Because the file path is currently arbitrarily specified by the user, + // Doris is not responsible for ensuring the correctness of the path. + // This is just to prevent overwriting the existing file. + if (FileUtils::check_exist(*file_name)) { + return Status::InternalError("File already exists: " + *file_name + + ". Host: " + BackendOptions::get_localhost()); + } + } + + return Status::OK(); +} + +// file url format as: +// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_ +// S3: {file_path}{fragment_instance_id}_ +// BROKER: {file_path}{fragment_instance_id}_ + +Status VFileResultWriter::_get_file_url(std::string* file_url) { + std::stringstream ss; + if (_storage_type == TStorageBackendType::LOCAL) { + ss << "file:///" << BackendOptions::get_localhost(); + } + ss << _file_opts->file_path; + ss << print_id(_fragment_instance_id) << "_"; + *file_url = ss.str(); + return Status::OK(); +} + +std::string VFileResultWriter::_file_format_to_name() { + switch (_file_opts->file_format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + return "csv"; + case TFileFormatType::FORMAT_PARQUET: + return "parquet"; + default: + return "unknown"; + } +} + +Status VFileResultWriter::append_block(Block& block) { + if (block.rows() == 0) { + return Status::OK(); + } + SCOPED_TIMER(_append_row_batch_timer); + if (_parquet_writer 
!= nullptr) { + return Status::NotSupported("Parquet Writer is not supported yet!"); + } else { + RETURN_IF_ERROR(_write_csv_file(block)); + } + + _written_rows += block.rows(); + return Status::OK(); +} + +Status VFileResultWriter::_write_csv_file(const Block& block) { + for (size_t i = 0; i < block.rows(); i++) { + for (size_t col_id = 0; col_id < block.columns(); col_id++) { + auto col = block.get_by_position(col_id); + if (col.column->is_null_at(i)) { + _plain_text_outstream << NULL_IN_CSV; + } else { + switch (_output_expr_ctxs[col_id]->root()->type().type) { + case TYPE_BOOLEAN: + case TYPE_TINYINT: + _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>( + col.column->get_data_at(i).data); + break; + case TYPE_SMALLINT: + _plain_text_outstream + << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data); + break; + case TYPE_INT: + _plain_text_outstream + << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data); + break; + case TYPE_BIGINT: + _plain_text_outstream + << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data); + break; + case TYPE_LARGEINT: + _plain_text_outstream + << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data); + break; + case TYPE_FLOAT: { + char buffer[MAX_FLOAT_STR_LENGTH + 2]; + float float_value = + *reinterpret_cast<const float*>(col.column->get_data_at(i).data); + buffer[0] = '\0'; + int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DOUBLE: { + // To prevent loss of precision on float and double types, + // they are converted to strings before output. 
+ // For example: For a double value 27361919854.929001, + // the direct output of using std::stringstream is 2.73619e+10, + // and after conversion to a string, it outputs 27361919854.929001 + char buffer[MAX_DOUBLE_STR_LENGTH + 2]; + double double_value = + *reinterpret_cast<const double*>(col.column->get_data_at(i).data); + buffer[0] = '\0'; + int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer); + DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value; + _plain_text_outstream << buffer; + break; + } + case TYPE_DATE: + case TYPE_DATETIME: { + char buf[64]; + const VecDateTimeValue* time_val = + (const VecDateTimeValue*)(col.column->get_data_at(i).data); + time_val->to_string(buf); + _plain_text_outstream << buf; + break; + } + case TYPE_OBJECT: + case TYPE_HLL: + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + auto value = col.column->get_data_at(i); + _plain_text_outstream << value; + break; + } + case TYPE_DECIMALV2: { + const DecimalV2Value decimal_val( + reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data) + ->value); + std::string decimal_str; + int output_scale = _output_expr_ctxs[col_id]->root()->output_scale(); + decimal_str = decimal_val.to_string(output_scale); + _plain_text_outstream << decimal_str; + break; + } + default: { + // not supported type, like BITMAP, HLL, just export null + _plain_text_outstream << NULL_IN_CSV; + } + } + } + if (col_id < block.columns() - 1) { + _plain_text_outstream << _file_opts->column_separator; + } + } + _plain_text_outstream << _file_opts->line_delimiter; + } + + return _flush_plain_text_outstream(true); +} + +Status VFileResultWriter::_flush_plain_text_outstream(bool eos) { + SCOPED_TIMER(_file_write_timer); + size_t pos = _plain_text_outstream.tellp(); + if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) { + return Status::OK(); + } + + const std::string& buf = _plain_text_outstream.str(); + size_t written_len = 0; + 
RETURN_IF_ERROR(_file_writer_impl->write(reinterpret_cast<const uint8_t*>(buf.c_str()), + buf.size(), &written_len)); + COUNTER_UPDATE(_written_data_bytes, written_len); + _current_written_bytes += written_len; + + // clear the stream + _plain_text_outstream.str(""); + _plain_text_outstream.clear(); + + // split file if exceed limit + return _create_new_file_if_exceed_size(); +} + +Status VFileResultWriter::_create_new_file_if_exceed_size() { + if (_current_written_bytes < _file_opts->max_file_size_bytes) { + return Status::OK(); + } + // current file size exceed the max file size. close this file + // and create new one + { + SCOPED_TIMER(_writer_close_timer); + RETURN_IF_ERROR(_close_file_writer(false)); + } + _current_written_bytes = 0; + return Status::OK(); +} + +Status VFileResultWriter::_close_file_writer(bool done) { + if (_parquet_writer != nullptr) { + return Status::NotSupported("Parquet Writer is not supported yet!"); + } else if (_file_writer_impl) { + _file_writer_impl->close(); + } + + if (!done) { + // not finished, create new file writer for next file + RETURN_IF_ERROR(_create_next_file_writer()); + } else { + // All data is written to file, send statistic result + if (_file_opts->success_file_name != "") { + // write success file, just need to touch an empty file + RETURN_IF_ERROR(_create_success_file()); + } + if (_output_block == nullptr) { + RETURN_IF_ERROR(_send_result()); + } else { + RETURN_IF_ERROR(_fill_result_block()); + } + } + return Status::OK(); +} + +Status VFileResultWriter::_send_result() { + if (_is_result_sent) { + return Status::OK(); + } + _is_result_sent = true; + + // The final stat result includes: + // FileNumber, TotalRows, FileSize and URL + // The types of these fields should be consistent with the types defined + // in OutFileClause.java of FE. 
+ MysqlRowBuffer row_buffer; + row_buffer.push_int(_file_idx); // file number + row_buffer.push_bigint(_written_rows_counter->value()); // total rows + row_buffer.push_bigint(_written_data_bytes->value()); // file size + std::string file_url; + _get_file_url(&file_url); + row_buffer.push_string(file_url.c_str(), file_url.length()); // url + + std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>(); + result->result_batch.rows.resize(1); + result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length()); + RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result"); + return Status::OK(); +} + +Status VFileResultWriter::_fill_result_block() { + if (_is_result_sent) { + return Status::OK(); + } + _is_result_sent = true; + +#ifndef INSERT_TO_COLUMN +#define INSERT_TO_COLUMN \ + if (i == 0) { \ + column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0); \ + } else if (i == 1) { \ + int64_t written_rows = _written_rows_counter->value(); \ + column->insert_data(reinterpret_cast<const char*>(&written_rows), 0); \ + } else if (i == 2) { \ + int64_t written_data_bytes = _written_data_bytes->value(); \ + column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \ + } else if (i == 3) { \ + std::string file_url; \ + _get_file_url(&file_url); \ + column->insert_data(file_url.c_str(), file_url.size()); \ + } \ + _output_block->replace_by_position(i, std::move(column)); +#endif + + for (int i = 0; i < _output_block->columns(); i++) { + switch (_output_row_descriptor.tuple_descriptors()[0]->slots()[i]->type().type) { + case TYPE_INT: { + auto column = ColumnVector<int32_t>::create(); + INSERT_TO_COLUMN; + break; + } + case TYPE_BIGINT: { + auto column = ColumnVector<int64_t>::create(); + INSERT_TO_COLUMN; + break; + } + case TYPE_LARGEINT: { + auto column = ColumnVector<int128_t>::create(); + INSERT_TO_COLUMN; + break; + } + case TYPE_SMALLINT: { + auto column = 
ColumnVector<int16_t>::create(); + INSERT_TO_COLUMN; + break; + } + case TYPE_TINYINT: { + auto column = ColumnVector<int8_t>::create(); + INSERT_TO_COLUMN; + break; + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + auto column = ColumnVector<int8_t>::create(); + INSERT_TO_COLUMN; + break; + } + default: + return Status::InternalError(strings::Substitute( + "Invalid type to print: $0", + _output_row_descriptor.tuple_descriptors()[0]->slots()[i]->type().type)); + } + } + return Status::OK(); +} + +Status VFileResultWriter::close() { + // The following 2 profile counters, "_written_rows_counter" and "_writer_close_timer", + // must be updated outside `_close_file_writer()`, + // because `_close_file_writer()` may be called in the destructor; + // at that time, the RuntimeState may already have been destroyed, + // and so may the profile in RuntimeState. + COUNTER_SET(_written_rows_counter, _written_rows); + SCOPED_TIMER(_writer_close_timer); + return _close_file_writer(true); +} + +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h new file mode 100644 index 0000000000..b7fd2cd737 --- /dev/null +++ b/be/src/vec/runtime/vfile_result_writer.h @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "exec/file_writer.h" +#include "runtime/file_result_writer.h" +#include "vec/sink/result_sink.h" + +namespace doris { + +namespace vectorized { +// write result to file +class VFileResultWriter final : public VResultWriter { +public: + VFileResultWriter(const ResultFileOptions* file_option, + const TStorageBackendType::type storage_type, + const TUniqueId fragment_instance_id, + const std::vector<ExprContext*>& output_expr_ctxs, + RuntimeProfile* parent_profile, BufferControlBlock* sinker, + Block* output_block, bool output_object_data, + const RowDescriptor& output_row_descriptor); + virtual ~VFileResultWriter() = default; + + virtual Status append_block(Block& block) override; + virtual Status append_row_batch(const RowBatch* batch) override { + return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!"); + }; + + virtual Status init(RuntimeState* state) override; + virtual Status close() override; + + // file result writer always return statistic result in one row + virtual int64_t get_written_rows() const override { return 1; } + +private: + Status _write_csv_file(const Block& block); + + // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer + // if eos, write the data even if buffer is not full. + Status _flush_plain_text_outstream(bool eos); + void _init_profile(); + + Status _create_file_writer(const std::string& file_name); + Status _create_next_file_writer(); + Status _create_success_file(); + // get next export file name + Status _get_next_file_name(std::string* file_name); + Status _get_success_file_name(std::string* file_name); + Status _get_file_url(std::string* file_url); + std::string _file_format_to_name(); + // close file writer, and if !done, it will create new writer for next file. 
+ // if only_close is true, this method will just close the file writer and return. + Status _close_file_writer(bool done); + // create a new file if the current file size exceeds the limit + Status _create_new_file_if_exceed_size(); + // send the final statistic result + Status _send_result(); + // save result into batch rather than send it + Status _fill_result_block(); + + RuntimeState* _state; // not owned, set when init + const ResultFileOptions* _file_opts; + TStorageBackendType::type _storage_type; + TUniqueId _fragment_instance_id; + const std::vector<ExprContext*>& _output_expr_ctxs; + + // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. + // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. + std::unique_ptr<FileWriter> _file_writer_impl; + // parquet file writer + ParquetWriterWrapper* _parquet_writer = nullptr; + // Used to buffer the export data of plain text + // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling + // file writer's write() for every single row. + // But this cannot solve the problem of a row of data that is too large. + // For example: bitmap_to_string() may return a large volume of data. + // And the speed is relatively low: in my test, it is about 6.5MB/s. 
+ std::stringstream _plain_text_outstream; + static const size_t OUTSTREAM_BUFFER_SIZE_BYTES; + + // current written bytes, used for split data + int64_t _current_written_bytes = 0; + // the suffix idx of export file name, start at 0 + int _file_idx = 0; + + RuntimeProfile* _parent_profile; // profile from result sink, not owned + // total time cost on append batch operation + RuntimeProfile::Counter* _append_row_batch_timer = nullptr; + // tuple convert timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _convert_tuple_timer = nullptr; + // file write timer, child timer of _append_row_batch_timer + RuntimeProfile::Counter* _file_write_timer = nullptr; + // time of closing the file writer + RuntimeProfile::Counter* _writer_close_timer = nullptr; + // number of written rows + RuntimeProfile::Counter* _written_rows_counter = nullptr; + // bytes of written data + RuntimeProfile::Counter* _written_data_bytes = nullptr; + + // _sinker and _output_block are not owned by FileResultWriter + BufferControlBlock* _sinker = nullptr; + Block* _output_block = nullptr; + // set to true if the final statistic result is sent + bool _is_result_sent = false; + bool _header_sent = false; + RowDescriptor _output_row_descriptor; +}; +} // namespace vectorized +} // namespace doris diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 249f70ac0f..cbadfc931f 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -290,6 +290,41 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD _name = "VDataStreamSender"; } +VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, + const std::vector<TPlanFragmentDestination>& destinations, + int per_channel_buffer_size, + bool send_query_statistics_with_every_batch) + : _sender_id(sender_id), + _pool(pool), + _row_desc(row_desc), + _current_channel_idx(0), + 
_ignore_not_found(true), + _cur_pb_block(&_pb_block1), + _profile(nullptr), + _serialize_batch_timer(nullptr), + _bytes_sent_counter(nullptr), + _local_bytes_send_counter(nullptr), + _dest_node_id(0) { + _name = "VDataStreamSender"; +} + +VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, + int per_channel_buffer_size, + bool send_query_statistics_with_every_batch) + : _sender_id(0), + _pool(pool), + _row_desc(row_desc), + _current_channel_idx(0), + _ignore_not_found(true), + _cur_pb_block(&_pb_block1), + _profile(nullptr), + _serialize_batch_timer(nullptr), + _bytes_sent_counter(nullptr), + _local_bytes_send_counter(nullptr), + _dest_node_id(0) { + _name = "VDataStreamSender"; +} + VDataStreamSender::~VDataStreamSender() { _channel_shared_ptrs.clear(); } diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index d62ebeb684..67faae452b 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -45,13 +45,20 @@ namespace vectorized { class VExprContext; class VPartitionInfo; -class VDataStreamSender final : public DataSink { +class VDataStreamSender : public DataSink { public: VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TDataStreamSink& sink, const std::vector<TPlanFragmentDestination>& destinations, int per_channel_buffer_size, bool send_query_statistics_with_every_batch); + VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, + const std::vector<TPlanFragmentDestination>& destinations, + int per_channel_buffer_size, bool send_query_statistics_with_every_batch); + + VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size, + bool send_query_statistics_with_every_batch); + ~VDataStreamSender(); virtual Status init(const TDataSink& thrift_sink) override; @@ -69,10 +76,8 @@ public: Status serialize_block(Block* src, PBlock* dest, int num_receivers = 
1); -private: +protected: void _roll_pb_block(); - -private: class Channel; Status get_partition_column_result(Block* block, int* result) const { diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp new file mode 100644 index 0000000000..6d8d994585 --- /dev/null +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/sink/vresult_file_sink.h" + +#include "common/config.h" +#include "exprs/expr.h" +#include "runtime/buffer_control_block.h" +#include "runtime/exec_env.h" +#include "runtime/file_result_writer.h" +#include "runtime/result_buffer_mgr.h" +#include "runtime/result_file_sink.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "util/uid_util.h" +#include "vec/runtime/vfile_result_writer.h" + +namespace doris::vectorized { + +VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, + const TResultFileSink& sink, int per_channel_buffer_size, + bool send_query_statistics_with_every_batch, + const std::vector<TExpr>& t_output_expr) + : VDataStreamSender(pool, row_desc, per_channel_buffer_size, + send_query_statistics_with_every_batch), + _t_output_expr(t_output_expr) { + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + CHECK(sink.__isset.storage_backend_type); + _storage_type = sink.storage_backend_type; + _is_top_sink = true; + + _name = "VResultFileSink"; +} + +VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, + const TResultFileSink& sink, + const std::vector<TPlanFragmentDestination>& destinations, + int per_channel_buffer_size, + bool send_query_statistics_with_every_batch, + const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs) + : VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size, + send_query_statistics_with_every_batch), + _t_output_expr(t_output_expr), + _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) { + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + CHECK(sink.__isset.storage_backend_type); + _storage_type = sink.storage_backend_type; + _is_top_sink = false; + DCHECK_EQ(destinations.size(), 1); + _channel_shared_ptrs.emplace_back(new Channel( + this, _output_row_descriptor, 
destinations[0].brpc_server, + destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true)); + _channels.push_back(_channel_shared_ptrs.back().get()); + + _name = "VResultFileSink"; +} + +Status VResultFileSink::init(const TDataSink& tsink) { + return Status::OK(); +} + +Status VResultFileSink::prepare_exprs(RuntimeState* state) { + // From the thrift expressions create the real exprs. + RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs)); + // Prepare the exprs to run. + RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker)); + return Status::OK(); +} + +Status VResultFileSink::prepare(RuntimeState* state) { + RETURN_IF_ERROR(DataSink::prepare(state)); + std::stringstream title; + title << "VResultFileSink (fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ")"; + // create profile + _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); + // prepare output_expr + RETURN_IF_ERROR(prepare_exprs(state)); + + CHECK(_file_opts.get() != nullptr); + if (_is_top_sink) { + // create sender + RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender( + state->fragment_instance_id(), _buf_size, &_sender)); + // create writer + _writer.reset(new (std::nothrow) VFileResultWriter( + _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, + _profile, _sender.get(), nullptr, state->return_object_data_as_binary(), + _output_row_descriptor)); + } else { + // init channel + _profile = _pool->add(new RuntimeProfile(title.str())); + _state = state; + _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime"); + _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES); + _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES); + _uncompressed_bytes_counter = + ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES); + // create writer + _output_block.reset(new 
Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1)); + _writer.reset(new (std::nothrow) VFileResultWriter( + _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs, + _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(), + _output_row_descriptor)); + } + RETURN_IF_ERROR(_writer->init(state)); + for (int i = 0; i < _channels.size(); ++i) { + RETURN_IF_ERROR(_channels[i]->init(state)); + } + return Status::OK(); +} + +Status VResultFileSink::open(RuntimeState* state) { + return Expr::open(_output_expr_ctxs, state); +} + +Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) { + return Status::NotSupported("Not Implemented VResultFileSink Node::get_next scalar"); +} + +Status VResultFileSink::send(RuntimeState* state, Block* block) { + RETURN_IF_ERROR(_writer->append_block(*block)); + return Status::OK(); +} + +Status VResultFileSink::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } + + Status final_status = exec_status; + // close the writer + if (_writer) { + Status st = _writer->close(); + if (!st.ok() && exec_status.ok()) { + // close file writer failed, should return this error to client + final_status = st; + } + } + if (_is_top_sink) { + // close sender, this is normal path end + if (_sender) { + _sender->update_num_written_rows(_writer == nullptr ? 
0 : _writer->get_written_rows()); + _sender->close(final_status); + } + state->exec_env()->result_mgr()->cancel_at_time( + time(nullptr) + config::result_buffer_cancelled_interval_time, + state->fragment_instance_id()); + } else { + if (final_status.ok()) { + RETURN_IF_ERROR(serialize_block(_output_block.get(), _cur_pb_block, _channels.size())); + for (auto channel : _channels) { + RETURN_IF_ERROR(channel->send_block(_cur_pb_block)); + } + } + Status final_st = Status::OK(); + for (int i = 0; i < _channels.size(); ++i) { + Status st = _channels[i]->close(state); + if (!st.ok() && final_st.ok()) { + final_st = st; + } + } + // wait all channels to finish + for (int i = 0; i < _channels.size(); ++i) { + Status st = _channels[i]->close_wait(state); + if (!st.ok() && final_st.ok()) { + final_st = st; + } + } + _output_block->clear(); + } + + Expr::close(_output_expr_ctxs, state); + + _closed = true; + return Status::OK(); +} + +void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) { + if (_is_top_sink) { + _sender->set_query_statistics(statistics); + } else { + _query_statistics = statistics; + } +} + +} // namespace doris::vectorized diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h new file mode 100644 index 0000000000..e924883b42 --- /dev/null +++ b/be/src/vec/sink/vresult_file_sink.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "runtime/result_file_sink.h" +#include "vec/sink/vdata_stream_sender.h" + +namespace doris { +namespace vectorized { +class VResultWriter; + +class VResultFileSink : public VDataStreamSender { +public: + VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink, + int per_channel_buffer_size, bool send_query_statistics_with_every_batch, + const std::vector<TExpr>& t_output_expr); + VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, + const TResultFileSink& sink, + const std::vector<TPlanFragmentDestination>& destinations, + int per_channel_buffer_size, bool send_query_statistics_with_every_batch, + const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs); + virtual ~VResultFileSink() = default; + virtual Status init(const TDataSink& thrift_sink) override; + virtual Status prepare(RuntimeState* state) override; + virtual Status open(RuntimeState* state) override; + // send data in 'batch' to this backend stream mgr + // Blocks until all rows in batch are placed in the buffer + virtual Status send(RuntimeState* state, RowBatch* batch) override; + virtual Status send(RuntimeState* state, Block* block) override; + // Flush all buffered data and close all existing channels to destination + // hosts. Further send() calls are illegal after calling close(). 
+ virtual Status close(RuntimeState* state, Status exec_status) override; + virtual RuntimeProfile* profile() override { return _profile; } + + void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override; + +private: + Status prepare_exprs(RuntimeState* state); + // set file options when sink type is FILE + std::unique_ptr<ResultFileOptions> _file_opts; + TStorageBackendType::type _storage_type; + + // Owned by the RuntimeState. + const std::vector<TExpr>& _t_output_expr; + std::vector<ExprContext*> _output_expr_ctxs; + RowDescriptor _output_row_descriptor; + + std::unique_ptr<Block> _output_block = nullptr; + std::shared_ptr<BufferControlBlock> _sender; + std::shared_ptr<VResultWriter> _writer; + int _buf_size = 1024; // Allocated from _pool + bool _is_top_sink = true; +}; +} // namespace vectorized +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org