This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 385589c92b1 [fix](regression)Fix unstable compaction related cases (#46920) (#47003)
385589c92b1 is described below

commit 385589c92b1920c41394a1e0b6f35913d107c9d3
Author: qiye <l...@selectdb.com>
AuthorDate: Wed Jan 15 13:42:04 2025 +0800

    [fix](regression)Fix unstable compaction related cases (#46920) (#47003)
    
    bp #46920
---
 regression-test/plugins/plugin_compaction.groovy   | 10 ++-
 ...st_skip_index_compaction_fault_injection.groovy | 86 ++--------------------
 ..._index_change_with_cumulative_compaction.groovy | 33 +++++----
 3 files changed, 33 insertions(+), 96 deletions(-)

diff --git a/regression-test/plugins/plugin_compaction.groovy b/regression-test/plugins/plugin_compaction.groovy
index eaefa9a10d3..45dd99a97a3 100644
--- a/regression-test/plugins/plugin_compaction.groovy
+++ b/regression-test/plugins/plugin_compaction.groovy
@@ -57,8 +57,7 @@ Suite.metaClass.be_run_full_compaction_by_table_id = { String ip, String port, S
 }
 
 logger.info("Added 'be_run_full_compaction' function to Suite")
-
-Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compaction_type, int timeout_seconds=300 ->
+Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compaction_type, int timeout_seconds=300, String[] ignored_errors=[] ->
     if (!(compaction_type in ["cumulative", "base", "full"])) {
         throw new IllegalArgumentException("invalid compaction type: 
${compaction_type}, supported types: cumulative, base, full")
     }
@@ -102,12 +101,15 @@ Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compac
         assert exit_code == 0: "trigger compaction failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
         def trigger_status = parseJson(stdout.trim())
         if (trigger_status.status.toLowerCase() != "success") {
-            if (trigger_status.status.toLowerCase() == "already_exist") {
+            def status_lower = trigger_status.status.toLowerCase()
+            if (status_lower == "already_exist") {
                 triggered_tablets.add(tablet) // compaction already in queue, 
treat it as successfully triggered
             } else if (!auto_compaction_disabled) {
                 // ignore the error if auto compaction enabled
-            } else if (trigger_status.status.contains("E-2000")) {
+            } else if (status_lower.contains("e-2000")) {
                 // ignore this tablet compaction.
+            } else if (ignored_errors.any { error -> status_lower.contains(error.toLowerCase()) }) {
+                // ignore this tablet compaction if the error is in the ignored_errors list
             } else {
                 throw new Exception("trigger compaction failed, be host: 
${be_host}, tablet id: ${tablet.TabletId}, status: ${trigger_status.status}")
             }
diff --git a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
index 52d8d481c8d..ee18ffb7304 100644
--- a/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_skip_index_compaction_fault_injection.groovy
@@ -19,8 +19,8 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 
 suite("test_skip_index_compaction_fault_injection", "nonConcurrent") {
   def isCloudMode = isCloudMode()
+  // branch-2.1 only support index compaction with index_format_v1
   def tableName1 = "test_skip_index_compaction_fault_injection_1"
-  def tableName2 = "test_skip_index_compaction_fault_injection_2"
   def backendId_to_backendIP = [:]
   def backendId_to_backendHttpPort = [:]
   getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
@@ -46,29 +46,8 @@ suite("test_skip_index_compaction_fault_injection", 
"nonConcurrent") {
     );
   """
 
-  sql "DROP TABLE IF EXISTS ${tableName2}"
-  sql """
-    CREATE TABLE ${tableName2} (
-      `@timestamp` int(11) NULL COMMENT "",
-      `clientip` varchar(20) NULL COMMENT "",
-      `request` text NULL COMMENT "",
-      `status` int(11) NULL COMMENT "",
-      `size` int(11) NULL COMMENT "",
-      INDEX clientip_idx (`clientip`) USING INVERTED COMMENT '',
-      INDEX request_idx (`request`) USING INVERTED PROPERTIES("parser" = 
"english", "support_phrase" = "true") COMMENT ''
-    ) ENGINE=OLAP
-    DUPLICATE KEY(`@timestamp`)
-    COMMENT "OLAP"
-    DISTRIBUTED BY RANDOM BUCKETS 1
-    PROPERTIES (
-      "replication_allocation" = "tag.location.default: 1",
-      "disable_auto_compaction" = "true",
-      "inverted_index_storage_format" = "V2"
-    );
-  """
-
   boolean disableAutoCompaction = false
-  
+
   def set_be_config = { key, value ->
     for (String backend_id: backendId_to_backendIP.keySet()) {
       def (code, out, err) = 
update_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), key, value)
@@ -76,49 +55,6 @@ suite("test_skip_index_compaction_fault_injection", 
"nonConcurrent") {
     }
   }
 
-  def trigger_full_compaction_on_tablets = { tablets ->
-    for (def tablet : tablets) {
-      String tablet_id = tablet.TabletId
-      String backend_id = tablet.BackendId
-      int times = 1
-
-      String compactionStatus;
-      do{
-        def (code, out, err) = 
be_run_full_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
-        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
-        ++times
-        sleep(2000)
-        compactionStatus = parseJson(out.trim()).status.toLowerCase();
-      } while (compactionStatus!="success" && times<=10 && 
compactionStatus!="e-6010")
-
-
-      if (compactionStatus == "fail") {
-        assertEquals(disableAutoCompaction, false)
-        logger.info("Compaction was done automatically!")
-      }
-      if (disableAutoCompaction && compactionStatus!="e-6010") {
-        assertEquals("success", compactionStatus)
-      }
-    }
-  }
-
-  def wait_full_compaction_done = { tablets ->
-    for (def tablet in tablets) {
-      boolean running = true
-      do {
-        Thread.sleep(1000)
-        String tablet_id = tablet.TabletId
-        String backend_id = tablet.BackendId
-        def (code, out, err) = 
be_get_compaction_status(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
-        logger.info("Get compaction status: code=" + code + ", out=" + out + 
", err=" + err)
-        assertEquals(code, 0)
-        def compactionStatus = parseJson(out.trim())
-        assertEquals("success", compactionStatus.status.toLowerCase())
-        running = compactionStatus.run_status
-      } while (running)
-    }
-  }
-
   def get_rowset_count = { tablets ->
     int rowsetCount = 0
     for (def tablet in tablets) {
@@ -148,7 +84,7 @@ suite("test_skip_index_compaction_fault_injection", 
"nonConcurrent") {
     }
   }
 
-  def run_test = { tableName -> 
+  def run_test = { tableName ->
     sql """ INSERT INTO ${tableName} VALUES (1, "40.135.0.0", "GET 
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
     sql """ INSERT INTO ${tableName} VALUES (2, "40.135.0.0", "GET 
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
     sql """ INSERT INTO ${tableName} VALUES (3, "40.135.0.0", "GET 
/images/hm_bg.jpg HTTP/1.0", 1, 2); """
@@ -178,15 +114,13 @@ suite("test_skip_index_compaction_fault_injection", 
"nonConcurrent") {
     assert (rowsetCount == 11 * replicaNum)
 
     // first
-    trigger_full_compaction_on_tablets.call(tablets)
-    wait_full_compaction_done.call(tablets)
+    trigger_and_wait_compaction(tableName, "full", 300, new String[]{"e-6010"})
 
     rowsetCount = get_rowset_count.call(tablets);
     assert (rowsetCount == 11 * replicaNum)
 
     // second
-    trigger_full_compaction_on_tablets.call(tablets)
-    wait_full_compaction_done.call(tablets)
+    trigger_and_wait_compaction(tableName, "full", 300, new String[]{"e-6010"})
 
     rowsetCount = get_rowset_count.call(tablets);
     if (isCloudMode) {
@@ -202,7 +136,7 @@ suite("test_skip_index_compaction_fault_injection", 
"nonConcurrent") {
     String backend_id;
     backend_id = backendId_to_backendIP.keySet()[0]
     def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
-    
+
     logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
     assertEquals(code, 0)
     def configList = parseJson(out.trim())
@@ -226,15 +160,9 @@ suite("test_skip_index_compaction_fault_injection", 
"nonConcurrent") {
       
GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_reader")
     }
 
-    // try {
-    //   
GetDebugPoint().enableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer")
-    //   run_test.call(tableName2)
-    // } finally {
-    //   
GetDebugPoint().disableDebugPointForAllBEs("Compaction::open_inverted_index_file_writer")
-    // }
   } finally {
     if (has_update_be_config) {
       set_be_config.call("inverted_index_compaction_enable", 
invertedIndexCompactionEnable.toString())
     }
   }
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy b/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
index ce12d1ede0c..4276edfdedf 100644
--- a/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
+++ b/regression-test/suites/inverted_index_p0/index_change/test_index_change_with_cumulative_compaction.groovy
@@ -40,21 +40,25 @@ suite("test_index_change_with_cumulative_compaction", 
"nonConcurrent") {
         assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish 
timeout")
     }
 
-    def trigger_compaction_with_retry = {table_name, compaction_type = 
"cumulative", max_retries = 10, delay_ms = 2000 ->
-        def retry_count = 0
-        while (true) {
-            try {
-                trigger_and_wait_compaction(table_name, compaction_type)
-                return // Success
-            } catch (Exception e) {
-                retry_count++
-                if (retry_count >= max_retries) {
-                    throw new Exception("Failed to complete ${compaction_type} 
compaction after ${max_retries} attempts", e)
+    def wait_for_build_index_on_partition_finish = { table_name, OpTimeout ->
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = 
"${table_name}";"""
+            def expected_finished_num = alter_res.size();
+            def finished_num = 0;
+            for (int i = 0; i < expected_finished_num; i++) {
+                logger.info(table_name + " build index job state: " + 
alter_res[i][7] + i)
+                if (alter_res[i][7] == "FINISHED") {
+                    ++finished_num;
                 }
-                logger.warn("Compaction attempt ${retry_count} failed: 
${e.getMessage()}")
-                Thread.sleep(delay_ms)
             }
+            if (finished_num == expected_finished_num) {
+                logger.info(table_name + " all build index jobs finished, 
detail: " + alter_res)
+                break
+            }
+            useTime = t
+            sleep(delta_time)
         }
+        assertTrue(useTime <= OpTimeout, "wait_for_latest_build_index_on_partition_finish timeout")
     }
 
     try {
@@ -162,12 +166,15 @@ suite("test_index_change_with_cumulative_compaction", 
"nonConcurrent") {
         // build index
         if (!isCloudMode()) {
             sql "build index idx_user_id on ${tableName}"
+            wait_for_build_index_on_partition_finish(tableName, timeout)
             sql "build index idx_date on ${tableName}"
+            wait_for_build_index_on_partition_finish(tableName, timeout)
             sql "build index idx_city on ${tableName}"
+            wait_for_build_index_on_partition_finish(tableName, timeout)
         }
 
         // trigger compactions for all tablets in ${tableName}
-        trigger_compaction_with_retry(tableName, "cumulative")
+        trigger_and_wait_compaction(tableName, "cumulative")
 
         int rowCount = 0
         for (def tablet in tablets) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to