This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch 2.1-tmp in repository https://gitbox.apache.org/repos/asf/doris.git
commit 67bb5196137820a766568c520e9faa926166b2c1 Author: feiniaofeiafei <53502832+feiniaofeia...@users.noreply.github.com> AuthorDate: Wed Apr 3 15:31:31 2024 +0800 [Fix](nereids) forward the user define variables to master (#33013) --- .../org/apache/doris/analysis/LiteralExpr.java | 20 +++++++++++++++++ .../java/org/apache/doris/qe/ConnectContext.java | 8 +++++++ .../java/org/apache/doris/qe/ConnectProcessor.java | 25 +++++++++++++++++++++- .../java/org/apache/doris/qe/MasterOpExecutor.java | 17 ++++++++++++++- gensrc/thrift/FrontendService.thrift | 3 ++- 5 files changed, 70 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java index 0814235f0a3..eb6fedb7b50 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java @@ -26,6 +26,8 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.NotImplementedException; import org.apache.doris.mysql.MysqlProto; +import org.apache.doris.thrift.TExprNode; +import org.apache.doris.thrift.TExprNodeType; import com.google.common.base.Preconditions; import org.apache.logging.log4j.LogManager; @@ -478,4 +480,22 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr } return isZero; } + + public static LiteralExpr getLiteralExprFromThrift(TExprNode node) throws AnalysisException { + TExprNodeType type = node.node_type; + switch (type) { + case NULL_LITERAL: return new NullLiteral(); + case BOOL_LITERAL: return new BoolLiteral(node.bool_literal.value); + case INT_LITERAL: return new IntLiteral(node.int_literal.value); + case LARGE_INT_LITERAL: return new LargeIntLiteral(node.large_int_literal.value); + case FLOAT_LITERAL: return new FloatLiteral(node.float_literal.value); + case DECIMAL_LITERAL: return new DecimalLiteral(node.decimal_literal.value); + case STRING_LITERAL: return new StringLiteral(node.string_literal.value); + case JSON_LITERAL: return new JsonLiteral(node.json_literal.value); + case DATE_LITERAL: return new DateLiteral(node.date_literal.value); + case IPV4_LITERAL: return new IPv4Literal(node.ipv4_literal.value); + case IPV6_LITERAL: return new IPv6Literal(node.ipv6_literal.value); + default: throw new AnalysisException("Wrong type from thrift;"); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 17a3db0b7fc..a36d4aeacc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -1128,4 +1128,12 @@ public class ConnectContext { public void setGroupCommitStreamLoadSql(boolean groupCommitStreamLoadSql) { isGroupCommitStreamLoadSql = groupCommitStreamLoadSql; } + + public Map<String, LiteralExpr> getUserVars() { + return userVars; + } + + public void setUserVars(Map<String, LiteralExpr> userVars) { + this.userVars = userVars; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 6bf5aec217a..923dd2b6af1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -19,6 +19,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.InsertStmt; import org.apache.doris.analysis.KillStmt; +import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.QueryStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; @@ -56,22 +57,26 @@ import org.apache.doris.plugin.DialectConverterPlugin; import org.apache.doris.plugin.PluginMgr; import org.apache.doris.proto.Data; import org.apache.doris.qe.QueryState.MysqlStateType; +import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; import java.io.IOException; import java.io.StringReader; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import javax.annotation.Nullable; @@ -509,7 +514,7 @@ public abstract class ConnectProcessor { } } - public TMasterOpResult proxyExecute(TMasterOpRequest request) { + public TMasterOpResult proxyExecute(TMasterOpRequest request) throws TException { ctx.setDatabase(request.db); ctx.setQualifiedUser(request.user); ctx.setEnv(Env.getCurrentEnv()); @@ -564,6 +569,10 @@ public abstract class ConnectProcessor { } } + if (request.isSetUserVariables()) { + ctx.setUserVars(userVariableFromThrift(request.getUserVariables())); + } + ctx.setThreadLocalInfo(); StmtExecutor executor = null; try { @@ -638,4 +647,18 @@ public abstract class ConnectProcessor { public void processOnce() throws IOException, NotImplementedException { throw new NotImplementedException("Not Impl processOnce"); } + + private Map<String, LiteralExpr> userVariableFromThrift(Map<String, TExprNode> thriftMap) throws TException { + try { + Map<String, LiteralExpr> userVariables = Maps.newHashMap(); + for (Map.Entry<String, TExprNode> entry : thriftMap.entrySet()) { + TExprNode tExprNode = entry.getValue(); + LiteralExpr literalExpr = LiteralExpr.getLiteralExprFromThrift(tExprNode); + userVariables.put(entry.getKey(), literalExpr); + } + return userVariables; + } catch (AnalysisException e) { + throw new TException(e.getMessage()); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 40c126b732d..934c221905f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -17,18 +17,22 @@ package org.apache.doris.qe; +import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.thrift.FrontendService; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -163,7 +167,7 @@ public class MasterOpExecutor { params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables()); // session variables params.setSessionVariables(ctx.getSessionVariable().getForwardVariables()); - + params.setUserVariables(getForwardUserVariables(ctx.getUserVars())); if (null != ctx.queryId()) { params.setQueryId(ctx.queryId()); } @@ -268,4 +272,15 @@ public class MasterOpExecutor { return msg; } } + + private Map<String, TExprNode> getForwardUserVariables(Map<String, LiteralExpr> userVariables) { + Map<String, TExprNode> forwardVariables = Maps.newHashMap(); + for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) { + LiteralExpr literalExpr = entry.getValue(); + TExpr tExpr = literalExpr.treeToThrift(); + TExprNode tExprNode = tExpr.nodes.get(0); + forwardVariables.put(entry.getKey(), tExprNode); + } + return forwardVariables; + } } diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 049c8450b23..320149855b4 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -490,7 +490,6 @@ struct TFeResult { 1: required FrontendServiceVersion protocolVersion 2: required Status.TStatus status } - struct TMasterOpRequest { 1: required string user 2: required string db @@ -520,6 +519,8 @@ struct TMasterOpRequest { 24: optional bool syncJournalOnly // if set to true, this request means to do nothing but just sync max journal id of master 25: optional string defaultCatalog 26: optional string defaultDatabase + 27: optional bool cancel_qeury // if set to true, this request means to cancel one forwarded query, and query_id needs to be set + 28: optional map<string, Exprs.TExprNode> user_variables } struct TColumnDefinition { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org