This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5c9db1acfaa [Enhancement](Load) Nereids supports http_stream and group_commit with stream load (#31259) 5c9db1acfaa is described below commit 5c9db1acfaab8b2b4822b612022d13de3bbd37d0 Author: 赵硕 <1443539...@qq.com> AuthorDate: Fri Mar 8 13:47:29 2024 +0800 [Enhancement](Load) Nereids supports http_stream and group_commit with stream load (#31259) --- .../glue/translator/PhysicalPlanTranslator.java | 13 +- .../commands/insert/AbstractInsertExecutor.java | 13 ++ .../commands/insert/InsertIntoTableCommand.java | 15 ++- .../java/org/apache/doris/qe/ConnectContext.java | 9 ++ .../java/org/apache/doris/qe/HttpStreamParams.java | 33 +++++ .../java/org/apache/doris/qe/StmtExecutor.java | 140 +++++++++++++++++++++ .../apache/doris/service/FrontendServiceImpl.java | 100 +++++++-------- 7 files changed, 266 insertions(+), 57 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 7bb958f5117..e366521464c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -161,6 +161,7 @@ import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.EmptySetNode; import org.apache.doris.planner.ExceptNode; import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.GroupCommitBlockSink; import org.apache.doris.planner.HashJoinNode; import org.apache.doris.planner.HashJoinNode.DistributionMode; import org.apache.doris.planner.IntersectNode; @@ -415,12 +416,20 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla slotDesc.setIsNullable(column.isAllowNull()); slotDesc.setAutoInc(column.isAutoInc()); } - OlapTableSink sink = new OlapTableSink( + OlapTableSink sink; + // This statement is only used in the group_commit mode in the http_stream + if (context.getConnectContext().isGroupCommitStreamLoadSql()) { + sink = new GroupCommitBlockSink(olapTableSink.getTargetTable(), olapTuple, + olapTableSink.getTargetTable().getPartitionIds(), olapTableSink.isSingleReplicaLoad(), + context.getSessionVariable().getGroupCommit(), 0); + } else { + sink = new OlapTableSink( olapTableSink.getTargetTable(), olapTuple, olapTableSink.getPartitionIds().isEmpty() ? null : olapTableSink.getPartitionIds(), olapTableSink.isSingleReplicaLoad() - ); + ); + } sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols); rootFragment.setSink(sink); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java index 2af6212808d..a2560fc1c3e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/AbstractInsertExecutor.java @@ -56,6 +56,7 @@ public abstract class AbstractInsertExecutor { protected final long createTime = System.currentTimeMillis(); protected long loadedRows = 0; protected int filteredRows = 0; + protected String errMsg = ""; protected Optional<InsertCommandContext> insertCtx; @@ -76,6 +77,18 @@ public abstract class AbstractInsertExecutor { return coordinator; } + public DatabaseIf getDatabase() { + return database; + } + + public TableIf getTable() { + return table; + } + + public String getLabelName() { + return labelName; + } + /** * begin transaction if necessary */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 29d96ae4ad9..5775b9d6ef0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -105,7 +105,12 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, runInternal(ctx, executor); } - private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception { + /** + * This function is used to generate the plan for Nereids. + * There are some load functions that only need to the plan, such as stream_load. + * Therefore, this section will be presented separately. + */ + public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor) throws Exception { if (!ctx.getSessionVariable().isEnableNereidsDML()) { try { ctx.getSessionVariable().enableFallbackToOriginalPlannerOnce(); @@ -152,7 +157,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, if (physicalSink instanceof PhysicalOlapTableSink) { if (GroupCommitInserter.groupCommit(ctx, sink, physicalSink)) { - return; + // return; + throw new AnalysisException("group commit is not supported in Nereids now"); } OlapTable olapTable = (OlapTable) targetTableIf; insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx); @@ -180,6 +186,11 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, // We exposed @StmtExecutor#cancel as a unified entry point for statement interruption, // so we need to set this here executor.setCoord(insertExecutor.getCoordinator()); + return insertExecutor; + } + + private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception { + AbstractInsertExecutor insertExecutor = initPlan(ctx, executor); insertExecutor.executeSingleInsert(executor, jobId); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 3e66374e429..fa212b83d6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -222,6 +222,7 @@ public class ConnectContext { private String workloadGroupName = ""; private Map<Long, Backend> insertGroupCommitTableToBeMap = new HashMap<>(); + private boolean isGroupCommitStreamLoadSql; private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL; @@ -1193,4 +1194,12 @@ public class ConnectContext { public int getNetWriteTimeout() { return this.sessionVariable.getNetWriteTimeout(); } + + public boolean isGroupCommitStreamLoadSql() { + return isGroupCommitStreamLoadSql; + } + + public void setGroupCommitStreamLoadSql(boolean groupCommitStreamLoadSql) { + isGroupCommitStreamLoadSql = groupCommitStreamLoadSql; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java b/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java new file mode 100644 index 00000000000..7383392434b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/HttpStreamParams.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.qe; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.thrift.TExecPlanFragmentParams; + +import lombok.Data; + +@Data +public class HttpStreamParams { + private TExecPlanFragmentParams params; + private long txnId; + private DatabaseIf db; + private TableIf table; + private String label; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 05b85d230e2..c5b0e6fba94 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -113,6 +113,7 @@ import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.datasource.tvf.source.TVFScanNode; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.LoadJobRowResult; import org.apache.doris.load.loadv2.LoadManager; @@ -138,10 +139,13 @@ import org.apache.doris.nereids.trees.plans.commands.NotAllowFallback; import org.apache.doris.nereids.trees.plans.commands.insert.BatchInsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; +import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.planner.GroupCommitPlanner; +import org.apache.doris.planner.GroupCommitScanNode; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; +import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; import org.apache.doris.proto.Data; @@ -189,6 +193,7 @@ import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.ProtocolStringList; import lombok.Setter; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -2969,6 +2974,141 @@ public class StmtExecutor { return exprToType(parsedStmt.getResultExprs()); } + private HttpStreamParams generateHttpStreamNereidsPlan(TUniqueId queryId) { + LOG.info("TUniqueId: {} generate stream load plan", queryId); + context.setQueryId(queryId); + context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); + + parseByNereids(); + Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, + "Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName()); + context.getState().setNereids(true); + InsertIntoTableCommand insert = (InsertIntoTableCommand) ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + HttpStreamParams httpStreamParams = new HttpStreamParams(); + + try { + if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) { + if (!Config.wait_internal_group_commit_finish && insert.getLabelName().isPresent()) { + throw new AnalysisException("label and group_commit can't be set at the same time"); + } + context.setGroupCommitStreamLoadSql(true); + } + OlapInsertExecutor insertExecutor = (OlapInsertExecutor) insert.initPlan(context, this); + httpStreamParams.setTxnId(insertExecutor.getTxnId()); + httpStreamParams.setDb(insertExecutor.getDatabase()); + httpStreamParams.setTable(insertExecutor.getTable()); + httpStreamParams.setLabel(insertExecutor.getLabelName()); + + PlanNode planRoot = planner.getFragments().get(0).getPlanRoot(); + Preconditions.checkState(planRoot instanceof TVFScanNode || planRoot instanceof GroupCommitScanNode, + "Nereids' planNode cannot be converted to " + planRoot.getClass().getName()); + } catch (QueryStateException e) { + LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e); + context.setState(e.getQueryState()); + throw new NereidsException("Command(" + originStmt.originStmt + ") process failed", + new AnalysisException(e.getMessage(), e)); + } catch (UserException e) { + // Return message to info client what happened. + LOG.debug("Command(" + originStmt.originStmt + ") process failed.", e); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + throw new NereidsException("Command (" + originStmt.originStmt + ") process failed", + new AnalysisException(e.getMessage(), e)); + } catch (Exception e) { + // Maybe our bug + LOG.debug("Command (" + originStmt.originStmt + ") process failed.", e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage()); + throw new NereidsException("Command (" + originStmt.originStmt + ") process failed.", + new AnalysisException(e.getMessage(), e)); + } + return httpStreamParams; + } + + private HttpStreamParams generateHttpStreamLegacyPlan(TUniqueId queryId) throws Exception { + // Due to executing Nereids, it needs to be reset + planner = null; + context.getState().setNereids(false); + context.setTxnEntry(null); + context.setQueryId(queryId); + context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); + SqlScanner input = new SqlScanner(new StringReader(originStmt.originStmt), + context.getSessionVariable().getSqlMode()); + SqlParser parser = new SqlParser(input); + parsedStmt = SqlParserUtils.getFirstStmt(parser); + if (!StringUtils.isEmpty(context.getSessionVariable().groupCommit)) { + if (!Config.wait_internal_group_commit_finish && ((NativeInsertStmt) parsedStmt).getLabel() != null) { + throw new AnalysisException("label and group_commit can't be set at the same time"); + } + ((NativeInsertStmt) parsedStmt).isGroupCommitStreamLoadSql = true; + } + NativeInsertStmt insertStmt = (NativeInsertStmt) parsedStmt; + analyze(context.getSessionVariable().toThrift()); + HttpStreamParams httpStreamParams = new HttpStreamParams(); + httpStreamParams.setTxnId(insertStmt.getTransactionId()); + httpStreamParams.setDb(insertStmt.getDbObj()); + httpStreamParams.setTable(insertStmt.getTargetTable()); + httpStreamParams.setLabel(insertStmt.getLabel()); + return httpStreamParams; + } + + public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Exception { + SessionVariable sessionVariable = context.getSessionVariable(); + HttpStreamParams httpStreamParams = null; + try { + if (sessionVariable.isEnableNereidsPlanner()) { + try { + httpStreamParams = generateHttpStreamNereidsPlan(queryId); + } catch (NereidsException | ParseException e) { + if (context.getMinidump() != null && context.getMinidump().toString(4) != null) { + MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId())); + } + // try to fall back to legacy planner + if (LOG.isDebugEnabled()) { + LOG.debug("nereids cannot process statement\n" + originStmt.originStmt + + "\n because of " + e.getMessage(), e); + } + if (notAllowFallback()) { + LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); + throw ((NereidsException) e).getException(); + } + boolean isInsertIntoCommand = parsedStmt != null && parsedStmt instanceof LogicalPlanAdapter + && ((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof InsertIntoTableCommand; + if (e instanceof NereidsException + && !context.getSessionVariable().enableFallbackToOriginalPlanner + && !isInsertIntoCommand) { + LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); + throw ((NereidsException) e).getException(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("fall back to legacy planner on statement:\n{}", originStmt.originStmt); + } + // Attention: currently exception from nereids does not mean an Exception to user terminal + // unless user does not allow fallback to lagency planner. But state of query + // has already been set to Error in this case, it will have some side effect on profile result + // and audit log. So we need to reset state to OK if query cancel be processd by lagency. + context.getState().reset(); + context.getState().setNereids(false); + httpStreamParams = generateHttpStreamLegacyPlan(queryId); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + httpStreamParams = generateHttpStreamLegacyPlan(queryId); + } + } finally { + // revert Session Value + try { + VariableMgr.revertSessionValue(sessionVariable); + // origin value init + sessionVariable.setIsSingleSetVar(false); + sessionVariable.clearSessionOriginValue(); + } catch (DdlException e) { + LOG.warn("failed to revert Session value. {}", context.getQueryIdentifier(), e); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + } + } + return httpStreamParams; + } + public SummaryProfile getSummaryProfile() { return profile.getSummaryProfile(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index f71f02b6823..eb62cd9c75a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -21,12 +21,9 @@ import org.apache.doris.analysis.AbstractBackupTableRefClause; import org.apache.doris.analysis.AddPartitionClause; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.LabelName; -import org.apache.doris.analysis.NativeInsertStmt; import org.apache.doris.analysis.PartitionExprUtil; import org.apache.doris.analysis.RestoreStmt; import org.apache.doris.analysis.SetType; -import org.apache.doris.analysis.SqlParser; -import org.apache.doris.analysis.SqlScanner; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; import org.apache.doris.analysis.UserIdentity; @@ -36,7 +33,6 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.EnvFactory; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -69,7 +65,6 @@ import org.apache.doris.common.ThriftServerEventProcessor; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.common.annotation.LogException; -import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; import org.apache.doris.cooldown.CooldownDelete; import org.apache.doris.datasource.CatalogIf; @@ -94,9 +89,9 @@ import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.DdlExecutor; import org.apache.doris.qe.GlobalVariable; +import org.apache.doris.qe.HttpStreamParams; import org.apache.doris.qe.MasterCatalogExecutor; import org.apache.doris.qe.MysqlConnectProcessor; -import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; @@ -202,7 +197,6 @@ import org.apache.doris.thrift.TPrivilegeCtrl; import org.apache.doris.thrift.TPrivilegeHier; import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TPrivilegeType; -import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.thrift.TQueryStatsResult; import org.apache.doris.thrift.TQueryType; import org.apache.doris.thrift.TReplicaInfo; @@ -256,7 +250,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import java.io.StringReader; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -2029,12 +2022,42 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } + private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, ConnectContext ctx) + throws UserException { + String originStmt = request.getLoadSql(); + HttpStreamParams httpStreamParams; + try { + StmtExecutor executor = new StmtExecutor(ctx, originStmt); + ctx.setExecutor(executor); + httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId()); + + Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx); + Coordinator coord = new Coordinator(ctx, analyzer, executor.planner()); + coord.setLoadMemLimit(request.getExecMemLimit()); + coord.setQueryType(TQueryType.LOAD); + TableIf table = httpStreamParams.getTable(); + if (table instanceof OlapTable) { + boolean isEnableMemtableOnSinkNode = + ((OlapTable) table).getTableProperty().getUseSchemaLightChange() + ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false; + coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); + } + httpStreamParams.setParams(coord.getStreamLoadPlan()); + } catch (UserException e) { + LOG.warn("exec sql error", e); + throw new UserException("exec sql error" + e); + } catch (Throwable e) { + LOG.warn("exec sql error catch unknown result.", e); + throw new UserException("exec sql error catch unknown result." + e); + } + return httpStreamParams; + } + private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result, ConnectContext ctx) throws UserException { if (LOG.isDebugEnabled()) { LOG.debug("receive http stream put request: {}", request); } - String originStmt = request.getLoadSql(); if (request.isSetAuthCode()) { // TODO(cmy): find a way to check } else if (Strings.isNullOrEmpty(request.getToken())) { @@ -2047,55 +2070,26 @@ public class FrontendServiceImpl implements FrontendService.Iface { } else { ctx.getSessionVariable().enableMemtableOnSinkNode = Config.stream_load_default_memtable_on_sink_node; } - SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); - SqlParser parser = new SqlParser(input); + ctx.getSessionVariable().groupCommit = request.getGroupCommitMode(); try { - NativeInsertStmt parsedStmt = (NativeInsertStmt) SqlParserUtils.getFirstStmt(parser); - parsedStmt.setOrigStmt(new OriginStatement(originStmt, 0)); - parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); - if (!StringUtils.isEmpty(request.getGroupCommitMode())) { - if (!Config.wait_internal_group_commit_finish && parsedStmt.getLabel() != null) { - throw new AnalysisException("label and group_commit can't be set at the same time"); - } - ctx.getSessionVariable().groupCommit = request.getGroupCommitMode(); - parsedStmt.isGroupCommitStreamLoadSql = true; - } - StmtExecutor executor = new StmtExecutor(ctx, parsedStmt); - ctx.setExecutor(executor); - TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift(); - executor.analyze(tQueryOptions); - Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx); - Coordinator coord = EnvFactory.getInstance().createCoordinator(ctx, analyzer, executor.planner(), null); - coord.setLoadMemLimit(request.getExecMemLimit()); - coord.setQueryType(TQueryType.LOAD); - Table table = parsedStmt.getTargetTable(); - if (table instanceof OlapTable) { - boolean isEnableMemtableOnSinkNode = - ((OlapTable) table).getTableProperty().getUseSchemaLightChange() - ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false; - coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode); - } - - TExecPlanFragmentParams plan = coord.getStreamLoadPlan(); + HttpStreamParams httpStreamParams = initHttpStreamPlan(request, ctx); int loadStreamPerNode = 20; if (request.getStreamPerNode() > 0) { loadStreamPerNode = request.getStreamPerNode(); } - plan.setLoadStreamPerNode(loadStreamPerNode); - plan.setTotalLoadStreams(loadStreamPerNode); - plan.setNumLocalSink(1); - final long txn_id = parsedStmt.getTransactionId(); - result.setParams(plan); - result.getParams().setDbName(parsedStmt.getDbName()); - result.getParams().setTableName(parsedStmt.getTbl()); - // The txn_id here is obtained from the NativeInsertStmt - result.getParams().setTxnConf(new TTxnParams().setTxnId(txn_id)); - result.getParams().setImportLabel(parsedStmt.getLabel()); - result.setDbId(table.getDatabase().getId()); - result.setTableId(table.getId()); - result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion()); - result.setGroupCommitIntervalMs(((OlapTable) table).getGroupCommitIntervalMs()); - result.setGroupCommitDataBytes(((OlapTable) table).getGroupCommitDataBytes()); + httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode); + httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode); + httpStreamParams.getParams().setNumLocalSink(1); + result.setParams(httpStreamParams.getParams()); + result.getParams().setDbName(httpStreamParams.getDb().getFullName()); + result.getParams().setTableName(httpStreamParams.getTable().getName()); + result.getParams().setTxnConf(new TTxnParams().setTxnId(httpStreamParams.getTxnId())); + result.getParams().setImportLabel(httpStreamParams.getLabel()); + result.setDbId(httpStreamParams.getDb().getId()); + result.setTableId(httpStreamParams.getTable().getId()); + result.setBaseSchemaVersion(((OlapTable) httpStreamParams.getTable()).getBaseSchemaVersion()); + result.setGroupCommitIntervalMs(((OlapTable) httpStreamParams.getTable()).getGroupCommitIntervalMs()); + result.setGroupCommitDataBytes(((OlapTable) httpStreamParams.getTable()).getGroupCommitDataBytes()); result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish); } catch (UserException e) { LOG.warn("exec sql error", e); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org