This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new dc713f993ed [Enhancement](full compaction) Add run status support for full compaction (#34043) dc713f993ed is described below commit dc713f993eddc117c7ca8eff0c7cc5da973a3314 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Sat Apr 27 13:51:28 2024 +0800 [Enhancement](full compaction) Add run status support for full compaction (#34043) * The usage is `curl http://{ip}:{port}/api/compaction/run_status?tablet_id={tablet_id}` e.g. `curl http://127.0.0.1:8040/api/compaction/run_status?tablet_id=10084` If full compaction is running, the output will be ``` { "status" : "Success", "run_status" : true, "msg" : "compaction task for this tablet is running", "tablet_id" : 10084, "compact_type" : "full" } ``` Otherwise, the output will be ``` { "status" : "Success", "run_status" : false, "msg" : "compaction task for this tablet is not running", "tablet_id" : 10084, "compact_type" : "full" } ``` * 2 --- be/src/http/action/compaction_action.cpp | 14 +++ be/src/olap/full_compaction.cpp | 6 +- be/src/olap/olap_server.cpp | 12 ++- be/src/olap/storage_engine.cpp | 24 +++++ be/src/olap/storage_engine.h | 1 + be/src/olap/tablet.cpp | 2 +- be/src/olap/tablet.h | 6 ++ .../test_full_compaciton_run_status.groovy | 100 +++++++++++++++++++++ 8 files changed, 161 insertions(+), 4 deletions(-) diff --git a/be/src/http/action/compaction_action.cpp b/be/src/http/action/compaction_action.cpp index f61edf92fc1..43ad940db5e 100644 --- a/be/src/http/action/compaction_action.cpp +++ b/be/src/http/action/compaction_action.cpp @@ -193,6 +193,20 @@ Status CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st std::string compaction_type; bool run_status = false; + { + // Full compaction holds both base compaction lock and cumu compaction lock. + // So we can not judge if full compaction is running by check these two locks holding.
+ // Here, we use a variable 'is_full_compaction_running' to check if full compaction is running. + if (tablet->is_full_compaction_running()) { + msg = "compaction task for this tablet is running"; + compaction_type = "full"; + run_status = true; + *json_result = strings::Substitute(json_template, run_status, msg, tablet_id, + compaction_type); + return Status::OK(); + } + } + { // use try lock to check this tablet is running cumulative compaction std::unique_lock<std::mutex> lock_cumulative(tablet->get_cumulative_compaction_lock(), diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 0d6660ca543..8a2712c38b5 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -46,7 +46,9 @@ FullCompaction::FullCompaction(StorageEngine& engine, const TabletSharedPtr& tab : CompactionMixin(engine, tablet, "FullCompaction:" + std::to_string(tablet->tablet_id())) { } -FullCompaction::~FullCompaction() = default; +FullCompaction::~FullCompaction() { + tablet()->set_is_full_compaction_running(false); +} Status FullCompaction::prepare_compact() { if (!tablet()->init_succeeded()) { @@ -55,6 +57,7 @@ Status FullCompaction::prepare_compact() { std::unique_lock base_lock(tablet()->get_base_compaction_lock()); std::unique_lock cumu_lock(tablet()->get_cumulative_compaction_lock()); + tablet()->set_is_full_compaction_running(true); // 1. 
pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); @@ -112,6 +115,7 @@ Status FullCompaction::modify_rowsets() { std::lock_guard<std::mutex> rowset_update_wlock(tablet()->get_rowset_update_lock()); std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock()); RETURN_IF_ERROR(tablet()->modify_rowsets(output_rowsets, _input_rowsets, true)); + DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); }) tablet()->save_meta(); } return Status::OK(); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 73bd0e37f81..17ad3979fae 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -918,11 +918,16 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table .insert(tablet->tablet_id()) .second); break; - default: + case CompactionType::BASE_COMPACTION: already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()] .insert(tablet->tablet_id()) .second); break; + case CompactionType::FULL_COMPACTION: + already_existed = !(_tablet_submitted_full_compaction[tablet->data_dir()] + .insert(tablet->tablet_id()) + .second); + break; } return already_existed; } @@ -935,9 +940,12 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet case CompactionType::CUMULATIVE_COMPACTION: removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id()); break; - default: + case CompactionType::BASE_COMPACTION: removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id()); break; + case CompactionType::FULL_COMPACTION: + removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + break; } if (removed == 1) { diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index d327f82ab33..d549e17b1bf 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1396,6 +1396,30 @@ Status 
StorageEngine::get_compaction_status_json(std::string* result) { } root.AddMember(base_key, path_obj2, root.GetAllocator()); + // full + const std::string& full = "FullCompaction"; + rapidjson::Value full_key; + full_key.SetString(full.c_str(), full.length(), root.GetAllocator()); + rapidjson::Document path_obj3; + path_obj3.SetObject(); + for (auto& it : _tablet_submitted_full_compaction) { + const std::string& dir = it.first->path(); + rapidjson::Value path_key; + path_key.SetString(dir.c_str(), dir.length(), path_obj3.GetAllocator()); + + rapidjson::Document arr; + arr.SetArray(); + + for (auto& tablet_id : it.second) { + rapidjson::Value key; + const std::string& key_str = std::to_string(tablet_id); + key.SetString(key_str.c_str(), key_str.length(), path_obj3.GetAllocator()); + arr.PushBack(key, root.GetAllocator()); + } + path_obj3.AddMember(path_key, arr, path_obj3.GetAllocator()); + } + root.AddMember(full_key, path_obj3, root.GetAllocator()); + rapidjson::StringBuffer strbuf; rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); root.Accept(writer); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 0412a576e16..63234047305 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -441,6 +441,7 @@ private: // a tablet can do base and cumulative compaction at same time std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_cumu_compaction; std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_base_compaction; + std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_full_compaction; std::mutex _low_priority_task_nums_mutex; std::unordered_map<DataDir*, int32_t> _low_priority_task_nums; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 94889bbcf8f..358292463fc 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1241,7 +1241,7 @@ void Tablet::get_compaction_status(std::string* json_result) { root.AddMember("last base failure 
time", base_value, root.GetAllocator()); rapidjson::Value full_value; format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load()); - base_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); + full_value.SetString(format_str.c_str(), format_str.length(), root.GetAllocator()); root.AddMember("last full failure time", full_value, root.GetAllocator()); rapidjson::Value cumu_success_value; format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load()); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 41f67b898ce..678a519cfae 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -459,6 +459,10 @@ public: void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; } bool is_alter_failed() { return _alter_failed; } + void set_is_full_compaction_running(bool is_full_compaction_running) { + _is_full_compaction_running = is_full_compaction_running; + } + inline bool is_full_compaction_running() const { return _is_full_compaction_running; } void clear_cache() override; private: @@ -573,6 +577,8 @@ private: std::atomic<bool> _alter_failed = false; int64_t _io_error_times = 0; + + std::atomic_bool _is_full_compaction_running = false; }; inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() { diff --git a/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy new file mode 100644 index 00000000000..4f720d51331 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_full_compaction_run_status","nonConcurrent") { + + + def tableName = "full_compaction_run_status_test" + + // test successful group commit async load + sql """ DROP TABLE IF EXISTS ${tableName} """ + + String backend_id; + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + backend_id = backendId_to_backendIP.keySet()[0] + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + DISTRIBUTED BY HASH(`k`) + BUCKETS 2 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + + sql """ INSERT INTO ${tableName} VALUES (0,00)""" + sql """ INSERT INTO ${tableName} VALUES (1,10)""" + sql """ INSERT INTO ${tableName} VALUES (2,20)""" + sql """ INSERT INTO ${tableName} VALUES (3,30)""" + sql """ INSERT INTO ${tableName} VALUES (4,40)""" + sql """ INSERT INTO ${tableName} VALUES (5,50)""" + sql """ INSERT INTO ${tableName} VALUES (6,60)""" + sql """ INSERT INTO ${tableName} VALUES (7,70)""" + sql """ INSERT INTO ${tableName} VALUES (8,80)""" + sql """ INSERT INTO ${tableName} VALUES (9,90)""" + + GetDebugPoint().clearDebugPointsForAllBEs() + + def exception = false; + try { + GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep") + def tablets = sql_return_maparray """ show tablets 
from ${tableName}; """ + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + + times = 1 + do{ + (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + ++times + sleep(1000) + } while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10) + + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertTrue(compactJson.msg.toLowerCase().contains("is running")) + } + Thread.sleep(30000) + logger.info("sleep 30s to wait full compaction finish.") + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + + (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactJson = parseJson(out.trim()) + assertTrue(compactJson.msg.toLowerCase().contains("is not running")) + } + } catch (Exception e) { + logger.info(e.getMessage()) + exception = true; + } finally { + GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep") + assertFalse(exception) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org