This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 5e28fde7db [cherry-pick](merge-on-write) support concurrent delete 
bitmap calc while close_wait (#21488) (#22267)
5e28fde7db is described below

commit 5e28fde7db94b1a3b708a08f96b455ccbfa11ae8
Author: zhannngchen <48427519+zhannngc...@users.noreply.github.com>
AuthorDate: Thu Jul 27 10:00:51 2023 +0800

    [cherry-pick](merge-on-write) support concurrent delete bitmap calc while 
close_wait (#21488) (#22267)
    
    cherry-pick #21488
---
 be/src/olap/calc_delete_bitmap_executor.cpp        |  91 ++++++
 be/src/olap/calc_delete_bitmap_executor.h          |  91 ++++++
 be/src/olap/delta_writer.cpp                       | 107 ++++---
 be/src/olap/delta_writer.h                         |  11 +-
 be/src/olap/full_compaction.cpp                    |   3 +-
 be/src/olap/memtable.cpp                           |   2 +-
 be/src/olap/olap_server.cpp                        |   5 -
 be/src/olap/storage_engine.cpp                     |   7 +-
 be/src/olap/storage_engine.h                       |   9 +-
 be/src/olap/tablet.cpp                             |  75 +++--
 be/src/olap/tablet.h                               |   5 +-
 be/src/runtime/tablets_channel.cpp                 |  62 +++-
 be/src/runtime/tablets_channel.h                   |   8 +-
 be/test/olap/delta_writer_test.cpp                 | 338 ++++++++++++++++++---
 .../olap/engine_storage_migration_task_test.cpp    |   4 +-
 be/test/olap/tablet_cooldown_test.cpp              |   4 +-
 16 files changed, 664 insertions(+), 158 deletions(-)

diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp 
b/be/src/olap/calc_delete_bitmap_executor.cpp
new file mode 100644
index 0000000000..284c03c985
--- /dev/null
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/calc_delete_bitmap_executor.h"
+
+#include <gen_cpp/olap_file.pb.h>
+
+#include <ostream>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "olap/memtable.h"
+#include "olap/tablet.h"
+#include "util/time.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+Status CalcDeleteBitmapToken::submit(TabletSharedPtr tablet, RowsetSharedPtr 
cur_rowset,
+                                     const segment_v2::SegmentSharedPtr& 
cur_segment,
+                                     const std::vector<RowsetSharedPtr>& 
target_rowsets,
+                                     int64_t end_version, RowsetWriter* 
rowset_writer) {
+    {
+        std::shared_lock rlock(_lock);
+        RETURN_IF_ERROR(_status);
+    }
+
+    DeleteBitmapPtr bitmap = 
std::make_shared<DeleteBitmap>(tablet->tablet_id());
+    {
+        std::lock_guard wlock(_lock);
+        _delete_bitmaps.push_back(bitmap);
+    }
+    return _thread_token->submit_func([=, this]() {
+        auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment, 
target_rowsets,
+                                                     bitmap, end_version, 
rowset_writer);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
+                         << tablet->tablet_id() << " rowset: " << 
cur_rowset->rowset_id()
+                         << " seg_id: " << cur_segment->id() << " version: " 
<< end_version;
+            std::lock_guard wlock(_lock);
+            if (_status.ok()) {
+                _status = st;
+            }
+        }
+    });
+}
+
+Status CalcDeleteBitmapToken::wait() {
+    _thread_token->wait();
+    // all tasks are complete here, so no lock is needed
+    return _status;
+}
+
+Status CalcDeleteBitmapToken::get_delete_bitmap(DeleteBitmapPtr res_bitmap) {
+    std::lock_guard wlock(_lock);
+    RETURN_IF_ERROR(_status);
+
+    for (auto bitmap : _delete_bitmaps) {
+        res_bitmap->merge(*bitmap);
+    }
+    _delete_bitmaps.clear();
+    return Status::OK();
+}
+
+void CalcDeleteBitmapExecutor::init() {
+    ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
+            .set_min_threads(1)
+            .set_max_threads(config::calc_delete_bitmap_max_thread)
+            .build(&_thread_pool);
+}
+
+std::unique_ptr<CalcDeleteBitmapToken> 
CalcDeleteBitmapExecutor::create_token() {
+    return std::make_unique<CalcDeleteBitmapToken>(
+            _thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT));
+}
+
+} // namespace doris
diff --git a/be/src/olap/calc_delete_bitmap_executor.h 
b/be/src/olap/calc_delete_bitmap_executor.h
new file mode 100644
index 0000000000..d2c392a04d
--- /dev/null
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <atomic>
+#include <iosfwd>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/segment.h"
+#include "util/threadpool.h"
+
+namespace doris {
+
+class DataDir;
+class Tablet;
+enum RowsetTypePB : int;
+using TabletSharedPtr = std::shared_ptr<Tablet>;
+
+// A thin wrapper of ThreadPoolToken to submit calc delete bitmap task.
+// Usage:
+// 1. create a token
+// 2. submit delete bitmap calculation tasks
+// 3. wait for all tasks to complete
+// 4. call `get_delete_bitmap()` to get the result of all tasks
+class CalcDeleteBitmapToken {
+public:
+    explicit CalcDeleteBitmapToken(std::unique_ptr<ThreadPoolToken> 
thread_token)
+            : _thread_token(std::move(thread_token)), _status(Status::OK()) {}
+
+    Status submit(TabletSharedPtr tablet, RowsetSharedPtr cur_rowset,
+                  const segment_v2::SegmentSharedPtr& cur_segment,
+                  const std::vector<RowsetSharedPtr>& target_rowsets, int64_t 
end_version,
+                  RowsetWriter* rowset_writer);
+
+    // wait for all tasks in the token to complete.
+    Status wait();
+
+    void cancel() { _thread_token->shutdown(); }
+
+    Status get_delete_bitmap(DeleteBitmapPtr res_bitmap);
+
+private:
+    std::unique_ptr<ThreadPoolToken> _thread_token;
+
+    std::shared_mutex _lock;
+    std::vector<DeleteBitmapPtr> _delete_bitmaps;
+
+    // Records the current status of the calc delete bitmap job.
+    // Note: Once its value is set to Failed, it cannot return to SUCCESS.
+    Status _status;
+};
+
+// CalcDeleteBitmapExecutor is responsible for calc delete bitmap concurrently.
+// It encapsulates a ThreadPool to handle all tasks.
+class CalcDeleteBitmapExecutor {
+public:
+    CalcDeleteBitmapExecutor() {}
+    ~CalcDeleteBitmapExecutor() { _thread_pool->shutdown(); }
+
+    // init should be called after storage engine is opened.
+    void init();
+
+    std::unique_ptr<CalcDeleteBitmapToken> create_token();
+
+private:
+    std::unique_ptr<ThreadPool> _thread_pool;
+};
+
+} // namespace doris
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 197ecb417e..dd5b3c74f6 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -125,6 +125,10 @@ DeltaWriter::~DeltaWriter() {
         }
     }
 
+    if (_calc_delete_bitmap_token != nullptr) {
+        _calc_delete_bitmap_token->cancel();
+    }
+
     if (_tablet != nullptr) {
         _tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                 
_rowset_writer->rowset_id().to_string());
@@ -225,6 +229,7 @@ Status DeltaWriter::init() {
     bool should_serial = false;
     
RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
             &_flush_token, _rowset_writer->type(), should_serial, 
_req.is_high_priority));
+    _calc_delete_bitmap_token = 
_storage_engine->calc_delete_bitmap_executor()->create_token();
 
     _is_init = true;
     return Status::OK();
@@ -411,12 +416,10 @@ Status DeltaWriter::close() {
     }
 }
 
-Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
-                               const bool write_single_replica) {
-    SCOPED_TIMER(_close_wait_timer);
+Status DeltaWriter::build_rowset() {
     std::lock_guard<std::mutex> l(_lock);
     DCHECK(_is_init)
-            << "delta writer is supposed be to initialized before close_wait() 
being called";
+            << "delta writer is supposed be to initialized before 
build_rowset() being called";
 
     if (_is_cancelled) {
         return _cancel_status;
@@ -447,43 +450,68 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& 
slave_tablet_nodes,
     if (_cur_rowset == nullptr) {
         return Status::Error<MEM_ALLOC_FAILED>("fail to build rowset");
     }
+    return Status::OK();
+}
 
-    if (_tablet->enable_unique_key_merge_on_write()) {
-        // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-        if (_tablet->tablet_state() == TABLET_NOTREADY &&
-            SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
-            LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
-                         "tablet_id: "
-                      << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
-        } else {
-            auto beta_rowset = 
reinterpret_cast<BetaRowset*>(_cur_rowset.get());
-            std::vector<segment_v2::SegmentSharedPtr> segments;
-            RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
-            if (segments.size() > 1) {
-                // calculate delete bitmap between segments
-                
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, 
segments,
-                                                                             
_delete_bitmap));
-            }
+Status DeltaWriter::submit_calc_delete_bitmap_task() {
+    if (!_tablet->enable_unique_key_merge_on_write()) {
+        return Status::OK();
+    }
 
-            // commit_phase_update_delete_bitmap() may generate new segments, 
we need to create a new
-            // transient rowset writer to write the new segments, then merge 
it back the original
-            // rowset.
-            std::unique_ptr<RowsetWriter> rowset_writer;
-            _tablet->create_transient_rowset_writer(_cur_rowset, 
&rowset_writer);
-            RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
-                    _cur_rowset, _rowset_ids, _delete_bitmap, segments, 
_req.txn_id,
-                    rowset_writer.get()));
-            if (_cur_rowset->tablet_schema()->is_partial_update()) {
-                // build rowset writer and merge transient rowset
-                RETURN_IF_ERROR(rowset_writer->flush());
-                RowsetSharedPtr transient_rowset = rowset_writer->build();
-                
_cur_rowset->merge_rowset_meta(transient_rowset->rowset_meta());
-
-                // erase segment cache cause we will add a segment to rowset
-                
SegmentLoader::instance()->erase_segment(_cur_rowset->rowset_id());
-            }
-        }
+    std::lock_guard<std::mutex> l(_lock);
+    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY &&
+        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+        LOG(INFO) << "tablet is under alter process, delete bitmap will be 
calculated later, "
+                     "tablet_id: "
+                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
+        return Status::OK();
     }
+    auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get());
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
+    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
+    if (_tablet->tablet_state() == TABLET_NOTREADY &&
+        SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+        return Status::OK();
+    }
+    if (segments.size() > 1) {
+        // calculate delete bitmap between segments
+        
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, 
segments,
+                                                                     
_delete_bitmap));
+    }
+
+    // For partial update, we need to fill in the entire row of data, during 
the calculation
+    // of the delete bitmap. This operation is resource-intensive, and we need 
to minimize
+    // the number of times it occurs. Therefore, we skip this operation here.
+    if (_cur_rowset->tablet_schema()->is_partial_update()) {
+        return Status::OK();
+    }
+
+    LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << 
_tablet->tablet_id()
+              << ", txn_id: " << _req.txn_id;
+    return _tablet->commit_phase_update_delete_bitmap(_cur_rowset, 
_rowset_ids, _delete_bitmap,
+                                                      segments, _req.txn_id,
+                                                      
_calc_delete_bitmap_token.get(), nullptr);
+}
+
+Status DeltaWriter::wait_calc_delete_bitmap() {
+    if (!_tablet->enable_unique_key_merge_on_write() ||
+        _cur_rowset->tablet_schema()->is_partial_update()) {
+        return Status::OK();
+    }
+    std::lock_guard<std::mutex> l(_lock);
+    RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
+    
RETURN_IF_ERROR(_calc_delete_bitmap_token->get_delete_bitmap(_delete_bitmap));
+    LOG(INFO) << "Got result of calc delete bitmap task from executor, 
tablet_id: "
+              << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
+    return Status::OK();
+}
+
+Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
+                               const bool write_single_replica) {
+    std::lock_guard<std::mutex> l(_lock);
+    SCOPED_TIMER(_close_wait_timer);
     Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, 
_tablet, _req.txn_id,
                                                             _req.load_id, 
_cur_rowset, false);
 
@@ -550,6 +578,9 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
         // cancel and wait all memtables in flush queue to be finished
         _flush_token->cancel();
     }
+    if (_calc_delete_bitmap_token != nullptr) {
+        _calc_delete_bitmap_token->cancel();
+    }
     _is_cancelled = true;
     _cancel_status = st;
     return Status::OK();
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index daf091bf8e..aeef1c6866 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -89,11 +89,14 @@ public:
 
     Status append(const vectorized::Block* block);
 
-    // flush the last memtable to flush queue, must call it before close_wait()
+    // flush the last memtable to flush queue, must call it before 
build_rowset()
     Status close();
     // wait for all memtables to be flushed.
     // mem_consumption() should be 0 after this function returns.
-    Status close_wait(const PSlaveTabletNodes& slave_tablet_nodes, const bool 
write_single_replica);
+    Status build_rowset();
+    Status submit_calc_delete_bitmap_task();
+    Status wait_calc_delete_bitmap();
+    Status commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, const bool 
write_single_replica);
 
     bool check_slave_replicas_done(google::protobuf::Map<int64_t, 
PSuccessSlaveTabletNodeIds>*
                                            success_slave_tablet_node_ids);
@@ -131,6 +134,9 @@ public:
 
     int64_t num_rows_filtered() const;
 
+    // For UT
+    DeleteBitmapPtr get_delete_bitmap() { return _delete_bitmap; }
+
 private:
     DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, 
RuntimeProfile* profile,
                 const UniqueId& load_id);
@@ -183,6 +189,7 @@ private:
     std::shared_mutex _slave_node_lock;
 
     DeleteBitmapPtr _delete_bitmap = nullptr;
+    std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
     // current rowset_ids, used to do diff in publish_version
     RowsetIdUnorderedSet _rowset_ids;
     // current max version, used to calculate delete bitmap
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index e2acd1cdb0..cdec9c09ed 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -193,7 +193,8 @@ Status 
FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
 
     OlapStopWatch watch;
     RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, 
specified_rowsets,
-                                                delete_bitmap, cur_version, 
rowset_writer));
+                                                delete_bitmap, cur_version, 
nullptr,
+                                                rowset_writer));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 9716f265d3..78860a1f65 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -480,7 +480,7 @@ Status MemTable::_generate_delete_bitmap(int32_t 
segment_id) {
     OlapStopWatch watch;
     RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, 
specified_rowsets,
                                                 _mow_context->delete_bitmap,
-                                                _mow_context->max_version));
+                                                _mow_context->max_version, 
nullptr));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c1af281465..67c41c6f7e 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -243,11 +243,6 @@ Status StorageEngine::start_bg_threads() {
             .set_max_threads(config::tablet_publish_txn_max_thread)
             .build(&_tablet_publish_txn_thread_pool);
 
-    ThreadPoolBuilder("TabletCalcDeleteBitmapThreadPool")
-            .set_min_threads(1)
-            .set_max_threads(config::calc_delete_bitmap_max_thread)
-            .build(&_calc_delete_bitmap_thread_pool);
-
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "aync_publish_version_thread",
             [this]() { this->_async_publish_callback(); }, 
&_async_publish_thread));
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 745637a296..340f02b65b 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -126,6 +126,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _txn_manager(new TxnManager(config::txn_map_shard_size, 
config::txn_shard_size)),
           _rowset_id_generator(new 
UniqueRowsetIdGenerator(options.backend_uid)),
           _memtable_flush_executor(nullptr),
+          _calc_delete_bitmap_executor(nullptr),
           _default_rowset_type(BETA_ROWSET),
           _heartbeat_flags(nullptr),
           _stream_load_recorder(nullptr) {
@@ -155,9 +156,6 @@ StorageEngine::~StorageEngine() {
     if (_tablet_meta_checkpoint_thread_pool) {
         _tablet_meta_checkpoint_thread_pool->shutdown();
     }
-    if (_calc_delete_bitmap_thread_pool) {
-        _calc_delete_bitmap_thread_pool->shutdown();
-    }
     if (_cold_data_compaction_thread_pool) {
         _cold_data_compaction_thread_pool->shutdown();
     }
@@ -199,6 +197,9 @@ Status StorageEngine::_open() {
     _memtable_flush_executor.reset(new MemTableFlushExecutor());
     _memtable_flush_executor->init(dirs);
 
+    _calc_delete_bitmap_executor.reset(new CalcDeleteBitmapExecutor());
+    _calc_delete_bitmap_executor->init();
+
     _parse_default_rowset_type();
 
     return Status::OK();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index a40e1792d9..7af2fde0fb 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -37,6 +37,7 @@
 
 #include "common/status.h"
 #include "gutil/ref_counted.h"
+#include "olap/calc_delete_bitmap_executor.h"
 #include "olap/compaction_permit_limiter.h"
 #include "olap/olap_common.h"
 #include "olap/options.h"
@@ -143,6 +144,9 @@ public:
     TabletManager* tablet_manager() { return _tablet_manager.get(); }
     TxnManager* txn_manager() { return _txn_manager.get(); }
     MemTableFlushExecutor* memtable_flush_executor() { return 
_memtable_flush_executor.get(); }
+    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
+        return _calc_delete_bitmap_executor.get();
+    }
 
     bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
 
@@ -210,9 +214,6 @@ public:
     std::unique_ptr<ThreadPool>& tablet_publish_txn_thread_pool() {
         return _tablet_publish_txn_thread_pool;
     }
-    std::unique_ptr<ThreadPool>& calc_delete_bitmap_thread_pool() {
-        return _calc_delete_bitmap_thread_pool;
-    }
     bool stopped() { return _stopped; }
     ThreadPool* get_bg_multiget_threadpool() { return 
_bg_multi_get_thread_pool.get(); }
 
@@ -423,6 +424,7 @@ private:
     std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
 
     std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
+    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
 
     // Used to control the migration from segment_v1 to segment_v2, can be 
deleted in futrue.
     // Type of new loaded data
@@ -437,7 +439,6 @@ private:
     std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
 
     std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
-    std::unique_ptr<ThreadPool> _calc_delete_bitmap_thread_pool;
 
     std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
     std::unique_ptr<ThreadPool> _bg_multi_get_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1cd2e0427f..19c8a8467e 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2992,11 +2992,15 @@ Status 
Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
     return Status::OK();
 }
 
+// If the caller passes a token, all calculation work is submitted to a 
threadpool,
+// and the caller can get all delete bitmaps from that token.
+// If `token` is nullptr, the calculation runs locally, and the caller can get 
the result
+// delete bitmap from `delete_bitmap` directly.
 Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
                                   const 
std::vector<segment_v2::SegmentSharedPtr>& segments,
                                   const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                                   DeleteBitmapPtr delete_bitmap, int64_t 
end_version,
-                                  RowsetWriter* rowset_writer) {
+                                  CalcDeleteBitmapToken* token, RowsetWriter* 
rowset_writer) {
     auto rowset_id = rowset->rowset_id();
     if (specified_rowsets.empty() || segments.empty()) {
         LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet_id()
@@ -3005,37 +3009,31 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr 
rowset,
     }
 
     OlapStopWatch watch;
+    doris::TabletSharedPtr tablet_ptr =
+            
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id());
+    if (tablet_ptr == nullptr) {
+        return Status::InternalError("Can't find tablet id: {}, maybe already 
dropped.",
+                                     tablet_id());
+    }
     std::vector<DeleteBitmapPtr> seg_delete_bitmaps;
-    std::unique_ptr<ThreadPoolToken> token =
-            
StorageEngine::instance()->calc_delete_bitmap_thread_pool()->new_token(
-                    ThreadPool::ExecutionMode::CONCURRENT);
-    std::atomic<int> calc_status {ErrorCode::OK};
-    for (size_t i = 1; i < segments.size(); i++) {
+    for (size_t i = 0; i < segments.size(); i++) {
         auto& seg = segments[i];
-        DeleteBitmapPtr seg_delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
-        seg_delete_bitmaps.push_back(seg_delete_bitmap);
-        RETURN_IF_ERROR(token->submit_func([=, &calc_status, this]() {
-            auto st = calc_segment_delete_bitmap(rowset, seg, 
specified_rowsets, seg_delete_bitmap,
-                                                 end_version, rowset_writer);
-            if (!st.ok()) {
-                LOG(WARNING) << "failed to calc segment delete bitmap, 
tablet_id: " << tablet_id()
-                             << " rowset: " << rowset_id << " seg_id: " << 
seg->id()
-                             << " version: " << end_version;
-                calc_status.store(st.code());
-            }
-        }));
+        if (token != nullptr) {
+            RETURN_IF_ERROR(token->submit(tablet_ptr, rowset, seg, 
specified_rowsets, end_version,
+                                          rowset_writer));
+        } else {
+            DeleteBitmapPtr seg_delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet_id());
+            seg_delete_bitmaps.push_back(seg_delete_bitmap);
+            RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[i], 
specified_rowsets,
+                                                       seg_delete_bitmap, 
end_version,
+                                                       rowset_writer));
+        }
     }
 
-    // this thread calc delete bitmap of segment 0
-    RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0], 
specified_rowsets,
-                                               delete_bitmap, end_version, 
rowset_writer));
-    token->wait();
-    auto code = calc_status.load();
-    if (code != ErrorCode::OK) {
-        return Status::Error(code, "Tablet::calc_delete_bitmap meet error");
-    }
-    for (auto seg_delete_bitmap : seg_delete_bitmaps) {
-        delete_bitmap->merge(*seg_delete_bitmap);
+    if (token == nullptr) {
+        for (auto seg_delete_bitmap : seg_delete_bitmaps) {
+            delete_bitmap->merge(*seg_delete_bitmap);
+        }
     }
     return Status::OK();
 }
@@ -3212,8 +3210,11 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
 
     std::vector<RowsetSharedPtr> specified_rowsets = 
get_rowset_by_ids(&cur_rowset_ids);
     OlapStopWatch watch;
+    auto token = 
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
-                                       cur_version - 1));
+                                       cur_version - 1, token.get()));
+    token->wait();
+    token->get_delete_bitmap(delete_bitmap);
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
@@ -3233,7 +3234,7 @@ Status Tablet::update_delete_bitmap_without_lock(const 
RowsetSharedPtr& rowset)
 Status Tablet::commit_phase_update_delete_bitmap(
         const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids,
         DeleteBitmapPtr delete_bitmap, const 
std::vector<segment_v2::SegmentSharedPtr>& segments,
-        int64_t txn_id, RowsetWriter* rowset_writer) {
+        int64_t txn_id, CalcDeleteBitmapToken* token, RowsetWriter* 
rowset_writer) {
     SCOPED_BVAR_LATENCY(g_tablet_commit_phase_update_delete_bitmap_latency);
     RowsetIdUnorderedSet cur_rowset_ids;
     RowsetIdUnorderedSet rowset_ids_to_add;
@@ -3247,19 +3248,14 @@ Status Tablet::commit_phase_update_delete_bitmap(
         cur_rowset_ids = all_rs_id(cur_version);
         _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, 
&rowset_ids_to_add,
                                &rowset_ids_to_del);
-        if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
-            LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
-                      << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
-        }
         specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
     }
     for (const auto& to_del : rowset_ids_to_del) {
         delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
     }
 
-    OlapStopWatch watch;
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
-                                       cur_version, rowset_writer));
+                                       cur_version, token, rowset_writer));
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
@@ -3267,7 +3263,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
               << ", rowset_ids to add: " << rowset_ids_to_add.size()
               << ", rowset_ids to del: " << rowset_ids_to_del.size()
               << ", cur max_version: " << cur_version << ", transaction_id: " 
<< txn_id
-              << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
+              << ", total rows: " << total_rows;
     pre_rowset_ids = cur_rowset_ids;
     return Status::OK();
 }
@@ -3309,8 +3305,11 @@ Status Tablet::update_delete_bitmap(const 
RowsetSharedPtr& rowset,
     }
 
     OlapStopWatch watch;
+    auto token = 
StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
     RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, 
delete_bitmap,
-                                       cur_version - 1, rowset_writer));
+                                       cur_version - 1, token.get(), 
rowset_writer));
+    token->wait();
+    token->get_delete_bitmap(delete_bitmap);
     size_t total_rows = std::accumulate(
             segments.begin(), segments.end(), 0,
             [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index adc06de339..3efd2c89ce 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -70,6 +70,7 @@ class RowIdConversion;
 class TTabletInfo;
 class TabletMetaPB;
 class TupleDescriptor;
+class CalcDeleteBitmapToken;
 enum CompressKind : int;
 
 namespace io {
@@ -446,7 +447,7 @@ public:
                               const std::vector<segment_v2::SegmentSharedPtr>& 
segments,
                               const std::vector<RowsetSharedPtr>& 
specified_rowsets,
                               DeleteBitmapPtr delete_bitmap, int64_t version,
-                              RowsetWriter* rowset_writer = nullptr);
+                              CalcDeleteBitmapToken* token, RowsetWriter* 
rowset_writer = nullptr);
 
     std::vector<RowsetSharedPtr> get_rowset_by_ids(
             const RowsetIdUnorderedSet* specified_rowset_ids);
@@ -479,7 +480,7 @@ public:
             const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& 
pre_rowset_ids,
             DeleteBitmapPtr delete_bitmap,
             const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t 
txn_id,
-            RowsetWriter* rowset_writer = nullptr);
+            CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = 
nullptr);
 
     Status update_delete_bitmap(const RowsetSharedPtr& rowset,
                                 const RowsetIdUnorderedSet& pre_rowset_ids,
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index 3862fd533e..714aff369e 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -166,9 +166,9 @@ Status TabletsChannel::close(
                     // just skip this tablet(writer) and continue to close 
others
                     continue;
                 }
-                // to make sure tablet writer in `_broken_tablets` won't call 
`close_wait` method.
-                // `close_wait` might create the rowset and commit txn 
directly, and the subsequent
-                // publish version task will success, which can cause the 
replica inconsistency.
+                // tablet writer in `_broken_tablets` should not call 
`build_rowset` and
+                // `commit_txn` method, after that, the publish-version task 
will success,
+                // which can cause the replica inconsistency.
                 if (_is_broken_tablet(it.second->tablet_id())) {
                     LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is 
broken but not cancelled"
                                  << ", tablet_id=" << it.first << ", 
transaction_id=" << _txn_id;
@@ -190,7 +190,39 @@ Status TabletsChannel::close(
 
         _write_single_replica = write_single_replica;
 
-        // 2. wait delta writers and build the tablet vector
+        // 2. wait all writer finished flush.
+        for (auto writer : need_wait_writers) {
+            writer->wait_flush();
+        }
+
+        // 3. build rowset
+        for (auto it = need_wait_writers.begin(); it != 
need_wait_writers.end(); it++) {
+            Status st = (*it)->build_rowset();
+            if (!st.ok()) {
+                _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+                it = need_wait_writers.erase(it);
+                continue;
+            }
+            // 3.1 calculate delete bitmap for Unique Key MoW tables
+            st = (*it)->submit_calc_delete_bitmap_task();
+            if (!st.ok()) {
+                _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+                it = need_wait_writers.erase(it);
+                continue;
+            }
+        }
+
+        // 4. wait for delete bitmap calculation complete if necessary
+        for (auto it = need_wait_writers.begin(); it != 
need_wait_writers.end(); it++) {
+            Status st = (*it)->wait_calc_delete_bitmap();
+            if (!st.ok()) {
+                _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+                it = need_wait_writers.erase(it);
+                continue;
+            }
+        }
+
+        // 5. commit all writers
         for (auto writer : need_wait_writers) {
             PSlaveTabletNodes slave_nodes;
             if (write_single_replica) {
@@ -198,7 +230,7 @@ Status TabletsChannel::close(
             }
             // close may return failed, but no need to handle it here.
             // tablet_vec will only contains success tablet, and then let FE 
judge it.
-            _close_wait(writer, tablet_vec, tablet_errors, slave_nodes, 
write_single_replica);
+            _commit_txn(writer, tablet_vec, tablet_errors, slave_nodes, 
write_single_replica);
         }
 
         if (write_single_replica) {
@@ -229,12 +261,12 @@ Status TabletsChannel::close(
     return Status::OK();
 }
 
-void TabletsChannel::_close_wait(DeltaWriter* writer,
+void TabletsChannel::_commit_txn(DeltaWriter* writer,
                                  
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
                                  
google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors,
                                  PSlaveTabletNodes slave_tablet_nodes,
                                  const bool write_single_replica) {
-    Status st = writer->close_wait(slave_tablet_nodes, write_single_replica);
+    Status st = writer->commit_txn(slave_tablet_nodes, write_single_replica);
     if (st.ok()) {
         PTabletInfo* tablet_info = tablet_vec->Add();
         tablet_info->set_tablet_id(writer->tablet_id());
@@ -242,14 +274,20 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
         tablet_info->set_received_rows(writer->total_received_rows());
         tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
     } else {
-        PTabletError* tablet_error = tablet_errors->Add();
-        tablet_error->set_tablet_id(writer->tablet_id());
-        tablet_error->set_msg(st.to_string());
-        VLOG_PROGRESS << "close wait failed tablet " << writer->tablet_id() << 
" transaction_id "
-                      << _txn_id << "err msg " << st;
+        _add_error_tablet(tablet_errors, writer->tablet_id(), st);
     }
 }
 
+// Appends a PTabletError entry for `tablet_id` to `tablet_errors`, using the string form of
+// `error` as its message, and emits a VLOG_PROGRESS line containing the tablet id, this
+// channel's transaction id and the error.
+void TabletsChannel::_add_error_tablet(
+        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
+        Status error) {
+    PTabletError* tablet_error = tablet_errors->Add();
+    tablet_error->set_tablet_id(tablet_id);
+    tablet_error->set_msg(error.to_string());
+    // Keep a separator in front of "err msg" so it is not fused with the transaction id.
+    VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
+                  << " err msg " << error;
+}
+
 int64_t TabletsChannel::mem_consumption() {
     int64_t mem_usage = 0;
     {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 9a06653c74..e7c05a1998 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -126,14 +126,16 @@ private:
     // open all writer
     Status _open_all_writers(const PTabletWriterOpenRequest& request);
 
-    // deal with DeltaWriter close_wait(), add tablet to list for return.
-    void _close_wait(DeltaWriter* writer,
+    // deal with DeltaWriter commit_txn(), add tablet to list for return.
+    void _commit_txn(DeltaWriter* writer,
                      google::protobuf::RepeatedPtrField<PTabletInfo>* 
tablet_vec,
-                     google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_error,
+                     google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_errors,
                      PSlaveTabletNodes slave_tablet_nodes, const bool 
write_single_replica);
     void _build_partition_tablets_relation(const PTabletWriterOpenRequest& 
request);
 
     void _add_broken_tablet(int64_t tablet_id);
+    void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>* 
tablet_errors,
+                           int64_t tablet_id, Status error);
     bool _is_broken_tablet(int64_t tablet_id);
     void _init_profile(RuntimeProfile* profile);
 
diff --git a/be/test/olap/delta_writer_test.cpp 
b/be/test/olap/delta_writer_test.cpp
index f1d5f49619..9fd998ebd0 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -261,7 +261,8 @@ static void create_tablet_request(int64_t tablet_id, 
int32_t schema_hash,
 }
 
 static void create_tablet_request_with_sequence_col(int64_t tablet_id, int32_t 
schema_hash,
-                                                    TCreateTabletReq* request) 
{
+                                                    TCreateTabletReq* request,
+                                                    bool enable_mow = false) {
     request->tablet_id = tablet_id;
     request->__set_version(1);
     request->tablet_schema.schema_hash = schema_hash;
@@ -270,6 +271,7 @@ static void create_tablet_request_with_sequence_col(int64_t 
tablet_id, int32_t s
     request->tablet_schema.storage_type = TStorageType::COLUMN;
     request->tablet_schema.__set_sequence_col_idx(4);
     request->__set_storage_format(TStorageFormat::V2);
+    request->__set_enable_unique_key_merge_on_write(enable_mow);
 
     TColumn k1;
     k1.column_name = "k1";
@@ -434,6 +436,28 @@ static TDescriptorTable 
create_descriptor_tablet_with_sequence_col() {
     return dtb.desc_tbl();
 }
 
+// Appends one row to `block`: `k1` and `k2` go to columns 0 and 1, a fixed datetime
+// ("2020-07-16 19:39:43") to column 2, a fixed DateV2 value (2022-06-06) to column 3 and `seq`
+// to column 4. Every insert passes the size of the value that the pointer actually refers to.
+static void generate_data(vectorized::Block* block, int8_t k1, int16_t k2, int32_t seq) {
+    auto columns = block->mutate_columns();
+
+    columns[0]->insert_data(reinterpret_cast<const char*>(&k1), sizeof(k1));
+    columns[1]->insert_data(reinterpret_cast<const char*>(&k2), sizeof(k2));
+
+    vectorized::VecDateTimeValue datetime_value;
+    datetime_value.from_date_str("2020-07-16 19:39:43", 19);
+    const int64_t datetime_int = datetime_value.to_int64();
+    columns[2]->insert_data(reinterpret_cast<const char*>(&datetime_int), sizeof(datetime_int));
+
+    doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> date_value;
+    date_value.set_time(2022, 6, 6, 0, 0, 0, 0);
+    const uint32_t date_int = date_value.to_date_int_val();
+    columns[3]->insert_data(reinterpret_cast<const char*>(&date_int), sizeof(date_int));
+
+    columns[4]->insert_data(reinterpret_cast<const char*>(&seq), sizeof(seq));
+}
+
 class TestDeltaWriter : public ::testing::Test {
 public:
     TestDeltaWriter() {}
@@ -473,7 +497,9 @@ TEST_F(TestDeltaWriter, open) {
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+    res = delta_writer->build_rowset();
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
     SAFE_DELETE(delta_writer);
 
@@ -593,7 +619,15 @@ TEST_F(TestDeltaWriter, vec_write) {
 
     res = delta_writer->close();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+    res = delta_writer->wait_flush();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->build_rowset();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->submit_calc_delete_bitmap_task();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->wait_calc_delete_bitmap();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
     ASSERT_TRUE(res.ok());
 
     // publish version success
@@ -661,54 +695,25 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
                                                        slot_desc->col_name()));
     }
 
-    auto columns = block.mutate_columns();
-    {
-        int8_t c1 = 123;
-        columns[0]->insert_data((const char*)&c1, sizeof(c1));
-
-        int16_t c2 = 456;
-        columns[1]->insert_data((const char*)&c2, sizeof(c2));
-
-        vectorized::VecDateTimeValue c3;
-        c3.from_date_str("2020-07-16 19:39:43", 19);
-        int64_t c3_int = c3.to_int64();
-        columns[2]->insert_data((const char*)&c3_int, sizeof(c3));
-
-        doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4;
-        c4.set_time(2022, 6, 6, 0, 0, 0, 0);
-        uint32_t c4_int = c4.to_date_int_val();
-        columns[3]->insert_data((const char*)&c4_int, sizeof(c4));
-
-        int32_t c5 = 100;
-        columns[4]->insert_data((const char*)&c5, sizeof(c2));
-        res = delta_writer->write(&block, {0});
-        ASSERT_TRUE(res.ok());
-    }
-    {
-        int8_t c1 = 123;
-        columns[0]->insert_data((const char*)&c1, sizeof(c1));
-
-        int16_t c2 = 456;
-        columns[1]->insert_data((const char*)&c2, sizeof(c2));
-
-        vectorized::VecDateTimeValue c3;
-        c3.from_date_str("2020-07-31 19:39:43", 19);
-        int64_t c3_int = c3.to_int64();
-        columns[2]->insert_data((const char*)&c3_int, sizeof(c3));
+    generate_data(&block, 123, 456, 100);
+    res = delta_writer->write(&block, {0});
+    ASSERT_TRUE(res.ok());
 
-        doris::vectorized::DateV2Value<doris::vectorized::DateV2ValueType> c4;
-        c4.set_time(2022, 7, 6, 0, 0, 0, 0);
-        uint32_t c4_int = c4.to_date_int_val();
-        columns[3]->insert_data((const char*)&c4_int, sizeof(c4));
+    generate_data(&block, 123, 456, 90);
+    res = delta_writer->write(&block, {1});
+    ASSERT_TRUE(res.ok());
 
-        int32_t c5 = 90;
-        columns[4]->insert_data((const char*)&c5, sizeof(c2));
-        res = delta_writer->write(&block, {1});
-        ASSERT_TRUE(res.ok());
-    }
     res = delta_writer->close();
     ASSERT_TRUE(res.ok());
-    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+    res = delta_writer->wait_flush();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->build_rowset();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->submit_calc_delete_bitmap_task();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->wait_calc_delete_bitmap();
+    ASSERT_TRUE(res.ok());
+    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
     ASSERT_TRUE(res.ok());
 
     // publish version success
@@ -767,4 +772,243 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     delete delta_writer;
 }
 
+// Exercises delete bitmap calculation on a tablet created with merge-on-write enabled and a
+// sequence column, using two delta writers that load the same keys:
+//   1. writer 1 writes, builds its rowset, calculates its delete bitmap and commits;
+//   2. writer 2 writes and flushes only;
+//   3. writer 1's rowset is published and added to the tablet;
+//   4. writer 2 builds its rowset, calculates its delete bitmap, commits and is published;
+//   5. both rowsets are read back through the tablet's aggregated delete bitmap.
+TEST_F(TestDeltaWriter, vec_sequence_col_concurrent_write) {
+    TCreateTabletReq request;
+    create_tablet_request_with_sequence_col(10005, 270068377, &request, true);
+    Status res = k_engine->create_tablet(request);
+    ASSERT_TRUE(res.ok());
+
+    TDescriptorTable tdesc_tbl = create_descriptor_tablet_with_sequence_col();
+    ObjectPool obj_pool;
+    DescriptorTbl* desc_tbl = nullptr;
+    DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl);
+    TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
+    OlapTableSchemaParam param;
+
+    PUniqueId load_id;
+    load_id.set_hi(0);
+    load_id.set_lo(0);
+    WriteRequest write_req = {10005,
+                              270068377,
+                              WriteType::LOAD,
+                              20003,
+                              30003,
+                              load_id,
+                              tuple_desc,
+                              &(tuple_desc->slots()),
+                              false,
+                              &param};
+    // Both writers are opened from the same write request.
+    DeltaWriter* delta_writer1 = nullptr;
+    DeltaWriter* delta_writer2 = nullptr;
+    std::unique_ptr<RuntimeProfile> profile1;
+    profile1 = std::make_unique<RuntimeProfile>("LoadChannels1");
+    std::unique_ptr<RuntimeProfile> profile2;
+    profile2 = std::make_unique<RuntimeProfile>("LoadChannels2");
+    DeltaWriter::open(&write_req, &delta_writer1, profile1.get(), TUniqueId());
+    DeltaWriter::open(&write_req, &delta_writer2, profile2.get(), TUniqueId());
+    ASSERT_NE(delta_writer1, nullptr);
+    ASSERT_NE(delta_writer2, nullptr);
+
+    // write data in delta writer 1: keys (10, 123) and (20, 123), both with sequence value 100
+    {
+        vectorized::Block block;
+        for (const auto& slot_desc : tuple_desc->slots()) {
+            block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                           slot_desc->get_data_type_ptr(),
+                                                           slot_desc->col_name()));
+        }
+
+        generate_data(&block, 10, 123, 100);
+        res = delta_writer1->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+
+        generate_data(&block, 20, 123, 100);
+        res = delta_writer1->write(&block, {1});
+        ASSERT_TRUE(res.ok());
+
+        res = delta_writer1->close();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->wait_flush();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->build_rowset();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->submit_calc_delete_bitmap_task();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->wait_calc_delete_bitmap();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer1->commit_txn(PSlaveTabletNodes(), false);
+        ASSERT_TRUE(res.ok());
+    }
+    // write data in delta writer 2: key (10, 123) with sequence value 110 and key (20, 123)
+    // with sequence value 90; only flush here, the rowset is built after writer 1 is published
+    {
+        vectorized::Block block;
+        for (const auto& slot_desc : tuple_desc->slots()) {
+            block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                           slot_desc->get_data_type_ptr(),
+                                                           slot_desc->col_name()));
+        }
+
+        generate_data(&block, 10, 123, 110);
+        res = delta_writer2->write(&block, {0});
+        ASSERT_TRUE(res.ok());
+
+        generate_data(&block, 20, 123, 90);
+        res = delta_writer2->write(&block, {1});
+        ASSERT_TRUE(res.ok());
+
+        res = delta_writer2->close();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer2->wait_flush();
+        ASSERT_TRUE(res.ok());
+    }
+    TabletSharedPtr tablet = k_engine->tablet_manager()->get_tablet(write_req.tablet_id);
+    std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
+    OlapMeta* meta = tablet->data_dir()->get_meta();
+    RowsetSharedPtr rowset1 = nullptr;
+    RowsetSharedPtr rowset2 = nullptr;
+
+    // publish version on delta writer 1 success
+    {
+        Version version;
+        version.first = tablet->rowset_with_max_version()->end_version() + 1;
+        version.second = tablet->rowset_with_max_version()->end_version() + 1;
+        std::cout << "start to add rowset version:" << version.first << "-" << version.second
+                  << std::endl;
+        std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+        StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+                write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+        ASSERT_EQ(1, tablet_related_rs.size());
+
+        std::cout << "start to publish txn" << std::endl;
+        rowset1 = tablet_related_rs.begin()->second;
+        TabletPublishStatistics pstats;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_related_rs.begin()->first.tablet_uid,
+                                                   version, &pstats);
+        ASSERT_TRUE(res.ok());
+        std::cout << "start to add inc rowset:" << rowset1->rowset_id()
+                  << ", num rows:" << rowset1->num_rows()
+                  << ", version:" << rowset1->version().first << "-" << rowset1->version().second
+                  << std::endl;
+        res = tablet->add_inc_rowset(rowset1);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(2, tablet->num_rows());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        res = ((BetaRowset*)rowset1.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(1, rowset1->num_segments());
+        ASSERT_EQ(1, segments.size());
+    }
+
+    // commit delta writer2, then publish it.
+    {
+        // commit, calc delete bitmap should happen here
+        res = delta_writer2->build_rowset();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer2->submit_calc_delete_bitmap_task();
+        ASSERT_TRUE(res.ok());
+        res = delta_writer2->wait_calc_delete_bitmap();
+        ASSERT_TRUE(res.ok());
+
+        // verify that delete bitmap calculated correctly
+        // since the delete bitmap not published, versions are 0
+        auto delete_bitmap = delta_writer2->get_delete_bitmap();
+        ASSERT_TRUE(delete_bitmap->contains({rowset1->rowset_id(), 0, 0}, 0));
+        // The rowset id of rowset2 is not available yet; the matching check for rowset2 (row 1)
+        // is done further down in this scope, once rowset2 has been fetched from the txn
+        // manager.
+
+        res = delta_writer2->commit_txn(PSlaveTabletNodes(), false);
+        ASSERT_TRUE(res.ok());
+
+        Version version;
+        version.first = tablet->rowset_with_max_version()->end_version() + 1;
+        version.second = tablet->rowset_with_max_version()->end_version() + 1;
+        std::cout << "start to add rowset version:" << version.first << "-" << version.second
+                  << std::endl;
+        std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+        StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+                write_req.txn_id, write_req.partition_id, &tablet_related_rs);
+        ASSERT_EQ(1, tablet_related_rs.size());
+
+        std::cout << "start to publish txn" << std::endl;
+        rowset2 = tablet_related_rs.begin()->second;
+        ASSERT_TRUE(delete_bitmap->contains({rowset2->rowset_id(), 0, 0}, 1));
+
+        TabletPublishStatistics pstats;
+        res = k_engine->txn_manager()->publish_txn(meta, write_req.partition_id, write_req.txn_id,
+                                                   write_req.tablet_id, write_req.schema_hash,
+                                                   tablet_related_rs.begin()->first.tablet_uid,
+                                                   version, &pstats);
+        ASSERT_TRUE(res.ok());
+        std::cout << "start to add inc rowset:" << rowset2->rowset_id()
+                  << ", num rows:" << rowset2->num_rows()
+                  << ", version:" << rowset2->version().first << "-" << rowset2->version().second
+                  << std::endl;
+        res = tablet->add_inc_rowset(rowset2);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(4, tablet->num_rows());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        res = ((BetaRowset*)rowset2.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        ASSERT_EQ(1, rowset2->num_segments());
+        ASSERT_EQ(1, segments.size());
+    }
+
+    auto cur_version = tablet->rowset_with_max_version()->end_version();
+    // read data from rowset 1, verify the data correct
+    {
+        OlapReaderStatistics stats;
+        StorageReadOptions opts;
+        opts.stats = &stats;
+        opts.tablet_schema = rowset1->tablet_schema();
+        opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg(
+                                              {rowset1->rowset_id(), 0, cur_version}));
+        std::unique_ptr<RowwiseIterator> iter;
+        std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset1->tablet_schema());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        // Check the load status before indexing into `segments`.
+        res = ((BetaRowset*)rowset1.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        auto s = segments[0]->new_iterator(schema, opts, &iter);
+        ASSERT_TRUE(s.ok());
+        auto read_block = rowset1->tablet_schema()->create_block();
+        res = iter->next_batch(&read_block);
+        ASSERT_TRUE(res.ok());
+        // key of (10, 123) is deleted
+        ASSERT_EQ(1, read_block.rows());
+        auto k1 = read_block.get_by_position(0).column->get_int(0);
+        ASSERT_EQ(20, k1);
+        auto k2 = read_block.get_by_position(1).column->get_int(0);
+        ASSERT_EQ(123, k2);
+        // get the value from sequence column
+        auto seq_v = read_block.get_by_position(4).column->get_int(0);
+        ASSERT_EQ(100, seq_v);
+    }
+
+    // read data from rowset 2, verify the data correct
+    {
+        OlapReaderStatistics stats;
+        StorageReadOptions opts;
+        opts.stats = &stats;
+        opts.tablet_schema = rowset2->tablet_schema();
+        opts.delete_bitmap.emplace(0, tablet->tablet_meta()->delete_bitmap().get_agg(
+                                              {rowset2->rowset_id(), 0, cur_version}));
+        std::unique_ptr<RowwiseIterator> iter;
+        std::shared_ptr<Schema> schema = std::make_shared<Schema>(rowset2->tablet_schema());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        // Check the load status before indexing into `segments`.
+        res = ((BetaRowset*)rowset2.get())->load_segments(&segments);
+        ASSERT_TRUE(res.ok());
+        auto s = segments[0]->new_iterator(schema, opts, &iter);
+        ASSERT_TRUE(s.ok());
+        auto read_block = rowset2->tablet_schema()->create_block();
+        res = iter->next_batch(&read_block);
+        ASSERT_TRUE(res.ok());
+        // key of (20, 123) is deleted, because its seq value is lower
+        ASSERT_EQ(1, read_block.rows());
+        auto k1 = read_block.get_by_position(0).column->get_int(0);
+        ASSERT_EQ(10, k1);
+        auto k2 = read_block.get_by_position(1).column->get_int(0);
+        ASSERT_EQ(123, k2);
+        // get the value from sequence column
+        auto seq_v = read_block.get_by_position(4).column->get_int(0);
+        ASSERT_EQ(110, seq_v);
+    }
+
+    res = k_engine->tablet_manager()->drop_tablet(request.tablet_id, request.replica_id, false);
+    ASSERT_TRUE(res.ok());
+    delete delta_writer1;
+    delete delta_writer2;
+}
 } // namespace doris
diff --git a/be/test/olap/engine_storage_migration_task_test.cpp 
b/be/test/olap/engine_storage_migration_task_test.cpp
index 79fbf26203..5e2312cb0d 100644
--- a/be/test/olap/engine_storage_migration_task_test.cpp
+++ b/be/test/olap/engine_storage_migration_task_test.cpp
@@ -193,7 +193,9 @@ TEST_F(TestEngineStorageMigrationTask, write_and_migration) 
{
 
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
-    res = delta_writer->close_wait(PSlaveTabletNodes(), false);
+    res = delta_writer->build_rowset();
+    EXPECT_EQ(Status::OK(), res);
+    res = delta_writer->commit_txn(PSlaveTabletNodes(), false);
     EXPECT_EQ(Status::OK(), res);
 
     // publish version success
diff --git a/be/test/olap/tablet_cooldown_test.cpp 
b/be/test/olap/tablet_cooldown_test.cpp
index 431342c4f2..509425cb15 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -393,7 +393,9 @@ void createTablet(StorageEngine* engine, TabletSharedPtr* 
tablet, int64_t replic
 
     st = delta_writer->close();
     ASSERT_EQ(Status::OK(), st);
-    st = delta_writer->close_wait(PSlaveTabletNodes(), false);
+    st = delta_writer->build_rowset();
+    ASSERT_EQ(Status::OK(), st);
+    st = delta_writer->commit_txn(PSlaveTabletNodes(), false);
     ASSERT_EQ(Status::OK(), st);
     delete delta_writer;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to