This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 46d207e5f41 branch-3.0: [Fix](compaction) Should do_lease for full 
compaction #47436 (#47519)
46d207e5f41 is described below

commit 46d207e5f41d6a82b50ae8c62f57659d303eb09f
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 6 14:42:30 2025 +0800

    branch-3.0: [Fix](compaction) Should do_lease for full compaction #47436 
(#47519)
    
    Cherry-picked from #47436
    
    Co-authored-by: bobhan1 <bao...@selectdb.com>
---
 .../cloud/cloud_cumulative_compaction_policy.cpp   |  16 +++
 be/src/cloud/cloud_full_compaction.cpp             |   3 +
 be/src/cloud/cloud_storage_engine.cpp              |   9 ++
 .../cloud/test_cloud_full_compaction_do_lease.out  | Bin 0 -> 110 bytes
 .../org/apache/doris/regression/suite/Suite.groovy |  18 +++
 .../test_cloud_full_compaction_do_lease.groovy     | 123 +++++++++++++++++++++
 6 files changed, 169 insertions(+)

diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp 
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 6445b47fc59..9e3ca3eb3db 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -54,6 +54,22 @@ int64_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         const int64_t max_compaction_score, const int64_t min_compaction_score,
         std::vector<RowsetSharedPtr>* input_rowsets, Version* 
last_delete_version,
         size_t* compaction_score, bool allow_delete) {
+    DBUG_EXECUTE_IF(
+            
"CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
 {
+                auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+                if (target_tablet_id == tablet->tablet_id()) {
+                    auto start_version = dp->param<int64_t>("start_version", 
-1);
+                    auto end_version = dp->param<int64_t>("end_version", -1);
+                    for (auto& rowset : candidate_rowsets) {
+                        if (rowset->start_version() >= start_version &&
+                            rowset->end_version() <= end_version) {
+                            input_rowsets->push_back(rowset);
+                        }
+                    }
+                }
+                return input_rowsets->size();
+            })
+
     size_t promotion_size = cloud_promotion_size(tablet);
     auto max_version = tablet->max_version().first;
     int transient_size = 0;
diff --git a/be/src/cloud/cloud_full_compaction.cpp 
b/be/src/cloud/cloud_full_compaction.cpp
index bce00c9a2e7..f983e57ebe0 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -30,6 +30,7 @@
 #include "olap/rowset/beta_rowset.h"
 #include "olap/tablet_meta.h"
 #include "service/backend_options.h"
+#include "util/debug_points.h"
 #include "util/thread.h"
 #include "util/uuid_generator.h"
 #include "vec/columns/column.h"
@@ -221,6 +222,8 @@ Status CloudFullCompaction::modify_rowsets() {
     
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
     
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());
 
+    DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.block", DBUG_BLOCK);
+
     DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index ee4f8b611a6..a8768554cff 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -749,6 +749,7 @@ Status CloudStorageEngine::submit_compaction_task(const 
CloudTabletSPtr& tablet,
 void CloudStorageEngine::_lease_compaction_thread_callback() {
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::seconds(config::lease_compaction_interval_seconds))) {
+        std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
         std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
         std::vector<std::shared_ptr<CloudCumulativeCompaction>> 
cumu_compactions;
         {
@@ -763,8 +764,16 @@ void 
CloudStorageEngine::_lease_compaction_thread_callback() {
                     cumu_compactions.push_back(cumu);
                 }
             }
+            for (auto& [_, full] : _submitted_full_compactions) {
+                if (full) {
+                    full_compactions.push_back(full);
+                }
+            }
         }
         // TODO(plat1ko): Support batch lease rpc
+        for (auto& comp : full_compactions) {
+            comp->do_lease();
+        }
         for (auto& comp : cumu_compactions) {
             comp->do_lease();
         }
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out
new file mode 100644
index 00000000000..6e498c12c60
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out
 differ
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 4c6fdcb5e6b..47826ca639e 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1724,6 +1724,24 @@ class Suite implements GroovyInterceptable {
         }
     }
 
+    void setBeConfigTemporary(Map<String, Object> tempConfig, Closure 
actionSupplier) {
+        Map<String, Map<String, String>> originConf = Maps.newHashMap()
+        tempConfig.each{ k, v ->
+            originConf.put(k, get_be_param(k))
+        }
+        try {
+            tempConfig.each{ k, v -> set_be_param(k, v)}
+            actionSupplier()
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            originConf.each { k, confs ->
+                set_original_be_param(k, confs)
+            }
+        }
+    }
+
     void waitAddFeFinished(String host, int port) {
         logger.info("waiting for ${host}:${port}")
         Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100, 
TimeUnit.MILLISECONDS).and()
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy
new file mode 100644
index 00000000000..1910788d92f
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+
+suite("test_cloud_full_compaction_do_lease","nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    def tableName = "test_cloud_full_compaction_do_lease"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName}
+            (k int, v1 int, v2 int )
+            UNIQUE KEY(k)
+            DISTRIBUTED BY HASH (k) 
+            BUCKETS 1  PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write"="true",
+                "disable_auto_compaction" = "true");
+        """
+
+    (1..20).each{ id -> 
+        sql """insert into ${tableName} select number, number, number from 
numbers("number"="10");"""
+    }
+
+    qt_sql "select count(1) from ${tableName};"
+
+    def backends = sql_return_maparray('show backends')
+    def tabletStats = sql_return_maparray("show tablets from ${tableName};")
+    assert tabletStats.size() == 1
+    def tabletId = tabletStats[0].TabletId
+    def tabletBackendId = tabletStats[0].BackendId
+    def tabletBackend
+    for (def be : backends) {
+        if (be.BackendId == tabletBackendId) {
+            tabletBackend = be
+            break;
+        }
+    }
+    logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}");
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def customBeConfig = [
+        lease_compaction_interval_seconds : 2
+    ]
+
+    setBeConfigTemporary(customBeConfig) {
+        // The default value of lease_compaction_interval_seconds is 20s, so the
+        // compaction lease thread may still be in a 20s wait when the config changes.
+        // Sleep 20s here so that it wakes up and then runs at the expected 2s interval.
+        Thread.sleep(20000)
+        try {
+            // block the full compaction
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+
+            
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+                    [tablet_id:"${tabletId}", start_version:"2", 
end_version:"10"]);
+
+            {
+                // trigger full compaction, it will be blocked in 
modify_rowsets
+                logger.info("trigger full compaction on BE 
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+                def (code, out, err) = 
be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+                logger.info("Run compaction: code=" + code + ", out=" + out + 
", err=" + err)
+                assert  code == 0
+                def compactJson = parseJson(out.trim())
+                assert "success" == compactJson.status.toLowerCase()
+            }
+            
+            // wait longer than the full compaction job's lease timeout 
(lease_compaction_interval_seconds * 4)
+            Thread.sleep(10000);
+
+            {
+                // trigger cumu compaction
+                logger.info("trigger cumu compaction on BE 
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+                def (code, out, err) = 
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, 
tabletId)
+                logger.info("Run compaction: code=" + code + ", out=" + out + 
", err=" + err)
+                assert code == 0
+                def compactJson = parseJson(out.trim())
+                // this will fail due to existing full compaction
+                assert "e-2000" == compactJson.status.toLowerCase()
+            }
+
+            Thread.sleep(1000);
+
+            // unblock full compaction
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+
+            Thread.sleep(3000);
+
+            {
+                def (code, out, err) = 
be_show_tablet_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+                assert code == 0
+                def compactJson = parseJson(out.trim())
+                assert compactJson["rowsets"].toString().contains("[2-21]")
+            }
+            
+
+        } catch (Exception e) {
+            logger.info(e.getMessage())
+            assert false
+        } finally {
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+            
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets")
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to