This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 6983736cced [Fix](partial update) Fix core when successfully schema change and load during a partial update (#26210)
6983736cced is described below

commit 6983736ccedea7aa6f33dbfb41587fbeca5b7f66
Author: bobhan1 <bh2444151...@outlook.com>
AuthorDate: Mon Nov 6 23:16:05 2023 +0800

    [Fix](partial update) Fix core when successfully schema change and load during a partial update (#26210)
---
 .../apache/doris/analysis/NativeInsertStmt.java     |   3 +
 .../apache/doris/load/loadv2/BrokerLoadJob.java     |   3 +
 .../doris/load/routineload/RoutineLoadJob.java      |   6 +
 .../plans/commands/InsertIntoTableCommand.java      |   3 +
 .../apache/doris/service/FrontendServiceImpl.java   |   6 +
 .../doris/transaction/DatabaseTransactionMgr.java   |  29 ++++
 .../apache/doris/transaction/TransactionState.java  |  65 ++++++++
 .../partial_update/concurrency_update2.csv          |  21 +++
 .../partial_update/concurrency_update3.csv          |  21 +++
 .../test_partial_update_2pc_schema_change.out       |  70 ++++++++
 .../test_partial_update_2pc_schema_change.groovy    | 181 +++++++++++++++++++++
 11 files changed, 408 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 68bec33a5e1..1837c268915 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -999,6 +999,9 @@ public class NativeInsertStmt extends InsertStmt {
                 throw new DdlException("txn does not exist: " + transactionId);
             }
             txnState.addTableIndexes((OlapTable) targetTable);
+            if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
+                txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
+            }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index df1abe1ec9e..82187858492 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -230,6 +230,9 @@ public class BrokerLoadJob extends BulkLoadJob {
                     throw new UserException("txn does not exist: " + transactionId);
                 }
                 txnState.addTableIndexes(table);
+                if (isPartialUpdate()) {
+                    txnState.setSchemaForPartialUpdate(table);
+                }
             }
         } finally {
             MetaLockUtils.readUnlockTables(tableList);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 9b3ac1b845e..86fe00a01a9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -899,6 +899,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 throw new MetaNotFoundException("txn does not exist: " + txnId);
             }
             txnState.addTableIndexes(planner.getDestTable());
+            if (isPartialUpdate) {
+                txnState.setSchemaForPartialUpdate((OlapTable) table);
+            }
             return planParams;
         } finally {
@@ -919,6 +922,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
                 throw new MetaNotFoundException("txn does not exist: " + txnId);
             }
             txnState.addTableIndexes(planner.getDestTable());
+            if (isPartialUpdate) {
+                txnState.setSchemaForPartialUpdate((OlapTable) table);
+            }
             return planParams;
         } finally {
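Each load path patched above follows the same pattern: when a partial-update load registers its target table on the transaction, it also snapshots the table schema through the new setSchemaForPartialUpdate() call (the Nereids insert path and the stream-load planner in the hunks below do the same). As a rough mental model only, with illustrative names rather than the real TransactionState/OlapTable classes, the bookkeeping amounts to the self-contained sketch below:

// Minimal standalone sketch (illustrative names, NOT Doris classes) of what the new
// setSchemaForPartialUpdate() calls record on the transaction state.
import java.util.HashMap;
import java.util.Map;

public class TxnSchemaRegistry {
    // What the later compatibility check needs from the snapshot: the schema version and,
    // per visible string column (keyed by its unique column id), the declared varchar length.
    public static final class SchemaSnapshot {
        final int schemaVersion;
        final Map<Integer, Integer> varcharLenByColUniqueId;

        SchemaSnapshot(int schemaVersion, Map<Integer, Integer> varcharLenByColUniqueId) {
            this.schemaVersion = schemaVersion;
            this.varcharLenByColUniqueId = varcharLenByColUniqueId;
        }
    }

    private boolean partialUpdate = false;
    // table id -> schema captured when the partial-update load registered with the txn
    private final Map<Long, SchemaSnapshot> txnSchemas = new HashMap<>();

    // Called once per table by every load entry point that plans a partial update
    // (native insert, broker load, routine load, stream load).
    public void recordForPartialUpdate(long tableId, SchemaSnapshot snapshot) {
        partialUpdate = true;
        txnSchemas.put(tableId, snapshot);
    }

    public boolean isPartialUpdate() {
        return partialUpdate;
    }

    public SchemaSnapshot get(long tableId) {
        return txnSchemas.get(tableId);
    }

    public static void main(String[] args) {
        TxnSchemaRegistry txn = new TxnSchemaRegistry();
        txn.recordForPartialUpdate(10001L, new SchemaSnapshot(1, Map.of(2, 20)));
        System.out.println(txn.isPartialUpdate() + " " + txn.get(10001L).schemaVersion);
    }
}

In the actual patch the snapshot is taken while the caller still holds the table read lock, per the comment added in TransactionState further down, so it cannot race with a concurrent schema change.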
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
index 83273dc6961..f7c30f3820b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java
@@ -176,6 +176,9 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
                 throw new DdlException("txn does not exist: " + txn.getTxnId());
             }
             state.addTableIndexes(physicalOlapTableSink.getTargetTable());
+            if (physicalOlapTableSink.isFromNativeInsertStmt() && physicalOlapTableSink.isPartialUpdate()) {
+                state.setSchemaForPartialUpdate(physicalOlapTableSink.getTargetTable());
+            }
         executor.setProfileType(ProfileType.LOAD);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e6a883e07c7..514b9df525d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2221,6 +2221,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 throw new UserException("txn does not exist: " + request.getTxnId());
             }
             txnState.addTableIndexes(table);
+            if (request.isPartialUpdate()) {
+                txnState.setSchemaForPartialUpdate(table);
+            }
         }
         plan.setTableName(table.getName());
         plan.query_options.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
@@ -2284,6 +2287,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
                 throw new UserException("txn does not exist: " + request.getTxnId());
             }
             txnState.addTableIndexes(table);
+            if (request.isPartialUpdate()) {
+                txnState.setSchemaForPartialUpdate(table);
+            }
         }
         return plan;
     } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 0fc355260ea..e6d266e43ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -674,6 +674,35 @@ public class DatabaseTransactionMgr {
                     + "] is prepare, not pre-committed.");
         }
+        if (transactionState.isPartialUpdate()) {
+            if (is2PC) {
+                Iterator<TableCommitInfo> tableCommitInfoIterator
+                        = transactionState.getIdToTableCommitInfos().values().iterator();
+                while (tableCommitInfoIterator.hasNext()) {
+                    TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
+                    long tableId = tableCommitInfo.getTableId();
+                    OlapTable table = (OlapTable) db.getTableNullable(tableId);
+                    if (table != null && table instanceof OlapTable) {
+                        if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
+                            throw new TransactionCommitFailedException("transaction [" + transactionId
+                                    + "] check schema compatibility failed, partial update can't commit with"
+                                    + " old schema sucessfully .");
+                        }
+                    }
+                }
+            } else {
+                for (Table table : tableList) {
+                    if (table instanceof OlapTable) {
+                        if (!transactionState.checkSchemaCompatibility((OlapTable) table)) {
+                            throw new TransactionCommitFailedException("transaction [" + transactionId
+                                    + "] check schema compatibility failed, partial update can't commit with"
+                                    + " old schema sucessfully .");
+                        }
+                    }
+                }
+            }
+        }
+
         Set<Long> errorReplicaIds = Sets.newHashSet();
         Set<Long> totalInvolvedBackends = Sets.newHashSet();
         Map<Long, Set<Long>> tableToPartition = new HashMap<>();
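The pre-commit gate added above has to decide one question: did a light schema change land between planning the partial-update load and committing it in a way that old-schema rowsets cannot tolerate. For a 2PC commit it walks the tables recorded in the TableCommitInfos, for a normal commit it walks the locked tableList, and both branches end in the same TransactionCommitFailedException. The decision itself lives in the checkSchemaCompatibility() method added to TransactionState in the next hunk; the following self-contained sketch restates that rule with illustrative record types instead of the real OlapTable/Column classes, and is not Doris code:

// Standalone model of the compatibility rule: a commit is rejected only when the schema
// version moved forward AND a visible string column's declared length changed relative to
// the snapshot taken when the partial-update load was planned.
import java.util.List;
import java.util.Optional;

public class PartialUpdateCommitGate {
    // One column, reduced to the fields the check actually looks at.
    record Col(int uniqueId, boolean visible, boolean isString, int strLen) {}
    record Schema(int version, List<Col> cols) {}

    static boolean compatible(Schema txnSnapshot, Schema current) {
        if (txnSnapshot == null) {
            return true;                 // no snapshot recorded -> nothing to compare
        }
        if (txnSnapshot.version() >= current.version()) {
            return true;                 // no schema change landed while the txn was open
        }
        for (Col txnCol : txnSnapshot.cols()) {
            if (!txnCol.visible() || !txnCol.isString()) {
                continue;                // only visible string columns matter for this check
            }
            Optional<Col> cur = current.cols().stream()
                    .filter(c -> c.uniqueId() == txnCol.uniqueId()).findFirst();
            if (cur.isPresent() && cur.get().isString()
                    && cur.get().strLen() != txnCol.strLen()) {
                return false;            // e.g. varchar(20) -> varchar(40) during the load
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Schema atLoadTime = new Schema(1, List.of(new Col(2, true, true, 20)));
        Schema afterAlter = new Schema(2, List.of(new Col(2, true, true, 40)));
        // Prints false: the scenario the regression test below exercises. Before this commit
        // such a transaction was allowed to commit and the BE could core dump; the FE now
        // rejects the commit with TransactionCommitFailedException instead.
        System.out.println(compatible(atLoadTime, afterAlter));
    }
}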
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index 5d95917e58d..2b01a612534 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -17,7 +17,9 @@ package org.apache.doris.transaction;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
@@ -45,8 +47,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -265,6 +269,24 @@ public class TransactionState implements Writable {
     // no need to persist.
     private String errMsg = "";
+    public class SchemaInfo {
+        public List<Column> schema;
+        public int schemaVersion;
+
+        public SchemaInfo(OlapTable olapTable) {
+            Map<Long, MaterializedIndexMeta> indexIdToMeta = olapTable.getIndexIdToMeta();
+            for (MaterializedIndexMeta indexMeta : indexIdToMeta.values()) {
+                schema = indexMeta.getSchema();
+                schemaVersion = indexMeta.getSchemaVersion();
+                break;
+            }
+        }
+    }
+
+    private boolean isPartialUpdate = false;
+    // table id -> schema info
+    private Map<Long, SchemaInfo> txnSchemas = new HashMap<>();
+
     public TransactionState() {
         this.dbId = -1;
         this.tableIdList = Lists.newArrayList();
@@ -725,4 +747,47 @@ public class TransactionState implements Writable {
     public String getErrMsg() {
         return this.errMsg;
     }
+
+    public void setSchemaForPartialUpdate(OlapTable olapTable) {
+        // the caller should hold the read lock of the table
+        isPartialUpdate = true;
+        txnSchemas.put(olapTable.getId(), new SchemaInfo(olapTable));
+    }
+
+    public boolean isPartialUpdate() {
+        return isPartialUpdate;
+    }
+
+    public SchemaInfo getTxnSchema(long id) {
+        return txnSchemas.get(id);
+    }
+
+    public boolean checkSchemaCompatibility(OlapTable olapTable) {
+        SchemaInfo currentSchemaInfo = new SchemaInfo(olapTable);
+        SchemaInfo txnSchemaInfo = txnSchemas.get(olapTable.getId());
+        if (txnSchemaInfo == null) {
+            return true;
+        }
+        if (txnSchemaInfo.schemaVersion >= currentSchemaInfo.schemaVersion) {
+            return true;
+        }
+        for (Column txnCol : txnSchemaInfo.schema) {
+            if (!txnCol.isVisible() || !txnCol.getType().isStringType()) {
+                continue;
+            }
+            int uniqueId = txnCol.getUniqueId();
+            Optional<Column> currentCol = currentSchemaInfo.schema.stream()
+                    .filter(col -> col.getUniqueId() == uniqueId).findFirst();
+            // for now Doris's light schema change only supports adding columns,
+            // dropping columns, and type conversions that increase the varchar length
+            if (currentCol.isPresent() && currentCol.get().getType().isStringType()) {
+                if (currentCol.get().getStrLen() != txnCol.getStrLen()) {
+                    LOG.warn("Check schema compatibility failed, txnId={}, table={}",
+                            transactionId, olapTable.getName());
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
 }
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv
b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv new file mode 100644 index 00000000000..23f43edc585 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update2.csv @@ -0,0 +1,21 @@ +0,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +1,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +2,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +3,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +4,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +5,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +6,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +7,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +8,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +9,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +10,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +11,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +12,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +13,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +14,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +15,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +16,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +17,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +18,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +19,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa +20,aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa diff --git a/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv new file mode 100644 index 00000000000..5bc6c8de802 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/concurrency_update3.csv @@ -0,0 +1,21 @@ +0 +1 +2 +3 +4 +5 +6 +7 +8 +9 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +20 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out new file mode 100644 index 00000000000..9ad9f8333a0 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.out @@ -0,0 +1,70 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +0 \N \N \N \N \N +1 \N \N \N \N \N +10 \N \N \N \N \N +11 \N \N \N \N \N +12 \N \N \N \N \N +13 \N \N \N \N \N +14 \N \N \N \N \N +15 \N \N \N \N \N +16 \N \N \N \N \N +17 \N \N \N \N \N +18 \N \N \N \N \N +19 \N \N \N \N \N +2 \N \N \N \N \N +20 \N \N \N \N \N +3 \N \N \N \N \N +4 \N \N \N \N \N +5 \N \N \N \N \N +6 \N \N \N \N \N +7 \N \N \N \N \N +8 \N \N \N \N \N +9 \N \N \N \N \N + +-- !sql -- +0 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +1 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +10 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +11 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +12 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +13 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +14 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +15 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +16 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +17 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +18 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +19 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +2 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +20 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +3 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +4 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +5 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +6 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +7 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +8 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +9 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N + +-- !sql -- +0 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +1 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +10 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +11 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +12 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +13 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +14 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +15 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +16 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +17 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +18 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +19 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +2 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +20 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +3 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +4 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +5 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +6 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +7 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +8 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N +9 \N aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa \N \N \N + diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy new file mode 100644 index 00000000000..f63ebe9a45e --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_2pc_schema_change.groovy @@ -0,0 +1,181 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+
+suite("test_partial_update_2pc_schema_change", "p0") {
+
+    def tableName = "test_partial_update_2pc_schema_change"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """ CREATE TABLE ${tableName} (
+            k1 varchar(20) not null,
+            v1 varchar(20),
+            v2 varchar(20),
+            v3 varchar(20),
+            v4 varchar(20),
+            v5 varchar(20))
+            UNIQUE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 4
+            PROPERTIES(
+                "replication_num" = "1",
+                "light_schema_change" = "true",
+                "enable_unique_key_merge_on_write" = "true",
+                "disable_auto_compaction" = "true")"""
+
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'columns', "k1"
+
+        file 'concurrency_update3.csv'
+        time 10000 // limit inflight 10s
+    }
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+
+    def wait_for_schema_change = {
+        def try_times=100
+        while(true){
+            def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+            Thread.sleep(10)
+            if(res[0][9].toString() == "FINISHED"){
+                break;
+            }
+            assert(try_times>0)
+            try_times--
+        }
+    }
+
+    InetSocketAddress address = context.config.feHttpInetSocketAddress
+    String user = context.config.feHttpUser
+    String password = context.config.feHttpPassword
+    String db = context.config.getDbNameByFile(context.file)
+
+    def do_streamload_2pc = { txn_id, txn_operation, name ->
+        HttpClients.createDefault().withCloseable { client ->
+            RequestBuilder requestBuilder = RequestBuilder.put("http://${address.hostString}:${address.port}/api/${db}/${name}/_stream_load_2pc")
+            String encoding = Base64.getEncoder()
+                .encodeToString((user + ":" + (password == null ? "" : password)).getBytes("UTF-8"))
+            requestBuilder.setHeader("Authorization", "Basic ${encoding}")
+            requestBuilder.setHeader("Expect", "100-Continue")
+            requestBuilder.setHeader("txn_id", "${txn_id}")
+            requestBuilder.setHeader("txn_operation", "${txn_operation}")
+
+            String backendStreamLoadUri = null
+            client.execute(requestBuilder.build()).withCloseable { resp ->
+                resp.withCloseable {
+                    String body = EntityUtils.toString(resp.getEntity())
+                    def respCode = resp.getStatusLine().getStatusCode()
+                    // should redirect to backend
+                    if (respCode != 307) {
+                        throw new IllegalStateException("Expect frontend stream load response code is 307, "
+                                + "but meet ${respCode}\nbody: ${body}")
+                    }
+                    backendStreamLoadUri = resp.getFirstHeader("location").getValue()
+                }
+            }
+
+            requestBuilder.setUri(backendStreamLoadUri)
+            try{
+                client.execute(requestBuilder.build()).withCloseable { resp ->
+                    resp.withCloseable {
+                        String body = EntityUtils.toString(resp.getEntity())
+                        def respCode = resp.getStatusLine().getStatusCode()
+                        if (respCode != 200) {
+                            throw new IllegalStateException("Expect backend stream load response code is 200, "
+                                    + "but meet ${respCode}\nbody: ${body}")
+                        }
+                    }
+                }
+            } catch (Throwable t) {
+                log.info("StreamLoad Exception: ", t)
+            }
+        }
+    }
+
+    String txnId
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,tmp,v1=substr(tmp,1,20)'
+        set 'strict_mode', "false"
+        set 'two_phase_commit', 'true'
+        file 'concurrency_update2.csv'
+        time 10000 // limit inflight 10s
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            txnId = json.TxnId
+            assertEquals("success", json.Status.toLowerCase())
+        }
+    }
+
+    sql """ alter table ${tableName} modify column v2 varchar(40);"""
+    wait_for_schema_change()
+
+    sql """ alter table ${tableName} drop column v3;"""
+    wait_for_schema_change()
+
+    sql """ alter table ${tableName} add column v6 varchar(50);"""
+    wait_for_schema_change()
+
+    sql """ alter table ${tableName} rename column v4 renamed_v4;"""
+    wait_for_schema_change()
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'format', 'csv'
+        set 'partial_columns', 'true'
+        set 'columns', 'k1,tmp,v2=substr(tmp,1,40)'
+        set 'strict_mode', "false"
+        file 'concurrency_update2.csv'
+        time 10000 // limit inflight 10s
+    }
+
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+    do_streamload_2pc(txnId, "commit", tableName)
+
+    qt_sql """ select * from ${tableName} order by k1;"""
+
+    sql "drop table if exists ${tableName};"
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org