This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fb98f5558e [Feature](Cloud) Support session variable 
disable_file_cache and enable_segment_cache in query (#37141)
2fb98f5558e is described below

commit 2fb98f5558e79ae64429ecccd87397008e8c0f29
Author: Shuo Wang <wangshuo...@gmail.com>
AuthorDate: Tue Aug 6 13:49:49 2024 +0800

    [Feature](Cloud) Support session variable disable_file_cache and 
enable_segment_cache in query (#37141)
    
    Currently, whether to read from file cache or remote storage is
    controlled by the BE config `enable_file_cache` in cloud mode.
    This PR proposes to control the file cache behavior via session
    variables when executing queries in cloud mode.
    With such session variables, cache behavior can be controlled per
    query/session without changing BE configs, which is convenient for
    cases such as:
    1. **Performance test**. Compare query performance when reading from
    the local file cache versus remote storage.
    2. **Data correctness**. Check whether the file cache is the cause of
    an issue seen with certain tables or queries.
    
    The read path has three kinds of caches: segment cache, page cache and
    file cache.
    
    | module     | cache         | BE config                  | session variable                                |
    |------------|---------------|----------------------------|-------------------------------------------------|
    | Segment    | segment cache | disable_segment_cache      | **enable_segment_cache** (supported by this PR) |
    | PageIO     | page cache    | disable_storage_page_cache | enable_page_cache                               |
    | FileReader | file cache    | enable_file_cache          | **disable_file_cache** (supported by this PR)   |
    
    The modification of the PR:
    
    - **enable_segment_cache**: adds a new session variable
    `enable_segment_cache` that controls whether the segment cache is used.
    - **disable_file_cache**: `disable_file_cache` previously applied only to
    the write path in cloud mode. With this PR it is also honored on the
    read path when executing queries.
    
    With this PR, data is read from remote storage without any cache when
    the following session variables are set:
    ```sql
    set enable_segment_cache=false;
    set enable_page_cache=false;
    set disable_file_cache=true;
    ```
    
    Co-authored-by: Gavin Chou <gavineaglec...@gmail.com>
---
 be/src/exec/rowid_fetcher.cpp                        |  1 +
 be/src/olap/parallel_scanner_builder.cpp             |  9 ++++++++-
 be/src/olap/rowset/beta_rowset.cpp                   | 19 ++++++++++++-------
 be/src/olap/rowset/beta_rowset.h                     |  9 ++++++---
 be/src/olap/rowset/beta_rowset_reader.cpp            | 20 +++++++++++++++++---
 be/src/olap/segment_loader.cpp                       |  4 ++--
 be/src/olap/segment_loader.h                         |  3 ++-
 .../java/org/apache/doris/qe/SessionVariable.java    | 17 +++++++++++++++++
 gensrc/thrift/PaloInternalService.thrift             |  8 ++++++--
 9 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 0ec1c7ce3a3..beb8c2f0962 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -381,6 +381,7 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequest& request,
                     << ", row_size:" << row_size;
             *response->add_row_locs() = row_loc;
         });
+        // TODO: support session variable enable_page_cache and 
disable_file_cache if necessary.
         SegmentCacheHandle segment_cache;
         RETURN_IF_ERROR(scope_timer_run(
                 [&]() {
diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index ac57448ade7..6a2503a70e9 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -179,13 +179,20 @@ Status ParallelScannerBuilder::_load() {
             RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, 
version}, &rowsets));
         }
 
+        bool enable_segment_cache = 
_state->query_options().__isset.enable_segment_cache
+                                            ? 
_state->query_options().enable_segment_cache
+                                            : true;
+        bool disable_file_cache = 
_state->query_options().__isset.disable_file_cache
+                                          ? 
_state->query_options().disable_file_cache
+                                          : false;
         for (auto& rowset : rowsets) {
             RETURN_IF_ERROR(rowset->load());
             const auto rowset_id = rowset->rowset_id();
             auto& segment_cache_handle = _segment_cache_handles[rowset_id];
 
             RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
-                    std::dynamic_pointer_cast<BetaRowset>(rowset), 
&segment_cache_handle, true));
+                    std::dynamic_pointer_cast<BetaRowset>(rowset), 
&segment_cache_handle,
+                    enable_segment_cache, false, disable_file_cache));
             _total_rows += rowset->num_rows();
         }
     }
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 6d917c78d95..5114cc6595a 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -147,23 +147,26 @@ Status BetaRowset::get_segments_size(std::vector<size_t>* 
segments_size) {
     return Status::OK();
 }
 
-Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* 
segments) {
-    return load_segments(0, num_segments(), segments);
+Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* 
segments,
+                                 bool disable_file_cache) {
+    return load_segments(0, num_segments(), segments, disable_file_cache);
 }
 
 Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
-                                 std::vector<segment_v2::SegmentSharedPtr>* 
segments) {
+                                 std::vector<segment_v2::SegmentSharedPtr>* 
segments,
+                                 bool disable_file_cache) {
     int64_t seg_id = seg_id_begin;
     while (seg_id < seg_id_end) {
         std::shared_ptr<segment_v2::Segment> segment;
-        RETURN_IF_ERROR(load_segment(seg_id, &segment));
+        RETURN_IF_ERROR(load_segment(seg_id, &segment, disable_file_cache));
         segments->push_back(std::move(segment));
         seg_id++;
     }
     return Status::OK();
 }
 
-Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* 
segment) {
+Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* 
segment,
+                                bool disable_file_cache) {
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::Error<INIT_FAILED>("get fs failed");
@@ -172,12 +175,14 @@ Status BetaRowset::load_segment(int64_t seg_id, 
segment_v2::SegmentSharedPtr* se
     DCHECK(seg_id >= 0);
     auto seg_path = DORIS_TRY(segment_path(seg_id));
     io::FileReaderOptions reader_options {
-            .cache_type = config::enable_file_cache ? 
io::FileCachePolicy::FILE_BLOCK_CACHE
-                                                    : 
io::FileCachePolicy::NO_CACHE,
+            .cache_type = !disable_file_cache && config::enable_file_cache
+                                  ? io::FileCachePolicy::FILE_BLOCK_CACHE
+                                  : io::FileCachePolicy::NO_CACHE,
             .is_doris_table = true,
             .cache_base_path = "",
             .file_size = _rowset_meta->segment_file_size(seg_id),
     };
+
     auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), 
_schema, reader_options,
                                        segment);
     if (!s.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 52d5ac5c8a8..59ed6e061fe 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -71,12 +71,15 @@ public:
 
     Status check_file_exist() override;
 
-    Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
+    Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
+                         bool disable_file_cache = false);
 
     Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
-                         std::vector<segment_v2::SegmentSharedPtr>* segments);
+                         std::vector<segment_v2::SegmentSharedPtr>* segments,
+                         bool disable_file_cache = false);
 
-    Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
+    Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
+                        bool disable_file_cache = false);
 
     Status get_segments_size(std::vector<size_t>* segments_size);
 
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 458b3d29547..4d953d1dbe3 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -249,10 +249,24 @@ Status 
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
     }
 
     // load segments
-    bool should_use_cache = use_cache || _read_context->reader_type == 
ReaderType::READER_QUERY;
+    bool disable_file_cache = false;
+    bool enable_segment_cache = true;
+    auto* state = read_context->runtime_state;
+    if (state != nullptr) {
+        disable_file_cache = state->query_options().__isset.disable_file_cache
+                                     ? 
state->query_options().disable_file_cache
+                                     : false;
+        enable_segment_cache = 
state->query_options().__isset.enable_segment_cache
+                                       ? 
state->query_options().enable_segment_cache
+                                       : true;
+    }
+    // When reader type is for query, session variable `enable_segment_cache` 
should be respected.
+    bool should_use_cache = use_cache || (_read_context->reader_type == 
ReaderType::READER_QUERY &&
+                                          enable_segment_cache);
     SegmentCacheHandle segment_cache_handle;
-    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, 
&segment_cache_handle,
-                                                             
should_use_cache));
+    RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
+            _rowset, &segment_cache_handle, should_use_cache,
+            /*need_load_pk_index_and_bf*/ false, disable_file_cache));
 
     // create iterator for each segment
     auto& segments = segment_cache_handle.get_segments();
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 12ab89af0be..98db0351240 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -52,7 +52,7 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {
 
 Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
                                     SegmentCacheHandle* cache_handle, bool 
use_cache,
-                                    bool need_load_pk_index_and_bf) {
+                                    bool need_load_pk_index_and_bf, bool 
disable_file_cache) {
     if (cache_handle->is_inited()) {
         return Status::OK();
     }
@@ -62,7 +62,7 @@ Status SegmentLoader::load_segments(const 
BetaRowsetSharedPtr& rowset,
             continue;
         }
         segment_v2::SegmentSharedPtr segment;
-        RETURN_IF_ERROR(rowset->load_segment(i, &segment));
+        RETURN_IF_ERROR(rowset->load_segment(i, &segment, disable_file_cache));
         if (need_load_pk_index_and_bf) {
             RETURN_IF_ERROR(segment->load_pk_index_and_bf());
         }
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 5bb8fae3c41..fc2f0d8c03f 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -118,7 +118,8 @@ public:
     // Load segments of "rowset", return the "cache_handle" which contains 
segments.
     // If use_cache is true, it will be loaded from _cache.
     Status load_segments(const BetaRowsetSharedPtr& rowset, 
SegmentCacheHandle* cache_handle,
-                         bool use_cache = false, bool 
need_load_pk_index_and_bf = false);
+                         bool use_cache = false, bool 
need_load_pk_index_and_bf = false,
+                         bool disable_file_cache = false);
 
     void erase_segment(const SegmentCache::CacheKey& key);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3cb44d5b5ee..1aff6159d9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1946,6 +1946,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String IGNORE_SHAPE_NODE = "ignore_shape_nodes";
 
+    public static final String ENABLE_SEGMENT_CACHE = "enable_segment_cache";
+
     public Set<String> getIgnoreShapePlanNodes() {
         return 
Arrays.stream(ignoreShapePlanNodes.split(",[\\s]*")).collect(ImmutableSet.toImmutableSet());
     }
@@ -2063,6 +2065,11 @@ public class SessionVariable implements Serializable, 
Writable {
     })
     public boolean useMaxLengthOfVarcharInCtas = true;
 
+    // Whether enable segment cache. Segment cache only works when FE's query 
options sets enableSegmentCache true
+    // along with BE's config `disable_segment_cache` false
+    @VariableMgr.VarAttr(name = ENABLE_SEGMENT_CACHE, needForward = true)
+    public boolean enableSegmentCache = true;
+
     /**
      * When enabling shard scroll, FE will plan scan ranges by shards of ES 
indices.
      * Otherwise, FE will plan a single query to ES.
@@ -3526,6 +3533,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.loadStreamPerNode = loadStreamPerNode;
     }
 
+    public void setEnableSegmentCache(boolean value) {
+        this.enableSegmentCache = value;
+    }
+
+    public boolean isEnableSegmentCache() {
+        return this.enableSegmentCache;
+    }
+
     /**
      * Serialize to thrift object.
      * Used for rest api.
@@ -3660,6 +3675,8 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
         tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
         tResult.setKeepCarriageReturn(keepCarriageReturn);
+
+        tResult.setEnableSegmentCache(enableSegmentCache);
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index 7aa640f80b8..a75e06f358b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -308,7 +308,7 @@ struct TQueryOptions {
   113: optional bool enable_local_merge_sort = false;
 
   114: optional bool enable_parallel_result_sink = false;
-  
+
   115: optional bool enable_short_circuit_query_access_column_store = false;
 
   116: optional bool enable_no_need_read_data_opt = true;
@@ -324,13 +324,17 @@ struct TQueryOptions {
   121: optional bool keep_carriage_return = false; // \n,\r\n split line in 
CSV.
 
   122: optional i32 runtime_bloom_filter_min_size = 1048576;
-  
+
   //Access Parquet/ORC columns by name by default. Set this property to 
`false` to access columns
   //by their ordinal position in the Hive table definition.  
   123: optional bool hive_parquet_use_column_names = true;
   124: optional bool hive_orc_use_column_names = true;
 
+  125: optional bool enable_segment_cache = true;
+
   // For cloud, to control if the content would be written into file cache
+  // In write path, to control if the content would be written into file cache.
+  // In read path, read from file cache or remote storage when execute query.
   1000: optional bool disable_file_cache = false
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to