This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2b3878043c9 [fix](regression test) fix test_abort_txn_by_fe #41382 (#41868) 2b3878043c9 is described below commit 2b3878043c97d58ddfefc0fbbe3f0471f9f10c18 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Wed Oct 16 09:46:04 2024 +0800 [fix](regression test) fix test_abort_txn_by_fe #41382 (#41868) cherry pick from #41382 --- be/src/cloud/cloud_stream_load_executor.cpp | 2 + .../org/apache/doris/regression/Config.groovy | 7 + .../org/apache/doris/regression/suite/Suite.groovy | 28 ++-- .../doris/regression/suite/SuiteCluster.groovy | 7 + ...e_cloud1.groovy => test_abort_txn_by_be.groovy} | 35 +++-- .../test_abort_txn_by_be_cloud2.groovy | 164 --------------------- .../test_abort_txn_by_be_local5.groovy | 164 --------------------- .../test_abort_txn_by_be_local6.groovy | 164 --------------------- ...e_cloud4.groovy => test_abort_txn_by_fe.groovy} | 42 ++++-- .../test_abort_txn_by_fe_local3.groovy | 104 ------------- 10 files changed, 85 insertions(+), 632 deletions(-) diff --git a/be/src/cloud/cloud_stream_load_executor.cpp b/be/src/cloud/cloud_stream_load_executor.cpp index 1352b4aac81..46ceca851e2 100644 --- a/be/src/cloud/cloud_stream_load_executor.cpp +++ b/be/src/cloud/cloud_stream_load_executor.cpp @@ -23,6 +23,7 @@ #include "common/logging.h" #include "common/status.h" #include "runtime/stream_load/stream_load_context.h" +#include "util/debug_points.h" namespace doris { @@ -96,6 +97,7 @@ Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) { } Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) { + DBUG_EXECUTE_IF("StreamLoadExecutor.commit_txn.block", DBUG_BLOCK); // forward to fe to excute commit transaction for MoW table if (ctx->is_mow_table() || !config::enable_stream_load_commit_txn_on_be || ctx->load_type == TLoadType::ROUTINE_LOAD) { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy index 5e79ccef21d..4487d76beaa 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/Config.groovy @@ -518,6 +518,13 @@ class Config { // mainly auth_xxx cases use defaultDb, these suites better not use defaultDb config.createDefaultDb() + try { + config.fetchCloudMode() + } catch (Exception e) { + // docker suite no need external cluster. + // so can ignore error here. + } + return config } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 3ff261d6b34..ba053aec1dd 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -290,17 +290,13 @@ class Suite implements GroovyInterceptable { + "see example demo_p0/docker_action.groovy") } - try { - context.config.fetchCloudMode() - } catch (Exception e) { - } - - boolean dockerIsCloud = false if (options.cloudMode == null) { if (context.config.runMode == RunMode.UNKNOWN) { - throw new Exception("Bad run mode, cloud or not_cloud is unknown") + dockerImpl(options, false, actionSupplier) + dockerImpl(options, true, actionSupplier) + } else { + dockerImpl(options, context.config.runMode == RunMode.CLOUD, actionSupplier) } - dockerIsCloud = context.config.runMode == RunMode.CLOUD } else { if (options.cloudMode == true && context.config.runMode == RunMode.NOT_CLOUD) { return @@ -308,12 +304,16 @@ class Suite implements GroovyInterceptable { if (options.cloudMode == false && context.config.runMode == RunMode.CLOUD) { return } - dockerIsCloud = options.cloudMode + dockerImpl(options, options.cloudMode, actionSupplier) } + } + + private void dockerImpl(ClusterOptions options, boolean isCloud, Closure actionSupplier) throws Exception { + logger.info("=== start run suite {} in {} mode. ===", name, (isCloud ? "cloud" : "not_cloud")) try { cluster.destroy(true) - cluster.init(options, dockerIsCloud) + cluster.init(options, isCloud) def user = context.config.jdbcUser def password = context.config.jdbcPassword @@ -329,7 +329,7 @@ class Suite implements GroovyInterceptable { logger.info("get fe {}", fe) assertNotNull(fe) - if (!dockerIsCloud) { + if (!isCloud) { for (def be : cluster.getAllBackends()) { be_report_disk(be.host, be.httpPort) } @@ -1481,7 +1481,11 @@ class Suite implements GroovyInterceptable { } boolean isCloudMode() { - return context.config.isCloudMode() + if (cluster.isRunning()) { + return cluster.isCloudMode() + } else { + return context.config.isCloudMode() + } } boolean enableStoragevault() { diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy index c272be39051..2aaece2c678 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy @@ -270,6 +270,7 @@ class SuiteCluster { final Config config private boolean running private boolean sqlModeNodeMgr = false + private boolean isCloudMode = false SuiteCluster(String name, Config config) { this.name = name @@ -282,6 +283,8 @@ class SuiteCluster { assert options.feNum > 0 || options.beNum > 0 assert config.image != null && config.image != '' + this.isCloudMode = isCloud + def cmd = [ 'up', name, config.image ] @@ -515,6 +518,10 @@ class SuiteCluster { return running } + boolean isCloudMode() { + return this.isCloudMode + } + // if not specific fe indices, then start all frontends void startFrontends(int... indices) { runFrontendsCmd('start', indices) diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be.groovy similarity index 88% rename from regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy rename to regression-test/suites/schema_change_p0/test_abort_txn_by_be.groovy index f2d0b767eb8..b1af2b1fcad 100644 --- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud1.groovy +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_be.groovy @@ -18,13 +18,15 @@ import org.apache.doris.regression.suite.ClusterOptions import org.apache.http.NoHttpResponseException -suite('test_abort_txn_by_be_cloud1', 'docker') { +suite('test_abort_txn_by_be', 'docker') { + + def run_test = { enable_abort_txn_by_checking_coordinator_be, enable_abort_txn_by_checking_conflict_txn -> def options = new ClusterOptions() - options.cloudMode = true + options.cloudMode = null options.enableDebugPoints() options.beConfigs += [ "enable_java_support=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=true" ] - options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=false" ] + options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=${enable_abort_txn_by_checking_coordinator_be}" ] + options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=${enable_abort_txn_by_checking_conflict_txn}" ] options.beNum = 1 docker(options) { @@ -59,6 +61,8 @@ suite('test_abort_txn_by_be_cloud1', 'docker') { lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" + GetDebugPoint().enableDebugPointForAllBEs('StreamLoadExecutor.commit_txn.block') + thread { streamLoad { // a default db 'regression_test' is specified in @@ -86,22 +90,21 @@ suite('test_abort_txn_by_be_cloud1', 'docker') { // if declared a check callback, the default check condition will ignore. // So you must check all condition check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) } } } - sleep(10000) + if (isCloudMode()) { + sleep 3000 + } else { + def dbId = getDbId() + dockerAwaitUntil(20, { + def txns = sql_return_maparray("show proc '/transactions/${dbId}/running'") + txns.size() > 0 + }) + } sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ - String result = "" int max_try_time = 3000 while (max_try_time--){ @@ -161,4 +164,8 @@ suite('test_abort_txn_by_be_cloud1', 'docker') { } } } + } + + run_test(true, false) + run_test(false, true) } diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy deleted file mode 100644 index 7264ac7f90a..00000000000 --- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_cloud2.groovy +++ /dev/null @@ -1,164 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import org.apache.doris.regression.suite.ClusterOptions -import org.apache.http.NoHttpResponseException - -suite('test_abort_txn_by_be_cloud2', 'docker') { - def options = new ClusterOptions() - options.cloudMode = true - options.enableDebugPoints() - options.beConfigs += [ "enable_java_support=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] - options.beNum = 1 - - docker(options) { - def getJobState = { tableName -> - def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ - return jobStateResult[0][9] - } - - def s3BucketName = getS3BucketName() - def s3WithProperties = """WITH S3 ( - |"AWS_ACCESS_KEY" = "${getS3AK()}", - |"AWS_SECRET_KEY" = "${getS3SK()}", - |"AWS_ENDPOINT" = "${getS3Endpoint()}", - |"AWS_REGION" = "${getS3Region()}", - |"provider" = "${getS3Provider()}") - |PROPERTIES( - |"exec_mem_limit" = "8589934592", - |"load_parallelism" = "3")""".stripMargin() - - // set fe configuration - sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" - - def tableName = "lineorder" - // create table if not exists - sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text - sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text - - def coordinatorBe = cluster.getAllBackends().get(0) - def coordinatorBeHost = coordinatorBe.host - - def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, - lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" - - thread { - streamLoad { - // a default db 'regression_test' is specified in - // ${DORIS_HOME}/conf/regression-conf.groovy - table tableName - - // default label is UUID: - // set 'label' UUID.randomUUID().toString() - - // default column_separator is specify in doris fe config, usually is '\t'. - // this line change to ',' - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', column - - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" - - time 600 * 1000 - - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) - } - } - } - - sleep(10000) - - sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ - - String result = "" - int max_try_time = 3000 - while (max_try_time--){ - result = getJobState(tableName) - if (result == "PENDING") { - sleep(3000) - } else { - break; - } - } - if (max_try_time < 1){ - assertEquals(1,2) - } - assertEquals(result, "WAITING_TXN"); - - cluster.stopBackends(coordinatorBe.index) - def isDead = false - for (def i = 0; i < 10; i++) { - def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } - if (!be.Alive.toBoolean()) { - isDead = true - break - } - sleep 1000 - } - assertTrue(isDead) - sleep 10000 - - result = getJobState(tableName) - assertEquals(result, "WAITING_TXN"); - - // coordinatorBe restart, abort txn on it - cluster.startBackends(coordinatorBe.index) - def isAlive = false - for (def i = 0; i < 20; i++) { - def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } - if (be.Alive.toBoolean()) { - isAlive = true - break - } - sleep 1000 - } - assertTrue(isAlive) - sleep 5000 - - max_try_time = 3000 - while (max_try_time--){ - result = getJobState(tableName) - if (result == "FINISHED") { - sleep(3000) - break - } else { - sleep(100) - if (max_try_time < 1){ - assertEquals(1,2) - } - } - } - } -} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy deleted file mode 100644 index 3835da4ccb2..00000000000 --- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local5.groovy +++ /dev/null @@ -1,164 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import org.apache.doris.regression.suite.ClusterOptions -import org.apache.http.NoHttpResponseException - -suite('test_abort_txn_by_be_local5', 'docker') { - def options = new ClusterOptions() - options.cloudMode = false - options.enableDebugPoints() - options.beConfigs += [ "enable_java_support=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=true" ] - options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=false" ] - options.beNum = 1 - - docker(options) { - def getJobState = { tableName -> - def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ - return jobStateResult[0][9] - } - - def s3BucketName = getS3BucketName() - def s3WithProperties = """WITH S3 ( - |"AWS_ACCESS_KEY" = "${getS3AK()}", - |"AWS_SECRET_KEY" = "${getS3SK()}", - |"AWS_ENDPOINT" = "${getS3Endpoint()}", - |"AWS_REGION" = "${getS3Region()}", - |"provider" = "${getS3Provider()}") - |PROPERTIES( - |"exec_mem_limit" = "8589934592", - |"load_parallelism" = "3")""".stripMargin() - - // set fe configuration - sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" - - def tableName = "lineorder" - // create table if not exists - sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text - sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text - - def coordinatorBe = cluster.getAllBackends().get(0) - def coordinatorBeHost = coordinatorBe.host - - def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, - lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" - - thread { - streamLoad { - // a default db 'regression_test' is specified in - // ${DORIS_HOME}/conf/regression-conf.groovy - table tableName - - // default label is UUID: - // set 'label' UUID.randomUUID().toString() - - // default column_separator is specify in doris fe config, usually is '\t'. - // this line change to ',' - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', column - - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" - - time 600 * 1000 - - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) - } - } - } - - sleep(10000) - - sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ - - String result = "" - int max_try_time = 3000 - while (max_try_time--){ - result = getJobState(tableName) - if (result == "PENDING") { - sleep(3000) - } else { - break; - } - } - if (max_try_time < 1){ - assertEquals(1,2) - } - assertEquals(result, "WAITING_TXN"); - - cluster.stopBackends(coordinatorBe.index) - def isDead = false - for (def i = 0; i < 10; i++) { - def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } - if (!be.Alive.toBoolean()) { - isDead = true - break - } - sleep 1000 - } - assertTrue(isDead) - sleep 10000 - - result = getJobState(tableName) - assertEquals(result, "WAITING_TXN"); - - // coordinatorBe restart, abort txn on it - cluster.startBackends(coordinatorBe.index) - def isAlive = false - for (def i = 0; i < 20; i++) { - def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } - if (be.Alive.toBoolean()) { - isAlive = true - break - } - sleep 1000 - } - assertTrue(isAlive) - sleep 20000 - - max_try_time = 3000 - while (max_try_time--){ - result = getJobState(tableName) - if (result == "FINISHED") { - sleep(3000) - break - } else { - sleep(100) - if (max_try_time < 1){ - assertEquals(1,2) - } - } - } - } -} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy deleted file mode 100644 index ff53c412590..00000000000 --- a/regression-test/suites/schema_change_p0/test_abort_txn_by_be_local6.groovy +++ /dev/null @@ -1,164 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import org.apache.doris.regression.suite.ClusterOptions -import org.apache.http.NoHttpResponseException - -suite('test_abort_txn_by_be_local6', 'docker') { - def options = new ClusterOptions() - options.cloudMode = false - options.enableDebugPoints() - options.beConfigs += [ "enable_java_support=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] - options.beNum = 1 - - docker(options) { - def getJobState = { tableName -> - def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ - return jobStateResult[0][9] - } - - def s3BucketName = getS3BucketName() - def s3WithProperties = """WITH S3 ( - |"AWS_ACCESS_KEY" = "${getS3AK()}", - |"AWS_SECRET_KEY" = "${getS3SK()}", - |"AWS_ENDPOINT" = "${getS3Endpoint()}", - |"AWS_REGION" = "${getS3Region()}", - |"provider" = "${getS3Provider()}") - |PROPERTIES( - |"exec_mem_limit" = "8589934592", - |"load_parallelism" = "3")""".stripMargin() - - // set fe configuration - sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" - - def tableName = "lineorder" - // create table if not exists - sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text - sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text - - def coordinatorBe = cluster.getAllBackends().get(0) - def coordinatorBeHost = coordinatorBe.host - - def column = """lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority, - lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount, - lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode,lo_dummy""" - - thread { - streamLoad { - // a default db 'regression_test' is specified in - // ${DORIS_HOME}/conf/regression-conf.groovy - table tableName - - // default label is UUID: - // set 'label' UUID.randomUUID().toString() - - // default column_separator is specify in doris fe config, usually is '\t'. - // this line change to ',' - set 'column_separator', '|' - set 'compress_type', 'GZ' - set 'columns', column - - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file """${getS3Url()}/regression/ssb/sf100/lineorder.tbl.1.gz""" - - time 600 * 1000 - - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) - } - } - } - - sleep(10000) - - sql """ alter table ${tableName} modify column lo_suppkey bigint NULL """ - - String result = "" - int max_try_time = 3000 - while (max_try_time--){ - result = getJobState(tableName) - if (result == "PENDING") { - sleep(3000) - } else { - break; - } - } - if (max_try_time < 1){ - assertEquals(1,2) - } - assertEquals(result, "WAITING_TXN"); - - cluster.stopBackends(coordinatorBe.index) - def isDead = false - for (def i = 0; i < 10; i++) { - def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } - if (!be.Alive.toBoolean()) { - isDead = true - break - } - sleep 1000 - } - assertTrue(isDead) - sleep 10000 - - result = getJobState(tableName) - assertEquals(result, "WAITING_TXN"); - - // coordinatorBe restart, abort txn on it - cluster.startBackends(coordinatorBe.index) - def isAlive = false - for (def i = 0; i < 20; i++) { - def be = sql_return_maparray('show backends').find { it.Host == coordinatorBeHost } - if (be.Alive.toBoolean()) { - isAlive = true - break - } - sleep 1000 - } - assertTrue(isAlive) - sleep 5000 - - max_try_time = 3000 - while (max_try_time--){ - result = getJobState(tableName) - if (result == "FINISHED") { - sleep(3000) - break - } else { - sleep(100) - if (max_try_time < 1){ - assertEquals(1,2) - } - } - } - } -} diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe.groovy similarity index 75% rename from regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy rename to regression-test/suites/schema_change_p0/test_abort_txn_by_fe.groovy index 80b61e16efd..d93e8a203e3 100644 --- a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_cloud4.groovy +++ b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe.groovy @@ -18,13 +18,17 @@ import org.apache.doris.regression.suite.ClusterOptions import org.apache.http.NoHttpResponseException -suite('test_abort_txn_by_fe_cloud4', 'docker') { +suite('test_abort_txn_by_fe', 'docker') { def options = new ClusterOptions() - options.cloudMode = true + options.cloudMode = null options.enableDebugPoints() options.beConfigs += [ "enable_java_support=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] + options.feConfigs += [ + "load_checker_interval_second=2", + "enable_abort_txn_by_checking_coordinator_be=false", + "enable_abort_txn_by_checking_conflict_txn=true", + ] + options.feNum = 3 options.beNum = 1 docker(options) { @@ -59,10 +63,15 @@ suite('test_abort_txn_by_fe_cloud4', 'docker') { loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties sql loadSql - def coordinatorFe = cluster.getAllFrontends().get(0) - def coordinatorFeHost = coordinatorFe.host - - sleep(5000) + if (isCloudMode()) { + sleep 6000 + } else { + def dbId = getDbId() + dockerAwaitUntil(20, { + def txns = sql_return_maparray("show proc '/transactions/${dbId}/running'") + txns.any { it.Label == loadLabel } + }) + } sql """ alter table ${table} modify column lo_suppkey bigint NULL """ @@ -82,9 +91,22 @@ suite('test_abort_txn_by_fe_cloud4', 'docker') { sleep 10000 assertEquals(result, "WAITING_TXN"); - cluster.restartFrontends() - sleep(30000) + def oldMasterFe = cluster.getMasterFe() + cluster.restartFrontends(oldMasterFe.index) + boolean hasRestart = false + for (int i = 0; i < 30; i++) { + if (cluster.getFeByIndex(oldMasterFe.index).alive) { + hasRestart = true + break + } + sleep 1000 + } + assertTrue(hasRestart) context.reconnectFe() + if (!isCloudMode()) { + def newMasterFe = cluster.getMasterFe() + assertTrue(oldMasterFe.index != newMasterFe.index) + } max_try_time = 3000 while (max_try_time--){ diff --git a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy b/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy deleted file mode 100644 index 32cd9d0eba7..00000000000 --- a/regression-test/suites/schema_change_p0/test_abort_txn_by_fe_local3.groovy +++ /dev/null @@ -1,104 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -import org.apache.doris.regression.suite.ClusterOptions -import org.apache.http.NoHttpResponseException - -suite('test_abort_txn_by_fe_local3', 'docker') { - def options = new ClusterOptions() - options.cloudMode = false - options.enableDebugPoints() - options.beConfigs += [ "enable_java_support=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_coordinator_be=false" ] - options.feConfigs += [ "enable_abort_txn_by_checking_conflict_txn=true" ] - options.beNum = 1 - - docker(options) { - def getJobState = { tableName -> - def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """ - return jobStateResult[0][9] - } - - def s3BucketName = getS3BucketName() - def s3WithProperties = """WITH S3 ( - |"AWS_ACCESS_KEY" = "${getS3AK()}", - |"AWS_SECRET_KEY" = "${getS3SK()}", - |"AWS_ENDPOINT" = "${getS3Endpoint()}", - |"AWS_REGION" = "${getS3Region()}", - |"provider" = "${getS3Provider()}") - |PROPERTIES( - |"exec_mem_limit" = "8589934592", - |"load_parallelism" = "3")""".stripMargin() - - // set fe configuration - sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" - - def table= "lineorder" - // create table if not exists - sql new File("""${context.file.parent}/ddl/lineorder_delete.sql""").text - sql new File("""${context.file.parent}/ddl/lineorder_create.sql""").text - def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() - def loadLabel = table + '_' + uniqueID - - // load data from cos - def loadSql = new File("""${context.file.parent}/ddl/${table}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) - loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) + s3WithProperties - sql loadSql - - def coordinatorFe = cluster.getAllFrontends().get(0) - def coordinatorFeHost = coordinatorFe.host - - sleep(5000) - - sql """ alter table ${table} modify column lo_suppkey bigint NULL """ - - String result = "" - int max_try_time = 3000 - while (max_try_time--){ - result = getJobState(table) - if (result == "PENDING") { - sleep(3000) - } else { - break; - } - } - if (max_try_time < 1){ - assertEquals(1,2) - } - sleep 10000 - assertEquals(result, "WAITING_TXN"); - - cluster.restartFrontends() - sleep(30000) - context.reconnectFe() - - max_try_time = 3000 - while (max_try_time--){ - result = getJobState(table) - System.out.println(result) - if (result == "FINISHED") { - sleep(3000) - break - } else { - sleep(100) - if (max_try_time < 1){ - assertEquals(1,2) - } - } - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org