This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 4820ba2d41d [feat](cloud) Add injection point http api for ms (#41982) (#47204) 4820ba2d41d is described below commit 4820ba2d41dbb4251f606425d18448f78d6bca49 Author: Gavin Chou <ga...@selectdb.com> AuthorDate: Mon Jan 20 01:48:08 2025 +0800 [feat](cloud) Add injection point http api for ms (#41982) (#47204) pick #41982 Co-authored-by: Lei Zhang <27994433+swjtu-zhang...@users.noreply.github.com> --- build.sh | 1 + cloud/CMakeLists.txt | 4 + cloud/src/meta-service/CMakeLists.txt | 1 + cloud/src/meta-service/injection_point_http.cpp | 226 +++++++++++++++++++++ cloud/src/meta-service/meta_service.cpp | 2 +- cloud/src/meta-service/meta_service_http.cpp | 4 + cloud/src/meta-service/meta_service_txn.cpp | 2 +- cloud/src/meta-service/txn_lazy_committer.cpp | 5 +- common/cpp/sync_point.h | 2 +- .../apache/doris/cloud/catalog/CloudPartition.java | 14 +- 10 files changed, 248 insertions(+), 13 deletions(-) diff --git a/build.sh b/build.sh index 8f1262aa76b..42c3a09a950 100755 --- a/build.sh +++ b/build.sh @@ -632,6 +632,7 @@ if [[ "${BUILD_CLOUD}" -eq 1 ]]; then -DCMAKE_MAKE_PROGRAM="${MAKE_PROGRAM}" \ -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \ -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \ + -DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \ -DMAKE_TEST=OFF \ "${CMAKE_USE_CCACHE}" \ -DUSE_LIBCPP="${USE_LIBCPP}" \ diff --git a/cloud/CMakeLists.txt b/cloud/CMakeLists.txt index a8eccf60896..627e6f283b9 100644 --- a/cloud/CMakeLists.txt +++ b/cloud/CMakeLists.txt @@ -411,6 +411,10 @@ if (${MAKE_TEST} STREQUAL "ON") add_definitions(-DBE_TEST) endif () +if (ENABLE_INJECTION_POINT) + add_definitions(-DENABLE_INJECTION_POINT) +endif() + # Add libs if needed, download to current dir -- ${BUILD_DIR} set(FDB_LIB "fdb_lib_7_1_23.tar.xz") if (ARCH_AARCH64) diff --git a/cloud/src/meta-service/CMakeLists.txt b/cloud/src/meta-service/CMakeLists.txt index c7c4887a068..d11f87e7fa2 100644 --- a/cloud/src/meta-service/CMakeLists.txt +++ b/cloud/src/meta-service/CMakeLists.txt @@ -12,6 +12,7 @@ add_library(MetaService meta_server.cpp meta_service.cpp meta_service_http.cpp + injection_point_http.cpp meta_service_job.cpp meta_service_resource.cpp meta_service_schema.cpp diff --git a/cloud/src/meta-service/injection_point_http.cpp b/cloud/src/meta-service/injection_point_http.cpp new file mode 100644 index 00000000000..80d1bcfdf2e --- /dev/null +++ b/cloud/src/meta-service/injection_point_http.cpp @@ -0,0 +1,226 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <fmt/format.h> +#include <gen_cpp/cloud.pb.h> + +#include "common/config.h" +#include "common/logging.h" +#include "cpp/sync_point.h" +#include "meta-service/keys.h" +#include "meta-service/meta_service_helper.h" +#include "meta-service/txn_kv.h" +#include "meta-service/txn_kv_error.h" +#include "meta_service.h" +#include "meta_service_http.h" + +namespace doris::cloud { + +std::map<std::string, std::function<void()>> suite_map; +std::once_flag register_suites_once; + +inline std::default_random_engine make_random_engine() { + return std::default_random_engine( + static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count())); +} + +static void register_suites() { + suite_map.emplace("test_txn_lazy_commit", [] { + auto sp = SyncPoint::get_instance(); + + sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(100, 1000); + uint32_t duration_ms = u(rng); + LOG(INFO) << "commit_txn_immediately::advance_last_pending_txn_id sleep " << duration_ms + << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + }); + + sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(100, 1000); + uint32_t duration_ms = u(rng); + LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_submit sleep " << duration_ms + << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + }); + + sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(100, 1000); + uint32_t duration_ms = u(rng); + LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_wait sleep " << duration_ms + << " ms"; + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + }); + + sp->set_call_back("convert_tmp_rowsets::before_commit", [&](auto&& args) { + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(1, 50); + uint32_t duration_ms = u(rng); + std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); + LOG(INFO) << "convert_tmp_rowsets::before_commit sleep " << duration_ms << " ms"; + if (duration_ms <= 25) { + MetaServiceCode* code = try_any_cast<MetaServiceCode*>(args[0]); + *code = MetaServiceCode::KV_TXN_CONFLICT; + bool* pred = try_any_cast<bool*>(args.back()); + *pred = true; + LOG(INFO) << "convert_tmp_rowsets::before_commit random_value=" << duration_ms + << " inject kv txn conflict"; + } + }); + }); +} + +HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) { + std::string duration_str(http_query(uri, "duration")); + int64_t duration = 0; + try { + duration = std::stol(duration_str); + } catch (const std::exception& e) { + auto msg = fmt::format("invalid duration:{}", duration_str); + LOG(WARNING) << msg; + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg); + } + + auto sp = SyncPoint::get_instance(); + sp->set_call_back(point, [point, duration](auto&& args) { + LOG(INFO) << "injection point hit, point=" << point << " sleep ms=" << duration; + std::this_thread::sleep_for(std::chrono::milliseconds(duration)); + }); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse set_return(const std::string& point, const brpc::URI& uri) { + auto sp = SyncPoint::get_instance(); + sp->set_call_back(point, [point](auto&& args) { + try { + LOG(INFO) << "injection point hit, point=" << point << " return void"; + auto pred = try_any_cast<bool*>(args.back()); + *pred = true; + } catch (const std::bad_any_cast& e) { + LOG(ERROR) << "failed to process `return` e:" << e.what(); + } + }); + + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse handle_set(const brpc::URI& uri) { + const std::string point(http_query(uri, "name")); + if (point.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty point name"); + } + + const std::string behavior(http_query(uri, "behavior")); + if (behavior.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty behavior"); + } + if (behavior == "sleep") { + return set_sleep(point, uri); + } else if (behavior == "return") { + return set_return(point, uri); + } + + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown behavior: " + behavior); +} + +HttpResponse handle_clear(const brpc::URI& uri) { + const std::string point(http_query(uri, "name")); + auto* sp = SyncPoint::get_instance(); + LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" : point); + if (point.empty()) { + // If point name is emtpy, clear all + sp->clear_all_call_backs(); + return http_json_reply(MetaServiceCode::OK, "OK"); + } + sp->clear_call_back(point); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse handle_apply_suite(const brpc::URI& uri) { + const std::string suite(http_query(uri, "name")); + if (suite.empty()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty suite name"); + } + + std::call_once(register_suites_once, register_suites); + if (auto it = suite_map.find(suite); it != suite_map.end()) { + it->second(); // set injection callbacks + return http_json_reply(MetaServiceCode::OK, "OK apply suite " + suite + "\n"); + } + + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown suite: " + suite + "\n"); +} + +HttpResponse handle_enable(const brpc::URI& uri) { + SyncPoint::get_instance()->enable_processing(); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +HttpResponse handle_disable(const brpc::URI& uri) { + SyncPoint::get_instance()->disable_processing(); + return http_json_reply(MetaServiceCode::OK, "OK"); +} + +// +// enable/disable injection point +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=enable" +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=disable" +// ``` + +// clear all injection points +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear" +// ``` + +// apply/activate specific suite with registered action, see `register_suites()` for more details +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=apply_suite&name=${suite_name}" +// ``` + +// ``` +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set +// &name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs + +// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set +// &name=${injection_point_name}&behavior=return" # return void +// ``` + +HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl) { + auto& uri = ctrl->http_request().uri(); + LOG(INFO) << "handle InjectionPointAction uri:" << uri; + const std::string op(http_query(uri, "op")); + + if (op == "set") { + return handle_set(uri); + } else if (op == "clear") { + return handle_clear(uri); + } else if (op == "apply_suite") { + return handle_apply_suite(uri); + } else if (op == "enable") { + return handle_enable(uri); + } else if (op == "disable") { + return handle_disable(uri); + } + + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + op); +} +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 70fb53b7fb3..7914bf5db11 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -1537,7 +1537,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller, if (code != MetaServiceCode::OK) { LOG(WARNING) << "advance_last_txn failed last_txn=" << version_pb.pending_txn_ids(0) << " code=" << code - << "msg=" << msg; + << " msg=" << msg; return; } continue; diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp index 51352d31941..c6eb07010c4 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -181,6 +181,8 @@ static std::string_view remove_version_prefix(std::string_view path) { return path; } +HttpResponse process_injection_point(MetaServiceImpl* service, brpc::Controller* ctrl); + static HttpResponse process_alter_cluster(MetaServiceImpl* service, brpc::Controller* ctrl) { static std::unordered_map<std::string_view, AlterClusterRequest::Operation> operations { {"add_cluster", AlterClusterRequest::ADD_CLUSTER}, @@ -738,11 +740,13 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"show_meta_ranges", process_show_meta_ranges}, {"txn_lazy_commit", process_txn_lazy_commit}, {"fix_tablet_stats", process_fix_tablet_stats}, + {"injection_point", process_injection_point}, {"v1/decode_key", process_decode_key}, {"v1/encode_key", process_encode_key}, {"v1/get_value", process_get_value}, {"v1/show_meta_ranges", process_show_meta_ranges}, {"v1/txn_lazy_commit", process_txn_lazy_commit}, + {"v1/injection_point", process_injection_point}, // for get {"get_instance", process_get_instance_info}, // for get diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 74092080e2f..ce0c5a535e4 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1933,7 +1933,7 @@ void commit_txn_eventually( std::pair<MetaServiceCode, std::string> ret = task->wait(); if (ret.first != MetaServiceCode::OK) { LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " code=" << ret.first - << "msg=" << ret.second; + << " msg=" << ret.second; } std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index 0babd77bbda..9859c2b0ed1 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -21,6 +21,7 @@ #include "common/logging.h" #include "common/util.h" +#include "cpp/sync_point.h" #include "meta-service/keys.h" #include "meta-service/meta_service_helper.h" #include "meta-service/meta_service_tablet_stats.h" @@ -190,6 +191,7 @@ void convert_tmp_rowsets( if (code != MetaServiceCode::OK) return; } + TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", &code); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { code = cast_as<ErrCategory::COMMIT>(err); @@ -490,7 +492,8 @@ std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::wait() { sw.pause(); if (sw.elapsed_us() > 1000000) { LOG(INFO) << "txn_lazy_commit task wait more than 1000ms, cost=" << sw.elapsed_us() / 1000 - << " ms"; + << " ms" + << " txn_id=" << txn_id_; } return std::make_pair(this->code_, this->msg_); } diff --git a/common/cpp/sync_point.h b/common/cpp/sync_point.h index e3be626777c..25f8a5136cc 100644 --- a/common/cpp/sync_point.h +++ b/common/cpp/sync_point.h @@ -213,7 +213,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) { // TEST_SYNC_POINT is no op in release build. // Turn on this feature by defining the macro -#ifndef BE_TEST +#if !defined(BE_TEST) && !defined(ENABLE_INJECTION_POINT) # define TEST_SYNC_POINT(x) # define TEST_IDX_SYNC_POINT(x, index) # define TEST_SYNC_POINT_CALLBACK(x, ...) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index b2a9751394f..a075680e476 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -253,19 +253,15 @@ public class CloudPartition extends Partition { LOG.debug("get version from meta service, partitions: {}, versions: {}", partitionIds, versions); } - if (isEmptyPartitionPruneDisabled()) { - ArrayList<Long> news = new ArrayList<>(); - for (Long v : versions) { - news.add(v == -1 ? 1 : v); - } - return news; - } - if (versionUpdateTimesMs != null) { versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList()); } - return versions; + ArrayList<Long> news = new ArrayList<>(); + for (Long v : versions) { + news.add(v == -1 ? Partition.PARTITION_INIT_VERSION : v); + } + return news; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org