This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7e8c81d1983 branch-3.0: [Opt](cloud-sc) Clear stop token when `commit_tablet_job` fails #49275 (#49494)
7e8c81d1983 is described below

commit 7e8c81d1983616426cca96d2c4eef825e298ce12
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Mar 27 10:22:53 2025 +0800

    branch-3.0: [Opt](cloud-sc) Clear stop token when `commit_tablet_job` fails #49275 (#49494)
    
    Cherry-picked from #49275
    
    Co-authored-by: bobhan1 <bao...@selectdb.com>
---
 be/src/cloud/cloud_schema_change_job.cpp           |  20 ++++--
 be/src/cloud/cloud_storage_engine.cpp              |  16 +++--
 be/src/cloud/cloud_storage_engine.h                |   2 +-
 .../test_cloud_sc_self_retry_with_stop_token.out   | Bin 0 -> 167 bytes
 ...test_cloud_sc_self_retry_with_stop_token.groovy |  76 +++++++++++++++++++++
 5 files changed, 102 insertions(+), 12 deletions(-)

diff --git a/be/src/cloud/cloud_schema_change_job.cpp 
b/be/src/cloud/cloud_schema_change_job.cpp
index debdd587037..db2dacd7663 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -364,7 +364,16 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
 
     DBUG_EXECUTE_IF("CloudSchemaChangeJob.process_alter_tablet.sleep", 
DBUG_BLOCK);
     // process delete bitmap if the table is MOW
+    bool has_stop_token {false};
+    bool should_clear_stop_token {true};
+    Defer defer {[&]() {
+        if (has_stop_token) {
+            
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(
+                    _new_tablet, should_clear_stop_token));
+        }
+    }};
     if (_new_tablet->enable_unique_key_merge_on_write()) {
+        has_stop_token = true;
         int64_t initiator = 
boost::uuids::hash_value(UUIDGenerator::instance()->next_uuid()) &
                             std::numeric_limits<int64_t>::max();
         // If there are historical versions of rowsets, we need to recalculate 
their delete
@@ -385,6 +394,11 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
             return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("test 
txn conflict");
         }
     });
+    
DBUG_EXECUTE_IF("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job",
 {
+        LOG_INFO("inject retryable error before commit sc job, tablet={}",
+                 _new_tablet->tablet_id());
+        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>("injected 
retryable error");
+    });
     auto st = _cloud_storage_engine.meta_mgr().commit_tablet_job(job, 
&finish_resp);
     if (!st.ok()) {
         if (finish_resp.status().code() == cloud::JOB_ALREADY_SUCCESS) {
@@ -397,6 +411,8 @@ Status 
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
             return Status::OK();
         }
         return st;
+    } else {
+        should_clear_stop_token = false;
     }
     const auto& stats = finish_resp.stats();
     {
@@ -419,10 +435,6 @@ Status 
CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
             .tag("start_calc_delete_bitmap_version", 
start_calc_delete_bitmap_version)
             .tag("alter_version", alter_version);
     
RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet,
 initiator));
-    Defer defer {[&]() {
-        
static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet));
-    }};
-
     TabletMetaSharedPtr tmp_meta = 
std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta()));
     tmp_meta->delete_bitmap().delete_bitmap.clear();
     std::shared_ptr<CloudTablet> tmp_tablet =
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 2dcc01695d0..45d53642140 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -906,7 +906,7 @@ Status 
CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet
     return st;
 }
 
-Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr 
tablet) {
+Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr 
tablet, bool clear_ms) {
     std::shared_ptr<CloudCompactionStopToken> stop_token;
     {
         std::lock_guard lock(_compaction_mtx);
@@ -918,12 +918,14 @@ Status 
CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tabl
         }
         _active_compaction_stop_tokens.erase(tablet->tablet_id());
     }
-    // stop token will be removed when SC commit or abort
-    // RETURN_IF_ERROR(stop_token->do_unregister());
-    LOG_INFO(
-            "successfully unregister compaction stop token for tablet_id={}, "
-            "delete_bitmap_lock_initiator={}",
-            tablet->tablet_id(), stop_token->initiator());
+    LOG_INFO("successfully unregister compaction stop token for tablet_id={}", 
tablet->tablet_id());
+    if (stop_token && clear_ms) {
+        RETURN_IF_ERROR(stop_token->do_unregister());
+        LOG_INFO(
+                "successfully remove compaction stop token from MS for 
tablet_id={}, "
+                "delete_bitmap_lock_initiator={}",
+                tablet->tablet_id(), stop_token->initiator());
+    }
     return Status::OK();
 }
 
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 35be442195e..046116e3a24 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -150,7 +150,7 @@ public:
 
     Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t 
initiator);
 
-    Status unregister_compaction_stop_token(CloudTabletSPtr tablet);
+    Status unregister_compaction_stop_token(CloudTabletSPtr tablet, bool 
clear_ms);
 
 private:
     void _refresh_storage_vault_info_thread_callback();
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out
new file mode 100644
index 00000000000..69c340fa35a
Binary files /dev/null and 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.out
 differ
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy
new file mode 100644
index 00000000000..5978320aa19
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sc_self_retry_with_stop_token.groovy
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.TimeUnit
+import org.awaitility.Awaitility
+
+suite("test_cloud_sc_self_retry_with_stop_token", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+
+    def table1 = "test_cloud_sc_self_retry_with_stop_token"
+    sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int,
+            `c3` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table1} values(1,1,1,1);"
+    sql "insert into ${table1} values(2,2,2,2);"
+    sql "insert into ${table1} values(3,3,3,2);"
+    sql "sync;"
+    qt_sql "select * from ${table1} order by k1;"
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.fail.before.commit_job")
+
+        sql "alter table ${table1} modify column c2 varchar(100);"
+
+        def res
+        Awaitility.await().atMost(40, TimeUnit.SECONDS).pollDelay(1, 
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
+            res = sql_return_maparray """ SHOW ALTER TABLE COLUMN WHERE 
TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """
+            logger.info("res: ${res}")
+            if (res[0].State == "FINISHED" || res[0].State == "CANCELLED") {
+                return true;
+            }
+            return false;
+        });
+
+        assert res[0].State == "CANCELLED"
+        assert !res[0].Msg.contains("compactions are not allowed on 
tablet_id") && !res[0].Msg.contains("stop token already exists")
+        assert res[0].Msg.contains("DELETE_BITMAP_LOCK_ERROR")
+
+        qt_sql "select * from ${table1} order by k1;"        
+    } catch(Exception e) {
+        logger.info(e.getMessage())
+        throw e
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to