This is an automated email from the ASF dual-hosted git repository. zhangchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 6a189b34348 [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed #45673 (#46650) 6a189b34348 is described below commit 6a189b3434895b10a8f0aff9be3761e25f09ab01 Author: huanghaibin <huanghai...@selectdb.com> AuthorDate: Fri Jan 10 18:14:20 2025 +0800 [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed #45673 (#46650) pick pr #45673 --- .../transaction/CloudGlobalTransactionMgr.java | 180 ++++++++++----------- .../transaction/DeleteBitmapUpdateLockContext.java | 82 ++++++++++ ...test_cloud_mow_stream_load_with_commit_fail.out | 7 + ...t_cloud_mow_stream_load_with_commit_fail.groovy | 142 ++++++++++++++++ 4 files changed, 315 insertions(+), 96 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index c91a8a4bdfb..706ab5161f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -346,7 +346,29 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment) throws UserException { - commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false); + List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos); + try { + LOG.info("try to commit transaction, transactionId: {}", transactionId); + Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = null; + if (!mowTableList.isEmpty()) { + DeleteBitmapUpdateLockContext lockContext = new DeleteBitmapUpdateLockContext(); + getDeleteBitmapUpdateLock(transactionId, mowTableList, tabletCommitInfos, lockContext); + if (lockContext.getBackendToPartitionTablets().isEmpty()) { + throw new UserException( + "The partition info is empty, table may be dropped, txnid=" + transactionId); + } + backendToPartitionInfos = getCalcDeleteBitmapInfo(lockContext); + } + commitTransactionWithoutLock(dbId, tableList, transactionId, tabletCommitInfos, txnCommitAttachment, false, + mowTableList, backendToPartitionInfos); + } catch (Exception e) { + if (!mowTableList.isEmpty()) { + LOG.warn("commit txn {} failed, release delete bitmap lock, catch exception {}", transactionId, + e.getMessage()); + removeDeleteBitmapUpdateLock(mowTableList, transactionId); + } + throw e; + } } /** @@ -464,17 +486,15 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { return baseTabletIds; } - private void commitTransaction(long dbId, List<Table> tableList, long transactionId, - List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC) + private void commitTransactionWithoutLock(long dbId, List<Table> tableList, long transactionId, + List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment txnCommitAttachment, boolean is2PC, + List<OlapTable> mowTableList, Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos) throws UserException { - - LOG.info("try to commit transaction, transactionId: {}", transactionId); if (Config.disable_load_job) { throw new TransactionCommitFailedException( "disable_load_job is set to true, all load jobs are not allowed"); } - List<OlapTable> mowTableList = getMowTableList(tableList, tabletCommitInfos); if (!mowTableList.isEmpty()) { // may be this txn has been calculated by previously task but commit rpc is timeout, // and be will send another commit request to fe, so need to check txn status first @@ -493,7 +513,7 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { transactionState.getTransactionStatus().toString()); } } - calcDeleteBitmapForMow(dbId, mowTableList, transactionId, tabletCommitInfos); + sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); } CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder(); @@ -529,6 +549,11 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } } + if (DebugPointUtil.isEnable("FE.mow.commit.exception")) { + LOG.info("debug point FE.mow.commit.exception, throw e"); + throw new UserException("debug point FE.mow.commit.exception"); + } + final CommitTxnRequest commitTxnRequest = builder.build(); boolean txnOperated = false; TransactionState txnState = null; @@ -648,46 +673,9 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { return mowTableList; } - private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, long transactionId, - List<TabletCommitInfo> tabletCommitInfos) - throws UserException { - Map<Long, Map<Long, List<Long>>> backendToPartitionTablets = Maps.newHashMap(); - Map<Long, Partition> partitions = Maps.newHashMap(); - Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap(); - Map<Long, List<Long>> tableToTabletList = Maps.newHashMap(); - Map<Long, TabletMeta> tabletToTabletMeta = Maps.newHashMap(); - getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, partitions, backendToPartitionTablets, - tableToTabletList, tabletToTabletMeta); - if (backendToPartitionTablets.isEmpty()) { - throw new UserException("The partition info is empty, table may be dropped, txnid=" + transactionId); - } - - Map<Long, Long> baseCompactionCnts = Maps.newHashMap(); - Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap(); - Map<Long, Long> cumulativePoints = Maps.newHashMap(); - getDeleteBitmapUpdateLock(tableToPartitions, transactionId, tableToTabletList, tabletToTabletMeta, - baseCompactionCnts, cumulativeCompactionCnts, cumulativePoints); - Map<Long, Long> partitionVersions = getPartitionVersions(partitions); - - Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = getCalcDeleteBitmapInfo( - backendToPartitionTablets, partitionVersions, baseCompactionCnts, cumulativeCompactionCnts, - cumulativePoints); - try { - sendCalcDeleteBitmaptask(dbId, transactionId, backendToPartitionInfos); - } catch (UserException e) { - LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + transactionId + ",exception=" + e.getMessage()); - removeDeleteBitmapUpdateLock(tableToPartitions, transactionId); - throw e; - } - } - private void getPartitionInfo(List<OlapTable> tableList, List<TabletCommitInfo> tabletCommitInfos, - Map<Long, Set<Long>> tableToParttions, - Map<Long, Partition> partitions, - Map<Long, Map<Long, List<Long>>> backendToPartitionTablets, - Map<Long, List<Long>> tableToTabletList, - Map<Long, TabletMeta> tabletToTabletMeta) { + DeleteBitmapUpdateLockContext lockContext) { Map<Long, OlapTable> tableMap = Maps.newHashMap(); for (OlapTable olapTable : tableList) { tableMap.put(olapTable.getId(), olapTable); @@ -704,30 +692,30 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { continue; } - tabletToTabletMeta.put(tabletIds.get(i), tabletMeta); + lockContext.getTabletToTabletMeta().put(tabletIds.get(i), tabletMeta); - if (!tableToTabletList.containsKey(tableId)) { - tableToTabletList.put(tableId, Lists.newArrayList()); + if (!lockContext.getTableToTabletList().containsKey(tableId)) { + lockContext.getTableToTabletList().put(tableId, Lists.newArrayList()); } - tableToTabletList.get(tableId).add(tabletIds.get(i)); + lockContext.getTableToTabletList().get(tableId).add(tabletIds.get(i)); long partitionId = tabletMeta.getPartitionId(); long backendId = tabletCommitInfos.get(i).getBackendId(); - if (!tableToParttions.containsKey(tableId)) { - tableToParttions.put(tableId, Sets.newHashSet()); + if (!lockContext.getTableToPartitions().containsKey(tableId)) { + lockContext.getTableToPartitions().put(tableId, Sets.newHashSet()); } - tableToParttions.get(tableId).add(partitionId); + lockContext.getTableToPartitions().get(tableId).add(partitionId); - if (!backendToPartitionTablets.containsKey(backendId)) { - backendToPartitionTablets.put(backendId, Maps.newHashMap()); + if (!lockContext.getBackendToPartitionTablets().containsKey(backendId)) { + lockContext.getBackendToPartitionTablets().put(backendId, Maps.newHashMap()); } - Map<Long, List<Long>> partitionToTablets = backendToPartitionTablets.get(backendId); + Map<Long, List<Long>> partitionToTablets = lockContext.getBackendToPartitionTablets().get(backendId); if (!partitionToTablets.containsKey(partitionId)) { partitionToTablets.put(partitionId, Lists.newArrayList()); } partitionToTablets.get(partitionId).add(tabletIds.get(i)); - partitions.putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId)); + lockContext.getPartitions().putIfAbsent(partitionId, tableMap.get(tableId).getPartition(partitionId)); } } @@ -742,11 +730,10 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getCalcDeleteBitmapInfo( - Map<Long, Map<Long, List<Long>>> backendToPartitionTablets, Map<Long, Long> partitionVersions, - Map<Long, Long> baseCompactionCnts, Map<Long, Long> cumulativeCompactionCnts, - Map<Long, Long> cumulativePoints) { + DeleteBitmapUpdateLockContext lockContext) { Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos = Maps.newHashMap(); - for (Map.Entry<Long, Map<Long, List<Long>>> entry : backendToPartitionTablets.entrySet()) { + Map<Long, Long> partitionVersions = getPartitionVersions(lockContext.getPartitions()); + for (Map.Entry<Long, Map<Long, List<Long>>> entry : lockContext.getBackendToPartitionTablets().entrySet()) { List<TCalcDeleteBitmapPartitionInfo> partitionInfos = Lists.newArrayList(); for (Map.Entry<Long, List<Long>> partitionToTablets : entry.getValue().entrySet()) { Long partitionId = partitionToTablets.getKey(); @@ -754,15 +741,16 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { TCalcDeleteBitmapPartitionInfo partitionInfo = new TCalcDeleteBitmapPartitionInfo(partitionId, partitionVersions.get(partitionId), tabletList); - if (!baseCompactionCnts.isEmpty() && !cumulativeCompactionCnts.isEmpty() - && !cumulativePoints.isEmpty()) { + if (!lockContext.getBaseCompactionCnts().isEmpty() + && !lockContext.getCumulativeCompactionCnts().isEmpty() + && !lockContext.getCumulativePoints().isEmpty()) { List<Long> reqBaseCompactionCnts = Lists.newArrayList(); List<Long> reqCumulativeCompactionCnts = Lists.newArrayList(); List<Long> reqCumulativePoints = Lists.newArrayList(); for (long tabletId : tabletList) { - reqBaseCompactionCnts.add(baseCompactionCnts.get(tabletId)); - reqCumulativeCompactionCnts.add(cumulativeCompactionCnts.get(tabletId)); - reqCumulativePoints.add(cumulativePoints.get(tabletId)); + reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId)); + reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId)); + reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId)); } partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts); partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts); @@ -775,10 +763,9 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { return backendToPartitionInfos; } - private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId, - Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta> tabletToTabletMeta, - Map<Long, Long> baseCompactionCnts, Map<Long, Long> cumulativeCompactionCnts, - Map<Long, Long> cumulativePoints) throws UserException { + private void getDeleteBitmapUpdateLock(long transactionId, List<OlapTable> mowTableList, + List<TabletCommitInfo> tabletCommitInfos, DeleteBitmapUpdateLockContext lockContext) + throws UserException { if (DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep")) { DebugPoint debugPoint = DebugPointUtil.getDebugPoint( "CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep"); @@ -811,17 +798,15 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { } StopWatch stopWatch = new StopWatch(); stopWatch.start(); + getPartitionInfo(mowTableList, tabletCommitInfos, lockContext); int totalRetryTime = 0; - for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) { + for (Map.Entry<Long, Set<Long>> entry : lockContext.getTableToPartitions().entrySet()) { GetDeleteBitmapUpdateLockRequest.Builder builder = GetDeleteBitmapUpdateLockRequest.newBuilder(); - builder.setTableId(entry.getKey()) - .setLockId(transactionId) - .setInitiator(-1) - .setExpiration(Config.delete_bitmap_lock_expiration_seconds) - .setRequireCompactionStats(true); - List<Long> tabletList = tableToTabletList.get(entry.getKey()); + builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1) + .setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true); + List<Long> tabletList = lockContext.getTableToTabletList().get(entry.getKey()); for (Long tabletId : tabletList) { - TabletMeta tabletMeta = tabletToTabletMeta.get(tabletId); + TabletMeta tabletMeta = lockContext.getTabletToTabletMeta().get(tabletId); TabletIndexPB.Builder tabletIndexBuilder = TabletIndexPB.newBuilder(); tabletIndexBuilder.setDbId(tabletMeta.getDbId()); tabletIndexBuilder.setTableId(tabletMeta.getTableId()); @@ -838,16 +823,16 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { try { response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request); if (LOG.isDebugEnabled()) { - LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", - transactionId, request, response); + LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}", transactionId, + request, response); } if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) { break; } } catch (Exception e) { - LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", - transactionId, retryTime, e); + LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}", transactionId, + retryTime, e); } // sleep random millis [20, 300] ms, avoid txn conflict int randomMillis = 20 + (int) (Math.random() * (300 - 20)); @@ -863,8 +848,8 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { Preconditions.checkNotNull(response); Preconditions.checkNotNull(response.getStatus()); if (response.getStatus().getCode() != MetaServiceCode.OK) { - LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", - transactionId, retryTime, response); + LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}", transactionId, + retryTime, response); if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) { // DELETE_BITMAP_LOCK_ERR will be retried on be @@ -885,30 +870,28 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) { throw new UserException("The size of returned compaction cnts can't match the size of tabletList, " + "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1 - + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" - + size3); + + ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3); } for (int i = 0; i < tabletList.size(); i++) { long tabletId = tabletList.get(i); - baseCompactionCnts.put(tabletId, respBaseCompactionCnts.get(i)); - cumulativeCompactionCnts.put(tabletId, respCumulativeCompactionCnts.get(i)); - cumulativePoints.put(tabletId, respCumulativePoints.get(i)); + lockContext.getBaseCompactionCnts().put(tabletId, respBaseCompactionCnts.get(i)); + lockContext.getCumulativeCompactionCnts().put(tabletId, respCumulativeCompactionCnts.get(i)); + lockContext.getCumulativePoints().put(tabletId, respCumulativePoints.get(i)); } totalRetryTime += retryTime; } stopWatch.stop(); if (totalRetryTime > 0 || stopWatch.getTime() > 20) { - LOG.info( - "get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. " - + "partitionSize: {}. time cost: {} ms.", - transactionId, totalRetryTime, tableToParttions.size(), stopWatch.getTime()); + LOG.info("get delete bitmap lock successfully. txns: {}. totalRetryTime: {}. " + + "partitionSize: {}. time cost: {} ms.", transactionId, totalRetryTime, + lockContext.getTableToPartitions().size(), stopWatch.getTime()); } } - private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId) { - for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) { + private void removeDeleteBitmapUpdateLock(List<OlapTable> tableList, long transactionId) { + for (OlapTable table : tableList) { RemoveDeleteBitmapUpdateLockRequest.Builder builder = RemoveDeleteBitmapUpdateLockRequest.newBuilder(); - builder.setTableId(entry.getKey()) + builder.setTableId(table.getId()) .setLockId(transactionId) .setInitiator(-1); final RemoveDeleteBitmapUpdateLockRequest request = builder.build(); @@ -935,6 +918,10 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { private void sendCalcDeleteBitmaptask(long dbId, long transactionId, Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos) throws UserException { + if (backendToPartitionInfos == null) { + throw new UserException("failed to send calculate delete bitmap task to be,transactionId=" + transactionId + + ",but backendToPartitionInfos is null"); + } if (backendToPartitionInfos.isEmpty()) { return; } @@ -1131,7 +1118,8 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { @Override public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis) throws UserException { - commitTransaction(db.getId(), tableList, transactionId, null, null, true); + List<OlapTable> mowTableList = getMowTableList(tableList, null); + commitTransactionWithoutLock(db.getId(), tableList, transactionId, null, null, true, mowTableList, null); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java new file mode 100644 index 00000000000..02886f63427 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cloud.transaction; + +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.TabletMeta; + +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DeleteBitmapUpdateLockContext { + private Map<Long, Long> baseCompactionCnts; + private Map<Long, Long> cumulativeCompactionCnts; + private Map<Long, Long> cumulativePoints; + private Map<Long, Set<Long>> tableToPartitions; + private Map<Long, Partition> partitions; + private Map<Long, Map<Long, List<Long>>> backendToPartitionTablets; + private Map<Long, List<Long>> tableToTabletList; + private Map<Long, TabletMeta> tabletToTabletMeta; + + public DeleteBitmapUpdateLockContext() { + baseCompactionCnts = Maps.newHashMap(); + cumulativeCompactionCnts = Maps.newHashMap(); + cumulativePoints = Maps.newHashMap(); + tableToPartitions = Maps.newHashMap(); + partitions = Maps.newHashMap(); + backendToPartitionTablets = Maps.newHashMap(); + tableToTabletList = Maps.newHashMap(); + tabletToTabletMeta = Maps.newHashMap(); + } + + public Map<Long, List<Long>> getTableToTabletList() { + return tableToTabletList; + } + + public Map<Long, Long> getBaseCompactionCnts() { + return baseCompactionCnts; + } + + public Map<Long, Long> getCumulativeCompactionCnts() { + return cumulativeCompactionCnts; + } + + public Map<Long, Long> getCumulativePoints() { + return cumulativePoints; + } + + public Map<Long, Map<Long, List<Long>>> getBackendToPartitionTablets() { + return backendToPartitionTablets; + } + + public Map<Long, Partition> getPartitions() { + return partitions; + } + + public Map<Long, Set<Long>> getTableToPartitions() { + return tableToPartitions; + } + + public Map<Long, TabletMeta> getTabletToTabletMeta() { + return tabletToTabletMeta; + } + +} diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out new file mode 100644 index 00000000000..b8b3ea3ecca --- /dev/null +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- + +-- !sql -- +5 e 90 +6 f 100 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy new file mode 100644 index 00000000000..fa71c3644f2 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") { + GetDebugPoint().clearDebugPointsForAllFEs() + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string: [:]] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_param = { paramName, paramValue -> + // for eache be node, set paramName=paramValue + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue)) + assertTrue(out.contains("OK")) + } + } + + def reset_be_param = { paramName -> + // for eache be node, reset paramName to default + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + def original_value = backendId_to_params.get(id).get(paramName) + def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value)) + assertTrue(out.contains("OK")) + } + } + + def get_be_param = { paramName -> + // for eache be node, get param value by default + def paramValue = "" + for (String id in backendId_to_backendIP.keySet()) { + def beIp = backendId_to_backendIP.get(id) + def bePort = backendId_to_backendHttpPort.get(id) + // get the config value from be + def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName)) + assertTrue(code == 0) + assertTrue(out.contains(paramName)) + // parsing + def resultList = parseJson(out)[0] + assertTrue(resultList.size() == 4) + // get original value + paramValue = resultList[2] + backendId_to_params.get(id, [:]).put(paramName, paramValue) + } + } + + def customFeConfig = [ + calculate_delete_bitmap_task_timeout_seconds: 2, + meta_service_rpc_retry_times : 5 + ] + + // store the original value + get_be_param("mow_stream_load_commit_retry_times") + // disable retry to make this problem more clear + set_be_param("mow_stream_load_commit_retry_times", "1") + + + def tableName = "tbl_basic" + setFeConfigTemporary(customFeConfig) { + try { + // create table + sql """ drop table if exists ${tableName}; """ + + sql """ + CREATE TABLE `${tableName}` ( + `id` int(11) NOT NULL, + `name` varchar(1100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "replication_num" = "1" + ); + """ + // this streamLoad will fail on fe commit phase + GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null) + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("fail", json.Status.toLowerCase()) + assertTrue(json.Message.contains("FE.mow.commit.exception")) + } + } + qt_sql """ select * from ${tableName} order by id""" + + // this streamLoad will success because of removing exception injection + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'id, name, score' + file "test_stream_load.csv" + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + log.info("Stream load result: ${result}") + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + } + } + qt_sql """ select * from ${tableName} order by id""" + } finally { + reset_be_param("mow_stream_load_commit_retry_times") + GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception') + sql "DROP TABLE IF EXISTS ${tableName};" + GetDebugPoint().clearDebugPointsForAllFEs() + } + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org