This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 600e0504807 branch-4.0: [Opt](freshness tolerance) Continue to capture
rowsets when the rowset is not in `_rowset_warm_up_states` (#61238) (#61680)
600e0504807 is described below
commit 600e050480785c4c63a83922e1915a2f6d396ba4
Author: bobhan1 <[email protected]>
AuthorDate: Wed Mar 25 11:06:18 2026 +0800
branch-4.0: [Opt](freshness tolerance) Continue to capture rowsets when the
rowset is not in `_rowset_warm_up_states` (#61238) (#61680)
pick https://github.com/apache/doris/pull/61238
---
be/src/cloud/cloud_tablet.cpp | 36 +++-
be/src/cloud/cloud_tablet.h | 3 +
.../cloud/cloud_tablet_query_prefer_cache_test.cpp | 6 +
.../cloud_tablet_query_with_tolerance_test.cpp | 196 +++++++++++++++++++++
be/test/cloud/cloud_tablet_test.cpp | 57 ++++++
5 files changed, 297 insertions(+), 1 deletion(-)
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index b3d9eb5a0ba..75f3218c5f3 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -80,6 +80,7 @@ bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count(
"capture_with_freshness_tolerance_count");
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count(
"capture_with_freshness_tolerance_fallback_count");
+bvar::Adder<int64_t>
g_rowset_warmup_state_missing_count("rowset_warmup_state_missing_count");
bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window(
"capture_prefer_cache_count_window", &g_capture_prefer_cache_count,
30);
bvar::Window<bvar::Adder<int64_t>>
g_capture_with_freshness_tolerance_count_window(
@@ -1790,7 +1791,32 @@ WarmUpState
CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trig
bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
auto it = _rowset_warm_up_states.find(rowset_id);
if (it == _rowset_warm_up_states.end()) {
- return false;
+ // The rowset is not in warmup state, which means the rowset has never
been warmed up.
+ // This may happen when the upstream BE tried to warm up rowsets on
this BE but this BE
+ // was restarting so the warmup failed, and _rowset_warm_up_states has
no entry for it.
+ //
+ // Normally the startup_timepoint check in
rowset_is_warmed_up_unlocked() would filter out
+ // such rowsets (visible_timestamp < startup_timepoint → assumed
warmed up). However,
+ // compaction-produced rowsets have their visible_timestamp set at
rowset builder
+ // initialization time rather than the final transaction commit time
on meta-service,
+ // so their visible_timestamp can be earlier than startup_timepoint,
causing the
+ // startup_timepoint check to NOT filter them out and reaching here
with no warmup entry.
+ // NOTE(review): per the rule above (visible_timestamp < startup_timepoint → assumed warmed up), an earlier visible_timestamp would be filtered out rather than reach here; confirm the intended direction of this explanation.
+ // If such a rowset is before the cumulative compaction point and base
compaction never
+ // happens, returning false here would cause the version path
algorithm to exclude it,
+ // leading to a persistently low path_max_version. With continuous
upstream ingestion,
+ // the freshness tolerance fallback check would keep triggering,
making every query on
+ // this tablet fall back to reading all data from remote storage.
+ //
+ // Returning true (optimistically treating it as warmed up) allows the
version path to
+ // include it. On cache miss the data is transparently read from
remote storage per-segment
+ // and cached locally in 1MB blocks, so the problem self-heals through
subsequent queries.
+ g_rowset_warmup_state_missing_count << 1;
+ LOG_EVERY_N(WARNING, 100) << fmt::format(
+ "rowset warmup state missing, considering it as warmed up.
tablet_id={}, "
+ "rowset_id={}",
+ tablet_id(), rowset_id.to_string());
+ return true;
}
return it->second.state.progress == WarmUpProgress::DONE;
}
@@ -1803,5 +1829,13 @@ void CloudTablet::add_warmed_up_rowset(const RowsetId&
rowset_id) {
.start_tp = std::chrono::steady_clock::now()};
}
+void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) {
+ _rowset_warm_up_states[rowset_id] = {
+ .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
+ .progress = WarmUpProgress::DOING},
+ .num_segments = 1,
+ .start_tp = std::chrono::steady_clock::now()};
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index f4642266f47..f0b4f6b307b 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -352,6 +352,9 @@ public:
bool is_rowset_warmed_up(const RowsetId& rowset_id) const;
void add_warmed_up_rowset(const RowsetId& rowset_id);
+ // Test helper: add a rowset to the warmup state map with DOING progress,
+ // so that is_rowset_warmed_up() returns false for it.
+ void add_not_warmed_up_rowset(const RowsetId& rowset_id);
std::string rowset_warmup_digest() const {
std::string res;
diff --git a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
index 464afb9fc6c..5fa0eaab89b 100644
--- a/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
+++ b/be/test/cloud/cloud_tablet_query_prefer_cache_test.cpp
@@ -87,6 +87,8 @@ public:
auto rs = create_rowset(Version {ver, ver});
if (warmup) {
tablet->add_warmed_up_rowset(rs->rowset_id());
+ } else {
+ tablet->add_not_warmed_up_rowset(rs->rowset_id());
}
rowsets.emplace_back(rs);
}
@@ -102,6 +104,8 @@ public:
auto rowset = create_rowset(Version {version, version},
visible_timestamp);
if (warmed_up) {
tablet->add_warmed_up_rowset(rowset->rowset_id());
+ } else {
+ tablet->add_not_warmed_up_rowset(rowset->rowset_id());
}
std::unique_lock wlock {tablet->get_header_lock()};
tablet->add_rowsets({rowset}, false, wlock, false);
@@ -114,6 +118,8 @@ public:
auto output_rowset = create_rowset(Version {start_version,
end_version}, visible_timestamp);
if (warmed_up) {
tablet->add_warmed_up_rowset(output_rowset->rowset_id());
+ } else {
+ tablet->add_not_warmed_up_rowset(output_rowset->rowset_id());
}
std::ranges::copy_if(std::views::values(tablet->rowset_map()),
std::back_inserter(input_rowsets), [=](const
RowsetSharedPtr& rowset) {
diff --git a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
index 1a24ea275be..9237c0bef9c 100644
--- a/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
+++ b/be/test/cloud/cloud_tablet_query_with_tolerance_test.cpp
@@ -99,6 +99,8 @@ public:
auto rowset = create_rowset(Version {version, version},
visible_timestamp);
if (warmed_up) {
tablet->add_warmed_up_rowset(rowset->rowset_id());
+ } else {
+ tablet->add_not_warmed_up_rowset(rowset->rowset_id());
}
std::unique_lock wlock {tablet->get_header_lock()};
tablet->add_rowsets({rowset}, false, wlock, false);
@@ -111,6 +113,8 @@ public:
auto output_rowset = create_rowset(Version {start_version,
end_version}, visible_timestamp);
if (warmed_up) {
tablet->add_warmed_up_rowset(output_rowset->rowset_id());
+ } else {
+ tablet->add_not_warmed_up_rowset(output_rowset->rowset_id());
}
std::ranges::copy_if(std::views::values(tablet->rowset_map()),
std::back_inserter(input_rowsets), [=](const
RowsetSharedPtr& rowset) {
@@ -125,6 +129,41 @@ public:
}
}
+ // Add a rowset whose warmup state is missing from
`_rowset_warm_up_states`.
+ // This simulates the scenario where the upstream BE tried to warm up
rowsets on this BE,
+ // but this BE was restarting so the warmup request was lost.
+ void add_new_version_rowset_missing_warmup_state(CloudTabletSPtr tablet,
int64_t version,
+ time_point<system_clock>
visible_timestamp) {
+ auto rowset = create_rowset(Version {version, version},
visible_timestamp);
+ // Intentionally do NOT add any warmup state entry for this rowset
+ std::unique_lock wlock {tablet->get_header_lock()};
+ tablet->add_rowsets({rowset}, false, wlock, false);
+ }
+
+ // Simulate a compaction output rowset whose warmup state is missing from
+ // `_rowset_warm_up_states`. This can happen when a compaction is produced
while the BE
+ // is restarting, and the visible_timestamp of compaction output is set
earlier than
+ // startup_timepoint, causing it to bypass the startup_timepoint filter.
+ void do_cumu_compaction_missing_warmup_state(CloudTabletSPtr tablet,
int64_t start_version,
+ int64_t end_version,
+ time_point<system_clock>
visible_timestamp) {
+ std::unique_lock wrlock {tablet->get_header_lock()};
+ std::vector<RowsetSharedPtr> input_rowsets;
+ auto output_rowset = create_rowset(Version {start_version,
end_version}, visible_timestamp);
+ // Intentionally do NOT add any warmup state entry for this rowset
+ std::ranges::copy_if(std::views::values(tablet->rowset_map()),
+ std::back_inserter(input_rowsets), [=](const
RowsetSharedPtr& rowset) {
+ return rowset->version().first >=
start_version &&
+ rowset->version().first <= end_version;
+ });
+ if (input_rowsets.size() == 1) {
+ tablet->add_rowsets({output_rowset}, true, wrlock);
+ } else {
+ tablet->delete_rowsets(input_rowsets, wrlock);
+ tablet->add_rowsets({output_rowset}, false, wrlock);
+ }
+ }
+
void check_capture_result(CloudTabletSPtr tablet, Version spec_version,
int64_t query_freshness_tolerance_ms,
const std::vector<Version>& expected_versions) {
@@ -1071,4 +1110,161 @@ TEST_F(TestFreshnessTolerance, testCaptureMow_3_1) {
std::vector<Version> expected_versions = {{0, 1}, {2, 10}, {11, 17}, {18,
18}};
check_capture_result(tablet, Version {0, 18},
query_freshness_tolerance_ms, expected_versions);
}
+
+// Tests for the behavior change in is_rowset_warmed_up: when a rowset's
warmup state
+// is missing from `_rowset_warm_up_states` (e.g. BE was restarting during
warmup),
+// is_rowset_warmed_up returns true (optimistically treat it as warmed up).
+// This prevents the version path from being blocked by missing warmup entries,
+// which could cause persistent fallback to remote storage reads.
+
+TEST_F(TestFreshnessTolerance, testCapture_missing_warmup_state_1) {
+ /*
+ Scenario: Compaction output rowset has visible_timestamp earlier than
startup_timepoint
+ but its warmup state is missing from the map (BE restarted during warmup).
+ The rowset should be treated as warmed up so it's included in the version
path.
+
+ now-10s now
+ │ 10s │
+ ◄────────────────────────┼
+ │ │
+ ┌────────┐ ┌──────────┐ │ ┌────────┐│
+ │in cache│ │ missing │ │ │ ││
+ │ │ │ warmup │ │ │ ││
+ │ [2-10] │ │ state │ │ │[18-18] ││
+ └────────┘ │ [11-17] │ │ └────────┘│
+ └──────────┘ │ │
+ now-40s now-35s │ now-3s │
+ (before startup) │ │
+
+ startup_timepoint: now-30s
+ [11-17]: visible_ts=now-35s, which is BEFORE startup_timepoint
+ → startup_timepoint check passes, treats it as warmed up
+ [18-18]: visible_ts=now-3s, which is AFTER startup_timepoint
+ → not in warmup map → is_rowset_warmed_up returns true
(optimistically)
+ return: [2-10],[11-17],[18-18]
+ */
+ _engine.set_startup_timepoint(system_clock::now() - seconds(30));
+ auto tablet = create_tablet_with_initial_rowsets(15);
+ do_cumu_compaction(tablet, 2, 10, true, system_clock::now() - seconds(40));
+ do_cumu_compaction(tablet, 11, 15, true, system_clock::now() -
seconds(20));
+ add_new_version_rowset(tablet, 16, true, system_clock::now() -
seconds(15));
+ add_new_version_rowset(tablet, 17, true, system_clock::now() - seconds(7));
+ add_new_version_rowset_missing_warmup_state(tablet, 18,
system_clock::now() - seconds(3));
+ // Compaction output has visible_ts before startup_timepoint, no warmup
state entry
+ do_cumu_compaction_missing_warmup_state(tablet, 11, 17,
system_clock::now() - seconds(35));
+
+ int64_t query_freshness_tolerance_ms = 10000; // 10s
+ // [11-17] passes startup_timepoint check (visible_ts < startup_timepoint
→ assumed warmed up)
+ // [18-18] is optimistically treated as warmed up (missing from warmup map
→ returns true)
+ // So the full path is captured, and then fallback check:
+ // - 18 start_version(18) > path_max_version? No, path includes it.
+ // - No fallback needed since path_max_version = 18
+ std::vector<Version> expected_versions = {{0, 1}, {2, 10}, {11, 17}, {18,
18}};
+ check_capture_result(tablet, Version {0, 18},
query_freshness_tolerance_ms, expected_versions);
+}
+
+TEST_F(TestFreshnessTolerance, testCapture_missing_warmup_state_2) {
+ /*
+ Scenario: Regular rowsets after startup have warmup state missing from
the map.
+ When visible_timestamp > startup_timepoint, these rowsets reach
is_rowset_warmed_up
+ which returns true for missing entries.
+
+ now-10s now
+
+ │ 10s │
+ ◄───────────────────────────┤
+ ┌────────┐ ┌─────────┐ ┌─────────┐│ ┌────────┐ ┌───────┐ │
+ │in cache│ │ in cache│ │in cache ││ │missing │ │missing│ │
+ │ │ │ │ │ ││ │warmup │ │warmup │ │
+ │ [2-10] │ │ [11-15] │ │ [16-16] ││ │state │ │state │ │
+ └────────┘ └─────────┘ └─────────┘│ │[17-17] │ │[18-18]│ │
+ │ └────────┘ └───────┘ │
+ now-40s now-20s now-15s │ now-7s now-3s │
+ │ │
+ return: [2-10],[11-15],[16-16],[17-17],[18-18]
+ note: rowsets 17 and 18 have no warmup state entry at all (BE was
restarting when
+ upstream tried to warm them up). They are optimistically treated as
warmed up.
+ Since all rowsets are now in the path, path_max_version=18, no
fallback.
+ */
+ _engine.set_startup_timepoint(system_clock::now() - seconds(200));
+ auto tablet = create_tablet_with_initial_rowsets(15);
+ do_cumu_compaction(tablet, 2, 10, true, system_clock::now() - seconds(40));
+ do_cumu_compaction(tablet, 11, 15, true, system_clock::now() -
seconds(20));
+ add_new_version_rowset(tablet, 16, true, system_clock::now() -
seconds(15));
+ add_new_version_rowset_missing_warmup_state(tablet, 17,
system_clock::now() - seconds(7));
+ add_new_version_rowset_missing_warmup_state(tablet, 18,
system_clock::now() - seconds(3));
+
+ int64_t query_freshness_tolerance_ms = 10000; // 10s
+ // Missing warmup state → optimistically warmed up → included in path
+ std::vector<Version> expected_versions = {{0, 1}, {2, 10}, {11, 15},
+ {16, 16}, {17, 17}, {18, 18}};
+ check_capture_result(tablet, Version {0, 18},
query_freshness_tolerance_ms, expected_versions);
+}
+
+TEST_F(TestFreshnessTolerance, testCapture_missing_warmup_state_3) {
+ /*
+ Scenario: Compaction output rowset has visible_timestamp AFTER
startup_timepoint,
+ and its warmup state is missing from the map. The stale rowsets that form
the version
+ path before compaction are all warmed up.
+
+ With the old behavior (missing → false), the compaction output [11-17]
would be excluded,
+ and the algorithm would use the stale rowsets [11-15], [16-16], [17-17]
instead.
+ Those are all added with warmed_up=true below, so path_max_version would
presumably still be 18; the test pins that [11-17] is now chosen over them.
+
+ With the new behavior (missing → true), [11-17] is included →
path_max_version = 18.
+
+ now-10s now
+ │ 10s │
+ ◄────────────────────────┼
+ │ │
+ ┌────────┐ │┌────────┐ ┌───────┐ │
+ │in cache│ ││missing │ │ │ │
+ │ │ ││warmup │ │ │ │
+ │ [2-10] │ ││state │ │[18-18]│ │
+ └────────┘ ││[11-17] │ └───────┘ │
+ │└────────┘ │
+ now-40s │ now-1s now-3s │
+
+ return: [2-10],[11-17],[18-18]
+ note: [11-17] missing from warmup map → treated as warmed up → included in
path
+ */
+ _engine.set_startup_timepoint(system_clock::now() - seconds(200));
+ auto tablet = create_tablet_with_initial_rowsets(15);
+ do_cumu_compaction(tablet, 2, 10, true, system_clock::now() - seconds(40));
+ do_cumu_compaction(tablet, 11, 15, true, system_clock::now() -
seconds(20));
+ add_new_version_rowset(tablet, 16, true, system_clock::now() -
seconds(15));
+ add_new_version_rowset(tablet, 17, true, system_clock::now() - seconds(7));
+ add_new_version_rowset(tablet, 18, true, system_clock::now() - seconds(3));
+ // Compaction output missing warmup state (BE restarted after compaction
was initiated)
+ do_cumu_compaction_missing_warmup_state(tablet, 11, 17,
system_clock::now() - seconds(1));
+
+ int64_t query_freshness_tolerance_ms = 10000; // 10s
+ // [11-17] missing → treated as warmed up → algorithm picks it over stale
rowsets
+ // The version path includes all versions, path_max_version = 18, no
fallback
+ std::vector<Version> expected_versions = {{0, 1}, {2, 10}, {11, 17}, {18,
18}};
+ check_capture_result(tablet, Version {0, 18},
query_freshness_tolerance_ms, expected_versions);
+}
+
+TEST_F(TestFreshnessTolerance, testCaptureMow_missing_warmup_state_1) {
+ /*
+ Same as testCapture_missing_warmup_state_2 but for MOW table.
+ Rowsets 17 and 18 have warmup state missing from the map.
+ They should be treated as warmed up.
+
+ return: [2-10],[11-15],[16-16],[17-17],[18-18]
+ */
+ _engine.set_startup_timepoint(system_clock::now() - seconds(200));
+ auto tablet = create_tablet_with_initial_rowsets(15, true);
+ do_cumu_compaction(tablet, 2, 10, true, system_clock::now() - seconds(40));
+ do_cumu_compaction(tablet, 11, 15, true, system_clock::now() -
seconds(20));
+ add_new_version_rowset(tablet, 16, true, system_clock::now() -
seconds(15));
+ add_new_version_rowset_missing_warmup_state(tablet, 17,
system_clock::now() - seconds(7));
+ add_new_version_rowset_missing_warmup_state(tablet, 18,
system_clock::now() - seconds(3));
+
+ int64_t query_freshness_tolerance_ms = 10000; // 10s
+ std::vector<Version> expected_versions = {{0, 1}, {2, 10}, {11, 15},
+ {16, 16}, {17, 17}, {18, 18}};
+ check_capture_result(tablet, Version {0, 18},
query_freshness_tolerance_ms, expected_versions);
+}
+
} // namespace doris
diff --git a/be/test/cloud/cloud_tablet_test.cpp
b/be/test/cloud/cloud_tablet_test.cpp
index 2c375dc1bca..904dc2e3fdf 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -940,4 +940,61 @@ TEST_F(CloudTabletSyncMetaTest,
TestSyncMetaMultipleProperties) {
sp->disable_processing();
sp->clear_all_call_backs();
}
+
+// Test is_rowset_warmed_up returns true for rowset NOT in the warmup state map
+// This is the behavior change: missing warmup state → optimistically warmed up
+TEST_F(CloudTabletWarmUpStateTest, TestIsRowsetWarmedUpMissingFromMap) {
+ auto rowset = create_rowset(Version(22, 22));
+ ASSERT_NE(rowset, nullptr);
+
+ // Rowset is not in the warmup state map at all
+ // Before the fix, this would return false. Now it returns true.
+ EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
+}
+
+// Test is_rowset_warmed_up returns true for rowset with DONE state
+TEST_F(CloudTabletWarmUpStateTest, TestIsRowsetWarmedUpWithDoneState) {
+ auto rowset = create_rowset(Version(23, 23));
+ ASSERT_NE(rowset, nullptr);
+
+ _tablet->add_warmed_up_rowset(rowset->rowset_id());
+ EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
+}
+
+// Test is_rowset_warmed_up returns false for rowset with DOING state (in map
but not done)
+TEST_F(CloudTabletWarmUpStateTest, TestIsRowsetWarmedUpWithDoingState) {
+ auto rowset = create_rowset(Version(24, 24));
+ ASSERT_NE(rowset, nullptr);
+
+ _tablet->add_not_warmed_up_rowset(rowset->rowset_id());
+ EXPECT_FALSE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
+}
+
+// Test add_not_warmed_up_rowset sets DOING state correctly
+TEST_F(CloudTabletWarmUpStateTest, TestAddNotWarmedUpRowset) {
+ auto rowset = create_rowset(Version(25, 25));
+ ASSERT_NE(rowset, nullptr);
+
+ _tablet->add_not_warmed_up_rowset(rowset->rowset_id());
+
+ WarmUpState state = _tablet->get_rowset_warmup_state(rowset->rowset_id());
+ WarmUpState expected_state =
+ WarmUpState {WarmUpTriggerSource::SYNC_ROWSET,
WarmUpProgress::DOING};
+ EXPECT_EQ(state, expected_state);
+}
+
+// Test that add_warmed_up_rowset can override add_not_warmed_up_rowset
+TEST_F(CloudTabletWarmUpStateTest, TestWarmedUpOverridesNotWarmedUp) {
+ auto rowset = create_rowset(Version(26, 26));
+ ASSERT_NE(rowset, nullptr);
+
+ // First mark as not warmed up
+ _tablet->add_not_warmed_up_rowset(rowset->rowset_id());
+ EXPECT_FALSE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
+
+ // Then mark as warmed up
+ _tablet->add_warmed_up_rowset(rowset->rowset_id());
+ EXPECT_TRUE(_tablet->is_rowset_warmed_up(rowset->rowset_id()));
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]