This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6983736cced [Fix](partial update) Fix core when successfully schema 
change and load during a partial update (#26210)
6983736cced is described below

commit 6983736ccedea7aa6f33dbfb41587fbeca5b7f66
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Mon Nov 6 23:16:05 2023 +0800

    [Fix](partial update) Fix core when successfully schema change and load 
during a partial update (#26210)
---
 .../apache/doris/analysis/NativeInsertStmt.java    |   3 +
 .../apache/doris/load/loadv2/BrokerLoadJob.java    |   3 +
 .../doris/load/routineload/RoutineLoadJob.java     |   6 +
 .../plans/commands/InsertIntoTableCommand.java     |   3 +
 .../apache/doris/service/FrontendServiceImpl.java  |   6 +
 .../doris/transaction/DatabaseTransactionMgr.java  |  29 ++++
 .../apache/doris/transaction/TransactionState.java |  65 ++++++++
 .../partial_update/concurrency_update2.csv         |  21 +++
 .../partial_update/concurrency_update3.csv         |  21 +++
 .../test_partial_update_2pc_schema_change.out      |  70 ++++++++
 .../test_partial_update_2pc_schema_change.groovy   | 181 +++++++++++++++++++++
 11 files changed, 408 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 68bec33a5e1..1837c268915 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -999,6 +999,9 @@ public class NativeInsertStmt extends InsertStmt {
                 throw new DdlException("txn does not exist: " + transactionId);
             }
             txnState.addTableIndexes((OlapTable) targetTable);
+            if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
+                txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
+            }
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index df1abe1ec9e..82187858492 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -230,6 +230,9 @@ public class BrokerLoadJob extends BulkLoadJob {
                     throw new UserException("txn does not exist: " + 
transactionId);
                 }
                 txnState.addTableIndexes(table);
+                if (isPartialUpdate()) {
+                    txnState.setSchemaForPartialUpdate(table);
+                }
             }
         } finally {
             MetaLockUtils.readUnlockTables(tableList);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 9b3ac1b845e..86fe00a01a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -899,6 +899,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
                 throw new MetaNotFoundException("txn does not exist: " + 
txnId);
             }
             txnState.addTableIndexes(planner.getDestTable());
+            if (isPartialUpdate) {
+                txnState.setSchemaForPartialUpdate((OlapTable) table);
+            }
 
             return planParams;
         } finally {
@@ -919,6 +922,9 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
                 throw new MetaNotFoundException("txn does not exist: " + 
txnId);
             }
             txnState.addTableIndexes(planner.getDestTable());
+            if (isPartialUpdate) {
+                txnState.setSchemaForPartialUpdate((OlapTable) table);
+            }
 
             return planParams;
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 83273dc6961..f7c30f3820b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -176,6 +176,9 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
             throw new DdlException("txn does not exist: " + txn.getTxnId());
         }
         state.addTableIndexes(physicalOlapTableSink.getTargetTable());
+        if (physicalOlapTableSink.isFromNativeInsertStmt() && 
physicalOlapTableSink.isPartialUpdate()) {
+            
state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
+        }
 
         executor.setProfileType(ProfileType.LOAD);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e6a883e07c7..514b9df525d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2221,6 +2221,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     throw new UserException("txn does not exist: " + 
request.getTxnId());
                 }
                 txnState.addTableIndexes(table);
+                if (request.isPartialUpdate()) {
+                    txnState.setSchemaForPartialUpdate(table);
+                }
             }
             plan.setTableName(table.getName());
             
plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
@@ -2284,6 +2287,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     throw new UserException("txn does not exist: " + 
request.getTxnId());
                 }
                 txnState.addTableIndexes(table);
+                if (request.isPartialUpdate()) {
+                    txnState.setSchemaForPartialUpdate(table);
+                }
             }
             return plan;
         } finally {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 0fc355260ea..e6d266e43ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -674,6 +674,35 @@ public class DatabaseTransactionMgr {
                     + "] is prepare, not pre-committed.");
         }
 
+        if (transactionState.isPartialUpdate()) {
+            if (is2PC) {
+                Iterator<TableCommitInfo> tableCommitInfoIterator
+                        = 
transactionState.getIdToTableCommitInfos().values().iterator();
+                while (tableCommitInfoIterator.hasNext()) {
+                    TableCommitInfo tableCommitInfo = 
tableCommitInfoIterator.next();
+                    long tableId = tableCommitInfo.getTableId();
+                    OlapTable table = (OlapTable) db.getTableNullable(tableId);
+                    if (table != null && table instanceof OlapTable) {
+                        if 
(!transactionState.checkSchemaCompatibility((OlapTable) table)) {
+                            throw new 
TransactionCommitFailedException("transaction [" + transactionId
+                                    + "] check schema compatibility failed, 
partial update can't commit with"
+                                            + " old schema sucessfully .");
+                        }
+                    }
+                }
+            } else {
+                for (Table table : tableList) {
+                    if (table instanceof OlapTable) {
+                        if 
(!transactionState.checkSchemaCompatibility((OlapTable) table)) {
+                            throw new 
TransactionCommitFailedException("transaction [" + transactionId
+                                    + "] check schema compatibility failed, 
partial update can't commit with"
+                                            + " old schema sucessfully .");
+                        }
+                    }
+                }
+            }
+        }
+
         Set<Long> errorReplicaIds = Sets.newHashSet();
         Set<Long> totalInvolvedBackends = Sets.newHashSet();
         Map<Long, Set<Long>> tableToPartition = new HashMap<>();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 5d95917e58d..2b01a612534 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.transaction;
 
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
@@ -45,8 +47,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -265,6 +269,24 @@ public class TransactionState implements Writable {
     // no need to persist.
     private String errMsg = "";
 
+    public class SchemaInfo {
+        public List<Column> schema;
+        public int schemaVersion;
+
+        public SchemaInfo(OlapTable olapTable) {
+            Map<Long, MaterializedIndexMeta> indexIdToMeta = 
olapTable.getIndexIdToMeta();
+            for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
+                schema = indexMeta.getSchema();
+                schemaVersion = indexMeta.getSchemaVersion();
+                break;
+            }
+        }
+    }
+
+    private boolean isPartialUpdate = false;
+    // table id -> schema info
+    private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();
+
     public TransactionState() {
         this.dbId = -1;
         this.tableIdList = Lists.newArrayList();
@@ -725,4 +747,47 @@ public class TransactionState implements Writable {
     public String getErrMsg() {
         return this.errMsg;
     }
+
+    public void setSchemaForPartialUpdate(OlapTable olapTable) {
+        // the caller should hold the read lock of the table
+        isPartialUpdate = true;
+        txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
+    }
+
+    public boolean isPartialUpdate() {
+        return isPartialUpdate;
+    }
+
+    public SchemaInfo getTxnSchema(long id) {
+        return txnSchemas.get(id);
+    }
+
+    public boolean checkSchemaCompatibility(OlapTable olapTable) {
+        SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
+        SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
+        if (txnSchemaInfo == null) {
+            return true;
+        }
+        if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
+            return true;
+        }
+        for (Column txnCol : txnSchemaInfo.schema) {
+            if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
+                continue;
+            }
+            int uniqueId = txnCol.getUniqueId();
+            Optional<Column> currentCol = currentSchemaInfo.schema.stream()
+                    .filter(col -> col.getUniqueId() == uniqueId).findFirst();
+            // for now Doris's light schema change only supports adding 
columns,
+            // dropping columns, and type conversions that increase the 
varchar length
+            if (currentCol.isPresent() && 
currentCol.get().getType().isStringType()) {
+                if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
+                    LOG.warn("Check schema compatibility failed, txnId={}, 
table={}",
+                            transactionId, olapTable.getName());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv
new file mode 100644
index 00000000000..23f43edc585
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv
@@ -0,0 +1,21 @@
+0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv
 
b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv
new file mode 100644
index 00000000000..5bc6c8de802
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv
@@ -0,0 +1,21 @@
+0
+1
+2
+3
+4
+5
+6
+7
+8
+9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
diff --git 
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out
 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out
new file mode 100644
index 00000000000..9ad9f8333a0
--- /dev/null
+++ 
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out
@@ -0,0 +1,70 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+0      \N      \N      \N      \N      \N
+1      \N      \N      \N      \N      \N
+10     \N      \N      \N      \N      \N
+11     \N      \N      \N      \N      \N
+12     \N      \N      \N      \N      \N
+13     \N      \N      \N      \N      \N
+14     \N      \N      \N      \N      \N
+15     \N      \N      \N      \N      \N
+16     \N      \N      \N      \N      \N
+17     \N      \N      \N      \N      \N
+18     \N      \N      \N      \N      \N
+19     \N      \N      \N      \N      \N
+2      \N      \N      \N      \N      \N
+20     \N      \N      \N      \N      \N
+3      \N      \N      \N      \N      \N
+4      \N      \N      \N      \N      \N
+5      \N      \N      \N      \N      \N
+6      \N      \N      \N      \N      \N
+7      \N      \N      \N      \N      \N
+8      \N      \N      \N      \N      \N
+9      \N      \N      \N      \N      \N
+
+-- !sql --
+0      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+1      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+10     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+11     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+12     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+13     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+14     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+15     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+16     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+17     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+18     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+19     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+2      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+20     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+3      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+4      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+5      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+6      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+7      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+8      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+9      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+
+-- !sql --
+0      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+1      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+10     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+11     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+12     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+13     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+14     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+15     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+16     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+17     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+18     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+19     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+2      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+20     \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+3      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+4      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+5      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+6      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+7      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+8      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+9      \N      aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa        \N      \N      
\N
+
diff --git 
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy
 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy
new file mode 100644
index 00000000000..f63ebe9a45e
--- /dev/null
+++ 
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy
@@ -0,0 +1,181 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_2pc_schema_change", "p0") {
+
+    def tableName = "test_partial_update_2pc_schema_change"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+        k1 varchar(20) not null,
+        v1 varchar(20),
+        v2 varchar(20),
+        v3 varchar(20),
+        v4 varchar(20),
+        v5 varchar(20))
+        UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 4
+        PROPERTIES(
+            "replication_num" = "1",
+            "light_schema_change" = "true",
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true")"""
+
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', "k1"
+
+        file 'concurrency_update3.csv'
+        time 10000 // limit inflight 10s
+    }
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+
+    def wait_for_schema_change = {
+        def try_times=100
+        while(true){
+            def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = 
'${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+            Thread.sleep(10)
+            if(res[0][9].toString() == "FINISHED"){
+                break;
+            }
+            assert(try_times>0)
+            try_times--
+        }
+    }
+
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    def do_streamload_2pc = { txn_id, txn_operation, name ->
+        HttpClients.createDefault().withCloseable { client ->
+            RequestBuilder requestBuilder = 
RequestBuilder.put("http://${address.hostString}:${address.port}/api/${db}/${name}/_stream_load_2pc";)
+            String encoding = Base64.getEncoder()
+                .encodeToString((user + ":" + (password == null ? "" : 
password)).getBytes("UTF-8"))
+            requestBuilder.setHeader("Authorization", "Basic ${encoding}")
+            requestBuilder.setHeader("Expect", "100-Continue")
+            requestBuilder.setHeader("txn_id", "${txn_id}")
+            requestBuilder.setHeader("txn_operation", "${txn_operation}")
+
+            String backendStreamLoadUri = null
+            client.execute(requestBuilder.build()).withCloseable { resp ->
+                resp.withCloseable {
+                    String body = EntityUtils.toString(resp.getEntity())
+                    def respCode = resp.getStatusLine().getStatusCode()
+                    // should redirect to backend
+                    if (respCode != 307) {
+                        throw new IllegalStateException("Expect frontend 
stream load response code is 307, " +
+                                "but meet ${respCode}\nbody: ${body}")
+                    }
+                    backendStreamLoadUri = 
resp.getFirstHeader("location").getValue()
+                }
+            }
+
+            requestBuilder.setUri(backendStreamLoadUri)
+            try{
+                client.execute(requestBuilder.build()).withCloseable { resp ->
+                    resp.withCloseable {
+                        String body = EntityUtils.toString(resp.getEntity())
+                        def respCode = resp.getStatusLine().getStatusCode()
+                        if (respCode != 200) {
+                            throw new IllegalStateException("Expect backend 
stream load response code is 200, " +
+                                    "but meet ${respCode}\nbody: ${body}")
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                log.info("StreamLoad Exception: ", t)
+            }
+        }
+    }
+
+    String txnId
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,tmp,v1=substr(tmp,1,20)'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'concurrency_update2.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    sql """ alter table ${tableName} modify column v2 varchar(40);"""
+    wait_for_schema_change()
+
+    sql """ alter table ${tableName} drop column v3;"""
+    wait_for_schema_change()
+
+    sql """ alter table ${tableName} add column v6 varchar(50);"""
+    wait_for_schema_change()
+
+    sql """ alter table ${tableName} rename column v4 renamed_v4;"""
+    wait_for_schema_change()
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,tmp,v2=substr(tmp,1,40)'
+        set 'strict_mode', "false"
+        file 'concurrency_update2.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+    do_streamload_2pc(txnId, "commit", tableName)
+    
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+    sql "drop table if exists ${tableName};"
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to