This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new ae4ae30cfe7 [enhancement](regression) fault injection for segcompaction test (#25709) (#26305)
ae4ae30cfe7 is described below

commit ae4ae30cfe702027730a64a2f253f1634c159383
Author: HHoflittlefish777 <77738092+hhoflittlefish...@users.noreply.github.com>
AuthorDate: Mon Nov 6 23:08:19 2023 +0800

    [enhancement](regression) fault injection for segcompaction test (#25709) (#26305)

    1. generalized debug point facilities from docker suites for fault-injection/stubbing cases
    2. add segcompaction fault-injection cases for demonstration
    3. add -238 TOO_MANY_SEGMENTS fault-injection case for good

    Co-authored-by: zhengyu <freeman.zhang1...@gmail.com>
---
 be/src/olap/rowset/beta_rowset_writer.cpp          |   4 +
 be/src/olap/rowset/segcompaction.cpp               |   5 +
 .../test_segcompaction_fault_injection.out         |   7 +
 .../test_too_many_segments_fault_injection.out     |   3 +
 .../test_segcompaction_fault_injection.out         |   3 +
 .../org/apache/doris/regression/suite/Suite.groovy |  21 ++-
 .../doris/regression/suite/SuiteCluster.groovy     |   3 +
 .../apache/doris/regression/util/DebugPoint.groovy | 139 ++++++++++++++++++++
 regression-test/pipeline/p0/conf/be.conf           |   7 +-
 .../pipeline/p0/conf/regression-conf.groovy        |   2 +-
 .../test_segcompaction_fault_injection.groovy      | 143 +++++++++++++++++++++
 .../test_too_many_segments_fault_injection.groovy  | 124 ++++++++++++++++++
 12 files changed, 450 insertions(+), 11 deletions(-)
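The heart of this change is the new DebugPoint utility, exposed to every suite through GetDebugPoint(). A typical fault-injection suite enables a named debug point on all BEs, runs a workload, and disables the point in a finally block. A minimal sketch of that pattern (the suite name and the load step are placeholders, not part of this commit):

    suite("example_fault_injection") {
        try {
            // activates the BE-side DBUG_EXECUTE_IF hook with the same name
            GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
            // ... run a load large enough to trigger segcompaction ...
        } finally {
            // always clean up, or the stub leaks into later suites
            GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
        }
    }

Under the hood the utility POSTs to the node's HTTP API (/api/debug_point/add/<name>, /api/debug_point/remove/<name>, /api/debug_point/clear), as DebugPoint.groovy below shows.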
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5379ed65f1e..6d1334895da 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -50,7 +50,9 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/tablet_schema.h"
+#include "runtime/thread_context.h"
 #include "segcompaction.h"
+#include "util/debug_points.h"
 #include "util/slice.h"
 #include "util/time.h"
 #include "vec/common/schema_util.h" // LocalSchemaChangeRecorder
@@ -733,6 +735,8 @@ Status BetaRowsetWriter::_do_create_segment_writer(
 Status BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
                                                 const FlushContext* flush_ctx) {
     size_t total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
+    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
+                    { total_segment_num = dp->param("segnum", 1024); });
     if (UNLIKELY(total_segment_num > config::max_segment_num_per_rowset)) {
         return Status::Error<TOO_MANY_SEGMENTS>(
                 "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
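The dp->param("segnum", 1024) call above reads an optional parameter attached to the active debug point, falling back to 1024 when the caller supplies none; overriding total_segment_num this way lets a test trip the TOO_MANY_SEGMENTS (-238) check without actually writing thousands of segments. On the suite side the parameter travels as a map, exactly as the new suite at the end of this patch does:

    // 2000 exceeds the default max_segment_num_per_rowset, so the load fails with -238
    GetDebugPoint().enableDebugPointForAllBEs(
            "BetaRowsetWriter._check_segment_number_limit_too_many_segments",
            ["segnum": 2000])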
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index f967cb86966..6b0635e0f77 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -57,6 +57,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_schema.h"
 #include "runtime/thread_context.h"
+#include "util/debug_points.h"
 #include "util/mem_info.h"
 #include "util/time.h"
 #include "vec/olap/vertical_block_reader.h"
@@ -164,6 +165,7 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
         }
     }
 
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_sum_src_row", { sum_src_row++; });
     if (raw_rows_read != sum_src_row) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction read row num does not match source. expect read row:{}, actual read "
@@ -171,12 +173,15 @@ Status SegcompactionWorker::_check_correctness(OlapReaderStatistics& reader_stat
                 sum_src_row, raw_rows_read);
     }
 
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_merged_rows", { merged_rows++; });
     if ((output_rows + merged_rows) != raw_rows_read) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction total row num does not match after merge. expect total row:{}, "
                 "actual total row:{}, (output_rows:{},merged_rows:{})",
                 raw_rows_read, output_rows + merged_rows, output_rows, merged_rows);
     }
+    DBUG_EXECUTE_IF("SegcompactionWorker._check_correctness_wrong_filtered_rows",
+                    { filtered_rows++; });
     if (filtered_rows != 0) {
         return Status::Error<CHECK_LINES_ERROR>(
                 "segcompaction should not have filtered rows but actual filtered rows:{}",
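Each of the three points above nudges one counter (sum_src_row, merged_rows, filtered_rows), so the matching consistency check in _check_correctness() fails with CHECK_LINES_ERROR and the whole load is cancelled. That is exactly what the new suite asserts; a condensed sketch of the check, with uuid standing in for the load label used below:

    def result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
    // the injected off-by-one makes segcompaction fail its row-count check,
    // which surfaces to the user as a CANCELLED load
    assertTrue(result[0][2].equals("CANCELLED"))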
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000..e25bd2c3f92
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_segcompaction_fault_injection.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
+-- !select_default --
+
+-- !select_default --
+
diff --git a/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
new file mode 100644
index 00000000000..afeab4c41d0
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/fault_injection_p0/test_too_many_segments_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
new file mode 100644
index 00000000000..afeab4c41d0
--- /dev/null
+++ b/regression-test/data/segcompaction_p2/test_segcompaction_fault_injection.out
@@ -0,0 +1,3 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select_default --
+
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index f03a59a0042..9902e30678e 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -38,6 +38,7 @@ import org.apache.doris.regression.action.HttpCliAction
 import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.Hdfs
 import org.apache.doris.regression.util.SuiteUtils
+import org.apache.doris.regression.util.DebugPoint
 import org.junit.jupiter.api.Assertions
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -69,12 +70,14 @@ class Suite implements GroovyInterceptable {
     final List<Future> lazyCheckFutures = new Vector<>()
 
     SuiteCluster cluster
+    DebugPoint debugPoint
 
     Suite(String name, String group, SuiteContext context) {
         this.name = name
        this.group = group
         this.context = context
         this.cluster = null
+        this.debugPoint = new DebugPoint(this)
     }
 
     String getConf(String key, String defaultValue = null) {
@@ -447,7 +450,7 @@ class Suite implements GroovyInterceptable {
         String s3Url = "http://${s3BucketName}.${s3Endpoint}"
         return s3Url
     }
-    
+
     void scpFiles(String username, String host, String files, String filePath, boolean fromDst=true) {
         String cmd = "scp -r ${username}@${host}:${files} ${filePath}"
         if (!fromDst) {
@@ -458,7 +461,7 @@ class Suite implements GroovyInterceptable {
         def code = process.waitFor()
         Assert.assertEquals(0, code)
     }
-    
+
     void sshExec(String username, String host, String cmd) {
         String command = "ssh ${username}@${host} '${cmd}'"
         def cmds = ["/bin/bash", "-c", command]
@@ -470,7 +473,7 @@ class Suite implements GroovyInterceptable {
         assert errMsg.length() == 0: "error occurred!" + errMsg
         assert p.exitValue() == 0
     }
-    
+
     void getBackendIpHttpPort(Map<String, String> backendId_to_backendIP,
                               Map<String, String> backendId_to_backendHttpPort) {
         List<List<Object>> backends = sql("show backends");
@@ -480,7 +483,7 @@ class Suite implements GroovyInterceptable {
             backendId_to_backendHttpPort.put(String.valueOf(backend[0]), String.valueOf(backend[4]));
         }
         return;
-    }  
+    }
 
     int getTotalLine(String filePath) {
         def file = new File(filePath)
@@ -663,14 +666,14 @@ class Suite implements GroovyInterceptable {
             String cleanedSqlStr = sql.replaceAll("\\s*;\\s*\$", "")
             sql = cleanedSqlStr
         }
-        quickRunTest(tag, sql, isOrder) 
+        quickRunTest(tag, sql, isOrder)
     }
 
     void quickExecute(String tag, PreparedStatement stmt) {
         logger.info("Execute tag: ${tag}, sql: ${stmt}".toString())
-        quickRunTest(tag, stmt)  
+        quickRunTest(tag, stmt)
     }
-    
+
     @Override
     Object invokeMethod(String name, Object args) {
         // qt: quick test
@@ -716,5 +719,9 @@ class Suite implements GroovyInterceptable {
 
         return (row[4] as String) == "FINISHED"
     }
+
+    DebugPoint GetDebugPoint() {
+        return debugPoint
+    }
 }
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index 85dbea17b56..3fb12b8e346 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -18,6 +18,9 @@
 package org.apache.doris.regression.suite
 
 import org.apache.doris.regression.Config
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.DebugPoint
+import org.apache.doris.regression.util.NodeType
 
 import com.google.common.collect.Maps
 import org.slf4j.Logger
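DebugPoint (next file) reaches every BE through the getBackendIpHttpPort() helper shown above; a standalone sketch of that discovery step, runnable inside any suite:

    def ips = [:]
    def ports = [:]
    getBackendIpHttpPort(ips, ports)   // backendId -> ip, backendId -> http port
    ips.each { beid, ip -> println "BE $beid at $ip:${ports[beid]}" }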
diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
new file mode 100644
index 00000000000..c30f7fbbcbe
--- /dev/null
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/DebugPoint.groovy
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.regression.util
+
+import org.apache.doris.regression.util.Http
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.suite.Suite
+
+enum NodeType {
+    FE,
+    BE,
+}
+
+class DebugPoint {
+    Suite suite
+
+    DebugPoint(Suite suite) {
+        this.suite = suite
+    }
+
+    /* Enable debug point in regression
+     * Note: set BE config::enable_debug_points = true to take effect
+     * Parameters:
+     *    host:        hostname or ip of target node
+     *    httpPort:    http port of target node
+     *    type:        NodeType.BE or NodeType.FE
+     *    name:        debug point name
+     *    params:      timeout, execute, or other customized input params
+     */
+    static def enableDebugPoint(String host, String httpPort, NodeType type, String name, Map<String, String> params = null) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/add/' + name
+        if (params != null && params.size() > 0) {
+            url += '?' + params.collect((k, v) -> k + '=' + v).join('&')
+        }
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    /* Disable debug point in regression
+     * Parameters:
+     *    host:        hostname or ip of target node
+     *    httpPort:    http port of target node
+     *    type:        NodeType.BE or NodeType.FE
+     *    name:        debug point name
+     */
+    static def disableDebugPoint(String host, String httpPort, NodeType type, String name) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/remove/' + name
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    /* Disable all debug points in regression
+     * Parameters:
+     *    host:        hostname or ip of target node
+     *    httpPort:    http port of target node
+     *    type:        NodeType.BE or NodeType.FE
+     */
+    static def clearDebugPoints(String host, String httpPort, NodeType type) {
+        def url = 'http://' + host + ':' + httpPort + '/api/debug_point/clear'
+        def result = Http.http_post(url, null, true)
+        checkHttpResult(result, type)
+    }
+
+    def operateDebugPointForAllBEs(Closure closure) {
+        def ipList = [:]
+        def portList = [:]
+        (ipList, portList) = getBEHostAndHTTPPort()
+        ipList.each { beid, ip ->
+            closure.call(ip, portList[beid])
+        }
+    }
+
+    /* Enable specific debug point for all BE node in cluster */
+    def enableDebugPointForAllBEs(String name, Map<String, String> params = null) {
+        operateDebugPointForAllBEs({ host, port ->
+            println "enable debug point $name for BE $host:$port"
+            enableDebugPoint(host, port, NodeType.BE, name, params)
+        })
+    }
+
+    /* Disable specific debug point for all BE node in cluster */
+    def disableDebugPointForAllBEs(String name) {
+        operateDebugPointForAllBEs { host, port ->
+            disableDebugPoint(host, port, NodeType.BE, name)
+        }
+    }
+
+    /* Disable all debug points for all BE node in cluster */
+    def clearDebugPointsForAllBEs() {
+        operateDebugPointForAllBEs { host, port ->
+            clearDebugPoints(host, port, NodeType.BE)
+        }
+    }
+
+    def getBEHostAndHTTPPort() {
+        def ipList = [:]
+        def portList = [:]
+        suite.getBackendIpHttpPort(ipList, portList)
+        return [ipList, portList]
+    }
+
+    def getFEHostAndHTTPPort() {
+        assert false : 'not implemented yet'
+    }
+
+    def enableDebugPointForAllFEs(String name, Map<String, String> params = null) {
+        assert false : 'not implemented yet'
+    }
+
+    def disableDebugPointForAllFEs(String name) {
+        assert false : 'not implemented yet'
+    }
+
+    def clearDebugPointsForAllFEs() {
+        assert false : 'not implemented yet'
+    }
+
+    static void checkHttpResult(Object result, NodeType type) {
+        if (type == NodeType.FE) {
+            assert result.code == 0 : result.toString()
+        } else if (type == NodeType.BE) {
+            assert result.status == 'OK' : result.toString()
+        }
+    }
+}
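Besides the all-BEs helpers, the static methods can target a single node; in this sketch the host and port are placeholders for one BE's IP and webserver_port:

    DebugPoint.enableDebugPoint("172.19.0.2", "8141", NodeType.BE,
            "SegcompactionWorker._check_correctness_wrong_merged_rows")
    // equivalent REST call: POST /api/debug_point/add/<name>[?key=value&...]
    DebugPoint.disableDebugPoint("172.19.0.2", "8141", NodeType.BE,
            "SegcompactionWorker._check_correctness_wrong_merged_rows")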
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index 2216bc3e80a..eb4e91730ea 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -20,7 +20,7 @@ PPROF_TMPDIR="$DORIS_HOME/log/"
 # INFO, WARNING, ERROR, FATAL
 sys_log_level = INFO
 
-# ports for admin, web, heartbeat service 
+# ports for admin, web, heartbeat service
 be_port = 9161
 webserver_port = 8141
 heartbeat_service_port = 9151
@@ -36,7 +36,7 @@ buffer_pool_limit = 2%
 storage_page_cache_limit = 0%
 disable_storage_page_cache = true
 chunk_reserved_bytes_limit = 134217728
-# Choose one if there are more than one ip except loopback address. 
+# Choose one if there are more than one ip except loopback address.
 # Note that there should at most one ip match this list.
 # If no ip match this rule, will choose one randomly.
 # use CIDR format, e.g. 10.10.10.0/24
@@ -48,7 +48,7 @@ chunk_reserved_bytes_limit = 134217728
 # you can add capacity limit at the end of each root path, seperate by ','
 # eg:
 # /home/disk2/doris, capacity limit is disk capacity, HDD(default)
-# 
+#
 # you also can specify the properties by setting '<property>:<value>', seperate by ','
 # property 'medium' has a higher priority than the extension of path
 #
@@ -75,3 +75,4 @@ enable_feature_binlog=true
 max_sys_mem_available_low_water_mark_bytes=69206016
 user_files_secure_path=/
 enable_merge_on_write_correctness_check=false
+enable_debug_points=true
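Debug points are compiled in but stay inert until the BE starts with enable_debug_points=true (hence the be.conf change above); otherwise the DBUG_EXECUTE_IF blocks never fire. A suite can guard itself by checking the switch through the same show_be_config helper the new tests already call; a sketch, with beIp and beHttpPort standing in for one backend:

    def (code, out, err) = show_be_config(beIp, beHttpPort)
    // row layout follows the new suites: config name at [0], value at [2]
    def enabled = parseJson(out.trim()).any {
        it[0] == "enable_debug_points" && it[2].equalsIgnoreCase("true")
    }
    assertTrue(enabled, "BE must run with enable_debug_points=true")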
"($columns)" : ""; + + sql """ + LOAD LABEL $uuid ( + DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc") + INTO TABLE ${tableName} + FORMAT AS "ORC" + $columns_str + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "$endpoint", + "AWS_REGION" = "$region" + ) + properties( + "use_new_load_scan_node" = "true" + ) + """ + + def max_try_milli_secs = 3600000 + String [][] result = '' + while (max_try_milli_secs > 0) { + result = sql """ show load where label="$uuid" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + " $uuid") + break; + } + if (result[0][2].equals("CANCELLED")) { + logger.info("Load CANCELLED " + " $uuid") + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $uuid") + } + } + assertTrue(result[0][2].equals("CANCELLED")) + + result = sql """ show load where label="$uuid" order by createtime desc limit 1; """ + qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """ + tablets = sql """ show tablets from ${tableName}; """ + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName}") + } + } + + // wrong_sum_src_row + try { + GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row") + runLoadWithSegcompaction() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row") + } + + // wrong_merged_rows + try { + GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows") + runLoadWithSegcompaction() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows") + } + + // wrong_filtered_rows + try { + GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows") + runLoadWithSegcompaction() + } finally { + GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows") + } +} + diff --git a/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy new file mode 100644 index 00000000000..b68e324ee98 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_too_many_segments_fault_injection.groovy @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
diff --git a/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
new file mode 100644
index 00000000000..2f601f13d11
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_segcompaction_fault_injection.groovy
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+def tableName = "segcompaction_correctness_test"
+def create_table_sql = """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `col_0` BIGINT NOT NULL,`col_1` VARCHAR(20),`col_2` VARCHAR(20),`col_3` VARCHAR(20),`col_4` VARCHAR(20),
+                `col_5` VARCHAR(20),`col_6` VARCHAR(20),`col_7` VARCHAR(20),`col_8` VARCHAR(20),`col_9` VARCHAR(20),
+                `col_10` VARCHAR(20),`col_11` VARCHAR(20),`col_12` VARCHAR(20),`col_13` VARCHAR(20),`col_14` VARCHAR(20),
+                `col_15` VARCHAR(20),`col_16` VARCHAR(20),`col_17` VARCHAR(20),`col_18` VARCHAR(20),`col_19` VARCHAR(20),
+                `col_20` VARCHAR(20),`col_21` VARCHAR(20),`col_22` VARCHAR(20),`col_23` VARCHAR(20),`col_24` VARCHAR(20),
+                `col_25` VARCHAR(20),`col_26` VARCHAR(20),`col_27` VARCHAR(20),`col_28` VARCHAR(20),`col_29` VARCHAR(20),
+                `col_30` VARCHAR(20),`col_31` VARCHAR(20),`col_32` VARCHAR(20),`col_33` VARCHAR(20),`col_34` VARCHAR(20),
+                `col_35` VARCHAR(20),`col_36` VARCHAR(20),`col_37` VARCHAR(20),`col_38` VARCHAR(20),`col_39` VARCHAR(20),
+                `col_40` VARCHAR(20),`col_41` VARCHAR(20),`col_42` VARCHAR(20),`col_43` VARCHAR(20),`col_44` VARCHAR(20),
+                `col_45` VARCHAR(20),`col_46` VARCHAR(20),`col_47` VARCHAR(20),`col_48` VARCHAR(20),`col_49` VARCHAR(20)
+            )
+            DUPLICATE KEY(`col_0`) DISTRIBUTED BY HASH(`col_0`) BUCKETS 1
+            PROPERTIES ( "replication_num" = "1" );
+        """
+def columns = "col_0, col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, col_10, col_11, col_12, col_13, col_14, col_15, col_16, col_17, col_18, col_19, col_20, col_21, col_22, col_23, col_24, col_25, col_26, col_27, col_28, col_29, col_30, col_31, col_32, col_33, col_34, col_35, col_36, col_37, col_38, col_39, col_40, col_41, col_42, col_43, col_44, col_45, col_46, col_47, col_48, col_49"
+
+suite("test_segcompaction_correctness") {
+    def runLoadWithSegcompaction = {
+        String ak = getS3AK()
+        String sk = getS3SK()
+        String endpoint = getS3Endpoint()
+        String region = getS3Region()
+        String bucket = getS3BucketName()
+
+        def backendId_to_backendIP = [:]
+        def backendId_to_backendHttpPort = [:]
+        String backend_id;
+        try {
+            getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+            backend_id = backendId_to_backendIP.keySet()[0]
+            def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+            logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+            def configList = parseJson(out.trim())
+            assert configList instanceof List
+
+            boolean disableAutoCompaction = true
+            for (Object ele in (List) configList) {
+                assert ele instanceof List<String>
+                if (((List<String>) ele)[0] == "disable_auto_compaction") {
+                    disableAutoCompaction = Boolean.parseBoolean(((List<String>) ele)[2])
+                }
+            }
+
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql "${create_table_sql}"
+
+            def uuid = UUID.randomUUID().toString().replace("-", "0")
+            String columns_str = ("$columns" != "") ? "($columns)" : "";
+
+            sql """
+                LOAD LABEL $uuid (
+                    DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc")
+                    INTO TABLE ${tableName}
+                    FORMAT AS "ORC"
+                    $columns_str
+                )
+                WITH S3 (
+                    "AWS_ACCESS_KEY" = "$ak",
+                    "AWS_SECRET_KEY" = "$sk",
+                    "AWS_ENDPOINT" = "$endpoint",
+                    "AWS_REGION" = "$region"
+                )
+                properties(
+                    "use_new_load_scan_node" = "true"
+                )
+            """
+
+            def max_try_milli_secs = 3600000
+            String [][] result = ''
+            while (max_try_milli_secs > 0) {
+                result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+                if (result[0][2].equals("FINISHED")) {
+                    logger.info("Load FINISHED " + " $uuid")
+                    break;
+                }
+                if (result[0][2].equals("CANCELLED")) {
+                    logger.info("Load CANCELLED " + " $uuid")
+                    break;
+                }
+                Thread.sleep(1000)
+                max_try_milli_secs -= 1000
+                if(max_try_milli_secs <= 0) {
+                    assertTrue(1 == 2, "load Timeout: $uuid")
+                }
+            }
+            assertTrue(result[0][2].equals("CANCELLED"))
+
+            result = sql """ show load where label="$uuid" order by createtime desc limit 1; """
+            qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """
+            tablets = sql """ show tablets from ${tableName}; """
+        } finally {
+            try_sql("DROP TABLE IF EXISTS ${tableName}")
+        }
+    }
+
+    // wrong_sum_src_row
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
+    }
+
+    // wrong_merged_rows
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
+    }
+
+    // wrong_filtered_rows
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+        runLoadWithSegcompaction()
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_filtered_rows")
+    }
+}
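The suite above enables and disables each point by name in paired try/finally blocks. When a test activates several points at once, the clear-all helper added in DebugPoint.groovy gives a simpler cleanup; a minimal sketch:

    try {
        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_sum_src_row")
        GetDebugPoint().enableDebugPointForAllBEs("SegcompactionWorker._check_correctness_wrong_merged_rows")
        // ... run the load ...
    } finally {
        // drops every active debug point on every BE in one call
        GetDebugPoint().clearDebugPointsForAllBEs()
    }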
"($columns)" : ""; + + sql """ + LOAD LABEL $uuid ( + DATA INFILE("s3://$bucket/regression/segcompaction/segcompaction.orc") + INTO TABLE ${tableName} + FORMAT AS "ORC" + $columns_str + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "$endpoint", + "AWS_REGION" = "$region" + ) + properties( + "use_new_load_scan_node" = "true" + ) + """ + + Thread.sleep(2000) + GetDebugPoint().enableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments", + ["segnum":2000]) + + def max_try_milli_secs = 3600000 + String [][] result = '' + while (max_try_milli_secs > 0) { + result = sql """ show load where label="$uuid" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + " $uuid") + break; + } + if (result[0][2].equals("CANCELLED")) { + logger.info("Load CANCELLED " + " $uuid") + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $uuid") + } + } + assertTrue(result[0][7].contains("-238")) // EPIC! + + result = sql """ show load where label="$uuid" order by createtime desc limit 1; """ + qt_select_default """ SELECT * FROM ${tableName} WHERE col_0=47 order by col_1, col_2; """ + tablets = sql """ show tablets from ${tableName}; """ + } finally { + try_sql("DROP TABLE IF EXISTS ${tableName}") + GetDebugPoint().disableDebugPointForAllBEs("BetaRowsetWriter._check_segment_number_limit_too_many_segments") + } + } + runLoadWithTooManySegments() +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org