This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e0d843501 [fix](publish) publish go ahead even if quorum is not met 
(#23806)
9e0d843501 is described below

commit 9e0d843501f74bd77090afda5e37506bb0d70387
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Tue Sep 12 14:29:01 2023 +0800

    [fix](publish) publish go ahead even if quorum is not met (#23806)
    
    
    
    Co-authored-by: Yongqiang YANG <dataroar...@gmail.com>
---
 .../main/java/org/apache/doris/common/Config.java  |   7 +
 .../java/org/apache/doris/catalog/Replica.java     |  26 +++
 .../doris/transaction/DatabaseTransactionMgr.java  | 208 ++++++++++++---------
 .../apache/doris/transaction/TransactionState.java |  14 ++
 4 files changed, 166 insertions(+), 89 deletions(-)

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index fd5258e0c0..34160ab75d 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -439,6 +439,13 @@ public class Config extends ConfigBase {
             "Maximal waiting time for all publish version tasks of one 
transaction to be finished, in seconds."})
     public static int publish_version_timeout_second = 30; // 30 seconds
 
+    @ConfField(mutable = true, masterOnly = true, description = {"导入 Publish 
阶段的等待时间,单位是秒。超过此时间,"
+            + "则只需每个tablet包含一个成功副本,则导入成功。值为 -1 时,表示无限等待。",
+            "Waiting time for one transaction changing to \"at least one 
replica success\", in seconds."
+            + "If time exceeds this, and for each tablet it has at least one 
replica publish successful, "
+            + "then the load task will be successful." })
+    public static int publish_wait_time_second = 300;
+
     @ConfField(mutable = true, masterOnly = true, description = 
{"提交事务的最大超时时间,单位是秒。"
             + "该参数仅用于事务型 insert 操作中。",
             "Maximal waiting time for all data inserted before one transaction 
to be committed, in seconds. "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index c444cdf1db..c76dc5e0ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -487,6 +487,32 @@ public class Replica implements Writable {
         return strBuffer.toString();
     }
 
+    public String toStringSimple(boolean checkBeAlive) {
+        StringBuilder strBuffer = new StringBuilder("[replicaId=");
+        strBuffer.append(id);
+        strBuffer.append(", backendId=");
+        strBuffer.append(backendId);
+        if (checkBeAlive) {
+            strBuffer.append(", backendAlive=");
+            
strBuffer.append(Env.getCurrentSystemInfo().checkBackendAlive(backendId));
+        }
+        strBuffer.append(", version=");
+        strBuffer.append(version);
+        if (lastFailedVersion > 0) {
+            strBuffer.append(", lastFailedVersion=");
+            strBuffer.append(lastFailedVersion);
+            strBuffer.append(", lastSuccessVersion=");
+            strBuffer.append(lastSuccessVersion);
+            strBuffer.append(", lastFailedTimestamp=");
+            strBuffer.append(lastFailedTimestamp);
+        }
+        strBuffer.append(", state=");
+        strBuffer.append(state.name());
+        strBuffer.append("]");
+
+        return strBuffer.toString();
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         out.writeLong(id);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index dba5d24855..8397499ded 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -81,7 +81,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -95,6 +94,12 @@ import java.util.stream.Collectors;
 
 public class DatabaseTransactionMgr {
 
+    private enum PublishResult {
+        FAILED,
+        TIMEOUT_SUCC,  // each tablet has at least one replica succ, and timeout
+        QUORUM_SUCC,   // each tablet has at least quorum replicas succ
+    }
+
     private static final Logger LOG = 
LogManager.getLogger(DatabaseTransactionMgr.class);
     // the max number of txn that can be remove per round.
     // set it to avoid holding lock too long when removing too many txns per 
round.
@@ -485,32 +490,9 @@ public class DatabaseTransactionMgr {
             }
             
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
         }
-        List<String> tabletSuccReplicas = Lists.newArrayList();
-        List<String> tabletWriteFailedReplicas = Lists.newArrayList();
-        List<String> tabletVersionFailedReplicas = Lists.newArrayList();
-        Function<Replica, String> getReplicaInfo = replica -> {
-            StringBuilder strBuffer = new StringBuilder("[replicaId=");
-            strBuffer.append(replica.getId());
-            strBuffer.append(", backendId=");
-            strBuffer.append(replica.getBackendId());
-            strBuffer.append(", backendAlive=");
-            
strBuffer.append(Env.getCurrentSystemInfo().checkBackendAlive(replica.getBackendId()));
-            strBuffer.append(", version=");
-            strBuffer.append(replica.getVersion());
-            if (replica.getLastFailedVersion() >= 0) {
-                strBuffer.append(", lastFailedVersion=");
-                strBuffer.append(replica.getLastFailedVersion());
-                strBuffer.append(", lastSuccessVersion=");
-                strBuffer.append(replica.getLastSuccessVersion());
-                strBuffer.append(", lastFailedTimestamp=");
-                strBuffer.append(replica.getLastFailedTimestamp());
-            }
-            strBuffer.append(", state=");
-            strBuffer.append(replica.getState().name());
-            strBuffer.append("]");
-
-            return strBuffer.toString();
-        };
+        List<Replica> tabletSuccReplicas = Lists.newArrayList();
+        List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+        List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
 
         for (long tableId : tableToPartition.keySet()) {
             OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
@@ -558,15 +540,12 @@ public class DatabaseTransactionMgr {
                         tabletSuccReplicas.clear();
                         tabletWriteFailedReplicas.clear();
                         tabletVersionFailedReplicas.clear();
-                        int successReplicaNum = 0;
                         long tabletId = tablet.getId();
                         Set<Long> tabletBackends = tablet.getBackendIds();
                         totalInvolvedBackends.addAll(tabletBackends);
                         Set<Long> commitBackends = 
tabletToBackends.get(tabletId);
                         // save the error replica ids for current tablet
                         // this param is used for log
-                        Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
-                        String errorReplicaInfo = new String();
                         for (long tabletBackend : tabletBackends) {
                             Replica replica = 
tabletInvertedIndex.getReplica(tabletId, tabletBackend);
                             if (replica == null) {
@@ -582,55 +561,28 @@ public class DatabaseTransactionMgr {
                                 // ignore it but not log it
                                 // for example, a replica is in clone state
                                 if (replica.getLastFailedVersion() < 0) {
-                                    ++successReplicaNum;
-                                    
tabletSuccReplicas.add(getReplicaInfo.apply(replica));
+                                    tabletSuccReplicas.add(replica);
                                 } else {
-                                    errorReplicaInfo += " replica [" + 
replica.getId() + "], lastFailedVersion ["
-                                                        + 
replica.getLastFailedVersion() + "]";
-                                    
tabletVersionFailedReplicas.add(getReplicaInfo.apply(replica));
+                                    tabletVersionFailedReplicas.add(replica);
                                 }
                             } else {
-                                
tabletWriteFailedReplicas.add(getReplicaInfo.apply(replica));
-                                errorBackendIdsForTablet.add(tabletBackend);
+                                tabletWriteFailedReplicas.add(replica);
                                 errorReplicaIds.add(replica.getId());
-                                // not remove rollup task here, because the 
commit maybe failed
-                                // remove rollup task when commit successfully
-                                errorReplicaInfo += " replica [" + 
replica.getId() + "] commitBackends null or "
-                                                    + "tabletBackend [" + 
tabletBackend + "] does not "
-                                                    + "in commitBackends";
                             }
                         }
 
+                        int successReplicaNum = tabletSuccReplicas.size();
                         if (successReplicaNum < quorumReplicaNum) {
-                            LOG.warn("Failed to commit txn [{}]. "
-                                            + "Tablet [{}] success replica num 
is {} < quorum replica num {} "
-                                            + "while error backends {} error 
replica info {} commitBackends {}",
-                                    transactionState.getTransactionId(), 
tablet.getId(), successReplicaNum,
-                                    quorumReplicaNum, 
Joiner.on(",").join(errorBackendIdsForTablet),
-                                    errorReplicaInfo, commitBackends);
-
-                            String replicasDetailMsg = "";
-                            if (!tabletSuccReplicas.isEmpty()) {
-                                replicasDetailMsg += String.format("%s 
replicas final succ: { %s }; ",
-                                        tabletSuccReplicas.size(), 
Joiner.on(", ").join(tabletSuccReplicas));
-                            }
-                            if (!tabletWriteFailedReplicas.isEmpty()) {
-                                replicasDetailMsg += String.format("%s 
replicas write data failed: { %s }; ",
-                                        tabletWriteFailedReplicas.size(),
-                                        Joiner.on(", 
").join(tabletWriteFailedReplicas));
-                            }
-                            if (!tabletVersionFailedReplicas.isEmpty()) {
-                                replicasDetailMsg += String.format("%s 
replicas write data succ but miss previous "
-                                                + "version: { %s }.",
-                                        tabletVersionFailedReplicas.size(),
-                                        Joiner.on(", 
").join(tabletVersionFailedReplicas));
-                            }
+                            String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+                                    tabletVersionFailedReplicas);
+
+                            String errMsg = String.format("Failed to commit 
txn %s, cause tablet %s succ replica "
+                                    + "num %s < quorum replica num %s. table 
%s, partition %s, this tablet detail: %s",
+                                    transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum, tableId,
+                                    partition.getId(), writeDetail);
+                            LOG.info(errMsg);
 
-                            throw new 
TabletQuorumFailedException(transactionId, String.format(
-                                        "Failed to commit txn %s, cause tablet 
%s succ replica num %s < quorum "
-                                                + " replica num %s. table %s, 
partition %s, this tablet detail: %s",
-                                        transactionId, tablet.getId(), 
successReplicaNum, quorumReplicaNum, tableId,
-                                        partition.getId(), replicasDetailMsg));
+                            throw new 
TabletQuorumFailedException(transactionId, errMsg);
                         }
                     }
                 }
@@ -638,6 +590,32 @@ public class DatabaseTransactionMgr {
         }
     }
 
+    private String getTabletWriteDetail(List<Replica> tabletSuccReplicas, 
List<Replica> tabletWriteFailedReplicas,
+            List<Replica> tabletVersionFailedReplicas) {
+        String writeDetail = "";
+        if (!tabletSuccReplicas.isEmpty()) {
+            writeDetail += String.format("%s replicas final succ: { %s }; ",
+                    tabletSuccReplicas.size(), Joiner.on(", ").join(
+                            tabletSuccReplicas.stream().map(replica -> 
replica.toStringSimple(true))
+                                    .collect(Collectors.toList())));
+        }
+        if (!tabletWriteFailedReplicas.isEmpty()) {
+            writeDetail += String.format("%s replicas write data failed: { %s 
}; ",
+                    tabletWriteFailedReplicas.size(), Joiner.on(", ").join(
+                            tabletWriteFailedReplicas.stream().map(replica -> 
replica.toStringSimple(true))
+                                    .collect(Collectors.toList())));
+        }
+        if (!tabletVersionFailedReplicas.isEmpty()) {
+            writeDetail += String.format("%s replicas write data succ but miss 
previous "
+                            + "version: { %s }.",
+                    tabletVersionFailedReplicas.size(), Joiner.on(",").join(
+                            tabletVersionFailedReplicas.stream().map(replica 
-> replica.toStringSimple(true))
+                                    .collect(Collectors.toList())));
+        }
+
+        return writeDetail;
+    }
+
     /**
      * commit transaction process as follows:
      * 1. validate whether `Load` is cancelled
@@ -907,6 +885,18 @@ public class DatabaseTransactionMgr {
             errorReplicaIds.addAll(originalErrorReplicas);
         }
 
+        long now = System.currentTimeMillis();
+        long firstPublishOneSuccTime = 
transactionState.getFirstPublishOneSuccTime();
+        boolean allowPublishOneSucc = false;
+        if (Config.publish_wait_time_second > 0 && firstPublishOneSuccTime > 0
+                && now >= firstPublishOneSuccTime + 
Config.publish_wait_time_second * 1000L) {
+            allowPublishOneSucc = true;
+        }
+
+        List<Replica> tabletSuccReplicas = Lists.newArrayList();
+        List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
+        List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
+
         // case 1 If database is dropped, then we just throw 
MetaNotFoundException, because all related tables are
         // already force dropped, we just ignore the transaction with all 
tables been force dropped.
         // case 2 If at least one table lock successfully, which means that 
the transaction should be finished for
@@ -920,8 +910,9 @@ public class DatabaseTransactionMgr {
         LOG.debug("finish transaction {} with tables {}", transactionId, 
tableIdList);
         List<? extends TableIf> tableList = 
db.getTablesOnIdOrderIfExist(tableIdList);
         tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
+        PublishResult publishResult = PublishResult.QUORUM_SUCC;
         try {
-            boolean hasError = false;
+            boolean allTabletsLeastOneSucc = true;
             Iterator<TableCommitInfo> tableCommitInfoIterator
                     = 
transactionState.getIdToTableCommitInfos().values().iterator();
             while (tableCommitInfoIterator.hasNext()) {
@@ -985,48 +976,87 @@ public class DatabaseTransactionMgr {
                     // Here we only check number, the replica version will be 
updated in updateCatalogAfterVisible()
                     for (MaterializedIndex index : allIndices) {
                         for (Tablet tablet : index.getTablets()) {
-                            int healthReplicaNum = 0;
+                            tabletSuccReplicas.clear();
+                            tabletWriteFailedReplicas.clear();
+                            tabletVersionFailedReplicas.clear();
                             for (Replica replica : tablet.getReplicas()) {
                                 if 
(!errorReplicaIds.contains(replica.getId())) {
                                     if 
(replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
-                                        ++healthReplicaNum;
+                                        tabletSuccReplicas.add(replica);
                                     } else {
-                                        LOG.info("publish version {} failed 
for transaction {} on tablet {},"
-                                                 + " on replica {} due to not 
catchup",
-                                                 
partitionCommitInfo.getVersion(), transactionState, tablet,
-                                                 replica);
+                                        
tabletVersionFailedReplicas.add(replica);
                                     }
                                 } else if (replica.getVersion() >= 
partitionCommitInfo.getVersion()) {
                                     // the replica's version is larger than or 
equal to current transaction
                                     // partition's version the replica is 
normal, then remove it from error replica ids
                                     // TODO(cmy): actually I have no idea why 
we need this check
+                                    tabletSuccReplicas.add(replica);
                                     errorReplicaIds.remove(replica.getId());
-                                    ++healthReplicaNum;
                                 } else {
-                                    LOG.info("publish version {} failed for 
transaction {} on tablet {},"
-                                             + " on replica {} due to version 
hole or error",
-                                             partitionCommitInfo.getVersion(), 
transactionState, tablet, replica);
+                                    tabletWriteFailedReplicas.add(replica);
+                                }
+                            }
+
+                            int healthReplicaNum = tabletSuccReplicas.size();
+                            if (healthReplicaNum >= quorumReplicaNum) {
+                                if (!tabletWriteFailedReplicas.isEmpty() || 
!tabletVersionFailedReplicas.isEmpty()) {
+                                    String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas,
+                                            tabletWriteFailedReplicas, 
tabletVersionFailedReplicas);
+                                    LOG.info("publish version quorum succ for 
transaction {} on tablet {} with version"
+                                            + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
+                                            + " tablet detail: {}",
+                                            transactionState, tablet, 
partitionCommitInfo.getVersion(),
+                                            quorumReplicaNum, tableId, 
partitionId, writeDetail);
                                 }
+                                continue;
                             }
 
-                            if (healthReplicaNum < quorumReplicaNum) {
-                                LOG.info("publish version {} failed for 
transaction {} on tablet {},"
-                                                + " with only {} replicas less 
than quorum {}",
-                                         partitionCommitInfo.getVersion(), 
transactionState, tablet, healthReplicaNum,
-                                         quorumReplicaNum);
+                            if (healthReplicaNum == 0) {
+                                allTabletsLeastOneSucc = false;
+                            }
+
+                            String writeDetail = 
getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
+                                    tabletVersionFailedReplicas);
+                            if (allowPublishOneSucc && healthReplicaNum > 0) {
+                                if (publishResult == 
PublishResult.QUORUM_SUCC) {
+                                    publishResult = PublishResult.TIMEOUT_SUCC;
+                                }
+                                // We cannot do anything except retrying,
+                                // because publish task is assigned a version,
+                                // and doris does not permit discontinuous
+                                // versions.
+                                //
+                                // If a timeout happens, it means that the 
rowset
+                                // that is being published exists on a few 
replicas, so we should go
+                                // ahead; otherwise data may be lost and the
+                                // publish task hangs forever.
+                                LOG.info("publish version timeout succ for 
transaction {} on tablet {} with version"
+                                        + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
+                                        + " tablet detail: {}",
+                                        transactionState, tablet, 
partitionCommitInfo.getVersion(), quorumReplicaNum,
+                                        tableId, partitionId, writeDetail);
+                            } else {
+                                publishResult = PublishResult.FAILED;
                                 String errMsg = String.format("publish on 
tablet %d failed."
                                                 + " succeed replica num %d 
less than quorum %d."
                                                 + " table: %d, partition: %d, 
publish version: %d",
                                         tablet.getId(), healthReplicaNum, 
quorumReplicaNum, tableId,
                                         partitionId, 
partition.getVisibleVersion() + 1);
                                 transactionState.setErrorMsg(errMsg);
-                                hasError = true;
+                                LOG.info("publish version failed for 
transaction {} on tablet {} with version"
+                                        + " {}, and has failed replicas, 
quorum num {}. table {}, partition {},"
+                                        + " tablet detail: {}",
+                                        transactionState, tablet, 
partitionCommitInfo.getVersion(), quorumReplicaNum,
+                                        tableId, partitionId, writeDetail);
                             }
                         }
                     }
                 }
             }
-            if (hasError) {
+            if (allTabletsLeastOneSucc && firstPublishOneSuccTime <= 0) {
+                transactionState.setFirstPublishOneSuccTime(now);
+            }
+            if (publishResult == PublishResult.FAILED) {
                 return;
             }
             boolean txnOperated = false;
@@ -1060,7 +1090,7 @@ public class DatabaseTransactionMgr {
         // Otherwise, there is no way for stream load to query the result 
right after loading finished,
         // even if we call "sync" before querying.
         transactionState.countdownVisibleLatch();
-        LOG.info("finish transaction {} successfully", transactionState);
+        LOG.info("finish transaction {} successfully, publish result: {}", 
transactionState, publishResult.name());
     }
 
     protected void unprotectedPreCommitTransaction2PC(TransactionState 
transactionState, Set<Long> errorReplicaIds,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index b15329e6b0..897bc3b63b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -219,8 +219,14 @@ public class TransactionState implements Writable {
     private long publishVersionTime = -1;
     private TransactionStatus preStatus = null;
 
+    // When publishing a txn, if every tablet has at least 1 replica published 
succ, but not quorum replicas succ,
+    // and the time since firstPublishOneSuccTime has exceeded 
Config.publish_wait_time_second,
+    // then this transaction will become visible.
+    private long firstPublishOneSuccTime = -1;
+
     @SerializedName(value = "callbackId")
     private long callbackId = -1;
+
     // In the beforeStateTransform() phase, we will get the callback object 
through the callbackId,
     // and if we get it, we will save it in this variable.
     // The main function of this variable is to retain a reference to this 
callback object.
@@ -387,6 +393,14 @@ public class TransactionState implements Writable {
         return errorLogUrl;
     }
 
+    public long getFirstPublishOneSuccTime() {
+        return firstPublishOneSuccTime;
+    }
+
+    public void setFirstPublishOneSuccTime(long firstPublishOneSuccTime) {
+        this.firstPublishOneSuccTime = firstPublishOneSuccTime;
+    }
+
     public void setTransactionStatus(TransactionStatus transactionStatus) {
         // status changed
         this.preStatus = this.transactionStatus;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to