This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2d9ef61f272 [fix](recycler) Implement missing delete_prefix() of hdfs accessor (#40215) 2d9ef61f272 is described below commit 2d9ef61f27247fbfda6b0497f91cee75d11b3e6f Author: Gavin Chou <gavineaglec...@gmail.com> AuthorDate: Tue Sep 3 18:09:22 2024 +0800 [fix](recycler) Implement missing delete_prefix() of hdfs accessor (#40215) To test it, 1. enable commit_rowset fail with injection point 2. issue a streamload 3. check the recycler log or storage vault the uncommitted rowset recycled ``` curl 'be_ip:http_port/api/injection_point/enable' curl 'be_ip:http_port/api/injection_point/clear' curl "be_ip:http_port/api/injection_point/set?name=CloudMetaMgr::commit_rowset&behavior=return_error" curl -XPUT --location-trusted -vv -T tmp/1.csv -H'format:csv' -H 'column_separator:,' -H'timeout:60' -uroot: be_ip:http_port/api/db1/t2/_stream_load curl 'be_ip:http_port/api/injection_point/clear' curl 'be_ip:http_port/api/injection_point/disable' ``` --- be/src/cloud/cloud_meta_mgr.cpp | 21 +++++++++- be/src/cloud/cloud_stream_load_executor.cpp | 2 +- be/src/cloud/injection_point_action.cpp | 63 +++++++++++++++++++++++++---- cloud/src/common/config.h | 5 ++- cloud/src/meta-service/meta_service_txn.cpp | 19 +++++---- cloud/src/recycler/hdfs_accessor.cpp | 23 ++++++++++- cloud/src/recycler/recycler.cpp | 16 ++++++-- 7 files changed, 124 insertions(+), 25 deletions(-) diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index 2599a8c7b76..4ca44f67cec 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -712,7 +712,10 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); - + { + Status ret_st; + 
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st); + } CreateRowsetRequest req; CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); @@ -738,6 +741,10 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, RowsetMetaSharedPtr* existed_rs_meta) { VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id() << ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id(); + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st); + } CreateRowsetRequest req; CreateRowsetResponse resp; req.set_cloud_unique_id(config::cloud_unique_id); @@ -838,6 +845,10 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id, Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id << ", label: " << ctx.label << ", is_2pc: " << is_2pc; + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st); + } CommitTxnRequest req; CommitTxnResponse res; req.set_cloud_unique_id(config::cloud_unique_id); @@ -856,6 +867,10 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id << ", label: " << ctx.label; + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st); + } AbortTxnRequest req; AbortTxnResponse res; req.set_cloud_unique_id(config::cloud_unique_id); @@ -875,6 +890,10 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id << ", label: " << ctx.label; + { + Status ret_st; + TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st); + } 
PrecommitTxnRequest req; PrecommitTxnResponse res; req.set_cloud_unique_id(config::cloud_unique_id); diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index 1b8167c96eb..1352b4aac81 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -129,7 +129,7 @@ void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) { std::stringstream ss; ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label; std::string op_info = ss.str(); - LOG(INFO) << "rollback stream laod txn " << op_info; + LOG(INFO) << "rollback stream load txn " << op_info; TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID : !ctx->label.empty() ? TxnOpParamType::WITH_LABEL : TxnOpParamType::ILLEGAL; diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index be90ee23afd..1880f14a3b7 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -124,6 +124,17 @@ void register_suites() { *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error"); }); }); + // curl 'be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn' + suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) { + LOG(INFO) << "injection CloudMetaMgr::commit_txn"; + auto* arg0 = try_any_cast_ret<Status>(args); + arg0->first = Status::InternalError<false>( + "test_file_segment_cache_corruption injection error"); + arg0->second = true; + }); + }); } void set_sleep(const std::string& point, HttpRequest* req) { @@ -139,7 +150,8 @@ void set_sleep(const std::string& point, HttpRequest* req) { } } auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [duration](auto&& args) { + sp->set_call_back(point, [point, duration](auto&& args) { + 
LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration; std::this_thread::sleep_for(std::chrono::milliseconds(duration)); }); HttpChannel::send_reply(req, HttpStatus::OK, "OK"); @@ -147,8 +159,9 @@ void set_sleep(const std::string& point, HttpRequest* req) { void set_return(const std::string& point, HttpRequest* req) { auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [](auto&& args) { + sp->set_call_back(point, [point](auto&& args) { try { + LOG(INFO) << "injection point hit, point=" << point << " return void"; auto pred = try_any_cast<bool*>(args.back()); *pred = true; } catch (const std::bad_any_cast&) { @@ -160,8 +173,9 @@ void set_return(const std::string& point, HttpRequest* req) { void set_return_ok(const std::string& point, HttpRequest* req) { auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [](auto&& args) { + sp->set_call_back(point, [point](auto&& args) { try { + LOG(INFO) << "injection point hit, point=" << point << " return ok"; auto* pair = try_any_cast_ret<Status>(args); pair->first = Status::OK(); pair->second = true; @@ -188,8 +202,9 @@ void set_return_error(const std::string& point, HttpRequest* req) { } auto sp = SyncPoint::get_instance(); - sp->set_call_back(point, [code](auto&& args) { + sp->set_call_back(point, [code, point](auto&& args) { try { + LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code; auto* pair = try_any_cast_ret<Status>(args); pair->first = Status::Error<false>(code, "injected error"); pair->second = true; @@ -243,7 +258,7 @@ void handle_clear(HttpRequest* req) { HttpChannel::send_reply(req, HttpStatus::OK, "OK"); } -void handle_suite(HttpRequest* req) { +void handle_apply_suite(HttpRequest* req) { auto& suite = req->param("name"); if (suite.empty()) { HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name"); @@ -253,10 +268,11 @@ void handle_suite(HttpRequest* req) { std::call_once(register_suites_once, 
register_suites); if (auto it = suite_map.find(suite); it != suite_map.end()) { it->second(); // set injection callbacks - HttpChannel::send_reply(req, HttpStatus::OK, "OK"); + HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n"); return; } - HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, "unknown suite: " + suite); + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, + "unknown suite: " + suite + "\n"); } void handle_enable(HttpRequest* req) { @@ -273,6 +289,37 @@ void handle_disable(HttpRequest* req) { InjectionPointAction::InjectionPointAction() = default; +// +// enable/disable injection point +// ``` +// curl "be_ip:http_port/api/injection_point/enable" +// curl "be_ip:http_port/api/injection_point/disable" +// ``` +// +// clear all injection points +// ``` +// curl "be_ip:http_port/api/injection_point/clear" +// ``` +// +// apply/activate specific suite with registered action, see `register_suites()` for more details +// ``` +// curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}" +// ``` +// +// set a predefined action for a specific injection point, supported actions are: +// * sleep: for injection point with callback, accepted param is `duration` in milliseconds +// * return: for injection point without return value (return void) +// * return_ok: for injection point with return value, always return Status::OK +// * return_error: for injection point with return value, accepted param is `code`, +// which is an int, valid values can be found in status.h, e.g.
-235 or -230, +// if `code` is not present return Status::InternalError +// ``` +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error +// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235 +// ``` void InjectionPointAction::handle(HttpRequest* req) { LOG(INFO) << "handle InjectionPointAction " << req->debug_string(); auto& op = req->param("op"); @@ -283,7 +330,7 @@ void InjectionPointAction::handle(HttpRequest* req) { handle_clear(req); return; } else if (op == "apply_suite") { - handle_suite(req); + handle_apply_suite(req); return; } else if (op == "enable") { handle_enable(req); diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 2b31eed3f24..6c3b22e1bb9 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -55,7 +55,7 @@ CONF_Bool(enable_file_logger, "true"); // recycler config CONF_mInt64(recycle_interval_seconds, "3600"); -CONF_mInt64(retention_seconds, "259200"); // 72h +CONF_mInt64(retention_seconds, "259200"); // 72h, global retention time CONF_Int32(recycle_concurrency, "16"); CONF_Int32(recycle_job_lease_expired_ms, "60000"); CONF_mInt64(compacted_rowset_retention_seconds, "10800"); // 3h @@ -77,7 +77,8 @@ CONF_mInt32(scan_instances_interval_seconds, "60"); // 1min CONF_mInt32(check_object_interval_seconds, "43200"); // 12hours CONF_mInt64(check_recycle_task_interval_seconds, "600"); // 10min -CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h +// log a warning if a recycle task takes longer than this duration 
+CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h CONF_String(test_s3_ak, ""); CONF_String(test_s3_sk, ""); diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index af28e180e30..dfa633b270c 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -479,8 +479,8 @@ void MetaServiceImpl::precommit_txn(::google::protobuf::RpcController* controlle return; } - LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id; txn->put(running_key, running_val); + LOG(INFO) << "xxx put running_key=" << hex(running_key) << " txn_id=" << txn_id; err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { @@ -569,8 +569,6 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg, new_statistic_info->set_task_execution_time_ms(commit_attachment.task_execution_time_ms()); } - LOG(INFO) << "routine load new progress: " << new_progress_info.ShortDebugString(); - if (!new_progress_info.SerializeToString(&new_progress_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; ss << "failed to serialize new progress val, txn_id=" << txn_id; @@ -579,6 +577,8 @@ void put_routine_load_progress(MetaServiceCode& code, std::string& msg, } txn->put(rl_progress_key, new_progress_val); + LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key) + << " routine load new progress: " << new_progress_info.ShortDebugString(); } void MetaServiceImpl::get_rl_task_commit_attach(::google::protobuf::RpcController* controller, @@ -689,6 +689,7 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr if (request->partition_to_offset().size() == 0) { txn->remove(rl_progress_key); + LOG(INFO) << "remove rl_progress_key key=" << hex(rl_progress_key); } if (request->partition_to_offset().size() > 0) { @@ -738,6 +739,7 @@ void MetaServiceImpl::reset_rl_progress(::google::protobuf::RpcController* contr return; } txn->put(rl_progress_key, 
new_progress_val); + LOG(INFO) << "put rl_progress_key key=" << hex(rl_progress_key); } err = txn->commit(); @@ -892,6 +894,7 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stat stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); stats_pb.SerializeToString(&val); txn->put(key, val); + LOG(INFO) << "put stats_tablet_key key=" << hex(key); } } /** @@ -1851,6 +1854,7 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* stats_pb.set_num_segments(stats_pb.num_segments() + stats.num_segs); stats_pb.SerializeToString(&val); txn->put(key, val); + LOG(INFO) << "put stats_tablet_key, key=" << hex(key); }; } for (auto& [tablet_id, stats] : tablet_stats) { @@ -1886,8 +1890,9 @@ void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* return; } txn->put(recycle_key, recycle_val); + LOG(INFO) << "xxx commit_txn put recycle_txn_key key=" << hex(recycle_key) + << " txn_id=" << txn_id; - LOG(INFO) << "xxx commit_txn put recycle_key key=" << hex(recycle_key) << " txn_id=" << txn_id; LOG(INFO) << "commit_txn put_size=" << txn->put_bytes() << " del_size=" << txn->delete_bytes() << " num_put_keys=" << txn->num_put_keys() << " num_del_keys=" << txn->num_del_keys() << " txn_size=" << txn->approximate_bytes() << " txn_id=" << txn_id; @@ -2011,7 +2016,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re //1. abort txn by txn id //2. abort txn by label and db_id if (txn_id > 0) { - VLOG_DEBUG << "abort_txn by txn_id"; + VLOG_DEBUG << "abort_txn by txn_id, txn_id=" << txn_id; //abort txn by txn id // Get db id with txn id @@ -2080,7 +2085,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re return; } } else { - VLOG_DEBUG << "abort_txn by db_id and txn label"; + VLOG_DEBUG << "abort_txn db_id and label, db_id=" << db_id << " label=" << label; //abort txn by label. 
std::string label_key = txn_label_key({instance_id, db_id, label}); std::string label_val; @@ -2196,7 +2201,7 @@ static void _abort_txn(const std::string& instance_id, const AbortTxnRequest* re return; } txn->put(recycle_key, recycle_val); - LOG(INFO) << "xxx put recycle_key=" << hex(recycle_key) + LOG(INFO) << "put recycle_txn_key=" << hex(recycle_key) << " txn_id=" << return_txn_info.txn_id(); } diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp index e5038735f57..97a4670d2bf 100644 --- a/cloud/src/recycler/hdfs_accessor.cpp +++ b/cloud/src/recycler/hdfs_accessor.cpp @@ -354,8 +354,27 @@ int HdfsAccessor::init() { } int HdfsAccessor::delete_prefix(const std::string& path_prefix, int64_t expiration_time) { - LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix)); // Audit log - return 0; + auto uri = to_uri(path_prefix); + LOG(INFO) << "delete prefix, uri=" << uri; + std::unique_ptr<ListIterator> list_iter; + int ret = list_all(&list_iter); + if (ret != 0) { + LOG(WARNING) << "delete prefix, failed to list" << uri; + return ret; + } + size_t num_listed = 0, num_deleted = 0; + for (auto file = list_iter->next(); file; file = list_iter->next()) { + ++num_listed; + if (file->path.find(path_prefix) != 0) continue; + if (int del_ret = delete_file(file->path); del_ret != 0) { + ret = del_ret; + break; + } + ++num_deleted; + } + LOG(INFO) << "delete prefix " << (ret != 0 ? 
"failed" : "succ") << " ret=" << ret + << " uri=" << uri << " num_listed=" << num_listed << " num_deleted=" << num_deleted; + return ret; } int HdfsAccessor::delete_directory_impl(const std::string& dir_path) { diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 0e5293b916e..38c1b797904 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1131,7 +1131,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id, int64_ return {std::string_view(), range_move}; } ++num_recycled; - LOG_INFO("k is {}, is empty {}", k, k.empty()); + LOG(INFO) << "recycle_tablets scan, key=" << (k.empty() ? "(empty)" : hex(k)); return {k, range_move}; }); } else { @@ -1602,6 +1602,10 @@ int InstanceRecycler::recycle_rowsets() { LOG_WARNING("malformed recycle rowset").tag("key", hex(k)); return -1; } + + VLOG_DEBUG << "recycle rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned + << " num_expired=" << num_expired << " expiration=" << calc_expiration(rowset) + << " RecycleRowsetPB=" << rowset.ShortDebugString(); int64_t current_time = ::time(nullptr); if (current_time < calc_expiration(rowset)) { // not expired return 0; @@ -1653,8 +1657,8 @@ int InstanceRecycler::recycle_rowsets() { << rowset_meta->start_version() << '-' << rowset_meta->end_version() << "] txn_id=" << rowset_meta->txn_id() << " type=" << RecycleRowsetPB_Type_Name(rowset.type()) - << " rowset_meta_size=" << v.size() << " creation_time" - << rowset_meta->creation_time(); + << " rowset_meta_size=" << v.size() + << " creation_time=" << rowset_meta->creation_time(); if (rowset.type() == RecycleRowsetPB::PREPARE) { // unable to calculate file path, can only be deleted by rowset id prefix num_prepare += 1; @@ -1760,6 +1764,10 @@ int InstanceRecycler::recycle_tmp_rowsets() { // duration or timeout always < `retention_time` in practice. int64_t expiration = rowset.txn_expiration() > 0 ? 
rowset.txn_expiration() : rowset.creation_time(); + VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << " num_scanned=" << num_scanned + << " num_expired=" << num_expired << " expiration=" << expiration + << " txn_expiration=" << rowset.txn_expiration() + << " rowset_creation_time=" << rowset.creation_time(); if (current_time < expiration + config::retention_seconds) { // not expired return 0; @@ -1974,7 +1982,7 @@ int InstanceRecycler::recycle_expired_txn_label() { recycle_txn_key(recycle_txn_key_info0, &begin_recycle_txn_key); recycle_txn_key(recycle_txn_key_info1, &end_recycle_txn_key); - LOG_INFO("begin to recycle expire txn").tag("instance_id", instance_id_); + LOG_INFO("begin to recycle expired txn").tag("instance_id", instance_id_); int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count(); register_recycle_task(task_name, start_time); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org