This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f278793173e [fix](cloud) retry read_at when corruption using file cache (#48786) f278793173e is described below commit f278793173e482fd20ef32c3a9887c473774d097 Author: zhengyu <zhangzhen...@selectdb.com> AuthorDate: Fri Mar 28 19:17:19 2025 +0800 [fix](cloud) retry read_at when corruption using file cache (#48786) --- be/src/cloud/injection_point_action.cpp | 22 +++++++ be/src/olap/rowset/segment_v2/column_reader.cpp | 23 ++++---- .../rowset/segment_v2/indexed_column_reader.cpp | 25 ++++---- .../olap/rowset/segment_v2/ordinal_page_index.cpp | 22 ++++--- be/src/olap/rowset/segment_v2/page_io.cpp | 67 ++++++++++++++++++++++ be/src/olap/rowset/segment_v2/page_io.h | 34 +++++++++-- be/src/olap/rowset/segment_v2/segment.cpp | 30 ++++------ 7 files changed, 162 insertions(+), 61 deletions(-) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index bc6676313c1..4fce2609d8e 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -27,7 +27,9 @@ #include "http/http_channel.h" #include "http/http_request.h" #include "http/http_status.h" +#include "io/cache/cached_remote_file_reader.h" #include "olap/rowset/rowset.h" +#include "olap/rowset/segment_v2/page_io.h" #include "util/stack_util.h" namespace doris { @@ -133,6 +135,26 @@ void register_suites() { *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error"); }); }); + // curl "be_ip:http_port/api/injection_point/apply_suite/PageIO::read_and_decompress_page:crc_failure" + suite_map.emplace("PageIO::read_and_decompress_page:crc_failure", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("PageIO::read_and_decompress_page:crc_failure_inj", [](auto&& args) { + LOG(INFO) << "PageIO::read_and_decompress_page:crc_failure_inj"; + if (auto ctx = std::any_cast<segment_v2::InjectionContext*>(args[0])) { + uint32_t* crc = ctx->crc; 
+ segment_v2::PageReadOptions* opts = ctx->opts; + auto cached_file_reader = + dynamic_cast<io::CachedRemoteFileReader*>(opts->file_reader); + if (cached_file_reader == nullptr) { + return; // if not cachedreader, then do nothing + } else { + *crc = ~*crc; /* crc points to one 4-byte uint32_t (the local `actual`): invert it instead of writing past it, which also guarantees actual != expect */ + } + } else { + LOG(WARNING) << "Failed to cast std::any to InjectionContext*"; + } + }); + }); // curl be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn' suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] { auto* sp = SyncPoint::get_instance(); diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp index bede84b4bee..431ef97dfbf 100644 --- a/be/src/olap/rowset/segment_v2/column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/column_reader.cpp @@ -356,18 +356,17 @@ Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const Pag PageHandle* handle, Slice* page_body, PageFooterPB* footer, BlockCompressionCodec* codec) const { iter_opts.sanity_check(); - PageReadOptions opts { - .verify_checksum = _opts.verify_checksum, - .use_page_cache = iter_opts.use_page_cache, - .kept_in_memory = _opts.kept_in_memory, - .type = iter_opts.type, - .file_reader = iter_opts.file_reader, - .page_pointer = pp, - .codec = codec, - .stats = iter_opts.stats, - .encoding_info = _encoding_info, - .io_ctx = iter_opts.io_ctx, - }; + PageReadOptions opts(iter_opts.io_ctx); + opts.verify_checksum = _opts.verify_checksum; + opts.use_page_cache = iter_opts.use_page_cache; + opts.kept_in_memory = _opts.kept_in_memory; + opts.type = iter_opts.type; + opts.file_reader = iter_opts.file_reader; + opts.page_pointer = pp; + opts.codec = codec; + opts.stats = iter_opts.stats; + opts.encoding_info = _encoding_info; + // index page should not pre decode if (iter_opts.type == INDEX_PAGE) opts.pre_decode = false; return PageIO::read_and_decompress_page(opts, handle, page_body, footer); diff --git 
a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 3f582293ee4..154c5073cfc 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -123,19 +123,18 @@ Status IndexedColumnReader::read_page(const PagePointer& pp, PageHandle* handle, OlapReaderStatistics* stats) const { OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? stats : &tmp_stats; - PageReadOptions opts { - .use_page_cache = _use_page_cache, - .kept_in_memory = _kept_in_memory, - .pre_decode = pre_decode, - .type = type, - .file_reader = _file_reader.get(), - .page_pointer = pp, - .codec = codec, - .stats = stats_ptr, - .encoding_info = _encoding_info, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, - }; + PageReadOptions opts(io::IOContext {.is_index_data = true, + .file_cache_stats = &stats_ptr->file_cache_stats}); + opts.use_page_cache = _use_page_cache; + opts.kept_in_memory = _kept_in_memory; + opts.pre_decode = pre_decode; + opts.type = type; + opts.file_reader = _file_reader.get(); + opts.page_pointer = pp; + opts.codec = codec; + opts.stats = stats_ptr; + opts.encoding_info = _encoding_info; + if (_is_pk_index) { opts.type = PRIMARY_KEY_INDEX_PAGE; } diff --git a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp index 4995e779892..8faab35cebb 100644 --- a/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp +++ b/be/src/olap/rowset/segment_v2/ordinal_page_index.cpp @@ -91,18 +91,16 @@ Status OrdinalIndexReader::_load(bool use_page_cache, bool kept_in_memory, // need to read index page OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? 
stats : &tmp_stats; - PageReadOptions opts { - .use_page_cache = use_page_cache, - .kept_in_memory = kept_in_memory, - .type = INDEX_PAGE, - .file_reader = _file_reader.get(), - .page_pointer = PagePointer(index_meta->root_page().root_page()), - // ordinal index page uses NO_COMPRESSION right now - .codec = nullptr, - .stats = stats_ptr, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, - }; + PageReadOptions opts(io::IOContext {.is_index_data = true, + .file_cache_stats = &stats_ptr->file_cache_stats}); + opts.use_page_cache = use_page_cache; + opts.kept_in_memory = kept_in_memory; + opts.type = INDEX_PAGE; + opts.file_reader = _file_reader.get(); + opts.page_pointer = PagePointer(index_meta->root_page().root_page()); + // ordinal index page uses NO_COMPRESSION right now + opts.codec = nullptr; + opts.stats = stats_ptr; // read index page PageHandle page_handle; diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp index d014f6627f5..343c395c875 100644 --- a/be/src/olap/rowset/segment_v2/page_io.cpp +++ b/be/src/olap/rowset/segment_v2/page_io.cpp @@ -27,8 +27,13 @@ #include <string> #include <utility> +#include "cloud/config.h" #include "common/logging.h" +#include "cpp/sync_point.h" #include "gutil/strings/substitute.h" +#include "io/cache/block_file_cache.h" +#include "io/cache/block_file_cache_factory.h" +#include "io/cache/cached_remote_file_reader.h" #include "io/fs/file_reader.h" #include "io/fs/file_writer.h" #include "olap/olap_common.h" @@ -111,6 +116,15 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body return Status::OK(); } +io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { + std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 + return io::BlockFileCache::hash(base); +} + +std::string file_cache_key_str(const std::string& seg_path) { + return 
file_cache_key_from_path(seg_path).to_string(); +} + Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, Slice* body, PageFooterPB* footer) { opts.sanity_check(); @@ -161,6 +175,9 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle if (opts.verify_checksum) { uint32_t expect = decode_fixed32_le((uint8_t*)page_slice.data + page_slice.size - 4); uint32_t actual = crc32c::Value(page_slice.data, page_slice.size - 4); + InjectionContext ctx = {&actual, const_cast<PageReadOptions*>(&opts)}; + (void)ctx; + TEST_INJECTION_POINT_CALLBACK("PageIO::read_and_decompress_page:crc_failure_inj", &ctx); if (expect != actual) { return Status::Corruption( "Bad page: checksum mismatch (actual={} vs expect={}), file={}", actual, expect, @@ -231,5 +248,55 @@ Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle return Status::OK(); } +Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer) { + // First try to read with file cache + Status st = do_read_and_decompress_page(opts, handle, body, footer); + if (!st.is<ErrorCode::CORRUPTION>() || !config::is_cloud_mode()) { + return st; + } + + auto* cached_file_reader = dynamic_cast<io::CachedRemoteFileReader*>(opts.file_reader); + if (cached_file_reader == nullptr) { + return st; + } + + // If we get CORRUPTION error and using file cache, clear cache and retry + LOG(WARNING) << "Bad page may be read from file cache, need retry." 
+ << " error msg: " << st.msg() + << " file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset; + + // Remove cache if exists + const std::string path = opts.file_reader->path().string(); + auto file_key = file_cache_key_from_path(path); + auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); + if (file_cache) { + file_cache->remove_if_cached(file_key); + } + + // Retry with file cache + st = do_read_and_decompress_page(opts, handle, body, footer); + if (!st.is<ErrorCode::CORRUPTION>()) { + return st; + } + + LOG(WARNING) << "Corruption again with retry downloading cache," + << " error msg: " << st.msg() + << " file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset; + + PageReadOptions new_opts = opts; + new_opts.file_reader = cached_file_reader->get_remote_reader(); + st = do_read_and_decompress_page(new_opts, handle, body, footer); + if (!st.ok()) { + LOG(WARNING) << "Corruption again with retry read directly from remote," + << " error msg: " << st.msg() + << " file path: " << opts.file_reader->path().native() + << " offset: " << opts.page_pointer.offset << " Give up."; + } + return st; +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/page_io.h b/be/src/olap/rowset/segment_v2/page_io.h index b23af4b0b35..b3c9a1ca798 100644 --- a/be/src/olap/rowset/segment_v2/page_io.h +++ b/be/src/olap/rowset/segment_v2/page_io.h @@ -23,6 +23,7 @@ #include "common/logging.h" #include "common/status.h" +#include "io/cache/block_file_cache.h" #include "io/io_common.h" #include "olap/rowset/segment_v2/page_pointer.h" #include "util/slice.h" @@ -41,6 +42,9 @@ namespace segment_v2 { class EncodingInfo; class PageHandle; +io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path); +std::string file_cache_key_str(const std::string& seg_path); + struct PageReadOptions { // whether to verify page checksum bool verify_checksum = 
true; @@ -66,12 +70,31 @@ struct PageReadOptions { const EncodingInfo* encoding_info = nullptr; - const io::IOContext& io_ctx; + const io::IOContext io_ctx; void sanity_check() const { CHECK_NOTNULL(file_reader); CHECK_NOTNULL(stats); } + PageReadOptions(const io::IOContext& ioctx) : io_ctx(ioctx) {} + + PageReadOptions(const PageReadOptions& old) : io_ctx(old.io_ctx) { /* must copy every member declared above; keep in sync when adding fields */ + file_reader = old.file_reader; + page_pointer = old.page_pointer; + codec = old.codec; + stats = old.stats; + verify_checksum = old.verify_checksum; + use_page_cache = old.use_page_cache; + kept_in_memory = old.kept_in_memory; + type = old.type; + encoding_info = old.encoding_info; + pre_decode = old.pre_decode; + } +}; + +struct InjectionContext { + uint32_t* crc; + PageReadOptions* opts; }; inline std::ostream& operator<<(std::ostream& os, const PageReadOptions& opt) { @@ -124,13 +147,16 @@ public: // `body' points to page body, // `footer' stores the page footer. // This method is exception safe, it will failed when allocate memory failed. + // On CORRUPTION in cloud mode via a CachedRemoteFileReader: evict the file from the file cache and retry once, then fall back to reading from the remote reader directly. static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, - Slice* body, PageFooterPB* footer) { + Slice* body, PageFooterPB* footer); + +private: + static Status do_read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle, + Slice* body, PageFooterPB* footer) { + RETURN_IF_CATCH_EXCEPTION( { return read_and_decompress_page_(opts, handle, body, footer); }); } - -private: // An internal method that not deal with exception. 
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle, Slice* body, PageFooterPB* footer); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 96a3a74e656..90135b5826d 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -78,15 +78,6 @@ namespace doris::segment_v2 { class InvertedIndexIterator; -io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { - std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 - return io::BlockFileCache::hash(base); -} - -std::string file_cache_key_str(const std::string& seg_path) { - return file_cache_key_from_path(seg_path).to_string(); -} - Status Segment::open(io::FileSystemSPtr fs, const std::string& path, int64_t tablet_id, uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, const io::FileReaderOptions& reader_options, std::shared_ptr<Segment>* output, @@ -527,17 +518,16 @@ Status Segment::load_index(OlapReaderStatistics* stats) { // read and parse short key index page OlapReaderStatistics tmp_stats; OlapReaderStatistics* stats_ptr = stats != nullptr ? 
stats : &tmp_stats; - PageReadOptions opts { - .use_page_cache = true, - .type = INDEX_PAGE, - .file_reader = _file_reader.get(), - .page_pointer = PagePointer(_sk_index_page), - // short key index page uses NO_COMPRESSION for now - .codec = nullptr, - .stats = &tmp_stats, - .io_ctx = io::IOContext {.is_index_data = true, - .file_cache_stats = &stats_ptr->file_cache_stats}, - }; + PageReadOptions opts(io::IOContext {.is_index_data = true, + .file_cache_stats = &stats_ptr->file_cache_stats}); + opts.use_page_cache = true; + opts.type = INDEX_PAGE; + opts.file_reader = _file_reader.get(); + opts.page_pointer = PagePointer(_sk_index_page); + // short key index page uses NO_COMPRESSION for now + opts.codec = nullptr; + opts.stats = stats_ptr; + Slice body; PageFooterPB footer; RETURN_IF_ERROR( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org