This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch 2.1-tmp
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 67bb5196137820a766568c520e9faa926166b2c1
Author: feiniaofeiafei <53502832+feiniaofeia...@users.noreply.github.com>
AuthorDate: Wed Apr 3 15:31:31 2024 +0800

    [Fix](nereids) forward the user define variables to master (#33013)
---
 .../org/apache/doris/analysis/LiteralExpr.java     | 20 +++++++++++++++++
 .../java/org/apache/doris/qe/ConnectContext.java   |  8 +++++++
 .../java/org/apache/doris/qe/ConnectProcessor.java | 25 +++++++++++++++++++++-
 .../java/org/apache/doris/qe/MasterOpExecutor.java | 17 ++++++++++++++-
 gensrc/thrift/FrontendService.thrift               |  3 ++-
 5 files changed, 70 insertions(+), 3 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
index 0814235f0a3..eb6fedb7b50 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LiteralExpr.java
@@ -26,6 +26,8 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.mysql.MysqlProto;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TExprNodeType;
 
 import com.google.common.base.Preconditions;
 import org.apache.logging.log4j.LogManager;
@@ -478,4 +480,22 @@ public abstract class LiteralExpr extends Expr implements 
Comparable<LiteralExpr
         }
         return isZero;
     }
+
+    public static LiteralExpr getLiteralExprFromThrift(TExprNode node) throws 
AnalysisException {
+        TExprNodeType type = node.node_type;
+        switch (type) {
+            case NULL_LITERAL: return new NullLiteral();
+            case BOOL_LITERAL: return new BoolLiteral(node.bool_literal.value);
+            case INT_LITERAL: return new IntLiteral(node.int_literal.value);
+            case LARGE_INT_LITERAL: return new 
LargeIntLiteral(node.large_int_literal.value);
+            case FLOAT_LITERAL: return new 
FloatLiteral(node.float_literal.value);
+            case DECIMAL_LITERAL: return new 
DecimalLiteral(node.decimal_literal.value);
+            case STRING_LITERAL: return new 
StringLiteral(node.string_literal.value);
+            case JSON_LITERAL: return new JsonLiteral(node.json_literal.value);
+            case DATE_LITERAL: return new DateLiteral(node.date_literal.value);
+            case IPV4_LITERAL: return new IPv4Literal(node.ipv4_literal.value);
+            case IPV6_LITERAL: return new IPv6Literal(node.ipv6_literal.value);
+            default: throw new AnalysisException("Wrong type from thrift;");
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 17a3db0b7fc..a36d4aeacc9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1128,4 +1128,12 @@ public class ConnectContext {
     public void setGroupCommitStreamLoadSql(boolean groupCommitStreamLoadSql) {
         isGroupCommitStreamLoadSql = groupCommitStreamLoadSql;
     }
+
+    public Map<String, LiteralExpr> getUserVars() {
+        return userVars;
+    }
+
+    public void setUserVars(Map<String, LiteralExpr> userVars) {
+        this.userVars = userVars;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index 6bf5aec217a..923dd2b6af1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.KillStmt;
+import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
@@ -56,22 +57,26 @@ import org.apache.doris.plugin.DialectConverterPlugin;
 import org.apache.doris.plugin.PluginMgr;
 import org.apache.doris.proto.Data;
 import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import javax.annotation.Nullable;
 
@@ -509,7 +514,7 @@ public abstract class ConnectProcessor {
         }
     }
 
-    public TMasterOpResult proxyExecute(TMasterOpRequest request) {
+    public TMasterOpResult proxyExecute(TMasterOpRequest request) throws 
TException {
         ctx.setDatabase(request.db);
         ctx.setQualifiedUser(request.user);
         ctx.setEnv(Env.getCurrentEnv());
@@ -564,6 +569,10 @@ public abstract class ConnectProcessor {
             }
         }
 
+        if (request.isSetUserVariables()) {
+            
ctx.setUserVars(userVariableFromThrift(request.getUserVariables()));
+        }
+
         ctx.setThreadLocalInfo();
         StmtExecutor executor = null;
         try {
@@ -638,4 +647,18 @@ public abstract class ConnectProcessor {
     public void processOnce() throws IOException, NotImplementedException {
         throw new NotImplementedException("Not Impl processOnce");
     }
+
+    private Map<String, LiteralExpr> userVariableFromThrift(Map<String, 
TExprNode> thriftMap) throws TException {
+        try {
+            Map<String, LiteralExpr> userVariables = Maps.newHashMap();
+            for (Map.Entry<String, TExprNode> entry : thriftMap.entrySet()) {
+                TExprNode tExprNode = entry.getValue();
+                LiteralExpr literalExpr = 
LiteralExpr.getLiteralExprFromThrift(tExprNode);
+                userVariables.put(entry.getKey(), literalExpr);
+            }
+            return userVariables;
+        } catch (AnalysisException e) {
+            throw new TException(e.getMessage());
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 40c126b732d..934c221905f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -17,18 +17,22 @@
 
 package org.apache.doris.qe;
 
+import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.RedirectStatus;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
 import org.apache.doris.thrift.TMasterOpRequest;
 import org.apache.doris.thrift.TMasterOpResult;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -163,7 +167,7 @@ public class MasterOpExecutor {
         
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
         // session variables
         
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
-
+        params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
         if (null != ctx.queryId()) {
             params.setQueryId(ctx.queryId());
         }
@@ -268,4 +272,15 @@ public class MasterOpExecutor {
             return msg;
         }
     }
+
+    private Map<String, TExprNode> getForwardUserVariables(Map<String, 
LiteralExpr> userVariables) {
+        Map<String, TExprNode> forwardVariables = Maps.newHashMap();
+        for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
+            LiteralExpr literalExpr = entry.getValue();
+            TExpr tExpr = literalExpr.treeToThrift();
+            TExprNode tExprNode = tExpr.nodes.get(0);
+            forwardVariables.put(entry.getKey(), tExprNode);
+        }
+        return forwardVariables;
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 049c8450b23..320149855b4 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -490,7 +490,6 @@ struct TFeResult {
     1: required FrontendServiceVersion protocolVersion
     2: required Status.TStatus status
 }
-
 struct TMasterOpRequest {
     1: required string user
     2: required string db
@@ -520,6 +519,8 @@ struct TMasterOpRequest {
     24: optional bool syncJournalOnly // if set to true, this request means to 
do nothing but just sync max journal id of master
     25: optional string defaultCatalog
     26: optional string defaultDatabase
+    27: optional bool cancel_qeury // if set to true, this request means to 
cancel one forwarded query, and query_id needs to be set
+    28: optional map<string, Exprs.TExprNode> user_variables
 }
 
 struct TColumnDefinition {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to