This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 38114ce1e86 [fix](merge-cloud) fix single rowset did not trigger compaction in cloud mode (#34622) 38114ce1e86 is described below commit 38114ce1e860799a3100a6395211e8dbcc27fb63 Author: Luwei <814383...@qq.com> AuthorDate: Fri May 10 10:42:34 2024 +0800 [fix](merge-cloud) fix single rowset did not trigger compaction in cloud mode (#34622) --- .../cloud/cloud_cumulative_compaction_policy.cpp | 11 ++-- .../suites/compaction/test_base_compaction.groovy | 74 ++++++++++++--------- .../test_base_compaction_no_value.groovy | 76 ++++++++++++---------- 3 files changed, 92 insertions(+), 69 deletions(-) diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index d040b21d421..1c658e5d546 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -54,7 +54,7 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( const int64_t max_compaction_score, const int64_t min_compaction_score, std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, size_t* compaction_score, bool allow_delete) { - //size_t promotion_size = tablet->cumulative_promotion_size(); + size_t promotion_size = cloud_promotion_size(tablet); auto max_version = tablet->max_version().first; int transient_size = 0; *compaction_score = 0; @@ -93,6 +93,10 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( input_rowsets->push_back(rowset); } + if (total_size >= promotion_size) { + return transient_size; + } + // if there is delete version, do compaction directly if (last_delete_version->first != -1) { if (input_rowsets->size() == 1) { @@ -154,9 +158,8 @@ int32_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( *compaction_score = new_compaction_score; VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = " - << *compaction_score << ", total_size = " - << total_size - //<< ", calc promotion size value = " << promotion_size + << *compaction_score << ", total_size = " << total_size + << ", calc promotion size value = " << promotion_size << ", tablet = " << tablet->tablet_id() << ", input_rowset size " << input_rowsets->size(); diff --git a/regression-test/suites/compaction/test_base_compaction.groovy b/regression-test/suites/compaction/test_base_compaction.groovy index d0c8074ca4c..5aae1ecdef7 100644 --- a/regression-test/suites/compaction/test_base_compaction.groovy +++ b/regression-test/suites/compaction/test_base_compaction.groovy @@ -33,14 +33,6 @@ suite("test_base_compaction") { def configList = parseJson(out.trim()) assert configList instanceof List - boolean disableAutoCompaction = true - for (Object ele in (List) configList) { - assert ele instanceof List<String> - if (((List<String>) ele)[0] == "disable_auto_compaction") { - disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) - } - } - sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( @@ -64,7 +56,8 @@ suite("test_base_compaction") { UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER) DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1 PROPERTIES ( - "replication_num" = "1" + "replication_num" = "1", + "disable_auto_compaction" = "true" ) """ @@ -84,7 +77,42 @@ suite("test_base_compaction") { // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz""" + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz""" + + time 10000 // limit inflight 10s + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz""" time 10000 // limit inflight 10s @@ -114,13 +142,7 @@ suite("test_base_compaction") { logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } + assertEquals("success", compactJson.status.toLowerCase()) } // wait for all compactions done @@ -154,7 +176,7 @@ suite("test_base_compaction") { // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz""" + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz""" time 10000 // limit inflight 10s @@ -182,13 +204,7 @@ suite("test_base_compaction") { logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } + assertEquals("success", compactJson.status.toLowerCase()) } // wait for all compactions done @@ -219,13 +235,7 @@ suite("test_base_compaction") { logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } + assertEquals("success", compactJson.status.toLowerCase()) } // wait for all compactions done diff --git a/regression-test/suites/compaction/test_base_compaction_no_value.groovy b/regression-test/suites/compaction/test_base_compaction_no_value.groovy index 81ce0cd8263..91a50ce4dcd 100644 --- a/regression-test/suites/compaction/test_base_compaction_no_value.groovy +++ b/regression-test/suites/compaction/test_base_compaction_no_value.groovy @@ -27,20 +27,12 @@ suite("test_base_compaction_no_value") { backend_id = backendId_to_backendIP.keySet()[0] def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) - + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def configList = parseJson(out.trim()) assert configList instanceof List - boolean disableAutoCompaction = true - for (Object ele in (List) configList) { - assert ele instanceof List<String> - if (((List<String>) ele)[0] == "disable_auto_compaction") { - disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2]) - } - } - sql """ DROP TABLE IF EXISTS ${tableName} """ sql """ CREATE TABLE IF NOT EXISTS ${tableName} ( @@ -64,7 +56,8 @@ suite("test_base_compaction_no_value") { UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT) DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 1 PROPERTIES ( - "replication_num" = "1" + "replication_num" = "1", + "disable_auto_compaction" = "true" ) """ @@ -84,7 +77,42 @@ suite("test_base_compaction_no_value") { // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz""" + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz""" + + time 10000 // limit inflight 10s + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + streamLoad { + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + table tableName + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz""" time 10000 // limit inflight 10s @@ -114,13 +142,7 @@ suite("test_base_compaction_no_value") { logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } + assertEquals("success", compactJson.status.toLowerCase()) } // wait for all compactions done @@ -154,7 +176,7 @@ suite("test_base_compaction_no_value") { // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split01.gz""" + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split00.gz""" time 10000 // limit inflight 10s @@ -182,13 +204,7 @@ suite("test_base_compaction_no_value") { logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } + assertEquals("success", compactJson.status.toLowerCase()) } // wait for all compactions done @@ -219,13 +235,7 @@ suite("test_base_compaction_no_value") { logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) assertEquals(code, 0) def compactJson = parseJson(out.trim()) - if (compactJson.status.toLowerCase() == "fail") { - assertEquals(disableAutoCompaction, false) - logger.info("Compaction was done automatically!") - } - if (disableAutoCompaction) { - assertEquals("success", compactJson.status.toLowerCase()) - } + assertEquals("success", compactJson.status.toLowerCase()) } // wait for all compactions done --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org