This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f278793173e [fix](cloud) retry read_at when corruption using file 
cache (#48786)
f278793173e is described below

commit f278793173e482fd20ef32c3a9887c473774d097
Author: zhengyu <zhangzhen...@selectdb.com>
AuthorDate: Fri Mar 28 19:17:19 2025 +0800

    [fix](cloud) retry read_at when corruption using file cache (#48786)
---
 be/src/cloud/injection_point_action.cpp            | 22 +++++++
 be/src/olap/rowset/segment_v2/column_reader.cpp    | 23 ++++----
 .../rowset/segment_v2/indexed_column_reader.cpp    | 25 ++++----
 .../olap/rowset/segment_v2/ordinal_page_index.cpp  | 22 ++++---
 be/src/olap/rowset/segment_v2/page_io.cpp          | 67 ++++++++++++++++++++++
 be/src/olap/rowset/segment_v2/page_io.h            | 34 +++++++++--
 be/src/olap/rowset/segment_v2/segment.cpp          | 30 ++++------
 7 files changed, 162 insertions(+), 61 deletions(-)

diff --git a/be/src/cloud/injection_point_action.cpp 
b/be/src/cloud/injection_point_action.cpp
index bc6676313c1..4fce2609d8e 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -27,7 +27,9 @@
 #include "http/http_channel.h"
 #include "http/http_request.h"
 #include "http/http_status.h"
+#include "io/cache/cached_remote_file_reader.h"
 #include "olap/rowset/rowset.h"
+#include "olap/rowset/segment_v2/page_io.h"
 #include "util/stack_util.h"
 
 namespace doris {
@@ -133,6 +135,26 @@ void register_suites() {
             *arg0 = 
Status::Corruption<false>("test_file_segment_cache_corruption injection error");
         });
     });
+    // curl "be_ip:http_port/api/injection_point/apply_suite?name=PageIO::read_and_decompress_page:crc_failure"
+    //
+    // Registers a callback on the "PageIO::read_and_decompress_page:crc_failure_inj" sync
+    // point that zeroes the computed page CRC, but only when the page is being read through
+    // an io::CachedRemoteFileReader. Reads through any other reader type are left untouched.
+    suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) {
+            LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj";
+            // Use the pointer overload of std::any_cast: it yields nullptr on a type
+            // mismatch, whereas the value overload throws std::bad_any_cast and would never
+            // reach the error branch below.
+            auto* ctx_slot = std::any_cast<segment_v2::InjectionContext*>(&args[0]);
+            if (ctx_slot == nullptr || *ctx_slot == nullptr) {
+                LOG(WARNING) << "Failed to cast std::any to InjectionContext*";
+                return;
+            }
+            segment_v2::InjectionContext* ctx = *ctx_slot;
+            if (ctx->crc == nullptr || ctx->opts == nullptr) {
+                return;
+            }
+            // if not cachedreader, then do nothing
+            if (dynamic_cast<io::CachedRemoteFileReader*>(ctx->opts->file_reader) == nullptr) {
+                return;
+            }
+            // `crc` points at a single uint32_t, so clear exactly that object. The previous
+            // memset(crc, 0, 32) wrote 32 bytes and overran the 4-byte variable.
+            *ctx->crc = 0;
+        });
+    });
     // curl 
be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn'
     suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
         auto* sp = SyncPoint::get_instance();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index bede84b4bee..431ef97dfbf 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -356,18 +356,17 @@ Status ColumnReader::read_page(const 
ColumnIteratorOptions& iter_opts, const Pag
                                PageHandle* handle, Slice* page_body, 
PageFooterPB* footer,
                                BlockCompressionCodec* codec) const {
     iter_opts.sanity_check();
-    PageReadOptions opts {
-            .verify_checksum = _opts.verify_checksum,
-            .use_page_cache = iter_opts.use_page_cache,
-            .kept_in_memory = _opts.kept_in_memory,
-            .type = iter_opts.type,
-            .file_reader = iter_opts.file_reader,
-            .page_pointer = pp,
-            .codec = codec,
-            .stats = iter_opts.stats,
-            .encoding_info = _encoding_info,
-            .io_ctx = iter_opts.io_ctx,
-    };
+    PageReadOptions opts(iter_opts.io_ctx);
+    opts.verify_checksum = _opts.verify_checksum;
+    opts.use_page_cache = iter_opts.use_page_cache;
+    opts.kept_in_memory = _opts.kept_in_memory;
+    opts.type = iter_opts.type;
+    opts.file_reader = iter_opts.file_reader;
+    opts.page_pointer = pp;
+    opts.codec = codec;
+    opts.stats = iter_opts.stats;
+    opts.encoding_info = _encoding_info;
+
     // index page should not pre decode
     if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false;
     return PageIO::read_and_decompress_page(opts, handle, page_body, footer);
diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp 
b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
index 3f582293ee4..154c5073cfc 100644
--- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp
@@ -123,19 +123,18 @@ Status IndexedColumnReader::read_page(const PagePointer& 
pp, PageHandle* handle,
                                       OlapReaderStatistics* stats) const {
     OlapReaderStatistics tmp_stats;
     OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
-    PageReadOptions opts {
-            .use_page_cache = _use_page_cache,
-            .kept_in_memory = _kept_in_memory,
-            .pre_decode = pre_decode,
-            .type = type,
-            .file_reader = _file_reader.get(),
-            .page_pointer = pp,
-            .codec = codec,
-            .stats = stats_ptr,
-            .encoding_info = _encoding_info,
-            .io_ctx = io::IOContext {.is_index_data = true,
-                                     .file_cache_stats = 
&stats_ptr->file_cache_stats},
-    };
+    PageReadOptions opts(io::IOContext {.is_index_data = true,
+                                        .file_cache_stats = 
&stats_ptr->file_cache_stats});
+    opts.use_page_cache = _use_page_cache;
+    opts.kept_in_memory = _kept_in_memory;
+    opts.pre_decode = pre_decode;
+    opts.type = type;
+    opts.file_reader = _file_reader.get();
+    opts.page_pointer = pp;
+    opts.codec = codec;
+    opts.stats = stats_ptr;
+    opts.encoding_info = _encoding_info;
+
     if (_is_pk_index) {
         opts.type = PRIMARY_KEY_INDEX_PAGE;
     }
diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp 
b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
index 4995e779892..8faab35cebb 100644
--- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp
@@ -91,18 +91,16 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool 
kept_in_memory,
     // need to read index page
     OlapReaderStatistics tmp_stats;
     OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats;
-    PageReadOptions opts {
-            .use_page_cache = use_page_cache,
-            .kept_in_memory = kept_in_memory,
-            .type = INDEX_PAGE,
-            .file_reader = _file_reader.get(),
-            .page_pointer = PagePointer(index_meta->root_page().root_page()),
-            // ordinal index page uses NO_COMPRESSION right now
-            .codec = nullptr,
-            .stats = stats_ptr,
-            .io_ctx = io::IOContext {.is_index_data = true,
-                                     .file_cache_stats = 
&stats_ptr->file_cache_stats},
-    };
+    PageReadOptions opts(io::IOContext {.is_index_data = true,
+                                        .file_cache_stats = 
&stats_ptr->file_cache_stats});
+    opts.use_page_cache = use_page_cache;
+    opts.kept_in_memory = kept_in_memory;
+    opts.type = INDEX_PAGE;
+    opts.file_reader = _file_reader.get();
+    opts.page_pointer = PagePointer(index_meta->root_page().root_page());
+    // ordinal index page uses NO_COMPRESSION right now
+    opts.codec = nullptr;
+    opts.stats = stats_ptr;
 
     // read index page
     PageHandle page_handle;
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp 
b/be/src/olap/rowset/segment_v2/page_io.cpp
index d014f6627f5..343c395c875 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -27,8 +27,13 @@
 #include <string>
 #include <utility>
 
+#include "cloud/config.h"
 #include "common/logging.h"
+#include "cpp/sync_point.h"
 #include "gutil/strings/substitute.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/cached_remote_file_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_writer.h"
 #include "olap/olap_common.h"
@@ -111,6 +116,15 @@ Status PageIO::write_page(io::FileWriter* writer, const 
std::vector<Slice>& body
     return Status::OK();
 }
 
+// Derives the file-cache key for a segment from the final component of `seg_path`
+// (everything after the last '/'; the whole string when there is no '/').
+io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
+    const std::string::size_type last_slash = seg_path.rfind('/');
+    const std::string::size_type name_begin =
+            (last_slash == std::string::npos) ? 0 : last_slash + 1;
+    std::string file_name = seg_path.substr(name_begin);
+    return io::BlockFileCache::hash(file_name);
+}
+
+// String form of the file-cache key computed by file_cache_key_from_path().
+std::string file_cache_key_str(const std::string& seg_path) {
+    auto cache_key = file_cache_key_from_path(seg_path);
+    return cache_key.to_string();
+}
+
 Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, 
PageHandle* handle,
                                          Slice* body, PageFooterPB* footer) {
     opts.sanity_check();
@@ -161,6 +175,9 @@ Status PageIO::read_and_decompress_page_(const 
PageReadOptions& opts, PageHandle
     if (opts.verify_checksum) {
         uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + 
page_slice.size - 4);
         uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4);
+        InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)};
+        (void)ctx;
+        
TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj",
 &ctx);
         if (expect != actual) {
             return Status::Corruption(
                     "Bad page: checksum mismatch (actual={} vs expect={}), 
file={}", actual, expect,
@@ -231,5 +248,55 @@ Status PageIO::read_and_decompress_page_(const 
PageReadOptions& opts, PageHandle
     return Status::OK();
 }
 
+// Reads and decompresses one page, retrying when a CORRUPTION status may have been caused
+// by bad data held in the file cache.
+//
+// Attempts, in order:
+//   1. a normal read through opts.file_reader;
+//   2. if that reports CORRUPTION, cloud mode is on and the reader is a
+//      CachedRemoteFileReader: drop the file's cache entry and read again;
+//   3. if that still reports CORRUPTION: read through the cached reader's remote reader.
+//
+// Returns the status of the last attempt that was made. Any status other than CORRUPTION
+// from attempt 1 or 2 (including OK) is returned immediately.
+Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
+                                        Slice* body, PageFooterPB* footer) {
+    // First try to read with file cache
+    Status st = do_read_and_decompress_page(opts, handle, body, footer);
+    if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) {
+        return st;
+    }
+
+    // Without a file-cache backed reader there is no cache entry to invalidate.
+    auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader);
+    if (cached_file_reader == nullptr) {
+        return st;
+    }
+
+    // Shared formatter for the warnings below; reads `st` at call time.
+    auto warn = [&](const char* what, const char* tail) {
+        LOG(WARNING) << what << " error msg: " << st.msg()
+                     << " file path: " << opts.file_reader->path().native()
+                     << " offset: " << opts.page_pointer.offset << tail;
+    };
+
+    // If we get CORRUPTION error and using file cache, clear cache and retry
+    warn("Bad page may be read from file cache, need retry.", "");
+
+    // Remove cache if exists
+    const std::string path = opts.file_reader->path().string();
+    auto file_key = file_cache_key_from_path(path);
+    auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
+    if (file_cache) {
+        file_cache->remove_if_cached(file_key);
+    }
+
+    // Retry with file cache
+    st = do_read_and_decompress_page(opts, handle, body, footer);
+    if (!st.is<ErrorCode::CORRUPTION>()) {
+        return st;
+    }
+
+    warn("Corruption again with retry downloading cache,", "");
+
+    // Last resort: bypass the cache layer and read through the underlying remote reader.
+    PageReadOptions new_opts = opts;
+    new_opts.file_reader = cached_file_reader->get_remote_reader();
+    st = do_read_and_decompress_page(new_opts, handle, body, footer);
+    if (st.is<ErrorCode::CORRUPTION>()) {
+        warn("Corruption again with retry read directly from remote,", " Give up.");
+    } else if (!st.ok()) {
+        // Not a checksum problem (e.g. an I/O failure); do not mislabel it as corruption.
+        warn("Failed to read directly from remote after corruption retries,", " Give up.");
+    }
+    return st;
+}
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/page_io.h 
b/be/src/olap/rowset/segment_v2/page_io.h
index b23af4b0b35..b3c9a1ca798 100644
--- a/be/src/olap/rowset/segment_v2/page_io.h
+++ b/be/src/olap/rowset/segment_v2/page_io.h
@@ -23,6 +23,7 @@
 
 #include "common/logging.h"
 #include "common/status.h"
+#include "io/cache/block_file_cache.h"
 #include "io/io_common.h"
 #include "olap/rowset/segment_v2/page_pointer.h"
 #include "util/slice.h"
@@ -41,6 +42,9 @@ namespace segment_v2 {
 class EncodingInfo;
 class PageHandle;
 
+io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path);
+std::string file_cache_key_str(const std::string& seg_path);
+
 struct PageReadOptions {
     // whether to verify page checksum
     bool verify_checksum = true;
@@ -66,12 +70,31 @@ struct PageReadOptions {
 
     const EncodingInfo* encoding_info = nullptr;
 
-    const io::IOContext& io_ctx;
+    const io::IOContext io_ctx;
 
     void sanity_check() const {
         CHECK_NOTNULL(file_reader);
         CHECK_NOTNULL(stats);
     }
+    // Builds options holding a copy of `ioctx`. All other fields keep their declared initial
+    // values and are expected to be assigned by the caller afterwards.
+    PageReadOptions(const io::IOContext& ioctx) : io_ctx(ioctx) {}
+
+    // Member-wise copy. Defaulted on purpose: now that io_ctx is stored by value the
+    // compiler-generated copy is correct, and fields added to this struct later are copied
+    // automatically instead of being silently dropped by a hand-maintained list.
+    PageReadOptions(const PageReadOptions& old) = default;
+};
+
+// Argument bundle passed to the "PageIO::read_and_decompress_page:crc_failure_inj"
+// injection point so a test callback can inspect the read and tamper with the checksum.
+// Both pointers are non-owning.
+struct InjectionContext {
+    // Checksum computed for the page; the callback may overwrite the pointed-to value.
+    uint32_t* crc = nullptr;
+    // Options of the page read being verified.
+    PageReadOptions* opts = nullptr;
+};
 
 inline std::ostream& operator<<(std::ostream& os, const PageReadOptions& opt) {
@@ -124,13 +147,16 @@ public:
     //     `body' points to page body,
     //     `footer' stores the page footer.
     // This method is exception safe, it will failed when allocate memory 
failed.
+    // deal with CORRUPTION when using file cache, retry from remote
     static Status read_and_decompress_page(const PageReadOptions& opts, 
PageHandle* handle,
-                                           Slice* body, PageFooterPB* footer) {
+                                           Slice* body, PageFooterPB* footer);
+
+private:
+    static Status do_read_and_decompress_page(const PageReadOptions& opts, 
PageHandle* handle,
+                                              Slice* body, PageFooterPB* 
footer) {
         RETURN_IF_CATCH_EXCEPTION(
                 { return read_and_decompress_page_(opts, handle, body, 
footer); });
     }
-
-private:
     // An internal method that not deal with exception.
     static Status read_and_decompress_page_(const PageReadOptions& opts, 
PageHandle* handle,
                                             Slice* body, PageFooterPB* footer);
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 96a3a74e656..90135b5826d 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -78,15 +78,6 @@ namespace doris::segment_v2 {
 
 class InvertedIndexIterator;
 
-io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
-    std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: 
npos + 1 == 0
-    return io::BlockFileCache::hash(base);
-}
-
-std::string file_cache_key_str(const std::string& seg_path) {
-    return file_cache_key_from_path(seg_path).to_string();
-}
-
 Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t 
tablet_id,
                      uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr 
tablet_schema,
                      const io::FileReaderOptions& reader_options, 
std::shared_ptr<Segment>* output,
@@ -527,17 +518,16 @@ Status Segment::load_index(OlapReaderStatistics* stats) {
             // read and parse short key index page
             OlapReaderStatistics tmp_stats;
             OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : 
&tmp_stats;
-            PageReadOptions opts {
-                    .use_page_cache = true,
-                    .type = INDEX_PAGE,
-                    .file_reader = _file_reader.get(),
-                    .page_pointer = PagePointer(_sk_index_page),
-                    // short key index page uses NO_COMPRESSION for now
-                    .codec = nullptr,
-                    .stats = &tmp_stats,
-                    .io_ctx = io::IOContext {.is_index_data = true,
-                                             .file_cache_stats = 
&stats_ptr->file_cache_stats},
-            };
+            PageReadOptions opts(io::IOContext {.is_index_data = true,
+                                                .file_cache_stats = 
&stats_ptr->file_cache_stats});
+            opts.use_page_cache = true;
+            opts.type = INDEX_PAGE;
+            opts.file_reader = _file_reader.get();
+            opts.page_pointer = PagePointer(_sk_index_page);
+            // short key index page uses NO_COMPRESSION for now
+            opts.codec = nullptr;
+            opts.stats = &tmp_stats;
+
             Slice body;
             PageFooterPB footer;
             RETURN_IF_ERROR(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to