This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7ced060550a494636719553afe365e2cf8c5470f
Author: Xiangyu Wang <dut.xian...@gmail.com>
AuthorDate: Thu Aug 31 14:22:52 2023 +0800

    [Fix](cache) fix query cache returns wrong result after deleting 
partitions. (#23555)
    
    The reason is that the SQL cache only uses partitionKey, latestVersion and 
latestTime to decide whether a cached result can be returned. If we delete 
partition(s) other than the most recently updated one, none of these values 
change, so the stale cache entry is still hit.
    The fix adds a field that records the number of selected partitions of each 
table, sums these partition nums, and sends the sum to BE. There are two 
situations that involve delete-partition ops:
    
    - only some partition(s) are deleted, so the sum of partition nums will be 
lower than before.
    - some partition(s) are deleted while other partition(s) are added, so the 
latest time or latest version will be higher than before.
---
 be/src/runtime/cache/result_node.h                         | 11 +++++++++++
 .../main/java/org/apache/doris/qe/cache/CacheAnalyzer.java | 14 +++++++++++---
 .../java/org/apache/doris/qe/cache/RowBatchBuilder.java    |  3 ++-
 .../src/main/java/org/apache/doris/qe/cache/SqlCache.java  |  9 +++++++--
 .../test/java/org/apache/doris/qe/PartitionCacheTest.java  | 12 +++++++++++-
 gensrc/proto/internal_service.proto                        |  1 +
 6 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/cache/result_node.h 
b/be/src/runtime/cache/result_node.h
index 3edf48261b..31ab193219 100644
--- a/be/src/runtime/cache/result_node.h
+++ b/be/src/runtime/cache/result_node.h
@@ -63,6 +63,9 @@ private:
         if (req_param.last_version_time() > 
_cache_value->param().last_version_time()) {
             return false;
         }
+        if (req_param.partition_num() != 
_cache_value->param().partition_num()) {
+            return false;
+        }
         return true;
     }
 
@@ -74,9 +77,17 @@ private:
         if (up_param.last_version_time() > 
_cache_value->param().last_version_time()) {
             return true;
         }
+        if (up_param.last_version_time() == 
_cache_value->param().last_version_time() &&
+            up_param.partition_num() != _cache_value->param().partition_num()) 
{
+            return true;
+        }
         if (up_param.last_version() > _cache_value->param().last_version()) {
             return true;
         }
+        if (up_param.last_version() == _cache_value->param().last_version() &&
+            up_param.partition_num() != _cache_value->param().partition_num()) 
{
+            return true;
+        }
         return false;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index 980eb2d829..ab15096f0c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -141,12 +141,16 @@ public class CacheAnalyzer {
         public long latestPartitionId;
         public long latestVersion;
         public long latestTime;
+        public long partitionNum;
+        public long sumOfPartitionNum;
 
         public CacheTable() {
             olapTable = null;
             latestPartitionId = 0;
             latestVersion = 0;
             latestTime = 0;
+            partitionNum = 0;
+            sumOfPartitionNum = 0;
         }
 
         @Override
@@ -155,8 +159,8 @@ public class CacheAnalyzer {
         }
 
         public void debug() {
-            LOG.debug("table {}, partition id {}, ver {}, time {}", 
olapTable.getName(),
-                    latestPartitionId, latestVersion, latestTime);
+            LOG.debug("table {}, partition id {}, ver {}, time {}, partition 
num {}, sumOfPartitionNum: {}",
+                    olapTable.getName(), latestPartitionId, latestVersion, 
latestTime, partitionNum, sumOfPartitionNum);
         }
     }
 
@@ -216,7 +220,7 @@ public class CacheAnalyzer {
             }
             if (enablePartitionCache() && ((OlapScanNode) 
node).getSelectedPartitionNum() > 1
                     && selectStmt.hasGroupByClause()) {
-                LOG.debug("more than one partition scanned when qeury has agg, 
partition cache cannot use, queryid {}",
+                LOG.debug("more than one partition scanned when query has agg, 
partition cache cannot use, queryid {}",
                         DebugUtil.printId(queryId));
                 return CacheMode.None;
             }
@@ -226,6 +230,7 @@ public class CacheAnalyzer {
         MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
         Collections.sort(tblTimeList);
         latestTable = tblTimeList.get(0);
+        latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> 
item.partitionNum).sum();
         latestTable.debug();
 
         addAllViewStmt(selectStmt);
@@ -328,6 +333,7 @@ public class CacheAnalyzer {
         MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
         Collections.sort(tblTimeList);
         latestTable = tblTimeList.get(0);
+        latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> 
item.partitionNum).sum();
         latestTable.debug();
 
         addAllViewStmt((SetOperationStmt) parsedStmt);
@@ -382,6 +388,7 @@ public class CacheAnalyzer {
         MetricRepo.COUNTER_QUERY_OLAP_TABLE.increase(1L);
         Collections.sort(tblTimeList);
         latestTable = tblTimeList.get(0);
+        latestTable.sumOfPartitionNum = tblTimeList.stream().mapToLong(item -> 
item.partitionNum).sum();
         latestTable.debug();
 
         if (((LogicalPlanAdapter) 
parsedStmt).getStatementContext().getParsedStatement().isExplain()) {
@@ -576,6 +583,7 @@ public class CacheAnalyzer {
         CacheTable cacheTable = new CacheTable();
         OlapTable olapTable = node.getOlapTable();
         cacheTable.olapTable = olapTable;
+        cacheTable.partitionNum = node.getSelectedPartitionIds().size();
         for (Long partitionId : node.getSelectedPartitionIds()) {
             Partition partition = olapTable.getPartition(partitionId);
             if (partition.getVisibleVersionTime() >= cacheTable.latestTime) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
index 2cae79597a..665f47a793 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/RowBatchBuilder.java
@@ -112,7 +112,7 @@ public class RowBatchBuilder {
     }
 
     public InternalService.PUpdateCacheRequest buildSqlUpdateRequest(
-            String sql, long partitionKey, long lastVersion, long lastestTime) 
{
+            String sql, long partitionKey, long lastVersion, long lastestTime, 
long partitionNum) {
         if (updateRequest == null) {
             updateRequest = InternalService.PUpdateCacheRequest.newBuilder()
                     .setSqlKey(CacheProxy.getMd5(sql))
@@ -124,6 +124,7 @@ public class RowBatchBuilder {
                                 .setPartitionKey(partitionKey)
                                 .setLastVersion(lastVersion)
                                 .setLastVersionTime(lastestTime)
+                                .setPartitionNum(partitionNum)
                                 .build()).setDataSize(dataSize).addAllRows(
                                 rowList.stream().map(row -> 
ByteString.copyFrom(row))
                                         
.collect(Collectors.toList()))).build();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
index 04f0ed35cb..9135b453c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/SqlCache.java
@@ -55,13 +55,18 @@ public class SqlCache extends Cache {
         return cacheKey;
     }
 
+    public long getSumOfPartitionNum() {
+        return latestTable.sumOfPartitionNum;
+    }
+
     public InternalService.PFetchCacheResult getCacheData(Status status) {
         InternalService.PFetchCacheRequest request = 
InternalService.PFetchCacheRequest.newBuilder()
                 .setSqlKey(CacheProxy.getMd5(getSqlWithViewStmt()))
                 .addParams(InternalService.PCacheParam.newBuilder()
                         .setPartitionKey(latestTable.latestPartitionId)
                         .setLastVersion(latestTable.latestVersion)
-                        .setLastVersionTime(latestTable.latestTime))
+                        .setLastVersionTime(latestTable.latestTime)
+                        .setPartitionNum(latestTable.sumOfPartitionNum))
                 .build();
 
         InternalService.PFetchCacheResult cacheResult = 
proxy.fetchCache(request, 10000, status);
@@ -94,7 +99,7 @@ public class SqlCache extends Cache {
 
         InternalService.PUpdateCacheRequest updateRequest =
                 rowBatchBuilder.buildSqlUpdateRequest(getSqlWithViewStmt(), 
latestTable.latestPartitionId,
-                        latestTable.latestVersion, latestTable.latestTime);
+                        latestTable.latestVersion, latestTable.latestTime, 
latestTable.sumOfPartitionNum);
         if (updateRequest.getValuesCount() > 0) {
             CacheBeProxy proxy = new CacheBeProxy();
             Status status = new Status();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
index 4ccd783025..52ee9e009f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/PartitionCacheTest.java
@@ -1104,6 +1104,7 @@ public class PartitionCacheTest {
         Assert.assertEquals(cacheKey, "SELECT <slot 2> `eventdate` AS 
`eventdate`, <slot 3> count(`userid`) AS "
                 + "`count(``userid``)` FROM `testCluster:testDb`.`appevent` 
WHERE `eventdate` >= '2020-01-12' AND "
                 + "`eventdate` <= '2020-01-14' GROUP BY `eventdate`|");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1123,6 +1124,7 @@ public class PartitionCacheTest {
         String cacheKey = sqlCache.getSqlWithViewStmt();
         Types.PUniqueId sqlKey2 = CacheProxy.getMd5(cacheKey.replace("北京", 
"上海"));
         
Assert.assertNotEquals(CacheProxy.getMd5(sqlCache.getSqlWithViewStmt()), 
sqlKey2);
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1141,6 +1143,7 @@ public class PartitionCacheTest {
         Assert.assertEquals(cacheKey, "SELECT 
`testCluster:testDb`.`view1`.`eventdate` AS `eventdate`, 
`testCluster:testDb`.`view1`."
                 + "`count(`userid`)` AS `count(``userid``)` FROM 
`testCluster:testDb`.`view1`|select eventdate, COUNT(userid) "
                 + "FROM appevent WHERE eventdate>=\"2020-01-12\" and 
eventdate<=\"2020-01-14\" GROUP BY eventdate");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1159,6 +1162,7 @@ public class PartitionCacheTest {
         Assert.assertEquals(cacheKey, "SELECT * from testDb.view1"
                     + "|select eventdate, COUNT(userid) FROM appevent "
                     + "WHERE eventdate>=\"2020-01-12\" and 
eventdate<=\"2020-01-14\" GROUP BY eventdate");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1185,6 +1189,7 @@ public class PartitionCacheTest {
                 + "`userid` FROM (SELECT `view2`.`eventdate` AS `eventdate`, 
`view2`.`userid` AS `userid` FROM "
                 + "`testCluster:testDb`.`view2` view2 WHERE 
`view2`.`eventdate` >= '2020-01-12' AND `view2`.`eventdate` "
                 + "<= '2020-01-14') origin|select eventdate, userid FROM 
appevent");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
 
@@ -1214,6 +1219,7 @@ public class PartitionCacheTest {
                     + "    from testDb.view2 view2 \n"
                     + "    where view2.eventdate >=\"2020-01-12\" and 
view2.eventdate <= \"2020-01-14\"\n"
                     + ") origin" + "|select eventdate, userid FROM appevent");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1292,6 +1298,7 @@ public class PartitionCacheTest {
                 + "`testCluster:testDb`.`view4`.`count(`userid`)` AS 
`count(``userid``)` FROM `testCluster:testDb`.`view4`|select "
                 + "eventdate, COUNT(userid) FROM view2 WHERE 
eventdate>=\"2020-01-12\" and "
                 + "eventdate<=\"2020-01-14\" GROUP BY eventdate|select 
eventdate, userid FROM appevent");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1311,6 +1318,7 @@ public class PartitionCacheTest {
                     + "|select eventdate, COUNT(userid) FROM view2 "
                     + "WHERE eventdate>=\"2020-01-12\" and 
eventdate<=\"2020-01-14\" GROUP BY eventdate"
                     + "|select eventdate, userid FROM appevent");
+        Assert.assertEquals(selectedPartitionIds.size(), 
sqlCache.getSumOfPartitionNum());
     }
 
     @Test
@@ -1328,10 +1336,12 @@ public class PartitionCacheTest {
         );
         ArrayList<Long> selectedPartitionIds
                 = Lists.newArrayList(20200112L, 20200113L, 20200114L, 
20200115L);
-        List<ScanNode> scanNodes = 
Lists.newArrayList(createProfileScanNode(selectedPartitionIds));
+        ScanNode scanNode = createProfileScanNode(selectedPartitionIds);
+        List<ScanNode> scanNodes = Lists.newArrayList(scanNode, scanNode, 
scanNode);
         CacheAnalyzer ca = new CacheAnalyzer(context, parseStmt, scanNodes);
         ca.checkCacheMode(0);
         Assert.assertEquals(ca.getCacheMode(), CacheMode.Sql);
+        Assert.assertEquals(selectedPartitionIds.size() * 3, ((SqlCache) 
ca.getCache()).getSumOfPartitionNum());
     }
 
     @Test
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 8b698b9948..8af26a317e 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -295,6 +295,7 @@ message PCacheParam {
     required int64 partition_key = 1;
     optional int64 last_version = 2;
     optional int64 last_version_time = 3;
+    optional int64 partition_num = 4;
 };
 
 message PCacheValue {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to