morrySnow commented on code in PR #31259:
URL: https://github.com/apache/doris/pull/31259#discussion_r1500561785


##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -2921,6 +2926,92 @@ public List<Type> getReturnTypes() {
         return exprToType(parsedStmt.getResultExprs());
     }
 
+    private void generateStreamLoadNereidsPlan(TUniqueId queryId) {
+        LOG.info("TUniqueId: {} generate stream load plan", queryId);
+        context.setQueryId(queryId);
+        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+
+        parseByNereids();
+        Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
+                "Nereids only process LogicalPlanAdapter, but parsedStmt is " 
+ parsedStmt.getClass().getName());
+        context.getState().setNereids(true);
+        InsertIntoTableCommand insert = (InsertIntoTableCommand) 
((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+        try {
+            if 
(!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+                if (!Config.wait_internal_group_commit_finish && 
insert.getLabelName().isPresent()) {
+                    throw new AnalysisException("label and group_commit can't 
be set at the same time");
+                }
+                context.setGroupCommitStreamLoadSql(true);
+            }
+            InsertExecutor insertExecutor = insert.initPlan(context, this);
+            context.getExecutor().setPlanner(this.planner);
+            if (context.getTxnEntry() == null) {
+                TransactionEntry transactionEntry =
+                        new TransactionEntry(new 
TTxnParams().setTxnId(insertExecutor.getTxnId()),
+                            insertExecutor.getDatabase(), 
insertExecutor.getTable());
+                transactionEntry.setLabel(insertExecutor.getLabelName());
+                context.setTxnEntry(transactionEntry);
+            }
+            PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
+            Preconditions.checkState(planRoot instanceof TVFScanNode || 
planRoot instanceof GroupCommitScanNode,
+                    "Nereids' planNode cannot be converted to " + 
planRoot.getClass().getName());
+        } catch (QueryStateException e) {
+            LOG.debug("Command(" + originStmt.originStmt + ") process 
failed.", e);
+            context.setState(e.getQueryState());
+            throw new NereidsException("Command(" + originStmt.originStmt + ") 
process failed",
+                    new AnalysisException(e.getMessage(), e));
+        } catch (UserException e) {
+            // Return message to info client what happened.
+            LOG.debug("Command(" + originStmt.originStmt + ") process 
failed.", e);
+            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+            throw new NereidsException("Command (" + originStmt.originStmt + 
") process failed",
+                    new AnalysisException(e.getMessage(), e));
+        } catch (Exception e) {
+            // Maybe our bug
+            LOG.debug("Command (" + originStmt.originStmt + ") process 
failed.", e);
+            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
e.getMessage());
+            throw new NereidsException("Command (" + originStmt.originStmt + 
") process failed.",
+                    new AnalysisException(e.getMessage(), e));
+        }
+    }
+
+    private void generateStreamLoadLegacyPlan(TUniqueId queryId) throws 
Exception {
+        // Due to executing Nereids, it needs to be reset
+        planner = null;
+        context.getState().setNereids(false);
+        context.setTxnEntry(null);
+        context.setQueryId(queryId);
+        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+        SqlScanner input = new SqlScanner(new 
StringReader(originStmt.originStmt),
+                context.getSessionVariable().getSqlMode());
+        SqlParser parser = new SqlParser(input);
+        parsedStmt = SqlParserUtils.getFirstStmt(parser);
+        if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+            if (!Config.wait_internal_group_commit_finish && 
((NativeInsertStmt) parsedStmt).getLabel() != null) {
+                throw new AnalysisException("label and group_commit can't be 
set at the same time");
+            }
+            ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true;
+        }
+        analyze(context.getSessionVariable().toThrift());
+    }
+
+    public void generateStreamLoadPlan(TUniqueId queryId) throws Exception {
+        try {
+            generateStreamLoadNereidsPlan(queryId);

Review Comment:
    should use nereids planner only if session variable 
enable_nereids_planner=true and enable_nereids_dml=true. if we cannot get 
session level variable, we should use global level variable



##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -2921,6 +2926,92 @@ public List<Type> getReturnTypes() {
         return exprToType(parsedStmt.getResultExprs());
     }
 
+    private void generateStreamLoadNereidsPlan(TUniqueId queryId) {
+        LOG.info("TUniqueId: {} generate stream load plan", queryId);
+        context.setQueryId(queryId);
+        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+
+        parseByNereids();
+        Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
+                "Nereids only process LogicalPlanAdapter, but parsedStmt is " 
+ parsedStmt.getClass().getName());
+        context.getState().setNereids(true);
+        InsertIntoTableCommand insert = (InsertIntoTableCommand) 
((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+        try {
+            if 
(!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+                if (!Config.wait_internal_group_commit_finish && 
insert.getLabelName().isPresent()) {
+                    throw new AnalysisException("label and group_commit can't 
be set at the same time");
+                }
+                context.setGroupCommitStreamLoadSql(true);
+            }
+            InsertExecutor insertExecutor = insert.initPlan(context, this);
+            context.getExecutor().setPlanner(this.planner);
+            if (context.getTxnEntry() == null) {
+                TransactionEntry transactionEntry =
+                        new TransactionEntry(new 
TTxnParams().setTxnId(insertExecutor.getTxnId()),
+                            insertExecutor.getDatabase(), 
insertExecutor.getTable());
+                transactionEntry.setLabel(insertExecutor.getLabelName());
+                context.setTxnEntry(transactionEntry);
+            }
+            PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
+            Preconditions.checkState(planRoot instanceof TVFScanNode || 
planRoot instanceof GroupCommitScanNode,
+                    "Nereids' planNode cannot be converted to " + 
planRoot.getClass().getName());
+        } catch (QueryStateException e) {
+            LOG.debug("Command(" + originStmt.originStmt + ") process 
failed.", e);
+            context.setState(e.getQueryState());
+            throw new NereidsException("Command(" + originStmt.originStmt + ") 
process failed",
+                    new AnalysisException(e.getMessage(), e));
+        } catch (UserException e) {
+            // Return message to info client what happened.
+            LOG.debug("Command(" + originStmt.originStmt + ") process 
failed.", e);
+            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+            throw new NereidsException("Command (" + originStmt.originStmt + 
") process failed",
+                    new AnalysisException(e.getMessage(), e));
+        } catch (Exception e) {
+            // Maybe our bug
+            LOG.debug("Command (" + originStmt.originStmt + ") process 
failed.", e);
+            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
e.getMessage());
+            throw new NereidsException("Command (" + originStmt.originStmt + 
") process failed.",
+                    new AnalysisException(e.getMessage(), e));
+        }
+    }
+
+    private void generateStreamLoadLegacyPlan(TUniqueId queryId) throws 
Exception {
+        // Due to executing Nereids, it needs to be reset
+        planner = null;
+        context.getState().setNereids(false);
+        context.setTxnEntry(null);
+        context.setQueryId(queryId);
+        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+        SqlScanner input = new SqlScanner(new 
StringReader(originStmt.originStmt),
+                context.getSessionVariable().getSqlMode());
+        SqlParser parser = new SqlParser(input);
+        parsedStmt = SqlParserUtils.getFirstStmt(parser);
+        if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+            if (!Config.wait_internal_group_commit_finish && 
((NativeInsertStmt) parsedStmt).getLabel() != null) {
+                throw new AnalysisException("label and group_commit can't be 
set at the same time");
+            }
+            ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true;
+        }
+        analyze(context.getSessionVariable().toThrift());
+    }
+
+    public void generateStreamLoadPlan(TUniqueId queryId) throws Exception {
+        try {
+            generateStreamLoadNereidsPlan(queryId);
+        } catch (NereidsException | ParseException e) {
+            if (context.getMinidump() != null) {
+                MinidumpUtils.saveMinidumpString(context.getMinidump(), 
DebugUtil.printId(context.queryId()));
+            }
+            // try to fall back to legacy planner
+            LOG.debug("nereids cannot process statement\n" + 
originStmt.originStmt
+                    + "\n because of " + e.getMessage(), e);
+            LOG.debug("fall back to legacy planner on statement:\n{}", 
originStmt.originStmt);

Review Comment:
   we should check enable_fallback_to_original_planner == true before fall back



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to