This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8445855cf25 [fix](cloud) Fix schema change stats id and cumulative point (#55703)
8445855cf25 is described below

commit 8445855cf25a57b25f6703ade3dcfc51852e4091
Author: walter <[email protected]>
AuthorDate: Fri Sep 5 17:34:31 2025 +0800

    [fix](cloud) Fix schema change stats id and cumulative point (#55703)
---
 cloud/src/meta-service/meta_service_job.cpp | 86 ++++++++++++++++++++---------
 cloud/test/meta_service_job_test.cpp        |  9 ++-
 2 files changed, 68 insertions(+), 27 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_job.cpp 
b/cloud/src/meta-service/meta_service_job.cpp
index 21fe8b9663b..b2e657c74f6 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1341,6 +1341,29 @@ void schema_change_update_tablet_stats(const 
TabletSchemaChangeJobPB& schema_cha
                                                      
segment_size_remove_rowsets));
 }
 
+std::pair<TabletStatsPB, TabletStatsPB> 
split_tablet_stats_into_load_and_compact_parts(
+        const TabletStatsPB& stats) {  // Split merged stats into {load part, compact part}.
+    TabletStatsPB load_stats, compact_stats;
+    compact_stats.set_base_compaction_cnt(stats.base_compaction_cnt());  // compact part: compaction counters/timestamps and cumulative point
+    
compact_stats.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+    compact_stats.set_cumulative_point(stats.cumulative_point());
+    
compact_stats.set_last_base_compaction_time_ms(stats.last_base_compaction_time_ms());
+    
compact_stats.set_last_cumu_compaction_time_ms(stats.last_cumu_compaction_time_ms());
+    compact_stats.set_full_compaction_cnt(stats.full_compaction_cnt());
+    
compact_stats.set_last_full_compaction_time_ms(stats.last_full_compaction_time_ms());
+    compact_stats.mutable_idx()->CopyFrom(stats.idx());  // idx is copied into both parts
+    // load part: row/rowset/segment counts and data/index/segment sizes
+    load_stats.set_num_rows(stats.num_rows());
+    load_stats.set_num_rowsets(stats.num_rowsets());
+    load_stats.set_num_segments(stats.num_segments());
+    load_stats.set_data_size(stats.data_size());
+    load_stats.set_index_size(stats.index_size());
+    load_stats.set_segment_size(stats.segment_size());
+    load_stats.mutable_idx()->CopyFrom(stats.idx());
+    // NOTE(review): fields not copied above are dropped from both parts; keep in sync with TabletStatsPB.
+    return {load_stats, compact_stats};  // first = load part, second = compact part
+}
+
 std::pair<MetaServiceCode, std::string> scan_schema_change_input_rowsets(
         Transaction* txn, std::string_view instance_id, int64_t new_tablet_id,
         std::string& rs_start, std::string& rs_end, auto&& callback) {
@@ -1708,12 +1731,13 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
         }
     } else {
         std::vector<RowsetMetaCloudPB> rowset_metas;
-        TxnErrorCode err = reader.get_rowset_metas(txn.get(), tablet_id, 2,
+        TxnErrorCode err = reader.get_rowset_metas(txn.get(), new_tablet_id, 2,
                                                    
schema_change.alter_version(), &rowset_metas);
         if (err != TxnErrorCode::TXN_OK) {
             code = cast_as<ErrCategory::READ>(err);
-            msg = fmt::format("failed to get rowset metas, tablet_id={}, 
start={}, end={}, err={}",
-                              tablet_id, 2, schema_change.alter_version(), 
err);
+            msg = fmt::format(
+                    "failed to get rowset metas, new_tablet_id={}, start={}, 
end={}, err={}",
+                    new_tablet_id, 2, schema_change.alter_version(), err);
             LOG(WARNING) << msg;
             return;
         }
@@ -1730,10 +1754,12 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
     auto stats = response->mutable_stats();
     TabletStats detached_stats;
     if (is_versioned_read) {
-        TxnErrorCode err = reader.get_tablet_load_stats(txn.get(), tablet_id, 
stats, nullptr, true);
+        TxnErrorCode err =
+                reader.get_tablet_merged_stats(txn.get(), new_tablet_id, 
stats, nullptr, true);
         if (err != TxnErrorCode::TXN_OK) {
             code = cast_as<ErrCategory::READ>(err);
-            msg = fmt::format("failed to get tablet stats, tablet_id={}, 
err={}", tablet_id, err);
+            msg = fmt::format("failed to get tablet stats, tablet_id={}, 
err={}", new_tablet_id,
+                              err);
             LOG(WARNING) << msg;
             return;
         }
@@ -1754,48 +1780,56 @@ void process_schema_change_job(MetaServiceCode& code, 
std::string& msg, std::str
     }
     if (is_versioned_write) {
         // read new TabletLoadStatsKey -> TabletStatsPB
-        TabletStatsPB new_tablet_load_stats;
+        TabletStatsPB new_tablet_stats;
         MetaReader meta_reader(instance_id, txn_kv);
         Versionstamp* versionstamp = nullptr;
         TxnErrorCode err = TxnErrorCode::TXN_OK;
         if (is_versioned_read) {
-            new_tablet_load_stats.CopyFrom(*stats);
+            new_tablet_stats.CopyFrom(*stats);
         } else {
-            err = meta_reader.get_tablet_load_stats(txn.get(), new_tablet_id,
-                                                    &new_tablet_load_stats, 
versionstamp, false);
+            err = meta_reader.get_tablet_merged_stats(txn.get(), 
new_tablet_id, &new_tablet_stats,
+                                                      versionstamp, false);
         }
         if (err == TxnErrorCode::TXN_OK) {
             // new_tablet_load_stats exists, update TabletStatsPB
-            schema_change_update_tablet_stats(
-                    schema_change, &new_tablet_load_stats, num_remove_rows, 
size_remove_rowsets,
-                    num_remove_rowsets, num_remove_segments, 
index_size_remove_rowsets,
-                    segment_size_remove_rowsets);
+            schema_change_update_tablet_stats(schema_change, 
&new_tablet_stats, num_remove_rows,
+                                              size_remove_rowsets, 
num_remove_rowsets,
+                                              num_remove_segments, 
index_size_remove_rowsets,
+                                              segment_size_remove_rowsets);
         } else if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
             // First time switching from single write to double write mode
             // Step 1: Copy from single version stats as baseline
-            new_tablet_load_stats.CopyFrom(*stats);
+            new_tablet_stats.CopyFrom(*stats);
             // Step 2: Apply schema change updates
-            schema_change_update_tablet_stats(
-                    schema_change, &new_tablet_load_stats, num_remove_rows, 
size_remove_rowsets,
-                    num_remove_rowsets, num_remove_segments, 
index_size_remove_rowsets,
-                    segment_size_remove_rowsets);
+            schema_change_update_tablet_stats(schema_change, 
&new_tablet_stats, num_remove_rows,
+                                              size_remove_rowsets, 
num_remove_rowsets,
+                                              num_remove_segments, 
index_size_remove_rowsets,
+                                              segment_size_remove_rowsets);
         } else if (err != TxnErrorCode::TXN_OK) {
             code = cast_as<ErrCategory::READ>(err);
             msg = fmt::format("failed to get tablet compact stats, 
tablet_id={}, err={}", tablet_id,
                               err);
             return;
         }
-        // Write new TabletLoadStatsKey -> TabletStatsPB for versioned storage
-        auto new_tablet_load_stats_val = 
new_tablet_load_stats.SerializeAsString();
-        std::string new_tablet_load_stats_version_key =
-                versioned::tablet_load_stats_key({instance_id, new_tablet_id});
-        LOG_INFO("put versioned tablet compact stats key")
-                .tag("new_tablet_load_stats_version_key", 
hex(new_tablet_load_stats_version_key))
+
+        auto [load_stats, compact_stats] =
+                
split_tablet_stats_into_load_and_compact_parts(new_tablet_stats);
+        std::string load_value = load_stats.SerializeAsString();
+        std::string compact_value = compact_stats.SerializeAsString();
+        std::string load_stats_key = 
versioned::tablet_load_stats_key({instance_id, new_tablet_id});
+        std::string compact_stats_key =
+                versioned::tablet_compact_stats_key({instance_id, 
new_tablet_id});
+
+        LOG_INFO("put versioned tablet load/compact stats key")
                 .tag("tablet_id", tablet_id)
                 .tag("new_tablet_id", new_tablet_id)
-                .tag("value_size", new_tablet_load_stats_val.size())
+                .tag("load_value_size", load_value.size())
+                .tag("compact_value_size", compact_value.size())
+                .tag("load_stats_key", hex(load_stats_key))
+                .tag("compact_stats_key", hex(compact_stats_key))
                 .tag("instance_id", instance_id);
-        versioned_put(txn.get(), new_tablet_load_stats_version_key, 
new_tablet_load_stats_val);
+        versioned_put(txn.get(), load_stats_key, load_value);
+        versioned_put(txn.get(), compact_stats_key, compact_value);
     }
     schema_change_update_tablet_stats(schema_change, stats, num_remove_rows, 
size_remove_rowsets,
                                       num_remove_rowsets, num_remove_segments,
diff --git a/cloud/test/meta_service_job_test.cpp 
b/cloud/test/meta_service_job_test.cpp
index 8d7d805aa66..e53ed77fb35 100644
--- a/cloud/test/meta_service_job_test.cpp
+++ b/cloud/test/meta_service_job_test.cpp
@@ -1425,7 +1425,9 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
                        {tablet_id, new_tablet_id});
     }
 
-    // Create output rowsets for new tablet
+    // Now old tablet has rowsets [1, 2, 3, 4, 5], and new tablet has [4, 5]
+
+    // Create output rowsets for new tablet [1, 2, 3]
     std::vector<doris::RowsetMetaCloudPB> output_rowsets;
     for (int64_t i = 0; i < 3; ++i) {
         auto rowset = create_rowset(new_tablet_id, i + 2, i + 2, 100);
@@ -1471,6 +1473,8 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         schema_change->set_size_output_rowsets(300 * 110);
         schema_change->set_index_size_output_rowsets(300 * 10);
         schema_change->set_segment_size_output_rowsets(300 * 110);
+        schema_change->set_output_cumulative_point(
+                4); // cumulative point from the old tablet to the new one.
 
         brpc::Controller cntl;
         meta_service->finish_tablet_job(&cntl, &req, &res, nullptr);
@@ -1496,6 +1500,8 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         EXPECT_EQ(new_stats.segment_size(),
                   new_tablet_stats_pb.segment_size() +
                           
req.job().schema_change().segment_size_output_rowsets());
+        EXPECT_EQ(new_stats.cumulative_point(),
+                  req.job().schema_change().output_cumulative_point());
     }
 
     {
@@ -1524,6 +1530,7 @@ TEST(MetaServiceJobVersionedReadTest, 
SchemaChangeJobTest) {
         ASSERT_EQ(resp.rowset_meta(1).end_version(), 6);
     }
 
+    new_tablet_stats_pb = get_tablet_stats(new_tablet_id);
     {
         // Get the rowset metas of the new tablet
         GetRowsetRequest req;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to