This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e22f5891d2 [WIP](row store) two phase opt read row store (#18654)
e22f5891d2 is described below

commit e22f5891d2fba4a54cb421b065f8da5f00cc4286
Author: lihangyu <15605149...@163.com>
AuthorDate: Tue May 16 13:21:58 2023 +0800

    [WIP](row store) two phase opt read row store (#18654)
---
 be/src/exec/rowid_fetcher.cpp                      | 166 +++++++++++++++------
 be/src/exec/rowid_fetcher.h                        |  27 +++-
 be/src/exec/tablet_info.h                          |   7 +
 be/src/olap/rowset/beta_rowset_writer.cpp          |   1 -
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   3 +
 be/src/olap/tablet.cpp                             |   5 +-
 be/src/olap/tablet.h                               |   2 +-
 be/src/olap/utils.h                                |  12 ++
 be/src/runtime/descriptors.cpp                     |   4 +-
 be/src/runtime/descriptors.h                       |   2 +-
 be/src/service/internal_service.cpp                | 125 ++++++++--------
 be/src/service/internal_service.h                  |   2 +-
 be/src/service/point_query_executor.cpp            |   8 +-
 be/src/vec/exec/vexchange_node.cpp                 |  27 ----
 be/src/vec/exec/vexchange_node.h                   |   7 -
 be/src/vec/exprs/vexpr_context.h                   |   8 +
 be/src/vec/exprs/vslot_ref.cpp                     |   5 +-
 be/src/vec/jsonb/serialize.cpp                     |   4 +
 be/src/vec/sink/vresult_sink.cpp                   |  38 ++++-
 be/src/vec/sink/vresult_sink.h                     |   5 +
 .../org/apache/doris/analysis/CreateTableStmt.java |   6 +-
 .../java/org/apache/doris/analysis/SelectStmt.java |  63 +++++---
 .../glue/translator/PhysicalPlanTranslator.java    |  45 ++++++
 .../org/apache/doris/planner/ExchangeNode.java     |  22 ---
 .../org/apache/doris/planner/OriginalPlanner.java  |  50 +++++--
 .../java/org/apache/doris/planner/ResultSink.java  |  16 ++
 .../java/org/apache/doris/planner/SortNode.java    |   2 +-
 .../org/apache/doris/system/SystemInfoService.java |  12 ++
 gensrc/proto/internal_service.proto                |  27 +++-
 gensrc/thrift/DataSinks.thrift                     |  11 +-
 gensrc/thrift/PlanNodes.thrift                     |   2 -
 .../scalar_types/sql/dup_key_2pr_q01.out           |   8 +
 .../scalar_types/sql/dup_key_2pr_q02.out           |   4 +
 .../scalar_types/sql/dup_key_2pr_q03.out           |   4 +
 .../scalar_types/sql/dup_key_2pr_q04.out           |   4 +
 .../suites/datatype_p0/scalar_types/load.groovy    |   8 +-
 .../scalar_types/sql/dup_key_2pr_q01.sql           |   1 +
 .../scalar_types/sql/dup_key_2pr_q02.sql           |   1 +
 .../scalar_types/sql/dup_key_2pr_q03.sql           |   1 +
 .../scalar_types/sql/dup_key_2pr_q04.sql           |   1 +
 40 files changed, 513 insertions(+), 233 deletions(-)
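
The diff below moves the second-phase "fetch by rowid" step out of the exchange
node and into the result sink, and teaches it to read whole rows from the row
store. As a rough orientation before the hunks, here is a minimal sketch of the
new call flow pieced together from the rowid_fetcher.h and vresult_sink.cpp
changes; names follow the diff, while tuple_desc/state/t_fetch_option/row_id_col
are stand-ins and error handling is elided:

    // Sketch only: how the result sink drives RowIDFetcher after this change.
    FetchOption fetch_option;
    fetch_option.desc = tuple_desc;            // output tuple layout
    fetch_option.runtime_state = state;        // supplies query id and be_exec_version
    fetch_option.t_fetch_opt = t_fetch_option; // nodes_info, use_two_phase_fetch, fetch_row_store
    RowIDFetcher id_fetcher(fetch_option);
    RETURN_IF_ERROR(id_fetcher.init());
    // row_id_col holds GlobalRowLoacation values produced in the first phase;
    // fetch() issues multiget RPCs and rewrites final_block in row-id order.
    RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, final_block));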

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 46909904f9..a90182bce9 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -18,22 +18,26 @@
 #include "exec/rowid_fetcher.h"
 
 #include <brpc/callback.h>
-#include <brpc/controller.h>
 #include <butil/endpoint.h>
 #include <fmt/format.h>
+#include <gen_cpp/data.pb.h>
 #include <gen_cpp/internal_service.pb.h>
+#include <gen_cpp/types.pb.h>
 #include <glog/logging.h>
 #include <stddef.h>
 #include <stdint.h>
 
 #include <algorithm>
+#include <cstdint>
 #include <ostream>
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "bthread/countdown_event.h"
 #include "common/config.h"
+#include "common/consts.h"
 #include "exec/tablet_info.h" // DorisNodesInfo
 #include "olap/olap_common.h"
 #include "olap/utils.h"
@@ -48,11 +52,15 @@
 #include "vec/common/assert_cast.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h" // Block
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/jsonb/serialize.h"
 
 namespace doris {
 
-Status RowIDFetcher::init(DorisNodesInfo* nodes_info) {
-    for (auto [node_id, node_info] : nodes_info->nodes_info()) {
+Status RowIDFetcher::init() {
+    DorisNodesInfo nodes_info;
+    nodes_info.setNodes(_fetch_option.t_fetch_opt.nodes_info);
+    for (auto [node_id, node_info] : nodes_info.nodes_info()) {
        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
                 node_info.host, node_info.brpc_port);
         if (!client) {
@@ -65,29 +73,33 @@ Status RowIDFetcher::init(DorisNodesInfo* nodes_info) {
     return Status::OK();
 }
 
-static std::string format_rowid(const GlobalRowLoacation& location) {
-    return fmt::format("{} {} {} {}", location.tablet_id,
-                       location.row_location.rowset_id.to_string(),
-                       location.row_location.segment_id, location.row_location.row_id);
-}
-
-PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_ids) {
+PMultiGetRequest RowIDFetcher::_init_fetch_request(const vectorized::ColumnString& row_locs) const {
     PMultiGetRequest mget_req;
-    _tuple_desc->to_protobuf(mget_req.mutable_desc());
-    for (auto slot : _tuple_desc->slots()) {
+    _fetch_option.desc->to_protobuf(mget_req.mutable_desc());
+    for (SlotDescriptor* slot : _fetch_option.desc->slots()) {
+        // ignore rowid
+        if (slot->col_name() == BeConsts::ROWID_COL) {
+            continue;
+        }
         slot->to_protobuf(mget_req.add_slots());
     }
-    for (size_t i = 0; i < row_ids.size(); ++i) {
-        PMultiGetRequest::RowId row_id;
-        StringRef row_id_rep = row_ids.get_data_at(i);
+    for (size_t i = 0; i < row_locs.size(); ++i) {
+        PRowLocation row_loc;
+        StringRef row_id_rep = row_locs.get_data_at(i);
+        // TODO: When transferring data between machines with different byte orders (endianness),
+        // not performing proper handling may lead to issues in parsing and exchanging the data.
        auto location = reinterpret_cast<const GlobalRowLoacation*>(row_id_rep.data);
-        row_id.set_tablet_id(location->tablet_id);
-        row_id.set_rowset_id(location->row_location.rowset_id.to_string());
-        row_id.set_segment_id(location->row_location.segment_id);
-        row_id.set_ordinal_id(location->row_location.row_id);
-        *mget_req.add_rowids() = std::move(row_id);
+        row_loc.set_tablet_id(location->tablet_id);
+        row_loc.set_rowset_id(location->row_location.rowset_id.to_string());
+        row_loc.set_segment_id(location->row_location.segment_id);
+        row_loc.set_ordinal_id(location->row_location.row_id);
+        *mget_req.add_row_locs() = std::move(row_loc);
     }
-    mget_req.set_be_exec_version(_st->be_exec_version());
+    PUniqueId& query_id = *mget_req.mutable_query_id();
+    query_id.set_hi(_fetch_option.runtime_state->query_id().hi);
+    query_id.set_lo(_fetch_option.runtime_state->query_id().lo);
+    mget_req.set_be_exec_version(_fetch_option.runtime_state->be_exec_version());
+    mget_req.set_fetch_row_store(_fetch_option.t_fetch_opt.fetch_row_store);
     return mget_req;
 }
 
@@ -95,9 +107,12 @@ static void fetch_callback(bthread::CountdownEvent* counter) {
     Defer __defer([&] { counter->signal(); });
 }
 
-static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps,
-                              const std::vector<brpc::Controller>& cntls,
-                              vectorized::MutableBlock* output_block) {
+Status RowIDFetcher::_merge_rpc_results(const PMultiGetRequest& request,
+                                        const std::vector<PMultiGetResponse>& rsps,
+                                        const std::vector<brpc::Controller>& cntls,
+                                        vectorized::Block* output_block,
+                                        std::vector<PRowLocation>* rows_id) const {
+    output_block->clear();
     for (const auto& cntl : cntls) {
         if (cntl.Failed()) {
            LOG(WARNING) << "Failed to fetch meet rpc error:" << cntl.ErrorText()
@@ -105,25 +120,61 @@ static Status MergeRPCResults(const std::vector<PMultiGetResponse>& rsps,
             return Status::InternalError(cntl.ErrorText());
         }
     }
-    for (const auto& resp : rsps) {
+
+    auto merge_function = [&](const PMultiGetResponse& resp) {
         Status st(resp.status());
         if (!st.ok()) {
             LOG(WARNING) << "Failed to fetch " << st.to_string();
             return st;
         }
+        for (const PRowLocation& row_id : resp.row_locs()) {
+            rows_id->push_back(row_id);
+        }
+        // Merge binary rows
+        if (request.fetch_row_store()) {
+            CHECK(resp.row_locs().size() == resp.binary_row_data_size());
+            if (output_block->is_empty_column()) {
+                *output_block = vectorized::Block(_fetch_option.desc->slots(), 1);
+            }
+            for (int i = 0; i < resp.binary_row_data_size(); ++i) {
+                vectorized::JsonbSerializeUtil::jsonb_to_block(
+                        *_fetch_option.desc, resp.binary_row_data(i).data(),
+                        resp.binary_row_data(i).size(), *output_block);
+            }
+            return Status::OK();
+        }
+        // Merge partial blocks
         vectorized::Block partial_block(resp.block());
-        RETURN_IF_ERROR(output_block->merge(partial_block));
+        CHECK(resp.row_locs().size() == partial_block.rows());
+        if (output_block->is_empty_column()) {
+            output_block->swap(partial_block);
+        } else if (partial_block.columns() != output_block->columns()) {
+            return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                    "Merge block not match, self:[{}], input:[{}], ", output_block->dump_types(),
+                    partial_block.dump_types());
+        } else {
+            for (int i = 0; i < output_block->columns(); ++i) {
+                output_block->get_by_position(i).column->assume_mutable()->insert_range_from(
+                        *partial_block.get_by_position(i)
+                                 .column->convert_to_full_column_if_const()
+                                 .get(),
+                        0, partial_block.rows());
+            }
+        }
+        return Status::OK();
+    };
+
+    for (const auto& resp : rsps) {
+        RETURN_IF_ERROR(merge_function(resp));
     }
     return Status::OK();
 }
 
-Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids,
-                           vectorized::MutableBlock* res_block) {
+Status RowIDFetcher::fetch(const vectorized::ColumnPtr& column_row_ids,
+                           vectorized::Block* res_block) {
     CHECK(!_stubs.empty());
-    res_block->clear_column_data();
-    vectorized::MutableBlock mblock({_tuple_desc}, row_ids->size());
    PMultiGetRequest mget_req = _init_fetch_request(assert_cast<const vectorized::ColumnString&>(
-            *vectorized::remove_nullable(row_ids).get()));
+            *vectorized::remove_nullable(column_row_ids).get()));
     std::vector<PMultiGetResponse> resps(_stubs.size());
     std::vector<brpc::Controller> cntls(_stubs.size());
     bthread::CountdownEvent counter(_stubs.size());
@@ -133,20 +184,51 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& row_ids,
         _stubs[i]->multiget_data(&cntls[i], &mget_req, &resps[i], callback);
     }
     counter.wait();
-    RETURN_IF_ERROR(MergeRPCResults(resps, cntls, &mblock));
-    // final sort by row_ids sequence, since row_ids is already sorted
-    vectorized::Block tmp = mblock.to_block();
-    std::unordered_map<std::string, uint32_t> row_order;
-    vectorized::ColumnPtr row_id_column = tmp.get_columns().back();
-    for (size_t x = 0; x < row_id_column->size(); ++x) {
+
+    // Merge
+    std::vector<PRowLocation> rows_locs;
+    rows_locs.reserve(rows_locs.size());
+    RETURN_IF_ERROR(_merge_rpc_results(mget_req, resps, cntls, res_block, &rows_locs));
+
+    // Final sort by row_ids sequence, since row_ids is already sorted if need
+    std::map<GlobalRowLoacation, size_t> positions;
+    for (size_t i = 0; i < rows_locs.size(); ++i) {
+        RowsetId rowset_id;
+        rowset_id.init(rows_locs[i].rowset_id());
+        GlobalRowLoacation grl(rows_locs[i].tablet_id(), rowset_id, rows_locs[i].segment_id(),
+                               rows_locs[i].ordinal_id());
+        positions[grl] = i;
+    };
+    vectorized::IColumn::Permutation permutation;
+    permutation.reserve(column_row_ids->size());
+    for (size_t i = 0; i < column_row_ids->size(); ++i) {
         auto location =
-                reinterpret_cast<const GlobalRowLoacation*>(row_id_column->get_data_at(x).data);
-        row_order[format_rowid(*location)] = x;
+                reinterpret_cast<const GlobalRowLoacation*>(column_row_ids->get_data_at(i).data);
+        permutation.push_back(positions[*location]);
+    }
+    size_t num_rows = res_block->rows();
+    for (size_t i = 0; i < res_block->columns(); ++i) {
+        res_block->get_by_position(i).column =
+                res_block->get_by_position(i).column->permute(permutation, num_rows);
     }
-    for (size_t x = 0; x < row_ids->size(); ++x) {
-        auto location = reinterpret_cast<const GlobalRowLoacation*>(row_ids->get_data_at(x).data);
-        res_block->add_row(&tmp, row_order[format_rowid(*location)]);
+    // shrink for char type
+    std::vector<size_t> char_type_idx;
+    for (size_t i = 0; i < _fetch_option.desc->slots().size(); i++) {
+        auto column_desc = _fetch_option.desc->slots()[i];
+        auto type_desc = column_desc->type();
+        do {
+            if (type_desc.type == TYPE_CHAR) {
+                char_type_idx.emplace_back(i);
+                break;
+            } else if (type_desc.type != TYPE_ARRAY) {
+                break;
+            }
+            // for Array<Char> or Array<Array<Char>>
+            type_desc = type_desc.children[0];
+        } while (true);
     }
+    res_block->shrink_char_type_column_suffix_zero(char_type_idx);
+    VLOG_DEBUG << "dump block:" << res_block->dump_data(0, 10);
     return Status::OK();
 }
 
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index 7f789b1c40..ae57c295a5 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -17,12 +17,16 @@
 
 #pragma once
 
+#include <brpc/controller.h>
+#include <gen_cpp/DataSinks_types.h>
 #include <gen_cpp/internal_service.pb.h>
 
 #include <memory>
 #include <vector>
 
 #include "common/status.h"
+#include "exec/tablet_info.h" // DorisNodesInfo
+#include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 
 namespace doris {
@@ -38,18 +42,29 @@ class MutableBlock;
 
 // fetch rows by global rowid
 // tablet_id/rowset_name/segment_id/ordinal_id
+
+struct FetchOption {
+    TupleDescriptor* desc = nullptr;
+    RuntimeState* runtime_state = nullptr;
+    TFetchOption t_fetch_opt;
+};
+
 class RowIDFetcher {
 public:
-    RowIDFetcher(TupleDescriptor* desc, RuntimeState* st) : _tuple_desc(desc), _st(st) {}
-    Status init(DorisNodesInfo* nodes_info);
-    Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::MutableBlock* block);
+    RowIDFetcher(const FetchOption& fetch_opt) : _fetch_option(fetch_opt) {}
+    Status init();
+    Status fetch(const vectorized::ColumnPtr& row_ids, vectorized::Block* block);
 
 private:
-    PMultiGetRequest _init_fetch_request(const vectorized::ColumnString& row_ids);
+    PMultiGetRequest _init_fetch_request(const vectorized::ColumnString& row_ids) const;
+    Status _merge_rpc_results(const PMultiGetRequest& request,
+                              const std::vector<PMultiGetResponse>& rsps,
+                              const std::vector<brpc::Controller>& cntls,
+                              vectorized::Block* output_block,
+                              std::vector<PRowLocation>* rows_id) const;
 
     std::vector<std::shared_ptr<PBackendService_Stub>> _stubs;
-    TupleDescriptor* _tuple_desc;
-    RuntimeState* _st;
+    FetchOption _fetch_option;
 };
 
 } // namespace doris
diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h
index b4146ba6e0..bcfc5541eb 100644
--- a/be/src/exec/tablet_info.h
+++ b/be/src/exec/tablet_info.h
@@ -263,11 +263,18 @@ struct NodeInfo {
 
 class DorisNodesInfo {
 public:
+    DorisNodesInfo() = default;
     DorisNodesInfo(const TPaloNodesInfo& t_nodes) {
         for (auto& node : t_nodes.nodes) {
             _nodes.emplace(node.id, node);
         }
     }
+    void setNodes(const TPaloNodesInfo& t_nodes) {
+        _nodes.clear();
+        for (auto& node : t_nodes.nodes) {
+            _nodes.emplace(node.id, node);
+        }
+    }
     const NodeInfo* find_node(int64_t id) const {
         auto it = _nodes.find(id);
         if (it != std::end(_nodes)) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 60652c9d8c..5955e97d2c 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -421,7 +421,6 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
            max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
             DCHECK(max_row_add > 0);
         }
-
        size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
        auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
         if (UNLIKELY(!s.ok())) {
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index ac8715fd2d..bcf685a046 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -330,6 +330,9 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
             break;
         }
     }
+    if (row_column_id == 0) {
+        return;
+    }
    vectorized::ColumnString* row_store_column =
            static_cast<vectorized::ColumnString*>(block.get_by_position(row_column_id)
                                                           .column->assume_mutable_ref()
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6a2fc1553d..cfb7a22871 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2477,7 +2477,7 @@ Status Tablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segi
 
 Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
                                RowsetSharedPtr input_rowset, const TupleDescriptor* desc,
-                               OlapReaderStatistics& stats, vectorized::Block* block,
+                               OlapReaderStatistics& stats, std::string& values,
                                bool write_to_cache) {
     // read row data
    BetaRowsetSharedPtr rowset = std::static_pointer_cast<BetaRowset>(input_rowset);
@@ -2525,11 +2525,12 @@ Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_
    RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column_ptr));
     assert(column_ptr->size() == 1);
    auto string_column = static_cast<vectorized::ColumnString*>(column_ptr.get());
+    StringRef value = string_column->get_data_at(0);
+    values = value.to_string();
     if (write_to_cache) {
         StringRef value = string_column->get_data_at(0);
        RowCache::instance()->insert({tablet_id(), encoded_key}, Slice {value.data, value.size});
     }
-    vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, *string_column, *block);
     return Status::OK();
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index e666dcb16d..4f7fc8dd9e 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -402,7 +402,7 @@ public:
     // Lookup a row with TupleDescriptor and fill Block
    Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
                           RowsetSharedPtr rowset, const TupleDescriptor* desc,
-                           OlapReaderStatistics& stats, vectorized::Block* block,
+                           OlapReaderStatistics& stats, std::string& values,
                            bool write_to_cache = false);
 
     Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
diff --git a/be/src/olap/utils.h b/be/src/olap/utils.h
index f91b625db3..e4a167b987 100644
--- a/be/src/olap/utils.h
+++ b/be/src/olap/utils.h
@@ -301,6 +301,18 @@ struct GlobalRowLoacation {
             : tablet_id(tid), row_location(rsid, sid, rid) {}
     uint32_t tablet_id;
     RowLocation row_location;
+
+    bool operator==(const GlobalRowLoacation& rhs) const {
+        return tablet_id == rhs.tablet_id && row_location == rhs.row_location;
+    }
+
+    bool operator<(const GlobalRowLoacation& rhs) const {
+        if (tablet_id != rhs.tablet_id) {
+            return tablet_id < rhs.tablet_id;
+        } else {
+            return row_location < rhs.row_location;
+        }
+    }
 };
 
 } // namespace doris
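
The comparison operators added to GlobalRowLoacation above exist so it can key
the std::map that RowIDFetcher::fetch() uses to restore the caller's original
row order once the multiget responses arrive. A stripped-down sketch of that
reordering, with returned_locs/requested_locs as placeholder names for the
locations returned by the RPCs and the locations the caller asked for:

    // Map each returned location to its position in the merged block, then
    // build a permutation that follows the caller's request order.
    std::map<GlobalRowLoacation, size_t> positions; // relies on operator< above
    for (size_t i = 0; i < returned_locs.size(); ++i) {
        positions[returned_locs[i]] = i;
    }
    vectorized::IColumn::Permutation permutation;
    for (const GlobalRowLoacation& requested : requested_locs) {
        permutation.push_back(positions[requested]);
    }
    // Each result column is then replaced by column->permute(permutation, num_rows).
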
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index c13e2b53d5..f4cb3f6dff 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -517,11 +517,11 @@ std::string RowDescriptor::debug_string() const {
     return ss.str();
 }
 
-int RowDescriptor::get_column_id(int slot_id) const {
+int RowDescriptor::get_column_id(int slot_id, bool force_materialize_slot) const {
     int column_id_counter = 0;
     for (const auto tuple_desc : _tuple_desc_map) {
         for (const auto slot : tuple_desc->slots()) {
-            if (!slot->need_materialize()) {
+            if (!force_materialize_slot && !slot->need_materialize()) {
                 continue;
             }
             if (slot->id() == slot_id) {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 8fd6654e13..48ea79d879 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -494,7 +494,7 @@ public:
 
     std::string debug_string() const;
 
-    int get_column_id(int slot_id) const;
+    int get_column_id(int slot_id, bool force_materialize_slot = false) const;
 
 private:
     // Initializes tupleIdxMap during c'tor using the _tuple_desc_map.
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 80d02f6d58..c674223b8a 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -101,6 +101,7 @@
 #include "util/time.h"
 #include "util/uid_util.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_string.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type.h"
@@ -109,6 +110,7 @@
 #include "vec/exec/format/json/new_json_reader.h"
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
+#include "vec/jsonb/serialize.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 
 namespace google {
@@ -1414,19 +1416,29 @@ void PInternalServiceImpl::response_slave_tablet_pull_rowset(
     }
 }
 
-static Status read_by_rowids(
-        std::pair<size_t, size_t> row_range_idx, const TupleDescriptor& desc,
-        const google::protobuf::RepeatedPtrField<PMultiGetRequest_RowId>& rowids,
-        vectorized::Block* sub_block) {
-    //read from row_range.first to row_range.second
-    for (size_t i = row_range_idx.first; i < row_range_idx.second; ++i) {
+Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
+                                        PMultiGetResponse* response) {
+    OlapReaderStatistics stats;
+    vectorized::Block result_block;
+
+    // init desc
+    TupleDescriptor desc(request.desc());
+    std::vector<SlotDescriptor> slots;
+    slots.reserve(request.slots().size());
+    for (const auto& pslot : request.slots()) {
+        slots.push_back(SlotDescriptor(pslot));
+        desc.add_slot(&slots.back());
+    }
+
+    // read row by row
+    for (size_t i = 0; i < request.row_locs_size(); ++i) {
+        const auto& row_loc = request.row_locs(i);
         MonotonicStopWatch watch;
         watch.start();
-        auto row_id = rowids[i];
        TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
-                row_id.tablet_id(), true /*include deleted*/);
+                row_loc.tablet_id(), true /*include deleted*/);
         RowsetId rowset_id;
-        rowset_id.init(row_id.rowset_id());
+        rowset_id.init(row_loc.rowset_id());
         if (!tablet) {
             continue;
         }
@@ -1436,6 +1448,13 @@ static Status read_by_rowids(
             LOG(INFO) << "no such rowset " << rowset_id;
             continue;
         }
+        size_t row_size = 0;
+        Defer _defer([&]() {
+            LOG_EVERY_N(INFO, 100)
+                    << "multiget_data single_row, cost(us):" << watch.elapsed_time() / 1000
+                    << ", row_size:" << row_size;
+            *response->add_row_locs() = row_loc;
+        });
         const TabletSchemaSPtr tablet_schema = rowset->tablet_schema();
        VLOG_DEBUG << "get tablet schema column_num:" << tablet_schema->num_columns()
                    << ", version:" << tablet_schema->schema_version()
@@ -1445,18 +1464,34 @@ static Status read_by_rowids(
         // find segment
         auto it = std::find_if(segment_cache.get_segments().begin(),
                                segment_cache.get_segments().end(),
-                               [&row_id](const segment_v2::SegmentSharedPtr& seg) {
-                                   return seg->id() == row_id.segment_id();
+                               [&row_loc](const segment_v2::SegmentSharedPtr& seg) {
+                                   return seg->id() == row_loc.segment_id();
                                });
         if (it == segment_cache.get_segments().end()) {
             continue;
         }
         segment_v2::SegmentSharedPtr segment = *it;
-        for (int x = 0; x < desc.slots().size() - 1; ++x) {
+        GlobalRowLoacation row_location(row_loc.tablet_id(), rowset->rowset_id(),
+                                        row_loc.segment_id(), row_loc.ordinal_id());
+        // fetch by row store, a more efficient way
+        if (request.fetch_row_store()) {
+            CHECK(tablet->tablet_schema()->store_row_column());
+            RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
+            string* value = response->add_binary_row_data();
+            RETURN_IF_ERROR(tablet->lookup_row_data({}, loc, rowset, &desc, stats, *value));
+            row_size = value->size();
+            continue;
+        }
+
+        // fetch by column store
+        if (result_block.is_empty_column()) {
+            result_block = vectorized::Block(desc.slots(), request.row_locs().size());
+        }
+        for (int x = 0; x < desc.slots().size(); ++x) {
            int index = tablet_schema->field_index(desc.slots()[x]->col_unique_id());
             segment_v2::ColumnIterator* column_iterator = nullptr;
             vectorized::MutableColumnPtr column =
-                    sub_block->get_by_position(x).column->assume_mutable();
+                    result_block.get_by_position(x).column->assume_mutable();
             if (index < 0) {
                 column->insert_default();
                 continue;
@@ -1471,58 +1506,22 @@ static Status read_by_rowids(
             opt.stats = &stats;
             opt.use_page_cache = !config::disable_storage_page_cache;
             column_iterator->init(opt);
-            std::vector<segment_v2::rowid_t> rowids {
-                    static_cast<segment_v2::rowid_t>(row_id.ordinal_id())};
-            RETURN_IF_ERROR(column_iterator->read_by_rowids(rowids.data(), 1, column));
+            std::vector<segment_v2::rowid_t> single_row_loc {
+                    static_cast<segment_v2::rowid_t>(row_loc.ordinal_id())};
+            RETURN_IF_ERROR(column_iterator->read_by_rowids(single_row_loc.data(), 1, column));
         }
-        LOG_EVERY_N(INFO, 100) << "multiget_data single_row, cost(us):"
-                               << watch.elapsed_time() / 1000;
-        GlobalRowLoacation row_location(row_id.tablet_id(), rowset->rowset_id(),
-                                        row_id.segment_id(), row_id.ordinal_id());
-        sub_block->get_columns().back()->assume_mutable()->insert_data(
-                reinterpret_cast<const char*>(&row_location), sizeof(GlobalRowLoacation));
-    }
-    return Status::OK();
-}
-
-Status PInternalServiceImpl::_multi_get(const PMultiGetRequest* request,
-                                        PMultiGetResponse* response) {
-    TupleDescriptor desc(request->desc());
-    std::vector<SlotDescriptor> slots;
-    slots.reserve(request->slots().size());
-    for (const auto& pslot : request->slots()) {
-        slots.push_back(SlotDescriptor(pslot));
-        desc.add_slot(&slots.back());
     }
-    assert(desc.slots().back()->col_name() == BeConsts::ROWID_COL);
-    vectorized::Block block(desc.slots(), request->rowids().size());
-    RETURN_IF_ERROR(
-            read_by_rowids(std::pair {0, request->rowids_size()}, desc, request->rowids(), &block));
-    std::vector<size_t> char_type_idx;
-    for (size_t i = 0; i < desc.slots().size(); i++) {
-        auto column_desc = desc.slots()[i];
-        auto type_desc = column_desc->type();
-        do {
-            if (type_desc.type == TYPE_CHAR) {
-                char_type_idx.emplace_back(i);
-                break;
-            } else if (type_desc.type != TYPE_ARRAY) {
-                break;
-            }
-            // for Array<Char> or Array<Array<Char>>
-            type_desc = type_desc.children[0];
-        } while (true);
+    // serialize block if not empty
+    if (!result_block.is_empty_column()) {
+        VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
+                   << ", be_exec_version:" << request.be_exec_version();
+        [[maybe_unused]] size_t compressed_size = 0;
+        [[maybe_unused]] size_t uncompressed_size = 0;
+        int be_exec_version = request.has_be_exec_version() ? request.be_exec_version() : 0;
+        RETURN_IF_ERROR(result_block.serialize(be_exec_version, response->mutable_block(),
+                                               &uncompressed_size, &compressed_size,
+                                               segment_v2::CompressionTypePB::LZ4));
     }
-    // shrink char_type suffix zero data
-    block.shrink_char_type_column_suffix_zero(char_type_idx);
-    VLOG_DEBUG << "dump block:" << block.dump_data(0, 10)
-               << ", be_exec_version:" << request->be_exec_version();
-
-    [[maybe_unused]] size_t compressed_size = 0;
-    [[maybe_unused]] size_t uncompressed_size = 0;
-    int be_exec_version = request->has_be_exec_version() ? request->be_exec_version() : 0;
-    RETURN_IF_ERROR(block.serialize(be_exec_version, response->mutable_block(), &uncompressed_size,
-                                    &compressed_size, segment_v2::CompressionTypePB::LZ4));
     return Status::OK();
 }
 
@@ -1536,7 +1535,7 @@ void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* contro
         watch.start();
         brpc::ClosureGuard closure_guard(done);
         response->mutable_status()->set_status_code(0);
-        Status st = _multi_get(request, response);
+        Status st = _multi_get(*request, response);
         st.to_protobuf(response->mutable_status());
        LOG(INFO) << "multiget_data finished, cost(us):" << watch.elapsed_time() / 1000;
     });
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index dae16604ad..8c6057d978 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -207,7 +207,7 @@ private:
    void _response_pull_slave_rowset(const std::string& remote_host, int64_t brpc_port,
                                     int64_t txn_id, int64_t tablet_id, int64_t node_id,
                                      bool is_succeed);
-    Status _multi_get(const PMultiGetRequest* request, PMultiGetResponse* response);
+    Status _multi_get(const PMultiGetRequest& request, PMultiGetResponse* response);
 
    void _get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller,
                                        const PFetchColIdsRequest* request,
diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp
index 21d6ac06ac..ab2ae7519e 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -23,6 +23,8 @@
 #include <gen_cpp/internal_service.pb.h>
 #include <stdlib.h>
 
+#include <vector>
+
 #include "olap/lru_cache.h"
 #include "olap/olap_tuple.h"
 #include "olap/row_cursor.h"
@@ -284,11 +286,15 @@ Status PointQueryExecutor::_lookup_row_data() {
         if (!_row_read_ctxs[i]._row_location.has_value()) {
             continue;
         }
+        std::string value;
         RETURN_IF_ERROR(_tablet->lookup_row_data(
                _row_read_ctxs[i]._primary_key, _row_read_ctxs[i]._row_location.value(),
                 *(_row_read_ctxs[i]._rowset_ptr), _reusable->tuple_desc(),
-                _profile_metrics.read_stats, _result_block.get(),
+                _profile_metrics.read_stats, value,
                !config::disable_storage_row_cache /*whether write row cache*/));
+        // serialize value to block, currently only jsonb row format
+        vectorized::JsonbSerializeUtil::jsonb_to_block(*_reusable->tuple_desc(), value.data(),
+                                                       value.size(), *_result_block);
     }
     return Status::OK();
 }
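
With the hunks above, Tablet::lookup_row_data() now hands back the row-store
value as a raw jsonb string instead of filling a Block itself, and callers such
as PointQueryExecutor decode it. A condensed sketch of the new pattern
(illustrative only; encoded_key/row_location/rowset/desc/stats/result_block are
placeholders for the caller's state):

    std::string value; // jsonb-encoded row read from the hidden row store column
    RETURN_IF_ERROR(tablet->lookup_row_data(encoded_key, row_location, rowset,
                                            desc, stats, value));
    // Deserialize the jsonb payload into the caller-owned result block.
    vectorized::JsonbSerializeUtil::jsonb_to_block(*desc, value.data(),
                                                   value.size(), result_block);
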
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index a00f9cd839..8535f4a261 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -57,11 +57,6 @@ Status VExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
     _is_asc_order = tnode.exchange_node.sort_info.is_asc_order;
     _nulls_first = tnode.exchange_node.sort_info.nulls_first;
 
-    if (tnode.exchange_node.__isset.nodes_info) {
-        _nodes_info = _pool->add(new DorisNodesInfo(tnode.exchange_node.nodes_info));
-    }
-    _use_two_phase_read = tnode.exchange_node.sort_info.__isset.use_two_phase_read &&
-                          tnode.exchange_node.sort_info.use_two_phase_read;
     return Status::OK();
 }
 
@@ -100,19 +95,6 @@ Status VExchangeNode::open(RuntimeState* state) {
     return Status::OK();
 }
 
-Status VExchangeNode::_second_phase_fetch_data(RuntimeState* state, Block* final_block) {
-    auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
-    auto tuple_desc = _row_descriptor.tuple_descriptors()[0];
-    RowIDFetcher id_fetcher(tuple_desc, state);
-    RETURN_IF_ERROR(id_fetcher.init(_nodes_info));
-    MutableBlock materialized_block(_row_descriptor.tuple_descriptors(), final_block->rows());
-    // fetch will sort block by sequence of ROWID_COL
-    RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, &materialized_block));
-    // Notice swap may change the structure of final_block
-    final_block->swap(materialized_block.to_block());
-    return Status::OK();
-}
-
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
    INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VExchangeNode::get_next");
     SCOPED_TIMER(runtime_profile()->total_time_counter());
@@ -123,12 +105,6 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
         _is_ready = true;
         return Status::OK();
     }
-    if (_use_two_phase_read) {
-        // Block structure may be changed by calling _second_phase_fetch_data() before.
-        // So we should clear block before _stream_recvr->get_next, since
-        // blocks in VSortedRunMerger may not compatible with this block.
-        block->clear();
-    }
     auto status = _stream_recvr->get_next(block, eos);
     // In vsortrunmerger, it will set eos=true, and block not empty
     // so that eos==true, could not make sure that block not have valid data
@@ -153,9 +129,6 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
         }
         COUNTER_SET(_rows_returned_counter, _num_rows_returned);
     }
-    if (_use_two_phase_read && block->rows() > 0) {
-        RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
-    }
     return status;
 }
 
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index 46f5aa619c..58b61dc9af 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -61,9 +61,6 @@ public:
     // Status collect_query_statistics(QueryStatistics* statistics) override;
     void set_num_senders(int num_senders) { _num_senders = num_senders; }
 
-    // final materializtion, used only in topn node
-    Status _second_phase_fetch_data(RuntimeState* state, Block* final_block);
-
 private:
     int _num_senders;
     bool _is_merging;
@@ -78,10 +75,6 @@ private:
     VSortExecExprs _vsort_exec_exprs;
     std::vector<bool> _is_asc_order;
     std::vector<bool> _nulls_first;
-
-    // for fetch data by rowids
-    DorisNodesInfo* _nodes_info = nullptr;
-    bool _use_two_phase_read = false;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 5069a5a615..e00d34559b 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -90,6 +90,10 @@ public:
 
     void clone_fn_contexts(VExprContext* other);
 
+    bool force_materialize_slot() const { return _force_materialize_slot; }
+
+    void set_force_materialize_slot() { _force_materialize_slot = true; }
+
 private:
     friend class VExpr;
 
@@ -112,5 +116,9 @@ private:
 
     /// The depth of expression-tree.
     int _depth_num = 0;
+
+    // This flag only works on VSlotRef.
+    // Force to materialize even if the slot need_materialize is false, we just ignore need_materialize flag
+    bool _force_materialize_slot = false;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 4b2e182563..5a34999acc 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -27,6 +27,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
 #include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris {
 namespace vectorized {
@@ -59,12 +60,12 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const doris::RowDescriptor&
                 state->desc_tbl().debug_string());
     }
     _column_name = &slot_desc->col_name();
-    if (!slot_desc->need_materialize()) {
+    if (!context->force_materialize_slot() && !slot_desc->need_materialize()) {
         // slot should be ignored manually
         _column_id = -1;
         return Status::OK();
     }
-    _column_id = desc.get_column_id(_slot_id);
+    _column_id = desc.get_column_id(_slot_id, context->force_materialize_slot());
     if (_column_id < 0) {
         return Status::Error<ErrorCode::INTERNAL_ERROR>(
                 "VSlotRef {} have invalid slot id: {}, desc: {}, slot_desc: 
{}, desc_tbl: {}",
diff --git a/be/src/vec/jsonb/serialize.cpp b/be/src/vec/jsonb/serialize.cpp
index 69bc603b2f..c8b2eff0f4 100644
--- a/be/src/vec/jsonb/serialize.cpp
+++ b/be/src/vec/jsonb/serialize.cpp
@@ -25,6 +25,10 @@
 
 #include "olap/tablet_schema.h"
 #include "runtime/descriptors.h"
+#include "runtime/jsonb_value.h"
+#include "runtime/primitive_type.h"
+#include "runtime/types.h"
+#include "util/bitmap_value.h"
 #include "util/jsonb_document.h"
 #include "util/jsonb_stream.h"
 #include "util/jsonb_writer.h"
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index 15d41f1f02..d61dc13875 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -24,14 +24,19 @@
 #include <new>
 
 #include "common/config.h"
+#include "common/consts.h"
 #include "common/object_pool.h"
+#include "exec/rowid_fetcher.h"
+#include "gutil/port.h"
 #include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "util/telemetry/telemetry.h"
 #include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 #include "vec/sink/vmysql_result_writer.h"
 #include "vec/sink/vresult_writer.h"
 
@@ -51,7 +56,7 @@ VResultSink::VResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>
     } else {
         _sink_type = sink.type;
     }
-
+    _fetch_option = sink.fetch_option;
     _name = "ResultSink";
 }
 
@@ -61,6 +66,12 @@ Status VResultSink::prepare_exprs(RuntimeState* state) {
     // From the thrift expressions create the real exprs.
     RETURN_IF_ERROR(
            VExpr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_vexpr_ctxs));
+    if (_fetch_option.use_two_phase_fetch) {
+        for (VExprContext* expr_ctx : _output_vexpr_ctxs) {
+            // Must materialize if it is a slot, or the slot column id will be -1
+            expr_ctx->set_force_materialize_slot();
+        }
+    }
     // Prepare the exprs to run.
     RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
     return Status::OK();
@@ -100,9 +111,32 @@ Status VResultSink::open(RuntimeState* state) {
     return VExpr::open(_output_vexpr_ctxs, state);
 }
 
+Status VResultSink::second_phase_fetch_data(RuntimeState* state, Block* final_block) {
+    auto row_id_col = final_block->get_by_position(final_block->columns() - 1);
+    CHECK(row_id_col.name == BeConsts::ROWID_COL);
+    auto tuple_desc = _row_desc.tuple_descriptors()[0];
+    FetchOption fetch_option;
+    fetch_option.desc = tuple_desc;
+    fetch_option.t_fetch_opt = _fetch_option;
+    fetch_option.runtime_state = state;
+    RowIDFetcher id_fetcher(fetch_option);
+    RETURN_IF_ERROR(id_fetcher.init());
+    RETURN_IF_ERROR(id_fetcher.fetch(row_id_col.column, final_block));
+    return Status::OK();
+}
+
 Status VResultSink::send(RuntimeState* state, Block* block, bool eos) {
    INIT_AND_SCOPE_SEND_SPAN(state->get_tracer(), _send_span, "VResultSink::send");
-    return _writer->append_block(*block);
+    if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
+        RETURN_IF_ERROR(second_phase_fetch_data(state, block));
+    }
+    RETURN_IF_ERROR(_writer->append_block(*block));
+    if (_fetch_option.use_two_phase_fetch) {
+        // Block structure may be changed by calling _second_phase_fetch_data().
+        // So we should clear block in case of unmatched columns
+        block->clear();
+    }
+    return Status::OK();
 }
 
 Status VResultSink::close(RuntimeState* state, Status exec_status) {
diff --git a/be/src/vec/sink/vresult_sink.h b/be/src/vec/sink/vresult_sink.h
index adf5dd249d..97916544bf 100644
--- a/be/src/vec/sink/vresult_sink.h
+++ b/be/src/vec/sink/vresult_sink.h
@@ -28,6 +28,7 @@
 
 #include "common/status.h"
 #include "exec/data_sink.h"
+#include "vec/sink/vresult_writer.h"
 
 namespace doris {
 class RuntimeState;
@@ -141,6 +142,7 @@ public:
 
 private:
     Status prepare_exprs(RuntimeState* state);
+    Status second_phase_fetch_data(RuntimeState* state, Block* final_block);
     TResultSinkType::type _sink_type;
     // set file options when sink type is FILE
     std::unique_ptr<ResultFileOptions> _file_opts;
@@ -156,6 +158,9 @@ private:
     std::shared_ptr<VResultWriter> _writer;
     RuntimeProfile* _profile; // Allocated from _pool
     int _buf_size;            // Allocated from _pool
+
+    // for fetch data by rowids
+    TFetchOption _fetch_option;
 };
 } // namespace vectorized
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index d7a47f9279..18db7b5847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -331,7 +331,6 @@ public class CreateTableStmt extends DdlStmt {
         }
 
         boolean enableUniqueKeyMergeOnWrite = false;
-        boolean enableStoreRowColumn = false;
         // analyze key desc
         if (engineName.equalsIgnoreCase("olap")) {
             // olap table
@@ -401,7 +400,6 @@ public class CreateTableStmt extends DdlStmt {
                 throw new AnalysisException(
                        PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " property only support unique key table");
             }
-
             if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
                 enableUniqueKeyMergeOnWrite = true;
                 if (properties != null) {
@@ -409,7 +407,6 @@ public class CreateTableStmt extends DdlStmt {
                     // so we just clone a properties map here.
                    enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(
                             new HashMap<>(properties));
-                    enableStoreRowColumn = PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties));
                 }
             }
 
@@ -455,7 +452,8 @@ public class CreateTableStmt extends DdlStmt {
                columnDefs.add(ColumnDef.newDeleteSignColumnDef(AggregateType.REPLACE));
             }
         }
-        if (enableStoreRowColumn) {
+        // add a hidden column as row store
+        if (properties != null && PropertyAnalyzer.analyzeStoreRowColumn(new HashMap<>(properties))) {
             columnDefs.add(ColumnDef.newRowStoreColumnDef());
         }
         if (Config.enable_hidden_version_column_by_default && keysDesc != null
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index c8bc8ddf43..0a9bd5a50e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -658,7 +658,10 @@ public class SelectStmt extends QueryStmt {
             Set<SlotRef> orderingSlots = Sets.newHashSet();
             Set<SlotRef> conjuntSlots = Sets.newHashSet();
            TreeNode.collect(resultExprs, Predicates.instanceOf(SlotRef.class), resultSlots);
-            TreeNode.collect(sortInfo.getOrderingExprs(), Predicates.instanceOf(SlotRef.class), orderingSlots);
+            if (sortInfo != null) {
+                TreeNode.collect(sortInfo.getOrderingExprs(),
+                        Predicates.instanceOf(SlotRef.class), orderingSlots);
+            }
             if (whereClause != null) {
                 whereClause.collect(SlotRef.class, conjuntSlots);
             }
@@ -714,20 +717,23 @@ public class SelectStmt extends QueryStmt {
                || !ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
             return false;
         }
-        if (!evaluateOrderBy) {
-            // Need evaluate orderby, if sort node was eliminated then this optmization
-            // could be useless
-            return false;
-        }
-        // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... ORDER BY ... LIMIT ...` query
+        // Only handle the simplest `SELECT ... FROM <tbl> WHERE ... [ORDER BY ...] [LIMIT ...]` query
         if (getAggInfo() != null
                 || getHavingPred() != null
                 || getWithClause() != null
                 || getAnalyticInfo() != null) {
             return false;
         }
+        // ignore short circuit query
+        if (isPointQueryShortCircuit()) {
+            return false;
+        }
+        // ignore insert into select
+        if (fromInsert) {
+            return false;
+        }
+        // ensure no sub query
         if (!analyzer.isRootAnalyzer()) {
-            // ensure no sub query
             return false;
         }
        // If select stmt has inline view or this is an inline view query stmt analyze call
@@ -750,24 +756,35 @@ public class SelectStmt extends QueryStmt {
         if (!olapTable.getEnableLightSchemaChange()) {
             return false;
         }
-        // Only TOPN query at present
-        if (getOrderByElements() == null
-                    || !hasLimit()
-                    || getLimit() <= 0
-                    || getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
-            return false;
-        }
-        // Check order by exprs are all slot refs
-        // Rethink? implement more generic to support all exprs
-        LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
-        LOG.debug("getOrderByElements {}", getOrderByElements());
-        for (Expr sortExpr : sortInfo.getOrderingExprs()) {
-            if (!(sortExpr instanceof SlotRef)) {
+        if (getOrderByElements() != null) {
+            if (!evaluateOrderBy) {
+                // Need evaluate orderby, if sort node was eliminated then this optimization
+                // could be useless
                 return false;
             }
+            // case1: general topn query, like: select * from tbl where xxx order by yyy limit n
+            if (!hasLimit()
+                        || getLimit() <= 0
+                        || getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
+                return false;
+            }
+            // Check order by exprs are all slot refs
+            // Rethink? implement more generic to support all exprs
+            LOG.debug("getOrderingExprs {}", sortInfo.getOrderingExprs());
+            LOG.debug("getOrderByElements {}", getOrderByElements());
+            for (Expr sortExpr : sortInfo.getOrderingExprs()) {
+                if (!(sortExpr instanceof SlotRef)) {
+                    return false;
+                }
+            }
+            isTwoPhaseOptEnabled = true;
+            return true;
+        } else {
+            // case2: optimize scan utilize row store column, query like select * from tbl where xxx [limit xxx]
+            // TODO: we only optimize query with select * at present
+            return olapTable.storeRowColumn() && selectList.getItems().stream().anyMatch(e -> e.isStar());
         }
-        isTwoPhaseOptEnabled = true;
-        return true;
+        // return false;
     }
 
     public List<TupleId> getTableRefIds() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b7214f7d9d..e0340ba20b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -38,6 +38,7 @@ import org.apache.doris.analysis.TableRef;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Function.NullableMode;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Table;
@@ -134,6 +135,7 @@ import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.RepeatNode;
+import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.planner.SchemaScanNode;
 import org.apache.doris.planner.SelectNode;
@@ -146,6 +148,7 @@ import org.apache.doris.planner.external.HudiScanNode;
 import org.apache.doris.planner.external.iceberg.IcebergScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
+import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TPushAggOp;
 
@@ -200,6 +203,47 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         this.statsErrorEstimator = statsErrorEstimator;
     }
 
+    // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
+    // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
+    // in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows
+    // and reconstruct the final block
+    private void setResultSinkFetchOptionIfNeed() {
+        boolean needFetch = false;
+        // Only single olap table should be fetched
+        OlapTable fetchOlapTable = null;
+        for (PlanFragment fragment : context.getPlanFragments()) {
+            PlanNode node = fragment.getPlanRoot();
+            PlanNode parent = null;
+            // OlapScanNode is the last node.
+            // So, just get the last two nodes and check if they are SortNode and OlapScan.
+            while (node.getChildren().size() != 0) {
+                parent = node;
+                node = node.getChildren().get(0);
+            }
+
+            // case1: general topn optimized query
+            if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
+                SortNode sortNode = (SortNode) parent;
+                OlapScanNode scanNode = (OlapScanNode) node;
+                if (sortNode.getUseTwoPhaseReadOpt()) {
+                    needFetch = true;
+                    fetchOlapTable = scanNode.getOlapTable();
+                    break;
+                }
+            }
+        }
+        for (PlanFragment fragment : context.getPlanFragments()) {
+            if (needFetch && fragment.getSink() instanceof ResultSink) {
+                TFetchOption fetchOption = new TFetchOption();
+                fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn());
+                fetchOption.setUseTwoPhaseFetch(true);
+                fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
+                ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
+                break;
+            }
+        }
+    }
+
     /**
      * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
      *
@@ -236,6 +280,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
         for (PlanFragment fragment : context.getPlanFragments()) {
             fragment.finalize(null);
         }
+        setResultSinkFetchOptionIfNeed();
         Collections.reverse(context.getPlanFragments());
         context.getDescTable().computeMemLayout();
         return rootFragment;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index c0cddcc242..ab82dad209 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -24,17 +24,12 @@ import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsRecursiveDerive;
-import org.apache.doris.system.Backend;
-import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TExchangeNode;
 import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TNodeInfo;
-import org.apache.doris.thrift.TPaloNodesInfo;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
@@ -150,9 +145,6 @@ public class ExchangeNode extends PlanNode {
         }
         if (mergeInfo != null) {
             msg.exchange_node.setSortInfo(mergeInfo.toThrift());
-            if (mergeInfo.useTwoPhaseRead()) {
-                msg.exchange_node.setNodesInfo(createNodesInfo());
-            }
         }
         msg.exchange_node.setOffset(offset);
     }
@@ -174,18 +166,4 @@ public class ExchangeNode extends PlanNode {
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         return prefix + "offset: " + offset + "\n";
     }
-
-    /**
-    * Set the parameters used to fetch data by rowid column
-    * after init().
-    */
-    private TPaloNodesInfo createNodesInfo() {
-        TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
-        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
-        for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
-            Backend backend = systemInfoService.getBackend(id);
-            nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
-        }
-        return nodesInfo;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index 49058a021c..42e72d6c84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -35,6 +35,7 @@ import org.apache.doris.analysis.TableName;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.Table;
@@ -51,6 +52,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.statistics.query.StatsDelta;
+import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.thrift.TRuntimeFilterMode;
 
@@ -305,6 +307,10 @@ public class OriginalPlanner extends Planner {
                     // Double check this plan to ensure it's a general topn query
                     injectRowIdColumnSlot();
                     ((SortNode) singleNodePlan).setUseTwoPhaseReadOpt(true);
+                } else if (singleNodePlan instanceof OlapScanNode && singleNodePlan.getChildren().size() == 0) {
+                    // Optimize query like `SELECT ... FROM <tbl> WHERE ... LIMIT ...`.
+                    // This is typically used when the row store is enabled, to reduce scan cost.
+                    injectRowIdColumnSlot();
                 } else {
                     // This is not a general topn query, rollback needMaterialize flag
                     for (SlotDescriptor slot : analyzer.getDescTbl().getSlotDescs().values()) {
@@ -463,11 +469,13 @@ public class OriginalPlanner extends Planner {
         return slotDesc;
     }
 
-    // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] order by column1 limit n
+    // We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
     // in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
     // in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows
     // and reconstruct the final block
     private void injectRowIdColumnSlot() {
+        boolean injected = false;
+        OlapTable olapTable = null;
         for (PlanFragment fragment : fragments) {
             PlanNode node = fragment.getPlanRoot();
             PlanNode parent = null;
@@ -478,17 +486,37 @@ public class OriginalPlanner extends Planner {
                 node = node.getChildren().get(0);
             }
 
-            if (!(node instanceof OlapScanNode) || !(parent instanceof SortNode)) {
-                continue;
+            // case1
+            if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
+                SortNode sortNode = (SortNode) parent;
+                OlapScanNode scanNode = (OlapScanNode) node;
+                SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
+                injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
+                SlotRef extSlot = new SlotRef(slot);
+                sortNode.getResolvedTupleExprs().add(extSlot);
+                sortNode.getSortInfo().setUseTwoPhaseRead();
+                injected = true;
+                olapTable = scanNode.getOlapTable();
+                break;
+            }
+            // case2
+            if ((node instanceof OlapScanNode) && parent == null) {
+                OlapScanNode scanNode = (OlapScanNode) node;
+                injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
+                injected = true;
+                olapTable = scanNode.getOlapTable();
+                break;
+            }
+        }
+        for (PlanFragment fragment : fragments) {
+            if (injected && fragment.getSink() instanceof ResultSink) {
+                TFetchOption fetchOption = new TFetchOption();
+                fetchOption.setFetchRowStore(olapTable.storeRowColumn());
+                fetchOption.setUseTwoPhaseFetch(true);
+                fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
+                ((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
+                break;
             }
-            SortNode sortNode = (SortNode) parent;
-            OlapScanNode scanNode = (OlapScanNode) node;
-            SlotDescriptor slot = injectRowIdColumnSlot(analyzer, scanNode.getTupleDesc());
-            injectRowIdColumnSlot(analyzer, sortNode.getSortInfo().getSortTupleDescriptor());
-            SlotRef extSlot = new SlotRef(slot);
-            sortNode.getResolvedTupleExprs().add(extSlot);
-            sortNode.getSortInfo().setUseTwoPhaseRead();
-            break;
         }
     }
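The comment above describes the intent of two phase read: phase one sorts only the sort key plus a row id and keeps the top n ids, phase two fetches the full rows for those ids (via the fetch RPC, or the row store when store_row_column is enabled) and reassembles them in sorted order. A standalone sketch of that idea, with a plain in-memory map standing in for the second-phase fetch (toy names, not the BE implementation):

    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    final class TwoPhaseTopNSketch {
        // Phase one only looks at (sortKey, rowId) pairs; the wide columns stay behind.
        static final class KeyAndRowId {
            final long sortKey;
            final long rowId;
            KeyAndRowId(long sortKey, long rowId) { this.sortKey = sortKey; this.rowId = rowId; }
        }

        // rowStore stands in for the second-phase fetch (RPC or row column lookup).
        static List<String> topN(List<KeyAndRowId> keys, Map<Long, String> rowStore, int n) {
            // Phase 1: sort the narrow pairs and keep only the first n row ids.
            List<Long> topRowIds = keys.stream()
                    .sorted(Comparator.comparingLong((KeyAndRowId k) -> k.sortKey))
                    .limit(n)
                    .map(k -> k.rowId)
                    .collect(Collectors.toList());
            // Phase 2: materialize the full rows for those ids, preserving sorted order.
            return topRowIds.stream().map(rowStore::get).collect(Collectors.toList());
        }

        public static void main(String[] args) {
            List<KeyAndRowId> keys = List.of(
                    new KeyAndRowId(30, 1L), new KeyAndRowId(10, 2L), new KeyAndRowId(20, 3L));
            Map<Long, String> rows = Map.of(1L, "row-1", 2L, "row-2", 3L, "row-3");
            System.out.println(topN(keys, rows, 2)); // prints [row-2, row-3]
        }
    }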
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
index c940851de8..0a79881d58 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFetchOption;
 import org.apache.doris.thrift.TResultSink;
 
 /**
@@ -30,6 +31,8 @@ import org.apache.doris.thrift.TResultSink;
  */
 public class ResultSink extends DataSink {
     private final PlanNodeId exchNodeId;
+    // Two phase fetch option
+    private TFetchOption fetchOption;
 
     public ResultSink(PlanNodeId exchNodeId) {
         this.exchNodeId = exchNodeId;
@@ -43,13 +46,26 @@ public class ResultSink extends DataSink {
             strBuilder.append("V");
         }
         strBuilder.append("RESULT SINK\n");
+        if (fetchOption != null) {
+            strBuilder.append(prefix).append("   ").append("OPT TWO PHASE\n");
+            if (fetchOption.isFetchRowStore()) {
+                strBuilder.append(prefix).append("   ").append("FETCH ROW STORE\n");
+            }
+        }
         return strBuilder.toString();
     }
 
+    public void setFetchOption(TFetchOption fetchOption) {
+        this.fetchOption = fetchOption;
+    }
+
     @Override
     protected TDataSink toThrift() {
         TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK);
         TResultSink tResultSink = new TResultSink();
+        if (fetchOption != null) {
+            tResultSink.setFetchOption(fetchOption);
+        }
         result.setResultSink(tResultSink);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 743c4fb54f..aab8f44186 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -144,7 +144,7 @@ public class SortNode extends PlanNode {
     }
 
     public boolean getUseTwoPhaseReadOpt() {
-        return useTopnOpt;
+        return this.useTwoPhaseReadOpt;
     }
 
     public void setUseTwoPhaseReadOpt(boolean useTwoPhaseReadOpt) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index de4db69569..2c5fd90f21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -35,6 +35,8 @@ import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend.BackendState;
+import org.apache.doris.thrift.TNodeInfo;
+import org.apache.doris.thrift.TPaloNodesInfo;
 import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TStorageMedium;
 
@@ -156,6 +158,16 @@ public class SystemInfoService {
         }
     };
 
+    public static TPaloNodesInfo createAliveNodesInfo() {
+        TPaloNodesInfo nodesInfo = new TPaloNodesInfo();
+        SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+        for (Long id : systemInfoService.getBackendIds(true /*need alive*/)) {
+            Backend backend = systemInfoService.getBackend(id);
+            nodesInfo.addToNodes(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
+        }
+        return nodesInfo;
+    }
+
     // for deploy manager
     public void addBackends(List<HostInfo> hostInfos, boolean isFree)
             throws UserException {
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 3b905897c8..7a12d2be12 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -1,4 +1,5 @@
 // Licensed to the Apache Software Foundation (ASF) under one
+
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
 // regarding copyright ownership.  The ASF licenses this file
@@ -603,23 +604,35 @@ message PFetchTableSchemaResult {
   repeated PTypeDesc column_types = 4;
 }
 
+message PRowLocation {
+    optional int64 tablet_id = 1;
+    optional string rowset_id = 2;
+    optional uint64 segment_id = 3;
+    optional uint64 ordinal_id = 4;
+}
+
 message PMultiGetRequest {
-    message RowId {
-        optional int64 tablet_id = 1;
-        optional string rowset_id = 2;
-        optional uint64 segment_id = 3;
-        optional uint64 ordinal_id = 4;
-    };
-    repeated RowId rowids = 1;
+    repeated PRowLocation row_locs = 1;
     optional PTupleDescriptor desc = 2;
     repeated PSlotDescriptor slots = 3;
     // for compatibility
     optional int32 be_exec_version = 4;
+    optional bool fetch_row_store = 5;
+    optional PUniqueId query_id = 6;
 };
 
 message PMultiGetResponse {
     optional PBlock block = 1;
     optional PStatus status = 2;
+ 
+    // more efficient serialization fields for row store
+    enum RowFormat {
+        JSONB = 0;
+    };
+    optional RowFormat format = 3;
+    repeated bytes binary_row_data = 4;
+    // for sorting rows
+    repeated PRowLocation row_locs = 5;
 };
 
 message PFetchColIdsRequest {
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index f2a191a777..7883aab0b6 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -160,9 +160,18 @@ struct TMultiCastDataStreamSink {
     2: optional list<list<TPlanFragmentDestination>> destinations;
 }
 
+struct TFetchOption {
+    1: optional bool use_two_phase_fetch;
+    // Nodes in this cluster, used for second phase fetch
+    2: optional Descriptors.TPaloNodesInfo nodes_info;
+    // Whether to fetch from the row store
+    3: optional bool fetch_row_store;
+}
+
 struct TResultSink {
     1: optional TResultSinkType type;
-    2: optional TResultFileSinkOptions file_options // deprecated
+    2: optional TResultFileSinkOptions file_options; // deprecated
+    3: optional TFetchOption fetch_option;
 }
 
 struct TResultFileSink {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ba97ae77e5..8afc3fde4c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -908,8 +908,6 @@ struct TExchangeNode {
   2: optional TSortInfo sort_info
  // This is the number of rows to skip before returning results
   3: optional i64 offset
-  // Nodes in this cluster, used for second phase fetch
-  4: optional Descriptors.TPaloNodesInfo nodes_info
 }
 
 struct TOlapRewriteNode {
diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out
new file mode 100644
index 0000000000..ed6e469cbf
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q01.out
@@ -0,0 +1,8 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !dup_key_2pr_q01 --
+\N     \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N
+\N     \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N      \N
+2022-04-15T18:12:20    -1476674200     76077621753438032.418   false   -81     -17630  -10269  -268792458      13778.415       -2.147435208942339E9    93226194917360130.580   2022-07-02      2022-08-07T20:32:33     2022-02-06      195.140.76.121  qwh...@ooba.org Maywood Junction 85
+2022-12-09T05:37:25    115193592       2535754112868463.975    true    85      -4281   22951   -1062042991     4304.926        -2.147336076742606E9    1661169792864805.553    2022-10-14      2022-07-19T18:25:57     2022-12-01      118.127.225.101 xgard...@talane.gov     Milwaukee Point 50
+2022-08-07T21:42:59    380691749       46148671567644994.456   false   102     -6175   -147    -521124424      17149.252       -2.147205556984866E9    44529271870524715.164   2022-03-18      2022-02-07T01:28:54     2022-05-25      0.24.121.144    jamesd...@meezzy.org    Glendale Hill 78
+
diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out
new file mode 100644
index 0000000000..ff911d4bf8
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q02.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !dup_key_2pr_q02 --
+2023-01-10T15:40:04    -49426869       59390574629503991.413   true    -19     -6791   -6536   240107090       22801.043       7.01523382557248E8      6338683861031199.120    2022-05-15      2022-01-26T07:36:05     2022-04-26      139.41.223.19   seanflo...@chatterpoint.gov     Laurel Way 44
+
diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out
new file mode 100644
index 0000000000..c0b3e57752
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q03.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !dup_key_2pr_q03 --
+2022-01-11T06:20:14    1799797425      30891281288216959.961   false   -5      7495    -10784  1909249054      -31658.44       -1.992204200787845E9    6080150162640151.680    2022-04-08      2022-09-16T21:29:08     2022-04-18      29.121.52.42    ducimus_inventore_consequa...@dabfeed.net       Coleman Lane 51
+
diff --git a/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out
new file mode 100644
index 0000000000..b05344a0b4
--- /dev/null
+++ b/regression-test/data/datatype_p0/scalar_types/sql/dup_key_2pr_q04.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !dup_key_2pr_q04 --
+2022-01-10T16:11:53    1906472648      30323570439371776.353   true    -64     23204   -27723  1753599187      -28017.578      -8.7412093534361E7      34175877199258167.909   2023-01-07      2022-07-02T15:40:57     2022-10-09      60.230.5.23     asperiores_...@chatterpoint.name        Summit Plaza 70
+
diff --git a/regression-test/suites/datatype_p0/scalar_types/load.groovy b/regression-test/suites/datatype_p0/scalar_types/load.groovy
index 1eb73201d3..954f1df03a 100644
--- a/regression-test/suites/datatype_p0/scalar_types/load.groovy
+++ b/regression-test/suites/datatype_p0/scalar_types/load.groovy
@@ -48,7 +48,7 @@ suite("test_scalar_types_load", "p0") {
         DUPLICATE KEY(`k1`)
         COMMENT 'OLAP'
         DISTRIBUTED BY HASH(`k1`) BUCKETS 10
-        PROPERTIES("replication_num" = "1");
+        PROPERTIES("replication_num" = "1", "store_row_column" = "true");
         """
 
     // load data
@@ -100,7 +100,7 @@ suite("test_scalar_types_load", "p0") {
         DUPLICATE KEY(`c_datetimev2`, `c_bigint`, `c_decimalv3`)
         COMMENT 'OLAP'
         DISTRIBUTED BY HASH(`c_bigint`) BUCKETS 10
-        PROPERTIES("replication_num" = "1");
+        PROPERTIES("replication_num" = "1", "store_row_column" = "true");
         """
 
     // insert data into unique key table1 2 times
@@ -136,7 +136,7 @@ suite("test_scalar_types_load", "p0") {
         UNIQUE KEY(`c_datetimev2`, `c_bigint`, `c_decimalv3`)
         COMMENT 'OLAP'
         DISTRIBUTED BY HASH(`c_bigint`) BUCKETS 10
-        PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" 
= "true");
+        PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" 
= "true", "store_row_column" = "true");
         """
 
     // insert data into unique key table1 2 times
@@ -232,7 +232,7 @@ suite("test_scalar_types_load", "p0") {
         DUPLICATE KEY(`k1`)
         COMMENT 'OLAP'
         DISTRIBUTED BY HASH(`k1`) BUCKETS 10
-        PROPERTIES("replication_num" = "1");
+        PROPERTIES("replication_num" = "1", "store_row_column" = "true");
         """
 
     // insert data into dup table with index
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql
new file mode 100644
index 0000000000..a193207216
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q01.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys ORDER BY c_double, c_datetimev2, c_decimal LIMIT 5;
\ No newline at end of file
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql
new file mode 100644
index 0000000000..a181e88cb8
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q02.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys where c_datetimev2 = '2023-01-10 15:40:04';
\ No newline at end of file
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql
new file mode 100644
index 0000000000..820b02760d
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q03.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys where c_bigint = 1799797425;
\ No newline at end of file
diff --git a/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql
new file mode 100644
index 0000000000..7d91b8ba31
--- /dev/null
+++ b/regression-test/suites/datatype_p0/scalar_types/sql/dup_key_2pr_q04.sql
@@ -0,0 +1 @@
+SELECT * FROM tbl_scalar_types_dup_3keys where c_largeint = '1753599187';
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
