This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7e8c81d1983 branch-3.0: [Opt](cloud-sc) Clear stop token when `commit_tablet_job` fails #49275 (#49494) 7e8c81d1983 is described below commit 7e8c81d1983616426cca96d2c4eef825e298ce12 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Thu Mar 27 10:22:53 2025 +0800 branch-3.0: [Opt](cloud-sc) Clear stop token when `commit_tablet_job` fails #49275 (#49494) Cherry-picked from #49275 Co-authored-by: bobhan1 <bao...@selectdb.com> --- be/src/cloud/cloud_schema_change_job.cpp | 20 ++++-- be/src/cloud/cloud_storage_engine.cpp | 16 +++-- be/src/cloud/cloud_storage_engine.h | 2 +- .../test_cloud_sc_self_retry_with_stop_token.out | Bin 0 -> 167 bytes ...test_cloud_sc_self_retry_with_stop_token.groovy | 76 +++++++++++++++++++++ 5 files changed, 102 insertions(+), 12 deletions(-) diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index debdd587037..db2dacd7663 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -364,7 +364,16 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep", DBUG_BLOCK); // process delete bitmap if the table is MOW + bool has_stop_token {false}; + bool should_clear_stop_token {true}; + Defer defer {[&]() { + if (has_stop_token) { + static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token( + _new_tablet, should_clear_stop_token)); + } + }}; if (_new_tablet->enable_unique_key_merge_on_write()) { + has_stop_token = true; int64_t initiator = boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) & std::numeric_limits<int64_t>::max(); // If there are historical versions of rowsets, we need to recalculate their delete @@ -385,6 +394,11 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test txn conflict"); } }); + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job", { + LOG_INFO("inject retryable error before commit sc job, tablet={}", + _new_tablet->tablet_id()); + return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected retryable error"); + }); auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, &finish_resp); if (!st.ok()) { if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) { @@ -397,6 +411,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam return Status::OK(); } return st; + } else { + should_clear_stop_token = false; } const auto& stats = finish_resp.stats(); { @@ -419,10 +435,6 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version) .tag("alter_version", alter_version); RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator)); - Defer defer {[&]() { - static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet)); - }}; - TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta())); tmp_meta->delete_bitmap().delete_bitmap.clear(); std::shared_ptr<CloudTablet> tmp_tablet = diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 2dcc01695d0..45d53642140 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -906,7 +906,7 @@ Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet return st; } -Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet) { +Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms) { std::shared_ptr<CloudCompactionStopToken> stop_token; { std::lock_guard lock(_compaction_mtx); @@ -918,12 +918,14 @@ Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tabl } _active_compaction_stop_tokens.erase(tablet->tablet_id()); } - // stop token will be removed when SC commit or abort - // RETURN_IF_ERROR(stop_token->do_unregister()); - LOG_INFO( - "successfully unregister compaction stop token for tablet_id={}, " - "delete_bitmap_lock_initiator={}", - tablet->tablet_id(), stop_token->initiator()); + LOG_INFO("successfully unregister compaction stop token for tablet_id={}", tablet->tablet_id()); + if (stop_token && clear_ms) { + RETURN_IF_ERROR(stop_token->do_unregister()); + LOG_INFO( + "successfully remove compaction stop token from MS for tablet_id={}, " + "delete_bitmap_lock_initiator={}", + tablet->tablet_id(), stop_token->initiator()); + } return Status::OK(); } diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 35be442195e..046116e3a24 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -150,7 +150,7 @@ public: Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator); - Status unregister_compaction_stop_token(CloudTabletSPtr tablet); + Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool clear_ms); private: void _refresh_storage_vault_info_thread_callback(); diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out new file mode 100644 index 00000000000..69c340fa35a Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out differ diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy new file mode 100644 index 00000000000..5978320aa19 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_sc_self_retry_with_stop_token", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + + def table1 = "test_cloud_sc_self_retry_with_stop_token" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2);" + sql "insert into ${table1} values(3,3,3,2);" + sql "sync;" + qt_sql "select * from ${table1} order by k1;" + + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job") + + sql "alter table ${table1} modify column c2 varchar(100);" + + def res + Awaitility.await().atMost(40, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> { + res = sql_return_maparray """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + logger.info("res: ${res}") + if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") { + return true; + } + return false; + }); + + assert res[0].State == "CANCELLED" + assert !res[0].Msg.contains("compactions are not allowed on tablet_id") && !res[0].Msg.contains("stop token already exists") + assert res[0].Msg.contains("DELETE_BITMAP_LOCK_ERROR") + + qt_sql "select * from ${table1} order by k1;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org