This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0b9817ea4b4 [cherry-pick](branch-2.1) Pick "[Enhancement](txn) Block new insert into if schema change happens during transaction (#39483)" (#40115) 0b9817ea4b4 is described below commit 0b9817ea4b4840597b6433189389336b18c1b9fd Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Fri Aug 30 10:01:10 2024 +0800 [cherry-pick](branch-2.1) Pick "[Enhancement](txn) Block new insert into if schema change happens during transaction (#39483)" (#40115) ## Proposed changes Pick #39483 <!--Describe your changes.--> --- be/src/olap/schema_change.cpp | 1 + .../insert/BatchInsertIntoTableCommand.java | 11 ++ .../trees/plans/physical/PhysicalResultSink.java | 4 - .../java/org/apache/doris/qe/StmtExecutor.java | 1 + .../apache/doris/transaction/TransactionEntry.java | 18 +++ .../data/insert_p0/{ => transaction}/test_txn.out | 0 .../insert_p0/{ => transaction}/txn_insert.out | 0 .../txn_insert_values_with_schema_change.out | 0 ...pecify_columns_schema_change_add_key_column.out | 15 +++ ...cify_columns_schema_change_add_value_column.out | 15 +++ ...pecify_columns_schema_change_reorder_column.out | 19 +++ .../insert_p0/{ => transaction}/test_txn.groovy | 0 .../insert_p0/{ => transaction}/txn_insert.groovy | 0 .../txn_insert_values_with_schema_change.groovy | 2 +- ...ify_columns_schema_change_add_key_column.groovy | 127 ++++++++++++++++++++ ...y_columns_schema_change_add_value_column.groovy | 129 +++++++++++++++++++++ ...ify_columns_schema_change_reorder_column.groovy | 129 +++++++++++++++++++++ 17 files changed, 466 insertions(+), 5 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 97d367e78d4..f50c1fa9522 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -735,6 +735,7 @@ std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting; // The admin should upgrade all BE and then upgrade FE. // Should delete the old code after upgrade finished. Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2& request) { + DBUG_EXECUTE_IF("SchemaChangeJob._do_process_alter_tablet.sleep", { sleep(10); }) Status res = Status::OK(); TabletSharedPtr base_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(request.base_tablet_id); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java index 4b7afb1f6a8..12c9827f552 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/BatchInsertIntoTableCommand.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.ErrorCode; @@ -114,6 +115,16 @@ public class BatchInsertIntoTableCommand extends Command implements NoForward, E Preconditions.checkArgument(plan.isPresent(), "insert into command must contain OlapTableSinkNode"); sink = ((PhysicalOlapTableSink<?>) plan.get()); Table targetTable = sink.getTargetTable(); + if (ctx.getTxnEntry().isFirstTxnInsert()) { + ctx.getTxnEntry().setTxnSchemaVersion(((OlapTable) targetTable).getBaseSchemaVersion()); + ctx.getTxnEntry().setFirstTxnInsert(false); + } else { + if (((OlapTable) targetTable).getBaseSchemaVersion() != ctx.getTxnEntry().getTxnSchemaVersion()) { + throw new AnalysisException("There are schema changes in one transaction, " + + "you can commit this transaction with formal data or rollback " + + "this whole transaction."); + } + } // should set columns of sink since we maybe generate some invisible columns List<Column> fullSchema = sink.getTargetTable().getFullSchema(); List<Column> targetSchema = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java index aceb1f13774..8fb6dfb286e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java @@ -58,10 +58,6 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH logicalProperties, physicalProperties, statistics, child); } - public List<NamedExpression> getOutputExprs() { - return outputExprs; - } - @Override public PhysicalResultSink<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c78a701d279..062f83443f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1978,6 +1978,7 @@ public class StmtExecutor { .setThriftRpcTimeoutMs(5000).setTxnId(-1).setDb("").setTbl("") .setMaxFilterRatio(context.getSessionVariable().getEnableInsertStrict() ? 0 : context.getSessionVariable().getInsertMaxFilterRatio())); + context.getTxnEntry().setFirstTxnInsert(true); StringBuilder sb = new StringBuilder(); sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'") .append(TransactionStatus.PREPARE.name()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java index 816740a3202..b88ca66027a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java @@ -67,6 +67,8 @@ public class TransactionEntry { private List<InternalService.PDataRow> dataToSend = new ArrayList<>(); private long rowsInTransaction = 0; private Types.PUniqueId pLoadId; + private boolean isFirstTxnInsert = false; + private volatile int txnSchemaVersion = -1; // for insert into select for multi tables private boolean isTransactionBegan = false; @@ -164,6 +166,22 @@ public class TransactionEntry { this.pLoadId = pLoadId; } + public boolean isFirstTxnInsert() { + return isFirstTxnInsert; + } + + public void setFirstTxnInsert(boolean firstTxnInsert) { + isFirstTxnInsert = firstTxnInsert; + } + + public int getTxnSchemaVersion() { + return txnSchemaVersion; + } + + public void setTxnSchemaVersion(int txnSchemaVersion) { + this.txnSchemaVersion = txnSchemaVersion; + } + // Used for insert into select public void beginTransaction(DatabaseIf database, TableIf table) throws DdlException, BeginTransactionException, MetaNotFoundException, AnalysisException, diff --git a/regression-test/data/insert_p0/test_txn.out b/regression-test/data/insert_p0/transaction/test_txn.out similarity index 100% rename from regression-test/data/insert_p0/test_txn.out rename to regression-test/data/insert_p0/transaction/test_txn.out diff --git a/regression-test/data/insert_p0/txn_insert.out b/regression-test/data/insert_p0/transaction/txn_insert.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert.out rename to regression-test/data/insert_p0/transaction/txn_insert.out diff --git a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out b/regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out similarity index 100% rename from regression-test/data/insert_p0/txn_insert_values_with_schema_change.out rename to regression-test/data/insert_p0/transaction/txn_insert_values_with_schema_change.out diff --git a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out new file mode 100644 index 00000000000..b06ce07b4a8 --- /dev/null +++ b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_desc1 -- +c1 int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE + +-- !select_desc2 -- +c1 int Yes true \N +new_col int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE + +-- !select1 -- +1 \N 2 3 + diff --git a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out new file mode 100644 index 00000000000..560051c8800 --- /dev/null +++ b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_desc1 -- +c1 int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE + +-- !select_desc2 -- +c1 int Yes true \N +c2 int Yes false \N NONE +c3 int Yes false \N NONE +new_col int Yes false \N NONE + +-- !select1 -- +1 2 3 \N + diff --git a/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out new file mode 100644 index 00000000000..2cac2aa11bc --- /dev/null +++ b/regression-test/data/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_desc1 -- +c1 int Yes true \N +c2 bigint Yes false \N NONE +c3 int Yes false \N NONE + +-- !select_desc2 -- +c1 int Yes true \N +c3 int Yes false \N NONE +c2 bigint Yes false \N NONE + +-- !select_desc3 -- +c1 int Yes true \N +c3 int Yes false \N NONE +c2 bigint Yes false \N NONE + +-- !select1 -- +1 3 2 + diff --git a/regression-test/suites/insert_p0/test_txn.groovy b/regression-test/suites/insert_p0/transaction/test_txn.groovy similarity index 100% rename from regression-test/suites/insert_p0/test_txn.groovy rename to regression-test/suites/insert_p0/transaction/test_txn.groovy diff --git a/regression-test/suites/insert_p0/txn_insert.groovy b/regression-test/suites/insert_p0/transaction/txn_insert.groovy similarity index 100% rename from regression-test/suites/insert_p0/txn_insert.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert.groovy diff --git a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy similarity index 98% rename from regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy rename to regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy index cd428b185c4..477579760aa 100644 --- a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy +++ b/regression-test/suites/insert_p0/transaction/txn_insert_values_with_schema_change.groovy @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit suite("txn_insert_values_with_schema_change") { def table = "txn_insert_values_with_schema_change" - def dbName = "regression_test_insert_p0" + def dbName = "regression_test_insert_p0_transaction" def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" logger.info("url: ${url}") List<String> errors = new ArrayList<>() diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy new file mode 100644 index 00000000000..de9162cabc7 --- /dev/null +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_key_column.groovy @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +suite("txn_insert_with_specify_columns_schema_change_add_key_column", "nonConcurrent") { + if(!isCloudMode()) { + def table = "txn_insert_with_specify_columns_schema_change_add_key_column" + + def dbName = "regression_test_insert_p0_transaction" + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" + logger.info("url: ${url}") + List<String> errors = new ArrayList<>() + CountDownLatch insertLatch = new CountDownLatch(1) + CountDownLatch insertLatch2 = new CountDownLatch(1) + + sql """ DROP TABLE IF EXISTS $table force """ + sql """ + create table $table ( + c1 INT NULL, + c2 INT NULL, + c3 INT NULL + ) ENGINE=OLAP + UNIQUE KEY(c1) + DISTRIBUTED BY HASH(c1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """ + + def getAlterTableState = { job_state -> + def retry = 0 + sql "use ${dbName};" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def txnInsert = { + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + try { + qt_select_desc1 """desc $table""" + + insertLatch.await(2, TimeUnit.MINUTES) + + statement.execute("begin") + statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);") + + insertLatch2.await(2, TimeUnit.MINUTES) + qt_select_desc2 """desc $table""" + statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);") + statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);") + statement.execute("commit") + } catch (Exception e) { + logger.info("txn insert failed", e) + assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction.")) + statement.execute("rollback") + } + } + } + + def schemaChange = { sql -> + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + statement.execute(sql) + getAlterTableState("RUNNING") + insertLatch.countDown() + getAlterTableState("FINISHED") + insertLatch2.countDown() + } catch (Throwable e) { + logger.error("schema change failed", e) + errors.add("schema change failed " + e.getMessage()) + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep") + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} add column new_col int key after c1;")) + Thread insert_thread = new Thread(() -> txnInsert()) + schema_change_thread.start() + insert_thread.start() + schema_change_thread.join() + insert_thread.join() + + logger.info("errors: " + errors) + assertEquals(0, errors.size()) + getAlterTableState("FINISHED") + order_qt_select1 """select * from ${table} order by c1, c2, c3""" + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +} diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy new file mode 100644 index 00000000000..1d072f572c6 --- /dev/null +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_add_value_column.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +suite("txn_insert_with_specify_columns_schema_change_add_value_column", "nonConcurrent") { + if(!isCloudMode()) { + def table = "txn_insert_with_specify_columns_schema_change_add_value_column" + + def dbName = "regression_test_insert_p0_transaction" + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" + logger.info("url: ${url}") + List<String> errors = new ArrayList<>() + CountDownLatch insertLatch = new CountDownLatch(1) + CountDownLatch schemaChangeLatch = new CountDownLatch(1) + + sql """ DROP TABLE IF EXISTS $table force """ + sql """ + create table $table ( + c1 INT NULL, + c2 INT NULL, + c3 INT NULL + ) ENGINE=OLAP + UNIQUE KEY(c1) + DISTRIBUTED BY HASH(c1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """ + + def getAlterTableState = { job_state -> + def retry = 0 + sql "use ${dbName};" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def txnInsert = { + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + try { + qt_select_desc1 """desc $table""" + + statement.execute("begin") + statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);") + + schemaChangeLatch.countDown() + + insertLatch.await(2, TimeUnit.MINUTES) + + qt_select_desc2 """desc $table""" + statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);") + statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);") + statement.execute("commit") + } catch (Exception e) { + logger.info("txn insert failed", e) + assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction.")) + statement.execute("rollback") + } + } + } + + def schemaChange = { sql -> + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + schemaChangeLatch.await(2, TimeUnit.MINUTES) + statement.execute(sql) + getAlterTableState("FINISHED") + insertLatch.countDown() + } catch (Throwable e) { + logger.error("schema change failed", e) + errors.add("schema change failed " + e.getMessage()) + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep") + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} add column new_col int after c3;")) + Thread insert_thread = new Thread(() -> txnInsert()) + schema_change_thread.start() + insert_thread.start() + + schema_change_thread.join() + insert_thread.join() + + logger.info("errors: " + errors) + assertEquals(0, errors.size()) + getAlterTableState("FINISHED") + order_qt_select1 """select * from ${table} order by c1, c2, c3""" + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } + +} diff --git a/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy new file mode 100644 index 00000000000..e4c16fdf8c3 --- /dev/null +++ b/regression-test/suites/insert_p0/transaction/txn_insert_with_specify_columns_schema_change_reorder_column.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.sql.Connection +import java.sql.DriverManager +import java.sql.Statement +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +suite("txn_insert_with_specify_columns_schema_change_reorder_column", "nonConcurrent") { + if(!isCloudMode()){ + def table = "txn_insert_with_specify_columns_schema_change_reorder_column" + + def dbName = "regression_test_insert_p0_transaction" + def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true" + logger.info("url: ${url}") + List<String> errors = new ArrayList<>() + CountDownLatch insertLatch = new CountDownLatch(1) + CountDownLatch insertLatch2 = new CountDownLatch(1) + + sql """ DROP TABLE IF EXISTS $table force """ + sql """ + create table $table ( + c1 INT NULL, + c2 BIGINT NULL, + c3 INT NULL + ) ENGINE=OLAP + UNIQUE KEY(c1) + DISTRIBUTED BY HASH(c1) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1"); + """ + sql """ insert into ${table} (c3, c2, c1) values (3, 2, 1) """ + + def getAlterTableState = { job_state -> + def retry = 0 + sql "use ${dbName};" + while (true) { + sleep(2000) + def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1" + logger.info("alter table state: ${state}") + if (state.size() > 0 && state[0][9] == job_state) { + return + } + retry++ + if (retry >= 10) { + break + } + } + assertTrue(false, "alter table job state is ${last_state}, not ${job_state} after retry ${retry} times") + } + + def txnInsert = { + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + try { + qt_select_desc1 """desc $table""" + + insertLatch.await(2, TimeUnit.MINUTES) + + statement.execute("begin") + statement.execute("insert into ${table} (c3, c2, c1) values (33, 22, 11),(333, 222, 111);") + + insertLatch2.await(2, TimeUnit.MINUTES) + + qt_select_desc2 """desc $table""" + statement.execute("insert into ${table} (c3, c2, c1) values(3333, 2222, 1111);") + statement.execute("insert into ${table} (c3, c2, c1) values(33333, 22222, 11111),(333333, 222222, 111111);") + statement.execute("commit") + } catch (Throwable e) { + logger.error("txn insert failed", e) + assertTrue(e.getMessage().contains("There are schema changes in one transaction, you can commit this transaction with formal data or rollback this whole transaction.")) + statement.execute("rollback") + } + } + } + + def schemaChange = { sql -> + try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword); + Statement statement = conn.createStatement()) { + statement.execute(sql) + getAlterTableState("RUNNING") + insertLatch.countDown() + getAlterTableState("FINISHED") + insertLatch2.countDown() + } catch (Throwable e) { + logger.error("schema change failed", e) + errors.add("schema change failed " + e.getMessage()) + } + } + + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("SchemaChangeJob._do_process_alter_tablet.sleep") + Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} order by (c1,c3,c2);")) + Thread insert_thread = new Thread(() -> txnInsert()) + schema_change_thread.start() + insert_thread.start() + schema_change_thread.join() + insert_thread.join() + + logger.info("errors: " + errors) + assertEquals(0, errors.size()) + getAlterTableState("FINISHED") + qt_select_desc3 """desc $table""" + order_qt_select1 """select * from ${table} order by c1, c2, c3""" + } catch (Exception e) { + logger.info("failed: " + e.getMessage()) + assertTrue(false) + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org