This is an automated email from the ASF dual-hosted git repository. gabriellee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new bea1618da8d [refactor](minor) Delete unused code (#38848) bea1618da8d is described below commit bea1618da8d94134d7ffa193032360c54fb9fa17 Author: Gabriel <gabrielleeb...@gmail.com> AuthorDate: Mon Aug 5 22:15:16 2024 +0800 [refactor](minor) Delete unused code (#38848) ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- be/src/pipeline/dependency.h | 9 ----- be/src/pipeline/exec/operator.cpp | 8 ++--- be/src/pipeline/exec/operator.h | 3 +- be/src/pipeline/exec/result_file_sink_operator.cpp | 6 ++-- be/src/pipeline/pipeline_fragment_context.cpp | 2 +- .../vec/runtime/shared_hash_table_controller.cpp | 2 -- be/src/vec/runtime/shared_hash_table_controller.h | 1 - be/src/vec/sink/writer/async_result_writer.cpp | 40 +++++++++------------- be/src/vec/sink/writer/async_result_writer.h | 15 +++----- .../sink/writer/iceberg/viceberg_table_writer.cpp | 6 ++-- .../sink/writer/iceberg/viceberg_table_writer.h | 4 ++- be/src/vec/sink/writer/vfile_result_writer.cpp | 23 +++++++------ be/src/vec/sink/writer/vfile_result_writer.h | 8 +++-- be/src/vec/sink/writer/vhive_table_writer.cpp | 6 ++-- be/src/vec/sink/writer/vhive_table_writer.h | 4 ++- be/src/vec/sink/writer/vjdbc_table_writer.cpp | 7 ++-- be/src/vec/sink/writer/vjdbc_table_writer.h | 4 ++- be/src/vec/sink/writer/vmysql_table_writer.cpp | 6 ++-- be/src/vec/sink/writer/vmysql_table_writer.h | 4 ++- be/src/vec/sink/writer/vodbc_table_writer.cpp | 7 ++-- be/src/vec/sink/writer/vodbc_table_writer.h | 4 ++- be/src/vec/sink/writer/vtablet_writer.cpp | 6 ++-- be/src/vec/sink/writer/vtablet_writer.h | 4 ++- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 6 ++-- be/src/vec/sink/writer/vtablet_writer_v2.h | 4 ++- 25 files changed, 100 insertions(+), 89 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 1e29cf904c7..171947ba1c7 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -649,15 +649,6 @@ public: std::mutex sink_eos_lock; }; -class AsyncWriterDependency final : public Dependency { -public: - using SharedState = BasicSharedState; - ENABLE_FACTORY_CREATOR(AsyncWriterDependency); - AsyncWriterDependency(int id, int node_id) - : Dependency(id, node_id, "AsyncWriterDependency", true) {} - ~AsyncWriterDependency() override = default; -}; - using SetHashTableVariants = std::variant<std::monostate, vectorized::MethodSerialized<HashMap<StringRef, RowRefListWithFlags>>, diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 0928b32f41d..07e0c3cf640 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -621,10 +621,10 @@ template <typename Writer, typename Parent> requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); - _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); - _async_writer_dependency = - AsyncWriterDependency::create_shared(_parent->operator_id(), _parent->node_id()); - _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); + _async_writer_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "AsyncWriterDependency", true); + _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs, _async_writer_dependency, + _finish_dependency)); _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( _profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time", 1); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 2db981ba88e..9d549690461 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -871,8 +871,7 @@ protected: vectorized::VExprContextSPtrs _output_vexpr_ctxs; std::unique_ptr<Writer> _writer; - std::shared_ptr<AsyncWriterDependency> _async_writer_dependency; - + std::shared_ptr<Dependency> _async_writer_dependency; std::shared_ptr<Dependency> _finish_dependency; }; diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp index 029bea7494e..0ba727543cd 100644 --- a/be/src/pipeline/exec/result_file_sink_operator.cpp +++ b/be/src/pipeline/exec/result_file_sink_operator.cpp @@ -105,7 +105,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( p._file_opts.get(), p._storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, _sender, nullptr, state->return_object_data_as_binary(), - p._output_row_descriptor)); + p._output_row_descriptor, _async_writer_dependency, _finish_dependency)); } else { // init channel _output_block = vectorized::Block::create_unique( @@ -113,7 +113,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i _writer.reset(new (std::nothrow) vectorized::VFileResultWriter( p._file_opts.get(), p._storage_type, state->fragment_instance_id(), _output_vexpr_ctxs, nullptr, _output_block.get(), - state->return_object_data_as_binary(), p._output_row_descriptor)); + state->return_object_data_as_binary(), p._output_row_descriptor, + _async_writer_dependency, _finish_dependency)); std::map<int64_t, int64_t> fragment_id_to_channel_index; for (int i = 0; i < p._dests.size(); ++i) { @@ -129,7 +130,6 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i RETURN_IF_ERROR(_channel->init_stub(state)); } } - _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); _writer->set_header_info(p._header_type, p._header); return Status::OK(); } diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index c4a2073c911..a2f26ac0a00 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1470,7 +1470,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo break; } default: - return Status::InternalError("Unsupported exec type in pipelineX: {}", + return Status::InternalError("Unsupported exec type in pipeline: {}", print_plan_node_type(tnode.node_type)); } diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp index a416ba6349e..4b77b1ed8a3 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.cpp +++ b/be/src/vec/runtime/shared_hash_table_controller.cpp @@ -52,7 +52,6 @@ void SharedHashTableController::signal(int my_node_id) { for (auto& dep : _dependencies[my_node_id]) { dep->set_ready(); } - _cv.notify_all(); } void SharedHashTableController::signal_finish(int my_node_id) { @@ -60,7 +59,6 @@ void SharedHashTableController::signal_finish(int my_node_id) { for (auto& dep : _finish_dependencies[my_node_id]) { dep->set_ready(); } - _cv.notify_all(); } TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int my_node_id) { diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h index ec3c616bca8..b04b1cdba06 100644 --- a/be/src/vec/runtime/shared_hash_table_controller.h +++ b/be/src/vec/runtime/shared_hash_table_controller.h @@ -95,7 +95,6 @@ private: std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies; std::map<int /*node id*/, std::vector<std::shared_ptr<pipeline::Dependency>>> _finish_dependencies; - std::condition_variable _cv; std::map<int /*node id*/, TUniqueId /*fragment instance id*/> _builder_fragment_ids; std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts; }; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index 82c5f4ab288..e5fe8f589d5 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -33,16 +33,10 @@ class TExpr; namespace vectorized { -AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs) - : _vec_output_expr_ctxs(output_expr_ctxs), - _dependency(nullptr), - _finish_dependency(nullptr) {} - -void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep, - pipeline::Dependency* finish_dep) { - _dependency = dep; - _finish_dependency = finish_dep; -} +AsyncResultWriter::AsyncResultWriter(const doris::vectorized::VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(dep), _finish_dependency(fin_dep) {} Status AsyncResultWriter::sink(Block* block, bool eos) { auto rows = block->rows(); @@ -58,12 +52,13 @@ Status AsyncResultWriter::sink(Block* block, bool eos) { return _writer_status.status(); } - if (_dependency && _is_finished()) { + DCHECK(_dependency); + if (_is_finished()) { _dependency->set_ready(); } if (rows) { _data_queue.emplace_back(std::move(add_block)); - if (_dependency && !_data_queue_is_available() && !_is_finished()) { + if (!_data_queue_is_available() && !_is_finished()) { _dependency->block(); } } @@ -81,7 +76,8 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() { DCHECK(!_data_queue.empty()); auto block = std::move(_data_queue.front()); _data_queue.pop_front(); - if (_dependency && _data_queue_is_available()) { + DCHECK(_dependency); + if (_data_queue_is_available()) { _dependency->set_ready(); } return block; @@ -89,10 +85,8 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() { Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* profile) { // Should set to false here, to - _writer_thread_closed = false; - if (_finish_dependency) { - _finish_dependency->block(); - } + DCHECK(_finish_dependency); + _finish_dependency->block(); // This is a async thread, should lock the task ctx, to make sure runtimestate and profile // not deconstructed before the thread exit. auto task_ctx = state->get_task_execution_context(); @@ -113,6 +107,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi force_close(status); } + DCHECK(_dependency); if (_writer_status.ok()) { while (true) { if (!_eos && _data_queue.empty() && _writer_status.ok()) { @@ -133,7 +128,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi if (!status.ok()) [[unlikely]] { std::unique_lock l(_m); _writer_status.update(status); - if (_dependency && _is_finished()) { + if (_is_finished()) { _dependency->set_ready(); } break; @@ -174,16 +169,14 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi if (_writer_status.ok()) { _writer_status.update(close_st); } - _writer_thread_closed = true; } // should set _finish_dependency first, as close function maybe blocked by wait_close of execution_timeout _set_ready_to_finish(); } void AsyncResultWriter::_set_ready_to_finish() { - if (_finish_dependency) { - _finish_dependency->set_ready(); - } + DCHECK(_finish_dependency); + _finish_dependency->set_ready(); } Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_block, @@ -201,7 +194,8 @@ Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc void AsyncResultWriter::force_close(Status s) { std::lock_guard l(_m); _writer_status.update(s); - if (_dependency && _is_finished()) { + DCHECK(_dependency); + if (_is_finished()) { _dependency->set_ready(); } _cv.notify_one(); diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h index 5e21dc13e12..36bca48358a 100644 --- a/be/src/vec/sink/writer/async_result_writer.h +++ b/be/src/vec/sink/writer/async_result_writer.h @@ -33,7 +33,6 @@ class TDataSink; class TExpr; namespace pipeline { -class AsyncWriterDependency; class Dependency; class PipelineTask; @@ -55,9 +54,9 @@ class Block; */ class AsyncResultWriter : public ResultWriter { public: - AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs); - - void set_dependency(pipeline::AsyncWriterDependency* dep, pipeline::Dependency* finish_dep); + AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); void force_close(Status s); @@ -96,13 +95,9 @@ private: // Default value is ok AtomicStatus _writer_status; bool _eos = false; - // The writer is not started at the beginning. If prepare failed but not open, the the writer - // is not started, so should not pending finish on it. - bool _writer_thread_closed = true; - // Used by pipelineX - pipeline::AsyncWriterDependency* _dependency; - pipeline::Dependency* _finish_dependency; + std::shared_ptr<pipeline::Dependency> _dependency; + std::shared_ptr<pipeline::Dependency> _finish_dependency; moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks; }; diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 070dbad3d78..898b71d1d9a 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -33,8 +33,10 @@ namespace doris { namespace vectorized { VIcebergTableWriter::VIcebergTableWriter(const TDataSink& t_sink, - const VExprContextSPtrs& output_expr_ctxs) - : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) { DCHECK(_t_sink.__isset.iceberg_table_sink); } diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h index e2e582e04ad..ae53d3af98e 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h @@ -46,7 +46,9 @@ struct ColumnWithTypeAndName; class VIcebergTableWriter final : public AsyncResultWriter { public: - VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); ~VIcebergTableWriter() = default; diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp index ce8f2d18e07..16491311c17 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.cpp +++ b/be/src/vec/sink/writer/vfile_result_writer.cpp @@ -60,17 +60,18 @@ namespace doris::vectorized { -VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) - : AsyncResultWriter(output_exprs) {} - -VFileResultWriter::VFileResultWriter(const pipeline::ResultFileOptions* file_opts, - const TStorageBackendType::type storage_type, - const TUniqueId fragment_instance_id, - const VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr<BufferControlBlock> sinker, - Block* output_block, bool output_object_data, - const RowDescriptor& output_row_descriptor) - : AsyncResultWriter(output_vexpr_ctxs), +VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_exprs, dep, fin_dep) {} + +VFileResultWriter::VFileResultWriter( + const pipeline::ResultFileOptions* file_opts, const TStorageBackendType::type storage_type, + const TUniqueId fragment_instance_id, const VExprContextSPtrs& output_vexpr_ctxs, + std::shared_ptr<BufferControlBlock> sinker, Block* output_block, bool output_object_data, + const RowDescriptor& output_row_descriptor, std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_vexpr_ctxs, dep, fin_dep), _file_opts(file_opts), _storage_type(storage_type), _fragment_instance_id(fragment_instance_id), diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h index 42753a5e261..bf0a5d3e9e2 100644 --- a/be/src/vec/sink/writer/vfile_result_writer.h +++ b/be/src/vec/sink/writer/vfile_result_writer.h @@ -56,9 +56,13 @@ public: const TUniqueId fragment_instance_id, const VExprContextSPtrs& _output_vexpr_ctxs, std::shared_ptr<BufferControlBlock> sinker, Block* output_block, - bool output_object_data, const RowDescriptor& output_row_descriptor); + bool output_object_data, const RowDescriptor& output_row_descriptor, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); - VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); Status write(RuntimeState* state, Block& block) override; diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp index f90c7134ccd..53f70b6b31a 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.cpp +++ b/be/src/vec/sink/writer/vhive_table_writer.cpp @@ -30,8 +30,10 @@ namespace doris { namespace vectorized { VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink, - const VExprContextSPtrs& output_expr_ctxs) - : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) { + const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) { DCHECK(_t_sink.__isset.hive_table_sink); } diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h index 6c8b972f280..9361fdbc408 100644 --- a/be/src/vec/sink/writer/vhive_table_writer.h +++ b/be/src/vec/sink/writer/vhive_table_writer.h @@ -39,7 +39,9 @@ struct ColumnWithTypeAndName; class VHiveTableWriter final : public AsyncResultWriter { public: - VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); ~VHiveTableWriter() override = default; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp index b7c8d1f78dd..8c24f4746ad 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp @@ -57,8 +57,11 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink } VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink, - const VExprContextSPtrs& output_expr_ctxs) - : AsyncResultWriter(output_expr_ctxs), JdbcConnector(create_connect_param(t_sink)) {} + const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), + JdbcConnector(create_connect_param(t_sink)) {} Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) { Block output_block; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h index b8216c3bcd6..aa957d2495b 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.h +++ b/be/src/vec/sink/writer/vjdbc_table_writer.h @@ -36,7 +36,9 @@ class VJdbcTableWriter final : public AsyncResultWriter, public JdbcConnector { public: static JdbcConnectorParam create_connect_param(const TDataSink&); - VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); // connect to jdbc server Status open(RuntimeState* state, RuntimeProfile* profile) override { diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp index 45afe8ce019..a0d47ffec1e 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.cpp +++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp @@ -61,8 +61,10 @@ std::string MysqlConnInfo::debug_string() const { } VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink, - const VExprContextSPtrs& output_expr_ctxs) - : AsyncResultWriter(output_expr_ctxs) { + const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_expr_ctxs, dep, fin_dep) { const auto& t_mysql_sink = t_sink.mysql_table_sink; _conn_info.host = t_mysql_sink.host; _conn_info.port = t_mysql_sink.port; diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h index 072885b176b..04efabf3ffb 100644 --- a/be/src/vec/sink/writer/vmysql_table_writer.h +++ b/be/src/vec/sink/writer/vmysql_table_writer.h @@ -46,7 +46,9 @@ class Block; class VMysqlTableWriter final : public AsyncResultWriter { public: - VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); // connect to mysql server Status open(RuntimeState* state, RuntimeProfile* profile) override; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp index c70bdd4ca19..19cb2e50109 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp @@ -42,8 +42,11 @@ ODBCConnectorParam VOdbcTableWriter::create_connect_param(const doris::TDataSink } VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink, - const VExprContextSPtrs& output_expr_ctxs) - : AsyncResultWriter(output_expr_ctxs), ODBCConnector(create_connect_param(t_sink)) {} + const VExprContextSPtrs& output_expr_ctxs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), + ODBCConnector(create_connect_param(t_sink)) {} Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) { Block output_block; diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h index fa4dc47b77f..9638dea684a 100644 --- a/be/src/vec/sink/writer/vodbc_table_writer.h +++ b/be/src/vec/sink/writer/vodbc_table_writer.h @@ -36,7 +36,9 @@ class VOdbcTableWriter final : public AsyncResultWriter, public ODBCConnector { public: static ODBCConnectorParam create_connect_param(const TDataSink&); - VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); // connect to odbc server Status open(RuntimeState* state, RuntimeProfile* profile) override { diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index d3d6c35fc42..99eac0c1e51 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -976,8 +976,10 @@ void VNodeChannel::mark_close(bool hang_wait) { _eos_is_produced = true; } -VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) - : AsyncResultWriter(output_exprs), _t_sink(t_sink) { +VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) { _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc; } diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index b9fbc4d0873..993f9781955 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -542,7 +542,9 @@ namespace doris::vectorized { // write result to file class VTabletWriter final : public AsyncResultWriter { public: - VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); Status write(RuntimeState* state, Block& block) override; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 8bf0520aba0..a73cd5b4444 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -55,8 +55,10 @@ namespace doris::vectorized { -VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs) - : AsyncResultWriter(output_exprs), _t_sink(t_sink) { +VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep) + : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) { DCHECK(t_sink.__isset.olap_table_sink); } diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 363dea54c3b..c3be80ce93e 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -102,7 +102,9 @@ using RowsForTablet = std::unordered_map<int64_t, Rows>; class VTabletWriterV2 final : public AsyncResultWriter { public: // Construct from thrift struct which is generated by FE. - VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs); + VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, + std::shared_ptr<pipeline::Dependency> dep, + std::shared_ptr<pipeline::Dependency> fin_dep); ~VTabletWriterV2() override; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org