This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e29d8cb1108 [feature](move-memtable) support pipelineX in sink v2 (#27067)
e29d8cb1108 is described below

commit e29d8cb110849637fc7beca5b5d76b103b9c95bf
Author: Kaijie Chen <c...@apache.org>
AuthorDate: Thu Nov 16 15:00:55 2023 +0800

    [feature](move-memtable) support pipelineX in sink v2 (#27067)
---
 be/src/exec/data_sink.cpp                          |   8 +-
 ..._operator.h => olap_table_sink_v2_operator.cpp} |  51 ++-
 be/src/pipeline/exec/olap_table_sink_v2_operator.h |  74 ++-
 be/src/pipeline/pipeline_x/operator.cpp            |   3 +
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  30 +-
 .../pipeline_x/pipeline_x_fragment_context.h       |   3 +
 be/src/vec/sink/vtablet_sink_v2.cpp                | 505 +--------------------
 be/src/vec/sink/vtablet_sink_v2.h                  | 178 +-------
 .../vtablet_writer_v2.cpp}                         | 129 +++---
 .../vtablet_writer_v2.h}                           |  37 +-
 10 files changed, 228 insertions(+), 790 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index f849dc84a25..970e7a3a18a 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -146,15 +146,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         break;
     }
     case TDataSinkType::OLAP_TABLE_SINK: {
-        Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
             
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, &status));
+            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, false));
         } else {
             sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, false));
         }
-        RETURN_IF_ERROR(status);
         break;
     }
     case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: {
@@ -301,15 +299,13 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         break;
     }
     case TDataSinkType::OLAP_TABLE_SINK: {
-        Status status = Status::OK();
         DCHECK(thrift_sink.__isset.olap_table_sink);
         if (state->query_options().enable_memtable_on_sink_node &&
             
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
-            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, &status));
+            sink->reset(new vectorized::VOlapTableSinkV2(pool, row_desc, 
output_exprs, false));
         } else {
             sink->reset(new vectorized::VOlapTableSink(pool, row_desc, 
output_exprs, false));
         }
-        RETURN_IF_ERROR(status);
         break;
     }
     case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
similarity index 53%
copy from be/src/pipeline/exec/olap_table_sink_v2_operator.h
copy to be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
index f280e856f0c..99efc1d752e 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.cpp
@@ -15,35 +15,36 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#pragma once
+#include "olap_table_sink_v2_operator.h"
 
-#include "operator.h"
-#include "vec/sink/vtablet_sink_v2.h"
+#include "common/status.h"
 
-namespace doris {
-
-namespace pipeline {
-
-class OlapTableSinkV2OperatorBuilder final
-        : public DataSinkOperatorBuilder<vectorized::VOlapTableSinkV2> {
-public:
-    OlapTableSinkV2OperatorBuilder(int32_t id, DataSink* sink)
-            : DataSinkOperatorBuilder(id, "OlapTableSinkV2Operator", sink) {}
-
-    OperatorPtr build_operator() override;
-};
-
-class OlapTableSinkV2Operator final : public 
DataSinkOperator<OlapTableSinkV2OperatorBuilder> {
-public:
-    OlapTableSinkV2Operator(OperatorBuilderBase* operator_builder, DataSink* 
sink)
-            : DataSinkOperator(operator_builder, sink) {}
-
-    bool can_write() override { return true; } // TODO: need use mem_limit
-};
+namespace doris::pipeline {
 
 OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
     return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
 }
 
-} // namespace pipeline
-} // namespace doris
\ No newline at end of file
+Status OlapTableSinkV2LocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    auto& p = _parent->cast<Parent>();
+    RETURN_IF_ERROR(_writer->init_properties(p._pool, p._group_commit));
+    return Status::OK();
+}
+
+Status OlapTableSinkV2LocalState::close(RuntimeState* state, Status 
exec_status) {
+    if (Base::_closed) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_close_timer);
+    SCOPED_TIMER(exec_time_counter());
+    if (_closed) {
+        return _close_status;
+    }
+    _close_status = Base::close(state, exec_status);
+    return _close_status;
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index f280e856f0c..5fb8f64dd31 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "operator.h"
+#include "pipeline/pipeline_x/operator.h"
 #include "vec/sink/vtablet_sink_v2.h"
 
 namespace doris {
@@ -41,9 +42,76 @@ public:
     bool can_write() override { return true; } // TODO: need use mem_limit
 };
 
-OperatorPtr OlapTableSinkV2OperatorBuilder::build_operator() {
-    return std::make_shared<OlapTableSinkV2Operator>(this, _sink);
-}
+class OlapTableSinkV2OperatorX;
+
+class OlapTableSinkV2LocalState final
+        : public AsyncWriterSink<vectorized::VTabletWriterV2, 
OlapTableSinkV2OperatorX> {
+public:
+    using Base = AsyncWriterSink<vectorized::VTabletWriterV2, 
OlapTableSinkV2OperatorX>;
+    using Parent = OlapTableSinkV2OperatorX;
+    ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
+    OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state)
+            : Base(parent, state) {};
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override {
+        SCOPED_TIMER(exec_time_counter());
+        SCOPED_TIMER(_open_timer);
+        return Base::open(state);
+    }
+
+    Status close(RuntimeState* state, Status exec_status) override;
+    friend class OlapTableSinkV2OperatorX;
+
+private:
+    Status _close_status = Status::OK();
+};
+
+class OlapTableSinkV2OperatorX final : public 
DataSinkOperatorX<OlapTableSinkV2LocalState> {
+public:
+    using Base = DataSinkOperatorX<OlapTableSinkV2LocalState>;
+    OlapTableSinkV2OperatorX(ObjectPool* pool, int operator_id, const 
RowDescriptor& row_desc,
+                             const std::vector<TExpr>& t_output_expr, bool 
group_commit)
+            : Base(operator_id, 0),
+              _row_desc(row_desc),
+              _t_output_expr(t_output_expr),
+              _group_commit(group_commit),
+              _pool(pool) {};
+
+    Status init(const TDataSink& thrift_sink) override {
+        RETURN_IF_ERROR(Base::init(thrift_sink));
+        // From the thrift expressions create the real exprs.
+        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr, 
_output_vexpr_ctxs));
+        return Status::OK();
+    }
+
+    Status prepare(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::prepare(state));
+        return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc);
+    }
+
+    Status open(RuntimeState* state) override {
+        RETURN_IF_ERROR(Base::open(state));
+        return vectorized::VExpr::open(_output_vexpr_ctxs, state);
+    }
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block,
+                SourceState source_state) override {
+        auto& local_state = get_local_state(state);
+        SCOPED_TIMER(local_state.exec_time_counter());
+        COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
+        return local_state.sink(state, in_block, source_state);
+    }
+
+private:
+    friend class OlapTableSinkV2LocalState;
+    template <typename Writer, typename Parent>
+    friend class AsyncWriterSink;
+    const RowDescriptor& _row_desc;
+    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
+    const std::vector<TExpr>& _t_output_expr;
+    const bool _group_commit;
+    ObjectPool* _pool;
+};
 
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index f8430a57159..9e6df06da01 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -46,6 +46,7 @@
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/olap_table_sink_v2_operator.h"
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
@@ -544,6 +545,7 @@ DECLARE_OPERATOR_X(ResultSinkLocalState)
 DECLARE_OPERATOR_X(JdbcTableSinkLocalState)
 DECLARE_OPERATOR_X(ResultFileSinkLocalState)
 DECLARE_OPERATOR_X(OlapTableSinkLocalState)
+DECLARE_OPERATOR_X(OlapTableSinkV2LocalState)
 DECLARE_OPERATOR_X(AnalyticSinkLocalState)
 DECLARE_OPERATOR_X(SortSinkLocalState)
 DECLARE_OPERATOR_X(LocalExchangeSinkLocalState)
@@ -624,5 +626,6 @@ template class PipelineXLocalState<LocalExchangeDependency>;
 template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, 
JdbcTableSinkOperatorX>;
 template class AsyncWriterSink<doris::vectorized::VTabletWriter, 
OlapTableSinkOperatorX>;
+template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, 
OlapTableSinkV2OperatorX>;
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 200ac23504e..7113989ee1e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -62,6 +62,7 @@
 #include "pipeline/exec/nested_loop_join_probe_operator.h"
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/olap_table_sink_operator.h"
+#include "pipeline/exec/olap_table_sink_v2_operator.h"
 #include "pipeline/exec/partition_sort_sink_operator.h"
 #include "pipeline/exec/partition_sort_source_operator.h"
 #include "pipeline/exec/repeat_operator.h"
@@ -268,9 +269,10 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
         break;
     }
     case TDataSinkType::OLAP_TABLE_SINK: {
-        if (state->query_options().enable_memtable_on_sink_node) {
-            return Status::InternalError(
-                    "Unsuported OLAP_TABLE_SINK with 
enable_memtable_on_sink_node ");
+        if (state->query_options().enable_memtable_on_sink_node &&
+            
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
+            _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(), 
row_desc,
+                                                     output_exprs, false));
         } else {
             _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(), 
row_desc, output_exprs,
                                                    false));
@@ -412,6 +414,9 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
         
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
         _runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
+        
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
+        _runtime_states[i]->set_total_load_streams(request.total_load_streams);
+        _runtime_states[i]->set_num_local_sink(request.num_local_sink);
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             auto task = std::make_unique<PipelineXTask>(
@@ -1005,4 +1010,23 @@ Status PipelineXFragmentContext::send_report(bool done) {
                        std::placeholders::_2)},
             shared_from_this());
 }
+
+bool 
PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink 
sink) {
+    OlapTableSchemaParam schema;
+    if (!schema.init(sink.schema).ok()) {
+        return false;
+    }
+    if (schema.is_partial_update()) {
+        return true;
+    }
+    for (const auto& index_schema : schema.indexes()) {
+        for (const auto& index : index_schema->indexes) {
+            if (index->index_type() == INVERTED) {
+                return true;
+            }
+        }
+    }
+    return false;
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 4d2a59277e9..6fa91aedf12 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -145,6 +145,9 @@ private:
                              const TPipelineFragmentParams& params, const 
RowDescriptor& row_desc,
                              RuntimeState* state, DescriptorTbl& desc_tbl,
                              PipelineId cur_pipeline_id);
+
+    bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
+
     OperatorXPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is 
the number of pipelines.
     std::vector<std::vector<std::unique_ptr<PipelineXTask>>> _tasks;
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index ac9be0e7fb7..9385bd93202 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -17,47 +17,22 @@
 
 #include "vec/sink/vtablet_sink_v2.h"
 
-#include <brpc/uri.h>
-#include <bthread/bthread.h>
-#include <fmt/format.h>
 #include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/Descriptors_types.h>
-#include <gen_cpp/Metrics_types.h>
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
 
-#include <algorithm>
-#include <execution>
-#include <mutex>
 #include <ranges>
-#include <string>
 #include <unordered_map>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
-#include "common/logging.h"
 #include "common/object_pool.h"
-#include "common/signal_handler.h"
 #include "common/status.h"
-#include "exec/tablet_info.h"
 #include "olap/delta_writer_v2.h"
 #include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
-#include "service/brpc.h"
-#include "util/brpc_client_cache.h"
 #include "util/doris_metrics.h"
-#include "util/network_util.h"
-#include "util/threadpool.h"
-#include "util/thrift_util.h"
-#include "util/uid_util.h"
-#include "vec/core/block.h"
-#include "vec/exprs/vexpr.h"
 #include "vec/sink/delta_writer_v2_pool.h"
 #include "vec/sink/load_stream_stub.h"
 #include "vec/sink/load_stream_stub_pool.h"
-#include "vec/sink/vtablet_block_convertor.h"
-#include "vec/sink/vtablet_finder.h"
 
 namespace doris {
 class TExpr;
@@ -65,391 +40,16 @@ class TExpr;
 namespace vectorized {
 
 VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& 
row_desc,
-                                   const std::vector<TExpr>& texprs, Status* 
status)
-        : DataSink(row_desc), _pool(pool) {
-    // From the thrift expressions create the real exprs.
-    *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
-    _name = "VOlapTableSinkV2";
-}
+                                   const std::vector<TExpr>& texprs, bool 
group_commit)
+        : AsyncWriterSink<VTabletWriterV2, VOLAP_TABLE_SINK_V2>(row_desc, 
texprs),
+          _pool(pool),
+          _group_commit(group_commit) {}
 
 VOlapTableSinkV2::~VOlapTableSinkV2() = default;
 
-Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) 
{
-    // add new tablet locations. it will use by address. so add to pool
-    auto* new_locations = _pool->add(new 
std::vector<TTabletLocation>(result->tablets));
-    _location->add_locations(*new_locations);
-
-    // update new node info
-    _nodes_info->add_nodes(result->nodes);
-
-    // incremental open stream
-    RETURN_IF_ERROR(_incremental_open_streams(result->partitions));
-
-    return Status::OK();
-}
-
-static Status on_partitions_created(void* writer, TCreatePartitionResult* 
result) {
-    return 
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
-}
-
-Status VOlapTableSinkV2::_incremental_open_streams(
-        const std::vector<TOlapTablePartition>& partitions) {
-    // do what we did in prepare() for partitions. indexes which don't change 
when we create new partition is orthogonal to partitions.
-    std::unordered_set<int64_t> known_indexes;
-    std::unordered_set<int64_t> new_backends;
-    for (const auto& t_partition : partitions) {
-        VOlapTablePartition* partition = nullptr;
-        RETURN_IF_ERROR(_vpartition->generate_partition_from(t_partition, 
partition));
-        for (const auto& index : partition->indexes) {
-            for (const auto& tablet_id : index.tablets) {
-                auto nodes = _location->find_tablet(tablet_id)->node_ids;
-                for (auto& node : nodes) {
-                    PTabletID tablet;
-                    tablet.set_partition_id(partition->id);
-                    tablet.set_index_id(index.index_id);
-                    tablet.set_tablet_id(tablet_id);
-                    if (!_streams_for_node.contains(node)) {
-                        new_backends.insert(node);
-                    }
-                    _tablets_for_node[node].emplace(tablet_id, tablet);
-                    if (known_indexes.contains(index.index_id)) [[likely]] {
-                        continue;
-                    }
-                    _indexes_from_node[node].emplace_back(tablet);
-                    known_indexes.insert(index.index_id);
-                }
-            }
-        }
-    }
-    for (int64_t node_id : new_backends) {
-        auto load_streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, _backend_id, node_id, _stream_per_node, 
_num_local_sink);
-        RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams));
-        _streams_for_node[node_id] = load_streams;
-    }
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::_init_row_distribution() {
-    VRowDistributionContext ctx;
-
-    ctx.state = _state;
-    ctx.block_convertor = _block_convertor.get();
-    ctx.tablet_finder = _tablet_finder.get();
-    ctx.vpartition = _vpartition;
-    ctx.add_partition_request_timer = _add_partition_request_timer;
-    ctx.txn_id = _txn_id;
-    ctx.pool = _pool;
-    ctx.location = _location;
-    ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs;
-    ctx.on_partitions_created = &vectorized::on_partitions_created;
-    ctx.caller = (void*)this;
-    ctx.schema = _schema;
-
-    _row_distribution.init(&ctx);
-
-    RETURN_IF_ERROR(_row_distribution.open(_output_row_desc));
-
-    return Status::OK();
-}
-
 Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
-    DCHECK(t_sink.__isset.olap_table_sink);
-    auto& table_sink = t_sink.olap_table_sink;
-    _load_id.set_hi(table_sink.load_id.hi);
-    _load_id.set_lo(table_sink.load_id.lo);
-    _txn_id = table_sink.txn_id;
-    _num_replicas = table_sink.num_replicas;
-    _tuple_desc_id = table_sink.tuple_id;
-    _write_file_cache = table_sink.write_file_cache;
-    _schema.reset(new OlapTableSchemaParam());
-    RETURN_IF_ERROR(_schema->init(table_sink.schema));
-    _location = _pool->add(new OlapTableLocationParam(table_sink.location));
-    _nodes_info = _pool->add(new DorisNodesInfo(table_sink.nodes_info));
-
-    // if distributed column list is empty, we can ensure that tablet is with 
random distribution info
-    // and if load_to_single_tablet is set and set to true, we should find 
only one tablet in one partition
-    // for the whole olap table sink
-    auto find_tablet_mode = 
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_ROW;
-    if (table_sink.partition.distributed_columns.empty()) {
-        if (table_sink.__isset.load_to_single_tablet && 
table_sink.load_to_single_tablet) {
-            find_tablet_mode = 
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_SINK;
-        } else {
-            find_tablet_mode = 
OlapTabletFinder::FindTabletMode::FIND_TABLET_EVERY_BATCH;
-        }
-    }
-    _vpartition = _pool->add(new doris::VOlapTablePartitionParam(_schema, 
table_sink.partition));
-    _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, 
find_tablet_mode);
-    RETURN_IF_ERROR(_vpartition->init());
-
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSink::prepare(state));
-
-    _state = state;
-
-    _sender_id = state->per_fragment_instance_idx();
-    _num_senders = state->num_per_fragment_instances();
-    _backend_id = state->backend_id();
-    _stream_per_node = state->load_stream_per_node();
-    _total_streams = state->total_load_streams();
-    _num_local_sink = state->num_local_sink();
-    DCHECK(_stream_per_node > 0) << "load stream per node should be greator 
than 0";
-    DCHECK(_total_streams > 0) << "total load streams should be greator than 
0";
-    DCHECK(_num_local_sink > 0) << "num local sink should be greator than 0";
-    LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " << 
_stream_per_node
-              << ", total_streams " << _total_streams << ", num_local_sink: " 
<< _num_local_sink;
-    _is_high_priority =
-            (state->execution_timeout() <= 
config::load_task_high_priority_threshold_second);
-
-    // profile must add to state's object pool
-    _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2"));
-    _mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" +
-                                                
std::to_string(state->load_job_id()));
-    SCOPED_TIMER(_profile->total_time_counter());
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-
-    // get table's tuple descriptor
-    _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
-    if (_output_tuple_desc == nullptr) {
-        return Status::InternalError("unknown destination tuple descriptor, id 
= {}",
-                                     _tuple_desc_id);
-    }
-    _block_convertor = 
std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    _block_convertor->init_autoinc_info(_schema->db_id(), _schema->table_id(),
-                                        _state->batch_size());
-    _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, 
false));
-
-    // add all counter
-    _input_rows_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
-    _output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
-    _filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered", 
TUnit::UNIT);
-    _send_data_timer = ADD_TIMER(_profile, "SendDataTime");
-    _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime", 
"SendDataTime");
-    _row_distribution_timer = ADD_CHILD_TIMER(_profile, "RowDistributionTime", 
"SendDataTime");
-    _write_memtable_timer = ADD_CHILD_TIMER(_profile, "WriteMemTableTime", 
"SendDataTime");
-    _validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
-    _open_timer = ADD_TIMER(_profile, "OpenTime");
-    _close_timer = ADD_TIMER(_profile, "CloseWaitTime");
-    _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", 
"CloseWaitTime");
-    _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", 
"CloseWaitTime");
-
-    // Prepare the exprs to run.
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
-    if (config::share_delta_writers) {
-        _delta_writer_for_tablet = 
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
-                _load_id, _num_local_sink);
-    } else {
-        _delta_writer_for_tablet = 
std::make_shared<DeltaWriterV2Map>(_load_id);
-    }
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::open(RuntimeState* state) {
-    // Prepare the exprs to run.
-    RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
-    SCOPED_TIMER(_profile->total_time_counter());
-    SCOPED_TIMER(_open_timer);
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-    signal::set_signal_task_id(_load_id);
-
-    _build_tablet_node_mapping();
-    RETURN_IF_ERROR(_open_streams(_backend_id));
-    RETURN_IF_ERROR(_init_row_distribution());
-
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
-    for (auto& [dst_id, _] : _tablets_for_node) {
-        auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
-                _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
-        RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
-        _streams_for_node[dst_id] = streams;
-    }
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
-                                                  
::doris::stream_load::LoadStreams& streams) {
-    auto node_info = _nodes_info->find_node(dst_id);
-    if (node_info == nullptr) {
-        return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
-    }
-    // get tablet schema from each backend only in the 1st stream
-    for (auto& stream : streams.streams() | std::ranges::views::take(1)) {
-        const std::vector<PTabletID>& tablets_for_schema = 
_indexes_from_node[node_info->id];
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
-                                     _txn_id, *_schema, tablets_for_schema, 
_total_streams,
-                                     _state->enable_profile()));
-    }
-    // for the rest streams, open without getting tablet schema
-    for (auto& stream : streams.streams() | std::ranges::views::drop(1)) {
-        
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(), 
*node_info,
-                                     _txn_id, *_schema, {}, _total_streams,
-                                     _state->enable_profile()));
-    }
-    return Status::OK();
-}
-
-void VOlapTableSinkV2::_build_tablet_node_mapping() {
-    std::unordered_set<int64_t> known_indexes;
-    for (const auto& partition : _vpartition->get_partitions()) {
-        for (const auto& index : partition->indexes) {
-            for (const auto& tablet_id : index.tablets) {
-                auto nodes = _location->find_tablet(tablet_id)->node_ids;
-                for (auto& node : nodes) {
-                    PTabletID tablet;
-                    tablet.set_partition_id(partition->id);
-                    tablet.set_index_id(index.index_id);
-                    tablet.set_tablet_id(tablet_id);
-                    _tablets_for_node[node].emplace(tablet_id, tablet);
-                    if (known_indexes.contains(index.index_id)) [[likely]] {
-                        continue;
-                    }
-                    _indexes_from_node[node].emplace_back(tablet);
-                    known_indexes.insert(index.index_id);
-                }
-            }
-        }
-    }
-}
-
-void 
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
-                                                 RowsForTablet& 
rows_for_tablet) {
-    for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); 
index_idx++) {
-        auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
-        auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
-        auto& tablet_ids = row_part_tablet_ids[index_idx].tablet_ids;
-
-        for (int i = 0; i < row_ids.size(); i++) {
-            auto& tablet_id = tablet_ids[i];
-            auto it = rows_for_tablet.find(tablet_id);
-            if (it == rows_for_tablet.end()) {
-                Rows rows;
-                rows.partition_id = partition_ids[i];
-                rows.index_id = _schema->indexes()[index_idx]->index_id;
-                rows.row_idxes.reserve(row_ids.size());
-                auto [tmp_it, _] = rows_for_tablet.insert({tablet_id, rows});
-                it = tmp_it;
-            }
-            it->second.row_idxes.push_back(row_ids[i]);
-            _number_output_rows++;
-        }
-    }
-}
-
-Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t 
partition_id, int64_t index_id,
-                                         Streams& streams) {
-    auto location = _location->find_tablet(tablet_id);
-    if (location == nullptr) {
-        return Status::InternalError("unknown tablet location, tablet id = 
{}", tablet_id);
-    }
-    for (auto& node_id : location->node_ids) {
-        PTabletID tablet;
-        tablet.set_partition_id(partition_id);
-        tablet.set_index_id(index_id);
-        tablet.set_tablet_id(tablet_id);
-        _tablets_for_node[node_id].emplace(tablet_id, tablet);
-        
streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index));
-        RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, 
tablet_id));
-    }
-    _stream_index = (_stream_index + 1) % _stream_per_node;
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* 
input_block, bool eos) {
-    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-    Status status = Status::OK();
-
-    if (state->query_options().dry_run_query) {
-        return status;
-    }
-
-    auto input_rows = input_block->rows();
-    auto input_bytes = input_block->bytes();
-    if (UNLIKELY(input_rows == 0)) {
-        return status;
-    }
-    SCOPED_TIMER(_profile->total_time_counter());
-    _number_input_rows += input_rows;
-    // update incrementally so that FE can get the progress.
-    // the real 'num_rows_load_total' will be set when sink being closed.
-    state->update_num_rows_load_total(input_rows);
-    state->update_num_bytes_load_total(input_bytes);
-    DorisMetrics::instance()->load_rows->increment(input_rows);
-    DorisMetrics::instance()->load_bytes->increment(input_bytes);
-
-    bool has_filtered_rows = false;
-    int64_t filtered_rows = 0;
-
-    SCOPED_RAW_TIMER(&_send_data_ns);
-    // This is just for passing compilation.
-    _row_distribution_watch.start();
-
-    std::shared_ptr<vectorized::Block> block;
-    RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            *input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
-    RowsForTablet rows_for_tablet;
-    _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
-
-    _row_distribution_watch.stop();
-
-    // For each tablet, send its input_rows from block to delta writer
-    for (const auto& [tablet_id, rows] : rows_for_tablet) {
-        Streams streams;
-        RETURN_IF_ERROR(_select_streams(tablet_id, rows.partition_id, 
rows.index_id, streams));
-        RETURN_IF_ERROR(_write_memtable(block, tablet_id, rows, streams));
-    }
-
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block,
-                                         int64_t tablet_id, const Rows& rows,
-                                         const Streams& streams) {
-    DeltaWriterV2* delta_writer = 
_delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
-        WriteRequest req {
-                .tablet_id = tablet_id,
-                .txn_id = _txn_id,
-                .index_id = rows.index_id,
-                .partition_id = rows.partition_id,
-                .load_id = _load_id,
-                .tuple_desc = _output_tuple_desc,
-                .table_schema_param = _schema.get(),
-                .is_high_priority = _is_high_priority,
-                .write_file_cache = _write_file_cache,
-        };
-        for (auto& index : _schema->indexes()) {
-            if (index->index_id == rows.index_id) {
-                req.slots = &index->slots;
-                req.schema_hash = index->schema_hash;
-                break;
-            }
-        }
-        return DeltaWriterV2::open(&req, streams);
-    });
-    {
-        SCOPED_TIMER(_wait_mem_limit_timer);
-        
ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush();
-    }
-    SCOPED_TIMER(_write_memtable_timer);
-    auto st = delta_writer->write(block.get(), rows.row_idxes, false);
-    return st;
-}
-
-Status VOlapTableSinkV2::_cancel(Status status) {
-    LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
-              << ", txn_id=" << _txn_id << ", due to error: " << status;
-    if (_delta_writer_for_tablet) {
-        _delta_writer_for_tablet->cancel(status);
-        _delta_writer_for_tablet.reset();
-    }
-    for (const auto& [_, streams] : _streams_for_node) {
-        streams->release();
-    }
+    RETURN_IF_ERROR(AsyncWriterSink::init(t_sink));
+    RETURN_IF_ERROR(_writer->init_properties(_pool, _group_commit));
     return Status::OK();
 }
 
@@ -457,97 +57,8 @@ Status VOlapTableSinkV2::close(RuntimeState* state, Status 
exec_status) {
     if (_closed) {
         return _close_status;
     }
-    SCOPED_TIMER(_close_timer);
-    Status status = exec_status;
-    if (status.ok()) {
-        // only if status is ok can we call this 
_profile->total_time_counter().
-        // if status is not ok, this sink may not be prepared, so that 
_profile is null
-        SCOPED_TIMER(_profile->total_time_counter());
-
-        COUNTER_SET(_input_rows_counter, _number_input_rows);
-        COUNTER_SET(_output_rows_counter, _number_output_rows);
-        COUNTER_SET(_filtered_rows_counter,
-                    _block_convertor->num_filtered_rows() + 
_tablet_finder->num_filtered_rows());
-        COUNTER_SET(_send_data_timer, _send_data_ns);
-        COUNTER_SET(_row_distribution_timer, 
(int64_t)_row_distribution_watch.elapsed_time());
-        COUNTER_SET(_validate_data_timer, 
_block_convertor->validate_data_ns());
-
-        // release streams from the pool first, to prevent memory leak
-        for (const auto& [_, streams] : _streams_for_node) {
-            streams->release();
-        }
-
-        {
-            SCOPED_TIMER(_close_writer_timer);
-            // close all delta writers if this is the last user
-            RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile));
-            _delta_writer_for_tablet.reset();
-        }
-
-        {
-            // send CLOSE_LOAD to all streams, return ERROR if any
-            for (const auto& [_, streams] : _streams_for_node) {
-                RETURN_IF_ERROR(_close_load(streams->streams()));
-            }
-        }
-
-        {
-            SCOPED_TIMER(_close_load_timer);
-            for (const auto& [_, streams] : _streams_for_node) {
-                for (const auto& stream : streams->streams()) {
-                    RETURN_IF_ERROR(stream->close_wait());
-                }
-            }
-        }
-
-        std::vector<TTabletCommitInfo> tablet_commit_infos;
-        for (const auto& [node_id, streams] : _streams_for_node) {
-            for (const auto& stream : streams->streams()) {
-                for (auto tablet_id : stream->success_tablets()) {
-                    TTabletCommitInfo commit_info;
-                    commit_info.tabletId = tablet_id;
-                    commit_info.backendId = node_id;
-                    tablet_commit_infos.emplace_back(std::move(commit_info));
-                }
-            }
-        }
-        state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
-                                            
std::make_move_iterator(tablet_commit_infos.begin()),
-                                            
std::make_move_iterator(tablet_commit_infos.end()));
-        _streams_for_node.clear();
-
-        // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
-        int64_t num_rows_load_total = _number_input_rows + 
state->num_rows_load_filtered() +
-                                      state->num_rows_load_unselected();
-        state->set_num_rows_load_total(num_rows_load_total);
-        
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
-                                             
_tablet_finder->num_filtered_rows());
-        state->update_num_rows_load_unselected(
-                _tablet_finder->num_immutable_partition_filtered_rows());
-
-        LOG(INFO) << "finished to close olap table sink. load_id=" << 
print_id(_load_id)
-                  << ", txn_id=" << _txn_id;
-    } else {
-        RETURN_IF_ERROR(_cancel(status));
-    }
-
-    _close_status = status;
-    RETURN_IF_ERROR(DataSink::close(state, exec_status));
-    return status;
-}
-
-Status VOlapTableSinkV2::_close_load(const Streams& streams) {
-    auto node_id = streams[0]->dst_id();
-    std::vector<PTabletID> tablets_to_commit;
-    for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
-        if (_tablet_finder->partition_ids().contains(tablet.partition_id())) {
-            tablets_to_commit.push_back(tablet);
-        }
-    }
-    for (const auto& stream : streams) {
-        RETURN_IF_ERROR(stream->close_load(tablets_to_commit));
-    }
-    return Status::OK();
+    _close_status = AsyncWriterSink::close(state, exec_status);
+    return _close_status;
 }
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/vtablet_sink_v2.h
index 1f317420de4..cef4659bddb 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -26,204 +26,46 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
-#include <functional>
-#include <initializer_list>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <ostream>
-#include <queue>
-#include <set>
-#include <string>
-#include <unordered_map>
-#include <unordered_set>
-#include <utility>
 #include <vector>
 
-#include "common/config.h"
 #include "common/status.h"
-#include "exec/data_sink.h"
-#include "exec/tablet_info.h"
-#include "gutil/ref_counted.h"
 #include "runtime/exec_env.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/thread_context.h"
 #include "runtime/types.h"
-#include "util/countdown_latch.h"
-#include "util/runtime_profile.h"
-#include "util/stopwatch.hpp"
-#include "vec/columns/column.h"
-#include "vec/common/allocator.h"
-#include "vec/common/hash_table/phmap_fwd_decl.h"
-#include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
-#include "vec/sink/vrow_distribution.h"
+#include "vec/sink/async_writer_sink.h"
+#include "vec/sink/writer/vtablet_writer_v2.h"
 
 namespace doris {
-class DeltaWriterV2;
-class LoadStreamStub;
-class ObjectPool;
-class RowDescriptor;
-class RuntimeState;
-class TDataSink;
-class TExpr;
-class TabletSchema;
-class TupleDescriptor;
-
-namespace stream_load {
-class LoadStreams;
-}
 
 namespace vectorized {
 
-class OlapTableBlockConvertor;
-class OlapTabletFinder;
-class VOlapTableSinkV2;
-class DeltaWriterV2Map;
-
-using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
-
-struct Rows {
-    int64_t partition_id;
-    int64_t index_id;
-    std::vector<int32_t> row_idxes;
-};
-
-using RowsForTablet = std::unordered_map<int64_t, Rows>;
+inline constexpr char VOLAP_TABLE_SINK_V2[] = "VOlapTableSinkV2";
 
-// Write block data to Olap Table.
-// When OlapTableSink::open() called, there will be a consumer thread running 
in the background.
-// When you call VOlapTableSinkV2::send(), you will be the producer who 
products pending batches.
-// Join the consumer thread in close().
-class VOlapTableSinkV2 final : public DataSink {
+class VOlapTableSinkV2 final : public AsyncWriterSink<VTabletWriterV2, 
VOLAP_TABLE_SINK_V2> {
 public:
     // Construct from thrift struct which is generated by FE.
     VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
-                     const std::vector<TExpr>& texprs, Status* status);
+                     const std::vector<TExpr>& texprs, bool group_commit);
 
     ~VOlapTableSinkV2() override;
 
     Status init(const TDataSink& sink) override;
-    // TODO: unify the code of prepare/open/close with result sink
-    Status prepare(RuntimeState* state) override;
 
-    Status open(RuntimeState* state) override;
-
-    Status close(RuntimeState* state, Status close_status) override;
-
-    Status send(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override;
-
-    Status on_partitions_created(TCreatePartitionResult* result);
+    Status close(RuntimeState* state, Status exec_status) override;
 
 private:
-    Status _init_row_distribution();
-
-    Status _open_streams(int64_t src_id);
-
-    Status _open_streams_to_backend(int64_t dst_id, 
::doris::stream_load::LoadStreams& streams);
-
-    Status _incremental_open_streams(const std::vector<TOlapTablePartition>& 
partitions);
-
-    void _build_tablet_node_mapping();
-
-    void _generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
-                                   RowsForTablet& rows_for_tablet);
-
-    Status _write_memtable(std::shared_ptr<vectorized::Block> block, int64_t 
tablet_id,
-                           const Rows& rows, const Streams& streams);
-
-    Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t 
index_id,
-                           Streams& streams);
-
-    Status _close_load(const Streams& streams);
-
-    Status _cancel(Status status);
-
-    std::shared_ptr<MemTracker> _mem_tracker;
-
     ObjectPool* _pool;
 
-    // unique load id
-    PUniqueId _load_id;
-    int64_t _txn_id = -1;
-    int _num_replicas = -1;
-    int _tuple_desc_id = -1;
-
-    // this is tuple descriptor of destination OLAP table
-    TupleDescriptor* _output_tuple_desc = nullptr;
-    RowDescriptor* _output_row_desc = nullptr;
-
-    // number of senders used to insert into OlapTable, if we only support 
single node insert,
-    // all data from select should collectted and then send to OlapTable.
-    // To support multiple senders, we maintain a channel for each sender.
-    int _sender_id = -1;
-    int _num_senders = -1;
-    int64_t _backend_id = -1;
-    int _stream_per_node = -1;
-    int _total_streams = -1;
-    int _num_local_sink = -1;
-    bool _is_high_priority = false;
-    bool _write_file_cache = false;
-
-    // TODO(zc): think about cache this data
-    std::shared_ptr<OlapTableSchemaParam> _schema;
-    OlapTableLocationParam* _location = nullptr;
-    DorisNodesInfo* _nodes_info = nullptr;
-
-    std::unique_ptr<OlapTabletFinder> _tablet_finder;
-
-    std::unique_ptr<OlapTableBlockConvertor> _block_convertor;
-
-    // Stats for this
-    int64_t _send_data_ns = 0;
-    int64_t _number_input_rows = 0;
-    int64_t _number_output_rows = 0;
-
-    MonotonicStopWatch _row_distribution_watch;
-
-    RuntimeProfile::Counter* _input_rows_counter = nullptr;
-    RuntimeProfile::Counter* _output_rows_counter = nullptr;
-    RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
-    RuntimeProfile::Counter* _send_data_timer = nullptr;
-    RuntimeProfile::Counter* _row_distribution_timer = nullptr;
-    RuntimeProfile::Counter* _write_memtable_timer = nullptr;
-    RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
-    RuntimeProfile::Counter* _validate_data_timer = nullptr;
-    RuntimeProfile::Counter* _open_timer = nullptr;
-    RuntimeProfile::Counter* _close_timer = nullptr;
-    RuntimeProfile::Counter* _close_writer_timer = nullptr;
-    RuntimeProfile::Counter* _close_load_timer = nullptr;
-    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
-
-    // Save the status of close() method
-    Status _close_status;
-
-    VOlapTablePartitionParam* _vpartition = nullptr;
-    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
-
-    RuntimeState* _state = nullptr;
-
-    std::unordered_set<int64_t> _opened_partitions;
-
-    std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_for_node;
-    std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
-
-    std::unordered_map<int64_t, 
std::shared_ptr<::doris::stream_load::LoadStreams>>
-            _streams_for_node;
-
-    size_t _stream_index = 0;
-    std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
+    bool _group_commit = false;
 
-    VRowDistribution _row_distribution;
-    // reuse to avoid frequent memory allocation and release.
-    std::vector<RowPartTabletIds> _row_part_tablet_ids;
+    Status _close_status = Status::OK();
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
similarity index 83%
copy from be/src/vec/sink/vtablet_sink_v2.cpp
copy to be/src/vec/sink/writer/vtablet_writer_v2.cpp
index ac9be0e7fb7..7cf553fddab 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -15,18 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/sink/vtablet_sink_v2.h"
+#include "vec/sink/writer/vtablet_writer_v2.h"
 
 #include <brpc/uri.h>
-#include <bthread/bthread.h>
-#include <fmt/format.h>
 #include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/Descriptors_types.h>
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
 
-#include <algorithm>
 #include <execution>
 #include <mutex>
 #include <ranges>
@@ -34,7 +31,6 @@
 #include <unordered_map>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
-#include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
@@ -47,12 +43,10 @@
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
 #include "util/doris_metrics.h"
-#include "util/network_util.h"
 #include "util/threadpool.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
 #include "vec/core/block.h"
-#include "vec/exprs/vexpr.h"
 #include "vec/sink/delta_writer_v2_pool.h"
 #include "vec/sink/load_stream_stub.h"
 #include "vec/sink/load_stream_stub_pool.h"
@@ -60,21 +54,17 @@
 #include "vec/sink/vtablet_finder.h"
 
 namespace doris {
-class TExpr;
 
 namespace vectorized {
 
-VOlapTableSinkV2::VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& 
row_desc,
-                                   const std::vector<TExpr>& texprs, Status* 
status)
-        : DataSink(row_desc), _pool(pool) {
-    // From the thrift expressions create the real exprs.
-    *status = vectorized::VExpr::create_expr_trees(texprs, _output_vexpr_ctxs);
-    _name = "VOlapTableSinkV2";
+VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs)
+        : AsyncResultWriter(output_exprs), _t_sink(t_sink) {
+    DCHECK(t_sink.__isset.olap_table_sink);
 }
 
-VOlapTableSinkV2::~VOlapTableSinkV2() = default;
+VTabletWriterV2::~VTabletWriterV2() = default;
 
-Status VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) 
{
+Status VTabletWriterV2::on_partitions_created(TCreatePartitionResult* result) {
     // add new tablet locations. it will use by address. so add to pool
     auto* new_locations = _pool->add(new 
std::vector<TTabletLocation>(result->tablets));
     _location->add_locations(*new_locations);
@@ -89,10 +79,10 @@ Status 
VOlapTableSinkV2::on_partitions_created(TCreatePartitionResult* result) {
 }
 
 static Status on_partitions_created(void* writer, TCreatePartitionResult* 
result) {
-    return 
static_cast<VOlapTableSinkV2*>(writer)->on_partitions_created(result);
+    return 
static_cast<VTabletWriterV2*>(writer)->on_partitions_created(result);
 }
 
-Status VOlapTableSinkV2::_incremental_open_streams(
+Status VTabletWriterV2::_incremental_open_streams(
         const std::vector<TOlapTablePartition>& partitions) {
     // do what we did in prepare() for partitions. indexes which don't change 
when we create new partition is orthogonal to partitions.
     std::unordered_set<int64_t> known_indexes;
@@ -130,7 +120,7 @@ Status VOlapTableSinkV2::_incremental_open_streams(
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::_init_row_distribution() {
+Status VTabletWriterV2::_init_row_distribution() {
     VRowDistributionContext ctx;
 
     ctx.state = _state;
@@ -141,7 +131,7 @@ Status VOlapTableSinkV2::_init_row_distribution() {
     ctx.txn_id = _txn_id;
     ctx.pool = _pool;
     ctx.location = _location;
-    ctx.vec_output_expr_ctxs = &_output_vexpr_ctxs;
+    ctx.vec_output_expr_ctxs = &_vec_output_expr_ctxs;
     ctx.on_partitions_created = &vectorized::on_partitions_created;
     ctx.caller = (void*)this;
     ctx.schema = _schema;
@@ -153,11 +143,17 @@ Status VOlapTableSinkV2::_init_row_distribution() {
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
-    DCHECK(t_sink.__isset.olap_table_sink);
-    auto& table_sink = t_sink.olap_table_sink;
+Status VTabletWriterV2::init_properties(ObjectPool* pool, bool group_commit) {
+    _pool = pool;
+    _group_commit = group_commit;
+    return Status::OK();
+}
+
+Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
+    auto& table_sink = _t_sink.olap_table_sink;
     _load_id.set_hi(table_sink.load_id.hi);
     _load_id.set_lo(table_sink.load_id.lo);
+    signal::set_signal_task_id(_load_id);
     _txn_id = table_sink.txn_id;
     _num_replicas = table_sink.num_replicas;
     _tuple_desc_id = table_sink.tuple_id;
@@ -182,13 +178,8 @@ Status VOlapTableSinkV2::init(const TDataSink& t_sink) {
     _tablet_finder = std::make_unique<OlapTabletFinder>(_vpartition, 
find_tablet_mode);
     RETURN_IF_ERROR(_vpartition->init());
 
-    return Status::OK();
-}
-
-Status VOlapTableSinkV2::prepare(RuntimeState* state) {
-    RETURN_IF_ERROR(DataSink::prepare(state));
-
     _state = state;
+    _profile = profile;
 
     _sender_id = state->per_fragment_instance_idx();
     _num_senders = state->num_per_fragment_instances();
@@ -205,9 +196,9 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
             (state->execution_timeout() <= 
config::load_task_high_priority_threshold_second);
 
     // profile must add to state's object pool
-    _profile = state->obj_pool()->add(new RuntimeProfile("VOlapTableSinkV2"));
-    _mem_tracker = std::make_shared<MemTracker>("VOlapTableSinkV2:" +
-                                                
std::to_string(state->load_job_id()));
+    _profile = state->obj_pool()->add(new RuntimeProfile("VTabletWriterV2"));
+    _mem_tracker =
+            std::make_shared<MemTracker>("VTabletWriterV2:" + 
std::to_string(state->load_job_id()));
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
@@ -236,8 +227,6 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
     _close_writer_timer = ADD_CHILD_TIMER(_profile, "CloseWriterTime", 
"CloseWaitTime");
     _close_load_timer = ADD_CHILD_TIMER(_profile, "CloseLoadTime", 
"CloseWaitTime");
 
-    // Prepare the exprs to run.
-    RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, 
_row_desc));
     if (config::share_delta_writers) {
         _delta_writer_for_tablet = 
ExecEnv::GetInstance()->delta_writer_v2_pool()->get_or_create(
                 _load_id, _num_local_sink);
@@ -247,13 +236,11 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::open(RuntimeState* state) {
-    // Prepare the exprs to run.
-    RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
+Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) {
+    RETURN_IF_ERROR(_init(state, profile));
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_open_timer);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-    signal::set_signal_task_id(_load_id);
 
     _build_tablet_node_mapping();
     RETURN_IF_ERROR(_open_streams(_backend_id));
@@ -262,7 +249,7 @@ Status VOlapTableSinkV2::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
+Status VTabletWriterV2::_open_streams(int64_t src_id) {
     for (auto& [dst_id, _] : _tablets_for_node) {
         auto streams = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
                 _load_id, src_id, dst_id, _stream_per_node, _num_local_sink);
@@ -272,9 +259,9 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::_open_streams_to_backend(int64_t dst_id,
-                                                  
::doris::stream_load::LoadStreams& streams) {
-    auto node_info = _nodes_info->find_node(dst_id);
+Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id,
+                                                 
::doris::stream_load::LoadStreams& streams) {
+    const auto* node_info = _nodes_info->find_node(dst_id);
     if (node_info == nullptr) {
         return Status::InternalError("Unknown node {} in tablet location", 
dst_id);
     }
@@ -294,7 +281,7 @@ Status VOlapTableSinkV2::_open_streams_to_backend(int64_t 
dst_id,
     return Status::OK();
 }
 
-void VOlapTableSinkV2::_build_tablet_node_mapping() {
+void VTabletWriterV2::_build_tablet_node_mapping() {
     std::unordered_set<int64_t> known_indexes;
     for (const auto& partition : _vpartition->get_partitions()) {
         for (const auto& index : partition->indexes) {
@@ -317,8 +304,8 @@ void VOlapTableSinkV2::_build_tablet_node_mapping() {
     }
 }
 
-void 
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
-                                                 RowsForTablet& 
rows_for_tablet) {
+void VTabletWriterV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>& 
row_part_tablet_ids,
+                                                RowsForTablet& 
rows_for_tablet) {
     for (int index_idx = 0; index_idx < row_part_tablet_ids.size(); 
index_idx++) {
         auto& row_ids = row_part_tablet_ids[index_idx].row_ids;
         auto& partition_ids = row_part_tablet_ids[index_idx].partition_ids;
@@ -341,8 +328,8 @@ void 
VOlapTableSinkV2::_generate_rows_for_tablet(std::vector<RowPartTabletIds>&
     }
 }
 
-Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, int64_t 
partition_id, int64_t index_id,
-                                         Streams& streams) {
+Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t 
partition_id, int64_t index_id,
+                                        Streams& streams) {
     auto location = _location->find_tablet(tablet_id);
     if (location == nullptr) {
         return Status::InternalError("unknown tablet location, tablet id = 
{}", tablet_id);
@@ -360,16 +347,16 @@ Status VOlapTableSinkV2::_select_streams(int64_t 
tablet_id, int64_t partition_id
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::send(RuntimeState* state, vectorized::Block* 
input_block, bool eos) {
+Status VTabletWriterV2::append_block(Block& input_block) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
 
-    if (state->query_options().dry_run_query) {
+    if (_state->query_options().dry_run_query) {
         return status;
     }
 
-    auto input_rows = input_block->rows();
-    auto input_bytes = input_block->bytes();
+    auto input_rows = input_block.rows();
+    auto input_bytes = input_block.bytes();
     if (UNLIKELY(input_rows == 0)) {
         return status;
     }
@@ -377,8 +364,8 @@ Status VOlapTableSinkV2::send(RuntimeState* state, 
vectorized::Block* input_bloc
     _number_input_rows += input_rows;
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
-    state->update_num_rows_load_total(input_rows);
-    state->update_num_bytes_load_total(input_bytes);
+    _state->update_num_rows_load_total(input_rows);
+    _state->update_num_bytes_load_total(input_bytes);
     DorisMetrics::instance()->load_rows->increment(input_rows);
     DorisMetrics::instance()->load_bytes->increment(input_bytes);
 
@@ -391,7 +378,7 @@ Status VOlapTableSinkV2::send(RuntimeState* state, 
vectorized::Block* input_bloc
 
     std::shared_ptr<vectorized::Block> block;
     RETURN_IF_ERROR(_row_distribution.generate_rows_distribution(
-            *input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
+            input_block, block, filtered_rows, has_filtered_rows, 
_row_part_tablet_ids));
     RowsForTablet rows_for_tablet;
     _generate_rows_for_tablet(_row_part_tablet_ids, rows_for_tablet);
 
@@ -407,9 +394,8 @@ Status VOlapTableSinkV2::send(RuntimeState* state, 
vectorized::Block* input_bloc
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block,
-                                         int64_t tablet_id, const Rows& rows,
-                                         const Streams& streams) {
+Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> 
block, int64_t tablet_id,
+                                        const Rows& rows, const Streams& 
streams) {
     DeltaWriterV2* delta_writer = 
_delta_writer_for_tablet->get_or_create(tablet_id, [&]() {
         WriteRequest req {
                 .tablet_id = tablet_id,
@@ -440,7 +426,7 @@ Status 
VOlapTableSinkV2::_write_memtable(std::shared_ptr<vectorized::Block> bloc
     return st;
 }
 
-Status VOlapTableSinkV2::_cancel(Status status) {
+Status VTabletWriterV2::_cancel(Status status) {
     LOG(INFO) << "canceled olap table sink. load_id=" << print_id(_load_id)
               << ", txn_id=" << _txn_id << ", due to error: " << status;
     if (_delta_writer_for_tablet) {
@@ -453,8 +439,9 @@ Status VOlapTableSinkV2::_cancel(Status status) {
     return Status::OK();
 }
 
-Status VOlapTableSinkV2::close(RuntimeState* state, Status exec_status) {
-    if (_closed) {
+Status VTabletWriterV2::close(Status exec_status) {
+    std::lock_guard<std::mutex> close_lock(_close_mutex);
+    if (_is_closed) {
         return _close_status;
     }
     SCOPED_TIMER(_close_timer);
@@ -511,18 +498,18 @@ Status VOlapTableSinkV2::close(RuntimeState* state, 
Status exec_status) {
                 }
             }
         }
-        state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
-                                            
std::make_move_iterator(tablet_commit_infos.begin()),
-                                            
std::make_move_iterator(tablet_commit_infos.end()));
+        
_state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(),
+                                             
std::make_move_iterator(tablet_commit_infos.begin()),
+                                             
std::make_move_iterator(tablet_commit_infos.end()));
         _streams_for_node.clear();
 
         // _number_input_rows don't contain num_rows_load_filtered and 
num_rows_load_unselected in scan node
-        int64_t num_rows_load_total = _number_input_rows + 
state->num_rows_load_filtered() +
-                                      state->num_rows_load_unselected();
-        state->set_num_rows_load_total(num_rows_load_total);
-        
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
-                                             
_tablet_finder->num_filtered_rows());
-        state->update_num_rows_load_unselected(
+        int64_t num_rows_load_total = _number_input_rows + 
_state->num_rows_load_filtered() +
+                                      _state->num_rows_load_unselected();
+        _state->set_num_rows_load_total(num_rows_load_total);
+        
_state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() +
+                                              
_tablet_finder->num_filtered_rows());
+        _state->update_num_rows_load_unselected(
                 _tablet_finder->num_immutable_partition_filtered_rows());
 
         LOG(INFO) << "finished to close olap table sink. load_id=" << 
print_id(_load_id)
@@ -531,12 +518,12 @@ Status VOlapTableSinkV2::close(RuntimeState* state, 
Status exec_status) {
         RETURN_IF_ERROR(_cancel(status));
     }
 
+    _is_closed = true;
     _close_status = status;
-    RETURN_IF_ERROR(DataSink::close(state, exec_status));
     return status;
 }
 
-Status VOlapTableSinkV2::_close_load(const Streams& streams) {
+Status VTabletWriterV2::_close_load(const Streams& streams) {
     auto node_id = streams[0]->dst_id();
     std::vector<PTabletID> tablets_to_commit;
     for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) {
diff --git a/be/src/vec/sink/vtablet_sink_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
similarity index 87%
copy from be/src/vec/sink/vtablet_sink_v2.h
copy to be/src/vec/sink/writer/vtablet_writer_v2.h
index 1f317420de4..d4ccf7b6523 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -26,10 +26,10 @@
 #include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <functional>
@@ -48,7 +48,6 @@
 
 #include "common/config.h"
 #include "common/status.h"
-#include "exec/data_sink.h"
 #include "exec/tablet_info.h"
 #include "gutil/ref_counted.h"
 #include "runtime/exec_env.h"
@@ -65,6 +64,7 @@
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
 #include "vec/sink/vrow_distribution.h"
+#include "vec/sink/writer/async_result_writer.h"
 
 namespace doris {
 class DeltaWriterV2;
@@ -85,7 +85,7 @@ namespace vectorized {
 
 class OlapTableBlockConvertor;
 class OlapTabletFinder;
-class VOlapTableSinkV2;
+class VTabletWriterV2;
 class DeltaWriterV2Map;
 
 using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
@@ -100,31 +100,30 @@ using RowsForTablet = std::unordered_map<int64_t, Rows>;
 
 // Write block data to Olap Table.
 // When OlapTableSink::open() called, there will be a consumer thread running 
in the background.
-// When you call VOlapTableSinkV2::send(), you will be the producer who 
products pending batches.
+// When you call VTabletWriterV2::send(), you will be the producer who 
produces pending batches.
 // Join the consumer thread in close().
-class VOlapTableSinkV2 final : public DataSink {
+class VTabletWriterV2 final : public AsyncResultWriter {
 public:
     // Construct from thrift struct which is generated by FE.
-    VOlapTableSinkV2(ObjectPool* pool, const RowDescriptor& row_desc,
-                     const std::vector<TExpr>& texprs, Status* status);
+    VTabletWriterV2(const TDataSink& t_sink, const VExprContextSPtrs& 
output_exprs);
 
-    ~VOlapTableSinkV2() override;
+    ~VTabletWriterV2() override;
 
-    Status init(const TDataSink& sink) override;
-    // TODO: unify the code of prepare/open/close with result sink
-    Status prepare(RuntimeState* state) override;
+    Status init_properties(ObjectPool* pool, bool group_commit);
 
-    Status open(RuntimeState* state) override;
+    Status append_block(Block& block) override;
 
-    Status close(RuntimeState* state, Status close_status) override;
+    Status open(RuntimeState* state, RuntimeProfile* profile) override;
 
-    Status send(RuntimeState* state, vectorized::Block* block, bool eos = 
false) override;
+    Status close(Status close_status) override;
 
     Status on_partitions_created(TCreatePartitionResult* result);
 
 private:
     Status _init_row_distribution();
 
+    Status _init(RuntimeState* state, RuntimeProfile* profile);
+
     Status _open_streams(int64_t src_id);
 
     Status _open_streams_to_backend(int64_t dst_id, 
::doris::stream_load::LoadStreams& streams);
@@ -148,6 +147,7 @@ private:
 
     std::shared_ptr<MemTracker> _mem_tracker;
 
+    TDataSink _t_sink;
     ObjectPool* _pool;
 
     // unique load id
@@ -202,13 +202,16 @@ private:
     RuntimeProfile::Counter* _close_load_timer = nullptr;
     RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
 
+    std::mutex _close_mutex;
+    bool _is_closed = false;
     // Save the status of close() method
     Status _close_status;
 
     VOlapTablePartitionParam* _vpartition = nullptr;
-    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
 
-    RuntimeState* _state = nullptr;
+    RuntimeState* _state = nullptr;     // not owned, set when open
+    RuntimeProfile* _profile = nullptr; // not owned, set when open
+    bool _group_commit = false;
 
     std::unordered_set<int64_t> _opened_partitions;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to