This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new be2932accb6 [fix](cloud) Support clean tablet file cache when tablet drop (#46390) be2932accb6 is described below commit be2932accb68b20a65f0404793119f415deaa31d Author: deardeng <deng...@selectdb.com> AuthorDate: Mon Jan 13 11:25:50 2025 +0800 [fix](cloud) Support clean tablet file cache when tablet drop (#46390) --- be/src/agent/task_worker_pool.cpp | 43 ++++++- be/src/cloud/cloud_tablet.cpp | 6 +- be/src/cloud/cloud_tablet.h | 4 +- .../org/apache/doris/regression/suite/Suite.groovy | 91 ++++++++++++++ .../org/apache/doris/regression/util/Http.groovy | 2 +- .../tablets/test_clean_stale_rs_file_cache.groovy | 129 ++++++++++++++++++++ .../test_clean_tablet_when_drop_force_table.groovy | 132 ++++++++++++++------- .../test_clean_tablet_when_rebalance.groovy | 123 ++++++++++--------- 8 files changed, 424 insertions(+), 106 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 40b84facecf..329d94eaf38 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -1659,11 +1659,46 @@ void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& r .tag("tablet_id", drop_tablet_req.tablet_id); return; }); - // 1. erase lru from tablet mgr - // TODO(dx) clean tablet file cache - // get tablet's info(such as cachekey, tablet id, rsid) + MonotonicStopWatch watch; + watch.start(); + auto weak_tablets = engine.tablet_mgr().get_weak_tablets(); + std::ostringstream rowset_ids_stream; + bool found = false; + for (auto& weak_tablet : weak_tablets) { + auto tablet = weak_tablet.lock(); + if (tablet == nullptr) { + continue; + } + if (tablet->tablet_id() != drop_tablet_req.tablet_id) { + continue; + } + found = true; + auto clean_rowsets = tablet->get_snapshot_rowset(true); + // Get first 10 rowset IDs as comma-separated string, just for log + int count = 0; + for (const auto& rowset : clean_rowsets) { + if (count >= 10) break; + if (count > 0) { + rowset_ids_stream << ","; + } + rowset_ids_stream << rowset->rowset_id().to_string(); + count++; + } + + CloudTablet::recycle_cached_data(std::move(clean_rowsets)); + break; + } + + if (!found) { + LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id + << ", cost " << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)"; + return; + } + engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); - // 2. gen clean file cache task + LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id + << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost " + << static_cast<int64_t>(watch.elapsed_time()) / 1e9 << "(s)"; return; } diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index f2f21162bf0..b8dd0eae4b7 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -386,6 +386,8 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, uint64_t CloudTablet::delete_expired_stale_rowsets() { std::vector<RowsetSharedPtr> expired_rowsets; + // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2. + std::vector<RowsetSharedPtr> stale_rowsets; int64_t expired_stale_sweep_endtime = ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec; std::vector<std::string> version_to_delete; @@ -409,6 +411,7 @@ uint64_t CloudTablet::delete_expired_stale_rowsets() { auto rs_it = _stale_rs_version_map.find(v_ts->version()); if (rs_it != _stale_rs_version_map.end()) { expired_rowsets.push_back(rs_it->second); + stale_rowsets.push_back(rs_it->second); LOG(INFO) << "erase stale rowset, tablet_id=" << tablet_id() << " rowset_id=" << rs_it->second->rowset_id().to_string() << " version=" << rs_it->first.to_string(); @@ -456,7 +459,8 @@ void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowset if (config::enable_file_cache) { for (const auto& rs : rowsets) { - if (rs.use_count() >= 1) { + // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2. + if (rs.use_count() > 2) { LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() << " references. File Cache won't be recycled when query is using it."; diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index c876518d868..2aa031ad503 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -210,12 +210,12 @@ public: void build_tablet_report_info(TTabletInfo* tablet_info); + static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets); + private: // FIXME(plat1ko): No need to record base size if rowsets are ordered by version void update_base_size(const Rowset& rs); - static void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets); - Status sync_if_not_running(); CloudStorageEngine& _engine; diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 5887fd607d4..2b121cb8f44 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -49,6 +49,7 @@ import org.apache.doris.regression.action.HttpCliAction import org.apache.doris.regression.util.DataUtils import org.apache.doris.regression.util.JdbcUtils import org.apache.doris.regression.util.Hdfs +import org.apache.doris.regression.util.Http import org.apache.doris.regression.util.SuiteUtils import org.apache.doris.regression.util.DebugPoint import org.apache.doris.regression.RunMode @@ -2831,4 +2832,94 @@ class Suite implements GroovyInterceptable { assertEquals(re_fe, re_be) assertEquals(re_fe, re_no_fold) } + + def backendIdToHost = { -> + def spb = sql_return_maparray """SHOW BACKENDS""" + def beIdToHost = [:] + spb.each { + beIdToHost[it.BackendId] = it.Host + } + beIdToHost + } + + def getTabletAndBeHostFromBe = { bes -> + def ret = [:] + bes.each { be -> + // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} + def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data + def tablets = data.tablets.collect { it.tablet_id as String } + tablets.each{ + ret[it] = data.host + } + } + ret + } + + def getTabletAndBeHostFromFe = { table -> + def result = sql_return_maparray """SHOW TABLETS FROM $table""" + def bes = backendIdToHost.call() + // tablet : [backendId, host] + def ret = [:] + result.each { + ret[it.TabletId] = [it.BackendId, bes[it.BackendId]] + } + ret + } + + // get rowset_id segment_id from ms + // curl '175.40.101.1:5000/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=27700&version=2' + def getSegmentFilesFromMs = { msHttpPort, tabletId, version, check_func -> + httpTest { + endpoint msHttpPort + op "get" + uri "/MetaService/http/get_value?token=greedisgood9999&unicode&key_type=MetaRowsetKey&instance_id=default_instance_id&tablet_id=${tabletId}&version=${version}" + check check_func + } + } + + def getRowsetFileCacheDirFromBe = { beHttpPort, msHttpPort, tabletId, version -> + def hashValues = [] + def segmentFiles = [] + getSegmentFilesFromMs(msHttpPort, tabletId, version) { + respCode, body -> + def json = parseJson(body) + logger.info("get tablet {} version {} from ms, response {}", tabletId, version, json) + // {"rowset_id":"0","partition_id":"27695","tablet_id":"27700","txn_id":"7057526525952","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"895","data_disk_size":"895","index_disk_size":"0","empty":false,"load_id":{"hi":"-1646598626735601581","lo":"-6677682539881484579"},"delete_flag":false,"creation_time":"1736153402","num_segments":"1","rowset_id_v2":"020 [...] + def segmentNum = json.num_segments as int + def rowsetId = json.rowset_id_v2 as String + segmentFiles = (0..<segmentNum).collect { i -> "${rowsetId}_${i}.dat" } + } + + segmentFiles.each { + // curl '175.40.51.3:8040/api/file_cache?op=hash&value=0200000000000004694889e84c76391cfd52ec7db0a483ba_0.dat' + def data = Http.GET("http://${beHttpPort}/api/file_cache?op=hash&value=${it}", true) + // {"hash":"2b79c649a1766dad371054ee168f0574"} + logger.info("get tablet {} segmentFile {}, response {}", tabletId, it, data) + hashValues << data.hash + } + hashValues + } + + // get table's tablet file cache + def getTabletFileCacheDirFromBe = { msHttpPort, table, version -> + // beHost HashFile + def beHostToHashFile = [:] + + def getTabletsAndHostFromFe = getTabletAndBeHostFromFe(table) + getTabletsAndHostFromFe.each { + def beHost = it.Value[1] + def tabletId = it.Key + def hashRet = getRowsetFileCacheDirFromBe(beHost + ":8040", msHttpPort, tabletId, version) + hashRet.each { + def hashFile = it + if (beHostToHashFile.containsKey(beHost)) { + beHostToHashFile[beHost].add(hashFile) + } else { + beHostToHashFile[beHost] = [hashFile] + } + } + } + beHostToHashFile + } + } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy index cd688a1fcfc..2a63f8763df 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/util/Http.groovy @@ -53,7 +53,7 @@ class Http { conn.setRequestProperty('Authorization', 'Basic cm9vdDo=') //token for root def code = conn.responseCode def text = conn.content.text - logger.info("http post url=${url}, isJson=${isJson}, response code=${code}, text=${text}") + logger.info("http get url=${url}, isJson=${isJson}, response code=${code}, text=${text}") Assert.assertEquals(200, code) if (isJson) { def json = new JsonSlurper() diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy new file mode 100644 index 00000000000..8d41939981a --- /dev/null +++ b/regression-test/suites/cloud_p0/tablets/test_clean_stale_rs_file_cache.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.Http + +suite('test_clean_stale_rs_file_cache', 'docker') { + if (!isCloudMode()) { + return; + } + def options = new ClusterOptions() + options.feConfigs += [ + 'cloud_cluster_check_interval_second=1', + 'cloud_tablet_rebalancer_interval_second=1', + 'sys_log_verbose_modules=org', + 'heartbeat_interval_second=1' + ] + options.beConfigs += [ + 'report_tablet_interval_seconds=1', + 'cumulative_compaction_min_deltas=5', + 'tablet_rowset_stale_sweep_by_size=false', + 'tablet_rowset_stale_sweep_time_sec=60', + 'vacuum_stale_rowsets_interval_s=10' + ] + options.setFeNum(1) + options.setBeNum(1) + options.cloudMode = true + + def table = "test_clean_stale_rs_file_cache" + + docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort + sql """CREATE TABLE $table ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `v1` varchar(2048) + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num"="1" + ); + """ + // version 2 + sql """ + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') + """ + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 3 + sql """ + insert into $table values (10, 1, 'v1'), (20, 2, 'v2'), (30, 3, 'v3') + """ + def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 4 + sql """ + insert into $table values (100, 1, 'v1'), (200, 2, 'v2'), (300, 3, 'v3') + """ + // version 5 + sql """ + insert into $table values (1000, 1, 'v1'), (2000, 2, 'v2'), (3000, 3, 'v3') + """ + // version 6 + sql """ + insert into $table values (10000, 1, 'v1'), (20000, 2, 'v2'), (30000, 3, 'v3') + """ + + def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles -> + [(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles] + } + for (int i = 0; i < 5; i++) { + sql """ + select count(*) from $table + """ + } + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + logger.info("fe tablets {}, cache dir {}", beforeGetFromFe , mergedCacheDir) + // wait compaction finish, and vacuum_stale_rowsets work + sleep(80 * 1000) + + // check cache file has been deleted + beforeGetFromFe.each { + def tabletId = it.Key + def backendId = it.Value[0] + def backendHost = it.Value[1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } + + } +} diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy index 4dc847d603a..a65f59f85a1 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_drop_force_table.groovy @@ -16,7 +16,7 @@ // under the License. import org.apache.doris.regression.suite.ClusterOptions -import org.apache.doris.regression.util.Http + suite('test_clean_tablet_when_drop_force_table', 'docker') { if (!isCloudMode()) { @@ -31,51 +31,22 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { 'rehash_tablet_after_be_dead_seconds=5' ] options.beConfigs += [ - 'report_tablet_interval_seconds=1' + 'report_tablet_interval_seconds=1', + 'write_buffer_size=10240', + 'write_buffer_size_for_agg=10240' ] options.setFeNum(3) options.setBeNum(3) options.cloudMode = true options.enableDebugPoints() - def backendIdToHost = { -> - def spb = sql_return_maparray """SHOW BACKENDS""" - def beIdToHost = [:] - spb.each { - beIdToHost[it.BackendId] = it.Host - } - beIdToHost - } - - def getTabletAndBeHostFromFe = { table -> - def result = sql_return_maparray """SHOW TABLETS FROM $table""" - def bes = backendIdToHost.call() - // tablet : host - def ret = [:] - result.each { - ret[it.TabletId] = bes[it.BackendId] - } - ret - } - - def getTabletAndBeHostFromBe = { -> - def bes = cluster.getAllBackends() - def ret = [:] - bes.each { be -> - // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} - def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data - def tablets = data.tablets.collect { it.tablet_id as String } - tablets.each{ - ret[it] = data.host - } - } - ret - } - def testCase = { table, waitTime, useDp=false-> + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort sql """CREATE TABLE $table ( `k1` int(11) NULL, - `k2` int(11) NULL + `k2` int(11) NULL, + `v1` VARCHAR(2048) ) DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' @@ -84,23 +55,60 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { "replication_num"="1" ); """ + def random = new Random() + def generateRandomString = { int length -> + random.with { + def chars = ('A'..'Z').collect() + ('a'..'z').collect() + ('0'..'9').collect() + (1..length).collect { chars[nextInt(chars.size())] }.join('') + } + } + def valuesList = (1..30000).collect { i -> + def randomStr = generateRandomString(2000) + "($i, $i, '$randomStr')" + }.join(", ") + sql """ + set global max_allowed_packet = 1010241024 + """ + + context.reconnectFe() sql """ - insert into $table values (1, 1), (2, 2), (3, 3) + insert into $table values ${valuesList} """ for (int i = 0; i < 5; i++) { sql """ - select * from $table + select count(*) from $table """ } + valuesList = (30001..60000).collect { i -> + def randomStr = generateRandomString(2000) + "($i, $i, '$randomStr')" + }.join(", ") + sql """ + set global max_allowed_packet = 1010241024 + """ + context.reconnectFe() + sql """ + insert into $table values ${valuesList} + """ + // before drop table force def beforeGetFromFe = getTabletAndBeHostFromFe(table) - def beforeGetFromBe = getTabletAndBeHostFromBe.call() - logger.info("fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + // version 2 + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // version 3 + def cacheDirVersion3 = getTabletFileCacheDirFromBe(msHttpPort, table, 3) + + def mergedCacheDir = cacheDirVersion2 + cacheDirVersion3.collectEntries { host, hashFiles -> + [(host): cacheDirVersion2[host] ? (cacheDirVersion2[host] + hashFiles) : hashFiles] + } + + logger.info("fe tablets {}, be tablets {}, cache dir {}", beforeGetFromFe, beforeGetFromBe, mergedCacheDir) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } if (useDp) { GetDebugPoint().enableDebugPointForAllBEs("WorkPoolCloudDropTablet.drop_tablet_callback.failed") @@ -119,16 +127,50 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') { } def start = System.currentTimeMillis() / 1000 // tablet can't find in be - dockerAwaitUntil(50) { - def beTablets = getTabletAndBeHostFromBe.call().keySet() + dockerAwaitUntil(500) { + def beTablets = getTabletAndBeHostFromBe(cluster.getAllBackends()).keySet() logger.info("before drop tablets {}, after tablets {}", beforeGetFromFe, beTablets) - beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe.call().containsKey(it) } + beforeGetFromFe.keySet().every { !getTabletAndBeHostFromBe(cluster.getAllBackends()).containsKey(it) } } logger.info("table {}, cost {}s", table, System.currentTimeMillis() / 1000 - start) assertTrue(System.currentTimeMillis() / 1000 - start > waitTime) if (useDp) { futrue.get() } + + sleep(25 * 1000) + + // check cache file has been deleted + beforeGetFromFe.each { + def tabletId = it.Key + def backendId = it.Value[0] + def backendHost = it.Value[1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } + } docker(options) { diff --git a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy index 4a44b317cc2..151de976a83 100644 --- a/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy +++ b/regression-test/suites/cloud_p0/tablets/test_clean_tablet_when_rebalance.groovy @@ -32,7 +32,9 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ] options.feConfigs.add("rehash_tablet_after_be_dead_seconds=$rehashTime") options.beConfigs += [ - 'report_tablet_interval_seconds=1' + 'report_tablet_interval_seconds=1', + 'write_buffer_size=10240', + 'write_buffer_size_for_agg=10240' ] options.setFeNum(3) options.setBeNum(3) @@ -42,56 +44,22 @@ suite('test_clean_tablet_when_rebalance', 'docker') { def choseDeadBeIndex = 1 def table = "test_clean_tablet_when_rebalance" - def backendIdToHost = { -> - def spb = sql_return_maparray """SHOW BACKENDS""" - def beIdToHost = [:] - spb.each { - beIdToHost[it.BackendId] = it.Host - } - beIdToHost - } - - def getTabletAndBeHostFromFe = { -> - def result = sql_return_maparray """SHOW TABLETS FROM $table""" - def bes = backendIdToHost.call() - // tablet : host - def ret = [:] - result.each { - ret[it.TabletId] = bes[it.BackendId] - } - ret - } - - def getTabletAndBeHostFromBe = { -> - def bes = cluster.getAllBackends() - def ret = [:] - bes.each { be -> - // {"msg":"OK","code":0,"data":{"host":"128.2.51.2","tablets":[{"tablet_id":10560},{"tablet_id":10554},{"tablet_id":10552}]},"count":3} - def data = Http.GET("http://${be.host}:${be.httpPort}/tablets_json?limit=all", true).data - def tablets = data.tablets.collect { it.tablet_id as String } - tablets.each{ - ret[it] = data.host - } - } - ret - } - - def testCase = { deadTime -> + def testCase = { deadTime, mergedCacheDir -> boolean beDeadLong = deadTime > rehashTime ? true : false logger.info("begin exec beDeadLong {}", beDeadLong) for (int i = 0; i < 5; i++) { sql """ - select * from $table + select count(*) from $table """ } - def beforeGetFromFe = getTabletAndBeHostFromFe() - def beforeGetFromBe = getTabletAndBeHostFromBe.call() + def beforeGetFromFe = getTabletAndBeHostFromFe(table) + def beforeGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) logger.info("before fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) beforeGetFromFe.each { assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + assertEquals(beforeGetFromBe[it.Key], it.Value[1]) } cluster.stopBackends(choseDeadBeIndex) @@ -120,24 +88,72 @@ suite('test_clean_tablet_when_rebalance', 'docker') { bes.size() == (beDeadLong ? 2 : 3) } for (int i = 0; i < 5; i++) { + sleep(2000) sql """ - select * from $table + select count(*) from $table """ - sleep(1000) } - beforeGetFromFe = getTabletAndBeHostFromFe() - beforeGetFromBe = getTabletAndBeHostFromBe.call() - logger.info("after fe tablets {}, be tablets {}", beforeGetFromFe, beforeGetFromBe) - beforeGetFromFe.each { - assertTrue(beforeGetFromBe.containsKey(it.Key)) - assertEquals(beforeGetFromBe[it.Key], it.Value) + def afterGetFromFe = getTabletAndBeHostFromFe(table) + def afterGetFromBe = getTabletAndBeHostFromBe(cluster.getAllBackends()) + logger.info("after fe tablets {}, be tablets {}", afterGetFromFe, afterGetFromBe) + afterGetFromFe.each { + assertTrue(afterGetFromBe.containsKey(it.Key)) + assertEquals(afterGetFromBe[it.Key], it.Value[1]) + } + + // TODO(freemandealer) + // Once the freemandealer implements file cache cleanup during restart, enabling lines 107 to 145 will allow testing to confirm that after the rebalance, the tablet file cache on the BE will be cleared. In the current implementation, after restarting the BE and triggering the rebalance, the tablets in the tablet manager will be cleared, but the file cache cannot be cleaned up. + /* + if (beDeadLong) { + // check tablet file cache has been deleted + // after fe tablets {10309=[10003, 175.41.51.3], 10311=[10002, 175.41.51.2], 10313=[10003, 175.41.51.3]}, + afterGetFromFe.each { + logger.info("tablet_id {}, before host {}, after host {}", it.Key, beforeGetFromFe[it.Key][1], it.Value[1]) + if (beforeGetFromFe[it.Key][1] == it.Value[1]) { + return + } + logger.info("tablet_id {} has been reblanced from {} to {}", it.Key, beforeGetFromFe[it.Key][1], it.Value[1]) + // check before tablet file cache dir has been droped + + def tabletId = it.Key + def backendId = beforeGetFromFe[it.Key][0] + def backendHost = beforeGetFromFe[it.Key][1] + def be = cluster.getBeByBackendId(backendId.toLong()) + def dataPath = new File("${be.path}/storage/file_cache") + def subDirs = [] + + def collectDirs + collectDirs = { File dir -> + if (dir.exists()) { + dir.eachDir { subDir -> + subDirs << subDir.name + collectDirs(subDir) + } + } + } + + collectDirs(dataPath) + logger.info("BE {} file_cache subdirs: {}", backendHost, subDirs) + def cacheDir = mergedCacheDir[backendHost] + + // add check + cacheDir.each { hashFile -> + assertFalse(subDirs.any { subDir -> subDir.startsWith(hashFile) }, + "Found unexpected cache file pattern ${hashFile} in BE ${backendHost}'s file_cache directory. " + + "Matching subdir found in: ${subDirs}") + } + } } + */ } docker(options) { + def ms = cluster.getAllMetaservices().get(0) + def msHttpPort = ms.host + ":" + ms.httpPort sql """CREATE TABLE $table ( `k1` int(11) NULL, - `k2` int(11) NULL + `k2` int(11) NULL, + `v1` varchar(2048) ) DUPLICATE KEY(`k1`, `k2`) COMMENT 'OLAP' @@ -147,12 +163,13 @@ suite('test_clean_tablet_when_rebalance', 'docker') { ); """ sql """ - insert into $table values (1, 1), (2, 2), (3, 3) + insert into $table values (1, 1, 'v1'), (2, 2, 'v2'), (3, 3, 'v3') """ - // 'rehash_tablet_after_be_dead_seconds=10' + def cacheDirVersion2 = getTabletFileCacheDirFromBe(msHttpPort, table, 2) + // 'rehash_tablet_after_be_dead_seconds=100' // be-1 dead, but not dead for a long time - testCase(5) + testCase(5, cacheDirVersion2) // be-1 dead, and dead for a long time - testCase(200) + testCase(200, cacheDirVersion2) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org