This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 5abc84af715456b882201b3f10a0594448418671
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Apr 18 20:37:55 2024 +0800

    [fix](txn insert) Fix txn insert commit failed when schema change (#33706)
---
 .../apache/doris/planner/StreamLoadPlanner.java   |   4 +
 .../apache/doris/qe/InsertStreamTxnExecutor.java  | 153 ++++++++++-----------
 .../txn_insert_values_with_schema_change.out      |  13 ++
 .../txn_insert_values_with_schema_change.groovy   | 109 +++++++++++++++
 4 files changed, 202 insertions(+), 77 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 1e377389b44..8f75f5476f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -111,10 +111,12 @@ public class StreamLoadPlanner {
         return destTable;
     }
 
+    // the caller should get the table read lock when calling this method
     public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
         return this.plan(loadId, 1);
     }
 
+    // the caller should get the table read lock when calling this method
     // create the plan. the plan's query id and load id are the same, using the parameter 'loadId'
     public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
@@ -340,11 +342,13 @@ public class StreamLoadPlanner {
         return params;
     }
 
+    // the caller should get the table read lock when calling this method
     // single table plan, fragmentInstanceIndex is 1 (default value)
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException {
         return this.planForPipeline(loadId, 1);
     }
 
+    // the caller should get the table read lock when calling this method
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
                 && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
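The comments added above document a locking contract rather than a behavior change: every caller of plan() or planForPipeline() must hold the target table's read lock, so a concurrent schema change cannot swap in a new index set while the plan is being built. A minimal sketch of a compliant caller follows; it uses the same tryReadLock/readUnlock calls this commit adds to InsertStreamTxnExecutor, but the helper name, timeout, and message text are illustrative only, not part of the commit.

    // Illustrative helper, not part of this commit: plan under the table read lock.
    private TExecPlanFragmentParams planUnderReadLock(StreamLoadPlanner planner, OlapTable table,
            TUniqueId loadId) throws UserException {
        // Bounded wait: fail fast instead of blocking forever behind a long schema change.
        if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
            throw new UserException("get table read lock timeout, table=" + table.getName());
        }
        try {
            // While the read lock is held, no schema change can commit, so the
            // generated plan matches the schema the transaction will load into.
            return planner.plan(loadId);
        } finally {
            table.readUnlock(); // always release, even if planning throws
        }
    }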
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index f37457cf58d..e89553e672d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -43,6 +43,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionState;
 
 import org.apache.thrift.TException;
 
@@ -65,92 +66,90 @@ public class InsertStreamTxnExecutor {
     public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException,
             TimeoutException, InterruptedException, ExecutionException {
         TTxnParams txnConf = txnEntry.getTxnConf();
+        OlapTable table = (OlapTable) txnEntry.getTable();
         // StreamLoadTask's id == request's load_id
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
-        StreamLoadPlanner planner = new StreamLoadPlanner(
-                (Database) txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
-        // Will using load id as query id in fragment
-        if (Config.enable_pipeline_load) {
-            TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId());
-            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-            List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-            if (beIds.isEmpty()) {
-                throw new UserException("No available backend to match the policy: " + policy);
-            }
-
-            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-            for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
-                    .per_node_scan_ranges.entrySet()) {
-                for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                            TFileFormatType.FORMAT_PROTO);
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                            TFileCompressType.PLAIN);
+        StreamLoadPlanner planner = new StreamLoadPlanner((Database) txnEntry.getDb(), table, streamLoadTask);
+        boolean enablePipelineLoad = Config.enable_pipeline_load;
+        TPipelineFragmentParamsList pipelineParamsList = new TPipelineFragmentParamsList();
+        TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
+        if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
+            throw new UserException("get table read lock timeout, database=" + table.getDatabase().getId() + ",table="
+                    + table.getName());
+        }
+        try {
+            // Will use load id as query id in fragment
+            if (enablePipelineLoad) {
+                TPipelineFragmentParams tRequest = planner.planForPipeline(streamLoadTask.getId());
+                tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+                for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0)
+                        .per_node_scan_ranges.entrySet()) {
+                    for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                                TFileFormatType.FORMAT_PROTO);
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                                TFileCompressType.PLAIN);
+                    }
                 }
-            }
-            txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
-            this.loadId = request.getLoadId();
-            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                    .setHi(loadId.getHi())
-                    .setLo(loadId.getLo()).build());
-
-            Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-            txnConf.setUserIp(backend.getHost());
-            txnEntry.setBackend(backend);
-            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-            try {
-                TPipelineFragmentParamsList paramsList = new TPipelineFragmentParamsList();
-                paramsList.addToParamsList(tRequest);
-                Future<InternalService.PExecPlanFragmentResult> future =
-                        BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
-                InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
+                txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
+                this.loadId = request.getLoadId();
+                this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                        .setHi(loadId.getHi())
+                        .setLo(loadId.getLo()).build());
+
+                pipelineParamsList.addToParamsList(tRequest);
+            } else {
+                TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
+                tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+                for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges
+                        .entrySet()) {
+                    for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                                TFileFormatType.FORMAT_PROTO);
+                        scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                                TFileCompressType.PLAIN);
+                    }
                 }
-            } catch (RpcException e) {
-                throw new TException(e);
+                txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+                this.loadId = request.getLoadId();
+                this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                        .setHi(loadId.getHi())
+                        .setLo(loadId.getLo()).build());
+                paramsList.addToParamsList(tRequest);
             }
-        } else {
-            TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
-            BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-            List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-            if (beIds.isEmpty()) {
-                throw new UserException("No available backend to match the policy: " + policy);
+            TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(table.getDatabase().getId(), streamLoadTask.getTxnId());
+            if (transactionState != null) {
+                transactionState.addTableIndexes(table);
             }
+        } finally {
+            table.readUnlock();
+        }
 
-            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-            for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.params.per_node_scan_ranges.entrySet()) {
-                for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                            TFileFormatType.FORMAT_PROTO);
-                    scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                            TFileCompressType.PLAIN);
-                }
+        BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+        List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (beIds.isEmpty()) {
+            throw new UserException("No available backend to match the policy: " + policy);
+        }
+
+        Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+        txnConf.setUserIp(backend.getHost());
+        txnEntry.setBackend(backend);
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
+        try {
+            Future<InternalService.PExecPlanFragmentResult> future;
+            if (enablePipelineLoad) {
+                future = BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, pipelineParamsList, false);
+            } else {
+                future = BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
             }
-            txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
-            this.loadId = request.getLoadId();
-            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                    .setHi(loadId.getHi())
-                    .setLo(loadId.getLo()).build());
-
-            Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-            txnConf.setUserIp(backend.getHost());
-            txnEntry.setBackend(backend);
-            TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
-            try {
-                TExecPlanFragmentParamsList paramsList = new TExecPlanFragmentParamsList();
-                paramsList.addToParamsList(tRequest);
-                Future<InternalService.PExecPlanFragmentResult> future =
-                        BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, false);
-                InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
-                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
-                }
-            } catch (RpcException e) {
-                throw new TException(e);
+            InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
+            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
             }
+        } catch (RpcException e) {
+            throw new TException(e);
         }
     }
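The restructured beginTransaction now has two phases: everything that depends on table metadata (plan generation, and registering the table's indexes on the TransactionState via addTableIndexes) runs inside a single read-lock scope, while backend selection and the fragment RPC happen after the lock is released, so a slow call cannot stall a pending ALTER TABLE. A self-contained sketch of that discipline using only the JDK follows; plain Java stands in for the Doris lock and RPC, and every name in it is illustrative.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Generic illustration of the lock discipline applied above: snapshot
// everything that depends on mutable metadata under the read lock, then
// perform the slow network call after the lock is released.
final class LockScopedPlanning {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private volatile String schemaVersion = "v1"; // stands in for the table's schema/index set

    String planThenExecute() throws InterruptedException {
        String plan;
        if (!rwLock.readLock().tryLock(1, TimeUnit.MINUTES)) { // bounded wait, like tryReadLock
            throw new IllegalStateException("get read lock timeout");
        }
        try {
            plan = "plan@" + schemaVersion; // built while writers (schema changes) are blocked
        } finally {
            rwLock.readLock().unlock();
        }
        // The "RPC" runs outside the lock, so a schema change is never stalled
        // for the duration of a backend call.
        return sendToBackend(plan);
    }

    private String sendToBackend(String plan) {
        return "executed " + plan; // placeholder for execPlanFragmentsAsync + future.get
    }
}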
diff --git a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out b/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
new file mode 100644
index 00000000000..9e1016fe0e2
--- /dev/null
+++ b/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select1 --
+0	a	10
+1	b	20
+2	c	30
+3	d	40
+
+-- !select2 --
+0	a	10
+1	b	20
+2	c	30
+3	d	40
+
diff --git a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy b/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
new file mode 100644
index 00000000000..cd428b185c4
--- /dev/null
+++ b/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+import org.junit.Assert
+
+suite("txn_insert_values_with_schema_change") {
+    def table = "txn_insert_values_with_schema_change"
+
+    def dbName = "regression_test_insert_p0"
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+    logger.info("url: ${url}")
+    List<String> errors = new ArrayList<>()
+    CountDownLatch insertLatch = new CountDownLatch(1)
+    CountDownLatch schemaChangeLatch = new CountDownLatch(1)
+
+    sql """ DROP TABLE IF EXISTS $table force """
+    sql """
+        create table $table (
+            `ID` int(11) NOT NULL,
+            `NAME` varchar(100) NULL,
+            `score` int(11) NULL
+        ) ENGINE=OLAP
+        duplicate KEY(`id`)
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1");
+    """
+    sql """ insert into ${table} values(0, 'a', 10) """
+
+    def getAlterTableState = { job_state ->
+        def retry = 0
+        sql "use ${dbName};"
+        while (true) {
+            sleep(2000)
+            def state = sql " show alter table column where tablename = '${table}' order by CreateTime desc limit 1"
+            logger.info("alter table state: ${state}")
+            if (state.size() > 0 && state[0][9] == job_state) {
+                return
+            }
+            retry++
+            if (retry >= 10) {
+                break
+            }
+        }
+        Assert.fail("alter table job state is not ${job_state} after retry 10 times")
+    }
+
+    def txnInsert = {
+        try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
+             Statement statement = conn.createStatement()) {
+            statement.execute("SET enable_nereids_planner = true")
+            statement.execute("SET enable_fallback_to_original_planner = false")
+            statement.execute("begin")
+            statement.execute("insert into ${table} values(1, 'b', 20), (2, 'c', 30);")
+
+            schemaChangeLatch.countDown()
+            insertLatch.await(2, TimeUnit.MINUTES)
+
+            statement.execute("insert into ${table} values(3, 'd', 40);")
+            statement.execute("commit")
+        } catch (Throwable e) {
+            logger.error("txn insert failed", e)
+            errors.add("txn insert failed " + e.getMessage())
+        }
+    }
+
+    def schemaChange = { sql, job_state ->
+        try (Connection conn = DriverManager.getConnection(url, context.config.jdbcUser, context.config.jdbcPassword);
+             Statement statement = conn.createStatement()) {
+            schemaChangeLatch.await(2, TimeUnit.MINUTES)
+            statement.execute(sql)
+            getAlterTableState(job_state)
+            insertLatch.countDown()
+        } catch (Throwable e) {
+            logger.error("schema change failed", e)
+            errors.add("schema change failed " + e.getMessage())
+        }
+    }
+
+    Thread insert_thread = new Thread(() -> txnInsert())
+    Thread schema_change_thread = new Thread(() -> schemaChange("alter table ${table} order by (id, score, name);", "WAITING_TXN"))
+    insert_thread.start()
+    schema_change_thread.start()
+    insert_thread.join()
+    schema_change_thread.join()
+
+    logger.info("errors: " + errors)
+    assertEquals(0, errors.size())
+    order_qt_select1 """select id, name, score from ${table} """
+    getAlterTableState("FINISHED")
+    order_qt_select2 """select id, name, score from ${table} """
+}
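The suite above makes the race deterministic with two CountDownLatches: the transaction thread runs its first insert, signals the schema-change thread, waits until the ALTER job reaches WAITING_TXN, then runs the second insert and commits. A stripped-down, runnable Java sketch of the same handshake follows; the class, thread names, and messages are illustrative, and the real coordination lives in the Groovy suite.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Generic two-latch handshake, the technique the test uses to force a
// deterministic interleaving: thread A does step 1, signals B, B runs the
// conflicting operation, then releases A to finish.
public final class TwoLatchHandshake {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch insertStarted = new CountDownLatch(1);       // plays the role of schemaChangeLatch
        CountDownLatch alterReachedWaitTxn = new CountDownLatch(1); // plays the role of insertLatch

        Thread insert = new Thread(() -> {
            System.out.println("txn: first insert done");
            insertStarted.countDown(); // let the schema change start
            try {
                alterReachedWaitTxn.await(2, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("txn: second insert + commit"); // must still succeed under the fix
        });

        Thread alter = new Thread(() -> {
            try {
                insertStarted.await(2, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("alter: schema change submitted, state=WAITING_TXN");
            alterReachedWaitTxn.countDown(); // release the transaction to commit
        });

        insert.start();
        alter.start();
        insert.join();
        alter.join();
    }
}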