This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 49faccbf50a [enhancement](nereids) speedup sql cache with use variable as partition predicate (#37943) 49faccbf50a is described below commit 49faccbf50a4fa3bb266454e9134e4bfe91fef3b Author: 924060929 <924060...@qq.com> AuthorDate: Tue Jul 16 22:11:18 2024 +0800 [enhancement](nereids) speedup sql cache with use variable as partition predicate (#37943) Follow-up to #37090: support reusing the SQL cache when a user variable is used as a partition predicate and the value of that variable changes: ```sql set @dt='2024-07-16'; -- create cache 1 select * from tbl where dt = @dt; set @dt='2024-07-17'; -- create cache 2, will not invalidate cache 1 select * from tbl where dt = @dt; set @dt='2024-07-16'; -- reuse cache 1 select * from tbl where dt = @dt; ``` --- .../doris/common/NereidsSqlCacheManager.java | 43 ++++++++++++++++++++-- .../org/apache/doris/nereids/SqlCacheContext.java | 18 +++++++++ .../java/org/apache/doris/qe/ConnectProcessor.java | 14 +++++++ .../cache/parse_sql_from_sql_cache.groovy | 20 ++++++++++ 4 files changed, 91 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java index cbc3c173af6..6c4d5901709 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java @@ -25,12 +25,14 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; import org.apache.doris.common.ConfigBase.DefaultConfHandler; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.DataMaskPolicy; import org.apache.doris.mysql.privilege.RowFilterPolicy; import org.apache.doris.nereids.CascadesContext; import 
org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.nereids.SqlCacheContext.CacheKeyType; import org.apache.doris.nereids.SqlCacheContext.FullColumnName; import org.apache.doris.nereids.SqlCacheContext.FullTableName; import org.apache.doris.nereids.SqlCacheContext.ScanTable; @@ -124,7 +126,9 @@ public class NereidsSqlCacheManager { SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); - String key = currentUserIdentity.toString() + ":" + sql.trim(); + String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL + ? currentUserIdentity.toString() + ":" + sql.trim() + : currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()); if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null && sqlCacheContext.getResultSetInFe().isPresent()) { sqlCaches.put(key, sqlCacheContext); @@ -142,7 +146,9 @@ public class NereidsSqlCacheManager { } SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); - String key = currentUserIdentity.toString() + ":" + sql.trim(); + String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL + ? 
currentUserIdentity.toString() + ":" + sql.trim() + : currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5()); if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) { SqlCache cache = (SqlCache) analyzer.getCache(); sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum()); @@ -162,8 +168,7 @@ public class NereidsSqlCacheManager { /** tryParseSql */ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) { UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); - Env env = connectContext.getEnv(); - String key = currentUserIdentity.toString() + ":" + sql.trim(); + String key = currentUserIdentity + ":" + sql.trim(); SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key); if (sqlCacheContext == null) { return Optional.empty(); @@ -171,6 +176,36 @@ public class NereidsSqlCacheManager { // LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize()); + List<Variable> currentVariables = resolveUserVariables(sqlCacheContext); + if (usedVariablesChanged(currentVariables, sqlCacheContext)) { + String md5 = DebugUtil.printId( + sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables))); + + String md5CacheKey = currentUserIdentity + ":" + md5; + SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey); + + // already exist cache in the fe, but the variable is different to this query, + // we should create another cache context in fe, use another cache key + connectContext.getStatementContext() + .getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5)); + + if (sqlCacheContextWithVariable != null) { + return tryParseSqlWithoutCheckVariable( + connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity + ); + } else { + return Optional.empty(); + } + } else { + return tryParseSqlWithoutCheckVariable(connectContext, key, 
sqlCacheContext, currentUserIdentity); + } + } + + private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable( + ConnectContext connectContext, String key, + SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) { + Env env = connectContext.getEnv(); + // check table and view and their columns authority if (privilegeChanged(connectContext, env, sqlCacheContext)) { return invalidateCache(key); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index a0c95a9113e..4cf2418d91e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -86,6 +86,8 @@ public class SqlCacheContext { private volatile PUniqueId cacheKeyMd5; private volatile ResultSet resultSetInFe; + private volatile CacheKeyType cacheKeyType = CacheKeyType.SQL; + public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) { this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity cannot be null"); this.queryId = Objects.requireNonNull(queryId, "queryId cannot be null"); @@ -392,6 +394,14 @@ public class SqlCacheContext { this.resultSetInFe = resultSetInFe; } + public CacheKeyType getCacheKeyType() { + return cacheKeyType; + } + + public void setCacheKeyType(CacheKeyType cacheKeyType) { + this.cacheKeyType = cacheKeyType; + } + /** FullTableName */ @lombok.Data @lombok.AllArgsConstructor @@ -434,4 +444,12 @@ public class SqlCacheContext { this.scanPartitions.add(partitionId); } } + + /** CacheKeyType */ + public enum CacheKeyType { + // use `userIdentity`:`sql`.trim() as Cache key in FE + SQL, + // use MD5 as Cache key in FE + MD5 + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index aac7951ecdf..7b0ef09a744 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -49,6 +49,8 @@ import org.apache.doris.mysql.MysqlCommand; import org.apache.doris.mysql.MysqlPacket; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.mysql.MysqlServerStatusFlag; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.nereids.SqlCacheContext.CacheKeyType; import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.nereids.exceptions.ParseException; @@ -230,9 +232,15 @@ public abstract class ConnectProcessor { boolean nereidsUseServerPrep = (sessionVariable.enableServeSidePreparedStatement && !sessionVariable.isEnableInsertGroupCommit()) || mysqlCommand == MysqlCommand.COM_QUERY; + CacheKeyType cacheKeyType = null; if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) { if (wantToParseSqlFromSqlCache) { cachedStmts = parseFromSqlCache(originStmt); + Optional<SqlCacheContext> sqlCacheContext = ConnectContext.get() + .getStatementContext().getSqlCacheContext(); + if (sqlCacheContext.isPresent()) { + cacheKeyType = sqlCacheContext.get().getCacheKeyType(); + } if (cachedStmts != null) { stmts = cachedStmts; } @@ -310,6 +318,12 @@ public abstract class ConnectProcessor { executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime); ctx.setExecutor(executor); + if (cacheKeyType != null) { + SqlCacheContext sqlCacheContext = + executor.getContext().getStatementContext().getSqlCacheContext().get(); + sqlCacheContext.setCacheKeyType(cacheKeyType); + } + try { executor.execute(); if (connectType.equals(ConnectType.MYSQL)) { diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy index d95c3edc344..2c17da24661 100644 --- 
a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy +++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy @@ -503,6 +503,26 @@ suite("parse_sql_from_sql_cache") { assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10) + + + sql "set @custom_variable2=1" + assertNoCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + def res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + assertTrue(res[0][0] == 1) + assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + + sql "set @custom_variable2=2" + assertNoCache "select* from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + // should not invalidate cache with @custom_variable2=1 + res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + assertTrue(res[0][0] == 2) + assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + + sql "set @custom_variable2=1" + // should reuse cache + assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" + assertTrue(res[0][0] == 1) } }), extraThread("test_udf", { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org