This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new be2932accb6 [fix](cloud) Support clean tablet file cache when tablet drop (#46390)
be2932accb6 is described below

commit be2932accb68b20a65f0404793119f415deaa31d
Author: deardeng <deng...@selectdb.com>
AuthorDate: Mon Jan 13 11:25:50 2025 +0800

    [fix](cloud) Support clean tablet file cache when tablet drop (#46390)
---
 be/src/agent/task_worker_pool.cpp                  |  43 ++++++-
 be/src/cloud/cloud_tablet.cpp                      |   6 +-
 be/src/cloud/cloud_tablet.h                        |   4 +-
 .../org/apache/doris/regression/suite/Suite.groovy |  91 ++++++++++++++
 .../org/apache/doris/regression/util/Http.groovy   |   2 +-
 .../tablets/test_clean_stale_rs_file_cache.groovy  | 129 ++++++++++++++++++++
 .../test_clean_tablet_when_drop_force_table.groovy | 132 ++++++++++++++-------
 .../test_clean_tablet_when_rebalance.groovy        | 123 ++++++++++---------
 8 files changed, 424 insertions(+), 106 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 40b84facecf..329d94eaf38 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1659,11 +1659,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, 
const TAgentTaskRequest& r
                 .tag("tablet_id", drop_tablet_req.tablet_id);
         return;
     });
-    // 1. erase lru from tablet mgr
-    // TODO(dx) clean tablet file cache
-    // get tablet's info(such as cachekey, tablet id, rsid)
+    MonotonicStopWatch watch;
+    watch.start();
+    auto weak_tablets = engine.tablet_mgr().get_weak_tablets();
+    std::ostringstream rowset_ids_stream;
+    bool found = false;
+    for (auto& weak_tablet : weak_tablets) {
+        auto tablet = weak_tablet.lock();
+        if (tablet == nullptr) {
+            continue;
+        }
+        if (tablet->tablet_id() != drop_tablet_req.tablet_id) {
+            continue;
+        }
+        found = true;
+        auto clean_rowsets = tablet->get_snapshot_rowset(true);
+        // Get first 10 rowset IDs as comma-separated string, just for log
+        int count = 0;
+        for (const auto& rowset : clean_rowsets) {
+            if (count >= 10) break;
+            if (count > 0) {
+                rowset_ids_stream << ",";
+            }
+            rowset_ids_stream << rowset->rowset_id().to_string();
+            count++;
+        }
+
+        CloudTablet::recycle_cached_data(std::move(clean_rowsets));
+        break;
+    }
+
+    if (!found) {
+        LOG(WARNING) << "tablet not found when dropping tablet_id=" << 
drop_tablet_req.tablet_id
+                     << ", cost " << 
static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
+        return;
+    }
+
     engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id);
-    // 2. gen clean file cache task
+    LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id
+              << " and clean file cache first 10 rowsets {" << 
rowset_ids_stream.str() << "}, cost "
+              << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)";
     return;
 }
 
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index f2f21162bf0..b8dd0eae4b7 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const 
std::vector<RowsetSharedPtr>& to_delete,
 
 uint64_t CloudTablet::delete_expired_stale_rowsets() {
     std::vector<RowsetSharedPtr> expired_rowsets;
+    // ATTN: trick, Use stale_rowsets to temporarily increase the reference 
count of the rowset shared pointer in _stale_rs_version_map so that in the 
recycle_cached_data function, it checks if the reference count is 2.
+    std::vector<RowsetSharedPtr> stale_rowsets;
     int64_t expired_stale_sweep_endtime =
             ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
     std::vector<std::string> version_to_delete;
@@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() {
                 auto rs_it = _stale_rs_version_map.find(v_ts->version());
                 if (rs_it != _stale_rs_version_map.end()) {
                     expired_rowsets.push_back(rs_it->second);
+                    stale_rowsets.push_back(rs_it->second);
                     LOG(INFO) << "erase stale rowset, tablet_id=" << 
tablet_id()
                               << " rowset_id=" << 
rs_it->second->rowset_id().to_string()
                               << " version=" << rs_it->first.to_string();
@@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const 
std::vector<RowsetSharedPtr>& rowset
 
     if (config::enable_file_cache) {
         for (const auto& rs : rowsets) {
-            if (rs.use_count() >= 1) {
+            // rowsets and tablet._rs_version_map each hold a rowset 
shared_ptr, so at this point, the reference count of the shared_ptr is at least 
2.
+            if (rs.use_count() > 2) {
                 LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " 
has "
                              << rs.use_count()
                              << " references. File Cache won't be recycled 
when query is using it.";
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index c876518d868..2aa031ad503 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -210,12 +210,12 @@ public:
 
     void build_tablet_report_info(TTabletInfo* tablet_info);
 
+    static void recycle_cached_data(const std::vector<RowsetSharedPtr>& 
rowsets);
+
 private:
     // FIXME(plat1ko): No need to record base size if rowsets are ordered by 
version
     void update_base_size(const Rowset& rs);
 
-    static void recycle_cached_data(const std::vector<RowsetSharedPtr>& 
rowsets);
-
     Status sync_if_not_running();
 
     CloudStorageEngine& _engine;
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 5887fd607d4..2b121cb8f44 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -49,6 +49,7 @@ import org.apache.doris.regression.action.HttpCliAction
 import org.apache.doris.regression.util.DataUtils
 import org.apache.doris.regression.util.JdbcUtils
 import org.apache.doris.regression.util.Hdfs
+import org.apache.doris.regression.util.Http
 import org.apache.doris.regression.util.SuiteUtils
 import org.apache.doris.regression.util.DebugPoint
 import org.apache.doris.regression.RunMode
@@ -2831,4 +2832,94 @@ class Suite implements GroovyInterceptable {
         assertEquals(re_fe, re_be)
         assertEquals(re_fe, re_no_fold)
     }
+
+    // Build a BackendId -> Host map from the rows returned by SHOW BACKENDS.
+    def backendIdToHost = { ->
+        def backendRows = sql_return_maparray """SHOW BACKENDS"""
+        return backendRows.collectEntries { row -> [(row.BackendId): row.Host] }
+    }
+
+    // Collect tablet_id (as String) -> host for every tablet that each backend in
+    // `bes` reports through its /tablets_json HTTP endpoint. Each element of `bes`
+    // must expose `host` and `httpPort` (callers pass cluster.getAllBackends()).
+    def getTabletAndBeHostFromBe = { bes ->
+        def ret = [:]
+        bes.each { be ->
+            // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
+            def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data
+            data.tablets.each { tablet ->
+                ret[tablet.tablet_id as String] = data.host
+            }
+        }
+        ret
+    }
+
+    // Return TabletId -> [BackendId, backend host] for every tablet of `table`
+    // as listed by SHOW TABLETS; hosts are resolved through backendIdToHost.
+    def getTabletAndBeHostFromFe = { table ->
+        def tabletRows = sql_return_maparray """SHOW TABLETS FROM $table"""
+        def hostByBackendId = backendIdToHost.call()
+        return tabletRows.collectEntries { row ->
+            def backendId = row.BackendId
+            [(row.TabletId): [backendId, hostByBackendId[backendId]]]
+        }
+    }
+
+    // Fetch the MetaRowsetKey value (rowset meta) of `version` in tablet `tabletId`
+    // from the meta service at `msHttpPort`; `check_func` receives (respCode, body).
+    // curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2'
+    def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func ->
+        def rowsetMetaUri = "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}"
+        httpTest {
+            endpoint msHttpPort
+            op "get"
+            uri rowsetMetaUri
+            check check_func
+        }
+    }
+
+    // Return the values the BE's /api/file_cache?op=hash endpoint yields for each
+    // segment file of the rowset at `version` in tablet `tabletId`. Segment file
+    // names ("<rowset_id_v2>_<i>.dat") are derived from the rowset meta read from
+    // the meta service. `beHttpPort` is "host:port" of the BE HTTP server.
+    def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version ->
+        def hashValues = []
+        def segmentFiles = []
+        getSegmentFilesFromMs(msHttpPort, tabletId, version) {
+            respCode, body ->
+                def json = parseJson(body)
+                logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json)
+                // e.g. {"rowset_id":"0","partition_id":"27695","tablet_id":"27700",...,"num_segments":"1","rowset_id_v2":"020...",...}
+                // Only num_segments and rowset_id_v2 are used below.
+                def segmentNum = json.num_segments as int
+                def rowsetId = json.rowset_id_v2 as String
+                segmentFiles = (0..<segmentNum).collect { i -> "${rowsetId}_${i}.dat" }
+        }
+
+        segmentFiles.each {
+            // curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat'
+            def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true)
+            // {"hash":"2b79c649a1766dad371054ee168f0574"}
+            logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data)
+            hashValues << data.hash
+        }
+        hashValues
+    }
+
+    // Map each BE host to the file-cache hashes of the segment files at `version`
+    // for every tablet of `table`, using the tablet placement reported by the FE.
+    // `beHttpPort` defaults to 8040 (the value previously hard-coded here); pass
+    // the actual port when the BEs serve HTTP on a different one.
+    def getTabletFileCacheDirFromBe = { msHttpPort, table, version, beHttpPort = 8040 ->
+        // beHost -> [hash, ...]; a host gets an entry only if it has at least one hash
+        def beHostToHashFile = [:]
+
+        def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table)
+        getTabletsAndHostFromFe.each {
+            def beHost = it.Value[1]
+            def tabletId = it.Key
+            def hashRet = getRowsetFileCacheDirFromBe(beHost + ":" + beHttpPort, msHttpPort, tabletId, version)
+            hashRet.each { hashFile ->
+                if (beHostToHashFile.containsKey(beHost)) {
+                    beHostToHashFile[beHost].add(hashFile)
+                } else {
+                    beHostToHashFile[beHost] = [hashFile]
+                }
+            }
+        }
+        beHostToHashFile
+    }
+
 }
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
index cd688a1fcfc..2a63f8763df 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy
@@ -53,7 +53,7 @@ class Http {
         conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for 
root
         def code = conn.responseCode
         def text = conn.content.text
-        logger.info("http post url=${url}, isJson=${isJson}, response 
code=${code}, text=${text}")
+        logger.info("http get url=${url}, isJson=${isJson}, response 
code=${code}, text=${text}")
         Assert.assertEquals(200, code)
         if (isJson) {
             def json = new JsonSlurper()
diff --git 
a/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy 
b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy
new file mode 100644
index 00000000000..8d41939981a
--- /dev/null
+++ 
b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.Http
+
+// Checks that once compaction merges the loaded rowsets and the stale rowsets
+// are swept, the file-cache entries of the version 2 and version 3 rowsets are
+// gone from the BE's file_cache directory.
+suite('test_clean_stale_rs_file_cache', 'docker') {
+    if (!isCloudMode()) {
+        return;
+    }
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=1',
+        'cloud_tablet_rebalancer_interval_second=1',
+        'sys_log_verbose_modules=org',
+        'heartbeat_interval_second=1'
+    ]
+    // Small compaction / stale-sweep thresholds so that compaction and stale
+    // rowset cleanup can complete within the 80s wait further below.
+    options.beConfigs += [
+        'report_tablet_interval_seconds=1',
+        'cumulative_compaction_min_deltas=5',
+        'tablet_rowset_stale_sweep_by_size=false',
+        'tablet_rowset_stale_sweep_time_sec=60',
+        'vacuum_stale_rowsets_interval_s=10'
+    ]
+    options.setFeNum(1)
+    options.setBeNum(1)
+    options.cloudMode = true
+
+    def table = "test_clean_stale_rs_file_cache"
+
+    docker(options) {
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        sql """CREATE TABLE $table (
+            `k1` int(11) NULL,
+            `k2` int(11) NULL,
+            `v1` varchar(2048)
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES (
+            "replication_num"="1"
+            );
+        """
+        // version 2
+        sql """
+            insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
+        """
+        def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2)
+        // version 3
+        sql """
+            insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3')
+        """
+        // Must request version 3 here: asking for version 2 again would leave the
+        // cache files of the rowset just written unchecked.
+        def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3)
+        // version 4
+        sql """
+            insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3')
+        """
+        // version 5
+        sql """
+            insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3')
+        """
+        // version 6
+        sql """
+            insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3')
+        """
+
+        // Union of the version 2 and version 3 cache hashes, grouped by BE host.
+        def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles ->
+            [(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles]
+        }
+        for (int i = 0; i < 5; i++) {
+            sql """
+                select count(*) from $table
+            """
+        }
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir)
+        // wait compaction finish, and vacuum_stale_rowsets work
+        sleep(80 * 1000)
+
+        // check cache file has been deleted
+        beforeGetFromFe.each {
+            def backendId = it.Value[0]
+            def backendHost = it.Value[1]
+            def be = cluster.getBeByBackendId(backendId.toLong())
+            def dataPath = new File("${be.path}/storage/file_cache")
+            def subDirs = []
+
+            // Recursively collect the names of all directories under file_cache.
+            def collectDirs
+            collectDirs = { File dir ->
+                if (dir.exists()) {
+                    dir.eachDir { subDir ->
+                        subDirs << subDir.name
+                        collectDirs(subDir)
+                    }
+                }
+            }
+
+            collectDirs(dataPath)
+            logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs)
+            def cacheDir = mergedCacheDir[backendHost]
+
+            // No cache directory may still start with the hash of a swept segment file.
+            cacheDir.each { hashFile ->
+                assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) },
+                "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " +
+                "Matching subdir found in: ${subDirs}")
+            }
+        }
+
+    }
+}
diff --git 
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
 
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
index 4dc847d603a..a65f59f85a1 100644
--- 
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
+++ 
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 import org.apache.doris.regression.suite.ClusterOptions
-import org.apache.doris.regression.util.Http
+
 
 suite('test_clean_tablet_when_drop_force_table', 'docker') {
     if (!isCloudMode()) {
@@ -31,51 +31,22 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') {
         'rehash_tablet_after_be_dead_seconds=5'
     ]
     options.beConfigs += [
-        'report_tablet_interval_seconds=1'
+        'report_tablet_interval_seconds=1',
+        'write_buffer_size=10240',
+        'write_buffer_size_for_agg=10240'
     ]
     options.setFeNum(3)
     options.setBeNum(3)
     options.cloudMode = true
     options.enableDebugPoints()
     
-    def backendIdToHost = { ->
-        def spb = sql_return_maparray """SHOW BACKENDS"""
-        def beIdToHost = [:]
-        spb.each {
-            beIdToHost[it.BackendId] = it.Host
-        }
-        beIdToHost 
-    }
-
-    def getTabletAndBeHostFromFe = { table ->
-        def result = sql_return_maparray """SHOW TABLETS FROM $table"""
-        def bes = backendIdToHost.call()
-        // tablet : host
-        def ret = [:]
-        result.each {
-            ret[it.TabletId] = bes[it.BackendId]
-        }
-        ret
-    }
-
-    def getTabletAndBeHostFromBe = { ->
-        def bes = cluster.getAllBackends()
-        def ret = [:]
-        bes.each { be ->
-            // 
{"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
-            def data = 
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all";, true).data
-            def tablets = data.tablets.collect { it.tablet_id as String }
-            tablets.each{
-                ret[it] = data.host
-            }
-        }
-        ret
-    }
-
     def testCase = { table, waitTime, useDp=false-> 
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
         sql """CREATE TABLE $table (
             `k1` int(11) NULL,
-            `k2` int(11) NULL
+            `k2` int(11) NULL,
+            `v1` VARCHAR(2048)
             )
             DUPLICATE KEY(`k1`, `k2`)
             COMMENT 'OLAP'
@@ -84,23 +55,60 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') {
             "replication_num"="1"
             );
         """
+        def random = new Random()
+        def generateRandomString = { int length ->
+            random.with {
+                def chars = ('A'..'Z').collect() + ('a'..'z').collect() + 
('0'..'9').collect()
+                (1..length).collect { chars[nextInt(chars.size())] }.join('')
+            }
+        }
+        def valuesList = (1..30000).collect { i -> 
+            def randomStr = generateRandomString(2000)
+            "($i, $i, '$randomStr')"
+        }.join(", ")
+        sql """
+            set global max_allowed_packet = 1010241024
+        """
+
+        context.reconnectFe()
         sql """
-            insert into $table values (1, 1), (2, 2), (3, 3)
+            insert into $table values ${valuesList}
         """
 
         for (int i = 0; i < 5; i++) {
             sql """
-                select * from $table
+                select count(*) from $table
             """
         }
 
+        valuesList = (30001..60000).collect { i -> 
+            def randomStr = generateRandomString(2000)
+            "($i, $i, '$randomStr')"
+        }.join(", ")
+        sql """
+            set global max_allowed_packet = 1010241024
+        """
+        context.reconnectFe()
+        sql """
+            insert into $table values ${valuesList}
+        """
+
         // before drop table force
         def beforeGetFromFe = getTabletAndBeHostFromFe(table)
-        def beforeGetFromBe = getTabletAndBeHostFromBe.call()
-        logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, 
beforeGetFromBe)
+        def beforeGetFromBe = 
getTabletAndBeHostFromBe(cluster.getAllBackends())
+        // version 2
+        def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 
2)
+        // version 3
+        def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 
3)
+
+        def mergedCacheDir = cacheDirVersion2 + 
cacheDirVersion3.collectEntries { host, hashFiles ->
+            [(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + 
hashFiles) : hashFiles]
+        }
+        
+        logger.info("fe tablets {}, be tablets {}, cache dir {}", 
beforeGetFromFe, beforeGetFromBe, mergedCacheDir)
         beforeGetFromFe.each {
             assertTrue(beforeGetFromBe.containsKey(it.Key))
-            assertEquals(beforeGetFromBe[it.Key], it.Value)
+            assertEquals(beforeGetFromBe[it.Key], it.Value[1])
         }
         if (useDp) {
             
GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed")
@@ -119,16 +127,50 @@ suite('test_clean_tablet_when_drop_force_table', 
'docker') {
         }
         def start = System.currentTimeMillis() / 1000
         // tablet can't find in be 
-        dockerAwaitUntil(50) {
-            def beTablets = getTabletAndBeHostFromBe.call().keySet()
+        dockerAwaitUntil(500) {
+            def beTablets = 
getTabletAndBeHostFromBe(cluster.getAllBackends()).keySet()
             logger.info("before drop tablets {}, after tablets {}", 
beforeGetFromFe, beTablets)
-            beforeGetFromFe.keySet().every { 
!getTabletAndBeHostFromBe.call().containsKey(it) }
+            beforeGetFromFe.keySet().every { 
!getTabletAndBeHostFromBe(cluster.getAllBackends()).containsKey(it) }
         }
         logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 
1000 - start)
         assertTrue(System.currentTimeMillis() / 1000 - start > waitTime)
         if (useDp) {
             futrue.get()
         }
+
+        sleep(25 * 1000)
+
+        // check cache file has been deleted
+        beforeGetFromFe.each {
+            def tabletId = it.Key
+            def backendId = it.Value[0]
+            def backendHost = it.Value[1]
+            def be = cluster.getBeByBackendId(backendId.toLong())
+            def dataPath = new File("${be.path}/storage/file_cache")
+            def subDirs = []
+            
+            def collectDirs
+            collectDirs = { File dir ->
+                if (dir.exists()) {
+                    dir.eachDir { subDir ->
+                        subDirs << subDir.name
+                        collectDirs(subDir) 
+                    }
+                }
+            }
+            
+            collectDirs(dataPath)
+            logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs)
+            def cacheDir = mergedCacheDir[backendHost]
+
+            // add check
+            cacheDir.each { hashFile ->
+                assertFalse(subDirs.any { subDir -> 
subDir.startsWith(hashFile) }, 
+                "Found unexpected cache file pattern ${hashFile} in BE 
${backendHost}'s file_cache directory. " + 
+                "Matching subdir found in: ${subDirs}")
+            }
+        }
+
     }
 
     docker(options) {
diff --git 
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
 
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
index 4a44b317cc2..151de976a83 100644
--- 
a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
+++ 
b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy
@@ -32,7 +32,9 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
     ]
     options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime")
     options.beConfigs += [
-        'report_tablet_interval_seconds=1'
+        'report_tablet_interval_seconds=1',
+        'write_buffer_size=10240',
+        'write_buffer_size_for_agg=10240'
     ]
     options.setFeNum(3)
     options.setBeNum(3)
@@ -42,56 +44,22 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
     def choseDeadBeIndex = 1
     def table = "test_clean_tablet_when_rebalance"
 
-    def backendIdToHost = { ->
-        def spb = sql_return_maparray """SHOW BACKENDS"""
-        def beIdToHost = [:]
-        spb.each {
-            beIdToHost[it.BackendId] = it.Host
-        }
-        beIdToHost 
-    }
-
-    def getTabletAndBeHostFromFe = { ->
-        def result = sql_return_maparray """SHOW TABLETS FROM $table"""
-        def bes = backendIdToHost.call()
-        // tablet : host
-        def ret = [:]
-        result.each {
-            ret[it.TabletId] = bes[it.BackendId]
-        }
-        ret
-    }
-
-    def getTabletAndBeHostFromBe = { ->
-        def bes = cluster.getAllBackends()
-        def ret = [:]
-        bes.each { be ->
-            // 
{"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3}
-            def data = 
Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all";, true).data
-            def tablets = data.tablets.collect { it.tablet_id as String }
-            tablets.each{
-                ret[it] = data.host
-            }
-        }
-        ret
-    }
-
-    def testCase = { deadTime -> 
+    def testCase = { deadTime, mergedCacheDir -> 
         boolean beDeadLong = deadTime > rehashTime ? true : false
         logger.info("begin exec beDeadLong {}", beDeadLong)
 
         for (int i = 0; i < 5; i++) {
             sql """
-                select * from $table
+                select count(*) from $table
             """
         }
 
-        def beforeGetFromFe = getTabletAndBeHostFromFe()
-        def beforeGetFromBe = getTabletAndBeHostFromBe.call()
+        def beforeGetFromFe = getTabletAndBeHostFromFe(table)
+        def beforeGetFromBe = 
getTabletAndBeHostFromBe(cluster.getAllBackends())
         logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe, 
beforeGetFromBe)
         beforeGetFromFe.each {
             assertTrue(beforeGetFromBe.containsKey(it.Key))
-            assertEquals(beforeGetFromBe[it.Key], it.Value)
+            assertEquals(beforeGetFromBe[it.Key], it.Value[1])
         }
 
         cluster.stopBackends(choseDeadBeIndex)
@@ -120,24 +88,72 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
             bes.size() == (beDeadLong ? 2 : 3)
         }
         for (int i = 0; i < 5; i++) {
+            sleep(2000)
             sql """
-                select * from $table
+                select count(*) from $table
             """
-            sleep(1000)
         }
-        beforeGetFromFe = getTabletAndBeHostFromFe()
-        beforeGetFromBe = getTabletAndBeHostFromBe.call()
-        logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, 
beforeGetFromBe)
-        beforeGetFromFe.each {
-            assertTrue(beforeGetFromBe.containsKey(it.Key))
-            assertEquals(beforeGetFromBe[it.Key], it.Value)
+        def afterGetFromFe = getTabletAndBeHostFromFe(table)
+        def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends())
+        logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, 
afterGetFromBe)
+        afterGetFromFe.each {
+            assertTrue(afterGetFromBe.containsKey(it.Key))
+            assertEquals(afterGetFromBe[it.Key], it.Value[1])
+        }
+
+        // TODO(freemandealer)
+        // Once the freemandealer implements file cache cleanup during 
restart, enabling lines 107 to 145 will allow testing to confirm that after the 
rebalance, the tablet file cache on the BE will be cleared. In the current 
implementation, after restarting the BE and triggering the rebalance, the 
tablets in the tablet manager will be cleared, but the file cache cannot be 
cleaned up.
+        /*
+        if (beDeadLong) {
+            // check tablet file cache has been deleted
+            // after fe tablets {10309=[10003, 175.41.51.3], 10311=[10002, 
175.41.51.2], 10313=[10003, 175.41.51.3]},
+            afterGetFromFe.each {
+                logger.info("tablet_id {}, before host {}, after host {}", 
it.Key, beforeGetFromFe[it.Key][1], it.Value[1])
+                if (beforeGetFromFe[it.Key][1] == it.Value[1]) {
+                    return
+                }
+                logger.info("tablet_id {} has been reblanced from {} to {}", 
it.Key, beforeGetFromFe[it.Key][1], it.Value[1])
+                // check before tablet file cache dir has been droped
+
+                def tabletId = it.Key
+                def backendId = beforeGetFromFe[it.Key][0]
+                def backendHost = beforeGetFromFe[it.Key][1]
+                def be = cluster.getBeByBackendId(backendId.toLong())
+                def dataPath = new File("${be.path}/storage/file_cache")
+                def subDirs = []
+                
+                def collectDirs
+                collectDirs = { File dir ->
+                    if (dir.exists()) {
+                        dir.eachDir { subDir ->
+                            subDirs << subDir.name
+                            collectDirs(subDir) 
+                        }
+                    }
+                }
+                
+                collectDirs(dataPath)
+                logger.info("BE {} file_cache subdirs: {}", backendHost, 
subDirs)
+                def cacheDir = mergedCacheDir[backendHost]
+
+                // add check
+                cacheDir.each { hashFile ->
+                    assertFalse(subDirs.any { subDir -> 
subDir.startsWith(hashFile) }, 
+                    "Found unexpected cache file pattern ${hashFile} in BE 
${backendHost}'s file_cache directory. " + 
+                    "Matching subdir found in: ${subDirs}")
+                }
+            }
         }
+        */
     }
 
     docker(options) {
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
         sql """CREATE TABLE $table (
             `k1` int(11) NULL,
-            `k2` int(11) NULL
+            `k2` int(11) NULL,
+            `v1` varchar(2048)
             )
             DUPLICATE KEY(`k1`, `k2`)
             COMMENT 'OLAP'
@@ -147,12 +163,13 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
             );
         """
         sql """
-            insert into $table values (1, 1), (2, 2), (3, 3)
+            insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3')
         """
-        // 'rehash_tablet_after_be_dead_seconds=10'
+        def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 
2)
+        // 'rehash_tablet_after_be_dead_seconds=100'
         // be-1 dead, but not dead for a long time
-        testCase(5)
+        testCase(5, cacheDirVersion2)
         // be-1 dead, and dead for a long time
-        testCase(200)
+        testCase(200, cacheDirVersion2)
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to