This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 545160a343 [refactor](planner) Separate the planning process for the legacy planner and Nereids (#17991) 545160a343 is described below commit 545160a3434075524f8f3f0f3dddd13a150ace97 Author: morrySnow <101034200+morrys...@users.noreply.github.com> AuthorDate: Wed Mar 29 11:36:38 2023 +0800 [refactor](planner) Separate the planning process for the legacy planner and Nereids (#17991) 1. separate the planning process for legacy planner and Nereids in StmtExecutor 2. add forward to master logic to Nereids 3. refactor Command process for Nereids, add run interface to Command 4. internal query could run on Nereids as normal query 5. fix CreatePolicyCommand syntax, let it exactlly same with legacy planner 6. let Nereids session variables forward to master --- .../antlr4/org/apache/doris/nereids/DorisLexer.g4 | 1 + .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 22 +- .../org/apache/doris/nereids/NereidsPlanner.java | 23 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 43 ++- .../apache/doris/nereids/parser/NereidsParser.java | 18 +- .../nereids/trees/plans/commands/Command.java | 6 + .../trees/plans/commands/CreatePolicyCommand.java | 46 ++- .../trees/plans/commands/ExplainCommand.java | 17 +- .../{CreatePolicyCommand.java => Forward.java} | 31 +- ...CreatePolicyCommand.java => ForwardNoSync.java} | 30 +- ...eatePolicyCommand.java => ForwardWithSync.java} | 30 +- .../{CreatePolicyCommand.java => NoForward.java} | 30 +- .../trees/plans/logical/LogicalCheckPolicy.java | 2 +- .../org/apache/doris/persist/meta/MetaReader.java | 3 +- .../java/org/apache/doris/qe/ConnectProcessor.java | 9 + .../main/java/org/apache/doris/qe/Coordinator.java | 8 +- .../java/org/apache/doris/qe/SessionVariable.java | 25 +- .../java/org/apache/doris/qe/StmtExecutor.java | 350 +++++++++++++-------- .../doris/nereids/parser/NereidsParserTest.java | 8 +- .../doris/nereids/preprocess/SelectHintTest.java | 14 +- .../org/apache/doris/qe/SessionVariablesTest.java | 2 +- 21 files changed, 413 insertions(+), 305 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 index 4316d6efc2..8b53d1fdde 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 @@ -337,6 +337,7 @@ SORT: 'SORT'; SORTED: 'SORTED'; START: 'START'; STATISTICS: 'STATISTICS'; +STORAGE: 'STORAGE'; STORED: 'STORED'; STRATIFY: 'STRATIFY'; STRUCT: 'STRUCT'; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 5209d75487..73dc5079bd 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -34,14 +34,26 @@ singleStatement ; statement - : explain? query #statementDefault + : explain? query #statementDefault | CREATE ROW POLICY (IF NOT EXISTS)? name=identifier ON table=multipartIdentifier AS type=(RESTRICTIVE | PERMISSIVE) - TO user=identifier + TO user=userIdentify USING LEFT_PAREN booleanExpression RIGHT_PAREN #createRowPolicy ; +// -----------------Command accessories----------------- + +identifierOrText + : errorCapturingIdentifier + | STRING + ; + +userIdentify + : user=identifierOrText (AT (host=identifierOrText | LEFT_PAREN host=identifierOrText RIGHT_PAREN))? + ; + + explain : (EXPLAIN planType? | DESC | DESCRIBE) level=(VERBOSE | GRAPH | PLAN)? @@ -241,7 +253,7 @@ multipartIdentifier // -----------------Expression----------------- namedExpression - : expression (AS? (errorCapturingIdentifier | STRING))? + : expression (AS? (identifierOrText))? ; namedExpressionSeq @@ -610,6 +622,7 @@ nonReserved | PARTITIONS | PERCENTILE_CONT | PERCENTLIT + | PERMISSIVE | PHYSICAL | PIVOT | PLACING @@ -636,6 +649,7 @@ nonReserved | RESET | RESPECT | RESTRICT + | RESTRICTIVE | REVOKE | REWRITTEN | RLIKE @@ -643,7 +657,6 @@ nonReserved | ROLES | ROLLBACK | ROLLUP - | ROW | ROWS | SCHEMA | SCHEMAS @@ -662,6 +675,7 @@ nonReserved | SORTED | START | STATISTICS + | STORAGE | STORED | STRATIFY | STRUCT diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 6a70fbbc79..b124fb2590 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -56,6 +56,9 @@ import org.apache.doris.qe.ConnectContext; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -159,13 +162,31 @@ public class NereidsPlanner extends Planner { try (Lock lock = new Lock(plan, cascadesContext)) { // resolve column, table and function - analyze(); + + Span queryAnalysisSpan = + statementContext.getConnectContext().getTracer() + .spanBuilder("query analysis").setParent(Context.current()).startSpan(); + try (Scope scope = queryAnalysisSpan.makeCurrent()) { + // analyze this query + analyze(); + } catch (Exception e) { + queryAnalysisSpan.recordException(e); + throw e; + } finally { + queryAnalysisSpan.end(); + } + + if (statementContext.getConnectContext().getExecutor() != null) { + statementContext.getConnectContext().getExecutor().getPlannerProfile().setQueryAnalysisFinishTime(); + } + if (explainLevel == ExplainLevel.ANALYZED_PLAN || explainLevel == ExplainLevel.ALL_PLAN) { analyzedPlan = cascadesContext.getRewritePlan(); if (explainLevel == ExplainLevel.ANALYZED_PLAN) { return analyzedPlan; } } + // rule-based optimize rewrite(); if (explainLevel == ExplainLevel.REWRITTEN_PLAN || explainLevel == ExplainLevel.ALL_PLAN) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index cf7add4eea..22dcd3e171 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -19,6 +19,7 @@ package org.apache.doris.nereids.parser; import org.apache.doris.analysis.ArithmeticExpr.Operator; import org.apache.doris.analysis.SetType; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.nereids.DorisParser; @@ -50,6 +51,7 @@ import org.apache.doris.nereids.DorisParser.HavingClauseContext; import org.apache.doris.nereids.DorisParser.HintAssignmentContext; import org.apache.doris.nereids.DorisParser.HintStatementContext; import org.apache.doris.nereids.DorisParser.IdentifierListContext; +import org.apache.doris.nereids.DorisParser.IdentifierOrTextContext; import org.apache.doris.nereids.DorisParser.IdentifierSeqContext; import org.apache.doris.nereids.DorisParser.IntegerLiteralContext; import org.apache.doris.nereids.DorisParser.IntervalContext; @@ -98,6 +100,7 @@ import org.apache.doris.nereids.DorisParser.TvfPropertyContext; import org.apache.doris.nereids.DorisParser.TvfPropertyItemContext; import org.apache.doris.nereids.DorisParser.TypeConstructorContext; import org.apache.doris.nereids.DorisParser.UnitIdentifierContext; +import org.apache.doris.nereids.DorisParser.UserIdentifyContext; import org.apache.doris.nereids.DorisParser.UserVariableContext; import org.apache.doris.nereids.DorisParser.WhereClauseContext; import org.apache.doris.nereids.DorisParser.WindowFrameContext; @@ -224,6 +227,7 @@ import org.apache.doris.nereids.trees.plans.logical.UsingJoin; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.types.coercion.CharacterType; import org.apache.doris.nereids.util.ExpressionUtils; +import org.apache.doris.policy.FilterType; import org.apache.doris.policy.PolicyTypeEnum; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; @@ -361,8 +365,34 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { @Override public Command visitCreateRowPolicy(CreateRowPolicyContext ctx) { - // Only wherePredicate is needed at present - return new CreatePolicyCommand(PolicyTypeEnum.ROW, getExpression(ctx.booleanExpression())); + FilterType filterType = FilterType.of(ctx.type.getText()); + List<String> nameParts = visitMultipartIdentifier(ctx.table); + return new CreatePolicyCommand(PolicyTypeEnum.ROW, ctx.name.getText(), + ctx.EXISTS() != null, nameParts, Optional.of(filterType), visitUserIdentify(ctx.user), + Optional.of(getExpression(ctx.booleanExpression())), ImmutableMap.of()); + } + + @Override + public String visitIdentifierOrText(IdentifierOrTextContext ctx) { + if (ctx.STRING() != null) { + return ctx.STRING().getText().substring(1, ctx.STRING().getText().length() - 1); + } else { + return ctx.errorCapturingIdentifier().getText(); + } + } + + @Override + public UserIdentity visitUserIdentify(UserIdentifyContext ctx) { + String user = visitIdentifierOrText(ctx.user); + String host = null; + if (ctx.host != null) { + host = visitIdentifierOrText(ctx.host); + } + if (host == null) { + host = "%"; + } + boolean isDomain = ctx.LEFT_PAREN() != null; + return new UserIdentity(user, host, isDomain); } @Override @@ -544,14 +574,11 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { public Expression visitNamedExpression(NamedExpressionContext ctx) { return ParserUtils.withOrigin(ctx, () -> { Expression expression = getExpression(ctx.expression()); - if (ctx.errorCapturingIdentifier() != null) { - return new UnboundAlias(expression, ctx.errorCapturingIdentifier().getText()); - } else if (ctx.STRING() != null) { - return new UnboundAlias(expression, ctx.STRING().getText() - .substring(1, ctx.STRING().getText().length() - 1)); - } else { + if (ctx.identifierOrText() == null) { return expression; } + String alias = visitIdentifierOrText(ctx.identifierOrText()); + return new UnboundAlias(expression, alias); }); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java index b6af329ca4..895f057a69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java @@ -17,7 +17,6 @@ package org.apache.doris.nereids.parser; -import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.StatementBase; import org.apache.doris.common.Pair; import org.apache.doris.nereids.DorisLexer; @@ -25,8 +24,6 @@ import org.apache.doris.nereids.DorisParser; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; -import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.types.DataType; @@ -55,20 +52,7 @@ public class NereidsParser { List<Pair<LogicalPlan, StatementContext>> logicalPlans = parseMultiple(originStr); List<StatementBase> statementBases = Lists.newArrayList(); for (Pair<LogicalPlan, StatementContext> parsedPlanToContext : logicalPlans) { - // TODO: this is a trick to support explain. Since we do not support any other command in a short time. - // It is acceptable. In the future, we need to refactor this. - StatementContext statementContext = parsedPlanToContext.second; - if (parsedPlanToContext.first instanceof ExplainCommand) { - ExplainCommand explainCommand = (ExplainCommand) parsedPlanToContext.first; - LogicalPlan innerPlan = explainCommand.getLogicalPlan(); - LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(innerPlan, statementContext); - ExplainLevel explainLevel = explainCommand.getLevel(); - ExplainOptions explainOptions = new ExplainOptions(explainLevel); - logicalPlanAdapter.setIsExplain(explainOptions); - statementBases.add(logicalPlanAdapter); - } else { - statementBases.add(new LogicalPlanAdapter(parsedPlanToContext.first, statementContext)); - } + statementBases.add(new LogicalPlanAdapter(parsedPlanToContext.first, parsedPlanToContext.second)); } return statementBases; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Command.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Command.java index 7e3405b251..556e989fae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Command.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Command.java @@ -25,6 +25,8 @@ import org.apache.doris.nereids.trees.plans.AbstractPlan; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.Statistics; import org.jetbrains.annotations.Nullable; @@ -52,6 +54,10 @@ public abstract class Command extends AbstractPlan implements LogicalPlan { super(type, groupExpression, optLogicalProperties, statistics, children); } + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + // all command should impl this interface. + } + @Override public Optional<GroupExpression> getGroupExpression() { throw new RuntimeException("Command do not implement getGroupExpression"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java index a405a61cb6..8509803589 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java @@ -17,34 +17,58 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.policy.FilterType; import org.apache.doris.policy.PolicyTypeEnum; +import java.util.List; +import java.util.Map; import java.util.Optional; /** - * Create policy command. + * Create policy command use for row policy and storage policy. */ -public class CreatePolicyCommand extends Command { - - private PolicyTypeEnum type; +public class CreatePolicyCommand extends Command implements ForwardWithSync { + private final PolicyTypeEnum policyType; + private final String policyName; + private final boolean ifNotExists; + private final List<String> nameParts; + private final Optional<FilterType> filterType; + private final UserIdentity user; private final Optional<Expression> wherePredicate; + private final Map<String, String> properties; - public CreatePolicyCommand(PolicyTypeEnum type, Expression expr) { + /** + * ctor of this command. + */ + public CreatePolicyCommand(PolicyTypeEnum policyType, String policyName, boolean ifNotExists, + List<String> nameParts, Optional<FilterType> filterType, UserIdentity user, + Optional<Expression> wherePredicate, Map<String, String> properties) { super(PlanType.CREATE_POLICY_COMMAND); - this.type = type; - this.wherePredicate = Optional.of(expr); + this.policyType = policyType; + this.policyName = policyName; + this.ifNotExists = ifNotExists; + this.nameParts = nameParts; + this.filterType = filterType; + this.user = user; + this.wherePredicate = wherePredicate; + this.properties = properties; + } + + public Optional<Expression> getWherePredicate() { + return wherePredicate; + } + + public Map<String, String> getProperties() { + return properties; } @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitCreatePolicyCommand(this, context); } - - public Optional<Expression> getWherePredicate() { - return wherePredicate; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExplainCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExplainCommand.java index c96a95cc23..ce6e550b61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExplainCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExplainCommand.java @@ -17,14 +17,19 @@ package org.apache.doris.nereids.trees.plans.commands; +import org.apache.doris.analysis.ExplainOptions; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; /** * explain command. */ -public class ExplainCommand extends Command { +public class ExplainCommand extends Command implements NoForward { /** * explain level. @@ -57,6 +62,16 @@ public class ExplainCommand extends Command { this.logicalPlan = logicalPlan; } + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalPlan, ctx.getStatementContext()); + logicalPlanAdapter.setIsExplain(new ExplainOptions(level)); + executor.setParsedStmt(logicalPlanAdapter); + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift()); + executor.handleExplainStmt(planner.getExplainString(new ExplainOptions(level))); + } + @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitExplainCommand(this, context); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Forward.java similarity index 50% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Forward.java index a405a61cb6..1f8f0a33eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/Forward.java @@ -17,34 +17,11 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.policy.PolicyTypeEnum; - -import java.util.Optional; +import org.apache.doris.analysis.RedirectStatus; /** - * Create policy command. + * forward to master. */ -public class CreatePolicyCommand extends Command { - - private PolicyTypeEnum type; - - private final Optional<Expression> wherePredicate; - - public CreatePolicyCommand(PolicyTypeEnum type, Expression expr) { - super(PlanType.CREATE_POLICY_COMMAND); - this.type = type; - this.wherePredicate = Optional.of(expr); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitCreatePolicyCommand(this, context); - } - - public Optional<Expression> getWherePredicate() { - return wherePredicate; - } +public interface Forward { + RedirectStatus toRedirectStatus(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ForwardNoSync.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ForwardNoSync.java index a405a61cb6..dd3daaaa9a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ForwardNoSync.java @@ -17,34 +17,14 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.policy.PolicyTypeEnum; - -import java.util.Optional; +import org.apache.doris.analysis.RedirectStatus; /** - * Create policy command. + * forward to master but do not wait sync metadata. */ -public class CreatePolicyCommand extends Command { - - private PolicyTypeEnum type; - - private final Optional<Expression> wherePredicate; - - public CreatePolicyCommand(PolicyTypeEnum type, Expression expr) { - super(PlanType.CREATE_POLICY_COMMAND); - this.type = type; - this.wherePredicate = Optional.of(expr); - } - +public interface ForwardNoSync extends Forward { @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitCreatePolicyCommand(this, context); - } - - public Optional<Expression> getWherePredicate() { - return wherePredicate; + default RedirectStatus toRedirectStatus() { + return RedirectStatus.FORWARD_NO_SYNC; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ForwardWithSync.java similarity index 51% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ForwardWithSync.java index a405a61cb6..536b39db60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ForwardWithSync.java @@ -17,34 +17,14 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.policy.PolicyTypeEnum; - -import java.util.Optional; +import org.apache.doris.analysis.RedirectStatus; /** - * Create policy command. + * forward to master and wait sync metadata. */ -public class CreatePolicyCommand extends Command { - - private PolicyTypeEnum type; - - private final Optional<Expression> wherePredicate; - - public CreatePolicyCommand(PolicyTypeEnum type, Expression expr) { - super(PlanType.CREATE_POLICY_COMMAND); - this.type = type; - this.wherePredicate = Optional.of(expr); - } - +public interface ForwardWithSync extends Forward { @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitCreatePolicyCommand(this, context); - } - - public Optional<Expression> getWherePredicate() { - return wherePredicate; + default RedirectStatus toRedirectStatus() { + return RedirectStatus.FORWARD_WITH_SYNC; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NoForward.java similarity index 50% copy from fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NoForward.java index a405a61cb6..9ca64d73aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreatePolicyCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/NoForward.java @@ -17,34 +17,8 @@ package org.apache.doris.nereids.trees.plans.commands; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; -import org.apache.doris.policy.PolicyTypeEnum; - -import java.util.Optional; - /** - * Create policy command. + * not forward to master. */ -public class CreatePolicyCommand extends Command { - - private PolicyTypeEnum type; - - private final Optional<Expression> wherePredicate; - - public CreatePolicyCommand(PolicyTypeEnum type, Expression expr) { - super(PlanType.CREATE_POLICY_COMMAND); - this.type = type; - this.wherePredicate = Optional.of(expr); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitCreatePolicyCommand(this, context); - } - - public Optional<Expression> getWherePredicate() { - return wherePredicate; - } +public interface NoForward { } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCheckPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCheckPolicy.java index ffefee3242..1d02545404 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCheckPolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCheckPolicy.java @@ -150,7 +150,7 @@ public class LogicalCheckPolicy<CHILD_TYPE extends Plan> extends LogicalUnary<CH CreatePolicyCommand command = (CreatePolicyCommand) nereidsParser.parseSingle(sql); Optional<Expression> wherePredicate = command.getWherePredicate(); if (!wherePredicate.isPresent()) { - throw new AnalysisException("Invaild row policy [" + policy.getPolicyName() + "], " + sql); + throw new AnalysisException("Invalid row policy [" + policy.getPolicyName() + "], " + sql); } switch (policy.getFilterType()) { case PERMISSIVE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java index bdf616686e..13467889e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java @@ -88,7 +88,8 @@ public class MetaReader { continue; } // Should skip some bytes because ignore some meta, such as load job - if (metaIndex.name.equals("loadJob") || metaIndex.name.equals("cooldownJob")) { + if (metaIndex.name.equals("loadJob") + || metaIndex.name.equals("cooldownJob")) { LOG.info("Skip {} module", metaIndex.name); if (i < metaFooter.metaIndices.size() - 1) { IOUtils.skipFully(dis, metaFooter.metaIndices.get(i + 1).offset - metaIndex.offset); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 8ae6aca6fe..89bc12bdae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -56,6 +56,7 @@ import org.apache.doris.mysql.MysqlServerStatusFlag; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.stats.StatsErrorEstimator; +import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.proto.Data; import org.apache.doris.qe.QueryState.MysqlStateType; @@ -355,6 +356,14 @@ public class ConnectProcessor { if (ctx.getSessionVariable().isEnableNereidsPlanner()) { try { stmts = new NereidsParser().parseSQL(originStmt); + for (StatementBase stmt : stmts) { + LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) stmt; + // TODO: remove this after we could process CreatePolicyCommand + if (logicalPlanAdapter.getLogicalPlan() instanceof CreatePolicyCommand) { + stmts = null; + break; + } + } } catch (Exception e) { // TODO: We should catch all exception here until we support all query syntax. nereidsParseException = e; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 5aa22b27e2..427800ca10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -309,7 +309,7 @@ public class Coordinator { planRoot.getDescTable(), fragment.getOutputExprs()); } } - PrepareStmt prepareStmt = analyzer.getPrepareStmt(); + PrepareStmt prepareStmt = analyzer == null ? null : analyzer.getPrepareStmt(); if (prepareStmt != null) { // Used cached or better performance this.descTable = prepareStmt.getDescTable(); @@ -328,7 +328,7 @@ public class Coordinator { this.enablePipelineEngine = context.getSessionVariable().enablePipelineEngine; initQueryOptions(context); - setFromUserProperty(analyzer); + setFromUserProperty(context); this.queryGlobals.setNowString(DATE_FORMAT.format(new Date())); this.queryGlobals.setTimestampMs(System.currentTimeMillis()); @@ -371,8 +371,8 @@ public class Coordinator { nextInstanceId.setLo(queryId.lo + 1); } - private void setFromUserProperty(Analyzer analyzer) { - String qualifiedUser = analyzer.getQualifiedUser(); + private void setFromUserProperty(ConnectContext connectContext) { + String qualifiedUser = connectContext.getQualifiedUser(); // set cpu resource limit int cpuLimit = Env.getCurrentEnv().getAuth().getCpuResourceLimit(qualifiedUser); if (cpuLimit > 0) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b491199eb7..52afe86d8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -611,13 +611,13 @@ public class SessionVariable implements Serializable, Writable { * the new optimizer is fully developed. I hope that day * would be coming soon. */ - @VariableMgr.VarAttr(name = ENABLE_NEREIDS_PLANNER) + @VariableMgr.VarAttr(name = ENABLE_NEREIDS_PLANNER, needForward = true) private boolean enableNereidsPlanner = false; - @VariableMgr.VarAttr(name = DISABLE_NEREIDS_RULES) + @VariableMgr.VarAttr(name = DISABLE_NEREIDS_RULES, needForward = true) private String disableNereidsRules = ""; - @VariableMgr.VarAttr(name = ENABLE_NEW_COST_MODEL) + @VariableMgr.VarAttr(name = ENABLE_NEW_COST_MODEL, needForward = true) private boolean enableNewCostModel = false; @VariableMgr.VarAttr(name = NEREIDS_STAR_SCHEMA_SUPPORT) @@ -626,24 +626,25 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = REWRITE_OR_TO_IN_PREDICATE_THRESHOLD, fuzzy = true) private int rewriteOrToInPredicateThreshold = 2; - @VariableMgr.VarAttr(name = NEREIDS_CBO_PENALTY_FACTOR) + @VariableMgr.VarAttr(name = NEREIDS_CBO_PENALTY_FACTOR, needForward = true) private double nereidsCboPenaltyFactor = 0.7; + @VariableMgr.VarAttr(name = ENABLE_NEREIDS_TRACE) private boolean enableNereidsTrace = false; - @VariableMgr.VarAttr(name = ENABLE_NEREIDS_RUNTIME_FILTER) + @VariableMgr.VarAttr(name = ENABLE_NEREIDS_RUNTIME_FILTER, needForward = true) private boolean enableNereidsRuntimeFilter = true; @VariableMgr.VarAttr(name = BROADCAST_RIGHT_TABLE_SCALE_FACTOR) private double broadcastRightTableScaleFactor = 10.0; - @VariableMgr.VarAttr(name = BROADCAST_ROW_COUNT_LIMIT) + @VariableMgr.VarAttr(name = BROADCAST_ROW_COUNT_LIMIT, needForward = true) private double broadcastRowCountLimit = 15000000; - @VariableMgr.VarAttr(name = BROADCAST_HASHTABLE_MEM_LIMIT_PERCENTAGE) + @VariableMgr.VarAttr(name = BROADCAST_HASHTABLE_MEM_LIMIT_PERCENTAGE, needForward = true) private double broadcastHashtableMemLimitPercentage = 0.2; - @VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PRUNE) + @VariableMgr.VarAttr(name = ENABLE_RUNTIME_FILTER_PRUNE, needForward = true) public boolean enableRuntimeFilterPrune = false; /** @@ -693,7 +694,7 @@ public class SessionVariable implements Serializable, Writable { // This variable is used to avoid FE fallback to the original parser. When we execute SQL in regression tests // for nereids, fallback will cause the Doris return the correct result although the syntax is unsupported // in nereids for some mistaken modification. You should set it on the - @VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER) + @VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER, needForward = true) public boolean enableFallbackToOriginalPlanner = true; @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD) @@ -767,7 +768,7 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = DROP_TABLE_IF_CTAS_FAILED, needForward = true) public boolean dropTableIfCtasFailed = true; - @VariableMgr.VarAttr(name = MAX_TABLE_COUNT_USE_CASCADES_JOIN_REORDER) + @VariableMgr.VarAttr(name = MAX_TABLE_COUNT_USE_CASCADES_JOIN_REORDER, needForward = true) public int maxTableCountUseCascadesJoinReorder = 10; // If this is true, the result of `show roles` will return all user default role @@ -1510,10 +1511,6 @@ public class SessionVariable implements Serializable, Writable { this.disableNereidsRules = disableNereidsRules; } - public boolean isNereidsStarSchemaSupport() { - return isEnableNereidsPlanner() && nereidsStarSchemaSupport; - } - public double getNereidsCboPenaltyFactor() { return nereidsCboPenaltyFactor; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1be10db9ba..20a684836d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -103,8 +103,11 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.stats.StatsErrorEstimator; import org.apache.doris.nereids.trees.plans.commands.Command; +import org.apache.doris.nereids.trees.plans.commands.Forward; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OriginalPlanner; import org.apache.doris.planner.Planner; @@ -244,7 +247,7 @@ public class StmtExecutor implements ProfileWriter { this.context.setStatementContext(statementContext); } - public static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException { + private static InternalService.PDataRow getRowStringValue(List<Expr> cols) throws UserException { if (cols.isEmpty()) { return null; } @@ -265,14 +268,6 @@ public class StmtExecutor implements ProfileWriter { return row.build(); } - public void setCoord(Coordinator coord) { - this.coord = coord; - } - - public Analyzer getAnalyzer() { - return analyzer; - } - // At the end of query execution, we begin to add up profile private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) { RuntimeProfile queryProfile; @@ -363,7 +358,7 @@ public class StmtExecutor implements ProfileWriter { } // this is a query stmt, but this non-master FE can not read, forward it to master - if ((parsedStmt instanceof QueryStmt) && !Env.getCurrentEnv().isMaster() + if (isQuery() && !Env.getCurrentEnv().isMaster() && !Env.getCurrentEnv().canRead()) { return true; } @@ -402,10 +397,6 @@ public class StmtExecutor implements ProfileWriter { return masterOpExecutor.getProxyStatus(); } - public boolean isQueryStmt() { - return parsedStmt != null && parsedStmt instanceof QueryStmt; - } - public boolean isInsertStmt() { return parsedStmt != null && parsedStmt instanceof InsertStmt; } @@ -432,11 +423,178 @@ public class StmtExecutor implements ProfileWriter { public void execute() throws Exception { UUID uuid = UUID.randomUUID(); TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + execute(queryId); + } + + public void execute(TUniqueId queryId) throws Exception { + SessionVariable sessionVariable = context.getSessionVariable(); Span executeSpan = context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan(); try (Scope scope = executeSpan.makeCurrent()) { - execute(queryId); + if (parsedStmt instanceof LogicalPlanAdapter + || (parsedStmt == null && sessionVariable.isEnableNereidsPlanner())) { + try { + executeByNereids(queryId); + } catch (NereidsException e) { + // try to fall back to legacy planner + if (!context.getSessionVariable().enableFallbackToOriginalPlanner) { + LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); + throw e.getException(); + } + LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + parsedStmt = null; + context.getState().setNereids(false); + executeByLegacy(queryId); + } + } else { + executeByLegacy(queryId); + } } finally { executeSpan.end(); + // revert Session Value + try { + VariableMgr.revertSessionValue(sessionVariable); + // origin value init + sessionVariable.setIsSingleSetVar(false); + sessionVariable.clearSessionOriginValue(); + } catch (DdlException e) { + LOG.warn("failed to revert Session value. {}", context.getQueryIdentifier(), e); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + } + } + } + + private boolean checkBlockRules() throws AnalysisException { + try { + Env.getCurrentEnv().getSqlBlockRuleMgr().matchSql( + originStmt.originStmt, context.getSqlHash(), context.getQualifiedUser()); + } catch (AnalysisException e) { + LOG.warn(e.getMessage()); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + return true; + } + + // limitations: partition_num, tablet_num, cardinality + List<ScanNode> scanNodeList = planner.getScanNodes(); + for (ScanNode scanNode : scanNodeList) { + if (scanNode instanceof OlapScanNode) { + OlapScanNode olapScanNode = (OlapScanNode) scanNode; + Env.getCurrentEnv().getSqlBlockRuleMgr().checkLimitations( + olapScanNode.getSelectedPartitionNum().longValue(), + olapScanNode.getSelectedTabletsNum(), + olapScanNode.getCardinality(), + context.getQualifiedUser()); + } + } + + return false; + } + + private void executeByNereids(TUniqueId queryId) { + context.setQueryId(queryId); + context.setStartTime(); + plannerProfile.setQueryBeginTime(); + context.setStmtId(STMT_ID_GENERATOR.incrementAndGet()); + try { + parseByNereids(); + Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, + "Nereids only process LogicalPlanAdapter, but parsedStmt is " + parsedStmt.getClass().getName()); + context.getState().setNereids(true); + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + if (logicalPlan instanceof Command) { + if (logicalPlan instanceof Forward) { + redirectStatus = ((Forward) logicalPlan).toRedirectStatus(); + if (isForwardToMaster()) { + if (isProxy) { + // This is already a stmt forwarded from other FE. + // If goes here, which means we can't find a valid Master FE(some error happens). + // To avoid endless forward, throw exception here. + throw new UserException("The statement has been forwarded to master FE(" + + Env.getCurrentEnv().getSelfNode().getIp() + ") and failed to execute" + + " because Master FE is not ready. You may need to check FE's status"); + } + forwardToMaster(); + if (masterOpExecutor != null && masterOpExecutor.getQueryId() != null) { + context.setQueryId(masterOpExecutor.getQueryId()); + } + return; + } + } + try { + ((Command) logicalPlan).run(context, this); + } catch (QueryStateException e) { + LOG.warn("", e); + context.setState(e.getQueryState()); + } catch (UserException e) { + // Return message to info client what happened. + LOG.warn("DDL statement({}) process failed.", originStmt.originStmt, e); + context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + } catch (Exception e) { + // Maybe our bug + LOG.warn("DDL statement(" + originStmt.originStmt + ") process failed.", e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); + } + } else { + context.getState().setIsQuery(true); + if (context.getSessionVariable().enableProfile) { + ConnectContext.get().setStatsErrorEstimator(new StatsErrorEstimator()); + } + // create plan + planner = new NereidsPlanner(statementContext); + planner.plan(parsedStmt, context.getSessionVariable().toThrift()); + if (checkBlockRules()) { + return; + } + plannerProfile.setQueryPlanFinishTime(); + handleQueryWithRetry(queryId); + } + } catch (Exception e) { + throw new NereidsException(new AnalysisException("Unexpected exception: " + e.getMessage(), e)); + } + } + + private void parseByNereids() throws AnalysisException { + if (parsedStmt != null) { + return; + } + List<StatementBase> statements = new NereidsParser().parseSQL(originStmt.originStmt); + if (statements.size() <= originStmt.idx) { + throw new AnalysisException("Nereids parse failed. Parser get " + statements.size() + " statements," + + " but we need at least " + originStmt.idx + " statements."); + } + parsedStmt = statements.get(originStmt.idx); + } + + private void handleQueryWithRetry(TUniqueId queryId) throws Exception { + int retryTime = Config.max_query_retry_time; + for (int i = 0; i < retryTime; i++) { + try { + //reset query id for each retry + if (i > 0) { + UUID uuid = UUID.randomUUID(); + TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(), + uuid.getLeastSignificantBits()); + AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}", + DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId)); + context.setQueryId(newQueryId); + } + handleQueryStmt(); + break; + } catch (RpcException e) { + if (i == retryTime - 1) { + throw e; + } + if (!context.getMysqlChannel().isSend()) { + LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt); + } else { + throw e; + } + } finally { + // The final profile report occurs after be returns the query data, and the profile cannot be + // received after unregisterQuery(), causing the instance profile to be lost, so we should wait + // for the profile before unregisterQuery(). + endProfile(true); + QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); + } } } @@ -446,11 +604,7 @@ public class StmtExecutor implements ProfileWriter { // query id in ConnectContext will be changed when retry exec a query or master FE return a different one. // Exception: // IOException: talk with client failed. - public void execute(TUniqueId queryId) throws Exception { - SessionVariable sessionVariable = context.getSessionVariable(); - if (sessionVariable.enableProfile && sessionVariable.isEnableNereidsPlanner()) { - ConnectContext.get().setStatsErrorEstimator(new StatsErrorEstimator()); - } + public void executeByLegacy(TUniqueId queryId) throws Exception { context.setStartTime(); plannerProfile.setQueryBeginTime(); @@ -461,14 +615,6 @@ public class StmtExecutor implements ProfileWriter { context.getState().setIsQuery(true); } - if (parsedStmt instanceof LogicalPlanAdapter) { - context.getState().setNereids(true); - if (parsedStmt.getExplainOptions() == null - && !(((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof Command)) { - context.getState().setIsQuery(true); - } - } - try { if (context.isTxnModel() && !(parsedStmt instanceof InsertStmt) && !(parsedStmt instanceof TransactionStmt)) { @@ -482,19 +628,7 @@ public class StmtExecutor implements ProfileWriter { context.getTracer().spanBuilder("query analysis").setParent(Context.current()).startSpan(); try (Scope scope = queryAnalysisSpan.makeCurrent()) { // analyze this query - try { - analyze(context.getSessionVariable().toThrift()); - } catch (NereidsException e) { - if (!context.getSessionVariable().enableFallbackToOriginalPlanner) { - LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); - throw e.getException(); - } - // fall back to legacy planner - LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); - parsedStmt = null; - context.getState().setNereids(false); - analyze(context.getSessionVariable().toThrift()); - } + analyze(context.getSessionVariable().toThrift()); } catch (Exception e) { queryAnalysisSpan.recordException(e); throw e; @@ -528,62 +662,14 @@ public class StmtExecutor implements ProfileWriter { return; } - if (parsedStmt instanceof QueryStmt || parsedStmt instanceof LogicalPlanAdapter) { + if (parsedStmt instanceof QueryStmt) { if (!parsedStmt.isExplain()) { // sql/sqlHash block - try { - Env.getCurrentEnv().getSqlBlockRuleMgr().matchSql( - originStmt.originStmt, context.getSqlHash(), context.getQualifiedUser()); - } catch (AnalysisException e) { - LOG.warn(e.getMessage()); - context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); + if (checkBlockRules()) { return; } - // limitations: partition_num, tablet_num, cardinality - List<ScanNode> scanNodeList = planner.getScanNodes(); - for (ScanNode scanNode : scanNodeList) { - if (scanNode instanceof OlapScanNode) { - OlapScanNode olapScanNode = (OlapScanNode) scanNode; - Env.getCurrentEnv().getSqlBlockRuleMgr().checkLimitations( - olapScanNode.getSelectedPartitionNum().longValue(), - olapScanNode.getSelectedTabletsNum(), - olapScanNode.getCardinality(), - context.getQualifiedUser()); - } - } - } - - int retryTime = Config.max_query_retry_time; - for (int i = 0; i < retryTime; i++) { - try { - //reset query id for each retry - if (i > 0) { - UUID uuid = UUID.randomUUID(); - TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(), - uuid.getLeastSignificantBits()); - AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}", - DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId)); - context.setQueryId(newQueryId); - } - handleQueryStmt(); - break; - } catch (RpcException e) { - if (i == retryTime - 1) { - throw e; - } - if (!context.getMysqlChannel().isSend()) { - LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt); - } else { - throw e; - } - } finally { - // The final profile report occurs after be returns the query data, and the profile cannot be - // received after unregisterQuery(), causing the instance profile to be lost, so we should wait - // for the profile before unregisterQuery(). - endProfile(true); - QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); - } } + handleQueryWithRetry(queryId); } else if (parsedStmt instanceof SetStmt) { handleSetStmt(); } else if (parsedStmt instanceof EnterStmt) { @@ -652,16 +738,6 @@ public class StmtExecutor implements ProfileWriter { context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR); } } finally { - // revert Session Value - try { - VariableMgr.revertSessionValue(sessionVariable); - // origin value init - sessionVariable.setIsSingleSetVar(false); - sessionVariable.clearSessionOriginValue(); - } catch (DdlException e) { - LOG.warn("failed to revert Session value. {}", context.getQueryIdentifier(), e); - context.getState().setError(e.getMysqlErrorCode(), e.getMessage()); - } if (!context.isTxnModel() && parsedStmt instanceof InsertStmt) { InsertStmt insertStmt = (InsertStmt) parsedStmt; // The transaction of an insert operation begin at analyze phase. @@ -703,9 +779,14 @@ public class StmtExecutor implements ProfileWriter { } } + private boolean isQuery() { + return parsedStmt instanceof QueryStmt + || (parsedStmt instanceof LogicalPlanAdapter + && !(((LogicalPlanAdapter) parsedStmt).getLogicalPlan() instanceof Command)); + } + private void forwardToMaster() throws Exception { - boolean isQuery = parsedStmt instanceof QueryStmt; - masterOpExecutor = new MasterOpExecutor(originStmt, context, redirectStatus, isQuery); + masterOpExecutor = new MasterOpExecutor(originStmt, context, redirectStatus, isQuery()); LOG.debug("need to transfer to Master. stmt: {}", context.getStmtId()); masterOpExecutor.execute(); } @@ -733,6 +814,8 @@ public class StmtExecutor implements ProfileWriter { context.getStmtId(), context.getForwardedStmtId()); } + parseByLegacy(); + boolean preparedStmtReanalyzed = false; PrepareStmtContext preparedStmtCtx = null; if (parsedStmt instanceof ExecuteStmt) { @@ -759,8 +842,6 @@ public class StmtExecutor implements ProfileWriter { preparedStmtCtx.stmt.analyze(analyzer); } - parse(); - // yiguolei: insert stmt's grammar analysis will write editlog, // so that we check if the stmt should be forward to master here // if the stmt should be forward to master, then just return here and the master will do analysis again @@ -793,8 +874,7 @@ public class StmtExecutor implements ProfileWriter { if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt - || parsedStmt instanceof CreateTableAsSelectStmt - || parsedStmt instanceof LogicalPlanAdapter) { + || parsedStmt instanceof CreateTableAsSelectStmt) { Map<Long, TableIf> tableMap = Maps.newTreeMap(); QueryStmt queryStmt; Set<String> parentViewNameSet = Sets.newHashSet(); @@ -833,9 +913,6 @@ public class StmtExecutor implements ProfileWriter { throw e; } catch (Exception e) { LOG.warn("Analyze failed. {}", context.getQueryIdentifier(), e); - if (parsedStmt instanceof LogicalPlanAdapter) { - throw new NereidsException(new AnalysisException("Unexpected exception: " + e.getMessage(), e)); - } throw new AnalysisException("Unexpected exception: " + e.getMessage()); } finally { MetaLockUtils.readUnlockTables(tables); @@ -860,7 +937,7 @@ public class StmtExecutor implements ProfileWriter { } } - private void parse() throws AnalysisException, DdlException { + private void parseByLegacy() throws AnalysisException, DdlException { // parsedStmt may already by set when constructing this StmtExecutor(); if (parsedStmt == null) { // Parse statement with parser generated by CUP&FLEX @@ -981,16 +1058,8 @@ public class StmtExecutor implements ProfileWriter { } } plannerProfile.setQueryAnalysisFinishTime(); - - if (parsedStmt instanceof LogicalPlanAdapter) { - // create plan - planner = new NereidsPlanner(statementContext); - } else { - planner = new OriginalPlanner(analyzer); - } - if (parsedStmt instanceof QueryStmt - || parsedStmt instanceof InsertStmt - || parsedStmt instanceof LogicalPlanAdapter) { + planner = new OriginalPlanner(analyzer); + if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) { planner.plan(parsedStmt, tQueryOptions); } // TODO(zc): @@ -1452,7 +1521,7 @@ public class StmtExecutor implements ProfileWriter { } } - public int executeForTxn(InsertStmt insertStmt) + private int executeForTxn(InsertStmt insertStmt) throws UserException, TException, InterruptedException, ExecutionException, TimeoutException { if (context.isTxnIniting()) { // first time, begin txn beginTxn(insertStmt.getDb(), insertStmt.getTbl()); @@ -1481,7 +1550,7 @@ public class StmtExecutor implements ProfileWriter { } for (List<Expr> row : selectStmt.getValueList().getRows()) { ++effectRows; - InternalService.PDataRow data = getRowStringValue(row); + InternalService.PDataRow data = StmtExecutor.getRowStringValue(row); if (data == null) { continue; } @@ -1900,7 +1969,7 @@ public class StmtExecutor implements ProfileWriter { private void handleLockTablesStmt() { } - private void handleExplainStmt(String result) throws IOException { + public void handleExplainStmt(String result) throws IOException { ShowResultSetMetaData metaData = ShowResultSetMetaData.builder() .addColumn(new Column("Explain String", ScalarType.createVarchar(20))) @@ -2082,7 +2151,7 @@ public class StmtExecutor implements ProfileWriter { return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList()); } - private StatementBase setParsedStmt(StatementBase parsedStmt) { + public StatementBase setParsedStmt(StatementBase parsedStmt) { this.parsedStmt = parsedStmt; this.statementContext.setParsedStatement(parsedStmt); return parsedStmt; @@ -2091,10 +2160,30 @@ public class StmtExecutor implements ProfileWriter { public List<ResultRow> executeInternalQuery() { try { List<ResultRow> resultRows = new ArrayList<>(); - analyzer = new Analyzer(context.getEnv(), context); try { - analyze(context.getSessionVariable().toThrift()); - } catch (UserException e) { + if (ConnectContext.get() != null + && ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { + try { + parseByNereids(); + Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter, + "Nereids only process LogicalPlanAdapter," + + " but parsedStmt is " + parsedStmt.getClass().getName()); + context.getState().setNereids(true); + context.getState().setIsQuery(true); + planner = new NereidsPlanner(statementContext); + planner.plan(parsedStmt, context.getSessionVariable().toThrift()); + } catch (Exception e) { + LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e); + parsedStmt = null; + context.getState().setNereids(false); + analyzer = new Analyzer(context.getEnv(), context); + analyze(context.getSessionVariable().toThrift()); + } + } else { + analyzer = new Analyzer(context.getEnv(), context); + analyze(context.getSessionVariable().toThrift()); + } + } catch (Exception e) { LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e); return resultRows; } @@ -2164,6 +2253,9 @@ public class StmtExecutor implements ProfileWriter { return resultRows; } + public QueryPlannerProfile getPlannerProfile() { + return plannerProfile; + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java index f32fbb2e53..07e6b7d448 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/parser/NereidsParserTest.java @@ -17,7 +17,6 @@ package org.apache.doris.nereids.parser; -import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.StatementBase; import org.apache.doris.common.Pair; import org.apache.doris.nereids.StatementContext; @@ -170,12 +169,7 @@ public class NereidsParserTest extends ParserTestBase { LogicalPlan logicalPlan0 = ((LogicalPlanAdapter) statementBases.get(0)).getLogicalPlan(); LogicalPlan logicalPlan1 = ((LogicalPlanAdapter) statementBases.get(1)).getLogicalPlan(); Assertions.assertTrue(logicalPlan0 instanceof LogicalProject); - Assertions.assertTrue(logicalPlan1 instanceof LogicalProject); - Assertions.assertNull(statementBases.get(0).getExplainOptions()); - Assertions.assertNotNull(statementBases.get(1).getExplainOptions()); - ExplainOptions explainOptions = statementBases.get(1).getExplainOptions(); - Assertions.assertTrue(explainOptions.isGraph()); - Assertions.assertFalse(explainOptions.isVerbose()); + Assertions.assertTrue(logicalPlan1 instanceof ExplainCommand); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/preprocess/SelectHintTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/preprocess/SelectHintTest.java index 3e2007f5cb..7e6d3f0a62 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/preprocess/SelectHintTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/preprocess/SelectHintTest.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.preprocess; +import org.apache.doris.catalog.Env; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -26,7 +27,9 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.thrift.TUniqueId; +import mockit.Expectations; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -36,6 +39,7 @@ public class SelectHintTest { String sql = " SELECT /*+ SET_VAR(enable_nereids_planner=\"false\") */ 1"; ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); StatementContext statementContext = new StatementContext(ctx, new OriginStatement(sql, 0)); SessionVariable sv = ctx.getSessionVariable(); Assertions.assertNotNull(sv); @@ -47,7 +51,15 @@ public class SelectHintTest { // manually recover sv sv.setEnableNereidsPlanner(true); sv.enableFallbackToOriginalPlanner = false; - new StmtExecutor(ctx, sql).execute(); + StmtExecutor stmtExecutor = new StmtExecutor(ctx, sql); + + new Expectations(stmtExecutor) { + { + stmtExecutor.executeByLegacy((TUniqueId) any); + } + }; + + stmtExecutor.execute(); Assertions.assertTrue(sv.isEnableNereidsPlanner()); Assertions.assertFalse(sv.enableFallbackToOriginalPlanner); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index 5193dd9a82..db5f5c03d5 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -50,7 +50,7 @@ public class SessionVariablesTest extends TestWithFeService { + "properties(\"replication_num\" = \"1\");"); sessionVariable = new SessionVariable(); - Field[] fields = SessionVariable.class.getFields(); + Field[] fields = SessionVariable.class.getDeclaredFields(); for (Field f : fields) { VariableMgr.VarAttr varAttr = f.getAnnotation(VariableMgr.VarAttr.class); if (varAttr == null || !varAttr.needForward()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org