This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 9ac86aae215 branch-4.1: [opt](cloud) Enable compaction on new tablets
during schema change queuing #61089 (#61629)
9ac86aae215 is described below
commit 9ac86aae215fafd21309de2e498457d456334a03
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 24 16:38:55 2026 +0800
branch-4.1: [opt](cloud) Enable compaction on new tablets during schema
change queuing #61089 (#61629)
Cherry-picked from #61089
Co-authored-by: Jimmy <[email protected]>
---
be/src/agent/agent_server.cpp | 3 +-
be/src/agent/task_worker_pool.cpp | 50 +++++-
be/src/agent/task_worker_pool.h | 6 +-
be/src/cloud/cloud_cumulative_compaction.cpp | 16 ++
be/src/cloud/cloud_schema_change_job.cpp | 37 +++++
be/test/agent/task_worker_pool_test.cpp | 49 ++++++
.../test_sc_compaction_cross_v1_race.groovy | 136 ++++++++++++++++
.../test_sc_compaction_optimization.groovy | 159 +++++++++++++++++++
...est_sc_compaction_optimization_with_load.groovy | 174 +++++++++++++++++++++
9 files changed, 625 insertions(+), 5 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index c54390d16cf..c00613b5df6 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -211,7 +211,8 @@ void AgentServer::cloud_start_workers(CloudStorageEngine&
engine, ExecEnv* exec_
_workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
"ALTER_TABLE", config::alter_tablet_worker_count,
- [&engine](auto&& task) { return
alter_cloud_tablet_callback(engine, task); });
+ [&engine](auto&& task) { return
alter_cloud_tablet_callback(engine, task); },
+ [&engine](auto&& task) { set_alter_version_before_enqueue(engine,
task); });
_workers[TTaskType::CALCULATE_DELETE_BITMAP] =
std::make_unique<TaskWorkerPool>(
"CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index ffdd110cf81..252f553dc55 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -520,9 +520,11 @@ bvar::Adder<uint64_t> report_index_policy_failed("report",
"index_policy_failed"
} // namespace
-TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count,
- std::function<void(const TAgentTaskRequest&
task)> callback)
- : _callback(std::move(callback)) {
+TaskWorkerPool::TaskWorkerPool(
+ std::string_view name, int worker_count,
+ std::function<void(const TAgentTaskRequest& task)> callback,
+ std::function<void(const TAgentTaskRequest& task)> pre_submit_callback)
+ : _callback(std::move(callback)),
_pre_submit_callback(std::move(pre_submit_callback)) {
auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name))
.set_min_threads(worker_count)
.set_max_threads(worker_count)
@@ -546,6 +548,9 @@ void TaskWorkerPool::stop() {
Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) {
return _submit_task(task, [this](auto&& task) {
+ if (_pre_submit_callback) {
+ _pre_submit_callback(task);
+ }
add_task_count(task, 1);
return _thread_pool->submit_func([this, task]() {
_callback(task);
@@ -2239,9 +2244,48 @@ void alter_cloud_tablet_callback(CloudStorageEngine&
engine, const TAgentTaskReq
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_last_active_time.set_value(now);
+
+ // Clean up alter_version BEFORE calling remove_task_info to avoid a race:
+ // once remove_task_info runs, a task with the same signature can be re-submitted,
+ // and its pre_submit_callback would set alter_version again. If this cleanup ran
+ // after that point, it would wipe the freshly set value.
+ if (req.__isset.alter_tablet_req_v2) {
+ const auto& alter_req = req.alter_tablet_req_v2;
+ auto new_tablet =
engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
+ auto base_tablet =
engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
+ if (new_tablet.has_value()) {
+ new_tablet.value()->set_alter_version(-1);
+ }
+ if (base_tablet.has_value()) {
+ base_tablet.value()->set_alter_version(-1);
+ }
+ }
+
remove_task_info(req.task_type, req.signature);
}
+void set_alter_version_before_enqueue(CloudStorageEngine& engine, const
TAgentTaskRequest& req) {
+ if (!req.__isset.alter_tablet_req_v2) {
+ return;
+ }
+ const auto& alter_req = req.alter_tablet_req_v2;
+ if (alter_req.alter_version <= 1) {
+ return;
+ }
+ auto new_tablet = engine.tablet_mgr().get_tablet(alter_req.new_tablet_id);
+ if (!new_tablet.has_value() || new_tablet.value()->tablet_state() ==
TABLET_RUNNING) {
+ return;
+ }
+ auto base_tablet =
engine.tablet_mgr().get_tablet(alter_req.base_tablet_id);
+ if (!base_tablet.has_value()) {
+ return;
+ }
+ new_tablet.value()->set_alter_version(alter_req.alter_version);
+ base_tablet.value()->set_alter_version(alter_req.alter_version);
+ LOG(INFO) << "set alter_version=" << alter_req.alter_version
+ << " before enqueue, base_tablet=" << alter_req.base_tablet_id
+ << ", new_tablet=" << alter_req.new_tablet_id;
+}
+
void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
std::unordered_map<int64_t, int64_t> gc_tablet_infos;
if (!req.__isset.gc_binlog_req) {
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 300e1daa606..06e2f1f419c 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -50,7 +50,8 @@ public:
class TaskWorkerPool : public TaskWorkerPoolIf {
public:
TaskWorkerPool(std::string_view name, int worker_count,
- std::function<void(const TAgentTaskRequest&)> callback);
+ std::function<void(const TAgentTaskRequest&)> callback,
+ std::function<void(const TAgentTaskRequest&)>
pre_submit_callback = nullptr);
~TaskWorkerPool() override;
@@ -62,6 +63,7 @@ protected:
std::atomic_bool _stopped {false};
std::unique_ptr<ThreadPool> _thread_pool;
std::function<void(const TAgentTaskRequest&)> _callback;
+ std::function<void(const TAgentTaskRequest&)> _pre_submit_callback;
};
class PublishVersionWorkerPool final : public TaskWorkerPool {
@@ -180,6 +182,8 @@ void alter_tablet_callback(StorageEngine& engine, const
TAgentTaskRequest& req);
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
+void set_alter_version_before_enqueue(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
+
void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
const TAgentTaskRequest& req);
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index a6ad435ce63..bc37133e271 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -292,6 +292,22 @@ Status CloudCumulativeCompaction::modify_rowsets() {
LOG(INFO) <<
"CloudCumulativeCompaction::modify_rowsets.enable_spin_wait, exit";
});
+ // Block only NOTREADY tablets (SC new tablets) before compaction commit.
+ // RUNNING tablets (system tables, base tablets) are not affected.
+
DBUG_EXECUTE_IF("CloudCumulativeCompaction::modify_rowsets.block_notready", {
+ if (_tablet->tablet_state() == TABLET_NOTREADY) {
+ LOG(INFO) << "block NOTREADY tablet compaction before commit"
+ << ", tablet_id=" << _tablet->tablet_id() << ", output=["
+ << _input_rowsets.front()->start_version() << "-"
+ << _input_rowsets.back()->end_version() << "]";
+ while (DebugPoints::instance()->is_enable(
+
"CloudCumulativeCompaction::modify_rowsets.block_notready")) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(50));
+ }
+ LOG(INFO) << "release NOTREADY tablet compaction, tablet_id=" <<
_tablet->tablet_id();
+ }
+ });
+
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator = this->initiator();
int64_t get_delete_bitmap_lock_start_time = 0;
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 807422c1fce..6e5b9b8b9c8 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -110,6 +110,13 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
_output_cumulative_point = _base_tablet->cumulative_layer_point();
std::vector<RowSetSplits> rs_splits;
int64_t base_max_version = _base_tablet->max_version_unlocked();
+
DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.override_base_max_version",
{
+ auto v = dp->param<int64_t>("version", -1);
+ if (v > 0) {
+ LOG(INFO) << "override base_max_version from " << base_max_version
<< " to " << v;
+ base_max_version = v;
+ }
+ });
cloud::TabletJobInfoPB job;
auto* idx = job.mutable_idx();
idx->set_tablet_id(_base_tablet->tablet_id());
@@ -147,6 +154,32 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
LOG(WARNING) << "inject error. res=" << res;
return res;
});
+
+ // Check for cross-V1 compaction rowsets on new tablet.
+ // During queue wait, compaction may have committed a rowset that crosses
the
+ // alter_version boundary (V1). This happens when compaction commits before
+ // prepare_tablet_job registers the SC job in meta-service.
+ // If such a rowset exists, SC commit would create version overlap, so we
+ // fail early and let FE retry (with a higher V1 next time).
+ {
+ RETURN_IF_ERROR(_new_tablet->sync_rowsets());
+ std::shared_lock rlock(_new_tablet->get_header_lock());
+ for (auto& [v, rs] : _new_tablet->rowset_map()) {
+ if (v.first > 1 && v.first <= start_resp.alter_version() &&
+ v.second > start_resp.alter_version()) {
+ LOG(WARNING) << "cross-V1 compaction detected on new tablet"
+ << ", tablet_id=" << _new_tablet->tablet_id() <<
", rowset=["
+ << v.first << "-" << v.second << "]"
+ << ", alter_version=" <<
start_resp.alter_version()
+ << ", job_id=" << _job_id << ". Will retry with
higher alter_version.";
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "cross-V1 compaction detected on new tablet,
tablet_id={}, "
+ "rowset=[{}-{}], alter_version={}",
+ _new_tablet->tablet_id(), v.first, v.second,
start_resp.alter_version());
+ }
+ }
+ }
+
if (request.alter_version > 1) {
// [0-1] is a placeholder rowset, no need to convert
RETURN_IF_ERROR(_base_tablet->capture_rs_readers({2,
start_resp.alter_version()},
@@ -155,6 +188,10 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
.enable_prefer_cached_rowset = false,
.query_freshness_tolerance_ms = -1}));
}
+ // This debug point sits between prepare_tablet_job (SC job registered in meta-service)
+ // and set_alter_version (local alter_version update). Used to test the cross-V1 race.
+
DBUG_EXECUTE_IF("CloudSchemaChangeJob::process_alter_tablet.after_prepare_job",
DBUG_BLOCK);
+
Defer defer2 {[&]() {
_new_tablet->set_alter_version(-1);
_base_tablet->set_alter_version(-1);
diff --git a/be/test/agent/task_worker_pool_test.cpp
b/be/test/agent/task_worker_pool_test.cpp
index 3b5c8ff3df0..9cd7ddd640d 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -54,6 +54,55 @@ TEST(TaskWorkerPoolTest, TaskWorkerPool) {
EXPECT_EQ(count.load(), 2);
}
+TEST(TaskWorkerPoolTest, PreSubmitCallback) {
+ std::atomic_int callback_count {0};
+ std::atomic_int pre_submit_count {0};
+ TaskWorkerPool workers(
+ "test", 1,
+ [&](auto&& task) {
+ std::this_thread::sleep_for(200ms);
+ ++callback_count;
+ },
+ [&](auto&& task) { ++pre_submit_count; });
+
+ TAgentTaskRequest task;
+ task.__set_signature(-1);
+ auto _ = workers.submit_task(task);
+ _ = workers.submit_task(task);
+
+ // pre_submit_callback is called synchronously before enqueue
+ EXPECT_EQ(pre_submit_count.load(), 2);
+
+ std::this_thread::sleep_for(600ms);
+ workers.stop();
+ EXPECT_EQ(callback_count.load(), 2);
+ EXPECT_EQ(pre_submit_count.load(), 2);
+}
+
+TEST(TaskWorkerPoolTest, PreSubmitCallbackWithDedup) {
+ std::atomic_int pre_submit_count {0};
+ std::atomic_int callback_count {0};
+ TaskWorkerPool workers(
+ "test", 1,
+ [&](auto&& task) {
+ std::this_thread::sleep_for(500ms);
+ ++callback_count;
+ },
+ [&](auto&& task) { ++pre_submit_count; });
+
+ TAgentTaskRequest task;
+ task.__set_task_type(TTaskType::ALTER);
+ task.__set_signature(12345);
+ auto _ = workers.submit_task(task);
+ _ = workers.submit_task(task); // Should be deduped by register_task_info
+
+ EXPECT_EQ(pre_submit_count.load(), 1); // Only called once, second was
deduped
+
+ std::this_thread::sleep_for(600ms);
+ workers.stop();
+ EXPECT_EQ(callback_count.load(), 1);
+}
+
TEST(TaskWorkerPoolTest, PriorTaskWorkerPool) {
std::atomic_int normal_count {0};
std::atomic_int high_prior_count {0};
diff --git
a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_race.groovy
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_race.groovy
new file mode 100644
index 00000000000..a34dd18e590
--- /dev/null
+++
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_cross_v1_race.groovy
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: Reproduce cross-V1 compaction race that causes BE crash.
+//
+// Timeline:
+// SC blocked → compaction commits [5-10] on new tablet → SC runs with V1=6
(override)
+// → SC commit replaces [2,6] but [5-10] not deleted (crosses V1) → version
overlap → BE crash
+//
+// This simulates the multi-BE scenario where the SC-executing BE has a stale
+// base tablet version, causing V1 to be lower than compaction output max.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_cross_v1_race', 'docker') {
+
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.enableDebugPoints()
+ options.beConfigs += ["enable_java_support=false"]
+ options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+ options.beConfigs += ["alter_tablet_worker_count=1"]
+ options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+ options.beNum = 1
+
+ docker(options) {
+ def tableName = "sc_cross_v1_test"
+
+ def getJobState = { tbl ->
+ def result = sql """SHOW ALTER TABLE COLUMN WHERE
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1"""
+ logger.info("getJobState: ${result}")
+ return result[0][9]
+ }
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE ${tableName} (
+ k1 int NOT NULL,
+ v1 varchar(100) NOT NULL,
+ v2 int NOT NULL
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ // Phase 1: Insert initial data (versions 2, 3, 4)
+ for (int i = 0; i < 3; i++) {
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${tableName} VALUES ")
+ for (int j = 0; j < 20; j++) {
+ if (j > 0) sb.append(", ")
+ def key = i * 20 + j + 1
+ sb.append("(${key}, 'val_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+ assertEquals(60L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+ // Phase 2: Block SC at entry, let compaction run freely during block
+ def scBlock = 'CloudSchemaChangeJob::process_alter_tablet.block'
+ GetDebugPoint().enableDebugPointForAllBEs(scBlock)
+
+ try {
+ sql "ALTER TABLE ${tableName} MODIFY COLUMN v2 bigint"
+ sleep(10000)
+ assertEquals("RUNNING", getJobState(tableName))
+
+ // Phase 3: Insert 6 batches (versions 5-10), compaction runs
freely
+ for (int i = 0; i < 6; i++) {
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${tableName} VALUES ")
+ for (int j = 0; j < 10; j++) {
+ if (j > 0) sb.append(", ")
+ def key = 100 + i * 10 + j + 1
+ sb.append("(${key}, 'new_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+
+ // Phase 4: Wait for compaction to merge [5-10] on new tablet
+ sleep(30000)
+
+ // Phase 5: Override V1=6, then release SC
+ // Compaction [5-10] already committed to meta-service (no SC job
yet → success)
+ // SC will run with V1=6 → SC commit replaces [2,6] → [5-10] not
deleted → overlap
+ GetDebugPoint().enableDebugPointForAllBEs(
+
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version',
+ [version: 6])
+
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(scBlock)
+ }
+
+ // Wait for SC to finish
+ int maxTries = 120
+ def finalState = ""
+ while (maxTries-- > 0) {
+ finalState = getJobState(tableName)
+ if (finalState == "FINISHED" || finalState == "CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Clean up debug point
+ GetDebugPoint().disableDebugPointForAllBEs(
+
'CloudSchemaChangeJob::process_alter_tablet.override_base_max_version')
+
+ logger.info("SC final state: ${finalState}")
+
+ // Wait for potential BE crash from overlapping rowsets
+ sleep(15000)
+
+ // Verify BE is still alive
+ def backendsAfter = sql_return_maparray("show backends")
+ logger.info("BE alive status after SC: ${backendsAfter.collect {
it.Alive }}")
+ assertTrue(backendsAfter.every { it.Alive.toString() == "true" },
+ "BE crashed after SC due to cross-V1 compaction rowset overlap")
+ }
+}
diff --git
a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization.groovy
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization.groovy
new file mode 100644
index 00000000000..a2953806515
--- /dev/null
+++
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: Verify that pre_submit_callback correctly sets alter_version on new
tablets
+// so that auto cumulative compaction can compact double-write rowsets during
SC.
+// Without the optimization, alter_version=-1 and auto compaction skips
NOTREADY tablets.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_optimization', 'docker') {
+
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.enableDebugPoints()
+ options.beConfigs += ["enable_java_support=false"]
+ options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+ options.beConfigs += ["alter_tablet_worker_count=1"]
+ options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+ options.beNum = 1
+
+ docker(options) {
+ def tableName = "sc_opt_test"
+
+ def getJobState = { tbl ->
+ def result = sql """SHOW ALTER TABLE COLUMN WHERE
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1"""
+ logger.info("getJobState: ${result}")
+ return result[0][9]
+ }
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE ${tableName} (
+ k1 int NOT NULL,
+ v1 varchar(100) NOT NULL,
+ v2 int NOT NULL
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 2
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ // Insert initial data
+ for (int i = 0; i < 3; i++) {
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${tableName} VALUES ")
+ for (int j = 0; j < 20; j++) {
+ if (j > 0) sb.append(", ")
+ def key = i * 20 + j + 1
+ sb.append("(${key}, 'val_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+
+ assertEquals(60L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+ def baseTablets = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+ assertEquals(2, baseTablets.size())
+ def baseTabletIds = baseTablets.collect { it.TabletId.toString() }
+
+ def backends = sql_return_maparray("show backends")
+ def be = backends[0]
+
+ // Block SC at the very beginning of process_alter_tablet, BEFORE
prepare_tablet_job.
+ // This avoids meta-service tablet job lock which would block BE HTTP
service.
+ def injectName = 'CloudSchemaChangeJob::process_alter_tablet.block'
+ GetDebugPoint().enableDebugPointForAllBEs(injectName)
+
+ try {
+ sql "ALTER TABLE ${tableName} MODIFY COLUMN v2 bigint"
+ sleep(10000)
+ assertEquals("RUNNING", getJobState(tableName))
+
+ def allTablets = sql_return_maparray("SHOW TABLETS FROM
${tableName}")
+ assertEquals(4, allTablets.size())
+ def newTablets = allTablets.findAll { !(it.TabletId.toString() in
baseTabletIds) }
+ assertEquals(2, newTablets.size())
+
+ // Insert 6 batches during SC -> creates double-write rowsets on
new tablets
+ for (int i = 0; i < 6; i++) {
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${tableName} VALUES ")
+ for (int j = 0; j < 10; j++) {
+ if (j > 0) sb.append(", ")
+ def key = 100 + i * 10 + j + 1
+ sb.append("(${key}, 'new_${key}', ${key * 10})")
+ }
+ sql sb.toString()
+ }
+
+ // Wait for auto compaction to sync double-write rowsets and
compact them
+ sleep(30000)
+
+ // Verify compaction happened: check if any non-placeholder rowset
spans multiple
+ // versions (e.g. [4-6]). Each INSERT creates a single-version
rowset [v-v],
+ // so a multi-version rowset is direct proof of compaction.
+ boolean compactionHappened = false
+ for (def tablet : newTablets) {
+ def tabletId = tablet.TabletId.toString()
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ if (code == 0) {
+ def status = parseJson(out.trim())
+ if (status.rowsets instanceof List) {
+ logger.info("New tablet ${tabletId} rowsets:
${status.rowsets}")
+ for (def rowset : status.rowsets) {
+ def match = (rowset =~ /\[(\d+)-(\d+)\]/)
+ if (match) {
+ def start = match[0][1] as int
+ def end = match[0][2] as int
+ if (start > 1 && end > start) {
+ logger.info("New tablet ${tabletId} has
merged rowset [${start}-${end}], compaction confirmed")
+ compactionHappened = true
+ break
+ }
+ }
+ }
+ }
+ }
+ if (compactionHappened) break
+ }
+ assertTrue(compactionHappened, "Expected auto compaction on new
tablets during SC queue wait")
+
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injectName)
+ }
+
+ // Wait for SC to finish
+ int maxTries = 300
+ def finalState = ""
+ while (maxTries-- > 0) {
+ finalState = getJobState(tableName)
+ if (finalState == "FINISHED" || finalState == "CANCELLED") {
+ sleep(10000) // Wait 10s for BE to fully recover after SC
+ break
+ }
+ sleep(1000)
+ }
+ assertEquals("FINISHED", finalState)
+
+ // Verify data correctness after SC
+ assertEquals(120L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+ assertEquals(120L, (sql "SELECT count(distinct k1) FROM
${tableName}")[0][0])
+
+ }
+}
diff --git
a/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization_with_load.groovy
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization_with_load.groovy
new file mode 100644
index 00000000000..31361fc1723
--- /dev/null
+++
b/regression-test/suites/cloud_p1/schema_change/compaction_optimization/test_sc_compaction_optimization_with_load.groovy
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test: End-to-end correctness with SC compaction optimization.
+// Verifies:
+// 1. SC completes successfully with concurrent writes and auto compaction on
new tablets
+// 2. alter_version cleanup works — post-SC compaction runs normally
+// 3. Data consistency after SC + compaction + continued loading
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite('test_sc_compaction_optimization_with_load', 'docker') {
+
+ def options = new ClusterOptions()
+ options.cloudMode = true
+ options.enableDebugPoints()
+ options.beConfigs += ["enable_java_support=false"]
+ options.beConfigs += ["enable_new_tablet_do_compaction=true"]
+ options.beConfigs += ["alter_tablet_worker_count=1"]
+ options.beConfigs += ["cumulative_compaction_min_deltas=2"]
+ options.beNum = 1
+
+ docker(options) {
+ def tableName = "sc_opt_load_test"
+
+ def getJobState = { tbl ->
+ def result = sql """SHOW ALTER TABLE COLUMN WHERE
IndexName='${tbl}' ORDER BY createtime DESC LIMIT 1"""
+ logger.info("getJobState: ${result}")
+ return result[0][9]
+ }
+
+ def insertBatch = { int startKey, int count, String prefix ->
+ StringBuilder sb = new StringBuilder()
+ sb.append("INSERT INTO ${tableName} VALUES ")
+ for (int j = 0; j < count; j++) {
+ if (j > 0) sb.append(", ")
+ def key = startKey + j
+ sb.append("(${key}, '${prefix}_k2_${key}', ${key},
'${prefix}_v2_${key}')")
+ }
+ sql sb.toString()
+ }
+
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ sql """
+ CREATE TABLE ${tableName} (
+ k1 int NOT NULL,
+ k2 varchar(50) NOT NULL,
+ v1 int NOT NULL,
+ v2 varchar(200) NOT NULL
+ )
+ DISTRIBUTED BY HASH(k1) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ // Phase 1: Load initial data
+ for (int i = 0; i < 5; i++) {
+ insertBatch(i * 30 + 1, 30, "init_${i}")
+ }
+ assertEquals(150L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+ def baseTablets = sql_return_maparray("SHOW TABLETS FROM ${tableName}")
+ assertEquals(3, baseTablets.size())
+ def baseTabletIds = baseTablets.collect { it.TabletId.toString() }
+
+ def backends = sql_return_maparray("show backends")
+ def be = backends[0]
+
+ // Phase 2: Block SC at the very beginning of process_alter_tablet, BEFORE prepare_tablet_job.
+ // This avoids the meta-service tablet job lock, which would block the BE HTTP service.
+ def injectName = 'CloudSchemaChangeJob::process_alter_tablet.block'
+ GetDebugPoint().enableDebugPointForAllBEs(injectName)
+
+ try {
+ sql "ALTER TABLE ${tableName} MODIFY COLUMN v1 bigint"
+ sleep(10000)
+ assertEquals("RUNNING", getJobState(tableName))
+
+ // Phase 3: Heavy loading during SC
+ for (int i = 0; i < 8; i++) {
+ insertBatch(200 + i * 20, 20, "sc_${i}")
+ }
+
+ def allTablets = sql_return_maparray("SHOW TABLETS FROM
${tableName}")
+ assertEquals(6, allTablets.size())
+ def newTablets = allTablets.findAll { !(it.TabletId.toString() in
baseTabletIds) }
+ assertEquals(3, newTablets.size())
+
+ // Wait for auto compaction to sync double-write rowsets and
compact them
+ sleep(30000)
+
+ // Verify compaction happened: check if any non-placeholder rowset
spans multiple
+ // versions (e.g. [4-6]). Each INSERT creates a single-version
rowset [v-v],
+ // so a multi-version rowset is direct proof of compaction.
+ boolean compactionHappened = false
+ for (def tablet : newTablets) {
+ def tabletId = tablet.TabletId.toString()
+ def (code, out, err) = curl("GET", tablet.CompactionStatus)
+ if (code == 0) {
+ def status = parseJson(out.trim())
+ if (status.rowsets instanceof List) {
+ logger.info("New tablet ${tabletId} rowsets:
${status.rowsets}")
+ for (def rowset : status.rowsets) {
+ def match = (rowset =~ /\[(\d+)-(\d+)\]/)
+ if (match) {
+ def start = match[0][1] as int
+ def end = match[0][2] as int
+ if (start > 1 && end > start) {
+ logger.info("New tablet ${tabletId} has
merged rowset [${start}-${end}], compaction confirmed")
+ compactionHappened = true
+ break
+ }
+ }
+ }
+ }
+ }
+ if (compactionHappened) break
+ }
+ assertTrue(compactionHappened, "Expected auto compaction on new
tablets during SC queue wait")
+
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(injectName)
+ }
+
+ // Phase 4: Wait for SC completion
+ int maxTries = 300
+ def finalState = ""
+ while (maxTries-- > 0) {
+ finalState = getJobState(tableName)
+ if (finalState == "FINISHED" || finalState == "CANCELLED") {
+ sleep(10000) // Wait 10s for BE to fully recover
+ break
+ }
+ sleep(1000)
+ }
+ assertEquals("FINISHED", finalState)
+
+ // Phase 5: Verify data correctness and compaction
+ assertEquals(310L, (sql "SELECT count(*) FROM ${tableName}")[0][0])
+
+ // Verify column type changed
+ def schema = sql "DESC ${tableName}"
+ def v1Type = schema.find { it[0] == "v1" }[1]
+ assertEquals("bigint", v1Type.toLowerCase())
+
+ // Phase 6: Post-SC loading (verify alter_version cleanup)
+ for (int i = 0; i < 3; i++) {
+ insertBatch(500 + i * 10, 10, "post_${i}")
+ }
+ // Phase 7: Verify data correctness and schema change
+ def expectedCount = 150 + 160 + 30 // initial(5*30) + SC
inserts(8*20) + post-SC(3*10)
+ assertEquals(expectedCount, (sql "SELECT count(*) FROM
${tableName}")[0][0])
+ assertEquals(expectedCount, (sql "SELECT count(distinct k1) FROM
${tableName}")[0][0])
+
+ def desc = sql "DESC ${tableName}"
+ def v1Col = desc.find { it[0] == "v1" }
+ assertTrue(v1Col[1].toString().toLowerCase().contains("bigint"))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]