This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 991b9f64590 [fix](recycler) Fix premature exit from recycling when there is an invalid storage vault (#46798) (#47159)
991b9f64590 is described below

commit 991b9f645908f9699a8e6a999895d8dcd73eb6c5
Author: abmdocrt <lianyuk...@selectdb.com>
AuthorDate: Mon Jan 20 10:37:01 2025 +0800

    [fix](recycler) Fix premature exit from recycling when there is an invalid storage vault (#46798) (#47159)
    
    Related PR: Pick #46798 #47164
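    
    Previously, InstanceRecycler::init_storage_vault_accessors() returned an
    error as soon as one storage vault failed to initialize, which aborted
    recycling for the whole instance. With this change invalid vaults are
    skipped (init only fails, with -2, when no accessor can be created at
    all), and recycle_tablet() first collects the resource ids referenced by
    the tablet's rowset metas, deletes object data per referenced vault, and
    only removes the rowset KVs and MoW delete bitmaps after the data
    deletion succeeds.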
---
 cloud/src/recycler/hdfs_accessor.cpp |   1 +
 cloud/src/recycler/recycler.cpp      | 221 ++++++++++++++++++++++++++++-------
 cloud/test/recycler_test.cpp         | 136 +++++++++++++++++++++
 3 files changed, 314 insertions(+), 44 deletions(-)

diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp
index 1999bcfa165..024acd0efe7 100644
--- a/cloud/src/recycler/hdfs_accessor.cpp
+++ b/cloud/src/recycler/hdfs_accessor.cpp
@@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) {
 }
 
 int HdfsAccessor::init() {
+    TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1);
     // TODO(plat1ko): Cache hdfsFS
     fs_ = HDFSBuilder::create_fs(info_.build_conf());
     if (!fs_) {
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 307528011ea..62a161e3edb 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -24,12 +24,14 @@
 
 #include <atomic>
 #include <chrono>
+#include <cstddef>
 #include <cstdint>
 #include <deque>
 #include <string>
 #include <string_view>
 
 #include "common/stopwatch.h"
+#include "meta-service/meta_service.h"
 #include "meta-service/meta_service_schema.h"
 #include "meta-service/txn_kv.h"
 #include "meta-service/txn_kv_error.h"
@@ -249,8 +251,9 @@ void Recycler::recycle_callback() {
         auto instance_recycler = std::make_shared<InstanceRecycler>(
                 txn_kv_, instance, _thread_pool_group, txn_lazy_committer_);
 
-        if (instance_recycler->init() != 0) {
-            LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id;
+        if (int r = instance_recycler->init(); r != 0) {
+            LOG(WARNING) << "failed to init instance recycler, instance_id=" << instance_id
+                         << " ret=" << r;
             continue;
         }
         std::string recycle_job_key;
@@ -258,6 +261,8 @@ void Recycler::recycle_callback() {
         int ret = prepare_instance_recycle_job(txn_kv_.get(), recycle_job_key, instance_id,
                                                ip_port_, config::recycle_interval_seconds * 1000);
         if (ret != 0) { // Prepare failed
+            LOG(WARNING) << "failed to prepare recycle_job, instance_id=" << instance_id
+                         << " ret=" << ret;
             continue;
         } else {
             std::lock_guard lock(mtx_);
@@ -276,7 +281,12 @@ void Recycler::recycle_callback() {
             std::lock_guard lock(mtx_);
             recycling_instance_map_.erase(instance_id);
         }
-        LOG_INFO("finish recycle instance").tag("instance_id", instance_id);
+        auto elapsed_ms =
+                duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() -
+                ctime_ms;
+        LOG_INFO("finish recycle instance")
+                .tag("instance_id", instance_id)
+                .tag("cost_ms", elapsed_ms);
     }
 }
 
@@ -523,35 +533,37 @@ int InstanceRecycler::init_storage_vault_accessors() {
             LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
             return -1;
         }
+        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
+                                 &accessor_map_, &vault);
 
         if (vault.has_hdfs_info()) {
             auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
             int ret = accessor->init();
             if (ret != 0) {
                 LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
-                             << " resource_id=" << vault.id() << " name=" << vault.name();
-                return ret;
+                             << " resource_id=" << vault.id() << " name=" << vault.name()
+                             << " hdfs_vault=" << vault.hdfs_info().ShortDebugString();
+                continue;
             }
 
             accessor_map_.emplace(vault.id(), std::move(accessor));
         } else if (vault.has_obj_info()) {
-#ifdef UNIT_TEST
-            auto accessor = std::make_shared<MockAccessor>();
-#else
             auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
             if (!s3_conf) {
-                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
-                return -1;
+                LOG(WARNING) << "failed to init object accessor, invalid conf, instance_id="
+                             << instance_id_ << " s3_vault=" << vault.obj_info().ShortDebugString();
+                continue;
             }
 
             std::shared_ptr<S3Accessor> accessor;
             int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
             if (ret != 0) {
                 LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
-                             << " resource_id=" << vault.id() << " name=" << vault.name();
-                return ret;
+                             << " resource_id=" << vault.id() << " name=" << vault.name()
+                             << " ret=" << ret
+                             << " s3_vault=" << vault.obj_info().ShortDebugString();
+                continue;
             }
-#endif
 
             accessor_map_.emplace(vault.id(), std::move(accessor));
         }
@@ -562,6 +574,13 @@ int InstanceRecycler::init_storage_vault_accessors() {
         return -1;
     }
 
+    if (accessor_map_.empty()) {
+        LOG(WARNING) << "no accessors for instance=" << instance_id_;
+        return -2;
+    }
+    LOG_INFO("finish init instance recycler number_accessors={} instance={}",
+             accessor_map_.size(), instance_id_);
+
     return 0;
 }
 
@@ -1461,7 +1480,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
         }
 
         auto it = accessor_map_.find(rs.resource_id());
-        if (it == accessor_map_.end()) [[unlikely]] { // impossible
+        // possible if the accessor is not initialized correctly
+        if (it == accessor_map_.end()) [[unlikely]] {
             LOG_WARNING("instance has no such resource id")
                     .tag("instance_id", instance_id_)
                     .tag("resource_id", rs.resource_id());
@@ -1545,8 +1565,16 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
                                                  [](const int& ret) { return ret != 0; });
     for (auto& [resource_id, file_paths] : resource_file_paths) {
         concurrent_delete_executor.add([&, rid = &resource_id, paths = &file_paths]() -> int {
+            DCHECK(accessor_map_.count(*rid))
+                    << "uninitialized accessor, instance_id=" << instance_id_
+                    << " resource_id=" << resource_id << " path[0]=" << (*paths)[0];
+            if (!accessor_map_.contains(*rid)) {
+                LOG_WARNING("delete rowset data accessor_map_ does not contain resource id")
+                        .tag("resource_id", resource_id)
+                        .tag("instance_id", instance_id_);
+                return -1;
+            }
             auto& accessor = accessor_map_[*rid];
-            DCHECK(accessor);
             return accessor->delete_files(*paths);
         });
     }
@@ -1576,7 +1604,9 @@ int InstanceRecycler::delete_rowset_data(const std::string& resource_id, int64_t
     if (it == accessor_map_.end()) {
         LOG_WARNING("instance has no such resource id")
                 .tag("instance_id", instance_id_)
-                .tag("resource_id", resource_id);
+                .tag("resource_id", resource_id)
+                .tag("tablet_id", tablet_id)
+                .tag("rowset_id", rowset_id);
         return -1;
     }
     auto& accessor = it->second;
@@ -1588,42 +1618,107 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
             .tag("instance_id", instance_id_)
             .tag("tablet_id", tablet_id);
 
+    int ret = 0;
     auto start_time = steady_clock::now();
 
+    // collect resource ids
+    std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
+    std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
+    std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""});
+    std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""});
+
+    std::set<std::string> resource_ids;
+    int64_t recycle_rowsets_number = 0;
+    int64_t recycle_segments_number = 0;
+    int64_t recycle_rowsets_data_size = 0;
+    int64_t recycle_rowsets_index_size = 0;
+    int64_t max_rowset_version = 0;
+    int64_t min_rowset_creation_time = INT64_MAX;
+    int64_t max_rowset_creation_time = 0;
+    int64_t min_rowset_expiration_time = INT64_MAX;
+    int64_t max_rowset_expiration_time = 0;
+
     std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
         auto cost = duration<float>(steady_clock::now() - start_time).count();
         LOG_INFO("recycle the rowsets of dropped tablet finished, cost={}s", cost)
                 .tag("instance_id", instance_id_)
-                .tag("tablet_id", tablet_id);
+                .tag("tablet_id", tablet_id)
+                .tag("recycle rowsets number", recycle_rowsets_number)
+                .tag("recycle segments number", recycle_segments_number)
+                .tag("all rowsets recycle data size", recycle_rowsets_data_size)
+                .tag("all rowsets recycle index size", recycle_rowsets_index_size)
+                .tag("max rowset version", max_rowset_version)
+                .tag("min rowset creation time", min_rowset_creation_time)
+                .tag("max rowset creation time", max_rowset_creation_time)
+                .tag("min rowset expiration time", min_rowset_expiration_time)
+                .tag("max rowset expiration time", max_rowset_expiration_time)
+                .tag("ret", ret);
     });
 
-    // delete all rowset kv in this tablet
-    std::string rs_key0 = meta_rowset_key({instance_id_, tablet_id, 0});
-    std::string rs_key1 = meta_rowset_key({instance_id_, tablet_id + 1, 0});
-    std::string recyc_rs_key0 = recycle_rowset_key({instance_id_, tablet_id, ""});
-    std::string recyc_rs_key1 = recycle_rowset_key({instance_id_, tablet_id + 1, ""});
-
-    int ret = 0;
     std::unique_ptr<Transaction> txn;
     if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
-        LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id;
+        LOG_WARNING("failed to recycle tablet ")
+                .tag("tablet id", tablet_id)
+                .tag("instance_id", instance_id_)
+                .tag("reason", "failed to create txn");
         ret = -1;
     }
-    txn->remove(rs_key0, rs_key1);
-    txn->remove(recyc_rs_key0, recyc_rs_key1);
-
-    // remove delete bitmap for MoW table
-    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
-    txn->remove(pending_key);
-    std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
-    std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
-    txn->remove(delete_bitmap_start, delete_bitmap_end);
-
-    TxnErrorCode err = txn->commit();
-    if (err != TxnErrorCode::TXN_OK) {
-        LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err;
+    GetRowsetResponse resp;
+    std::string msg;
+    MetaServiceCode code = MetaServiceCode::OK;
+    // get rowsets in tablet
+    internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
+                        tablet_id, code, msg, &resp);
+    if (code != MetaServiceCode::OK) {
+        LOG_WARNING("failed to get rowsets of tablet when recycle tablet")
+                .tag("tablet id", tablet_id)
+                .tag("msg", msg)
+                .tag("code", code)
+                .tag("instance id", instance_id_);
         ret = -1;
     }
+    TEST_SYNC_POINT_CALLBACK("InstanceRecycler::recycle_tablet.create_rowset_meta", &resp);
+
+    for (const auto& rs_meta : resp.rowset_meta()) {
+        if (!rs_meta.has_resource_id()) {
+            LOG_WARNING("rowset meta does not have a resource id, impossible!")
+                    .tag("rs_meta", rs_meta.ShortDebugString())
+                    .tag("instance_id", instance_id_)
+                    .tag("tablet_id", tablet_id);
+            return -1;
+        }
+        auto it = accessor_map_.find(rs_meta.resource_id());
+        // possible if the accessor is not initialized correctly
+        if (it == accessor_map_.end()) [[unlikely]] {
+            LOG_WARNING(
+                    "failed to find resource id when recycling tablet, skip this vault accessor "
+                    "recycle process")
+                    .tag("tablet id", tablet_id)
+                    .tag("instance_id", instance_id_)
+                    .tag("resource_id", rs_meta.resource_id())
+                    .tag("rowset meta pb", rs_meta.ShortDebugString());
+            return -1;
+        }
+        recycle_rowsets_number += 1;
+        recycle_segments_number += rs_meta.num_segments();
+        recycle_rowsets_data_size += rs_meta.data_disk_size();
+        recycle_rowsets_index_size += rs_meta.index_disk_size();
+        max_rowset_version = std::max(max_rowset_version, rs_meta.end_version());
+        min_rowset_creation_time = std::min(min_rowset_creation_time, rs_meta.creation_time());
+        max_rowset_creation_time = std::max(max_rowset_creation_time, rs_meta.creation_time());
+        min_rowset_expiration_time = std::min(min_rowset_expiration_time, rs_meta.txn_expiration());
+        max_rowset_expiration_time = std::max(max_rowset_expiration_time, rs_meta.txn_expiration());
+        resource_ids.emplace(rs_meta.resource_id());
+    }
+
+    LOG_INFO("recycle tablet start to delete object")
+            .tag("instance id", instance_id_)
+            .tag("tablet id", tablet_id)
+            .tag("recycle tablet resource ids are",
+                 std::accumulate(resource_ids.begin(), resource_ids.end(), std::string(),
+                                 [](const std::string& a, const std::string& b) {
+                                     return a.empty() ? b : a + "," + b;
+                                 }));
 
     SyncExecutor<int> concurrent_delete_executor(
             _thread_pool_group.s3_producer_pool,
@@ -1631,16 +1726,20 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
             [](const int& ret) { return ret != 0; });
 
     // delete all rowset data in this tablet
-    for (auto& [_, accessor] : accessor_map_) {
-        concurrent_delete_executor.add([&, accessor_ptr = &accessor]() {
-            if ((*accessor_ptr)->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
+    // ATTN: there may be a data leak if not all accessors initialized successfully;
+    //       partial data is deleted if the tablet is stored across storage vaults
+    //       vault id is not attached to TabletMeta...
+    for (const auto& resource_id : resource_ids) {
+        concurrent_delete_executor.add([&, accessor_ptr = accessor_map_[resource_id]]() {
+            if (accessor_ptr->delete_directory(tablet_path_prefix(tablet_id)) != 0) {
                 LOG(WARNING) << "failed to delete rowset data of tablet " << tablet_id
-                             << " s3_path=" << accessor->uri();
+                             << " path=" << accessor_ptr->uri();
                 return -1;
             }
             return 0;
         });
     }
+
     bool finished = true;
     std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
     for (int r : rets) {
@@ -1651,6 +1750,40 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id) {
 
     ret = finished ? ret : -1;
 
+    if (ret != 0) { // failed to recycle tablet data
+        LOG_WARNING("failed to recycle tablet data")
+                .tag("finished", finished)
+                .tag("ret", ret)
+                .tag("instance_id", instance_id_)
+                .tag("tablet_id", tablet_id);
+        return ret;
+    }
+
+    txn.reset();
+    if (txn_kv_->create_txn(&txn) != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to recycle tablet ")
+                .tag("tablet id", tablet_id)
+                .tag("instance_id", instance_id_)
+                .tag("reason", "failed to create txn");
+        ret = -1;
+    }
+    // delete all rowset kv in this tablet
+    txn->remove(rs_key0, rs_key1);
+    txn->remove(recyc_rs_key0, recyc_rs_key1);
+
+    // remove delete bitmap for MoW table
+    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
+    txn->remove(pending_key);
+    std::string delete_bitmap_start = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
+    std::string delete_bitmap_end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
+    txn->remove(delete_bitmap_start, delete_bitmap_end);
+
+    TxnErrorCode err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to delete rowset kv of tablet " << tablet_id << ", err=" << err;
+        ret = -1;
+    }
+
     if (ret == 0) {
         // All object files under tablet have been deleted
         std::lock_guard lock(recycled_tablets_mtx_);
@@ -2233,7 +2366,7 @@ int InstanceRecycler::abort_timeout_txn() {
             txn_info.set_status(TxnStatusPB::TXN_STATUS_ABORTED);
             txn_info.set_finish_time(current_time);
             txn_info.set_reason("timeout");
-            VLOG_DEBUG << "txn_info=" << txn_info.DebugString();
+            VLOG_DEBUG << "txn_info=" << txn_info.ShortDebugString();
             txn_inf_val.clear();
             if (!txn_info.SerializeToString(&txn_inf_val)) {
                 LOG_WARNING("failed to serialize txn info").tag("key", hex(k));
@@ -2911,7 +3044,7 @@ int InstanceRecycler::recycle_expired_stage_objects() {
         const auto& old_obj = instance_info_.obj_info()[idx - 1];
         auto s3_conf = S3Conf::from_obj_store_info(old_obj);
         if (!s3_conf) {
-            LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.DebugString();
+            LOG(WARNING) << "failed to init s3_conf with obj_info=" << old_obj.ShortDebugString();
             continue;
         }
 
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index e38d25aaa84..42ab8f629cb 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -36,6 +36,7 @@
 #include "meta-service/mem_txn_kv.h"
 #include "meta-service/meta_service.h"
 #include "meta-service/txn_kv_error.h"
+#include "mock_accessor.h"
 #include "mock_resource_manager.h"
 #include "rate-limiter/rate_limiter.h"
 #include "recycler/checker.h"
@@ -3263,4 +3264,139 @@ TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {
     }
 }
 
+TEST(RecyclerTest, init_vault_accessor_failed_test) {
+    auto* sp = SyncPoint::get_instance();
+    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
+        sp->clear_all_call_backs();
+        sp->clear_trace();
+        sp->disable_processing();
+    });
+
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    EXPECT_EQ(txn_kv->init(), 0);
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string key;
+    std::string val;
+
+    InstanceKeyInfo key_info {"test_instance"};
+    instance_key(key_info, &key);
+    InstanceInfoPB instance;
+    instance.set_instance_id("GetObjStoreInfoTestInstance");
+    // failed to init because S3Conf::from_obj_store_info() fails
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("test_failed_s3_vault_1");
+        vault.set_id("failed_s3_1");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString());
+    }
+
+    // succeeds to init but is unusable
+    {
+        ObjectStoreInfoPB obj_info;
+        StorageVaultPB vault;
+        obj_info.set_id("id");
+        obj_info.set_ak("ak");
+        obj_info.set_sk("sk");
+        obj_info.set_provider(ObjectStoreInfoPB_Provider_COS);
+        vault.mutable_obj_info()->MergeFrom(obj_info);
+        vault.set_name("test_failed_s3_vault_2");
+        vault.set_id("failed_s3_2");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "2"}), vault.SerializeAsString());
+    }
+
+    // failed to init because accessor->init() fails
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("test_failed_hdfs_vault_1");
+        vault.set_id("failed_hdfs_1");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "3"}), vault.SerializeAsString());
+    }
+
+    auto accessor = std::make_shared<MockAccessor>();
+    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
+    sp->set_call_back(
+            "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) {
+                auto* map = try_any_cast<
+                        std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>(
+                        args[0]);
+                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
+                if (vault->name() == "test_success_hdfs_vault") {
+                    map->emplace(vault->id(), accessor);
+                }
+            });
+    sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) {
+        auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
+        auto* rs = resp->add_rowset_meta();
+        rs->set_resource_id("failed_s3_2");
+        rs = resp->add_rowset_meta();
+        rs->set_resource_id("success_vault");
+    });
+    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+        auto* ret = try_any_cast_ret<int>(args);
+        ret->first = -1;
+        ret->second = true;
+    });
+
+    sp->enable_processing();
+
+    // succeed to init MockAccessor
+    {
+        HdfsBuildConf hdfs_build_conf;
+        StorageVaultPB vault;
+        hdfs_build_conf.set_fs_name("fs_name");
+        hdfs_build_conf.set_user("root");
+        HdfsVaultInfo hdfs_info;
+        hdfs_info.set_prefix("root_path");
+        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
+        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
+        vault.set_name("test_success_hdfs_vault");
+        vault.set_id("success_vault");
+        instance.add_storage_vault_names(vault.name());
+        instance.add_resource_ids(vault.id());
+        instance.set_instance_id("GetObjStoreInfoTestInstance");
+        txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString());
+    }
+    val = instance.SerializeAsString();
+    txn->put(key, val);
+    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);
+
+    InstanceRecycler recycler(txn_kv, instance, thread_group,
+                              std::make_shared<TxnLazyCommitter>(txn_kv));
+    EXPECT_EQ(recycler.init(), 0);
+    EXPECT_EQ(recycler.accessor_map_.size(), 2);
+
+    // unusable obj accessor
+    EXPECT_EQ(recycler.accessor_map_.at("failed_s3_2")->exists("data/0/test.csv"), -1);
+    // useful mock accessor
+    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0);
+
+    // recycle_tablet will fail because the unusable obj accessor cannot connect
+    EXPECT_EQ(recycler.recycle_tablet(0), -1);
+    // however, the usable mock accessor has deleted its tablet data
+    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 1);
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
