This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4820ba2d41d [feat](cloud) Add injection point http api for ms (#41982) (#47204)
4820ba2d41d is described below

commit 4820ba2d41dbb4251f606425d18448f78d6bca49
Author: Gavin Chou <ga...@selectdb.com>
AuthorDate: Mon Jan 20 01:48:08 2025 +0800

    [feat](cloud) Add injection point http api for ms (#41982) (#47204)
    
    pick #41982
    
    Co-authored-by: Lei Zhang <27994433+swjtu-zhang...@users.noreply.github.com>
---
 build.sh                                           |   1 +
 cloud/CMakeLists.txt                               |   4 +
 cloud/src/meta-service/CMakeLists.txt              |   1 +
 cloud/src/meta-service/injection_point_http.cpp    | 226 +++++++++++++++++++++
 cloud/src/meta-service/meta_service.cpp            |   2 +-
 cloud/src/meta-service/meta_service_http.cpp       |   4 +
 cloud/src/meta-service/meta_service_txn.cpp        |   2 +-
 cloud/src/meta-service/txn_lazy_committer.cpp      |   5 +-
 common/cpp/sync_point.h                            |   2 +-
 .../apache/doris/cloud/catalog/CloudPartition.java |  14 +-
 10 files changed, 248 insertions(+), 13 deletions(-)

diff --git a/build.sh b/build.sh
index 8f1262aa76b..42c3a09a950 100755
--- a/build.sh
+++ b/build.sh
@@ -632,6 +632,7 @@ if [[ "${BUILD_CLOUD}" -eq 1 ]]; then
         -DCMAKE_MAKE_PROGRAM="${MAKE_PROGRAM}" \
         -DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
         -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" \
+        -DENABLE_INJECTION_POINT="${ENABLE_INJECTION_POINT}" \
         -DMAKE_TEST=OFF \
         "${CMAKE_USE_CCACHE}" \
         -DUSE_LIBCPP="${USE_LIBCPP}" \
diff --git a/cloud/CMakeLists.txt b/cloud/CMakeLists.txt
index a8eccf60896..627e6f283b9 100644
--- a/cloud/CMakeLists.txt
+++ b/cloud/CMakeLists.txt
@@ -411,6 +411,10 @@ if (${MAKE_TEST} STREQUAL "ON")
     add_definitions(-DBE_TEST)
 endif ()
 
+if (ENABLE_INJECTION_POINT)
+    add_definitions(-DENABLE_INJECTION_POINT)
+endif()
+
 # Add libs if needed, download to current dir -- ${BUILD_DIR}
 set(FDB_LIB "fdb_lib_7_1_23.tar.xz")
 if (ARCH_AARCH64)
diff --git a/cloud/src/meta-service/CMakeLists.txt 
b/cloud/src/meta-service/CMakeLists.txt
index c7c4887a068..d11f87e7fa2 100644
--- a/cloud/src/meta-service/CMakeLists.txt
+++ b/cloud/src/meta-service/CMakeLists.txt
@@ -12,6 +12,7 @@ add_library(MetaService
     meta_server.cpp
     meta_service.cpp
     meta_service_http.cpp
+    injection_point_http.cpp
     meta_service_job.cpp
     meta_service_resource.cpp
     meta_service_schema.cpp
diff --git a/cloud/src/meta-service/injection_point_http.cpp 
b/cloud/src/meta-service/injection_point_http.cpp
new file mode 100644
index 00000000000..80d1bcfdf2e
--- /dev/null
+++ b/cloud/src/meta-service/injection_point_http.cpp
@@ -0,0 +1,226 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <fmt/format.h>
+#include <gen_cpp/cloud.pb.h>
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "cpp/sync_point.h"
+#include "meta-service/keys.h"
+#include "meta-service/meta_service_helper.h"
+#include "meta-service/txn_kv.h"
+#include "meta-service/txn_kv_error.h"
+#include "meta_service.h"
+#include "meta_service_http.h"
+
+namespace doris::cloud {
+
+std::map<std::string, std::function<void()>> suite_map;
+std::once_flag register_suites_once;
+
+inline std::default_random_engine make_random_engine() {
+    return std::default_random_engine(
+            
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+}
+
+static void register_suites() {
+    suite_map.emplace("test_txn_lazy_commit", [] {
+        auto sp = SyncPoint::get_instance();
+
+        
sp->set_call_back("commit_txn_immediately::advance_last_pending_txn_id", 
[&](auto&& args) {
+            std::default_random_engine rng = make_random_engine();
+            std::uniform_int_distribution<uint32_t> u(100, 1000);
+            uint32_t duration_ms = u(rng);
+            LOG(INFO) << "commit_txn_immediately::advance_last_pending_txn_id 
sleep " << duration_ms
+                      << " ms";
+            
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+        });
+
+        sp->set_call_back("commit_txn_eventually::txn_lazy_committer_submit", 
[&](auto&& args) {
+            std::default_random_engine rng = make_random_engine();
+            std::uniform_int_distribution<uint32_t> u(100, 1000);
+            uint32_t duration_ms = u(rng);
+            LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_submit 
sleep " << duration_ms
+                      << " ms";
+            
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+        });
+
+        sp->set_call_back("commit_txn_eventually::txn_lazy_committer_wait", 
[&](auto&& args) {
+            std::default_random_engine rng = make_random_engine();
+            std::uniform_int_distribution<uint32_t> u(100, 1000);
+            uint32_t duration_ms = u(rng);
+            LOG(INFO) << "commit_txn_eventually::txn_lazy_committer_wait sleep 
" << duration_ms
+                      << " ms";
+            
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+        });
+
+        sp->set_call_back("convert_tmp_rowsets::before_commit", [&](auto&& 
args) {
+            std::default_random_engine rng = make_random_engine();
+            std::uniform_int_distribution<uint32_t> u(1, 50);
+            uint32_t duration_ms = u(rng);
+            
std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms));
+            LOG(INFO) << "convert_tmp_rowsets::before_commit sleep " << 
duration_ms << " ms";
+            if (duration_ms <= 25) {
+                MetaServiceCode* code = 
try_any_cast<MetaServiceCode*>(args[0]);
+                *code = MetaServiceCode::KV_TXN_CONFLICT;
+                bool* pred = try_any_cast<bool*>(args.back());
+                *pred = true;
+                LOG(INFO) << "convert_tmp_rowsets::before_commit 
random_value=" << duration_ms
+                          << " inject kv txn conflict";
+            }
+        });
+    });
+}
+
+HttpResponse set_sleep(const std::string& point, const brpc::URI& uri) {
+    std::string duration_str(http_query(uri, "duration"));
+    int64_t duration = 0;
+    try {
+        duration = std::stol(duration_str);
+    } catch (const std::exception& e) {
+        auto msg = fmt::format("invalid duration:{}", duration_str);
+        LOG(WARNING) << msg;
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+    }
+
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back(point, [point, duration](auto&& args) {
+        LOG(INFO) << "injection point hit, point=" << point << " sleep ms=" << 
duration;
+        std::this_thread::sleep_for(std::chrono::milliseconds(duration));
+    });
+    return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse set_return(const std::string& point, const brpc::URI& uri) {
+    auto sp = SyncPoint::get_instance();
+    sp->set_call_back(point, [point](auto&& args) {
+        try {
+            LOG(INFO) << "injection point hit, point=" << point << " return 
void";
+            auto pred = try_any_cast<bool*>(args.back());
+            *pred = true;
+        } catch (const std::bad_any_cast& e) {
+            LOG(ERROR) << "failed to process `return` e:" << e.what();
+        }
+    });
+
+    return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse handle_set(const brpc::URI& uri) {
+    const std::string point(http_query(uri, "name"));
+    if (point.empty()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty point 
name");
+    }
+
+    const std::string behavior(http_query(uri, "behavior"));
+    if (behavior.empty()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty 
behavior");
+    }
+    if (behavior == "sleep") {
+        return set_sleep(point, uri);
+    } else if (behavior == "return") {
+        return set_return(point, uri);
+    }
+
+    return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown 
behavior: " + behavior);
+}
+
+HttpResponse handle_clear(const brpc::URI& uri) {
+    const std::string point(http_query(uri, "name"));
+    auto* sp = SyncPoint::get_instance();
+    LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" 
: point);
+    if (point.empty()) {
+        // If point name is empty, clear all
+        sp->clear_all_call_backs();
+        return http_json_reply(MetaServiceCode::OK, "OK");
+    }
+    sp->clear_call_back(point);
+    return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse handle_apply_suite(const brpc::URI& uri) {
+    const std::string suite(http_query(uri, "name"));
+    if (suite.empty()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "empty suite 
name");
+    }
+
+    std::call_once(register_suites_once, register_suites);
+    if (auto it = suite_map.find(suite); it != suite_map.end()) {
+        it->second(); // set injection callbacks
+        return http_json_reply(MetaServiceCode::OK, "OK apply suite " + suite 
+ "\n");
+    }
+
+    return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown suite: 
" + suite + "\n");
+}
+
+HttpResponse handle_enable(const brpc::URI& uri) {
+    SyncPoint::get_instance()->enable_processing();
+    return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+HttpResponse handle_disable(const brpc::URI& uri) {
+    SyncPoint::get_instance()->disable_processing();
+    return http_json_reply(MetaServiceCode::OK, "OK");
+}
+
+//
+// enable/disable injection point
+// ```
+// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=enable"
+// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=disable"
+// ```
+
+// clear all injection points
+// ```
+// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=clear"
+// ```
+
+// apply/activate specific suite with registered action, see `register_suites()` for more details
+// ```
+// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=apply_suite&name=${suite_name}"
+// ```
+
+// ```
+// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
+//     &name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs
+
+// curl "ms_ip:port/MetaService/http/v1/injection_point?token=greedisgood9999&op=set
+//     &name=${injection_point_name}&behavior=return" # return void
+// ```
+
+HttpResponse process_injection_point(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
+    auto& uri = ctrl->http_request().uri();
+    LOG(INFO) << "handle InjectionPointAction uri:" << uri;
+    const std::string op(http_query(uri, "op"));
+
+    if (op == "set") {
+        return handle_set(uri);
+    } else if (op == "clear") {
+        return handle_clear(uri);
+    } else if (op == "apply_suite") {
+        return handle_apply_suite(uri);
+    } else if (op == "enable") {
+        return handle_enable(uri);
+    } else if (op == "disable") {
+        return handle_disable(uri);
+    }
+
+    return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "unknown op:" + 
op);
+}
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index 70fb53b7fb3..7914bf5db11 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1537,7 +1537,7 @@ void 
MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
                     if (code != MetaServiceCode::OK) {
                         LOG(WARNING) << "advance_last_txn failed last_txn="
                                      << version_pb.pending_txn_ids(0) << " 
code=" << code
-                                     << "msg=" << msg;
+                                     << " msg=" << msg;
                         return;
                     }
                     continue;
diff --git a/cloud/src/meta-service/meta_service_http.cpp 
b/cloud/src/meta-service/meta_service_http.cpp
index 51352d31941..c6eb07010c4 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -181,6 +181,8 @@ static std::string_view 
remove_version_prefix(std::string_view path) {
     return path;
 }
 
+HttpResponse process_injection_point(MetaServiceImpl* service, 
brpc::Controller* ctrl);
+
 static HttpResponse process_alter_cluster(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
     static std::unordered_map<std::string_view, 
AlterClusterRequest::Operation> operations {
             {"add_cluster", AlterClusterRequest::ADD_CLUSTER},
@@ -738,11 +740,13 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"show_meta_ranges", process_show_meta_ranges},
             {"txn_lazy_commit", process_txn_lazy_commit},
             {"fix_tablet_stats", process_fix_tablet_stats},
+            {"injection_point", process_injection_point},
             {"v1/decode_key", process_decode_key},
             {"v1/encode_key", process_encode_key},
             {"v1/get_value", process_get_value},
             {"v1/show_meta_ranges", process_show_meta_ranges},
             {"v1/txn_lazy_commit", process_txn_lazy_commit},
+            {"v1/injection_point", process_injection_point},
             // for get
             {"get_instance", process_get_instance_info},
             // for get
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 74092080e2f..ce0c5a535e4 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1933,7 +1933,7 @@ void commit_txn_eventually(
         std::pair<MetaServiceCode, std::string> ret = task->wait();
         if (ret.first != MetaServiceCode::OK) {
             LOG(WARNING) << "txn lazy commit failed txn_id=" << txn_id << " 
code=" << ret.first
-                         << "msg=" << ret.second;
+                         << " msg=" << ret.second;
         }
 
         std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp 
b/cloud/src/meta-service/txn_lazy_committer.cpp
index 0babd77bbda..9859c2b0ed1 100644
--- a/cloud/src/meta-service/txn_lazy_committer.cpp
+++ b/cloud/src/meta-service/txn_lazy_committer.cpp
@@ -21,6 +21,7 @@
 
 #include "common/logging.h"
 #include "common/util.h"
+#include "cpp/sync_point.h"
 #include "meta-service/keys.h"
 #include "meta-service/meta_service_helper.h"
 #include "meta-service/meta_service_tablet_stats.h"
@@ -190,6 +191,7 @@ void convert_tmp_rowsets(
         if (code != MetaServiceCode::OK) return;
     }
 
+    TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", 
&code);
     err = txn->commit();
     if (err != TxnErrorCode::TXN_OK) {
         code = cast_as<ErrCategory::COMMIT>(err);
@@ -490,7 +492,8 @@ std::pair<MetaServiceCode, std::string> 
TxnLazyCommitTask::wait() {
     sw.pause();
     if (sw.elapsed_us() > 1000000) {
         LOG(INFO) << "txn_lazy_commit task wait more than 1000ms, cost=" << 
sw.elapsed_us() / 1000
-                  << " ms";
+                  << " ms"
+                  << " txn_id=" << txn_id_;
     }
     return std::make_pair(this->code_, this->msg_);
 }
diff --git a/common/cpp/sync_point.h b/common/cpp/sync_point.h
index e3be626777c..25f8a5136cc 100644
--- a/common/cpp/sync_point.h
+++ b/common/cpp/sync_point.h
@@ -213,7 +213,7 @@ auto try_any_cast_ret(std::vector<std::any>& any) {
 
 // TEST_SYNC_POINT is no op in release build.
 // Turn on this feature by defining the macro
-#ifndef BE_TEST
+#if !defined(BE_TEST) && !defined(ENABLE_INJECTION_POINT)
 # define TEST_SYNC_POINT(x)
 # define TEST_IDX_SYNC_POINT(x, index)
 # define TEST_SYNC_POINT_CALLBACK(x, ...)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index b2a9751394f..a075680e476 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -253,19 +253,15 @@ public class CloudPartition extends Partition {
             LOG.debug("get version from meta service, partitions: {}, 
versions: {}", partitionIds, versions);
         }
 
-        if (isEmptyPartitionPruneDisabled()) {
-            ArrayList<Long> news = new ArrayList<>();
-            for (Long v : versions) {
-                news.add(v == -1 ? 1 : v);
-            }
-            return news;
-        }
-
         if (versionUpdateTimesMs != null) {
             versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList());
         }
 
-        return versions;
+        ArrayList<Long> news = new ArrayList<>();
+        for (Long v : versions) {
+            news.add(v == -1 ?  Partition.PARTITION_INIT_VERSION : v);
+        }
+        return news;
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to