gavinchou commented on code in PR #36786:
URL: https://github.com/apache/doris/pull/36786#discussion_r1666977144


##########
fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java:
##########
@@ -627,6 +652,109 @@ public void executeDynamicPartition(Collection<Pair<Long, 
Long>> dynamicPartitio
                         }
                     }
                 }
+                List<Long> succeedPartitionIds = 
cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
+                        addPartitionClauses, db, olapTable, indexIds, 
tableName);
+                // cloud mode, check recycle key not remained
+                if (Config.isCloudMode() && executeFirstTime) {
+                    
Env.getCurrentInternalCatalog().checkCreatePartitions(db.getId(), 
olapTable.getId(),
+                            succeedPartitionIds, indexIds);
+                }
+            }
+        }
+    }
+
+    private List<Long> cloudBatchAfterCreatePartitions(boolean 
executeFirstTime, List<PartitionPersistInfo> partsInfo,
+                                                       
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
+                                                       OlapTable olapTable, 
List<Long> indexIds,
+                                                       String tableName) 
throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return new ArrayList<>();
+        }
+        List<Long> succeedPartitionIds = 
partsInfo.stream().map(partitionPersistInfo
+                -> 
partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+        if (executeFirstTime && !addPartitionClauses.isEmpty()) {
+            try {
+                // ATTN: failedPids = generatedPartitionIds - 
succeedPartitionIds,
+                // means some partitions failed when addPartition, failedPids 
will be recycled by recycler
+                if 
(DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition"))
 {
+                    LOG.info("debug point 
FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e");
+                    // not commit, not log edit
+                    throw new Exception("debug point 
FE.DynamicPartitionScheduler.before.commitCloudPartition");
+                }
+                
Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), 
olapTable.getId(),
+                        succeedPartitionIds, indexIds, true);
+                LOG.info("begin write edit log to add partitions in batch, "
+                        + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                        partsInfo.size(), db.getFullName(), tableName, 
olapTable.getId());
+                // ATTN: here, edit log must after commit cloud partition,
+                // prevent commit RPC failure from causing data loss
+                if 
(DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions"))
 {
+                    LOG.info("debug point 
FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
+                    // committed, but not log edit
+                    throw new Exception("debug point 
FE.DynamicPartitionScheduler.before.commitCloudPartition");
+                }
+                for (int i = 0; i < partsInfo.size(); i++) {
+                    
Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
+                    if 
(DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
+                        if (i == partsInfo.size() / 2) {
+                            LOG.info("debug point 
FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
+                            // committed, but log some edit, others failed
+                            throw new Exception("debug point 
FE.DynamicPartitionScheduler"
+                                + ".in.commitCloudPartition");
+                        }
+                    }
+                }
+                LOG.info("finish write edit log to add partitions in batch, "
+                        + "numPartitions: {}, db: {}, table: {}, tableId: {}",
+                        partsInfo.size(), db.getFullName(), tableName, 
olapTable.getId());
+            } catch (Exception e) {
+                LOG.warn("cloud in commit step, dbName {}, tableName {}, 
tableId {} exception {}",
+                        db.getFullName(), tableName, olapTable.getId(), 
e.getMessage());
+                recordCreatePartitionFailedMsg(db.getFullName(), tableName, 
e.getMessage(), olapTable.getId());
+                throw new DdlException("cloud in commit step err");
+            }
+        }
+        return succeedPartitionIds;
+    }
+
+    private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime,
+                                                  
ArrayList<AddPartitionClause> addPartitionClauses,
+                                                  OlapTable olapTable, 
List<Long> indexIds, Database db,
+                                                  String tableName, List<Long> 
generatedPartitionIds)
+            throws DdlException {
+        if (Config.isNotCloudMode()) {
+            return;
+        }
+        if (executeFirstTime && !addPartitionClauses.isEmpty()) {

Review Comment:
   ```suggestion
           if (!executeFirstTime || addPartitionClauses.isEmpty()) return;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to