This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 0f22ae9b27c [opt](filecache) schema change adaptive to file cache (#58620)
0f22ae9b27c is described below
commit 0f22ae9b27c75d2cfee56a781cfb4162c982f586
Author: Haolin Guan <[email protected]>
AuthorDate: Fri Dec 12 09:27:30 2025 +0800
[opt](filecache) schema change adaptive to file cache (#58620)
Cherry-pick from #57470
---
be/src/cloud/cloud_cumulative_compaction.cpp | 2 +-
be/src/cloud/cloud_schema_change_job.cpp | 48 ++++-
be/src/cloud/cloud_schema_change_job.h | 3 +-
be/src/cloud/cloud_tablet.cpp | 2 +-
be/src/common/config.cpp | 4 +
be/src/common/config.h | 2 +
be/src/olap/schema_change.h | 1 +
.../test_filecache_with_alter_table.groovy | 234 +++++++++++++++++++++
8 files changed, 289 insertions(+), 7 deletions(-)
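
For readers skimming the diff below, the heart of the change is an adaptive decision in CloudSchemaChangeJob::_should_cache_sc_output: before converting historical rowsets, the job measures how much of the input rowsets' data and index is already resident in the file cache, and only writes the schema change output through the file cache when that hit ratio exceeds config::file_cache_keep_schema_change_output_min_hit_ratio (default 0.7). A minimal standalone sketch of that decision follows; RowsetCacheStats and should_cache_schema_change_output are illustrative names, not identifiers from the patch:

    #include <cstdint>
    #include <vector>

    // Illustrative stand-in for the per-rowset sizes the patch reads via
    // rowset_meta()->total_disk_size(), approximate_cache_index_size() and
    // approximate_cached_data_size().
    struct RowsetCacheStats {
        int64_t total_disk_size = 0;
        int64_t cached_index_size = 0;
        int64_t cached_data_size = 0;
    };

    // Cache the schema change output only when the already-cached fraction of
    // the input rowsets exceeds the configured threshold (0.7 by default).
    bool should_cache_schema_change_output(const std::vector<RowsetCacheStats>& inputs,
                                           double min_hit_ratio) {
        int64_t total_size = 0;
        int64_t cached_size = 0;
        for (const auto& rs : inputs) {
            total_size += rs.total_disk_size;
            cached_size += rs.cached_index_size + rs.cached_data_size;
        }
        if (total_size <= 0) {
            return false; // guard added in this sketch; the patch divides directly
        }
        return static_cast<double>(cached_size) / total_size > min_hit_ratio;
    }

In the patch itself the resulting flag travels as sc_params.output_to_file_cache into each RowsetWriterContext (context.write_file_cache), so the converted rowsets are either written through the file cache or bypass it entirely.
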
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index 4c587ea3bca..0398f24a5ad 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -375,7 +375,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
if (_input_rowsets.size() == 1) {
DCHECK_EQ(_output_rowset->version(), _input_rowsets[0]->version());
// MUST NOT move input rowset to stale path
- cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock);
+ cloud_tablet()->add_rowsets({_output_rowset}, true, wrlock, true);
} else {
cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index df59a78a583..c6ac0b6cb98 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -32,6 +32,7 @@
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
+#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/storage_engine.h"
@@ -217,6 +218,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
SchemaChangeParams sc_params;
+ // cache schema change output to file cache
+ std::vector<RowsetSharedPtr> rowsets;
+ rowsets.resize(rs_splits.size());
+ std::transform(rs_splits.begin(), rs_splits.end(), rowsets.begin(),
+ [](RowSetSplits& split) { return split.rs_reader->rowset(); });
+ sc_params.output_to_file_cache = _should_cache_sc_output(rowsets);
+
RETURN_IF_ERROR(DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl));
sc_params.ref_rowset_readers.reserve(rs_splits.size());
for (RowSetSplits& split : rs_splits) {
@@ -309,6 +317,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
context.tablet_schema = _new_tablet->tablet_schema();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.storage_resource = _cloud_storage_engine.get_storage_resource(sc_params.vault_id);
+ context.write_file_cache = sc_params.output_to_file_cache;
+ context.tablet = _new_tablet;
if (!context.storage_resource) {
return Status::InternalError("vault id not found, maybe not sync, vault id {}",
sc_params.vault_id);
@@ -467,7 +477,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
- _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock);
+ _new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock, false);
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
_new_tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
stats.num_rows(), stats.data_size());
@@ -503,7 +513,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
- tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+ tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
// Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
tmp_tablet->set_alter_version(alter_version);
}
@@ -521,7 +531,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
DBUG_BLOCK);
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
- tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+ tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
}
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -544,7 +554,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
{max_version + 1, new_max_version}, CaptureRowsetOps {}));
{
std::unique_lock wlock(tmp_tablet->get_header_lock());
- tmp_tablet->add_rowsets(_output_rowsets, true, wlock);
+ tmp_tablet->add_rowsets(_output_rowsets, true, wlock, false);
}
for (auto rowset : ret.rowsets) {
RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset));
@@ -595,4 +605,34 @@ void CloudSchemaChangeJob::clean_up_on_failure() {
}
}
+bool CloudSchemaChangeJob::_should_cache_sc_output(
+ const std::vector<RowsetSharedPtr>& input_rowsets) {
+ int64_t total_size = 0;
+ int64_t cached_index_size = 0;
+ int64_t cached_data_size = 0;
+
+ for (const auto& rs : input_rowsets) {
+ const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta();
+ total_size += rs_meta->total_disk_size();
+ cached_index_size += rs->approximate_cache_index_size();
+ cached_data_size += rs->approximate_cached_data_size();
+ }
+
+ double input_hit_rate = static_cast<double>(cached_index_size + cached_data_size) / total_size;
+
+ LOG(INFO) << "CloudSchemaChangeJob check cache sc output strategy. "
+ << "job_id=" << _job_id << ", input_rowsets_count=" << input_rowsets.size()
+ << ", total_size=" << total_size << ", cached_index_size=" << cached_index_size
+ << ", cached_data_size=" << cached_data_size << ", input_hit_rate=" << input_hit_rate
+ << ", min_hit_ratio_threshold="
+ << config::file_cache_keep_schema_change_output_min_hit_ratio << ", should_cache="
+ << (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio);
+
+ if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
+ return true;
+ }
+
+ return false;
+}
+
} // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
index 7d84279dd1d..0132d1f506a 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -39,13 +39,14 @@ public:
void clean_up_on_failure();
private:
+ bool _should_cache_sc_output(const std::vector<RowsetSharedPtr>& input_rowsets);
+
Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
cloud::TabletJobInfoPB& job);
Status _process_delete_bitmap(int64_t alter_version, int64_t start_calc_delete_bitmap_version,
int64_t initiator, const std::string& vault_id);
-private:
CloudStorageEngine& _cloud_storage_engine;
std::shared_ptr<CloudTablet> _base_tablet;
std::shared_ptr<CloudTablet> _new_tablet;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 913f8127bd0..ea5f52366ab 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -384,7 +384,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
for (auto& rs : rowsets) {
- if (version_overlap || warmup_delta_data) {
+ if (warmup_delta_data) {
#ifndef BE_TEST
bool warm_up_state_updated = false;
// Warmup rowset data in background
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b38c05e5cab..939b1faf856 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1136,6 +1136,10 @@
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
DEFINE_mBool(enable_file_cache_adaptive_write, "true");
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
+// if difference below this threshold, we consider cache's progressive upgrading (2.0->3.0) successful
+DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
+DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");
+
DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 25c95391968..7133adcca37 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1175,6 +1175,8 @@ DECLARE_mInt64(cache_lock_held_long_tail_threshold_us);
DECLARE_mBool(enable_file_cache_keep_base_compaction_output);
DECLARE_mBool(enable_file_cache_adaptive_write);
DECLARE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio);
+DECLARE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold);
+DECLARE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio);
DECLARE_mInt64(file_cache_remove_block_qps_limit);
DECLARE_mInt64(file_cache_background_gc_interval_ms);
DECLARE_mInt64(file_cache_background_block_lru_update_interval_ms);
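
For operators, the thresholds declared above are ordinary BE config options whose defaults (0.3 and 0.7) are set in config.cpp earlier in this patch. As an illustration only (0.5 is an arbitrary example, not a recommendation), a be.conf override that makes the schema change path keep its output in the file cache more readily might look like:

    # Keep schema change output in the file cache once at least 50% of the
    # input rowsets' bytes are already cached (the shipped default is 0.7).
    file_cache_keep_schema_change_output_min_hit_ratio = 0.5
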
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 175ea70c81d..3bc0fb0901d 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -280,6 +280,7 @@ struct SchemaChangeParams {
ObjectPool pool;
int32_t be_exec_version;
std::string vault_id;
+ bool output_to_file_cache;
};
class SchemaChangeJob {
diff --git a/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy b/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy
new file mode 100644
index 00000000000..87c33562dc6
--- /dev/null
+++ b/regression-test/suites/schema_change/test_filecache_with_alter_table.groovy
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+import org.apache.doris.regression.util.OutputUtils
+
[email protected]
+class RowsetInfo {
+ int startVersion
+ int endVersion
+ String id
+ String originalString
+}
+
+suite("test_filecache_with_alter_table", "docker") {
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.setFeNum(1)
+ options.setBeNum(1)
+
+ options.beConfigs.add('enable_flush_file_cache_async=false')
+ options.beConfigs.add('file_cache_enter_disk_resource_limit_mode_percent=99')
+ options.beConfigs.add('enable_evict_file_cache_in_advance=false')
+ options.beConfigs.add('file_cache_path=[{"path":"/opt/apache-doris/be/storage/file_cache","total_size":83886080,"query_limit":83886080}]')
+
+ def baseTestTable = "test_filecache_with_alter_table"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_backendBrpcPort = [:]
+ def csvPathPrefix = "/tmp/temp_csv_data"
+ def loadBatchNum = 20
+
+ def generateCsvData = {
+ def rowsPerFile = 32768
+ def columnsPerRow = 4
+ def headers = 'col1,col2,col3,col4'
+
+ def dir = new File(csvPathPrefix)
+ if (!dir.exists()) {
+ dir.mkdirs()
+ } else {
+ dir.eachFile { it.delete() }
+ }
+
+ long currentNumber = 1L
+ (1..loadBatchNum).each { fileIndex ->
+ def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
+ def csvFile = new File(fileName)
+
+ csvFile.withWriter('UTF-8') { writer ->
+ writer.writeLine(headers)
+ (1..rowsPerFile).each { rowIndex ->
+ def row = (1..columnsPerRow).collect { currentNumber++ }
+ writer.writeLine(row.join(','))
+ }
+ }
+ }
+ logger.info("Successfully generated ${loadBatchNum} CSV files in ${csvPathPrefix}")
+ }
+
+ def getTabletStatus = { tablet ->
+ String tabletId = tablet.TabletId
+ String backendId = tablet.BackendId
+ def beHost = backendId_to_backendIP[backendId]
+ def beHttpPort = backendId_to_backendHttpPort[backendId]
+
+ String command = "curl -s -X GET http://${beHost}:${beHttpPort}/api/compaction/show?tablet_id=${tabletId}"
+
+ logger.info("Executing: ${command}")
+ def process = command.execute()
+ def exitCode = process.waitFor()
+ def output = process.getText()
+
+ logger.info("Get tablet status response: code=${exitCode}, out=${output}")
+ assertEquals(0, exitCode, "Failed to get tablet status.")
+
+ return parseJson(output.trim())
+ }
+
+ def waitForAlterJobToFinish = { tableName, timeoutMillis ->
+ def pollInterval = 1000
+ def timeElapsed = 0
+ while (timeElapsed <= timeoutMillis) {
+ def alterResult = sql_return_maparray """SHOW ALTER TABLE COLUMN WHERE TableName = "${tableName}" ORDER BY CreateTime DESC LIMIT 1;"""
+ logger.info("Checking ALTER status for table '${tableName}': ${alterResult}")
+ if (alterResult && alterResult[0].State == "FINISHED") {
+ sleep(3000)
+ logger.info("ALTER job on table '${tableName}' finished. Details: ${alterResult[0]}")
+ return
+ }
+ sleep(pollInterval)
+ timeElapsed += pollInterval
+ }
+ fail("Wait for ALTER job on table '${tableName}' to finish timed out after ${timeoutMillis}ms.")
+ }
+
+ def runSchemaChangeCacheTest = { String testTable, double inputCacheRatio, boolean expectOutputCached ->
+ logger.info("==================================================================================")
+ logger.info("Running Test Case on Table '${testTable}': Input Cache Ratio = ${inputCacheRatio * 100}%, Expect Output Cached = ${expectOutputCached}")
+ logger.info("==================================================================================")
+
+ sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ col1 bigint,
+ col2 bigint,
+ col3 bigint,
+ col4 bigint
+ )
+ UNIQUE KEY(col1)
+ DISTRIBUTED BY HASH(col1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ )
+ """
+
+ (1..loadBatchNum).each { fileIndex ->
+ def fileName = String.format("${csvPathPrefix}/data_%02d.csv", fileIndex)
+ streamLoad {
+ logger.info("Stream loading file index ${fileIndex} into table ${testTable}")
+ set "column_separator", ","
+ table testTable
+ file fileName
+ time 3000
+ check { res, exception, startTime, endTime ->
+ if (exception != null) throw exception
+ def json = parseJson(res)
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ }
+ sql """ SELECT COUNT(col1) from ${testTable} """
+
+ def tablets = sql_return_maparray "show tablets from ${testTable};"
+ assertEquals(1, tablets.size(), "Expected to find exactly one tablet.")
+ def tablet = tablets[0]
+ def beHost = backendId_to_backendIP[tablet.BackendId]
+ def beHttpPort = backendId_to_backendHttpPort[tablet.BackendId]
+
+ def tabletStatus = getTabletStatus(tablet)
+ List<RowsetInfo> originalRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
+ def parts = rowsetStr.split(" ")
+ def versionParts = parts[0].replace('[', '').replace(']', '').split("-")
+ new RowsetInfo(
+ startVersion: versionParts[0].toInteger(),
+ endVersion: versionParts[1].toInteger(),
+ id: parts[4],
+ originalString: rowsetStr
+ )
+ }.findAll { it.startVersion != 0 }.sort { it.startVersion }
+
+ int numToClear = Math.round(originalRowsetInfos.size() * (1 - inputCacheRatio)).toInteger()
+ logger.info("Total data rowsets: ${originalRowsetInfos.size()}. Clearing cache for ${numToClear} rowsets to achieve ~${inputCacheRatio * 100}% hit ratio.")
+
+ originalRowsetInfos.take(numToClear).each { rowset ->
+ Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=clear&sync=true&value=${rowset.id}_0.dat", true)
+ }
+
+ def cachedInputRowsets = originalRowsetInfos.findAll { rowset ->
+ def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
+ data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
+ }
+
+ def actualCachedRatio = cachedInputRowsets.size() / (double)originalRowsetInfos.size()
+ logger.info("Verification: Cached input rowsets: ${cachedInputRowsets.size()}. Actual cache ratio: ${actualCachedRatio * 100}%")
+ assertTrue(Math.abs(inputCacheRatio - actualCachedRatio) < 0.01, "Actual cache ratio does not match expected ratio.")
+
+ logger.info("Triggering ALTER TABLE on ${testTable}")
+ sql """ALTER TABLE ${testTable} MODIFY COLUMN col2 VARCHAR(255)"""
+ waitForAlterJobToFinish(testTable, 60000)
+
+ tablets = sql_return_maparray "show tablets from ${testTable};"
+ tablet = tablets[0]
+ tabletStatus = getTabletStatus(tablet)
+
+ def newRowsetInfos = tabletStatus["rowsets"].collect { rowsetStr ->
+ def parts = rowsetStr.split(" ")
+ def version_pair = parts[0].replace('[', '').replace(']', '').split('-')
+ new RowsetInfo(
+ startVersion: version_pair[0].toInteger(),
+ endVersion: version_pair[1].toInteger(),
+ id: parts[4],
+ originalString: rowsetStr
+ )
+ }.findAll { it.startVersion != 0 }.sort { it.startVersion }
+
+ def cachedOutputRowsets = newRowsetInfos.findAll { rowset ->
+ def data = Http.GET("http://${beHost}:${beHttpPort}/api/file_cache?op=list_cache&value=${rowset.id}_0.dat", true)
+ data.any { item -> !item.endsWith("_idx") && !item.endsWith("_disposable") }
+ }
+
+ logger.info("After ALTER, found ${cachedOutputRowsets.size()} cached output rowsets out of ${newRowsetInfos.size()}.")
+
+ if (expectOutputCached) {
+ assertTrue(cachedOutputRowsets.size() > 0, "Expected output rowsets to be cached, but none were found.")
+ } else {
+ assertEquals(0, cachedOutputRowsets.size(), "Expected output rowsets NOT to be cached, but some were found.")
+ }
+ logger.info("Test Case Passed: Input Ratio ${inputCacheRatio * 100}%, Output Cached Check: ${expectOutputCached}")
+
+ sql """ DROP TABLE IF EXISTS ${testTable} force;"""
+ }
+
+ docker(options) {
+ getBackendIpHttpAndBrpcPort(backendId_to_backendIP, backendId_to_backendHttpPort, backendId_to_backendBrpcPort);
+
+ sql """ set global enable_auto_analyze = false;"""
+
+ generateCsvData()
+
+ runSchemaChangeCacheTest("${baseTestTable}_0", 0.0, false)
+ runSchemaChangeCacheTest("${baseTestTable}_65", 0.65, false)
+ runSchemaChangeCacheTest("${baseTestTable}_75", 0.75, true)
+ runSchemaChangeCacheTest("${baseTestTable}_100", 1.0, true)
+ }
+}
\ No newline at end of file
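
A note on the expectations in the Groovy suite above, assuming each of the loadBatchNum = 20 stream loads produces one data rowset: for inputCacheRatio = 0.65 the test clears Math.round(20 * 0.35) = 7 rowsets, leaving 13/20 = 0.65 of the input cached, which does not exceed the 0.7 default of file_cache_keep_schema_change_output_min_hit_ratio, so the schema change output is expected to stay out of the cache; for 0.75 it clears Math.round(20 * 0.25) = 5, leaving 15/20 = 0.75 > 0.7, so the output is expected to be cached. The 0.0 and 1.0 cases exercise the two extremes.
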
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]