This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9db1acfaa [Enhancement](Load) Nereids supports http_stream and group_commit with stream load (#31259)
5c9db1acfaa is described below

commit 5c9db1acfaab8b2b4822b612022d13de3bbd37d0
Author: 赵硕 <1443539...@qq.com>
AuthorDate: Fri Mar 8 13:47:29 2024 +0800

    [Enhancement](Load) Nereids supports http_stream and group_commit with stream load (#31259)
---
 .../glue/translator/PhysicalPlanTranslator.java    |  13 +-
 .../commands/insert/AbstractInsertExecutor.java    |  13 ++
 .../commands/insert/InsertIntoTableCommand.java    |  15 ++-
 .../java/org/apache/doris/qe/ConnectContext.java   |   9 ++
 .../java/org/apache/doris/qe/HttpStreamParams.java |  33 +++++
 .../java/org/apache/doris/qe/StmtExecutor.java     | 140 +++++++++++++++++++++
 .../apache/doris/service/FrontendServiceImpl.java  | 100 +++++++--------
 7 files changed, 266 insertions(+), 57 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 7bb958f5117..e366521464c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -161,6 +161,7 @@ import org.apache.doris.planner.DataStreamSink;
 import org.apache.doris.planner.EmptySetNode;
 import org.apache.doris.planner.ExceptNode;
 import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.GroupCommitBlockSink;
 import org.apache.doris.planner.HashJoinNode;
 import org.apache.doris.planner.HashJoinNode.DistributionMode;
 import org.apache.doris.planner.IntersectNode;
@@ -415,12 +416,20 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             slotDesc.setIsNullable(column.isAllowNull());
             slotDesc.setAutoInc(column.isAutoInc());
         }
-        OlapTableSink sink = new OlapTableSink(
+        OlapTableSink sink;
+        // This branch is only used for the group_commit mode of http_stream
+        if (context.getConnectContext().isGroupCommitStreamLoadSql()) {
+            sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), 
olapTuple,
+                olapTableSink.getTargetTable().getPartitionIds(), 
olapTableSink.isSingleReplicaLoad(),
+                context.getSessionVariable().getGroupCommit(), 0);
+        } else {
+            sink = new OlapTableSink(
                 olapTableSink.getTargetTable(),
                 olapTuple,
                 olapTableSink.getPartitionIds().isEmpty() ? null : 
olapTableSink.getPartitionIds(),
                 olapTableSink.isSingleReplicaLoad()
-        );
+            );
+        }
         sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
         rootFragment.setSink(sink);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
index 2af6212808d..a2560fc1c3e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java
@@ -56,6 +56,7 @@ public abstract class AbstractInsertExecutor {
     protected final long createTime = System.currentTimeMillis();
     protected long loadedRows = 0;
     protected int filteredRows = 0;
+
     protected String errMsg = "";
     protected Optional<InsertCommandContext> insertCtx;
 
@@ -76,6 +77,18 @@ public abstract class AbstractInsertExecutor {
         return coordinator;
     }
 
+    public DatabaseIf getDatabase() {
+        return database;
+    }
+
+    public TableIf getTable() {
+        return table;
+    }
+
+    public String getLabelName() {
+        return labelName;
+    }
+
     /**
      * begin transaction if necessary
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 29d96ae4ad9..5775b9d6ef0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -105,7 +105,12 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         runInternal(ctx, executor);
     }
 
-    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+    /**
+     * This function is used to generate the plan for Nereids.
+     * Some load paths, such as stream_load, only need the plan,
+     * so plan generation is separated out into this method.
+     */
+    public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor 
executor) throws Exception {
         if (!ctx.getSessionVariable().isEnableNereidsDML()) {
             try {
                 ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce();
@@ -152,7 +157,8 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
 
             if (physicalSink instanceof PhysicalOlapTableSink) {
                 if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) {
-                    return;
+                    // return;
+                    throw new AnalysisException("group commit is not supported 
in Nereids now");
                 }
                 OlapTable olapTable = (OlapTable) targetTableIf;
                 insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, 
planner, insertCtx);
@@ -180,6 +186,11 @@ public class InsertIntoTableCommand extends Command 
implements ForwardWithSync,
         // We exposed @StmtExecutor#cancel as a unified entry point for 
statement interruption,
         // so we need to set this here
         executor.setCoord(insertExecutor.getCoordinator());
+        return insertExecutor;
+    }
+
+    private void runInternal(ConnectContext ctx, StmtExecutor executor) throws 
Exception {
+        AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
         insertExecutor.executeSingleInsert(executor, jobId);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 3e66374e429..fa212b83d6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -222,6 +222,7 @@ public class ConnectContext {
 
     private String workloadGroupName = "";
     private Map<Long, Backend> insertGroupCommitTableToBeMap = new HashMap<>();
+    private boolean isGroupCommitStreamLoadSql;
 
     private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
 
@@ -1193,4 +1194,12 @@ public class ConnectContext {
     public int getNetWriteTimeout() {
         return this.sessionVariable.getNetWriteTimeout();
     }
+
+    public boolean isGroupCommitStreamLoadSql() {
+        return isGroupCommitStreamLoadSql;
+    }
+
+    public void setGroupCommitStreamLoadSql(boolean groupCommitStreamLoadSql) {
+        isGroupCommitStreamLoadSql = groupCommitStreamLoadSql;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java
new file mode 100644
index 00000000000..7383392434b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.thrift.TExecPlanFragmentParams;
+
+import lombok.Data;
+
+@Data
+public class HttpStreamParams {
+    private TExecPlanFragmentParams params;
+    private long txnId;
+    private DatabaseIf db;
+    private TableIf table;
+    private String label;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 05b85d230e2..c5b0e6fba94 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -113,6 +113,7 @@ import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.jdbc.client.JdbcClientException;
+import org.apache.doris.datasource.tvf.source.TVFScanNode;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.LoadJobRowResult;
 import org.apache.doris.load.loadv2.LoadManager;
@@ -138,10 +139,13 @@ import 
org.apache.doris.nereids.trees.plans.commands.NotAllowFallback;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.planner.GroupCommitPlanner;
+import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.OriginalPlanner;
+import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.Planner;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Data;
@@ -189,6 +193,7 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.ProtocolStringList;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -2969,6 +2974,141 @@ public class StmtExecutor {
         return exprToType(parsedStmt.getResultExprs());
     }
 
+    private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) {
+        LOG.info("TUniqueId: {} generate stream load plan", queryId);
+        context.setQueryId(queryId);
+        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+
+        parseByNereids();
+        Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
+                "Nereids only process LogicalPlanAdapter, but parsedStmt is " 
+ parsedStmt.getClass().getName());
+        context.getState().setNereids(true);
+        InsertIntoTableCommand insert = (InsertIntoTableCommand) 
((LogicalPlanAdapter) parsedStmt).getLogicalPlan();
+        HttpStreamParams httpStreamParams = new HttpStreamParams();
+
+        try {
+            if 
(!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+                if (!Config.wait_internal_group_commit_finish && 
insert.getLabelName().isPresent()) {
+                    throw new AnalysisException("label and group_commit can't 
be set at the same time");
+                }
+                context.setGroupCommitStreamLoadSql(true);
+            }
+            OlapInsertExecutor insertExecutor = (OlapInsertExecutor) 
insert.initPlan(context, this);
+            httpStreamParams.setTxnId(insertExecutor.getTxnId());
+            httpStreamParams.setDb(insertExecutor.getDatabase());
+            httpStreamParams.setTable(insertExecutor.getTable());
+            httpStreamParams.setLabel(insertExecutor.getLabelName());
+
+            PlanNode planRoot = planner.getFragments().get(0).getPlanRoot();
+            Preconditions.checkState(planRoot instanceof TVFScanNode || 
planRoot instanceof GroupCommitScanNode,
+                    "Nereids' planNode cannot be converted to " + 
planRoot.getClass().getName());
+        } catch (QueryStateException e) {
+            LOG.debug("Command(" + originStmt.originStmt + ") process 
failed.", e);
+            context.setState(e.getQueryState());
+            throw new NereidsException("Command(" + originStmt.originStmt + ") 
process failed",
+                    new AnalysisException(e.getMessage(), e));
+        } catch (UserException e) {
+            // Return message to inform client what happened.
+            LOG.debug("Command(" + originStmt.originStmt + ") process 
failed.", e);
+            context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
+            throw new NereidsException("Command (" + originStmt.originStmt + 
") process failed",
+                    new AnalysisException(e.getMessage(), e));
+        } catch (Exception e) {
+            // Maybe our bug
+            LOG.debug("Command (" + originStmt.originStmt + ") process 
failed.", e);
+            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, 
e.getMessage());
+            throw new NereidsException("Command (" + originStmt.originStmt + 
") process failed.",
+                    new AnalysisException(e.getMessage(), e));
+        }
+        return httpStreamParams;
+    }
+
+    private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) 
throws Exception {
+        // Nereids may have run before this method, so the state it left behind needs to be reset
+        planner = null;
+        context.getState().setNereids(false);
+        context.setTxnEntry(null);
+        context.setQueryId(queryId);
+        context.setStmtId(STMT_ID_GENERATOR.incrementAndGet());
+        SqlScanner input = new SqlScanner(new 
StringReader(originStmt.originStmt),
+                context.getSessionVariable().getSqlMode());
+        SqlParser parser = new SqlParser(input);
+        parsedStmt = SqlParserUtils.getFirstStmt(parser);
+        if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) {
+            if (!Config.wait_internal_group_commit_finish && 
((NativeInsertStmt) parsedStmt).getLabel() != null) {
+                throw new AnalysisException("label and group_commit can't be 
set at the same time");
+            }
+            ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true;
+        }
+        NativeInsertStmt insertStmt = (NativeInsertStmt) parsedStmt;
+        analyze(context.getSessionVariable().toThrift());
+        HttpStreamParams httpStreamParams = new HttpStreamParams();
+        httpStreamParams.setTxnId(insertStmt.getTransactionId());
+        httpStreamParams.setDb(insertStmt.getDbObj());
+        httpStreamParams.setTable(insertStmt.getTargetTable());
+        httpStreamParams.setLabel(insertStmt.getLabel());
+        return httpStreamParams;
+    }
+
+    public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws 
Exception {
+        SessionVariable sessionVariable = context.getSessionVariable();
+        HttpStreamParams httpStreamParams = null;
+        try {
+            if (sessionVariable.isEnableNereidsPlanner()) {
+                try {
+                    httpStreamParams = generateHttpStreamNereidsPlan(queryId);
+                } catch (NereidsException | ParseException e) {
+                    if (context.getMinidump() != null && 
context.getMinidump().toString(4) != null) {
+                        
MinidumpUtils.saveMinidumpString(context.getMinidump(), 
DebugUtil.printId(context.queryId()));
+                    }
+                    // try to fall back to legacy planner
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("nereids cannot process statement\n" + 
originStmt.originStmt
+                                + "\n because of " + e.getMessage(), e);
+                    }
+                    if (notAllowFallback()) {
+                        LOG.warn("Analyze failed. {}", 
context.getQueryIdentifier(), e);
+                        throw ((NereidsException) e).getException();
+                    }
+                    boolean isInsertIntoCommand = parsedStmt != null && 
parsedStmt instanceof LogicalPlanAdapter
+                            && ((LogicalPlanAdapter) 
parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand;
+                    if (e instanceof NereidsException
+                                && 
!context.getSessionVariable().enableFallbackToOriginalPlanner
+                                && !isInsertIntoCommand) {
+                        LOG.warn("Analyze failed. {}", 
context.getQueryIdentifier(), e);
+                        throw ((NereidsException) e).getException();
+                    }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("fall back to legacy planner on 
statement:\n{}", originStmt.originStmt);
+                    }
+                    // Attention: currently an exception from Nereids does not mean an exception to the user terminal
+                    // unless the user does not allow fallback to the legacy planner. But the state of the query has
+                    // already been set to Error in this case, which has side effects on the profile result and
+                    // audit log. So we need to reset the state to OK if the legacy planner can process the query.
+                    context.getState().reset();
+                    context.getState().setNereids(false);
+                    httpStreamParams = generateHttpStreamLegacyPlan(queryId);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                httpStreamParams = generateHttpStreamLegacyPlan(queryId);
+            }
+        } finally {
+            // revert Session Value
+            try {
+                VariableMgr.revertSessionValue(sessionVariable);
+                // origin value init
+                sessionVariable.setIsSingleSetVar(false);
+                sessionVariable.clearSessionOriginValue();
+            } catch (DdlException e) {
+                LOG.warn("failed to revert Session value. {}", 
context.getQueryIdentifier(), e);
+                context.getState().setError(e.getMysqlErrorCode(), 
e.getMessage());
+            }
+        }
+        return httpStreamParams;
+    }
+
     public SummaryProfile getSummaryProfile() {
         return profile.getSummaryProfile();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f71f02b6823..eb62cd9c75a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -21,12 +21,9 @@ import 
org.apache.doris.analysis.AbstractBackupTableRefClause;
 import org.apache.doris.analysis.AddPartitionClause;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.LabelName;
-import org.apache.doris.analysis.NativeInsertStmt;
 import org.apache.doris.analysis.PartitionExprUtil;
 import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.SetType;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.UserIdentity;
@@ -36,7 +33,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.EnvFactory;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
@@ -69,7 +65,6 @@ import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.annotation.LogException;
-import org.apache.doris.common.util.SqlParserUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.cooldown.CooldownDelete;
 import org.apache.doris.datasource.CatalogIf;
@@ -94,9 +89,9 @@ import org.apache.doris.qe.ConnectProcessor;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.DdlExecutor;
 import org.apache.doris.qe.GlobalVariable;
+import org.apache.doris.qe.HttpStreamParams;
 import org.apache.doris.qe.MasterCatalogExecutor;
 import org.apache.doris.qe.MysqlConnectProcessor;
-import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QueryState;
 import org.apache.doris.qe.StmtExecutor;
@@ -202,7 +197,6 @@ import org.apache.doris.thrift.TPrivilegeCtrl;
 import org.apache.doris.thrift.TPrivilegeHier;
 import org.apache.doris.thrift.TPrivilegeStatus;
 import org.apache.doris.thrift.TPrivilegeType;
-import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TQueryStatsResult;
 import org.apache.doris.thrift.TQueryType;
 import org.apache.doris.thrift.TReplicaInfo;
@@ -256,7 +250,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -2029,12 +2022,42 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
+    private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, 
ConnectContext ctx)
+            throws UserException {
+        String originStmt = request.getLoadSql();
+        HttpStreamParams httpStreamParams;
+        try {
+            StmtExecutor executor = new StmtExecutor(ctx, originStmt);
+            ctx.setExecutor(executor);
+            httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
+
+            Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
+            Coordinator coord = new Coordinator(ctx, analyzer, 
executor.planner());
+            coord.setLoadMemLimit(request.getExecMemLimit());
+            coord.setQueryType(TQueryType.LOAD);
+            TableIf table = httpStreamParams.getTable();
+            if (table instanceof OlapTable) {
+                boolean isEnableMemtableOnSinkNode =
+                        ((OlapTable) 
table).getTableProperty().getUseSchemaLightChange()
+                            ? 
coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+                
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
+            }
+            httpStreamParams.setParams(coord.getStreamLoadPlan());
+        } catch (UserException e) {
+            LOG.warn("exec sql error", e);
+            throw new UserException("exec sql error" + e);
+        } catch (Throwable e) {
+            LOG.warn("exec sql error catch unknown result.", e);
+            throw new UserException("exec sql error catch unknown result." + 
e);
+        }
+        return httpStreamParams;
+    }
+
     private void httpStreamPutImpl(TStreamLoadPutRequest request, 
TStreamLoadPutResult result, ConnectContext ctx)
             throws UserException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("receive http stream put request: {}", request);
         }
-        String originStmt = request.getLoadSql();
         if (request.isSetAuthCode()) {
             // TODO(cmy): find a way to check
         } else if (Strings.isNullOrEmpty(request.getToken())) {
@@ -2047,55 +2070,26 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         } else {
             ctx.getSessionVariable().enableMemtableOnSinkNode = 
Config.stream_load_default_memtable_on_sink_node;
         }
-        SqlScanner input = new SqlScanner(new StringReader(originStmt), 
ctx.getSessionVariable().getSqlMode());
-        SqlParser parser = new SqlParser(input);
+        ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
         try {
-            NativeInsertStmt parsedStmt = (NativeInsertStmt) 
SqlParserUtils.getFirstStmt(parser);
-            parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0));
-            parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
-            if (!StringUtils.isEmpty(request.getGroupCommitMode())) {
-                if (!Config.wait_internal_group_commit_finish && 
parsedStmt.getLabel() != null) {
-                    throw new AnalysisException("label and group_commit can't 
be set at the same time");
-                }
-                ctx.getSessionVariable().groupCommit = 
request.getGroupCommitMode();
-                parsedStmt.isGroupCommitStreamLoadSql = true;
-            }
-            StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
-            ctx.setExecutor(executor);
-            TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
-            executor.analyze(tQueryOptions);
-            Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
-            Coordinator coord =  
EnvFactory.getInstance().createCoordinator(ctx, analyzer, executor.planner(), 
null);
-            coord.setLoadMemLimit(request.getExecMemLimit());
-            coord.setQueryType(TQueryType.LOAD);
-            Table table = parsedStmt.getTargetTable();
-            if (table instanceof OlapTable) {
-                boolean isEnableMemtableOnSinkNode =
-                        ((OlapTable) 
table).getTableProperty().getUseSchemaLightChange()
-                                ? 
coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
-                
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
-            }
-
-            TExecPlanFragmentParams plan = coord.getStreamLoadPlan();
+            HttpStreamParams httpStreamParams = initHttpStreamPlan(request, 
ctx);
             int loadStreamPerNode = 20;
             if (request.getStreamPerNode() > 0) {
                 loadStreamPerNode = request.getStreamPerNode();
             }
-            plan.setLoadStreamPerNode(loadStreamPerNode);
-            plan.setTotalLoadStreams(loadStreamPerNode);
-            plan.setNumLocalSink(1);
-            final long txn_id = parsedStmt.getTransactionId();
-            result.setParams(plan);
-            result.getParams().setDbName(parsedStmt.getDbName());
-            result.getParams().setTableName(parsedStmt.getTbl());
-            // The txn_id here is obtained from the NativeInsertStmt
-            result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id));
-            result.getParams().setImportLabel(parsedStmt.getLabel());
-            result.setDbId(table.getDatabase().getId());
-            result.setTableId(table.getId());
-            result.setBaseSchemaVersion(((OlapTable) 
table).getBaseSchemaVersion());
-            result.setGroupCommitIntervalMs(((OlapTable) 
table).getGroupCommitIntervalMs());
-            result.setGroupCommitDataBytes(((OlapTable) 
table).getGroupCommitDataBytes());
+            
httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
+            
httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
+            httpStreamParams.getParams().setNumLocalSink(1);
+            result.setParams(httpStreamParams.getParams());
+            
result.getParams().setDbName(httpStreamParams.getDb().getFullName());
+            
result.getParams().setTableName(httpStreamParams.getTable().getName());
+            result.getParams().setTxnConf(new 
TTxnParams().setTxnId(httpStreamParams.getTxnId()));
+            result.getParams().setImportLabel(httpStreamParams.getLabel());
+            result.setDbId(httpStreamParams.getDb().getId());
+            result.setTableId(httpStreamParams.getTable().getId());
+            result.setBaseSchemaVersion(((OlapTable) 
httpStreamParams.getTable()).getBaseSchemaVersion());
+            result.setGroupCommitIntervalMs(((OlapTable) 
httpStreamParams.getTable()).getGroupCommitIntervalMs());
+            result.setGroupCommitDataBytes(((OlapTable) 
httpStreamParams.getTable()).getGroupCommitDataBytes());
             
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
         } catch (UserException e) {
             LOG.warn("exec sql error", e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to