This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dd59f479af5 [fix](cloud) Delete local rowsets before add_rowsets in
cloud schema change (#62256)
dd59f479af5 is described below
commit dd59f479af5a855401e3f862c751e8416070a1e2
Author: bobhan1 <[email protected]>
AuthorDate: Fri Apr 10 11:31:52 2026 +0800
[fix](cloud) Delete local rowsets before add_rowsets in cloud schema change
(#62256)
### What problem does this PR solve?
Problem Summary:
During cloud schema change (SC), the MS (Meta Service) side correctly
recycles rowsets in `[2, alter_version]` on the new tablet when
committing the SC job. However, the BE (backend) side did not mirror this
behavior: it called `add_rowsets` for the SC output directly, without
first removing the existing local rowsets. This could leave stale rowsets
(e.g., compaction outputs on the new tablet) visible in
`_rs_version_map`, and since their delete bitmaps do not cover the SC
output rows, duplicate keys could appear in MOW (merge-on-write) tables.
PR #61089 increased the likelihood of triggering this issue by enabling
compaction on new tablets during SC, which makes it more common for the
new tablet to have compaction rowsets with wider version ranges (e.g.,
`[818-822]`) that overlap with individual SC output rowsets (e.g.,
`[818],[819],...,[822]`). The `add_rowsets` overlap check
(`to_add_v.contains(v)`) is one-directional: it only matches an existing
rowset whose version range lies within that of a rowset being added. Here
`[818].contains([818-822])` evaluates to false, so the stale compaction
rowset was not removed.
Fix: Before calling `add_rowsets` for SC output, delete all local
rowsets in `[2, alter_version]` from the new tablet, mirroring the
MS-side recycle behavior. A new
`CloudTablet::delete_rowsets_for_schema_change` method is added that
also removes edges from the version graph, preventing the greedy capture
algorithm from preferring the wider stale compaction path over the
individual SC output rowsets.
---
be/src/cloud/cloud_schema_change_job.cpp | 23 +++
be/src/cloud/cloud_tablet.cpp | 28 ++++
be/src/cloud/cloud_tablet.h | 7 +
be/test/cloud/cloud_tablet_test.cpp | 248 +++++++++++++++++++++++++++++++
4 files changed, 306 insertions(+)
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 300a41c464d..0f8d097e309 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -25,6 +25,7 @@
#include <memory>
#include <mutex>
#include <random>
+#include <ranges>
#include <thread>
#include "cloud/cloud_meta_mgr.h"
@@ -524,6 +525,28 @@ Status
CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
// during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in
another thread
std::unique_lock lock {_new_tablet->get_sync_meta_lock()};
std::unique_lock wlock(_new_tablet->get_header_lock());
+ // Mirror MS behavior: delete rowsets in [2, alter_version] before
adding
+ // SC output rowsets to avoid stale compaction rowsets remaining
visible.
+ {
+ int64_t alter_ver = sc_job->alter_version();
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _new_tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= alter_ver) {
+ to_delete.push_back(rs);
+ }
+ }
+ if (!to_delete.empty()) {
+ LOG_INFO(
+ "schema change: delete {} local rowsets in [2, {}]
before adding SC "
+ "output, tablet_id={}, versions=[{}]",
+ to_delete.size(), alter_ver, _new_tablet->tablet_id(),
+ fmt::join(to_delete | std::views::transform([](const
auto& rs) {
+ return rs->version().to_string();
+ }),
+ ", "));
+ _new_tablet->delete_rowsets_for_schema_change(to_delete,
wlock);
+ }
+ }
_new_tablet->add_rowsets(std::move(_output_rowsets), true, wlock,
false);
_new_tablet->set_cumulative_layer_point(_output_cumulative_point);
_new_tablet->reset_approximate_stats(stats.num_rowsets(),
stats.num_segments(),
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index f7be421e442..56e739a7581 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -491,6 +491,34 @@ void CloudTablet::delete_rowsets(const
std::vector<RowsetSharedPtr>& to_delete,
_tablet_meta->modify_rs_metas({}, rs_metas, false);
}
+void CloudTablet::delete_rowsets_for_schema_change(const
std::vector<RowsetSharedPtr>& to_delete,
+
std::unique_lock<std::shared_mutex>&) {
+ if (to_delete.empty()) {
+ return;
+ }
+ std::vector<RowsetMetaSharedPtr> rs_metas;
+ rs_metas.reserve(to_delete.size());
+ for (auto&& rs : to_delete) {
+ rs_metas.push_back(rs->rowset_meta());
+ _rs_version_map.erase(rs->version());
+ // Remove edge from version graph so that the greedy capture algorithm
+ // won't prefer the wider stale compaction rowset over individual SC
+ // output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
+ _timestamped_version_tracker.delete_version(rs->version());
+ }
+
+ // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
+ // stale tracking mechanism (_stale_rs_version_map /
_stale_version_path_map)
+ // because SC output will create new rowsets with identical version ranges;
+ // a later compaction could put those into stale as well, causing two stale
+ // paths to reference the same version key -- when one path is cleaned
first,
+ // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
+ _tablet_meta->modify_rs_metas({}, rs_metas, true);
+
+ // Schedule for direct cache cleanup. MS has already recycled these
rowsets.
+ add_unused_rowsets(to_delete);
+}
+
uint64_t CloudTablet::delete_expired_stale_rowsets() {
if (config::enable_mow_verbose_log) {
LOG_INFO("begin delete_expired_stale_rowset for tablet={}",
tablet_id());
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index b0c66c20586..807ca6207c4 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -159,6 +159,13 @@ public:
void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
std::unique_lock<std::shared_mutex>& meta_lock);
+ // Like delete_rowsets, but also removes edges from the version graph.
+ // Used by schema change to prevent the greedy capture algorithm from
+ // preferring stale compaction rowsets over individual SC output rowsets.
+ // MUST hold EXCLUSIVE `_meta_lock`.
+ void delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>&
to_delete,
+ std::unique_lock<std::shared_mutex>&
meta_lock);
+
// When the tablet is dropped, we need to recycle cached data:
// 1. The data in file cache
// 2. The memory in tablet cache
diff --git a/be/test/cloud/cloud_tablet_test.cpp
b/be/test/cloud/cloud_tablet_test.cpp
index 66a07cbc296..d2f201b14d4 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -1361,4 +1361,252 @@ TEST_F(CloudTabletWarmUpStateTest,
TestWarmedUpOverridesNotWarmedUp) {
EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
}
+class CloudTabletDeleteRowsetsForSchemaChangeTest : public testing::Test {
+public:
+ CloudTabletDeleteRowsetsForSchemaChangeTest() :
_engine(CloudStorageEngine(EngineOptions {})) {}
+
+ void SetUp() override {
+ _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5,
TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ _tablet =
+ std::make_shared<CloudTablet>(_engine,
std::make_shared<TabletMeta>(*_tablet_meta));
+ }
+ void TearDown() override {}
+
+ RowsetSharedPtr create_rowset(Version version) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_version(version);
+ rs_meta->set_rowset_id(_engine.next_rowset_id());
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta,
&rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+ }
+
+protected:
+ TabletMetaSharedPtr _tablet_meta;
+ std::shared_ptr<CloudTablet> _tablet;
+ CloudStorageEngine _engine;
+};
+
+// Simulate the DORIS-25014 scenario:
+// - New tablet has compacted rowset [2-6] from compaction during SC
+// - SC produces individual output rowsets [2],[3],[4],[5],[6]
+// - Without the fix, add_rowsets fails to remove [2-6] because
+// [2].contains([2-6]) = false
+// - With delete_rowsets_for_schema_change, the stale compaction rowset is
+// removed from both _rs_version_map and version graph before add_rowsets
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestSchemaChangeDeletesCompactionRowset) {
+ // Setup: add placeholder [0-1] and compacted rowset [2-6]
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_compacted = create_rowset(Version(2, 6));
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_compacted, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock,
false);
+ }
+ // Verify initial state
+ ASSERT_EQ(_tablet->rowset_map().size(), 2);
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
+
+ // SC produces individual rowsets
+ std::vector<RowsetSharedPtr> sc_output;
+ for (int v = 2; v <= 6; v++) {
+ auto rs = create_rowset(Version(v, v));
+ ASSERT_NE(rs, nullptr);
+ sc_output.push_back(rs);
+ }
+
+ // Simulate delete_rowsets_for_schema_change + add_rowsets
+ int64_t alter_version = 6;
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ // Collect rowsets in [2, alter_version]
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= alter_version) {
+ to_delete.push_back(rs);
+ }
+ }
+ ASSERT_EQ(to_delete.size(), 1); // only [2-6]
+ ASSERT_EQ(to_delete[0]->version(), Version(2, 6));
+
+ _tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+
+ // [2-6] should be removed from rs_version_map
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
+ // Should NOT go to stale (to avoid stale path conflicts), but to
unused
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+ ASSERT_TRUE(_tablet->need_remove_unused_rowsets());
+
+ _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+ }
+
+ // Verify: individual SC rowsets are now in rs_version_map
+ ASSERT_EQ(_tablet->rowset_map().size(), 6); // [0-1] + 5 individual
+ for (int v = 2; v <= 6; v++) {
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(v, v)))
+ << "Missing version " << v << "-" << v;
+ }
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 6)));
+
+ // Verify: capture_consistent_versions works correctly (no stale edges)
+ auto versions_result =
_tablet->capture_consistent_versions_unlocked(Version(0, 6), {});
+ ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+ auto& versions = versions_result.value();
+ ASSERT_EQ(versions.size(), 6); // [0-1] + [2],[3],[4],[5],[6]
+ ASSERT_EQ(versions[0], Version(0, 1));
+ for (int i = 0; i < 5; i++) {
+ ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
+ }
+}
+
+// Test that delete_rowsets_for_schema_change with empty input is a no-op
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest, TestEmptyDeleteIsNoop) {
+ auto rs = create_rowset(Version(0, 1));
+ ASSERT_NE(rs, nullptr);
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs}, false, wlock, false);
+ }
+ ASSERT_EQ(_tablet->rowset_map().size(), 1);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->delete_rowsets_for_schema_change({}, wlock);
+ }
+ ASSERT_EQ(_tablet->rowset_map().size(), 1);
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+}
+
+// Test with multiple compaction rowsets spanning different version ranges
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestMultipleCompactionRowsets) {
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_comp1 = create_rowset(Version(2, 5));
+ auto rs_comp2 = create_rowset(Version(6, 10));
+ auto rs_post = create_rowset(Version(11, 11)); // after alter_version,
should NOT be deleted
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_comp1, nullptr);
+ ASSERT_NE(rs_comp2, nullptr);
+ ASSERT_NE(rs_post, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_comp1, rs_comp2, rs_post},
false, wlock, false);
+ }
+ ASSERT_EQ(_tablet->rowset_map().size(), 4);
+
+ // SC output: individual rowsets for versions 2-10
+ std::vector<RowsetSharedPtr> sc_output;
+ for (int v = 2; v <= 10; v++) {
+ auto rs = create_rowset(Version(v, v));
+ ASSERT_NE(rs, nullptr);
+ sc_output.push_back(rs);
+ }
+
+ int64_t alter_version = 10;
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ std::vector<RowsetSharedPtr> to_delete;
+ for (auto& [v, rs] : _tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= alter_version) {
+ to_delete.push_back(rs);
+ }
+ }
+ ASSERT_EQ(to_delete.size(), 2); // [2-5] and [6-10]
+
+ _tablet->delete_rowsets_for_schema_change(to_delete, wlock);
+
+ // Post-alter rowset should survive
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(11, 11)));
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(2, 5)));
+ ASSERT_FALSE(_tablet->rowset_map().count(Version(6, 10)));
+
+ _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+ }
+
+ // Verify: [0-1], [2],[3],...,[10], [11-11]
+ ASSERT_EQ(_tablet->rowset_map().size(), 11);
+
+ // Verify capture
+ auto versions_result =
_tablet->capture_consistent_versions_unlocked(Version(0, 11), {});
+ ASSERT_TRUE(versions_result.has_value()) << versions_result.error();
+ auto& versions = versions_result.value();
+ ASSERT_EQ(versions.size(), 11);
+ ASSERT_EQ(versions[0], Version(0, 1));
+ for (int i = 0; i < 9; i++) {
+ ASSERT_EQ(versions[i + 1], Version(2 + i, 2 + i));
+ }
+ ASSERT_EQ(versions[10], Version(11, 11));
+}
+
+// Reproduce the CI crash scenario: SC delete puts rowsets to stale, then
+// compaction creates a new stale path with overlapping version keys. When
+// one stale path is cleaned, the other hits DCHECK(false) because the
+// version is already removed from _stale_rs_version_map.
+// With the fix (bypassing stale tracking), this should not happen.
+TEST_F(CloudTabletDeleteRowsetsForSchemaChangeTest,
TestNoStalePathConflictWithCompaction) {
+ // Setup: [0-1] placeholder, [2-6] compaction product during SC
+ auto rs_placeholder = create_rowset(Version(0, 1));
+ auto rs_compacted = create_rowset(Version(2, 6));
+ ASSERT_NE(rs_placeholder, nullptr);
+ ASSERT_NE(rs_compacted, nullptr);
+
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->add_rowsets({rs_placeholder, rs_compacted}, false, wlock,
false);
+ }
+
+ // SC output: individual rowsets [2],[3],[4],[5],[6]
+ std::vector<RowsetSharedPtr> sc_output;
+ for (int v = 2; v <= 6; v++) {
+ sc_output.push_back(create_rowset(Version(v, v)));
+ }
+
+ // Step 1: delete_rowsets_for_schema_change + add SC output
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ _tablet->delete_rowsets_for_schema_change({rs_compacted}, wlock);
+ _tablet->add_rowsets(std::move(sc_output), false, wlock, false);
+ }
+ // Stale should be empty — SC delete bypasses stale tracking
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+
+ // Step 2: compaction merges SC output [2],[3],[4],[5],[6] -> [2-6]
+ auto rs_new_compacted = create_rowset(Version(2, 6));
+ std::vector<RowsetSharedPtr> compaction_input;
+ {
+ std::unique_lock wlock(_tablet->get_header_lock());
+ for (auto& [v, rs] : _tablet->rowset_map()) {
+ if (v.first >= 2 && v.second <= 6) {
+ compaction_input.push_back(rs);
+ }
+ }
+ ASSERT_EQ(compaction_input.size(), 5);
+ // Normal compaction delete_rowsets — this WILL use stale tracking
+ _tablet->delete_rowsets(compaction_input, wlock);
+ _tablet->add_rowsets({rs_new_compacted}, false, wlock, false);
+ }
+ // Now stale has the compaction inputs
+ ASSERT_TRUE(_tablet->has_stale_rowsets());
+
+ // Step 3: delete_expired_stale_rowsets — this is where CI crashed
+ // With old code: stale path from SC and compaction both reference [2-6]
key,
+ // causing DCHECK(false). With fix: only compaction stale path exists, no
conflict.
+ config::tablet_rowset_stale_sweep_time_sec = 0; // expire immediately
+ ASSERT_NO_FATAL_FAILURE(_tablet->delete_expired_stale_rowsets());
+
+ // Verify final state: [0-1] and [2-6] active, no stale left
+ ASSERT_EQ(_tablet->rowset_map().size(), 2);
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(0, 1)));
+ ASSERT_TRUE(_tablet->rowset_map().count(Version(2, 6)));
+ ASSERT_FALSE(_tablet->has_stale_rowsets());
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]