This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 49faccbf50a [enhancement](nereids) speedup sql cache with use variable 
as partition predicate (#37943)
49faccbf50a is described below

commit 49faccbf50a4fa3bb266454e9134e4bfe91fef3b
Author: 924060929 <924060...@qq.com>
AuthorDate: Tue Jul 16 22:11:18 2024 +0800

    [enhancement](nereids) speedup sql cache with use variable as partition 
predicate (#37943)
    
    follow up #37090
    
    support reusing the sql cache when a user variable is used as a partition 
predicate and the variable's value changes:
    ```sql
    set @dt='2024-07-16';
    -- create cache 1
    select * from tbl where dt = @dt;
    
    set @dt='2024-07-17';
    -- create cache 2, will not invalidate cache 1
    select * from tbl where dt = @dt;
    
    set @dt='2024-07-16';
    -- reuse cache 1
    select * from tbl where dt = @dt;
    ```
---
 .../doris/common/NereidsSqlCacheManager.java       | 43 ++++++++++++++++++++--
 .../org/apache/doris/nereids/SqlCacheContext.java  | 18 +++++++++
 .../java/org/apache/doris/qe/ConnectProcessor.java | 14 +++++++
 .../cache/parse_sql_from_sql_cache.groovy          | 20 ++++++++++
 4 files changed, 91 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
index cbc3c173af6..6c4d5901709 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
@@ -25,12 +25,14 @@ import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.ConfigBase.DefaultConfHandler;
+import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.DataMaskPolicy;
 import org.apache.doris.mysql.privilege.RowFilterPolicy;
 import org.apache.doris.nereids.CascadesContext;
 import org.apache.doris.nereids.SqlCacheContext;
+import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
 import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
 import org.apache.doris.nereids.SqlCacheContext.FullTableName;
 import org.apache.doris.nereids.SqlCacheContext.ScanTable;
@@ -124,7 +126,9 @@ public class NereidsSqlCacheManager {
 
         SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
         UserIdentity currentUserIdentity = 
connectContext.getCurrentUserIdentity();
-        String key = currentUserIdentity.toString() + ":" + sql.trim();
+        String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
+                ? currentUserIdentity.toString() + ":" + sql.trim()
+                : currentUserIdentity.toString() + ":" + 
DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
         if (sqlCaches.getIfPresent(key) == null && 
sqlCacheContext.getOrComputeCacheKeyMd5() != null
                 && sqlCacheContext.getResultSetInFe().isPresent()) {
             sqlCaches.put(key, sqlCacheContext);
@@ -142,7 +146,9 @@ public class NereidsSqlCacheManager {
         }
         SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
         UserIdentity currentUserIdentity = 
connectContext.getCurrentUserIdentity();
-        String key = currentUserIdentity.toString() + ":" + sql.trim();
+        String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
+                ? currentUserIdentity.toString() + ":" + sql.trim()
+                : currentUserIdentity.toString() + ":" + 
DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
         if (sqlCaches.getIfPresent(key) == null && 
sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
             SqlCache cache = (SqlCache) analyzer.getCache();
             sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
@@ -162,8 +168,7 @@ public class NereidsSqlCacheManager {
     /** tryParseSql */
     public Optional<LogicalSqlCache> tryParseSql(ConnectContext 
connectContext, String sql) {
         UserIdentity currentUserIdentity = 
connectContext.getCurrentUserIdentity();
-        Env env = connectContext.getEnv();
-        String key = currentUserIdentity.toString() + ":" + sql.trim();
+        String key = currentUserIdentity + ":" + sql.trim();
         SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
         if (sqlCacheContext == null) {
             return Optional.empty();
@@ -171,6 +176,36 @@ public class NereidsSqlCacheManager {
 
         // LOG.info("Total size: " + 
GraphLayout.parseInstance(sqlCacheContext).totalSize());
 
+        List<Variable> currentVariables = 
resolveUserVariables(sqlCacheContext);
+        if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
+            String md5 = DebugUtil.printId(
+                    
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));
+
+            String md5CacheKey = currentUserIdentity + ":" + md5;
+            SqlCacheContext sqlCacheContextWithVariable = 
sqlCaches.getIfPresent(md5CacheKey);
+
+            // already exist cache in the fe, but the variable is different to 
this query,
+            // we should create another cache context in fe, use another cache 
key
+            connectContext.getStatementContext()
+                    .getSqlCacheContext().ifPresent(ctx -> 
ctx.setCacheKeyType(CacheKeyType.MD5));
+
+            if (sqlCacheContextWithVariable != null) {
+                return tryParseSqlWithoutCheckVariable(
+                        connectContext, md5CacheKey, 
sqlCacheContextWithVariable, currentUserIdentity
+                );
+            } else {
+                return Optional.empty();
+            }
+        } else {
+            return tryParseSqlWithoutCheckVariable(connectContext, key, 
sqlCacheContext, currentUserIdentity);
+        }
+    }
+
+    private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
+            ConnectContext connectContext, String key,
+            SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) 
{
+        Env env = connectContext.getEnv();
+
         // check table and view and their columns authority
         if (privilegeChanged(connectContext, env, sqlCacheContext)) {
             return invalidateCache(key);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index a0c95a9113e..4cf2418d91e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -86,6 +86,8 @@ public class SqlCacheContext {
     private volatile PUniqueId cacheKeyMd5;
     private volatile ResultSet resultSetInFe;
 
+    private volatile CacheKeyType cacheKeyType = CacheKeyType.SQL;
+
     public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
         this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity 
cannot be null");
         this.queryId = Objects.requireNonNull(queryId, "queryId cannot be 
null");
@@ -392,6 +394,14 @@ public class SqlCacheContext {
         this.resultSetInFe = resultSetInFe;
     }
 
+    public CacheKeyType getCacheKeyType() {
+        return cacheKeyType;
+    }
+
+    public void setCacheKeyType(CacheKeyType cacheKeyType) {
+        this.cacheKeyType = cacheKeyType;
+    }
+
     /** FullTableName */
     @lombok.Data
     @lombok.AllArgsConstructor
@@ -434,4 +444,12 @@ public class SqlCacheContext {
             this.scanPartitions.add(partitionId);
         }
     }
+
+    /** CacheKeyType */
+    public enum CacheKeyType {
+        // use `userIdentity`:`sql`.trim() as Cache key in FE
+        SQL,
+        // use MD5 as Cache key in FE
+        MD5
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index aac7951ecdf..7b0ef09a744 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -49,6 +49,8 @@ import org.apache.doris.mysql.MysqlCommand;
 import org.apache.doris.mysql.MysqlPacket;
 import org.apache.doris.mysql.MysqlSerializer;
 import org.apache.doris.mysql.MysqlServerStatusFlag;
+import org.apache.doris.nereids.SqlCacheContext;
+import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.nereids.exceptions.ParseException;
@@ -230,9 +232,15 @@ public abstract class ConnectProcessor {
         boolean nereidsUseServerPrep = 
(sessionVariable.enableServeSidePreparedStatement
                     && !sessionVariable.isEnableInsertGroupCommit())
                         || mysqlCommand == MysqlCommand.COM_QUERY;
+        CacheKeyType cacheKeyType = null;
         if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) {
             if (wantToParseSqlFromSqlCache) {
                 cachedStmts = parseFromSqlCache(originStmt);
+                Optional<SqlCacheContext> sqlCacheContext = 
ConnectContext.get()
+                        .getStatementContext().getSqlCacheContext();
+                if (sqlCacheContext.isPresent()) {
+                    cacheKeyType = sqlCacheContext.get().getCacheKeyType();
+                }
                 if (cachedStmts != null) {
                     stmts = cachedStmts;
                 }
@@ -310,6 +318,12 @@ public abstract class ConnectProcessor {
                 
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
                 ctx.setExecutor(executor);
 
+                if (cacheKeyType != null) {
+                    SqlCacheContext sqlCacheContext =
+                            
executor.getContext().getStatementContext().getSqlCacheContext().get();
+                    sqlCacheContext.setCacheKeyType(cacheKeyType);
+                }
+
                 try {
                     executor.execute();
                     if (connectType.equals(ConnectType.MYSQL)) {
diff --git 
a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy 
b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index d95c3edc344..2c17da24661 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -503,6 +503,26 @@ suite("parse_sql_from_sql_cache") {
                 assertHasCache "select @custom_variable from 
test_use_plan_cache17 where id = 1 and value = 1"
                 def result1 = sql "select @custom_variable from 
test_use_plan_cache17 where id = 1 and value = 1"
                 assertTrue(result1.size() == 1 && 
result1[0][0].toString().toInteger() == 10)
+
+
+                sql "set @custom_variable2=1"
+                assertNoCache "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+                def res = sql "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+                assertTrue(res[0][0] == 1)
+                assertHasCache "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+
+                sql "set @custom_variable2=2"
+                assertNoCache "select* from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+                // should not invalidate cache with @custom_variable2=1
+                res = sql "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+                assertTrue(res[0][0] == 2)
+                assertHasCache "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+
+                sql "set @custom_variable2=1"
+                // should reuse cache
+                assertHasCache "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+                res = sql "select * from test_use_plan_cache17 where id = 
@custom_variable2 and value = 1"
+                assertTrue(res[0][0] == 1)
             }
         }),
         extraThread("test_udf", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to