This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3b19178d6cc01fe6347dc6e9774c20115cdf3648 Author: airborne12 <airborn...@gmail.com> AuthorDate: Thu Jul 25 16:33:34 2024 +0800 [Fix](recycler) recycler need index_suffix and inverted index storage format to delete idx file (#38306) ## Proposed changes Recycler did not correctly delete idx file for inverted index format V2 and variant data type. --- cloud/src/recycler/recycler.cpp | 74 ++++++++++++++++++++++++++++------------- cloud/src/recycler/recycler.h | 2 ++ cloud/src/recycler/util.h | 15 +++++++-- cloud/test/recycler_test.cpp | 52 +++++++++++++++++++++-------- 4 files changed, 103 insertions(+), 40 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index b4c0ae84fc3..204ff8d2f18 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -363,7 +363,7 @@ public: : instance_id_(std::move(instance_id)), txn_kv_(std::move(txn_kv)) {} // Return 0 if success, 1 if schema kv not found, negative for error - int get(int64_t index_id, int32_t schema_version, std::vector<int64_t>& res) { + int get(int64_t index_id, int32_t schema_version, InvertedIndexInfo& res) { { std::lock_guard lock(mtx_); if (schemas_without_inverted_index_.count({index_id, schema_version})) { @@ -395,10 +395,13 @@ public: LOG(WARNING) << "malformed schema value, key=" << hex(schema_key); return -1; } - if (schema.index_size() > 0) { - res.reserve(schema.index_size()); + if (schema.index_size() > 0 && schema.has_inverted_index_storage_format()) { + res.first = schema.inverted_index_storage_format(); + res.second.reserve(schema.index_size()); for (auto& i : schema.index()) { - res.push_back(i.index_id()); + if (i.has_index_type() && i.index_type() == IndexType::INVERTED) { + res.second.push_back(std::make_pair(i.index_id(), i.index_suffix_name())); + } } } insert(index_id, schema_version, res); @@ -406,15 +409,15 @@ public: } // Empty `ids` means this schema has no inverted index - void insert(int64_t index_id, int32_t 
schema_version, const std::vector<int64_t>& ids) { - if (ids.empty()) { + void insert(int64_t index_id, int32_t schema_version, const InvertedIndexInfo& index_info) { + if (index_info.second.empty()) { TEST_SYNC_POINT("InvertedIndexIdCache::insert1"); std::lock_guard lock(mtx_); schemas_without_inverted_index_.emplace(index_id, schema_version); } else { TEST_SYNC_POINT("InvertedIndexIdCache::insert2"); std::lock_guard lock(mtx_); - inverted_index_id_map_.try_emplace({index_id, schema_version}, ids); + inverted_index_id_map_.try_emplace({index_id, schema_version}, index_info); } } @@ -433,7 +436,7 @@ private: } }; // <index_id, schema_version> -> inverted_index_ids - std::unordered_map<Key, std::vector<int64_t>, HashOfKey> inverted_index_id_map_; + std::unordered_map<Key, InvertedIndexInfo, HashOfKey> inverted_index_id_map_; // Store <index_id, schema_version> of schema which doesn't have inverted index std::unordered_set<Key, HashOfKey> schemas_without_inverted_index_; }; @@ -1197,17 +1200,26 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta const auto& rowset_id = rs_meta_pb.rowset_id_v2(); int64_t tablet_id = rs_meta_pb.tablet_id(); // process inverted indexes - std::vector<int64_t> index_ids; + std::vector<std::pair<int64_t, std::string>> index_ids; index_ids.reserve(rs_meta_pb.tablet_schema().index_size()); for (auto& i : rs_meta_pb.tablet_schema().index()) { - index_ids.push_back(i.index_id()); + if (i.has_index_type() && i.index_type() == IndexType::INVERTED) { + index_ids.push_back(std::make_pair(i.index_id(), i.index_suffix_name())); + } } std::vector<std::string> file_paths; - file_paths.reserve(num_segments * (1 + index_ids.size())); + auto tablet_schema = rs_meta_pb.tablet_schema(); for (int64_t i = 0; i < num_segments; ++i) { file_paths.push_back(segment_path(tablet_id, rowset_id, i)); - for (int64_t index_id : index_ids) { - file_paths.push_back(inverted_index_path(tablet_id, rowset_id, i, index_id)); + if 
(tablet_schema.has_inverted_index_storage_format()) { + if (tablet_schema.inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { + for (const auto& index_id : index_ids) { + file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i, + index_id.first, index_id.second)); + } + } else if (!index_ids.empty()) { + file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i)); + } } } // TODO(AlexYue): seems could do do batch @@ -1218,7 +1230,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou int ret = 0; // resource_id -> file_paths std::map<std::string, std::vector<std::string>> resource_file_paths; - for (auto& rs : rowsets) { + + for (const auto& rs : rowsets) { { std::lock_guard lock(recycled_tablets_mtx_); if (recycled_tablets_.count(rs.tablet_id())) { @@ -1241,14 +1254,21 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou int64_t num_segments = rs.num_segments(); if (num_segments <= 0) continue; - // process inverted indexes - std::vector<int64_t> index_ids; + // Process inverted indexes + std::vector<std::pair<int64_t, std::string>> index_ids; + // default format as v2. 
+ InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V2; + if (rs.has_tablet_schema()) { - index_ids.reserve(rs.tablet_schema().index().size()); - for (auto& index_pb : rs.tablet_schema().index()) { - index_ids.push_back(index_pb.index_id()); + for (const auto& index : rs.tablet_schema().index()) { + if (index.has_index_type() && index.index_type() == IndexType::INVERTED) { + index_ids.emplace_back(index.index_id(), index.index_suffix_name()); + } + } + if (rs.tablet_schema().has_inverted_index_storage_format()) { + index_format = rs.tablet_schema().inverted_index_storage_format(); } - } else { // Detached schema + } else { if (!rs.has_index_id() || !rs.has_schema_version()) { LOG(WARNING) << "rowset must have either schema or schema_version and index_id, " "instance_id=" @@ -1257,8 +1277,9 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou ret = -1; continue; } + InvertedIndexInfo index_info; int get_ret = - inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_ids); + inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_info); if (get_ret != 0) { if (get_ret == 1) { // Schema kv not found // Check tablet existence @@ -1276,11 +1297,18 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou ret = -1; continue; } + index_format = index_info.first; + index_ids = std::move(index_info.second); } for (int64_t i = 0; i < num_segments; ++i) { file_paths.push_back(segment_path(tablet_id, rowset_id, i)); - for (int64_t index_id : index_ids) { - file_paths.push_back(inverted_index_path(tablet_id, rowset_id, i, index_id)); + if (index_format == InvertedIndexStorageFormatPB::V1) { + for (const auto& index_id : index_ids) { + file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i, + index_id.first, index_id.second)); + } + } else if (!index_ids.empty()) { + file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i)); } } } diff --git 
a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 130d860849d..1085d9362d5 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -206,6 +206,8 @@ private: // TODO(plat1ko): Add new accessor to map in runtime for new created storage vaults std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>> accessor_map_; + using InvertedIndexInfo = + std::pair<InvertedIndexStorageFormatPB, std::vector<std::pair<int64_t, std::string>>>; class InvertedIndexIdCache; std::unique_ptr<InvertedIndexIdCache> inverted_index_id_cache_; diff --git a/cloud/src/recycler/util.h b/cloud/src/recycler/util.h index 20ea66def8c..6c62bcaf7b0 100644 --- a/cloud/src/recycler/util.h +++ b/cloud/src/recycler/util.h @@ -56,9 +56,18 @@ inline std::string segment_path(int64_t tablet_id, const std::string& rowset_id, return fmt::format("data/{}/{}_{}.dat", tablet_id, rowset_id, segment_id); } -inline std::string inverted_index_path(int64_t tablet_id, const std::string& rowset_id, - int64_t segment_id, int64_t index_id) { - return fmt::format("data/{}/{}_{}_{}.idx", tablet_id, rowset_id, segment_id, index_id); +inline std::string inverted_index_path_v2(int64_t tablet_id, const std::string& rowset_id, + int64_t segment_id) { + return fmt::format("data/{}/{}_{}.idx", tablet_id, rowset_id, segment_id); +} + +inline std::string inverted_index_path_v1(int64_t tablet_id, const std::string& rowset_id, + int64_t segment_id, int64_t index_id, + std::string_view index_path_suffix) { + std::string suffix = + index_path_suffix.empty() ? 
"" : std::string {"@"} + index_path_suffix.data(); + return fmt::format("data/{}/{}_{}_{}{}.idx", tablet_id, rowset_id, segment_id, index_id, + suffix); } inline std::string rowset_path_prefix(int64_t tablet_id, const std::string& rowset_id) { diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index d8da067457b..3f9d1175746 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -149,8 +149,8 @@ static int create_recycle_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i); accessor->put_file(path, ""); for (auto& index : rowset.tablet_schema().index()) { - auto path = inverted_index_path(rowset.tablet_id(), rowset.rowset_id_v2(), i, - index.index_id()); + auto path = inverted_index_path_v1(rowset.tablet_id(), rowset.rowset_id_v2(), i, + index.index_id(), index.index_suffix_name()); accessor->put_file(path, ""); } } @@ -187,8 +187,8 @@ static int create_tmp_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor, auto path = segment_path(rowset.tablet_id(), rowset.rowset_id_v2(), i); accessor->put_file(path, path); for (auto& index : rowset.tablet_schema().index()) { - auto path = inverted_index_path(rowset.tablet_id(), rowset.rowset_id_v2(), i, - index.index_id()); + auto path = inverted_index_path_v1(rowset.tablet_id(), rowset.rowset_id_v2(), i, + index.index_id(), index.index_suffix_name()); accessor->put_file(path, path); } } @@ -234,7 +234,7 @@ static int create_committed_rowset(TxnKv* txn_kv, StorageVaultAccessor* accessor auto path = segment_path(tablet_id, rowset_id, i); accessor->put_file(path, ""); for (int j = 0; j < num_inverted_indexes; ++j) { - auto path = inverted_index_path(tablet_id, rowset_id, i, j); + auto path = inverted_index_path_v1(tablet_id, rowset_id, i, j, ""); accessor->put_file(path, ""); } } @@ -658,8 +658,11 @@ TEST(RecyclerTest, recycle_rowsets) { for (int i = 0; i < 5; ++i) { auto& schema = schemas.emplace_back(); 
schema.set_schema_version(i); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -737,8 +740,11 @@ TEST(RecyclerTest, bench_recycle_rowsets) { for (int i = 0; i < 5; ++i) { auto& schema = schemas.emplace_back(); schema.set_schema_version(i); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -799,9 +805,12 @@ TEST(RecyclerTest, recycle_tmp_rowsets) { std::vector<doris::TabletSchemaCloudPB> schemas; for (int i = 0; i < 5; ++i) { auto& schema = schemas.emplace_back(); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); schema.set_schema_version(i); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -860,7 +869,9 @@ TEST(RecyclerTest, recycle_tablet) { auto& schema = schemas.emplace_back(); schema.set_schema_version(i); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -932,8 +943,11 @@ TEST(RecyclerTest, recycle_indexes) { for (int i = 0; i < 5; ++i) { auto& schema = schemas.emplace_back(); schema.set_schema_version(i); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -1041,8 +1055,11 @@ TEST(RecyclerTest, recycle_partitions) { for (int i = 0; i < 5; ++i) { 
auto& schema = schemas.emplace_back(); schema.set_schema_version(i); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -1973,7 +1990,9 @@ TEST(RecyclerTest, recycle_deleted_instance) { auto& schema = schemas.emplace_back(); schema.set_schema_version(i); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -2241,7 +2260,9 @@ TEST(CheckerTest, DISABLED_abnormal_inverted_check) { auto& schema = schemas.emplace_back(); schema.set_schema_version(i); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } @@ -2525,8 +2546,11 @@ TEST(RecyclerTest, delete_rowset_data) { for (int i = 0; i < 5; ++i) { auto& schema = schemas.emplace_back(); schema.set_schema_version(i); + schema.set_inverted_index_storage_format(InvertedIndexStorageFormatPB::V1); for (int j = 0; j < i; ++j) { - schema.add_index()->set_index_id(j); + auto index = schema.add_index(); + index->set_index_id(j); + index->set_index_type(IndexType::INVERTED); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org