This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 01b9e620034 [improvement](file cache) Try to read from remote storage when opening segment with CachedRemoteFileReader (#38645) 01b9e620034 is described below commit 01b9e62003408b10d01cd285616dd7c490e3c87c Author: Gavin Chou <gavineaglec...@gmail.com> AuthorDate: Tue Aug 6 12:08:08 2024 +0800 [improvement](file cache) Try to read from remote storage when opening segment with CachedRemoteFileReader (#38645) We may encounter the "Bad segment file" error in some rare cases where the file cache may not hold the correct segment files. We should read the original segment files from remote storage to increase robustness. --- be/src/cloud/injection_point_action.cpp | 19 +++++++- be/src/olap/rowset/segment_v2/segment.cpp | 75 ++++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 13 deletions(-) diff --git a/be/src/cloud/injection_point_action.cpp b/be/src/cloud/injection_point_action.cpp index d5a13238837..be90ee23afd 100644 --- a/be/src/cloud/injection_point_action.cpp +++ b/be/src/cloud/injection_point_action.cpp @@ -108,6 +108,22 @@ void register_suites() { sp->set_call_back("VOlapTableSink::close", [](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); }); }); + suite_map.emplace("test_file_segment_cache_corruption", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("Segment::open:corruption", [](auto&& args) { + LOG(INFO) << "injection Segment::open:corruption"; + auto* arg0 = try_any_cast<Status*>(args[0]); + *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error"); + }); + }); + suite_map.emplace("test_file_segment_cache_corruption1", [] { + auto* sp = SyncPoint::get_instance(); + sp->set_call_back("Segment::open:corruption1", [](auto&& args) { + LOG(INFO) << "injection Segment::open:corruption1"; + auto* arg0 = try_any_cast<Status*>(args[0]); + *arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error"); + 
}); + }); } void set_sleep(const std::string& point, HttpRequest* req) { @@ -215,6 +231,7 @@ void handle_set(HttpRequest* req) { void handle_clear(HttpRequest* req) { const auto& point = req->param("name"); auto* sp = SyncPoint::get_instance(); + LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" : point); if (point.empty()) { // If point name is emtpy, clear all sp->clear_all_call_backs(); @@ -257,7 +274,7 @@ void handle_disable(HttpRequest* req) { InjectionPointAction::InjectionPointAction() = default; void InjectionPointAction::handle(HttpRequest* req) { - LOG(INFO) << req->debug_string(); + LOG(INFO) << "handle InjectionPointAction " << req->debug_string(); auto& op = req->param("op"); if (op == "set") { handle_set(req); diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 8d862528de6..1b31117f126 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -28,7 +28,9 @@ #include "common/logging.h" #include "common/status.h" +#include "cpp/sync_point.h" #include "io/cache/block_file_cache.h" +#include "io/cache/block_file_cache_factory.h" #include "io/fs/file_reader.h" #include "io/fs/file_system.h" #include "io/io_common.h" @@ -77,6 +79,15 @@ namespace doris::segment_v2 { static bvar::Adder<size_t> g_total_segment_num("doris_total_segment_num"); class InvertedIndexIterator; +io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) { + std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: npos + 1 == 0 + return io::BlockFileCache::hash(base); +} + +std::string file_cache_key_str(const std::string& seg_path) { + return file_cache_key_from_path(seg_path).to_string(); +} + Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t segment_id, RowsetId rowset_id, TabletSchemaSPtr tablet_schema, const io::FileReaderOptions& reader_options, @@ -84,9 +95,43 @@ Status 
Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t se io::FileReaderSPtr file_reader; RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options)); std::shared_ptr<Segment> segment(new Segment(segment_id, rowset_id, std::move(tablet_schema))); - segment->_fs = std::move(fs); + segment->_fs = fs; segment->_file_reader = std::move(file_reader); - RETURN_IF_ERROR(segment->_open()); + auto st = segment->_open(); + TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st); + if (st.is<ErrorCode::CORRUPTION>() && + reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) { + LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source " + "file directly, file path: " + << path << " cache_key: " << file_cache_key_str(path); + auto file_key = file_cache_key_from_path(path); + auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); + file_cache->remove_if_cached(file_key); + + RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options)); + segment->_file_reader = std::move(file_reader); + st = segment->_open(); + TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st); + if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again + LOG(WARNING) << "failed to try to read remote source file again with cache support," + << " try to read from remote directly, " + << " file path: " << path << " cache_key: " << file_cache_key_str(path); + file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); + file_cache->remove_if_cached(file_key); + + io::FileReaderOptions opt = reader_options; + opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache + RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt)); + segment->_file_reader = std::move(file_reader); + st = segment->_open(); + if (!st.ok()) { + LOG(WARNING) << "failed to try to read remote source file directly," + << " file path: " << path + << " cache_key: " << file_cache_key_str(path); + } + } + } + 
RETURN_IF_ERROR(st); *output = std::move(segment); return Status::OK(); } @@ -256,8 +301,9 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) { // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4) auto file_size = _file_reader->size(); if (file_size < 12) { - return Status::Corruption("Bad segment file {}: file size {} < 12", - _file_reader->path().native(), file_size); + return Status::Corruption("Bad segment file {}: file size {} < 12, cache_key: {}", + _file_reader->path().native(), file_size, + file_cache_key_str(_file_reader->path().native())); } uint8_t fixed_buf[12]; @@ -269,15 +315,17 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) { DCHECK_EQ(bytes_read, 12); if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) { - return Status::Corruption("Bad segment file {}: magic number not match", - _file_reader->path().native()); + return Status::Corruption("Bad segment file {}: magic number not match, cache_key: {}", + _file_reader->path().native(), + file_cache_key_str(_file_reader->path().native())); } // read footer PB uint32_t footer_length = decode_fixed32_le(fixed_buf); if (file_size < 12 + footer_length) { - return Status::Corruption("Bad segment file {}: file size {} < {}", - _file_reader->path().native(), file_size, 12 + footer_length); + return Status::Corruption("Bad segment file {}: file size {} < {}, cache_key: {}", + _file_reader->path().native(), file_size, 12 + footer_length, + file_cache_key_str(_file_reader->path().native())); } std::string footer_buf; @@ -291,14 +339,17 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) { uint32_t actual_checksum = crc32c::Value(footer_buf.data(), footer_buf.size()); if (actual_checksum != expect_checksum) { return Status::Corruption( - "Bad segment file {}: footer checksum not match, actual={} vs expect={}", - _file_reader->path().native(), actual_checksum, expect_checksum); + "Bad segment file {}: footer checksum not match, actual={} vs 
expect={}, " + "cache_key: {}", + _file_reader->path().native(), actual_checksum, expect_checksum, + file_cache_key_str(_file_reader->path().native())); } // deserialize footer PB if (!footer->ParseFromString(footer_buf)) { - return Status::Corruption("Bad segment file {}: failed to parse SegmentFooterPB", - _file_reader->path().native()); + return Status::Corruption( + "Bad segment file {}: failed to parse SegmentFooterPB, cache_key: ", + _file_reader->path().native(), file_cache_key_str(_file_reader->path().native())); } return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org