This is an automated email from the ASF dual-hosted git repository.

zhangchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 6a189b34348 [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed #45673 (#46650)
6a189b34348 is described below

commit 6a189b3434895b10a8f0aff9be3761e25f09ab01
Author: huanghaibin <huanghai...@selectdb.com>
AuthorDate: Fri Jan 10 18:14:20 2025 +0800

    [fix](cloud-mow) FE should release delete bitmap lock when calculating delete bitmap failed #45673 (#46650)
    
    pick pr #45673
---
 .../transaction/CloudGlobalTransactionMgr.java     | 180 ++++++++++-----------
 .../transaction/DeleteBitmapUpdateLockContext.java |  82 ++++++++++
 ...test_cloud_mow_stream_load_with_commit_fail.out |   7 +
 ...t_cloud_mow_stream_load_with_commit_fail.groovy | 142 ++++++++++++++++
 4 files changed, 315 insertions(+), 96 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index c91a8a4bdfb..706ab5161f4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -346,7 +346,29 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     public void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
             List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment)
             throws UserException {
-        commitTransaction(dbId, tableList, transactionId, tabletCommitInfos, 
txnCommitAttachment, false);
+        List<OlapTable> mowTableList = getMowTableList(tableList, 
tabletCommitInfos);
+        try {
+            LOG.info("try to commit transaction, transactionId: {}", 
transactionId);
+            Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos = null;
+            if (!mowTableList.isEmpty()) {
+                DeleteBitmapUpdateLockContext lockContext = new 
DeleteBitmapUpdateLockContext();
+                getDeleteBitmapUpdateLock(transactionId, mowTableList, 
tabletCommitInfos, lockContext);
+                if (lockContext.getBackendToPartitionTablets().isEmpty()) {
+                    throw new UserException(
+                            "The partition info is empty, table may be 
dropped, txnid=" + transactionId);
+                }
+                backendToPartitionInfos = getCalcDeleteBitmapInfo(lockContext);
+            }
+            commitTransactionWithoutLock(dbId, tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment, false,
+                    mowTableList, backendToPartitionInfos);
+        } catch (Exception e) {
+            if (!mowTableList.isEmpty()) {
+                LOG.warn("commit txn {} failed, release delete bitmap lock, 
catch exception {}", transactionId,
+                        e.getMessage());
+                removeDeleteBitmapUpdateLock(mowTableList, transactionId);
+            }
+            throw e;
+        }
     }
 
     /**
@@ -464,17 +486,15 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         return baseTabletIds;
     }
 
-    private void commitTransaction(long dbId, List<Table> tableList, long 
transactionId,
-            List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment, boolean is2PC)
+    private void commitTransactionWithoutLock(long dbId, List<Table> 
tableList, long transactionId,
+            List<TabletCommitInfo> tabletCommitInfos, TxnCommitAttachment 
txnCommitAttachment, boolean is2PC,
+            List<OlapTable> mowTableList, Map<Long, 
List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos)
             throws UserException {
-
-        LOG.info("try to commit transaction, transactionId: {}", 
transactionId);
         if (Config.disable_load_job) {
             throw new TransactionCommitFailedException(
                     "disable_load_job is set to true, all load jobs are not 
allowed");
         }
 
-        List<OlapTable> mowTableList = getMowTableList(tableList, 
tabletCommitInfos);
         if (!mowTableList.isEmpty()) {
             // may be this txn has been calculated by previously task but 
commit rpc is timeout,
             // and be will send another commit request to fe, so need to check 
txn status first
@@ -493,7 +513,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                             
transactionState.getTransactionStatus().toString());
                 }
             }
-            calcDeleteBitmapForMow(dbId, mowTableList, transactionId, 
tabletCommitInfos);
+            sendCalcDeleteBitmaptask(dbId, transactionId, 
backendToPartitionInfos);
         }
 
         CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
@@ -529,6 +549,11 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             }
         }
 
+        if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
+            LOG.info("debug point FE.mow.commit.exception, throw e");
+            throw new UserException("debug point FE.mow.commit.exception");
+        }
+
         final CommitTxnRequest commitTxnRequest = builder.build();
         boolean txnOperated = false;
         TransactionState txnState = null;
@@ -648,46 +673,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         return mowTableList;
     }
 
-    private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, 
long transactionId,
-            List<TabletCommitInfo> tabletCommitInfos)
-            throws UserException {
-        Map<Long, Map<Long, List<Long>>> backendToPartitionTablets = 
Maps.newHashMap();
-        Map<Long, Partition> partitions = Maps.newHashMap();
-        Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap();
-        Map<Long, List<Long>> tableToTabletList = Maps.newHashMap();
-        Map<Long, TabletMeta> tabletToTabletMeta = Maps.newHashMap();
-        getPartitionInfo(tableList, tabletCommitInfos, tableToPartitions, 
partitions, backendToPartitionTablets,
-                tableToTabletList, tabletToTabletMeta);
-        if (backendToPartitionTablets.isEmpty()) {
-            throw new UserException("The partition info is empty, table may be 
dropped, txnid=" + transactionId);
-        }
-
-        Map<Long, Long> baseCompactionCnts = Maps.newHashMap();
-        Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap();
-        Map<Long, Long> cumulativePoints = Maps.newHashMap();
-        getDeleteBitmapUpdateLock(tableToPartitions, transactionId, 
tableToTabletList, tabletToTabletMeta,
-                baseCompactionCnts, cumulativeCompactionCnts, 
cumulativePoints);
-        Map<Long, Long> partitionVersions = getPartitionVersions(partitions);
-
-        Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos = getCalcDeleteBitmapInfo(
-                backendToPartitionTablets, partitionVersions, 
baseCompactionCnts, cumulativeCompactionCnts,
-                        cumulativePoints);
-        try {
-            sendCalcDeleteBitmaptask(dbId, transactionId, 
backendToPartitionInfos);
-        } catch (UserException e) {
-            LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" + 
transactionId + ",exception=" + e.getMessage());
-            removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
-            throw e;
-        }
-    }
-
     private void getPartitionInfo(List<OlapTable> tableList,
             List<TabletCommitInfo> tabletCommitInfos,
-            Map<Long, Set<Long>> tableToParttions,
-            Map<Long, Partition> partitions,
-            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets,
-            Map<Long, List<Long>> tableToTabletList,
-            Map<Long, TabletMeta> tabletToTabletMeta) {
+            DeleteBitmapUpdateLockContext lockContext) {
         Map<Long, OlapTable> tableMap = Maps.newHashMap();
         for (OlapTable olapTable : tableList) {
             tableMap.put(olapTable.getId(), olapTable);
@@ -704,30 +692,30 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 continue;
             }
 
-            tabletToTabletMeta.put(tabletIds.get(i), tabletMeta);
+            lockContext.getTabletToTabletMeta().put(tabletIds.get(i), 
tabletMeta);
 
-            if (!tableToTabletList.containsKey(tableId)) {
-                tableToTabletList.put(tableId, Lists.newArrayList());
+            if (!lockContext.getTableToTabletList().containsKey(tableId)) {
+                lockContext.getTableToTabletList().put(tableId, 
Lists.newArrayList());
             }
-            tableToTabletList.get(tableId).add(tabletIds.get(i));
+            
lockContext.getTableToTabletList().get(tableId).add(tabletIds.get(i));
 
             long partitionId = tabletMeta.getPartitionId();
             long backendId = tabletCommitInfos.get(i).getBackendId();
 
-            if (!tableToParttions.containsKey(tableId)) {
-                tableToParttions.put(tableId, Sets.newHashSet());
+            if (!lockContext.getTableToPartitions().containsKey(tableId)) {
+                lockContext.getTableToPartitions().put(tableId, 
Sets.newHashSet());
             }
-            tableToParttions.get(tableId).add(partitionId);
+            lockContext.getTableToPartitions().get(tableId).add(partitionId);
 
-            if (!backendToPartitionTablets.containsKey(backendId)) {
-                backendToPartitionTablets.put(backendId, Maps.newHashMap());
+            if 
(!lockContext.getBackendToPartitionTablets().containsKey(backendId)) {
+                lockContext.getBackendToPartitionTablets().put(backendId, 
Maps.newHashMap());
             }
-            Map<Long, List<Long>> partitionToTablets = 
backendToPartitionTablets.get(backendId);
+            Map<Long, List<Long>> partitionToTablets = 
lockContext.getBackendToPartitionTablets().get(backendId);
             if (!partitionToTablets.containsKey(partitionId)) {
                 partitionToTablets.put(partitionId, Lists.newArrayList());
             }
             partitionToTablets.get(partitionId).add(tabletIds.get(i));
-            partitions.putIfAbsent(partitionId, 
tableMap.get(tableId).getPartition(partitionId));
+            lockContext.getPartitions().putIfAbsent(partitionId, 
tableMap.get(tableId).getPartition(partitionId));
         }
     }
 
@@ -742,11 +730,10 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
getCalcDeleteBitmapInfo(
-            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets, 
Map<Long, Long> partitionVersions,
-                    Map<Long, Long> baseCompactionCnts, Map<Long, Long> 
cumulativeCompactionCnts,
-                            Map<Long, Long> cumulativePoints) {
+            DeleteBitmapUpdateLockContext lockContext) {
         Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos = Maps.newHashMap();
-        for (Map.Entry<Long, Map<Long, List<Long>>> entry : 
backendToPartitionTablets.entrySet()) {
+        Map<Long, Long> partitionVersions = 
getPartitionVersions(lockContext.getPartitions());
+        for (Map.Entry<Long, Map<Long, List<Long>>> entry : 
lockContext.getBackendToPartitionTablets().entrySet()) {
             List<TCalcDeleteBitmapPartitionInfo> partitionInfos = 
Lists.newArrayList();
             for (Map.Entry<Long, List<Long>> partitionToTablets : 
entry.getValue().entrySet()) {
                 Long partitionId = partitionToTablets.getKey();
@@ -754,15 +741,16 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 TCalcDeleteBitmapPartitionInfo partitionInfo = new 
TCalcDeleteBitmapPartitionInfo(partitionId,
                         partitionVersions.get(partitionId),
                         tabletList);
-                if (!baseCompactionCnts.isEmpty() && 
!cumulativeCompactionCnts.isEmpty()
-                        && !cumulativePoints.isEmpty()) {
+                if (!lockContext.getBaseCompactionCnts().isEmpty()
+                        && !lockContext.getCumulativeCompactionCnts().isEmpty()
+                        && !lockContext.getCumulativePoints().isEmpty()) {
                     List<Long> reqBaseCompactionCnts = Lists.newArrayList();
                     List<Long> reqCumulativeCompactionCnts = 
Lists.newArrayList();
                     List<Long> reqCumulativePoints = Lists.newArrayList();
                     for (long tabletId : tabletList) {
-                        
reqBaseCompactionCnts.add(baseCompactionCnts.get(tabletId));
-                        
reqCumulativeCompactionCnts.add(cumulativeCompactionCnts.get(tabletId));
-                        
reqCumulativePoints.add(cumulativePoints.get(tabletId));
+                        
reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId));
+                        
reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId));
+                        
reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId));
                     }
                     partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
                     
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
@@ -775,10 +763,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         return backendToPartitionInfos;
     }
 
-    private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> 
tableToParttions, long transactionId,
-            Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta> 
tabletToTabletMeta,
-                    Map<Long, Long> baseCompactionCnts, Map<Long, Long> 
cumulativeCompactionCnts,
-                            Map<Long, Long> cumulativePoints) throws 
UserException {
+    private void getDeleteBitmapUpdateLock(long transactionId, List<OlapTable> 
mowTableList,
+            List<TabletCommitInfo> tabletCommitInfos, 
DeleteBitmapUpdateLockContext lockContext)
+            throws UserException {
         if 
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep"))
 {
             DebugPoint debugPoint = DebugPointUtil.getDebugPoint(
                     
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.sleep");
@@ -811,17 +798,15 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
+        getPartitionInfo(mowTableList, tabletCommitInfos, lockContext);
         int totalRetryTime = 0;
-        for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
+        for (Map.Entry<Long, Set<Long>> entry : 
lockContext.getTableToPartitions().entrySet()) {
             GetDeleteBitmapUpdateLockRequest.Builder builder = 
GetDeleteBitmapUpdateLockRequest.newBuilder();
-            builder.setTableId(entry.getKey())
-                    .setLockId(transactionId)
-                    .setInitiator(-1)
-                    
.setExpiration(Config.delete_bitmap_lock_expiration_seconds)
-                    .setRequireCompactionStats(true);
-            List<Long> tabletList = tableToTabletList.get(entry.getKey());
+            
builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1)
+                    
.setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true);
+            List<Long> tabletList = 
lockContext.getTableToTabletList().get(entry.getKey());
             for (Long tabletId : tabletList) {
-                TabletMeta tabletMeta = tabletToTabletMeta.get(tabletId);
+                TabletMeta tabletMeta = 
lockContext.getTabletToTabletMeta().get(tabletId);
                 TabletIndexPB.Builder tabletIndexBuilder = 
TabletIndexPB.newBuilder();
                 tabletIndexBuilder.setDbId(tabletMeta.getDbId());
                 tabletIndexBuilder.setTableId(tabletMeta.getTableId());
@@ -838,16 +823,16 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 try {
                     response = 
MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("get delete bitmap lock, transactionId={}, 
Request: {}, Response: {}",
-                                transactionId, request, response);
+                        LOG.debug("get delete bitmap lock, transactionId={}, 
Request: {}, Response: {}", transactionId,
+                                request, response);
                     }
                     if (response.getStatus().getCode() != 
MetaServiceCode.LOCK_CONFLICT
                             && response.getStatus().getCode() != 
MetaServiceCode.KV_TXN_CONFLICT) {
                         break;
                     }
                 } catch (Exception e) {
-                    LOG.warn("ignore get delete bitmap lock exception, 
transactionId={}, retryTime={}",
-                            transactionId, retryTime, e);
+                    LOG.warn("ignore get delete bitmap lock exception, 
transactionId={}, retryTime={}", transactionId,
+                            retryTime, e);
                 }
                 // sleep random millis [20, 300] ms, avoid txn conflict
                 int randomMillis = 20 + (int) (Math.random() * (300 - 20));
@@ -863,8 +848,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             Preconditions.checkNotNull(response);
             Preconditions.checkNotNull(response.getStatus());
             if (response.getStatus().getCode() != MetaServiceCode.OK) {
-                LOG.warn("get delete bitmap lock failed, transactionId={}, for 
{} times, response:{}",
-                        transactionId, retryTime, response);
+                LOG.warn("get delete bitmap lock failed, transactionId={}, for 
{} times, response:{}", transactionId,
+                        retryTime, response);
                 if (response.getStatus().getCode() == 
MetaServiceCode.LOCK_CONFLICT
                         || response.getStatus().getCode() == 
MetaServiceCode.KV_TXN_CONFLICT) {
                     // DELETE_BITMAP_LOCK_ERR will be retried on be
@@ -885,30 +870,28 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             if (size1 != tabletList.size() || size2 != tabletList.size() || 
size3 != tabletList.size()) {
                 throw new UserException("The size of returned compaction cnts 
can't match the size of tabletList, "
                         + "tabletList.size()=" + tabletList.size() + ", 
respBaseCompactionCnts.size()=" + size1
-                                + ", respCumulativeCompactionCnts.size()=" + 
size2 + ", respCumulativePoints.size()="
-                                        + size3);
+                        + ", respCumulativeCompactionCnts.size()=" + size2 + 
", respCumulativePoints.size()=" + size3);
             }
             for (int i = 0; i < tabletList.size(); i++) {
                 long tabletId = tabletList.get(i);
-                baseCompactionCnts.put(tabletId, 
respBaseCompactionCnts.get(i));
-                cumulativeCompactionCnts.put(tabletId, 
respCumulativeCompactionCnts.get(i));
-                cumulativePoints.put(tabletId, respCumulativePoints.get(i));
+                lockContext.getBaseCompactionCnts().put(tabletId, 
respBaseCompactionCnts.get(i));
+                lockContext.getCumulativeCompactionCnts().put(tabletId, 
respCumulativeCompactionCnts.get(i));
+                lockContext.getCumulativePoints().put(tabletId, 
respCumulativePoints.get(i));
             }
             totalRetryTime += retryTime;
         }
         stopWatch.stop();
         if (totalRetryTime > 0 || stopWatch.getTime() > 20) {
-            LOG.info(
-                    "get delete bitmap lock successfully. txns: {}. 
totalRetryTime: {}. "
-                            + "partitionSize: {}. time cost: {} ms.",
-                    transactionId, totalRetryTime, tableToParttions.size(), 
stopWatch.getTime());
+            LOG.info("get delete bitmap lock successfully. txns: {}. 
totalRetryTime: {}. "
+                            + "partitionSize: {}. time cost: {} ms.", 
transactionId, totalRetryTime,
+                    lockContext.getTableToPartitions().size(), 
stopWatch.getTime());
         }
     }
 
-    private void removeDeleteBitmapUpdateLock(Map<Long, Set<Long>> 
tableToParttions, long transactionId) {
-        for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
+    private void removeDeleteBitmapUpdateLock(List<OlapTable> tableList, long 
transactionId) {
+        for (OlapTable table : tableList) {
             RemoveDeleteBitmapUpdateLockRequest.Builder builder = 
RemoveDeleteBitmapUpdateLockRequest.newBuilder();
-            builder.setTableId(entry.getKey())
+            builder.setTableId(table.getId())
                     .setLockId(transactionId)
                     .setInitiator(-1);
             final RemoveDeleteBitmapUpdateLockRequest request = 
builder.build();
@@ -935,6 +918,10 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
             Map<Long, List<TCalcDeleteBitmapPartitionInfo>> 
backendToPartitionInfos)
             throws UserException {
+        if (backendToPartitionInfos == null) {
+            throw new UserException("failed to send calculate delete bitmap 
task to be,transactionId=" + transactionId
+                    + ",but backendToPartitionInfos is null");
+        }
         if (backendToPartitionInfos.isEmpty()) {
             return;
         }
@@ -1131,7 +1118,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     @Override
     public void commitTransaction2PC(Database db, List<Table> tableList, long 
transactionId, long timeoutMillis)
             throws UserException {
-        commitTransaction(db.getId(), tableList, transactionId, null, null, 
true);
+        List<OlapTable> mowTableList = getMowTableList(tableList, null);
+        commitTransactionWithoutLock(db.getId(), tableList, transactionId, 
null, null, true, mowTableList, null);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
new file mode 100644
index 00000000000..02886f63427
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/DeleteBitmapUpdateLockContext.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.transaction;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.TabletMeta;
+
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DeleteBitmapUpdateLockContext {
+    private Map<Long, Long> baseCompactionCnts;
+    private Map<Long, Long> cumulativeCompactionCnts;
+    private Map<Long, Long> cumulativePoints;
+    private Map<Long, Set<Long>> tableToPartitions;
+    private Map<Long, Partition> partitions;
+    private Map<Long, Map<Long, List<Long>>> backendToPartitionTablets;
+    private Map<Long, List<Long>> tableToTabletList;
+    private Map<Long, TabletMeta> tabletToTabletMeta;
+
+    public DeleteBitmapUpdateLockContext() {
+        baseCompactionCnts = Maps.newHashMap();
+        cumulativeCompactionCnts = Maps.newHashMap();
+        cumulativePoints = Maps.newHashMap();
+        tableToPartitions = Maps.newHashMap();
+        partitions = Maps.newHashMap();
+        backendToPartitionTablets = Maps.newHashMap();
+        tableToTabletList = Maps.newHashMap();
+        tabletToTabletMeta = Maps.newHashMap();
+    }
+
+    public Map<Long, List<Long>> getTableToTabletList() {
+        return tableToTabletList;
+    }
+
+    public Map<Long, Long> getBaseCompactionCnts() {
+        return baseCompactionCnts;
+    }
+
+    public Map<Long, Long> getCumulativeCompactionCnts() {
+        return cumulativeCompactionCnts;
+    }
+
+    public Map<Long, Long> getCumulativePoints() {
+        return cumulativePoints;
+    }
+
+    public Map<Long, Map<Long, List<Long>>> getBackendToPartitionTablets() {
+        return backendToPartitionTablets;
+    }
+
+    public Map<Long, Partition> getPartitions() {
+        return partitions;
+    }
+
+    public Map<Long, Set<Long>> getTableToPartitions() {
+        return tableToPartitions;
+    }
+
+    public Map<Long, TabletMeta> getTabletToTabletMeta() {
+        return tabletToTabletMeta;
+    }
+
+}
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
new file mode 100644
index 00000000000..b8b3ea3ecca
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql --
+
+-- !sql --
+5      e       90
+6      f       100
+
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
new file mode 100644
index 00000000000..fa71c3644f2
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_stream_load_with_commit_fail.groovy
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_stream_load_with_commit_fail", "nonConcurrent") {
+    GetDebugPoint().clearDebugPointsForAllFEs()
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string: [:]]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def set_be_param = { paramName, paramValue ->
+        // for eache be node, set paramName=paramValue
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, paramValue))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def reset_be_param = { paramName ->
+        // for eache be node, reset paramName to default
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def original_value = backendId_to_params.get(id).get(paramName)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    def get_be_param = { paramName ->
+        // for eache be node, get param value by default
+        def paramValue = ""
+        for (String id in backendId_to_backendIP.keySet()) {
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            // get the config value from be
+            def (code, out, err) = curl("GET", String.format("http://%s:%s/api/show_config?conf_item=%s", beIp, bePort, paramName))
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // parsing
+            def resultList = parseJson(out)[0]
+            assertTrue(resultList.size() == 4)
+            // get original value
+            paramValue = resultList[2]
+            backendId_to_params.get(id, [:]).put(paramName, paramValue)
+        }
+    }
+
+    def customFeConfig = [
+            calculate_delete_bitmap_task_timeout_seconds: 2,
+            meta_service_rpc_retry_times                : 5
+    ]
+
+    // store the original value
+    get_be_param("mow_stream_load_commit_retry_times")
+    // disable retry to make this problem more clear
+    set_be_param("mow_stream_load_commit_retry_times", "1")
+
+
+    def tableName = "tbl_basic"
+    setFeConfigTemporary(customFeConfig) {
+        try {
+            // create table
+            sql """ drop table if exists ${tableName}; """
+
+            sql """
+        CREATE TABLE `${tableName}` (
+            `id` int(11) NOT NULL,
+            `name` varchar(1100) NULL,
+            `score` int(11) NULL default "-1"
+        ) ENGINE=OLAP
+        UNIQUE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "replication_num" = "1"
+        );
+        """
+            // this streamLoad will fail on fe commit phase
+            GetDebugPoint().enableDebugPointForAllFEs('FE.mow.commit.exception', null)
+            streamLoad {
+                table "${tableName}"
+
+                set 'column_separator', ','
+                set 'columns', 'id, name, score'
+                file "test_stream_load.csv"
+
+                time 10000 // limit inflight 10s
+
+                check { result, exception, startTime, endTime ->
+                    log.info("Stream load result: ${result}")
+                    def json = parseJson(result)
+                    assertEquals("fail", json.Status.toLowerCase())
+                    assertTrue(json.Message.contains("FE.mow.commit.exception"))
+                }
+            }
+            qt_sql """ select * from ${tableName} order by id"""
+
+            // this streamLoad will success because of removing exception injection
+            GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+            streamLoad {
+                table "${tableName}"
+
+                set 'column_separator', ','
+                set 'columns', 'id, name, score'
+                file "test_stream_load.csv"
+
+                time 10000 // limit inflight 10s
+
+                check { result, exception, startTime, endTime ->
+                    log.info("Stream load result: ${result}")
+                    def json = parseJson(result)
+                    assertEquals("success", json.Status.toLowerCase())
+                }
+            }
+            qt_sql """ select * from ${tableName} order by id"""
+        } finally {
+            reset_be_param("mow_stream_load_commit_retry_times")
+            GetDebugPoint().disableDebugPointForAllFEs('FE.mow.commit.exception')
+            sql "DROP TABLE IF EXISTS ${tableName};"
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to