This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5abc84af715456b882201b3f10a0594448418671
Author: meiyi <myime...@gmail.com>
AuthorDate: Thu Apr 18 20:37:55 2024 +0800

    [fix](txn insert) Fix txn insert commit failed when schema change (#33706)
---
 .../apache/doris/planner/StreamLoadPlanner.java    |   4 +
 .../apache/doris/qe/InsertStreamTxnExecutor.java   | 153 ++++++++++-----------
 .../txn_insert_values_with_schema_change.out       |  13 ++
 .../txn_insert_values_with_schema_change.groovy    | 109 +++++++++++++++
 4 files changed, 202 insertions(+), 77 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 1e377389b44..8f75f5476f8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -111,10 +111,12 @@ public class StreamLoadPlanner {
         return destTable;
     }
 
+    // the caller must hold the table read lock when calling this method
     public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException 
{
         return this.plan(loadId, 1);
     }
 
+    // the caller must hold the table read lock when calling this method
     // create the plan. the plan's query id and load id are same, using the 
parameter 'loadId'
     public TExecPlanFragmentParams plan(TUniqueId loadId, int 
fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
@@ -340,11 +342,13 @@ public class StreamLoadPlanner {
         return params;
     }
 
+    // the caller must hold the table read lock when calling this method
     // single table plan fragmentInstanceIndex is 1(default value)
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws 
UserException {
         return this.planForPipeline(loadId, 1);
     }
 
+    // the caller must hold the table read lock when calling this method
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int 
fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
                 && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index f37457cf58d..e89553e672d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -43,6 +43,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
 import org.apache.doris.thrift.TTxnParams;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.transaction.TransactionEntry;
+import org.apache.doris.transaction.TransactionState;
 
 import org.apache.thrift.TException;
 
@@ -65,92 +66,90 @@ public class InsertStreamTxnExecutor {
     public void beginTransaction(TStreamLoadPutRequest request) throws 
UserException, TException, TimeoutException,
             InterruptedException, ExecutionException {
         TTxnParams txnConf = txnEntry.getTxnConf();
+        OlapTable table = (OlapTable) txnEntry.getTable();
         // StreamLoadTask's id == request's load_id
         StreamLoadTask streamLoadTask = 
StreamLoadTask.fromTStreamLoadPutRequest(request);
-        StreamLoadPlanner planner = new StreamLoadPlanner(
-                (Database) txnEntry.getDb(), (OlapTable) txnEntry.getTable(), 
streamLoadTask);
-        // Will using load id as query id in fragment
-        if (Config.enable_pipeline_load) {
-            TPipelineFragmentParams tRequest = 
planner.planForPipeline(streamLoadTask.getId());
-            BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-            List<Long> beIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-            if (beIds.isEmpty()) {
-                throw new UserException("No available backend to match the 
policy: " + policy);
-            }
-
-            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-            for (Map.Entry<Integer, List<TScanRangeParams>> entry : 
tRequest.local_params.get(0)
-                    .per_node_scan_ranges.entrySet()) {
-                for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                    
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                            TFileFormatType.FORMAT_PROTO);
-                    
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                            TFileCompressType.PLAIN);
+        StreamLoadPlanner planner = new StreamLoadPlanner((Database) 
txnEntry.getDb(), table, streamLoadTask);
+        boolean enablePipelineLoad = Config.enable_pipeline_load;
+        TPipelineFragmentParamsList pipelineParamsList = new 
TPipelineFragmentParamsList();
+        TExecPlanFragmentParamsList paramsList = new 
TExecPlanFragmentParamsList();
+        if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
+            throw new UserException("get table read lock timeout, database=" + 
table.getDatabase().getId() + ",table="
+                    + table.getName());
+        }
+        try {
+            // Use the load id as the query id in the fragment
+            if (enablePipelineLoad) {
+                TPipelineFragmentParams tRequest = 
planner.planForPipeline(streamLoadTask.getId());
+                
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+                for (Map.Entry<Integer, List<TScanRangeParams>> entry : 
tRequest.local_params.get(0)
+                        .per_node_scan_ranges.entrySet()) {
+                    for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                        
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                                TFileFormatType.FORMAT_PROTO);
+                        
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                                TFileCompressType.PLAIN);
+                    }
                 }
-            }
-            
txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
-            this.loadId = request.getLoadId();
-            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                    .setHi(loadId.getHi())
-                    .setLo(loadId.getLo()).build());
-
-            Backend backend = 
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-            txnConf.setUserIp(backend.getHost());
-            txnEntry.setBackend(backend);
-            TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
-            try {
-                TPipelineFragmentParamsList paramsList = new 
TPipelineFragmentParamsList();
-                paramsList.addToParamsList(tRequest);
-                Future<InternalService.PExecPlanFragmentResult> future =
-                        
BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, 
false);
-                InternalService.PExecPlanFragmentResult result = future.get(5, 
TimeUnit.SECONDS);
-                TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    throw new TException("failed to execute plan fragment: " + 
result.getStatus().getErrorMsgsList());
+                
txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
+                this.loadId = request.getLoadId();
+                this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                        .setHi(loadId.getHi())
+                        .setLo(loadId.getLo()).build());
+
+                pipelineParamsList.addToParamsList(tRequest);
+            } else {
+                TExecPlanFragmentParams tRequest = 
planner.plan(streamLoadTask.getId());
+                
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
+                for (Map.Entry<Integer, List<TScanRangeParams>> entry : 
tRequest.params.per_node_scan_ranges
+                        .entrySet()) {
+                    for (TScanRangeParams scanRangeParams : entry.getValue()) {
+                        
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
+                                TFileFormatType.FORMAT_PROTO);
+                        
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
+                                TFileCompressType.PLAIN);
+                    }
                 }
-            } catch (RpcException e) {
-                throw new TException(e);
+                
txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+                this.loadId = request.getLoadId();
+                this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                        .setHi(loadId.getHi())
+                        .setLo(loadId.getLo()).build());
+                paramsList.addToParamsList(tRequest);
             }
-        } else {
-            TExecPlanFragmentParams tRequest = 
planner.plan(streamLoadTask.getId());
-            BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
-            List<Long> beIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
-            if (beIds.isEmpty()) {
-                throw new UserException("No available backend to match the 
policy: " + policy);
+            TransactionState transactionState = 
Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(table.getDatabase().getId(), 
streamLoadTask.getTxnId());
+            if (transactionState != null) {
+                transactionState.addTableIndexes(table);
             }
+        } finally {
+            table.readUnlock();
+        }
 
-            tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
-            for (Map.Entry<Integer, List<TScanRangeParams>> entry : 
tRequest.params.per_node_scan_ranges.entrySet()) {
-                for (TScanRangeParams scanRangeParams : entry.getValue()) {
-                    
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
-                            TFileFormatType.FORMAT_PROTO);
-                    
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
-                            TFileCompressType.PLAIN);
-                }
+        BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
+        List<Long> beIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
+        if (beIds.isEmpty()) {
+            throw new UserException("No available backend to match the policy: 
" + policy);
+        }
+
+        Backend backend = 
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
+        txnConf.setUserIp(backend.getHost());
+        txnEntry.setBackend(backend);
+        TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
+        try {
+            Future<InternalService.PExecPlanFragmentResult> future;
+            if (enablePipelineLoad) {
+                future = 
BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, 
pipelineParamsList, false);
+            } else {
+                future = 
BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, 
false);
             }
-            
txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
-            this.loadId = request.getLoadId();
-            this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
-                    .setHi(loadId.getHi())
-                    .setLo(loadId.getLo()).build());
-
-            Backend backend = 
Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
-            txnConf.setUserIp(backend.getHost());
-            txnEntry.setBackend(backend);
-            TNetworkAddress address = new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort());
-            try {
-                TExecPlanFragmentParamsList paramsList = new 
TExecPlanFragmentParamsList();
-                paramsList.addToParamsList(tRequest);
-                Future<InternalService.PExecPlanFragmentResult> future =
-                        
BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, paramsList, 
false);
-                InternalService.PExecPlanFragmentResult result = future.get(5, 
TimeUnit.SECONDS);
-                TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
-                if (code != TStatusCode.OK) {
-                    throw new TException("failed to execute plan fragment: " + 
result.getStatus().getErrorMsgsList());
-                }
-            } catch (RpcException e) {
-                throw new TException(e);
+            InternalService.PExecPlanFragmentResult result = future.get(5, 
TimeUnit.SECONDS);
+            TStatusCode code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+            if (code != TStatusCode.OK) {
+                throw new TException("failed to execute plan fragment: " + 
result.getStatus().getErrorMsgsList());
             }
+        } catch (RpcException e) {
+            throw new TException(e);
         }
     }
 
diff --git 
a/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out 
b/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
new file mode 100644
index 00000000000..9e1016fe0e2
--- /dev/null
+++ b/regression-test/data/insert_p0/txn_insert_values_with_schema_change.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select1 --
+0      a       10
+1      b       20
+2      c       30
+3      d       40
+
+-- !select2 --
+0      a       10
+1      b       20
+2      c       30
+3      d       40
+
diff --git 
a/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy 
b/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
new file mode 100644
index 00000000000..cd428b185c4
--- /dev/null
+++ 
b/regression-test/suites/insert_p0/txn_insert_values_with_schema_change.groovy
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+
+suite("txn_insert_values_with_schema_change") {
+    def table = "txn_insert_values_with_schema_change"
+
+    def dbName = "regression_test_insert_p0"
+    def url = getServerPrepareJdbcUrl(context.config.jdbcUrl, 
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+    logger.info("url: ${url}")
+    List<String> errors = new ArrayList<>()
+    CountDownLatch insertLatch = new CountDownLatch(1)
+    CountDownLatch schemaChangeLatch = new CountDownLatch(1)
+
+    sql """ DROP TABLE IF EXISTS $table force """
+    sql """
+        create table $table (
+            `ID` int(11) NOT NULL,
+            `NAME` varchar(100) NULL,
+            `score` int(11) NULL
+        ) ENGINE=OLAP
+        duplicate KEY(`id`) 
+        distributed by hash(id) buckets 1
+        properties("replication_num" = "1"); 
+    """
+    sql """ insert into ${table} values(0, 'a', 10) """
+
+    def getAlterTableState = { job_state ->
+        def retry = 0
+        sql "use ${dbName};"
+        while (true) {
+            sleep(2000)
+            def state = sql " show alter table column where tablename = 
'${table}' order by CreateTime desc limit 1"
+            logger.info("alter table state: ${state}")
+            if (state.size() > 0 && state[0][9] == job_state) {
+                return
+            }
+            retry++
+            if (retry >= 10) {
+                break
+            }
+        }
+        Assert.fail("alter table job state is not ${job_state} after retry 10 
times")
+    }
+
+    def txnInsert = {
+        try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+             Statement statement = conn.createStatement()) {
+            statement.execute("SET enable_nereids_planner = true")
+            statement.execute("SET enable_fallback_to_original_planner = 
false")
+            statement.execute("begin")
+            statement.execute("insert into ${table} values(1, 'b', 20), (2, 
'c', 30);")
+
+            schemaChangeLatch.countDown()
+            insertLatch.await(2, TimeUnit.MINUTES)
+
+            statement.execute("insert into ${table} values(3, 'd', 40);")
+            statement.execute("commit")
+        } catch (Throwable e) {
+            logger.error("txn insert failed", e)
+            errors.add("txn insert failed " + e.getMessage())
+        }
+    }
+
+    def schemaChange = { sql, job_state ->
+        try (Connection conn = DriverManager.getConnection(url, 
context.config.jdbcUser, context.config.jdbcPassword);
+             Statement statement = conn.createStatement()) {
+            schemaChangeLatch.await(2, TimeUnit.MINUTES)
+            statement.execute(sql)
+            getAlterTableState(job_state)
+            insertLatch.countDown()
+        } catch (Throwable e) {
+            logger.error("schema change failed", e)
+            errors.add("schema change failed " + e.getMessage())
+        }
+    }
+
+    Thread insert_thread = new Thread(() -> txnInsert())
+    Thread schema_change_thread = new Thread(() -> schemaChange("alter table 
${table} order by (id, score, name);", "WAITING_TXN"))
+    insert_thread.start()
+    schema_change_thread.start()
+    insert_thread.join()
+    schema_change_thread.join()
+
+    logger.info("errors: " + errors)
+    assertEquals(0, errors.size())
+    order_qt_select1 """select id, name, score from ${table} """
+    getAlterTableState("FINISHED")
+    order_qt_select2 """select id, name, score from ${table} """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to