This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e5361a6839c [fix](cloud-mow)Fe should process KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES when geting delete bitmap lock fail (#47161) e5361a6839c is described below commit e5361a6839c3369ca80740c262c7a279a824b416 Author: huanghaibin <huanghai...@selectdb.com> AuthorDate: Tue Jan 21 21:21:11 2025 +0800 [fix](cloud-mow)Fe should process KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES when geting delete bitmap lock fail (#47161) Fe should process KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES when geting delete bitmap lock fail --- .../transaction/CloudGlobalTransactionMgr.java | 21 ++++++- ...est_cloud_mow_stream_load_with_txn_conflict.out | Bin 0 -> 122 bytes ..._cloud_mow_stream_load_with_txn_conflict.groovy | 61 +++++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index e1e722443e4..114dbeaa4e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -54,6 +54,7 @@ import org.apache.doris.cloud.proto.Cloud.GetTxnRequest; import org.apache.doris.cloud.proto.Cloud.GetTxnResponse; import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB; import org.apache.doris.cloud.proto.Cloud.MetaServiceCode; +import org.apache.doris.cloud.proto.Cloud.MetaServiceResponseStatus; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnRequest; import org.apache.doris.cloud.proto.Cloud.PrecommitTxnResponse; import org.apache.doris.cloud.proto.Cloud.RemoveDeleteBitmapUpdateLockRequest; @@ -143,6 +144,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -867,6 +869,22 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", transactionId, request, response); } + if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict")) { + DebugPoint debugPoint = DebugPointUtil.getDebugPoint( + "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict"); + double percent = debugPoint.param("percent", 0.4); + long timestamp = System.currentTimeMillis(); + Random random = new Random(timestamp); + if (Math.abs(random.nextInt()) % 100 < 100 * percent) { + LOG.info("set kv txn conflict for test"); + GetDeleteBitmapUpdateLockResponse.Builder getLockResponseBuilder + = GetDeleteBitmapUpdateLockResponse.newBuilder(); + getLockResponseBuilder.setStatus(MetaServiceResponseStatus.newBuilder() + .setCode(MetaServiceCode.KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) + .setMsg("kv txn conflict")); + response = getLockResponseBuilder.build(); + } + } if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { break; @@ -892,7 +910,8 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", transactionId, retryTime, response); if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT - || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) { + || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT + || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) { // DELETE_BITMAP_LOCK_ERR will be retried on be throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, "Failed to get delete bitmap lock due to confilct"); diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_txn_conflict.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_txn_conflict.out new file mode 100644 index 00000000000..9c8bb8cd785 Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_txn_conflict.out differ diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_txn_conflict.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_txn_conflict.groovy new file mode 100644 index 00000000000..5e4479064f4 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_txn_conflict.groovy @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_stream_load_with_txn_conflict", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllFEs() + def tableName = "test_cloud_mow_stream_load_with_txn_conflict" + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + GetDebugPoint().enableDebugPointForAllFEs('CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict', [percent: 0.4]) + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + GetDebugPoint().disableDebugPointForAllFEs('CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict') + sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllFEs() + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org