This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch topn-lazy-materialize-poc
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/topn-lazy-materialize-poc by 
this push:
     new 3cefeab849b BE code topn lazy materialze (#48735)
3cefeab849b is described below

commit 3cefeab849b769b7e26277c156e970b06b99973a
Author: HappenLee <happen...@selectdb.com>
AuthorDate: Thu Mar 6 12:58:40 2025 +0800

    BE code topn lazy materialze (#48735)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/clucene                                     |   2 +-
 be/src/common/consts.h                             |   1 +
 be/src/exec/rowid_fetcher.cpp                      | 203 +++++++++++++
 be/src/exec/rowid_fetcher.h                        |  26 ++
 be/src/olap/CMakeLists.txt                         |   3 +-
 be/src/olap/id_manager.h                           | 195 ++++++++++++
 be/src/olap/rowset/segment_v2/column_reader.cpp    |  27 ++
 be/src/olap/rowset/segment_v2/column_reader.h      |  35 +++
 be/src/olap/rowset/segment_v2/segment_iterator.cpp |  11 +
 be/src/olap/schema.h                               |   6 +-
 be/src/olap/storage_engine.cpp                     |   4 +
 be/src/olap/tablet_schema.h                        |   9 +
 be/src/olap/utils.h                                |  11 +
 be/src/pipeline/dependency.cpp                     | 185 ++++++++++++
 be/src/pipeline/dependency.h                       |  47 ++-
 be/src/pipeline/exec/cache_sink_operator.cpp       |   1 -
 be/src/pipeline/exec/cache_sink_operator.h         |   6 +-
 be/src/pipeline/exec/cache_source_operator.cpp     |   2 +-
 be/src/pipeline/exec/cache_source_operator.h       |   4 +-
 .../exec/materialization_sink_operator.cpp         | 155 ++++++++++
 .../pipeline/exec/materialization_sink_operator.h  |  71 +++++
 .../exec/materialization_source_operator.cpp       |  59 ++++
 .../exec/materialization_source_operator.h         |  72 +++++
 be/src/pipeline/exec/operator.cpp                  |   8 +-
 be/src/pipeline/exec/scan_operator.cpp             |  10 +
 be/src/pipeline/pipeline_fragment_context.cpp      |  21 ++
 be/src/runtime/exec_env.h                          |   3 +
 be/src/runtime/exec_env_init.cpp                   |   3 +
 be/src/runtime/query_context.h                     |   1 -
 be/src/runtime/runtime_state.cpp                   |   5 +
 be/src/runtime/runtime_state.h                     |   8 +
 be/src/runtime/workload_group/workload_group.h     |   5 +
 be/src/service/backend_options.h                   |   1 +
 be/src/service/internal_service.cpp                |  38 +++
 be/src/service/internal_service.h                  |   4 +
 be/src/util/ref_count_closure.h                    |   7 +-
 be/src/vec/columns/column.h                        |   1 -
 be/src/vec/common/string_ref.cpp                   |   5 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |   8 +
 be/src/vec/exec/scan/vfile_scanner.cpp             |   3 +-
 be/test/exec/hash_map/hash_table_method_test.cpp   |   5 +-
 be/test/olap/id_manager_test.cpp                   | 107 +++++++
 .../operator/materialization_shared_state_test.cpp | 336 +++++++++++++++++++++
 gensrc/proto/internal_service.proto                |  35 +++
 tools/clickbench-tools/conf/doris-cluster.conf     |   6 +-
 45 files changed, 1724 insertions(+), 31 deletions(-)

diff --git a/be/src/clucene b/be/src/clucene
index 3236e18d93b..2204eaec46a 160000
--- a/be/src/clucene
+++ b/be/src/clucene
@@ -1 +1 @@
-Subproject commit 3236e18d93bf96481493d88c34b6c2515f3b0b75
+Subproject commit 2204eaec46a68e5e9a1876b7021f24839ecb2cf0
diff --git a/be/src/common/consts.h b/be/src/common/consts.h
index 2ec9ae12679..548d5a771a2 100644
--- a/be/src/common/consts.h
+++ b/be/src/common/consts.h
@@ -27,6 +27,7 @@ const std::string CSV_WITH_NAMES_AND_TYPES = 
"csv_with_names_and_types";
 const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
 const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = 
"__TEMP__scanner_filtered";
 const std::string ROWID_COL = "__DORIS_ROWID_COL__";
+// Name prefix of the global row id column used by lazy materialization. Readers match it
+// with starts_with(), so column names carrying a suffix after this prefix also qualify.
+const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
 const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
 const std::string DYNAMIC_COLUMN_NAME = "__DORIS_DYNAMIC_COL__";
 const std::string PARTIAL_UPDATE_AUTO_INC_COL = 
"__PARTIAL_UPDATE_AUTO_INC_COLUMN__";
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index fa9f9571409..ac4023601e7 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -469,4 +469,207 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequest& request,
     return Status::OK();
 }
 
+// Serve a V2 multi-get request.
+//
+// For every request_block_desc, builds a Block whose columns are described by the desc's
+// slots, fills it row by row by resolving each (file_id, row_id) pair through this query's
+// IdFileMap, and serializes it (LZ4) into a new entry of `response->blocks`. When the
+// request sets gc_id_map, the query's IdFileMap is removed after the blocks are served.
+//
+// Returns InternalError if the query's IdFileMap or a referenced file mapping is missing;
+// otherwise propagates errors from reading rows or serializing blocks.
+Status RowIdStorageReader::read_by_rowids(const PMultiGetRequestV2& request,
+                                          PMultiGetResponseV2* response) {
+    if (request.request_block_descs_size()) {
+        OlapReaderStatistics stats;
+        std::vector<vectorized::Block> result_blocks(request.request_block_descs_size());
+        int64_t acquire_tablet_ms = 0;
+        int64_t acquire_rowsets_ms = 0;
+        int64_t acquire_segments_ms = 0;
+        int64_t lookup_row_data_ms = 0;
+        // Scratch buffer shared by every row-store lookup of this request.
+        std::string row_store_buffer;
+
+        // Number of rows served per file mapping type, reported in the stats log below.
+        std::unordered_map<FileMappingType, int64_t> file_type_counts;
+
+        auto id_file_map =
+                ExecEnv::GetInstance()->get_id_manager()->get_id_file_map(request.query_id());
+        if (!id_file_map) {
+            return Status::InternalError("Backend:{} id_file_map is null, query_id: {}",
+                                         BackendOptions::get_localhost(),
+                                         print_id(request.query_id()));
+        }
+
+        for (int i = 0; i < request.request_block_descs_size(); ++i) {
+            const auto& request_block_desc = request.request_block_descs(i);
+
+            auto& result_block = result_blocks[i];
+            std::vector<SlotDescriptor> slots;
+            slots.reserve(request_block_desc.slots_size());
+            for (const auto& pslot : request_block_desc.slots()) {
+                slots.push_back(SlotDescriptor(pslot));
+            }
+            if (result_block.is_empty_column()) {
+                result_block = vectorized::Block(slots, request_block_desc.row_id_size());
+            }
+
+            TabletSchema full_read_schema;
+            for (const ColumnPB& column_pb : request_block_desc.column_descs()) {
+                full_read_schema.append_column(TabletColumn(column_pb));
+            }
+            std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> iterator_map;
+
+            RowStoreReadStruct row_store_read_struct(row_store_buffer);
+            if (request_block_desc.fetch_row_store()) {
+                // One serde and one default value per slot, plus a map from the column's
+                // unique id to the slot's position in the result block. These must be
+                // indexed by the slot index `j`, not by the block index `i`.
+                for (int j = 0; j < request_block_desc.slots_size(); ++j) {
+                    row_store_read_struct.serdes.emplace_back(
+                            slots[j].get_data_type_ptr()->get_serde());
+                    row_store_read_struct.col_uid_to_idx[slots[j].col_unique_id()] = j;
+                    row_store_read_struct.default_values.emplace_back(
+                            slots[j].col_default_value());
+                }
+            }
+
+            for (int j = 0; j < request_block_desc.row_id_size(); ++j) {
+                auto file_id = request_block_desc.file_id(j);
+                auto file_mapping = id_file_map->get_file_mapping(file_id);
+                if (!file_mapping) {
+                    return Status::InternalError(
+                            "Backend:{} file_mapping not found, query_id: {}, file_id: {}",
+                            BackendOptions::get_localhost(), print_id(request.query_id()), file_id);
+                }
+
+                // Count file mapping types
+                file_type_counts[file_mapping->type]++;
+
+                // NOTE(review): only DORIS_FORMAT rows are read here; rows of other mapping
+                // types leave result_block untouched — confirm this is intended.
+                if (file_mapping->type == FileMappingType::DORIS_FORMAT) {
+                    RETURN_IF_ERROR(read_doris_format_row(
+                            id_file_map, file_mapping, request_block_desc.row_id(j), slots,
+                            full_read_schema, row_store_read_struct, stats, &acquire_tablet_ms,
+                            &acquire_rowsets_ms, &acquire_segments_ms, &lookup_row_data_ms,
+                            iterator_map, result_block));
+                }
+            }
+
+            [[maybe_unused]] size_t compressed_size = 0;
+            [[maybe_unused]] size_t uncompressed_size = 0;
+            int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
+            RETURN_IF_ERROR(result_block.serialize(
+                    be_exec_version, response->add_blocks()->mutable_block(), &uncompressed_size,
+                    &compressed_size, segment_v2::CompressionTypePB::LZ4));
+        }
+
+        // Build file type statistics string
+        std::string file_type_stats;
+        for (const auto& [type, count] : file_type_counts) {
+            if (!file_type_stats.empty()) {
+                file_type_stats += ", ";
+            }
+            // Format the scoped enum through its integer value: newer fmt releases do not
+            // format `enum class` values without a dedicated formatter.
+            file_type_stats += fmt::format("{}:{}", static_cast<int>(type), count);
+        }
+
+        LOG(INFO) << "Query stats: "
+                  << fmt::format(
+                             "hit_cached_pages:{}, total_pages_read:{}, compressed_bytes_read:{}, "
+                             "io_latency:{}ns, uncompressed_bytes_read:{}, bytes_read:{}, "
+                             "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, acquire_segments_ms:{}, "
+                             "lookup_row_data_ms:{}, file_types:[{}]",
+                             stats.cached_pages_num, stats.total_pages_num,
+                             stats.compressed_bytes_read, stats.io_ns,
+                             stats.uncompressed_bytes_read, stats.bytes_read, acquire_tablet_ms,
+                             acquire_rowsets_ms, acquire_segments_ms, lookup_row_data_ms,
+                             file_type_stats);
+    }
+
+    // When the request asks for it, drop this query's IdFileMap after serving the blocks.
+    if (request.has_gc_id_map() && request.gc_id_map()) {
+        ExecEnv::GetInstance()->get_id_manager()->remove_id_file_map(request.query_id());
+    }
+
+    return Status::OK();
+}
+
+// Read one row of a DORIS_FORMAT file mapping and append its values for `slots` to
+// `result_block`.
+//
+// The rowset is taken from the temp rowsets registered in the query's IdFileMap, not
+// from the tablet's current rowset list. There are two read paths:
+//  * row store: when `row_store_read_struct` carries default values, the whole row is
+//    looked up as JSONB and decoded into `result_block`;
+//  * column store: otherwise each slot is read through a column iterator cached in
+//    `iterator_map`, keyed by (tablet, rowset, segment, slot).
+// Phase timings are recorded into the *_ms out-parameters via scope_timer_run.
+// Returns InternalError when the tablet, rowset or segment cannot be found, or when a row
+// store read is requested for a tablet without a row store for all columns.
+Status RowIdStorageReader::read_doris_format_row(
+        const std::shared_ptr<IdFileMap>& id_file_map,
+        const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id,
+        std::vector<SlotDescriptor>& slots, const TabletSchema& full_read_schema,
+        RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& stats,
+        int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* acquire_segments_ms,
+        int64_t* lookup_row_data_ms,
+        std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>& iterator_map,
+        vectorized::Block& result_block) {
+    auto [tablet_id, rowset_id, segment_id] = file_mapping->get_doris_format_info();
+    BaseTabletSPtr tablet = scope_timer_run(
+            [&]() {
+                auto res = ExecEnv::get_tablet(tablet_id);
+                return !res.has_value() ? nullptr
+                                        : std::dynamic_pointer_cast<BaseTablet>(res.value());
+            },
+            acquire_tablet_ms);
+    if (!tablet) {
+        return Status::InternalError(
+                "Backend:{} tablet not found, tablet_id: {}, rowset_id: {}, segment_id: {}, "
+                "row_id: {}",
+                BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id,
+                row_id);
+    }
+
+    BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(
+            scope_timer_run([&]() { return id_file_map->get_temp_rowset(tablet_id, rowset_id); },
+                            acquire_rowsets_ms));
+    if (!rowset) {
+        return Status::InternalError(
+                "Backend:{} rowset_id not found, tablet_id: {}, rowset_id: {}, segment_id: {}, "
+                "row_id: {}",
+                BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id,
+                row_id);
+    }
+
+    SegmentCacheHandle segment_cache;
+    RETURN_IF_ERROR(scope_timer_run(
+            [&]() {
+                return SegmentLoader::instance()->load_segments(rowset, &segment_cache, true);
+            },
+            acquire_segments_ms));
+
+    // Bind the segment list once so the search and the end check use the same container.
+    const auto& segments = segment_cache.get_segments();
+    auto it = std::find_if(segments.cbegin(), segments.cend(),
+                           [segment_id](const segment_v2::SegmentSharedPtr& seg) {
+                               return seg->id() == segment_id;
+                           });
+    if (it == segments.cend()) {
+        return Status::InternalError(
+                "Backend:{} segment not found, tablet_id: {}, rowset_id: {}, segment_id: {}, "
+                "row_id: {}",
+                BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(), segment_id,
+                row_id);
+    }
+    segment_v2::SegmentSharedPtr segment = *it;
+
+    // A non-empty row_store_read_struct means this row must be read from the row store.
+    if (!row_store_read_struct.default_values.empty()) {
+        // Fail this request instead of aborting the whole backend process with CHECK.
+        if (!tablet->tablet_schema()->has_row_store_for_all_columns()) {
+            return Status::InternalError(
+                    "Backend:{} tablet has no row store for all columns, tablet_id: {}, "
+                    "rowset_id: {}, segment_id: {}, row_id: {}",
+                    BackendOptions::get_localhost(), tablet_id, rowset_id.to_string(),
+                    segment_id, row_id);
+        }
+        RowLocation loc(rowset_id, segment->id(), row_id);
+        row_store_read_struct.row_store_buffer.clear();
+        RETURN_IF_ERROR(scope_timer_run(
+                [&]() {
+                    return tablet->lookup_row_data({}, loc, rowset, stats,
+                                                   row_store_read_struct.row_store_buffer);
+                },
+                lookup_row_data_ms));
+
+        vectorized::JsonbSerializeUtil::jsonb_to_block(
+                row_store_read_struct.serdes, row_store_read_struct.row_store_buffer.data(),
+                row_store_read_struct.row_store_buffer.size(),
+                row_store_read_struct.col_uid_to_idx, result_block,
+                row_store_read_struct.default_values, {});
+    } else {
+        for (size_t x = 0; x < slots.size(); ++x) {
+            vectorized::MutableColumnPtr column =
+                    result_block.get_by_position(x).column->assume_mutable();
+            IteratorKey iterator_key {.tablet_id = tablet_id,
+                                      .rowset_id = rowset_id,
+                                      .segment_id = segment_id,
+                                      .slot_id = slots[x].id()};
+            // Reuse the iterator, and the segment it was opened on, across rows of the same
+            // (tablet, rowset, segment, slot).
+            IteratorItem& iterator_item = iterator_map[iterator_key];
+            if (iterator_item.segment == nullptr) {
+                iterator_item.segment = segment;
+            }
+            RETURN_IF_ERROR(iterator_item.segment->seek_and_read_by_rowid(
+                    full_read_schema, &slots[x], row_id, column, stats, iterator_item.iterator));
+        }
+    }
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index 1fc8b02a679..a43d76988d6 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -27,6 +27,7 @@
 
 #include "common/status.h"
 #include "exec/tablet_info.h" // DorisNodesInfo
+#include "olap/id_manager.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 
@@ -36,6 +37,11 @@ class DorisNodesInfo;
 class RuntimeState;
 class TupleDescriptor;
 
+struct FileMapping;
+struct IteratorKey;
+struct IteratorItem;
+struct HashOfIteratorKey;
+
 namespace vectorized {
 template <typename T>
 class ColumnStr;
@@ -70,9 +76,29 @@ private:
     FetchOption _fetch_option;
 };
 
+// Per-block state for reading rows from the row store: one serde and one default value per
+// requested slot, plus a map from column unique id to the slot's position in the result
+// block. `row_store_buffer` is a scratch buffer borrowed from the caller, which must keep
+// it alive for as long as this struct is used.
+struct RowStoreReadStruct {
+    // explicit: a std::string& must not silently convert into a RowStoreReadStruct.
+    explicit RowStoreReadStruct(std::string& buffer) : row_store_buffer(buffer) {}
+    std::string& row_store_buffer;
+    vectorized::DataTypeSerDeSPtrs serdes;
+    std::unordered_map<uint32_t, uint32_t> col_uid_to_idx;
+    std::vector<std::string> default_values;
+};
+
 class RowIdStorageReader {
 public:
     static Status read_by_rowids(const PMultiGetRequest& request, 
PMultiGetResponse* response);
+    // V2 request: rows are addressed by (file_id, row_id) and resolved through the query's
+    // IdFileMap; one serialized block is appended to the response per request_block_desc.
+    static Status read_by_rowids(const PMultiGetRequestV2& request, 
PMultiGetResponseV2* response);
+
+private:
+    // Reads a single row of a DORIS_FORMAT file mapping into result_block, either from the
+    // row store or column by column through iterators cached in iterator_map.
+    static Status read_doris_format_row(
+            const std::shared_ptr<IdFileMap>& id_file_map,
+            const std::shared_ptr<FileMapping>& file_mapping, int64_t row_id,
+            std::vector<SlotDescriptor>& slots, const TabletSchema& 
full_read_schema,
+            RowStoreReadStruct& row_store_read_struct, OlapReaderStatistics& 
stats,
+            int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* 
acquire_segments_ms,
+            int64_t* lookup_row_data_ms,
+            std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey>& 
iterator_map,
+            vectorized::Block& result_block);
 };
 
 } // namespace doris
diff --git a/be/src/olap/CMakeLists.txt b/be/src/olap/CMakeLists.txt
index bf19ef26764..6aa10435524 100644
--- a/be/src/olap/CMakeLists.txt
+++ b/be/src/olap/CMakeLists.txt
@@ -22,7 +22,8 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/olap")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/olap")
 
 file(GLOB_RECURSE SRC_FILES CONFIGURE_DEPENDS *.cpp)
-add_library(Olap STATIC ${SRC_FILES})
+add_library(Olap STATIC ${SRC_FILES}
+        id_manager.h)
 
 if (NOT USE_MEM_TRACKER)
     target_compile_options(Olap PRIVATE -Wno-unused-lambda-capture)
diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h
new file mode 100644
index 00000000000..cb056b792cb
--- /dev/null
+++ b/be/src/olap/id_manager.h
@@ -0,0 +1,195 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <butil/macros.h>
+#include <gen_cpp/BackendService_types.h>
+#include <gen_cpp/Types_types.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <set>
+#include <shared_mutex>
+#include <string>
+#include <string_view>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "olap/olap_common.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+// Kind of file a FileMapping refers to.
+enum class FileMappingType {
+    DORIS_FORMAT = 0, // Doris segment file, encoded as {tablet_id}{rowset_id}{segment_id}
+    ORC = 1,
+    PARQUET = 2,
+};
+
+// A file whose rows can later be fetched by row id. For DORIS_FORMAT, `value` holds the raw
+// bytes of tablet_id, rowset_id and segment_id back to back; for other types it is an
+// opaque string supplied by the caller.
+struct FileMapping {
+    FileMappingType type;
+    std::string value;
+
+    FileMapping(FileMappingType t, std::string v) : type(t), value(std::move(v)) {}
+
+    // Builds a DORIS_FORMAT mapping by appending the object bytes of each component.
+    FileMapping(int64_t tablet_id, RowsetId rowset_id, uint32_t segment_id)
+            : type(FileMappingType::DORIS_FORMAT) {
+        value.reserve(sizeof(tablet_id) + sizeof(rowset_id) + sizeof(segment_id));
+        value.append(reinterpret_cast<const char*>(&tablet_id), sizeof(tablet_id));
+        value.append(reinterpret_cast<const char*>(&rowset_id), sizeof(rowset_id));
+        value.append(reinterpret_cast<const char*>(&segment_id), sizeof(segment_id));
+    }
+
+    // Decodes a DORIS_FORMAT value back into (tablet_id, rowset_id, segment_id).
+    std::tuple<int64_t, RowsetId, uint32_t> get_doris_format_info() const {
+        DCHECK(type == FileMappingType::DORIS_FORMAT);
+        DCHECK(value.size() == sizeof(int64_t) + sizeof(RowsetId) + sizeof(uint32_t));
+
+        const char* cursor = value.data();
+        int64_t tablet_id;
+        RowsetId rowset_id;
+        uint32_t segment_id;
+        memcpy(&tablet_id, cursor, sizeof(tablet_id));
+        cursor += sizeof(tablet_id);
+        memcpy(&rowset_id, cursor, sizeof(rowset_id));
+        cursor += sizeof(rowset_id);
+        memcpy(&segment_id, cursor, sizeof(segment_id));
+        return std::make_tuple(tablet_id, rowset_id, segment_id);
+    }
+};
+
+// Per-query registry that assigns compact uint32 ids to FileMappings, so a row can be
+// addressed as (file_id, row_id), and keeps the rowsets those rows live in alive. Lookups
+// take a shared lock on _mtx; mutations take an exclusive lock.
+class IdFileMap {
+public:
+    // `expired_timestamp` is the Unix time (seconds) after which IdManager may drop this map.
+    explicit IdFileMap(int64_t expired_timestamp)
+            : _delayed_expired_timestamp(expired_timestamp) {}
+
+    // Returns the mapping registered under `id`, or nullptr if there is none.
+    std::shared_ptr<FileMapping> get_file_mapping(uint32_t id) {
+        std::shared_lock lock(_mtx);
+        auto it = _id_map.find(id);
+        if (it == _id_map.end()) {
+            return nullptr;
+        }
+        return it->second;
+    }
+
+    // Returns the id of a mapping with the same value as `mapping`, registering `mapping`
+    // under the next sequential id (starting at 0) if none exists yet.
+    uint32_t get_file_mapping_id(const std::shared_ptr<FileMapping>& mapping) {
+        DCHECK(mapping.get() != nullptr);
+        std::unique_lock lock(_mtx);
+        auto it = _mapping_to_id.find(mapping->value);
+        if (it != _mapping_to_id.end()) {
+            return it->second;
+        }
+        const uint32_t id = _init_id++;
+        _id_map[id] = mapping;
+        // The key is a view of mapping->value; _id_map keeps `mapping` alive, so it stays valid.
+        _mapping_to_id[mapping->value] = id;
+        return id;
+    }
+
+    // Keeps `rowset` alive for as long as this map lives, keyed by (tablet_id, rowset_id).
+    void add_temp_rowset(const RowsetSharedPtr& rowset) {
+        std::unique_lock lock(_mtx);
+        _temp_rowset_maps[{rowset->rowset_meta()->tablet_id(), rowset->rowset_id()}] = rowset;
+    }
+
+    // Returns the rowset registered via add_temp_rowset, or nullptr if there is none.
+    RowsetSharedPtr get_temp_rowset(const int64_t tablet_id, const RowsetId& rowset_id) {
+        std::shared_lock lock(_mtx);
+        auto it = _temp_rowset_maps.find({tablet_id, rowset_id});
+        if (it == _temp_rowset_maps.end()) {
+            return nullptr;
+        }
+        return it->second;
+    }
+
+    int64_t get_delayed_expired_timestamp() const { return _delayed_expired_timestamp; }
+
+private:
+    std::shared_mutex _mtx;
+    uint32_t _init_id = 0;
+    std::unordered_map<std::string_view, uint32_t> _mapping_to_id;
+    std::unordered_map<uint32_t, std::shared_ptr<FileMapping>> _id_map;
+
+    // use in Doris Format to keep temp rowsets, preventing them from being deleted by compaction
+    std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> _temp_rowset_maps;
+    // Signed, like the `now` it is compared with in IdManager::gc_expired_id_file_map.
+    int64_t _delayed_expired_timestamp = 0;
+};
+
+// Backend-wide registry of per-query IdFileMaps. A map is created on first request, removed
+// explicitly through remove_id_file_map, or dropped by gc_expired_id_file_map once its
+// expiry timestamp has passed. All access is serialized by _query_to_id_file_map_mtx.
+class IdManager {
+public:
+    static constexpr uint8_t ID_VERSION = 0;
+
+    IdManager() = default;
+
+    ~IdManager() {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        _query_to_id_file_map.clear();
+    }
+
+    // Returns the IdFileMap of `query_id`. If there is none yet, creates one expiring
+    // `timeout` seconds from now; an existing map keeps its original expiry.
+    std::shared_ptr<IdFileMap> add_id_file_map(const UniqueId& query_id, int timeout) {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        if (auto found = _query_to_id_file_map.find(query_id);
+            found != _query_to_id_file_map.end()) {
+            return found->second;
+        }
+        auto created = std::make_shared<IdFileMap>(UnixSeconds() + timeout);
+        _query_to_id_file_map[query_id] = created;
+        return created;
+    }
+
+    // Drops every map whose expiry timestamp is not after `now`.
+    void gc_expired_id_file_map(int64_t now) {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        auto cursor = _query_to_id_file_map.begin();
+        while (cursor != _query_to_id_file_map.end()) {
+            if (cursor->second->get_delayed_expired_timestamp() > now) {
+                ++cursor;
+            } else {
+                cursor = _query_to_id_file_map.erase(cursor);
+            }
+        }
+    }
+
+    // Forgets the map of `query_id`, if any.
+    void remove_id_file_map(const UniqueId& query_id) {
+        std::unique_lock guard(_query_to_id_file_map_mtx);
+        _query_to_id_file_map.erase(query_id);
+    }
+
+    // Returns the map of `query_id`, or nullptr when none is registered.
+    std::shared_ptr<IdFileMap> get_id_file_map(const UniqueId& query_id) {
+        std::shared_lock guard(_query_to_id_file_map_mtx);
+        auto found = _query_to_id_file_map.find(query_id);
+        if (found != _query_to_id_file_map.end()) {
+            return found->second;
+        }
+        return nullptr;
+    }
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(IdManager);
+
+    phmap::flat_hash_map<UniqueId, std::shared_ptr<IdFileMap>> _query_to_id_file_map;
+    std::shared_mutex _query_to_id_file_map_mtx;
+};
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp 
b/be/src/olap/rowset/segment_v2/column_reader.cpp
index 9948e7fd8cf..7f88970eb4a 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -1753,4 +1753,31 @@ Status DefaultNestedColumnIterator::read_by_rowids(const 
rowid_t* rowids, const
     return Status::OK();
 }
 
+// Appends one GlobalRowLoacationV2 per row for the next *n rows, starting at the current
+// ordinal, as raw struct bytes in the ColumnString `dst`, then advances the ordinal by *n.
+// This column never contains nulls, so *has_null is always reported as false.
+// NOTE(review): the struct's padding bytes (after `version` on common 64-bit ABIs) are
+// copied along with the members — confirm nothing hashes or compares these strings.
+Status RowIdColumnIteratorV2::next_batch(size_t* n, vectorized::MutableColumnPtr& dst,
+                                         bool* has_null) {
+    auto* string_column = assert_cast<vectorized::ColumnString*>(dst.get());
+
+    for (size_t i = 0; i < *n; ++i) {
+        // Segment-local row ids are 32-bit; make the narrowing explicit.
+        const auto row_id = static_cast<uint32_t>(_current_rowid + i);
+        GlobalRowLoacationV2 location(_version, _backend_id, _file_id, row_id);
+        string_column->insert_data(reinterpret_cast<const char*>(&location),
+                                   sizeof(GlobalRowLoacationV2));
+    }
+    _current_rowid += *n;
+    // Previously left unwritten, so callers read an indeterminate flag.
+    if (has_null != nullptr) {
+        *has_null = false;
+    }
+    return Status::OK();
+}
+
+// Appends one GlobalRowLoacationV2 per entry of `rowids` (in the given order) as raw struct
+// bytes to the ColumnString `dst`. The iterator's current ordinal is not changed.
+Status RowIdColumnIteratorV2::read_by_rowids(const rowid_t* rowids, const size_t count,
+                                             vectorized::MutableColumnPtr& dst) {
+    auto* out = assert_cast<vectorized::ColumnString*>(dst.get());
+    const rowid_t* const last = rowids + count;
+    for (const rowid_t* cur = rowids; cur != last; ++cur) {
+        const uint32_t rid = *cur;
+        const GlobalRowLoacationV2 location(_version, _backend_id, _file_id, rid);
+        out->insert_data(reinterpret_cast<const char*>(&location), sizeof(location));
+    }
+    return Status::OK();
+}
+
 } // namespace doris::segment_v2
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h 
b/be/src/olap/rowset/segment_v2/column_reader.h
index 2afc269a86c..91866ce97ad 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -634,6 +634,41 @@ private:
     int32_t _segment_id = 0;
 };
 
+// Column iterator that synthesizes the global row id column: for every row it produces the
+// raw bytes of a GlobalRowLoacationV2{version, backend_id, file_id, row_id}, where row_id is
+// the row's ordinal as tracked by this iterator. Nothing is read from storage.
+class RowIdColumnIteratorV2 : public ColumnIterator {
+public:
+    RowIdColumnIteratorV2(uint8_t version, int64_t backend_id, uint32_t file_id)
+            : _version(version), _backend_id(backend_id), _file_id(file_id) {}
+
+    Status seek_to_first() override {
+        _current_rowid = 0;
+        return Status::OK();
+    }
+
+    Status seek_to_ordinal(ordinal_t ord_idx) override {
+        // _current_rowid is 32-bit; make the narrowing from ordinal_t explicit.
+        _current_rowid = static_cast<uint32_t>(ord_idx);
+        return Status::OK();
+    }
+
+    // Convenience overload that discards the has_null flag. Redeclared here because the
+    // three-argument override below would otherwise hide it.
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
+        bool has_null = false;
+        return next_batch(n, dst, &has_null);
+    }
+
+    Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
+
+    Status read_by_rowids(const rowid_t* rowids, const size_t count,
+                          vectorized::MutableColumnPtr& dst) override;
+
+    ordinal_t get_current_ordinal() const override { return _current_rowid; }
+
+private:
+    uint32_t _current_rowid = 0;
+    uint8_t _version;
+    int64_t _backend_id;
+    uint32_t _file_id;
+};
+
 class VariantRootColumnIterator : public ColumnIterator {
 public:
     VariantRootColumnIterator() = delete;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp 
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 5d2a3d8b60e..391e06d2e15 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -43,6 +43,7 @@
 #include "olap/bloom_filter_predicate.h"
 #include "olap/column_predicate.h"
 #include "olap/field.h"
+#include "olap/id_manager.h"
 #include "olap/iterators.h"
 #include "olap/like_column_predicate.h"
 #include "olap/match_predicate.h"
@@ -1024,6 +1025,16 @@ Status SegmentIterator::_init_return_column_iterators() {
                     new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, 
_segment->id()));
             continue;
         }
+
+        if 
(_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            auto& id_file_map = _opts.runtime_state->get_id_file_map();
+            uint32_t file_id = 
id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
+                    _opts.tablet_id, _opts.rowset_id, _segment->id()));
+            _column_iterators[cid].reset(new RowIdColumnIteratorV2(
+                    IdManager::ID_VERSION, BackendOptions::get_backend_id(), 
file_id));
+            continue;
+        }
+
         std::set<ColumnId> del_cond_id_set;
         _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
         std::vector<bool> tmp_is_pred_column;
diff --git a/be/src/olap/schema.h b/be/src/olap/schema.h
index 6414db4153a..b7198511120 100644
--- a/be/src/olap/schema.h
+++ b/be/src/olap/schema.h
@@ -68,7 +68,8 @@ public:
             if (column.is_key()) {
                 ++num_key_columns;
             }
-            if (column.name() == BeConsts::ROWID_COL) {
+            if (column.name() == BeConsts::ROWID_COL ||
+                column.name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
                 _rowid_col_idx = cid;
             }
             if (column.name() == VERSION_COL) {
@@ -94,7 +95,8 @@ public:
             if (columns[i]->name() == DELETE_SIGN) {
                 _delete_sign_idx = i;
             }
-            if (columns[i]->name() == BeConsts::ROWID_COL) {
+            if (columns[i]->name() == BeConsts::ROWID_COL ||
+                columns[i]->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
                 _rowid_col_idx = i;
             }
             if (columns[i]->name() == VERSION_COL) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 3ba65813492..21ad2fa051d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -53,6 +53,7 @@
 #include "io/fs/local_file_system.h"
 #include "olap/binlog.h"
 #include "olap/data_dir.h"
+#include "olap/id_manager.h"
 #include "olap/memtable_flush_executor.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
@@ -1491,6 +1492,9 @@ void BaseStorageEngine::_evict_querying_rowset() {
             }
         }
     }
+
+    uint64_t now = UnixSeconds();
+    ExecEnv::GetInstance()->get_id_manager()->gc_expired_id_file_map(now);
 }
 
 bool StorageEngine::add_broken_path(std::string path) {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 957b9adb2b9..14202758f5b 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -398,6 +398,15 @@ public:
     long row_store_page_size() const { return _row_store_page_size; }
     void set_storage_page_size(long storage_page_size) { _storage_page_size = 
storage_page_size; }
     long storage_page_size() const { return _storage_page_size; }
+    bool has_global_row_id() const {
+        // True when any column name starts with BeConsts::GLOBAL_ROWID_COL (only the prefix is
+        // compared; whatever follows the prefix in the column name is ignored).
+        const StringRef global_rowid_prefix(BeConsts::GLOBAL_ROWID_COL.data(),
+                                            BeConsts::GLOBAL_ROWID_COL.size());
+        for (const auto& name_and_index : _field_name_to_index) {
+            if (name_and_index.first.start_with(global_rowid_prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
 
     const std::vector<const TabletIndex*> inverted_indexes() const {
         std::vector<const TabletIndex*> inverted_indexes;
diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h
index c163aad1148..242e39c2c54 100644
--- a/be/src/olap/utils.h
+++ b/be/src/olap/utils.h
@@ -263,4 +263,15 @@ struct GlobalRowLoacation {
     }
 };
 
+// Cluster-wide location of a single row. The raw bytes of this struct (padding included) are
+// stored as the value of a global rowid string column and decoded again by checking that the
+// value length equals sizeof(GlobalRowLoacationV2), so the member order and types are part of
+// that encoding: do NOT reorder or resize the members.
+struct GlobalRowLoacationV2 {
+    // `bid` has the same type as `backend_id`, so no signed/unsigned round-trip happens here.
+    GlobalRowLoacationV2(uint8_t ver, int64_t bid, uint32_t fid, uint32_t rid)
+            : version(ver), backend_id(bid), file_id(fid), row_id(rid) {}
+    // Encoding version (NOTE(review): meaning of the values is not visible here).
+    uint8_t version;
+    // Id of the backend that owns the row; 0 is reserved by callers to mean "NULL row id".
+    int64_t backend_id;
+    // File id sent to the owning backend (presumably resolved by its id manager — confirm).
+    uint32_t file_id;
+    // Row id inside that file.
+    uint32_t row_id;
+
+    auto operator<=>(const GlobalRowLoacationV2&) const = default;
+};
+
 } // namespace doris
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 46e1a366b7b..603d77e93c9 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -27,12 +27,14 @@
+#include <cstring>
 #include "pipeline/pipeline_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
+#include "util/brpc_client_cache.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vslot_ref.h"
 #include "vec/spill/spill_stream_manager.h"
+#include "vec/utils/util.hpp"

 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 Dependency* BasicSharedState::create_source_dependency(int operator_id, int 
node_id,
                                                        const std::string& 
name) {
     source_deps.push_back(std::make_shared<Dependency>(operator_id, node_id, 
name + "_DEPENDENCY"));
@@ -456,4 +458,187 @@ void AggSharedState::refresh_top_limit(size_t row_id,
     limit_columns_min = limit_heap.top()._row_id;
 }
 
+// Rebuilds the output `block` from `origin_block`: every column whose position is listed in
+// `rowid_locs` is replaced by the columns fetched from the remote backends for that rowid
+// column, and all other columns are copied through. Row order is restored from
+// `block_order_results` (one backend id per row, 0 for a NULL row id).
+// Also clears the row/file ids of every request so the next batch starts empty.
+Status MaterializationSharedState::merge_multi_response(vectorized::Block* block) {
+    if (rowid_locs.size() != block_order_results.size()) {
+        return Status::InternalError(
+                "MaterializationSharedState rowid_locs size {} mismatch with "
+                "block_order_results size {}",
+                rowid_locs.size(), block_order_results.size());
+    }
+
+    // init the response_blocks
+    if (response_blocks.empty()) {
+        response_blocks = std::vector<vectorized::MutableBlock>(block_order_results.size());
+    }
+
+    for (size_t i = 0; i < block_order_results.size(); ++i) {
+        // Partial block returned by each backend for rowid column `i`, plus the next row to
+        // consume from it. Rebuilt for every `i` so a backend that returned nothing for this
+        // column can never be served rows left over from a previous column.
+        std::map<int64_t, std::pair<vectorized::Block, size_t>> backend_blocks;
+        const auto pb_index = static_cast<int>(i);
+        for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
+            const auto& response = *rpc_struct.callback->response_;
+            if (pb_index >= response.blocks_size()) {
+                // Nothing returned for this column; any row that needs it is reported below.
+                continue;
+            }
+            vectorized::Block partial_block;
+            RETURN_IF_ERROR(partial_block.deserialize(response.blocks(pb_index).block()));
+
+            if (!partial_block.is_empty_column()) {
+                if (!response_blocks[i].columns()) {
+                    response_blocks[i] = vectorized::MutableBlock(partial_block.clone_empty());
+                }
+                backend_blocks.emplace(backend_id,
+                                       std::make_pair(std::move(partial_block), size_t {0}));
+            }
+        }
+
+        // NOTE(review): if no backend returned columns for this rowid column (e.g. every row
+        // id was NULL), response_blocks[i] has no columns and contributes no columns to the
+        // output block; confirm the fetch side always returns a schema-bearing block.
+        auto& target = response_blocks[i];
+        for (const int64_t backend_id : block_order_results[i]) {
+            if (backend_id == 0) {
+                // NULL row id: emit a default value in every fetched column.
+                for (size_t k = 0; k < target.columns(); ++k) {
+                    target.get_column_by_position(k)->insert_default();
+                }
+                continue;
+            }
+            auto it = backend_blocks.find(backend_id);
+            if (UNLIKELY(it == backend_blocks.end() ||
+                         it->second.second >= it->second.first.rows())) {
+                return Status::InternalError(
+                        "MaterializationSharedState missing fetched row, backend_id={}",
+                        backend_id);
+            }
+            auto& [source_block, cursor] = it->second;
+            for (size_t k = 0; k < target.columns(); ++k) {
+                target.get_column_by_position(k)->insert_from(
+                        *source_block.get_by_position(k).column, cursor);
+            }
+            ++cursor;
+        }
+    }
+
+    // Drop the row ids / file ids sent in this round so the next batch starts empty.
+    for (auto& [_, rpc_struct] : rpc_struct_map) {
+        for (int i = 0; i < rpc_struct.request.request_block_descs_size(); ++i) {
+            rpc_struct.request.mutable_request_block_descs(i)->clear_row_id();
+            rpc_struct.request.mutable_request_block_descs(i)->clear_file_id();
+        }
+    }
+
+    // Splice: copy ordinary columns through and expand each rowid column into the columns
+    // fetched for it. `rowid_locs` is walked in order alongside the column position.
+    size_t next_rowid = 0;
+    for (size_t col = 0; col < origin_block.columns(); ++col) {
+        if (next_rowid < rowid_locs.size() &&
+            static_cast<size_t>(rowid_locs[next_rowid]) == col) {
+            auto response_block = response_blocks[next_rowid].to_block();
+            for (auto& data : response_block) {
+                block->insert(data);
+            }
+            ++next_rowid;
+        } else {
+            block->insert(origin_block.get_by_position(col));
+        }
+    }
+    origin_block.clear();
+    response_blocks.clear();
+
+    return Status::OK();
+}
+
+// Creates the source-side dependency as a CountedFinishDependency. The number of outstanding
+// per-backend RPCs is later added to its counter, and each RPC callback calls sub() on it.
+Dependency* MaterializationSharedState::create_source_dependency(int operator_id, int node_id,
+                                                                 const std::string& name) {
+    auto counted_dep = std::make_shared<CountedFinishDependency>(operator_id, node_id,
+                                                                 name + "_DEPENDENCY");
+    counted_dep->set_shared_state(this);
+    // add(0) leaves the counter at zero but switches the dependency to blocked, so the source
+    // waits until the sink has registered its RPCs.
+    counted_dep->add(0);
+    source_deps.push_back(counted_dep);
+    return counted_dep.get();
+}
+
+// For each rowid column in `columns`, decodes every non-NULL global row location and appends
+// its (row_id, file_id) to the request of the owning backend. It also records, per row, which
+// backend the row comes from (0 for NULL) in `block_order_results`, so merge_multi_response
+// can restore the original row order.
+// When `eos && gc_id_map`, every request is flagged with gc_id_map.
+// Returns InternalError for a malformed row id or an unknown backend id.
+Status MaterializationSharedState::create_muiltget_result(const vectorized::Columns& columns,
+                                                          bool eos, bool gc_id_map) {
+    const auto rows = columns.empty() ? 0 : columns[0]->size();
+    block_order_results.resize(columns.size());
+
+    for (size_t i = 0; i < columns.size(); ++i) {
+        const uint8_t* null_map = nullptr;
+        const vectorized::ColumnString* column_rowid = nullptr;
+        const auto& column = columns[i];
+
+        if (const auto* column_ptr = check_and_get_column<vectorized::ColumnNullable>(*column)) {
+            null_map = column_ptr->get_null_map_data().data();
+            column_rowid = assert_cast<const vectorized::ColumnString*>(
+                    column_ptr->get_nested_column_ptr().get());
+        } else {
+            column_rowid = assert_cast<const vectorized::ColumnString*>(column.get());
+        }
+
+        auto& block_order = block_order_results[i];
+        block_order.resize(rows);
+        const auto desc_index = static_cast<int>(i);
+
+        for (size_t j = 0; j < rows; ++j) {
+            if (null_map && null_map[j]) {
+                // Zero marks a NULL global row id; nothing is fetched for this row.
+                block_order[j] = 0;
+                continue;
+            }
+            const auto encoded = column_rowid->get_data_at(j);
+            if (UNLIKELY(encoded.size != sizeof(GlobalRowLoacationV2))) {
+                return Status::InternalError(
+                        "MaterializationSinkOperatorX got invalid global rowid, size={}, "
+                        "expected={}",
+                        encoded.size, sizeof(GlobalRowLoacationV2));
+            }
+            // The bytes live inside a string column with no alignment guarantee, and reading
+            // them through a cast struct pointer breaks strict aliasing: copy them out instead.
+            GlobalRowLoacationV2 row_location(0, 0, 0, 0);
+            std::memcpy(&row_location, encoded.data, sizeof(GlobalRowLoacationV2));
+
+            auto rpc_struct = rpc_struct_map.find(row_location.backend_id);
+            if (UNLIKELY(rpc_struct == rpc_struct_map.end())) {
+                return Status::InternalError(
+                        "MaterializationSinkOperatorX failed to find rpc_struct, backend_id={}",
+                        row_location.backend_id);
+            }
+            auto* request_block_desc =
+                    rpc_struct->second.request.mutable_request_block_descs(desc_index);
+            request_block_desc->add_row_id(row_location.row_id);
+            request_block_desc->add_file_id(row_location.file_id);
+            block_order[j] = row_location.backend_id;
+        }
+    }
+
+    if (eos && gc_id_map) {
+        for (auto& [_, rpc_struct] : rpc_struct_map) {
+            rpc_struct.request.set_gc_id_map(true);
+        }
+    }
+    last_block = eos;
+    // An empty batch (e.g. a bare eos) has nothing to merge on the source side.
+    need_merge_block = rows > 0;
+
+    return Status::OK();
+}
+
+// Builds one PMultiGetRequestV2 template (query id, and per rowid column the column descs and
+// slots to fetch). Copies it into a FetchRpcStruct for every backend in nodes_info, then raises
+// the source dependency counter by the number of backends.
+//
+// Idempotent: a second run would raise the counter again while emplace() keeps the existing
+// entries, leaving the source waiting for RPC callbacks that never arrive.
+Status MaterializationSharedState::init_multi_requests(
+        const TMaterializationNode& materialization_node, RuntimeState* state) {
+    if (rpc_struct_inited) {
+        return Status::OK();
+    }
+
+    PMultiGetRequestV2 multi_get_request;
+    // Initialize the base struct of PMultiGetRequestV2
+    multi_get_request.set_be_exec_version(state->be_exec_version());
+    multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
+    auto* query_id = multi_get_request.mutable_query_id();
+    query_id->set_hi(state->query_id().hi);
+    query_id->set_lo(state->query_id().lo);
+    DCHECK_EQ(materialization_node.column_descs_lists.size(),
+              materialization_node.slot_locs_lists.size());
+    // fetch_row_stores is indexed in lockstep with column_descs_lists below.
+    DCHECK_LE(materialization_node.column_descs_lists.size(),
+              materialization_node.fetch_row_stores.size());
+
+    const auto& slots = state->desc_tbl()
+                                .get_tuple_descriptor(materialization_node.intermediate_tuple_id)
+                                ->slots();
+    for (size_t i = 0; i < materialization_node.column_descs_lists.size(); ++i) {
+        auto* request_block_desc = multi_get_request.add_request_block_descs();
+        request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]);
+        // Initialize the column_descs and slot_locs
+        for (const auto& column_desc_item : materialization_node.column_descs_lists[i]) {
+            TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs());
+        }
+        for (const auto& slot_loc_item : materialization_node.slot_locs_lists[i]) {
+            slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots());
+        }
+    }
+
+    // Initialize the stubs and requests for each BE
+    for (const auto& node_info : materialization_node.nodes_info.nodes) {
+        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+                node_info.host, node_info.async_internal_port);
+        if (!client) {
+            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
+                         << ", port=" << node_info.async_internal_port;
+            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
+                                         node_info.host, node_info.async_internal_port);
+        }
+        rpc_struct_map.emplace(node_info.id, FetchRpcStruct {.stub = std::move(client),
+                                                             .request = multi_get_request,
+                                                             .callback = nullptr});
+    }
+    // One pending RPC per backend: the source stays blocked until every callback calls sub().
+    static_cast<CountedFinishDependency*>(source_deps.back().get())
+            ->add(static_cast<uint32_t>(rpc_struct_map.size()));
+    rpc_struct_inited = true;
+
+    return Status::OK();
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ea1928215b0..22f52f1ea77 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -29,12 +29,14 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "gutil/integral_types.h"
 #include "pipeline/common/agg_utils.h"
 #include "pipeline/common/join_utils.h"
 #include "pipeline/common/set_utils.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/join/process_hash_table_probe.h"
+#include "util/ref_count_closure.h"
 #include "util/stack_util.h"
 #include "vec/common/sort/partition_sorter.h"
 #include "vec/common/sort/sorter.h"
@@ -83,7 +85,8 @@ struct BasicSharedState {
 
     virtual ~BasicSharedState() = default;
 
-    Dependency* create_source_dependency(int operator_id, int node_id, const 
std::string& name);
+    virtual Dependency* create_source_dependency(int operator_id, int node_id,
+                                                 const std::string& name);
 
     Dependency* create_sink_dependency(int dest_id, int node_id, const 
std::string& name);
 };
@@ -173,12 +176,12 @@ public:
     CountedFinishDependency(int id, int node_id, std::string name)
             : Dependency(id, node_id, std::move(name), true) {}
 
+    // Raises the pending counter by `count`. If the counter was zero, the dependency is first
+    // switched to blocked, so add(0) blocks without changing the counter.
-    void add() {
+    void add(uint32_t count = 1) {
         std::unique_lock<std::mutex> l(_mtx);
         if (!_counter) {
             block();
         }
-        _counter++;
+        _counter += count;
     }
 
     void sub() {
@@ -551,8 +554,8 @@ public:
     const int _child_count;
 };
 
-struct CacheSharedState : public BasicSharedState {
-    ENABLE_FACTORY_CREATOR(CacheSharedState)
+struct DataQueueSharedState : public BasicSharedState {
+    ENABLE_FACTORY_CREATOR(DataQueueSharedState)
 public:
     DataQueue data_queue;
 };
@@ -872,5 +875,39 @@ class QueryGlobalDependency final : public Dependency {
     ~QueryGlobalDependency() override = default;
     Dependency* is_blocked_by(PipelineTask* task = nullptr) override;
 };
+
+// Per-backend state for the multi-get RPCs issued by the materialization sink.
+// MaterializationSharedState::init_multi_requests builds it with designated initializers,
+// so keep the declaration order of the members.
+struct FetchRpcStruct {
+    // brpc stub for the target backend.
+    std::shared_ptr<PBackendService_Stub> stub;
+    // Request template; row ids / file ids are appended per batch and cleared after merging.
+    PMultiGetRequestV2 request;
+    // Callback of the most recent RPC; its response is read when merging.
+    std::shared_ptr<doris::DummyBrpcCallback<PMultiGetResponseV2>> callback;
+    // Started right before the RPC is sent and stopped in the callback.
+    MonotonicStopWatch rpc_timer;
+};
+
+// State shared by MaterializationSinkOperatorX and MaterializationSourceOperatorX.
+// The sink turns global row ids into per-backend multi-get requests and sends them.
+// The source waits on a CountedFinishDependency until every backend has answered, then
+// merges the fetched columns back into the original block.
+struct MaterializationSharedState : public BasicSharedState {
+    ENABLE_FACTORY_CREATOR(MaterializationSharedState)
+public:
+    MaterializationSharedState() = default;
+
+    // Builds the per-backend request template and stubs; the sink calls it only while
+    // rpc_struct_inited is false.
+    Status init_multi_requests(const TMaterializationNode& tnode, 
RuntimeState* state);
+    // Decodes the rowid columns into per-backend row/file id requests and records row order.
+    Status create_muiltget_result(const vectorized::Columns& columns, bool 
eos, bool gc_id_map);
+    // Builds the output block from origin_block plus the fetched columns.
+    Status merge_multi_response(vectorized::Block* block);
+
+    Dependency* create_source_dependency(int operator_id, int node_id,
+                                         const std::string& name) override;
+
+    // Checked by the sink before calling init_multi_requests.
+    bool rpc_struct_inited = false;
+    // Outcome reported by the RPC callbacks, checked by the source before merging.
+    // NOTE(review): several callbacks may write this concurrently; no lock is visible here.
+    Status rpc_status = Status::OK();
+    // Set to the sink's eos flag; the source reports it as its own eos.
+    bool last_block = false;
+    // empty materialization sink block not need to merge block
+    bool need_merge_block = true;
+    // Input block of the current batch, swapped in by the sink.
+    vectorized::Block origin_block;
+    // The rowid column of the origin block. should be replaced by the column 
of the result block.
+    std::vector<int> rowid_locs;
+    // Fetched columns per rowid column, built during merging.
+    std::vector<vectorized::MutableBlock> response_blocks;
+    // Keyed by backend id (nodes_info id).
+    std::map<int64_t, FetchRpcStruct> rpc_struct_map;
+    // Register each line in which block to ensure the order of the result.
+    // Zero means NULL value.
+    std::vector<std::vector<int64_t>> block_order_results;
+};
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/cache_sink_operator.cpp 
b/be/src/pipeline/exec/cache_sink_operator.cpp
index f1a8a2c888e..93d50212875 100644
--- a/be/src/pipeline/exec/cache_sink_operator.cpp
+++ b/be/src/pipeline/exec/cache_sink_operator.cpp
@@ -40,7 +40,6 @@ Status CacheSinkLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
     RETURN_IF_ERROR(Base::open(state));
-    //    auto& p = _parent->cast<Parent>();
 
     
_shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
     return Status::OK();
diff --git a/be/src/pipeline/exec/cache_sink_operator.h 
b/be/src/pipeline/exec/cache_sink_operator.h
index d4f2113ed06..c67ada5735c 100644
--- a/be/src/pipeline/exec/cache_sink_operator.h
+++ b/be/src/pipeline/exec/cache_sink_operator.h
@@ -33,14 +33,14 @@ namespace pipeline {
 class DataQueue;
 
 class CacheSinkOperatorX;
-class CacheSinkLocalState final : public 
PipelineXSinkLocalState<CacheSharedState> {
+class CacheSinkLocalState final : public 
PipelineXSinkLocalState<DataQueueSharedState> {
 public:
     ENABLE_FACTORY_CREATOR(CacheSinkLocalState);
     CacheSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : 
Base(parent, state) {}
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     friend class CacheSinkOperatorX;
-    using Base = PipelineXSinkLocalState<CacheSharedState>;
+    using Base = PipelineXSinkLocalState<DataQueueSharedState>;
     using Parent = CacheSinkOperatorX;
 };
 
@@ -59,7 +59,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
     std::shared_ptr<BasicSharedState> create_shared_state() const override {
-        std::shared_ptr<BasicSharedState> ss = 
std::make_shared<CacheSharedState>();
+        std::shared_ptr<BasicSharedState> ss = 
std::make_shared<DataQueueSharedState>();
         ss->id = operator_id();
         for (auto& dest : dests_id()) {
             ss->related_op_ids.insert(dest);
diff --git a/be/src/pipeline/exec/cache_source_operator.cpp 
b/be/src/pipeline/exec/cache_source_operator.cpp
index ec9f9ecc572..9db41c4cd85 100644
--- a/be/src/pipeline/exec/cache_source_operator.cpp
+++ b/be/src/pipeline/exec/cache_source_operator.cpp
@@ -34,7 +34,7 @@ Status CacheSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
-    ((CacheSharedState*)_dependency->shared_state())
+    ((DataQueueSharedState*)_dependency->shared_state())
             
->data_queue.set_source_dependency(_shared_state->source_deps.front());
     const auto& scan_ranges = info.scan_ranges;
     bool hit_cache = false;
diff --git a/be/src/pipeline/exec/cache_source_operator.h 
b/be/src/pipeline/exec/cache_source_operator.h
index 651f9ff5596..ec95301cbea 100644
--- a/be/src/pipeline/exec/cache_source_operator.h
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -36,10 +36,10 @@ namespace pipeline {
 class DataQueue;
 
 class CacheSourceOperatorX;
-class CacheSourceLocalState final : public 
PipelineXLocalState<CacheSharedState> {
+class CacheSourceLocalState final : public 
PipelineXLocalState<DataQueueSharedState> {
 public:
     ENABLE_FACTORY_CREATOR(CacheSourceLocalState);
-    using Base = PipelineXLocalState<CacheSharedState>;
+    using Base = PipelineXLocalState<DataQueueSharedState>;
     using Parent = CacheSourceOperatorX;
     CacheSourceLocalState(RuntimeState* state, OperatorXBase* parent) : 
Base(state, parent) {};
 
diff --git a/be/src/pipeline/exec/materialization_sink_operator.cpp 
b/be/src/pipeline/exec/materialization_sink_operator.cpp
new file mode 100644
index 00000000000..acdae4c508b
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_sink_operator.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/materialization_sink_operator.h"
+
+#include <bthread/countdown_event.h>
+#include <fmt/format.h>
+#include <gen_cpp/data.pb.h>
+#include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gen_cpp/types.pb.h>
+
+#include <utility>
+
+#include "common/status.h"
+#include "pipeline/exec/operator.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+
+namespace doris {
+namespace pipeline {
+
+// Copies the materialization plan node and builds one rowid expression per fetch expr list.
+Status MaterializationSinkOperatorX::init(const doris::TPlanNode& tnode,
+                                          doris::RuntimeState* state) {
+    RETURN_IF_ERROR(Base::init(tnode, state));
+    DCHECK(tnode.__isset.materialization_node);
+    const auto& plan_node = tnode.materialization_node;
+    _materialization_node = plan_node;
+    _gc_id_map = plan_node.gc_id_map;
+    // Evaluating the i-th expression yields the position of the i-th global rowid column.
+    RETURN_IF_ERROR(
+            vectorized::VExpr::create_expr_trees(plan_node.fetch_expr_lists, _rowid_exprs));
+    return Status::OK();
+}
+
+// Binds the rowid expressions to the child's row layout and opens them.
+Status MaterializationSinkOperatorX::prepare(RuntimeState* state) {
+    const auto& child_row_desc = _child->row_desc();
+    RETURN_IF_ERROR(vectorized::VExpr::prepare(_rowid_exprs, state, child_row_desc));
+    return vectorized::VExpr::open(_rowid_exprs, state);
+}
+
+// brpc completion callback for one backend's multi-get RPC. It stops that backend's RPC
+// timer, records a failure (if any) in the shared state, and decrements the source
+// dependency counter.
+template <typename Response>
+class MaterializationCallback : public ::doris::DummyBrpcCallback<Response> {
+    ENABLE_FACTORY_CREATOR(MaterializationCallback);
+
+public:
+    MaterializationCallback(std::weak_ptr<TaskExecutionContext> tast_exec_ctx,
+                            MaterializationSharedState* shared_state, MonotonicStopWatch& rpc_timer)
+            : _tast_exec_ctx(std::move(tast_exec_ctx)),
+              _shared_state(shared_state),
+              _rpc_timer(rpc_timer) {}
+
+    ~MaterializationCallback() override = default;
+    MaterializationCallback(const MaterializationCallback& other) = delete;
+    MaterializationCallback& operator=(const MaterializationCallback& other) = delete;
+
+    void call() noexcept override {
+        auto tast_exec_ctx = _tast_exec_ctx.lock();
+        if (!tast_exec_ctx) {
+            // The task execution context has expired: do not touch the shared state.
+            return;
+        }
+
+        _rpc_timer.stop();
+        using Base = ::doris::DummyBrpcCallback<Response>;
+        Status status;
+        if (Base::cntl_->Failed()) {
+            status = Status::InternalError(fmt::format(
+                    "failed to send brpc when exchange, error={}, error_text={}, client: {}, "
+                    "latency = {}",
+                    berror(Base::cntl_->ErrorCode()), Base::cntl_->ErrorText(),
+                    BackendOptions::get_localhost(), Base::cntl_->latency_us()));
+        } else {
+            status = Status::create(Base::response_->status());
+        }
+        // Callbacks of different backends complete independently. Record only failures so a
+        // later successful response cannot overwrite (and hide) an earlier backend's error.
+        // NOTE(review): two failing callbacks can still write rpc_status concurrently; a lock
+        // in the shared state would make this fully race-free.
+        if (!status.ok()) {
+            _shared_state->rpc_status = std::move(status);
+        }
+        static_cast<CountedFinishDependency*>(_shared_state->source_deps.back().get())->sub();
+    }
+
+private:
+    std::weak_ptr<TaskExecutionContext> _tast_exec_ctx;
+    MaterializationSharedState* _shared_state;
+    MonotonicStopWatch& _rpc_timer;
+};
+
+// Evaluates the rowid expressions on `in_block`, stashes the block in the shared state, and
+// sends one multi-get request per backend. For a non-final batch the sink blocks itself until
+// the source has merged the responses. An empty non-final block is a no-op.
+Status MaterializationSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
+                                          bool eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+    COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
+    auto* shared_state = local_state._shared_state;
+
+    if (!shared_state->rpc_struct_inited) {
+        RETURN_IF_ERROR(shared_state->init_multi_requests(_materialization_node, state));
+        // init_multi_requests() adds the backend count to the source dependency. Repeating it
+        // on every sink call would leave the source waiting forever, so remember that it ran.
+        shared_state->rpc_struct_inited = true;
+    }
+
+    if (in_block->rows() == 0 && !eos) {
+        return Status::OK();
+    }
+
+    // Block this sink until the source has merged this round's responses.
+    if (!eos) {
+        shared_state->sink_deps.back()->block();
+    }
+
+    // Execute the rowid exprs and keep the input block for the merge step.
+    vectorized::Columns columns;
+    if (in_block->rows() != 0) {
+        shared_state->rowid_locs.resize(_rowid_exprs.size());
+        for (size_t i = 0; i < _rowid_exprs.size(); ++i) {
+            RETURN_IF_ERROR(_rowid_exprs[i]->execute(in_block, &shared_state->rowid_locs[i]));
+            columns.emplace_back(in_block->get_by_position(shared_state->rowid_locs[i]).column);
+        }
+        shared_state->origin_block.swap(*in_block);
+    }
+    RETURN_IF_ERROR(shared_state->create_muiltget_result(columns, eos, _gc_id_map));
+
+    // One request per backend; each callback decrements the source dependency counter.
+    for (auto& [backend_id, rpc_struct] : shared_state->rpc_struct_map) {
+        auto callback = MaterializationCallback<PMultiGetResponseV2>::create_shared(
+                state->get_task_execution_context(), shared_state, rpc_struct.rpc_timer);
+        callback->cntl_->set_timeout_ms(config::fetch_rpc_timeout_seconds * 1000);
+        auto closure =
+                AutoReleaseClosure<int, ::doris::DummyBrpcCallback<PMultiGetResponseV2>>::
+                        create_unique(std::make_shared<int>(), callback,
+                                      state->get_query_ctx_weak(),
+                                      "Materialization Sink node id:" + std::to_string(node_id()) +
+                                              " target_backend_id:" + std::to_string(backend_id));
+        // send brpc request
+        rpc_struct.callback = callback;
+        rpc_struct.rpc_timer.start();
+        rpc_struct.stub->multiget_data_v2(callback->cntl_.get(), &rpc_struct.request,
+                                          callback->response_.get(), closure.release());
+    }
+
+    return Status::OK();
+}
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/materialization_sink_operator.h 
b/be/src/pipeline/exec/materialization_sink_operator.h
new file mode 100644
index 00000000000..813d12e017d
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_sink_operator.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+
+class MaterializationSinkOperatorX;
+// Sink-side local state of the materialization operator; it only gives
+// MaterializationSinkOperatorX access to the MaterializationSharedState.
+class MaterializationSinkLocalState final
+        : public PipelineXSinkLocalState<MaterializationSharedState> {
+private:
+    using Base = PipelineXSinkLocalState<MaterializationSharedState>;
+    using Parent = MaterializationSinkOperatorX;
+    friend class MaterializationSinkOperatorX;
+
+public:
+    ENABLE_FACTORY_CREATOR(MaterializationSinkLocalState);
+    MaterializationSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
+            : Base(parent, state) {}
+};
+
+// Sink half of the materialization operator pair: turns global rowid columns into
+// per-backend multi-get RPCs (see MaterializationSharedState).
+class MaterializationSinkOperatorX final : public DataSinkOperatorX<MaterializationSinkLocalState> {
+public:
+    using Base = DataSinkOperatorX<MaterializationSinkLocalState>;
+    friend class MaterializationSinkLocalState;
+
+    // `pool` is not used by this operator.
+    MaterializationSinkOperatorX(int child_id, int sink_id, ObjectPool* /*pool*/,
+                                 const TPlanNode& tnode)
+            : Base(sink_id, tnode.node_id, child_id) {
+        _name = "MATERIALIZATION_SINK_OPERATOR";
+    }
+    ~MaterializationSinkOperatorX() override = default;
+
+    Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
+
+private:
+    // Copy of the plan node, used to build the per-backend multi-get requests.
+    TMaterializationNode _materialization_node;
+    // The i-th expression yields the i-th global rowid column of the input block.
+    vectorized::VExprContextSPtrs _rowid_exprs;
+    // When set, the eos round marks every request with gc_id_map.
+    bool _gc_id_map = false;
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/materialization_source_operator.cpp 
b/be/src/pipeline/exec/materialization_source_operator.cpp
new file mode 100644
index 00000000000..20e7c6351d9
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_source_operator.cpp
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/materialization_source_operator.h"
+
+#include <utility>
+
+#include "common/status.h"
+#include "vec/core/block.h"
+
+namespace doris::pipeline {
+
+// Runs once every backend of the current round has answered. It returns any recorded RPC
+// failure, merges the fetched columns into `block`, then either re-arms itself and wakes the
+// sink for the next batch or, on the last batch, publishes the slowest RPC time.
+Status MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
+                                                 bool* eos) {
+    auto& local_state = get_local_state(state);
+    SCOPED_TIMER(local_state.exec_time_counter());
+    auto* shared_state = local_state._shared_state;
+
+    if (!shared_state->rpc_status.ok()) {
+        return shared_state->rpc_status;
+    }
+
+    // clear origin block, do merge response to build a ret block
+    block->clear();
+    if (shared_state->need_merge_block) {
+        SCOPED_TIMER(local_state._merge_response_timer);
+        RETURN_IF_ERROR(shared_state->merge_multi_response(block));
+    }
+
+    *eos = shared_state->last_block;
+    if (!*eos) {
+        // Re-arm the source dependency BEFORE waking the sink. In the opposite order the sink
+        // could send its requests and some callbacks could call sub() before add() runs.
+        // add() would then see a non-zero counter and skip block(), letting this source run
+        // again while RPCs are still in flight.
+        static_cast<CountedFinishDependency*>(shared_state->source_deps.back().get())
+                ->add(static_cast<uint32_t>(shared_state->rpc_struct_map.size()));
+        shared_state->sink_deps.back()->ready();
+    } else {
+        uint64_t max_rpc_time = 0;
+        for (auto& [_, rpc_struct] : shared_state->rpc_struct_map) {
+            max_rpc_time = std::max(max_rpc_time, rpc_struct.rpc_timer.elapsed_time());
+        }
+        COUNTER_SET(local_state._max_rpc_timer, (int64_t)max_rpc_time);
+    }
+
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/materialization_source_operator.h 
b/be/src/pipeline/exec/materialization_source_operator.h
new file mode 100644
index 00000000000..0c6a8b91047
--- /dev/null
+++ b/be/src/pipeline/exec/materialization_source_operator.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "common/status.h"
+#include "operator.h"
+#include "vec/core/block.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class RuntimeState;
+
+namespace pipeline {
+
+class MaterializationSourceOperatorX;
+
+// Per-task local state of MaterializationSourceOperatorX. It reads the
+// MaterializationSharedState and owns the source-side profile counters.
+class MaterializationSourceLocalState final
+        : public PipelineXLocalState<MaterializationSharedState> {
+public:
+    ENABLE_FACTORY_CREATOR(MaterializationSourceLocalState);
+    using Base = PipelineXLocalState<MaterializationSharedState>;
+    using Parent = MaterializationSourceOperatorX;
+    MaterializationSourceLocalState(RuntimeState* state, OperatorXBase* parent)
+            : Base(state, parent) {}
+
+    // Initializes the base local state, then registers the level-2 profile
+    // timers "MaxRpcTime" and "MergeResponseTime".
+    Status init(doris::RuntimeState* state, doris::pipeline::LocalStateInfo& info) override {
+        RETURN_IF_ERROR(Base::init(state, info));
+        _max_rpc_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "MaxRpcTime", 2);
+        _merge_response_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "MergeResponseTime", 2);
+        return Status::OK();
+    }
+
+private:
+    // Longest elapsed time among the fetch RPCs; set once the last block is emitted.
+    RuntimeProfile::Counter* _max_rpc_timer = nullptr;
+    // Time spent merging RPC responses into the output block.
+    RuntimeProfile::Counter* _merge_response_timer = nullptr;
+
+    friend class MaterializationSourceOperatorX;
+    friend class OperatorX<MaterializationSourceLocalState>;
+};
+
+// Source half of the materialization node: emits blocks built from the RPC
+// responses collected in the shared state.
+class MaterializationSourceOperatorX final : public OperatorX<MaterializationSourceLocalState> {
+public:
+    using Base = OperatorX<MaterializationSourceLocalState>;
+    MaterializationSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id,
+                                   const DescriptorTbl& descs)
+            : Base(pool, tnode, operator_id, descs) {}
+    ~MaterializationSourceOperatorX() override = default;
+
+    Status get_block(doris::RuntimeState* state, vectorized::Block* block, bool* eos) override;
+
+    bool is_source() const override { return true; }
+};
+
+} // namespace pipeline
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index d2d0d6a6827..a2f708392af 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -41,6 +41,8 @@
 #include "pipeline/exec/iceberg_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
+#include "pipeline/exec/materialization_sink_operator.h"
+#include "pipeline/exec/materialization_source_operator.h"
 #include "pipeline/exec/memory_scratch_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/mock_operator.h"
@@ -706,6 +708,7 @@ DECLARE_OPERATOR(SetSinkLocalState<false>)
 DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
 DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
 DECLARE_OPERATOR(CacheSinkLocalState)
+DECLARE_OPERATOR(MaterializationSinkLocalState)
 
 #undef DECLARE_OPERATOR
 
@@ -738,6 +741,7 @@ DECLARE_OPERATOR(MetaScanLocalState)
 DECLARE_OPERATOR(LocalExchangeSourceLocalState)
 DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
 DECLARE_OPERATOR(CacheSourceLocalState)
+DECLARE_OPERATOR(MaterializationSourceLocalState)
 
 #ifdef BE_TEST
 DECLARE_OPERATOR(MockLocalState)
@@ -770,7 +774,7 @@ template class 
PipelineXSinkLocalState<MultiCastSharedState>;
 template class PipelineXSinkLocalState<SetSharedState>;
 template class PipelineXSinkLocalState<LocalExchangeSharedState>;
 template class PipelineXSinkLocalState<BasicSharedState>;
-template class PipelineXSinkLocalState<CacheSharedState>;
+template class PipelineXSinkLocalState<DataQueueSharedState>;
 
 template class PipelineXLocalState<HashJoinSharedState>;
 template class PipelineXLocalState<PartitionedHashJoinSharedState>;
@@ -782,7 +786,7 @@ template class PipelineXLocalState<AggSharedState>;
 template class PipelineXLocalState<PartitionedAggSharedState>;
 template class PipelineXLocalState<FakeSharedState>;
 template class PipelineXLocalState<UnionSharedState>;
-template class PipelineXLocalState<CacheSharedState>;
+template class PipelineXLocalState<DataQueueSharedState>;
 template class PipelineXLocalState<MultiCastSharedState>;
 template class PipelineXLocalState<PartitionSortNodeSharedState>;
 template class PipelineXLocalState<SetSharedState>;
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index ec283f46683..c8b19e3e1fc 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -102,6 +102,15 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     }
     RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
     auto& p = _parent->cast<typename Derived::Parent>();
+
+    // init id_file_map() for runtime state
+    std::vector<SlotDescriptor*> slots = p._output_tuple_desc->slots();
+    for (auto slot : slots) {
+        if (slot->col_name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
+            state->set_id_file_map();
+        }
+    }
+
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
     for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
         RETURN_IF_ERROR(
@@ -1002,6 +1011,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     // https://github.com/apache/doris/pull/44635
     const int parallism_of_scan_operator =
             p.is_serial_operator() ? 1 : p.query_parallel_instance_num();
+
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
             _scan_dependency, parallism_of_scan_operator);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 388b3102a21..2d8160d3e1e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -63,6 +63,8 @@
 #include "pipeline/exec/iceberg_table_sink_operator.h"
 #include "pipeline/exec/jdbc_scan_operator.h"
 #include "pipeline/exec/jdbc_table_sink_operator.h"
+#include "pipeline/exec/materialization_sink_operator.h"
+#include "pipeline/exec/materialization_source_operator.h"
 #include "pipeline/exec/memory_scratch_sink_operator.h"
 #include "pipeline/exec/meta_scan_operator.h"
 #include "pipeline/exec/multi_cast_data_stream_sink.h"
@@ -1584,6 +1586,25 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
         break;
     }
+    case TPlanNodeType::MATERIALIZATION_NODE: {
+        op.reset(new MaterializationSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
+
+        const auto downstream_pipeline_id = cur_pipe->id();
+        if (_dag.find(downstream_pipeline_id) == _dag.end()) {
+            _dag.insert({downstream_pipeline_id, {}});
+        }
+        auto new_pipe = add_pipeline(cur_pipe);
+        _dag[downstream_pipeline_id].push_back(new_pipe->id());
+
+        DataSinkOperatorPtr sink(new MaterializationSinkOperatorX(
+                op->operator_id(), next_sink_operator_id(), pool, tnode));
+        RETURN_IF_ERROR(new_pipe->set_sink(sink));
+        RETURN_IF_ERROR(new_pipe->sink()->init(tnode, _runtime_state.get()));
+        cur_pipe = new_pipe;
+        break;
+    }
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
                 pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, 
request));
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 1fefd9de642..b760e13f7c3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -115,6 +115,7 @@ class LookupConnectionCache;
 class RowCache;
 class DummyLRUCache;
 class CacheManager;
+class IdManager;
 class ProcessProfile;
 class HeapProfiler;
 class WalManager;
@@ -330,6 +331,7 @@ public:
     LookupConnectionCache* get_lookup_connection_cache() { return 
_lookup_connection_cache; }
     RowCache* get_row_cache() { return _row_cache; }
     CacheManager* get_cache_manager() { return _cache_manager; }
+    // Process-wide IdManager, allocated in _init_mem_env() and deleted in destroy().
+    IdManager* get_id_manager() {
+        return _id_manager;
+    }
     ProcessProfile* get_process_profile() { return _process_profile; }
     HeapProfiler* get_heap_profiler() { return _heap_profiler; }
     segment_v2::InvertedIndexSearcherCache* 
get_inverted_index_searcher_cache() {
@@ -476,6 +478,7 @@ private:
     LookupConnectionCache* _lookup_connection_cache = nullptr;
     RowCache* _row_cache = nullptr;
     CacheManager* _cache_manager = nullptr;
+    IdManager* _id_manager = nullptr;
     ProcessProfile* _process_profile = nullptr;
     HeapProfiler* _heap_profiler = nullptr;
     segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = 
nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 44aee2fde91..1f1ca3284ff 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -48,6 +48,7 @@
 #include "io/cache/fs_file_cache_storage.h"
 #include "io/fs/file_meta_cache.h"
 #include "io/fs/local_file_reader.h"
+#include "olap/id_manager.h"
 #include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
@@ -457,6 +458,7 @@ Status ExecEnv::_init_mem_env() {
         return Status::InternalError(ss.str());
     }
 
+    _id_manager = new IdManager();
     _cache_manager = CacheManager::create_global_instance();
 
     int64_t storage_cache_limit =
@@ -801,6 +803,7 @@ void ExecEnv::destroy() {
     // cache_manager must be destoried after all cache.
     // https://github.com/apache/doris/issues/24082#issuecomment-1712544039
     SAFE_DELETE(_cache_manager);
+    SAFE_DELETE(_id_manager);
 
     // _heartbeat_flags must be destoried after staroge engine
     SAFE_DELETE(_heartbeat_flags);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index fc3d80cdbe0..14d2c5c26f2 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -32,7 +32,6 @@
 #include "common/config.h"
 #include "common/factory_creator.h"
 #include "common/object_pool.h"
-#include "pipeline/dependency.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/runtime_filter_mgr.h"
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 712e18159fd..d3b42f77d7d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -36,6 +36,7 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "io/fs/s3_file_system.h"
+#include "olap/id_manager.h"
 #include "olap/storage_engine.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline_task.h"
@@ -553,4 +554,8 @@ bool RuntimeState::low_memory_mode() const {
     return _query_ctx->low_memory_mode();
 }
 
+// Fetches (creating if absent) this query's IdFileMap from the process-wide
+// IdManager, keyed by query id and using the query's execution timeout, and
+// caches it in this RuntimeState.
+void RuntimeState::set_id_file_map() {
+    auto* const id_manager = _exec_env->get_id_manager();
+    _id_file_map = id_manager->add_id_file_map(_query_id, execution_timeout());
+}
+
 } // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index aa486d3b8b6..17d4126702c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -70,6 +70,7 @@ class Dependency;
 class DescriptorTbl;
 class ObjectPool;
 class ExecEnv;
+class IdFileMap;
 class RuntimeFilterMgr;
 class MemTrackerLimiter;
 class QueryContext;
@@ -661,6 +662,10 @@ public:
 
     int profile_level() const { return _profile_level; }
 
+    // Mutable access to this state's IdFileMap; nullptr until set_id_file_map() runs.
+    auto get_id_file_map() -> std::shared_ptr<IdFileMap>& { return _id_file_map; }
+
+    // Binds this state to the query's IdFileMap held by the IdManager.
+    void set_id_file_map();
+
 private:
     Status create_error_log_file();
 
@@ -784,6 +789,9 @@ private:
     // error file path on s3, 
${bucket}/${prefix}/error_log/${label}_${fragment_instance_id}
     std::string _s3_error_log_file_path;
     std::mutex _s3_error_log_file_lock;
+
+    // used for encoding the global lazy materialize
+    std::shared_ptr<IdFileMap> _id_file_map = nullptr;
 };
 
 #define RETURN_IF_CANCELLED(state)               \
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 5f5b944f8e6..9d9e509c5cc 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -227,6 +227,11 @@ public:
 
     int64_t free_overcommited_memory(int64_t need_free_mem, RuntimeProfile* 
profile);
 
+    // Returns the remote scan scheduler of this group, read under the shared lock.
+    // NOTE(review): the raw pointer escapes the lock; callers assume the
+    // scheduler is not reset while they use it — confirm.
+    vectorized::SimplifiedScanScheduler* get_remote_scan_task_scheduler() {
+        std::shared_lock<std::shared_mutex> read_guard(_mutex);
+        auto* const scheduler = _remote_scan_task_sched.get();
+        return scheduler;
+    }
+
 private:
     void create_cgroup_cpu_ctl_no_lock();
     void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
diff --git a/be/src/service/backend_options.h b/be/src/service/backend_options.h
index 0052eb41530..543863a6309 100644
--- a/be/src/service/backend_options.h
+++ b/be/src/service/backend_options.h
@@ -37,6 +37,7 @@ public:
     static std::string get_be_endpoint();
     static TBackend get_local_backend();
     static void set_backend_id(int64_t backend_id);
+    // Returns _s_backend_id (presumably the value stored by set_backend_id()).
+    static int64_t get_backend_id() {
+        return _s_backend_id;
+    }
     static void set_localhost(const std::string& host);
     static bool is_bind_ipv6();
     static const char* get_service_bind_address();
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 19a0e88fe8f..b2029f0870d 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -105,6 +105,8 @@
 #include "runtime/stream_load/stream_load_context.h"
 #include "runtime/thread_context.h"
 #include "runtime/types.h"
+#include "runtime/workload_group/workload_group.h"
+#include "runtime/workload_group/workload_group_manager.h"
 #include "service/backend_options.h"
 #include "service/point_query_executor.h"
 #include "util/arrow/row_batch.h"
@@ -2056,6 +2058,42 @@ void 
PInternalService::multiget_data(google::protobuf::RpcController* controller
     }
 }
 
+// Serves a V2 multi-get request: reads rows by row id into `response`.
+// The read runs on the remote scan scheduler of the request's workload group.
+// `done` is run by the scheduled task, or here when no task could be scheduled
+// (assumes a failed submit never runs the task — confirm).
+void PInternalService::multiget_data_v2(google::protobuf::RpcController* controller,
+                                        const PMultiGetRequestV2* request,
+                                        PMultiGetResponseV2* response,
+                                        google::protobuf::Closure* done) {
+    auto wg = ExecEnv::GetInstance()->workload_group_mgr()->get_group(request->wg_id());
+    if (!wg) [[unlikely]] {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::Error<TStatusCode::CANCELLED>("fail to find wg: wg id:" +
+                                                          std::to_string(request->wg_id()));
+        st.to_protobuf(response->mutable_status());
+        return;
+    }
+
+    auto* scheduler = wg->get_remote_scan_task_scheduler();
+    if (scheduler == nullptr) [[unlikely]] {
+        // Answer the RPC with an error instead of dereferencing a null scheduler.
+        brpc::ClosureGuard closure_guard(done);
+        Status st = Status::Error<TStatusCode::CANCELLED>(
+                "remote scan task scheduler is not available: wg id:" +
+                std::to_string(request->wg_id()));
+        st.to_protobuf(response->mutable_status());
+        return;
+    }
+
+    Status st = scheduler->submit_scan_task(vectorized::SimplifiedScanTask(
+            [request, response, done]() {
+                signal::set_signal_task_id(request->query_id());
+                // multi get data by rowid
+                MonotonicStopWatch watch;
+                watch.start();
+                // Runs `done` when this task returns.
+                brpc::ClosureGuard closure_guard(done);
+                response->mutable_status()->set_status_code(0);
+                SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->rowid_storage_reader_tracker());
+                Status read_st = RowIdStorageReader::read_by_rowids(*request, response);
+                read_st.to_protobuf(response->mutable_status());
+                LOG(INFO) << "multiget_data_v2 finished, cost(us):"
+                          << watch.elapsed_time() / 1000;
+            },
+            nullptr));
+
+    if (!st.ok()) {
+        // The task was not scheduled, so `done` must be run here.
+        brpc::ClosureGuard closure_guard(done);
+        st.to_protobuf(response->mutable_status());
+    }
+}
+
 void 
PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcController*
 cntl_base,
                                                       const 
PGetTabletVersionsRequest* request,
                                                       
PGetTabletVersionsResponse* response,
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index e3d03a6a449..6ce0ae868c1 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -211,6 +211,10 @@ public:
     void multiget_data(google::protobuf::RpcController* controller, const 
PMultiGetRequest* request,
                        PMultiGetResponse* response, google::protobuf::Closure* 
done) override;
 
+    void multiget_data_v2(google::protobuf::RpcController* controller,
+                          const PMultiGetRequestV2* request, 
PMultiGetResponseV2* response,
+                          google::protobuf::Closure* done) override;
+
     void tablet_fetch_data(google::protobuf::RpcController* controller,
                            const PTabletKeyLookupRequest* request,
                            PTabletKeyLookupResponse* response,
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index 560aebb98ee..c1be06c691d 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -25,7 +25,6 @@
 #include "runtime/query_context.h"
 #include "runtime/thread_context.h"
 #include "service/brpc.h"
-#include "util/ref_count_closure.h"
 
 namespace doris {
 
@@ -82,7 +81,7 @@ class AutoReleaseClosure : public google::protobuf::Closure {
 
 public:
     AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> 
callback,
-                       std::weak_ptr<QueryContext> context = {})
+                       std::weak_ptr<QueryContext> context = {}, 
std::string_view error_msg = {})
             : request_(req), callback_(callback), context_(std::move(context)) 
{
         this->cntl_ = callback->cntl_;
         this->response_ = callback->response_;
@@ -113,10 +112,12 @@ public:
     // at any stage.
     std::shared_ptr<Request> request_;
     std::shared_ptr<ResponseType> response_;
+    std::string error_msg_;
 
 protected:
     virtual void _process_if_rpc_failed() {
-        std::string error_msg = "RPC meet failed: " + cntl_->ErrorText();
+        std::string error_msg =
+                fmt::format("RPC meet failed: {} {}", cntl_->ErrorText(), 
error_msg_);
         if (auto ctx = context_.lock(); ctx) {
             ctx->cancel(Status::NetworkError(error_msg));
         } else {
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 50a780e4355..9eb38675bb5 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -718,7 +718,6 @@ using ColumnPtr = IColumn::Ptr;
 using MutableColumnPtr = IColumn::MutablePtr;
 using Columns = std::vector<ColumnPtr>;
 using MutableColumns = std::vector<MutableColumnPtr>;
-using ColumnPtrs = std::vector<ColumnPtr>;
 using ColumnRawPtrs = std::vector<const IColumn*>;
 
 template <typename... Args>
diff --git a/be/src/vec/common/string_ref.cpp b/be/src/vec/common/string_ref.cpp
index 413c0338c10..e113694f34c 100644
--- a/be/src/vec/common/string_ref.cpp
+++ b/be/src/vec/common/string_ref.cpp
@@ -69,11 +69,14 @@ bool StringRef::end_with(char ch) const {
 }
 
 bool StringRef::start_with(const StringRef& search_string) const {
-    DCHECK(size >= search_string.size);
     if (search_string.size == 0) {
         return true;
     }
 
+    if (UNLIKELY(size < search_string.size)) {
+        return false;
+    }
+
 #if defined(__SSE2__) || defined(__aarch64__)
     return memequalSSE2Wide(data, search_string.data, search_string.size);
 #else
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 6bae4b3319e..344136bbc28 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -41,6 +41,7 @@
 #include "exprs/function_filter.h"
 #include "io/cache/block_file_cache_profile.h"
 #include "io/io_common.h"
+#include "olap/id_manager.h"
 #include "olap/inverted_index_profile.h"
 #include "olap/olap_common.h"
 #include "olap/olap_tuple.h"
@@ -410,6 +411,13 @@ Status NewOlapScanner::_init_tablet_reader_params(
         }
     }
 
+    if (tablet_schema->has_global_row_id()) {
+        auto& id_file_map = _state->get_id_file_map();
+        for (auto rs_reader : _tablet_reader_params.rs_splits) {
+            id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset());
+        }
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index fe0c7315c5d..11819d1d75d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -345,8 +345,9 @@ void VFileScanner::_get_slot_ids(VExpr* expr, 
std::vector<int>* slot_ids) {
         if (child_expr->is_slot_ref()) {
             VSlotRef* slot_ref = reinterpret_cast<VSlotRef*>(child_expr.get());
             slot_ids->emplace_back(slot_ref->slot_id());
+        } else {
+            _get_slot_ids(child_expr.get(), slot_ids);
         }
-        _get_slot_ids(child_expr.get(), slot_ids);
     }
 }
 
diff --git a/be/test/exec/hash_map/hash_table_method_test.cpp 
b/be/test/exec/hash_map/hash_table_method_test.cpp
index 31a18421123..5ca6a6cca36 100644
--- a/be/test/exec/hash_map/hash_table_method_test.cpp
+++ b/be/test/exec/hash_map/hash_table_method_test.cpp
@@ -27,7 +27,7 @@
 namespace doris::vectorized {
 
 template <typename HashMethodType>
-void test_insert(HashMethodType& method, ColumnPtrs column) {
+void test_insert(HashMethodType& method, Columns column) {
     using State = typename HashMethodType::State;
     ColumnRawPtrs key_raw_columns;
     for (auto column : column) {
@@ -49,8 +49,7 @@ void test_insert(HashMethodType& method, ColumnPtrs column) {
 }
 
 template <typename HashMethodType>
-void test_find(HashMethodType& method, ColumnPtrs column,
-               const std::vector<int64_t>& except_result) {
+void test_find(HashMethodType& method, Columns column, const 
std::vector<int64_t>& except_result) {
     using State = typename HashMethodType::State;
     ColumnRawPtrs key_raw_columns;
     for (auto column : column) {
diff --git a/be/test/olap/id_manager_test.cpp b/be/test/olap/id_manager_test.cpp
new file mode 100644
index 00000000000..12488b163f7
--- /dev/null
+++ b/be/test/olap/id_manager_test.cpp
@@ -0,0 +1,107 @@
+#include "olap/id_manager.h"
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <memory>
+#include <thread>
+#include <vector>
+
+#include "olap/olap_common.h"
+
+using namespace doris;
+
+// Single-threaded IdFileMap round trip: ids are expected to start at 0 and
+// increase, each id resolves to its FileMapping, and an unknown id yields nullptr.
+TEST(IdFileMapTest, BasicOperations) {
+    IdFileMap id_file_map(1024);
+
+    // Test adding a file mapping (unsigned literals avoid a sign-compare warning)
+    auto mapping1 =
+            std::make_shared<FileMapping>(FileMapping {FileMappingType::DORIS_FORMAT, "file1"});
+    uint32_t id1 = id_file_map.get_file_mapping_id(mapping1);
+    EXPECT_EQ(id1, 0U);
+
+    auto mapping2 = std::make_shared<FileMapping>(FileMapping {FileMappingType::ORC, "file2"});
+    uint32_t id2 = id_file_map.get_file_mapping_id(mapping2);
+    EXPECT_EQ(id2, 1U);
+
+    // Test getting a file mapping; ASSERT first so a missing entry fails the
+    // test instead of crashing it on a null dereference.
+    auto retrieved_mapping1 = id_file_map.get_file_mapping(id1);
+    ASSERT_NE(retrieved_mapping1, nullptr);
+    EXPECT_EQ(retrieved_mapping1->type, FileMappingType::DORIS_FORMAT);
+    EXPECT_EQ(retrieved_mapping1->value, "file1");
+
+    auto retrieved_mapping2 = id_file_map.get_file_mapping(id2);
+    ASSERT_NE(retrieved_mapping2, nullptr);
+    EXPECT_EQ(retrieved_mapping2->type, FileMappingType::ORC);
+    EXPECT_EQ(retrieved_mapping2->value, "file2");
+
+    // Test getting a non-existent file mapping
+    auto retrieved_mapping3 = id_file_map.get_file_mapping(999);
+    EXPECT_EQ(retrieved_mapping3, nullptr);
+}
+
+// Ten threads each register 100 mappings and read every one straight back;
+// each returned id must resolve to the mapping that produced it.
+TEST(IdFileMapTest, ConcurrentAddAndGet) {
+    IdFileMap id_file_map(1024);
+    std::vector<std::thread> threads;
+    std::atomic<int> counter(0);
+
+    for (int i = 0; i < 10; ++i) {
+        threads.emplace_back([&]() {
+            for (int j = 0; j < 100; ++j) {
+                auto mapping = std::make_shared<FileMapping>(FileMapping {
+                        FileMappingType::DORIS_FORMAT, "file" + std::to_string(counter++)});
+                uint32_t id = id_file_map.get_file_mapping_id(mapping);
+                auto retrieved_mapping = id_file_map.get_file_mapping(id);
+                // Fail (and stop this worker) rather than dereference a missing entry.
+                ASSERT_NE(retrieved_mapping, nullptr);
+                EXPECT_EQ(retrieved_mapping->type, mapping->type);
+                EXPECT_EQ(retrieved_mapping->value, mapping->value);
+            }
+        });
+    }
+
+    for (auto& thread : threads) {
+        thread.join();
+    }
+}
+
+// IdManager keeps one IdFileMap per query id: re-adding a registered id must
+// return the same map, and after removal a fresh map must be created.
+TEST(IdManagerTest, BasicOperations) {
+    IdManager manager;
+
+    // Two distinct queries each get a non-null map.
+    UniqueId first_query = UniqueId::gen_uid();
+    auto first_map = manager.add_id_file_map(first_query, 1024);
+    EXPECT_NE(first_map, nullptr);
+
+    UniqueId second_query = UniqueId::gen_uid();
+    auto second_map = manager.add_id_file_map(second_query, 1024);
+    EXPECT_NE(second_map, nullptr);
+
+    // Adding an already-registered query hands back the existing map.
+    auto same_map = manager.add_id_file_map(first_query, 1024);
+    EXPECT_EQ(same_map, first_map);
+
+    // After removal, re-adding the query yields a different map.
+    manager.remove_id_file_map(first_query);
+    auto recreated_map = manager.add_id_file_map(first_query, 1024);
+    EXPECT_NE(recreated_map, first_map);
+}
+
+// Ten threads each register, remove and re-register ten fresh query ids; a
+// re-registered id must never hand back the map that was removed.
+TEST(IdManagerTest, ConcurrentAddAndRemove) {
+    IdManager manager;
+    constexpr int kThreads = 10;
+    constexpr int kRoundsPerThread = 10;
+
+    auto worker = [&manager]() {
+        for (int round = 0; round < kRoundsPerThread; ++round) {
+            UniqueId query_id = UniqueId::gen_uid();
+            auto registered = manager.add_id_file_map(query_id, 1024);
+            EXPECT_NE(registered, nullptr);
+
+            manager.remove_id_file_map(query_id);
+            auto re_registered = manager.add_id_file_map(query_id, 1024);
+            EXPECT_NE(re_registered, registered);
+        }
+    };
+
+    std::vector<std::thread> workers;
+    workers.reserve(kThreads);
+    for (int t = 0; t < kThreads; ++t) {
+        workers.emplace_back(worker);
+    }
+    for (auto& w : workers) {
+        w.join();
+    }
+}
diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp 
b/be/test/pipeline/operator/materialization_shared_state_test.cpp
new file mode 100644
index 00000000000..6d3a6f28ffa
--- /dev/null
+++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "pipeline/dependency.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::pipeline {
+
+class MaterializationSharedStateTest : public testing::Test {
+protected:
+    // Builds a fresh shared state with a two-column origin block (rowid + value) and one
+    // FetchRpcStruct per backend, each carrying a single request block descriptor.
+    void SetUp() override {
+        _shared_state = std::make_shared<MaterializationSharedState>();
+        _string_type = std::make_shared<vectorized::DataTypeString>();
+        _int_type = std::make_shared<vectorized::DataTypeInt32>();
+
+        // Column 0 is the rowid column (ColumnString), column 1 an Int32 value column.
+        auto& origin = _shared_state->origin_block;
+        origin = vectorized::Block();
+        origin.insert({_string_type->create_column(), _string_type, "rowid"});
+        origin.insert({_int_type->create_column(), _int_type, "value"});
+        _shared_state->rowid_locs = {0};
+
+        _backend_id1 = 1001;
+        _backend_id2 = 1002;
+        const int64_t backend_ids[] = {_backend_id1, _backend_id2};
+        for (const int64_t backend_id : backend_ids) {
+            _shared_state->rpc_struct_map[backend_id] = FetchRpcStruct();
+        }
+        for (const int64_t backend_id : backend_ids) {
+            _shared_state->rpc_struct_map[backend_id].request.add_request_block_descs();
+        }
+    }
+
+    std::shared_ptr<MaterializationSharedState> _shared_state;
+    std::shared_ptr<vectorized::DataTypeString> _string_type;
+    std::shared_ptr<vectorized::DataTypeInt32> _int_type;
+    int64_t _backend_id1 = 0;
+    int64_t _backend_id2 = 0;
+};
+
+TEST_F(MaterializationSharedStateTest, TestCreateSourceDependency) {
+    // create_source_dependency() should return a dependency carrying the given operator id and
+    // the name "<name>_DEPENDENCY", and register it in source_deps.
+    int test_op_id = 100;
+    int test_node_id = 200;
+    std::string test_name = "TEST";
+
+    auto* dep = _shared_state->create_source_dependency(test_op_id, test_node_id, test_name);
+
+    ASSERT_NE(dep, nullptr);
+    EXPECT_EQ(dep->id(), test_op_id);
+    EXPECT_EQ(dep->name(), test_name + "_DEPENDENCY");
+
+    // Fatal check: source_deps[0] below would be out of bounds if nothing was registered.
+    ASSERT_EQ(_shared_state->source_deps.size(), 1U);
+    EXPECT_EQ(_shared_state->source_deps[0].get(), dep);
+}
+
+TEST_F(MaterializationSharedStateTest, TestCreateMultiGetResult) {
+    // One rowid column (ColumnString) holding two raw GlobalRowLoacationV2 records, one per
+    // backend.
+    vectorized::Columns columns;
+    auto rowid_col = _string_type->create_column();
+    auto* col_data = static_cast<vectorized::ColumnString*>(rowid_col.get());
+
+    GlobalRowLoacationV2 loc1(0, _backend_id1, 1, 1);
+    GlobalRowLoacationV2 loc2(0, _backend_id2, 2, 2);
+    col_data->insert_data(reinterpret_cast<const char*>(&loc1), sizeof(GlobalRowLoacationV2));
+    col_data->insert_data(reinterpret_cast<const char*>(&loc2), sizeof(GlobalRowLoacationV2));
+    columns.push_back(std::move(rowid_col));
+
+    // NOTE(review): the two `true` flags are presumably eos and gc_id_map; the last_block
+    // check below suggests the first one is eos. Confirm against the declaration.
+    Status st = _shared_state->create_muiltget_result(columns, true, true);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    // One block-order entry per input rowid column.
+    EXPECT_EQ(_shared_state->block_order_results.size(), columns.size());
+    EXPECT_TRUE(_shared_state->last_block);
+}
+
+TEST_F(MaterializationSharedStateTest, TestMergeMultiResponse) {
+    // Builds a one-column Nullable(Int32) block named `name`, shaped like a BE's fetch result.
+    auto make_response_block = [this](const std::vector<int32_t>& values,
+                                      const std::string& name) {
+        auto column = _int_type->create_column();
+        for (const int32_t value : values) {
+            column->insert(value);
+        }
+        vectorized::Block block;
+        block.insert({make_nullable(std::move(column)), make_nullable(_int_type), name});
+        return block;
+    };
+    // Serializes `block` as the only block of a fresh PMultiGetResponseV2 in a dummy callback.
+    auto make_callback = [](vectorized::Block& block) {
+        auto callback = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>();
+        callback->response_.reset(new PMultiGetResponseV2());
+        auto* serialized_block = callback->response_->add_blocks()->mutable_block();
+        size_t uncompressed_size = 0;
+        size_t compressed_size = 0;
+        auto st = block.serialize(0, serialized_block, &uncompressed_size, &compressed_size,
+                                  CompressionTypePB::LZ4);
+        EXPECT_TRUE(st.ok()) << st.to_string();
+        return callback;
+    };
+
+    // 1. Origin block: a Nullable(String) rowid column with rows [non-null, null, non-null]
+    //    plus an Int32 value column. The 4 non-null rowid bytes are arbitrary placeholder data
+    //    taken from the local column pointer object itself.
+    auto nullable_rowid_col = vectorized::ColumnNullable::create(
+            _string_type->create_column(), vectorized::ColumnUInt8::create());
+    const auto* placeholder = reinterpret_cast<const char*>(&nullable_rowid_col);
+    nullable_rowid_col->insert_data(placeholder, 4);
+    nullable_rowid_col->insert_data(nullptr, 4);
+    nullable_rowid_col->insert_data(placeholder, 4);
+
+    auto value_col = _int_type->create_column();
+    value_col->insert(100);
+    value_col->insert(101);
+    value_col->insert(200);
+
+    _shared_state->origin_block = vectorized::Block(
+            {{std::move(nullable_rowid_col), vectorized::make_nullable(_string_type), "rowid"},
+             {std::move(value_col), _int_type, "value"}});
+    _shared_state->rowid_locs = {0};
+
+    // 2. Per-backend responses: BE1 returns [100, 101], BE2 returns [200].
+    auto resp_block1 = make_response_block({100, 101}, "value");
+    _shared_state->rpc_struct_map[_backend_id1].callback = make_callback(resp_block1);
+    auto resp_block2 = make_response_block({200}, "value");
+    _shared_state->rpc_struct_map[_backend_id2].callback = make_callback(resp_block2);
+
+    // 3. Merge order for the single rowid column (0 = null rowid): BE1, null, BE2.
+    _shared_state->block_order_results = {{_backend_id1, 0, _backend_id2}};
+
+    // 4. Merge. Status and shape checks are fatal, because the accesses below index into
+    //    the result block.
+    vectorized::Block result_block;
+    Status st = _shared_state->merge_multi_response(&result_block);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_EQ(result_block.columns(), 2U);
+    ASSERT_EQ(result_block.rows(), 3U);
+
+    // 5. Position 0 is checked as the merged fetched-value column.
+    const auto* merged_value_col = result_block.get_by_position(0).column.get();
+    EXPECT_EQ(*reinterpret_cast<const int32_t*>(merged_value_col->get_data_at(0).data), 100);
+    EXPECT_EQ(merged_value_col->get_data_at(1).data, nullptr); // null rowid -> null value
+    EXPECT_EQ(*reinterpret_cast<const int32_t*>(merged_value_col->get_data_at(2).data), 200);
+}
+
+TEST_F(MaterializationSharedStateTest, TestMergeMultiResponseMultiBlocks) {
+    // Builds a one-column Nullable(Int32) block named `name`, shaped like a BE's fetch result.
+    auto make_response_block = [this](const std::vector<int32_t>& values,
+                                      const std::string& name) {
+        auto column = _int_type->create_column();
+        for (const int32_t value : values) {
+            column->insert(value);
+        }
+        vectorized::Block block;
+        block.insert({make_nullable(std::move(column)), make_nullable(_int_type), name});
+        return block;
+    };
+    // Serializes `blocks`, in order, into a fresh PMultiGetResponseV2 held by a dummy callback.
+    auto make_callback = [](const std::vector<vectorized::Block*>& blocks) {
+        auto callback = std::make_shared<doris::DummyBrpcCallback<PMultiGetResponseV2>>();
+        callback->response_.reset(new PMultiGetResponseV2());
+        for (auto* block : blocks) {
+            auto* serialized_block = callback->response_->add_blocks()->mutable_block();
+            size_t uncompressed_size = 0;
+            size_t compressed_size = 0;
+            auto st = block->serialize(0, serialized_block, &uncompressed_size, &compressed_size,
+                                       CompressionTypePB::LZ4);
+            EXPECT_TRUE(st.ok()) << st.to_string();
+        }
+        return callback;
+    };
+
+    // 1. Origin block: two Nullable(String) rowid columns plus two Int32 value columns.
+    //    rowid1 rows: [non-null, null, non-null]; rowid2 rows: [non-null, non-null, null].
+    //    Non-null rowid bytes are arbitrary placeholders taken from the local pointer objects.
+    auto nullable_rowid_col1 = vectorized::ColumnNullable::create(
+            _string_type->create_column(), vectorized::ColumnUInt8::create());
+    const auto* placeholder1 = reinterpret_cast<const char*>(&nullable_rowid_col1);
+    nullable_rowid_col1->insert_data(placeholder1, 4);
+    nullable_rowid_col1->insert_data(nullptr, 4);
+    nullable_rowid_col1->insert_data(placeholder1, 4);
+
+    auto nullable_rowid_col2 = vectorized::ColumnNullable::create(
+            _string_type->create_column(), vectorized::ColumnUInt8::create());
+    const auto* placeholder2 = reinterpret_cast<const char*>(&nullable_rowid_col2);
+    nullable_rowid_col2->insert_data(placeholder2, 4);
+    nullable_rowid_col2->insert_data(placeholder2, 4);
+    nullable_rowid_col2->insert_data(nullptr, 4);
+
+    auto value_col1 = _int_type->create_column();
+    value_col1->insert(100);
+    value_col1->insert(101);
+    value_col1->insert(102);
+
+    auto value_col2 = _int_type->create_column();
+    value_col2->insert(200);
+    value_col2->insert(201);
+    value_col2->insert(202);
+
+    _shared_state->origin_block = vectorized::Block(
+            {{std::move(nullable_rowid_col1), vectorized::make_nullable(_string_type), "rowid1"},
+             {std::move(nullable_rowid_col2), vectorized::make_nullable(_string_type), "rowid2"},
+             {std::move(value_col1), _int_type, "value1"},
+             {std::move(value_col2), _int_type, "value2"}});
+    _shared_state->rowid_locs = {0, 1};
+
+    // 2. Each backend's response carries two blocks, appended in rowid-column order:
+    //    BE1 -> value1 [100], value2 [200]; BE2 -> value1 [102], value2 [201].
+    auto be1_value1 = make_response_block({100}, "value1");
+    auto be1_value2 = make_response_block({200}, "value2");
+    auto be2_value1 = make_response_block({102}, "value1");
+    auto be2_value2 = make_response_block({201}, "value2");
+    _shared_state->rpc_struct_map[_backend_id1].callback =
+            make_callback({&be1_value1, &be1_value2});
+    _shared_state->rpc_struct_map[_backend_id2].callback =
+            make_callback({&be2_value1, &be2_value2});
+
+    // 3. Merge order per rowid column (0 = null rowid):
+    //    rowid1: BE1, null, BE2; rowid2: BE1, BE2, null.
+    _shared_state->block_order_results = {{_backend_id1, 0, _backend_id2},
+                                          {_backend_id1, _backend_id2, 0}};
+
+    // 4. Merge. Status and shape checks are fatal, because the accesses below index into
+    //    the result block.
+    vectorized::Block result_block;
+    Status st = _shared_state->merge_multi_response(&result_block);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_EQ(result_block.columns(), 4U);
+    ASSERT_EQ(result_block.rows(), 3U);
+
+    // 5. Positions 0 and 1 are checked as the merged value columns for rowid1 and rowid2.
+    const auto* merged_value_col1 = result_block.get_by_position(0).column.get();
+    EXPECT_EQ(*reinterpret_cast<const int32_t*>(merged_value_col1->get_data_at(0).data), 100);
+    EXPECT_EQ(merged_value_col1->get_data_at(1).data, nullptr);
+    EXPECT_EQ(*reinterpret_cast<const int32_t*>(merged_value_col1->get_data_at(2).data), 102);
+
+    const auto* merged_value_col2 = result_block.get_by_position(1).column.get();
+    EXPECT_EQ(*reinterpret_cast<const int32_t*>(merged_value_col2->get_data_at(0).data), 200);
+    EXPECT_EQ(*reinterpret_cast<const int32_t*>(merged_value_col2->get_data_at(1).data), 201);
+    EXPECT_EQ(merged_value_col2->get_data_at(2).data, nullptr);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 837d3f4a941..df406115c14 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -781,6 +781,40 @@ message PMultiGetResponse {
     repeated PRowLocation row_locs = 5;
 };
 
+// Describes one block to fetch; each block carries its own schema to read.
+message PRequestBlockDesc {
+    // NOTE(review): presumably selects reading from the row store instead of per-column
+    // storage; confirm against the BE handler.
+    optional bool fetch_row_store = 1;
+    repeated PSlotDescriptor slots = 2;
+    repeated ColumnPB column_descs = 3;
+    // NOTE(review): file_id and row_id look like parallel arrays (one entry per row to
+    // fetch); confirm they are always the same length.
+    repeated uint32 file_id = 4;
+    repeated uint32 row_id = 5;
+}
+
+// Version 2 of the multiget request: one descriptor per block to fetch.
+message PMultiGetRequestV2 {
+    repeated PRequestBlockDesc request_block_descs = 1;
+
+    // for compatibility
+    optional int32 be_exec_version = 2;
+    optional PUniqueId query_id = 3;
+    // NOTE(review): presumably asks the receiver to drop the query's id/file map after
+    // serving this request; confirm.
+    optional bool gc_id_map = 4;
+    // NOTE(review): presumably the workload group id; confirm.
+    optional uint64 wg_id = 5;
+};
+
+// One fetched block: a columnar PBlock, or row-store payloads in `format`.
+message PMultiGetBlockV2 {
+    optional PBlock block = 1;
+    // more efficient serialization fields for row store
+    enum RowFormat {
+        JSONB = 0;
+    };
+    optional RowFormat format = 2;
+    repeated bytes binary_row_data = 3;
+}
+
+// Version 2 of the multiget response: overall status plus the fetched blocks.
+message PMultiGetResponseV2 {
+    optional PStatus status = 1;
+    repeated PMultiGetBlockV2 blocks = 2;
+};
+
 message PFetchColIdsRequest {
     message PFetchColIdParam {
         required int64 indexId = 1;
@@ -1026,6 +1060,7 @@ service PBackendService {
     rpc outfile_write_success(POutfileWriteSuccessRequest) returns 
(POutfileWriteSuccessResult);
     rpc fetch_table_schema(PFetchTableSchemaRequest) returns 
(PFetchTableSchemaResult);
     rpc multiget_data(PMultiGetRequest) returns (PMultiGetResponse);
+    rpc multiget_data_v2(PMultiGetRequestV2) returns (PMultiGetResponseV2);
     rpc get_file_cache_meta_by_tablet_id(PGetFileCacheMetaRequest) returns 
(PGetFileCacheMetaResponse);
     rpc tablet_fetch_data(PTabletKeyLookupRequest) returns 
(PTabletKeyLookupResponse);
     rpc get_column_ids_by_tablet_ids(PFetchColIdsRequest) returns 
(PFetchColIdsResponse);
diff --git a/tools/clickbench-tools/conf/doris-cluster.conf b/tools/clickbench-tools/conf/doris-cluster.conf
index cc7d8a2602e..7d70684941e 100644
--- a/tools/clickbench-tools/conf/doris-cluster.conf
+++ b/tools/clickbench-tools/conf/doris-cluster.conf
@@ -20,11 +20,11 @@ export FE_HOST='127.0.0.1'
 # BE host
 export BE_HOST='127.0.0.1'
 # http_port in fe.conf
 export FE_HTTP_PORT=8030
 # webserver_port in be.conf
 export BE_WEBSERVER_PORT=8040
 # query_port in fe.conf
 export FE_QUERY_PORT=9030
 # Doris username
 export USER='root'
 # Doris password


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to