This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new c74ca15753 [pipeline](sink) Support Async Writer Sink of result file sink and memory scratch sink (#23589) c74ca15753 is described below commit c74ca157530a8b10bf24324f957df67e273599a4 Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Aug 31 22:44:25 2023 +0800 [pipeline](sink) Support Async Writer Sink of result file sink and memory scratch sink (#23589) --- be/src/exec/data_sink.cpp | 22 +++---- be/src/exec/data_sink.h | 4 +- be/src/runtime/record_batch_queue.h | 2 + be/src/runtime/result_writer.h | 11 +--- .../runtime/vfile_writer_wrapper.h} | 35 ++++++++-- be/src/vec/runtime/vparquet_writer.h | 30 +-------- .../sink/{vtable_sink.h => async_writer_sink.h} | 34 ++++------ be/src/vec/sink/multi_cast_data_stream_sink.h | 6 +- be/src/vec/sink/vdata_stream_sender.cpp | 2 - be/src/vec/sink/vdata_stream_sender.h | 2 - be/src/vec/sink/vmemory_scratch_sink.cpp | 4 ++ be/src/vec/sink/vmemory_scratch_sink.h | 2 +- be/src/vec/sink/vresult_file_sink.cpp | 77 ++++++---------------- be/src/vec/sink/vresult_file_sink.h | 25 +++---- be/src/vec/sink/vresult_sink.cpp | 1 + be/src/vec/sink/vresult_sink.h | 4 +- be/src/vec/sink/vtablet_sink.h | 5 -- be/src/vec/sink/vtablet_sink_v2.h | 5 -- be/src/vec/sink/writer/async_result_writer.h | 4 +- .../writer}/vfile_result_writer.cpp | 51 +++++++------- .../{runtime => sink/writer}/vfile_result_writer.h | 34 +++++++--- be/src/vec/sink/writer/vjdbc_table_writer.h | 2 + be/src/vec/sink/writer/vmysql_table_writer.cpp | 4 +- be/src/vec/sink/writer/vmysql_table_writer.h | 4 +- be/src/vec/sink/writer/vodbc_table_writer.h | 3 +- 25 files changed, 159 insertions(+), 214 deletions(-) diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 53f44dd590..5c40475eeb 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -30,12 +30,12 @@ #include <string> #include "common/config.h" +#include "vec/sink/async_writer_sink.h" #include 
"vec/sink/multi_cast_data_stream_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vmemory_scratch_sink.h" #include "vec/sink/vresult_file_sink.h" #include "vec/sink/vresult_sink.h" -#include "vec/sink/vtable_sink.h" #include "vec/sink/vtablet_sink.h" #include "vec/sink/vtablet_sink_v2.h" @@ -92,9 +92,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink params.destinations, send_query_statistics_with_every_batch, output_exprs, desc_tbl)); } else { - sink->reset(new doris::vectorized::VResultFileSink( - state, pool, row_desc, thrift_sink.result_file_sink, - send_query_statistics_with_every_batch, output_exprs)); + sink->reset(new doris::vectorized::VResultFileSink(row_desc, output_exprs)); } break; } @@ -112,7 +110,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } vectorized::VMysqlTableSink* vmysql_tbl_sink = - new vectorized::VMysqlTableSink(pool, row_desc, output_exprs); + new vectorized::VMysqlTableSink(row_desc, output_exprs); sink->reset(vmysql_tbl_sink); break; #else @@ -124,7 +122,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.odbc_table_sink) { return Status::InternalError("Missing data odbc sink."); } - sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs)); + sink->reset(new vectorized::VOdbcTableSink(row_desc, output_exprs)); break; } @@ -133,7 +131,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data jdbc sink."); } if (config::enable_java_support) { - sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, output_exprs)); + sink->reset(new vectorized::VJdbcTableSink(row_desc, output_exprs)); } else { return Status::InternalError( "Jdbc table sink is not enabled, you can change be config " @@ -234,9 +232,7 @@ Status 
DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink params.destinations, send_query_statistics_with_every_batch, output_exprs, desc_tbl)); } else { - sink->reset(new doris::vectorized::VResultFileSink( - state, pool, row_desc, thrift_sink.result_file_sink, - send_query_statistics_with_every_batch, output_exprs)); + sink->reset(new doris::vectorized::VResultFileSink(row_desc, output_exprs)); } break; } @@ -254,7 +250,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data buffer sink."); } vectorized::VMysqlTableSink* vmysql_tbl_sink = - new vectorized::VMysqlTableSink(pool, row_desc, output_exprs); + new vectorized::VMysqlTableSink(row_desc, output_exprs); sink->reset(vmysql_tbl_sink); break; #else @@ -266,7 +262,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink if (!thrift_sink.__isset.odbc_table_sink) { return Status::InternalError("Missing data odbc sink."); } - sink->reset(new vectorized::VOdbcTableSink(pool, row_desc, output_exprs)); + sink->reset(new vectorized::VOdbcTableSink(row_desc, output_exprs)); break; } @@ -275,7 +271,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink return Status::InternalError("Missing data jdbc sink."); } if (config::enable_java_support) { - sink->reset(new vectorized::VJdbcTableSink(pool, row_desc, output_exprs)); + sink->reset(new vectorized::VJdbcTableSink(row_desc, output_exprs)); } else { return Status::InternalError( "Jdbc table sink is not enabled, you can change be config " diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index 8200ff5d4c..778ee2da83 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -104,7 +104,7 @@ public: DescriptorTbl& desc_tbl); // Returns the runtime profile for the sink. 
- virtual RuntimeProfile* profile() = 0; + RuntimeProfile* profile() { return _profile; } virtual void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) { _query_statistics = statistics; @@ -121,6 +121,8 @@ protected: std::string _name; const RowDescriptor& _row_desc; + RuntimeProfile* _profile = nullptr; // Allocated from _pool + // Maybe this will be transferred to BufferControlBlock. std::shared_ptr<QueryStatistics> _query_statistics; diff --git a/be/src/runtime/record_batch_queue.h b/be/src/runtime/record_batch_queue.h index 7ababc9b61..7528b85f09 100644 --- a/be/src/runtime/record_batch_queue.h +++ b/be/src/runtime/record_batch_queue.h @@ -59,6 +59,8 @@ public: // Shut down the queue. Wakes up all threads waiting on blocking_get or blocking_put. void shutdown(); + size_t size() { return _queue.get_size(); } + private: BlockingQueue<std::shared_ptr<arrow::RecordBatch>> _queue; SpinLock _status_lock; diff --git a/be/src/runtime/result_writer.h b/be/src/runtime/result_writer.h index a1458f1a71..b6cdd10c3a 100644 --- a/be/src/runtime/result_writer.h +++ b/be/src/runtime/result_writer.h @@ -34,7 +34,6 @@ class RuntimeState; class ResultWriter { public: ResultWriter() = default; - ResultWriter(bool output_object_data) : _output_object_data(output_object_data) {} virtual ~ResultWriter() = default; virtual Status init(RuntimeState* state) = 0; @@ -43,7 +42,7 @@ public: virtual int64_t get_written_rows() const { return _written_rows; } - virtual bool output_object_data() const { return _output_object_data; } + bool output_object_data() const { return _output_object_data; } virtual Status append_block(vectorized::Block& block) = 0; @@ -53,17 +52,9 @@ public: _output_object_data = output_object_data; } - static const std::string NULL_IN_CSV; - virtual void set_header_info(const std::string& header_type, const std::string& header) { - _header_type = header_type; - _header = header; - } - protected: int64_t _written_rows = 0; // number of rows written bool 
_output_object_data = false; - std::string _header_type; - std::string _header; }; } // namespace doris diff --git a/be/src/runtime/result_writer.cpp b/be/src/vec/runtime/vfile_writer_wrapper.h similarity index 50% rename from be/src/runtime/result_writer.cpp rename to be/src/vec/runtime/vfile_writer_wrapper.h index b5537e486c..e418a14ffa 100644 --- a/be/src/runtime/result_writer.cpp +++ b/be/src/vec/runtime/vfile_writer_wrapper.h @@ -15,12 +15,37 @@ // specific language governing permissions and limitations // under the License. -#include "runtime/result_writer.h" +#pragma once -namespace doris { +#include <memory> +#include <vector> -const std::string ResultWriter::NULL_IN_CSV = "\\N"; +#include "common/status.h" +#include "vec/core/block.h" +#include "vec/exprs/vexpr_fwd.h" -} +namespace doris::vectorized { -/* vim: set ts=4 sw=4 sts=4 tw=100 expandtab : */ +class VFileWriterWrapper { +public: + VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data) + : _output_vexpr_ctxs(output_vexpr_ctxs), + _cur_written_rows(0), + _output_object_data(output_object_data) {} + + virtual ~VFileWriterWrapper() = default; + + virtual Status prepare() = 0; + + virtual Status write(const Block& block) = 0; + + virtual Status close() = 0; + + virtual int64_t written_len() = 0; + +protected: + const VExprContextSPtrs& _output_vexpr_ctxs; + int64_t _cur_written_rows; + bool _output_object_data; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/runtime/vparquet_writer.h b/be/src/vec/runtime/vparquet_writer.h index 22410b5d06..36514c9fe8 100644 --- a/be/src/vec/runtime/vparquet_writer.h +++ b/be/src/vec/runtime/vparquet_writer.h @@ -26,12 +26,7 @@ #include <parquet/types.h> #include <stdint.h> -#include <memory> -#include <vector> - -#include "common/status.h" -#include "vec/core/block.h" -#include "vec/exprs/vexpr_fwd.h" +#include "vfile_writer_wrapper.h" namespace doris { namespace io { @@ -90,29 +85,6 @@ public: const TypeDescriptor& 
type_desc); }; -class VFileWriterWrapper { -public: - VFileWriterWrapper(const VExprContextSPtrs& output_vexpr_ctxs, bool output_object_data) - : _output_vexpr_ctxs(output_vexpr_ctxs), - _cur_written_rows(0), - _output_object_data(output_object_data) {} - - virtual ~VFileWriterWrapper() = default; - - virtual Status prepare() = 0; - - virtual Status write(const Block& block) = 0; - - virtual Status close() = 0; - - virtual int64_t written_len() = 0; - -protected: - const VExprContextSPtrs& _output_vexpr_ctxs; - int64_t _cur_written_rows; - bool _output_object_data; -}; - // a wrapper of parquet output stream class VParquetWriterWrapper final : public VFileWriterWrapper { public: diff --git a/be/src/vec/sink/vtable_sink.h b/be/src/vec/sink/async_writer_sink.h similarity index 75% rename from be/src/vec/sink/vtable_sink.h rename to be/src/vec/sink/async_writer_sink.h index fc41faa175..8eb177ce8e 100644 --- a/be/src/vec/sink/vtable_sink.h +++ b/be/src/vec/sink/async_writer_sink.h @@ -42,10 +42,10 @@ namespace vectorized { class Block; template <typename Writer, const char* Name> -class VTableSink : public DataSink { +class AsyncWriterSink : public DataSink { public: - VTableSink(ObjectPool* pool, const RowDescriptor& row_desc, const std::vector<TExpr>& t_exprs) - : DataSink(row_desc), _pool(pool), _t_output_expr(t_exprs) { + AsyncWriterSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_exprs) + : DataSink(row_desc), _t_output_expr(t_exprs) { _name = Name; } @@ -87,14 +87,12 @@ public: return _writer->sink(block, eos); } - RuntimeProfile* profile() override { return _profile; } - bool can_write() override { return _writer->can_write(); } Status close(RuntimeState* state, Status exec_status) override { if (_writer->need_normal_close()) { if (exec_status.ok() && !state->is_cancelled()) { - RETURN_IF_ERROR(_writer->finish_trans()); + RETURN_IF_ERROR(_writer->commit_trans()); } RETURN_IF_ERROR(_writer->close()); } @@ -111,37 +109,31 @@ public: bool 
is_close_done() override { return !_writer->is_pending_finish(); } protected: - // owned by RuntimeState - ObjectPool* _pool; const std::vector<TExpr>& _t_output_expr; VExprContextSPtrs _output_vexpr_ctxs; std::unique_ptr<Writer> _writer; - RuntimeProfile* _profile; }; inline constexpr char VJDBC_TABLE_SINK_NAME[] = "VJdbcTableSink"; inline constexpr char VODBC_TABLE_SINK_NAME[] = "VOdbcTableSink"; inline constexpr char VMYSQL_TABLE_SINK_NAME[] = "VMysqlTableSink"; -class VJdbcTableSink : public VTableSink<VJdbcTableWriter, VJDBC_TABLE_SINK_NAME> { +class VJdbcTableSink : public AsyncWriterSink<VJdbcTableWriter, VJDBC_TABLE_SINK_NAME> { public: - VJdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& t_exprs) - : VTableSink<VJdbcTableWriter, VJDBC_TABLE_SINK_NAME>(pool, row_desc, t_exprs) {}; + VJdbcTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_exprs) + : AsyncWriterSink<VJdbcTableWriter, VJDBC_TABLE_SINK_NAME>(row_desc, t_exprs) {}; }; -class VOdbcTableSink : public VTableSink<VOdbcTableWriter, VODBC_TABLE_SINK_NAME> { +class VOdbcTableSink : public AsyncWriterSink<VOdbcTableWriter, VODBC_TABLE_SINK_NAME> { public: - VOdbcTableSink(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& t_exprs) - : VTableSink<VOdbcTableWriter, VODBC_TABLE_SINK_NAME>(pool, row_desc, t_exprs) {}; + VOdbcTableSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_exprs) + : AsyncWriterSink<VOdbcTableWriter, VODBC_TABLE_SINK_NAME>(row_desc, t_exprs) {}; }; -class VMysqlTableSink : public VTableSink<VMysqlTableWriter, VMYSQL_TABLE_SINK_NAME> { +class VMysqlTableSink : public AsyncWriterSink<VMysqlTableWriter, VMYSQL_TABLE_SINK_NAME> { public: - VMysqlTableSink(ObjectPool* pool, const RowDescriptor& row_desc, - const std::vector<TExpr>& t_exprs) - : VTableSink<VMysqlTableWriter, VMYSQL_TABLE_SINK_NAME>(pool, row_desc, t_exprs) {}; + VMysqlTableSink(const RowDescriptor& row_desc, const 
std::vector<TExpr>& t_exprs) + : AsyncWriterSink<VMysqlTableWriter, VMYSQL_TABLE_SINK_NAME>(row_desc, t_exprs) {}; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/multi_cast_data_stream_sink.h b/be/src/vec/sink/multi_cast_data_stream_sink.h index df210d74ff..364586cad0 100644 --- a/be/src/vec/sink/multi_cast_data_stream_sink.h +++ b/be/src/vec/sink/multi_cast_data_stream_sink.h @@ -25,7 +25,9 @@ namespace doris::vectorized { class MultiCastDataStreamSink : public DataSink { public: MultiCastDataStreamSink(std::shared_ptr<pipeline::MultiCastDataStreamer>& streamer) - : DataSink(streamer->row_desc()), _multi_cast_data_streamer(streamer) {}; + : DataSink(streamer->row_desc()), _multi_cast_data_streamer(streamer) { + _profile = _multi_cast_data_streamer->profile(); + }; ~MultiCastDataStreamSink() override = default; @@ -39,8 +41,6 @@ public: // use sink to check can_write, now always true after we support spill to disk bool can_write() override { return _multi_cast_data_streamer->can_write(); } - RuntimeProfile* profile() override { return _multi_cast_data_streamer->profile(); } - std::shared_ptr<pipeline::MultiCastDataStreamer>& get_multi_cast_data_streamer() { return _multi_cast_data_streamer; } diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 4c547c7840..23194db6d9 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -300,7 +300,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(sink.output_partition.type), - _profile(nullptr), _serialize_batch_timer(nullptr), _bytes_sent_counter(nullptr), _local_send_timer(nullptr), @@ -365,7 +364,6 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int _pool(pool), _current_channel_idx(0), _part_type(TPartitionType::UNPARTITIONED), - _profile(nullptr), _serialize_batch_timer(nullptr), 
_compress_timer(nullptr), _brpc_send_timer(nullptr), diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index a7cceb1385..a3e4ccf2a6 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -117,7 +117,6 @@ public: Status send(RuntimeState* state, Block* block, bool eos = false) override; Status try_close(RuntimeState* state, Status exec_status) override; Status close(RuntimeState* state, Status exec_status) override; - RuntimeProfile* profile() override { return _profile; } RuntimeState* state() { return _state; } @@ -192,7 +191,6 @@ protected: std::vector<Channel<VDataStreamSender>*> _channels; std::vector<std::shared_ptr<Channel<VDataStreamSender>>> _channel_shared_ptrs; - RuntimeProfile* _profile; // Allocated from _pool RuntimeProfile::Counter* _serialize_batch_timer; RuntimeProfile::Counter* _compress_timer; RuntimeProfile::Counter* _brpc_send_timer; diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp b/be/src/vec/sink/vmemory_scratch_sink.cpp index 6e9f5ca743..9b65838a3a 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.cpp +++ b/be/src/vec/sink/vmemory_scratch_sink.cpp @@ -90,6 +90,10 @@ Status MemoryScratchSink::open(RuntimeState* state) { return VExpr::open(_output_vexpr_ctxs, state); } +bool MemoryScratchSink::can_write() { + return _queue->size() < 10; +} + Status MemoryScratchSink::close(RuntimeState* state, Status exec_status) { if (_closed) { return Status::OK(); diff --git a/be/src/vec/sink/vmemory_scratch_sink.h b/be/src/vec/sink/vmemory_scratch_sink.h index c6306c481a..848952b1d7 100644 --- a/be/src/vec/sink/vmemory_scratch_sink.h +++ b/be/src/vec/sink/vmemory_scratch_sink.h @@ -60,7 +60,7 @@ public: Status close(RuntimeState* state, Status exec_status) override; - RuntimeProfile* profile() override { return _profile; } + bool can_write() override; private: Status _prepare_vexpr(RuntimeState* state); diff --git a/be/src/vec/sink/vresult_file_sink.cpp 
b/be/src/vec/sink/vresult_file_sink.cpp index e30585419b..66d6414081 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -18,7 +18,6 @@ #include "vec/sink/vresult_file_sink.h" #include <gen_cpp/DataSinks_types.h> -#include <gen_cpp/PaloInternalService_types.h> #include <glog/logging.h> #include <opentelemetry/nostd/shared_ptr.h> #include <time.h> @@ -29,14 +28,10 @@ #include "common/config.h" #include "common/object_pool.h" #include "runtime/buffer_control_block.h" -#include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" #include "runtime/runtime_state.h" -#include "util/runtime_profile.h" #include "util/telemetry/telemetry.h" -#include "util/uid_util.h" #include "vec/exprs/vexpr.h" -#include "vec/runtime/vfile_result_writer.h" namespace doris { class QueryStatistics; @@ -45,71 +40,45 @@ class TExpr; namespace doris::vectorized { -VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, - const RowDescriptor& row_desc, const TResultFileSink& sink, - bool send_query_statistics_with_every_batch, +VResultFileSink::VResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr) - : DataSink(row_desc), _t_output_expr(t_output_expr) { - CHECK(sink.__isset.file_options); - _file_opts.reset(new ResultFileOptions(sink.file_options)); - CHECK(sink.__isset.storage_backend_type); - _storage_type = sink.storage_backend_type; - _is_top_sink = true; - - _name = "VResultFileSink"; - //for impl csv_with_name and csv_with_names_and_types - _header_type = sink.header_type; - _header = sink.header; -} + : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc, t_output_expr) {} VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector<TPlanFragmentDestination>& destinations, bool send_query_statistics_with_every_batch, const std::vector<TExpr>& t_output_expr, DescriptorTbl& 
descs) - : DataSink(row_desc), - _t_output_expr(t_output_expr), + : AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK>(row_desc, t_output_expr), _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) { - CHECK(sink.__isset.file_options); - _file_opts.reset(new ResultFileOptions(sink.file_options)); - CHECK(sink.__isset.storage_backend_type); - _storage_type = sink.storage_backend_type; _is_top_sink = false; CHECK_EQ(destinations.size(), 1); _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id, destinations, send_query_statistics_with_every_batch)); - - _name = "VResultFileSink"; - //for impl csv_with_name and csv_with_names_and_types - _header_type = sink.header_type; - _header = sink.header; } Status VResultFileSink::init(const TDataSink& tsink) { if (!_is_top_sink) { RETURN_IF_ERROR(_stream_sender->init(tsink)); } - return Status::OK(); -} -Status VResultFileSink::prepare_exprs(RuntimeState* state) { - // From the thrift expressions create the real exprs. - RETURN_IF_ERROR(VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs)); - // Prepare the exprs to run. 
- RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc)); - return Status::OK(); + auto& sink = tsink.result_file_sink; + CHECK(sink.__isset.file_options); + _file_opts.reset(new ResultFileOptions(sink.file_options)); + CHECK(sink.__isset.storage_backend_type); + _storage_type = sink.storage_backend_type; + + _name = "VResultFileSink"; + //for impl csv_with_name and csv_with_names_and_types + _header_type = sink.header_type; + _header = sink.header; + + return VExpr::create_expr_trees(_t_output_expr, _output_vexpr_ctxs); } Status VResultFileSink::prepare(RuntimeState* state) { - RETURN_IF_ERROR(DataSink::prepare(state)); - std::stringstream title; - title << "VResultFileSink (fragment_instance_id=" << print_id(state->fragment_instance_id()) - << ")"; - // create profile - _profile = state->obj_pool()->add(new RuntimeProfile(title.str())); - // prepare output_expr - RETURN_IF_ERROR(prepare_exprs(state)); + RETURN_IF_ERROR(AsyncWriterSink::prepare(state)); CHECK(_file_opts.get() != nullptr); if (_is_top_sink) { @@ -120,7 +89,7 @@ Status VResultFileSink::prepare(RuntimeState* state) { // create writer _writer.reset(new (std::nothrow) VFileResultWriter( _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, - _profile, _sender.get(), nullptr, state->return_object_data_as_binary(), + _sender.get(), nullptr, state->return_object_data_as_binary(), _output_row_descriptor)); } else { // init channel @@ -128,13 +97,12 @@ Status VResultFileSink::prepare(RuntimeState* state) { Block::create_unique(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1); _writer.reset(new (std::nothrow) VFileResultWriter( _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, - _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(), + nullptr, _output_block.get(), state->return_object_data_as_binary(), _output_row_descriptor)); RETURN_IF_ERROR(_stream_sender->prepare(state)); 
_profile->add_child(_stream_sender->profile(), true, nullptr); } _writer->set_header_info(_header_type, _header); - RETURN_IF_ERROR(_writer->init(state)); return Status::OK(); } @@ -142,12 +110,7 @@ Status VResultFileSink::open(RuntimeState* state) { if (!_is_top_sink) { RETURN_IF_ERROR(_stream_sender->open(state)); } - return VExpr::open(_output_vexpr_ctxs, state); -} - -Status VResultFileSink::send(RuntimeState* state, Block* block, bool eos) { - RETURN_IF_ERROR(_writer->append_block(*block)); - return Status::OK(); + return AsyncWriterSink::open(state); } Status VResultFileSink::close(RuntimeState* state, Status exec_status) { @@ -157,7 +120,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { Status final_status = exec_status; // close the writer - if (_writer) { + if (_writer && _writer->need_normal_close()) { Status st = _writer->close(); if (!st.ok() && exec_status.ok()) { // close file writer failed, should return this error to client diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h index 848a6c371d..fdf4843a52 100644 --- a/be/src/vec/sink/vresult_file_sink.h +++ b/be/src/vec/sink/vresult_file_sink.h @@ -24,11 +24,12 @@ #include <vector> #include "common/status.h" -#include "exec/data_sink.h" #include "runtime/descriptors.h" #include "vec/core/block.h" +#include "vec/sink/async_writer_sink.h" #include "vec/sink/vdata_stream_sender.h" #include "vec/sink/vresult_sink.h" +#include "vec/sink/writer/vfile_result_writer.h" namespace doris { class BufferControlBlock; @@ -44,51 +45,43 @@ class TResultFileSink; namespace vectorized { class VExprContext; -class VResultFileSink : public DataSink { +inline constexpr char VRESULT_FILE_SINK[] = "VResultFileSink"; + +class VResultFileSink : public AsyncWriterSink<VFileResultWriter, VRESULT_FILE_SINK> { public: - VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc, - const TResultFileSink& sink, bool 
send_query_statistics_with_every_batch, - const std::vector<TExpr>& t_output_expr); + VResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr); + VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id, const RowDescriptor& row_desc, const TResultFileSink& sink, const std::vector<TPlanFragmentDestination>& destinations, bool send_query_statistics_with_every_batch, const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs); - ~VResultFileSink() override = default; + Status init(const TDataSink& thrift_sink) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; - // send data in 'batch' to this backend stream mgr - // Blocks until all rows in batch are placed in the buffer - Status send(RuntimeState* state, Block* block, bool eos = false) override; + // Flush all buffered data and close all existing channels to destination // hosts. Further send() calls are illegal after calling close(). Status close(RuntimeState* state, Status exec_status) override; - RuntimeProfile* profile() override { return _profile; } void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override; private: - Status prepare_exprs(RuntimeState* state); // set file options when sink type is FILE std::unique_ptr<ResultFileOptions> _file_opts; TStorageBackendType::type _storage_type; // Owned by the RuntimeState. 
- const std::vector<TExpr>& _t_output_expr; - VExprContextSPtrs _output_vexpr_ctxs; RowDescriptor _output_row_descriptor; std::unique_ptr<Block> _output_block = nullptr; std::shared_ptr<BufferControlBlock> _sender; std::unique_ptr<VDataStreamSender> _stream_sender; - std::shared_ptr<ResultWriter> _writer; int _buf_size = 1024; // Allocated from _pool bool _is_top_sink = true; std::string _header; std::string _header_type; - - RuntimeProfile* _profile; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp index 2e82011bd7..5fdaaf1628 100644 --- a/be/src/vec/sink/vresult_sink.cpp +++ b/be/src/vec/sink/vresult_sink.cpp @@ -38,6 +38,7 @@ #include "vec/exprs/vexpr.h" #include "vec/exprs/vexpr_context.h" #include "vec/sink/vmysql_result_writer.h" +#include "vec/sink/writer/vfile_result_writer.h" namespace doris { class QueryStatistics; diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h index c9374d7cf9..a10e60d467 100644 --- a/be/src/vec/sink/vresult_sink.h +++ b/be/src/vec/sink/vresult_sink.h @@ -135,7 +135,6 @@ public: // Flush all buffered data and close all existing channels to destination // hosts. Further send() calls are illegal after calling close(). 
virtual Status close(RuntimeState* state, Status exec_status) override; - virtual RuntimeProfile* profile() override { return _profile; } void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override; @@ -152,8 +151,7 @@ private: std::shared_ptr<BufferControlBlock> _sender; std::shared_ptr<ResultWriter> _writer; - RuntimeProfile* _profile; // Allocated from _pool - int _buf_size; // Allocated from _pool + int _buf_size; // Allocated from _pool // for fetch data by rowids TFetchOption _fetch_option; diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index e6d4992b48..dc408d392b 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -408,9 +408,6 @@ public: size_t get_pending_bytes() const; - // Returns the runtime profile for the sink. - RuntimeProfile* profile() override { return _profile; } - // the consumer func of sending pending batches in every NodeChannel. // use polling & NodeChannel::try_send_and_fetch_status() to achieve nonblocking sending. // only focus on pending batches and channel status, the internal errors of NodeChannels will be handled by the producer @@ -464,8 +461,6 @@ private: OlapTableLocationParam* _slave_location = nullptr; DorisNodesInfo* _nodes_info = nullptr; - RuntimeProfile* _profile = nullptr; - std::unique_ptr<OlapTabletFinder> _tablet_finder; // index_channel diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h index 5f50463268..047377f4e2 100644 --- a/be/src/vec/sink/vtablet_sink_v2.h +++ b/be/src/vec/sink/vtablet_sink_v2.h @@ -132,9 +132,6 @@ public: Status close(RuntimeState* state, Status close_status) override; Status send(RuntimeState* state, vectorized::Block* block, bool eos = false) override; - // Returns the runtime profile for the sink. 
- RuntimeProfile* profile() override { return _profile; } - private: Status _init_stream_pool(const NodeInfo& node_info, Streams& stream_pool, LoadStreamStub& stub_template); @@ -181,8 +178,6 @@ private: OlapTableLocationParam* _location = nullptr; DorisNodesInfo* _nodes_info = nullptr; - RuntimeProfile* _profile = nullptr; - std::unique_ptr<OlapTabletFinder> _tablet_finder; std::unique_ptr<OlapTableBlockConvertor> _block_convertor; diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index b294e110a9..2f70b05266 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -54,7 +54,9 @@ public: virtual bool in_transaction() { return false; } - bool need_normal_close() { return _need_normal_close; } + virtual Status commit_trans() { return Status::OK(); } + + bool need_normal_close() const { return _need_normal_close; } Status init(RuntimeState* state) override { return Status::OK(); } diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp similarity index 94% rename from be/src/vec/runtime/vfile_result_writer.cpp rename to be/src/vec/sink/writer/vfile_result_writer.cpp index 9d5fc4e158..e54c426f74 100644 --- a/be/src/vec/runtime/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. 
-#include "vec/runtime/vfile_result_writer.h" +#include "vfile_result_writer.h" #include <gen_cpp/Data_types.h> #include <gen_cpp/Metrics_types.h> @@ -72,18 +72,20 @@ namespace doris::vectorized { const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024; using doris::operator<<; +VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) + : AsyncResultWriter(output_exprs) {} + VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type, const TUniqueId fragment_instance_id, const VExprContextSPtrs& output_vexpr_ctxs, - RuntimeProfile* parent_profile, BufferControlBlock* sinker, - Block* output_block, bool output_object_data, + BufferControlBlock* sinker, Block* output_block, + bool output_object_data, const RowDescriptor& output_row_descriptor) - : _file_opts(file_opts), + : AsyncResultWriter(output_vexpr_ctxs), + _file_opts(file_opts), _storage_type(storage_type), _fragment_instance_id(fragment_instance_id), - _output_vexpr_ctxs(output_vexpr_ctxs), - _parent_profile(parent_profile), _sinker(sinker), _output_block(output_block), _output_row_descriptor(output_row_descriptor), @@ -91,9 +93,9 @@ VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts, _output_object_data = output_object_data; } -Status VFileResultWriter::init(RuntimeState* state) { +Status VFileResultWriter::_init(RuntimeState* state, RuntimeProfile* profile) { _state = state; - _init_profile(); + _init_profile(profile); // Delete existing files if (_file_opts->delete_existing_files) { RETURN_IF_ERROR(_delete_dir()); @@ -101,8 +103,8 @@ Status VFileResultWriter::init(RuntimeState* state) { return _create_next_file_writer(); } -void VFileResultWriter::_init_profile() { - RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true); +void VFileResultWriter::_init_profile(RuntimeProfile* parent_profile) { + RuntimeProfile* profile = 
parent_profile->create_child("VFileResultWriter", true, true); _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); @@ -160,13 +162,13 @@ Status VFileResultWriter::_create_file_writer(const std::string& file_name) { break; case TFileFormatType::FORMAT_PARQUET: _vfile_writer.reset(new VParquetWriterWrapper( - _file_writer_impl.get(), _output_vexpr_ctxs, _file_opts->parquet_schemas, + _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->parquet_schemas, _file_opts->parquet_commpression_type, _file_opts->parquert_disable_dictionary, _file_opts->parquet_version, _output_object_data)); RETURN_IF_ERROR(_vfile_writer->prepare()); break; case TFileFormatType::FORMAT_ORC: - _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), _output_vexpr_ctxs, + _vfile_writer.reset(new VOrcWriterWrapper(_file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->orc_schema, _output_object_data)); RETURN_IF_ERROR(_vfile_writer->prepare()); break; @@ -237,12 +239,9 @@ Status VFileResultWriter::append_block(Block& block) { } RETURN_IF_ERROR(write_csv_header()); SCOPED_TIMER(_append_row_batch_timer); - Status status = Status::OK(); - // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec - // failed, just return the error status Block output_block; - RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, block, - &output_block)); + RETURN_IF_ERROR(_projection_block(block, &output_block)); + if (_vfile_writer) { RETURN_IF_ERROR(_write_file(output_block)); } else { @@ -267,7 +266,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) { if (col.column->is_null_at(i)) { _plain_text_outstream << NULL_IN_CSV; } else { - switch (_output_vexpr_ctxs[col_id]->root()->type().type) { + switch 
(_vec_output_expr_ctxs[col_id]->root()->type().type) { case TYPE_BOOLEAN: case TYPE_TINYINT: _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>( @@ -327,7 +326,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) { const DateV2Value<DateTimeV2ValueType>* time_val = (const DateV2Value<DateTimeV2ValueType>*)(col.column->get_data_at(i) .data); - time_val->to_string(buf, _output_vexpr_ctxs[col_id]->root()->type().scale); + time_val->to_string(buf, _vec_output_expr_ctxs[col_id]->root()->type().scale); _plain_text_outstream << buf; break; } @@ -406,9 +405,9 @@ Status VFileResultWriter::_write_csv_file(const Block& block) { std::string VFileResultWriter::gen_types() { std::string types; - int num_columns = _output_vexpr_ctxs.size(); + int num_columns = _vec_output_expr_ctxs.size(); for (int i = 0; i < num_columns; ++i) { - types += type_to_string(_output_vexpr_ctxs[i]->root()->type().type); + types += type_to_string(_vec_output_expr_ctxs[i]->root()->type().type); if (i < num_columns - 1) { types += _file_opts->column_separator; } @@ -419,7 +418,7 @@ std::string VFileResultWriter::gen_types() { Status VFileResultWriter::write_csv_header() { if (!_header_sent && _header.size() > 0) { - std::string tmp_header = _header; + std::string tmp_header(_header); if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { tmp_header += gen_types(); } @@ -628,9 +627,13 @@ Status VFileResultWriter::close() { // because `_close_file_writer()` may be called in deconstructor, // at that time, the RuntimeState may already been deconstructed, // so does the profile in RuntimeState. 
- COUNTER_SET(_written_rows_counter, _written_rows); - SCOPED_TIMER(_writer_close_timer); + if (_written_rows_counter) { + COUNTER_SET(_written_rows_counter, _written_rows); + SCOPED_TIMER(_writer_close_timer); + } return _close_file_writer(true); } +const string VFileResultWriter::NULL_IN_CSV = "\\N"; + } // namespace doris::vectorized diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h similarity index 85% rename from be/src/vec/runtime/vfile_result_writer.h rename to be/src/vec/sink/writer/vfile_result_writer.h index 4ec425755d..b56e41c377 100644 --- a/be/src/vec/runtime/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -29,10 +29,10 @@ #include "common/status.h" #include "io/fs/file_writer.h" #include "runtime/descriptors.h" -#include "runtime/result_writer.h" #include "util/runtime_profile.h" #include "vec/core/block.h" -#include "vec/runtime/vparquet_writer.h" +#include "vec/runtime/vfile_writer_wrapper.h" +#include "vec/sink/writer/async_result_writer.h" namespace doris { class BufferControlBlock; @@ -47,35 +47,45 @@ struct ResultFileOptions; namespace doris::vectorized { // write result to file -class VFileResultWriter final : public ResultWriter { +class VFileResultWriter final : public AsyncResultWriter { public: VFileResultWriter(const ResultFileOptions* file_option, const TStorageBackendType::type storage_type, const TUniqueId fragment_instance_id, - const VExprContextSPtrs& _output_vexpr_ctxs, RuntimeProfile* parent_profile, - BufferControlBlock* sinker, Block* output_block, bool output_object_data, + const VExprContextSPtrs& _output_vexpr_ctxs, BufferControlBlock* sinker, + Block* output_block, bool output_object_data, const RowDescriptor& output_row_descriptor); - virtual ~VFileResultWriter() = default; + + VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); Status append_block(Block& block) override; - Status init(RuntimeState* state) override; 
Status close() override; + Status open(RuntimeState* state, RuntimeProfile* profile) override { + return _init(state, profile); + } + // file result writer always return statistic result in one row int64_t get_written_rows() const override { return 1; } std::string gen_types(); Status write_csv_header(); + void set_header_info(const std::string& header_type, const std::string& header) { + _header_type = header_type; + _header = header; + } + private: + Status _init(RuntimeState* state, RuntimeProfile*); Status _write_file(const Block& block); Status _write_csv_file(const Block& block); // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer // if eos, write the data even if buffer is not full. Status _flush_plain_text_outstream(bool eos); - void _init_profile(); + void _init_profile(RuntimeProfile*); Status _create_file_writer(const std::string& file_name); Status _create_next_file_writer(); @@ -96,11 +106,12 @@ private: // delete the dir of file_path Status _delete_dir(); + static const std::string NULL_IN_CSV; + RuntimeState* _state; // not owned, set when init const ResultFileOptions* _file_opts; TStorageBackendType::type _storage_type; TUniqueId _fragment_instance_id; - const VExprContextSPtrs& _output_vexpr_ctxs; // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. 
@@ -119,7 +130,6 @@ private: // the suffix idx of export file name, start at 0 int _file_idx = 0; - RuntimeProfile* _parent_profile; // profile from result sink, not owned // total time cost on append batch operation RuntimeProfile::Counter* _append_row_batch_timer = nullptr; // tuple convert timer, child timer of _append_row_batch_timer @@ -142,5 +152,9 @@ private: RowDescriptor _output_row_descriptor; // parquet/orc file writer std::unique_ptr<VFileWriterWrapper> _vfile_writer; + + std::string_view _header_type; + std::string_view _header; + std::unique_ptr<VFileResultWriter> _writer; }; } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h index 1aa1e59482..205f0835f5 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.h +++ b/be/src/vec/sink/writer/vjdbc_table_writer.h @@ -50,6 +50,8 @@ public: bool in_transaction() override { return TableConnector::_is_in_transaction; } + Status commit_trans() override { return JdbcConnector::finish_trans(); } + private: JdbcConnectorParam _param; }; diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp index a9dad2ca80..6897a6b3e2 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.cpp +++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp @@ -115,12 +115,12 @@ Status VMysqlTableWriter::append_block(vectorized::Block& block) { RETURN_IF_ERROR(_projection_block(block, &output_block)); auto num_rows = output_block.rows(); for (int i = 0; i < num_rows; ++i) { - RETURN_IF_ERROR(insert_row(output_block, i)); + RETURN_IF_ERROR(_insert_row(output_block, i)); } return Status::OK(); } -Status VMysqlTableWriter::insert_row(vectorized::Block& block, size_t row) { +Status VMysqlTableWriter::_insert_row(vectorized::Block& block, size_t row) { _insert_stmt_buffer.clear(); fmt::format_to(_insert_stmt_buffer, "INSERT INTO {} VALUES (", _conn_info.table_name); int num_columns = 
_vec_output_expr_ctxs.size(); diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h index bb134f9a58..9028801144 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.h +++ b/be/src/vec/sink/writer/vmysql_table_writer.h @@ -53,12 +53,10 @@ public: Status append_block(vectorized::Block& block) override; - Status finish_trans() { return Status::OK(); } - Status close() override; private: - Status insert_row(vectorized::Block& block, size_t row); + Status _insert_row(vectorized::Block& block, size_t row); MysqlConnInfo _conn_info; fmt::memory_buffer _insert_stmt_buffer; MYSQL* _mysql_conn; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h index e07f44c9e4..3df973e4b1 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.h +++ b/be/src/vec/sink/writer/vodbc_table_writer.h @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -50,6 +49,8 @@ public: Status close() override { return ODBCConnector::close(); } bool in_transaction() override { return TableConnector::_is_in_transaction; } + + Status commit_trans() override { return ODBCConnector::finish_trans(); } }; } // namespace vectorized } // namespace doris \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org