This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 01b9e620034 [improvement](file cache) Try to read from remote storage 
when opening segment with CachedRemoteFileReader (#38645)
01b9e620034 is described below

commit 01b9e62003408b10d01cd285616dd7c490e3c87c
Author: Gavin Chou <gavineaglec...@gmail.com>
AuthorDate: Tue Aug 6 12:08:08 2024 +0800

    [improvement](file cache) Try to read from remote storage when opening 
segment with CachedRemoteFileReader (#38645)
    
    In rare cases, opening a segment may fail with a "Bad segment file" error
    because the file cache does not hold the correct segment data. When that
    happens, we should fall back to reading the original segment file from
    remote storage to increase robustness.
---
 be/src/cloud/injection_point_action.cpp   | 19 +++++++-
 be/src/olap/rowset/segment_v2/segment.cpp | 75 ++++++++++++++++++++++++++-----
 2 files changed, 81 insertions(+), 13 deletions(-)

diff --git a/be/src/cloud/injection_point_action.cpp 
b/be/src/cloud/injection_point_action.cpp
index d5a13238837..be90ee23afd 100644
--- a/be/src/cloud/injection_point_action.cpp
+++ b/be/src/cloud/injection_point_action.cpp
@@ -108,6 +108,22 @@ void register_suites() {
         sp->set_call_back("VOlapTableSink::close",
                           [](auto&&) { 
std::this_thread::sleep_for(std::chrono::seconds(5)); });
     });
+    suite_map.emplace("test_file_segment_cache_corruption", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("Segment::open:corruption", [](auto&& args) {
+            LOG(INFO) << "injection Segment::open:corruption";
+            auto* arg0 = try_any_cast<Status*>(args[0]);
+            *arg0 = 
Status::Corruption<false>("test_file_segment_cache_corruption injection error");
+        });
+    });
+    suite_map.emplace("test_file_segment_cache_corruption1", [] {
+        auto* sp = SyncPoint::get_instance();
+        sp->set_call_back("Segment::open:corruption1", [](auto&& args) {
+            LOG(INFO) << "injection Segment::open:corruption1";
+            auto* arg0 = try_any_cast<Status*>(args[0]);
+            *arg0 = 
Status::Corruption<false>("test_file_segment_cache_corruption injection error");
+        });
+    });
 }
 
 void set_sleep(const std::string& point, HttpRequest* req) {
@@ -215,6 +231,7 @@ void handle_set(HttpRequest* req) {
 void handle_clear(HttpRequest* req) {
     const auto& point = req->param("name");
     auto* sp = SyncPoint::get_instance();
+    LOG(INFO) << "clear injection point : " << (point.empty() ? "(all points)" 
: point);
     if (point.empty()) {
         // If point name is emtpy, clear all
         sp->clear_all_call_backs();
@@ -257,7 +274,7 @@ void handle_disable(HttpRequest* req) {
 InjectionPointAction::InjectionPointAction() = default;
 
 void InjectionPointAction::handle(HttpRequest* req) {
-    LOG(INFO) << req->debug_string();
+    LOG(INFO) << "handle InjectionPointAction " << req->debug_string();
     auto& op = req->param("op");
     if (op == "set") {
         handle_set(req);
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp 
b/be/src/olap/rowset/segment_v2/segment.cpp
index 8d862528de6..1b31117f126 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -28,7 +28,9 @@
 
 #include "common/logging.h"
 #include "common/status.h"
+#include "cpp/sync_point.h"
 #include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
 #include "io/fs/file_reader.h"
 #include "io/fs/file_system.h"
 #include "io/io_common.h"
@@ -77,6 +79,15 @@ namespace doris::segment_v2 {
 static bvar::Adder<size_t> g_total_segment_num("doris_total_segment_num");
 class InvertedIndexIterator;
 
+io::UInt128Wrapper file_cache_key_from_path(const std::string& seg_path) {
+    std::string base = seg_path.substr(seg_path.rfind('/') + 1); // tricky: 
npos + 1 == 0
+    return io::BlockFileCache::hash(base);
+}
+
+std::string file_cache_key_str(const std::string& seg_path) {
+    return file_cache_key_from_path(seg_path).to_string();
+}
+
 Status Segment::open(io::FileSystemSPtr fs, const std::string& path, uint32_t 
segment_id,
                      RowsetId rowset_id, TabletSchemaSPtr tablet_schema,
                      const io::FileReaderOptions& reader_options,
@@ -84,9 +95,43 @@ Status Segment::open(io::FileSystemSPtr fs, const 
std::string& path, uint32_t se
     io::FileReaderSPtr file_reader;
     RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
     std::shared_ptr<Segment> segment(new Segment(segment_id, rowset_id, 
std::move(tablet_schema)));
-    segment->_fs = std::move(fs);
+    segment->_fs = fs;
     segment->_file_reader = std::move(file_reader);
-    RETURN_IF_ERROR(segment->_open());
+    auto st = segment->_open();
+    TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption", &st);
+    if (st.is<ErrorCode::CORRUPTION>() &&
+        reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
+        LOG(WARNING) << "bad segment file may be read from file cache, try to 
read remote source "
+                        "file directly, file path: "
+                     << path << " cache_key: " << file_cache_key_str(path);
+        auto file_key = file_cache_key_from_path(path);
+        auto* file_cache = 
io::FileCacheFactory::instance()->get_by_path(file_key);
+        file_cache->remove_if_cached(file_key);
+
+        RETURN_IF_ERROR(fs->open_file(path, &file_reader, &reader_options));
+        segment->_file_reader = std::move(file_reader);
+        st = segment->_open();
+        TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
+        if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
+            LOG(WARNING) << "failed to try to read remote source file again 
with cache support,"
+                         << " try to read from remote directly, "
+                         << " file path: " << path << " cache_key: " << 
file_cache_key_str(path);
+            file_cache = 
io::FileCacheFactory::instance()->get_by_path(file_key);
+            file_cache->remove_if_cached(file_key);
+
+            io::FileReaderOptions opt = reader_options;
+            opt.cache_type = io::FileCachePolicy::NO_CACHE; // skip cache
+            RETURN_IF_ERROR(fs->open_file(path, &file_reader, &opt));
+            segment->_file_reader = std::move(file_reader);
+            st = segment->_open();
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to try to read remote source file 
directly,"
+                             << " file path: " << path
+                             << " cache_key: " << file_cache_key_str(path);
+            }
+        }
+    }
+    RETURN_IF_ERROR(st);
     *output = std::move(segment);
     return Status::OK();
 }
@@ -256,8 +301,9 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
     // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), 
MagicNumber(4)
     auto file_size = _file_reader->size();
     if (file_size < 12) {
-        return Status::Corruption("Bad segment file {}: file size {} < 12",
-                                  _file_reader->path().native(), file_size);
+        return Status::Corruption("Bad segment file {}: file size {} < 12, 
cache_key: {}",
+                                  _file_reader->path().native(), file_size,
+                                  
file_cache_key_str(_file_reader->path().native()));
     }
 
     uint8_t fixed_buf[12];
@@ -269,15 +315,17 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
     DCHECK_EQ(bytes_read, 12);
 
     if (memcmp(fixed_buf + 8, k_segment_magic, k_segment_magic_length) != 0) {
-        return Status::Corruption("Bad segment file {}: magic number not 
match",
-                                  _file_reader->path().native());
+        return Status::Corruption("Bad segment file {}: magic number not 
match, cache_key: {}",
+                                  _file_reader->path().native(),
+                                  
file_cache_key_str(_file_reader->path().native()));
     }
 
     // read footer PB
     uint32_t footer_length = decode_fixed32_le(fixed_buf);
     if (file_size < 12 + footer_length) {
-        return Status::Corruption("Bad segment file {}: file size {} < {}",
-                                  _file_reader->path().native(), file_size, 12 
+ footer_length);
+        return Status::Corruption("Bad segment file {}: file size {} < {}, 
cache_key: {}",
+                                  _file_reader->path().native(), file_size, 12 
+ footer_length,
+                                  
file_cache_key_str(_file_reader->path().native()));
     }
 
     std::string footer_buf;
@@ -291,14 +339,17 @@ Status Segment::_parse_footer(SegmentFooterPB* footer) {
     uint32_t actual_checksum = crc32c::Value(footer_buf.data(), 
footer_buf.size());
     if (actual_checksum != expect_checksum) {
         return Status::Corruption(
-                "Bad segment file {}: footer checksum not match, actual={} vs 
expect={}",
-                _file_reader->path().native(), actual_checksum, 
expect_checksum);
+                "Bad segment file {}: footer checksum not match, actual={} vs 
expect={}, "
+                "cache_key: {}",
+                _file_reader->path().native(), actual_checksum, 
expect_checksum,
+                file_cache_key_str(_file_reader->path().native()));
     }
 
     // deserialize footer PB
     if (!footer->ParseFromString(footer_buf)) {
-        return Status::Corruption("Bad segment file {}: failed to parse 
SegmentFooterPB",
-                                  _file_reader->path().native());
+        return Status::Corruption(
+                "Bad segment file {}: failed to parse SegmentFooterPB, 
cache_key: ",
+                _file_reader->path().native(), 
file_cache_key_str(_file_reader->path().native()));
     }
     return Status::OK();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to