This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new e912ff60612 [cherry-pick](branch-2.0) Pick "[Enhancement](full 
compaction) Add run status support for full compaction (#34043)" (#37607)
e912ff60612 is described below

commit e912ff60612ad76b61b45fb7c3dec318dd51d764
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Mon Jul 15 09:20:57 2024 +0800

    [cherry-pick](branch-2.0) Pick "[Enhancement](full compaction) Add run 
status support for full compaction (#34043)" (#37607)
    
    Pick #34043
    
    The usage is `curl
    http://{ip}:{host}/api/compaction/run_status?tablet_id={tablet_id}` e.g.
    `curl http://127.0.0.1:8040/api/compaction/run_status?tablet_id=10084`
    
    If full compaction is running, the output will be
    ```
    {
    "status" : "Success",
    "run_status" : true,
    "msg" : "compaction task for this tablet is running",
    "tablet_id" : 10084,
    "compact_type" : "full"
    }
    ```
    else the ouput will be
    ```
    {
    "status" : "Success",
    "run_status" : false,
    "msg" : "compaction task for this tablet is not running",
    "tablet_id" : 10084,
    "compact_type" : "full"
    }
    ```
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
---
 be/src/http/action/compaction_action.cpp           |  14 +++
 be/src/olap/full_compaction.cpp                    |   7 +-
 be/src/olap/olap_server.cpp                        |  12 ++-
 be/src/olap/storage_engine.cpp                     |  24 +++++
 be/src/olap/storage_engine.h                       |   1 +
 be/src/olap/tablet.cpp                             |   2 +-
 be/src/olap/tablet.h                               |   5 ++
 .../test_full_compaciton_run_status.groovy         | 100 +++++++++++++++++++++
 8 files changed, 160 insertions(+), 5 deletions(-)

diff --git a/be/src/http/action/compaction_action.cpp 
b/be/src/http/action/compaction_action.cpp
index facbcdf40ba..adcc52f9ee7 100644
--- a/be/src/http/action/compaction_action.cpp
+++ b/be/src/http/action/compaction_action.cpp
@@ -221,6 +221,20 @@ Status 
CompactionAction::_handle_run_status_compaction(HttpRequest* req, std::st
         std::string compaction_type;
         bool run_status = false;
 
+        {
+            // Full compaction holds both base compaction lock and cumu 
compaction lock.
+            // So we can not judge if full compaction is running by check 
these two locks holding.
+            // Here, we use a variable 'is_full_compaction_running' to check 
if full compaction is running.
+            if (tablet->is_full_compaction_running()) {
+                msg = "compaction task for this tablet is running";
+                compaction_type = "full";
+                run_status = true;
+                *json_result = strings::Substitute(json_template, run_status, 
msg, tablet_id,
+                                                   compaction_type);
+                return Status::OK();
+            }
+        }
+
         {
             // use try lock to check this tablet is running cumulative 
compaction
             std::unique_lock<std::mutex> 
lock_cumulative(tablet->get_cumulative_compaction_lock(),
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 7567fd4deca..afb725797c1 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -44,7 +44,9 @@ using namespace ErrorCode;
 FullCompaction::FullCompaction(const TabletSharedPtr& tablet)
         : Compaction(tablet, "FullCompaction:" + 
std::to_string(tablet->tablet_id())) {}
 
-FullCompaction::~FullCompaction() {}
+FullCompaction::~FullCompaction() {
+    _tablet->set_is_full_compaction_running(false);
+}
 
 Status FullCompaction::prepare_compact() {
     if (!_tablet->init_succeeded()) {
@@ -53,10 +55,10 @@ Status FullCompaction::prepare_compact() {
 
     std::unique_lock base_lock(_tablet->get_base_compaction_lock());
     std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());
+    _tablet->set_is_full_compaction_running(true);
 
     // 1. pick rowsets to compact
     RETURN_IF_ERROR(pick_rowsets_to_compact());
-
     return Status::OK();
 }
 
@@ -115,6 +117,7 @@ Status FullCompaction::modify_rowsets(const 
Merger::Statistics* stats) {
         std::lock_guard<std::mutex> 
rowset_update_wlock(_tablet->get_rowset_update_lock());
         std::lock_guard<std::shared_mutex> 
meta_wlock(_tablet->get_header_lock());
         RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, 
_input_rowsets, true));
+        DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
         _tablet->save_meta();
     }
     return Status::OK();
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 9f4a2d1c030..01f3e45fd69 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -896,11 +896,16 @@ bool 
StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
                                     .insert(tablet->tablet_id())
                                     .second);
         break;
-    default:
+    case CompactionType::BASE_COMPACTION:
         already_existed = 
!(_tablet_submitted_base_compaction[tablet->data_dir()]
                                     .insert(tablet->tablet_id())
                                     .second);
         break;
+    case CompactionType::FULL_COMPACTION:
+        already_existed = 
!(_tablet_submitted_full_compaction[tablet->data_dir()]
+                                    .insert(tablet->tablet_id())
+                                    .second);
+        break;
     }
     return already_existed;
 }
@@ -913,9 +918,12 @@ void 
StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
     case CompactionType::CUMULATIVE_COMPACTION:
         removed = 
_tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id());
         break;
-    default:
+    case CompactionType::BASE_COMPACTION:
         removed = 
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
         break;
+    case CompactionType::FULL_COMPACTION:
+        removed = 
_tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+        break;
     }
 
     if (removed == 1) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 0ff941ae0bc..253e0389f56 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1383,6 +1383,30 @@ Status 
StorageEngine::get_compaction_status_json(std::string* result) {
     }
     root.AddMember(base_key, path_obj2, root.GetAllocator());
 
+    // full
+    const std::string& full = "FullCompaction";
+    rapidjson::Value full_key;
+    full_key.SetString(full.c_str(), full.length(), root.GetAllocator());
+    rapidjson::Document path_obj3;
+    path_obj3.SetObject();
+    for (auto& it : _tablet_submitted_full_compaction) {
+        const std::string& dir = it.first->path();
+        rapidjson::Value path_key;
+        path_key.SetString(dir.c_str(), dir.length(), 
path_obj3.GetAllocator());
+
+        rapidjson::Document arr;
+        arr.SetArray();
+
+        for (auto& tablet_id : it.second) {
+            rapidjson::Value key;
+            const std::string& key_str = std::to_string(tablet_id);
+            key.SetString(key_str.c_str(), key_str.length(), 
path_obj3.GetAllocator());
+            arr.PushBack(key, root.GetAllocator());
+        }
+        path_obj3.AddMember(path_key, arr, path_obj3.GetAllocator());
+    }
+    root.AddMember(full_key, path_obj3, root.GetAllocator());
+
     rapidjson::StringBuffer strbuf;
     rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
     root.Accept(writer);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index e450c504121..94478097c7b 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -436,6 +436,7 @@ private:
     // a tablet can do base and cumulative compaction at same time
     std::map<DataDir*, std::unordered_set<TTabletId>> 
_tablet_submitted_cumu_compaction;
     std::map<DataDir*, std::unordered_set<TTabletId>> 
_tablet_submitted_base_compaction;
+    std::map<DataDir*, std::unordered_set<TTabletId>> 
_tablet_submitted_full_compaction;
 
     std::mutex _peer_replica_infos_mutex;
     // key: tabletId
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index df79698f041..f1eed8b52cd 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1575,7 +1575,7 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
     root.AddMember("last base failure time", base_value, root.GetAllocator());
     rapidjson::Value full_value;
     format_str = 
ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
-    base_value.SetString(format_str.c_str(), format_str.length(), 
root.GetAllocator());
+    full_value.SetString(format_str.c_str(), format_str.length(), 
root.GetAllocator());
     root.AddMember("last full failure time", full_value, root.GetAllocator());
     rapidjson::Value cumu_success_value;
     format_str = 
ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 0518e32ae8c..20e9ae890cc 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -589,6 +589,10 @@ public:
     void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
     bool is_alter_failed() { return _alter_failed; }
 
+    void set_is_full_compaction_running(bool is_full_compaction_running) {
+        _is_full_compaction_running = is_full_compaction_running;
+    }
+    inline bool is_full_compaction_running() const { return 
_is_full_compaction_running; }
     void clear_cache();
 
 private:
@@ -743,6 +747,7 @@ public:
     IntCounter* flush_bytes;
     IntCounter* flush_finish_count;
     std::atomic<int64_t> publised_count = 0;
+    std::atomic_bool _is_full_compaction_running = false;
 };
 
 inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git 
a/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy
 
b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy
new file mode 100644
index 00000000000..4f720d51331
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/test_full_compaciton_run_status.groovy
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_full_compaction_run_status","nonConcurrent") {
+
+
+    def tableName = "full_compaction_run_status_test"
+ 
+    // test successful group commit async load
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    String backend_id;
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    backend_id = backendId_to_backendIP.keySet()[0]
+
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k`) 
+        BUCKETS 2 
+        properties(
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true")
+        """
+
+    sql """ INSERT INTO ${tableName} VALUES (0,00)"""
+    sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+    sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+    sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+    sql """ INSERT INTO ${tableName} VALUES (4,40)"""
+    sql """ INSERT INTO ${tableName} VALUES (5,50)"""
+    sql """ INSERT INTO ${tableName} VALUES (6,60)"""
+    sql """ INSERT INTO ${tableName} VALUES (7,70)"""
+    sql """ INSERT INTO ${tableName} VALUES (8,80)"""
+    sql """ INSERT INTO ${tableName} VALUES (9,90)"""
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def exception = false;
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep")
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+
+            times = 1
+            do{
+                (code, out, err) = 
be_run_full_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+                logger.info("Run compaction: code=" + code + ", out=" + out + 
", err=" + err)
+                ++times
+                sleep(1000)
+            } while (parseJson(out.trim()).status.toLowerCase()!="success" && 
times<=10)
+
+            (code, out, err) = 
be_get_compaction_status(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Get compaction status: code=" + code + ", out=" + out 
+ ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            assertTrue(compactJson.msg.toLowerCase().contains("is running"))
+        }
+        Thread.sleep(30000)
+        logger.info("sleep 30s to wait full compaction finish.")
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+
+            (code, out, err) = 
be_get_compaction_status(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Get compaction status: code=" + code + ", out=" + out 
+ ", err=" + err)
+            assertEquals(code, 0)
+            def compactJson = parseJson(out.trim())
+            assertTrue(compactJson.msg.toLowerCase().contains("is not 
running"))
+        }
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        exception = true;
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.sleep")
+        assertFalse(exception)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to