This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new cd3683b43a8 branch-3.1: [refactor](fe) Merge MasterOpExecutor and
FEOpExecutor #50776 (#52061)
cd3683b43a8 is described below
commit cd3683b43a82519d1def3b8e47ff2330200b6ad8
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Fri Jun 20 20:02:21 2025 +0800
branch-3.1: [refactor](fe) Merge MasterOpExecutor and FEOpExecutor #50776
(#52061)
bp #50776
---
.../main/java/org/apache/doris/catalog/Env.java | 4 +-
.../apache/doris/httpv2/rest/SetConfigAction.java | 2 +-
.../java/org/apache/doris/qe/ConnectContext.java | 7 +
.../java/org/apache/doris/qe/FEOpExecutor.java | 131 +++++++--
.../java/org/apache/doris/qe/MasterOpExecutor.java | 318 ++-------------------
.../java/org/apache/doris/qe/StmtExecutor.java | 6 +-
6 files changed, 142 insertions(+), 326 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 51f4cd120a0..a1931d57359 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -5966,7 +5966,7 @@ public class Env {
* we can't set callback which is in fe-core to config items which are in
fe-common. so wrap them here. it's not so
* good but is best for us now.
*/
- public void setMutableConfigwithCallback(String key, String value) throws
ConfigException {
+ public void setMutableConfigWithCallback(String key, String value) throws
ConfigException {
ConfigBase.setMutableConfig(key, value);
if (configtoThreads.get(key) != null) {
try {
@@ -5986,7 +5986,7 @@ public class Env {
for (Map.Entry<String, String> entry : configs.entrySet()) {
try {
- setMutableConfigwithCallback(entry.getKey(), entry.getValue());
+ setMutableConfigWithCallback(entry.getKey(), entry.getValue());
} catch (ConfigException e) {
throw new DdlException(e.getMessage());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
index d9351ec5978..8d0cc12a235 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/SetConfigAction.java
@@ -94,7 +94,7 @@ public class SetConfigAction extends RestBaseController {
try {
if (confValue != null && confValue.length == 1) {
try {
-
Env.getCurrentEnv().setMutableConfigwithCallback(confKey, confValue[0]);
+
Env.getCurrentEnv().setMutableConfigWithCallback(confKey, confValue[0]);
} catch (ConfigException e) {
throw new DdlException(e.getMessage());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c5782466e88..740c38579f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -236,6 +236,9 @@ public class ConnectContext {
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
+ // unique session id in the doris cluster, added in #40680
+ private String sessionId = "";
+
// internal call like `insert overwrite` need skipAuth
// For example, `insert overwrite` only requires load permission,
// but the internal implementation will call the logic of `AlterTable`.
@@ -775,6 +778,10 @@ public class ConnectContext {
return getCatalog(defaultCatalog);
}
+ public String getSessionId() {
+ return sessionId;
+ }
+
/**
* Maybe return when catalogName is not exist. So need to check nullable.
*/
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
index 3914d7ecdab..37fe527a137 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -19,8 +19,9 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.catalog.Env;
-import org.apache.doris.cloud.qe.ComputeGroupException;
+import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TExpr;
@@ -39,22 +40,28 @@ import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+/**
+ * FEOpExecutor is used to send request to specific FE
+ */
public class FEOpExecutor {
private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);
- private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
+ protected static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
- private final OriginStatement originStmt;
- private final ConnectContext ctx;
- private TMasterOpResult result;
- private TNetworkAddress feAddr;
+ protected final OriginStatement originStmt;
+ protected final ConnectContext ctx;
+ protected TMasterOpResult result;
+ protected TNetworkAddress feAddr;
// the total time of thrift connectTime, readTime and writeTime
- private int thriftTimeoutMs;
+ protected int thriftTimeoutMs;
- private boolean shouldNotRetry;
+ protected boolean shouldNotRetry;
public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt,
ConnectContext ctx, boolean isQuery) {
this.feAddr = feAddress;
@@ -66,7 +73,15 @@ public class FEOpExecutor {
}
public void execute() throws Exception {
- result = forward(feAddr, buildStmtForwardParams());
+ result = forward(buildStmtForwardParams());
+ if (ctx.isTxnModel()) {
+ if (result.isSetTxnLoadInfo()) {
+
ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo());
+ } else {
+ ctx.setTxnEntry(null);
+ LOG.info("set txn entry to null");
+ }
+ }
}
public void cancel() throws Exception {
@@ -84,22 +99,24 @@ public class FEOpExecutor {
request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
// just make the protocol happy
request.setSql("");
- result = forward(feAddr, request);
+ result = forward(request);
}
// Send request to specific fe
- private TMasterOpResult forward(TNetworkAddress thriftAddress,
TMasterOpRequest params) throws Exception {
+ protected TMasterOpResult forward(TMasterOpRequest params) throws
Exception {
ctx.getEnv().checkReadyOrThrow();
FrontendService.Client client;
try {
- client = ClientPool.frontendPool.borrowObject(thriftAddress,
thriftTimeoutMs);
+ client = ClientPool.frontendPool.borrowObject(feAddr,
thriftTimeoutMs);
} catch (Exception e) {
// may throw NullPointerException. add err msg
- throw new Exception("Failed to get fe client: " +
thriftAddress.toString(), e);
+ throw new Exception("Failed to get master client.", e);
+ }
+ final StringBuilder forwardMsg = new StringBuilder("forward to master
FE " + feAddr.toString());
+ if (!params.isSyncJournalOnly()) {
+ forwardMsg.append(", statement id: ").append(ctx.getStmtId());
}
- final StringBuilder forwardMsg = new StringBuilder("forward to FE " +
thriftAddress.toString());
- forwardMsg.append(", statement id: ").append(ctx.getStmtId());
LOG.info(forwardMsg.toString());
boolean isReturnToPool = false;
@@ -110,7 +127,7 @@ public class FEOpExecutor {
} catch (TTransportException e) {
// wrap the raw exception.
forwardMsg.append(" : failed");
- Exception exception = new
ForwardToFEException(forwardMsg.toString(), e);
+ Exception exception = new
ForwardToMasterException(forwardMsg.toString(), e);
boolean ok = ClientPool.frontendPool.reopen(client,
thriftTimeoutMs);
if (!ok) {
@@ -130,14 +147,14 @@ public class FEOpExecutor {
}
} finally {
if (isReturnToPool) {
- ClientPool.frontendPool.returnObject(thriftAddress, client);
+ ClientPool.frontendPool.returnObject(feAddr, client);
} else {
- ClientPool.frontendPool.invalidateObject(thriftAddress,
client);
+ ClientPool.frontendPool.invalidateObject(feAddr, client);
}
}
}
- private TMasterOpRequest buildStmtForwardParams() {
+ protected TMasterOpRequest buildStmtForwardParams() throws
AnalysisException {
TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
@@ -151,15 +168,18 @@ public class FEOpExecutor {
params.setUserIp(ctx.getRemoteIP());
params.setStmtId(ctx.getStmtId());
params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+ // params.setSessionId(ctx.getSessionId());
- String cluster = "";
- try {
- ctx.getCloudCluster(false);
- } catch (ComputeGroupException e) {
- LOG.warn("failed to get cloud cluster", e);
- }
- if (!Strings.isNullOrEmpty(cluster)) {
- params.setCloudCluster(cluster);
+ if (Config.isCloudMode()) {
+ String cluster = "";
+ try {
+ cluster = ctx.getCloudCluster(false);
+ } catch (Exception e) {
+ LOG.warn("failed to get cloud compute group", e);
+ }
+ if (!Strings.isNullOrEmpty(cluster)) {
+ params.setCloudCluster(cluster);
+ }
}
// query options
@@ -170,6 +190,12 @@ public class FEOpExecutor {
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
+
+ // set transaction load info
+ if (ctx.isTxnModel()) {
+
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
+ }
+
return params;
}
@@ -190,6 +216,48 @@ public class FEOpExecutor {
return result.getErrMessage();
}
+
+ public ByteBuffer getOutputPacket() {
+ if (result == null) {
+ return null;
+ }
+ return result.packet;
+ }
+
+ public TUniqueId getQueryId() {
+ if (result != null && result.isSetQueryId()) {
+ return result.getQueryId();
+ } else {
+ return null;
+ }
+ }
+
+ public String getProxyStatus() {
+ if (result == null) {
+ return QueryState.MysqlStateType.UNKNOWN.name();
+ }
+ if (!result.isSetStatus()) {
+ return QueryState.MysqlStateType.UNKNOWN.name();
+ } else {
+ return result.getStatus();
+ }
+ }
+
+ public ShowResultSet getProxyResultSet() {
+ if (result == null) {
+ return null;
+ }
+ if (result.isSetResultSet()) {
+ return new ShowResultSet(result.resultSet);
+ } else {
+ return null;
+ }
+ }
+
+ public List<ByteBuffer> getQueryResultBufList() {
+ return result.isSetQueryResultBufList() ?
result.getQueryResultBufList() : Collections.emptyList();
+ }
+
private Map<String, TExprNode> getForwardUserVariables(Map<String,
LiteralExpr> userVariables) {
Map<String, TExprNode> forwardVariables = Maps.newHashMap();
for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
@@ -201,21 +269,22 @@ public class FEOpExecutor {
return forwardVariables;
}
- public static class ForwardToFEException extends RuntimeException {
-
+ protected static class ForwardToMasterException extends RuntimeException {
private static final Map<Integer, String> TYPE_MSG_MAP =
ImmutableMap.<Integer, String>builder()
.put(TTransportException.UNKNOWN, "Unknown exception")
.put(TTransportException.NOT_OPEN, "Connection is not
open")
.put(TTransportException.ALREADY_OPEN, "Connection has
already opened up")
- .put(TTransportException.TIMED_OUT, "Connection
timeout")
+ .put(TTransportException.TIMED_OUT,
+ "Connection timeout, please check network
state or enlarge session variable:"
+ + "`query_timeout`/`insert_timeout`")
.put(TTransportException.END_OF_FILE, "EOF")
.put(TTransportException.CORRUPTED_DATA, "Corrupted
data")
.build();
private final String msg;
- public ForwardToFEException(String msg, TTransportException exception)
{
+ public ForwardToMasterException(String msg, TTransportException
exception) {
this.msg = msg + ", cause: " +
TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 1f7d87bdfe3..87e16626e82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -17,65 +17,33 @@
package org.apache.doris.qe;
-import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Env;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ClientPool;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.thrift.FrontendService;
-import org.apache.doris.thrift.TExpr;
-import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TGroupCommitInfo;
import org.apache.doris.thrift.TMasterOpRequest;
-import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TUniqueId;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class MasterOpExecutor {
+/**
+ * MasterOpExecutor is used to send request to Master FE.
+ * It is inherited from FEOpExecutor. The difference is that MasterOpExecutor
may need to wait the journal being
+ * synced before returning.
+ */
+public class MasterOpExecutor extends FEOpExecutor {
private static final Logger LOG =
LogManager.getLogger(MasterOpExecutor.class);
-
- private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
-
- private final OriginStatement originStmt;
- private final ConnectContext ctx;
- private TMasterOpResult result;
-
- private TNetworkAddress masterAddr;
-
- private int waitTimeoutMs;
- // the total time of thrift connectTime add readTime and writeTime
- private int thriftTimeoutMs;
-
- private boolean shouldNotRetry;
+ private final int journalWaitTimeoutMs;
public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx,
RedirectStatus status, boolean isQuery) {
- this.originStmt = originStmt;
- this.ctx = ctx;
+ super(new TNetworkAddress(ctx.getEnv().getMasterHost(),
ctx.getEnv().getMasterRpcPort()),
+ originStmt, ctx, isQuery);
if (status.isNeedToWaitJournalSync()) {
- this.waitTimeoutMs = (int) (ctx.getExecTimeout() * 1000 *
RPC_TIMEOUT_COEFFICIENT);
+ this.journalWaitTimeoutMs = (int) (ctx.getExecTimeout() * 1000 *
RPC_TIMEOUT_COEFFICIENT);
} else {
- this.waitTimeoutMs = 0;
+ this.journalWaitTimeoutMs = 0;
}
- this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 *
RPC_TIMEOUT_COEFFICIENT);
- // if isQuery=false, we shouldn't retry twice when catch exception
because of Idempotency
- this.shouldNotRetry = !isQuery;
}
/**
@@ -85,21 +53,25 @@ public class MasterOpExecutor {
this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true);
}
+ @Override
public void execute() throws Exception {
- result = forward(buildStmtForwardParams());
- if (ctx.isTxnModel()) {
- if (result.isSetTxnLoadInfo()) {
-
ctx.getTxnEntry().setTxnLoadInfoInObserver(result.getTxnLoadInfo());
- } else {
- ctx.setTxnEntry(null);
- LOG.info("set txn entry to null");
- }
- }
+ super.execute();
waitOnReplaying();
}
+ @Override
+ public void cancel() throws Exception {
+ super.cancel();
+ waitOnReplaying();
+ }
+
+ private void waitOnReplaying() throws DdlException {
+ LOG.info("forwarding to master get result max journal id: {}",
result.maxJournalId);
+ ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId,
journalWaitTimeoutMs);
+ }
+
public void syncJournal() throws Exception {
- result = forward(buildSyncJournalParmas());
+ result = forward(buildSyncJournalParams());
waitOnReplaying();
}
@@ -114,132 +86,7 @@ public class MasterOpExecutor {
waitOnReplaying();
}
- public void cancel() throws Exception {
- TUniqueId queryId = ctx.queryId();
- if (queryId == null) {
- return;
- }
- Preconditions.checkNotNull(masterAddr, "query with id %s is not
forwarded to master", queryId);
- TMasterOpRequest request = new TMasterOpRequest();
- request.setCancelQeury(true);
- request.setQueryId(queryId);
- request.setDb(ctx.getDatabase());
- request.setUser(ctx.getQualifiedUser());
- request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
- request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
- // just make the protocol happy
- request.setSql("");
- result = forward(masterAddr, request);
- waitOnReplaying();
- }
-
- private void waitOnReplaying() throws DdlException {
- LOG.info("forwarding to master get result max journal id: {}",
result.maxJournalId);
- ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId,
waitTimeoutMs);
- }
-
- private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
- String masterHost = ctx.getEnv().getMasterHost();
- int masterRpcPort = ctx.getEnv().getMasterRpcPort();
- masterAddr = new TNetworkAddress(masterHost, masterRpcPort);
- return forward(masterAddr, params);
- }
-
- // Send request to Master
- private TMasterOpResult forward(TNetworkAddress thriftAddress,
TMasterOpRequest params) throws Exception {
- ctx.getEnv().checkReadyOrThrow();
-
- FrontendService.Client client;
- try {
- client = ClientPool.frontendPool.borrowObject(thriftAddress,
thriftTimeoutMs);
- } catch (Exception e) {
- // may throw NullPointerException. add err msg
- throw new Exception("Failed to get master client.", e);
- }
- final StringBuilder forwardMsg = new StringBuilder("forward to master
FE " + thriftAddress.toString());
- if (!params.isSyncJournalOnly()) {
- forwardMsg.append(", statement id: ").append(ctx.getStmtId());
- }
- LOG.info(forwardMsg.toString());
-
- boolean isReturnToPool = false;
- try {
- final TMasterOpResult result = client.forward(params);
- isReturnToPool = true;
- return result;
- } catch (TTransportException e) {
- // wrap the raw exception.
- forwardMsg.append(" : failed");
- Exception exception = new
ForwardToMasterException(forwardMsg.toString(), e);
-
- boolean ok = ClientPool.frontendPool.reopen(client,
thriftTimeoutMs);
- if (!ok) {
- throw exception;
- }
- if (shouldNotRetry || e.getType() ==
TTransportException.TIMED_OUT) {
- throw exception;
- } else {
- LOG.warn(forwardMsg.append(" twice").toString(), e);
- try {
- TMasterOpResult result = client.forward(params);
- isReturnToPool = true;
- return result;
- } catch (TException ex) {
- throw exception;
- }
- }
- } finally {
- if (isReturnToPool) {
- ClientPool.frontendPool.returnObject(thriftAddress, client);
- } else {
- ClientPool.frontendPool.invalidateObject(thriftAddress,
client);
- }
- }
- }
-
- private TMasterOpRequest buildStmtForwardParams() throws AnalysisException
{
- TMasterOpRequest params = new TMasterOpRequest();
- // node ident
- params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
- params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
- params.setSql(originStmt.originStmt);
- params.setStmtIdx(originStmt.idx);
- params.setUser(ctx.getQualifiedUser());
- params.setDefaultCatalog(ctx.getDefaultCatalog());
- params.setDefaultDatabase(ctx.getDatabase());
- params.setDb(ctx.getDatabase());
- params.setUserIp(ctx.getRemoteIP());
- params.setStmtId(ctx.getStmtId());
- params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
-
- if (Config.isCloudMode()) {
- String cluster = "";
- try {
- cluster = ctx.getCloudCluster(false);
- } catch (Exception e) {
- LOG.warn("failed to get cloud compute group", e);
- }
- if (!Strings.isNullOrEmpty(cluster)) {
- params.setCloudCluster(cluster);
- }
- }
-
- // query options
-
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
- // session variables
-
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
- params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
- if (null != ctx.queryId()) {
- params.setQueryId(ctx.queryId());
- }
- // set transaction load info
- if (ctx.isTxnModel()) {
-
params.setTxnLoadInfo(ctx.getTxnEntry().getTxnLoadInfoInObserver());
- }
- return params;
- }
-
- private TMasterOpRequest buildSyncJournalParmas() {
+ private TMasterOpRequest buildSyncJournalParams() {
final TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
@@ -257,17 +104,7 @@ public class MasterOpExecutor {
groupCommitParams.setGetGroupCommitLoadBeId(true);
groupCommitParams.setGroupCommitLoadTableId(tableId);
groupCommitParams.setCluster(cluster);
-
- final TMasterOpRequest params = new TMasterOpRequest();
- // node ident
- params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
- params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
- params.setGroupCommitInfo(groupCommitParams);
- params.setDb(ctx.getDatabase());
- params.setUser(ctx.getQualifiedUser());
- // just make the protocol happy
- params.setSql("");
- return params;
+ return getMasterOpRequestForGroupCommit(groupCommitParams);
}
private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long
receiveData) {
@@ -275,7 +112,10 @@ public class MasterOpExecutor {
groupCommitParams.setUpdateLoadData(true);
groupCommitParams.setTableId(tableId);
groupCommitParams.setReceiveData(receiveData);
+ return getMasterOpRequestForGroupCommit(groupCommitParams);
+ }
+ private TMasterOpRequest getMasterOpRequestForGroupCommit(TGroupCommitInfo
groupCommitParams) {
final TMasterOpRequest params = new TMasterOpRequest();
// node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
@@ -288,102 +128,4 @@ public class MasterOpExecutor {
return params;
}
- public ByteBuffer getOutputPacket() {
- if (result == null) {
- return null;
- }
- return result.packet;
- }
-
- public TUniqueId getQueryId() {
- if (result != null && result.isSetQueryId()) {
- return result.getQueryId();
- } else {
- return null;
- }
- }
-
- public String getProxyStatus() {
- if (result == null) {
- return QueryState.MysqlStateType.UNKNOWN.name();
- }
- if (!result.isSetStatus()) {
- return QueryState.MysqlStateType.UNKNOWN.name();
- } else {
- return result.getStatus();
- }
- }
-
- public int getProxyStatusCode() {
- if (result == null || !result.isSetStatusCode()) {
- return ErrorCode.ERR_UNKNOWN_ERROR.getCode();
- }
- return result.getStatusCode();
- }
-
- public String getProxyErrMsg() {
- if (result == null) {
- return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg();
- }
- if (!result.isSetErrMessage()) {
- return "";
- }
- return result.getErrMessage();
- }
-
- public ShowResultSet getProxyResultSet() {
- if (result == null) {
- return null;
- }
- if (result.isSetResultSet()) {
- return new ShowResultSet(result.resultSet);
- } else {
- return null;
- }
- }
-
- public List<ByteBuffer> getQueryResultBufList() {
- return result.isSetQueryResultBufList() ?
result.getQueryResultBufList() : Collections.emptyList();
- }
-
- public void setResult(TMasterOpResult result) {
- this.result = result;
- }
-
- public static class ForwardToMasterException extends RuntimeException {
-
- private static final Map<Integer, String> TYPE_MSG_MAP =
- ImmutableMap.<Integer, String>builder()
- .put(TTransportException.UNKNOWN, "Unknown exception")
- .put(TTransportException.NOT_OPEN, "Connection is not
open")
- .put(TTransportException.ALREADY_OPEN, "Connection has
already opened up")
- .put(TTransportException.TIMED_OUT,
- "Connection timeout, please check network
state or enlarge session variable:"
- + "`query_timeout`/`insert_timeout`")
- .put(TTransportException.END_OF_FILE, "EOF")
- .put(TTransportException.CORRUPTED_DATA, "Corrupted
data")
- .build();
-
- private final String msg;
-
- public ForwardToMasterException(String msg, TTransportException
exception) {
- this.msg = msg + ", cause: " +
TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
- }
-
- @Override
- public String getMessage() {
- return msg;
- }
- }
-
- private Map<String, TExprNode> getForwardUserVariables(Map<String,
LiteralExpr> userVariables) {
- Map<String, TExprNode> forwardVariables = Maps.newHashMap();
- for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
- LiteralExpr literalExpr = entry.getValue();
- TExpr tExpr = literalExpr.treeToThrift();
- TExprNode tExprNode = tExpr.nodes.get(0);
- forwardVariables.put(entry.getKey(), tExprNode);
- }
- return forwardVariables;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 1dd64e8ebae..003882d4a41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -484,14 +484,14 @@ public class StmtExecutor {
if (masterOpExecutor == null) {
return MysqlStateType.UNKNOWN.ordinal();
}
- return masterOpExecutor.getProxyStatusCode();
+ return masterOpExecutor.getStatusCode();
}
public String getProxyErrMsg() {
if (masterOpExecutor == null) {
return MysqlStateType.UNKNOWN.name();
}
- return masterOpExecutor.getProxyErrMsg();
+ return masterOpExecutor.getErrMsg();
}
public boolean isSyncLoadKindStmt() {
@@ -1185,8 +1185,6 @@ public class StmtExecutor {
/**
* get variables in stmt.
- *
- * @throws DdlException
*/
private void analyzeVariablesInStmt() throws DdlException {
analyzeVariablesInStmt(parsedStmt);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]