This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 38114ce1e86 [fix](merge-cloud) fix single rowset not triggering 
compaction in cloud mode (#34622)
38114ce1e86 is described below

commit 38114ce1e860799a3100a6395211e8dbcc27fb63
Author: Luwei <814383...@qq.com>
AuthorDate: Fri May 10 10:42:34 2024 +0800

    [fix](merge-cloud) fix single rowset not triggering compaction in cloud 
mode (#34622)
---
 .../cloud/cloud_cumulative_compaction_policy.cpp   | 11 ++--
 .../suites/compaction/test_base_compaction.groovy  | 74 ++++++++++++---------
 .../test_base_compaction_no_value.groovy           | 76 ++++++++++++----------
 3 files changed, 92 insertions(+), 69 deletions(-)

diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp 
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index d040b21d421..1c658e5d546 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -54,7 +54,7 @@ int32_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         const int64_t max_compaction_score, const int64_t min_compaction_score,
         std::vector<RowsetSharedPtr>* input_rowsets, Version* 
last_delete_version,
         size_t* compaction_score, bool allow_delete) {
-    //size_t promotion_size = tablet->cumulative_promotion_size();
+    size_t promotion_size = cloud_promotion_size(tablet);
     auto max_version = tablet->max_version().first;
     int transient_size = 0;
     *compaction_score = 0;
@@ -93,6 +93,10 @@ int32_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         input_rowsets->push_back(rowset);
     }
 
+    if (total_size >= promotion_size) {
+        return transient_size;
+    }
+
     // if there is delete version, do compaction directly
     if (last_delete_version->first != -1) {
         if (input_rowsets->size() == 1) {
@@ -154,9 +158,8 @@ int32_t 
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     *compaction_score = new_compaction_score;
 
     VLOG_CRITICAL << "cumulative compaction size_based policy, 
compaction_score = "
-                  << *compaction_score << ", total_size = "
-                  << total_size
-                  //<< ", calc promotion size value = " << promotion_size
+                  << *compaction_score << ", total_size = " << total_size
+                  << ", calc promotion size value = " << promotion_size
                   << ", tablet = " << tablet->tablet_id() << ", input_rowset 
size "
                   << input_rowsets->size();
 
diff --git a/regression-test/suites/compaction/test_base_compaction.groovy 
b/regression-test/suites/compaction/test_base_compaction.groovy
index d0c8074ca4c..5aae1ecdef7 100644
--- a/regression-test/suites/compaction/test_base_compaction.groovy
+++ b/regression-test/suites/compaction/test_base_compaction.groovy
@@ -33,14 +33,6 @@ suite("test_base_compaction") {
     def configList = parseJson(out.trim())
     assert configList instanceof List
 
-    boolean disableAutoCompaction = true
-    for (Object ele in (List) configList) {
-        assert ele instanceof List<String>
-        if (((List<String>) ele)[0] == "disable_auto_compaction") {
-            disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
-        }
-    }
-
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@ suite("test_base_compaction") {
         UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
         DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
         PROPERTIES (
-          "replication_num" = "1"
+          "replication_num" = "1",
+          "disable_auto_compaction" = "true"
         )
 
     """
@@ -84,7 +77,42 @@ suite("test_base_compaction") {
 
         // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+
+        time 10000 // limit inflight 10s
+
+        // stream load action will check result, include Success status, and 
NumberTotalRows == NumberLoadedRows
+
+        // if declared a check callback, the default check condition will 
ignore.
+        // So you must check all condition
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        // a default db 'regression_test' is specified in
+        // ${DORIS_HOME}/conf/regression-conf.groovy
+        table tableName
+
+        // default label is UUID:
+        // set 'label' UUID.randomUUID().toString()
+
+        // default column_separator is specify in doris fe config, usually is 
'\t'.
+        // this line change to ','
+        set 'column_separator', '|'
+        set 'compress_type', 'GZ'
+
+        // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+        // also, you can stream load a http stream, e.g. http://xxx/some.csv
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
 
         time 10000 // limit inflight 10s
 
@@ -114,13 +142,7 @@ suite("test_base_compaction") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }
 
     // wait for all compactions done
@@ -154,7 +176,7 @@ suite("test_base_compaction") {
 
         // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
 
         time 10000 // limit inflight 10s
 
@@ -182,13 +204,7 @@ suite("test_base_compaction") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }
 
     // wait for all compactions done
@@ -219,13 +235,7 @@ suite("test_base_compaction") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }
 
     // wait for all compactions done
diff --git 
a/regression-test/suites/compaction/test_base_compaction_no_value.groovy 
b/regression-test/suites/compaction/test_base_compaction_no_value.groovy
index 81ce0cd8263..91a50ce4dcd 100644
--- a/regression-test/suites/compaction/test_base_compaction_no_value.groovy
+++ b/regression-test/suites/compaction/test_base_compaction_no_value.groovy
@@ -27,20 +27,12 @@ suite("test_base_compaction_no_value") {
 
     backend_id = backendId_to_backendIP.keySet()[0]
     def (code, out, err) = 
show_be_config(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id))
-    
+
     logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
     assertEquals(code, 0)
     def configList = parseJson(out.trim())
     assert configList instanceof List
 
-    boolean disableAutoCompaction = true
-    for (Object ele in (List) configList) {
-        assert ele instanceof List<String>
-        if (((List<String>) ele)[0] == "disable_auto_compaction") {
-            disableAutoCompaction = Boolean.parseBoolean(((List<String>) 
ele)[2])
-        }
-    }
-
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
         CREATE TABLE IF NOT EXISTS ${tableName} (
@@ -64,7 +56,8 @@ suite("test_base_compaction_no_value") {
         UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, 
L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, 
L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT)
         DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1
         PROPERTIES (
-          "replication_num" = "1"
+          "replication_num" = "1",
+          "disable_auto_compaction" = "true"
         )
 
     """
@@ -84,7 +77,42 @@ suite("test_base_compaction_no_value") {
 
         // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+
+        time 10000 // limit inflight 10s
+
+        // stream load action will check result, include Success status, and 
NumberTotalRows == NumberLoadedRows
+
+        // if declared a check callback, the default check condition will 
ignore.
+        // So you must check all condition
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+            assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+        }
+    }
+
+    streamLoad {
+        // a default db 'regression_test' is specified in
+        // ${DORIS_HOME}/conf/regression-conf.groovy
+        table tableName
+
+        // default label is UUID:
+        // set 'label' UUID.randomUUID().toString()
+
+        // default column_separator is specify in doris fe config, usually is 
'\t'.
+        // this line change to ','
+        set 'column_separator', '|'
+        set 'compress_type', 'GZ'
+
+        // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
+        // also, you can stream load a http stream, e.g. http://xxx/some.csv
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
 
         time 10000 // limit inflight 10s
 
@@ -114,13 +142,7 @@ suite("test_base_compaction_no_value") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }
 
     // wait for all compactions done
@@ -154,7 +176,7 @@ suite("test_base_compaction_no_value") {
 
         // relate to 
${DORIS_HOME}/regression-test/data/demo/streamload_input.csv.
         // also, you can stream load a http stream, e.g. http://xxx/some.csv
-        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz"""
+        file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz"""
 
         time 10000 // limit inflight 10s
 
@@ -182,13 +204,7 @@ suite("test_base_compaction_no_value") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }
 
     // wait for all compactions done
@@ -219,13 +235,7 @@ suite("test_base_compaction_no_value") {
         logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
         assertEquals(code, 0)
         def compactJson = parseJson(out.trim())
-        if (compactJson.status.toLowerCase() == "fail") {
-            assertEquals(disableAutoCompaction, false)
-            logger.info("Compaction was done automatically!")
-        }
-        if (disableAutoCompaction) {
-            assertEquals("success", compactJson.status.toLowerCase())
-        }
+        assertEquals("success", compactJson.status.toLowerCase())
     }
 
     // wait for all compactions done


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to