This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 42f4271e9df [enhancement](nereids) speedup sql cache with variable (#37090) (#37119) 42f4271e9df is described below commit 42f4271e9dfb5c5d912e0d9335167271cdd831b5 Author: 924060929 <924060...@qq.com> AuthorDate: Tue Jul 2 19:25:22 2024 +0800 [enhancement](nereids) speedup sql cache with variable (#37090) (#37119) cherry pick from #37090 --- .../doris/common/NereidsSqlCacheManager.java | 49 +++++++----- .../org/apache/doris/nereids/NereidsPlanner.java | 79 +++---------------- .../org/apache/doris/nereids/SqlCacheContext.java | 88 +++++++++++----------- .../nereids/trees/plans/ComputeResultSet.java | 55 ++++++++++++++ .../plans/physical/PhysicalEmptyRelation.java | 39 +++++++++- .../plans/physical/PhysicalOneRowRelation.java | 49 +++++++++++- .../trees/plans/physical/PhysicalResultSink.java | 18 ++++- .../trees/plans/physical/PhysicalSqlCache.java | 11 ++- .../java/org/apache/doris/qe/StmtExecutor.java | 8 +- .../cache/parse_sql_from_sql_cache.groovy | 30 +++++--- 10 files changed, 276 insertions(+), 150 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java index cf6280650f0..cbc3c173af6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java @@ -48,6 +48,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.ConnectContext; @@ -58,6 +59,7 @@ import org.apache.doris.qe.cache.SqlCache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import java.lang.reflect.Field; @@ -123,16 +125,14 @@ public class NereidsSqlCacheManager { SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); String key = currentUserIdentity.toString() + ":" + sql.trim(); - if ((sqlCaches.getIfPresent(key) == null) && sqlCacheContext.getOrComputeCacheKeyMd5() != null + if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null && sqlCacheContext.getResultSetInFe().isPresent()) { sqlCaches.put(key, sqlCacheContext); } } - /** tryAddCache */ - public void tryAddCache( - ConnectContext connectContext, String sql, - CacheAnalyzer analyzer, boolean currentMissParseSqlFromSqlCache) { + /** tryAddBeCache */ + public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) { Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext(); if (!sqlCacheContextOpt.isPresent()) { return; @@ -143,8 +143,7 @@ public class NereidsSqlCacheManager { SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); String key = currentUserIdentity.toString() + ":" + sql.trim(); - if ((currentMissParseSqlFromSqlCache || sqlCaches.getIfPresent(key) == null) - && sqlCacheContext.getOrComputeCacheKeyMd5() != null) { + if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) { SqlCache cache = (SqlCache) analyzer.getCache(); sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum()); sqlCacheContext.setLatestPartitionId(cache.getLatestId()); @@ -182,9 +181,6 @@ public class NereidsSqlCacheManager { if (viewsChanged(env, sqlCacheContext)) { return invalidateCache(key); } - if (usedVariablesChanged(sqlCacheContext)) { - return invalidateCache(key); - } LogicalEmptyRelation whateverPlan = new LogicalEmptyRelation(new RelationId(0), ImmutableList.of()); if (nondeterministicFunctionChanged(whateverPlan, connectContext, sqlCacheContext)) { @@ -201,7 +197,10 @@ public class NereidsSqlCacheManager { try { Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe(); - if (resultSetInFe.isPresent()) { + + List<Variable> currentVariables = resolveUserVariables(sqlCacheContext); + boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext); + if (resultSetInFe.isPresent() && !usedVariablesChanged) { MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L); String cachedPlan = sqlCacheContext.getPhysicalPlan(); @@ -214,7 +213,9 @@ public class NereidsSqlCacheManager { } Status status = new Status(); - PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5(); + PUniqueId cacheKeyMd5 = usedVariablesChanged + ? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)) + : sqlCacheContext.getOrComputeCacheKeyMd5(); InternalService.PFetchCacheResult cacheData = SqlCache.getCacheData(sqlCacheContext.getCacheProxy(), cacheKeyMd5, sqlCacheContext.getLatestPartitionId(), @@ -235,7 +236,7 @@ public class NereidsSqlCacheManager { ); return Optional.of(logicalSqlCache); } - return invalidateCache(key); + return Optional.empty(); } catch (Throwable t) { return invalidateCache(key); } @@ -342,12 +343,24 @@ public class NereidsSqlCacheManager { return false; } - private boolean usedVariablesChanged(SqlCacheContext sqlCacheContext) { - for (Variable variable : sqlCacheContext.getUsedVariables()) { + private List<Variable> resolveUserVariables(SqlCacheContext sqlCacheContext) { + List<Variable> cachedUsedVariables = sqlCacheContext.getUsedVariables(); + List<Variable> currentVariables = Lists.newArrayListWithCapacity(cachedUsedVariables.size()); + for (Variable cachedVariable : cachedUsedVariables) { Variable currentVariable = ExpressionAnalyzer.resolveUnboundVariable( - new UnboundVariable(variable.getName(), variable.getType())); - if (!Objects.equals(currentVariable, variable) - || variable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) { + new UnboundVariable(cachedVariable.getName(), cachedVariable.getType())); + currentVariables.add(currentVariable); + } + return currentVariables; + } + + private boolean usedVariablesChanged(List<Variable> currentVariables, SqlCacheContext sqlCacheContext) { + List<Variable> cachedUsedVariables = sqlCacheContext.getUsedVariables(); + for (int i = 0; i < cachedUsedVariables.size(); i++) { + Variable currentVariable = currentVariables.get(i); + Variable cachedVariable = cachedUsedVariables.get(i); + if (!Objects.equals(currentVariable, cachedVariable) + || cachedVariable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) { return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 1ae1864ad3b..667784ad119 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -19,10 +19,7 @@ package org.apache.doris.nereids; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.ExplainOptions; -import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.StatementBase; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.common.NereidsException; import org.apache.doris.common.Pair; import org.apache.doris.common.profile.SummaryProfile; @@ -45,37 +42,28 @@ import org.apache.doris.nereids.processor.post.PlanPostProcessors; import org.apache.doris.nereids.processor.pre.PlanPreprocessors; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; -import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.ScanNode; -import org.apache.doris.qe.CommonResultSet; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ResultSet; -import org.apache.doris.qe.ResultSetMetaData; -import org.apache.doris.qe.cache.CacheAnalyzer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -535,65 +523,16 @@ public class NereidsPlanner extends Planner { if (!(parsedStmt instanceof LogicalPlanAdapter)) { return Optional.empty(); } - if (physicalPlan instanceof PhysicalSqlCache - && ((PhysicalSqlCache) physicalPlan).getResultSet().isPresent()) { - return Optional.of(((PhysicalSqlCache) physicalPlan).getResultSet().get()); - } - if (!(physicalPlan instanceof PhysicalResultSink)) { - return Optional.empty(); - } - - Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext(); - boolean enableSqlCache - = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); - Plan child = physicalPlan.child(0); - if (child instanceof PhysicalOneRowRelation) { - PhysicalOneRowRelation physicalOneRowRelation = (PhysicalOneRowRelation) physicalPlan.child(0); - List<Column> columns = Lists.newArrayList(); - List<String> data = Lists.newArrayList(); - for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) { - NamedExpression item = physicalOneRowRelation.getProjects().get(i); - NamedExpression output = physicalPlan.getOutput().get(i); - Expression expr = item.child(0); - if (expr instanceof Literal) { - LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral(); - columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); - data.add(legacyExpr.getStringValueInFe()); - } else { - return Optional.empty(); - } - } - - ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); - ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); - if (sqlCacheContext.isPresent() && enableSqlCache) { - sqlCacheContext.get().setResultSetInFe(resultSet); - Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( - statementContext.getConnectContext(), - statementContext.getOriginStatement().originStmt - ); + if (physicalPlan instanceof ComputeResultSet) { + Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext(); + Optional<ResultSet> resultSet = ((ComputeResultSet) physicalPlan) + .computeResultInFe(cascadesContext, sqlCacheContext); + if (resultSet.isPresent()) { + return resultSet; } - return Optional.of(resultSet); - } else if (child instanceof PhysicalEmptyRelation) { - List<Column> columns = Lists.newArrayList(); - for (int i = 0; i < physicalPlan.getOutput().size(); i++) { - NamedExpression output = physicalPlan.getOutput().get(i); - columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); - } - - ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); - ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of()); - if (sqlCacheContext.isPresent() && enableSqlCache) { - sqlCacheContext.get().setResultSetInFe(resultSet); - Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( - statementContext.getConnectContext(), - statementContext.getOriginStatement().originStmt - ); - } - return Optional.of(resultSet); - } else { - return Optional.empty(); } + + return Optional.empty(); } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index f3fa61cecaa..a0c95a9113e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -329,53 +329,57 @@ public class SqlCacheContext { if (cacheKeyMd5 != null) { return cacheKeyMd5; } - - StringBuilder cacheKey = new StringBuilder(originSql); - for (Entry<FullTableName, String> entry : usedViews.entrySet()) { - cacheKey.append("|") - .append(entry.getKey()) - .append("=") - .append(entry.getValue()); - } - for (Variable usedVariable : usedVariables) { - cacheKey.append("|") - .append(usedVariable.getType().name()) - .append(":") - .append(usedVariable.getName()) - .append("=") - .append(usedVariable.getRealExpression().toSql()); - } - for (Pair<Expression, Expression> pair : foldNondeterministicPairs) { - cacheKey.append("|") - .append(pair.key().toSql()) - .append("=") - .append(pair.value().toSql()); - } - for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) { - List<RowFilterPolicy> policy = entry.getValue(); - if (policy.isEmpty()) { - continue; - } - cacheKey.append("|") - .append(entry.getKey()) - .append("=") - .append(policy); - } - for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) { - if (!entry.getValue().isPresent()) { - continue; - } - cacheKey.append("|") - .append(entry.getKey()) - .append("=") - .append(entry.getValue().map(Object::toString).orElse("")); - } - cacheKeyMd5 = CacheProxy.getMd5(cacheKey.toString()); + cacheKeyMd5 = doComputeCacheKeyMd5(usedVariables); } } return cacheKeyMd5; } + /** doComputeCacheKeyMd5 */ + public synchronized PUniqueId doComputeCacheKeyMd5(Set<Variable> usedVariables) { + StringBuilder cacheKey = new StringBuilder(originSql); + for (Entry<FullTableName, String> entry : usedViews.entrySet()) { + cacheKey.append("|") + .append(entry.getKey()) + .append("=") + .append(entry.getValue()); + } + for (Variable usedVariable : usedVariables) { + cacheKey.append("|") + .append(usedVariable.getType().name()) + .append(":") + .append(usedVariable.getName()) + .append("=") + .append(usedVariable.getRealExpression().toSql()); + } + for (Pair<Expression, Expression> pair : foldNondeterministicPairs) { + cacheKey.append("|") + .append(pair.key().toSql()) + .append("=") + .append(pair.value().toSql()); + } + for (Entry<FullTableName, List<RowFilterPolicy>> entry : rowPolicies.entrySet()) { + List<RowFilterPolicy> policy = entry.getValue(); + if (policy.isEmpty()) { + continue; + } + cacheKey.append("|") + .append(entry.getKey()) + .append("=") + .append(policy); + } + for (Entry<FullColumnName, Optional<DataMaskPolicy>> entry : dataMaskPolicies.entrySet()) { + if (!entry.getValue().isPresent()) { + continue; + } + cacheKey.append("|") + .append(entry.getKey()) + .append("=") + .append(entry.getValue().map(Object::toString).orElse("")); + } + return CacheProxy.getMd5(cacheKey.toString()); + } + public void setOriginSql(String originSql) { this.originSql = originSql.trim(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java new file mode 100644 index 00000000000..beee784ec9d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.qe.ResultSet; + +import java.util.Optional; + +/** + * <p> + * This class is used to return result set in fe without send fragment to be. + * Some plans support this function, for example: + * <li>1. the sql `select 100` will generate a plan, PhysicalOneRowRelation, and PhysicalOneRowRelation implement this + * interface, so fe can send the only row to client immediately. + * </li> + * <li>2. the sql `select * from tbl limit 0` will generate PhysicalEmptyRelation, which means no any rows returned, + * the PhysicalEmptyRelation implement this interface. + * </li> + * </p> + * <p> + * If you want to cache the result set in fe, you can implement this interface and write this code: + * </p> + * <pre> + * StatementContext statementContext = cascadesContext.getStatementContext(); + * boolean enableSqlCache + * = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); + * if (sqlCacheContext.isPresent() && enableSqlCache) { + * sqlCacheContext.get().setResultSetInFe(resultSet); + * Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( + * statementContext.getConnectContext(), + * statementContext.getOriginStatement().originStmt + * ); + * } + * </pre> + */ +public interface ComputeResultSet { + Optional<ResultSet> computeResultInFe(CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java index 2a9c344bd46..e01c3ead327 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java @@ -17,20 +17,31 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.algebra.EmptyRelation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.CommonResultSet; +import org.apache.doris.qe.ResultSet; +import org.apache.doris.qe.ResultSetMetaData; +import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; import java.util.Objects; @@ -41,7 +52,7 @@ import java.util.Optional; * e.g. * select * from tbl limit 0 */ -public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRelation { +public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRelation, ComputeResultSet { private final List<? extends NamedExpression> projects; @@ -102,4 +113,30 @@ public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRela return new PhysicalEmptyRelation(relationId, projects, Optional.empty(), logicalPropertiesSupplier.get(), physicalProperties, statistics); } + + @Override + public Optional<ResultSet> computeResultInFe(CascadesContext cascadesContext, + Optional<SqlCacheContext> sqlCacheContext) { + List<Column> columns = Lists.newArrayList(); + List<Slot> outputSlots = getOutput(); + for (int i = 0; i < outputSlots.size(); i++) { + NamedExpression output = outputSlots.get(i); + columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); + } + + StatementContext statementContext = cascadesContext.getStatementContext(); + boolean enableSqlCache + = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); + + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of()); + if (sqlCacheContext.isPresent() && enableSqlCache) { + sqlCacheContext.get().setResultSetInFe(resultSet); + Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( + statementContext.getConnectContext(), + statementContext.getOriginStatement().originStmt + ); + } + return Optional.of(resultSet); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java index e1ab09739ed..2b1b91891cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java @@ -17,21 +17,35 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.CommonResultSet; +import org.apache.doris.qe.ResultSet; +import org.apache.doris.qe.ResultSetMetaData; +import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -40,7 +54,7 @@ import java.util.Optional; * A physical relation that contains only one row consist of some constant expressions. * e.g. select 100, 'value' */ -public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRelation { +public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRelation, ComputeResultSet { private final List<NamedExpression> projects; @@ -119,4 +133,37 @@ public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRe return new PhysicalOneRowRelation(relationId, projects, groupExpression, logicalPropertiesSupplier.get(), physicalProperties, statistics); } + + @Override + public Optional<ResultSet> computeResultInFe( + CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext) { + List<Column> columns = Lists.newArrayList(); + List<String> data = Lists.newArrayList(); + for (int i = 0; i < projects.size(); i++) { + NamedExpression item = projects.get(i); + NamedExpression output = getOutput().get(i); + Expression expr = item.child(0); + if (expr instanceof Literal) { + LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral(); + columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); + data.add(legacyExpr.getStringValueInFe()); + } else { + return Optional.empty(); + } + } + + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); + StatementContext statementContext = cascadesContext.getStatementContext(); + boolean enableSqlCache + = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); + if (sqlCacheContext.isPresent() && enableSqlCache) { + sqlCacheContext.get().setResultSetInFe(resultSet); + Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( + statementContext.getConnectContext(), + statementContext.getOriginStatement().originStmt + ); + } + return Optional.of(resultSet); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java index eba99cdfb21..aceb1f13774 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java @@ -17,16 +17,20 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ResultSet; import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; @@ -39,7 +43,8 @@ import java.util.Optional; /** * result sink */ -public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> implements Sink { +public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> + implements Sink, ComputeResultSet { public PhysicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { @@ -125,4 +130,15 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH return new PhysicalResultSink<>(outputExprs, groupExpression, null, physicalProperties, statistics, child()); } + + @Override + public Optional<ResultSet> computeResultInFe( + CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext) { + CHILD_TYPE child = child(); + if (child instanceof ComputeResultSet) { + return ((ComputeResultSet) child).computeResultInFe(cascadesContext, sqlCacheContext); + } else { + return Optional.empty(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java index 824ca7e8924..124f52f6080 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java @@ -19,12 +19,15 @@ package org.apache.doris.nereids.trees.plans.physical; import org.apache.doris.analysis.Expr; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.FunctionalDependencies; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.TreeStringPlan; @@ -44,7 +47,7 @@ import java.util.Objects; import java.util.Optional; /** PhysicalSqlCache */ -public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan { +public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan, ComputeResultSet { private final TUniqueId queryId; private final List<String> columnLabels; private final List<Expr> resultExprs; @@ -154,4 +157,10 @@ public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStri public String getChildrenTreeString() { return planBody; } + + @Override + public Optional<ResultSet> computeResultInFe( + CascadesContext cascadesContext, Optional<SqlCacheContext> sqlCacheContext) { + return resultSet; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1e63a750748..5b8082fd0ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1591,7 +1591,7 @@ public class StmtExecutor { String originStmt = parsedStmt.getOrigStmt().originStmt; NereidsSqlCacheManager sqlCacheManager = context.getEnv().getSqlCacheManager(); if (cacheResult != null) { - sqlCacheManager.tryAddCache(context, originStmt, cacheAnalyzer, false); + sqlCacheManager.tryAddBeCache(context, originStmt, cacheAnalyzer); } } } @@ -1809,11 +1809,7 @@ public class StmtExecutor { Cache cache = cacheAnalyzer.getCache(); if (cache instanceof SqlCache && !cache.isDisableCache() && planner instanceof NereidsPlanner) { String originStmt = parsedStmt.getOrigStmt().originStmt; - LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) queryStmt; - boolean currentMissParseSqlFromSqlCache = !(logicalPlanAdapter.getLogicalPlan() - instanceof org.apache.doris.nereids.trees.plans.algebra.SqlCache); - context.getEnv().getSqlCacheManager().tryAddCache( - context, originStmt, cacheAnalyzer, currentMissParseSqlFromSqlCache); + context.getEnv().getSqlCacheManager().tryAddBeCache(context, originStmt, cacheAnalyzer); } } if (!isSendFields) { diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy index 085bfe81359..d95c3edc344 100644 --- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy +++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy @@ -485,14 +485,24 @@ suite("parse_sql_from_sql_cache") { sql "set enable_sql_cache=true" sql "set @custom_variable=10" - assertNoCache "select @custom_variable from test_use_plan_cache17" + assertNoCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" // create sql cache - sql "select @custom_variable from test_use_plan_cache17" + sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" // can use sql cache - assertHasCache "select @custom_variable from test_use_plan_cache17" + assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" sql "set @custom_variable=20" - assertNoCache "select @custom_variable from test_use_plan_cache17" + assertNoCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + + def result2 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + assertTrue(result2.size() == 1 && result2[0][0].toString().toInteger() == 20) + + // we can switch to origin value and reuse origin cache + sql "set @custom_variable=10" + assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10) } }), extraThread("test_udf", { @@ -634,12 +644,14 @@ suite("parse_sql_from_sql_cache") { sql "set enable_fallback_to_original_planner=false" sql "set enable_sql_cache=true" - assertNoCache "select * from (select 100 as id)a" - def result1 = sql "select * from (select 100 as id)a" + int randomInt = Math.random() * 2000000000 + + assertNoCache "select * from (select $randomInt as id)a" + def result1 = sql "select * from (select $randomInt as id)a" assertTrue(result1.size() == 1) - assertHasCache "select * from (select 100 as id)a" - def result2 = sql "select * from (select 100 as id)a" + assertHasCache "select * from (select $randomInt as id)a" + def result2 = sql "select * from (select $randomInt as id)a" assertTrue(result2.size() == 1) assertNoCache "select * from test_use_plan_cache20 limit 0" @@ -671,8 +683,6 @@ suite("parse_sql_from_sql_cache") { distributed by hash(id) properties('replication_num'='1')""" - - sql "insert into test_use_plan_cache21 values('2', '2')" sleep(100) sql "insert into test_use_plan_cache21 values('1', '1')" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org