This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d2688ae6021 [feature](merge-cloud) schema change for mow table (#31819)
d2688ae6021 is described below

commit d2688ae6021314f4e9d2341a7e08f8fe9d43e97a
Author: Xin Liao <liaoxin...@126.com>
AuthorDate: Wed Mar 6 00:11:50 2024 +0800

    [feature](merge-cloud) schema change for mow table (#31819)
---
 be/src/cloud/cloud_schema_change_job.cpp | 80 +++++++++++++++++++++++++++++++-
 be/src/cloud/cloud_schema_change_job.h   |  3 ++
 be/src/olap/base_tablet.cpp              | 55 ++++++++++++++++++++++
 be/src/olap/base_tablet.h                |  3 ++
 be/src/olap/tablet.cpp                   | 55 ----------------------
 be/src/olap/tablet.h                     |  3 --
 6 files changed, 140 insertions(+), 59 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 2099f22f1cd..cd7e0744324 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -39,6 +39,7 @@ namespace doris {
 using namespace ErrorCode;
 
 static constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2;
 
 static std::unique_ptr<SchemaChange> get_sc_procedure(const BlockChanger& changer,
                                                       bool sc_sorting) {
@@ -234,6 +235,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     }
 
     // 3. Convert historical data
+    bool already_exist_any_version = false;
     for (const auto& rs_reader : sc_params.ref_rowset_readers) {
         VLOG_TRACE << "Begin to convert a history rowset. version=" << rs_reader->version();
 
@@ -264,6 +266,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
                 RETURN_IF_ERROR(RowsetFactory::create_rowset(nullptr, _new_tablet->tablet_path(),
                                                              existed_rs_meta, &rowset));
                 _output_rowsets.push_back(std::move(rowset));
+                already_exist_any_version = true;
                 continue;
             } else {
                 return st;
@@ -327,7 +330,18 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     _output_cumulative_point = std::min(_output_cumulative_point, sc_job->alter_version() + 1);
     sc_job->set_output_cumulative_point(_output_cumulative_point);
 
-    // TODO(Lchangliang): process delete bitmap if the table is MOW
+    // process delete bitmap if the table is MOW
+    if (_new_tablet->enable_unique_key_merge_on_write()) {
+        int64_t initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
+                            std::numeric_limits<int64_t>::max();
+        // If there are historical versions of rowsets, we need to recalculate their delete
+        // bitmaps, otherwise we will miss the delete bitmaps of incremental rowsets
+        int64_t start_calc_delete_bitmap_version =
+                already_exist_any_version ? 0 : sc_job->alter_version() + 1;
+        RETURN_IF_ERROR(_process_delete_bitmap(sc_job->alter_version(),
+                                               start_calc_delete_bitmap_version, initiator));
+        sc_job->set_delete_bitmap_lock_initiator(initiator);
+    }
 
     cloud::FinishTabletJobResponse finish_resp;
     st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp);
@@ -361,4 +375,68 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
     }
     return Status::OK();
 }
+
+Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
+                                                    int64_t start_calc_delete_bitmap_version,
+                                                    int64_t initiator) {
+    LOG_INFO("process mow table")
+            .tag("new_tablet_id", _new_tablet->tablet_id())
+            .tag("out_rowset_size", _output_rowsets.size())
+            .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version)
+            .tag("alter_version", alter_version);
+    TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
+    tmp_meta->delete_bitmap().delete_bitmap.clear();
+    std::shared_ptr<CloudTablet> tmp_tablet =
+            std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
+    {
+        std::unique_lock wlock(tmp_tablet->get_header_lock());
+        tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+    }
+
+    // step 1, process incremental rowset without delete bitmap update lock
+    std::vector<RowsetSharedPtr> incremental_rowsets;
+    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+    int64_t max_version = tmp_tablet->max_version().second;
+    LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
+              << "incremental rowsets without lock, version: " << start_calc_delete_bitmap_version
+              << "-" << max_version << " new_table_id: " << _new_tablet->tablet_id();
+    if (max_version >= start_calc_delete_bitmap_version) {
+        RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
+                {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets));
+        for (auto rowset : incremental_rowsets) {
+            RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
+        }
+    }
+
+    // step 2, process incremental rowset with delete bitmap update lock
+    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().get_delete_bitmap_update_lock(
+            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator));
+    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().sync_tablet_rowsets(tmp_tablet.get()));
+    int64_t new_max_version = tmp_tablet->max_version().second;
+    LOG(INFO) << "alter table for mow table, calculate delete bitmap of "
+              << "incremental rowsets with lock, version: " << max_version + 1 << "-"
+              << new_max_version << " new_tablet_id: " << _new_tablet->tablet_id();
+    std::vector<RowsetSharedPtr> new_incremental_rowsets;
+    if (new_max_version > max_version) {
+        RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked(
+                {max_version + 1, new_max_version}, &new_incremental_rowsets));
+        {
+            std::unique_lock wlock(tmp_tablet->get_header_lock());
+            tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+        }
+        for (auto rowset : new_incremental_rowsets) {
+            RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
+        }
+    }
+
+    auto& delete_bitmap = tmp_tablet->tablet_meta()->delete_bitmap();
+
+    // step4, store delete bitmap
+    RETURN_IF_ERROR(_cloud_storage_engine.meta_mgr().update_delete_bitmap(
+            *_new_tablet, SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID, initiator, &delete_bitmap));
+
+    _new_tablet->tablet_meta()->delete_bitmap() = delete_bitmap;
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
index 7bb03fda12a..d587111df71 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -39,6 +39,9 @@ public:
 private:
     Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
 
+    Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version,
+                                  int64_t initiator);
+
 private:
     CloudStorageEngine& _cloud_storage_engine;
     std::shared_ptr<CloudTablet> _base_tablet;
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index fe76d43c7f0..e5a41abdbd9 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1375,4 +1375,59 @@ Status BaseTablet::check_rowid_conversion(
     return Status::OK();
 }
 
+// The caller should hold _rowset_update_lock and _meta_lock lock.
+Status BaseTablet::update_delete_bitmap_without_lock(const BaseTabletSPtr& self,
+                                                     const RowsetSharedPtr& rowset) {
+    DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
+        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
+            LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
+            return Status::InternalError(
+                    "debug tablet update delete bitmap without lock random failed");
+        }
+    });
+    int64_t cur_version = rowset->end_version();
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+
+    // If this rowset does not have a segment, there is no need for an update.
+    if (segments.empty()) {
+        LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: "
+                  << self->tablet_id() << " cur max_version: " << cur_version;
+        return Status::OK();
+    }
+    RowsetIdUnorderedSet cur_rowset_ids;
+    RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
+    RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
+
+    std::vector<RowsetSharedPtr> specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids);
+    OlapStopWatch watch;
+    auto token = self->calc_delete_bitmap_executor()->create_token();
+    RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
+                                       cur_version - 1, token.get()));
+    RETURN_IF_ERROR(token->wait());
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), 0,
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
+    LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id()
+              << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version
+              << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us()
+              << "(us), total rows: " << total_rows;
+    if (config::enable_merge_on_write_correctness_check) {
+        // check if all the rowset has ROWSET_SENTINEL_MARK
+        auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1,
+                                                        cur_rowset_ids, &specified_rowsets);
+        if (!st.ok()) {
+            LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!");
+        }
+        self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
+    }
+    for (auto& iter : delete_bitmap->delete_bitmap) {
+        self->_tablet_meta->delete_bitmap().merge(
+                {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second);
+    }
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 867ff9c1e3f..b59a4303a0b 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -229,6 +229,9 @@ public:
             const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
                     location_map);
 
+    static Status update_delete_bitmap_without_lock(const BaseTabletSPtr& self,
+                                                    const RowsetSharedPtr& rowset);
+
     
////////////////////////////////////////////////////////////////////////////
     // end MoW functions
     
////////////////////////////////////////////////////////////////////////////
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 431577726ad..d1bb734d903 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2300,61 +2300,6 @@ void Tablet::update_max_version_schema(const TabletSchemaSPtr& tablet_schema) {
     }
 }
 
-// The caller should hold _rowset_update_lock and _meta_lock lock.
-Status Tablet::update_delete_bitmap_without_lock(const TabletSharedPtr& self,
-                                                 const RowsetSharedPtr& rowset) {
-    DBUG_EXECUTE_IF("Tablet.update_delete_bitmap_without_lock.random_failed", {
-        if (rand() % 100 < (100 * dp->param("percent", 0.1))) {
-            LOG_WARNING("Tablet.update_delete_bitmap_without_lock.random_failed");
-            return Status::InternalError(
-                    "debug tablet update delete bitmap without lock random failed");
-        }
-    });
-    int64_t cur_version = rowset->end_version();
-    std::vector<segment_v2::SegmentSharedPtr> segments;
-    RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
-
-    // If this rowset does not have a segment, there is no need for an update.
-    if (segments.empty()) {
-        LOG(INFO) << "[Schema Change or Clone] skip to construct delete bitmap tablet: "
-                  << self->tablet_id() << " cur max_version: " << cur_version;
-        return Status::OK();
-    }
-    RowsetIdUnorderedSet cur_rowset_ids;
-    RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
-    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(self->tablet_id());
-    RETURN_IF_ERROR(self->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
-
-    std::vector<RowsetSharedPtr> specified_rowsets = self->get_rowset_by_ids(&cur_rowset_ids);
-    OlapStopWatch watch;
-    auto token = self->_engine.calc_delete_bitmap_executor()->create_token();
-    RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
-                                       cur_version - 1, token.get()));
-    RETURN_IF_ERROR(token->wait());
-    size_t total_rows = std::accumulate(
-            segments.begin(), segments.end(), 0,
-            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
-    LOG(INFO) << "[Schema Change or Clone] construct delete bitmap tablet: " << self->tablet_id()
-              << ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version
-              << ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us()
-              << "(us), total rows: " << total_rows;
-    if (config::enable_merge_on_write_correctness_check) {
-        // check if all the rowset has ROWSET_SENTINEL_MARK
-        auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, -1,
-                                                        cur_rowset_ids, &specified_rowsets);
-        if (!st.ok()) {
-            LOG(WARNING) << fmt::format("delete bitmap correctness check failed in publish phase!");
-        }
-        self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
-    }
-    for (auto& iter : delete_bitmap->delete_bitmap) {
-        self->_tablet_meta->delete_bitmap().merge(
-                {std::get<0>(iter.first), std::get<1>(iter.first), cur_version}, iter.second);
-    }
-
-    return Status::OK();
-}
-
 CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() {
     return _engine.calc_delete_bitmap_executor();
 }
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 0f6cfa9c04f..50970c51c5e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -369,9 +369,6 @@ public:
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
 
-    static Status update_delete_bitmap_without_lock(const TabletSharedPtr& self,
-                                                    const RowsetSharedPtr& rowset);
-
     CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
     Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                               DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to