This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 789aa993442 [improve] (http api) Support calculating file information in the cloud(#37880) (#38626) 789aa993442 is described below commit 789aa993442c6c012d2886219f7e72f5c640a419 Author: Sun Chenyang <csun5...@gmail.com> AuthorDate: Thu Aug 1 10:36:51 2024 +0800 [improve] (http api) Support calculating file information in the cloud(#37880) (#38626) ## Proposed changes pick from master #37880 Issue Number: close #xxx --- be/src/cloud/cloud_tablet.h | 12 ----- be/src/http/action/calc_file_crc_action.cpp | 25 ++++++++--- be/src/http/action/calc_file_crc_action.h | 6 +-- be/src/olap/base_tablet.cpp | 32 ++++++++++++++ be/src/olap/base_tablet.h | 15 +++++++ be/src/olap/rowset/beta_rowset.cpp | 51 ++++++++++------------ be/src/olap/rowset/beta_rowset.h | 2 +- be/src/olap/tablet.cpp | 32 -------------- be/src/olap/tablet.h | 14 ------ be/src/service/http_service.cpp | 4 ++ .../test_calc_crc_fault_injection.groovy | 11 ++--- .../test_schema_change_storage_format.groovy | 3 -- .../test_variant_index_format_v1.groovy | 3 -- 13 files changed, 101 insertions(+), 109 deletions(-) diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 2e6938444d1..10ff1835e6c 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -147,18 +147,6 @@ public: std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction(); - void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, - bool include_stale = false) { - std::shared_lock rlock(_meta_lock); - for (auto& [v, rs] : _rs_version_map) { - visitor(rs); - } - if (!include_stale) return; - for (auto& [v, rs] : _stale_rs_version_map) { - visitor(rs); - } - } - inline Version max_version() const { std::shared_lock rdlock(_meta_lock); return _tablet_meta->max_version(); diff --git a/be/src/http/action/calc_file_crc_action.cpp b/be/src/http/action/calc_file_crc_action.cpp index c713184ddfd..66ec96a2a9a 
100644 --- a/be/src/http/action/calc_file_crc_action.cpp +++ b/be/src/http/action/calc_file_crc_action.cpp @@ -25,6 +25,7 @@ #include <exception> #include <string> +#include "cloud/cloud_storage_engine.h" #include "common/logging.h" #include "common/status.h" #include "http/http_channel.h" @@ -38,7 +39,7 @@ namespace doris { using namespace ErrorCode; -CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, StorageEngine& engine, +CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engine, TPrivilegeHier::type hier, TPrivilegeType::type ptype) : HttpHandlerWithAuth(exec_env, hier, ptype), _engine(engine) {} @@ -58,16 +59,28 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value return Status::InternalError("convert tablet id or failed, {}", e.what()); } - TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id); + BaseTabletSPtr tablet = nullptr; + + if (auto cloudEngine = dynamic_cast<CloudStorageEngine*>(&_engine)) { + tablet = DORIS_TRY(cloudEngine->get_tablet(tablet_id)); + // sync all rowsets + RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(-1)); + } else if (auto storageEngine = dynamic_cast<StorageEngine*>(&_engine)) { + auto tabletPtr = storageEngine->tablet_manager()->get_tablet(tablet_id); + tablet = std::dynamic_pointer_cast<Tablet>(tabletPtr); + } else { + return Status::InternalError("convert _engine failed"); + } + if (tablet == nullptr) { - return Status::NotFound("Tablet not found. 
tablet_id={}", tablet_id); + return Status::NotFound("failed to get tablet {}", tablet_id); } const auto& req_start_version = req->param(PARAM_START_VERSION); const auto& req_end_version = req->param(PARAM_END_VERSION); *start_version = 0; - *end_version = tablet->max_version().second; + *end_version = tablet->max_version_unlocked(); if (!req_start_version.empty()) { try { @@ -85,8 +98,8 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value } } - auto st = tablet->calc_local_file_crc(crc_value, *start_version, *end_version, rowset_count, - file_count); + auto st = tablet->calc_file_crc(crc_value, *start_version, *end_version, rowset_count, + file_count); if (!st.ok()) { return st; } diff --git a/be/src/http/action/calc_file_crc_action.h b/be/src/http/action/calc_file_crc_action.h index 2c0d19f0ca0..30df8bfe629 100644 --- a/be/src/http/action/calc_file_crc_action.h +++ b/be/src/http/action/calc_file_crc_action.h @@ -26,7 +26,7 @@ namespace doris { class HttpRequest; -class StorageEngine; +class BaseStorageEngine; class ExecEnv; const std::string PARAM_START_VERSION = "start_version"; @@ -35,7 +35,7 @@ const std::string PARAM_END_VERSION = "end_version"; // This action is used to calculate the crc value of the files in the tablet. 
class CalcFileCrcAction : public HttpHandlerWithAuth { public: - CalcFileCrcAction(ExecEnv* exec_env, StorageEngine& engine, TPrivilegeHier::type hier, + CalcFileCrcAction(ExecEnv* exec_env, BaseStorageEngine& engine, TPrivilegeHier::type hier, TPrivilegeType::type ptype); ~CalcFileCrcAction() override = default; @@ -47,7 +47,7 @@ private: int64_t* end_version, int32_t* rowset_count, int64_t* file_count); private: - StorageEngine& _engine; + BaseStorageEngine& _engine; }; } // end namespace doris diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index 71ece1599d6..c4330667dfc 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -33,6 +33,7 @@ #include "olap/txn_manager.h" #include "service/point_query_executor.h" #include "util/bvar_helper.h" +#include "util/crc32c.h" #include "util/debug_points.h" #include "util/doris_metrics.h" #include "vec/common/schema_util.h" @@ -1555,4 +1556,35 @@ void BaseTablet::calc_consecutive_empty_rowsets( } } +Status BaseTablet::calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version, + int32_t* rowset_count, int64_t* file_count) { + Version v(start_version, end_version); + std::vector<RowsetSharedPtr> rowsets; + traverse_rowsets([&rowsets, &v](const auto& rs) { + // get all rowsets + if (v.contains(rs->version())) { + rowsets.emplace_back(rs); + } + }); + std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); + *rowset_count = rowsets.size(); + + *crc_value = 0; + *file_count = 0; + for (const auto& rs : rowsets) { + uint32_t rs_crc_value = 0; + int64_t rs_file_count = 0; + auto rowset = std::static_pointer_cast<BetaRowset>(rs); + auto st = rowset->calc_file_crc(&rs_crc_value, &rs_file_count); + if (!st.ok()) { + return st; + } + // crc_value is calculated based on the crc_value of each rowset. 
+ *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const char*>(&rs_crc_value), + sizeof(rs_crc_value)); + *file_count += rs_file_count; + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index cefb31ccd11..fc75b5e31fd 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -256,6 +256,21 @@ public: // Return the merged schema of all rowsets virtual TabletSchemaSPtr merged_tablet_schema() const { return _max_version_schema; } + void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, + bool include_stale = false) { + std::shared_lock rlock(_meta_lock); + for (auto& [v, rs] : _rs_version_map) { + visitor(rs); + } + if (!include_stale) return; + for (auto& [v, rs] : _stale_rs_version_map) { + visitor(rs); + } + } + + Status calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version, + int32_t* rowset_count, int64_t* file_count); + protected: // Find the missed versions until the spec_version. // diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index d16c1146142..992d437da4e 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -636,54 +636,51 @@ Status BetaRowset::add_to_binlog() { return Status::OK(); } -Status BetaRowset::calc_local_file_crc(uint32_t* crc_value, int64_t* file_count) { - if (!is_local()) { - DCHECK(false) << _rowset_meta->tablet_id() << ' ' << rowset_id(); - return Status::OK(); - } - +Status BetaRowset::calc_file_crc(uint32_t* crc_value, int64_t* file_count) { + const auto& fs = _rowset_meta->fs(); + DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_file_crc", + { return Status::Error<OS_ERROR>("fault_inject calc_file_crc error"); }); if (num_segments() < 1) { *crc_value = 0x92a8fc17; // magic code from crc32c table return Status::OK(); } // 1. 
pick up all the files including dat file and idx file - std::vector<io::Path> local_paths; - for (int i = 0; i < num_segments(); ++i) { - auto local_seg_path = local_segment_path(_tablet_path, rowset_id().to_string(), i); - local_paths.emplace_back(local_seg_path); + std::vector<io::Path> file_paths; + for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { + auto seg_path = DORIS_TRY(segment_path(seg_id)); + file_paths.emplace_back(seg_path); if (_schema->get_inverted_index_storage_format() == InvertedIndexStorageFormatPB::V1) { for (auto& column : _schema->columns()) { const TabletIndex* index_meta = _schema->get_inverted_index(*column); if (index_meta) { - std::string local_inverted_index_file = + std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v1( - InvertedIndexDescriptor::get_index_file_path_prefix( - local_seg_path), + InvertedIndexDescriptor::get_index_file_path_prefix(seg_path), index_meta->index_id(), index_meta->get_index_suffix()); - local_paths.emplace_back(std::move(local_inverted_index_file)); + file_paths.emplace_back(std::move(inverted_index_file)); } } } else { if (_schema->has_inverted_index()) { - std::string local_inverted_index_file = - InvertedIndexDescriptor::get_index_file_path_v2( - InvertedIndexDescriptor::get_index_file_path_prefix( - local_seg_path)); - local_paths.emplace_back(std::move(local_inverted_index_file)); + std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_path_v2( + InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)); + file_paths.emplace_back(std::move(inverted_index_file)); } } } + *crc_value = 0; + *file_count = file_paths.size(); + if (!is_local()) { + return Status::OK(); + } // 2. 
calculate the md5sum of each file const auto& local_fs = io::global_local_filesystem(); - DCHECK(!local_paths.empty()); + DCHECK(!file_paths.empty()); std::vector<std::string> all_file_md5; - all_file_md5.reserve(local_paths.size()); - for (const auto& file_path : local_paths) { - DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_local_file_crc", { - return Status::Error<OS_ERROR>("fault_inject calc_local_file_crc error"); - }); + all_file_md5.reserve(file_paths.size()); + for (const auto& file_path : file_paths) { std::string file_md5sum; auto status = local_fs->md5sum(file_path, &file_md5sum); if (!status.ok()) { @@ -696,9 +693,7 @@ Status BetaRowset::calc_local_file_crc(uint32_t* crc_value, int64_t* file_count) std::sort(all_file_md5.begin(), all_file_md5.end()); // 3. calculate the crc_value based on all_file_md5 - DCHECK(local_paths.size() == all_file_md5.size()); - *crc_value = 0; - *file_count = local_paths.size(); + DCHECK(file_paths.size() == all_file_md5.size()); for (auto& i : all_file_md5) { *crc_value = crc32c::Extend(*crc_value, i.data(), i.size()); } diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index bf7daf8bdfa..238073f066d 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -84,7 +84,7 @@ public: [[nodiscard]] virtual Status add_to_binlog() override; - Status calc_local_file_crc(uint32_t* crc_value, int64_t* file_count); + Status calc_file_crc(uint32_t* crc_value, int64_t* file_count); protected: BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta, diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index d8919fba417..1a1d3be6bc9 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -108,7 +108,6 @@ #include "service/point_query_executor.h" #include "tablet.h" #include "util/bvar_helper.h" -#include "util/crc32c.h" #include "util/debug_points.h" #include "util/defer_op.h" #include "util/doris_metrics.h" @@ -2652,35 +2651,4 @@ 
void Tablet::clear_cache() { recycle_segment_cache(stale_rowset_map()); } -Status Tablet::calc_local_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version, - int32_t* rowset_count, int64_t* file_count) { - Version v(start_version, end_version); - std::vector<RowsetSharedPtr> rowsets; - traverse_rowsets([&rowsets, &v](const auto& rs) { - // get local rowsets - if (rs->is_local() && v.contains(rs->version())) { - rowsets.emplace_back(rs); - } - }); - std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); - *rowset_count = rowsets.size(); - - *crc_value = 0; - *file_count = 0; - for (const auto& rs : rowsets) { - uint32_t rs_crc_value; - int64_t rs_file_count = 0; - auto rowset = std::static_pointer_cast<BetaRowset>(rs); - auto st = rowset->calc_local_file_crc(&rs_crc_value, &rs_file_count); - if (!st.ok()) { - return st; - } - // crc_value is calculated based on the crc_value of each rowset. - *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const char*>(&rs_crc_value), - sizeof(rs_crc_value)); - *file_count += rs_file_count; - } - return Status::OK(); -} - } // namespace doris diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 759e3e65614..fa11c2d8685 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -421,18 +421,6 @@ public: int64_t start = -1); bool should_skip_compaction(CompactionType compaction_type, int64_t now); - void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor, - bool include_stale = false) { - std::shared_lock rlock(_meta_lock); - for (auto& [v, rs] : _rs_version_map) { - visitor(rs); - } - if (!include_stale) return; - for (auto& [v, rs] : _stale_rs_version_map) { - visitor(rs); - } - } - std::vector<std::string> get_binlog_filepath(std::string_view binlog_version) const; std::pair<std::string, int64_t> get_binlog_info(std::string_view binlog_version) const; std::string get_rowset_binlog_meta(std::string_view binlog_version, @@ -483,8 +471,6 @@ public: } inline bool 
is_full_compaction_running() const { return _is_full_compaction_running; } void clear_cache() override; - Status calc_local_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version, - int32_t* rowset_count, int64_t* file_count); private: Status _init_once_action(); diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 0be4dbff832..86862e4dbc9 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -405,6 +405,10 @@ void HttpService::register_cloud_handler(CloudStorageEngine& engine) { clear_file_cache_action); auto* show_hotspot_action = _pool.add(new ShowHotspotAction(engine)); _ev_http_server->register_handler(HttpMethod::GET, "/api/hotspot/tablet", show_hotspot_action); + + CalcFileCrcAction* calc_crc_action = _pool.add( + new CalcFileCrcAction(_env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); + _ev_http_server->register_handler(HttpMethod::GET, "/api/calc_crc", calc_crc_action); } // NOLINTEND(readability-function-size) diff --git a/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy index d20e7079780..e238aa48b47 100644 --- a/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy @@ -18,9 +18,6 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_calc_crc", "nonConcurrent") { - if (isCloudMode()) { - return; - } def calc_file_crc_on_tablet = { ip, port, tablet -> return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) } @@ -79,12 +76,12 @@ suite("test_calc_crc", "nonConcurrent") { assertEquals("12", parseJson(out_0.trim()).file_count) try { - GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::calc_local_file_crc") + 
GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::calc_file_crc") def (code_1, out_1, err_1) = calc_file_crc_on_tablet(ip, port, tablet_id) logger.info("Run calc_file_crc_on_tablet: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1) - assertTrue(out_1.contains("fault_inject calc_local_file_crc error")) + assertTrue(out_1.contains("fault_inject calc_file_crc error")) } finally { - GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::calc_local_file_crc") + GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::calc_file_crc") } def (code_2, out_2, err_2) = calc_file_crc_on_tablet_with_start(ip, port, tablet_id, 0) @@ -125,7 +122,7 @@ suite("test_calc_crc", "nonConcurrent") { def (code_6, out_6, err_6) = calc_file_crc_on_tablet(ip, port, 123) logger.info("Run calc_file_crc_on_tablet: code=" + code_6 + ", out=" + out_6 + ", err=" + err_6) - assertTrue(out_6.contains("Tablet not found.")) + assertTrue(out_6.contains("failed to get tablet")) sql "DROP TABLE IF EXISTS ${tableName}" } diff --git a/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy b/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy index a4ae74bd80a..fbccf0f8a62 100644 --- a/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy +++ b/regression-test/suites/inverted_index_p0/storage_format/test_schema_change_storage_format.groovy @@ -16,9 +16,6 @@ // under the License. 
suite("test_local_schema_change_storge_format", "p0") { - if (isCloudMode()) { - return; - } def calc_file_crc_on_tablet = { ip, port, tablet -> return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) } diff --git a/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy b/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy index 627ed987e3a..153e8b82f56 100644 --- a/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy +++ b/regression-test/suites/inverted_index_p0/test_variant_index_format_v1.groovy @@ -16,9 +16,6 @@ // under the License. suite("test_variant_index_format_v1", "p0") { - if (isCloudMode()) { - return; - } def calc_file_crc_on_tablet = { ip, port, tablet -> return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet)) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org