This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3b81e7e3fb [feature-WIP](query cache) cache tablets aggregate result, 
BE part (#40171)
e3b81e7e3fb is described below

commit e3b81e7e3fb20fb0be857650bfda45e67678b3af
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Wed Sep 11 14:28:06 2024 +0800

    [feature-WIP](query cache) cache tablets aggregate result, BE part (#40171)
    
    Support caching the aggregate results of tablets.
    
    for example
    
    SQL 1:
    ```sql
    select key, sum(value)
    from tbl
    where dt between '2024-08-01' and '2024-08-10'
    group by key
    ```
    
    SQL 2:
    ```sql
    select key, sum(value)
    from tbl
    where dt between '2024-08-05' and '2024-08-15'
    group by key
    ```
    
    SQL 1 computes and caches the per-tablet aggregate results for the
    partitions between '2024-08-01' and '2024-08-10'.
    SQL 2 then reuses the cached tablet aggregates for the partitions between
    '2024-08-05' and '2024-08-10', and only computes the aggregates for the
    partitions between '2024-08-11' and '2024-08-15'.
    
    At present, only simple aggregations that do not contain a join with a
    runtime filter are supported.
    
    # How to use
    
    ```sql
    set enable_query_cache=true;
    ```
---
 be/src/common/config.cpp                       |   2 +
 be/src/common/config.h                         |   3 +
 be/src/pipeline/dependency.h                   |   7 +
 be/src/pipeline/exec/cache_sink_operator.cpp   |  73 +++++++++
 be/src/pipeline/exec/cache_sink_operator.h     |  73 +++++++++
 be/src/pipeline/exec/cache_source_operator.cpp | 199 +++++++++++++++++++++++++
 be/src/pipeline/exec/cache_source_operator.h   | 104 +++++++++++++
 be/src/pipeline/exec/olap_scan_operator.cpp    |  34 +++--
 be/src/pipeline/exec/olap_scan_operator.h      |   4 +-
 be/src/pipeline/exec/operator.cpp              |   7 +-
 be/src/pipeline/pipeline_fragment_context.cpp  |  80 ++++++++--
 be/src/pipeline/query_cache/query_cache.cpp    |  70 +++++++++
 be/src/pipeline/query_cache/query_cache.h      | 151 +++++++++++++++++++
 be/src/runtime/exec_env.h                      |   7 +
 be/src/runtime/exec_env_init.cpp               |   9 +-
 be/src/runtime/memory/cache_policy.h           |   3 +
 be/src/runtime/memory/mem_tracker_limiter.cpp  |   2 +-
 be/src/vec/core/block.cpp                      |   2 -
 18 files changed, 804 insertions(+), 26 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 0c00bd1a38f..df8240cb234 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1327,6 +1327,8 @@ DEFINE_mInt32(lz4_compression_block_size, "262144");
 
 DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");
 
+DEFINE_Int32(query_cache_size, "512");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 720f4f72cb4..5bca9ac280a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1415,6 +1415,9 @@ DECLARE_mInt32(lz4_compression_block_size);
 
 DECLARE_mBool(enable_pipeline_task_leakage_detect);
 
+// Size of the query cache, in MB.
+DECLARE_Int32(query_cache_size);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f7990c097ef..e5738e48f93 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -35,6 +35,7 @@
 #include "pipeline/exec/join/process_hash_table_probe.h"
 #include "vec/common/sort/partition_sorter.h"
 #include "vec/common/sort/sorter.h"
+#include "vec/core/block.h"
 #include "vec/core/types.h"
 #include "vec/spill/spill_stream.h"
 
@@ -541,6 +542,12 @@ public:
     const int _child_count;
 };
 
+// Shared state used by the cache sink and cache source operators: the sink
+// pushes blocks into `data_queue` and the source pulls them back out.
+struct CacheSharedState : BasicSharedState {
+    ENABLE_FACTORY_CREATOR(CacheSharedState)
+public:
+    DataQueue data_queue;
+};
+
 class MultiCastDataStreamer;
 
 struct MultiCastSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/exec/cache_sink_operator.cpp 
b/be/src/pipeline/exec/cache_sink_operator.cpp
new file mode 100644
index 00000000000..b8b5b534659
--- /dev/null
+++ b/be/src/pipeline/exec/cache_sink_operator.cpp
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cache_sink_operator.h"
+
+#include <utility>
+
+#include "common/compiler_util.h" // IWYU pragma: keep
+#include "common/status.h"
+#include "pipeline/exec/data_queue.h"
+#include "pipeline/exec/operator.h"
+#include "runtime/runtime_state.h"
+#include "util/runtime_profile.h"
+
+namespace doris::pipeline {
+
+// Initializes the sink-side local state and registers this sink's dependency
+// on the shared data queue. This operator always uses index 0 of the queue.
+Status CacheSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+
+    auto& queue = _shared_state->data_queue;
+    queue.set_sink_dependency(_dependency, 0);
+    return Status::OK();
+}
+
+// Opens the sink-side local state, then caps the number of blocks the shared
+// data queue may hold using the limit reported by the runtime state.
+Status CacheSinkLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    RETURN_IF_ERROR(Base::open(state));
+
+    const auto max_blocks = state->data_queue_max_blocks();
+    _shared_state->data_queue.set_max_blocks_in_sub_queue(max_blocks);
+    return Status::OK();
+}
+
+// Constructs the cache sink. `child_id` is forwarded to the base class as both
+// its second and third constructor argument.
+CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int child_id)
+        : Base(sink_id, child_id, child_id) {
+    _name = "CACHE_SINK_OPERATOR";
+}
+
+// Nothing to prepare at the operator level; always succeeds.
+Status CacheSinkOperatorX::open(RuntimeState* /*state*/) {
+    return Status::OK();
+}
+
+// Forwards every non-empty input block to the shared data queue (index 0) and
+// marks that queue entry as finished once `eos` is set.
+Status CacheSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+
+    const auto input_rows = in_block->rows();
+    COUNTER_UPDATE(local_state.rows_input_counter(), static_cast<int64_t>(input_rows));
+
+    auto& queue = local_state._shared_state->data_queue;
+    if (input_rows > 0) {
+        // The contents of `in_block` are moved into a newly created block that
+        // is handed to the queue.
+        queue.push_block(vectorized::Block::create_unique(std::move(*in_block)), 0);
+    }
+    if (UNLIKELY(eos)) {
+        queue.set_finish(0);
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/cache_sink_operator.h 
b/be/src/pipeline/exec/cache_sink_operator.h
new file mode 100644
index 00000000000..9c4beb48df2
--- /dev/null
+++ b/be/src/pipeline/exec/cache_sink_operator.h
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+class DataQueue;
+
+class CacheSinkOperatorX;
+// Local state of the cache sink operator; it operates on a CacheSharedState.
+class CacheSinkLocalState final : public PipelineXSinkLocalState<CacheSharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(CacheSinkLocalState);
+
+    using Base = PipelineXSinkLocalState<CacheSharedState>;
+    using Parent = CacheSinkOperatorX;
+    friend class CacheSinkOperatorX;
+
+    CacheSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+};
+
+// Sink operator that pushes the blocks it receives into the data queue of a
+// CacheSharedState. It cannot be initialized from a TDataSink.
+class CacheSinkOperatorX final : public DataSinkOperatorX<CacheSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<CacheSinkLocalState>;
+
+    friend class CacheSinkLocalState;
+    CacheSinkOperatorX(int sink_id, int child_id);
+    ~CacheSinkOperatorX() override = default;
+
+    // Initializing from a TDataSink is not supported; always returns an error.
+    Status init(const TDataSink& tsink) override {
+        return Status::InternalError("{} should not init with TDataSink", Base::_name);
+    }
+
+    Status open(RuntimeState* state) override;
+
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
+
+    // Creates the CacheSharedState, tags it with this operator's id and records
+    // every destination operator id as a related operator.
+    std::shared_ptr<BasicSharedState> create_shared_state() const override {
+        auto shared_state = std::make_shared<CacheSharedState>();
+        shared_state->id = operator_id();
+        for (auto& dest_id : dests_id()) {
+            shared_state->related_op_ids.insert(dest_id);
+        }
+        return shared_state;
+    }
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp 
b/be/src/pipeline/exec/cache_source_operator.cpp
new file mode 100644
index 00000000000..5f8c5befc6a
--- /dev/null
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/cache_source_operator.h"
+
+#include <functional>
+#include <utility>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+
+// Initializes the source-side local state:
+//   1. registers the source dependency on the shared data queue,
+//   2. computes, for every output slot, its slot id in the cache (`_slot_orders`),
+//   3. builds the cache key and version for the single scan range,
+//   4. looks the key up; on a hit (and unless a refresh is forced) remembers the
+//      cached blocks and, when the cached column order differs from the order
+//      required here, the column permutation to apply (`_hit_cache_column_orders`).
+// Returns InternalError if the instance does not have exactly one scan range or
+// if a slot cannot be mapped.
+Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+    RETURN_IF_ERROR(Base::init(state, info));
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_init_timer);
+    static_cast<CacheSharedState*>(_dependency->shared_state())
+            ->data_queue.set_source_dependency(_shared_state->source_deps.front());
+
+    const auto& scan_ranges = info.scan_ranges;
+    // Exactly one scan range is required: scan_ranges[0] is read below, so an
+    // empty list must be rejected as well as a list with several entries.
+    if (scan_ranges.size() != 1) {
+        return Status::InternalError(
+                "CacheSourceOperator only support one scan range, plan error");
+    }
+
+    const auto& parent = _parent->cast<CacheSourceOperatorX>();
+    const auto& cache_param = parent._cache_param;
+
+    // 1. init the slot orders
+    const auto& tuple_descs = parent.row_desc().tuple_descriptors();
+    for (auto tuple_desc : tuple_descs) {
+        for (auto slot_desc : tuple_desc->slots()) {
+            const auto mapping_it = cache_param.output_slot_mapping.find(slot_desc->id());
+            if (mapping_it == cache_param.output_slot_mapping.end()) {
+                return Status::InternalError(
+                        fmt::format("Cache can not find the mapping slot id {}, node id {}",
+                                    slot_desc->id(), cache_param.node_id));
+            }
+            _slot_orders.emplace_back(mapping_it->second);
+        }
+    }
+
+    // 2. build the cache key and version from the scan range and cache param
+    RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param, &_cache_key, &_version));
+    _runtime_profile->add_info_string(
+            "CacheTabletId", std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
+
+    // 3. lookup the cache and find proper slot order
+    const bool hit_cache =
+            QueryCache::instance()->lookup(_cache_key, _version, &_query_cache_handle);
+    _runtime_profile->add_info_string("HitCache", hit_cache ? "1" : "0");
+    if (!hit_cache || cache_param.force_refresh_query_cache) {
+        // Miss or forced refresh: `_hit_cache_results` stays null.
+        return Status::OK();
+    }
+
+    _hit_cache_results = _query_cache_handle.get_cache_result();
+    auto hit_cache_slot_orders = _query_cache_handle.get_cache_slot_orders();
+
+    // A reorder is needed as soon as ANY position differs. The flag must not be
+    // overwritten by later positions, otherwise only the last slot would decide.
+    bool need_reorder = _slot_orders.size() != hit_cache_slot_orders->size();
+    if (!need_reorder) {
+        for (size_t i = 0; i < _slot_orders.size(); ++i) {
+            if (_slot_orders[i] != (*hit_cache_slot_orders)[i]) {
+                need_reorder = true;
+                break;
+            }
+        }
+    }
+
+    if (need_reorder) {
+        // For every required slot, record the column position it has in the cached blocks.
+        for (auto slot_id : _slot_orders) {
+            auto find_res = std::find(hit_cache_slot_orders->begin(),
+                                      hit_cache_slot_orders->end(), slot_id);
+            if (find_res == hit_cache_slot_orders->end()) {
+                return Status::InternalError(fmt::format(
+                        "Cache can not find the mapping slot id {}, node id {}, "
+                        "hit_cache_column_orders [{}]",
+                        slot_id, cache_param.node_id, fmt::join(*hit_cache_slot_orders, ",")));
+            }
+            _hit_cache_column_orders.emplace_back(find_res - hit_cache_slot_orders->begin());
+        }
+    }
+
+    return Status::OK();
+}
+
+// Opens the source-side local state; only delegates to the base class while
+// accounting the time in the exec and open timers.
+Status CacheSourceLocalState::open(RuntimeState* state) {
+    SCOPED_TIMER(exec_time_counter());
+    SCOPED_TIMER(_open_timer);
+    return Base::open(state);
+}
+
+// Returns the base-class debug string, extended with the state of the shared
+// data queue when a shared state is attached.
+std::string CacheSourceLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer buffer;
+    fmt::format_to(buffer, "{}", Base::debug_string(indentation_level));
+    if (!_shared_state) {
+        return fmt::to_string(buffer);
+    }
+
+    auto& queue = _shared_state->data_queue;
+    fmt::format_to(buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
+                   queue.is_all_finish(), queue.remaining_has_data());
+    return fmt::to_string(buffer);
+}
+
+// Produces the next output block.
+//  - Cache hit: hands out the cached blocks one per call (reordering columns if
+//    required) and sets `*eos` on the call after the last cached block.
+//  - Cache miss: pulls a block from the shared data queue, copies it into
+//    `block`, and keeps it for insertion into the query cache as long as the
+//    entry stays within the configured row/byte limits.
+Status CacheSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+
+    block->clear_column_data(_row_descriptor.num_materialized_slots());
+    const bool output_has_no_columns = block->columns() == 0;
+
+    if (local_state._hit_cache_results != nullptr) {
+        if (local_state._hit_cache_pos >= local_state._hit_cache_results->size()) {
+            // Every cached block has already been returned.
+            *eos = true;
+        } else {
+            const auto& cached_block =
+                    local_state._hit_cache_results->at(local_state._hit_cache_pos++);
+            if (output_has_no_columns) {
+                *block = cached_block->clone_empty();
+            }
+            RETURN_IF_ERROR(
+                    vectorized::MutableBlock::build_mutable_block(block).merge(*cached_block));
+            if (!local_state._hit_cache_column_orders.empty()) {
+                // Rebuild the block with the columns in the order recorded at init time.
+                auto cached_columns = block->get_columns_with_type_and_name();
+                block->clear();
+                for (auto column_idx : local_state._hit_cache_column_orders) {
+                    block->insert(cached_columns[column_idx]);
+                }
+            }
+        }
+    } else {
+        // Runs whenever this branch is left (including the early returns below):
+        // once eos is reached and caching is still enabled, the collected blocks
+        // are inserted into the global cache.
+        Defer insert_cache([&] {
+            if (*eos && local_state._need_insert_cache) {
+                local_state._runtime_profile->add_info_string("InsertCache", "1");
+                local_state._global_cache->insert(local_state._cache_key, local_state._version,
+                                                  local_state._local_cache_blocks,
+                                                  local_state._slot_orders,
+                                                  local_state._current_query_cache_bytes);
+                local_state._local_cache_blocks.clear();
+            }
+        });
+
+        int child_idx = 0;
+        std::unique_ptr<vectorized::Block> queued_block;
+        RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&queued_block,
+                                                                                   &child_idx));
+        // `_has_data(state)` is checked again here because one or more blocks may
+        // still be queued even when `data_queue.is_all_finish()` is already true.
+        *eos = !_has_data(state) && local_state._shared_state->data_queue.is_all_finish();
+
+        if (!queued_block) {
+            return Status::OK();
+        }
+
+        if (!local_state._need_insert_cache) {
+            // Caching was given up earlier: just pass the data through.
+            *block = std::move(*queued_block);
+        } else {
+            if (output_has_no_columns) {
+                *block = queued_block->clone_empty();
+            }
+            RETURN_IF_ERROR(
+                    vectorized::MutableBlock::build_mutable_block(block).merge(*queued_block));
+            local_state._current_query_cache_rows += queued_block->rows();
+            const auto block_bytes = queued_block->allocated_bytes();
+            local_state._current_query_cache_bytes += block_bytes;
+            local_state._mem_tracker->consume(block_bytes);
+
+            const bool over_limit =
+                    local_state._current_query_cache_bytes > _cache_param.entry_max_bytes ||
+                    local_state._current_query_cache_rows > _cache_param.entry_max_rows;
+            if (!over_limit) {
+                local_state._local_cache_blocks.emplace_back(std::move(queued_block));
+            } else {
+                // The entry exceeds the row/byte limit: drop what was collected and
+                // only pass data through from now on.
+                local_state._local_cache_blocks.clear();
+                local_state._need_insert_cache = false;
+                local_state._runtime_profile->add_info_string("InsertCache", "0");
+            }
+        }
+    }
+
+    local_state.reached_limit(block, eos);
+    return Status::OK();
+}
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/cache_source_operator.h 
b/be/src/pipeline/exec/cache_source_operator.h
new file mode 100644
index 00000000000..e764323846b
--- /dev/null
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "pipeline/query_cache/query_cache.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+namespace pipeline {
+class DataQueue;
+
+class CacheSourceOperatorX;
+// Local state of the cache source operator. It holds the cache key/version of
+// the scan range, the blocks collected for insertion on a cache miss, and the
+// cached blocks plus replay position on a cache hit.
+class CacheSourceLocalState final : public PipelineXLocalState<CacheSharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(CacheSourceLocalState);
+    using Base = PipelineXLocalState<CacheSharedState>;
+    using Parent = CacheSourceOperatorX;
+
+    CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
+
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
+
+    [[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
+
+private:
+    friend class CacheSourceOperatorX;
+    friend class OperatorX<CacheSourceLocalState>;
+
+    // Cache instance that receives the collected blocks.
+    QueryCache* _global_cache = QueryCache::instance();
+
+    // Key and version produced by QueryCache::build_cache_key() in init().
+    std::string _cache_key;
+    int64_t _version = 0;
+    // Blocks collected so far for insertion into the cache.
+    std::vector<vectorized::BlockUPtr> _local_cache_blocks;
+    // Mapped slot id (from the cache param's output slot mapping) of every output slot.
+    std::vector<int> _slot_orders;
+    // Running totals of the collected blocks, checked against the entry limits.
+    size_t _current_query_cache_bytes = 0;
+    size_t _current_query_cache_rows = 0;
+    // Set to false once the collected data exceeds the entry limits.
+    bool _need_insert_cache = true;
+
+    // Handle filled by the cache lookup in init().
+    QueryCacheHandle _query_cache_handle;
+    // Cached blocks to replay on a cache hit; nullptr otherwise.
+    std::vector<vectorized::BlockUPtr>* _hit_cache_results = nullptr;
+    // Column positions to pick from a cached block; empty when no reorder is needed.
+    std::vector<int> _hit_cache_column_orders;
+    // Index of the next cached block to return.
+    int _hit_cache_pos = 0;
+};
+
+// Source operator of the query cache: returns cached blocks on a cache hit, or
+// the blocks pulled from the shared data queue otherwise. Row descriptors are
+// delegated to the child operator.
+class CacheSourceOperatorX final : public OperatorX<CacheSourceLocalState> {
+public:
+    using Base = OperatorX<CacheSourceLocalState>;
+
+    CacheSourceOperatorX(ObjectPool* pool, int plan_node_id, int operator_id,
+                         const TQueryCacheParam& cache_param)
+            : Base(pool, plan_node_id, operator_id), _cache_param(cache_param) {
+        _op_name = "CACHE_SOURCE_OPERATOR";
+    }
+    ~CacheSourceOperatorX() override = default;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
+
+    bool is_source() const override { return true; }
+
+    // NOTE(review): the status returned by Base::open() is discarded and OK is
+    // always reported; confirm this is intentional.
+    Status open(RuntimeState* state) override {
+        static_cast<void>(Base::open(state));
+        return Status::OK();
+    }
+
+    const RowDescriptor& intermediate_row_desc() const override {
+        return _child->intermediate_row_desc();
+    }
+    RowDescriptor& row_descriptor() override { return _child->row_descriptor(); }
+    const RowDescriptor& row_desc() const override { return _child->row_desc(); }
+
+private:
+    friend class CacheSourceLocalState;
+
+    TQueryCacheParam _cache_param;
+
+    // True while the shared data queue still reports remaining data.
+    bool _has_data(RuntimeState* state) const {
+        return get_local_state(state)._shared_state->data_queue.remaining_has_data();
+    }
+};
+
+} // namespace pipeline
+} // namespace doris
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index adba2b20185..172aa7a9c87 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -29,15 +29,13 @@
 #include "olap/tablet_manager.h"
 #include "pipeline/common/runtime_filter_consumer.h"
 #include "pipeline/exec/scan_operator.h"
+#include "pipeline/query_cache/query_cache.h"
 #include "service/backend_options.h"
 #include "util/to_string.h"
 #include "vec/exec/scan/new_olap_scanner.h"
-#include "vec/exec/scan/vscan_node.h"
-#include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vectorized_fn_call.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
-#include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vslot_ref.h"
 #include "vec/functions/in.h"
 
@@ -147,7 +145,6 @@ Status OlapScanLocalState::_init_profile() {
             ADD_COUNTER(_segment_profile, "InvertedIndexDowngradeCount", 
TUnit::UNIT);
 
     _output_index_result_column_timer = ADD_TIMER(_segment_profile, 
"OutputIndexResultColumnTimer");
-
     _filtered_segment_counter = ADD_COUNTER(_segment_profile, 
"NumSegmentFiltered", TUnit::UNIT);
     _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", 
TUnit::UNIT);
     _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
@@ -397,10 +394,25 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() const 
{
 
 void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& 
scan_ranges) {
-    for (auto& scan_range : scan_ranges) {
-        DCHECK(scan_range.scan_range.__isset.palo_scan_range);
-        _scan_ranges.emplace_back(new 
TPaloScanRange(scan_range.scan_range.palo_scan_range));
-        COUNTER_UPDATE(_tablet_counter, 1);
+    const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param;
+    bool hit_cache = false;
+    if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache) 
{
+        std::string cache_key;
+        int64_t version = 0;
+        auto status = QueryCache::build_cache_key(scan_ranges, cache_param, 
&cache_key, &version);
+        if (!status.ok()) {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, 
status.msg());
+        }
+        doris::QueryCacheHandle handle;
+        hit_cache = QueryCache::instance()->lookup(cache_key, version, 
&handle);
+    }
+
+    if (!hit_cache) {
+        for (auto& scan_range : scan_ranges) {
+            DCHECK(scan_range.scan_range.__isset.palo_scan_range);
+            _scan_ranges.emplace_back(new 
TPaloScanRange(scan_range.scan_range.palo_scan_range));
+            COUNTER_UPDATE(_tablet_counter, 1);
+        }
     }
 }
 
@@ -568,9 +580,11 @@ void OlapScanLocalState::add_filter_info(int id, const 
PredicateFilterInfo& upda
 }
 
 OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, 
int operator_id,
-                                     const DescriptorTbl& descs, int 
parallel_tasks)
+                                     const DescriptorTbl& descs, int 
parallel_tasks,
+                                     const TQueryCacheParam& param)
         : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, 
parallel_tasks),
-          _olap_scan_node(tnode.olap_scan_node) {
+          _olap_scan_node(tnode.olap_scan_node),
+          _cache_param(param) {
     _output_tuple_id = tnode.olap_scan_node.tuple_id;
     if (_olap_scan_node.__isset.sort_info && 
_olap_scan_node.__isset.sort_limit) {
         _limit_per_scanner = _olap_scan_node.sort_limit;
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 6a03a46e65e..4465ce5690e 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -198,11 +198,13 @@ private:
 class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
 public:
     OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
-                      const DescriptorTbl& descs, int parallel_tasks);
+                      const DescriptorTbl& descs, int parallel_tasks,
+                      const TQueryCacheParam& cache_param);
 
 private:
     friend class OlapScanLocalState;
     TOlapScanNode _olap_scan_node;
+    TQueryCacheParam _cache_param;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index d65769254b9..4a93bac67fe 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -17,7 +17,6 @@
 
 #include "operator.h"
 
-#include "common/logging.h"
 #include "common/status.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/aggregation_sink_operator.h"
@@ -25,6 +24,8 @@
 #include "pipeline/exec/analytic_sink_operator.h"
 #include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/cache_sink_operator.h"
+#include "pipeline/exec/cache_source_operator.h"
 #include "pipeline/exec/datagen_operator.h"
 #include "pipeline/exec/distinct_streaming_aggregation_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
@@ -694,6 +695,7 @@ DECLARE_OPERATOR(SetSinkLocalState<true>)
 DECLARE_OPERATOR(SetSinkLocalState<false>)
 DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
+DECLARE_OPERATOR(CacheSinkLocalState)
 
 #undef DECLARE_OPERATOR
 
@@ -725,6 +727,7 @@ DECLARE_OPERATOR(SchemaScanLocalState)
 DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
+DECLARE_OPERATOR(CacheSourceLocalState)
 
 #undef DECLARE_OPERATOR
 
@@ -754,6 +757,7 @@ template class 
PipelineXSinkLocalState<MultiCastSharedState>;
 template class PipelineXSinkLocalState<SetSharedState>;
 template class PipelineXSinkLocalState<LocalExchangeSharedState>;
 template class PipelineXSinkLocalState<BasicSharedState>;
+template class PipelineXSinkLocalState<CacheSharedState>;
 
 template class PipelineXLocalState<HashJoinSharedState>;
 template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -765,6 +769,7 @@ template class PipelineXLocalState<AggSharedState>;
 template class PipelineXLocalState<PartitionedAggSharedState>;
 template class PipelineXLocalState<FakeSharedState>;
 template class PipelineXLocalState<UnionSharedState>;
+template class PipelineXLocalState<CacheSharedState>;
 template class PipelineXLocalState<MultiCastSharedState>;
 template class PipelineXLocalState<PartitionSortNodeSharedState>;
 template class PipelineXLocalState<SetSharedState>;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index a489273b68d..3513c2ba176 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -43,6 +43,8 @@
 #include "pipeline/exec/analytic_sink_operator.h"
 #include "pipeline/exec/analytic_source_operator.h"
 #include "pipeline/exec/assert_num_rows_operator.h"
+#include "pipeline/exec/cache_sink_operator.h"
+#include "pipeline/exec/cache_source_operator.h"
 #include "pipeline/exec/datagen_operator.h"
 #include "pipeline/exec/distinct_streaming_aggregation_operator.h"
 #include "pipeline/exec/empty_set_operator.h"
@@ -1168,10 +1170,13 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     // Therefore, here we need to use a stack-like structure.
     _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
     std::stringstream error_msg;
+    bool enable_query_cache = request.fragment.__isset.query_cache_param;
 
     switch (tnode.node_type) {
     case TPlanNodeType::OLAP_SCAN_NODE: {
-        op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
+        op.reset(new OlapScanOperatorX(
+                pool, tnode, next_operator_id(), descs, _num_instances,
+                enable_query_cache ? request.fragment.query_cache_param : 
TQueryCacheParam {}));
         RETURN_IF_ERROR(cur_pipe->add_operator(op));
         if (request.__isset.parallel_instances) {
             cur_pipe->set_num_tasks(request.parallel_instances);
@@ -1244,6 +1249,26 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                          ": group by and output is empty");
         }
 
+        auto create_query_cache_operator = [&](PipelinePtr& new_pipe) { // new_pipe is an out-param: set to the pipeline created below
+            auto cache_node_id = 
request.local_params[0].per_node_scan_ranges.begin()->first;
+            auto cache_source_id = next_operator_id();
+            op.reset(new CacheSourceOperatorX(pool, cache_node_id, 
cache_source_id,
+                                              
request.fragment.query_cache_param));
+            RETURN_IF_ERROR(cur_pipe->add_operator(op)); // the cache source goes into the current pipeline
+
+            const auto downstream_pipeline_id = cur_pipe->id();
+            if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+                _dag.insert({downstream_pipeline_id, {}});
+            }
+            new_pipe = add_pipeline(cur_pipe);
+            _dag[downstream_pipeline_id].push_back(new_pipe->id()); // list new_pipe under the current pipeline's id in _dag
+
+            DataSinkOperatorPtr cache_sink(
+                    new CacheSinkOperatorX(next_sink_operator_id(), 
cache_source_id));
+            cache_sink->set_dests_id({op->operator_id()}); // op still refers to the cache source created above
+            RETURN_IF_ERROR(new_pipe->set_sink(cache_sink)); // the cache sink terminates the newly created pipeline
+            return Status::OK();
+        };
         const bool group_by_limit_opt =
                 tnode.agg_node.__isset.agg_sort_info_by_group_key && 
tnode.limit > 0;
 
@@ -1256,24 +1281,59 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             
request.query_options.__isset.enable_distinct_streaming_aggregation &&
             request.query_options.enable_distinct_streaming_aggregation &&
             !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) {
-            op.reset(new DistinctStreamingAggOperatorX(pool, 
next_operator_id(), tnode, descs,
-                                                       
_require_bucket_distribution));
-            op->set_followed_by_shuffled_join(followed_by_shuffled_join);
-            _require_bucket_distribution =
-                    _require_bucket_distribution || 
op->require_data_distribution();
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            if (enable_query_cache) {
+                PipelinePtr new_pipe;
+                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+
+                op.reset(new DistinctStreamingAggOperatorX(pool, 
next_operator_id(), tnode, descs,
+                                                           
_require_bucket_distribution));
+                op->set_followed_by_shuffled_join(false);
+                _require_bucket_distribution = true;
+                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
+                cur_pipe = new_pipe;
+            } else {
+                op.reset(new DistinctStreamingAggOperatorX(pool, 
next_operator_id(), tnode, descs,
+                                                           
_require_bucket_distribution));
+                op->set_followed_by_shuffled_join(followed_by_shuffled_join);
+                _require_bucket_distribution =
+                        _require_bucket_distribution || 
op->require_data_distribution();
+                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            }
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
                    !tnode.agg_node.grouping_exprs.empty()) {
-            op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            if (enable_query_cache) {
+                PipelinePtr new_pipe;
+                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+
+                op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
+                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                cur_pipe = new_pipe;
+            } else {
+                op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
+                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            }
         } else {
+            // create new pipeline to add query cache operator
+            PipelinePtr new_pipe;
+            if (enable_query_cache) {
+                RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
+            }
+
             if (enable_spill) {
                 op.reset(new PartitionedAggSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
             } else {
                 op.reset(new AggSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
             }
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            if (enable_query_cache) {
+                RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                cur_pipe = new_pipe;
+            } else {
+                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            }
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
diff --git a/be/src/pipeline/query_cache/query_cache.cpp 
b/be/src/pipeline/query_cache/query_cache.cpp
new file mode 100644
index 00000000000..e6d41ecaba5
--- /dev/null
+++ b/be/src/pipeline/query_cache/query_cache.cpp
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "query_cache.h"
+
+namespace doris {
+
+// Returns the address of the `slot_orders` vector stored in the cache value
+// that `_handle` refers to. The handle must be non-null (checked in debug builds).
+std::vector<int>* QueryCacheHandle::get_cache_slot_orders() {
+    DCHECK(_handle);
+    auto* cached_value =
+            static_cast<QueryCache::CacheValue*>(reinterpret_cast<LRUHandle*>(_handle)->value);
+    return &cached_value->slot_orders;
+}
+
+// Returns the address of the cached block list (`result`) stored in the cache
+// value that `_handle` refers to. The handle must be non-null (checked in debug builds).
+CacheResult* QueryCacheHandle::get_cache_result() {
+    DCHECK(_handle);
+    auto* cached_value =
+            static_cast<QueryCache::CacheValue*>(reinterpret_cast<LRUHandle*>(_handle)->value);
+    return &cached_value->result;
+}
+
+// Returns the `version` recorded in the cache value that `_handle` refers to.
+// The handle must be non-null (checked in debug builds).
+int64_t QueryCacheHandle::get_cache_version() {
+    DCHECK(_handle);
+    const auto* cached_value =
+            static_cast<QueryCache::CacheValue*>(reinterpret_cast<LRUHandle*>(_handle)->value);
+    return cached_value->version;
+}
+
+// Copies every block of `res` and stores the copies, together with `version`
+// and `slot_orders`, as a new cache entry under `key`.
+// `cache_size` is passed to the base-class insert for both of its size arguments.
+// The handle returned by the base-class insert is wrapped in a QueryCacheHandle
+// that is destroyed before this function returns, which releases it.
+void QueryCache::insert(const CacheKey& key, int64_t version, CacheResult& res,
+                        const std::vector<int>& slot_orders, int64_t cache_size) {
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->query_cache_mem_tracker());
+
+    CacheResult copied_blocks;
+    for (auto& source_block : res) {
+        // Start from an empty block with the same structure, then append the source rows.
+        auto& target_block = copied_blocks.emplace_back(vectorized::Block::create_unique());
+        target_block->swap(source_block->clone_empty());
+        // The status returned by merge() is explicitly discarded.
+        (void)vectorized::MutableBlock(target_block.get()).merge(*source_block);
+    }
+
+    auto owned_value = std::make_unique<QueryCache::CacheValue>(version, std::move(copied_blocks),
+                                                                slot_orders);
+    // Ownership of the value is handed to the LRU cache from here on.
+    auto* inserted_handle = LRUCachePolicyTrackingManual::insert(
+            key, static_cast<void*>(owned_value.release()), cache_size, cache_size,
+            CachePriority::NORMAL);
+    QueryCacheHandle release_on_exit(this, inserted_handle);
+}
+
+// Looks up `key` and, when an entry exists whose stored version equals
+// `version`, moves a handle for it into `*handle` and returns true.
+// Returns false when there is no entry or the versions differ; in the latter
+// case the looked-up entry is released again before returning.
+bool QueryCache::lookup(const CacheKey& key, int64_t version, doris::QueryCacheHandle* handle) {
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->query_cache_mem_tracker());
+
+    auto* lru_handle = LRUCachePolicy::lookup(key);
+    if (!lru_handle) {
+        return false;
+    }
+
+    // From here on `candidate` is responsible for releasing the entry.
+    QueryCacheHandle candidate(this, lru_handle);
+    if (candidate.get_cache_version() != version) {
+        return false;
+    }
+
+    *handle = std::move(candidate);
+    return true;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/query_cache/query_cache.h 
b/be/src/pipeline/query_cache/query_cache.h
new file mode 100644
index 00000000000..6ec00b91f78
--- /dev/null
+++ b/be/src/pipeline/query_cache/query_cache.h
@@ -0,0 +1,151 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+#include <glog/logging.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <charconv>
+#include <memory>
+#include <roaring/roaring.hh>
+#include <string>
+#include <system_error>
+
+#include "common/config.h"
+#include "common/status.h"
+#include "io/fs/file_system.h"
+#include "io/fs/path.h"
+#include "olap/lru_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/lru_cache_policy.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+#include "util/time.h"
+
+namespace doris {
+
+using CacheResult = std::vector<vectorized::BlockUPtr>;
+// Move-only handle to an intermediate result held in the query LRU cache.
+// A non-empty handle releases its cache entry in the destructor, so callers
+// must keep the handle alive for as long as they use the data it exposes.
+class QueryCacheHandle {
+public:
+    // Creates an empty handle that refers to no cache entry.
+    QueryCacheHandle() = default;
+
+    // Wraps `handle`, which will be released through `cache` on destruction.
+    QueryCacheHandle(LRUCachePolicy* cache, Cache::Handle* handle)
+            : _cache(cache), _handle(handle) {}
+
+    ~QueryCacheHandle() {
+        if (_handle == nullptr) {
+            return;
+        }
+        CHECK(_cache != nullptr);
+        // Release while the query cache memory tracker is the active one.
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+                ExecEnv::GetInstance()->query_cache_mem_tracker());
+        _cache->release(_handle);
+    }
+
+    // Takes over the entry of `other` and leaves `other` empty.
+    QueryCacheHandle(QueryCacheHandle&& other) noexcept
+            : _cache(std::exchange(other._cache, nullptr)),
+              _handle(std::exchange(other._handle, nullptr)) {}
+
+    // Exchanges state with `other`; whatever this handle held before is
+    // released when `other` is destroyed.
+    QueryCacheHandle& operator=(QueryCacheHandle&& other) noexcept {
+        if (this != &other) {
+            std::swap(_cache, other._cache);
+            std::swap(_handle, other._handle);
+        }
+        return *this;
+    }
+
+    std::vector<int>* get_cache_slot_orders();
+
+    CacheResult* get_cache_result();
+
+    int64_t get_cache_version();
+
+private:
+    LRUCachePolicy* _cache = nullptr;
+    Cache::Handle* _handle = nullptr;
+
+    // Copying is forbidden: two handles must never release the same entry.
+    DISALLOW_COPY_AND_ASSIGN(QueryCacheHandle);
+};
+
+// LRU cache holding per-tablet intermediate results, keyed by the string built
+// in build_cache_key(). The global instance is owned by ExecEnv.
+class QueryCache : public LRUCachePolicyTrackingManual {
+public:
+    using LRUCachePolicyTrackingManual::insert;
+
+    // Payload of one cache entry.
+    struct CacheValue : public LRUCacheValueBase {
+        int64_t version;              // version the result was stored with
+        CacheResult result;           // cached blocks
+        std::vector<int> slot_orders; // slot order saved alongside the blocks
+
+        CacheValue(int64_t v, CacheResult&& r, const std::vector<int>& so)
+                : LRUCacheValueBase(), version(v), result(std::move(r)), slot_orders(so) {}
+    };
+
+    // Create global instance of this class
+    static QueryCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
+        auto* res = new QueryCache(capacity, num_shards);
+        return res;
+    }
+
+    // Builds the cache key for the single scan range in `scan_ranges` and parses
+    // its version.
+    //
+    // On success `*cache_key` is `cache_param.digest`, followed by the raw bytes
+    // of the tablet id, followed by the range string registered for that tablet
+    // in `cache_param.tablet_to_range`; `*version` holds the parsed version.
+    //
+    // Returns InternalError when `scan_ranges` does not contain exactly one
+    // element, when the version string is not a valid integer, or when the
+    // tablet is missing from `cache_param.tablet_to_range`.
+    static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges,
+                                  const TQueryCacheParam& cache_param, std::string* cache_key,
+                                  int64_t* version) {
+        // Exactly one scan range is required: reject the empty case too, because
+        // the first element is read unconditionally below.
+        if (scan_ranges.size() != 1) {
+            return Status::InternalError(
+                    "CacheSourceOperator only support one scan range, plan error");
+        }
+        const auto& scan_range = scan_ranges[0];
+        DCHECK(scan_range.scan_range.__isset.palo_scan_range);
+        const auto& palo_scan_range = scan_range.scan_range.palo_scan_range;
+        auto tablet_id = palo_scan_range.tablet_id;
+
+        // from_chars leaves *version untouched on failure, so the result has to
+        // be checked instead of silently continuing with an unset version.
+        const auto& version_str = palo_scan_range.version;
+        const auto parse_result = std::from_chars(
+                version_str.data(), version_str.data() + version_str.size(), *version);
+        if (parse_result.ec != std::errc {}) {
+            return Status::InternalError("Invalid tablet version in scan range, plan error");
+        }
+
+        auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
+        if (find_tablet == cache_param.tablet_to_range.end()) {
+            return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
+        }
+
+        *cache_key = cache_param.digest +
+                     std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id)) +
+                     find_tablet->second;
+
+        return Status::OK();
+    }
+
+    // Return global instance.
+    // Client should call create_global_cache before.
+    static QueryCache* instance() { return ExecEnv::GetInstance()->get_query_cache(); }
+
+    QueryCache() = delete;
+
+    // NOTE(review): 3600 * 24 is presumably a one-day duration in seconds used
+    // by the base policy - confirm against LRUCachePolicy's constructor.
+    QueryCache(size_t capacity, uint32_t num_shards)
+            : LRUCachePolicyTrackingManual(CachePolicy::CacheType::QUERY_CACHE, capacity,
+                                           LRUCacheType::SIZE, 3600 * 24, num_shards) {}
+
+    // Fills `*handle` and returns true when an entry for `key` with a matching
+    // `version` exists; returns false otherwise.
+    bool lookup(const CacheKey& key, int64_t version, QueryCacheHandle* handle);
+
+    // Stores a copy of `result` with `version` and `slot_orders` under `key`.
+    void insert(const CacheKey& key, int64_t version, CacheResult& result,
+                const std::vector<int>& slot_orders, int64_t cache_size);
+};
+} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 98d82f274e7..38fcaceb479 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -65,6 +65,7 @@ class InvertedIndexQueryCache;
 class TmpFileDirs;
 } // namespace segment_v2
 
+class QueryCache;
 class WorkloadSchedPolicyMgr;
 class BfdParser;
 class BrokerMgr;
@@ -185,6 +186,9 @@ public:
     std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
         return _point_query_executor_mem_tracker;
     }
+    std::shared_ptr<MemTrackerLimiter> query_cache_mem_tracker() { // tracker labelled "QueryCache", created in init_mem_tracker()
+        return _query_cache_mem_tracker;
+    }
     std::shared_ptr<MemTrackerLimiter> block_compression_mem_tracker() {
         return _block_compression_mem_tracker;
     }
@@ -303,6 +307,7 @@ public:
     segment_v2::InvertedIndexQueryCache* get_inverted_index_query_cache() {
         return _inverted_index_query_cache;
     }
+    QueryCache* get_query_cache() { return _query_cache; } // raw pointer; created in _init_mem_env(), deleted in destroy()
     std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return 
_dummy_lru_cache; }
 
     pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() {
@@ -365,6 +370,7 @@ private:
     // Tracking memory may be shared between multiple queries.
     std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
     std::shared_ptr<MemTrackerLimiter> _block_compression_mem_tracker;
+    std::shared_ptr<MemTrackerLimiter> _query_cache_mem_tracker;
 
     // TODO, looking forward to more accurate tracking.
     std::shared_ptr<MemTrackerLimiter> _rowid_storage_reader_tracker;
@@ -436,6 +442,7 @@ private:
     CacheManager* _cache_manager = nullptr;
     segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = 
nullptr;
     segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
+    QueryCache* _query_cache = nullptr;
     std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr;
     std::unique_ptr<io::FDCache> _file_cache_open_fd_cache;
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 834e402ca1e..a69709d24d2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -56,6 +56,7 @@
 #include "olap/tablet_schema_cache.h"
 #include "olap/wal/wal_manager.h"
 #include "pipeline/pipeline_tracing.h"
+#include "pipeline/query_cache/query_cache.h"
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/broker_mgr.h"
@@ -578,6 +579,9 @@ Status ExecEnv::_init_mem_env() {
     _orc_memory_pool = new doris::vectorized::ORCMemoryPool();
     _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();
 
+    _query_cache = QueryCache::create_global_cache(config::query_cache_size * 
1024L * 1024L);
+    LOG(INFO) << "query cache memory limit: " << config::query_cache_size << 
"MB";
+
     return Status::OK();
 }
 
@@ -597,7 +601,9 @@ void ExecEnv::init_mem_tracker() {
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"SegCompaction");
     _point_query_executor_mem_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"PointQueryExecutor");
-    _block_compression_mem_tracker =
+    _query_cache_mem_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"QueryCache");
+    _block_compression_mem_tracker = _block_compression_mem_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"BlockCompression");
     _rowid_storage_reader_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"RowIdStorageReader");
@@ -687,6 +693,7 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_schema_cache);
     SAFE_DELETE(_segment_loader);
     SAFE_DELETE(_row_cache);
+    SAFE_DELETE(_query_cache);
 
     // Free resource after threads are stopped.
     // Some threads are still running, like threads created by 
_new_load_stream_mgr ...
diff --git a/be/src/runtime/memory/cache_policy.h 
b/be/src/runtime/memory/cache_policy.h
index c43ca0b2fb7..5241efb9c29 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -48,6 +48,7 @@ public:
         CLOUD_TXN_DELETE_BITMAP_CACHE = 17,
         NONE = 18, // not be used
         FOR_UT_CACHE_NUMBER = 19,
+        QUERY_CACHE = 20
     };
 
     static std::string type_string(CacheType type) {
@@ -90,6 +91,8 @@ public:
             return "CloudTxnDeleteBitmapCache";
         case CacheType::FOR_UT_CACHE_NUMBER:
             return "ForUTCacheNumber";
+        case CacheType::QUERY_CACHE:
+            return "QUERY_CACHE";
         default:
             LOG(FATAL) << "not match type of cache policy :" << 
static_cast<int>(type);
         }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 59546b11d51..5b3867b5347 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -127,7 +127,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
             "4. If you need to "
             "transfer memory tracking value between two trackers, can use 
transfer_to.";
     if (_consumption->current_value() != 0) {
-        // TODO, expect mem tracker equal to 0 at the load/compaction/etc. 
task end.
+// TODO, expect mem tracker equal to 0 at the load/compaction/etc. task end.
 #ifndef NDEBUG
         if (_type == Type::QUERY || (_type == Type::LOAD && 
!is_group_commit_load)) {
             std::string err_msg =
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8bcce65f229..d4644fca489 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -45,12 +45,10 @@
 #include "util/runtime_profile.h"
 #include "util/simd/bits.h"
 #include "util/slice.h"
-#include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
-#include "vec/columns/columns_number.h"
 #include "vec/common/assert_cast.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/data_types/data_type_nullable.h"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to