morningman commented on code in PR #23635:
URL: https://github.com/apache/doris/pull/23635#discussion_r1310454185


##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportTaskExecutor.java:
##########
@@ -85,8 +90,18 @@ public void execute() throws JobException {
                 OlapTable table = 
db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
                 table.readLock();
                 try {
-                    SelectStmt selectStmt = selectStmtLists.get(idx);
-                    List<Long> tabletIds = 
selectStmt.getTableRefs().get(0).getSampleTabletIds();
+                    List<Long> tabletIds;
+                    if 
(exportJob.getSessionVariables().isEnableNereidsPlanner()) {
+                        LogicalPlanAdapter logicalPlanAdapter = 
(LogicalPlanAdapter) selectStmtLists.get(idx);
+                        Optional<UnboundRelation> unboundRelation = 
findUnboundRelation(
+                                logicalPlanAdapter.getLogicalPlan());
+                        tabletIds = unboundRelation.get().getTabletIds();
+                        
logicalPlanAdapter.getLogicalPlan().getLogicalProperties();

Review Comment:
   What's this for?



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -356,25 +374,191 @@ private void generateQueryStmt() throws UserException {
         if (LOG.isDebugEnabled()) {
             for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
                 LOG.debug("ExportTaskExecutor {} is responsible for outfile:", 
i);
-                for (SelectStmt outfile : selectStmtListPerParallel.get(i)) {
+                for (StatementBase outfile : selectStmtListPerParallel.get(i)) 
{
                     LOG.debug("outfile sql: [{}]", outfile.toSql());
                 }
             }
         }
     }
 
-    private ArrayList<ArrayList<TableRef>> getTableRefListPerParallel() throws 
UserException {
-        ArrayList<ArrayList<Long>> tabletsListPerParallel = splitTablets();
+    private void generateLogicalPlanAdapter() throws UserException {
+        // generate 'select...into outfile' sql
+        generateOutfileSqlPerParallel();
+        // debug outfile sql
+        if (LOG.isDebugEnabled()) {
+            for (int i = 0; i < outfileSqlPerParallel.size(); ++i) {
+                LOG.debug("ExportTaskExecutor {} is responsible for outfile 
sql:", i);
+                for (String outfileSql : outfileSqlPerParallel.get(i)) {
+                    LOG.debug("outfile sql: [{}]", outfileSql);
+                }
+            }
+        }
+
+        outfileSqlPerParallel.stream().forEach(outfileSqlList -> {
+            List<StatementBase> logicalPlanAdapterList = Lists.newArrayList();
+            outfileSqlList.stream().forEach(outfileSql -> {

Review Comment:
   Looks like this is a common method to parse a string to a comment.
   I suggest to extract it a a Util class, so that others can reuse it.



##########
fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java:
##########
@@ -364,6 +364,7 @@ public static void loadJournal(Env env, Long logId, 
JournalEntity journal) {
                 case OperationType.OP_EXPORT_CREATE: {
                     ExportJob job = (ExportJob) journal.getData();
                     ExportMgr exportMgr = env.getExportMgr();
+                    job.cancelReplayedExportJob();

Review Comment:
   How about move this into `replayCreateExportJob()`?



##########
fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java:
##########
@@ -336,6 +338,7 @@ public class Env {
     private MetastoreEventsProcessor metastoreEventsProcessor;
 
     private PersistentJobRegister persistentJobRegister;
+    private TransientTaskRegister transientTaskRegister;

Review Comment:
   ExportTaskRegister



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -653,6 +837,20 @@ public void replayExportJobState(ExportJobState newState) {
         setExportJobState(newState);
     }
 
+    /**
+     * If there are export which state is PENDING or EXPORTING or IN_QUEUE
+     * in checkpoint, we translate their state to CANCELLED.
+     *
+     * This function is only used in replay catalog phase.
+     */
+    public void cancelReplayedExportJob() {
+        if (state == ExportJobState.PENDING || state == 
ExportJobState.EXPORTING || state == ExportJobState.IN_QUEUE) {
+            String failMsg = "FE restarted or Master changed during exporting. 
Job must be cancelled.";

Review Comment:
   ```suggestion
               final String failMsg = "FE restarted or Master changed during 
exporting. Job must be cancelled.";
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java:
##########
@@ -172,7 +183,10 @@ public Map<String, Long> getPartitionToVersion() {
     // TODO(ftw): delete
     private List<SelectStmt> selectStmtList = Lists.newArrayList();
 
-    private List<List<SelectStmt>> selectStmtListPerParallel = 
Lists.newArrayList();
+    private List<List<StatementBase>> selectStmtListPerParallel = 
Lists.newArrayList();

Review Comment:
   Add comment to explain these fields



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to