This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 40efd97666e branch-3.0: [Fix](recycler) Fix potential data leak when a partial update load which has publish conflict fails #45626 (#46138)
40efd97666e is described below

commit 40efd97666ee87d8d8a556a948cfd1d2902630c7
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Tue Dec 31 10:58:32 2024 +0800

    branch-3.0: [Fix](recycler) Fix potential data leak when a partial update load which has publish conflict fails #45626 (#46138)
    
    pick https://github.com/apache/doris/pull/45626
---
 cloud/src/recycler/recycler.cpp | 16 ++++++++
 cloud/test/recycler_test.cpp    | 81 +++++++++++++++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 84d755958ee..04476704bd3 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -1449,6 +1449,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
     int ret = 0;
     // resource_id -> file_paths
     std::map<std::string, std::vector<std::string>> resource_file_paths;
+    // (resource_id, tablet_id, rowset_id)
+    std::vector<std::tuple<std::string, int64_t, std::string>> 
rowsets_delete_by_prefix;
 
     for (const auto& rs : rowsets) {
         {
@@ -1519,6 +1521,12 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
             index_format = index_info.first;
             index_ids = std::move(index_info.second);
         }
+        if (rs.rowset_state() == RowsetStatePB::BEGIN_PARTIAL_UPDATE) {
+            // if rowset state is RowsetStatePB::BEGIN_PARTIAL_UPDATE, the number of segments data
+            // may be larger than num_segments field in RowsetMeta, so we need to delete the rowset's data by prefix
+            rowsets_delete_by_prefix.emplace_back(rs.resource_id(), tablet_id, 
rs.rowset_id_v2());
+            continue;
+        }
         for (int64_t i = 0; i < num_segments; ++i) {
             file_paths.push_back(segment_path(tablet_id, rowset_id, i));
             if (index_format == InvertedIndexStorageFormatPB::V1) {
@@ -1542,6 +1550,14 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
             return accessor->delete_files(*paths);
         });
     }
+    for (const auto& [resource_id, tablet_id, rowset_id] : 
rowsets_delete_by_prefix) {
+        LOG_INFO(
+                "delete rowset {} by prefix because it's in 
BEGIN_PARTIAL_UPDATE state, "
+                "resource_id={}, tablet_id={}, instance_id={}",
+                rowset_id, resource_id, tablet_id, instance_id_);
+        concurrent_delete_executor.add(
+                [&]() -> int { return delete_rowset_data(resource_id, 
tablet_id, rowset_id); });
+    }
     bool finished = true;
     std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
     for (int r : rets) {
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 0bc16644a82..e38d25aaa84 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -121,6 +121,24 @@ static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, in
     return rowset;
 }
 
+static doris::RowsetMetaCloudPB create_rowset(const std::string& resource_id, 
int64_t tablet_id,
+                                              int64_t index_id, int 
num_segments,
+                                              const 
doris::TabletSchemaCloudPB& schema,
+                                              RowsetStatePB rowset_state, 
int64_t txn_id = 0) {
+    doris::RowsetMetaCloudPB rowset;
+    rowset.set_rowset_id(0); // useless but required
+    rowset.set_rowset_id_v2(next_rowset_id());
+    rowset.set_txn_id(txn_id);
+    rowset.set_num_segments(num_segments);
+    rowset.set_tablet_id(tablet_id);
+    rowset.set_index_id(index_id);
+    rowset.set_resource_id(resource_id);
+    rowset.set_schema_version(schema.schema_version());
+    rowset.mutable_tablet_schema()->CopyFrom(schema);
+    rowset.set_rowset_state(rowset_state);
+    return rowset;
+}
+
 static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor,
                                  const doris::RowsetMetaCloudPB& rowset, 
RecycleRowsetPB::Type type,
                                  bool write_schema_kv) {
@@ -924,6 +942,69 @@ TEST(RecyclerTest, recycle_tmp_rowsets) {
     EXPECT_EQ(insert_no_inverted_index, 4);
 }
 
+TEST(RecyclerTest, recycle_tmp_rowsets_partial_update) {
+    config::retention_seconds = 0;
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    ASSERT_EQ(txn_kv->init(), 0);
+
+    InstanceInfoPB instance;
+    instance.set_instance_id(instance_id);
+    auto obj_info = instance.add_obj_info();
+    obj_info->set_id("recycle_tmp_rowsets_partial_update");
+    obj_info->set_ak(config::test_s3_ak);
+    obj_info->set_sk(config::test_s3_sk);
+    obj_info->set_endpoint(config::test_s3_endpoint);
+    obj_info->set_region(config::test_s3_region);
+    obj_info->set_bucket(config::test_s3_bucket);
+    obj_info->set_prefix("recycle_tmp_rowsets_partial_update");
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    ASSERT_EQ(recycler.init(), 0);
+
+    doris::TabletSchemaCloudPB schema;
+
+    auto accessor = recycler.accessor_map_.begin()->second;
+    int64_t tablet_id = 10015;
+    int64_t index_id = 1000;
+    int64_t txn_id_base = 293039;
+    for (int j = 0; j < 20; ++j) {
+        int64_t txn_id = txn_id_base + j;
+        int segment_num = 5;
+        if (j < 15) {
+            auto rowset = create_rowset("recycle_tmp_rowsets_partial_update", 
tablet_id, index_id,
+                                        segment_num, schema, 
RowsetStatePB::VISIBLE, txn_id);
+            create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false);
+        } else {
+            auto rowset =
+                    create_rowset("recycle_tmp_rowsets_partial_update", 
tablet_id, tablet_id,
+                                  segment_num, schema, 
RowsetStatePB::BEGIN_PARTIAL_UPDATE, txn_id);
+            create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false);
+
+            // partial update may write new segment to an existing tmp rowsets
+            // we simulate that partial update load fails after it writes a segment
+            // and before it updates the segments num in tmp rowset meta
+            int extra_segment_id = segment_num;
+            auto path = segment_path(rowset.tablet_id(), 
rowset.rowset_id_v2(), extra_segment_id);
+            accessor->put_file(path, path);
+        }
+    }
+
+    ASSERT_EQ(recycler.recycle_tmp_rowsets(), 0);
+    // check rowset does not exist on obj store
+    std::unique_ptr<ListIterator> list_iter;
+    ASSERT_EQ(0, accessor->list_directory("data/", &list_iter));
+    ASSERT_FALSE(list_iter->has_next());
+    // check all tmp rowset kv have been deleted
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::unique_ptr<RangeGetIterator> it;
+    auto begin_key = meta_rowset_tmp_key({instance_id, 0, 0});
+    auto end_key = meta_rowset_tmp_key({instance_id, INT64_MAX, 0});
+    ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK);
+    ASSERT_EQ(it->size(), 0);
+}
+
 TEST(RecyclerTest, recycle_tablet) {
     auto txn_kv = std::make_shared<MemTxnKv>();
     ASSERT_EQ(txn_kv->init(), 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to