This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 39b5682d59 [Pipeline](shared_scan_opt) Support shared scan opt in pipeline exec engine
39b5682d59 is described below

commit 39b5682d59e798fa7fb60315dbf823aa34888891
Author: HappenLee <happen...@hotmail.com>
AuthorDate: Mon Mar 13 10:33:57 2023 +0800

    [Pipeline](shared_scan_opt) Support shared scan opt in pipeline exec engine
---
 be/src/common/status.h                             |   4 +
 be/src/exprs/runtime_filter.cpp                    |  50 ----------
 be/src/pipeline/exec/scan_operator.cpp             |  20 ++--
 be/src/pipeline/pipeline_fragment_context.cpp      |   3 +
 be/src/pipeline/pipeline_task.cpp                  |   3 +
 be/src/runtime/query_fragments_ctx.h               |   7 ++
 be/src/runtime/runtime_state.h                     |   6 ++
 be/src/vec/CMakeLists.txt                          |   3 +-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        |   7 +-
 be/src/vec/exec/scan/pip_scanner_context.h         |  62 ++++++++++--
 be/src/vec/exec/scan/scanner_context.cpp           | 105 ++++++++++++---------
 be/src/vec/exec/scan/scanner_context.h             |  49 ++++------
 be/src/vec/exec/scan/vscan_node.cpp                | 105 +++++++++++++++------
 be/src/vec/exec/scan/vscan_node.h                  |  12 ++-
 be/src/vec/functions/function_timestamp.cpp        |   2 +-
 be/src/vec/runtime/shared_hash_table_controller.h  |   2 +-
 be/src/vec/runtime/shared_scanner_controller.h     |  69 ++++++++++++++
 .../properties/ChildOutputPropertyDeriver.java     |   5 +-
 .../org/apache/doris/planner/OlapScanNode.java     |   3 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  25 +++--
 .../java/org/apache/doris/qe/SessionVariable.java  |   1 +
 gensrc/thrift/PaloInternalService.thrift           |   4 +
 .../data/nereids_syntax_p0/grouping_sets.out       |  11 ++-
 .../nereids_syntax_p0/sub_query_correlated.out     |  90 +++++++++---------
 .../sub_query_diff_old_optimize.out                |  12 +--
 .../correctness_p0/test_colocate_join.groovy       |   2 +-
 .../duplicate/storage/test_dup_tab_char.groovy     |   2 +-
 .../nereids_function_p0/gen_function/gen.groovy    |   2 +-
 .../suites/nereids_syntax_p0/grouping_sets.groovy  |   4 +-
 .../suites/nereids_syntax_p0/set_operation.groovy  |   6 +-
 .../nereids_syntax_p0/sub_query_correlated.groovy  |  46 ++++-----
 .../sub_query_diff_old_optimize.groovy             |   4 +-
 .../performance_p0/redundant_conjuncts.groovy      |   4 +-
 .../conditional_functions/test_query_limit.groovy  |   6 +-
 .../window_functions/test_window_fn.groovy         |  20 ++--
 35 files changed, 472 insertions(+), 284 deletions(-)
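
In rough outline, the change lets all pipeline instances of one scan node share a single set of scanners: the first instance to arrive builds the ScannerContext and publishes it through a per-query SharedScannerController, while sibling instances block on PIP_WAIT_FOR_SC until the context is ready and then read only from their own block queue. The sketch below is illustrative only; it reuses names from the diff, but build_scanner_context() and parallel_instance_num are hypothetical stand-ins for the real wiring done in VScanNode::alloc_resource().

    // Illustrative sketch of the coordination pattern (not code from this commit).
    #include <algorithm>
    #include <memory>
    #include "vec/runtime/shared_scanner_controller.h"

    // Hypothetical helper: builds the scanners and wraps them in a ScannerContext.
    std::shared_ptr<doris::vectorized::ScannerContext> build_scanner_context();

    void open_scan_instance(doris::vectorized::SharedScannerController& ctrl, int node_id,
                            int parallel_instance_num) {
        auto [should_build, queue_id] = ctrl.should_build_scanner_and_queue_id(node_id);
        if (should_build) {
            auto ctx = build_scanner_context();
            ctx->set_max_queue_size(std::max(parallel_instance_num, 1)); // one queue per instance
            ctrl.set_scanner_context(node_id, ctx);                      // publish for siblings
        } else if (!ctrl.scanner_context_is_ready(node_id)) {
            // a sibling instance owns the build: the pipeline task parks on PIP_WAIT_FOR_SC
        } else {
            auto ctx = ctrl.get_scanner_context(node_id); // consume only from queue `queue_id`
        }
    }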

diff --git a/be/src/common/status.h b/be/src/common/status.h
index e4fdc27911..2c0da39b8e 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -246,6 +246,7 @@ E(SEGCOMPACTION_INIT_READER, -3117);
 E(SEGCOMPACTION_INIT_WRITER, -3118);
 E(SEGCOMPACTION_FAILED, -3119);
 E(PIP_WAIT_FOR_RF, -3120);
+E(PIP_WAIT_FOR_SC, -3121);
 E(INVERTED_INDEX_INVALID_PARAMETERS, -6000);
 E(INVERTED_INDEX_NOT_SUPPORTED, -6001);
 E(INVERTED_INDEX_CLUCENE_ERROR, -6002);
@@ -383,6 +384,7 @@ public:
     ERROR_CTOR(EndOfFile, END_OF_FILE)
     ERROR_CTOR(InternalError, INTERNAL_ERROR)
     ERROR_CTOR(WaitForRf, PIP_WAIT_FOR_RF)
+    ERROR_CTOR(WaitForScannerContext, PIP_WAIT_FOR_SC)
     ERROR_CTOR(RuntimeError, RUNTIME_ERROR)
     ERROR_CTOR(Cancelled, CANCELLED)
     ERROR_CTOR(MemoryLimitExceeded, MEM_LIMIT_EXCEEDED)
@@ -402,6 +404,8 @@ public:
 
     bool ok() const { return _code == ErrorCode::OK; }
 
+    bool is_blocked_by_sc() const { return _code == ErrorCode::PIP_WAIT_FOR_SC; }
+
     bool is_io_error() const {
         return ErrorCode::IO_ERROR == _code || ErrorCode::READ_UNENOUGH == _code ||
                ErrorCode::CHECKSUM_ERROR == _code || ErrorCode::FILE_DATA_ERROR == _code ||
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 7ec1f5a21c..5db63116cd 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -43,56 +43,6 @@
 #include "vec/runtime/shared_hash_table_controller.h"
 
 namespace doris {
-// PrimitiveType->TExprNodeType
-// TODO: use constexpr if we use c++14
-TExprNodeType::type get_expr_node_type(PrimitiveType type) {
-    switch (type) {
-    case TYPE_BOOLEAN:
-        return TExprNodeType::BOOL_LITERAL;
-
-    case TYPE_TINYINT:
-    case TYPE_SMALLINT:
-    case TYPE_INT:
-    case TYPE_BIGINT:
-        return TExprNodeType::INT_LITERAL;
-
-    case TYPE_LARGEINT:
-        return TExprNodeType::LARGE_INT_LITERAL;
-        break;
-
-    case TYPE_NULL:
-        return TExprNodeType::NULL_LITERAL;
-
-    case TYPE_FLOAT:
-    case TYPE_DOUBLE:
-    case TYPE_TIME:
-    case TYPE_TIMEV2:
-        return TExprNodeType::FLOAT_LITERAL;
-        break;
-
-    case TYPE_DECIMAL32:
-    case TYPE_DECIMAL64:
-    case TYPE_DECIMAL128I:
-    case TYPE_DECIMALV2:
-        return TExprNodeType::DECIMAL_LITERAL;
-
-    case TYPE_DATETIME:
-    case TYPE_DATEV2:
-    case TYPE_DATETIMEV2:
-        return TExprNodeType::DATE_LITERAL;
-
-    case TYPE_CHAR:
-    case TYPE_VARCHAR:
-    case TYPE_HLL:
-    case TYPE_OBJECT:
-    case TYPE_STRING:
-        return TExprNodeType::STRING_LITERAL;
-
-    default:
-        DCHECK(false) << "Invalid type.";
-        return TExprNodeType::NULL_LITERAL;
-    }
-}
 
 // PrimitiveType-> PColumnType
 // TODO: use constexpr if we use c++14
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index f673ca1afa..6451927f77 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -25,13 +25,21 @@ namespace doris::pipeline {
 OPERATOR_CODE_GENERATOR(ScanOperator, SourceOperator)
 
 bool ScanOperator::can_read() {
-    if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) {
-        // _eos: need eos
-        // _scanner_ctx->done(): need finish
-        // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
-        return true;
+    if (!_node->_opened) {
+        if (_node->_should_create_scanner || _node->ready_to_open()) {
+            return true;
+        } else {
+            return false;
+        }
     } else {
-        return !_node->_scanner_ctx->empty_in_queue(); // there are some blocks to process
+        if (_node->_eos || _node->_scanner_ctx->done() || _node->_scanner_ctx->no_schedule()) {
+            // _eos: need eos
+            // _scanner_ctx->done(): need finish
+            // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
+            return true;
+        } else {
+            return _node->ready_to_read(); // there are some blocks to process
+        }
     }
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 727383d1a5..c2852b67a9 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -344,6 +344,9 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     if (request.__isset.load_job_id) {
         _runtime_state->set_load_job_id(request.load_job_id);
     }
+    if (request.__isset.shared_scan_opt) {
+        _runtime_state->set_shared_scan_opt(request.shared_scan_opt);
+    }
 
     if (request.query_options.__isset.is_report_success) {
         fragment_context->set_is_report_success(request.query_options.is_report_success);
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6f5732121e..fecb7d61b3 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -135,6 +135,9 @@ Status PipelineTask::execute(bool* eos) {
             if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
                 set_state(PipelineTaskState::BLOCKED_FOR_RF);
                 return Status::OK();
+            } else if (st.is<ErrorCode::PIP_WAIT_FOR_SC>()) {
+                set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
+                return Status::OK();
             }
             RETURN_IF_ERROR(st);
         }
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index 389cecb860..098bcce063 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -33,6 +33,7 @@
 #include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/shared_hash_table_controller.h"
+#include "vec/runtime/shared_scanner_controller.h"
 
 namespace doris {
 
@@ -46,6 +47,7 @@ public:
             : fragment_num(total_fragment_num), timeout_second(-1), _exec_env(exec_env) {
         _start_time = DateTimeValue::local_time();
         _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
+        _shared_scanner_controller.reset(new vectorized::SharedScannerController());
     }
 
     ~QueryFragmentsCtx() {
@@ -122,6 +124,10 @@ public:
         return _shared_hash_table_controller;
     }
 
+    std::shared_ptr<vectorized::SharedScannerController> get_shared_scanner_controller() {
+        return _shared_scanner_controller;
+    }
+
     vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }
 
 public:
@@ -167,6 +173,7 @@ private:
     std::atomic<bool> _is_cancelled {false};
 
     std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
+    std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
     vectorized::RuntimePredicate _runtime_predicate;
 };
 
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index d309390d6a..7ded1c66fc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -88,6 +88,7 @@ public:
     bool abort_on_default_limit_exceeded() const {
         return _query_options.abort_on_default_limit_exceeded;
     }
+    int query_parallel_instance_num() const { return _query_options.parallel_instance; }
     int max_errors() const { return _query_options.max_errors; }
     int query_timeout() const { return _query_options.query_timeout; }
     int insert_timeout() const { return _query_options.insert_timeout; }
@@ -200,6 +201,10 @@ public:
 
     int64_t load_job_id() const { return _load_job_id; }
 
+    void set_shared_scan_opt(bool shared_scan_opt) { _shared_scan_opt = shared_scan_opt; }
+
+    bool shared_scan_opt() const { return _shared_scan_opt; }
+
     const std::string get_error_log_file_path() const { return _error_log_file_path; }
 
     // append error msg and error line to file when loading data.
@@ -453,6 +458,7 @@ private:
     std::string _db_name;
     std::string _load_dir;
     int64_t _load_job_id;
+    bool _shared_scan_opt = false;
 
     // mini load
     int64_t _normal_row_number;
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 29bea35d47..12343adf80 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -320,8 +320,7 @@ set(VEC_FILES
 if (WITH_MYSQL)
     set(VEC_FILES
             ${VEC_FILES}
-            exec/scan/mysql_scanner.cpp
-            )
+            exec/scan/mysql_scanner.cpp)
 endif ()
 
 add_library(Vec STATIC
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 8761ca380c..55babf3066 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -411,10 +411,9 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
         TabletSharedPtr tablet =
                 StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id, true, &err);
         if (tablet == nullptr) {
-            std::stringstream ss;
-            ss << "failed to get tablet: " << tablet_id << ", reason: " << err;
-            LOG(WARNING) << ss.str();
-            return Status::InternalError(ss.str());
+            auto err_str = fmt::format("failed to get tablet: {}, reason: {}", tablet_id, err);
+            LOG(WARNING) << err_str;
+            return Status::InternalError(err_str);
         }
 
         std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 57d458d770..0e418256c2 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -33,20 +33,68 @@ public:
             : vectorized::ScannerContext(state, parent, input_tuple_desc, output_tuple_desc,
                                          scanners, limit, max_bytes_in_blocks_queue) {}

-    void _update_block_queue_empty() override { _blocks_queue_empty = _blocks_queue.empty(); }
+    Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
+                                int id, bool wait = false) override {
+        {
+            std::unique_lock<std::mutex> l(_transfer_lock);
+            if (state->is_cancelled()) {
+                _process_status = Status::Cancelled("cancelled");
+            }
 
-    Status get_block_from_queue(vectorized::BlockUPtr* block, bool* eos,
-                                bool wait = false) override {
-        return vectorized::ScannerContext::get_block_from_queue(block, eos, false);
+            if (!_process_status.ok()) {
+                return _process_status;
+            }
+        }
+
+        {
+            std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+            if (!_blocks_queues[id].empty()) {
+                *block = std::move(_blocks_queues[id].front());
+                _blocks_queues[id].pop_front();
+                return Status::OK();
+            } else {
+                *eos = _is_finished || _should_stop;
+            }
+        }
+        return Status::OK();
     }
 
     // We should make those method lock free.
     bool done() override { return _is_finished || _should_stop || _status_error; }
-    bool no_schedule() override { return _num_running_scanners == 0 && _num_scheduling_ctx == 0; }
-    bool empty_in_queue() override { return _blocks_queue_empty; }
+
+    void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
+        const int queue_size = _queue_mutexs.size();
+        const int block_size = blocks.size();
+        for (int i = 0; i < queue_size && i < block_size; ++i) {
+            int queue = _next_queue_to_feed;
+            {
+                std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
+                for (int j = i; j < block_size; j += queue_size) {
+                    _blocks_queues[queue].emplace_back(std::move(blocks[j]));
+                }
+            }
+            _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
+        }
+    }
+
+    bool empty_in_queue(int id) override {
+        std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+        return _blocks_queues[id].empty();
+    }
+
+    void set_max_queue_size(int max_queue_size) override {
+        for (int i = 0; i < max_queue_size; ++i) {
+            _blocks_queue_empty.emplace_back(true);
+            _queue_mutexs.emplace_back(new std::mutex);
+            _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
+        }
+    }
 
 private:
-    std::atomic_bool _blocks_queue_empty = true;
+    int _next_queue_to_feed = 0;
+    std::vector<bool> _blocks_queue_empty;
+    std::vector<std::unique_ptr<std::mutex>> _queue_mutexs;
+    std::vector<std::list<vectorized::BlockUPtr>> _blocks_queues;
 };
 } // namespace pipeline
 } // namespace doris
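
For reference, the per-instance queues above are fed with a strided round-robin: the i-th slice of each scanner batch (blocks i, i+Q, i+2Q, ...) goes to one queue, and _next_queue_to_feed rotates so consecutive batches start at different queues. A minimal standalone sketch of just that distribution, with plain ints standing in for BlockUPtr and no locking:

    #include <iostream>
    #include <vector>

    int main() {
        const int queue_size = 3;                        // one queue per pipeline instance
        std::vector<std::vector<int>> queues(queue_size);
        std::vector<int> blocks = {0, 1, 2, 3, 4, 5, 6}; // one batch returned by a scanner
        int next_queue_to_feed = 1;                      // rotates across calls

        const int block_size = static_cast<int>(blocks.size());
        for (int i = 0; i < queue_size && i < block_size; ++i) {
            int queue = next_queue_to_feed;
            for (int j = i; j < block_size; j += queue_size) {
                queues[queue].push_back(blocks[j]);      // blocks i, i+Q, i+2Q, ... land here
            }
            next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
        }
        for (int q = 0; q < queue_size; ++q) {
            std::cout << "queue " << q << ":";
            for (int b : queues[q]) std::cout << ' ' << b;
            std::cout << '\n';
        }
        return 0;
    }
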
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 70a962b8ea..16d8971227 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -23,24 +23,53 @@
 #include "runtime/runtime_state.h"
 #include "util/threadpool.h"
 #include "vec/core/block.h"
-#include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exec/scan/vscanner.h"
 
 namespace doris::vectorized {
 
+ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent,
+                               const doris::TupleDescriptor* input_tuple_desc,
+                               const doris::TupleDescriptor* output_tuple_desc,
+                               const std::list<VScanner*>& scanners_, int64_t limit_,
+                               int64_t max_bytes_in_blocks_queue_)
+        : _state(state_),
+          _parent(parent),
+          _input_tuple_desc(input_tuple_desc),
+          _output_tuple_desc(output_tuple_desc),
+          _process_status(Status::OK()),
+          _batch_size(state_->batch_size()),
+          limit(limit_),
+          _max_bytes_in_queue(max_bytes_in_blocks_queue_),
+          _scanner_scheduler(state_->exec_env()->scanner_scheduler()),
+          _scanners(scanners_) {
+    ctx_id = UniqueId::gen_uid().to_string();
+    if (_scanners.empty()) {
+        _is_finished = true;
+    }
+}
+
+// After init function call, should not access _parent
 Status ScannerContext::init() {
     _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc : _output_tuple_desc;
     // 1. Calculate max concurrency
     // TODO: now the max thread num <= config::doris_scanner_thread_pool_thread_num / 4
     // should find a more reasonable value.
-    _max_thread_num =
-            std::min(config::doris_scanner_thread_pool_thread_num / 4, (int32_t)_scanners.size());
+    _max_thread_num = _state->shared_scan_opt() ? config::doris_scanner_thread_pool_thread_num
+                                                : config::doris_scanner_thread_pool_thread_num / 4;
+    _max_thread_num = std::min(_max_thread_num, (int32_t)_scanners.size());
     // For select * from table limit 10; should just use one thread.
     if (_parent->should_run_serial()) {
         _max_thread_num = 1;
     }
 
+    _scanner_profile = _parent->_scanner_profile;
+    _scanner_sched_counter = _parent->_scanner_sched_counter;
+    _scanner_ctx_sched_counter = _parent->_scanner_ctx_sched_counter;
+    _free_blocks_memory_usage = _parent->_free_blocks_memory_usage;
+    _newly_create_free_blocks_num = _parent->_newly_create_free_blocks_num;
+    _queued_blocks_memory_usage = _parent->_queued_blocks_memory_usage;
+    _scanner_wait_batch_timer = _parent->_scanner_wait_batch_timer;
     // 2. Calculate how many blocks need to be preallocated.
     // The calculation logic is as follows:
     //  1. Assuming that at most M rows can be scanned in one scan(config::doris_scanner_row_num),
@@ -50,8 +79,8 @@ Status ScannerContext::init() {
     auto doris_scanner_row_num =
             limit == -1 ? config::doris_scanner_row_num
                         : std::min(static_cast<int64_t>(config::doris_scanner_row_num), limit);
-    int real_block_size = limit == -1 ? _state->batch_size()
-                                      : std::min(static_cast<int64_t>(_state->batch_size()), limit);
+    int real_block_size =
+            limit == -1 ? _batch_size : std::min(static_cast<int64_t>(_batch_size), limit);
     _block_per_scanner = (doris_scanner_row_num + (real_block_size - 1)) / real_block_size;
     auto pre_alloc_block_count = _max_thread_num * _block_per_scanner;
 
@@ -64,7 +93,7 @@ Status ScannerContext::init() {
         free_blocks_memory_usage += block->allocated_bytes();
         _free_blocks.emplace_back(std::move(block));
     }
-    _parent->_free_blocks_memory_usage->add(free_blocks_memory_usage);
+    _free_blocks_memory_usage->add(free_blocks_memory_usage);
 
 #ifndef BE_TEST
     // 3. get thread token
@@ -91,20 +120,20 @@ vectorized::BlockUPtr ScannerContext::get_free_block(bool* has_free_block) {
         if (!_free_blocks.empty()) {
             auto block = std::move(_free_blocks.back());
             _free_blocks.pop_back();
-            _parent->_free_blocks_memory_usage->add(-block->allocated_bytes());
+            _free_blocks_memory_usage->add(-block->allocated_bytes());
             return block;
         }
     }
     *has_free_block = false;
 
-    COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1);
-    return std::make_unique<vectorized::Block>(_real_tuple_desc->slots(), _state->batch_size(),
+    COUNTER_UPDATE(_newly_create_free_blocks_num, 1);
+    return std::make_unique<vectorized::Block>(_real_tuple_desc->slots(), _batch_size,
                                                true /*ignore invalid slots*/);
 }
 
 void ScannerContext::return_free_block(std::unique_ptr<vectorized::Block> block) {
     block->clear_column_data();
-    _parent->_free_blocks_memory_usage->add(block->allocated_bytes());
+    _free_blocks_memory_usage->add(block->allocated_bytes());
     std::lock_guard l(_free_blocks_lock);
     _free_blocks.emplace_back(std::move(block));
 }
@@ -117,18 +146,18 @@ void ScannerContext::append_blocks_to_queue(std::vector<vectorized::BlockUPtr>&
         _blocks_queue.push_back(std::move(b));
     }
     blocks.clear();
-    _update_block_queue_empty();
     _blocks_queue_added_cv.notify_one();
-    _parent->_queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
+    _queued_blocks_memory_usage->add(_cur_bytes_in_queue - old_bytes_in_queue);
 }
 
-bool ScannerContext::empty_in_queue() {
-    std::unique_lock l(_transfer_lock);
+bool ScannerContext::empty_in_queue(int id) {
+    std::unique_lock<std::mutex> l(_transfer_lock);
     return _blocks_queue.empty();
 }
 
-Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool* eos, bool wait) {
-    std::unique_lock l(_transfer_lock);
+Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
+                                            bool* eos, int id, bool wait) {
+    std::unique_lock<std::mutex> l(_transfer_lock);
     // Normally, the scanner scheduler will schedule ctx.
     // But when the amount of data in the blocks queue exceeds the upper limit,
     // the scheduler will stop scheduling.
@@ -137,18 +166,18 @@ Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool*
     // data can be continuously fetched.
     if (_has_enough_space_in_blocks_queue() && _num_running_scanners == 0) {
         _num_scheduling_ctx++;
-        _state->exec_env()->scanner_scheduler()->submit(this);
+        _scanner_scheduler->submit(this);
     }
     // Wait for block from queue
     if (wait) {
-        SCOPED_TIMER(_parent->_scanner_wait_batch_timer);
+        SCOPED_TIMER(_scanner_wait_batch_timer);
         while (!(!_blocks_queue.empty() || _is_finished || !_process_status.ok() ||
-                 _state->is_cancelled())) {
+                 state->is_cancelled())) {
             _blocks_queue_added_cv.wait(l);
         }
     }
 
-    if (_state->is_cancelled()) {
+    if (state->is_cancelled()) {
         _process_status = Status::Cancelled("cancelled");
     }
 
@@ -159,10 +188,9 @@ Status ScannerContext::get_block_from_queue(vectorized::BlockUPtr* block, bool*
     if (!_blocks_queue.empty()) {
         *block = std::move(_blocks_queue.front());
         _blocks_queue.pop_front();
-        _update_block_queue_empty();
         auto block_bytes = (*block)->allocated_bytes();
         _cur_bytes_in_queue -= block_bytes;
-        _parent->_queued_blocks_memory_usage->add(-block_bytes);
+        _queued_blocks_memory_usage->add(-block_bytes);
         return Status::OK();
     } else {
         *eos = _is_finished;
@@ -181,9 +209,9 @@ bool ScannerContext::set_status_on_error(const Status& status) {
     return false;
 }
 
-Status ScannerContext::_close_and_clear_scanners() {
-    std::unique_lock l(_scanners_lock);
-    if (_state->enable_profile()) {
+Status ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState* state) {
+    std::unique_lock<std::mutex> l(_scanners_lock);
+    if (state->enable_profile()) {
         std::stringstream scanner_statistics;
         std::stringstream scanner_rows_read;
         scanner_statistics << "[";
@@ -207,13 +235,12 @@ Status ScannerContext::_close_and_clear_scanners() {
         }
         scanner_statistics << "]";
         scanner_rows_read << "]";
-        _parent->_scanner_profile->add_info_string("PerScannerRunningTime",
-                                                   scanner_statistics.str());
-        _parent->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
+        node->_scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
+        node->_scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
     }
     // Only unfinished scanners here
     for (auto scanner : _scanners) {
-        scanner->close(_state);
+        scanner->close(state);
         // Scanners are in ObjPool in ScanNode,
         // so no need to delete them here.
     }
@@ -221,8 +248,8 @@ Status ScannerContext::_close_and_clear_scanners() {
     return Status::OK();
 }
 
-void ScannerContext::clear_and_join() {
-    std::unique_lock l(_transfer_lock);
+void ScannerContext::clear_and_join(VScanNode* node, RuntimeState* state) {
+    std::unique_lock<std::mutex> l(_transfer_lock);
     do {
         if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
             break;
@@ -239,14 +266,10 @@ void ScannerContext::clear_and_join() {
     }
     // Must wait all running scanners stop running.
     // So that we can make sure to close all scanners.
-    _close_and_clear_scanners();
+    _close_and_clear_scanners(node, state);
 
-    COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
-    COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
     _blocks_queue.clear();
     _free_blocks.clear();
-
-    return;
 }
 
 bool ScannerContext::no_schedule() {
@@ -273,7 +296,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
 
     std::lock_guard l(_transfer_lock);
     _num_scheduling_ctx++;
-    auto submit_st = _state->exec_env()->scanner_scheduler()->submit(this);
+    auto submit_st = _scanner_scheduler->submit(this);
     if (!submit_st.ok()) {
         _num_scheduling_ctx--;
     }
@@ -285,11 +308,6 @@ void ScannerContext::push_back_scanner_and_reschedule(VScanner* scanner) {
     // same scanner.
     if (scanner->need_to_close() && scanner->set_counted_down() &&
         (--_num_unfinished_scanners) == 0) {
-        // ATTN: this 2 counters will be set at close() again, which is the final values.
-        // But we set them here because the counter set at close() can not send to FE's profile.
-        // So we set them here, and the counter value may be little less than final values.
-        COUNTER_SET(_parent->_scanner_sched_counter, _num_scanner_scheduling);
-        COUNTER_SET(_parent->_scanner_ctx_sched_counter, _num_ctx_scheduling);
         _is_finished = true;
         _blocks_queue_added_cv.notify_one();
     }
@@ -306,7 +324,7 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
         if (_has_enough_space_in_blocks_queue()) {
             // If there are enough space in blocks queue,
             // the scanner number depends on the _free_blocks numbers
-            std::lock_guard l(_free_blocks_lock);
+            std::lock_guard f(_free_blocks_lock);
             thread_slot_num = _free_blocks.size() / _block_per_scanner;
             thread_slot_num += (_free_blocks.size() % _block_per_scanner != 0);
            thread_slot_num = std::min(thread_slot_num, _max_thread_num - _num_running_scanners);
@@ -340,7 +358,6 @@ void ScannerContext::get_next_batch_of_scanners(std::list<VScanner*>* current_ru
             }
         }
     }
-    return;
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 09500d59cf..1285467a5d 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -32,7 +32,6 @@ namespace doris {
 class PriorityThreadPool;
 class ThreadPool;
 class ThreadPoolToken;
-class ScannerScheduler;
 
 namespace vectorized {
 
@@ -51,35 +50,21 @@ class ScannerContext {
 public:
     ScannerContext(RuntimeState* state_, VScanNode* parent, const TupleDescriptor* input_tuple_desc,
                    const TupleDescriptor* output_tuple_desc, const std::list<VScanner*>& scanners_,
-                   int64_t limit_, int64_t max_bytes_in_blocks_queue_)
-            : _state(state_),
-              _parent(parent),
-              _input_tuple_desc(input_tuple_desc),
-              _output_tuple_desc(output_tuple_desc),
-              _process_status(Status::OK()),
-              limit(limit_),
-              _max_bytes_in_queue(max_bytes_in_blocks_queue_),
-              _scanners(scanners_) {
-        ctx_id = UniqueId::gen_uid().to_string();
-        if (_scanners.empty()) {
-            _is_finished = true;
-        }
-    }
+                   int64_t limit_, int64_t max_bytes_in_blocks_queue_);
 
     virtual ~ScannerContext() = default;
-
     Status init();
 
     vectorized::BlockUPtr get_free_block(bool* has_free_block);
     void return_free_block(std::unique_ptr<vectorized::Block> block);
 
     // Append blocks from scanners to the blocks queue.
-    void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
-
+    virtual void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks);
     // Get next block from blocks queue. Called by ScanNode
     // Set eos to true if there is no more data to read.
     // And if eos is true, the block returned must be nullptr.
-    virtual Status get_block_from_queue(vectorized::BlockUPtr* block, bool* eos, bool wait = true);
+    virtual Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
+                                        bool* eos, int id, bool wait = true);
 
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
@@ -121,7 +106,7 @@ public:
 
     void get_next_batch_of_scanners(std::list<VScanner*>* current_run);
 
-    void clear_and_join();
+    void clear_and_join(VScanNode* node, RuntimeState* state);
 
     virtual bool no_schedule();
 
@@ -129,12 +114,14 @@ public:
 
     RuntimeState* state() { return _state; }
 
-    void incr_num_ctx_scheduling(int64_t num) { _num_ctx_scheduling += num; }
-    void incr_num_scanner_scheduling(int64_t num) { _num_scanner_scheduling += num; }
+    void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }
+    void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); }
 
     VScanNode* parent() { return _parent; }
 
-    virtual bool empty_in_queue();
+    virtual bool empty_in_queue(int id);
+
+    virtual void set_max_queue_size(int max_queue_size) {};
 
     // the unique id of this context
     std::string ctx_id;
@@ -143,15 +130,12 @@ public:
     std::vector<bthread_t> _btids;
 
 private:
-    Status _close_and_clear_scanners();
+    Status _close_and_clear_scanners(VScanNode* node, RuntimeState* state);
 
     inline bool _has_enough_space_in_blocks_queue() const {
         return _cur_bytes_in_queue < _max_bytes_in_queue / 2;
     }
 
-    // do nothing here, we only do update on pip_scanner_context
-    virtual void _update_block_queue_empty() {}
-
 protected:
     RuntimeState* _state;
     VScanNode* _parent;
@@ -198,6 +182,7 @@ protected:
     doris::Mutex _free_blocks_lock;
     std::vector<vectorized::BlockUPtr> _free_blocks;
 
+    int _batch_size;
     // The limit from SQL's limit clause
     int64_t limit;
 
@@ -221,6 +206,7 @@ protected:
     // The max limit bytes of blocks in blocks queue
     int64_t _max_bytes_in_queue;
 
+    doris::vectorized::ScannerScheduler* _scanner_scheduler;
     // List "scanners" saves all "unfinished" scanners.
     // The scanner scheduler will pop scanners from this list, run scanner,
     // and then if the scanner is not finished, will be pushed back to this list.
@@ -230,8 +216,13 @@ protected:
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
 
-    int64_t _num_ctx_scheduling = 0;
-    int64_t _num_scanner_scheduling = 0;
+    std::shared_ptr<RuntimeProfile> _scanner_profile;
+    RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
+    RuntimeProfile::Counter* _scanner_ctx_sched_counter = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _free_blocks_memory_usage = nullptr;
+    RuntimeProfile::HighWaterMarkCounter* _queued_blocks_memory_usage = nullptr;
+    RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
+    RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index e7b46ca659..f019df4b60 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -66,6 +66,8 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
 Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::init(tnode, state));
     _state = state;
+    _is_pipeline_scan = state->enable_pipeline_exec();
+    _shared_scan_opt = state->shared_scan_opt();
 
     const TQueryOptions& query_options = state->query_options();
     if (query_options.__isset.max_scan_key_num) {
@@ -92,6 +94,21 @@ Status VScanNode::prepare(RuntimeState* state) {
     for (auto& rf_ctx : _runtime_filter_ctxs) {
         rf_ctx.runtime_filter->init_profile(_runtime_profile.get());
     }
+
+    if (_is_pipeline_scan) {
+        if (_shared_scan_opt) {
+            _shared_scanner_controller =
+                    state->get_query_fragments_ctx()->get_shared_scanner_controller();
+            auto [should_create_scanner, queue_id] =
+                    _shared_scanner_controller->should_build_scanner_and_queue_id(id());
+            _should_create_scanner = should_create_scanner;
+            _context_queue_id = queue_id;
+        } else {
+            _should_create_scanner = true;
+            _context_queue_id = 0;
+        }
+    }
+
     return Status::OK();
 }
 
@@ -113,18 +130,37 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
-    if (_eos) {
-        return Status::OK();
-    }
 
-    std::list<VScanner*> scanners;
-    RETURN_IF_ERROR(_init_scanners(&scanners));
-    if (scanners.empty()) {
-        _eos = true;
+    if (_is_pipeline_scan) {
+        if (_should_create_scanner) {
+            auto status = !_eos ? _prepare_scanners() : Status::OK();
+            if (_scanner_ctx) {
+                DCHECK(!_eos && _num_scanners->value() > 0);
+                _scanner_ctx->set_max_queue_size(
+                        _shared_scan_opt ? std::max(state->query_parallel_instance_num(), 1) : 1);
+                RETURN_IF_ERROR(
+                        _state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
+            }
+            if (_shared_scan_opt) {
+                _shared_scanner_controller->set_scanner_context(id(),
+                                                                _eos ? nullptr : _scanner_ctx);
+            }
+            RETURN_IF_ERROR(status);
+        } else if (_shared_scanner_controller->scanner_context_is_ready(id())) {
+            _scanner_ctx = _shared_scanner_controller->get_scanner_context(id());
+            if (!_scanner_ctx) {
+                _eos = true;
+            }
+        } else {
+            return Status::WaitForScannerContext("Need wait for scanner context create");
+        }
     } else {
-        COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(scanners));
+        RETURN_IF_ERROR(!_eos ? _prepare_scanners() : Status::OK());
+        if (_scanner_ctx) {
+            RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
+        }
     }
+
     RETURN_IF_CANCELLED(state);
     _opened = true;
     return Status::OK();
@@ -163,7 +199,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
     }
 
     vectorized::BlockUPtr scan_block = nullptr;
-    RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(&scan_block, eos));
+    RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block, eos, _context_queue_id));
     if (*eos) {
         DCHECK(scan_block == nullptr);
         return Status::OK();
@@ -184,12 +220,6 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool*
 
 Status VScanNode::_init_profile() {
     // 1. counters for scan node
-    auto* memory_usage = _runtime_profile->create_child("MemoryUsage", true, true);
-    _runtime_profile->add_child(memory_usage, false, nullptr);
-    _queued_blocks_memory_usage =
-            memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES);
-    _free_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES);
-
     _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT);
     _total_throughput_counter =
             runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter);
@@ -201,30 +231,36 @@ Status VScanNode::_init_profile() {
     _scanner_profile.reset(new RuntimeProfile("VScanner"));
     runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
 
+    auto* memory_usage = _scanner_profile->create_child("MemoryUsage", true, true);
+    _runtime_profile->add_child(memory_usage, false, nullptr);
+    _queued_blocks_memory_usage =
+            memory_usage->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES);
+    _free_blocks_memory_usage = memory_usage->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES);
+    _newly_create_free_blocks_num =
+            ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
+    // time of transfer thread to wait for block from scan thread
+    _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime");
+    _scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT);
+    _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT);
+
     _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime");
     _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime");
     _prefilter_timer = ADD_TIMER(_scanner_profile, "ScannerPrefilterTime");
     _convert_block_timer = ADD_TIMER(_scanner_profile, "ScannerConvertBlockTime");
     _filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime");
 
-    _scanner_sched_counter = ADD_COUNTER(_runtime_profile, "ScannerSchedCount", TUnit::UNIT);
-    _scanner_ctx_sched_counter = ADD_COUNTER(_runtime_profile, "ScannerCtxSchedCount", TUnit::UNIT);
-    // time of transfer thread to wait for block from scan thread
-    _scanner_wait_batch_timer = ADD_TIMER(_runtime_profile, "ScannerBatchWaitTime");
     // time of scan thread to wait for worker thread of the thread pool
     _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime");

     _pre_alloc_free_blocks_num =
             ADD_COUNTER(_runtime_profile, "PreAllocFreeBlocksNum", TUnit::UNIT);
-    _newly_create_free_blocks_num =
-            ADD_COUNTER(_runtime_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT);
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);
 
     return Status::OK();
 }
 
 Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
-    if (_state->enable_pipeline_exec()) {
+    if (_is_pipeline_scan) {
         _scanner_ctx.reset(new pipeline::PipScannerContext(_state, this, _input_tuple_desc,
                                                            _output_tuple_desc, scanners, limit(),
                                                            _state->query_options().mem_limit / 20));
@@ -234,7 +270,6 @@ Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
                                              _state->query_options().mem_limit / 20));
     }
     RETURN_IF_ERROR(_scanner_ctx->init());
-    RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     return Status::OK();
 }
 
@@ -374,10 +409,12 @@ Status VScanNode::close(RuntimeState* state) {
 void VScanNode::release_resource(RuntimeState* state) {
     START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::release_resource");
     if (_scanner_ctx.get()) {
-        // stop and wait the scanner scheduler to be done
-        // _scanner_ctx may not be created for some short circuit case.
-        _scanner_ctx->set_should_stop();
-        _scanner_ctx->clear_and_join();
+        if (!state->enable_pipeline_exec() || _should_create_scanner) {
+            // stop and wait the scanner scheduler to be done
+            // _scanner_ctx may not be created for some short circuit case.
+            _scanner_ctx->set_should_stop();
+            _scanner_ctx->clear_and_join(this, state);
+        }
     }
 
     for (auto& ctx : _runtime_filter_ctxs) {
@@ -1318,4 +1355,16 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate*
     return PushDownType::ACCEPTABLE;
 }
 
+Status VScanNode::_prepare_scanners() {
+    std::list<VScanner*> scanners;
+    RETURN_IF_ERROR(_init_scanners(&scanners));
+    if (scanners.empty()) {
+        _eos = true;
+    } else {
+        COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
+        RETURN_IF_ERROR(_start_scanners(scanners));
+    }
+
+    return Status::OK();
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 23b1e59b51..51dc2edb1b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -103,6 +103,8 @@ public:
     Status try_close();
 
     bool should_run_serial() const { return _should_run_serial; }
+    bool ready_to_open() { return _shared_scanner_controller->scanner_context_is_ready(id()); }
+    bool ready_to_read() { return !_scanner_ctx->empty_in_queue(_context_queue_id); }
 
     enum class PushDownType {
         // The predicate can not be pushed down to data source
@@ -178,8 +180,12 @@ protected:
     // Only predicate on key column can be pushed down.
     virtual bool _is_key_column(const std::string& col_name) { return false; }
 
+    Status _prepare_scanners();
+
 protected:
     RuntimeState* _state;
+    bool _is_pipeline_scan = false;
+    bool _shared_scan_opt = false;
     // For load scan node, there should be both input and output tuple descriptor.
     // For query scan node, there is only output_tuple_desc.
     TupleId _input_tuple_id = -1;
@@ -267,7 +273,11 @@ protected:
     int64_t _limit_per_scanner = -1;
 
 protected:
-    std::unique_ptr<RuntimeProfile> _scanner_profile;
+    std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
+    bool _should_create_scanner = false;
+    int _context_queue_id = -1;
+
+    std::shared_ptr<RuntimeProfile> _scanner_profile;
 
     // rows read from the scanner (including those discarded by (pre)filters)
     RuntimeProfile::Counter* _rows_read_counter;
diff --git a/be/src/vec/functions/function_timestamp.cpp b/be/src/vec/functions/function_timestamp.cpp
index 4ec1b30cc1..0ba8ec5f84 100644
--- a/be/src/vec/functions/function_timestamp.cpp
+++ b/be/src/vec/functions/function_timestamp.cpp
@@ -417,7 +417,7 @@ struct UnixTimeStampDateImpl {
     static Status execute_impl(FunctionContext* context, Block& block,
                                const ColumnNumbers& arguments, size_t result,
                                size_t input_rows_count) {
-        const ColumnPtr col_source = block.get_by_position(arguments[0]).column;
+        const ColumnPtr& col_source = block.get_by_position(arguments[0]).column;
         auto col_result = ColumnVector<Int32>::create();
         auto null_map = ColumnVector<UInt8>::create();
         auto& col_result_data = col_result->get_data();
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h b/be/src/vec/runtime/shared_hash_table_controller.h
index a6cf99edca..4c579f1d91 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -85,4 +85,4 @@ private:
 };
 
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/runtime/shared_scanner_controller.h b/be/src/vec/runtime/shared_scanner_controller.h
new file mode 100644
index 0000000000..5fb2244c82
--- /dev/null
+++ b/be/src/vec/runtime/shared_scanner_controller.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "vec/core/block.h"
+#include "vec/exec/scan/scanner_context.h"
+
+namespace doris::vectorized {
+
+class SharedScannerController {
+public:
+    std::pair<bool, int> should_build_scanner_and_queue_id(int my_node_id) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        auto it = _scanner_parallel.find(my_node_id);
+
+        if (it == _scanner_parallel.cend()) {
+            _scanner_parallel.insert({my_node_id, 0});
+            return {true, 0};
+        } else {
+            auto queue_id = it->second;
+            _scanner_parallel[my_node_id] = queue_id + 1;
+            return {false, queue_id + 1};
+        }
+    }
+
+    void set_scanner_context(int my_node_id,
+                             const std::shared_ptr<ScannerContext> scanner_context) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        _scanner_context.insert({my_node_id, scanner_context});
+    }
+
+    bool scanner_context_is_ready(int my_node_id) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _scanner_context.find(my_node_id) != _scanner_context.end();
+    }
+
+    std::shared_ptr<ScannerContext> get_scanner_context(int my_node_id) {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _scanner_context[my_node_id];
+    }
+
+private:
+    std::mutex _mutex;
+    std::map<int /*node id*/, int /*parallel*/> _scanner_parallel;
+    std::map<int /*node id*/, std::shared_ptr<ScannerContext>> _scanner_context;
+};
+
+} // namespace doris::vectorized
\ No newline at end of file
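
Taken together, the controller is keyed by plan node id: the first pipeline instance that asks gets {true, 0} and is responsible for building and publishing the shared ScannerContext, and each later instance gets {false, n} and reads only from queue n. A hedged usage sketch follows; the wrapper function and the context passed in are hypothetical, while the method names come from the header above.

    #include <memory>
    #include "vec/runtime/shared_scanner_controller.h"

    void example(doris::vectorized::SharedScannerController& ctrl,
                 std::shared_ptr<doris::vectorized::ScannerContext> ctx) {
        constexpr int node_id = 0;
        auto [build_a, queue_a] = ctrl.should_build_scanner_and_queue_id(node_id); // {true, 0}
        auto [build_b, queue_b] = ctrl.should_build_scanner_and_queue_id(node_id); // {false, 1}

        if (build_a) {
            ctrl.set_scanner_context(node_id, ctx);   // instance A publishes the shared context
        }
        if (ctrl.scanner_context_is_ready(node_id)) {
            auto shared_ctx = ctrl.get_scanner_context(node_id); // instance B drains queue_b
        }
    }
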
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 4623ba311e..467bdc004c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggrega
 import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -218,7 +219,9 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
     @Override
     public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) {
         // TODO: find a better way to handle both tablet num == 1 and colocate table together in future
-        if (!olapScan.getTable().isColocateTable() && olapScan.getScanTabletNum() == 1) {
+        if (!olapScan.getTable().isColocateTable() && olapScan.getScanTabletNum() == 1
+                && (!ConnectContext.get().getSessionVariable().enablePipelineEngine()
+                        || ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() == 1)) {
             return PhysicalProperties.GATHER;
         } else if (olapScan.getDistributionSpec() instanceof DistributionSpecHash) {
             return PhysicalProperties.createHash((DistributionSpecHash) olapScan.getDistributionSpec());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index d53697a338..1821b55829 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -1118,6 +1118,9 @@ public class OlapScanNode extends ScanNode {
 
     @Override
     public int getNumInstances() {
+        if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
+            return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        }
         return result.size();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index d35bcec8c0..18b7ae9cab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1622,6 +1622,7 @@ public class Coordinator {
                 bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
                         parallelExecInstanceNum, params);
             } else {
+                params.sharedScanOpt = true;
                 // case A
                 for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
                         fragment.getFragmentId()).scanRangeAssignment.entrySet()) {
@@ -1630,13 +1631,22 @@ public class Coordinator {
 
                     for (Integer planNodeId : value.keySet()) {
                        List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
-                        int expectedInstanceNum = 1;
-                        if (parallelExecInstanceNum > 1) {
-                            //the scan instance num should not larger than the tablets num
-                            expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
+                        List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList();
+                        if (!enablePipelineEngine) {
+                            int expectedInstanceNum = 1;
+                            if (parallelExecInstanceNum > 1) {
+                                //the scan instance num should not larger than the tablets num
+                                expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
+                            }
+                            perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
+                                    expectedInstanceNum);
+                        } else {
+                            int expectedInstanceNum = Math.min(parallelExecInstanceNum,
+                                    leftMostNode.getNumInstances());
+                            for (int j = 0; j < Math.max(expectedInstanceNum, 1); j++) {
+                                perInstanceScanRanges.add(perNodeScanRanges);
+                            }
                         }
-                        List<List<TScanRangeParams>> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
-                                expectedInstanceNum);

                         LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
 
@@ -3034,6 +3044,8 @@ public class Coordinator {
         public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
         public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment();
 
+        public boolean sharedScanOpt = false;
+
         public FragmentExecParams(PlanFragment fragment) {
             this.fragment = fragment;
         }
@@ -3125,6 +3137,7 @@
                             fragment.isTransferQueryStatisticsWithEveryBatch());
                     params.setFragment(fragment.toThrift());
                     params.setLocalParams(Lists.newArrayList());
+                    params.setSharedScanOpt(sharedScanOpt);
                     res.put(instanceExecParam.host, params);
                 }
                 TPipelineFragmentParams params = res.get(instanceExecParam.host);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ad471791ea..8dfd08a05e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1593,6 +1593,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setCodegenLevel(codegenLevel);
         tResult.setBeExecVersion(Config.be_exec_version);
         tResult.setEnablePipelineEngine(enablePipelineEngine);
+        tResult.setParallelInstance(parallelExecInstanceNum);
         tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
         tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
         tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index cafcf1044d..d799b28945 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -201,9 +201,12 @@ struct TQueryOptions {
 
   // For debug purpose, skip delete bitmap when reading data
   63: optional bool skip_delete_bitmap = false
+
   64: optional bool dry_run_query = false
 
   65: optional bool enable_common_expr_pushdown = false;
+
+  66: optional i32 parallel_instance = 1
 }
     
 
@@ -598,6 +601,7 @@ struct TPipelineFragmentParams {
  22: optional TGlobalDict global_dict  // scan node could use the global dict to encode the string value to an integer
   23: optional Planner.TPlanFragment fragment
   24: list<TPipelineInstanceParams> local_params
+  25: optional bool shared_scan_opt = false;
 }
 
 struct TPipelineFragmentParamsList {
diff --git a/regression-test/data/nereids_syntax_p0/grouping_sets.out b/regression-test/data/nereids_syntax_p0/grouping_sets.out
index 6c18f54e2b..bbc2997e3c 100644
--- a/regression-test/data/nereids_syntax_p0/grouping_sets.out
+++ b/regression-test/data/nereids_syntax_p0/grouping_sets.out
@@ -159,16 +159,16 @@
 4      3       18
 
 -- !select3 --
-\N     \N      24
 \N     \N      6
+\N     \N      24
 \N     1       1
 \N     2       3
 \N     3       4
 \N     4       2
 \N     6       5
 \N     9       3
-1      \N      24
 1      \N      6
+1      \N      24
 1      1       1
 1      2       3
 1      3       4
@@ -216,13 +216,13 @@
 
 -- !select7 --
 1
-2
-3
-4
 1
 2
+2
+3
 3
 4
+4
 
 -- !select1 --
 a      1
@@ -258,3 +258,4 @@ all 1
 2      1
 2      1
 2      2
+
diff --git a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
index 80640f7372..b7e57d2613 100644
--- a/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
+++ b/regression-test/data/nereids_syntax_p0/sub_query_correlated.out
@@ -167,23 +167,23 @@
 24     4
 
 -- !in_subquery_with_order --
-1      3
 1      2
+1      3
 2      5
-3      3
 20     2
 22     3
 24     4
+3      3
 
 -- !exists_subquery_with_order --
-1      3
 1      2
+1      3
 2      4
-3      4
-3      3
 20     2
 22     3
 24     4
+3      3
+3      4
 
 -- !scalar_subquery_with_limit --
 20     2
@@ -200,13 +200,13 @@
 20
 
 -- !case_when_subquery --
-4.0
-4.0
 20.0
 20.0
 20.0
 20.0
 20.0
+4.0
+4.0
 
 -- !in --
 1      2
@@ -244,106 +244,106 @@
 3      4
 
 -- !hash_join_with_other_conjuncts1 --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !hash_join_with_other_conjuncts2 --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !hash_join_with_other_conjuncts3 --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !hash_join_with_other_conjuncts4 --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !same_subquery_in_conjuncts --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !two_subquery_in_one_conjuncts --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !multi_subquery_in_and_scalry --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !multi_subquery_in_and_exist --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !multi_subquery_in_and_exist_sum --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
-3      3
+2      5
 20     2
 22     3
 24     4
+3      3
+3      4
 
 -- !multi_subquery_in_and_in --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !multi_subquery_scalar_and_exist --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
-3      3
+2      5
 20     2
 22     3
 24     4
+3      3
+3      4
 
 -- !multi_subquery_scalar_and_scalar --
-1      3
 1      2
-2      5
+1      3
 2      4
-3      4
+2      5
 3      3
+3      4
 
 -- !multi_subquery_in_first_or_in_and_in --
 3      3
diff --git a/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out b/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
index 2b66c921cb..91b55d0c89 100644
--- a/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
+++ b/regression-test/data/nereids_syntax_p0/sub_query_diff_old_optimize.out
@@ -1,21 +1,21 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !exists_subquery_with_limit --
-1      3
 1      2
+1      3
 2      4
-3      4
-3      3
 20     2
 22     3
 24     4
+3      3
+3      4
 
 -- !exists_subquery_with_order_and_limit --
-1      3
 1      2
+1      3
 2      4
-3      4
-3      3
 20     2
 22     3
 24     4
+3      3
+3      4
 
diff --git a/regression-test/suites/correctness_p0/test_colocate_join.groovy b/regression-test/suites/correctness_p0/test_colocate_join.groovy
index 45e4e57bd3..e8bd7206b8 100644
--- a/regression-test/suites/correctness_p0/test_colocate_join.groovy
+++ b/regression-test/suites/correctness_p0/test_colocate_join.groovy
@@ -161,7 +161,7 @@ suite("test_colocate_join") {
             (20220101, 101, 202, 200, 100);"""
 
     explain {
-        sql("select " +
+        sql("select /*+SET_VAR(parallel_fragment_exec_instance_num=1)*/ " +
                 " sum_col1,sum_col2 " +
                 "from " +
                 "(select datekey,sum(sum_col1) as sum_col1  from test_query_colocate where datekey=20220101 group by datekey) t1 " +
diff --git a/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy b/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
index dd1eb207b4..cfbef017db 100644
--- a/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
+++ b/regression-test/suites/data_model_p0/duplicate/storage/test_dup_tab_char.groovy
@@ -59,4 +59,4 @@ PROPERTIES (
 
     sql "drop table if exists ${table1}"
 
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/nereids_function_p0/gen_function/gen.groovy b/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
index 22b97ff220..4ca054ab25 100644
--- a/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
+++ b/regression-test/suites/nereids_function_p0/gen_function/gen.groovy
@@ -49,4 +49,4 @@ suite("nereids_gen_fn") {
        qt_sql_explode_split_outer_Varchar_Varchar_notnull '''
                select id, e from fn_test lateral view explode_split_outer('a, 
b, c, d', ',') lv as e order by id, e'''
 
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy b/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
index 8e6cc6e5c7..d5298d154e 100644
--- a/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
+++ b/regression-test/suites/nereids_syntax_p0/grouping_sets.groovy
@@ -121,7 +121,7 @@ suite("test_nereids_grouping_sets") {
                  rollup(k1_, k2) order by k1_, k2
                """
 
-    qt_select3 "select 1 as k, k3, sum(k1) from groupingSetsTable group by cube(k, k3) order by k, k3"
+    qt_select3 "select 1 as k, k3, sum(k1) as sum_k1 from groupingSetsTable group by cube(k, k3) order by k, k3, sum_k1"
 
     qt_select4 """
                  select k2, concat(k5, k6) as k_concat, sum(k1) from 
groupingSetsTable group by
@@ -230,7 +230,7 @@ suite("test_nereids_grouping_sets") {
                 from
                 grouping_sum_table
             ) T
-        ) T2;
+        ) T2 order by a;
     """
 
     order_qt_select1 """
diff --git a/regression-test/suites/nereids_syntax_p0/set_operation.groovy b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
index 2e06f2aa78..c6838f9986 100644
--- a/regression-test/suites/nereids_syntax_p0/set_operation.groovy
+++ b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
@@ -227,7 +227,7 @@ suite("test_nereids_set_operation") {
     }
     qt_union39 """(select  k1 from setOperationTable order by k1) union all 
(select k1 from setOperationTableNotNullable order by k1) order by k1;"""
 
-    qt_union40 """
+    order_qt_union40 """
         SELECT k1 FROM setOperationTable WHERE k2 = 2 
         INTERSECT 
         SELECT k1 FROM setOperationTable WHERE k1 = 1 
@@ -235,7 +235,7 @@ suite("test_nereids_set_operation") {
         SELECT k1 FROM setOperationTable WHERE k3 = 2
     """
 
-    qt_union41 """
+    order_qt_union41 """
     SELECT k1 FROM setOperationTable WHERE k2 = 1
     EXCEPT
     SELECT k1 FROM setOperationTable WHERE k3 = 2
@@ -245,7 +245,7 @@ suite("test_nereids_set_operation") {
     SELECT k1 FROM setOperationTable WHERE k2 > 0)
     """
 
-    qt_union42 """
+    order_qt_union42 """
     SELECT k1 FROM setOperationTable WHERE k2 = 1
     EXCEPT
     SELECT k1 FROM setOperationTable WHERE k3 = 2
diff --git a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
index 637e53223e..1d153405ea 100644
--- a/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
+++ b/regression-test/suites/nereids_syntax_p0/sub_query_correlated.groovy
@@ -236,35 +236,35 @@ suite ("sub_query_correlated") {
     """*/
 
     //----------subquery with order----------
-    qt_scalar_subquery_with_order """
+    order_qt_scalar_subquery_with_order """
         select * from sub_query_correlated_subquery1 where 
sub_query_correlated_subquery1.k1 > (select 
sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3 
where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 
order by a);
     """
 
-    qt_in_subquery_with_order """
+    order_qt_in_subquery_with_order """
         select * from sub_query_correlated_subquery1 where 
sub_query_correlated_subquery1.k1 not in (select 
sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where 
sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by 
k2);
     """
 
-    qt_exists_subquery_with_order """
+    order_qt_exists_subquery_with_order """
         select * from sub_query_correlated_subquery1 where exists (select 
sub_query_correlated_subquery3.k3 from sub_query_correlated_subquery3 where 
sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 order by 
k2);
     """
 
     //----------subquery with limit----------
-    qt_scalar_subquery_with_limit """
+    order_qt_scalar_subquery_with_limit """
         select * from sub_query_correlated_subquery1 where 
sub_query_correlated_subquery1.k1 > (select 
sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3 
where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 
limit 1);
     """
 
     //----------subquery with order and limit----------
-    qt_scalar_subquery_with_order_and_limit """
+    order_qt_scalar_subquery_with_order_and_limit """
         select * from sub_query_correlated_subquery1 where 
sub_query_correlated_subquery1.k1 > (select 
sum(sub_query_correlated_subquery3.k3) a from sub_query_correlated_subquery3 
where sub_query_correlated_subquery3.v2 = sub_query_correlated_subquery1.k2 
order by a limit 1);
     """
 
     //---------subquery with Disjunctions-------------
-    qt_scalar_subquery_with_disjunctions """
+       order_qt_scalar_subquery_with_disjunctions """
         SELECT DISTINCT k1 FROM sub_query_correlated_subquery1 i1 WHERE 
((SELECT count(*) FROM sub_query_correlated_subquery1 WHERE ((k1 = i1.k1) AND 
(k2 = 2)) or ((k1 = i1.k1) AND (k2 = 1)) )  > 0);
     """
 
     //--------subquery case when-----------
-    qt_case_when_subquery """
+    order_qt_case_when_subquery """
         SELECT CASE
             WHEN (
                 SELECT COUNT(*) / 2
@@ -298,86 +298,86 @@ suite ("sub_query_correlated") {
         SELECT * FROM sub_query_correlated_subquery1 WHERE EXISTS (SELECT k1 
FROM sub_query_correlated_subquery3 WHERE k1 > 10) OR k1 < 10;
     """
 
-    qt_hash_join_with_other_conjuncts1 """
+    order_qt_hash_join_with_other_conjuncts1 """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 > 
sub_query_correlated_subquery3.k3) OR k1 < 10 ORDER BY k1;
     """
 
-    qt_hash_join_with_other_conjuncts2 """
+    order_qt_hash_join_with_other_conjuncts2 """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 < 
sub_query_correlated_subquery3.k3) OR k1 < 10 ORDER BY k1;
     """
 
-    qt_hash_join_with_other_conjuncts3 """
+    order_qt_hash_join_with_other_conjuncts3 """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 > 
sub_query_correlated_subquery3.k3) OR k1 < 11 ORDER BY k1;
     """
 
-    qt_hash_join_with_other_conjuncts4 """
+    order_qt_hash_join_with_other_conjuncts4 """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 < 
sub_query_correlated_subquery3.k3) OR k1 < 11 ORDER BY k1;
     """
 
-    qt_same_subquery_in_conjuncts """
+    order_qt_same_subquery_in_conjuncts """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3) OR k1 IN (SELECT k1 FROM 
sub_query_correlated_subquery3) OR k1 < 10 ORDER BY k1;
     """
 
-    qt_two_subquery_in_one_conjuncts """
+    order_qt_two_subquery_in_one_conjuncts """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3) OR k1 IN (SELECT k3 FROM 
sub_query_correlated_subquery3) OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_in_and_scalry """
+    order_qt_multi_subquery_in_and_scalry """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1)
                                                      OR k1 < (SELECT sum(k1) 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.v1)
                                                      OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_in_and_exist """
+    order_qt_multi_subquery_in_and_exist """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1)
                                                      OR exists (SELECT k1 FROM 
sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.v1)
                                                      OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_in_and_exist_sum """
+    order_qt_multi_subquery_in_and_exist_sum """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1)
                                                      OR exists (SELECT sum(k1) 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.v1)
                                                      OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_in_and_in """
+    order_qt_multi_subquery_in_and_in """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 IN (SELECT k1 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1)
                                                      OR k2 in (SELECT k2 FROM 
sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.v1)
                                                      OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_scalar_and_exist """
+    order_qt_multi_subquery_scalar_and_exist """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 < (SELECT 
sum(k1) FROM sub_query_correlated_subquery3 where 
sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1)
                                                      OR exists (SELECT sum(k1) 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.v1)
                                                      OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_scalar_and_scalar """
+    order_qt_multi_subquery_scalar_and_scalar """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 < (SELECT 
sum(k1) FROM sub_query_correlated_subquery3 where 
sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1)
                                                      OR k2 < (SELECT sum(k1) 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.v1)
                                                      OR k1 < 10 ORDER BY k1;
     """
 
-    qt_multi_subquery_in_first_or_in_and_in """
+    order_qt_multi_subquery_in_first_or_in_and_in """
         SELECT * FROM sub_query_correlated_subquery1 WHERE (k1 in (SELECT k2 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1) 
                                                                 or k2 in 
(SELECT k1 FROM sub_query_correlated_subquery3 where 
sub_query_correlated_subquery1.k2 = sub_query_correlated_subquery3.k1))
                                                             and k1 in (SELECT 
k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 
= sub_query_correlated_subquery3.k1)
     """
 
-    qt_multi_subquery_in_second_or_in_and_in """
+    order_qt_multi_subquery_in_second_or_in_and_in """
         SELECT * FROM sub_query_correlated_subquery1 WHERE k1 in (SELECT k2 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1) 
                                                            or k2 in (SELECT k1 
FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 = 
sub_query_correlated_subquery3.k1)
                                                            and k1 in (SELECT 
k1 FROM sub_query_correlated_subquery3 where sub_query_correlated_subquery1.k2 
= sub_query_correlated_subquery3.k1)
     """
 
-    qt_multi_subquery_scalar_and_in_or_scalar_and_exists_agg """
+    order_qt_multi_subquery_scalar_and_in_or_scalar_and_exists_agg """
         SELECT * FROM sub_query_correlated_subquery1 WHERE ((k1 != (SELECT 
sum(k1) FROM sub_query_correlated_subquery3) and k1 = 1 OR k1 < 10) and k1 = 10 
and k1 = 15)
                                         and (k1 IN (SELECT k1 FROM 
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.k1)
                                              OR k1 < (SELECT sum(k1) FROM 
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.k1))
                                         and exists (SELECT sum(k1) FROM 
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.k1);
     """
 
-    qt_multi_subquery_scalar_and_in_or_scalar_and_exists """
+    order_qt_multi_subquery_scalar_and_in_or_scalar_and_exists """
         SELECT * FROM sub_query_correlated_subquery1 WHERE ((k1 != (SELECT 
sum(k1) FROM sub_query_correlated_subquery3) and k1 = 1 OR k1 < 10) and k1 = 10 
and k1 = 15)
                                         and (k1 IN (SELECT k1 FROM 
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.k1)
                                              OR k1 < (SELECT sum(k1) FROM 
sub_query_correlated_subquery3 WHERE sub_query_correlated_subquery1.k1 = 
sub_query_correlated_subquery3.k1))
diff --git a/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy b/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
index 4ee579f839..6d5bc11a5a 100644
--- a/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
+++ b/regression-test/suites/nereids_syntax_p0/sub_query_diff_old_optimize.groovy
@@ -159,7 +159,7 @@ suite ("sub_query_diff_old_optimize") {
     """*/
 
     //----------subquery with limit----------
-    qt_exists_subquery_with_limit """
+    order_qt_exists_subquery_with_limit """
         select * from sub_query_diff_old_optimize_subquery1 where exists 
(select sub_query_diff_old_optimize_subquery3.k3 from 
sub_query_diff_old_optimize_subquery3 where 
sub_query_diff_old_optimize_subquery3.v2 = 
sub_query_diff_old_optimize_subquery1.k2 limit 1);
     """
 
@@ -172,7 +172,7 @@ suite ("sub_query_diff_old_optimize") {
     }
 
     //----------subquery with order and limit-------
-    qt_exists_subquery_with_order_and_limit """
+    order_qt_exists_subquery_with_order_and_limit """
         select * from sub_query_diff_old_optimize_subquery1 where exists 
(select sub_query_diff_old_optimize_subquery3.k3 from 
sub_query_diff_old_optimize_subquery3 where 
sub_query_diff_old_optimize_subquery3.v2 = 
sub_query_diff_old_optimize_subquery1.k2 order by k1 limit 1);
     """
 
diff --git a/regression-test/suites/performance_p0/redundant_conjuncts.groovy b/regression-test/suites/performance_p0/redundant_conjuncts.groovy
index 86035612c3..4027c02aaf 100644
--- a/regression-test/suites/performance_p0/redundant_conjuncts.groovy
+++ b/regression-test/suites/performance_p0/redundant_conjuncts.groovy
@@ -32,10 +32,10 @@ suite("redundant_conjuncts") {
     """
     
     qt_redundant_conjuncts """
-    EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1;
+    EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=2, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 AND k1 = 1;
     """
 
     qt_redundant_conjuncts_gnerated_by_extract_common_filter """
-    EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2;
+    EXPLAIN SELECT /*+SET_VAR(REWRITE_OR_TO_IN_PREDICATE_THRESHOLD=100, parallel_fragment_exec_instance_num = 1) */ v1 FROM redundant_conjuncts WHERE k1 = 1 OR k1 = 2;
     """
 }
diff --git a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
index eadab28672..87becfe020 100644
--- a/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
+++ b/regression-test/suites/query_p0/sql_functions/conditional_functions/test_query_limit.groovy
@@ -73,15 +73,15 @@ suite("test_query_limit", "query,p0") {
     qt_limit16 "select * from (select * from ${tableName} order by k1, k2, k3, 
k4 limit 1, 2) a limit 2, 2"
     qt_limit17 "select * from (select * from ${tableName} order by k1, k2, k3, 
k4 limit 1, 2) a limit 2, 3"
     test {
-        sql "select * from ${tableName} limit 1, 10"
+        sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from ${tableName} limit 1, 10"
         rowNum 2
     }
     test {
-        sql "select * from ${tableName} limit 2, 10"
+        sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from ${tableName} limit 2, 10"
         rowNum 1
     }
     test {
-        sql "select * from ${tableName} limit 3, 10"
+        sql "select /*+SET_VAR(parallel_fragment_exec_instance_num=4)*/ * from ${tableName} limit 3, 10"
         rowNum 0
     }
 }
diff --git a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
index 0e517f1d7e..181f597849 100644
--- a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
+++ b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_fn.groovy
@@ -123,7 +123,7 @@ suite("test_window_fn") {
         select first_value(salary) over(order by enroll_date range between 
unbounded preceding and UNBOUNDED following), last_value(salary) over(order by 
enroll_date range between unbounded preceding and UNBOUNDED following), salary, 
enroll_date from ${tbName1} order by salary, enroll_date;
     """
     qt_sql """
-        SELECT first_value(ten) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT first_value(ten) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
     qt_sql """
         SELECT first_value(unique1) over (order by four range between current 
row and unbounded following),  
@@ -190,7 +190,7 @@ suite("test_window_fn") {
     """
     qt_sql """
         SELECT depname, empno, salary, rank() OVER (PARTITION BY depname ORDER BY salary, empno) 
-        FROM ${tbName1} ORDER BY rank() OVER (PARTITION BY depname ORDER BY salary, empno);
+        FROM ${tbName1} ORDER BY rank() OVER (PARTITION BY depname ORDER BY salary, empno), depname;
     """
     qt_sql """
         SELECT sum(salary) as s, row_number() OVER (ORDER BY depname)  as r, 
sum(sum(salary)) OVER (ORDER BY depname DESC) as ss 
@@ -207,10 +207,10 @@ suite("test_window_fn") {
         SELECT row_number() OVER (ORDER BY unique2) FROM ${tbName2} WHERE 
unique2 < 10;
     """
     qt_sql """
-        SELECT rank() OVER (PARTITION BY four ORDER BY ten) AS rank_1, ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT rank() OVER (PARTITION BY four ORDER BY ten) AS rank_1, ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
     qt_sql """
-        SELECT dense_rank() OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT dense_rank() OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
     qt_sql """
         select ten,   sum(unique1) + sum(unique2) as res,   rank() over (order 
by sum(unique1) + sum(unique2)) as rank from ${tbName2} group by ten order by 
ten;
@@ -256,7 +256,7 @@ suite("test_window_fn") {
         SELECT count(1) OVER (PARTITION BY four) as c, four FROM (SELECT * 
FROM ${tbName2} WHERE two = 1)s WHERE unique2 < 10 order by c, four;
     """
     qt_sql """
-        SELECT avg(four) OVER (PARTITION BY four ORDER BY thousand / 100) FROM ${tbName2} WHERE unique2 < 10;
+        SELECT avg(four) OVER (PARTITION BY four ORDER BY thousand / 100) FROM ${tbName2} WHERE unique2 < 10 order by four;
     """
     qt_sql """
         SELECT count(1) OVER (PARTITION BY four) FROM (SELECT * FROM 
${tbName2} WHERE FALSE)s;
@@ -278,19 +278,19 @@ suite("test_window_fn") {
 
     // lag
     qt_sql """
-        SELECT lag(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT lag(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
 
 
     // lead
     qt_sql """
-        SELECT lead(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT lead(ten, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
     qt_sql """
-        SELECT lead(ten * 2, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT lead(ten * 2, 1, 0) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
     qt_sql """
-        SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10;
+        SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten), ten, four FROM ${tbName2} WHERE unique2 < 10 order by four, ten;
     """
 
 
@@ -353,7 +353,7 @@ suite("test_window_fn") {
     qt_sql_window_last_value """
         select u_id, u_city, u_salary,
         last_value(u_salary) over (partition by u_city order by u_id rows 
between unbounded preceding and 1 preceding) last_value_test
-        from example_window_tb;
+        from example_window_tb order by u_id;
     """
 
     sql "DROP TABLE IF EXISTS example_window_tb;"


