yujun777 commented on code in PR #35970:
URL: https://github.com/apache/doris/pull/35970#discussion_r1636229061


##########
fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java:
##########
@@ -2620,58 +2684,93 @@ private PublishResult 
finishCheckQuorumReplicas(TransactionState transactionStat
 
         Map<Long, List<PublishVersionTask>> publishTasks = 
transactionState.getPublishVersionTasks();
         PublishResult publishResult = PublishResult.QUORUM_SUCC;
-        for (SubTransactionState subTransactionState : 
transactionState.getSubTransactionStates()) {
-            long subTxnId = subTransactionState.getSubTransactionId();
+        Map<Long, List<Long>> partitionToSubTxns = new HashMap<>();
+        for (Long subTxnId : transactionState.getSubTxnIds()) {
             TableCommitInfo tableCommitInfo = 
transactionState.getTableCommitInfoBySubTxnId(subTxnId);
             if (tableCommitInfo == null) {
                 continue;
             }
-            OlapTable table = (OlapTable) subTransactionState.getTable();
-            long tableId = table.getId();
-            for (Entry<Long, PartitionCommitInfo> entry : 
tableCommitInfo.getIdToPartitionCommitInfo().entrySet()) {
-                long partitionId = entry.getValue().getPartitionId();
+            Table tableIf = 
database.getTableNullable(tableCommitInfo.getTableId());
+            if (tableIf == null) {
+                continue;
+            }
+            OlapTable table = (OlapTable) tableIf;
+            for (Entry<Long, PartitionCommitInfo> entry : 
tableCommitInfo.getIdToPartitionCommitInfo()
+                    .entrySet()) {
+                long partitionId = entry.getKey();
                 Partition partition = table.getPartition(partitionId);
                 if (partition == null) {
                     continue;
                 }
-                int loadRequiredReplicaNum = 
table.getLoadRequiredReplicaNum(partitionId);
-
-                // TODO should use sub transaction load indexes
-                List<MaterializedIndex> allIndices;
-                if (transactionState.getLoadedTblIndexes().isEmpty()
-                        || transactionState.getLoadedTblIndexes().get(tableId) 
== null) {
-                    allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
-                } else {
-                    allIndices = Lists.newArrayList();
-                    for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
-                        MaterializedIndex index = partition.getIndex(indexId);
-                        if (index != null) {
-                            allIndices.add(index);
-                        }
+                partitionToSubTxns.compute(partitionId, (k, v) -> {
+                    if (v == null) {
+                        v = Lists.newArrayList();
+                    }
+                    v.add(subTxnId);
+                    return v;
+                });
+            }
+        }
+        for (Entry<Long, List<Long>> entry : partitionToSubTxns.entrySet()) {
+            long partitionId = entry.getKey();
+            List<Long> subTxnIds = entry.getValue();
+            if (subTxnIds.isEmpty()) {
+                continue;
+            }
+            TableCommitInfo tableCommitInfo = 
transactionState.getTableCommitInfoBySubTxnId(subTxnIds.get(0));
+            long tableId = tableCommitInfo.getTableId();
+            Table tableIf = 
database.getTableNullable(tableCommitInfo.getTableId());
+            if (tableIf == null) {
+                continue;
+            }
+            OlapTable table = (OlapTable) tableIf;
+            Partition partition = table.getPartition(partitionId);
+            if (partition == null) {
+                continue;
+            }
+            int loadRequiredReplicaNum = 
table.getLoadRequiredReplicaNum(partitionId);
+            // TODO should use sub transaction load indexes
+            List<MaterializedIndex> allIndices;
+            if (transactionState.getLoadedTblIndexes().isEmpty()
+                    || transactionState.getLoadedTblIndexes().get(tableId) == 
null) {
+                allIndices = 
partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
+            } else {
+                allIndices = Lists.newArrayList();
+                for (long indexId : 
transactionState.getLoadedTblIndexes().get(tableId)) {
+                    MaterializedIndex index = partition.getIndex(indexId);
+                    if (index != null) {
+                        allIndices.add(index);
                     }
                 }
-
-                boolean alterReplicaLoadedTxn = 
isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table);
-
-                // check success replica number for each tablet.
-                // a success replica means:
-                //  1. Not in errorReplicaIds: succeed in both commit and 
publish phase
-                //  2. last failed version < 0: is a health replica before
-                //  3. version catch up: not with a stale version
-                // Here we only check number, the replica version will be 
updated in updateCatalogAfterVisible()
-                for (MaterializedIndex index : allIndices) {
-                    for (Tablet tablet : 
partition.getIndex(index.getId()).getTablets()) {
-                        List<Replica> tabletSuccReplicas = 
Lists.newArrayList();
-                        List<Replica> tabletWriteFailedReplicas = 
Lists.newArrayList();
-                        List<Replica> tabletVersionFailedReplicas = 
Lists.newArrayList();
-                        // TODO always use the visible version because the 
replica version is not changed
-                        long newVersion = partition.getVisibleVersion() + 1;
-                        for (Replica replica : tablet.getReplicas()) {
-                            List<PublishVersionTask> publishVersionTasks = 
publishTasks.get(replica.getBackendId());
-                            PublishVersionTask publishVersionTask = null;
-                            if (publishVersionTasks != null) {
+            }
+            long minSubTxnId = subTxnIds.get(0);
+            long minVersion = 
transactionState.getTableCommitInfoBySubTxnId(minSubTxnId).getIdToPartitionCommitInfo()
+                    .get(partitionId).getVersion();
+            long maxSubTxnId = subTxnIds.get(subTxnIds.size() - 1);
+            long maxVersion = 
transactionState.getTableCommitInfoBySubTxnId(maxSubTxnId).getIdToPartitionCommitInfo()
+                    .get(partitionId).getVersion();
+            LOG.debug("txn_id={}, partition={}, min_version={}, 
max_version={}", transactionState.getTransactionId(),
+                    partitionId, minVersion, maxVersion);
+            boolean alterReplicaLoadedTxn = 
isAlterReplicaLoadedTxn(minVersion, table);
+            // check success replica number for each tablet.
+            // a success replica means:
+            //  1. Not in errorReplicaIds: succeed in both commit and publish 
phase
+            //  2. last failed version < 0: is a health replica before
+            //  3. version catch up: not with a stale version
+            // Here we only check number, the replica version will be updated 
in updateCatalogAfterVisible()
+            for (MaterializedIndex index : allIndices) {
+                for (Tablet tablet : 
partition.getIndex(index.getId()).getTablets()) {
+                    List<Replica> tabletSuccReplicas = Lists.newArrayList();
+                    List<Replica> tabletWriteFailedReplicas = 
Lists.newArrayList();
+                    List<Replica> tabletVersionFailedReplicas = 
Lists.newArrayList();
+                    for (Replica replica : tablet.getReplicas()) {
+                        List<PublishVersionTask> publishVersionTasks = 
publishTasks.get(replica.getBackendId());
+                        List<PublishVersionTask> replicaTasks = new 
ArrayList<>();
+                        if (publishVersionTasks != null) {

Review Comment:
   when publishVersionTasks == null,  replicaTasks will be empty.   But 
replicaTasks' size should be the same with subTxnIds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to