This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6da36e10773 [feature](merge-cloud) Refactor write path code by 
abstract base class (#26537)
6da36e10773 is described below

commit 6da36e107739905c1f2665028c5788f1a7313902
Author: plat1ko <platonekos...@gmail.com>
AuthorDate: Fri Dec 8 14:50:36 2023 +0800

    [feature](merge-cloud) Refactor write path code by abstract base class 
(#26537)
    
    Refactor write path code by abstract base class. Whether to use 
`StorageEngine` or `CloudStorageEngine` will be determined during compilation 
instead of runtime `config::cloud_mode` to avoid unexpected null pointer or 
undefined behavior issues caused by merging code.
    
    Class that depend on `StorageEngine` but are shared by the cloud mode need 
to have an abstract base class. Common code should be extracted into the base 
class, while the code that depends on `StorageEngine` should be implemented in 
a `StorageEngine` mix-in class of the base class.
---
 be/src/cloud/config.cpp                            |   2 +-
 be/src/cloud/config.h                              |   2 +-
 be/src/olap/delta_writer.cpp                       | 101 +++++-----
 be/src/olap/delta_writer.h                         |  71 ++++---
 be/src/olap/rowset/beta_rowset_writer.cpp          | 215 ++++++++++++---------
 be/src/olap/rowset/beta_rowset_writer.h            | 118 ++++++-----
 be/src/olap/rowset/beta_rowset_writer_v2.h         |   4 -
 be/src/olap/rowset/rowset_factory.cpp              |   5 +-
 be/src/olap/rowset/rowset_writer.h                 |   4 -
 be/src/olap/rowset/segcompaction.h                 |   4 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  14 +-
 .../rowset/segment_v2/vertical_segment_writer.cpp  |  10 +-
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp |   3 +
 be/src/olap/rowset/vertical_beta_rowset_writer.h   |   7 +-
 be/src/olap/rowset_builder.cpp                     | 151 +++++++--------
 be/src/olap/rowset_builder.h                       |  53 +++--
 be/src/runtime/exec_env.cpp                        |   2 +-
 be/src/runtime/exec_env.h                          |   6 +
 be/src/runtime/load_channel.cpp                    |  33 +++-
 be/src/runtime/load_channel.h                      |  41 +---
 be/src/runtime/load_stream.h                       |  12 +-
 be/src/runtime/load_stream_mgr.h                   |   8 +-
 be/src/runtime/load_stream_writer.cpp              |  20 +-
 be/src/runtime/load_stream_writer.h                |  22 +--
 be/src/runtime/tablets_channel.cpp                 | 202 ++++++++++---------
 be/src/runtime/tablets_channel.h                   |  97 +++++-----
 be/src/vec/olap/block_reader.cpp                   |  13 +-
 be/src/vec/olap/vertical_block_reader.cpp          |  11 +-
 be/test/olap/delta_writer_test.cpp                 |  42 ++--
 .../olap/engine_storage_migration_task_test.cpp    |   9 +-
 be/test/olap/memtable_memory_limiter_test.cpp      |   7 +-
 be/test/olap/tablet_cooldown_test.cpp              |   7 +-
 32 files changed, 680 insertions(+), 616 deletions(-)

diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index e7e1510b2ba..4d9da1e9cfc 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -20,7 +20,7 @@
 namespace doris {
 namespace config {
 
-DEFINE_Bool(cloud_mode, "false");
+// TODO
 
 } // namespace config
 } // namespace doris
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index da7fea826cc..21a3b6052f5 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -22,7 +22,7 @@
 namespace doris {
 namespace config {
 
-DECLARE_Bool(cloud_mode);
+// TODO
 
 } // namespace config
 } // namespace doris
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 310ffacad9a..dc5973bf04d 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -28,7 +28,7 @@
 #include <string>
 #include <utility>
 
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h"
@@ -41,6 +41,7 @@
 #include "olap/rowset/beta_rowset_writer.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "olap/rowset_builder.h"
 #include "olap/schema_change.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
@@ -57,25 +58,29 @@
 namespace doris {
 using namespace ErrorCode;
 
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, 
RuntimeProfile* profile,
-                         const UniqueId& load_id) {
-    *writer = new DeltaWriter(req, StorageEngine::instance(), profile, 
load_id);
-    return Status::OK();
+BaseDeltaWriter::BaseDeltaWriter(WriteRequest* req, RuntimeProfile* profile,
+                                 const UniqueId& load_id)
+        : _req(*req), _memtable_writer(new MemTableWriter(*req)) {
+    _init_profile(profile);
 }
 
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile,
+DeltaWriter::DeltaWriter(StorageEngine& engine, WriteRequest* req, 
RuntimeProfile* profile,
                          const UniqueId& load_id)
-        : _req(*req), _rowset_builder(*req, profile), _memtable_writer(new 
MemTableWriter(*req)) {
-    _init_profile(profile);
+        : BaseDeltaWriter(req, profile, load_id), _engine(engine) {
+    _rowset_builder = std::make_unique<RowsetBuilder>(_engine, *req, profile);
 }
 
-void DeltaWriter::_init_profile(RuntimeProfile* profile) {
+void BaseDeltaWriter::_init_profile(RuntimeProfile* profile) {
     _profile = profile->create_child(fmt::format("DeltaWriter {}", 
_req.tablet_id), true, true);
     _close_wait_timer = ADD_TIMER(_profile, "CloseWaitTime");
+}
+
+void DeltaWriter::_init_profile(RuntimeProfile* profile) {
+    BaseDeltaWriter::_init_profile(profile);
     _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime");
 }
 
-DeltaWriter::~DeltaWriter() {
+BaseDeltaWriter::~BaseDeltaWriter() {
     if (!_is_init) {
         return;
     }
@@ -83,33 +88,35 @@ DeltaWriter::~DeltaWriter() {
     // cancel and wait all memtables in flush queue to be finished
     static_cast<void>(_memtable_writer->cancel());
 
-    if (_rowset_builder.tablet() != nullptr) {
+    if (_rowset_builder->tablet() != nullptr) {
         const FlushStatistic& stat = _memtable_writer->get_flush_token_stats();
-        
_rowset_builder.tablet()->flush_bytes->increment(stat.flush_size_bytes);
-        
_rowset_builder.tablet()->flush_finish_count->increment(stat.flush_finish_count);
+        
_rowset_builder->tablet()->flush_bytes->increment(stat.flush_size_bytes);
+        
_rowset_builder->tablet()->flush_finish_count->increment(stat.flush_finish_count);
     }
 }
 
-Status DeltaWriter::init() {
+DeltaWriter::~DeltaWriter() = default;
+
+Status BaseDeltaWriter::init() {
     if (_is_init) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(_rowset_builder.init());
-    RETURN_IF_ERROR(
-            _memtable_writer->init(_rowset_builder.rowset_writer(), 
_rowset_builder.tablet_schema(),
-                                   _rowset_builder.get_partial_update_info(),
-                                   
_rowset_builder.tablet()->enable_unique_key_merge_on_write()));
+    RETURN_IF_ERROR(_rowset_builder->init());
+    RETURN_IF_ERROR(_memtable_writer->init(
+            _rowset_builder->rowset_writer(), _rowset_builder->tablet_schema(),
+            _rowset_builder->get_partial_update_info(),
+            _rowset_builder->tablet()->enable_unique_key_merge_on_write()));
     
ExecEnv::GetInstance()->memtable_memory_limiter()->register_writer(_memtable_writer);
     _is_init = true;
     return Status::OK();
 }
 
-Status DeltaWriter::append(const vectorized::Block* block) {
+Status BaseDeltaWriter::append(const vectorized::Block* block) {
     return write(block, {}, true);
 }
 
-Status DeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
-                          bool is_append) {
+Status BaseDeltaWriter::write(const vectorized::Block* block, const 
std::vector<uint32_t>& row_idxs,
+                              bool is_append) {
     if (UNLIKELY(row_idxs.empty() && !is_append)) {
         return Status::OK();
     }
@@ -121,11 +128,11 @@ Status DeltaWriter::write(const vectorized::Block* block, 
const std::vector<uint
     }
     return _memtable_writer->write(block, row_idxs, is_append);
 }
-Status DeltaWriter::wait_flush() {
+Status BaseDeltaWriter::wait_flush() {
     return _memtable_writer->wait_flush();
 }
 
-Status DeltaWriter::close() {
+Status BaseDeltaWriter::close() {
     _lock_watch.start();
     std::lock_guard<std::mutex> l(_lock);
     _lock_watch.stop();
@@ -140,37 +147,31 @@ Status DeltaWriter::close() {
     return _memtable_writer->close();
 }
 
-Status DeltaWriter::build_rowset() {
+Status BaseDeltaWriter::build_rowset() {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
             << "delta writer is supposed be to initialized before 
build_rowset() being called";
 
     SCOPED_TIMER(_close_wait_timer);
     RETURN_IF_ERROR(_memtable_writer->close_wait(_profile));
-    return _rowset_builder.build_rowset();
+    return _rowset_builder->build_rowset();
 }
 
-Status DeltaWriter::submit_calc_delete_bitmap_task() {
-    return _rowset_builder.submit_calc_delete_bitmap_task();
+Status BaseDeltaWriter::submit_calc_delete_bitmap_task() {
+    return _rowset_builder->submit_calc_delete_bitmap_task();
 }
 
-Status DeltaWriter::wait_calc_delete_bitmap() {
-    return _rowset_builder.wait_calc_delete_bitmap();
+Status BaseDeltaWriter::wait_calc_delete_bitmap() {
+    return _rowset_builder->wait_calc_delete_bitmap();
 }
 
-Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
-                               const bool write_single_replica) {
-    if (config::cloud_mode) {
-        return Status::OK();
-    }
+Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes) {
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    RETURN_IF_ERROR(_rowset_builder.commit_txn());
+    RETURN_IF_ERROR(_rowset_builder->commit_txn());
 
-    if (write_single_replica) {
-        for (auto node_info : slave_tablet_nodes.slave_nodes()) {
-            _request_slave_tablet_pull_rowset(node_info);
-        }
+    for (auto&& node_info : slave_tablet_nodes.slave_nodes()) {
+        _request_slave_tablet_pull_rowset(node_info);
     }
     return Status::OK();
 }
@@ -191,11 +192,11 @@ void DeltaWriter::add_finished_slave_replicas(
     success_slave_tablet_node_ids->insert({_req.tablet_id, 
_success_slave_node_ids});
 }
 
-Status DeltaWriter::cancel() {
+Status BaseDeltaWriter::cancel() {
     return cancel_with_status(Status::Cancelled("already cancelled"));
 }
 
-Status DeltaWriter::cancel_with_status(const Status& st) {
+Status BaseDeltaWriter::cancel_with_status(const Status& st) {
     std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
         return Status::OK();
@@ -205,14 +206,11 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
     return Status::OK();
 }
 
-int64_t DeltaWriter::mem_consumption(MemType mem) {
+int64_t BaseDeltaWriter::mem_consumption(MemType mem) {
     return _memtable_writer->mem_consumption(mem);
 }
 
 void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
-    if (config::cloud_mode) [[unlikely]] {
-        return;
-    }
     std::shared_ptr<PBackendService_Stub> stub =
             ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
                     node_info.host(), node_info.async_internal_port());
@@ -224,15 +222,14 @@ void 
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
         return;
     }
 
-    
StorageEngine::instance()->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id,
-                                                                          
_req.tablet_id, this);
+    _engine.txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, 
_req.tablet_id, this);
     {
         std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
         _unfinished_slave_node.insert(node_info.id());
     }
 
     std::vector<std::pair<int64_t, std::string>> indices_ids;
-    auto cur_rowset = _rowset_builder.rowset();
+    auto cur_rowset = _rowset_builder->rowset();
     auto tablet_schema = cur_rowset->rowset_meta()->tablet_schema();
     if (!tablet_schema->skip_write_index_on_load()) {
         for (auto& column : tablet_schema->columns()) {
@@ -247,7 +244,7 @@ void 
DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
     *(request->mutable_rowset_meta()) = 
cur_rowset->rowset_meta()->get_rowset_pb();
     request->set_host(BackendOptions::get_localhost());
     request->set_http_port(config::webserver_port);
-    string tablet_path = _rowset_builder.tablet()->tablet_path();
+    string tablet_path = _rowset_builder->tablet()->tablet_path();
     request->set_rowset_path(tablet_path);
     request->set_token(ExecEnv::GetInstance()->token());
     request->set_brpc_port(config::brpc_port);
@@ -314,8 +311,8 @@ void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t 
node_id, bool is_succe
     _unfinished_slave_node.erase(node_id);
 }
 
-int64_t DeltaWriter::num_rows_filtered() const {
-    auto rowset_writer = _rowset_builder.rowset_writer();
+int64_t BaseDeltaWriter::num_rows_filtered() const {
+    auto rowset_writer = _rowset_builder->rowset_writer();
     return rowset_writer == nullptr ? 0 : rowset_writer->num_rows_filtered();
 }
 
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index d7e351a168e..c782c5ef3b6 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -20,7 +20,6 @@
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
 #include <gen_cpp/types.pb.h>
-#include <stdint.h>
 
 #include <atomic>
 #include <memory>
@@ -34,7 +33,6 @@
 #include "olap/memtable_writer.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset.h"
-#include "olap/rowset_builder.h"
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
@@ -56,14 +54,15 @@ namespace vectorized {
 class Block;
 } // namespace vectorized
 
+class BaseRowsetBuilder;
+
 // Writer for a particular (load, index, tablet).
 // This class is NOT thread-safe, external synchronization is required.
-class DeltaWriter {
+class BaseDeltaWriter {
 public:
-    static Status open(WriteRequest* req, DeltaWriter** writer, 
RuntimeProfile* profile,
-                       const UniqueId& load_id = TUniqueId());
+    BaseDeltaWriter(WriteRequest* req, RuntimeProfile* profile, const 
UniqueId& load_id);
 
-    ~DeltaWriter();
+    virtual ~BaseDeltaWriter();
 
     Status init();
 
@@ -79,15 +78,6 @@ public:
     Status build_rowset();
     Status submit_calc_delete_bitmap_task();
     Status wait_calc_delete_bitmap();
-    Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool 
write_single_replica);
-
-    bool check_slave_replicas_done(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
-                                           success_slave_tablet_node_ids);
-
-    void add_finished_slave_replicas(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
-                                             success_slave_tablet_node_ids);
-
-    void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
 
     // abandon current memtable and wait for all pending-flushing memtables to 
be destructed.
     // mem_consumption() should be 0 after this function returns.
@@ -109,34 +99,55 @@ public:
 
     int64_t num_rows_filtered() const;
 
-    // For UT
-    DeleteBitmapPtr get_delete_bitmap() { return 
_rowset_builder.get_delete_bitmap(); }
-
-private:
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile,
-                const UniqueId& load_id);
-
-    void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
-
-    void _init_profile(RuntimeProfile* profile);
+protected:
+    virtual void _init_profile(RuntimeProfile* profile);
 
     bool _is_init = false;
     bool _is_cancelled = false;
     WriteRequest _req;
-    RowsetBuilder _rowset_builder;
+    std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
     std::shared_ptr<MemTableWriter> _memtable_writer;
 
     std::mutex _lock;
 
-    std::unordered_set<int64_t> _unfinished_slave_node;
-    PSuccessSlaveTabletNodeIds _success_slave_node_ids;
-    std::shared_mutex _slave_node_lock;
+    // total rows num written by DeltaWriter
+    std::atomic<int64_t> _total_received_rows = 0;
 
     RuntimeProfile* _profile = nullptr;
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
-    RuntimeProfile::Counter* _commit_txn_timer = nullptr;
 
     MonotonicStopWatch _lock_watch;
 };
 
+// `StorageEngine` mixin for `BaseDeltaWriter`
+class DeltaWriter final : public BaseDeltaWriter {
+public:
+    DeltaWriter(StorageEngine& engine, WriteRequest* req, RuntimeProfile* 
profile,
+                const UniqueId& load_id);
+
+    ~DeltaWriter() override;
+
+    Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes);
+
+    bool check_slave_replicas_done(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
+                                           success_slave_tablet_node_ids);
+
+    void add_finished_slave_replicas(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
+                                             success_slave_tablet_node_ids);
+
+    void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
+
+private:
+    void _init_profile(RuntimeProfile* profile) override;
+
+    void _request_slave_tablet_pull_rowset(PNodeInfo node_info);
+
+    StorageEngine& _engine;
+    std::unordered_set<int64_t> _unfinished_slave_node;
+    PSuccessSlaveTabletNodeIds _success_slave_node_ids;
+    std::shared_mutex _slave_node_lock;
+
+    RuntimeProfile::Counter* _commit_txn_timer = nullptr;
+};
+
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index ab9e97a3d3b..6850ce0f43c 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -25,10 +25,11 @@
 
 #include <ctime> // time
 #include <filesystem>
+#include <memory>
 #include <sstream>
 #include <utility>
 
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h"
@@ -40,6 +41,7 @@
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segcompaction.h"
 #include "olap/rowset/segment_v2/inverted_index_cache.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/segment.h"
@@ -59,27 +61,53 @@
 namespace doris {
 using namespace ErrorCode;
 
-BetaRowsetWriter::BetaRowsetWriter()
+namespace {
+
+bool is_segment_overlapping(const std::vector<KeyBoundsPB>& 
segments_encoded_key_bounds) {
+    std::string_view last;
+    for (auto&& segment_encode_key : segments_encoded_key_bounds) {
+        auto&& cur_min = segment_encode_key.min_key();
+        auto&& cur_max = segment_encode_key.max_key();
+        if (cur_min <= last) {
+            return true;
+        }
+        last = cur_max;
+    }
+    return false;
+}
+
+void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta,
+                                       const RowsetMeta& spec_rowset_meta) {
+    rowset_meta.set_num_rows(spec_rowset_meta.num_rows());
+    rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size());
+    rowset_meta.set_data_disk_size(spec_rowset_meta.total_disk_size());
+    rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size());
+    // TODO write zonemap to meta
+    rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0);
+    rowset_meta.set_creation_time(time(nullptr));
+    rowset_meta.set_num_segments(spec_rowset_meta.num_segments());
+    rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap());
+    rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state());
+
+    std::vector<KeyBoundsPB> segments_key_bounds;
+    spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds);
+    rowset_meta.set_segments_key_bounds(segments_key_bounds);
+}
+
+} // namespace
+
+BaseBetaRowsetWriter::BaseBetaRowsetWriter()
         : _rowset_meta(nullptr),
           _num_segment(0),
           _segment_start_id(0),
-          _segcompacted_point(0),
-          _num_segcompacted(0),
           _num_rows_written(0),
           _total_data_size(0),
-          _total_index_size(0),
-          _segcompaction_worker(this),
-          _is_doing_segcompaction(false) {
-    _segcompaction_status.store(OK);
-}
+          _total_index_size(0) {}
 
-BetaRowsetWriter::~BetaRowsetWriter() {
-    /* Note that segcompaction is async and in parallel with load job. So we 
should handle carefully
-     * when the job is cancelled. Although it is meaningless to continue 
segcompaction when the job
-     * is cancelled, the objects involved in the job should be preserved 
during segcompaction to
-     * avoid crashs for memory issues. */
-    WARN_IF_ERROR(wait_flying_segcompaction(), "segment compaction failed");
+BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
+        : _engine(engine), 
_segcompaction_worker(std::make_unique<SegcompactionWorker>(this)) {}
 
+BaseBetaRowsetWriter::~BaseBetaRowsetWriter() {
     // TODO(lingbin): Should wrapper exception logic, no need to know file ops 
directly.
     if (!_already_built) { // abnormal exit, remove all files generated
         WARN_IF_ERROR(_segment_creator.close(),
@@ -100,7 +128,15 @@ BetaRowsetWriter::~BetaRowsetWriter() {
     }
 }
 
-Status BetaRowsetWriter::init(const RowsetWriterContext& 
rowset_writer_context) {
+BetaRowsetWriter::~BetaRowsetWriter() {
+    /* Note that segcompaction is async and in parallel with load job. So we 
should handle carefully
+     * when the job is cancelled. Although it is meaningless to continue 
segcompaction when the job
+     * is cancelled, the objects involved in the job should be preserved 
during segcompaction to
+     * avoid crashs for memory issues. */
+    WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
+}
+
+Status BaseBetaRowsetWriter::init(const RowsetWriterContext& 
rowset_writer_context) {
     _context = rowset_writer_context;
     _rowset_meta.reset(new RowsetMeta);
     _rowset_meta->set_fs(_context.fs);
@@ -121,21 +157,17 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& 
rowset_writer_context)
     }
     _rowset_meta->set_tablet_uid(_context.tablet_uid);
     _rowset_meta->set_tablet_schema(_context.tablet_schema);
-    _context.segment_collector = 
std::make_shared<SegmentCollectorT<BetaRowsetWriter>>(this);
-    _context.file_writer_creator = 
std::make_shared<FileWriterCreatorT<BetaRowsetWriter>>(this);
+    _context.segment_collector = 
std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
+    _context.file_writer_creator = 
std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
     RETURN_IF_ERROR(_segment_creator.init(_context));
     return Status::OK();
 }
 
-Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
+Status BaseBetaRowsetWriter::add_block(const vectorized::Block* block) {
     return _segment_creator.add_block(block);
 }
 
 Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-        return Status::NotSupported("_generate_delete_bitmap");
-    }
     SCOPED_RAW_TIMER(&_delete_bitmap_ns);
     if (!_context.tablet->enable_unique_key_merge_on_write() ||
         (_context.partial_update_info && 
_context.partial_update_info->is_partial_update)) {
@@ -374,12 +406,11 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     } else if ((_num_segment - _segcompacted_point) >= 
config::segcompaction_batch_size) {
         SegCompactionCandidatesSharedPtr segments;
         status = _find_longest_consecutive_small_segment(segments);
-        if (LIKELY(status.ok()) && (segments->size() > 0)) {
+        if (LIKELY(status.ok()) && (!segments->empty())) {
             LOG(INFO) << "submit segcompaction task, tablet_id:" << 
_context.tablet_id
                       << " rowset_id:" << _context.rowset_id << " segment 
num:" << _num_segment
                       << ", segcompacted_point:" << _segcompacted_point;
-            status = 
StorageEngine::instance()->submit_seg_compaction_task(&_segcompaction_worker,
-                                                                           
segments);
+            status = 
_engine.submit_seg_compaction_task(_segcompaction_worker.get(), segments);
             if (status.ok()) {
                 return status;
             }
@@ -415,7 +446,7 @@ Status 
BetaRowsetWriter::_segcompaction_rename_last_segments() {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
+Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
     assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
     RETURN_IF_ERROR(rowset->link_files_to(_context.rowset_dir, 
_context.rowset_id));
     _num_rows_written += rowset->num_rows();
@@ -438,17 +469,17 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr 
rowset) {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr 
rowset) {
+Status 
BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr 
rowset) {
     // TODO use schema_mapping to transfer zonemap
     return add_rowset(rowset);
 }
 
-Status BetaRowsetWriter::flush() {
+Status BaseBetaRowsetWriter::flush() {
     return _segment_creator.flush();
 }
 
-Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t 
segment_id,
-                                        int64_t* flush_size) {
+Status BaseBetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t 
segment_id,
+                                            int64_t* flush_size) {
     if (block->rows() == 0) {
         return Status::OK();
     }
@@ -460,11 +491,11 @@ Status 
BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segmen
     return Status::OK();
 }
 
-Status BetaRowsetWriter::flush_single_block(const vectorized::Block* block) {
+Status BaseBetaRowsetWriter::flush_single_block(const vectorized::Block* 
block) {
     return _segment_creator.flush_single_block(block);
 }
 
-Status BetaRowsetWriter::wait_flying_segcompaction() {
+Status BetaRowsetWriter::_wait_flying_segcompaction() {
     std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock);
     uint64_t begin_wait = GetCurrentTimeMicros();
     while (_is_doing_segcompaction) {
@@ -481,12 +512,12 @@ Status BetaRowsetWriter::wait_flying_segcompaction() {
     return Status::OK();
 }
 
-RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& 
spec_rowset_meta) {
+RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& 
spec_rowset_meta) {
     if (_rowset_meta->newest_write_timestamp() == -1) {
         _rowset_meta->set_newest_write_timestamp(UnixSeconds());
     }
 
-    _build_rowset_meta_with_spec_field(_rowset_meta, spec_rowset_meta);
+    build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta);
     RowsetSharedPtr rowset;
     auto status = RowsetFactory::create_rowset(_context.tablet_schema, 
_context.rowset_dir,
                                                _rowset_meta, &rowset);
@@ -498,7 +529,7 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const 
RowsetMetaSharedPtr& spec_r
     return rowset;
 }
 
-Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
+Status BaseBetaRowsetWriter::_close_file_writers() {
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& file_writer : _file_writers) {
         RETURN_NOT_OK_STATUS_WITH_WARN(
@@ -506,20 +537,31 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
                 fmt::format("failed to close file writer, path={}", 
file_writer->path().string()));
     }
     RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
-                                   "failed to close segment creator when build 
new rowset")
+                                   "failed to close segment creator when build 
new rowset");
+    return Status::OK();
+}
+
+Status BetaRowsetWriter::_close_file_writers() {
+    RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
     // if _segment_start_id is not zero, that means it's a transient rowset 
writer for
     // MoW partial update, don't need to do segment compaction.
     if (_segment_start_id == 0) {
-        _segcompaction_worker.cancel();
-        RETURN_NOT_OK_STATUS_WITH_WARN(wait_flying_segcompaction(),
+        _segcompaction_worker->cancel();
+        RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
                                        "segcompaction failed when build new 
rowset");
         RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
                                        "rename last segments failed when build 
new rowset");
-        if (_segcompaction_worker.get_file_writer()) {
-            
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(),
+        if (_segcompaction_worker->get_file_writer()) {
+            
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(),
                                            "close segment compaction worker 
failed");
         }
     }
+    return Status::OK();
+}
+
+Status BaseBetaRowsetWriter::build(RowsetSharedPtr& rowset) {
+    RETURN_IF_ERROR(_close_file_writers());
+
     RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(),
                                    "too many segments when build new rowset");
     _build_rowset_meta(_rowset_meta);
@@ -541,25 +583,19 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
     return Status::OK();
 }
 
-bool BetaRowsetWriter::_is_segment_overlapping(
-        const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) {
-    std::string last;
-    for (auto segment_encode_key : segments_encoded_key_bounds) {
-        auto cur_min = segment_encode_key.min_key();
-        auto cur_max = segment_encode_key.max_key();
-        if (cur_min <= last) {
-            return true;
-        }
-        last = cur_max;
-    }
-    return false;
+int64_t BaseBetaRowsetWriter::_num_seg() const {
+    return _num_segment;
+}
+
+int64_t BetaRowsetWriter::_num_seg() const {
+    return _is_segcompacted() ? _num_segcompacted : _num_segment;
 }
 
 // update tablet schema when meet variant columns, before commit_txn
 // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
 // _tabelt->tablet_schema:  A(bigint), B(double)
 //  => update_schema:       A(bigint), B(double), C(int), D(int)
-void BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
+void BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) 
{
     std::lock_guard<std::mutex> lock(*(_context.schema_lock));
     TabletSchemaSPtr update_schema;
     static_cast<void>(vectorized::schema_util::get_least_common_schema(
@@ -573,26 +609,7 @@ void 
BetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr flush_schema) {
     VLOG_DEBUG << "dump rs schema: " << 
_context.tablet_schema->dump_structure();
 }
 
-void BetaRowsetWriter::_build_rowset_meta_with_spec_field(
-        RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr& 
spec_rowset_meta) {
-    rowset_meta->set_num_rows(spec_rowset_meta->num_rows());
-    rowset_meta->set_total_disk_size(spec_rowset_meta->total_disk_size());
-    rowset_meta->set_data_disk_size(spec_rowset_meta->total_disk_size());
-    rowset_meta->set_index_disk_size(spec_rowset_meta->index_disk_size());
-    // TODO write zonemap to meta
-    rowset_meta->set_empty(spec_rowset_meta->num_rows() == 0);
-    rowset_meta->set_creation_time(time(nullptr));
-    rowset_meta->set_num_segments(spec_rowset_meta->num_segments());
-    rowset_meta->set_segments_overlap(spec_rowset_meta->segments_overlap());
-    rowset_meta->set_rowset_state(spec_rowset_meta->rowset_state());
-
-    std::vector<KeyBoundsPB> segments_key_bounds;
-    spec_rowset_meta->get_segments_key_bounds(&segments_key_bounds);
-    rowset_meta->set_segments_key_bounds(segments_key_bounds);
-}
-
-void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> 
rowset_meta) {
-    int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
+void BaseBetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> 
rowset_meta) {
     int64_t num_rows_written = 0;
     int64_t total_data_size = 0;
     int64_t total_index_size = 0;
@@ -613,11 +630,11 @@ void 
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
     // segment key bounds are empty in old version(before version 1.2.x). So 
we should not modify
     // the overlap property when key bounds are empty.
     if (!segments_encoded_key_bounds.empty() &&
-        !_is_segment_overlapping(segments_encoded_key_bounds)) {
+        !is_segment_overlapping(segments_encoded_key_bounds)) {
         rowset_meta->set_segments_overlap(NONOVERLAPPING);
     }
 
-    rowset_meta->set_num_segments(num_seg);
+    rowset_meta->set_num_segments(_num_seg());
     // TODO(zhangzhengyu): key_bounds.size() should equal num_seg, but 
currently not always
     rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
     rowset_meta->set_total_disk_size(total_data_size + _total_data_size);
@@ -635,7 +652,7 @@ void 
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
     }
 }
 
-RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
+RowsetSharedPtr BaseBetaRowsetWriter::_build_tmp() {
     std::shared_ptr<RowsetMeta> rowset_meta_ = std::make_shared<RowsetMeta>();
     *rowset_meta_ = *_rowset_meta;
     _build_rowset_meta(rowset_meta_);
@@ -650,7 +667,7 @@ RowsetSharedPtr BetaRowsetWriter::_build_tmp() {
     return rowset;
 }
 
-Status BetaRowsetWriter::_create_file_writer(std::string path, 
io::FileWriterPtr& file_writer) {
+Status BaseBetaRowsetWriter::_create_file_writer(std::string path, 
io::FileWriterPtr& file_writer) {
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::Error<INIT_FAILED>("get fs failed");
@@ -673,7 +690,8 @@ Status BetaRowsetWriter::_create_file_writer(std::string 
path, io::FileWriterPtr
     return Status::OK();
 }
 
-Status BetaRowsetWriter::create_file_writer(uint32_t segment_id, 
io::FileWriterPtr& file_writer) {
+Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id,
+                                                io::FileWriterPtr& 
file_writer) {
     std::string path;
     path = BetaRowset::segment_file_path(_context.rowset_dir, 
_context.rowset_id, segment_id);
     return _create_file_writer(path, file_writer);
@@ -693,15 +711,28 @@ Status 
BetaRowsetWriter::_create_segment_writer_for_segcompaction(
     writer_options.write_type = _context.write_type;
     writer_options.write_type = DataWriteType::TYPE_COMPACTION;
 
-    writer->reset(new segment_v2::SegmentWriter(file_writer.get(), 
_num_segcompacted,
-                                                _context.tablet_schema, 
_context.tablet,
-                                                _context.data_dir, 
_context.max_rows_per_segment,
-                                                writer_options, 
_context.mow_context));
-    if (_segcompaction_worker.get_file_writer() != nullptr) {
-        RETURN_IF_ERROR(_segcompaction_worker.get_file_writer()->close());
+    *writer = std::make_unique<segment_v2::SegmentWriter>(
+            file_writer.get(), _num_segcompacted, _context.tablet_schema, 
_context.tablet,
+            _context.data_dir, _context.max_rows_per_segment, writer_options, 
_context.mow_context);
+    if (_segcompaction_worker->get_file_writer() != nullptr) {
+        RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close());
     }
-    _segcompaction_worker.get_file_writer().reset(file_writer.release());
+    _segcompaction_worker->get_file_writer().reset(file_writer.release());
+
+    return Status::OK();
+}
 
+Status BaseBetaRowsetWriter::_check_segment_number_limit() {
+    size_t total_segment_num = _num_segment + 1;
+    
DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                    { total_segment_num = dp->param("segnum", 1024); });
+    if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
+        return Status::Error<TOO_MANY_SEGMENTS>(
+                "too many segments in rowset. tablet_id:{}, rowset_id:{}, 
max:{}, "
+                "_num_segment:{}, ",
+                _context.tablet_id, _context.rowset_id.to_string(),
+                config::max_segment_num_per_rowset, _num_segment);
+    }
     return Status::OK();
 }
 
@@ -720,8 +751,8 @@ Status BetaRowsetWriter::_check_segment_number_limit() {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::add_segment(uint32_t segment_id, const 
SegmentStatistics& segstat,
-                                     TabletSchemaSPtr flush_schema) {
+Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const 
SegmentStatistics& segstat,
+                                         TabletSchemaSPtr flush_schema) {
     uint32_t segid_offset = segment_id - _segment_start_id;
     {
         std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
@@ -750,10 +781,16 @@ Status BetaRowsetWriter::add_segment(uint32_t segment_id, 
const SegmentStatistic
     if (_context.mow_context != nullptr) {
         RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
     }
-    RETURN_IF_ERROR(_segcompaction_if_necessary());
     return Status::OK();
 }
 
+Status BetaRowsetWriter::add_segment(uint32_t segment_id, const 
SegmentStatistics& segstat,
+                                     TabletSchemaSPtr flush_schema) {
+    RETURN_IF_ERROR(
+            BaseBetaRowsetWriter::add_segment(segment_id, segstat, 
std::move(flush_schema)));
+    return _segcompaction_if_necessary();
+}
+
 Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
         std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
index_size,
         KeyBoundsPB& key_bounds) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 68932c5ef77..347994e243e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -19,8 +19,6 @@
 
 #include <fmt/format.h>
 #include <gen_cpp/olap_file.pb.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <algorithm>
 #include <atomic>
@@ -43,7 +41,6 @@
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/rowset/segment_creator.h"
-#include "segcompaction.h"
 #include "segment_v2/segment.h"
 #include "util/spinlock.h"
 
@@ -59,21 +56,19 @@ class SegmentWriter;
 using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
 using SegCompactionCandidatesSharedPtr = 
std::shared_ptr<SegCompactionCandidates>;
 
-class BetaRowsetWriter : public RowsetWriter {
-    friend class SegcompactionWorker;
-
+class BaseBetaRowsetWriter : public RowsetWriter {
 public:
-    BetaRowsetWriter();
+    BaseBetaRowsetWriter();
 
-    ~BetaRowsetWriter() override;
+    ~BaseBetaRowsetWriter() override;
 
     Status init(const RowsetWriterContext& rowset_writer_context) override;
 
     Status add_block(const vectorized::Block* block) override;
 
+    // Declare these interface in `BaseBetaRowsetWriter`
     // add rowset by create hard link
     Status add_rowset(RowsetSharedPtr rowset) override;
-
     Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) 
override;
 
     Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer) 
override;
@@ -114,14 +109,6 @@ public:
 
     int32_t allocate_segment_id() override { return 
_segment_creator.allocate_segment_id(); };
 
-    Status flush_segment_writer_for_segcompaction(
-            std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
index_size,
-            KeyBoundsPB& key_bounds);
-
-    bool is_doing_segcompaction() const override { return 
_is_doing_segcompaction; }
-
-    Status wait_flying_segcompaction() override;
-
     void set_segment_start_id(int32_t start_id) override {
         _segment_creator.set_segment_start_id(start_id);
         _segment_start_id = start_id;
@@ -142,36 +129,20 @@ public:
     const RowsetWriterContext& context() const override { return _context; }
 
 private:
-    Status _create_file_writer(std::string path, io::FileWriterPtr& 
file_writer);
-    Status _check_segment_number_limit();
-    Status _generate_delete_bitmap(int32_t segment_id);
+    virtual Status _generate_delete_bitmap(int32_t segment_id) = 0;
     void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
 
-    // segment compaction
-    Status _create_segment_writer_for_segcompaction(
-            std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end);
-    Status _segcompaction_if_necessary();
-    Status _segcompaction_rename_last_segments();
-    Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, 
int32_t segment_id);
-    Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& 
segments);
-    bool _is_segcompacted() { return (_num_segcompacted > 0) ? true : false; }
-
-    bool _check_and_set_is_doing_segcompaction();
-
-    void _build_rowset_meta_with_spec_field(RowsetMetaSharedPtr rowset_meta,
-                                            const RowsetMetaSharedPtr& 
spec_rowset_meta);
-    bool _is_segment_overlapping(const std::vector<KeyBoundsPB>& 
segments_encoded_key_bounds);
-    void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin, 
uint64_t end);
-    Status _rename_compacted_segments(int64_t begin, int64_t end);
-    Status _rename_compacted_segment_plain(uint64_t seg_id);
-    Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t 
seg_id);
-
     void update_rowset_schema(TabletSchemaSPtr flush_schema);
     // build a tmp rowset for load segment to calc delete_bitmap
     // for this segment
+protected:
+    Status _create_file_writer(std::string path, io::FileWriterPtr& 
file_writer);
+    virtual Status _close_file_writers();
+    virtual Status _check_segment_number_limit();
+    virtual int64_t _num_seg() const;
+    // build a tmp rowset for load segment to calc delete_bitmap for this 
segment
     RowsetSharedPtr _build_tmp();
 
-protected:
     RowsetWriterContext _context;
     std::shared_ptr<RowsetMeta> _rowset_meta;
 
@@ -179,9 +150,6 @@ protected:
     roaring::Roaring _segment_set;     // bitmap set to record flushed segment 
id
     std::mutex _segment_set_mutex;     // mutex for _segment_set
     int32_t _segment_start_id;         // basic write start from 0, partial 
update may be different
-    std::atomic<int32_t> _segcompacted_point; // segemnts before this point 
have
-                                              // already been segment compacted
-    std::atomic<int32_t> _num_segcompacted;   // index for segment compaction
 
     mutable SpinLock _lock; // protect following vectors.
     // record rows number of every segment already written, using for rowid
@@ -204,15 +172,6 @@ protected:
     bool _already_built = false;
 
     SegmentCreator _segment_creator;
-    SegcompactionWorker _segcompaction_worker;
-
-    // ensure only one inflight segcompaction task for each rowset
-    std::atomic<bool> _is_doing_segcompaction;
-    // enforce compare-and-swap on _is_doing_segcompaction
-    std::mutex _is_doing_segcompaction_lock;
-    std::condition_variable _segcompacting_cond;
-
-    std::atomic<int> _segcompaction_status;
 
     fmt::memory_buffer vlog_buffer;
 
@@ -222,4 +181,59 @@ protected:
     int64_t _segment_writer_ns = 0;
 };
 
+class SegcompactionWorker;
+
+// `StorageEngine` mixin for `BaseBetaRowsetWriter`
+class BetaRowsetWriter : public BaseBetaRowsetWriter {
+public:
+    BetaRowsetWriter(StorageEngine& engine);
+
+    ~BetaRowsetWriter() override;
+
+    Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
+                       TabletSchemaSPtr flush_schema) override;
+
+    Status flush_segment_writer_for_segcompaction(
+            std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t 
index_size,
+            KeyBoundsPB& key_bounds);
+
+private:
+    Status _generate_delete_bitmap(int32_t segment_id) override;
+
+    // segment compaction
+    friend class SegcompactionWorker;
+    Status _close_file_writers() override;
+    Status _check_segment_number_limit() override;
+    int64_t _num_seg() const override;
+    Status _wait_flying_segcompaction();
+    Status _create_segment_writer_for_segcompaction(
+            std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, 
int64_t end);
+    Status _segcompaction_if_necessary();
+    Status _segcompaction_rename_last_segments();
+    Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, 
int32_t segment_id);
+    Status 
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& 
segments);
+    bool _is_segcompacted() const { return _num_segcompacted > 0; }
+    bool _check_and_set_is_doing_segcompaction();
+    Status _rename_compacted_segments(int64_t begin, int64_t end);
+    Status _rename_compacted_segment_plain(uint64_t seg_id);
+    Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t 
seg_id);
+    void _clear_statistics_for_deleting_segments_unsafe(uint64_t begin, 
uint64_t end);
+
+    StorageEngine& _engine;
+
+    std::atomic<int32_t> _segcompacted_point {0}; // segemnts before this 
point have
+                                                  // already been segment 
compacted
+    std::atomic<int32_t> _num_segcompacted {0};   // index for segment 
compaction
+
+    std::unique_ptr<SegcompactionWorker> _segcompaction_worker;
+
+    // ensure only one inflight segcompaction task for each rowset
+    std::atomic<bool> _is_doing_segcompaction {false};
+    // enforce compare-and-swap on _is_doing_segcompaction
+    std::mutex _is_doing_segcompaction_lock;
+    std::condition_variable _segcompacting_cond;
+
+    std::atomic<int> _segcompaction_status {ErrorCode::OK};
+};
+
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h 
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index 4a99acdaba6..bdcd8a47a98 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -127,10 +127,6 @@ public:
 
     int32_t allocate_segment_id() override { return 
_segment_creator.allocate_segment_id(); };
 
-    bool is_doing_segcompaction() const override { return false; }
-
-    Status wait_flying_segcompaction() override { return Status::OK(); }
-
     int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
 
     int64_t segment_writer_ns() override { return _segment_writer_ns; }
diff --git a/be/src/olap/rowset/rowset_factory.cpp 
b/be/src/olap/rowset/rowset_factory.cpp
index 8447154483c..9758a09e7ea 100644
--- a/be/src/olap/rowset/rowset_factory.cpp
+++ b/be/src/olap/rowset/rowset_factory.cpp
@@ -27,6 +27,7 @@
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/rowset/vertical_beta_rowset_writer.h"
+#include "olap/storage_engine.h"
 
 namespace doris {
 using namespace ErrorCode;
@@ -51,10 +52,10 @@ Status RowsetFactory::create_rowset_writer(const 
RowsetWriterContext& context, b
     }
     if (context.rowset_type == BETA_ROWSET) {
         if (is_vertical) {
-            output->reset(new VerticalBetaRowsetWriter);
+            output->reset(new 
VerticalBetaRowsetWriter(*StorageEngine::instance()));
             return (*output)->init(context);
         }
-        output->reset(new BetaRowsetWriter);
+        output->reset(new BetaRowsetWriter(*StorageEngine::instance()));
         return (*output)->init(context);
     }
     return Status::Error<ROWSET_TYPE_NOT_FOUND>("invalid rowset_type");
diff --git a/be/src/olap/rowset/rowset_writer.h 
b/be/src/olap/rowset/rowset_writer.h
index 542528b1acb..d7ec494f0d6 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -146,10 +146,6 @@ public:
 
     virtual int32_t allocate_segment_id() = 0;
 
-    virtual bool is_doing_segcompaction() const = 0;
-
-    virtual Status wait_flying_segcompaction() = 0;
-
     virtual void set_segment_start_id(int num_segment) { LOG(FATAL) << "not 
supported!"; }
 
     virtual int64_t delete_bitmap_ns() { return 0; }
diff --git a/be/src/olap/rowset/segcompaction.h 
b/be/src/olap/rowset/segcompaction.h
index a0f81e59c77..e2b5812ad8c 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -16,7 +16,6 @@
 // under the License.
 
 #pragma once
-#include <stdint.h>
 
 #include <memory>
 #include <vector>
@@ -48,7 +47,7 @@ class SegcompactionWorker {
     friend class BetaRowsetWriter;
 
 public:
-    SegcompactionWorker(BetaRowsetWriter* writer);
+    explicit SegcompactionWorker(BetaRowsetWriter* writer);
 
     void compact_segments(SegCompactionCandidatesSharedPtr segments);
 
@@ -75,6 +74,7 @@ private:
 
 private:
     //TODO(zhengyu): current impl depends heavily on the access to feilds of 
BetaRowsetWriter
+    // Currently cloud storage engine doesn't need segcompaction
     BetaRowsetWriter* _writer = nullptr;
     io::FileWriterPtr _file_writer;
     std::atomic<bool> _cancelled = false;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index ac70ef8c1f4..2b8375da598 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -26,7 +26,7 @@
 #include <unordered_map>
 #include <utility>
 
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h" // LOG
@@ -44,8 +44,10 @@
 #include "olap/rowset/segment_v2/page_pointer.h"
 #include "olap/segment_loader.h"
 #include "olap/short_key_index.h"
+#include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "service/point_query_executor.h"
 #include "util/coding.h"
@@ -336,11 +338,12 @@ void 
SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
 // 3. set columns to data convertor and then write all columns
 Status SegmentWriter::append_block_with_partial_content(const 
vectorized::Block* block,
                                                         size_t row_pos, size_t 
num_rows) {
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
+    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+        // TODO(plat1ko): cloud mode
         return Status::NotSupported("append_block_with_partial_content");
     }
-    auto tablet = static_cast<Tablet*>(_tablet.get());
+
+    auto* tablet = static_cast<Tablet*>(_tablet.get());
     if (block->columns() <= _tablet_schema->num_key_columns() ||
         block->columns() >= _tablet_schema->num_columns()) {
         return Status::InternalError(
@@ -559,7 +562,8 @@ Status 
SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
                                            const std::vector<bool>& 
use_default_or_null_flag,
                                            bool has_default_or_nullable,
                                            const size_t& segment_start_pos) {
-    if (config::cloud_mode) [[unlikely]] {
+    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+        // TODO(plat1ko): cloud mode
         return Status::NotSupported("fill_missing_columns");
     }
     auto tablet = static_cast<Tablet*>(_tablet.get());
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 91f890b7f96..842eecf3e10 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -26,7 +26,6 @@
 #include <unordered_map>
 #include <utility>
 
-#include "cloud/config.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/logging.h" // LOG
@@ -45,6 +44,7 @@
 #include "olap/short_key_index.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
+#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
 #include "service/point_query_executor.h"
 #include "util/coding.h"
@@ -283,10 +283,11 @@ void 
VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
 //       2.3 fill block
 // 3. set columns to data convertor and then write all columns
 Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& 
data) {
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
+    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+        // TODO(plat1ko): CloudStorageEngine
         return Status::NotSupported("append_block_with_partial_content");
     }
+
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write);
     DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
 
@@ -495,7 +496,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
         vectorized::MutableColumns& mutable_full_columns,
         const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
         const size_t& segment_start_pos) {
-    if (config::cloud_mode) [[unlikely]] {
+    if constexpr (!std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+        // TODO(plat1ko): CloudStorageEngine
         return Status::NotSupported("fill_missing_columns");
     }
     auto tablet = static_cast<Tablet*>(_tablet.get());
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp 
b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index cd9fee5fb4e..35f878dd17c 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -41,6 +41,9 @@
 namespace doris {
 using namespace ErrorCode;
 
+VerticalBetaRowsetWriter::VerticalBetaRowsetWriter(StorageEngine& engine)
+        : BetaRowsetWriter(engine) {}
+
 VerticalBetaRowsetWriter::~VerticalBetaRowsetWriter() {
     if (!_already_built) {
         const auto& fs = _rowset_meta->fs();
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.h 
b/be/src/olap/rowset/vertical_beta_rowset_writer.h
index 8251ad0a07e..a1477dc2a71 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.h
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.h
@@ -33,10 +33,11 @@ class Block;
 } // namespace vectorized
 
 // for vertical compaction
-class VerticalBetaRowsetWriter : public BetaRowsetWriter {
+// TODO(plat1ko): Inherited from template type `T`, `T` is `BetaRowsetWriter` 
or `CloudBetaRowsetWriter`
+class VerticalBetaRowsetWriter final : public BetaRowsetWriter {
 public:
-    VerticalBetaRowsetWriter() : BetaRowsetWriter() {}
-    ~VerticalBetaRowsetWriter();
+    VerticalBetaRowsetWriter(StorageEngine& engine);
+    ~VerticalBetaRowsetWriter() override;
 
     Status add_columns(const vectorized::Block* block, const 
std::vector<uint32_t>& col_ids,
                        bool is_key, uint32_t max_rows_per_segment) override;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 21fbed78022..4675d668f41 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -26,7 +26,7 @@
 #include <string>
 #include <utility>
 
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/config.h"
 #include "common/status.h"
@@ -60,12 +60,16 @@
 namespace doris {
 using namespace ErrorCode;
 
-RowsetBuilder::RowsetBuilder(const WriteRequest& req, RuntimeProfile* profile)
+BaseRowsetBuilder::BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile* 
profile)
         : _req(req), _tablet_schema(std::make_shared<TabletSchema>()) {
     _init_profile(profile);
 }
 
-void RowsetBuilder::_init_profile(RuntimeProfile* profile) {
+RowsetBuilder::RowsetBuilder(StorageEngine& engine, const WriteRequest& req,
+                             RuntimeProfile* profile)
+        : BaseRowsetBuilder(req, profile), _engine(engine) {}
+
+void BaseRowsetBuilder::_init_profile(RuntimeProfile* profile) {
     _profile = profile->create_child(fmt::format("RowsetBuilder {}", 
_req.tablet_id), true, true);
     _build_rowset_timer = ADD_TIMER(_profile, "BuildRowsetTime");
     _submit_delete_bitmap_timer = ADD_TIMER(_profile, 
"DeleteBitmapSubmitTime");
@@ -73,11 +77,7 @@ void RowsetBuilder::_init_profile(RuntimeProfile* profile) {
     _commit_txn_timer = ADD_TIMER(_profile, "CommitTxnTime");
 }
 
-RowsetBuilder::~RowsetBuilder() {
-    if (_is_init && !_is_committed) {
-        _garbage_collection();
-    }
-
+BaseRowsetBuilder::~BaseRowsetBuilder() {
     if (!_is_init) {
         return;
     }
@@ -87,34 +87,39 @@ RowsetBuilder::~RowsetBuilder() {
     }
 }
 
-void RowsetBuilder::_garbage_collection() {
-    if (config::cloud_mode) {
-        return;
+RowsetBuilder::~RowsetBuilder() {
+    if (_is_init && !_is_committed) {
+        _garbage_collection();
     }
+}
+
+Tablet* RowsetBuilder::tablet() {
+    return static_cast<Tablet*>(_tablet.get());
+}
+
+TabletSharedPtr RowsetBuilder::tablet_sptr() {
+    return std::static_pointer_cast<Tablet>(_tablet);
+}
+
+void RowsetBuilder::_garbage_collection() {
     Status rollback_status;
-    TxnManager* txn_mgr = StorageEngine::instance()->txn_manager();
-    auto tablet = static_cast<Tablet*>(_tablet.get());
-    if (tablet != nullptr) {
-        rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet, 
_req.txn_id);
+    TxnManager* txn_mgr = _engine.txn_manager();
+    if (tablet() != nullptr) {
+        rollback_status = txn_mgr->rollback_txn(_req.partition_id, *tablet(), 
_req.txn_id);
     }
     // has to check rollback status, because the rowset maybe committed in 
this thread and
     // published in another thread, then rollback will fail.
     // when rollback failed should not delete rowset
     if (rollback_status.ok()) {
-        StorageEngine::instance()->add_unused_rowset(_rowset);
+        _engine.add_unused_rowset(_rowset);
     }
 }
 
 Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& 
mow_context) {
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-        return Status::NotSupported("init_mow_context");
-    }
-    auto tablet = static_cast<Tablet*>(_tablet.get());
-    std::lock_guard<std::shared_mutex> lck(tablet->get_header_lock());
-    int64_t cur_max_version = tablet->max_version_unlocked().second;
+    std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
+    int64_t cur_max_version = tablet()->max_version_unlocked().second;
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (tablet->tablet_state() == TABLET_NOTREADY) {
+    if (tablet()->tablet_state() == TABLET_NOTREADY) {
         // Disable 'partial_update' when the tablet is undergoing a 'schema 
changing process'
         if (_req.table_schema_param->is_partial_update()) {
             return Status::InternalError(
@@ -123,9 +128,9 @@ Status 
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
         }
         _rowset_ids.clear();
     } else {
-        RETURN_IF_ERROR(tablet->all_rs_id(cur_max_version, &_rowset_ids));
+        RETURN_IF_ERROR(tablet()->all_rs_id(cur_max_version, &_rowset_ids));
     }
-    _delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
+    _delete_bitmap = std::make_shared<DeleteBitmap>(tablet()->tablet_id());
     mow_context =
             std::make_shared<MowContext>(cur_max_version, _req.txn_id, 
_rowset_ids, _delete_bitmap);
     return Status::OK();
@@ -136,41 +141,31 @@ Status RowsetBuilder::check_tablet_version_count() {
         MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
         return Status::OK();
     }
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-        return Status::OK();
-    }
-    auto tablet = std::static_pointer_cast<Tablet>(_tablet);
     //trigger compaction
-    auto st = StorageEngine::instance()->submit_compaction_task(
-            tablet, CompactionType::CUMULATIVE_COMPACTION, true);
+    auto st = _engine.submit_compaction_task(tablet_sptr(), 
CompactionType::CUMULATIVE_COMPACTION,
+                                             true);
     if (!st.ok()) [[unlikely]] {
         LOG(WARNING) << "failed to trigger compaction, tablet_id=" << 
_tablet->tablet_id() << " : "
                      << st;
     }
-    int version_count = tablet->version_count();
+    int version_count = tablet()->version_count();
     if (version_count > config::max_tablet_version_num) {
         return Status::Error<TOO_MANY_VERSION>(
                 "failed to init rowset builder. version count: {}, exceed 
limit: {}, "
                 "tablet: {}",
-                version_count, config::max_tablet_version_num, 
tablet->tablet_id());
+                version_count, config::max_tablet_version_num, 
_tablet->tablet_id());
     }
     return Status::OK();
 }
 
 Status RowsetBuilder::prepare_txn() {
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-        return Status::OK();
-    }
-    auto tablet = static_cast<Tablet*>(_tablet.get());
-    std::shared_lock base_migration_lock(tablet->get_migration_lock(), 
std::try_to_lock);
+    std::shared_lock base_migration_lock(tablet()->get_migration_lock(), 
std::try_to_lock);
     if (!base_migration_lock.owns_lock()) {
         return Status::Error<TRY_LOCK_FAILED>("try migration lock failed");
     }
-    std::lock_guard<std::mutex> push_lock(tablet->get_push_lock());
-    return 
StorageEngine::instance()->txn_manager()->prepare_txn(_req.partition_id, 
*tablet,
-                                                                 _req.txn_id, 
_req.load_id);
+    std::lock_guard<std::mutex> push_lock(tablet()->get_push_lock());
+    return _engine.txn_manager()->prepare_txn(_req.partition_id, *tablet(), 
_req.txn_id,
+                                              _req.load_id);
 }
 
 Status RowsetBuilder::init() {
@@ -205,18 +200,13 @@ Status RowsetBuilder::init() {
     _rowset_writer = DORIS_TRY(_tablet->create_rowset_writer(context, false));
     _pending_rs_guard = 
StorageEngine::instance()->pending_local_rowsets().add(context.rowset_id);
 
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-    } else {
-        _calc_delete_bitmap_token =
-                
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
-    }
+    _calc_delete_bitmap_token = 
_engine.calc_delete_bitmap_executor()->create_token();
 
     _is_init = true;
     return Status::OK();
 }
 
-Status RowsetBuilder::build_rowset() {
+Status BaseRowsetBuilder::build_rowset() {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init) << "rowset builder is supposed be to initialized before "
                         "build_rowset() being called";
@@ -231,27 +221,22 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
     if (!_tablet->enable_unique_key_merge_on_write()) {
         return Status::OK();
     }
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-        return Status::OK();
-    }
-    auto tablet = static_cast<Tablet*>(_tablet.get());
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_submit_delete_bitmap_timer);
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (tablet->tablet_state() == TABLET_NOTREADY) {
+    if (tablet()->tablet_state() == TABLET_NOTREADY) {
         LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
                      "tablet_id: "
-                  << tablet->tablet_id() << " txn_id: " << _req.txn_id;
+                  << tablet()->tablet_id() << " txn_id: " << _req.txn_id;
         return Status::OK();
     }
-    auto beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
+    auto* beta_rowset = reinterpret_cast<BetaRowset*>(_rowset.get());
     std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
     if (segments.size() > 1) {
         // calculate delete bitmap between segments
         RETURN_IF_ERROR(
-                tablet->calc_delete_bitmap_between_segments(_rowset, segments, 
_delete_bitmap));
+                tablet()->calc_delete_bitmap_between_segments(_rowset, 
segments, _delete_bitmap));
     }
 
     // For partial update, we need to fill in the entire row of data, during 
the calculation
@@ -261,14 +246,14 @@ Status RowsetBuilder::submit_calc_delete_bitmap_task() {
         return Status::OK();
     }
 
-    LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << 
tablet->tablet_id()
+    LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << 
tablet()->tablet_id()
               << ", txn_id: " << _req.txn_id;
-    return tablet->commit_phase_update_delete_bitmap(_rowset, _rowset_ids, 
_delete_bitmap, segments,
-                                                     _req.txn_id, 
_calc_delete_bitmap_token.get(),
-                                                     nullptr);
+    return tablet()->commit_phase_update_delete_bitmap(_rowset, _rowset_ids, 
_delete_bitmap,
+                                                       segments, _req.txn_id,
+                                                       
_calc_delete_bitmap_token.get(), nullptr);
 }
 
-Status RowsetBuilder::wait_calc_delete_bitmap() {
+Status BaseRowsetBuilder::wait_calc_delete_bitmap() {
     if (!_tablet->enable_unique_key_merge_on_write() || 
_partial_update_info->is_partial_update) {
         return Status::OK();
     }
@@ -281,15 +266,10 @@ Status RowsetBuilder::wait_calc_delete_bitmap() {
 }
 
 Status RowsetBuilder::commit_txn() {
-    if (config::cloud_mode) {
-        // TODO(plat1ko)
-        return Status::OK();
-    }
-    auto tablet = static_cast<Tablet*>(_tablet.get());
-    if (tablet->enable_unique_key_merge_on_write() &&
+    if (tablet()->enable_unique_key_merge_on_write() &&
         config::enable_merge_on_write_correctness_check && _rowset->num_rows() 
!= 0 &&
-        tablet->tablet_state() != TABLET_NOTREADY) {
-        auto st = tablet->check_delete_bitmap_correctness(
+        tablet()->tablet_state() != TABLET_NOTREADY) {
+        auto st = tablet()->check_delete_bitmap_correctness(
                 _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, 
_rowset_ids);
         if (!st.ok()) {
             LOG(WARNING) << fmt::format(
@@ -300,21 +280,20 @@ Status RowsetBuilder::commit_txn() {
             return st;
         }
     }
-    auto storage_engine = StorageEngine::instance();
     std::lock_guard<std::mutex> l(_lock);
     SCOPED_TIMER(_commit_txn_timer);
-    if (_tablet->tablet_schema()->num_variant_columns() > 0) {
+    if (tablet()->tablet_schema()->num_variant_columns() > 0) {
         // update tablet schema when meet variant columns, before commit_txn
         // Eg. rowset schema:       A(int),    B(float),  C(int), D(int)
         // _tabelt->tablet_schema:  A(bigint), B(double)
         //  => update_schema:       A(bigint), B(double), C(int), D(int)
         const RowsetWriterContext& rw_ctx = _rowset_writer->context();
-        
RETURN_IF_ERROR(_tablet->update_by_least_common_schema(rw_ctx.tablet_schema));
+        
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
     }
     // Transfer ownership of `PendingRowsetGuard` to `TxnManager`
-    Status res = storage_engine->txn_manager()->commit_txn(_req.partition_id, 
*tablet, _req.txn_id,
-                                                           _req.load_id, 
_rowset,
-                                                           
std::move(_pending_rs_guard), false);
+    Status res = _engine.txn_manager()->commit_txn(_req.partition_id, 
*tablet(), _req.txn_id,
+                                                   _req.load_id, _rowset,
+                                                   
std::move(_pending_rs_guard), false);
 
     if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
         LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
@@ -322,8 +301,8 @@ Status RowsetBuilder::commit_txn() {
         return res;
     }
     if (_tablet->enable_unique_key_merge_on_write()) {
-        storage_engine->txn_manager()->set_txn_related_delete_bitmap(
-                _req.partition_id, _req.txn_id, tablet->tablet_id(), 
tablet->tablet_uid(), true,
+        _engine.txn_manager()->set_txn_related_delete_bitmap(
+                _req.partition_id, _req.txn_id, tablet()->tablet_id(), 
tablet()->tablet_uid(), true,
                 _delete_bitmap, _rowset_ids, _partial_update_info);
     }
 
@@ -331,7 +310,7 @@ Status RowsetBuilder::commit_txn() {
     return Status::OK();
 }
 
-Status RowsetBuilder::cancel() {
+Status BaseRowsetBuilder::cancel() {
     std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
         return Status::OK();
@@ -343,9 +322,9 @@ Status RowsetBuilder::cancel() {
     return Status::OK();
 }
 
-void RowsetBuilder::_build_current_tablet_schema(int64_t index_id,
-                                                 const OlapTableSchemaParam* 
table_schema_param,
-                                                 const TabletSchema& 
ori_tablet_schema) {
+void BaseRowsetBuilder::_build_current_tablet_schema(int64_t index_id,
+                                                     const 
OlapTableSchemaParam* table_schema_param,
+                                                     const TabletSchema& 
ori_tablet_schema) {
     _tablet_schema->copy_from(ori_tablet_schema);
     // find the right index id
     int i = 0;
@@ -356,7 +335,7 @@ void RowsetBuilder::_build_current_tablet_schema(int64_t 
index_id,
         }
     }
 
-    if (indexes.size() > 0 && indexes[i]->columns.size() != 0 &&
+    if (!indexes.empty() && !indexes[i]->columns.empty() &&
         indexes[i]->columns[0]->unique_id() >= 0) {
         _tablet_schema->build_current_tablet_schema(index_id, 
table_schema_param->version(),
                                                     indexes[i], 
ori_tablet_schema);
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index ea849f80e61..8f254074c37 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -17,8 +17,6 @@
 
 #pragma once
 
-#include <stdint.h>
-
 #include <atomic>
 #include <memory>
 #include <mutex>
@@ -53,21 +51,21 @@ class Block;
 
 // Writer for a particular (load, index, tablet).
 // This class is NOT thread-safe, external synchronization is required.
-class RowsetBuilder {
+class BaseRowsetBuilder {
 public:
-    RowsetBuilder(const WriteRequest& req, RuntimeProfile* profile);
+    BaseRowsetBuilder(const WriteRequest& req, RuntimeProfile* profile);
 
-    ~RowsetBuilder();
+    virtual ~BaseRowsetBuilder();
 
-    Status init();
+    virtual Status init() = 0;
 
     Status build_rowset();
 
-    Status submit_calc_delete_bitmap_task();
+    virtual Status submit_calc_delete_bitmap_task() = 0;
 
     Status wait_calc_delete_bitmap();
 
-    Status commit_txn();
+    virtual Status commit_txn() = 0;
 
     Status cancel();
 
@@ -86,21 +84,13 @@ public:
         return _partial_update_info;
     }
 
-private:
-    void _garbage_collection();
-
+protected:
     void _build_current_tablet_schema(int64_t index_id,
                                       const OlapTableSchemaParam* 
table_schema_param,
                                       const TabletSchema& ori_tablet_schema);
 
     void _init_profile(RuntimeProfile* profile);
 
-    Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
-
-    Status check_tablet_version_count();
-
-    Status prepare_txn();
-
     bool _is_init = false;
     bool _is_cancelled = false;
     bool _is_committed = false;
@@ -127,4 +117,33 @@ private:
     RuntimeProfile::Counter* _commit_txn_timer = nullptr;
 };
 
+// `StorageEngine` mixin for `BaseRowsetBuilder`
+class RowsetBuilder final : public BaseRowsetBuilder {
+public:
+    RowsetBuilder(StorageEngine& engine, const WriteRequest& req, 
RuntimeProfile* profile);
+
+    ~RowsetBuilder() override;
+
+    Status init() override;
+
+    Status commit_txn() override;
+
+    Status submit_calc_delete_bitmap_task() override;
+
+private:
+    Status check_tablet_version_count();
+
+    Status prepare_txn();
+
+    void _garbage_collection();
+
+    Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
+
+    // Cast `BaseTablet` to `Tablet`
+    Tablet* tablet();
+    TabletSharedPtr tablet_sptr();
+
+    StorageEngine& _engine;
+};
+
 } // namespace doris
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 6ff06bc0189..7115dd76f13 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -42,9 +42,9 @@ ExecEnv::~ExecEnv() {
     destroy();
 }
 
+// TODO(plat1ko): template <class Engine>
 Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t tablet_id) {
     BaseTabletSPtr tablet;
-    // TODO(plat1ko): config::cloud_mode
     std::string err;
     tablet = 
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
     if (tablet == nullptr) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c405953d6c1..94ba0720fb8 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -112,6 +112,12 @@ inline bool k_doris_exit = false;
 // once to properly initialise service state.
 class ExecEnv {
 public:
+#ifdef CLOUD_MODE
+    using Engine = CloudStorageEngine; // TODO(plat1ko)
+#else
+    using Engine = StorageEngine;
+#endif
+
     // Empty destructor because the compiler-generated one requires full
     // declarations for classes in scoped_ptrs.
     ~ExecEnv();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 7c6549d692f..0dc0ac344b3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -21,6 +21,7 @@
 #include <glog/logging.h>
 
 #include "bvar/bvar.h"
+#include "olap/storage_engine.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/tablets_channel.h"
 
@@ -68,7 +69,7 @@ void LoadChannel::_init_profile() {
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
     int64_t index_id = params.index_id();
-    std::shared_ptr<TabletsChannel> channel;
+    std::shared_ptr<BaseTabletsChannel> channel;
     {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _tablets_channels.find(index_id);
@@ -77,8 +78,9 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel = std::make_shared<TabletsChannel>(key, _load_id, 
_is_high_priority,
-                                                       _self_profile);
+            // TODO(plat1ko): CloudTabletsChannel
+            channel = 
std::make_shared<TabletsChannel>(*StorageEngine::instance(), key, _load_id,
+                                                       _is_high_priority, 
_self_profile);
             {
                 std::lock_guard<SpinLock> l(_tablets_channels_lock);
                 _tablets_channels.insert({index_id, channel});
@@ -98,7 +100,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& 
params) {
     return Status::OK();
 }
 
-Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& 
channel,
+Status LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& 
channel,
                                          bool& is_finished, const int64_t 
index_id) {
     std::lock_guard<std::mutex> l(_lock);
     auto it = _tablets_channels.find(index_id);
@@ -124,7 +126,7 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
     COUNTER_UPDATE(_add_batch_times, 1);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
-    std::shared_ptr<TabletsChannel> channel;
+    std::shared_ptr<BaseTabletsChannel> channel;
     bool is_finished = false;
     Status st = _get_tablets_channel(channel, is_finished, index_id);
     if (!st.ok() || is_finished) {
@@ -139,7 +141,7 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
 
     // 3. handle eos
     if (request.has_eos() && request.eos()) {
-        st = _handle_eos(channel, request, response);
+        st = _handle_eos(channel.get(), request, response);
         _report_profile(response);
         if (!st.ok()) {
             return st;
@@ -151,6 +153,25 @@ Status LoadChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
     return st;
 }
 
+Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
+                                const PTabletWriterAddBlockRequest& request,
+                                PTabletWriterAddBlockResult* response) {
+    _self_profile->add_info_string("EosHost", fmt::format("{}", 
request.backend_id()));
+    bool finished = false;
+    auto index_id = request.index_id();
+
+    RETURN_IF_ERROR(channel->close(this, request, response, &finished));
+    if (finished) {
+        std::lock_guard<std::mutex> l(_lock);
+        {
+            std::lock_guard<SpinLock> l(_tablets_channels_lock);
+            _tablets_channels.erase(index_id);
+        }
+        _finished_channel_ids.emplace(index_id);
+    }
+    return Status::OK();
+}
+
 void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
     if (!_enable_profile) {
         return;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index ea0fa0f2486..bdeedbd9eae 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,10 +17,6 @@
 
 #pragma once
 
-#include <gen_cpp/internal_service.pb.h>
-#include <stdint.h>
-#include <time.h>
-
 #include <algorithm>
 #include <atomic>
 #include <functional>
@@ -38,17 +34,18 @@
 #include "olap/memtable_memory_limiter.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
-#include "runtime/tablets_channel.h"
 #include "util/runtime_profile.h"
 #include "util/spinlock.h"
 #include "util/thrift_util.h"
 #include "util/uid_util.h"
-//#include <gen_cpp/internal_service.pb.h>
 
 namespace doris {
 
 class PTabletWriterOpenRequest;
+class PTabletWriterAddBlockRequest;
+class PTabletWriterAddBlockResult;
 class OpenPartitionRequest;
+class BaseTabletsChannel;
 
 // A LoadChannel manages tablets channels for all indexes
 // corresponding to a certain load job
@@ -82,31 +79,11 @@ public:
     RuntimeProfile::Counter* get_handle_mem_limit_timer() { return 
_handle_mem_limit_timer; }
 
 protected:
-    Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, 
bool& is_finished,
-                                const int64_t index_id);
-
-    Status _handle_eos(std::shared_ptr<TabletsChannel>& channel,
-                       const PTabletWriterAddBlockRequest& request,
-                       PTabletWriterAddBlockResult* response) {
-        _self_profile->add_info_string("EosHost", fmt::format("{}", 
request.backend_id()));
-        bool finished = false;
-        auto index_id = request.index_id();
-
-        RETURN_IF_ERROR(channel->close(
-                this, request.sender_id(), request.backend_id(), &finished, 
request.partition_ids(),
-                response->mutable_tablet_vec(), 
response->mutable_tablet_errors(),
-                request.slave_tablet_nodes(), 
response->mutable_success_slave_tablet_node_ids(),
-                request.write_single_replica()));
-        if (finished) {
-            std::lock_guard<std::mutex> l(_lock);
-            {
-                std::lock_guard<SpinLock> l(_tablets_channels_lock);
-                _tablets_channels.erase(index_id);
-            }
-            _finished_channel_ids.emplace(index_id);
-        }
-        return Status::OK();
-    }
+    Status _get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel, 
bool& is_finished,
+                                int64_t index_id);
+
+    Status _handle_eos(BaseTabletsChannel* channel, const 
PTabletWriterAddBlockRequest& request,
+                       PTabletWriterAddBlockResult* response);
 
     void _init_profile();
     // thread safety
@@ -129,7 +106,7 @@ private:
     // lock protect the tablets channel map
     std::mutex _lock;
     // index id -> tablets channel
-    std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> 
_tablets_channels;
+    std::unordered_map<int64_t, std::shared_ptr<BaseTabletsChannel>> 
_tablets_channels;
     SpinLock _tablets_channels_lock;
     // This is to save finished channels id, to handle the retry request.
     std::unordered_set<int64_t> _finished_channel_ids;
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index cc06f2a2f7e..a4d359dc0ea 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -17,9 +17,8 @@
 
 #pragma once
 
+#include <bthread/mutex.h>
 #include <gen_cpp/internal_service.pb.h>
-#include <runtime/load_stream_writer.h>
-#include <stdint.h>
 
 #include <condition_variable>
 #include <memory>
@@ -31,10 +30,15 @@
 #include "butil/iobuf.h"
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
+#include "runtime/load_stream_writer.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
 
+class LoadStreamMgr;
+class ThreadPoolToken;
+class OlapTableSchemaParam;
+
 // origin_segid(index) -> new_segid(value in vector)
 using SegIdMapping = std::vector<uint32_t>;
 class TabletStream {
@@ -47,7 +51,7 @@ public:
     Status append_data(const PStreamHeader& header, butil::IOBuf* data);
     Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
     Status close();
-    int64_t id() { return _id; }
+    int64_t id() const { return _id; }
 
     friend std::ostream& operator<<(std::ostream& ostr, const TabletStream& 
tablet_stream);
 
@@ -103,7 +107,7 @@ using StreamId = brpc::StreamId;
 class LoadStream : public brpc::StreamInputHandler {
 public:
     LoadStream(PUniqueId load_id, LoadStreamMgr* load_stream_mgr, bool 
enable_profile);
-    ~LoadStream();
+    ~LoadStream() override;
 
     Status init(const POpenLoadStreamRequest* request);
 
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 1228061bb40..466a23c8c5c 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -17,10 +17,6 @@
 
 #pragma once
 
-#include <gen_cpp/internal_service.pb.h>
-#include <runtime/load_stream.h>
-#include <stdint.h>
-
 #include <condition_variable>
 #include <memory>
 #include <mutex>
@@ -29,9 +25,13 @@
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
+#include "runtime/load_stream.h"
+#include "util/threadpool.h"
 
 namespace doris {
 
+class POpenStreamSinkRequest;
+
 class LoadStreamMgr {
 public:
     LoadStreamMgr(uint32_t segment_file_writer_thread_num, FifoThreadPool* 
heavy_work_pool,
diff --git a/be/src/runtime/load_stream_writer.cpp 
b/be/src/runtime/load_stream_writer.cpp
index 52948429e32..427ace47d00 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -48,6 +48,7 @@
 #include "olap/rowset/rowset_writer_context.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 #include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset_builder.h"
 #include "olap/schema.h"
 #include "olap/schema_change.h"
 #include "olap/storage_engine.h"
@@ -67,14 +68,17 @@
 namespace doris {
 using namespace ErrorCode;
 
-LoadStreamWriter::LoadStreamWriter(WriteRequest* req, RuntimeProfile* profile)
-        : _req(*req), _rowset_builder(*req, profile), _rowset_writer(nullptr) 
{}
+LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* 
profile)
+        : _req(*context), _rowset_writer(nullptr) {
+    _rowset_builder =
+            std::make_unique<RowsetBuilder>(*StorageEngine::instance(), 
*context, profile);
+}
 
 LoadStreamWriter::~LoadStreamWriter() = default;
 
 Status LoadStreamWriter::init() {
-    RETURN_IF_ERROR(_rowset_builder.init());
-    _rowset_writer = _rowset_builder.rowset_writer();
+    RETURN_IF_ERROR(_rowset_builder->init());
+    _rowset_writer = _rowset_builder->rowset_writer();
     _is_init = true;
     return Status::OK();
 }
@@ -157,10 +161,10 @@ Status LoadStreamWriter::close() {
         }
     }
 
-    RETURN_IF_ERROR(_rowset_builder.build_rowset());
-    RETURN_IF_ERROR(_rowset_builder.submit_calc_delete_bitmap_task());
-    RETURN_IF_ERROR(_rowset_builder.wait_calc_delete_bitmap());
-    RETURN_IF_ERROR(_rowset_builder.commit_txn());
+    RETURN_IF_ERROR(_rowset_builder->build_rowset());
+    RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
+    RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
+    RETURN_IF_ERROR(_rowset_builder->commit_txn());
 
     return Status::OK();
 }
diff --git a/be/src/runtime/load_stream_writer.h 
b/be/src/runtime/load_stream_writer.h
index 37514377a3d..e038ceeb89b 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -17,11 +17,6 @@
 
 #pragma once
 
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
-#include <gen_cpp/types.pb.h>
-#include <stdint.h>
-
 #include <atomic>
 #include <memory>
 #include <mutex>
@@ -32,16 +27,12 @@
 #include "brpc/stream.h"
 #include "butil/iobuf.h"
 #include "common/status.h"
+#include "io/fs/file_reader_writer_fwd.h"
 #include "olap/delta_writer_context.h"
 #include "olap/memtable.h"
 #include "olap/olap_common.h"
-#include "olap/rowset/beta_rowset_writer.h"
-#include "olap/rowset/rowset.h"
-#include "olap/rowset/rowset_writer.h"
-#include "olap/rowset_builder.h"
-#include "olap/tablet.h"
-#include "olap/tablet_meta.h"
-#include "olap/tablet_schema.h"
+#include "olap/rowset/rowset_fwd.h"
+#include "olap/tablet_fwd.h"
 #include "util/spinlock.h"
 #include "util/uid_util.h"
 
@@ -53,6 +44,9 @@ class SlotDescriptor;
 class OlapTableSchemaParam;
 class RowsetWriter;
 class RuntimeProfile;
+struct SegmentStatistics;
+using SegmentStatisticsSharedPtr = std::shared_ptr<SegmentStatistics>;
+class BaseRowsetBuilder;
 
 namespace vectorized {
 class Block;
@@ -76,13 +70,13 @@ public:
     // wait for all memtables to be flushed.
     Status close();
 
-    int64_t tablet_id() { return _req.tablet_id; }
+    int64_t tablet_id() const { return _req.tablet_id; }
 
 private:
     bool _is_init = false;
     bool _is_canceled = false;
     WriteRequest _req;
-    RowsetBuilder _rowset_builder;
+    std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
     std::shared_ptr<RowsetWriter> _rowset_writer;
     std::mutex _lock;
 
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 5508bc3005a..d0d742e9152 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -50,10 +50,10 @@ class SlotDescriptor;
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
 
-std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
+std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
 
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key, const UniqueId& 
load_id,
-                               bool is_high_priority, RuntimeProfile* profile)
+BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const 
UniqueId& load_id,
+                                       bool is_high_priority, RuntimeProfile* 
profile)
         : _key(key),
           _state(kInitialized),
           _load_id(load_id),
@@ -66,21 +66,41 @@ TabletsChannel::TabletsChannel(const TabletsChannelKey& 
key, const UniqueId& loa
     });
 }
 
-TabletsChannel::~TabletsChannel() {
+TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& 
key,
+                               const UniqueId& load_id, bool is_high_priority,
+                               RuntimeProfile* profile)
+        : BaseTabletsChannel(key, load_id, is_high_priority, profile), 
_engine(engine) {}
+
+BaseTabletsChannel::~BaseTabletsChannel() {
     _s_tablet_writer_count -= _tablet_writers.size();
-    for (auto& it : _tablet_writers) {
-        delete it.second;
+}
+
+TabletsChannel::~TabletsChannel() = default;
+
+Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
+                                            const 
PTabletWriterAddBlockRequest& request) {
+    std::lock_guard<std::mutex> l(_lock);
+    if (_state != kOpened) {
+        return _state == kFinished ? _close_status
+                                   : Status::InternalError("TabletsChannel {} 
state: {}",
+                                                           _key.to_string(), 
_state);
     }
-    delete _schema;
+    cur_seq = _next_seqs[request.sender_id()];
+    // check packet
+    if (request.packet_seq() > cur_seq) {
+        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
+                     << ", recept_seq=" << request.packet_seq();
+        return Status::InternalError("lost data packet");
+    }
+    return Status::OK();
 }
 
-void TabletsChannel::_init_profile(RuntimeProfile* profile) {
+void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
     _profile =
             profile->create_child(fmt::format("TabletsChannel {}", 
_key.to_string()), true, true);
     _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", 
TUnit::UNIT);
 
     auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true);
-    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
     _add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
     _write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
     _incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");
@@ -95,7 +115,12 @@ void TabletsChannel::_init_profile(RuntimeProfile* profile) 
{
             memory_usage->AddHighWaterMarkCounter("MaxTabletFlush", 
TUnit::BYTES);
 }
 
-Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
+void TabletsChannel::_init_profile(RuntimeProfile* profile) {
+    BaseTabletsChannel::_init_profile(profile);
+    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
+}
+
+Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kOpened) {
         // Normal case, already open by other sender
@@ -105,7 +130,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& 
request) {
               << ", timeout(s): " << request.load_channel_timeout_s();
     _txn_id = request.txn_id();
     _index_id = request.index_id();
-    _schema = new OlapTableSchemaParam();
+    _schema = std::make_unique<OlapTableSchemaParam>();
     RETURN_IF_ERROR(_schema->init(request.schema()));
     _tuple_desc = _schema->tuple_desc();
 
@@ -119,7 +144,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& 
request) {
     return Status::OK();
 }
 
-Status TabletsChannel::incremental_open(const PTabletWriterOpenRequest& 
params) {
+Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& 
params) {
     SCOPED_TIMER(_incremental_open_timer);
     if (_state == kInitialized) { // haven't opened
         return open(params);
@@ -159,22 +184,15 @@ Status TabletsChannel::incremental_open(const 
PTabletWriterOpenRequest& params)
         wrequest.tuple_desc = _tuple_desc;
         wrequest.slots = index_slots;
         wrequest.is_high_priority = _is_high_priority;
-        wrequest.table_schema_param = _schema;
+        wrequest.table_schema_param = _schema.get();
 
-        DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id);
-        if (!st.ok()) {
-            auto err_msg = fmt::format(
-                    "open delta writer failed, tablet_id={}"
-                    ", txn_id={}, partition_id={}, err={}",
-                    tablet.tablet_id(), _txn_id, tablet.partition_id(), 
st.to_string());
-            LOG(WARNING) << err_msg;
-            return Status::InternalError(err_msg);
-        }
+        // TODO(plat1ko): CloudDeltaWriter
+        auto delta_writer = 
std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest,
+                                                          _profile, _load_id);
         ss << "[" << tablet.tablet_id() << "]";
         {
             std::lock_guard<SpinLock> l(_tablet_writers_lock);
-            _tablet_writers.emplace(tablet.tablet_id(), writer);
+            _tablet_writers.emplace(tablet.tablet_id(), 
std::move(delta_writer));
         }
     }
 
@@ -185,14 +203,12 @@ Status TabletsChannel::incremental_open(const 
PTabletWriterOpenRequest& params)
     return Status::OK();
 }
 
-Status TabletsChannel::close(
-        LoadChannel* parent, int sender_id, int64_t backend_id, bool* finished,
-        const google::protobuf::RepeatedField<int64_t>& partition_ids,
-        google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-        const google::protobuf::Map<int64_t, PSlaveTabletNodes>& 
slave_tablet_nodes,
-        google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* 
success_slave_tablet_node_ids,
-        const bool write_single_replica) {
+Status TabletsChannel::close(LoadChannel* parent, const 
PTabletWriterAddBlockRequest& req,
+                             PTabletWriterAddBlockResult* res, bool* finished) 
{
+    int sender_id = req.sender_id();
+    int64_t backend_id = req.backend_id();
+    const auto& partition_ids = req.partition_ids();
+    auto* tablet_errors = res->mutable_tablet_errors();
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;
@@ -215,17 +231,17 @@ Status TabletsChannel::close(
         // All senders are closed
         // 1. close all delta writers
         std::set<DeltaWriter*> need_wait_writers;
-        for (auto& it : _tablet_writers) {
-            if (_partition_ids.count(it.second->partition_id()) > 0) {
-                auto st = it.second->close();
+        for (auto&& [tablet_id, writer] : _tablet_writers) {
+            if (_partition_ids.contains(writer->partition_id())) {
+                auto st = writer->close();
                 if (!st.ok()) {
                     auto err_msg = fmt::format(
                             "close tablet writer failed, tablet_id={}, "
                             "transaction_id={}, err={}",
-                            it.first, _txn_id, st.to_string());
+                            tablet_id, _txn_id, st.to_string());
                     LOG(WARNING) << err_msg;
                     PTabletError* tablet_error = tablet_errors->Add();
-                    tablet_error->set_tablet_id(it.first);
+                    tablet_error->set_tablet_id(tablet_id);
                     tablet_error->set_msg(st.to_string());
                     // just skip this tablet(writer) and continue to close 
others
                     continue;
@@ -233,30 +249,30 @@ Status TabletsChannel::close(
                 // tablet writer in `_broken_tablets` should not call 
`build_rowset` and
                 // `commit_txn` method, after that, the publish-version task 
will success,
                 // which can cause the replica inconsistency.
-                if (_is_broken_tablet(it.second->tablet_id())) {
+                if (_is_broken_tablet(writer->tablet_id())) {
                     LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is 
broken but not cancelled"
-                                 << ", tablet_id=" << it.first << ", 
transaction_id=" << _txn_id;
+                                 << ", tablet_id=" << tablet_id << ", 
transaction_id=" << _txn_id;
                     continue;
                 }
-                need_wait_writers.insert(it.second);
+                
need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get()));
             } else {
-                auto st = it.second->cancel();
+                auto st = writer->cancel();
                 if (!st.ok()) {
-                    LOG(WARNING) << "cancel tablet writer failed, tablet_id=" 
<< it.first
+                    LOG(WARNING) << "cancel tablet writer failed, tablet_id=" 
<< tablet_id
                                  << ", transaction_id=" << _txn_id;
                     // just skip this tablet(writer) and continue to close 
others
                     continue;
                 }
-                VLOG_PROGRESS << "cancel tablet writer successfully, 
tablet_id=" << it.first
+                VLOG_PROGRESS << "cancel tablet writer successfully, 
tablet_id=" << tablet_id
                               << ", transaction_id=" << _txn_id;
             }
         }
 
-        _write_single_replica = write_single_replica;
+        _write_single_replica = req.write_single_replica();
 
         // 2. wait all writer finished flush.
-        for (auto writer : need_wait_writers) {
-            static_cast<void>(writer->wait_flush());
+        for (auto* writer : need_wait_writers) {
+            RETURN_IF_ERROR((writer->wait_flush()));
         }
 
         // 3. build rowset
@@ -289,23 +305,23 @@ Status TabletsChannel::close(
         }
 
         // 5. commit all writers
-        for (auto writer : need_wait_writers) {
+
+        for (auto* writer : need_wait_writers) {
             PSlaveTabletNodes slave_nodes;
-            if (write_single_replica) {
-                slave_nodes = slave_tablet_nodes.at(writer->tablet_id());
-            }
+
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE 
judge it.
-            _commit_txn(writer, tablet_vec, tablet_errors, slave_nodes, 
write_single_replica);
+            _commit_txn(writer, req, res);
         }
 
-        if (write_single_replica) {
+        if (_write_single_replica) {
+            auto* success_slave_tablet_node_ids = 
res->mutable_success_slave_tablet_node_ids();
             // The operation waiting for all slave replicas to complete must 
end before the timeout,
             // so that there is enough time to collect completed replica. 
Otherwise, the task may
             // timeout and fail even though most of the replicas are 
completed. Here we set 0.9
             // times the timeout as the maximum waiting time.
             SCOPED_TIMER(_slave_replica_timer);
-            while (need_wait_writers.size() > 0 &&
+            while (!need_wait_writers.empty() &&
                    (time(nullptr) - parent->last_updated_time()) < 
(parent->timeout() * 0.9)) {
                 std::set<DeltaWriter*>::iterator it;
                 for (it = need_wait_writers.begin(); it != 
need_wait_writers.end();) {
@@ -318,22 +334,22 @@ Status TabletsChannel::close(
                 }
                 std::this_thread::sleep_for(std::chrono::milliseconds(100));
             }
-            for (auto writer : need_wait_writers) {
+            for (auto* writer : need_wait_writers) {
                 
writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
             }
-            
StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
+            _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
         }
     }
     return Status::OK();
 }
 
-void TabletsChannel::_commit_txn(DeltaWriter* writer,
-                                 
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-                                 
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
-                                 PSlaveTabletNodes slave_tablet_nodes,
-                                 const bool write_single_replica) {
-    Status st = writer->commit_txn(slave_tablet_nodes, write_single_replica);
-    if (st.ok()) {
+void TabletsChannel::_commit_txn(DeltaWriter* writer, const 
PTabletWriterAddBlockRequest& req,
+                                 PTabletWriterAddBlockResult* res) {
+    Status st = writer->commit_txn(_write_single_replica
+                                           ? 
req.slave_tablet_nodes().at(writer->tablet_id())
+                                           : PSlaveTabletNodes {});
+    if (st.ok()) [[likely]] {
+        auto* tablet_vec = res->mutable_tablet_vec();
         PTabletInfo* tablet_info = tablet_vec->Add();
         tablet_info->set_tablet_id(writer->tablet_id());
         // unused required field.
@@ -341,11 +357,11 @@ void TabletsChannel::_commit_txn(DeltaWriter* writer,
         tablet_info->set_received_rows(writer->total_received_rows());
         tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
     } else {
-        _add_error_tablet(tablet_errors, writer->tablet_id(), st);
+        _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), 
st);
     }
 }
 
-void TabletsChannel::_add_error_tablet(
+void BaseTabletsChannel::_add_error_tablet(
         google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, 
int64_t tablet_id,
         Status error) const {
     PTabletError* tablet_error = tablet_errors->Add();
@@ -355,7 +371,7 @@ void TabletsChannel::_add_error_tablet(
                   << "err msg " << error;
 }
 
-void TabletsChannel::refresh_profile() {
+void BaseTabletsChannel::refresh_profile() {
     int64_t write_mem_usage = 0;
     int64_t flush_mem_usage = 0;
     int64_t max_tablet_mem_usage = 0;
@@ -363,10 +379,10 @@ void TabletsChannel::refresh_profile() {
     int64_t max_tablet_flush_mem_usage = 0;
     {
         std::lock_guard<SpinLock> l(_tablet_writers_lock);
-        for (auto& it : _tablet_writers) {
-            int64_t write_mem = it.second->mem_consumption(MemType::WRITE);
+        for (auto&& [tablet_id, writer] : _tablet_writers) {
+            int64_t write_mem = writer->mem_consumption(MemType::WRITE);
             write_mem_usage += write_mem;
-            int64_t flush_mem = it.second->mem_consumption(MemType::FLUSH);
+            int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
             flush_mem_usage += flush_mem;
             if (write_mem > max_tablet_write_mem_usage) {
                 max_tablet_write_mem_usage = write_mem;
@@ -387,7 +403,7 @@ void TabletsChannel::refresh_profile() {
     COUNTER_SET(_max_tablet_flush_memory_usage_counter, 
max_tablet_flush_mem_usage);
 }
 
-Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& 
request) {
+Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& 
request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
     for (auto& index : _schema->indexes()) {
@@ -428,24 +444,17 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
                 .load_id = request.id(),
                 .tuple_desc = _tuple_desc,
                 .slots = index_slots,
-                .table_schema_param = _schema,
+                .table_schema_param = _schema.get(),
                 .is_high_priority = _is_high_priority,
                 .write_file_cache = request.write_file_cache(),
         };
 
-        DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&wrequest, &writer, _profile, _load_id);
-        if (!st.ok()) {
-            auto err_msg = fmt::format(
-                    "open delta writer failed, tablet_id={}"
-                    ", txn_id={}, partition_id={}, err={}",
-                    tablet.tablet_id(), _txn_id, tablet.partition_id(), 
st.to_string());
-            LOG(WARNING) << err_msg;
-            return Status::InternalError(err_msg);
-        }
+        // TODO(plat1ko): CloudDeltaWriter
+        auto writer = 
std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest, _profile,
+                                                    _load_id);
         {
             std::lock_guard<SpinLock> l(_tablet_writers_lock);
-            _tablet_writers.emplace(tablet.tablet_id(), writer);
+            _tablet_writers.emplace(tablet.tablet_id(), std::move(writer));
         }
     }
     _s_tablet_writer_count += _tablet_writers.size();
@@ -453,7 +462,7 @@ Status TabletsChannel::_open_all_writers(const 
PTabletWriterOpenRequest& request
     return Status::OK();
 }
 
-Status TabletsChannel::cancel() {
+Status BaseTabletsChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;
@@ -462,8 +471,14 @@ Status TabletsChannel::cancel() {
         static_cast<void>(it.second->cancel());
     }
     _state = kFinished;
+
+    return Status::OK();
+}
+
+Status TabletsChannel::cancel() {
+    RETURN_IF_ERROR(BaseTabletsChannel::cancel());
     if (_write_single_replica) {
-        
StorageEngine::instance()->txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
+        _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
     }
     return Status::OK();
 }
@@ -479,8 +494,8 @@ std::ostream& operator<<(std::ostream& os, const 
TabletsChannelKey& key) {
     return os;
 }
 
-Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
-                                 PTabletWriterAddBlockResult* response) {
+Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& 
request,
+                                     PTabletWriterAddBlockResult* response) {
     SCOPED_TIMER(_add_batch_timer);
     int64_t cur_seq = 0;
     _add_batch_number_counter->update(1);
@@ -523,14 +538,14 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
             << ", tablet_ids_size: " << request.tablet_ids_size();
 
     auto write_tablet_data = [&](uint32_t tablet_id,
-                                 std::function<Status(DeltaWriter * writer)> 
write_func) {
+                                 std::function<Status(BaseDeltaWriter * 
writer)> write_func) {
         google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
                 response->mutable_tablet_errors();
         auto tablet_writer_it = _tablet_writers.find(tablet_id);
         if (tablet_writer_it == _tablet_writers.end()) {
             return Status::InternalError("unknown tablet to append data, 
tablet={}", tablet_id);
         }
-        Status st = write_func(tablet_writer_it->second);
+        Status st = write_func(tablet_writer_it->second.get());
         if (!st.ok()) {
             auto err_msg =
                     fmt::format("tablet writer write failed, tablet_id={}, 
txn_id={}, err={}",
@@ -549,15 +564,16 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
 
     if (request.is_single_tablet_block()) {
         SCOPED_TIMER(_write_block_timer);
-        RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), 
[&](DeltaWriter* writer) {
+        RETURN_IF_ERROR(write_tablet_data(request.tablet_ids(0), 
[&](BaseDeltaWriter* writer) {
             return writer->append(&send_data);
         }));
     } else {
         SCOPED_TIMER(_write_block_timer);
         for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
-            RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, 
[&](DeltaWriter* writer) {
-                return writer->write(&send_data, tablet_to_rowidxs_it.second);
-            }));
+            RETURN_IF_ERROR(
+                    write_tablet_data(tablet_to_rowidxs_it.first, 
[&](BaseDeltaWriter* writer) {
+                        return writer->write(&send_data, 
tablet_to_rowidxs_it.second);
+                    }));
         }
     }
 
@@ -568,12 +584,12 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
     return Status::OK();
 }
 
-void TabletsChannel::_add_broken_tablet(int64_t tablet_id) {
+void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
     std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
     _broken_tablets.insert(tablet_id);
 }
 
-bool TabletsChannel::_is_broken_tablet(int64_t tablet_id) {
+bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) {
     std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
     return _broken_tablets.find(tablet_id) != _broken_tablets.end();
 }
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index b8e3de0584b..2f9ec9d51a9 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -17,7 +17,6 @@
 
 #pragma once
 
-#include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
 
 #include <atomic>
@@ -38,16 +37,14 @@
 #include "util/spinlock.h"
 #include "util/uid_util.h"
 
-namespace google {
-namespace protobuf {
+namespace google::protobuf {
 template <typename Element>
 class RepeatedField;
 template <typename Key, typename T>
 class Map;
 template <typename T>
 class RepeatedPtrField;
-} // namespace protobuf
-} // namespace google
+} // namespace google::protobuf
 
 namespace doris {
 class PSlaveTabletNodes;
@@ -55,9 +52,12 @@ class PSuccessSlaveTabletNodeIds;
 class PTabletError;
 class PTabletInfo;
 class PTabletWriterOpenRequest;
+class PTabletWriterAddBlockRequest;
+class PTabletWriterAddBlockResult;
 class PUniqueId;
 class TupleDescriptor;
 class OpenPartitionRequest;
+class StorageEngine;
 
 struct TabletsChannelKey {
     UniqueId id;
@@ -65,7 +65,7 @@ struct TabletsChannelKey {
 
     TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) : id(pid), 
index_id(index_id_) {}
 
-    ~TabletsChannelKey() noexcept {}
+    ~TabletsChannelKey() noexcept = default;
 
     bool operator==(const TabletsChannelKey& rhs) const noexcept {
         return index_id == rhs.index_id && id == rhs.id;
@@ -76,18 +76,18 @@ struct TabletsChannelKey {
 
 std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key);
 
-class DeltaWriter;
+class BaseDeltaWriter;
 class MemTableWriter;
 class OlapTableSchemaParam;
 class LoadChannel;
 
 // Write channel for a particular (load, index).
-class TabletsChannel {
+class BaseTabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool 
is_high_priority,
-                   RuntimeProfile* profile);
+    BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, 
bool is_high_priority,
+                       RuntimeProfile* profile);
 
-    ~TabletsChannel();
+    virtual ~BaseTabletsChannel();
 
     Status open(const PTabletWriterOpenRequest& request);
     // open + open writers
@@ -101,38 +101,25 @@ public:
     // If all senders are closed, close this channel, set '*finished' to true, 
update 'tablet_vec'
     // to include all tablets written in this channel.
     // no-op when this channel has been closed or cancelled
-    Status
-    close(LoadChannel* parent, int sender_id, int64_t backend_id, bool* 
finished,
-          const google::protobuf::RepeatedField<int64_t>& partition_ids,
-          google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
-          google::protobuf::RepeatedPtrField<PTabletError>* tablet_error,
-          const google::protobuf::Map<int64_t, PSlaveTabletNodes>& 
slave_tablet_nodes,
-          google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* 
success_slave_tablet_node_ids,
-          const bool write_single_replica);
+    virtual Status close(LoadChannel* parent, const 
PTabletWriterAddBlockRequest& req,
+                         PTabletWriterAddBlockResult* res, bool* finished) = 0;
 
     // no-op when this channel has been closed or cancelled
-    Status cancel();
+    virtual Status cancel();
 
     void refresh_profile();
 
-private:
-    template <typename Request>
-    Status _get_current_seq(int64_t& cur_seq, const Request& request);
+protected:
+    Status _get_current_seq(int64_t& cur_seq, const 
PTabletWriterAddBlockRequest& request);
 
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-    // deal with DeltaWriter commit_txn(), add tablet to list for return.
-    void _commit_txn(DeltaWriter* writer,
-                     google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
-                     google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_errors,
-                     PSlaveTabletNodes slave_tablet_nodes, const bool 
write_single_replica);
-
     void _add_broken_tablet(int64_t tablet_id);
     void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_errors,
                            int64_t tablet_id, Status error) const;
     bool _is_broken_tablet(int64_t tablet_id);
-    void _init_profile(RuntimeProfile* profile);
+    virtual void _init_profile(RuntimeProfile* profile);
 
     // id of this load channel
     TabletsChannelKey _key;
@@ -154,7 +141,7 @@ private:
     // initialized in open function
     int64_t _txn_id = -1;
     int64_t _index_id = -1;
-    OlapTableSchemaParam* _schema = nullptr;
+    std::unique_ptr<OlapTableSchemaParam> _schema;
 
     TupleDescriptor* _tuple_desc = nullptr;
 
@@ -167,7 +154,7 @@ private:
     Status _close_status;
 
     // tablet_id -> TabletChannel
-    std::unordered_map<int64_t, DeltaWriter*> _tablet_writers;
+    std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> 
_tablet_writers;
     // broken tablet ids.
     // If a tablet write fails, it's id will be added to this set.
     // So that following batch will not handle this tablet anymore.
@@ -183,8 +170,6 @@ private:
 
     bool _is_high_priority = false;
 
-    bool _write_single_replica = false;
-
     RuntimeProfile* _profile = nullptr;
     RuntimeProfile::Counter* _add_batch_number_counter = nullptr;
     RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr;
@@ -193,28 +178,36 @@ private:
     RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter = 
nullptr;
     RuntimeProfile::HighWaterMarkCounter* 
_max_tablet_write_memory_usage_counter = nullptr;
     RuntimeProfile::HighWaterMarkCounter* 
_max_tablet_flush_memory_usage_counter = nullptr;
-    RuntimeProfile::Counter* _slave_replica_timer = nullptr;
     RuntimeProfile::Counter* _add_batch_timer = nullptr;
     RuntimeProfile::Counter* _write_block_timer = nullptr;
     RuntimeProfile::Counter* _incremental_open_timer = nullptr;
 };
 
-template <typename Request>
-Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& 
request) {
-    std::lock_guard<std::mutex> l(_lock);
-    if (_state != kOpened) {
-        return _state == kFinished ? _close_status
-                                   : Status::InternalError("TabletsChannel {} 
state: {}",
-                                                           _key.to_string(), 
_state);
-    }
-    cur_seq = _next_seqs[request.sender_id()];
-    // check packet
-    if (request.packet_seq() > cur_seq) {
-        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
-                     << ", recept_seq=" << request.packet_seq();
-        return Status::InternalError("lost data packet");
-    }
-    return Status::OK();
-}
+class DeltaWriter;
+
+// `StorageEngine` mixin for `BaseTabletsChannel`
+class TabletsChannel final : public BaseTabletsChannel {
+public:
+    TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key, const 
UniqueId& load_id,
+                   bool is_high_priority, RuntimeProfile* profile);
+
+    ~TabletsChannel() override;
+
+    Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
+                 PTabletWriterAddBlockResult* res, bool* finished) override;
+
+    Status cancel() override;
+
+private:
+    void _init_profile(RuntimeProfile* profile) override;
+
+    // deal with DeltaWriter commit_txn(), add tablet to list for return.
+    void _commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& 
req,
+                     PTabletWriterAddBlockResult* res);
+
+    StorageEngine& _engine;
+    bool _write_single_replica = false;
+    RuntimeProfile::Counter* _slave_replica_timer = nullptr;
+};
 
 } // namespace doris
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 8602af091a0..9c15302af6c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -27,7 +27,7 @@
 #include <ostream>
 #include <string>
 
-#include "cloud/config.h"
+// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/status.h"
 #include "exprs/function_filter.h"
@@ -62,8 +62,10 @@ BlockReader::~BlockReader() {
 
 Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
     auto res = (this->*_next_block_func)(block, eof);
-    if (!res.ok() && !res.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) 
[[unlikely]] {
-        static_cast<Tablet*>(_tablet.get())->report_error(res);
+    if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+        if (!res.ok()) [[unlikely]] {
+            static_cast<Tablet*>(_tablet.get())->report_error(res);
+        }
     }
     return res;
 }
@@ -230,11 +232,10 @@ Status BlockReader::init(const ReaderParams& read_params) 
{
     }
 
     auto status = _init_collect_iter(read_params);
-    if (!status.ok()) {
-        if (!status.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) 
[[unlikely]] {
+    if (!status.ok()) [[unlikely]] {
+        if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
             static_cast<Tablet*>(_tablet.get())->report_error(status);
         }
-
         return status;
     }
 
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index 52d35283b50..aa16c91cbd2 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -24,7 +24,6 @@
 #include <boost/iterator/iterator_facade.hpp>
 #include <ostream>
 
-#include "cloud/config.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
@@ -53,8 +52,10 @@ VerticalBlockReader::~VerticalBlockReader() {
 
 Status VerticalBlockReader::next_block_with_aggregation(Block* block, bool* 
eof) {
     auto res = (this->*_next_block_func)(block, eof);
-    if (!res.ok() && !res.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) 
[[unlikely]] {
-        static_cast<Tablet*>(_tablet.get())->report_error(res);
+    if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
+        if (!res.ok()) [[unlikely]] {
+            static_cast<Tablet*>(_tablet.get())->report_error(res);
+        }
     }
     return res;
 }
@@ -212,8 +213,8 @@ Status VerticalBlockReader::init(const ReaderParams& 
read_params) {
     RETURN_IF_ERROR(TabletReader::init(read_params));
 
     auto status = _init_collect_iter(read_params);
-    if (!status.ok()) {
-        if (!status.is<ErrorCode::END_OF_FILE>() && !config::cloud_mode) 
[[unlikely]] {
+    if (!status.ok()) [[unlikely]] {
+        if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
             static_cast<Tablet*>(_tablet.get())->report_error(status);
         }
         return status;
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index 7db9e129e58..f2274a9a76a 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -43,6 +43,7 @@
 #include "olap/options.h"
 #include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/segment_v2/segment.h"
+#include "olap/rowset_builder.h"
 #include "olap/schema.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
@@ -500,19 +501,18 @@ TEST_F(TestDeltaWriter, open) {
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = true;
     write_req.table_schema_param = &param;
-    DeltaWriter* delta_writer = nullptr;
 
     // test vec delta writer
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, 
profile.get(), TUniqueId()));
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile.get(), TUniqueId {});
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
     res = delta_writer->build_rowset();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    res = delta_writer->commit_txn(PSlaveTabletNodes());
     EXPECT_EQ(Status::OK(), res);
-    SAFE_DELETE(delta_writer);
 
     res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
@@ -547,10 +547,9 @@ TEST_F(TestDeltaWriter, vec_write) {
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
     write_req.table_schema_param = &param;
-    DeltaWriter* delta_writer = nullptr;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, 
profile.get(), TUniqueId()));
-    ASSERT_NE(delta_writer, nullptr);
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile.get(), TUniqueId {});
 
     vectorized::Block block;
     for (const auto& slot_desc : tuple_desc->slots()) {
@@ -647,7 +646,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     ASSERT_TRUE(res.ok());
     res = delta_writer->wait_calc_delete_bitmap();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    res = delta_writer->commit_txn(PSlaveTabletNodes());
     ASSERT_TRUE(res.ok());
 
     // publish version success
@@ -680,7 +679,6 @@ TEST_F(TestDeltaWriter, vec_write) {
 
     res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     ASSERT_TRUE(res.ok());
-    delete delta_writer;
 }
 
 TEST_F(TestDeltaWriter, vec_sequence_col) {
@@ -712,10 +710,9 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
     write_req.table_schema_param = &param;
-    DeltaWriter* delta_writer = nullptr;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, 
profile.get(), TUniqueId()));
-    ASSERT_NE(delta_writer, nullptr);
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile.get(), TUniqueId {});
 
     vectorized::Block block;
     for (const auto& slot_desc : tuple_desc->slots()) {
@@ -742,7 +739,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     ASSERT_TRUE(res.ok());
     res = delta_writer->wait_calc_delete_bitmap();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    res = delta_writer->commit_txn(PSlaveTabletNodes());
     ASSERT_TRUE(res.ok());
 
     // publish version success
@@ -798,7 +795,6 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
 
     res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     ASSERT_TRUE(res.ok());
-    delete delta_writer;
 }
 
 TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
@@ -829,16 +825,14 @@ TEST_F(TestDeltaWriter, 
vec_sequence_col_concurrent_write) {
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
     write_req.table_schema_param = &param;
-    DeltaWriter* delta_writer1 = nullptr;
-    DeltaWriter* delta_writer2 = nullptr;
     std::unique_ptr<RuntimeProfile> profile1;
     profile1 = std::make_unique<RuntimeProfile>("LoadChannels1");
     std::unique_ptr<RuntimeProfile> profile2;
     profile2 = std::make_unique<RuntimeProfile>("LoadChannels2");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer1, 
profile1.get(), TUniqueId()));
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer2, 
profile2.get(), TUniqueId()));
-    ASSERT_NE(delta_writer1, nullptr);
-    ASSERT_NE(delta_writer2, nullptr);
+    auto delta_writer1 =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile1.get(), TUniqueId {});
+    auto delta_writer2 =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile2.get(), TUniqueId {});
 
     // write data in delta writer 1
     {
@@ -867,7 +861,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
         ASSERT_TRUE(res.ok());
         res = delta_writer1->wait_calc_delete_bitmap();
         ASSERT_TRUE(res.ok());
-        res = delta_writer1->commit_txn(PSlaveTabletNodes(), false);
+        res = delta_writer1->commit_txn(PSlaveTabletNodes());
         ASSERT_TRUE(res.ok());
     }
     // write data in delta writer 2
@@ -943,12 +937,12 @@ TEST_F(TestDeltaWriter, 
vec_sequence_col_concurrent_write) {
 
         // verify that delete bitmap calculated correctly
         // since the delete bitmap not published, versions are 0
-        auto delete_bitmap = delta_writer2->get_delete_bitmap();
+        auto delete_bitmap = 
delta_writer2->_rowset_builder->get_delete_bitmap();
         ASSERT_TRUE(delete_bitmap->contains({rowset1->rowset_id(), 0, 0}, 0));
         // We can't get the rowset id of rowset2 now, will check the delete 
bitmap
         // contains row 0 of rowset2 at L929.
 
-        res = delta_writer2->commit_txn(PSlaveTabletNodes(), false);
+        res = delta_writer2->commit_txn(PSlaveTabletNodes());
         ASSERT_TRUE(res.ok());
 
         Version version;
@@ -1043,7 +1037,5 @@ TEST_F(TestDeltaWriter, 
vec_sequence_col_concurrent_write) {
 
     res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     ASSERT_TRUE(res.ok());
-    delete delta_writer1;
-    delete delta_writer2;
 }
 } // namespace doris
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index 5107aafb552..87b30ae4da3 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -196,17 +196,15 @@ TEST_F(TestEngineStorageMigrationTask, 
write_and_migration) {
     write_req.is_high_priority = false;
     write_req.table_schema_param = &param;
 
-    DeltaWriter* delta_writer = nullptr;
-
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, 
profile.get()));
-    EXPECT_NE(delta_writer, nullptr);
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile.get(), TUniqueId {});
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
     res = delta_writer->build_rowset();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    res = delta_writer->commit_txn(PSlaveTabletNodes());
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
@@ -276,7 +274,6 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
 
     res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
-    delete delta_writer;
 }
 
 } // namespace doris
diff --git a/be/test/olap/memtable_memory_limiter_test.cpp 
b/be/test/olap/memtable_memory_limiter_test.cpp
index 9771fb324c6..11e791551b1 100644
--- a/be/test/olap/memtable_memory_limiter_test.cpp
+++ b/be/test/olap/memtable_memory_limiter_test.cpp
@@ -140,9 +140,9 @@ TEST_F(MemTableMemoryLimiterTest, 
handle_memtable_flush_test) {
     write_req.slots = &(tuple_desc->slots());
     write_req.is_high_priority = false;
     write_req.table_schema_param = &param;
-    DeltaWriter* delta_writer = nullptr;
     profile = std::make_unique<RuntimeProfile>("MemTableMemoryLimiterTest");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, 
profile.get(), TUniqueId()));
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*_engine, &write_req, profile.get(), 
TUniqueId {});
     ASSERT_NE(delta_writer, nullptr);
     auto mem_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
 
@@ -174,10 +174,9 @@ TEST_F(MemTableMemoryLimiterTest, 
handle_memtable_flush_test) {
     EXPECT_EQ(Status::OK(), res);
     res = delta_writer->build_rowset();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    res = delta_writer->commit_txn(PSlaveTabletNodes());
     EXPECT_EQ(Status::OK(), res);
     res = _engine->tablet_manager()->drop_tablet(request.tablet_id, 
request.replica_id, false);
     EXPECT_EQ(Status::OK(), res);
-    delete delta_writer;
 }
 } // namespace doris
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index 124380220f8..e19ba3bd32c 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -361,9 +361,9 @@ void createTablet(TabletSharedPtr* tablet, int64_t 
replica_id, int32_t schema_ha
     write_req.is_high_priority = false;
     write_req.table_schema_param = &param;
 
-    DeltaWriter* delta_writer = nullptr;
     profile = std::make_unique<RuntimeProfile>("LoadChannels");
-    static_cast<void>(DeltaWriter::open(&write_req, &delta_writer, 
profile.get()));
+    auto delta_writer =
+            std::make_unique<DeltaWriter>(*k_engine, &write_req, 
profile.get(), TUniqueId {});
     ASSERT_NE(delta_writer, nullptr);
 
     vectorized::Block block;
@@ -397,9 +397,8 @@ void createTablet(TabletSharedPtr* tablet, int64_t 
replica_id, int32_t schema_ha
     ASSERT_EQ(Status::OK(), st);
     st = delta_writer->build_rowset();
     ASSERT_EQ(Status::OK(), st);
-    st = delta_writer->commit_txn(PSlaveTabletNodes(), false);
+    st = delta_writer->commit_txn(PSlaveTabletNodes());
     ASSERT_EQ(Status::OK(), st);
-    delete delta_writer;
 
     // publish version success
     *tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id, 
write_req.schema_hash);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to