This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new e912ff60612 [cherry-pick](branch-2.0) Pick "[Enhancement](full compaction) Add run status support for full compaction (#34043)" (#37607) e912ff60612 is described below commit e912ff60612ad76b61b45fb7c3dec318dd51d764 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Mon Jul 15 09:20:57 2024 +0800 [cherry-pick](branch-2.0) Pick "[Enhancement](full compaction) Add run status support for full compaction (#34043)" (#37607) Pick #34043 The usage is `curl http://{ip}:{port}/api/compaction/run_status?tablet_id={tablet_id}` e.g. `curl http://127.0.0.1:8040/api/compaction/run_status?tablet_id=10084` If full compaction is running, the output will be ``` { "status" : "Success", "run_status" : true, "msg" : "compaction task for this tablet is running", "tablet_id" : 10084, "compact_type" : "full" } ``` else the output will be ``` { "status" : "Success", "run_status" : false, "msg" : "compaction task for this tablet is not running", "tablet_id" : 10084, "compact_type" : "full" } ``` ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- be/src/http/action/compaction_action.cpp | 14 +++ be/src/olap/full_compaction.cpp | 7 +- be/src/olap/olap_server.cpp | 12 ++- be/src/olap/storage_engine.cpp | 24 +++++ be/src/olap/storage_engine.h | 1 + be/src/olap/tablet.cpp | 2 +- be/src/olap/tablet.h | 5 ++ .../test_full_compaciton_run_status.groovy | 100 +++++++++++++++++++++ 8 files changed, 160 insertions(+), 5 deletions(-) diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index facbcdf40ba..adcc52f9ee7 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -221,6 +221,20 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st std::string compaction_type; bool run_status = false; + { + // Full compaction holds both base compaction lock and cumu compaction lock. 
+ // So we can not judge if full compaction is running by check these two locks holding. + // Here, we use a variable 'is_full_compaction_running' to check if full compaction is running. + if (tablet->is_full_compaction_running()) { + msg = "compaction task for this tablet is running"; + compaction_type = "full"; + run_status = true; + *json_result = strings::Substitute(json_template, run_status, msg, tablet_id, + compaction_type); + return Status::OK(); + } + } + { // use try lock to check this tablet is running cumulative compaction std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(), diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 7567fd4deca..afb725797c1 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -44,7 +44,9 @@ using namespace ErrorCode; FullCompaction::FullCompaction(const TabletSharedPtr& tablet) : Compaction(tablet, "FullCompaction:" + std::to_string(tablet->tablet_id())) {} -FullCompaction::~FullCompaction() {} +FullCompaction::~FullCompaction() { + _tablet->set_is_full_compaction_running(false); +} Status FullCompaction::prepare_compact() { if (!_tablet->init_succeeded()) { @@ -53,10 +55,10 @@ Status FullCompaction::prepare_compact() { std::unique_lock base_lock(_tablet->get_base_compaction_lock()); std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock()); + _tablet->set_is_full_compaction_running(true); // 1. 
pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); - return Status::OK(); } @@ -115,6 +117,7 @@ Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) { std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock()); std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock()); RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); + DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); }) _tablet->save_meta(); } return Status::OK(); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 9f4a2d1c030..01f3e45fd69 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -896,11 +896,16 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table .insert(tablet->tablet_id()) .second); break; - default: + case CompactionType::BASE_COMPACTION: already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()] .insert(tablet->tablet_id()) .second); break; + case CompactionType::FULL_COMPACTION: + already_existed = !(_tablet_submitted_full_compaction[tablet->data_dir()] + .insert(tablet->tablet_id()) + .second); + break; } return already_existed; } @@ -913,9 +918,12 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet case CompactionType::CUMULATIVE_COMPACTION: removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id()); break; - default: + case CompactionType::BASE_COMPACTION: removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id()); break; + case CompactionType::FULL_COMPACTION: + removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + break; } if (removed == 1) { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 0ff941ae0bc..253e0389f56 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1383,6 
+1383,30 @@ Status StorageEngine::get_compaction_status_json(std::string* result) { } root.AddMember(base_key, path_obj2, root.GetAllocator()); + // full + const std::string& full = "FullCompaction"; + rapidjson::Value full_key; + full_key.SetString(full.c_str(), full.length(), root.GetAllocator()); + rapidjson::Document path_obj3; + path_obj3.SetObject(); + for (auto& it : _tablet_submitted_full_compaction) { + const std::string& dir = it.first->path(); + rapidjson::Value path_key; + path_key.SetString(dir.c_str(), dir.length(), path_obj3.GetAllocator()); + + rapidjson::Document arr; + arr.SetArray(); + + for (auto& tablet_id : it.second) { + rapidjson::Value key; + const std::string& key_str = std::to_string(tablet_id); + key.SetString(key_str.c_str(), key_str.length(), path_obj3.GetAllocator()); + arr.PushBack(key, root.GetAllocator()); + } + path_obj3.AddMember(path_key, arr, path_obj3.GetAllocator()); + } + root.AddMember(full_key, path_obj3, root.GetAllocator()); + rapidjson::StringBuffer strbuf; rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); root.Accept(writer); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index e450c504121..94478097c7b 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -436,6 +436,7 @@ private: // a tablet can do base and cumulative compaction at same time std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_cumu_compaction; std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_base_compaction; + std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_full_compaction; std::mutex _peer_replica_infos_mutex; // key: tabletId diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index df79698f041..f1eed8b52cd 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1575,7 +1575,7 @@ void Tablet::get_compaction_status(std::string* json_result) { root.AddMember("last base failure time", base_value, 
root.GetAllocator()); rapidjson::Value full_value; format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load()); - base_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); root.AddMember("last full failure time", full_value, root.GetAllocator()); rapidjson::Value cumu_success_value; format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 0518e32ae8c..20e9ae890cc 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -589,6 +589,10 @@ public: void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; } bool is_alter_failed() { return _alter_failed; } + void set_is_full_compaction_running(bool is_full_compaction_running) { + _is_full_compaction_running = is_full_compaction_running; + } + inline bool is_full_compaction_running() const { return _is_full_compaction_running; } void clear_cache(); private: @@ -743,6 +747,7 @@ public: IntCounter* flush_bytes; IntCounter* flush_finish_count; std::atomic<int64_t> publised_count = 0; + std::atomic_bool _is_full_compaction_running = false; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy new file mode 100644 index 00000000000..4f720d51331 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_full_compaction_run_status","nonConcurrent") { + + + def tableName = "full_compaction_run_status_test" + + // test successful group commit async load + sql """ DROP TABLE IF EXISTS ${tableName} """ + + String backend_id; + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 2 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + + sql """ INSERT INTO ${tableName} VALUES (0,00)""" + sql """ INSERT INTO ${tableName} VALUES (1,10)""" + sql """ INSERT INTO ${tableName} VALUES (2,20)""" + sql """ INSERT INTO ${tableName} VALUES (3,30)""" + sql """ INSERT INTO ${tableName} VALUES (4,40)""" + sql """ INSERT INTO ${tableName} VALUES (5,50)""" + sql """ INSERT INTO ${tableName} VALUES (6,60)""" + sql """ INSERT INTO ${tableName} VALUES (7,70)""" + sql """ INSERT INTO ${tableName} VALUES (8,80)""" + sql """ INSERT INTO ${tableName} VALUES (9,90)""" + + GetDebugPoint().clearDebugPointsForAllBEs() + + def exception = false; + try { + GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep") + def tablets = sql_return_maparray """ show tablets 
from ${tableName}; """ + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + + times = 1 + do{ + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(1000) + } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10) + + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertTrue(compactJson.msg.toLowerCase().contains("is running")) + } + Thread.sleep(30000) + logger.info("sleep 30s to wait full compaction finish.") + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertTrue(compactJson.msg.toLowerCase().contains("is not running")) + } + } catch (Exception e) { + logger.info(e.getMessage()) + exception = true; + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep") + assertFalse(exception) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org