This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bea1618da8d [refactor](minor) Delete unused code (#38848)
bea1618da8d is described below

commit bea1618da8d94134d7ffa193032360c54fb9fa17
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Mon Aug 5 22:15:16 2024 +0800

    [refactor](minor) Delete unused code (#38848)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/pipeline/dependency.h                       |  9 -----
 be/src/pipeline/exec/operator.cpp                  |  8 ++---
 be/src/pipeline/exec/operator.h                    |  3 +-
 be/src/pipeline/exec/result_file_sink_operator.cpp |  6 ++--
 be/src/pipeline/pipeline_fragment_context.cpp      |  2 +-
 .../vec/runtime/shared_hash_table_controller.cpp   |  2 --
 be/src/vec/runtime/shared_hash_table_controller.h  |  1 -
 be/src/vec/sink/writer/async_result_writer.cpp     | 40 +++++++++-------------
 be/src/vec/sink/writer/async_result_writer.h       | 15 +++-----
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |  6 ++--
 .../sink/writer/iceberg/viceberg_table_writer.h    |  4 ++-
 be/src/vec/sink/writer/vfile_result_writer.cpp     | 23 +++++++------
 be/src/vec/sink/writer/vfile_result_writer.h       |  8 +++--
 be/src/vec/sink/writer/vhive_table_writer.cpp      |  6 ++--
 be/src/vec/sink/writer/vhive_table_writer.h        |  4 ++-
 be/src/vec/sink/writer/vjdbc_table_writer.cpp      |  7 ++--
 be/src/vec/sink/writer/vjdbc_table_writer.h        |  4 ++-
 be/src/vec/sink/writer/vmysql_table_writer.cpp     |  6 ++--
 be/src/vec/sink/writer/vmysql_table_writer.h       |  4 ++-
 be/src/vec/sink/writer/vodbc_table_writer.cpp      |  7 ++--
 be/src/vec/sink/writer/vodbc_table_writer.h        |  4 ++-
 be/src/vec/sink/writer/vtablet_writer.cpp          |  6 ++--
 be/src/vec/sink/writer/vtablet_writer.h            |  4 ++-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |  6 ++--
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  4 ++-
 25 files changed, 100 insertions(+), 89 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 1e29cf904c7..171947ba1c7 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -649,15 +649,6 @@ public:
     std::mutex sink_eos_lock;
 };
 
-class AsyncWriterDependency final : public Dependency {
-public:
-    using SharedState = BasicSharedState;
-    ENABLE_FACTORY_CREATOR(AsyncWriterDependency);
-    AsyncWriterDependency(int id, int node_id)
-            : Dependency(id, node_id, "AsyncWriterDependency", true) {}
-    ~AsyncWriterDependency() override = default;
-};
-
 using SetHashTableVariants =
         std::variant<std::monostate,
                      vectorized::MethodSerialized<HashMap<StringRef, 
RowRefListWithFlags>>,
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 0928b32f41d..07e0c3cf640 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -621,10 +621,10 @@ template <typename Writer, typename Parent>
     requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
 Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
-    _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
-    _async_writer_dependency =
-            AsyncWriterDependency::create_shared(_parent->operator_id(), 
_parent->node_id());
-    _writer->set_dependency(_async_writer_dependency.get(), 
_finish_dependency.get());
+    _async_writer_dependency = 
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
+                                                         
"AsyncWriterDependency", true);
+    _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs, 
_async_writer_dependency,
+                             _finish_dependency));
 
     _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
             _profile, "WaitForDependency[" + _async_writer_dependency->name() 
+ "]Time", 1);
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 2db981ba88e..9d549690461 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -871,8 +871,7 @@ protected:
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
     std::unique_ptr<Writer> _writer;
 
-    std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;
-
+    std::shared_ptr<Dependency> _async_writer_dependency;
     std::shared_ptr<Dependency> _finish_dependency;
 };
 
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 029bea7494e..0ba727543cd 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -105,7 +105,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
         _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
                 p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
                 _output_vexpr_ctxs, _sender, nullptr, 
state->return_object_data_as_binary(),
-                p._output_row_descriptor));
+                p._output_row_descriptor, _async_writer_dependency, 
_finish_dependency));
     } else {
         // init channel
         _output_block = vectorized::Block::create_unique(
@@ -113,7 +113,8 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
         _writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
                 p._file_opts.get(), p._storage_type, 
state->fragment_instance_id(),
                 _output_vexpr_ctxs, nullptr, _output_block.get(),
-                state->return_object_data_as_binary(), 
p._output_row_descriptor));
+                state->return_object_data_as_binary(), 
p._output_row_descriptor,
+                _async_writer_dependency, _finish_dependency));
 
         std::map<int64_t, int64_t> fragment_id_to_channel_index;
         for (int i = 0; i < p._dests.size(); ++i) {
@@ -129,7 +130,6 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
             RETURN_IF_ERROR(_channel->init_stub(state));
         }
     }
-    _writer->set_dependency(_async_writer_dependency.get(), 
_finish_dependency.get());
     _writer->set_header_info(p._header_type, p._header);
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index c4a2073c911..a2f26ac0a00 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1470,7 +1470,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         break;
     }
     default:
-        return Status::InternalError("Unsupported exec type in pipelineX: {}",
+        return Status::InternalError("Unsupported exec type in pipeline: {}",
                                      print_plan_node_type(tnode.node_type));
     }
 
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp b/be/src/vec/runtime/shared_hash_table_controller.cpp
index a416ba6349e..4b77b1ed8a3 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -52,7 +52,6 @@ void SharedHashTableController::signal(int my_node_id) {
     for (auto& dep : _dependencies[my_node_id]) {
         dep->set_ready();
     }
-    _cv.notify_all();
 }
 
 void SharedHashTableController::signal_finish(int my_node_id) {
@@ -60,7 +59,6 @@ void SharedHashTableController::signal_finish(int my_node_id) {
     for (auto& dep : _finish_dependencies[my_node_id]) {
         dep->set_ready();
     }
-    _cv.notify_all();
 }
 
 TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int 
my_node_id) {
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index ec3c616bca8..b04b1cdba06 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -95,7 +95,6 @@ private:
     std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::Dependency>>> _dependencies;
     std::map<int /*node id*/, 
std::vector<std::shared_ptr<pipeline::Dependency>>>
             _finish_dependencies;
-    std::condition_variable _cv;
     std::map<int /*node id*/, TUniqueId /*fragment instance id*/> 
_builder_fragment_ids;
     std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
 };
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 82c5f4ab288..e5fe8f589d5 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -33,16 +33,10 @@ class TExpr;
 
 namespace vectorized {
 
-AsyncResultWriter::AsyncResultWriter(const 
doris::vectorized::VExprContextSPtrs& output_expr_ctxs)
-        : _vec_output_expr_ctxs(output_expr_ctxs),
-          _dependency(nullptr),
-          _finish_dependency(nullptr) {}
-
-void AsyncResultWriter::set_dependency(pipeline::AsyncWriterDependency* dep,
-                                       pipeline::Dependency* finish_dep) {
-    _dependency = dep;
-    _finish_dependency = finish_dep;
-}
+AsyncResultWriter::AsyncResultWriter(const 
doris::vectorized::VExprContextSPtrs& output_expr_ctxs,
+                                     std::shared_ptr<pipeline::Dependency> dep,
+                                     std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : _vec_output_expr_ctxs(output_expr_ctxs), _dependency(dep), 
_finish_dependency(fin_dep) {}
 
 Status AsyncResultWriter::sink(Block* block, bool eos) {
     auto rows = block->rows();
@@ -58,12 +52,13 @@ Status AsyncResultWriter::sink(Block* block, bool eos) {
         return _writer_status.status();
     }
 
-    if (_dependency && _is_finished()) {
+    DCHECK(_dependency);
+    if (_is_finished()) {
         _dependency->set_ready();
     }
     if (rows) {
         _data_queue.emplace_back(std::move(add_block));
-        if (_dependency && !_data_queue_is_available() && !_is_finished()) {
+        if (!_data_queue_is_available() && !_is_finished()) {
             _dependency->block();
         }
     }
@@ -81,7 +76,8 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
     DCHECK(!_data_queue.empty());
     auto block = std::move(_data_queue.front());
     _data_queue.pop_front();
-    if (_dependency && _data_queue_is_available()) {
+    DCHECK(_dependency);
+    if (_data_queue_is_available()) {
         _dependency->set_ready();
     }
     return block;
@@ -89,10 +85,8 @@ std::unique_ptr<Block> AsyncResultWriter::_get_block_from_queue() {
 
 Status AsyncResultWriter::start_writer(RuntimeState* state, RuntimeProfile* 
profile) {
     // Should set to false here, to
-    _writer_thread_closed = false;
-    if (_finish_dependency) {
-        _finish_dependency->block();
-    }
+    DCHECK(_finish_dependency);
+    _finish_dependency->block();
     // This is a async thread, should lock the task ctx, to make sure 
runtimestate and profile
     // not deconstructed before the thread exit.
     auto task_ctx = state->get_task_execution_context();
@@ -113,6 +107,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         force_close(status);
     }
 
+    DCHECK(_dependency);
     if (_writer_status.ok()) {
         while (true) {
             if (!_eos && _data_queue.empty() && _writer_status.ok()) {
@@ -133,7 +128,7 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
             if (!status.ok()) [[unlikely]] {
                 std::unique_lock l(_m);
                 _writer_status.update(status);
-                if (_dependency && _is_finished()) {
+                if (_is_finished()) {
                     _dependency->set_ready();
                 }
                 break;
@@ -174,16 +169,14 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
         if (_writer_status.ok()) {
             _writer_status.update(close_st);
         }
-        _writer_thread_closed = true;
     }
     // should set _finish_dependency first, as close function maybe blocked by 
wait_close of execution_timeout
     _set_ready_to_finish();
 }
 
 void AsyncResultWriter::_set_ready_to_finish() {
-    if (_finish_dependency) {
-        _finish_dependency->set_ready();
-    }
+    DCHECK(_finish_dependency);
+    _finish_dependency->set_ready();
 }
 
 Status AsyncResultWriter::_projection_block(doris::vectorized::Block& 
input_block,
@@ -201,7 +194,8 @@ Status AsyncResultWriter::_projection_block(doris::vectorized::Block& input_bloc
 void AsyncResultWriter::force_close(Status s) {
     std::lock_guard l(_m);
     _writer_status.update(s);
-    if (_dependency && _is_finished()) {
+    DCHECK(_dependency);
+    if (_is_finished()) {
         _dependency->set_ready();
     }
     _cv.notify_one();
diff --git a/be/src/vec/sink/writer/async_result_writer.h b/be/src/vec/sink/writer/async_result_writer.h
index 5e21dc13e12..36bca48358a 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -33,7 +33,6 @@ class TDataSink;
 class TExpr;
 
 namespace pipeline {
-class AsyncWriterDependency;
 class Dependency;
 class PipelineTask;
 
@@ -55,9 +54,9 @@ class Block;
  */
 class AsyncResultWriter : public ResultWriter {
 public:
-    AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs);
-
-    void set_dependency(pipeline::AsyncWriterDependency* dep, 
pipeline::Dependency* finish_dep);
+    AsyncResultWriter(const VExprContextSPtrs& output_expr_ctxs,
+                      std::shared_ptr<pipeline::Dependency> dep,
+                      std::shared_ptr<pipeline::Dependency> fin_dep);
 
     void force_close(Status s);
 
@@ -96,13 +95,9 @@ private:
     // Default value is ok
     AtomicStatus _writer_status;
     bool _eos = false;
-    // The writer is not started at the beginning. If prepare failed but not 
open, the the writer
-    // is not started, so should not pending finish on it.
-    bool _writer_thread_closed = true;
 
-    // Used by pipelineX
-    pipeline::AsyncWriterDependency* _dependency;
-    pipeline::Dependency* _finish_dependency;
+    std::shared_ptr<pipeline::Dependency> _dependency;
+    std::shared_ptr<pipeline::Dependency> _finish_dependency;
 
     moodycamel::ConcurrentQueue<std::unique_ptr<Block>> _free_blocks;
 };
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 070dbad3d78..898b71d1d9a 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -33,8 +33,10 @@ namespace doris {
 namespace vectorized {
 
 VIcebergTableWriter::VIcebergTableWriter(const TDataSink& t_sink,
-                                         const VExprContextSPtrs& 
output_expr_ctxs)
-        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+                                         const VExprContextSPtrs& 
output_expr_ctxs,
+                                         std::shared_ptr<pipeline::Dependency> 
dep,
+                                         std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) {
     DCHECK(_t_sink.__isset.iceberg_table_sink);
 }
 
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
index e2e582e04ad..ae53d3af98e 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.h
@@ -46,7 +46,9 @@ struct ColumnWithTypeAndName;
 
 class VIcebergTableWriter final : public AsyncResultWriter {
 public:
-    VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VIcebergTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                        std::shared_ptr<pipeline::Dependency> dep,
+                        std::shared_ptr<pipeline::Dependency> fin_dep);
 
     ~VIcebergTableWriter() = default;
 
diff --git a/be/src/vec/sink/writer/vfile_result_writer.cpp b/be/src/vec/sink/writer/vfile_result_writer.cpp
index ce8f2d18e07..16491311c17 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.cpp
+++ b/be/src/vec/sink/writer/vfile_result_writer.cpp
@@ -60,17 +60,18 @@
 
 namespace doris::vectorized {
 
-VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs)
-        : AsyncResultWriter(output_exprs) {}
-
-VFileResultWriter::VFileResultWriter(const pipeline::ResultFileOptions* 
file_opts,
-                                     const TStorageBackendType::type 
storage_type,
-                                     const TUniqueId fragment_instance_id,
-                                     const VExprContextSPtrs& 
output_vexpr_ctxs,
-                                     std::shared_ptr<BufferControlBlock> 
sinker,
-                                     Block* output_block, bool 
output_object_data,
-                                     const RowDescriptor& 
output_row_descriptor)
-        : AsyncResultWriter(output_vexpr_ctxs),
+VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs,
+                                     std::shared_ptr<pipeline::Dependency> dep,
+                                     std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : AsyncResultWriter(output_exprs, dep, fin_dep) {}
+
+VFileResultWriter::VFileResultWriter(
+        const pipeline::ResultFileOptions* file_opts, const 
TStorageBackendType::type storage_type,
+        const TUniqueId fragment_instance_id, const VExprContextSPtrs& 
output_vexpr_ctxs,
+        std::shared_ptr<BufferControlBlock> sinker, Block* output_block, bool 
output_object_data,
+        const RowDescriptor& output_row_descriptor, 
std::shared_ptr<pipeline::Dependency> dep,
+        std::shared_ptr<pipeline::Dependency> fin_dep)
+        : AsyncResultWriter(output_vexpr_ctxs, dep, fin_dep),
           _file_opts(file_opts),
           _storage_type(storage_type),
           _fragment_instance_id(fragment_instance_id),
diff --git a/be/src/vec/sink/writer/vfile_result_writer.h b/be/src/vec/sink/writer/vfile_result_writer.h
index 42753a5e261..bf0a5d3e9e2 100644
--- a/be/src/vec/sink/writer/vfile_result_writer.h
+++ b/be/src/vec/sink/writer/vfile_result_writer.h
@@ -56,9 +56,13 @@ public:
                       const TUniqueId fragment_instance_id,
                       const VExprContextSPtrs& _output_vexpr_ctxs,
                       std::shared_ptr<BufferControlBlock> sinker, Block* 
output_block,
-                      bool output_object_data, const RowDescriptor& 
output_row_descriptor);
+                      bool output_object_data, const RowDescriptor& 
output_row_descriptor,
+                      std::shared_ptr<pipeline::Dependency> dep,
+                      std::shared_ptr<pipeline::Dependency> fin_dep);
 
-    VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                      std::shared_ptr<pipeline::Dependency> dep,
+                      std::shared_ptr<pipeline::Dependency> fin_dep);
 
     Status write(RuntimeState* state, Block& block) override;
 
diff --git a/be/src/vec/sink/writer/vhive_table_writer.cpp b/be/src/vec/sink/writer/vhive_table_writer.cpp
index f90c7134ccd..53f70b6b31a 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.cpp
+++ b/be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -30,8 +30,10 @@ namespace doris {
 namespace vectorized {
 
 VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
-                                   const VExprContextSPtrs& output_expr_ctxs)
-        : AsyncResultWriter(output_expr_ctxs), _t_sink(t_sink) {
+                                   const VExprContextSPtrs& output_expr_ctxs,
+                                   std::shared_ptr<pipeline::Dependency> dep,
+                                   std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) {
     DCHECK(_t_sink.__isset.hive_table_sink);
 }
 
diff --git a/be/src/vec/sink/writer/vhive_table_writer.h b/be/src/vec/sink/writer/vhive_table_writer.h
index 6c8b972f280..9361fdbc408 100644
--- a/be/src/vec/sink/writer/vhive_table_writer.h
+++ b/be/src/vec/sink/writer/vhive_table_writer.h
@@ -39,7 +39,9 @@ struct ColumnWithTypeAndName;
 
 class VHiveTableWriter final : public AsyncResultWriter {
 public:
-    VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VHiveTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                     std::shared_ptr<pipeline::Dependency> dep,
+                     std::shared_ptr<pipeline::Dependency> fin_dep);
 
     ~VHiveTableWriter() override = default;
 
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
index b7c8d1f78dd..8c24f4746ad 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp
@@ -57,8 +57,11 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink
 }
 
 VJdbcTableWriter::VJdbcTableWriter(const TDataSink& t_sink,
-                                   const VExprContextSPtrs& output_expr_ctxs)
-        : AsyncResultWriter(output_expr_ctxs), 
JdbcConnector(create_connect_param(t_sink)) {}
+                                   const VExprContextSPtrs& output_expr_ctxs,
+                                   std::shared_ptr<pipeline::Dependency> dep,
+                                   std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
+          JdbcConnector(create_connect_param(t_sink)) {}
 
 Status VJdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     Block output_block;
diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.h b/be/src/vec/sink/writer/vjdbc_table_writer.h
index b8216c3bcd6..aa957d2495b 100644
--- a/be/src/vec/sink/writer/vjdbc_table_writer.h
+++ b/be/src/vec/sink/writer/vjdbc_table_writer.h
@@ -36,7 +36,9 @@ class VJdbcTableWriter final : public AsyncResultWriter, public JdbcConnector {
 public:
     static JdbcConnectorParam create_connect_param(const TDataSink&);
 
-    VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VJdbcTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                     std::shared_ptr<pipeline::Dependency> dep,
+                     std::shared_ptr<pipeline::Dependency> fin_dep);
 
     // connect to jdbc server
     Status open(RuntimeState* state, RuntimeProfile* profile) override {
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.cpp b/be/src/vec/sink/writer/vmysql_table_writer.cpp
index 45afe8ce019..a0d47ffec1e 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.cpp
+++ b/be/src/vec/sink/writer/vmysql_table_writer.cpp
@@ -61,8 +61,10 @@ std::string MysqlConnInfo::debug_string() const {
 }
 
 VMysqlTableWriter::VMysqlTableWriter(const TDataSink& t_sink,
-                                     const VExprContextSPtrs& output_expr_ctxs)
-        : AsyncResultWriter(output_expr_ctxs) {
+                                     const VExprContextSPtrs& output_expr_ctxs,
+                                     std::shared_ptr<pipeline::Dependency> dep,
+                                     std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep) {
     const auto& t_mysql_sink = t_sink.mysql_table_sink;
     _conn_info.host = t_mysql_sink.host;
     _conn_info.port = t_mysql_sink.port;
diff --git a/be/src/vec/sink/writer/vmysql_table_writer.h b/be/src/vec/sink/writer/vmysql_table_writer.h
index 072885b176b..04efabf3ffb 100644
--- a/be/src/vec/sink/writer/vmysql_table_writer.h
+++ b/be/src/vec/sink/writer/vmysql_table_writer.h
@@ -46,7 +46,9 @@ class Block;
 
 class VMysqlTableWriter final : public AsyncResultWriter {
 public:
-    VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VMysqlTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                      std::shared_ptr<pipeline::Dependency> dep,
+                      std::shared_ptr<pipeline::Dependency> fin_dep);
 
     // connect to mysql server
     Status open(RuntimeState* state, RuntimeProfile* profile) override;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.cpp b/be/src/vec/sink/writer/vodbc_table_writer.cpp
index c70bdd4ca19..19cb2e50109 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.cpp
+++ b/be/src/vec/sink/writer/vodbc_table_writer.cpp
@@ -42,8 +42,11 @@ ODBCConnectorParam VOdbcTableWriter::create_connect_param(const doris::TDataSink
 }
 
 VOdbcTableWriter::VOdbcTableWriter(const doris::TDataSink& t_sink,
-                                   const VExprContextSPtrs& output_expr_ctxs)
-        : AsyncResultWriter(output_expr_ctxs), 
ODBCConnector(create_connect_param(t_sink)) {}
+                                   const VExprContextSPtrs& output_expr_ctxs,
+                                   std::shared_ptr<pipeline::Dependency> dep,
+                                   std::shared_ptr<pipeline::Dependency> 
fin_dep)
+        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep),
+          ODBCConnector(create_connect_param(t_sink)) {}
 
 Status VOdbcTableWriter::write(RuntimeState* state, vectorized::Block& block) {
     Block output_block;
diff --git a/be/src/vec/sink/writer/vodbc_table_writer.h b/be/src/vec/sink/writer/vodbc_table_writer.h
index fa4dc47b77f..9638dea684a 100644
--- a/be/src/vec/sink/writer/vodbc_table_writer.h
+++ b/be/src/vec/sink/writer/vodbc_table_writer.h
@@ -36,7 +36,9 @@ class VOdbcTableWriter final : public AsyncResultWriter, public ODBCConnector {
 public:
     static ODBCConnectorParam create_connect_param(const TDataSink&);
 
-    VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VOdbcTableWriter(const doris::TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                     std::shared_ptr<pipeline::Dependency> dep,
+                     std::shared_ptr<pipeline::Dependency> fin_dep);
 
     // connect to odbc server
     Status open(RuntimeState* state, RuntimeProfile* profile) override {
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index d3d6c35fc42..99eac0c1e51 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -976,8 +976,10 @@ void VNodeChannel::mark_close(bool hang_wait) {
     _eos_is_produced = true;
 }
 
-VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs)
-        : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
+VTabletWriter::VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                             std::shared_ptr<pipeline::Dependency> dep,
+                             std::shared_ptr<pipeline::Dependency> fin_dep)
+        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
     _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index b9fbc4d0873..993f9781955 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -542,7 +542,9 @@ namespace doris::vectorized {
 // write result to file
 class VTabletWriter final : public AsyncResultWriter {
 public:
-    VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VTabletWriter(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                  std::shared_ptr<pipeline::Dependency> dep,
+                  std::shared_ptr<pipeline::Dependency> fin_dep);
 
     Status write(RuntimeState* state, Block& block) override;
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 8bf0520aba0..a73cd5b4444 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -55,8 +55,10 @@
 
 namespace doris::vectorized {
 
-VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs)
-        : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
+VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs,
+                                 std::shared_ptr<pipeline::Dependency> dep,
+                                 std::shared_ptr<pipeline::Dependency> fin_dep)
+        : AsyncResultWriter(output_exprs, dep, fin_dep), _t_sink(t_sink) {
     DCHECK(t_sink.__isset.olap_table_sink);
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 363dea54c3b..c3be80ce93e 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -102,7 +102,9 @@ using RowsForTablet = std::unordered_map<int64_t, Rows>;
 class VTabletWriterV2 final : public AsyncResultWriter {
 public:
     // Construct from thrift struct which is generated by FE.
-    VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
+    VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs,
+                    std::shared_ptr<pipeline::Dependency> dep,
+                    std::shared_ptr<pipeline::Dependency> fin_dep);
 
     ~VTabletWriterV2() override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to